• 适用于 Golang 的任务调度程序 AGScheduler


    以前一直使用 Python 的任务调度库 APScheduler(支持任务持久化,支持多种存储方式),但由于没有找到与它功能和使用方式类似的 Golang 库,所以模仿 APScheduler 3.x 写了个简易版本的 AGScheduler。

    AGScheduler

    Advanced Golang Scheduler (AGScheduler) 是一款适用于 Golang 的任务调度库,支持多种调度类型,支持动态更改和持久化作业,支持远程调用,支持集群

    特性

    • 支持三种调度类型
      • 一次性执行
      • 间隔执行
      • Cron 式调度
    • 支持多种作业存储方式
    • 支持远程调用
    • 支持集群
      • 远程工作节点
      • 调度器高可用(实验性)

    链接

    https://github.com/kwkwc/agscheduler

    架构

    请添加图片描述

    使用

    package main
    
    import (
    	"context"
    	"fmt"
    	"log/slog"
    	"time"
    
    	"github.com/kwkwc/agscheduler"
    	"github.com/kwkwc/agscheduler/stores"
    )
    
    func printMsg(ctx context.Context, j agscheduler.Job) {
    	slog.Info(fmt.Sprintf("Run job `%s` %s\n\n", j.FullName(), j.Args))
    }
    
    func main() {
    	agscheduler.RegisterFuncs(printMsg)
    
    	store := &stores.MemoryStore{}
    	scheduler := &agscheduler.Scheduler{}
    	scheduler.SetStore(store)
    
    	job1 := agscheduler.Job{
    		Name:     "Job1",
    		Type:     agscheduler.TYPE_INTERVAL,
    		Interval: "2s",
    		Timezone: "UTC",
    		Func:     printMsg,
    		Args:     map[string]any{"arg1": "1", "arg2": "2", "arg3": "3"},
    	}
    	job1, _ = scheduler.AddJob(job1)
    	slog.Info(fmt.Sprintf("%s.\n\n", job1))
    
    	job2 := agscheduler.Job{
    		Name:     "Job2",
    		Type:     agscheduler.TYPE_CRON,
    		CronExpr: "*/1 * * * *",
    		Timezone: "Asia/Shanghai",
    		FuncName: "main.printMsg",
    		Args:     map[string]any{"arg4": "4", "arg5": "5", "arg6": "6", "arg7": "7"},
    	}
    	job2, _ = s.AddJob(job2)
    	slog.Info(fmt.Sprintf("%s.\n\n", job2))
    
    	job3 := agscheduler.Job{
    		Name:     "Job3",
    		Type:     agscheduler.TYPE_DATETIME,
    		StartAt:  "2023-09-22 07:30:08",
    		Timezone: "America/New_York",
    		Func:     printMsg,
    		Args:     map[string]any{"arg8": "8", "arg9": "9"},
    	}
    	job3, _ = s.AddJob(job3)
    	slog.Info(fmt.Sprintf("%s.\n\n", job3))
    
    	jobs, _ := s.GetAllJobs()
    	slog.Info(fmt.Sprintf("Scheduler get all jobs %s.\n\n", jobs))
    
    	scheduler.Start()
    
    	select {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    注册函数

    由于 golang 无法序列化函数,所以 scheduler.Start() 之前需要使用 RegisterFuncs 注册函数

    gRPC

    // Server
    grservice := services.GRPCService{
    	Scheduler: scheduler,
    	Address:   "127.0.0.1:36360",
    }
    grservice.Start()
    
    // Client
    conn, _ := grpc.Dial("127.0.0.1:36360", grpc.WithTransportCredentials(insecure.NewCredentials()))
    client := pb.NewSchedulerClient(conn)
    client.AddJob(ctx, job)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    HTTP

    // Server
    hservice := services.HTTPService{
    	Scheduler: scheduler,
    	Address:   "127.0.0.1:36370",
    }
    hservice.Start()
    
    // Client
    mJob := map[string]any{...}
    bJob, _ := json.Marshal(mJob)
    resp, _ := http.Post("http://127.0.0.1:36370/scheduler/job", "application/json", bytes.NewReader(bJob))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Cluster

    // Main Node
    cnMain := &agscheduler.ClusterNode{
    	Endpoint:     "127.0.0.1:36380",
    	EndpointGRPC: "127.0.0.1:36360",
    	EndpointHTTP: "127.0.0.1:36370",
    	Queue:        "default",
    }
    schedulerMain.SetStore(storeMain)
    schedulerMain.SetClusterNode(ctx, cnMain)
    cserviceMain := &services.ClusterService{Cn: cnMain}
    cserviceMain.Start()
    
    // Worker Node
    cnNode := &agscheduler.ClusterNode{
    	EndpointMain: "127.0.0.1:36380",
    	Endpoint:     "127.0.0.1:36381",
    	EndpointGRPC: "127.0.0.1:36361",
    	EndpointHTTP: "127.0.0.1:36371",
    	Queue:        "worker",
    }
    schedulerNode.SetStore(storeNode)
    schedulerNode.SetClusterNode(ctx, cnNode)
    cserviceNode := &services.ClusterService{Cn: cnNode}
    cserviceNode.Start()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    Cluster HA (高可用,实验性)

    
    // HA 需要满足以下条件:
    //
    // 1. 集群中 HA 节点的数量必须为奇数
    // 2. 所有 HA 节点都需要连接到同一个存储(不包含 MemoryStore)
    // 3. ClusterNode 的 Mode 属性需要设置为 `HA`
    // 4. HA 主节点必须先启动
    
    // Main HA Node
    cnMain := &agscheduler.ClusterNode{..., Mode: "HA"}
    
    // HA Node
    cnNode1 := &agscheduler.ClusterNode{..., Mode: "HA"}
    cnNode2 := &agscheduler.ClusterNode{..., Mode: "HA"}
    
    // Worker Node
    cnNode3 := &agscheduler.ClusterNode{...}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    Base API

    gRPC FunctionHTTP MethodHTTP Endpoint
    GetInfoGET/info

    Scheduler API

    gRPC FunctionHTTP MethodHTTP Endpoint
    AddJobPOST/scheduler/job
    GetJobGET/scheduler/job/:id
    GetAllJobsGET/scheduler/jobs
    UpdateJobPUT/scheduler/job
    DeleteJobDELETE/scheduler/job/:id
    DeleteAllJobsDELETE/scheduler/jobs
    PauseJobPOST/scheduler/job/:id/pause
    ResumeJobPOST/scheduler/job/:id/resume
    RunJobPOST/scheduler/job/run
    ScheduleJobPOST/scheduler/job/schedule
    StartPOST/scheduler/start
    StopPOST/scheduler/stop

    Cluster API

    gRPC FunctionHTTP MethodHTTP Endpoint
    GetNodesGET/cluster/nodes

    示例

    完整示例

    致谢

    APScheduler

    simple-raft

  • 相关阅读:
    DNS网络故障排除命令dig&nslookup
    css---定位
    阴影text-shadow和box-shadow详解
    数据结构与算法(C语言版)P2---线性表之顺序表
    短视频简单无脑玩法,播放量10w+的藏头诗玩法,操作思路分享给你!
    第14章 操作重载与类型转换【C++】
    C#,根据路径获取某个数字开头的所有文件夹,并获取最新文件夹进行替换文件
    C#操作modbus
    TestStand-调用LabVIEW
    Mac版本破解Typora,解决Mac安装软件的“已损坏,无法打开。 您应该将它移到废纸篓”问题
  • 原文地址:https://blog.csdn.net/z497896143/article/details/133840137