在后端开发中,大家是否有遇到如下类型的开发场景
需要处理较多的异步事件
需要的外部服务可靠性较低
需要记录保存某个对象的复杂状态
在以往的开发过程中,可能更多的直接使用数据库、定时任务、消息队列等作为基础,来解决上面的问题
然而即便如此,在代码开发中,也会有很多代码跟业务无关,比如 外部服务低可靠性情况下的重试,多异步事件下的程序逻辑组织等。
最终会影响开发效率,并且可能会降低代码后期的维护性。
针对上面的场景,抽象为 工作流 模式的话,可以减轻开发成本以及维护成本
工作流:指业务过程的部分或整体在计算机应用环境下的自动化。是对工作流程及其各操作步骤之间的业务规则的抽象、概括描述
工作流主要解决的主要问题:为了实现某个业务目标,利用计算机在多个参与者之间按照某种预订的规则自动传递问文档、信息或者任务
工作流通常适用于,有状态的、异步、长时间执行等特性的业务场景,比较典型的场景包括
视频、音频、图片处理工作流
订单、审批流程
数据处理流水线
自动化运维
工作流框架还是比较多的,按照语言分类的话,有
Java: jBPM、Activiti、SWF
PHP: Tpflow、PHPworkflow
Go: Cadence(Cadence由Uber开发并开源,Maxim Fateev是Cadence的主架构师)、Temporal(Maxim Fateev为了推广Workflow编排引擎的商业化,另立门户创建了Temporal)
workflow 即表示工作流,在Temporal中,工作流是由函数或对象方法来实现(工作流样例见下文)。
一个workflow通常完成一个业务目标。同时,当多个workflow中,有同样的处理流程时,可以封装为一个子workflow,来达到代码复用的目的。
启动Workflow的时候,可以设置这个Wrokflow的执行超时时间,以及失败后的重试次数、任务队列名等参数,来更好的满足业务需求
支持的配置参数如下:
超时设置
ExecuteTimeout:Workflow的最大运行时间,包括失败后重试的时间。默认值是10年
RunTimeout:Workflow单次运行的时间,默认值为 ExecuteTimeout
TaskTimeout:从Worker从任务队列拉取到Workflow任务,到Worker开始执行Workflow的时间。如果超时,Server会认为Worker已经挂掉,会重新调度该Workflow给其他Worker,默认值10s
重试策略
InitialInterval:第一次重试前,需要等待多久。无默认值
BackoffCeofficient:退避系数,表示多次重试时,下次等待的时间是上次的多少倍,默认值:2
MaximumnInterval:下次重试时,最大等待时间。默认值:100*初始等待时间
MaximumAttempts:做大重试次数,默认值:0,表示无限重试
Non-Retryable:表示Workflow遇到哪些Error后,不再进行重试
Workflow Id:
一个Workflow,可由 命名空间,Workflow Id和Run id 唯一标识
启动Workflow的时候,可以指定一个ID,这个ID一般采用业务级的ID,如一个要处理的客户的ID或订单ID
多个Workflow使用相同ID时的策略配置
Allow duplicate failed only policy:只有前一个相同ID的Workflow失败后,才可以再启动下一个相同ID的Workflow
Allow duplicate policy:允许两个相同ID的Workflow同时运行。默认策略
Reject duplicate policy:任何时候,不允许有相同ID的Workflow
定时运行
启动Workflow的时候,可以设置为定时启动。
⚠️ 注意。如果到了下次运行Workflow的时候,但上次的Workflow还没执行完(可能任务执行耗时长,或由于失败后重试等原因),会跳过下次运行Workflow
根据工作流的定义:工作流指业务过程的部分或整体在计算机应用环境下的自动化,是对工作流程及其各操作步骤之间的业务规则的抽象、概括描述
Activities可以理解为一个业务操作单元
在Workflow执行过程中,会将Activity放入消息队列,由其他Worker获取后,执行该Activity,并将结果再返回给Workflow。
超时配置
ScheduleToStart:表示Activity任务放到消息队列,到Worker获取到的超时时间。如果超时后,也不会触发重试。非有特殊原因,不要设置该值
StartToClose:Activity实际执行超时时间。如果Activity执行时间不确定,最好按照最长时间设置。比如一个Activity可能需要2分钟、有时需要5分钟,那就设置为5分钟
ScheduleToClose:从Activity放入消息队列,到Activity执行完成的时间
Heartbeat:Activity和Server的心跳超时时间。在Activity运行需要较长时间时需要。用于Server检查执行Activity的Worker是否已经挂掉
重试策略
和Workflow的重试策略完全一致
执行时间超长的Activity
如果一个Activity运行时间较长,最好设置一个心跳间隔超时。这样当执行Activity的Woker挂掉时,Server可以及时知道
- package subscription
-
- import (
- "log"
- "time"
-
- "go.temporal.io/sdk/workflow"
- )
-
- // 此处定义该函数作为一个工作流,用于处理用户订阅的服务
- // 工作流的编写方式和普通函数相似,需要注意的地方是,函数中涉及到可能失败的情况都应该封装到Activity中执行
- func SubscriptionWorkflow(ctx workflow.Context, customer Customer) (string, error) {
- workflowCustomer := customer
- subscriptionCancelled := false
- billingPeriodNum := 0
- actResult := ""
-
- QueryCustomerIdName := "customerid"
- QueryBillingPeriodNumberName := "billingperiodnumber"
- QueryBillingPeriodChargeAmountName := "billingperiodchargeamount"
-
- logger := workflow.GetLogger(ctx)
-
- // 通过SetQueryHandler方法,外部服务向Temporal Server发送请求查询该工作流的内部变量状态
- // 比如,根据下面QueryHandler的定义,外部服务可以查看该工作流的账单信息
- err = workflow.SetQueryHandler(ctx, QueryBillingPeriodChargeAmountName, func() (int, error) {
- return workflowCustomer.Subscription.BillingPeriodCharge, nil
- })
- if err != nil {
- logger.Info("QueryBillingPeriodChargeAmountName handler failed.", "Error", err)
- return "Error", err
- }
-
-
- // 下面的代码定义了一个Selector,Selector通常用于等待一个或多个异步时间或超时通知
- // 此处定义的Selector,用于接受用户取消订阅的事件,收到该事件后,修改了变量 subscriptionCancelled 的值
- cancelSelector := workflow.NewSelector(ctx)
- cancelCh := workflow.GetSignalChannel(ctx, "cancelsubscription")
- cancelSelector.AddReceive(cancelCh, func(ch workflow.ReceiveChannel, _ bool) {
- var cancelSubSignal bool
- ch.Receive(ctx, &cancelSubSignal)
- subscriptionCancelled = cancelSubSignal
- })
-
-
- // 此处配置了Activity的执行超时时间
- ao := workflow.ActivityOptions{
- StartToCloseTimeout: time.Minute * 5,
- }
-
- ctx = workflow.WithActivityOptions(ctx, ao)
- logger.Info("Subscription workflow started for: " + customer.Id)
-
- var activities *Activities
-
- // 执行一个向用户发送欢迎邮件的Activity
- err = workflow.ExecuteActivity(ctx, activities.SendWelcomeEmail, workflowCustomer).Get(ctx, &actResult)
- if err != nil {
- log.Fatalln("Failure executing SendWelcomeEmail", err)
- }
-
- // 程序等待,直到超时或用户取消订阅
- workflow.AwaitWithTimeout(ctx, workflowCustomer.Subscription.TrialPeriod, func() bool {
- return subscriptionCancelled == true
- })
-
- // 如果用户在试用期,取消订阅,发送取消订阅的通知邮件
- if subscriptionCancelled == true {
- err = workflow.ExecuteActivity(ctx, activities.SendCancellationEmailDuringTrialPeriod, workflowCustomer).Get(ctx, &actResult)
- if err != nil {
- log.Fatalln("Failure executing SendCancellationEmailDuringTrialPeriod", err)
- }
- // 结束此工作流
- return "Subscription finished for: " + workflowCustomer.Id, err
- }
- ...
-
- return "Completed Subscription Workflow", err
- }
Temporal集群由Temporal Server和数据库服务组成
Temporal Server包含:Fronted、Matching、History、Worker
数据库支持:Mysql、PostgreSQL、Cassandra、ES
前端组件是一个单点网关,提供 Proto API。可以接受来自浏览器、tctl(Temporal的命令行工具)、以及业务方的调用请求。
前端组建主要用于 接口限速、授权认证、校验和请求路由。
记录服务用于记录Workflow的执行状态,并且支持横向拓展。
匹配服务用于管理任务队列,及任务分发,并且支持横向拓展
后台服务主要用于维护拷贝队列和执行一些Temporal服务自己的Wrokflow。
数据库保存了用于分发的任务信息、以及Workflow的执行状态、命名空间元数据,前端可视化配置
Temporal 官网提供了Web UI组建。可以通过浏览器查看Workflow的执行状态和结果
进入Web UI首页,即可查看最新的工作流执行状态列表
点击Workflow的RUN ID,可以查看单个workflow的执行详情。
如下下图显示,可以看到Workerflow的
开始时间
结束时间
执行结果状态
输入参数
输出参数
...
在查看单个Workflow的页面中,点击History标签,然后选择COMPACT视图模式,即可查看各个Activity的执行情况
由前面Temporal整体架构可以知道,Temporal的前端组件为单点,为了做到高可用,可以部署多个实例,通过公司内部的BNS做负载均衡达到高可用
Temporal的记录组件,支持横向拓展。但是要注意的是:记录组件的实例个数,需要在集群创建前规划好。集群创建好后,记录组件的个数就不可改变
Tremporal的匹配服务组建和Worker组件,可以根据需要随时拓展实例
数据库的高可用,可以使用百度内部的DDBS来做。
通过以上配置,可以确保Temporal工作流服务的高可用。
Temporal的各个组件支持通过HTTP接口导出自身内部的各种指标。
可以通过prometheus实时监控Temporal各组件的运行状态,并对关键指标配置告警。
并且可以通过配合Grafana,可以查看各组件的历史运行状态。
技术选型中,没有免费的午餐,只有权衡,工作流模式同样如此,在简单的业务场景下,采用请求-响应模式或消息队列模式就可以满足业务场景需要,此时如果引入工作流系统,反而会增加不必要的开发负担。
希望通过本文,可以让大家了解工作流,将工作流作为研发百宝箱的工具,在技术选型中多一选择,合理使用工作流,来降低大家的研发成本,提高效率。