• GO实现Redis:GO实现TCP服务器(1)



    • 本文实现一个Echo TCP Server
    • 完整代码:https://github.com/csgopher/go-redis
    • 本文涉及以下文件:
      handler:处理连接,客户端传来的指令
      server:服务端
      echo:测试
      main

    interface/tcp/Handler.go

    type Handler interface {
       Handle(ctx context.Context, conn net.Conn) // 处理连接
       Close() error
    }
    
    • Handler:业务逻辑的处理接口
      • Handle(ctx context.Context, conn net.Conn) 处理连接

    tcp/server.go

    type Config struct {
        Address string // 监听地址
    }
    
    func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
        closeChan := make(chan struct{})
        listen, err := net.Listen("tcp", cfg.Address)
        if err != nil {
           return err
       }
        logger.Info("start listen")
        ListenAndServe(listen, handler, closeChan)
        return nil
    }
    
    func ListenAndServe(listener net.Listener,
                        handler tcp.Handler,
                        closeChan <-chan struct{}) {
        ctx := context.Background() // ctx是上下文,可以传递一些参数。
        var waitDone sync.WaitGroup
        for true { // 死循环中接收到新连接时,让一个协程去处理连接
            conn, err := listener.Accept()
            if err != nil {
                break
            }
            logger.Info("accept link")
            waitDone.Add(1)
            go func() {
                defer func() {
                    waitDone.Done()
                }()
                handler.Handler(ctx, conn)
            }()
        }
        waitDone.Wait() // 如果listener.Accept()出错了就会break跳出来,这时候需要等待已经服务的客户端退出。使用WaitGroup等待客服端退出
    }
    
    • Config:启动tcp服务器的配置
      • Address:监听地址
    • ListenAndServe:ctx是上下文,可以传递一些参数。死循环中接收到新连接时,让一个协程去处理连接
    • 如果listener.Accept()出错了就会break跳出来,这时候需要等待已经服务的客户端退出。使用WaitGroup等待客服端退出

    func ListenAndServe(listener net.Listener,
                        handler tcp.Handler,
                        closeChan <-chan struct{}) {
    	// listener和handler在退出的时候需要关掉。如果用户直接kill掉了程序,我们也需要关掉listener和handler,这时候要使用closeChan,一旦接收到关闭信号,就执行关闭逻辑
        go func() {
           <-closeChan
           logger.Info("shutting down...")
           _ = listener.Close()
           _ = handler.Close()
       }()
    
        defer func() {
           _ = listener.Close()
           _ = handler.Close()
       }()
    
        ......
    }
    

    listener和handler在退出的时候需要关掉。如果用户直接kill掉了程序,我们也需要关掉listener和handler,这时候要使用closeChan,一旦接收到关闭信号,就执行关闭逻辑

    func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
    
        closeChan := make(chan struct{})
        sigCh := make(chan os.Signal)
        signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
        go func() {
           sig := <-sigCh // 当系统对程序发送信号时,sigCh会接收到信号
           switch sig {
              case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
              closeChan <- struct{}{}
          }
       }()
        listen, err := net.Listen("tcp", cfg.Address)
        if err != nil {
           return err
       }
        logger.Info("start listen")
        ListenAndServe(listen, handler, closeChan)
        return nil
    }
    

    当系统对程序发送信号时,sigCh会接收到信号

    tcp/echo.go

    type EchoHandler struct {
       activeConn sync.Map
       closing    atomic.Boolean
    }
    

    EchoHandler:

    • activeConn:记录连接
    • closing:是否正在关闭,有并发竞争,使用atomic.Boolean

    type EchoClient struct {
       Conn    net.Conn
       Waiting wait.Wait
    }
    
    func (c *EchoClient) Close() error {
    	c.Waiting.WaitWithTimeout(10 * time.Second)
    	_ = c.Conn.Close()
    	return nil
    }
    

    EchoClient:一个客户端就是一个连接。Close方法关闭客户端连接,超时时间设置为10s

    func MakeHandler() *EchoHandler {
    	return &EchoHandler{}
    }
    
    // Handle:处理客户端的连接。
    func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) {
       // 连接正在关闭,不接收新连接
       if h.closing.Get() {
          _ = conn.Close()
       }
    
       client := &EchoClient{
          Conn: conn,
       }
       h.activeConn.Store(client, struct{}{}) // 2.存储新连接,value用空结构体
    
       reader := bufio.NewReader(conn)
       for {
          msg, err := reader.ReadString('\n') // 3.使用缓存区接收用户发来的数据,使用\n作为结束的标志
          if err != nil {
             if err == io.EOF {
                logger.Info("connection close")
                h.activeConn.Delete(client)
             } else {
                logger.Warn(err)
             }
             return
          }
          // 正在处理业务,不要关掉
          client.Waiting.Add(1)
          // 将数据原封不动写回去,测试
          b := []byte(msg)
          _, _ = conn.Write(b)
          client.Waiting.Done()
       }
    }
    
    func (h *EchoHandler) Close() error {
       logger.Info("handler shutting down...")
       h.closing.Set(true)
       h.activeConn.Range(func(key interface{}, val interface{}) bool {
          client := key.(*EchoClient)
          _ = client.Close()
          return true
       })
       return nil
    }
    
    • MakeEchoHandler:创建EchoHandler
    • Handle:处理客户端的连接。
      • 1.连接正在关闭时,不接收新连接
      • 2.存储新连接,value用空结构体
      • 3.使用缓存区接收用户发来的数据,使用\n作为结束的标志
    • Close:将所有客户端连接关掉

    main.go

    const configFile string = "redis.conf"
    
    var defaultProperties = &config.ServerProperties{
       Bind: "0.0.0.0",
       Port: 6379,
    }
    
    func fileExists(filename string) bool {
       info, err := os.Stat(filename)
       return err == nil && !info.IsDir()
    }
    
    func main() {
       logger.Setup(&logger.Settings{
          Path:       "logs",
          Name:       "godis",
          Ext:        "log",
          TimeFormat: "2022-02-02",
       })
    
       if fileExists(configFile) {
          config.SetupConfig(configFile)
       } else {
          config.Properties = defaultProperties
       }
    
       err := tcp.ListenAndServeWithSignal(
          &tcp.Config{
             Address: fmt.Sprintf("%s:%d",
                config.Properties.Bind,
                config.Properties.Port),
          },
          EchoHandler.MakeHandler())
       if err != nil {
          logger.Error(err)
       }
    }
    
  • 相关阅读:
    Java 程序设计报告[对接java的迭代器接口]
    web前端面试题附答案041 - 曾经一个百度面试官问我,localStorage可以存对象吗?
    【计算机网络】IP协议第二讲(Mac帧、IP地址、碰撞检测、ARP协议介绍)
    使用MybatisPlus时出现的java.lang.NullPointerException异常~
    Linux服务器部署java项目
    【软考软件评测师】2020综合知识历年真题
    【C语言 数据结构】线性表 - 顺序表的实现
    Linux(五)__系统管理
    Kernel Memory 入门系列: RAG 简介
    Frida系列--Stalker原理分析
  • 原文地址:https://www.cnblogs.com/csgopher/p/17248642.html