• GO实现Redis:GO实现Redis的AOF持久化(4)


    • 将用户发来的指令以RESP协议的形式存储在本地的AOF文件,重启Redis后执行此文件恢复数据
    • https://github.com/csgopher/go-redis
    • 本文涉及以下文件:
      redis.conf:配置文件
      aof:实现aof

    redis.conf

    appendonly yes
    appendfilename appendonly.aof
    

    aof/aof.go

    type CmdLine = [][]byte
    
    const (
       aofQueueSize = 1 << 16
    )
    
    type payload struct {
       cmdLine CmdLine
       dbIndex int
    }
    
    // AofHandler:1.从管道中接收数据 2.写入AOF文件
    type AofHandler struct {
       db          databaseface.Database
       aofChan     chan *payload
       aofFile     *os.File
       aofFilename string
       currentDB   int
    }
    
    func NewAOFHandler(db databaseface.Database) (*AofHandler, error) {
       handler := &AofHandler{}
       handler.aofFilename = config.Properties.AppendFilename
       handler.db = db
       handler.LoadAof()
       aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
       if err != nil {
          return nil, err
       }
       handler.aofFile = aofFile
       handler.aofChan = make(chan *payload, aofQueueSize)
       go func() {
          handler.handleAof()
       }()
       return handler, nil
    }
    
    // AddAof:用户的指令包装成payload放入管道
    func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {
       if config.Properties.AppendOnly && handler.aofChan != nil {
          handler.aofChan <- &payload{
             cmdLine: cmdLine,
             dbIndex: dbIndex,
          }
       }
    }
    
    // handleAof:将管道中的payload写入磁盘
    func (handler *AofHandler) handleAof() {
       handler.currentDB = 0
       for p := range handler.aofChan {
          if p.dbIndex != handler.currentDB {
             // select db
             data := reply.MakeMultiBulkReply(utils.ToCmdLine("SELECT", strconv.Itoa(p.dbIndex))).ToBytes()
             _, err := handler.aofFile.Write(data)
             if err != nil {
                logger.Warn(err)
                continue
             }
             handler.currentDB = p.dbIndex
          }
          data := reply.MakeMultiBulkReply(p.cmdLine).ToBytes()
          _, err := handler.aofFile.Write(data)
          if err != nil {
             logger.Warn(err)
          }
       }
    }
    
    // LoadAof:重启Redis后加载aof文件
    func (handler *AofHandler) LoadAof() {
       file, err := os.Open(handler.aofFilename)
       if err != nil {
          logger.Warn(err)
          return
       }
       defer file.Close()
       ch := parser.ParseStream(file)
       fakeConn := &connection.Connection{}
       for p := range ch {
          if p.Err != nil {
             if p.Err == io.EOF {
                break
             }
             logger.Error("parse error: " + p.Err.Error())
             continue
          }
          if p.Data == nil {
             logger.Error("empty payload")
             continue
          }
          r, ok := p.Data.(*reply.MultiBulkReply)
          if !ok {
             logger.Error("require multi bulk reply")
             continue
          }
          ret := handler.db.Exec(fakeConn, r.Args)
          if reply.IsErrorReply(ret) {
             logger.Error("exec err", err)
          }
       }
    }
    

    AofHandler:1.从管道中接收数据 2.写入AOF文件
    AddAof:用户的指令包装成payload放入管道
    handleAof:将管道中的payload写入磁盘
    LoadAof:重启Redis后加载aof文件

    database/database.go

    // 将AOF加入到database里
    type Database struct {
       dbSet []*DB
       aofHandler *aof.AofHandler
    }
    
    func NewDatabase() *Database {
       mdb := &Database{}
       if config.Properties.Databases == 0 {
          config.Properties.Databases = 16
       }
       mdb.dbSet = make([]*DB, config.Properties.Databases)
       for i := range mdb.dbSet {
          singleDB := makeDB()
          singleDB.index = i
          mdb.dbSet[i] = singleDB
       }
       if config.Properties.AppendOnly {
          aofHandler, err := aof.NewAOFHandler(mdb)
          if err != nil {
             panic(err)
          }
          mdb.aofHandler = aofHandler
          for _, db := range mdb.dbSet {
    	 //使用singleDB的原因:因为在循环中获取返回变量的地址都完全相同,因此当我们想要访问数组中元素所在的地址时,不应该直接获取 range 返回的变量地址 db,而应该使用 singleDB := db
             singleDB := db
             singleDB.addAof = func(line CmdLine) {
                mdb.aofHandler.AddAof(singleDB.index, line)
             }
          }
       }
       return mdb
    }
    

    将AOF加入到database里
    使用singleDB的原因:因为在循环中获取返回变量的地址都完全相同,因此当我们想要访问数组中元素所在的地址时,不应该直接获取 range 返回的变量地址 db,而应该使用 singleDB := db

    database/db.go

    type DB struct {
       index int
       data   dict.Dict
       addAof func(CmdLine)
    }
    
    func makeDB() *DB {
    	db := &DB{
    		data:   dict.MakeSyncDict(),
    		addAof: func(line CmdLine) {},
    	}
    	return db
    }
    

    由于分数据库db引用不到aof,所以添加一个addAof匿名函数,在NewDatabase中用这个匿名函数调用AddAof

    database/keys.go

    func execDel(db *DB, args [][]byte) resp.Reply {
       ......
       if deleted > 0 {
          db.addAof(utils.ToCmdLine2("del", args...)) // 添加addAof方法
       }
       return reply.MakeIntReply(int64(deleted))
    }
    
    func execFlushDB(db *DB, args [][]byte) resp.Reply {
    	db.Flush()
    	db.addAof(utils.ToCmdLine2("flushdb", args...)) // 添加addAof方法
    	return &reply.OkReply{}
    }
    
    func execRename(db *DB, args [][]byte) resp.Reply {
    	......
    	db.addAof(utils.ToCmdLine2("rename", args...)) // 添加addAof方法
    	return &reply.OkReply{}
    }
    
    func execRenameNx(db *DB, args [][]byte) resp.Reply {
    	......
    	db.addAof(utils.ToCmdLine2("renamenx", args...)) // 添加addAof方法
    	return reply.MakeIntReply(1)
    }
    

    database/string.go

    func execSet(db *DB, args [][]byte) resp.Reply {
       ......
       db.addAof(utils.ToCmdLine2("set", args...)) // 添加addAof方法
       return &reply.OkReply{}
    }
    
    func execSetNX(db *DB, args [][]byte) resp.Reply {
       ......
       db.addAof(utils.ToCmdLine2("setnx", args...)) // 添加addAof方法
       return reply.MakeIntReply(int64(result))
    }
    
    func execGetSet(db *DB, args [][]byte) resp.Reply {
       key := string(args[0])
       value := args[1]
    
       entity, exists := db.GetEntity(key)
       db.PutEntity(key, &database.DataEntity{Data: value})
       db.addAof(utils.ToCmdLine2("getset", args...)) // 添加addAof方法
       ......
    }
    

    添加addAof方法

    测试命令

    *3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
    *2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
    *2\r\n$6\r\nSELECT\r\n$1\r\n1\r\n
    
  • 相关阅读:
    第1章 初识MyBatis框架
    服务器与客户端交互小栗子(java代码实现最基本的服务器实例)
    现货白银的价格如何变动
    CSS之实现线性渐变背景
    【Scan Kit】集成扫码服务时Android Studio总是报错OOM如何解决?
    Android 应用权限
    pdf转word需要密码怎么办?几个方法教你解决
    FPGA project : sdram
    Linux高性能服务器编程 学习笔记 第十三章 多线程编程
    简单代码,随机点名器
  • 原文地址:https://www.cnblogs.com/csgopher/p/17249337.html