哈哈,主要是为了练习一下rust的语法,不喜勿喷。
一.Executor申明
- struct AExecutor
{ - results:Arc
u32,T>>>, //1 - functions:Arc
Vec>>> //2 - }
1.results:用来存放结果,其中u32是一个future的id,T表示存放的数据
2.functions: 存放所有submit的任务
二.Task申明
- struct ATask
{ - func:Box<dyn Fn()-> T + 'static + Send>, //1
- cond:Arc
,//2 - id:u32//3
- }
1.func:任务对应的lambda函数
2.cond:用来唤醒客户端(调用future.get等待结果的客户端)
3.id:future对应的id
三.Future申明
- struct AFuture
{ - id:u32, //1
- map:Arc
u32,T>>>,//2 - cond:Arc
//3 - }
1.id:这个future的id,通过这个id可以在map中找到结果
2.map:存放结果的map
3.cond:用来等待计算完成的condition,和Task结构体中的cond是同一个,这样task完成后就能唤醒等待了。
四.Future实现
- impl
AFuture { - fn get(&self)-> T {
- loop {
- let mut map = self.map.lock().unwrap();
- let value = map.remove(&self.id);
- match value {
- Some(v) => {return v;}
- None=> {
- self.cond.wait(map);
- }
- }
- }
- }
- }
只有一个get函数,如果有数据,就返回该数据,否则就等待唤醒
五.Executor提交任务
- fn submit(&self,func:Box<dyn Fn()-> T + 'static + Send>)->AFuture
{ - let cond = Arc::new(Condvar::new()); //1
- let id:u32 = 1;
- let task = ATask{
- func:func,
- cond:cond.clone(),
- id:id.clone()
- };
-
- {
- let mut ll = self.functions.lock().unwrap();
- ll.push(task); //2
- }
-
- AFuture {
- id:id.clone(),
- map:self.results.clone(),
- cond:cond.clone(),
- }//3
- }
1.创建一个Condition,用智能指针控制,这样计算和等待的线程都能使用。这个Condition会同时存放到task/future中。
2.将task存放到任务队列,计算线程就是从这个队列中获取任务的。
3.返回future,cond就是步骤1创建的cond
六:Executor执行任务
- fn work(&self) {
- loop {
- println!("work start");
- thread::sleep(Duration::from_secs(10)); //1
- let mut ll: std::sync::MutexGuard<'_, Vec
>> = self.functions.lock().unwrap(); - println!("len is {}",ll.len());
- if ll.len() == 0 {
- continue;
- }
-
- let task = ll.pop().unwrap(); //2
- drop(ll);//3
-
- let result = (task.func)();
- {
- let mut results = self.results.lock().unwrap();
- results.insert(task.id,result);//4
- }
- task.cond.notify_all();//5
- }
- }
1.这里用来管理任务的队列没有做成blocking的,所以暂时用一个sleep代替
2.从任务队列里面取出task
3.释放ll(lockguard),这样锁就释放了
4.将结果存放到结果的表里
5.唤醒等待线程
运行结果:
