以前一直使用 Python 的任务调度库 APScheduler(支持任务持久化,支持多种存储方式),但由于没有找到与它功能和使用方式类似的 Golang 库,所以模仿 APScheduler 3.x 写了个简易版本的 AGScheduler。
Advanced Golang Scheduler (AGScheduler) 是一款适用于 Golang 的任务调度库,支持多种调度类型,支持动态更改和持久化作业,支持远程调用,支持集群
https://github.com/kwkwc/agscheduler
package main
import (
"context"
"fmt"
"log/slog"
"time"
"github.com/kwkwc/agscheduler"
"github.com/kwkwc/agscheduler/stores"
)
func printMsg(ctx context.Context, j agscheduler.Job) {
slog.Info(fmt.Sprintf("Run job `%s` %s\n\n", j.FullName(), j.Args))
}
func main() {
agscheduler.RegisterFuncs(printMsg)
store := &stores.MemoryStore{}
scheduler := &agscheduler.Scheduler{}
scheduler.SetStore(store)
job1 := agscheduler.Job{
Name: "Job1",
Type: agscheduler.TYPE_INTERVAL,
Interval: "2s",
Timezone: "UTC",
Func: printMsg,
Args: map[string]any{"arg1": "1", "arg2": "2", "arg3": "3"},
}
job1, _ = scheduler.AddJob(job1)
slog.Info(fmt.Sprintf("%s.\n\n", job1))
job2 := agscheduler.Job{
Name: "Job2",
Type: agscheduler.TYPE_CRON,
CronExpr: "*/1 * * * *",
Timezone: "Asia/Shanghai",
FuncName: "main.printMsg",
Args: map[string]any{"arg4": "4", "arg5": "5", "arg6": "6", "arg7": "7"},
}
job2, _ = s.AddJob(job2)
slog.Info(fmt.Sprintf("%s.\n\n", job2))
job3 := agscheduler.Job{
Name: "Job3",
Type: agscheduler.TYPE_DATETIME,
StartAt: "2023-09-22 07:30:08",
Timezone: "America/New_York",
Func: printMsg,
Args: map[string]any{"arg8": "8", "arg9": "9"},
}
job3, _ = s.AddJob(job3)
slog.Info(fmt.Sprintf("%s.\n\n", job3))
jobs, _ := s.GetAllJobs()
slog.Info(fmt.Sprintf("Scheduler get all jobs %s.\n\n", jobs))
scheduler.Start()
select {}
}
由于 golang 无法序列化函数,所以
scheduler.Start()
之前需要使用RegisterFuncs
注册函数
// Server
grservice := services.GRPCService{
Scheduler: scheduler,
Address: "127.0.0.1:36360",
}
grservice.Start()
// Client
conn, _ := grpc.Dial("127.0.0.1:36360", grpc.WithTransportCredentials(insecure.NewCredentials()))
client := pb.NewSchedulerClient(conn)
client.AddJob(ctx, job)
// Server
hservice := services.HTTPService{
Scheduler: scheduler,
Address: "127.0.0.1:36370",
}
hservice.Start()
// Client
mJob := map[string]any{...}
bJob, _ := json.Marshal(mJob)
resp, _ := http.Post("http://127.0.0.1:36370/scheduler/job", "application/json", bytes.NewReader(bJob))
// Main Node
cnMain := &agscheduler.ClusterNode{
Endpoint: "127.0.0.1:36380",
EndpointGRPC: "127.0.0.1:36360",
EndpointHTTP: "127.0.0.1:36370",
Queue: "default",
}
schedulerMain.SetStore(storeMain)
schedulerMain.SetClusterNode(ctx, cnMain)
cserviceMain := &services.ClusterService{Cn: cnMain}
cserviceMain.Start()
// Worker Node
cnNode := &agscheduler.ClusterNode{
EndpointMain: "127.0.0.1:36380",
Endpoint: "127.0.0.1:36381",
EndpointGRPC: "127.0.0.1:36361",
EndpointHTTP: "127.0.0.1:36371",
Queue: "worker",
}
schedulerNode.SetStore(storeNode)
schedulerNode.SetClusterNode(ctx, cnNode)
cserviceNode := &services.ClusterService{Cn: cnNode}
cserviceNode.Start()
// HA 需要满足以下条件:
//
// 1. 集群中 HA 节点的数量必须为奇数
// 2. 所有 HA 节点都需要连接到同一个存储(不包含 MemoryStore)
// 3. ClusterNode 的 Mode 属性需要设置为 `HA`
// 4. HA 主节点必须先启动
// Main HA Node
cnMain := &agscheduler.ClusterNode{..., Mode: "HA"}
// HA Node
cnNode1 := &agscheduler.ClusterNode{..., Mode: "HA"}
cnNode2 := &agscheduler.ClusterNode{..., Mode: "HA"}
// Worker Node
cnNode3 := &agscheduler.ClusterNode{...}
gRPC Function | HTTP Method | HTTP Endpoint |
---|---|---|
GetInfo | GET | /info |
gRPC Function | HTTP Method | HTTP Endpoint |
---|---|---|
AddJob | POST | /scheduler/job |
GetJob | GET | /scheduler/job/:id |
GetAllJobs | GET | /scheduler/jobs |
UpdateJob | PUT | /scheduler/job |
DeleteJob | DELETE | /scheduler/job/:id |
DeleteAllJobs | DELETE | /scheduler/jobs |
PauseJob | POST | /scheduler/job/:id/pause |
ResumeJob | POST | /scheduler/job/:id/resume |
RunJob | POST | /scheduler/job/run |
ScheduleJob | POST | /scheduler/job/schedule |
Start | POST | /scheduler/start |
Stop | POST | /scheduler/stop |
gRPC Function | HTTP Method | HTTP Endpoint |
---|---|---|
GetNodes | GET | /cluster/nodes |