utils.go
package utils
import (
"fmt"
"log"
"runtime"
)
func Go(f func()) {
go func() {
defer PrintPanicStack()
f()
}()
}
func PrintPanicStack() {
if x := recover(); x != nil {
fmt.Println(x)
for i := 0; i < 10; i++ {
funcName, file, line, ok := runtime.Caller(i)
if ok {
log.Printf("%d frame :[func:%s,file:%s,line:%d]\n", i, runtime.FuncForPC(funcName).Name(), file, line)
}
}
}
}
deal_msg.go
package msgqueuetest
import (
"errors"
"gotest/msgqueuetest/utils"
"hash/crc64"
"log"
"sync"
"sync/atomic"
"time"
)
func init() {
// 初始化消息处理管理器
dealMsgMgr.InitDealMsgMgr()
}
// 常量定义
const (
MSG_CH_LENGTH = 100000 // 发送队列长度
WORK_CNT = 10 // 工作池大小
MSG_MAX_VALID_TIME = time.Second * 5 // 消息存在的最大有效时长
)
// 运行状态
const (
RUN_STAT_NORMAL = 0 // 正常状态
RUN_STAT_QUIT = 1 // 退出状态
)
// 变量定义
var (
dealMsgMgr dealMsgManager // 处理消息管理器
)
// 错误信息
var (
Err_Param_Invalid = errors.New("参数异常")
Err_Msg_Invalid = errors.New("消息信息异常")
Err_Send_Ch_Full = errors.New("发送队列已满")
Err_Repeat_Send = errors.New("重复发送消息")
Err_App_Quit = errors.New("发送应用已退出")
)
// 需要处理的消息信息
type dealMsgInfo struct {
Data string // 发送数据pb
RecvID string // 接收者节点地址
RecvMid string // 接收者machineID
Key uint64 // 消息key值
AddTime time.Time // 添加时间
}
// 消息处理管理器
type dealMsgManager struct {
chMsgs chan *dealMsgInfo // 待发送的消息队列
mapExistMsg sync.Map // 存放已存在的消息key信息
runState int32 // 运行状态
chQuit chan uint64 // 意外退出的工作协程
}
// 初始化消息处理管理器
func (dmm *dealMsgManager) InitDealMsgMgr() {
dmm.chMsgs = make(chan *dealMsgInfo, MSG_CH_LENGTH)
dmm.chQuit = make(chan uint64, WORK_CNT)
// 开启工作协程池处理任务
for i := 0; i < WORK_CNT; i++ {
utils.Go(dmm.loopSendMsg)
}
utils.Go(dmm.monitorWorks)
}
/*
* 添加消息到消息队列中
*
* @param recvId AddressNet 接收者id
* @param recvMid string 接收者machineID
* @param msg *MessageData 消息pb信息
* @return err error 错误信息
*/
func AddMsgToQueue(recvId, recvMid, msg string) error {
if atomic.LoadInt32(&dealMsgMgr.runState) == RUN_STAT_QUIT {
return Err_App_Quit
}
// 1. 参数检查
// 1.1 检查消息是否有效
if len(msg) == 0 {
return Err_Msg_Invalid
}
// 1.2 检查发送队列是否已满
if len(dealMsgMgr.chMsgs) == MSG_CH_LENGTH {
return Err_Send_Ch_Full
}
// 1.3 检查接收者id是否合法
if len(recvId) == 0 {
return Err_Param_Invalid
}
// 2. 检查消息是否已发送
key := Crc64([]byte(msg))
if _, exist := dealMsgMgr.mapExistMsg.Load(key); exist {
return Err_Repeat_Send
}
// 3. 添加到发送队列中
// 3.1 构建待处理消息结构
var dealMsg = new(dealMsgInfo)
dealMsg.Data = msg
dealMsg.RecvID = recvId
dealMsg.RecvMid = recvMid
dealMsg.Key = key
dealMsg.AddTime = time.Now()
// 3.2 把数据发送到处理队列中
select {
case dealMsgMgr.chMsgs <- dealMsg:
default:
// 添加失败
return Err_Send_Ch_Full
}
// 4. 添加消息发送记录信息
dealMsgMgr.mapExistMsg.Store(key, struct{}{})
return nil
}
// 停止消息发送功能
func StopSendMsg() {
// 添加退出信号
atomic.StoreInt32(&dealMsgMgr.runState, RUN_STAT_QUIT)
}
// 重新启用发送消息管理器
func ReuseSendMsg() {
// 重置发送队列
if atomic.LoadInt32(&dealMsgMgr.runState) == RUN_STAT_NORMAL {
return
}
// 重置发送队列
dealMsgMgr.chMsgs = make(chan *dealMsgInfo, MSG_CH_LENGTH)
// 重置key保存map
dealMsgMgr.mapExistMsg = sync.Map{}
// 更新状态
atomic.StoreInt32(&dealMsgMgr.runState, RUN_STAT_NORMAL)
}
// 异步处理发送队列中的数据
func (dmm *dealMsgManager) loopSendMsg() {
var curKey uint64
defer func() {
select {
case dmm.chQuit <- curKey:
default:
}
log.Println("协程异常退出, 对应的key为", curKey)
}()
for {
if atomic.LoadInt32(&dmm.runState) == RUN_STAT_QUIT {
time.Sleep(time.Second)
continue
}
data := <-dmm.chMsgs
if data == nil {
continue
}
// 删除对应得记录信息
curKey = data.Key
dmm.mapExistMsg.Delete(data.Key)
// 判断消息是否超时处理
if time.Since(data.AddTime) > MSG_MAX_VALID_TIME {
// 超时, 直接丢弃
// log.Printf("[%d]消息超时, 直接被丢弃\n", data.Key)
continue
}
if data.Data == "1000" || data.Data == "2000" || data.Data == "3000" || data.Data == "4000" || data.Data == "5000" || data.Data == "6000" || data.Data == "7000" || data.Data == "8000" || data.Data == "9000" {
panic(data.Data)
}
// 解析消息
// log.Println("发送消息:", data.Data)
time.Sleep(time.Millisecond * 10)
}
}
// 监听工作协程, 当有线程异常退出时, 将再次启动相应协程工作
func (dmm *dealMsgManager) monitorWorks() {
for k := range dmm.chQuit {
dmm.mapExistMsg.Delete(k)
log.Println("启动新得协程, 删除对应key", k)
utils.Go(dmm.loopSendMsg)
}
}
func Crc64(v []byte) uint64 {
return crc64.Checksum(v, crc64.MakeTable(crc64.ECMA))
}
测试用例
deal_msg_test.go
package msgqueuetest
import (
"log"
"strconv"
"testing"
)
func TestMsgValue(t *testing.T) {
for i := 0; i < 10000; i++ {
value := strconv.Itoa(i)
AddMsgToQueue(value, "", value)
}
log.Println("发送消息成功!!!!!")
select {}
}