• .Net Core工作流WorkFlowCore


    前言

    WorkFlowCore是一个针对.NetCore的轻量级的工作流引擎,提供了FluentAPI、多任务、持久化以及并行处理的功能,适合于小型工作流、责任链的需求开发。支持工作流长期运行,提供了各种持久化方式。

    本篇开发环境为.Net7,此处不演示Jsonyaml配置,详细文档请查看官方文档项目源码地址

     一、安装与基础使用

    通过以下命令安装

    Install-Package WorkflowCore

    然后注入WorkFlowCore

    builder.Services.AddWorkflow();

     WorkFlowCore主要分为两部分:步骤工作流

     步骤

     多个步骤组成一个工作流,每个步骤都可以有输入并产生输出,这些输出可以传递回其所在的工作流。通过创建继承抽象类StepBody或StepBodyAsync的类,并且实现Run或RunAsync方法来定义步骤,很明显它们的区别是是否异步

    复制代码
    public class FirstStepBody: StepBody
        {
            public override ExecutionResult Run(IStepExecutionContext context)
            {
                Console.WriteLine("Hello world!First");
                return ExecutionResult.Next();
            }
        }
    复制代码

    工作流

     通过继承IWorkflow接口定义一个工作流,接口只有IdVersionBuild方法(内部可以执行多个步骤),工作流主机使用这些信息来标识工作流

    复制代码
    public class MyWorkflow :IWorkflow
        {
            public string Id => "HelloWorld";
            public int Version => 1;
            public void Build(IWorkflowBuilder<object> builder)
            {
                builder
                    .StartWith()
                    .Then();
            }
        }
    复制代码

    工作流如果想使用必须在工作流主机中通过RegisterWorkflow()方法注册,并且通过Start()方法启动主机,当然也可以通过Stop()方法停止工作流。执行工作流需要使用StartWorkflow()方法,参数为工作流类的Id,如下

    复制代码
     [ApiController]
        [Route("[controller]")]
        public class WeatherForecastController : ControllerBase
        {
            private readonly IWorkflowHost _workflowHost;
            public WeatherForecastController(IWorkflowHost workflowHost)
            {
                _workflowHost = workflowHost;
            }
            [HttpGet(Name = "get")]
            public ContentResult Get()
            {
                if (!_workflowHost.Registry.IsRegistered("HelloWorld",1))
                {
                    _workflowHost.RegisterWorkflow();
                }
                _workflowHost.Start();
                _workflowHost.StartWorkflow("HelloWorld");
                //host.Stop();
                return Content("ok");
            }
        }
    复制代码

     当然也可以在构建web服务的时候统一注册,然后就可以直接执行啦

    var host = app.Services.GetService();
    host.RegisterWorkflow();
    host.Start();

    二、在步骤之间传递参数

    每个步骤都是一个黑盒,因此它们支持输入和输出。这些输入和输出可以映射到一个数据类,该数据类定义与每个工作流实例相关的自定义数据。

    以下示例显示了如何定义步骤的输入和输出,然后显示了如何使用内部数据的类型化类定义工作流,以及如何将输入和输出映射到自定义数据类的属性。

    复制代码
    //步骤包含属性,并且计算
        public class FirstStepBody: StepBody
        {
            public int Input1 { get; set; }
            public int Input2 { get; set; }
            public int Output { get; set; }
            public override ExecutionResult Run(IStepExecutionContext context)
            {
                Output = Input1 + Input2;
                Console.WriteLine(Output);
                return ExecutionResult.Next();
            }
        }
        //工作流包含输入输出的赋值
        public class MyWorkflow :IWorkflow
        {
            public string Id => "HelloWorld";
            public int Version => 1;
            public void Build(IWorkflowBuilder builder)
            {
                builder
                    .StartWith()
                    .Input(step => step.Input1,data => data.Value1)
                    .Input(step => step.Input2, data => 100)
                    .Output(data => data.Answer, step => step.Output)
                    .Then()
                    .Input(step => step.Input1, data => data.Value1)
                    .Input(step => step.Input2, data => data.Answer)
                    .Output(data => data.Answer, step => step.Output);
            }
        }
        //工作流的属性类
        public class MyDataClass
        {
            public int Value1 { get; set; }
            public int Value2 { get; set; }
            public int Answer { get; set; }
        }
        //执行工作流传入参数
        MyDataClass myDataClass = new MyDataClass();
        myDataClass.Value1 = 100;
        myDataClass.Value2 = 200;
        //不传入myDataClass则每次执行都是新的数据对象
        _workflowHost.StartWorkflow("HelloWorld", myDataClass);
    复制代码

    从上述例子可以看到工作流可以定义一个初始的类作为参数传入,每个步骤可以有自己的属性字段去接收参数(可以是工作流类的字段,也可以是固定值),可以用Input方法传入,Output方法输出赋值。如果在工作流执行时不传入参数每次执行都是新的对象的默认值,比如在StartWorkflow方法中不传myDataClass,运行结果是100100,否则是200300

    三、外部事件

    工作流可以使用WaitFor方法进行等待,通过外部触发此事件,将事件产生的数据传递给工作流,并且让工作流继续执行下面的步骤。示例如下:

    复制代码
    public class MyWorkflow :IWorkflow
        {
            //省略。。。。
            public void Build(IWorkflowBuilder builder)
            {
                builder
                    .StartWith()
                    .Input(step => step.Input1,data => data.Value1)
                    .Input(step => step.Input2, data => 100)
                    .Output(data => data.Answer, step => step.Output)
                    .WaitFor("MyEvent",key => "EventKey")
                    .Output(data => data.Answer,step => step.EventData)
                    .Then()
                    .Input(step => step.Input1, data => data.Value1)
                    .Input(step => step.Input2, data => data.Answer)
                    .Output(data => data.Answer, step => step.Output);
            }
        }
        //。。。
        [HttpGet(Name = "get")]
        public ContentResult Get()
        {
            MyDataClass myDataClass = new MyDataClass();
            myDataClass.Value1 = 100;
            myDataClass.Value2 = 200;
            _workflowHost.StartWorkflow("HelloWorld", myDataClass);
                return Content("ok");
            }
      [HttpPost(Name = "event")]
      public ContentResult PublishEvent()
      {
        _workflowHost.PublishEvent("MyEvent", "EventKey", 200);
        return Content("ok");
      }
    复制代码

     使用WaitFor方法可以使工作流等待监听指定事件的执行,有两个入参事件名称事件关键字。通过工作流主机去触发PublishEvent执行指定的事件,有三个入参触发事件名称触发事件关键字和事件参数

     需要执行事件,工作流才会继续下一步,如下动图演示:

     

     

      可以为等待事件设置有效时间,在有效时间之前执行事件是不会继续下一步流程的,只有当大于有效时间之后执行事件才会继续下一步步骤。如下代码设置,为工作流执行时间一天后执行事件才会继续执行,否则就等待不动。

    WaitFor("MyEvent",key => "EventKey", data => DateTime.Now.AddDays(1))

    四、活动

    活动被定义为在工作流中可以被等待的外部工作队列中的步骤。

    在本例中,工作流将等待活动activity-1,直到活动完成才继续工作流。它还将data.Value1的值传递给活动,然后将活动的结果映射到data.Value2

    然后我们创建一个worker来处理活动项的队列。它使用GetPendingActivity方法来获取工作流正在等待的活动和数据。

    复制代码
        //.....
        builder
        .StartWith()
        .Input(step => step.Input1,data => data.Value1)
        .Input(step => step.Input2, data => 100)
        .Output(data => data.Answer, step => step.Output)
        .Activity("activity-1", (data) => data.Value1)
        .Output(data => data.Value2, step => step.Result)
        .Then()
        .Input(step => step.Input1, data => data.Value1)
        .Input(step => step.Input2, data => data.Answer)
        .Output(data => data.Answer, step => step.Output);
        //....
        [HttpPost(Name = "active")]
       public ContentResult PublishEvent()
       {
        var activity = _workflowHost.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
        if (activity != null)
        {
          Console.WriteLine(activity.Parameters);
          _workflowHost.SubmitActivitySuccess(activity.Token, 100);
        }
        return Content("ok");
       }
    复制代码

    活动可以看作一个等待的步骤可以传入参数和输出参数,和事件的区别是事件不能输入参数而是单纯的等待。

    五、错误处理

    每个步骤都可以配置自己的错误处理行为,可以在以后重试、挂起工作流或终止工作流。

    复制代码
        public void Build(IWorkflowBuilder<object> builder)
        {
            builder                
                .StartWith()
                    .OnError(WorkflowErrorHandling.Retry,TimeSpan.FromMinutes(10))
                .Then();
        }
    复制代码

    六、流程控制

    工作流的流程控制包括分支、循环等各种操作

    决策分支

    在工作流中定义多个独立分支,并根据表达式值选择满足条件的分支执行。

    使用IWorkflowBuilderCreateBranch方法定义分支。然后我们可以使用branch方法选择一个分支。

    选择表达式将与通过branch方法列出的分支相匹配,匹配的分支将安排执行。匹配多个分支将导致并行分支运行。

    如果data.Value1的值为1,则此工作流将选择branch1,如果为2,则选择branch2

    复制代码
      var branch1 = builder.CreateBranch()
        .StartWith()
            .Input(step => step.Message, data => "hi from 1")
        .Then()
            .Input(step => step.Message, data => "bye from 1");
    
      var branch2 = builder.CreateBranch()
        .StartWith()
            .Input(step => step.Message, data => "hi from 2")
        .Then()
            .Input(step => step.Message, data => "bye from 2");
      builder
        .StartWith()
        .Decide(data => data.Value1)
            .Branch((data, outcome) => data.Value1 == "one", branch1)
            .Branch((data, outcome) => data.Value1 == "two", branch2);
    复制代码

    并行ForEach

    使用ForEach方法启动并行for循环

    复制代码
      public class ForEachWorkflow : IWorkflow
      {
          public string Id => "Foreach";
          public int Version => 1;
          public void Build(IWorkflowBuilder<object> builder)
          {
              builder
                  .StartWith()
                  .ForEach(data => new List<int>() { 1, 2, 3, 4 })
                      .Do(x => x
                          .StartWith()
                              .Input(step => step.Message, (data, context) => context.Item)
                          .Then())
                  .Then();
          }        
      }
    复制代码

    While循环

    使用While方法启动while循环

    复制代码
      public class WhileWorkflow : IWorkflow
      {
          public string Id => "While";
          public int Version => 1;
          public void Build(IWorkflowBuilder builder)
          {
              builder
                  .StartWith()
                  .While(data => data.Counter < 3)
                      .Do(x => x
                          .StartWith()
                          .Then()
                              .Input(step => step.Value1, data => data.Counter)
                              .Output(data => data.Counter, step => step.Value2))
                  .Then();
          }        
      }
    复制代码

    If判断

    使用If方法执行if判断

    复制代码
      public class IfWorkflow : IWorkflow
      { 
          public void Build(IWorkflowBuilder builder)
          {
              builder
                  .StartWith()
                  .If(data => data.Counter < 3).Do(then => then
                      .StartWith()
                          .Input(step => step.Message, data => "Value is less than 3")
                  )
                  .If(data => data.Counter < 5).Do(then => then
                      .StartWith()
                          .Input(step => step.Message, data => "Value is less than 5")
                  )
                  .Then();
          }        
      }
    复制代码

    并行

    使用Parallel方法并行执行任务

    复制代码
      public class ParallelWorkflow : IWorkflow
      {
          public string Id => "parallel-sample";
          public int Version => 1;
          public void Build(IWorkflowBuilder builder)
          {
              builder
                  .StartWith()
                  .Parallel()
                      .Do(then => 
                          then.StartWith()
                              .Then()
                      .Do(then =>
                          then.StartWith()
                              .Then()
                  .Join()
                  .Then();
        }        
    }
    复制代码

    Schedule

    使用Schedule方法在工作流中注册在指定时间后执行的异步方法

    复制代码
    builder
        .StartWith(context => Console.WriteLine("Hello"))
        .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
            .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
        )
        .Then(context => Console.WriteLine("Doing normal tasks"));
    复制代码

    Recur

    使用Recure方法在工作流中设置一组重复的后台步骤,直到满足特定条件为止

    复制代码
    builder
        .StartWith(context => Console.WriteLine("Hello"))
        .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur
            .StartWith(context => Console.WriteLine("Doing recurring task"))
        )
        .Then(context => Console.WriteLine("Carry on"));
    复制代码

    七、Saga transaction 

    saga允许在saga transaction中封装一系列步骤,并为每一个步骤提供补偿步骤,使用CompensateWith方法在对应的步骤后面添加补偿步骤,补偿步骤将会在步骤抛出异常的时候触发。

    如下示例,步骤Task2如果抛出一个异常,那么补偿步骤UndoTask2UndoTask1将被触发。

    复制代码
    builder
        .StartWith(context => Console.WriteLine("Begin"))
        .Saga(saga => saga
            .StartWith()
                .CompensateWith()
            .Then()
                .CompensateWith()
            .Then()
                .CompensateWith()
        )
            .CompensateWith()
        .Then(context => Console.WriteLine("End"));
    复制代码

    也可以指定重试策略,在指定时间间隔后重试。

    复制代码
    builder
        .StartWith(context => Console.WriteLine("Begin"))
        .Saga(saga => saga
            .StartWith()
                .CompensateWith()
            .Then()
                .CompensateWith()
            .Then()
                .CompensateWith()
        )
        .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
        .Then(context => Console.WriteLine("End"));
    复制代码

    八、持久化

    可以使用RedisMongdbSqlserver等持久化,具体可以看文档,此处使用Redis,先安装nuget

    Install-Package WorkflowCore.Providers.Redis

    然后注入就可以了

    复制代码
    builder.Services.AddWorkflow(cfg =>
    {
        cfg.UseRedisPersistence("localhost:6379", "app-name");
        cfg.UseRedisLocking("localhost:6379");
        cfg.UseRedisQueues("localhost:6379", "app-name");
        cfg.UseRedisEventHub("localhost:6379", "channel-name");
        //cfg.UseMongoDB(@"mongodb://mongo:27017", "workflow");
        //cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://elastic:9200")), "workflows");
    });
    复制代码

    运行打开可以看到

     

  • 相关阅读:
    CTFHub Git泄露
    Keil C51宏及宏函数的应用
    【微机接口】中断系统:中断的应用
    策略模式(Strategy)
    机器学习【线性回归算法1】
    分库分表真实案例,扩容10倍容量
    Object Storage 东西虽小作用很大
    什么是S参数?它有哪些主要类型?
    react 自定义日历 手把手教你
    微服务使用SockJs+Stomp实现Websocket 前后端实例 | Vuex形式断开重连、跨域等等问题踩坑(一)
  • 原文地址:https://www.cnblogs.com/xwc1996/p/17306568.html