• 3.控制结构


    控制结构

    分支

    可以在工作流中定义多个分支,更根据表达式的值选择执行哪个分支。

    使用CreateBranch()方法定义分支

    //创建两个分支
    var branch1 = builder.CreateBranch()
        .StartWith<PrintMessage>()
            .Input(step => step.Message, data => "hi from 1")
        .Then<PrintMessage>()
            .Input(step => step.Message, data => "bye from 1");
    
    var branch2 = builder.CreateBranch()
        .StartWith<PrintMessage>()
            .Input(step => step.Message, data => "hi from 2")
        .Then<PrintMessage>()
            .Input(step => step.Message, data => "bye from 2");
    
    //如果data.value1的值为one,则执行branch1
    //如果data.value1的值为two,则执行branch2
    builder
        .StartWith<HelloWorld>()
        .Decide(data => data.Value1)
            .Branch((data, outcome) => data.Value1 == "one", branch1)
            .Branch((data, outcome) => data.Value1 == "two", branch2);
    

    foreach

    public class ForEachWorkflow : IWorkflow
    {
        public string Id => "Foreach";
        public int Version => 1;
    
        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith<SayHello>()
                .ForEach(data => new List<int>() { 1, 2, 3, 4 })
                    .Do(x => x
                        .StartWith<DisplayContext>()
                            .Input(step => step.Message, (data, context) => context.Item)
                        .Then<DoSomething>())
                .Then<SayGoodbye>();
        }        
    }
    

    while

    public class WhileWorkflow : IWorkflow<MyData>
    {
        public string Id => "While";
        public int Version => 1;
    
        public void Build(IWorkflowBuilder<MyData> builder)
        {
            builder
                .StartWith<SayHello>()
                .While(data => data.Counter < 3)
                    .Do(x => x
                        .StartWith<DoSomething>()
                        .Then<IncrementStep>()
                            .Input(step => step.Value1, data => data.Counter)
                            .Output(data => data.Counter, step => step.Value2))
                .Then<SayGoodbye>();
        }        
    }
    

    if

    public class IfWorkflow : IWorkflow<MyData>
    { 
        public void Build(IWorkflowBuilder<MyData> builder)
        {
            builder
                .StartWith<SayHello>()
                .If(data => data.Counter < 3).Do(then => then
                    .StartWith<PrintMessage>()
                        .Input(step => step.Message, data => "Value is less than 3")
                )
                .If(data => data.Counter < 5).Do(then => then
                    .StartWith<PrintMessage>()
                        .Input(step => step.Message, data => "Value is less than 5")
                )
                .Then<SayGoodbye>();
        }        
    }
    

    并行

    public class ParallelWorkflow : IWorkflow<MyData>
    {
        public string Id => "parallel-sample";
        public int Version => 1;
    
        public void Build(IWorkflowBuilder<MyData> builder)
        {
            builder
                .StartWith<SayHello>()
                .Parallel()
                    .Do(then => 
                        then.StartWith<Task1dot1>()
                            .Then<Task1dot2>()
                    .Do(then =>
                        then.StartWith<Task2dot1>()
                            .Then<Task2dot2>()
                    .Do(then =>
                        then.StartWith<Task3dot1>()
                            .Then<Task3dot2>()
                .Join()
                .Then<SayGoodbye>();
        }        
    }
    

    按计划执行

    在未来按照计划执行,使得工作流在后台异步执行

    builder
        .StartWith(context => Console.WriteLine("Hello"))
        .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
            .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
        )
        .Then(context => Console.WriteLine("Doing normal tasks"));
    

    延迟执行

    延迟工作流的当前分支一段时间

    builder
        .StartWith(context => Console.WriteLine("Hello"))
        .Delay(p => TimeSpan.FromSeconds(5) );
    

    重复

    设置一组重复的后台步骤,知道满足特定条件

    builder
        .StartWith(context => Console.WriteLine("Hello"))
        .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur
            .StartWith(context => Console.WriteLine("Doing recurring task"))
        )
        .Then(context => Console.WriteLine("Carry on"));
    

    案例 Parallel ForEach

    • steps
    public class SayHello : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Hello");
            return ExecutionResult.Next();
        }
    }
    public class DisplayContext : StepBody
    {        
    
        public object Item { get; set; }
    
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine($"Working on item {Item}");
            return ExecutionResult.Next();
        }
    }
    public class DoSomething : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Doing something...");
            return ExecutionResult.Next();
        }
    }
    public class SayGoodbye : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Goodbye");
            return ExecutionResult.Next();
        }
    }
    
    • workflow
    public class ForEachWorkflow : IWorkflow
    {
        public string Id => "Foreach";
        public int Version => 1;
    
        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith<SayHello>()
                .ForEach(data => new List<int> { 1, 2, 3, 4 })
                    .Do(x => x
                        .StartWith<DisplayContext>()
                            .Input(step => step.Item, (data, context) => context.Item)
                        .Then<DoSomething>())
                .Then<SayGoodbye>();
        }        
    }    
    
    • program
    IServiceCollection services = new ServiceCollection();
    services.AddLogging();
    services.AddWorkflow();
    var serviceProvider = services.BuildServiceProvider();
    var host = serviceProvider.GetService<IWorkflowHost>();
    host.RegisterWorkflow<ForEachWorkflow>();
    host.Start();
    Console.WriteLine("Starting workflow...");
    string workflowId = host.StartWorkflow("Foreach").Result;
    Console.ReadLine();
    host.Stop();
    

    在这里插入图片描述

    案例If

    • steps
    public class PrintMessage : StepBody
    {
        public string Message { get; set; }
    
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine(Message);
            return ExecutionResult.Next();
        }
    }
    public class SayGoodbye : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Goodbye");
            return ExecutionResult.Next();
        }
    }
    public class SayHello : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Hello");
            return ExecutionResult.Next();
        }
    }
    
    • workflow
    public class MyData
    {
        public int Counter { get; set; }
    }
    public class IfWorkflow : IWorkflow<MyData>
    {
        public string Id => "if-sample";
        public int Version => 1;
    
        public void Build(IWorkflowBuilder<MyData> builder)
        {
            builder
                .StartWith<SayHello>()
                .If(data => data.Counter < 3).Do(then => then
                    .StartWith<PrintMessage>()
                        .Input(step => step.Message, data => "Value is less than 3")
                )
                .If(data => data.Counter < 5).Do(then => then
                    .StartWith<PrintMessage>()
                        .Input(step => step.Message, data => "Value is less than 5")
                )
                .Then<SayGoodbye>();
        }        
    }
    
    • program
    IServiceCollection services = new ServiceCollection();
    services.AddLogging();
    services.AddWorkflow();
    var serviceProvider = services.BuildServiceProvider();
    var host = serviceProvider.GetService<IWorkflowHost>();
    host.RegisterWorkflow<IfWorkflow, MyData>();
    host.Start();
    
    Console.WriteLine("Starting workflow...");
    string workflowId = host.StartWorkflow("if-sample", new MyData { Counter = 4 }).Result;
    
    Console.ReadLine();
    host.Stop();
    
    

    在这里插入图片描述

    案例 Parallel Tasks

    • steps
    public class PrintMessage : StepBody
    {
        public string Message { get; set; }
    
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("当前线程:"+ Thread.GetCurrentProcessorId().ToString()+ "message:"+Message);
            return ExecutionResult.Next();
        }
    }
    public class SayHello : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Hello");
            return ExecutionResult.Next();
        }
    }
    public class SayGoodbye : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Goodbye");
            return ExecutionResult.Next();
        }
    }
    
    • workflow
    public class MyData
    {
        public int Counter { get; set; }
    }
    public class ParallelWorkflow : IWorkflow<MyData>
    {
        public string Id => "parallel-sample";
        public int Version => 1;
    
        public void Build(IWorkflowBuilder<MyData> builder)
        {
            builder
                .StartWith<SayHello>()
                .Parallel()
                    .Do(then => 
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 1.2"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.2")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 2.3"))
                    .Do(then =>
                        then.StartWith<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.1")
                            .Then<PrintMessage>()
                                .Input(step => step.Message, data => "Item 3.2"))
                .Join()
                .Then<SayGoodbye>();
        }        
    }
    
    • program
    IServiceCollection services = new ServiceCollection();
    services.AddLogging();
    services.AddWorkflow();
    var serviceProvider = services.BuildServiceProvider();
    var host = serviceProvider.GetService<IWorkflowHost>();
    var controller = serviceProvider.GetService<IWorkflowController>();
    controller.RegisterWorkflow<ParallelWorkflow, MyData>();//使用容器进行注册
    
    Console.WriteLine("Starting workflow...");
    controller.StartWorkflow<MyData>("parallel-sample");//使用容器开始
    
    Console.ReadLine();
    host.Stop();
    

    在这里插入图片描述

    案例Schedule

    • workflow
    class ScheduleWorkflow : IWorkflow
    {
        public string Id => "schedule-sample";
        public int Version => 1;
    
        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith(context => Console.WriteLine("Hello"))
                .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
                    .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
                )
                .Then(context => { Console.WriteLine("Doing normal tasks"); Console.WriteLine("等待5s"); });
        }
    }
    

    在这里插入图片描述

    案例Recur

    public class MyData
    {
        public int Counter { get; set; }
    }
    class RecurSampleWorkflow : IWorkflow<MyData>
    {
        public string Id => "recur-sample";
        public int Version => 1;
    
        public void Build(IWorkflowBuilder<MyData> builder)
        {
            builder
                .StartWith(context => Console.WriteLine("Hello"))
                //5s一次,直到data.Counter > 5
                .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur
                    .StartWith(context => Console.WriteLine("Doing recurring task"))
                )
                .Then(context => Console.WriteLine("Carry on"));
        }
    }
    

    在这里插入图片描述

    案例Branch

    • steps
    public class PrintMessage : StepBody
    {
        public string Message { get; set; }
    
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine(Message);
            return ExecutionResult.Next();
        }
    }
    public class SayHello : StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Hello");
            return ExecutionResult.Next();
        }
    }
    
    • workflow
    public class MyData
    {
        public int Value { get; set; }
    }
    
    public class OutcomeWorkflow : IWorkflow<MyData>
    {
        public string Id => "outcome-sample";
        public int Version => 1;
    
        public void Build(IWorkflowBuilder<MyData> builder)
        {
            var branch1 = builder.CreateBranch()
                .StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "hi from 1")
                .Then<PrintMessage>()
                    .Input(step => step.Message, data => "bye from 1");
    
            var branch2 = builder.CreateBranch()
                .StartWith<PrintMessage>()
                    .Input(step => step.Message, data => "hi from 2")
                .Then<PrintMessage>()
                    .Input(step => step.Message, data => "bye from 2");
    //如果data.value1的值为one,则执行branch1
    //如果data.value1的值为two,则执行branch2
            builder
                .StartWith<SayHello>()
                .Decide(data => data.Value)
                    .Branch(1, branch1)
                    .Branch(2, branch2);
        }        
    }
    
    • program
    IServiceCollection services = new ServiceCollection();
    services.AddLogging();
    services.AddWorkflow();
    var serviceProvider = services.BuildServiceProvider();
    var host = serviceProvider.GetService<IWorkflowHost>();
    host.RegisterWorkflow<OutcomeWorkflow, MyData>();
    host.Start();
    
    Console.WriteLine("Starting workflow...");
    host.StartWorkflow("outcome-sample", new MyData { Value = 2 });
    
    
    Console.ReadLine();
    host.Stop();
    
    

    在这里插入图片描述

  • 相关阅读:
    腾讯mini项目-【指标监控服务重构】2023-08-17
    从零学习 InfiniBand-network架构(六)—IB协议链路层QoS如何实现
    一位工作七年的Java工程师给毕业生的经验分享
    Mac上好用的翻译软件推荐 兼容m
    雪崩问题以及sentinel的使用
    Pytorch-YOLOv4梳理——原理和复现
    docker安装open web ui
    Kafka管理平台LogiKM调研
    代码随想录训练营 DP序列
    Java8 新特性之Stream(九)-- Stream的sorted()详细用法
  • 原文地址:https://blog.csdn.net/weixin_44064908/article/details/127071575