引言
又是两个月没有写博客了,也有一个月没有玩单片机做手工学习了;前几天在某个群里看到,有个群友说自己用了个内存队列用来保存某个task的数据,然后在某一处又使用死循环来判断内存队列的数据是否大于0,针对这个问题,才引发了这一边博客,哈哈,之前看到过有些人碰到这种场景是开线程使用死循环来进行数据传输处理。其实针对这个问题,while并不算是一个很好的解决方案,具体的还得结合场景去进行判断如何找到最优的解决方案,在本篇博客,我会罗列出我所已知和这个议题相关的几种方案,以及写了的部分代码。
Channel
ChannelManager
在net core3.0的时候,上新了一个类库,System.Threading.Channels,里面提供了可以用户进程内部传输数据进行通讯的通道Channel泛型类,里面提供了供数据提供方写入数据的ChannelWriter以及从通道读取数据的ChannelReader,当我们数据提供方,需要将数据传入到接收方的时候,就需要让提供方获取到ChannelWriter的变量,接收方获取到ChannelReader的变量,实现通道的进程内的数据流动;当我们需要往一个集合里面写入数据,并且,其他地方不停的循环等待写入数据,这个场景,则可以使用Channel来实现,类似于发布订阅。
我们设计一个ChannelManager用来给数据的接收方和发送方,提供Reader以及Writer,然后使用一个标识,用来区分是属于哪一个业务,或者发布订阅中的Topic,同时约定好数据流动的格式约束,定义一个SendMsgModel的类,主题标识用来获取指定的Channel,Message则为发送方发送的数据。在Manager里,我们可以指定创建有无约束的通道,可以看到,如果我们是先发布,则发布时会首先定义Channel,并且将之放入线程安全的字典里,对应主题和通道,反之在订阅方获取Reader的时候,如果存在,则使用原先定义好的通道的Reader。从而实现的数据进程内的流动
public class SendMsgModel { public string Topic { get; set; } public string Message { get; set; } }
public class ChannelManager { private readonly ConcurrentDictionary<string, Channel> Channels; public ChannelManager() { Channels=new ConcurrentDictionary<string, Channel >(); } public ChannelReader ? CreateReader(string Topic,int Range=0) { Channel channel; if (Channels.TryGetValue(Topic, out channel)) { return channel.Reader; } else { var channels = Range == 0 ? Channel.CreateUnbounded () : Channel.CreateBounded (Range); if (Channels.TryAdd(Topic, channels)) { return channels.Reader; } } return null; } public ChannelWriter ? CreateWriter(string Topic, int Range=0) { Channel channel; if (Channels.TryGetValue(Topic,out channel)) { return channel.Writer; } else { var channels = Range==0? Channel.CreateUnbounded (): Channel.CreateBounded (Range); if (Channels.TryAdd(Topic, channels)) { return channels.Writer; } } return null; } }
IPublish
发布方的代码很简单,从ChannelManager里面获取我们所需要的Writer,然后写入我们的数据即可,
public interface IPublish { public ValueTask<bool> PublishAsync(SendMsgModel sendMsgModel); } public class Publisher : IPublish { public Publisher(ChannelManager channelManager) { ChannelManager = channelManager; } public ChannelManager ChannelManager { get; } public ValueTask<bool> PublishAsync(SendMsgModel sendMsgModel) { var writer = ChannelManager.CreateWriter(sendMsgModel.Topic); if (writer is not null) { writer.WriteAsync(sendMsgModel); return writer.WaitToWriteAsync(); } return new ValueTask<bool>(false); } }
ISubScribe
订阅方的代码,当调用了SubScribe方法之后,会去Manager里面获取Reader,如果发布之后在订阅,此处则会把订阅 之前的数据也会读出来,如果需要控制,则可以在发布处或者Manager处做处理,可以自行扩展。
public interface ISubScribe { void SubScribe(string Topic,ActionSendMsgModel); } public class Subscribe : ISubScribe { public Subscribe(ChannelManager channelManager) { ChannelManager = channelManager; } public ChannelManager ChannelManager { get; } public void SubScribe(string Topic, Action SendMsgModel) { var reader = ChannelManager.CreateReader(Topic); if (reader is not null) { _ = Test(reader, SendMsgModel); } } private async Task Test(ChannelReader channelReader, Action SendMsgModel) { var res=await channelReader.ReadAsync(); SendMsgModel(res); _ = Test(channelReader, SendMsgModel); } }
app.MapGet("/publish",async s => { var topic = s.Request.Query["Topic"]; var publist=s.RequestServices.GetService(); await publist.PublishAsync(new SendMsgModel { Topic = topic, Message = Guid.NewGuid().ToString() }); using (var writer=new StreamWriter(s.Response.Body)) await writer.WriteLineAsync("Ok"); }); app.MapGet("/subscribe", async s => { var topic = s.Request.Query["Topic"]; var subScribe = s.RequestServices.GetService (); subScribe.SubScribe(topic, s => { Console.WriteLine(s.Message+":"+s.Topic); }); using (var writer = new StreamWriter(s.Response.Body)) await writer.WriteLineAsync("Ok"); }); app.MapGet("/subscribe1", async s => { var topic = s.Request.Query["Topic"]; var subScribe = s.RequestServices.GetService (); subScribe.SubScribe(topic, s => { Console.WriteLine(s.Message + ":" + s.Topic); }); using (var writer = new StreamWriter(s.Response.Body)) await writer.WriteLineAsync("Ok"); });
从例子可以看到我们定义了三个接口一个发布,两个订阅用来实现不同的主题的订阅和发布。
DataFlow
在net core之后,提供了一个用于进程内数据流动传输以及构建业务管道数据处理的一个库,System.Threading.Tasks.Dataflow
里面包含了可以用来发送数据在接收数据后调用自定义回调的
ActionBlock类,此类单管道下(即只有一个ActionBlock,不包含其他Block)可以在构造函数传入一个委托,该委托用来在发送方发送数据后做数据处理或者其他操作,在调用该类的Post(同步)或者扩展方法SendAsync(异步)之后,回进入到构造函数传入的委托,当调用了Block里的Complete方法之后,对象则不能写入数据,即调用了Post或者SendAsync之后再去读取数据依旧是Complete之前最后一次写入的数据;
当然构建管道仅仅是一个ActionBlock是不够的,里面还有批量数据处理的BatchBlock,多类型批量处理的BatchedJoinBlock,BatchBlock只支持单类型,而BatchedJoinBlock支持两个类型以上的数据处理,可以想一下,我们在批量更新数据库数据的时候,需要获取到每一个更新的状态以及异常的时候,就可以使用此类来进行数据传输的处理,此类的构造函数中需要传入一个int类型的参数,用来做每次管道传入的数据的总和,即我们的数据有四百个,构造函数传入数据的批量总和为100,BatchedJoinBlock我们用了两个类型,我们在SendAsync或者Post之后,再去调用Receive方法获取到的数据T类型和T1类型获取到的IList
同时也有每次获取数据仅获取写入的最新的数据的BroadcastBlock,即调用了上百次的Post传输数据,获取的时候总是最新的一个数据,还有队列先进先出的BufferBlock,JoinBlock和BatchedJoinBlock不一样的是,一个是单数据多类型传输,一个是多数据多类型传输,以及只写能写一次的WriteOnceBlock和有输入输出的传输管道TransFormBlock TransFormManyBlock。更多的信息可以参考官网
DataFlowManager
在DataFlowManager中,同样用线程安全的集合保存我们订阅的主题以及发送和接收的Block。
public class DataFlowManager { private readonly ConcurrentDictionary<string, BroadcastBlock> Channels;//主题和管道列表 public DataFlowManager() { Channels=new ConcurrentDictionary<string, BroadcastBlock >(); } public BroadcastBlock GetActionBlock(String Topic) { //WriteOnceBlock 仅使用发送方发送一次数据,后续读取都是第一次发送之后的数据 //batchblock,如果订阅之前,发布方发布了多条数据,且订阅之前的数据需要处理,可以使用batchblock用来发送数据ActionBlock接收数据,其中ActionBlock泛型为泛型数组 //bufferblock 先进先出队列,即订阅之前 ,发布方先发布数据后,订阅收到的数据顺序是最先发布的, BroadcastBlock block = null;//用BroadcastBlock原因是只取最新发布的数据,考虑是如果先发布,但是订阅方还没有订阅,发布方一直发布,使用其他传输块在接收的时候会把之前未订阅之前的数据也会接收到,有需要自己修改 if (Channels.TryGetValue(Topic,out block)) { return block; } else { block =new BroadcastBlock (null); Channels.TryAdd(Topic, block); return block; } } }
IDataFlow
DataFlow定义了发布数据的方法,我们获取到了数据传输管道之后,开始去写入数据,在没有订阅之前写入数据也是不影响,因为使用了仅获取最新数据的BroadcastBlock,
public interface IDataFlow { public Task<bool> PublishAsync(SendMsgModel sendMsgModel); } public class DataFlow : IDataFlow { public DataFlow(DataFlowManager dataFlowManager) { DataFlowManager = dataFlowManager; } public DataFlowManager DataFlowManager { get; } public async Task<bool> PublishAsync(SendMsgModel sendMsgModel) { var writer = DataFlowManager.GetActionBlock(sendMsgModel.Topic);//获取此主题的传输管道 if (writer is not null) { return await writer.SendAsync(sendMsgModel);//写入数据 } return await Task.FromResult(false); } }
ISubscriber
接收方的代码:从ChannelManager获取到管道,然后创建一个ActionBlock的对象,将订阅方的委托传入进去之后,使用获取到的管道进行链接,从而在发布方调用Post或者SendAsync传输数据的时候,我们的ActionBlock也可以获取到数据然后传入到我们的回调SendMsgModel委托中去
public interface ISubscriber { void SubScribe(string Topic, ActionSendMsgModel); } public class Subscriber : ISubscriber { public Subscriber(DataFlowManager channelManager) { ChannelManager = channelManager; } public DataFlowManager ChannelManager { get; } public async void SubScribe(string Topic, Action SendMsgModel) { var reader = ChannelManager.GetActionBlock(Topic); if (reader != null) { var action = new ActionBlock (SendMsgModel); reader.LinkTo(action);//管道连接然后等待管道接收数据调用回调reader. await reader.Completion.ContinueWith(s => action.Complete()); } } }
例子代码参考Channel的代码,只需要改一下注入即可
其他方案
在回调通知,数据传输等场景,还可以使用观察者模式,自己手写发布订阅模式,或者回到最初的议题,我们创建一个包装类,用来存放我们的集合,在Add或者Remove的时候,定义一个委托回调,用来通知使用方来做一些业务处理,可以参考Wpf的双向绑定,一方更改调用属性更改事件进行通知,也可以使用信号量来进行此操作,集合在写入一批数据或者单个数据之后,发出信号,使其他代码块收到信号后,去读取集合数据然后去进行操作也是一种方案,总之实现此类场景的方案还是有很多种,具体可以根据自己的场景去进行选择,或者封装,包装等。
结尾
我想送给众多同行一句话,有道无术,术尚可求,有术无道,止于术也。意思就是有解决问题的方法,但是不知道这个方法从何而来,思路是怎么来的,你也只能解决当下的问题;相反,你有解决问题的思路,就有学习,提升自己的方向,不仅仅能解决当下的问题。用一句我昨天说的话:世人求得外而不习内,得术而不解其道,故不得其逍遥,不解其难。虽尽术解其当下难,而道化无穷尽难。注重思维的发展,提升内在的观察力,学习力,才可以更持久,是持久。
韵不万通,术难以成。
术不精进,韵难以生。
代码已上传至:http://121.43.235.192:8082/s/k7mmtn3BTrEnjFK 有需要可以下载,
如有不足,望请指出,需要探讨的话可以加群,此群是之前net7交流群之后的群,net7已被封了,这个是新群,或者看所在群有叫四川观察的,可以艾特看是不是我。谢谢各位看官,下次再见咯