• Reactor And Gev 详解 通俗易懂


    reactor 详解

    在类似网关这种海量连接, 很高的并发的场景, 比如有 10W+ 连接, go 开始变得吃力. 因为频繁的 goroutine 调度和 gc 导致程序性能很差. 这个时候我们可以考虑用经典的 reactor 网络模型来应对这种需求

    常见网络模型

    下面是目前常见的网络模型

    image-20220913173513514

    go 原生网络模型

    go 通过 IO 多路复用构建了一套简洁而高性能原生网络模型, 让开发者可以使用同步的模式编写异步的逻辑(给每个连接开一个协程处理), 极大降低开发的难度

    goroutine 从 fd Read() 数据但是又没有数据的时候, 会将当前的 goroutine 给 pack 住, 直到这个 fd 发生读事件这个 goroutine 才会重新 ready 激活(再次发生读事件就是 epoll 之类 IO 多路复用触发的)

    贴一个 给每个连接开一个协程处理 的 demo 代码

    package main
    
    import "net"
    
    func main() {
    	l, _ := net.Listen("tcp", ":5000")
    	for {
    		// 新的连接
    		conn, _ := l.Accept()
    		// 给每个连接开启一个协程处理
    		go handler(conn)
    	}
    }
    
    func handler(conn net.Conn) {
    	for {
    		// 读取数据
    		// 业务处理
    		// 写入返回数据
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    但是在类似网关这种海量连接, 很高的并发的场景, 比如有 10W+ 连接, go 开始变得吃力. 因为频繁的 goroutine 调度和 gc 导致程序性能很差. 这个时候我们可以考虑用经典的 reactor 来应对这种需求

    go 原生网络模型不是讨论的重点, 学习请参考 reference

    Proactor

    img

    第五个 Proactor 是从底层支持的真正的异步 IO, 底层在 IO 完成后会通知应用层

    其他都是同步 IO, 因为没有像同步 IO 一样成熟和广泛使用, 所以不常用

    Reactor 网络模型

    单 Reactor 单线程

    image-20220914103116065

    • Reactor 通过 IO多路复用 监听 IO 事件, 然后将 IO 事件分配给 Acceptor 或者对应的 Handler 处理

    • 如果是建立连接事件: 交给 Acceptor 处理, Acceptor 会通过 accept 方法获得一个连接, 然后再创建一个 handler 来处理对应的响应事件

    • 如果不是建立连接事件: 交个当前连接对应的 Handler 对象进行响应

    • Handler 对象需要从连接 read 数据 -> 业务处理 -> 将返回数据 write 到连接中

    Redis 单 Reactor 单线程模型

    image-20220914115332132

    • 使用 IO多路复用 循环获取事件并进行事件分发
    • 事件分发器会通过回调函数 串行 执行所有的事件(accept 连接创建, read, 业务处理, write)

    单 Reactor 多线程

    image-20220914112644927

    • Reactor 通过 IO多路复用 监听 IO 事件, 然后将 IO 事件分配给 Acceptor 或者对应的 Handler 处理

    • 如果不是建立连接事件: 交给 Acceptor 处理, Acceptor 会通过 accept 方法获得一个连接, 然后再创建一个 handler 来处理对应的响应事件

    • 如果不是是事件, 就交个当前连接对应的 Handler 对象进行响应

    前面都是一样的

    • 主线程 Handler 对象需要从连接 read 数据 , 然后给线程池 processor 执行
    • 线程池执行业务完毕后会添加写事件到 Reactor
    • Reactor 下一次事件循环的时候会处理写事件

    其实上面的模型给人一种 handler 要等待 processor 执行完成再执行 write 的感觉, 其实这样和单线程没啥区别, 所以不能这样理解

    正确的是 processor 执行完后将写事件添加到 Reactor 然后 Reactor 下一次循环的时候处理写事件

    下面是数据流动正确的图

    image-20220914144406927

    redis 6.0 的多线程 IO

    不同的是

    • redis 6.0 : IO 是多线程, 业务处理是主线程
    • 单 Reactor 多线程网络模型: IO 是主线程, 业务处理是多线程

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-q6XiqPss-1663395879556)(https://markdown-1304103443.cos.ap-guangzhou.myqcloud.com/2022-02-041460000040376115)]

    • 主线程 使用 Reactor epoll_wait (IO 多路复用) 获取可读的 socket fd, 并分配给 IO 线程池读取数据, 主线程阻塞等待 IO 线程池将数据读取完毕
    • IO 线程层读取数据并解析为 redis 命令, 放到读缓冲区
    • 主线程顺序执行解析好的命令, 并将返回的结果放到写缓冲区, 阻塞等待 IO 线程数据写入完毕
    • IO 线程写入返回的数据到 fd
    • 清空缓冲区和队列, 继续事件循环

    多 Reactor 多线程

    因为一个 Reactor 对象承担所有事件的监听和响应, 而且只在主线程中运行, 在面对瞬间高并发的场景时, 容易成为性能的瓶颈

    所以就考虑多 Reactor 来解决这个问题

    image-20220914152413657

    • 主线程的 Reactor 负责 accept 连接, 然后将连接交给线程池的 Reactor
    • 线程池的 Reactor 负责获取连接的读事件, 然后交给 handler 执行

    reference

    如何深刻理解Reactor和Proactor? - 知乎 (zhihu.com)

    如何用Go实现一个异步网络库? - 知乎 (zhihu.com)

    Go netpoller 网络模型之源码全面解析 - 知乎 (zhihu.com)

    Redis中的Reactor模型 - 掘金 (juejin.cn)

    Redis 6.0 多线程IO处理过程详解 - 知乎 (zhihu.com)

    Redis 6.0 新特性:带你 100% 掌握多线程模型 - SegmentFault 思否

    Millions of websocket and go

    gev 源码分析

    阅读这篇文章前, 请一定先了解 Reactor 网络模型

    简介

    gev 是一个轻量、快速的基于 Reactor 模式的非阻塞 TCP 网络库 / websocket server,支持自定义协议,轻松快速搭建高性能服务器。

    特点
    • 基于 epoll 和 kqueue 实现的高性能事件循环
    • 支持多核多线程
    • 动态扩容 Ring Buffer 实现的读写缓冲区
    • 异步读写
    • 自动清理空闲连接
    • SO_REUSEPORT 端口重用支持
    • 支持 WebSocket/Protobuf, 自定义协议
    • 支持定时任务,延时任务
    • 开箱即用的高性能 websocket server
    网络模型

    gev 只使用极少的 goroutine, 一个 goroutine 负责监听客户端连接,其他 goroutine (work 协程)负责处理已连接客户端的读写事件,work 协程数量可以配置,默认与运行主机 CPU 数量相同。

    gev 网络模型

    多 Reactor 多线程

    gev 是基于多 Reactor 多线程网络模型构建的, listener(MainReactor) 通过负载均衡策略将新连接分配给 worker(子线程Reactor)

    image-20220914152413657

    gev.NewServer 初始化

    // NewServer 创建 Server
    // @param handler 用户定义的 Handler 处理逻辑
    // @param opts 配置
    func NewServer(handler Handler, opts ...Option) (server *Server, err error)
    
    • 1
    • 2
    • 3
    • 4

    Handler 用户自定义的处理逻辑的函数

    // Handler Server 注册接口
    type Handler interface {
    	CallBack
    	OnConnect(c *Connection)
    }
    
    type CallBack interface {
    	OnMessage(c *Connection, ctx interface{}, data []byte) interface{}
    	OnClose(c *Connection)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    配置参数

    // Options 服务配置
    type Options struct {
    	Network   string              // 目前只支持 tcp
    	Address   string              // 监听地址
    	NumLoops  int                 // worker 的数量
    	ReusePort bool                // 是否可重用端口
    	IdleTime  time.Duration       // 最大空闲时间
    	Protocol  Protocol            // 自定义数据包的拆包解包 Pack, UnPack
    	Strategy  LoadBalanceStrategy // 负载均衡策略, 目前支持轮询和最小连接数
    
    	tick                        time.Duration // 层级时间轮的 tick
    	wheelSize                   int64         // 层级时间轮 size
    	metricsPath, metricsAddress string        // metric 指标暴露地址
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    Server

    // Server gev Server
    type Server struct {
    	listener  *listener              // 监听的主 Reactor
    	workLoops []*eventloop.EventLoop // worker Reactor
    	callback  Handler                // 用户定义的 Handler 函数
    
    	timingWheel *timingwheel.TimingWheel // 类似 kafka 中的层级定时器
    	opts        *Options                 // Options 配置
    	running     atomic.Bool              // 状态是否为正在运行
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    listener

    // listener 监听TCP连接, 作为多Reactor多线程网络模型中的主 Reactor
    type listener struct {
    	file     *os.File             // 监听的 socket 文件
    	fd       int                  // 监听的 socket 文件 fd
    	handleC  handleConnFunc       // 处理新连接 Conn 的函数(默认是通过负载均衡交给 worker)
    	listener net.Listener         // Listener 对象
    	loop     *eventloop.EventLoop // 事件循环
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    eventloop: listener 和 worker 都是通过 eventloop 来实现的

    type eventLoopLocal struct {
    	ConnCunt   atomic.Int64      // 连接数
    	needWake   *atomic.Bool      // 是否需要唤醒
    	poll       *poller.Poller    // poller linux 下由 epoll 实现
    	mu         spinlock.SpinLock // 自旋锁
    	sockets    map[int]Socket    // fd -> Socket
    	packet     []byte            // 内部使用,临时缓冲区
    	taskQueueW []func()          // 写事件队列
    	taskQueueR []func()          // 读事件队列
    
    	UserBuffer *[]byte // 用户缓冲区
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    poller.Poller 在 linux 下由 epoll 实现, eventloop 通过 poller.Poller 来实现事件循环

    type Poller struct {
    	fd       int
    	eventFd  int
    	buf      []byte
    	running  atomic.Bool
    	waitDone chan struct{}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    总结:

    • NewServer 初始化 Server, 会初始化一个 listener (主 Reactor), 一个或多个 worker Reactor, 一个层级时间轮, 一个用户自定义的 Handler
    • listener, 和 worker 通过 poller.Poller 来实现事件循环, listener 通过负载均衡策略将新连接分配给 worker (Reactor 网络模型是: 一个主 Reactor, 多个 worker Reactor)

    image-20220914152413657

    s.Start() 运行

    // Start 启动 Server
    // 在协程中启动主 Reactor 和 worker Reactor, 开始事件循环
    func (s *Server) Start() {
    	sw := sync.WaitGroupWrapper{}
    	// 启动定时器
    	s.timingWheel.Start()
    
    	// 启动 worker
    	length := len(s.workLoops)
    	for i := 0; i < length; i++ {
    		sw.AddAndRun(s.workLoops[i].Run)
    	}
    
    	// 启动主 Reactor
    	sw.AddAndRun(s.listener.Run)
    	s.running.Set(true)
    	sw.Wait()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    listener.Run 和 worker.Run 都是通过 eventloop.Run 来实现的, 传入的回调函数不同

    // Run 启动事件循环
    func (l *EventLoop) Run() {
    	l.poll.Poll(l.handlerEvent)
    }
    
    • 1
    • 2
    • 3
    • 4

    eventloop.Run 是由 poll 实现的

    // Poll 启动 epoll wait 循环
    func (ep *Poller) Poll(handler func(fd int, event Event)) {
    	defer func() {
    		close(ep.waitDone)
    	}()
    	events := make([]unix.EpollEvent, waitEventsBegin)
    	var (
    		wake bool
    		msec int
    	)
    	ep.running.Set(true)
    	for {
    		// wait 等待事件并读取
    		n, err := unix.EpollWait(ep.fd, events, msec)
    		if err != nil && err != unix.EINTR {
    			log.Error("EpollWait: ", err)
    			continue
    		}
    		if n <= 0 {
    			msec = -1
    			runtime.Gosched()
    			continue
    		}
    		msec = 0
    		// 遍历读取的事件
    		for i := 0; i < n; i++ {
    			fd := int(events[i].Fd)
    			if fd != ep.eventFd {
    				var rEvents Event
    				if ((events[i].Events & unix.POLLHUP) != 0) && ((events[i].Events & unix.POLLIN) == 0) {
    					rEvents |= EventErr
    				}
    				if (events[i].Events&unix.EPOLLERR != 0) || (events[i].Events&unix.EPOLLOUT != 0) {
    					rEvents |= EventWrite
    				}
    				if events[i].Events&(unix.EPOLLIN|unix.EPOLLPRI|unix.EPOLLRDHUP) != 0 {
    					rEvents |= EventRead
    				}
    				// 回调函数处理事件
    				handler(fd, rEvents)
    			} else {
    				ep.wakeHandlerRead()
    				wake = true
    			}
    		}
    
    		if wake {
    			handler(-1, 0)
    			wake = false
    			if !ep.running.Get() {
    				return
    			}
    		}
    		if n == len(events) {
    			events = make([]unix.EpollEvent, n*2)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    poll 会从 EpollWait 中等待读/写事件, 并交给回调函数处理

    我们看看 listener.Run 的回调函数 handleNewConnection

    func (s *Server) handleNewConnection(fd int, sa unix.Sockaddr) {
    	// 通过负载均衡找到一个 worker
    	loop := s.opts.Strategy(s.workLoops)
    
    	// 创建连接
    	c := NewConnection(fd, loop, sa, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback)
    
    	// 添加一个事件, 将连接放到 worker 里面
    	loop.QueueInLoop(func() {
    		s.callback.OnConnect(c)
    		if err := loop.AddSocketAndEnableRead(fd, c); err != nil {
    			log.Error("[AddSocketAndEnableRead]", err)
    		}
    	})
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    我们看看 workLoops[i].Run 的回调函数 handleLoop

    // workLoops[i].Run 的回调函数
    func (l *EventLoop) handlerEvent(fd int, events poller.Event) {
    	if fd != -1 {
    		// 找到 fd 对应的 socket 调用 socket.HandleEvent
    		s, ok := l.sockets[fd]
    		if ok {
    			s.HandleEvent(fd, events)
    		}
    	} else {
    		l.needWake.Set(true)
    		l.doPendingFunc()
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    EventLoop.handleLoop 回调的是 Socket.HandleEvent

    Socket 是一个接口, Connection 是 Socket 的实现类

    我们看看 Collection.HandleEvent

    // HandleEvent 内部使用,event loop 回调
    func (c *Connection) HandleEvent(fd int, events poller.Event) {
    	// ...
    
    	// 处理错误事件, 关闭连接
    	if events&poller.EventErr != 0 {
    		c.handleClose(fd)
    		return
    	}
    
    	// 处理写事件
    	if !c.outBuffer.IsEmpty() {
    		if events&poller.EventWrite != 0 {
    			// if return true, it means closed
    			if c.handleWrite(fd) {
    				return
    			}
    			// ...
    		}
    	} else if events&poller.EventRead != 0 {
    		// 处理读事件
    		if c.handleRead(fd) {
    			return
    		}
    
    		// ...
    	}
    
    	// ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    我们看看处理读事件 c.handleRead(fd)

    // 处理读事件
    func (c *Connection) handleRead(fd int) (closed bool) {
    	// TODO 避免这次内存拷贝
    	// 将数据读取到 buf
    	buf := c.loop.PacketBuf()
    	n, err := unix.Read(c.fd, buf)
    	// ...
    
    	// read buffer 读完了
    	if c.inBuffer.IsEmpty() {
    		// 将 buf 中的数据写入 buffer
    		c.buffer.WithData(buf[:n])
    		buf = buf[n:n]
    		// 协议解码 UnPacket -> 协议处理 OnMessage -> 返回的数据编码 Packet
    		c.handlerProtocol(&buf, c.buffer)
    
    		if !c.buffer.IsEmpty() {
    			first, _ := c.buffer.PeekAll()
    			_, _ = c.inBuffer.Write(first)
    		}
    	} else { // read buffer 还有数据
    		// 将 buf 中的数据写入 read buffer
    		_, _ = c.inBuffer.Write(buf[:n])
    		buf = buf[:0]
    		// 协议解码 UnPacket -> 协议处理 OnMessage -> 返回的数据编码 Packet
    		c.handlerProtocol(&buf, c.inBuffer)
    	}
    
    	// 将返回的数据发送写给客户端 fd
    	if len(buf) != 0 {
    		closed = c.sendInLoop(buf)
    	}
    	return
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    我们看看 handlerProtocol 怎么做的

    // 协议解码 UnPacket -> 协议处理 OnMessage -> 返回的数据编码 Packet
    // @param tmpBuffer 临时 buffer, 用于存储返回的数据
    // @param buffer 已经读取好的 buffer
    func (c *Connection) handlerProtocol(tmpBuffer *[]byte, buffer *ringbuffer.RingBuffer) {
    	ctx, receivedData := c.protocol.UnPacket(c, buffer)
    	for ctx != nil || len(receivedData) != 0 {
    		sendData := c.callBack.OnMessage(c, ctx, receivedData)
    		if sendData != nil {
    			*tmpBuffer = append(*tmpBuffer, c.protocol.Packet(c, sendData)...)
    		}
    		// 如果有多个数据包,继续解析处理
    		ctx, receivedData = c.protocol.UnPacket(c, buffer)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    OnMessage 就是我们为了实现 Handler 接口定义的回调函数, 例如

    type example struct {
    	Count atomic.Int64
    }
    
    func (s *example) OnConnect(c *gev.Connection) {
    	s.Count.Add(1)
    	//log.Println(" OnConnect : ", c.PeerAddr())
    }
    func (s *example) OnMessage(c *gev.Connection, ctx interface{}, data []byte) (out interface{}) {
    	//log.Println("OnMessage")
    	out = data
    	return
    }
    
    func (s *example) OnClose(c *gev.Connection) {
    	s.Count.Add(-1)
    	//log.Println("OnClose")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    我们看看 OnMessage 处理完后返回的数据怎么返回给客户端的 c.sendInLoop(buf)

    buf 是 OnMessage 处理完后返回的数据, 并且 Packet 之后的数据

    // 将返回的数据放到 outBuffer 中
    func (c *Connection) sendInLoop(data []byte) (closed bool) {
    	// 如果 outBuffer 为空, 直接将数据放到 outBuffer 中
    	if !c.outBuffer.IsEmpty() {
    		_, _ = c.outBuffer.Write(data)
    	} else {
    		// 如果 outBuffer 不为空, 尝试写入数据
    		n, err := unix.Write(c.fd, data)
    		if err != nil && err != unix.EAGAIN {
    			c.handleClose(c.fd)
    			closed = true
    			return
    		}
    
    		// 把没写完的数据放到 outBuffer 中
    		if n <= 0 {
    			_, _ = c.outBuffer.Write(data)
    		} else if n < len(data) {
    			_, _ = c.outBuffer.Write(data[n:])
    		}
    
    		// 如果 outBuffer 不为空, 则注册写事件
    		if !c.outBuffer.IsEmpty() {
    			_ = c.loop.EnableReadWrite(c.fd)
    		}
    	}
    
    	return
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    gev 处理请求后, 会将返回的数据储存到 outBuffer 里面 (如果 outBuffer 为空会先尝试直接将数据发送到网卡), 然后注册写事件到 poller (linux 使用 epoll 实现), 循环 epoll 事件的时候, 如果有写事件, 就会将 outBuffer 中的数据发送到网卡

    快速开始

    参考 GitHub 库中的快速开始和 example, 文档里面已经写的很好了, 这里就不再赘述了

    贴一个 echo 的快速开始

    package main
    
    import (
    	"flag"
    	"net/http"
    	_ "net/http/pprof"
    	"strconv"
    	"time"
    
    	"github.com/Allenxuxu/gev"
    	"github.com/Allenxuxu/gev/log"
    	"github.com/Allenxuxu/toolkit/sync/atomic"
    )
    
    type example struct {
    	Count atomic.Int64
    }
    
    func (s *example) OnConnect(c *gev.Connection) {
    	s.Count.Add(1)
    	//log.Println(" OnConnect : ", c.PeerAddr())
    }
    func (s *example) OnMessage(c *gev.Connection, ctx interface{}, data []byte) (out interface{}) {
    	//log.Println("OnMessage")
    	out = data
    	return
    }
    
    func (s *example) OnClose(c *gev.Connection) {
    	s.Count.Add(-1)
    	//log.Println("OnClose")
    }
    
    func main() {
    	go func() {
    		if err := http.ListenAndServe(":6060", nil); err != nil {
    			panic(err)
    		}
    	}()
    
    	handler := new(example)
    	var port int
    	var loops int
    
    	flag.IntVar(&port, "port", 1833, "server port")
    	flag.IntVar(&loops, "loops", -1, "num loops")
    	flag.Parse()
    
    	s, err := gev.NewServer(handler,
    		gev.Network("tcp"),
    		gev.Address(":"+strconv.Itoa(port)),
    		gev.NumLoops(loops),
    		gev.MetricsServer("", ":9091"),
    	)
    	if err != nil {
    		panic(err)
    	}
    
    	s.RunEvery(time.Second*2, func() {
    		log.Info("connections :", handler.Count.Get())
    	})
    
    	s.Start()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    reference

    https://github.com/Allenxuxu/gev

  • 相关阅读:
    Rust 过程宏 proc-macro 是个啥
    Python之第七章 函数 --- 迭代器、生成器、装饰器
    springsecurity实现单点登录
    太速科技-基于XC7Z100+灵汐KA200的图像处理类脑平台
    java spring cloud 企业电子招标采购系统源码:营造全面规范安全的电子招投标环境,促进招投标市场健康可持续发展
    锂电池保护IC:HC5106锂电池保护IC,供应储能电源、无线吸尘器等电动工具、电动自行车、无人机、笔记本电脑等电子产品
    基于模糊PID的液压舵机伺服系统
    RedisObject各属性结构的作用
    C函数使用
    VS2015+opencv 3.4.6开发环境
  • 原文地址:https://blog.csdn.net/jarvan5/article/details/126905052