• Temporal介绍


    1、前言

    在后端开发中,大家是否有遇到如下类型的开发场景

    1. 需要处理较多的异步事件

    2. 需要的外部服务可靠性较低

    3. 需要记录保存某个对象的复杂状态

    在以往的开发过程中,可能更多的直接使用数据库、定时任务、消息队列等作为基础,来解决上面的问题

    然而即便如此,在代码开发中,也会有很多代码跟业务无关,比如 外部服务低可靠性情况下的重试,多异步事件下的程序逻辑组织等。

    最终会影响开发效率,并且可能会降低代码后期的维护性。

    针对上面的场景,抽象为 工作流 模式的话,可以减轻开发成本以及维护成本

    2、工作流介绍

    2.1、定义

    工作流:指业务过程的部分或整体在计算机应用环境下的自动化。是对工作流程及其各操作步骤之间的业务规则的抽象、概括描述

    2.2、目标

    工作流主要解决的主要问题:为了实现某个业务目标,利用计算机在多个参与者之间按照某种预订的规则自动传递问文档、信息或者任务

    2.3、适用场景

    工作流通常适用于,有状态的、异步、长时间执行等特性的业务场景,比较典型的场景包括

    1. 视频、音频、图片处理工作流

    2. 订单、审批流程

    3. 数据处理流水线

    4. 自动化运维
       

    2.4、常见工作流框架

    工作流框架还是比较多的,按照语言分类的话,有

    Java: jBPM、Activiti、SWF

    PHP: Tpflow、PHPworkflow

    Go: Cadence(Cadence由Uber开发并开源,Maxim Fateev是Cadence的主架构师)、Temporal(Maxim Fateev为了推广Workflow编排引擎的商业化,另立门户创建了Temporal)

    3、Temporal工作流基本概念

    3.1、Workflow

    workflow 即表示工作流,在Temporal中,工作流是由函数或对象方法来实现(工作流样例见下文)。

    一个workflow通常完成一个业务目标。同时,当多个workflow中,有同样的处理流程时,可以封装为一个子workflow,来达到代码复用的目的。

    3.1.1、工作流选项

    启动Workflow的时候,可以设置这个Wrokflow的执行超时时间,以及失败后的重试次数、任务队列名等参数,来更好的满足业务需求

    支持的配置参数如下:

    超时设置

    1. ExecuteTimeout:Workflow的最大运行时间,包括失败后重试的时间。默认值是10年

    2. RunTimeout:Workflow单次运行的时间,默认值为 ExecuteTimeout

    3. TaskTimeout:从Worker从任务队列拉取到Workflow任务,到Worker开始执行Workflow的时间。如果超时,Server会认为Worker已经挂掉,会重新调度该Workflow给其他Worker,默认值10s

    重试策略

    1. InitialInterval:第一次重试前,需要等待多久。无默认值

    2. BackoffCeofficient:退避系数,表示多次重试时,下次等待的时间是上次的多少倍,默认值:2

    3. MaximumnInterval:下次重试时,最大等待时间。默认值:100*初始等待时间

    4. MaximumAttempts:做大重试次数,默认值:0,表示无限重试

    5. Non-Retryable:表示Workflow遇到哪些Error后,不再进行重试

    Workflow Id:

    一个Workflow,可由 命名空间,Workflow Id和Run id 唯一标识

    启动Workflow的时候,可以指定一个ID,这个ID一般采用业务级的ID,如一个要处理的客户的ID或订单ID

    多个Workflow使用相同ID时的策略配置

    1. Allow duplicate failed only policy:只有前一个相同ID的Workflow失败后,才可以再启动下一个相同ID的Workflow

    2. Allow duplicate policy:允许两个相同ID的Workflow同时运行。默认策略

    3. Reject duplicate policy:任何时候,不允许有相同ID的Workflow


    定时运行

    启动Workflow的时候,可以设置为定时启动。

    ⚠️ 注意。如果到了下次运行Workflow的时候,但上次的Workflow还没执行完(可能任务执行耗时长,或由于失败后重试等原因),会跳过下次运行Workflow

    3.2、 Activities

    根据工作流的定义:工作流指业务过程的部分或整体在计算机应用环境下的自动化,是对工作流程及其各操作步骤之间的业务规则的抽象、概括描述

    Activities可以理解为一个业务操作单元

    在Workflow执行过程中,会将Activity放入消息队列,由其他Worker获取后,执行该Activity,并将结果再返回给Workflow。

    3.2.1、Activity选项

    超时配置

    1. ScheduleToStart:表示Activity任务放到消息队列,到Worker获取到的超时时间。如果超时后,也不会触发重试。非有特殊原因,不要设置该值

    2. StartToClose:Activity实际执行超时时间。如果Activity执行时间不确定,最好按照最长时间设置。比如一个Activity可能需要2分钟、有时需要5分钟,那就设置为5分钟

    3. ScheduleToClose:从Activity放入消息队列,到Activity执行完成的时间

    4. Heartbeat:Activity和Server的心跳超时时间。在Activity运行需要较长时间时需要。用于Server检查执行Activity的Worker是否已经挂掉

    重试策略

    和Workflow的重试策略完全一致

    执行时间超长的Activity

    如果一个Activity运行时间较长,最好设置一个心跳间隔超时。这样当执行Activity的Woker挂掉时,Server可以及时知道

    4、工作流样例

    1. package subscription
    2. import (
    3. "log"
    4. "time"
    5. "go.temporal.io/sdk/workflow"
    6. )
    7. // 此处定义该函数作为一个工作流,用于处理用户订阅的服务
    8. // 工作流的编写方式和普通函数相似,需要注意的地方是,函数中涉及到可能失败的情况都应该封装到Activity中执行
    9. func SubscriptionWorkflow(ctx workflow.Context, customer Customer) (string, error) {
    10. workflowCustomer := customer
    11. subscriptionCancelled := false
    12. billingPeriodNum := 0
    13. actResult := ""
    14. QueryCustomerIdName := "customerid"
    15. QueryBillingPeriodNumberName := "billingperiodnumber"
    16. QueryBillingPeriodChargeAmountName := "billingperiodchargeamount"
    17. logger := workflow.GetLogger(ctx)
    18. // 通过SetQueryHandler方法,外部服务向Temporal Server发送请求查询该工作流的内部变量状态
    19. // 比如,根据下面QueryHandler的定义,外部服务可以查看该工作流的账单信息
    20. err = workflow.SetQueryHandler(ctx, QueryBillingPeriodChargeAmountName, func() (int, error) {
    21. return workflowCustomer.Subscription.BillingPeriodCharge, nil
    22. })
    23. if err != nil {
    24. logger.Info("QueryBillingPeriodChargeAmountName handler failed.", "Error", err)
    25. return "Error", err
    26. }
    27. // 下面的代码定义了一个Selector,Selector通常用于等待一个或多个异步时间或超时通知
    28. // 此处定义的Selector,用于接受用户取消订阅的事件,收到该事件后,修改了变量 subscriptionCancelled 的值
    29. cancelSelector := workflow.NewSelector(ctx)
    30. cancelCh := workflow.GetSignalChannel(ctx, "cancelsubscription")
    31. cancelSelector.AddReceive(cancelCh, func(ch workflow.ReceiveChannel, _ bool) {
    32. var cancelSubSignal bool
    33. ch.Receive(ctx, &cancelSubSignal)
    34. subscriptionCancelled = cancelSubSignal
    35. })
    36. // 此处配置了Activity的执行超时时间
    37. ao := workflow.ActivityOptions{
    38. StartToCloseTimeout: time.Minute * 5,
    39. }
    40. ctx = workflow.WithActivityOptions(ctx, ao)
    41. logger.Info("Subscription workflow started for: " + customer.Id)
    42. var activities *Activities
    43. // 执行一个向用户发送欢迎邮件的Activity
    44. err = workflow.ExecuteActivity(ctx, activities.SendWelcomeEmail, workflowCustomer).Get(ctx, &actResult)
    45. if err != nil {
    46. log.Fatalln("Failure executing SendWelcomeEmail", err)
    47. }
    48. // 程序等待,直到超时或用户取消订阅
    49. workflow.AwaitWithTimeout(ctx, workflowCustomer.Subscription.TrialPeriod, func() bool {
    50. return subscriptionCancelled == true
    51. })
    52. // 如果用户在试用期,取消订阅,发送取消订阅的通知邮件
    53. if subscriptionCancelled == true {
    54. err = workflow.ExecuteActivity(ctx, activities.SendCancellationEmailDuringTrialPeriod, workflowCustomer).Get(ctx, &actResult)
    55. if err != nil {
    56. log.Fatalln("Failure executing SendCancellationEmailDuringTrialPeriod", err)
    57. }
    58. // 结束此工作流
    59. return "Subscription finished for: " + workflowCustomer.Id, err
    60. }
    61. ...
    62. return "Completed Subscription Workflow", err
    63. }

    5、Temporal集群整体架构

    Temporal集群由Temporal Server和数据库服务组成

    Temporal Server包含:Fronted、Matching、History、Worker

    数据库支持:Mysql、PostgreSQL、Cassandra、ES

    5.1、各组件介绍

    5.1.1、前端组件(Frontend Service)

    前端组件是一个单点网关,提供 Proto API。可以接受来自浏览器、tctl(Temporal的命令行工具)、以及业务方的调用请求。

    前端组建主要用于 接口限速、授权认证、校验和请求路由。

    5.1.2、 记录服务(History service)

    记录服务用于记录Workflow的执行状态,并且支持横向拓展。

    5.1.3、 匹配服务(Matching service)

    匹配服务用于管理任务队列,及任务分发,并且支持横向拓展

    5.1.4、 后台服务(Worker service)

    后台服务主要用于维护拷贝队列和执行一些Temporal服务自己的Wrokflow。

    5.1.5、 数据库

    数据库保存了用于分发的任务信息、以及Workflow的执行状态、命名空间元数据,前端可视化配置

    6、Workflow执行状态、结果查看

    Temporal 官网提供了Web UI组建。可以通过浏览器查看Workflow的执行状态和结果

    6.1、批量查看Workflow

    进入Web UI首页,即可查看最新的工作流执行状态列表

    6.2、查看单个Workflow详情

    点击Workflow的RUN ID,可以查看单个workflow的执行详情。

    如下下图显示,可以看到Workerflow的

    1. 开始时间

    2. 结束时间

    3. 执行结果状态

    4. 输入参数

    5. 输出参数

    6. ...

    6.3、查看单个Workflow的Activity的执行详情

    在查看单个Workflow的页面中,点击History标签,然后选择COMPACT视图模式,即可查看各个Activity的执行情况

    7、Temporal高可用

    7.1、部署

    由前面Temporal整体架构可以知道,Temporal的前端组件为单点,为了做到高可用,可以部署多个实例,通过公司内部的BNS做负载均衡达到高可用

    Temporal的记录组件,支持横向拓展。但是要注意的是:记录组件的实例个数,需要在集群创建前规划好。集群创建好后,记录组件的个数就不可改变

    Tremporal的匹配服务组建和Worker组件,可以根据需要随时拓展实例

    数据库的高可用,可以使用百度内部的DDBS来做。

    通过以上配置,可以确保Temporal工作流服务的高可用。

    7.2、服务监控

    Temporal的各个组件支持通过HTTP接口导出自身内部的各种指标。

    可以通过prometheus实时监控Temporal各组件的运行状态,并对关键指标配置告警。

     并且可以通过配合Grafana,可以查看各组件的历史运行状态。

    结语

    技术选型中,没有免费的午餐,只有权衡,工作流模式同样如此,在简单的业务场景下,采用请求-响应模式或消息队列模式就可以满足业务场景需要,此时如果引入工作流系统,反而会增加不必要的开发负担。

    希望通过本文,可以让大家了解工作流,将工作流作为研发百宝箱的工具,在技术选型中多一选择,合理使用工作流,来降低大家的研发成本,提高效率。

  • 相关阅读:
    LoRaWan模块应用于智慧城市景观灯
    PlantUML 绘制时序图
    LeetCode 26. 删除有序数组中的重复项 简单
    调优zuul1.x(基于arthas)
    Day657.思考题解答⑤ -Java业务开发常见错误
    中间件安全:Apache 目录穿透.(CVE-2021-41773)
    cadence SPB17.4 - CIS DB - add MECHANICAL part
    Nature|高性能柔性纤维电池 (柔性智能织物/可穿戴电子/界面调控/柔性电池/柔性电子)
    F. Rats Rats(二分 or 预处理)[UTPC Contest 09-02-22 Div. 2 (Beginner)]
    js中批量修改对象属性
  • 原文地址:https://blog.csdn.net/spirit_8023/article/details/125992113