📕作者简介: 过去日记,致力于Java、GoLang,Rust等多种编程语言,热爱技术,喜欢游戏的博主。
📗本文收录于Ainx系列,大家有兴趣的可以看一看
📘相关专栏Rust初阶教程、go语言基础系列、spring教程等,大家有兴趣的可以看一看
📙Java并发编程系列,设计模式系列、go web开发框架 系列正在发展中,喜欢Java,GoLang,Rust,的朋友们可以关注一下哦!
📙 本文大部分都是借鉴刘丹冰大佬的zinx框架和文章,更推荐大家去读大佬的原文,本文只是个人学习的记录
现在我们就给用户提供一个自定义的conn处理业务的接口吧,很显然,我们不能把业务处理的方法绑死在type HandFunc func(*net.TCPConn, []byte, int) error
这种格式中,我们需要定一些interface{}来让用户填写任意格式的连接处理业务方法。
那么,很显然func是满足不了我们需求的,我们需要再做几个抽象的接口类。
我们现在需要把客户端请求的连接信息 和 请求的数据,放在一个叫Request的请求类里,这样的好处是我们可以从Request里得到全部客户端的请求信息,也为我们之后拓展框架有一定的作用,一旦客户端有额外的含义的数据信息,都可以放在这个Request里。可以理解为每次客户端的全部请求数据,Zinx都会把它们一起放到一个Request结构体里。
在ainterface下创建新文件irequest.go。
ainx/ainterface/irequest.go
package ainterface
/*
IRequest 接口
实际是把客户端请求链接信息和请求数据包放在Request里
*/
type IRequest interface {
GetConnection() IConnection //获取请求链接信息
GetData() []byte //获取请求消息的数据
GetMsgID() uint32 //获取消息ID
}
不难看出,当前的抽象层只提供了两个Getter方法,所以有个成员应该是必须的,一个是客户端连接,一个是客户端传递进来的数据,当然随着Zinx框架的功能丰富,这里面还应该继续添加新的成员。
在anet下创建IRequest抽象接口的一个实例类文件request.go
ainx/anet/request.go
package anet
import "ainx/ainterface"
type Request struct {
conn ainterface.IConnection //已经和客户端建立好的链接
date []byte //客户端请求数据
}
// 获取请求链接信息
func (r *Request) GetConnection() ainterface.IConnection {
return r.conn
}
// 获取请求消息的数据
func (r *Request) GetData() []byte {
return r.msg.GetData()
}
// 获取请求的消息的ID
func (r *Request) GetMsgID() uint32 {
return r.msg.GetMsgId()
}
现在我们来给Zinx实现一个非常简单基础的路由功能,目的当然就是为了快速的让Zinx步入到路由的阶段。后续我们会不断的完善路由功能。
在ainterface下创建irouter.go文件
ainx/ainterface/irouter.go
package ainterface
/*
路由接口,这里路由是 使用框架框架者给该链接自定的 处理业务方法
路由里的IRequest 则包含用该链接的链接信息和该链接的请求数据信息
*/
type IRouter interface {
PreHandle(request IRequest) //在处理conn业务之前的钩子方法
Handle(request IRequest) //处理conn业务的方法
PostHandle(request IRequest) //处理conn业务之后的钩子方法
}
我们知道router实际上的作用就是,服务端应用可以给Zinx框架配置当前链接的处理业务方法,之前的Zinx-V0.2我们的Zinx框架处理链接请求的方法是固定的,现在是可以自定义,并且有3种接口可以重写。
Handle:是处理当前链接的主业务函数
PreHandle:如果需要在主业务函数之前有前置业务,可以重写这个方法
PostHandle:如果需要在主业务函数之后又后置业务,可以重写这个方法
当然每个方法都有一个唯一的形参IRequest对象,也就是客户端请求过来的连接和请求数据,作为我们业务方法的输入数据。
在anet下创建router.go文件
package anet
import (
"ainx/ainterface"
)
// 实现router时,先嵌入这个基类,然后根据需要对这个基类的方法进行重写
type BaseRouter struct{}
// 这里之所以BaseRouter的方法都为空,
// 是因为有的Router不希望有PreHandle或PostHandle
// 所以Router全部继承BaseRouter的好处是,不需要实现PreHandle和PostHandle也可以实例化
func (br *BaseRouter) PreHandle(req ainterface.IRequest) {}
func (br *BaseRouter) Handle(req ainterface.IRequest) {}
func (br *BaseRouter) PostHandle(req ainterface.IRequest) {}
我们当前的ainx目录结构应该如下:
├─ainterface
│ iconnection.go
│ irequest.go
│ irouter.go
│ iserver.go
│
└─anet
connection.go
request.go
router.go
server.go
server_test.go
我们需要给IServer类,增加一个抽象方法AddRouter,目的也是让Ainx框架使用者,可以自定一个Router处理业务方法。
ainx/ainterface/irouter.go
// 定义服务器接口
type IServer interface {
//启动服务器方法
Start()
//停止服务器方法
Stop()
//开启业务服务方法
Serve()
//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
AddRouter(router IRouter)
// todo 路由分组 未来目标 添加类似hertz Group分组
}
有了抽象的方法,自然Server就要实现,并且还要添加一个Router成员.
ainx/anet/server.go
type Server struct {
// 设置服务器名称
Name string
// 设置网络协议版本
IPVersion string
// 设置服务器绑定IP
IP string
// 设置端口号
Port string
//当前Server由用户绑定的回调router,也就是Server注册的链接对应的处理业务
Router ainterface.IRouter
//todo 未来目标提供更多option字段来控制server实例化
}
然后NewServer()方法, 初始化Server对象的方法也要加一个初始化成员
/*
创建一个服务器句柄
*/
func NewServer(name string) ainterface.IServer {
s := &Server{
Name: name,
IPVersion: "tcp4",
IP: "0.0.0.0",
Port: "8080",
Router: nil,
}
return s
}
ainx/anet/connection.go
type Connection struct {
//当前链接的socket TCP套接字
Conn *net.TCPConn
// 当前链接的ID也可以称作SessionID,ID全局唯一
ConnID uint32
// 当前链接的关闭状态
isClosed bool
//该连接的处理方法router
Router ainterface.IRouter
// 告知该链接已经退出/停止的channel
ExitBuffChan chan bool
}
ainx/anet/connection.go
// 处理conn读数据的Goroutine
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()
for {
//读取我们最大的数据到buf中
buf := make([]byte, 512)
_, err := c.Conn.Read(buf)
if err != nil {
fmt.Println("recv buf err ", err)
c.ExitBuffChan <- true
continue
}
//得到当前客户端请求的Request数据
req := Request{
conn: c,
data: buf,
}
//从路由Routers 中找到注册绑定Conn的对应Handle
go func(request ainterface.IRequest) {
//执行注册的路由方法
c.Router.PreHandle(request)
c.Router.Handle(request)
c.Router.PostHandle(request)
}(&req)
}
}
这里我们在conn读取完客户端数据之后,将数据和conn封装到一个Request中,作为Router的输入数据。
然后我们开启一个goroutine去调用给Zinx框架注册好的路由业务。
ainx/anet/server.go
package anet
import (
"ainx/ainterface"
"errors"
"fmt"
"net"
"time"
)
type Server struct {
// 设置服务器名称
Name string
// 设置网络协议版本
IPVersion string
// 设置服务器绑定IP
IP string
// 设置端口号
Port string
//当前Server由用户绑定的回调router,也就是Server注册的链接对应的处理业务
Router ainterface.IRouter
//todo 未来目标提供更多option字段来控制server实例化
}
// ============== 定义当前客户端链接的handle api ===========
func CallBackToClient(conn *net.TCPConn, data []byte, cnt int) error {
//回显业务
fmt.Println("[Conn Handle] CallBackToClient ...")
if _, err := conn.Write(data[:cnt]); err != nil {
fmt.Println("write back buf err", err)
return errors.New("CallBackToClient error")
}
return nil
}
// ============== 实现 ainterface.IServer 里的全部接口方法 ========
// 开启网络服务
func (s *Server) Start() {
fmt.Printf("[START] Server listenner at IP: %s, Port %s, is starting\n", s.IP, s.Port)
// 开启一个go去做服务端的Listener业务
// todo 未来目标是提供更多协议,可以利用if或者switch对IPVersion进行判断而选择采取哪种协议,下面整个方法要重写
go func() {
//1 获取一个TCP的Addr
addr, err := net.ResolveTCPAddr(s.IPVersion, s.IP+":"+s.Port)
if err != nil {
fmt.Println("resolve tcp addr err: ", err)
return
}
// 2 监听服务器地址
listener, err := net.ListenTCP(s.IPVersion, addr)
if err != nil {
fmt.Println("listen", s.IPVersion, "err", err)
return
}
// 已经成功监听
fmt.Println("start Ainx server ", s.Name, " success, now listenning...")
//TODO server.go 应该有一个自动生成ID的方法
var cid uint32
cid = 0
//3 启动server网络连接业务
for {
//3.1 阻塞等待客户端建立连接请求
conn, err := listener.AcceptTCP()
if err != nil {
fmt.Println("Accept err ", err)
continue
}
//3.2 TODO Server.Start() 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
//3.3 处理该新连接请求的 业务 方法, 此时应该有 handler 和 conn是绑定的
dealConn := NewConnection(conn, cid, s.Router)
cid++
//3.4 启动当前链接的处理业务
go dealConn.Start()
}
}()
}
func (s *Server) Stop() {
fmt.Println("[STOP] Zinx server , name ", s.Name)
//TODO Server.Stop() 将其他需要清理的连接信息或者其他信息 也要一并停止或者清理
}
func (s *Server) Serve() {
s.Start()
//TODO Server.Serve() 是否在启动服务的时候 还要处理其他的事情呢 可以在这里添加
//阻塞,否则主Go退出, listenner的go将会退出
for {
time.Sleep(10 * time.Second)
}
}
func (s *Server) AddRouter(router ainterface.IRouter) {
s.Router = router
fmt.Println("Add Router succ! ")
}
/*
创建一个服务器句柄
*/
func NewServer(name string) ainterface.IServer {
s := &Server{
Name: name,
IPVersion: "tcp4",
IP: "0.0.0.0",
Port: "8080",
Router: nil,
}
return s
}
ainx/anet/conneciont.go
package anet
import (
"ainx/ainterface"
"fmt"
"net"
)
type Connection struct {
//当前链接的socket TCP套接字
Conn *net.TCPConn
// 当前链接的ID也可以称作SessionID,ID全局唯一
ConnID uint32
// 当前链接的关闭状态
isClosed bool
//该连接的处理方法router
Router ainterface.IRouter
// 告知该链接已经退出/停止的channel
ExitBuffChan chan bool
}
func (c *Connection) GetConnection() net.Conn {
return c.Conn
}
// 创建链接的方法
func NewConnection(conn *net.TCPConn, connID uint32, router ainterface.IRouter) *Connection {
c := &Connection{
Conn: conn,
ConnID: connID,
isClosed: false,
Router: router,
ExitBuffChan: make(chan bool, 1),
}
return c
}
// 处理conn读数据的Goroutine
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), " conn reader exit!")
defer c.Stop()
for {
//读取我们最大的数据到buf中
buf := make([]byte, 512)
_, err := c.Conn.Read(buf)
if err != nil {
fmt.Println("recv buf err ", err)
c.ExitBuffChan <- true
continue
}
//得到当前客户端请求的Request数据
req := Request{
conn: c,
data: buf,
}
//从路由Routers 中找到注册绑定Conn的对应Handle
go func(request ainterface.IRequest) {
//执行注册的路由方法
c.Router.PreHandle(request)
c.Router.Handle(request)
c.Router.PostHandle(request)
}(&req)
}
}
// 启动连接,让当前链接工作
func (c *Connection) Start() {
// 开启处理该链接读取到客户端数据之后的请求业务
go c.StartReader()
for {
select {
case <-c.ExitBuffChan:
// 得到退出消息,不再阻塞
return
}
}
}
// 停止链接,结束当前链接状态M
func (c *Connection) Stop() {
//1.如果当前链接关闭
if c.isClosed == true {
return
}
c.isClosed = true
//TODO Connection Stop() 如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
// 关闭socket链接
err := c.Conn.Close()
if err != nil {
return
}
//通知从缓冲队列读数据的业务,该链接已经关闭
c.ExitBuffChan <- true
//关闭该链接全部管道
close(c.ExitBuffChan)
}
// 从当前链接获取原始的socket TCPConn
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
// 获取当前链接ID
func (c *Connection) GetConnID() uint32 {
return c.ConnID
}
// 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
接下来我们在基于Ainx写服务器,就可以配置一个简单的路由功能了。
Server.go
package main
import (
"fmt"
"net"
"time"
)
/*
模拟客户端
*/
func main() {
fmt.Println("Client Test ... start")
//3秒之后发起测试请求,给服务端开启服务的机会
time.Sleep(3 * time.Second)
conn, err := net.Dial("tcp", "127.0.0.1:8080")
if err != nil {
fmt.Println("client start err, exit!")
return
}
for {
_, err := conn.Write([]byte("Ainx V0.3"))
if err != nil {
fmt.Println("write error err ", err)
return
}
buf := make([]byte, 512)
cnt, err := conn.Read(buf)
if err != nil {
fmt.Println("read buf error ")
return
}
fmt.Printf(" server call back : %s, cnt = %d\n", buf, cnt)
time.Sleep(1 * time.Second)
}
}
Client.go
package main
import (
"ainx/ainterface"
"ainx/anet"
"fmt"
)
// ping test 自定义路由
type PingRouter struct {
anet.BaseRouter //一定要先基础BaseRouter
}
// Test PreHandle
func (this *PingRouter) PreHandle(request ainterface.IRequest) {
fmt.Println("Call Router PreHandle")
_, err := request.GetConnection().GetConnection().Write([]byte("before ping ....\n"))
if err != nil {
fmt.Println("call back ping ping ping error")
}
}
// Test Handle
func (this *PingRouter) Handle(request ainterface.IRequest) {
fmt.Println("Call PingRouter Handle")
_, err := request.GetConnection().GetConnection().Write([]byte("ping...ping...ping\n"))
if err != nil {
fmt.Println("call back ping ping ping error")
}
}
// Test PostHandle
func (this *PingRouter) PostHandle(request ainterface.IRequest) {
fmt.Println("Call Router PostHandle")
_, err := request.GetConnection().GetConnection().Write([]byte("After ping .....\n"))
if err != nil {
fmt.Println("call back ping ping ping error")
}
}
func main() {
//创建一个server句柄
s := anet.NewServer("[ainx V0.3]")
s.AddRouter(&PingRouter{})
//2 开启服务
s.Serve()
}