• Golang之封装Mysql Slave小例子


    这个例子是在学习了mysql协议时,根据canal中源码采用 go 语言来写的 ,主要是用于学习 mysql slave 是如何注册到 mysql 上面,包括 mysql协议 如何发送数据包以及读取数据包;由于空闲时间不多,所以功能不是很完善,主要是实现了tls 握手以及 binlog 网路流的开启;只用于学习协议使用(代码很烂,大佬们轻喷);废话不多说,上代码

    java版以及协议详细说明:https://blog.csdn.net/weixin_43915643/article/details/126506492?spm=1001.2014.3001.5501

    1. 包结构

    • command:封装命令包实体
    • auth:插件验证包
    • common:通用常量
    • connector:连接器
    • event:事件读取包
      在这里插入图片描述

    2. 代码实例

    本次例子的目的是:通过 tls 开启 binlog 流就算成功

    func main() {
    	connector.NewConnection(connector.ConnectionConfig{
    		Addr:     "localhost:3306",
    		Username: "root",
    		Password: "123456",
    	})
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    我这里使用两个依赖:github上面的开源项目,组件已经封装的比较完全,我借用了 mysql.PacketIO 用于读取数据和写入数据,但是 binlog 流读取数据的格式跟 mysql.PacketIO 中提供的 api ReadPacket 读取的格式不一样所以,只能读第一个事件,后续mysql就会发送读取位置错误的包;这不影响,本次例子只在前置握手阶段,不做事件的解析,以后有时间的话会替换 mysql.PacketIO 对象自己封装tcp流

    • github.com/flike/kingshard/backend
    • github.com/flike/kingshard/mysql

    在这里插入图片描述

    3. 代码

    注:mysql新版协议中,COM_BINLOG_DUMP 新增了一个 BINLOG_DUMP_NON_BLOCK 用来表示是否开启一个binlog阻塞流,有了这个功能之后,就不需要执行 COM_REGISTER_SLAVE 协议来注册slave了

    3.1 连接器(connector)

    最主要的方法 NewConnection() 开启一个 tcp 连接,根据连接 mysql 服务端返回的初始包,设置对应的客户端能力,并且将服务端升级为 tls 进行数据传输

    package connector
    
    import (
    	"context"
    	"crypto/tls"
    	"errors"
    	"fmt"
    	_ "fmt"
    	"gee/mysql_connector/auth"
    	"gee/mysql_connector/command"
    	"gee/mysql_connector/event"
    	"gee/mysql_connector/pack"
    	_ "github.com/flike/kingshard/backend"
    	"github.com/flike/kingshard/mysql"
    	"log"
    	"net"
    	"reflect"
    	"strconv"
    	"time"
    )
    
    type ConnectionConfig struct {
    	//ip地址
    	Addr string
    	//用户名
    	Username string
    	//密码
    	Password string
    	//数据库
    	Schema string
    }
    
    // Connector 连接实体
    type Connector struct {
    	//配置信息
    	connectionConfig ConnectionConfig
    	//获取到mysql连接数据流
    	pkg *mysql.PacketIO
    	//初始握手包
    	initialHandshake pack.InitialHandshake
    	//binlog 文件的名称
    	binlogName string
    	//binlog 文件的事件位置
    	binlogPosition int
    	//是否连接成功
    	isSuccess bool
    	//会为每一个事件写入一个校检值,用于检查每个事件的完整性
    	checkSumType string
    	//服务端id
    	serverId int
    }
    
    // Execute 执行对应的sql
    func (c *Connector) Execute(sql string) {
    
    }
    
    func (c *Connector) showMasterStatus() {
    	queryCommand := command.QueryCommand{
    		Sql: "show master status",
    	}
    	c.WriteCommand(queryCommand)
    	resultSet := c.readResultSet()
    	if len(resultSet) <= 0 {
    		panic(errors.New("Failed to determine binlog filename/position"))
    	}
    	c.binlogName = resultSet[0].GetValue(0)
    	c.binlogPosition, _ = strconv.Atoi(resultSet[0].GetValue(1))
    	fmt.Printf("读取到Binlog文件名称:%s, Position位置:%d\n", c.binlogName, c.binlogPosition)
    }
    
    // binlogCheckSum 获取binglog_checksum属性
    func (c *Connector) binlogCheckSum() {
    	//默认采用 crc32 防止写入校检符 占4个字节
    	queryCommand := command.QueryCommand{
    		Sql: "show global variables like 'binlog_checksum'",
    	}
    	c.WriteCommand(queryCommand)
    	resultSet := c.readResultSet()
    	if len(resultSet) != 0 {
    		c.checkSumType = resultSet[0].GetValue(1)
    		if len(c.checkSumType) > 0 {
    			queryCommand = command.QueryCommand{
    				Sql: "set @master_binlog_checksum= @@global.binlog_checksum",
    			}
    			c.readResultSet()
    		}
    	}
    }
    
    // 获取到服务端id
    func (c *Connector) getMasterServerId() {
    	queryCommand := command.QueryCommand{
    		Sql: "select @@server_id",
    	}
    	c.WriteCommand(queryCommand)
    	resultSet := c.readResultSet()
    	if len(resultSet) >= 0 {
    		c.serverId, _ = strconv.Atoi(resultSet[0].GetValue(0))
    	}
    }
    
    // 发起binlog的dump请求
    func (c *Connector) requestBinaryLogStream() {
    	dumpBinaryLogCommand := command.DumpBinaryLogCommand{
    		ServerId:       c.serverId,
    		BinlogPosition: c.binlogPosition,
    		BinlogFilename: c.binlogName,
    	}
    	c.WriteCommand(dumpBinaryLogCommand)
    	for {
    		packet, _ := c.pkg.ReadPacket()
    		c.getReader()
    		c.checkError(packet)
    		e := new(event.Head)
    		e.Read(packet)
    		//读取了头部数据之后根据事件类型读取对应的数据
    		fmt.Printf("事件类型:%d\n事件时间:%d\n事件长度:%d\n", e.EventId, e.Timestamp, e.EventLength)
    	}
    }
    
    func (c *Connector) getReader() {
    	value := reflect.TypeOf(c.pkg)
    	if value.Kind() == reflect.Pointer {
    		v := value.Elem().Field(0)
    		fmt.Println(v)
    	}
    }
    
    func (c *Connector) readResultSet() []command.ResultSet {
    	//先读取第一个包是否是错误包
    	if data, err := c.pkg.ReadPacket(); err == nil {
    		c.checkError(data)
    	}
    	//循环读取数据包
    	var rss []command.ResultSet
    	//当读取到 E0F包时,后面的就是数据
    	for {
    		data, _ := c.pkg.ReadPacket()
    		if data[0] == 0xFE {
    			break
    		}
    	}
    	//跳过0xFE的包
    	for {
    		data, _ := c.pkg.ReadPacket()
    		if data[0] == 0xFE {
    			break
    		}
    		c.checkError(data)
    		rs := new(command.ResultSet)
    		rs.ReadSet(data)
    		rss = append(rss, *rs)
    	}
    	return rss
    }
    
    // checkError 检查错误信息
    func (c *Connector) checkError(data []byte) {
    	if data[0] == 0xFF {
    		errorPacket := make([]byte, len(data)-1)
    		copy(errorPacket[:], data[1:])
    		e := new(command.ErrorPacket)
    		e.Read(errorPacket)
    		panic(errors.New(e.ToString()))
    	}
    }
    
    func (c *Connector) WriteCommand(command command.Command) {
    	bytes := command.ToByteArray()
    	if c.isSuccess {
    		c.pkg.Sequence = 0
    	}
    	if err := c.pkg.WritePacket(bytes); err != nil {
    		panic(err)
    	}
    }
    
    // readInitPacket 读取初始数据包
    func (c *Connector) readInitPacket() {
    	data, err := c.pkg.ReadPacket()
    	if err != nil {
    		panic(err)
    	}
    	//读取初始包
    	initialHandPacket := pack.InitialHandshake{}
    	initialHandPacket.ReadInitialHandshake(data)
    	c.initialHandshake = initialHandPacket
    }
    
    // upgradeTls 升级为tls握手
    func (c *Connector) upgradeTls(conn net.Conn) {
    	//构建升级tls包
    	upgradeTls := new(command.UpgradeTls)
    	upgradeTls.InitPack = c.initialHandshake
    	bytes := upgradeTls.ToByteArray()
    	_ = c.pkg.WritePacket(bytes)
    	//进行tls握手,InsecureSkipVerify 跳过证书的验证
    	client := tls.Client(conn, &tls.Config{
    		InsecureSkipVerify: true,
    	})
    	if err := client.Handshake(); err != nil {
    		log.Fatalln(err.Error())
    		return
    	}
    	//握手成功之后,将IO进行替换
    	sequence := c.pkg.Sequence
    	c.pkg = mysql.NewPacketIO(client)
    	c.pkg.Sequence = sequence
    	//获取到验证插件进行
    	authenticator := auth.Authenticator{
    		Username:    c.connectionConfig.Username,
    		Password:    c.connectionConfig.Password,
    		Schema:      c.connectionConfig.Schema,
    		GreetPacket: c.initialHandshake,
    		Pkg:         c.pkg,
    	}
    	c.isSuccess = authenticator.Authenticate()
    }
    
    // NewConnection 创建一个tcp的连接
    func NewConnection(connectionConfig ConnectionConfig) (*Connector, error) {
    	connector := new(Connector)
    	connector.connectionConfig = connectionConfig
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
    	defer cancel()
    	dialer := &net.Dialer{}
    	conn, err := dialer.DialContext(ctx, "tcp", connectionConfig.Addr)
    	if err != nil {
    		log.Fatalln("连接Mysql服务器失败:" + err.Error())
    		return nil, err
    	}
    	//转换为tcp类型
    	tcpConn := conn.(*net.TCPConn)
    	//设置不需要延时
    	tcpConn.SetNoDelay(false)
    	//设置保持长连接
    	tcpConn.SetKeepAlive(true)
    	//将tcp连接包装成 PacketIO实体
    	connector.pkg = mysql.NewPacketIO(tcpConn)
    	//读取初始实体
    	connector.readInitPacket()
    	//升级tls连接
    	connector.upgradeTls(conn)
    	//读取binlog文件的名称
    	connector.showMasterStatus()
    	//设置服务id
    	connector.getMasterServerId()
    	//发起日志
    	connector.requestBinaryLogStream()
    	return connector, nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253

    3.2 command

    命令包,主要用于封装发送的协议包以及读取错误和结果包

    • Command:定义的接口
    • InitialHandshake:初始握手包
    • UpgradeTls:发送请求升级为tls
    • AuthenticateSecurityPasswordCommand:对密码做完加密之后发送mysql
    • QueryCommand:封装查询sql的实体
    • ResultSet:读取查询完之后返回的数据
    • ErrorPacket:错误包信息
    • DumpBinaryLogCommand:发起binlog请求

    Command

    type Command interface {
    
    	//ToByteArray 将数据转换为字节数组
    	ToByteArray() []byte
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    InitialHandshake

    type InitialHandshake struct {
    	//协议版本号
    	ProtocolVersion int
    	//服务版本号
    	ServerVersion string
    	//连接id
    	ConnectionId uint32
    	//加密字符串
    	Salt string
    	//服务端能力标识
    	ServerCapability uint32
    	//采用的字符集
    	CharacterSet byte
    	//状态标识符
    	StatusFlags uint16
    	//插件名称
    	AuthPluginName string
    }
    
    // ReadInitialHandshake 读取初始包
    func (s *InitialHandshake) ReadInitialHandshake(data []byte) {
    	buffer := bytes.NewBuffer(data)
    	//判断数据包的类型是否是错误数据包
    	protocolVersion, _ := buffer.ReadByte()
    	if protocolVersion == mysql.ERR_HEADER {
    		panic(errors.New("read initial handshake error"))
    	}
    
    	//判断mysql服务端的版本号
    	if protocolVersion < mysql.MinProtocolVersion {
    		panic(fmt.Errorf("invalid protocol version %d, must >= 10", data[0]))
    	}
    	s.ProtocolVersion = int(protocolVersion)
    	//读取服务版本号
    	s.ServerVersion, _ = buffer.ReadString(0x00)
    	//读取连接id
    	s.ConnectionId = binary.LittleEndian.Uint32(buffer.Next(4))
    	//读取加密字符串
    	s.Salt = string(buffer.Next(8))
    	buffer.Next(1)
    	//读取低位
    	s.ServerCapability = uint32(binary.LittleEndian.Uint16(buffer.Next(2)))
    	characterSetByte, _ := buffer.ReadByte()
    	//读取字符集
    	s.CharacterSet = characterSetByte
    	//读取状态标识
    	s.StatusFlags = binary.LittleEndian.Uint16(buffer.Next(2))
    	//读取高位
    	s.ServerCapability = uint32(binary.LittleEndian.Uint16(buffer.Next(2)))<<16 | s.ServerCapability
    
    	//根据客户端能力是否支持对应的能力
    	if s.ServerCapability&mysql.CLIENT_PLUGIN_AUTH != 0 {
    		pluginLength, _ := buffer.ReadByte()
    		//跳过10字节的填充位
    		buffer.Next(10)
    		if s.ServerCapability&mysql.CLIENT_SECURE_CONNECTION != 0 {
    			s.Salt += string(buffer.Next(int(math.Max(float64(13), float64(pluginLength-8))) - 1))
    			buffer.Next(1)
    		}
    		s.AuthPluginName = string(buffer.Next(buffer.Len() - 1))
    		buffer.Next(1)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    UpgradeTls

    type UpgradeTls struct {
    	//客户端能力标识
    	CapabilityFlags uint32
    	//初始包
    	InitPack pack.InitialHandshake
    }
    
    func (u UpgradeTls) ToByteArray() []byte {
    	//升级ssl请求数据的长度为32
    	data := make([]byte, 36)
    	// 设置当前客户端的能力标识符
    	capability := mysql.CLIENT_PROTOCOL_41 |
    		mysql.CLIENT_SECURE_CONNECTION |
    		mysql.CLIENT_LONG_FLAG |
    		mysql.CLIENT_SSL |
    		mysql.CLIENT_PLUGIN_AUTH
    
    	u.CapabilityFlags = capability
    
    	data[4] = byte(u.CapabilityFlags)
    	data[5] = byte(u.CapabilityFlags >> 8)
    	data[6] = byte(u.CapabilityFlags >> 16)
    	data[7] = byte(u.CapabilityFlags >> 24)
    	//MaxPacket 直接默认为0
    
    	//设置字符集
    	data[12] = u.InitPack.CharacterSet
    
    	return data
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    AuthenticateSecurityPasswordCommand

    type AuthenticateSecurityPasswordCommand struct {
    	//用户名
    	Username string
    	//密码
    	Password string
    	//数据库名称
    	Schema string
    	//加密字符串
    	Salt string
    	//采用的字符集
    	Collation uint8
    	//插件名称
    	PluginName string
    }
    
    func (a AuthenticateSecurityPasswordCommand) ToByteArray() []byte {
    	length := 4
    	//能力标识符
    	capability := mysql.CLIENT_PROTOCOL_41 |
    		mysql.CLIENT_SECURE_CONNECTION |
    		mysql.CLIENT_LONG_PASSWORD |
    		mysql.CLIENT_TRANSACTIONS |
    		mysql.CLIENT_LONG_FLAG |
    		mysql.CLIENT_SSL |
    		mysql.CLIENT_PLUGIN_AUTH
    	//长度:能力标识符、最大包长度、字符集、23个0的填充位
    	length += 4 + 4 + 1 + 23
    
    	//用户名的长度 0 结尾
    	length += len(a.Username) + 1
    	//计算出的密码
    	calcPassword := mysql.CalcPassword([]byte(a.Salt), []byte(a.Password))
    	//加上密码的长度
    	length += 1 + len(calcPassword)
    
    	//判断是否有指定的数据库
    	if len(a.Schema) > 0 {
    		capability |= mysql.CLIENT_CONNECT_WITH_DB
    		length += len(a.Schema) + 1
    	}
    	length += len(common.MYSQL_NATIVE) + 1
    	data := make([]byte, length)
    	data[4] = byte(capability)
    	data[5] = byte(capability >> 8)
    	data[6] = byte(capability >> 16)
    	data[7] = byte(capability >> 24)
    	//跳过最大包长度 maxSize
    	pos := 4 + 4 + 4 + 1 + 23
    	//直接写入字符集
    	data[12] = a.Collation
    	//写入用户名
    	pos += copy(data[pos:], a.Username)
    	// 0x00 分割
    	pos++
    	data[pos] = byte(len(calcPassword))
    	pos++
    	//写入密码
    	pos += copy(data[pos:], calcPassword)
    	if len(a.Schema) > 0 {
    		pos += copy(data[pos:], a.Schema)
    		pos++
    	}
    	pos += copy(data[pos:], common.MYSQL_NATIVE)
    	return data
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    QueryCommand

    type QueryCommand struct {
    	//查询sql
    	Sql string
    }
    
    func (q QueryCommand) ToByteArray() []byte {
    	b := new(bytes.Buffer)
    	//保留头出来,让后面写的时候可以写入头包
    	b.Write([]byte{0, 0, 0, 0})
    	//写入协议号
    	b.WriteByte(3)
    	//写入sql
    	b.WriteString(q.Sql)
    	//返回数组
    	return b.Bytes()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    ResultSet

    type ResultSet struct {
    	values []string
    }
    
    func (r *ResultSet) ReadSet(data []byte) {
    	buffer := bytes.NewBuffer(data)
    	var values []string
    	for {
    		if buffer.Len() <= 0 {
    			break
    		}
    		lengthByte, _ := buffer.ReadByte()
    		//读取字段的长度
    		length := int(lengthByte)
    		value := string(buffer.Next(length))
    		values = append(values, value)
    	}
    	r.values = values
    }
    
    func (r ResultSet) GetValue(index int) string {
    	if index < 0 || len(r.values) < 0 {
    		panic(errors.New("索引索引必须大于或等于0"))
    	}
    	return r.values[index]
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    DumpBinaryLogCommand

    type DumpBinaryLogCommand struct {
    	ServerId       int
    	BinlogFilename string
    	BinlogPosition int
    }
    
    func (d DumpBinaryLogCommand) ToByteArray() []byte {
    	buffer := bytes.NewBuffer([]byte{})
    	buffer.Write([]byte{0, 0, 0, 0})
    	buffer.WriteByte(18)
    	//写入binlog的位置
    	buffer.Write([]byte{
    		byte(d.BinlogPosition),
    		byte((d.BinlogPosition >> 8) & 0x000000FF),
    		byte((d.BinlogPosition >> 16) & 0x000000FF),
    		byte((d.BinlogPosition >> 24) & 0x000000FF)})
    	buffer.Write([]byte{0, 0})
    	//写入服务id
    	buffer.Write([]byte{
    		byte(d.ServerId),
    		byte((d.ServerId >> 8) & 0x000000FF),
    		byte((d.ServerId >> 16) & 0x000000FF),
    		byte((d.ServerId >> 24) & 0x000000FF)})
    	//写入binlog名称
    	buffer.WriteString(d.BinlogFilename)
    	return buffer.Bytes()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    ErrorPacket

    type ErrorPacket struct {
    	code      int
    	sqlStatus string
    	sqlFlag   string
    	errorMsg  string
    }
    
    func (e *ErrorPacket) Read(data []byte) {
    	buffer := bytes.NewBuffer(data)
    	codeByte, _ := buffer.ReadByte()
    	e.code = int(codeByte)
    	sqlStatusByte, _ := buffer.ReadByte()
    	e.sqlStatus = string(sqlStatusByte)
    	sqlFlagByte := buffer.Next(5)
    	e.sqlFlag = string(sqlFlagByte)
    	buffer.Next(1)
    	e.errorMsg = string(buffer.Next(buffer.Len()))
    }
    
    func (e *ErrorPacket) ToString() string {
    	return fmt.Sprintf("错误信息:%s\n", e.errorMsg)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    3.3 Authenticator

    根据mysql中密码的加密插件进行对应的插件选择

    type Authenticator struct {
    	//用户名
    	Username string
    	//密码
    	Password string
    	//数据库名称
    	Schema string
    	//初始化包
    	GreetPacket pack.InitialHandshake
    	//管道数据流
    	Pkg *mysql.PacketIO
    }
    
    // Authenticate 根据对应的插件名称创建对应插件验证器
    func (a *Authenticator) Authenticate() bool {
    	pluginName := strings.TrimSpace(a.GreetPacket.AuthPluginName)
    	var c command.Command
    	if pluginName == common.MYSQL_NATIVE {
    		c = &command.AuthenticateSecurityPasswordCommand{
    			Username:   a.Username,
    			Password:   a.Password,
    			Schema:     a.Schema,
    			Salt:       a.GreetPacket.Salt,
    			PluginName: a.GreetPacket.AuthPluginName,
    			Collation:  a.GreetPacket.CharacterSet,
    		}
    	} else {
    
    	}
    	//获取到字节数组信息
    	bytes := c.ToByteArray()
    	//写入流中
    	_ = a.Pkg.WritePacket(bytes)
    	//读取对应的数据包
    	result, _ := a.Pkg.ReadPacket()
    	switch int(result[0]) {
    	case -2:
    	case -1:
    		log.Fatalln("login failed")
    		return false
    	case 0:
    		log.Printf("login success..........")
    		return true
    	}
    	return false
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    3.4 common

    const (
    	SHA2_PASSWORD string = "caching_sha2_password"
    
    	MYSQL_NATIVE string = "mysql_native_password"
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5

    3.5 事件

    对于 binlog 的事件解析,这里只解析了头部,需要自己解析详细事件的大佬可以看官网的协议说明

    type Head struct {
    	//时间戳
    	Timestamp int32
    
    	//事件id
    	EventId byte
    
    	//服务id
    	ServerId int32
    
    	//事件长度
    	EventLength int32
    
    	//下一个事件在binlog文件中的位置
    	NextEventPosition int32
    
    	//标识符
    	Flag int16
    }
    
    func (h *Head) Read(data []byte) {
    	buffer := bytes.NewBuffer(data)
    	//将包首付包类型读跳过
    	buffer.Next(1)
    	timestampBytes := buffer.Next(4)
    	//读取时间戳
    	h.Timestamp |= int32(timestampBytes[0]) | int32(timestampBytes[1])<<8 | int32(timestampBytes[2])<<16 | int32(timestampBytes[3]<<24)
    	h.EventId, _ = buffer.ReadByte()
    
    	serverIdBytes := buffer.Next(4)
    	h.ServerId |= int32(serverIdBytes[0]) | int32(serverIdBytes[1])<<8 | int32(serverIdBytes[2])<<16 | int32(serverIdBytes[3]<<24)
    
    	eventLengthBytes := buffer.Next(4)
    	h.EventLength |= int32(eventLengthBytes[0]) | int32(eventLengthBytes[1])<<8 | int32(eventLengthBytes[2])<<16 | int32(eventLengthBytes[3]<<24)
    
    	nextEventPositionBytes := buffer.Next(4)
    	h.NextEventPosition |= int32(nextEventPositionBytes[0]) | int32(nextEventPositionBytes[1])<<8 | int32(nextEventPositionBytes[2])<<16 | int32(nextEventPositionBytes[3]<<24)
    
    	flagBytes := buffer.Next(2)
    	h.Flag |= int16(flagBytes[0]) | int16(flagBytes[1]<<8)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
  • 相关阅读:
    OSI和TCP的握手/挥手
    豆瓣电影信息爬虫实战-2024年6月
    写给小白 : 如何学习编程?
    Skywalking流程分析_8(拦截器插件的加载)
    JS手写深拷贝
    文件管理技巧:如何利用文件名关键字进行整理
    基于变因子加权学习与邻代维度交叉策略的改进乌鸦算法求解单目标优化问题含Matlab代码
    如何分析粉丝兴趣?
    Windows技巧
    中学教学参考杂志中学教学参考编辑部中学教学参考杂志社2022年第18期目录
  • 原文地址:https://blog.csdn.net/weixin_43915643/article/details/126505640