• 【go】异步任务解决方案Asynq实战


    一.Asynq介绍

    Asynq 是一个 Go 库,一个高效的分布式任务队列。

    Asynq 工作原理:

    • 客户端(生产者)将任务放入队列
    • 服务器(消费者)从队列中拉出任务并为每个任务启动一个工作 goroutine
    • 多个工作人员同时处理任务

    git库:https://github.com/hibiken/asynq

    二.所需工具

    Asynq 使用 Redis 作为消息代理。client 和 server 都需要连接到 Redis 进行写入和读取。

    PS:请确保所使用redis >= 5.0

    三.代码示例

    以记录操作的中间件函数向数据库写数据的情景为例。

    1. 生产者(客户端)函数调用入口:

    其中 map 为需向数据库写入的内容

    client.Call("audit:opera", map[string]any{
    	"uri":        uri,
    	"method":     method,
    	"params":     string(paramsByte),
    	"headers":    string(headerByte),
    	"code":       codeInt,
    	"model":      model,
    	"action":     action,
    	"user_id":    userId,
    	"company_id": companyId,
    	"user_name":  userName,
    	"company":    companyName,
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    1. 生产者函数
    func Call(t string, payload map[string]any) error {
    	// redis连接
    	client := asynq.NewClient(asynq.RedisClientOpt{
    		Addr:     "127.0.0.1:6379",
    		Password: "",
    		DB:       1,
    	})
    	defer client.Close()
    
    	switch t {
    	case "audit:opera":
    	    // 初始化新任务
    		task, err := server.NewOperateSendTask(payload)
    		if err != nil {
    			return err
    		}
    
           // 任务入队
    		_, err = client.Enqueue(task, asynq.Queue("audit"))
    		if err != nil {
    			log.Err(err).Msg(fmt.Sprintf("task: %v\n", task))
    			return err
    		}
    	}
    
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    func NewOperateSendTask(data map[string]any) (*asynq.Task, error) {
    	payload, err := json.Marshal(data)
    	if err != nil {
    		return nil, err
    	}
    	return asynq.NewTask(consts.TypeAuditOpera, payload), nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    1. 消费者函数
    func HandlerAuditOperateTask(ctx context.Context, t *asynq.Task) error {
    	var record ent.OperateRecord
    
    	// 队列中取任务
    	err := json.Unmarshal(t.Payload(), &record)
    	if err != nil {
    		log.Err(err).Msg("task.json.Unmarshal")
    		return err
    	}
    
    	// 真正的数据库操作
    	err = dao.OperateRecord.CreateOperateRecord(&record)
    	if err != nil {
    		log.Err(err).Msg("task.dao.OperateRecord.CreateOperateRecord")
    		return err
    	}
    
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    1. asynq初始化(消费者启动入口,项目初始化时自动启动)
    func InitAsynq(ip string, port int, passwd string) {
    	addr := fmt.Sprintf("%s:%d", ip, port)
    	srv := asynq.NewServer(
    		asynq.RedisClientOpt{
    			Addr:     "127.0.0.1:6379",
    			Password: "",
    			DB:       1,
    		},
    		// 异步队列
    		asynq.Config{
    			Queues: map[string]int{
    				"audit": 3,
    			},
    		},
    	)
    
    	mux := asynq.NewServeMux()
    	// 启动消费者
    	mux.HandleFunc("audit:opera", server.HandlerAuditOperateTask)
    	
    	go srv.Run(mux)
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    四.Reference

    Go异步任务解决方案之Asynq库详解:
    https://www.jb51.net/article/275392.htm

  • 相关阅读:
    猴子吃桃问题
    以智能化为舵手,引领现代计算机系统架构新航向
    【数字IC前端入门】01-数字IC专栏内容概述
    2022牛客多校(三)
    git cherry-pick命令(合并单个或多个提交记录到当前分支)
    数据库查询语法
    爬取医药卫生知识服务系统的药品数据——超详细流程
    Vue3记录
    408王道数据结构强化——算法题
    新手GitHub使用指南
  • 原文地址:https://blog.csdn.net/qq_45859826/article/details/132605367