• golang中零停机重启服务之套接字复用,endless


    本文借助 endless 源码来阐述。重点看 fork() 之后的逻辑。

    func (srv *endlessServer) fork() (err error) {
    	runningServerReg.Lock()
    	defer runningServerReg.Unlock()
    
    	// only one server instance should fork!
    	if runningServersForked {
    		return errors.New("Another process already forked. Ignoring this one.")
    	}
    
    	runningServersForked = true
    
    	var files = make([]*os.File, len(runningServers))
    	var orderArgs = make([]string, len(runningServers))
    	// get the accessor socket fds for _all_ server instances
    	for _, srvPtr := range runningServers {
    		// introspect.PrintTypeDump(srvPtr.EndlessListener)
    		switch srvPtr.EndlessListener.(type) {
    		case *endlessListener:
    			// normal listener
    			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.EndlessListener.(*endlessListener).File()
    		default:
    			// tls listener
    			files[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.tlsInnerListener.File()
    		}
    		orderArgs[socketPtrOffsetMap[srvPtr.Server.Addr]] = srvPtr.Server.Addr
    	}
    
    	env := append(
    		os.Environ(),
    		"ENDLESS_CONTINUE=1",
    	)
    	if len(runningServers) > 1 {
    		env = append(env, fmt.Sprintf(`ENDLESS_SOCKET_ORDER=%s`, strings.Join(orderArgs, ",")))
    	}
    
    	path := os.Args[0]
    	var args []string
    	if len(os.Args) > 1 {
    		args = os.Args[1:]
    	}
    
    	cmd := exec.Command(path, args...)
    	cmd.Stdout = os.Stdout
    	cmd.Stderr = os.Stderr
    	cmd.ExtraFiles = files
    	cmd.Env = env
    
    	err = cmd.Start()
    	if err != nil {
    		log.Fatalf("Restart: Failed to launch, error: %v", err)
    	}
    
    	return
    }
    
    func (el *endlessListener) File() *os.File {
    	// returns a dup(2) - FD_CLOEXEC flag *not* set
        // 这个注释不对
    	tl := el.Listener.(*net.TCPListener)
    	fl, _ := tl.File()
    	return fl
    }
    
    // File returns a copy of the underlying os.File.
    // It is the caller's responsibility to close f when finished.
    // Closing l does not affect f, and closing f does not affect l.
    //
    // The returned os.File's file descriptor is different from the
    // connection's. Attempting to change properties of the original
    // using this duplicate may or may not have the desired effect.
    func (l *TCPListener) File() (f *os.File, err error) {
    	if !l.ok() {
    		return nil, syscall.EINVAL
    	}
    	f, err = l.file()
    	if err != nil {
    		return nil, &OpError{Op: "file", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
    	}
    	return
    }
    
    // dup 得到的是 fd 的一个副本
    func (ln *TCPListener) file() (*os.File, error) {
    	f, err := ln.fd.dup()
    	if err != nil {
    		return nil, err
    	}
    	return f, nil
    }
    
    // Network file descriptor.
    type netFD struct {
    	pfd poll.FD
    
    	// immutable until Close
    	family      int
    	sotype      int
    	isConnected bool // handshake completed or use of association with peer
    	net         string
    	laddr       Addr
    	raddr       Addr
    }
    
    // net/fd_unix.go
    func (fd *netFD) dup() (f *os.File, err error) {
    	ns, call, err := fd.pfd.Dup()
    	if err != nil {
    		if call != "" {
    			err = os.NewSyscallError(call, err)
    		}
    		return nil, err
    	}
    
    	return os.NewFile(uintptr(ns), fd.name()), nil
    }
    
    // internal/poll/fd_mutex.go
    // incref adds a reference to mu.
    // It reports whether mu is available for reading or writing.
    
    // decref removes a reference from mu.
    // It reports whether there is no remaining reference.
    
    // internal/poll/fd_unix.go
    // Dup duplicates the file descriptor.
    func (fd *FD) Dup() (int, string, error) {
    	if err := fd.incref(); err != nil {
    		return -1, "", err
    	}
    	defer fd.decref()
    	return DupCloseOnExec(fd.Sysfd)
    }
    
    // dupCloseOnExecOld is the traditional way to dup an fd and
    // set its O_CLOEXEC bit, using two system calls.
    func dupCloseOnExecOld(fd int) (int, string, error) {
    	syscall.ForkLock.RLock()
    	defer syscall.ForkLock.RUnlock()
    	newfd, err := syscall.Dup(fd)
    	if err != nil {
    		return -1, "dup", err
    	}
        // 为新的fd设置close_on_exec标志位
    	syscall.CloseOnExec(newfd)
    	return newfd, "", nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146

    fork() 调用 File() 方法得到的是 listener fd 的一个副本 fd-1,并且 fdfd-1 都被设置了 O_CLOEXEC 标志位

    1、关于dup系统调用syscall.Dup(fd)

    • dup系统调用会创建文件描述符的一个拷贝。
    • 新生成的文件描述符是进程当前可用的文件描述符编号的最小值。
    • 如果拷贝成功,两者都指向同一个打开的文件,因此共享所有的锁定,读写指针,和各项权限或标志位,因此可能存在交叉使用。
    • 如果拷贝错误则返回-1,错误代码存入errno中。
    • 调用dup族类函数得到的新文件描述符将清除O_CLOEXEC模式,这也是为什么源码中在dup之后重新设置O_CLOEXEC

    2、关于 O_CLOEXEC

    • 在一个进程中通过exec调用启动一个子进程,这个子进程会继承父进程打开的fd以及进行操作,出于安全考虑,有些fd,我们要求在子进程中无法继承,就需要设置这些fd的 O_CLOEXEC ,那么在启动子进程成功之后,内核会关闭子进程中的这些fd。
    • 调用open函数 O_CLOEXEC 模式打开的文件描述符在执行exec调用得到的新程序中关闭,且为原子操作。
    • 调用open函数不使用 O_CLOEXEC 模式打开的文件描述符,然后调用 fcntl 函数设置 FD_CLOEXEC 选项,效果和使用 O_CLOEXEC 选项open函数相同,但分别调用open、fcnt两个l函数,不是原子操作,多线程环境中存在竞态条件,故用open函数O_CLOEXEC选项代替之。

    在golang中,出于安全考虑,所有打开的文件描述符都会被设置 O_CLOEXEC,这使得子进程中继承过去的 fd 都会被内核关掉(这和传统的 exec 的效果不一样,默认子进程会继承父进程打开的 fd),因此,想要传递过去 fd ,就需要手动来操作,比如赋值 Cmd.ExtraFiles。比如打开文件操作,以及上面的 socket fd 皆是如此。

    func openFile(name string, flag int, perm FileMode) (file *File, err error) {
    	r, e := syscall.Open(fixLongPath(name), flag|syscall.O_CLOEXEC, syscallMode(perm))
    	if e != nil {
    		return nil, e
    	}
    	return newFile(r, name, "file"), nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在golang中,对于 Stdin, Stdout, Stderr 三个,可以通过 Cmd 结构体来传递,如果不传,会默认填充为/dev/null

    这也解释了,为什么golang中可以使用 3+i来给Cmd.ExtraFiles中的文件编号,因为0-stdin,1-stdout,2-stderr,多余的 fd 都被清掉了,剩下的就是Cmd.ExtraFiles,而 fd 的编号是从可以使用的最小的开始,比如现在有0,1,2,3,4,5,6,然后4被释放了,新来的 fd 就是4,而不是7,

    至此,fd-1 文件描述符就被成功传入子进程中,它是 fd 的副本,文件描述符都会对应一个内核中已经打开的文件,只不过 socket fd 比较特殊,它不是指向文件系统,而是保存在内存中的结构体SOCKET,在这个结构体里面有一个发送队列和一个接收队列。fd, fd-1 目前同时指向了这个SOCKET。

    下面进入子进程的启动流程,首先是getListener()

    func (srv *endlessServer) getListener(laddr string) (l net.Listener, err error) {
    	if srv.isChild {
    		var ptrOffset uint = 0
    		runningServerReg.RLock()
    		defer runningServerReg.RUnlock()
    		if len(socketPtrOffsetMap) > 0 {
    			ptrOffset = socketPtrOffsetMap[laddr]
    		}
    		// 使用 fd 编号构建 *os.File 对象
    		f := os.NewFile(uintptr(3+ptrOffset), "")
             // 使用此 *os.File 对象构建 net.Listener 对象
    		l, err = net.FileListener(f)
    		if err != nil {
    			err = fmt.Errorf("net.FileListener error: %v", err)
    			return
    		}
    	} else {
    		l, err = net.Listen("tcp", laddr)
    		if err != nil {
    			err = fmt.Errorf("net.Listen error: %v", err)
    			return
    		}
    	}
    	return
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    fd-1 已经传递了进来,编号为3,但是还需要从代码层面进行包装才能被使用,首先构建成*os.File对象,然后构建成net.Listener对象,这里只是做了包装操作,并没有其他操作,因为 fd-1 对应的SOCKET已经处于监听状态了,这是父进程中net.Listen()完成的。

    至此,就变成了两个进程来消费SOCKET中的一个接收队列,这两个进程可以同时存在,以轮训的方式来消费。

    // Listen announces on the local network address.
    //
    // See func Listen for a description of the network and address
    // parameters.
    func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    	addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil)
    	if err != nil {
    		return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err}
    	}
    	sl := &sysListener{
    		ListenConfig: *lc,
    		network:      network,
    		address:      address,
    	}
    	var l Listener
    	la := addrs.first(isIPv4)
    	switch la := la.(type) {
    	case *TCPAddr:
    		l, err = sl.listenTCP(ctx, la) // *******
    	case *UnixAddr:
    		l, err = sl.listenUnix(ctx, la)
    	default:
    		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}}
    	}
    	if err != nil {
    		return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer
    	}
    	return l, nil
    }
    
    func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    	fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    	if err != nil {
    		return nil, err
    	}
    	return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
    }
    
    // 在这里完成了 socker_create, socker_bind, socket_listen 等基础工作
    func internetSocket(ctx context.Context, net string, laddr, raddr sockaddr, sotype, proto int, mode string, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    	if (runtime.GOOS == "aix" || runtime.GOOS == "windows" || runtime.GOOS == "openbsd") && mode == "dial" && raddr.isWildcard() {
    		raddr = raddr.toLocal(net)
    	}
    	family, ipv6only := favoriteAddrFamily(net, laddr, raddr, mode)
    	return socket(ctx, net, family, sotype, proto, ipv6only, laddr, raddr, ctrlFn)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    然后去发信号关闭父进程。

    func (el *endlessListener) Close() error {
    	if el.stopped {
    		return syscall.EINVAL
    	}
    
    	el.stopped = true
    	return el.Listener.Close()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    // internal/poll/fd_unix.go
    // Close closes the FD. The underlying file descriptor is closed by the
    // destroy method when there are no remaining references.
    func (fd *FD) Close() error {
    	if !fd.fdmu.increfAndClose() {
    		return errClosing(fd.isFile)
    	}
    
    	// Unblock any I/O.  Once it all unblocks and returns,
    	// so that it cannot be referring to fd.sysfd anymore,
    	// the final decref will close fd.sysfd. This should happen
    	// fairly quickly, since all the I/O is non-blocking, and any
    	// attempts to block in the pollDesc will return errClosing(fd.isFile).
    	fd.pd.evict()
    
    	// The call to decref will call destroy if there are no other
    	// references.
    	err := fd.decref()
    
    	// Wait until the descriptor is closed. If this was the only
    	// reference, it is already closed. Only wait if the file has
    	// not been set to blocking mode, as otherwise any current I/O
    	// may be blocking, and that would block the Close.
    	// No need for an atomic read of isBlocking, increfAndClose means
    	// we have exclusive access to fd.
    	if fd.isBlocking == 0 {
    		runtime_Semacquire(&fd.csema)
    	}
    
    	return err
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    Listener.Close()只是关闭了fd,使其不再accept,而底层的SOCKET是由内核来回收的,当没有fd再指向它的时候就会被销毁。在这之前其实 fd-1 就已经指向了它,所以监听继续进行。

    关于查看进程的fd

    lsof -p 1299
    COMMAND   PID USER   FD      TYPE DEVICE SIZE/OFF     NODE NAME
    testproc 1299 root  cwd       DIR   0,58     4096      415 /data/www/golang/project/hashcompare
    testproc 1299 root  rtd       DIR  253,0     4096      128 /
    testproc 1299 root  txt       REG   0,58  6223068      433 /data/www/golang/project/hashcompare/testproc
    testproc 1299 root  mem       REG  253,0  2156240 33632456 /usr/lib64/libc-2.17.so
    testproc 1299 root  mem       REG  253,0   142144 34128089 /usr/lib64/libpthread-2.17.so
    testproc 1299 root  mem       REG  253,0   163312 34127918 /usr/lib64/ld-2.17.so
    testproc 1299 root    0u      CHR  136,0      0t0        3 /dev/pts/0
    testproc 1299 root    1u      CHR  136,0      0t0        3 /dev/pts/0
    testproc 1299 root    2u      CHR  136,0      0t0        3 /dev/pts/0
    testproc 1299 root    3u      REG   0,58      166      435 /data/www/golang/project/hashcompare/testproc_f1.txt
    testproc 1299 root    4u  a_inode    0,9        0     4554 [eventpoll]
    testproc 1299 root    5r     FIFO    0,8      0t0   936660 pipe
    testproc 1299 root    6w     FIFO    0,8      0t0   936660 pipe
    testproc 1299 root    7u     IPv6 936664      0t0      TCP *:6060 (LISTEN)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    或者

    ls -l /proc/1299/fd
    总用量 0
    lrwx------ 1 root root 64 1116 09:53 0 -> /dev/pts/0
    lrwx------ 1 root root 64 1116 09:53 1 -> /dev/pts/0
    lrwx------ 1 root root 64 1116 09:53 2 -> /dev/pts/0
    lrwx------ 1 root root 64 1116 09:53 3 -> /data/www/golang/project/hashcompare/testproc_f1.txt
    lrwx------ 1 root root 64 1116 09:53 4 -> anon_inode:[eventpoll]
    lr-x------ 1 root root 64 1116 09:53 5 -> pipe:[936660]
    l-wx------ 1 root root 64 1116 09:53 6 -> pipe:[936660]
    lrwx------ 1 root root 64 1116 09:53 7 -> socket:[936664]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    Unity—UGUI
    activiti7 报错Couldn‘t resolve collection expression nor variable reference
    如何设置代理ip服务器地址
    打家劫舍 III
    update select
    第7章 C语言的系统复习 (七)
    贪心算法--找换硬币
    Sql Server数据库附加数据库失败警告方法,有关详细信息,请单击“消息”列中的超链接(Win11)
    高德 几千条数据,点标记Marker转海量标注 LabelMarker
    需求文档的写法与项目汇报的流程
  • 原文地址:https://blog.csdn.net/raoxiaoya/article/details/127885836