• 适用于 Golang 的任务调度程序 AGScheduler


    以前一直使用 Python 的任务调度库 APScheduler(支持任务持久化,支持多种存储方式),但由于没有找到与它功能和使用方式类似的 Golang 库,所以模仿 APScheduler 3.x 写了个简易版本的 AGScheduler。

    AGScheduler

    Advanced Golang Scheduler (AGScheduler) 是一款适用于 Golang 的任务调度库,支持多种调度类型,支持动态更改和持久化作业,支持远程调用,支持集群

    特性

    • 支持三种调度类型
      • 一次性执行
      • 间隔执行
      • Cron 式调度
    • 支持多种作业存储方式
    • 支持远程调用
    • 支持集群
      • 远程工作节点
      • 调度器高可用(实验性)

    链接

    https://github.com/kwkwc/agscheduler

    架构

    请添加图片描述

    使用

    package main
    
    import (
    	"context"
    	"fmt"
    	"log/slog"
    	"time"
    
    	"github.com/kwkwc/agscheduler"
    	"github.com/kwkwc/agscheduler/stores"
    )
    
    func printMsg(ctx context.Context, j agscheduler.Job) {
    	slog.Info(fmt.Sprintf("Run job `%s` %s\n\n", j.FullName(), j.Args))
    }
    
    func main() {
    	agscheduler.RegisterFuncs(printMsg)
    
    	store := &stores.MemoryStore{}
    	scheduler := &agscheduler.Scheduler{}
    	scheduler.SetStore(store)
    
    	job1 := agscheduler.Job{
    		Name:     "Job1",
    		Type:     agscheduler.TYPE_INTERVAL,
    		Interval: "2s",
    		Timezone: "UTC",
    		Func:     printMsg,
    		Args:     map[string]any{"arg1": "1", "arg2": "2", "arg3": "3"},
    	}
    	job1, _ = scheduler.AddJob(job1)
    	slog.Info(fmt.Sprintf("%s.\n\n", job1))
    
    	job2 := agscheduler.Job{
    		Name:     "Job2",
    		Type:     agscheduler.TYPE_CRON,
    		CronExpr: "*/1 * * * *",
    		Timezone: "Asia/Shanghai",
    		FuncName: "main.printMsg",
    		Args:     map[string]any{"arg4": "4", "arg5": "5", "arg6": "6", "arg7": "7"},
    	}
    	job2, _ = s.AddJob(job2)
    	slog.Info(fmt.Sprintf("%s.\n\n", job2))
    
    	job3 := agscheduler.Job{
    		Name:     "Job3",
    		Type:     agscheduler.TYPE_DATETIME,
    		StartAt:  "2023-09-22 07:30:08",
    		Timezone: "America/New_York",
    		Func:     printMsg,
    		Args:     map[string]any{"arg8": "8", "arg9": "9"},
    	}
    	job3, _ = s.AddJob(job3)
    	slog.Info(fmt.Sprintf("%s.\n\n", job3))
    
    	jobs, _ := s.GetAllJobs()
    	slog.Info(fmt.Sprintf("Scheduler get all jobs %s.\n\n", jobs))
    
    	scheduler.Start()
    
    	select {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    注册函数

    由于 golang 无法序列化函数,所以 scheduler.Start() 之前需要使用 RegisterFuncs 注册函数

    gRPC

    // Server
    grservice := services.GRPCService{
    	Scheduler: scheduler,
    	Address:   "127.0.0.1:36360",
    }
    grservice.Start()
    
    // Client
    conn, _ := grpc.Dial("127.0.0.1:36360", grpc.WithTransportCredentials(insecure.NewCredentials()))
    client := pb.NewSchedulerClient(conn)
    client.AddJob(ctx, job)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    HTTP

    // Server
    hservice := services.HTTPService{
    	Scheduler: scheduler,
    	Address:   "127.0.0.1:36370",
    }
    hservice.Start()
    
    // Client
    mJob := map[string]any{...}
    bJob, _ := json.Marshal(mJob)
    resp, _ := http.Post("http://127.0.0.1:36370/scheduler/job", "application/json", bytes.NewReader(bJob))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Cluster

    // Main Node
    cnMain := &agscheduler.ClusterNode{
    	Endpoint:     "127.0.0.1:36380",
    	EndpointGRPC: "127.0.0.1:36360",
    	EndpointHTTP: "127.0.0.1:36370",
    	Queue:        "default",
    }
    schedulerMain.SetStore(storeMain)
    schedulerMain.SetClusterNode(ctx, cnMain)
    cserviceMain := &services.ClusterService{Cn: cnMain}
    cserviceMain.Start()
    
    // Worker Node
    cnNode := &agscheduler.ClusterNode{
    	EndpointMain: "127.0.0.1:36380",
    	Endpoint:     "127.0.0.1:36381",
    	EndpointGRPC: "127.0.0.1:36361",
    	EndpointHTTP: "127.0.0.1:36371",
    	Queue:        "worker",
    }
    schedulerNode.SetStore(storeNode)
    schedulerNode.SetClusterNode(ctx, cnNode)
    cserviceNode := &services.ClusterService{Cn: cnNode}
    cserviceNode.Start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    Cluster HA (高可用,实验性)

    
    // HA 需要满足以下条件:
    //
    // 1. 集群中 HA 节点的数量必须为奇数
    // 2. 所有 HA 节点都需要连接到同一个存储(不包含 MemoryStore)
    // 3. ClusterNode 的 Mode 属性需要设置为 `HA`
    // 4. HA 主节点必须先启动
    
    // Main HA Node
    cnMain := &agscheduler.ClusterNode{..., Mode: "HA"}
    
    // HA Node
    cnNode1 := &agscheduler.ClusterNode{..., Mode: "HA"}
    cnNode2 := &agscheduler.ClusterNode{..., Mode: "HA"}
    
    // Worker Node
    cnNode3 := &agscheduler.ClusterNode{...}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    Base API

    gRPC FunctionHTTP MethodHTTP Endpoint
    GetInfoGET/info

    Scheduler API

    gRPC FunctionHTTP MethodHTTP Endpoint
    AddJobPOST/scheduler/job
    GetJobGET/scheduler/job/:id
    GetAllJobsGET/scheduler/jobs
    UpdateJobPUT/scheduler/job
    DeleteJobDELETE/scheduler/job/:id
    DeleteAllJobsDELETE/scheduler/jobs
    PauseJobPOST/scheduler/job/:id/pause
    ResumeJobPOST/scheduler/job/:id/resume
    RunJobPOST/scheduler/job/run
    ScheduleJobPOST/scheduler/job/schedule
    StartPOST/scheduler/start
    StopPOST/scheduler/stop

    Cluster API

    gRPC FunctionHTTP MethodHTTP Endpoint
    GetNodesGET/cluster/nodes

    示例

    完整示例

    致谢

    APScheduler

    simple-raft

  • 相关阅读:
    【数据结构】详解动态顺序表
    【Linux-day11-线程的创建与同步】
    【学习日记2023.5.23】 之 Redis入门未入坑
    开源PHP 代挂机源码,可对接QQ、网易云、哔哩哔哩、QQ空间、等级加速等等
    MySQL数据库入门到大牛_04_运算符
    java计算机毕业设计高校科研信息管理系统MyBatis+系统+LW文档+源码+调试部署
    Spring Bean 生命周期详解
    win11家庭版docker和milvus
    Java面试题——你们怎么解决消息重复消费?
    日常学习记录随笔-zabix实战
  • 原文地址:https://blog.csdn.net/z497896143/article/details/133840137