• golang平滑重启库overseer实现原理


    overseer主要完成了三部分功能:

    1、连接的无损关闭,2、连接的平滑重启,3、文件变更的自动重启。

    下面依次讲一下:

    一、连接的无损关闭

    golang标准库net/http包中的(srv *Server) Serve本身不提供连接的无损关闭:当监听套接字关闭、主监听循环退出时,它并不会等待各个已建立连接的work协程处理完成。(标准库自Go 1.8起另外提供了Server.Shutdown用于优雅关闭,但overseer的实现并不依赖它。)

    以下是golang官方代码:

    Go/src/net/http/server.go

    1. func (srv *Server) Serve(l net.Listener) error {
    2. if fn := testHookServerServe; fn != nil {
    3. fn(srv, l) // call hook with unwrapped listener
    4. }
    5. origListener := l
    6. l = &onceCloseListener{Listener: l}
    7. defer l.Close()
    8. if err := srv.setupHTTP2_Serve(); err != nil {
    9. return err
    10. }
    11. if !srv.trackListener(&l, true) {
    12. return ErrServerClosed
    13. }
    14. defer srv.trackListener(&l, false)
    15. baseCtx := context.Background()
    16. if srv.BaseContext != nil {
    17. baseCtx = srv.BaseContext(origListener)
    18. if baseCtx == nil {
    19. panic("BaseContext returned a nil context")
    20. }
    21. }
    22. var tempDelay time.Duration // how long to sleep on accept failure
    23. ctx := context.WithValue(baseCtx, ServerContextKey, srv)
    24. for {
    25. rw, err := l.Accept()
    26. if err != nil {
    27. if srv.shuttingDown() {
    28. return ErrServerClosed
    29. }
    30. if ne, ok := err.(net.Error); ok && ne.Temporary() {
    31. if tempDelay == 0 {
    32. tempDelay = 5 * time.Millisecond
    33. } else {
    34. tempDelay *= 2
    35. }
    36. if max := 1 * time.Second; tempDelay > max {
    37. tempDelay = max
    38. }
    39. srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
    40. time.Sleep(tempDelay)
    41. continue
    42. }
    43. return err
    44. }
    45. connCtx := ctx
    46. if cc := srv.ConnContext; cc != nil {
    47. connCtx = cc(connCtx, rw)
    48. if connCtx == nil {
    49. panic("ConnContext returned nil")
    50. }
    51. }
    52. tempDelay = 0
    53. c := srv.newConn(rw)
    54. c.setState(c.rwc, StateNew, runHooks) // before Serve can return
    55. go c.serve(connCtx)
    56. }
    57. }

    当监听套接字被关闭时,l.Accept()返回错误,Serve随即退出循环并返回,而不会等待已经启动的go c.serve(connCtx)协程处理完成。

    overseer的处理方式是,包装了golang的监听套接字和连接套接字,通过sync.WaitGroup提供了对主协程异步等待work协程处理完成的支持。

    overseer代码如下:

    overseer-v1.1.6\graceful.go

    1. func (l *overseerListener) Accept() (net.Conn, error) {
    2. conn, err := l.Listener.(*net.TCPListener).AcceptTCP()
    3. if err != nil {
    4. return nil, err
    5. }
    6. conn.SetKeepAlive(true) // see http.tcpKeepAliveListener
    7. conn.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
    8. uconn := overseerConn{
    9. Conn: conn,
    10. wg: &l.wg,
    11. closed: make(chan bool),
    12. }
    13. go func() {
    14. //connection watcher
    15. select {
    16. case <-l.closeByForce:
    17. uconn.Close()
    18. case <-uconn.closed:
    19. //closed manually
    20. }
    21. }()
    22. l.wg.Add(1)
    23. return uconn, nil
    24. }
    25. //non-blocking trigger close
    26. func (l *overseerListener) release(timeout time.Duration) {
    27. //stop accepting connections - release fd
    28. l.closeError = l.Listener.Close()
    29. //start timer, close by force if deadline not met
    30. waited := make(chan bool)
    31. go func() {
    32. l.wg.Wait()
    33. waited <- true
    34. }()
    35. go func() {
    36. select {
    37. case <-time.After(timeout):
    38. close(l.closeByForce)
    39. case <-waited:
    40. //no need to force close
    41. }
    42. }()
    43. }
    44. //blocking wait for close
    45. func (l *overseerListener) Close() error {
    46. l.wg.Wait()
    47. return l.closeError
    48. }
    49. func (o overseerConn) Close() error {
    50. err := o.Conn.Close()
    51. if err == nil {
    52. o.wg.Done()
    53. o.closed <- true
    54. }
    55. return err
    56. }

    在(l *overseerListener) Accept函数中,每接受一个work连接就执行l.wg.Add(1);在(o overseerConn) Close函数中,每关闭一个work连接就执行o.wg.Done()。注意o.wg.Done()只在底层o.Conn.Close()成功返回时才执行。对同一连接重复调用Close时,第二次会返回错误,因此计数不会被重复减一。

    在异步关闭模式(l *overseerListener) release函数中和在同步关闭模式(l *overseerListener) Close函数中都会调用l.wg.Wait()以等待work协程的处理完成。

    监听套接字关闭流程:

    1、work进程收到重启信号,或者master进程收到重启信号然后转发到work进程。

    2、work进程的信号处理里包含对(l *overseerListener) release的调用。

    3、在(l *overseerListener) release里关闭监听套接字,并异步l.wg.Wait()。

    4、在官方包net/http/server.go的 (srv *Server) Serve里l.Accept()出错返回,退出监听循环,然后执行defer l.Close(),即(l *overseerListener) Close。

    5、在(l *overseerListener) Close里同步执行l.wg.Wait(),等待work连接处理完成。

    6、work连接处理完成时,会调用(o overseerConn) Close(),进而调用o.wg.Done()。

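    7、(l *overseerListener) release是非阻塞的:它关闭监听套接字、在后台启动等待协程后立即返回。随后(未设置NoRestart时)work进程立刻向master进程发送SIGUSR1信号,并不等待第5、6步的连接处理完成。也就是说,旧work进程继续处理已有连接的同时,master就可以启动新的work进程。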

    8、master进程收到SIGUSR1信号后,将true写入mp.descriptorsReleased管道。

    9、master进程的(mp *master) fork里,收到mp.descriptorsReleased后,结束本次fork,进入下一次fork。

    二、连接的平滑重启

    所谓平滑重启,就是重启过程不会造成客户端断连,对客户端无感知,例如已在监听队列中排队的连接不会被丢弃。为此,监听套接字由master进程创建,并在新旧work进程间传递,而不是由新启动的work进程重新创建监听套接字。

    监听套接字由master进程创建:

    overseer-v1.1.6/proc_master.go

    1. func (mp *master) retreiveFileDescriptors() error {
    2. mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses))
    3. for i, addr := range mp.Config.Addresses {
    4. a, err := net.ResolveTCPAddr("tcp", addr)
    5. if err != nil {
    6. return fmt.Errorf("Invalid address %s (%s)", addr, err)
    7. }
    8. l, err := net.ListenTCP("tcp", a)
    9. if err != nil {
    10. return err
    11. }
    12. f, err := l.File()
    13. if err != nil {
    14. return fmt.Errorf("Failed to retreive fd for: %s (%s)", addr, err)
    15. }
    16. if err := l.Close(); err != nil {
    17. return fmt.Errorf("Failed to close listener for: %s (%s)", addr, err)
    18. }
    19. mp.slaveExtraFiles[i] = f
    20. }
    21. return nil
    22. }

    从mp.Config.Addresses中拿到地址,建立监听连接,最后把文件句柄存入mp.slaveExtraFiles。

    在这个过程中调用了(l *TCPListener) Close,但这并不会关闭监听套接字本身。l.File()返回的是底层文件描述符的一个副本(dup),关闭l只关闭了原来的描述符,f仍然引用着同一个监听套接字并使其保持打开。因此这对work进程没有影响:master进程只是不再通过l来accept连接,而是持有f,以便传递给子进程。

    这里引用下对网络套接字close和shutdown的区别:

    close ---- 只关闭本进程中的这个文件描述符(相当于引用计数减一),套接字本身仍然打开,其他持有该套接字描述符的进程仍可继续读写;只有最后一个引用被关闭时,套接字才真正释放。
    shutdown ---- 直接作用于套接字本身,会破坏这个连接:读的时候可能检测到EOF,写的时候可能会收到SIGPIPE信号(这个信号可能直到socket缓冲区被填满后才收到)。shutdown还有一个表示关闭方式的参数:0(SHUT_RD)不能再读,1(SHUT_WR)不能再写,2(SHUT_RDWR)读写都不能。

    将mp.slaveExtraFiles传递给子进程即work进程:

    overseer-v1.1.6/proc_master.go

    1. func (mp *master) fork() error {
    2. mp.debugf("starting %s", mp.binPath)
    3. cmd := exec.Command(mp.binPath)
    4. //mark this new process as the "active" slave process.
    5. //this process is assumed to be holding the socket files.
    6. mp.slaveCmd = cmd
    7. mp.slaveID++
    8. //provide the slave process with some state
    9. e := os.Environ()
    10. e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash))
    11. e = append(e, envBinPath+"="+mp.binPath)
    12. e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID))
    13. e = append(e, envIsSlave+"=1")
    14. e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles)))
    15. cmd.Env = e
    16. //inherit master args/stdfiles
    17. cmd.Args = os.Args
    18. cmd.Stdin = os.Stdin
    19. cmd.Stdout = os.Stdout
    20. cmd.Stderr = os.Stderr
    21. //include socket files
    22. cmd.ExtraFiles = mp.slaveExtraFiles
    23. if err := cmd.Start(); err != nil {
    24. return fmt.Errorf("Failed to start slave process: %s", err)
    25. }
    26. //was scheduled to restart, notify success
    27. if mp.restarting {
    28. mp.restartedAt = time.Now()
    29. mp.restarting = false
    30. mp.restarted <- true
    31. }
    32. //convert wait into channel
    33. cmdwait := make(chan error)
    34. go func() {
    35. cmdwait <- cmd.Wait()
    36. }()
    37. //wait....
    38. select {
    39. case err := <-cmdwait:
    40. //program exited before releasing descriptors
    41. //proxy exit code out to master
    42. code := 0
    43. if err != nil {
    44. code = 1
    45. if exiterr, ok := err.(*exec.ExitError); ok {
    46. if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
    47. code = status.ExitStatus()
    48. }
    49. }
    50. }
    51. mp.debugf("prog exited with %d", code)
    52. //if a restarts are disabled or if it was an
    53. //unexpected crash, proxy this exit straight
    54. //through to the main process
    55. if mp.NoRestart || !mp.restarting {
    56. os.Exit(code)
    57. }
    58. case <-mp.descriptorsReleased:
    59. //if descriptors are released, the program
    60. //has yielded control of its sockets and
    61. //a parallel instance of the program can be
    62. //started safely. it should serve state.Listeners
    63. //to ensure downtime is kept at <1sec. The previous
    64. //cmd.Wait() will still be consumed though the
    65. //result will be discarded.
    66. }
    67. return nil
    68. }

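    通过cmd.ExtraFiles = mp.slaveExtraFiles语句向子进程传递套接字。os/exec在启动子进程(fork+exec)时会让子进程继承这些文件描述符:ExtraFiles中的第i个文件在子进程中成为文件描述符3+i(0、1、2分别是标准输入、标准输出、标准错误)。这也是下文子进程中使用os.NewFile(uintptr(3+i), "")的原因。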

    子进程即work进程处理继承的套接字:

    overseer-v1.1.6/proc_slave.go

    1. func (sp *slave) run() error {
    2. sp.id = os.Getenv(envSlaveID)
    3. sp.debugf("run")
    4. sp.state.Enabled = true
    5. sp.state.ID = os.Getenv(envBinID)
    6. sp.state.StartedAt = time.Now()
    7. sp.state.Address = sp.Config.Address
    8. sp.state.Addresses = sp.Config.Addresses
    9. sp.state.GracefulShutdown = make(chan bool, 1)
    10. sp.state.BinPath = os.Getenv(envBinPath)
    11. if err := sp.watchParent(); err != nil {
    12. return err
    13. }
    14. if err := sp.initFileDescriptors(); err != nil {
    15. return err
    16. }
    17. sp.watchSignal()
    18. //run program with state
    19. sp.debugf("start program")
    20. sp.Config.Program(sp.state)
    21. return nil
    22. }
    23. func (sp *slave) initFileDescriptors() error {
    24. //inspect file descriptors
    25. numFDs, err := strconv.Atoi(os.Getenv(envNumFDs))
    26. if err != nil {
    27. return fmt.Errorf("invalid %s integer", envNumFDs)
    28. }
    29. sp.listeners = make([]*overseerListener, numFDs)
    30. sp.state.Listeners = make([]net.Listener, numFDs)
    31. for i := 0; i < numFDs; i++ {
    32. f := os.NewFile(uintptr(3+i), "")
    33. l, err := net.FileListener(f)
    34. if err != nil {
    35. return fmt.Errorf("failed to inherit file descriptor: %d", i)
    36. }
    37. u := newOverseerListener(l)
    38. sp.listeners[i] = u
    39. sp.state.Listeners[i] = u
    40. }
    41. if len(sp.state.Listeners) > 0 {
    42. sp.state.Listener = sp.state.Listeners[0]
    43. }
    44. return nil
    45. }

    子进程只是重新包装继承来的套接字,并没有新建监听套接字:它通过u := newOverseerListener(l)将其包装成overseerListener。这些监听套接字最后通过sp.state传递给sp.Config.Program(sp.state),即用户的启动程序:

    overseer-v1.1.6/example/main.go

    1. // convert your 'main()' into a 'prog(state)'
    2. // 'prog()' is run in a child process
    3. func prog(state overseer.State) {
    4. fmt.Printf("app#%s (%s) listening...\n", BuildID, state.ID)
    5. http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    6. d, _ := time.ParseDuration(r.URL.Query().Get("d"))
    7. time.Sleep(d)
    8. fmt.Fprintf(w, "app#%s (%s) %v says hello\n", BuildID, state.ID, state.StartedAt)
    9. }))
    10. http.Serve(state.Listener, nil)
    11. fmt.Printf("app#%s (%s) exiting...\n", BuildID, state.ID)
    12. }
    13. // then create another 'main' which runs the upgrades
    14. // 'main()' is run in the initial process
    15. func main() {
    16. overseer.Run(overseer.Config{
    17. Program: prog,
    18. Address: ":5001",
    19. Fetcher: &fetcher.File{Path: "my_app_next"},
    20. Debug: true, //display log of overseer actions
    21. TerminateTimeout: 10 * time.Minute,
    22. })
    23. }

    在用户程序中http.Serve(state.Listener, nil)调用:

    1、使用的accept方式是包装后的(l *overseerListener) Accept()。

    2、defer l.Close()使用也是包装后的(l *overseerListener) Close()。

    3、由(l *overseerListener) Accept()创建的work连接也都包装成了overseerConn连接,在关闭时会调用(o overseerConn) Close()

    三、文件变更的自动重启

    能够自动监视文件变化,有变更时自动触发重启流程。

    在master进程启动时检查配置,如果设置了mp.Config.Fetcher则进入fetchLoop:

    overseer-v1.1.6/proc_master.go

    1. // fetchLoop is run in a goroutine
    2. func (mp *master) fetchLoop() {
    3. min := mp.Config.MinFetchInterval
    4. time.Sleep(min)
    5. for {
    6. t0 := time.Now()
    7. mp.fetch()
    8. //duration fetch of fetch
    9. diff := time.Now().Sub(t0)
    10. if diff < min {
    11. delay := min - diff
    12. //ensures at least MinFetchInterval delay.
    13. //should be throttled by the fetcher!
    14. time.Sleep(delay)
    15. }
    16. }
    17. }

    mp.Config.MinFetchInterval默认是1秒。它保证两次fetch的开始时间至少间隔MinFetchInterval:若某次fetch耗时不足该值,则sleep补足剩余时间;若超过该值,则立即开始下一次fetch。因此默认情况下至多每秒检查一次变更。它是time.Duration类型,可以设置更小的粒度。

    已经支持的fetcher包括:fetcher_file.go、fetcher_github.go、fetcher_http.go、fetcher_s3.go。

    以fetcher_file.go为例说明。

    1、文件变更的判断:

    overseer-v1.1.6/proc_master.go

    1. //tee off to sha1
    2. hash := sha1.New()
    3. reader = io.TeeReader(reader, hash)
    4. //write to a temp file
    5. _, err = io.Copy(tmpBin, reader)
    6. if err != nil {
    7. mp.warnf("failed to write temp binary: %s", err)
    8. return
    9. }
    10. //compare hash
    11. newHash := hash.Sum(nil)
    12. if bytes.Equal(mp.binHash, newHash) {
    13. mp.debugf("hash match - skip")
    14. return
    15. }

    master端通过sha1算法计算新二进制内容的哈希,并与当前二进制的哈希mp.binHash比较。这一层只关心内容是否变化,并不关注文件时间戳。

    2、验证是可执行文件,且是支持overseer的:

    overseer-v1.1.6/proc_master.go

    1. tokenIn := token()
    2. cmd := exec.Command(tmpBinPath)
    3. cmd.Env = append(os.Environ(), []string{envBinCheck + "=" + tokenIn}...)
    4. cmd.Args = os.Args
    5. returned := false
    6. go func() {
    7. time.Sleep(5 * time.Second)
    8. if !returned {
    9. mp.warnf("sanity check against fetched executable timed-out, check overseer is running")
    10. if cmd.Process != nil {
    11. cmd.Process.Kill()
    12. }
    13. }
    14. }()
    15. tokenOut, err := cmd.CombinedOutput()
    16. returned = true
    17. if err != nil {
    18. mp.warnf("failed to run temp binary: %s (%s) output \"%s\"", err, tmpBinPath, tokenOut)
    19. return
    20. }
    21. if tokenIn != string(tokenOut) {
    22. mp.warnf("sanity check failed")
    23. return
    24. }

    如果临时二进制在5秒内没有返回,master会kill掉该进程并输出告警。这一校验是通过overseer在程序中预埋的代码实现的:

    overseer-v1.1.6/overseer.go

    1. //sanityCheck returns true if a check was performed
    2. func sanityCheck() bool {
    3. //sanity check
    4. if token := os.Getenv(envBinCheck); token != "" {
    5. fmt.Fprint(os.Stdout, token)
    6. return true
    7. }
    8. //legacy sanity check using old env var
    9. if token := os.Getenv(envBinCheckLegacy); token != "" {
    10. fmt.Fprint(os.Stdout, token)
    11. return true
    12. }
    13. return false
    14. }

    这段代码在main启动时由overseer.Run调用。master启动临时二进制时设置了环境变量envBinCheck:变量名是固定的,值则是token()生成的令牌。新程序检测到该环境变量后,会把令牌原样打印到标准输出。master比较输出与令牌是否一致,一致即说明该文件可以执行,并且集成了overseer。

    3、覆盖旧文件,并触发重启。

    overseer-v1.1.6/proc_master.go

    1. //overwrite!
    2. if err := overwrite(mp.binPath, tmpBinPath); err != nil {
    3. mp.warnf("failed to overwrite binary: %s", err)
    4. return
    5. }
    6. mp.debugf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12])
    7. mp.binHash = newHash
    8. //binary successfully replaced
    9. if !mp.Config.NoRestartAfterFetch {
    10. mp.triggerRestart()
    11. }

    由(mp *master) triggerRestart进入重启流程:

    overseer-v1.1.6/proc_master.go

    1. func (mp *master) triggerRestart() {
    2. if mp.restarting {
    3. mp.debugf("already graceful restarting")
    4. return //skip
    5. } else if mp.slaveCmd == nil || mp.restarting {
    6. mp.debugf("no slave process")
    7. return //skip
    8. }
    9. mp.debugf("graceful restart triggered")
    10. mp.restarting = true
    11. mp.awaitingUSR1 = true
    12. mp.signalledAt = time.Now()
    13. mp.sendSignal(mp.Config.RestartSignal) //ask nicely to terminate
    14. select {
    15. case <-mp.restarted:
    16. //success
    17. mp.debugf("restart success")
    18. case <-time.After(mp.TerminateTimeout):
    19. //times up mr. process, we did ask nicely!
    20. mp.debugf("graceful timeout, forcing exit")
    21. mp.sendSignal(os.Kill)
    22. }
    23. }

    master向子进程发送mp.Config.RestartSignal信号。子进程收到信号后,对每个监听套接字调用release:关闭监听套接字,但不等待已有连接处理完。然后子进程向父进程发送SIGUSR1信号:

    overseer-v1.1.6/proc_slave.go

    1. if len(sp.listeners) > 0 {
    2. //perform graceful shutdown
    3. for _, l := range sp.listeners {
    4. l.release(sp.Config.TerminateTimeout)
    5. }
    6. //signal release of held sockets, allows master to start
    7. //a new process before this child has actually exited.
    8. //early restarts not supported with restarts disabled.
    9. if !sp.NoRestart {
    10. sp.masterProc.Signal(SIGUSR1)
    11. }
    12. //listeners should be waiting on connections to close...
    13. }

    父进程收到SIGUSR1信号后,通知mp.descriptorsReleased管道监听套接字已经关闭:

    overseer-v1.1.6/proc_master.go

    1. //**during a restart** a SIGUSR1 signals
    2. //to the master process that, the file
    3. //descriptors have been released
    4. if mp.awaitingUSR1 && s == SIGUSR1 {
    5. mp.debugf("signaled, sockets ready")
    6. mp.awaitingUSR1 = false
    7. mp.descriptorsReleased <- true
    8. } else

    最终回到(mp *master) fork函数,fork函数一直在等待mp.descriptorsReleased通知或者cmd.Wait子进程退出,收到管道通知后fork退出,进入下一轮fork循环。

    overseer-v1.1.6/proc_master.go

    1. func (mp *master) fork() error {
    2. //... ...
    3. //... ...
    4. //... ...
    5. //convert wait into channel
    6. cmdwait := make(chan error)
    7. go func() {
    8. cmdwait <- cmd.Wait()
    9. }()
    10. //wait....
    11. select {
    12. case err := <-cmdwait:
    13. //program exited before releasing descriptors
    14. //proxy exit code out to master
    15. code := 0
    16. if err != nil {
    17. code = 1
    18. if exiterr, ok := err.(*exec.ExitError); ok {
    19. if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
    20. code = status.ExitStatus()
    21. }
    22. }
    23. }
    24. mp.debugf("prog exited with %d", code)
    25. //if a restarts are disabled or if it was an
    26. //unexpected crash, proxy this exit straight
    27. //through to the main process
    28. if mp.NoRestart || !mp.restarting {
    29. os.Exit(code)
    30. }
    31. case <-mp.descriptorsReleased:
    32. //if descriptors are released, the program
    33. //has yielded control of its sockets and
    34. //a parallel instance of the program can be
    35. //started safely. it should serve state.Listeners
    36. //to ensure downtime is kept at <1sec. The previous
    37. //cmd.Wait() will still be consumed though the
    38. //result will be discarded.
    39. }
    40. return nil
    41. }

    --end--

     

     

  • 相关阅读:
    详述对 Bean 的作用域及生命周期的理解
    【笔试刷题训练】day_15
    MES在注塑制造领域的运用
    StringBuffer与StringBuilder[37]
    懒人必备爬虫神器—playwright
    【设计模式】JAVA Design Patterns——Arrange/Act/Assert(安排/执行/断言模式)
    CSS学习(2)-盒子模型
    软考下午第5题——面向对象程序设计——代码填空(老程序员必得15分)
    Java_代码块/单例设计模式(饿汉式 / 懒汉式)
    三台centos7部署redis6.2版本集群
  • 原文地址:https://blog.csdn.net/flynetcn/article/details/134084549