布景
MOSN在热晋级上面,也曾做过自己的探索;业界虽然有Nginx Envoy 也都完结了热晋级办法,那么这儿有什么异同呢?
Nginx: 经过Fork的办法直接继承父进程的监听信息和链接信息等,仅仅只用于重启
Envoy: Envoy对端口监听(Listener)进行了搬迁,可是对树立的衔接(connection)则经过指令的办法,进行自动断开重连
MOSN: 鉴于低版本Bolt 等协议,不支持自动断连,且是长衔接,导致MOSN在进行热晋级的时分,不仅仅进行了端口监听的搬迁,还有connection的搬迁,确保了热晋级过程中链接不中断,客户端服务端无感的晋级体会
晋级流程
触发状况
这儿赘述一下,MOSN的热晋级办法,是经过一个Operator完结的
- Operator在Pod中添加新的MOSN容器
- New Mosn发动时分,观测到有Old Mosn存在,敞开热晋级的逻辑
- New Mosn发动成功后,Old Mosn进行退出
- Operator销毁Old Mosn的Container
至此,一个完好的流程热晋级流程结束
整体规划
MOSN热晋级中中心的数据
- 装备数据(这样New Mosn才知道代理的是什么应用,有哪些装备信息等等)
- 监听端口
- 衔接
MOSN热晋级的交互流程规划如上,接下来对各个模块搬迁逻辑进行一下盯梢
源码解析
代码逻辑仅保留中心流程
socket监听状况
func (stm *StageManager) Run() {
// 1: parser params
stm.runParamsParsedStage()
// 2: init
stm.runInitStage()
// 3: pre start
stm.runPreStartStage()
// 4: run
stm.runStartStage()
// 5: after start
stm.runAfterStartStage()
stm.SetState(Running)
}
MOSN发动分为上述几个流程,其中 热晋级逻辑首要散布在 InitStage
和 StartStage
InitStage: 搬迁装备信息 和 搬迁Listener
StartStage: 创立 reconfig.sock 的监听 和 搬迁 connection
在MOSN中有4个socket监听
reconfig.sock: 由 old mosn 监听,用于 new mosn 感知 old mosn存在
listen.sock: 由new mosn 监听,用于old mosn传递 listener数据给new mosn
conn.sock: 由 new mosn监听,用于 old mosn 传递 connection 的 fd和connection读取到的数据 (没有则不传)给 new mosn
mosnconfig.sock: 由 new mosn 监听,用于 old mosn 传递装备信息给 new mosn
Old Mosn 搬迁主流程
func ReconfigureHandler() error {
// dump lastest config, and stop DumpConfigHandler()
configmanager.DumpLock()
configmanager.DumpConfig()
// if reconfigure failed, enable DumpConfigHandler()
defer configmanager.DumpUnlock()
// transfer listen fd
var listenSockConn net.Conn
var err error
var n int
var buf [1]byte
if listenSockConn, err = sendInheritListeners(); err != nil {
return err
}
if enableInheritOldMosnconfig {
if err = SendInheritConfig(); err != nil {
listenSockConn.Close()
log.DefaultLogger.Alertf(types.ErrorKeyReconfigure, "[old mosn] [SendInheritConfig] new mosn start failed")
return err
}
}
// Wait new mosn parse configuration
listenSockConn.SetReadDeadline(time.Now().Add(10 * time.Minute))
n, err = listenSockConn.Read(buf[:])
if n != 1 {
log.DefaultLogger.Alertf(types.ErrorKeyReconfigure, "[old mosn] [read ack] new mosn start failed")
return err
}
// ack new mosn
if _, err := listenSockConn.Write([]byte{0}); err != nil {
log.DefaultLogger.Alertf(types.ErrorKeyReconfigure, "[old mosn] [write ack] new mosn start failed")
return err
}
// stop other services
store.StopService()
// Wait for new mosn start
time.Sleep(3 * time.Second)
// Stop accepting new connections & graceful close the existing connections if they supports graceful close.
shutdownServers()
// Wait for all connections to be finished
WaitConnectionsDone(GracefulTimeout)
log.DefaultLogger.Infof("[server] [reconfigure] process %d gracefully shutdown", os.Getpid())
// will stop the current old mosn in stage manager
return nil
}
- old mosn 首要将 listener 搬迁到new mosn
- 然后 搬迁 装备信息,这一步是可选的
- 等候 new mosn ack完结后
- 调用 WaitConnectionsDone 将 connection close掉,搬迁 connection
装备搬迁
装备搬迁是用过 mosnconfig.sock 来完结的,new mosn 监听,old mosn 衔接上去并传输数据
new mosn监听是在 GetInheritConfig
这个办法中完结的
func GetInheritConfig() (*v2.MOSNConfig, error) {
......
l, err := net.Listen("unix", types.TransferMosnconfigDomainSocket)
......
defer l.Close()
ul := l.(*net.UnixListener)
ul.SetDeadline(time.Now().Add(time.Second * 10))
uc, err := ul.AcceptUnix()
......
defer uc.Close()
log.StartLogger.Infof("[server] Get GetInheritConfig Accept")
configData := make([]byte, 0)
buf := make([]byte, 1024)
for {
n, err := uc.Read(buf)
configData = append(configData, buf[:n]...)
.......
}
// log.StartLogger.Infof("[server] inherit mosn config data: %v", string(configData))
oldConfig := &v2.MOSNConfig{}
err = json.Unmarshal(configData, oldConfig)
if err != nil {
return nil, err
}
return oldConfig, nil
}
New mosn 树立好socket监听后,卡在 Accept 函数中,等候 old mosn 树立链接
然后 接纳old mosn 传递过来的数据即可
old mosn是在 SendInheritConfig
中与 new mosn树立链接并传递装备信息的
func SendInheritConfig() error {
var unixConn net.Conn
var err error
// retry 10 time
for i := 0; i < 10; i++ {
unixConn, err = net.DialTimeout("unix", types.TransferMosnconfigDomainSocket, 1*time.Second)
......
}
......
configData, err := configmanager.InheritMosnconfig()
......
uc := unixConn.(*net.UnixConn)
defer uc.Close()
n, err := uc.Write(configData)
......
return nil
}
至此,装备数据传递也就完结了
监听端口搬迁
listener搬迁是用过 listen.sock 来完结的,new mosn 监听,old mosn 衔接上去并传输数据,跟装备传输的逻辑差不多
new mosn监听是在 GetInheritListeners
这个办法中完结的,并获取一切的listener
func GetInheritListeners() ([]net.Listener, []net.PacketConn, net.Conn, error) {
l, err := net.Listen("unix", types.TransferListenDomainSocket)
......
defer l.Close()
ul := l.(*net.UnixListener)
ul.SetDeadline(time.Now().Add(time.Second * 10))
// 这儿卡主,等候 old mosn衔接
uc, err := ul.AcceptUnix()
buf := make([]byte, 1)
oob := make([]byte, 1024)
// 接纳 old mosn传递过来的数据
_, oobn, _, _, err := uc.ReadMsgUnix(buf, oob)
scms, err := unix.ParseSocketControlMessage(oob[0:oobn])
// 解析出来fd
gotFds, err := unix.ParseUnixRights(&scms[0])
var listeners []net.Listener
var packetConn []net.PacketConn
for i := 0; i < len(gotFds); i++ {
fd := uintptr(gotFds[i])
file := os.NewFile(fd, "")
.....
defer file.Close()
// 经过fd 康复 listener,本质是对fd的监听
fileListener, err := net.FileListener(file)
if err != nil {
pc, err := net.FilePacketConn(file)
if err == nil {
packetConn = append(packetConn, pc)
} else {
log.StartLogger.Errorf("[server] recover listener from fd %d failed: %s", fd, err)
return nil, nil, nil, err
}
} else {
// for tcp or unix listener
listeners = append(listeners, fileListener)
}
}
return listeners, packetConn, uc, nil
}
经过上述办法,new mosn树立 socket监听,并等候 old mosn的衔接,old mosn衔接上后,等候 old mosn 传递 一切listener 的fd,然后 new mosn 进行康复即可
可是 同一个 fd,有两个监听,不就乱了吗,所以 当 old mosn 传递过来fd后,会自动 stop accept,不再进行监听
new mosn 的listener 传递逻辑在sendInheritListeners
里边
func sendInheritListeners() (net.Conn, error) {
// 列出来一切的 listener,返回格式 os.File
lf := ListListenersFile()
......
lsf, lerr := admin.ListServiceListenersFile()
......
var files []*os.File
files = append(files, lf...)
files = append(files, lsf...)
......
fds := make([]int, len(files))
for i, f := range files {
// 获取 file 的 fd
fds[i] = int(f.Fd())
defer f.Close()
}
var unixConn net.Conn
var err error
// retry 10 time
for i := 0; i < 10; i++ {
unixConn, err = net.DialTimeout("unix", types.TransferListenDomainSocket, 1*time.Second)
.......
}
......
uc := unixConn.(*net.UnixConn)
buf := make([]byte, 1)
// 将 fd 转成 socket message
rights := syscall.UnixRights(fds...)
n, oobn, err := uc.WriteMsgUnix(buf, rights, nil)
......
return uc, nil
}
Old mosn 经过 ListListenersFile 将一切的listener罗列出来,这个首要得益于MOSN良好的规划模式;mosn维护了一个大局的servers,而server的结构如下
type server struct {
serverName string
stopChan chan struct{}
handler types.ConnectionHandler
}
type ConnectionHandler interface {
......
// ListListenersFD reports all listeners' fd
ListListenersFile(lctx context.Context) []*os.File
}
每个server对自己的listener描述明晰
持续回到 old mosn传递listener的过程
- old mosn 罗列出来一切的listener,并获取file
- old mosn 获取一切 file的fd,并将fd经过UnixRights转成 socket message,供传递
- new mosn 接纳到 socket message,转成fd,并经过文件树立listener
然后 old mosn 会封闭掉一切的listener,中止accept 新的链接
func shutdownServers() {
for _, server := range servers {
server.Shutdown()
}
}
func (ch *connHandler) GracefulStopListeners() error {
var failed bool
listeners := ch.listeners
wg := sync.WaitGroup{}
wg.Add(len(listeners))
for _, l := range listeners {
al := l
log.DefaultLogger.Infof("graceful shutdown listener %v", al.listener.Name())
// Shutdown listener in parallel
utils.GoWithRecover(func() {
defer wg.Done()
if err := al.listener.Shutdown(); err != nil {
log.DefaultLogger.Errorf("failed to shutdown listener %v: %v", al.listener.Name(), err)
failed = true
}
}, nil)
}
wg.Wait()
return nil
}
func (l *listener) Shutdown() error {
changed, err := l.stopAccept()
if changed {
l.cb.OnShutdown()
}
return err
}
至此,只要new mosn 能树立新的链接,old mosn不再树立新的链接了
listener传递完结后,新的链接都树立到 new mosn上去了,剩下的就是存量长链接了
长链接搬迁
长链接搬迁是用过 conn.sock 来完结的,同上,也是由new mosn 监听,old mosn 衔接上去并传输数据;不过,这儿并没有传递装备和listener那样简略,需求考虑许多边沿问题
在这儿,链接分为两部分
- client/server -> mosn的链接
- mosn -> client/server 的链接
咱们只需求考虑 client/server -> mosn的链接
的状况即可,mosn -> client/server 的链接
这种状况,由于mosn是自动衔接方,断开并不会对下游造成任何影响
先来看下 长链接搬迁的流程
- Client 发送恳求到 MOSN
- MOSN 经过 domain socket(conn.sock) 把 TCP1 的 FD 和衔接的状况数据发送给 New MOSN
- New MOSN 承受 FD 和恳求数据创立新的 Conection 结构,然后把 Connection id 传给 MOSN,New MOSN 此刻就具有了TCP1 的一个复制。Old MOSN 中止读取 TCP1 的恳求,New MOSN 开始读取 TCP1的恳求,TCP1的搬迁就完结了
- New MOSN 经过 LB 选取一个新的 Server,树立 TCP3 衔接,转发恳求到 Server
- Server 回复响应到 New MOSN
- New MOSN 经过 MOSN 传递来的 TCP1 的复制,回复响应到 Client
接下来看下代码
注: 前面 WaitConnectionsDone
现已将 connection.stopChan close掉了
链接搬迁是在 startReadLoop
中完结的
func (c *connection) startReadLoop() {
var transferTime time.Time
for {
......
select {
case <-c.stopChan:
// 首要设置 transfer 时刻
if transferTime.IsZero() {
if c.transferCallbacks != nil && c.transferCallbacks() {
randTime := time.Duration(rand.Intn(int(TransferTimeout.Nanoseconds())))
transferTime = time.Now().Add(TransferTimeout).Add(randTime)
log.DefaultLogger.Infof("[network] [read loop] transferTime: Wait %d Second", (TransferTimeout+randTime)/1e9)
} else {
// set a long time, not transfer connection, wait mosn exit.
transferTime = time.Now().Add(10 * TransferTimeout)
log.DefaultLogger.Infof("[network] [read loop] not support transfer connection, Connection = %d, Local Address = %+v, Remote Address = %+v",
c.id, c.rawConnection.LocalAddr(), c.RemoteAddr())
}
} else {
if transferTime.Before(time.Now()) {
c.transfer()
return
}
}
default:
}
.......
}
- 在 old mosn 调用
WaitConnectionsDone
将 connection.stopChan close之后,在startReadLoop
循环中 首要设置以下 随机 transfer 时刻 - 在 transfer时刻到了之后,开始 搬迁 链接
func (c *connection) transfer() {
c.notifyTransfer()
id, _ := transferRead(c)
c.transferWrite(id)
}
func transferRead(c *connection) (uint64, error) {
......
unixConn, err := net.Dial("unix", types.TransferConnDomainSocket)
......
defer unixConn.Close()
file, tlsConn, err := transferGetFile(c)
......
uc := unixConn.(*net.UnixConn)
// send type and TCP FD
err = transferSendType(uc, file)
......
// send header + buffer + TLS
err = transferReadSendData(uc, tlsConn, c.readBuffer)
......
// recv ID
id := transferRecvID(uc)
log.DefaultLogger.Infof("[network] [transfer] [read] TransferRead NewConn Id = %d, oldId = %d, %p, addrass = %s", id, c.id, c, c.RemoteAddr().String())
return id, nil
}
- 每一个connection搬迁的时分,会首要构建一个 unix connection 用于 old mosn 和 new mosn交互
- 首要将 connection 的 fd 经过 scoket传递给new mosn
- 然后 将 connection tls 和 读取的buf数据再传递给 new mosn 处理
- 最终 记录下来 new mosn 依据 fd 创立的 新的connection的id
这儿mosn结构了一个简略的socket协议,用于 传递 connection 的tls和buf数据
/**
* transfer read protocol
* header (8 bytes) + (readBuffer data) + TLS
*
* 0 4 8
* +-----+-----+-----+-----+-----+-----+-----+-----+
* | data length | TLS length |
* +-----+-----+-----+-----+-----+-----+-----+-----+
* | data |
* +-----+-----+-----+-----+-----+-----+-----+-----+
* | TLS |
* +-----+-----+-----+-----+-----+-----+-----+-----+
*
接下来持续看下,new mosn 收到 connection后的处理, new mosn 处理是在 transferHandler
中完结的
func transferHandler(c net.Conn, handler types.ConnectionHandler, transferMap *sync.Map) {
......
uc, ok := c.(*net.UnixConn)
......
// recv type
conn, err := transferRecvType(uc)
......
if conn != nil {
// transfer read
// recv header + buffer
dataBuf, tlsBuf, err := transferReadRecvData(uc)
......
connection := transferNewConn(conn, dataBuf, tlsBuf, handler, transferMap)
if connection != nil {
transferSendID(uc, connection.id)
} else {
transferSendID(uc, transferErr)
}
}
......
}
new mosn 收到connection搬迁恳求后,依据传递过来的fd,首要转换成connection,然后 依据 tls 数据,构建为 tls conn,这些完结后,依据 conn的监听信息和handler类型,找到对应的listener,并将这个connection加进去,然后就可以开始处理 传递过来的buf数据了
最终,new mosn 将新的connection的id,传给old mosn,用于传递 写恳求
至此,old mosn connection的 readLoop 也退出了,不再读取新的数据,数据也都由new mosn来读取了
接下来就是写恳求了
old mosn 假如持续往衔接里边写数据,可能会和new mosn抵触,导致数据操作,所以 old mosn的写恳求,是直接转给 new mosn来处理的
Old mosn 搬迁写恳求:
func transferWrite(c *connection, id uint64) error {
......
unixConn, err := net.Dial("unix", types.TransferConnDomainSocket)
......
defer unixConn.Close()
uc := unixConn.(*net.UnixConn)
err = transferSendType(uc, nil)
......
// build net.Buffers to IoBuffer
buf := transferBuildIoBuffer(c)
// send header + buffer
err = transferWriteSendData(uc, int(id), buf)
......
return nil
}
Mosn 也为写恳求,构建了一个简略的socket协议
* transfer write protocol
* header (8 bytes) + (writeBuffer data)
*
* 0 4 8
* +-----+-----+-----+-----+-----+-----+-----+-----+
* | data length | connection ID |
* +-----+-----+-----+-----+-----+-----+-----+-----+
* | data |
* +-----+-----+-----+-----+-----+-----+-----+-----+
*
这儿会将,搬迁 读恳求时获取到的 connection id 记录下来,并传递给 new mosn,让 new mosn 依据 id 找到对应的链接
new mosn 接纳写恳求处理逻辑,也是在 transferHandler
中完结的:
func transferHandler(c net.Conn, handler types.ConnectionHandler, transferMap *sync.Map) {
......
// transfer write
// recv header + buffer
id, buf, err := transferWriteRecvData(uc)
......
connection := transferFindConnection(transferMap, uint64(id))
......
err = transferWriteBuffer(connection, buf)
......
}
New mosn 从 old mosn的socket恳求中,解析出来 connection id 和 buf 数据
依据 id 找到 new mosn中的 connection,然后将buf写入即完结
至此,读写恳求都搬迁完结,整个长衔接的搬迁也就完结了
总结
MOSN对热晋级的处理,是做到极致的,深度是远远高于市场上其他产品的;同时,深度也往往伴随着危险,做到链接搬迁层面,可能也并不是MOSN的本意,而是历史原因的驱动
在源码逻辑层面,个人也有一点简略的看法
- 整个热晋级模块,更像是函数驱动,而缺少规划,从 conn.sock listen.sock 等多个sock文件就可以看出,假如规划好点的话,完全可以经过构建socket协议,而规避掉多个socket文件,同时,逻辑也会更明晰简练一点,而非散落在各个办法里边
- 有些鸿沟性问题还是没有处理的很好;例如,listener搬迁的时分,假如有listener close掉了,可是装备还存在MOSN中,这儿就会panic了;eg: reconfig.sock 在处理最终会删除,假如走到了这一步,然后回滚了,后续也无法进行热晋级
最终,MOSN还是很优异的,respect!!!