overseer主要完成了三部分功能:
1、连接的无损关闭,2、连接的平滑重启,3、文件变更的自动重启。
下面依次讲一下:
golang官方的net包是不支持连接的无损关闭的,当主监听协程退出时,并不会等待各个实际work协程的处理完成。
以下是golang官方代码:
Go/src/net/http/server.go
- func (srv *Server) Serve(l net.Listener) error {
- if fn := testHookServerServe; fn != nil {
- fn(srv, l) // call hook with unwrapped listener
- }
-
- origListener := l
- l = &onceCloseListener{Listener: l}
- defer l.Close()
-
- if err := srv.setupHTTP2_Serve(); err != nil {
- return err
- }
-
- if !srv.trackListener(&l, true) {
- return ErrServerClosed
- }
- defer srv.trackListener(&l, false)
-
- baseCtx := context.Background()
- if srv.BaseContext != nil {
- baseCtx = srv.BaseContext(origListener)
- if baseCtx == nil {
- panic("BaseContext returned a nil context")
- }
- }
-
- var tempDelay time.Duration // how long to sleep on accept failure
-
- ctx := context.WithValue(baseCtx, ServerContextKey, srv)
- for {
- rw, err := l.Accept()
- if err != nil {
- if srv.shuttingDown() {
- return ErrServerClosed
- }
- if ne, ok := err.(net.Error); ok && ne.Temporary() {
- if tempDelay == 0 {
- tempDelay = 5 * time.Millisecond
- } else {
- tempDelay *= 2
- }
- if max := 1 * time.Second; tempDelay > max {
- tempDelay = max
- }
- srv.logf("http: Accept error: %v; retrying in %v", err, tempDelay)
- time.Sleep(tempDelay)
- continue
- }
- return err
- }
- connCtx := ctx
- if cc := srv.ConnContext; cc != nil {
- connCtx = cc(connCtx, rw)
- if connCtx == nil {
- panic("ConnContext returned nil")
- }
- }
- tempDelay = 0
- c := srv.newConn(rw)
- c.setState(c.rwc, StateNew, runHooks) // before Serve can return
- go c.serve(connCtx)
- }
- }
当监听套接字关闭,l.Accept()退出循环时,并不会等待go c.serve(connCtx)协程的处理完成。
overseer的处理方式是,包装了golang的监听套接字和连接套接字,通过sync.WaitGroup提供了对主协程异步等待work协程处理完成的支持。
overseer代码如下:
overseer-v1.1.6\graceful.go
- func (l *overseerListener) Accept() (net.Conn, error) {
- conn, err := l.Listener.(*net.TCPListener).AcceptTCP()
- if err != nil {
- return nil, err
- }
- conn.SetKeepAlive(true) // see http.tcpKeepAliveListener
- conn.SetKeepAlivePeriod(3 * time.Minute) // see http.tcpKeepAliveListener
- uconn := overseerConn{
- Conn: conn,
- wg: &l.wg,
- closed: make(chan bool),
- }
- go func() {
- //connection watcher
- select {
- case <-l.closeByForce:
- uconn.Close()
- case <-uconn.closed:
- //closed manually
- }
- }()
- l.wg.Add(1)
- return uconn, nil
- }
-
- //non-blocking trigger close
- func (l *overseerListener) release(timeout time.Duration) {
- //stop accepting connections - release fd
- l.closeError = l.Listener.Close()
- //start timer, close by force if deadline not met
- waited := make(chan bool)
- go func() {
- l.wg.Wait()
- waited <- true
- }()
- go func() {
- select {
- case <-time.After(timeout):
- close(l.closeByForce)
- case <-waited:
- //no need to force close
- }
- }()
- }
-
- //blocking wait for close
- func (l *overseerListener) Close() error {
- l.wg.Wait()
- return l.closeError
- }
-
- func (o overseerConn) Close() error {
- err := o.Conn.Close()
- if err == nil {
- o.wg.Done()
- o.closed <- true
- }
- return err
- }
在(l *overseerListener) Accept函数中,每生成一个work连接,执行l.wg.Add(1),在(o overseerConn) Close函数中,每关闭一个work连接,执行o.wg.Done()。
在异步关闭模式(l *overseerListener) release函数中和在同步关闭模式(l *overseerListener) Close函数中都会调用l.wg.Wait()以等待work协程的处理完成。
监听套接字关闭流程:
1、work进程收到重启信号,或者master进程收到重启信号然后转发到work进程。
2、work进程的信号处理里包含对(l *overseerListener) release的调用。
3、在(l *overseerListener) release里关闭监听套接字,并异步l.wg.Wait()。
4、在官方包net/http/server.go的 (srv *Server) Serve里l.Accept()出错返回,退出监听循环,然后执行defer l.Close(),即(l *overseerListener) Close。
5、在(l *overseerListener) Close里同步执行l.wg.Wait(),等待work连接处理完成。
6、work连接处理完成时,会调用(o overseerConn) Close(),进而调用o.wg.Done()。
7、所有work连接处理完成后,向master进程发送SIGUSR1信号。
8、master进程收到SIGUSR1信号后,将true写入mp.descriptorsReleased管道。
9、master进程的(mp *master) fork里,收到mp.descriptorsReleased后,结束本次fork,进入下一次fork。
所谓平滑重启,就是重启不会造成客户端的断连,对客户端无感知,比如原有的排队连接不会被丢弃,所以监听套接字通过master进程在新旧work进程间传递,而不是新启的work进程重新创建监听连接。
监听套接字由master进程创建:
overseer-v1.1.6/proc_master.go
- func (mp *master) retreiveFileDescriptors() error {
- mp.slaveExtraFiles = make([]*os.File, len(mp.Config.Addresses))
- for i, addr := range mp.Config.Addresses {
- a, err := net.ResolveTCPAddr("tcp", addr)
- if err != nil {
- return fmt.Errorf("Invalid address %s (%s)", addr, err)
- }
- l, err := net.ListenTCP("tcp", a)
- if err != nil {
- return err
- }
- f, err := l.File()
- if err != nil {
- return fmt.Errorf("Failed to retreive fd for: %s (%s)", addr, err)
- }
- if err := l.Close(); err != nil {
- return fmt.Errorf("Failed to close listener for: %s (%s)", addr, err)
- }
- mp.slaveExtraFiles[i] = f
- }
- return nil
- }
从mp.Config.Addresses中拿到地址,建立监听连接,最后把文件句柄存入mp.slaveExtraFiles。
在这个过程中调用了(l *TCPListener) Close,但其实对work进程无影响,影响的只是master进程自己不能读写监听套接字。
这里引用下对网络套接字close和shutdown的区别:
close ---- 关闭本进程的socket id,但连接还是开着的,用这个socket id的其它进程还能用这个连接,能读或写这个socket id。
shutdown ---- 则破坏了socket 连接,读的时候可能侦探到EOF结束符,写的时候可能会收到一个SIGPIPE信号,这个信号可能直到socket buffer被填充了才收到,shutdown还有一个关闭方式的参数,0 不能再读,1不能再写,2 读写都不能。
将mp.slaveExtraFiles传递给子进程即work进程:
overseer-v1.1.6/proc_master.go
- func (mp *master) fork() error {
- mp.debugf("starting %s", mp.binPath)
- cmd := exec.Command(mp.binPath)
- //mark this new process as the "active" slave process.
- //this process is assumed to be holding the socket files.
- mp.slaveCmd = cmd
- mp.slaveID++
- //provide the slave process with some state
- e := os.Environ()
- e = append(e, envBinID+"="+hex.EncodeToString(mp.binHash))
- e = append(e, envBinPath+"="+mp.binPath)
- e = append(e, envSlaveID+"="+strconv.Itoa(mp.slaveID))
- e = append(e, envIsSlave+"=1")
- e = append(e, envNumFDs+"="+strconv.Itoa(len(mp.slaveExtraFiles)))
- cmd.Env = e
- //inherit master args/stdfiles
- cmd.Args = os.Args
- cmd.Stdin = os.Stdin
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- //include socket files
- cmd.ExtraFiles = mp.slaveExtraFiles
- if err := cmd.Start(); err != nil {
- return fmt.Errorf("Failed to start slave process: %s", err)
- }
- //was scheduled to restart, notify success
- if mp.restarting {
- mp.restartedAt = time.Now()
- mp.restarting = false
- mp.restarted <- true
- }
- //convert wait into channel
- cmdwait := make(chan error)
- go func() {
- cmdwait <- cmd.Wait()
- }()
- //wait....
- select {
- case err := <-cmdwait:
- //program exited before releasing descriptors
- //proxy exit code out to master
- code := 0
- if err != nil {
- code = 1
- if exiterr, ok := err.(*exec.ExitError); ok {
- if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
- code = status.ExitStatus()
- }
- }
- }
- mp.debugf("prog exited with %d", code)
- //if a restarts are disabled or if it was an
- //unexpected crash, proxy this exit straight
- //through to the main process
- if mp.NoRestart || !mp.restarting {
- os.Exit(code)
- }
- case <-mp.descriptorsReleased:
- //if descriptors are released, the program
- //has yielded control of its sockets and
- //a parallel instance of the program can be
- //started safely. it should serve state.Listeners
- //to ensure downtime is kept at <1sec. The previous
- //cmd.Wait() will still be consumed though the
- //result will be discarded.
- }
- return nil
- }
通过cmd.ExtraFiles = mp.slaveExtraFiles语句向子进程传递套接字,这个参数最终传递给fork系统调用,传递的fd会被子进程继承。
子进程即work进程处理继承的套接字:
overseer-v1.1.6/proc_slave.go
- func (sp *slave) run() error {
- sp.id = os.Getenv(envSlaveID)
- sp.debugf("run")
- sp.state.Enabled = true
- sp.state.ID = os.Getenv(envBinID)
- sp.state.StartedAt = time.Now()
- sp.state.Address = sp.Config.Address
- sp.state.Addresses = sp.Config.Addresses
- sp.state.GracefulShutdown = make(chan bool, 1)
- sp.state.BinPath = os.Getenv(envBinPath)
- if err := sp.watchParent(); err != nil {
- return err
- }
- if err := sp.initFileDescriptors(); err != nil {
- return err
- }
- sp.watchSignal()
- //run program with state
- sp.debugf("start program")
- sp.Config.Program(sp.state)
- return nil
- }
-
- func (sp *slave) initFileDescriptors() error {
- //inspect file descriptors
- numFDs, err := strconv.Atoi(os.Getenv(envNumFDs))
- if err != nil {
- return fmt.Errorf("invalid %s integer", envNumFDs)
- }
- sp.listeners = make([]*overseerListener, numFDs)
- sp.state.Listeners = make([]net.Listener, numFDs)
- for i := 0; i < numFDs; i++ {
- f := os.NewFile(uintptr(3+i), "")
- l, err := net.FileListener(f)
- if err != nil {
- return fmt.Errorf("failed to inherit file descriptor: %d", i)
- }
- u := newOverseerListener(l)
- sp.listeners[i] = u
- sp.state.Listeners[i] = u
- }
- if len(sp.state.Listeners) > 0 {
- sp.state.Listener = sp.state.Listeners[0]
- }
- return nil
- }
子进程只是重新包装套接字,并没有新建监听连接,包装成u := newOverseerListener(l)类型,这些监听套接字最后传递给sp.Config.Program(sp.state),即用户的启动程序:
overseer-v1.1.6/example/main.go
- // convert your 'main()' into a 'prog(state)'
- // 'prog()' is run in a child process
- func prog(state overseer.State) {
- fmt.Printf("app#%s (%s) listening...\n", BuildID, state.ID)
- http.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- d, _ := time.ParseDuration(r.URL.Query().Get("d"))
- time.Sleep(d)
- fmt.Fprintf(w, "app#%s (%s) %v says hello\n", BuildID, state.ID, state.StartedAt)
- }))
- http.Serve(state.Listener, nil)
- fmt.Printf("app#%s (%s) exiting...\n", BuildID, state.ID)
- }
-
- // then create another 'main' which runs the upgrades
- // 'main()' is run in the initial process
- func main() {
- overseer.Run(overseer.Config{
- Program: prog,
- Address: ":5001",
- Fetcher: &fetcher.File{Path: "my_app_next"},
- Debug: true, //display log of overseer actions
- TerminateTimeout: 10 * time.Minute,
- })
- }
在用户程序中http.Serve(state.Listener, nil)调用:
1、使用的accept方式是包装后的(l *overseerListener) Accept()。
2、defer l.Close()使用也是包装后的(l *overseerListener) Close()。
3、由(l *overseerListener) Accept()创建的work连接也都包装成了overseerConn连接,在关闭时会调用(o overseerConn) Close()
能够自动监视文件变化,有变更时自动触发重启流程。
在master进程启动时检查配置,如果设置了mp.Config.Fetcher则进入fetchLoop:
overseer-v1.1.6/proc_master.go
- // fetchLoop is run in a goroutine
- func (mp *master) fetchLoop() {
- min := mp.Config.MinFetchInterval
- time.Sleep(min)
- for {
- t0 := time.Now()
- mp.fetch()
- //duration fetch of fetch
- diff := time.Now().Sub(t0)
- if diff < min {
- delay := min - diff
- //ensures at least MinFetchInterval delay.
- //should be throttled by the fetcher!
- time.Sleep(delay)
- }
- }
- }
mp.Config.MinFetchInterval默认是1秒,也就是每秒检查一次变更。time.Duration类型,可以设置更小的粒度。
已经支持的fetcher包括:fetcher_file.go、fetcher_github.go、fetcher_http.go、fetcher_s3.go。
以fetcher_file.go为例说明。
1、文件变更的判断:
overseer-v1.1.6/proc_master.go
- //tee off to sha1
- hash := sha1.New()
- reader = io.TeeReader(reader, hash)
- //write to a temp file
- _, err = io.Copy(tmpBin, reader)
- if err != nil {
- mp.warnf("failed to write temp binary: %s", err)
- return
- }
- //compare hash
- newHash := hash.Sum(nil)
- if bytes.Equal(mp.binHash, newHash) {
- mp.debugf("hash match - skip")
- return
- }
通过sha1算法实现,比较新旧hash值,并没有关注文件时间戳。
2、验证是可执行文件,且是支持overseer的:
overseer-v1.1.6/proc_master.go
- tokenIn := token()
- cmd := exec.Command(tmpBinPath)
- cmd.Env = append(os.Environ(), []string{envBinCheck + "=" + tokenIn}...)
- cmd.Args = os.Args
- returned := false
- go func() {
- time.Sleep(5 * time.Second)
- if !returned {
- mp.warnf("sanity check against fetched executable timed-out, check overseer is running")
- if cmd.Process != nil {
- cmd.Process.Kill()
- }
- }
- }()
- tokenOut, err := cmd.CombinedOutput()
- returned = true
- if err != nil {
- mp.warnf("failed to run temp binary: %s (%s) output \"%s\"", err, tmpBinPath, tokenOut)
- return
- }
- if tokenIn != string(tokenOut) {
- mp.warnf("sanity check failed")
- return
- }
这是通过overseer预埋的代码实现的:
overseer-v1.1.6/overseer.go
- //sanityCheck returns true if a check was performed
- func sanityCheck() bool {
- //sanity check
- if token := os.Getenv(envBinCheck); token != "" {
- fmt.Fprint(os.Stdout, token)
- return true
- }
- //legacy sanity check using old env var
- if token := os.Getenv(envBinCheckLegacy); token != "" {
- fmt.Fprint(os.Stdout, token)
- return true
- }
- return false
- }
这段代码在main启动时在overseer.Run里会调用到,传递固定的环境变量,然后命令行输出会原样显示出来即为成功。
3、覆盖旧文件,并触发重启。
overseer-v1.1.6/proc_master.go
- //overwrite!
- if err := overwrite(mp.binPath, tmpBinPath); err != nil {
- mp.warnf("failed to overwrite binary: %s", err)
- return
- }
- mp.debugf("upgraded binary (%x -> %x)", mp.binHash[:12], newHash[:12])
- mp.binHash = newHash
- //binary successfully replaced
- if !mp.Config.NoRestartAfterFetch {
- mp.triggerRestart()
- }
由(mp *master) triggerRestart进入重启流程:
overseer-v1.1.6/proc_master.go
- func (mp *master) triggerRestart() {
- if mp.restarting {
- mp.debugf("already graceful restarting")
- return //skip
- } else if mp.slaveCmd == nil || mp.restarting {
- mp.debugf("no slave process")
- return //skip
- }
- mp.debugf("graceful restart triggered")
- mp.restarting = true
- mp.awaitingUSR1 = true
- mp.signalledAt = time.Now()
- mp.sendSignal(mp.Config.RestartSignal) //ask nicely to terminate
- select {
- case <-mp.restarted:
- //success
- mp.debugf("restart success")
- case <-time.After(mp.TerminateTimeout):
- //times up mr. process, we did ask nicely!
- mp.debugf("graceful timeout, forcing exit")
- mp.sendSignal(os.Kill)
- }
- }
向子进程发送mp.Config.RestartSignal信号,子进程收到信号后,关闭监听套接字然后向父进程发送SIGUSR1信号:
overseer-v1.1.6/proc_slave.go
- if len(sp.listeners) > 0 {
- //perform graceful shutdown
- for _, l := range sp.listeners {
- l.release(sp.Config.TerminateTimeout)
- }
- //signal release of held sockets, allows master to start
- //a new process before this child has actually exited.
- //early restarts not supported with restarts disabled.
- if !sp.NoRestart {
- sp.masterProc.Signal(SIGUSR1)
- }
- //listeners should be waiting on connections to close...
- }
父进程收到SIGUSR1信号后,通知mp.descriptorsReleased管道监听套接字已经关闭:
overseer-v1.1.6/proc_master.go
- //**during a restart** a SIGUSR1 signals
- //to the master process that, the file
- //descriptors have been released
- if mp.awaitingUSR1 && s == SIGUSR1 {
- mp.debugf("signaled, sockets ready")
- mp.awaitingUSR1 = false
- mp.descriptorsReleased <- true
- } else
最终回到(mp *master) fork函数,fork函数一直在等待mp.descriptorsReleased通知或者cmd.Wait子进程退出,收到管道通知后fork退出,进入下一轮fork循环。
overseer-v1.1.6/proc_master.go
- func (mp *master) fork() error {
- //... ...
- //... ...
- //... ...
- //convert wait into channel
- cmdwait := make(chan error)
- go func() {
- cmdwait <- cmd.Wait()
- }()
- //wait....
- select {
- case err := <-cmdwait:
- //program exited before releasing descriptors
- //proxy exit code out to master
- code := 0
- if err != nil {
- code = 1
- if exiterr, ok := err.(*exec.ExitError); ok {
- if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
- code = status.ExitStatus()
- }
- }
- }
- mp.debugf("prog exited with %d", code)
- //if a restarts are disabled or if it was an
- //unexpected crash, proxy this exit straight
- //through to the main process
- if mp.NoRestart || !mp.restarting {
- os.Exit(code)
- }
- case <-mp.descriptorsReleased:
- //if descriptors are released, the program
- //has yielded control of its sockets and
- //a parallel instance of the program can be
- //started safely. it should serve state.Listeners
- //to ensure downtime is kept at <1sec. The previous
- //cmd.Wait() will still be consumed though the
- //result will be discarded.
- }
- return nil
- }
--end--