• 【并发编程十:CompletableFuture的应用】


    上一篇
    【并发编程九:线程安全问题分析及锁的介绍(2)synchronized】

    一、CompletableFuture

    CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

    • 1、supplyAsync方法:供给一个对象
    • 2、thenAccept方法:然后同步消费掉这个对象
    • 3、runAfterBoth方法:和执行另一个消费方法,
    •  执行完毕后都去执行一个Runnable
      
      • 1
    • 4、whenComplete方法:执行结束后的消费方法
    •  调用此方法的线程会阻塞到有返回值为止。
      
      • 1
    • 5、allOf方法:
    • 6、anyOf方法:
    /**
     * CompletableFuture的应用
     * 1、supplyAsync方法:供给一个对象
     * 2、thenAccept方法:然后同步消费掉这个对象
     * 3、runAfterBoth方法:和执行另一个消费方法,
     * 		执行完毕后都去执行一个Runnable
     * 4、whenComplete方法:执行结束后的消费方法
     * 		调用此方法的线程会阻塞到有返回值为止。
     * ---------------本类到此为止---------------------------
     * 5、allOf方法:
     * 6、anyOf方法:
     * @author Peter
     */
    public class CompletableFutureTest02 {
    	
    	/**
    	 * 消耗3秒的方法
    	 * @param str
    	 */
    	private  static void oneStrPrintln(String str){
    		String threadName = Thread.currentThread().getName() + ":";
    		System.out.println(threadName+"开始消费字符串:"+str+"……");
    		try {
    			Thread.sleep(3_000);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		System.out.println(threadName+"消费字符串:"+str+"完毕!");
    	}
    	
    	
    	/**
    	 * 消耗1秒的方法
    	 * @param str
    	 */
    	private  static void twoStrPrintln(String str){
    		System.out.println("开始消费字符串:"+str+"……");
    		try {
    			Thread.sleep(1_000);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		System.out.println("消费字符串:"+str+"完毕!");
    	}
    
    	public static void main(String[] args) throws Exception {
    		ExecutorService threadPool = Executors.newCachedThreadPool();
    		//runAfterBoth、runAfterBothAsync
    		CompletableFuture.supplyAsync(() ->"我是第一个被供给的字符串!")
    			.thenAccept(CompletableFutureTest02::oneStrPrintln)
    			.runAfterEither(
    				CompletableFuture.supplyAsync(() -> 
    					Thread.currentThread().getName() + ":我是第二个被供给的字符串!")
    				.thenAccept(CompletableFutureTest02::twoStrPrintln), 
    				() ->{
    				System.out.println(Thread.currentThread().getName() +
    						":这句话是在runAfterBoth方法的"
    						+ "第一个参数逻辑执行完之后输出的");
    			}/*,threadPool*/)/*.whenComplete((result,throwable) ->{
    				System.out.println("返回值是:"+result);
    				System.out.println("抛出的东西是:"+throwable);
    			})*/;
    		
    		//来一个死循环等待的特殊写法
    		Thread.currentThread().join();
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    二、CompletableFuture的应用

    
    /**
     * CompletableFuture的应用
     *  1、whenCompleteAsync方法:
     * 		调用此方法的返回的Future的get方法的线程不会阻塞,
     * 		而是直接返回结果。
     * whenComplete 和 whenCompleteAsync 的区别:
     * whenComplete:
     * 	是执行当前任务的线程执行继续执行 whenComplete 的任务。
     * whenCompleteAsync:
     * 	是执行把 whenCompleteAsync 这个任务继续提交给线程池来进行执行。
     * 
     * 2、thenApply方法:处理之后再转换成其它类型的一个对象不处理异常
     * 3、handle方法:处理之后再转换成其它类型的一个对象会处理异常
     * 4、exceptionally方法:处理中出现异常后对其进行处理,入参是
     * 		一个异常的父类
     * @author Peter
     */
    public class CompletableFutureTest04 {
    	
    	/**
    	 * 消耗3秒的方法
    	 * @param str
    	 */
    	private  static void oneStrPrintln(String str){
    		System.out.println("开始消费字符串:"+str+"……");
    		try {
    			Thread.sleep(3_000);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		System.out.println("消费字符串:"+str+"完毕!");
    	}
    	
    	/**
    	 * 消耗1秒的方法
    	 * @param str
    	 */
    	private  static void twoStrPrintln(String str){
    		System.out.println("开始消费字符串:"+str+"……");
    		try {
    			Thread.sleep(1_000);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		System.out.println("消费字符串:"+str+"完毕!");
    	}
    	
    	
    	/**
    	 * 消耗1秒的供给方法
    	 * @param str
    	 */
    	private  static String oneStrSupply(){
    		String threadName = Thread.currentThread().getName() + ":";
    		System.out.println(threadName+"开始供给……");
    		try {
    			Thread.sleep(1_000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		String str = "我是一个花费1秒才能供给的字符串";
    		System.out.println(threadName+"供给字符串:"+str+"----完毕!");
    		return str;
    	}
    	
    	/**
    	 * 消耗5秒的供给方法
    	 * @param str
    	 */
    	private  static String twoStrSupply(){
    		System.out.println("开始供给……");
    		try {
    			Thread.sleep(5_000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		String str = "我是一个花费5秒才能供给的字符串";
    		System.out.println("供给字符串:"+str+"-----完毕!");
    		return str;
    	}
    
    	public static void main(String[] args) throws Exception {
    		
    		ExecutorService threadPool = Executors.newCachedThreadPool();
    		
    		CompletableFuture<String> tmpFuture 
    		
    				= CompletableFuture.supplyAsync(CompletableFutureTest04::oneStrSupply,threadPool);
    		
    		//whenCompleteAsync、whenComplete
    		tmpFuture.whenCompleteAsync((result,throwable) ->{
    			String threadName = Thread.currentThread().getName() + ":";
    				//假设这里需要3秒
    				try {
    					Thread.sleep(3_000);
    				} catch (Exception e) {
    					e.printStackTrace();
    				}
    				System.out.println(threadName+"返回值是:"+result);
    				System.out.println(threadName+"抛出的东西是:"+throwable);
    			});
    		
    		
    //		System.out.println("最终get到的结果是:"+tmpFuture.get());
    		System.out.println("输出这句话表示没有被阻塞……");
    		
    		
    		
    //		CompletableFuture<Integer> thenApplyFuture = 
    //					CompletableFuture.supplyAsync(CompletableFutureTest04::oneStrSupply)
    //					.thenApply(String::length);
    //		
    //		System.out.println("最终get到的结果是:"+thenApplyFuture.get());
    		
    		
    //		CompletableFuture<Integer> thenApplyFuture = 
    //				CompletableFuture.supplyAsync(CompletableFutureTest04::oneStrSupply)
    //				.handle((result,throwable) ->result.length());
    //		
    //		
    //		
    //		
    //	
    //	System.out.println("最终get到的结果是:"+thenApplyFuture.get());
    		
    
    		
    		//来一个死循环等待的特殊写法
    		Thread.currentThread().join();
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    /**
     * CompletableFuture的应用
     * 1、whenCompleteAsync方法:调用此方法的线程不会阻塞,
     * 		而是直接返回结果。
     * 2、thenApply方法:处理之后再转换成其它类型的一个对象不处理异常
     * 3、handle方法:处理之后再转换成其它类型的一个对象会处理异常
     * 4、exceptionally方法:处理中出现异常后对其进行处理,入参是
     * 		一个异常的父类
     * @author Peter
     */
    public class CompletableFutureTest05 {
    	
    	/**
    	 * 消耗3秒的消费方法
    	 * @param str
    	 */
    	private  static void oneStrPrintln(String str){
    		System.out.println("开始消费字符串:"+str+"……");
    		try {
    			Thread.sleep(3_000);
    			throw new RuntimeException("xixixixi");
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		System.out.println("消费字符串:"+str+"完毕!");
    	}
    	
    	/**
    	 * 消耗1秒的消费方法
    	 * @param str
    	 */
    	private  static void twoStrPrintln(String str){
    		System.out.println("开始消费字符串:"+str+"……");
    		try {
    			Thread.sleep(1_000);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		System.out.println("消费字符串:"+str+"完毕!");
    	}
    	
    	
    	/**
    	 * 消耗1秒的供给方法
    	 * @param str
    	 */
    	private  static String oneStrSupply(){
    		System.out.println("开始供给……");
    		try {
    			Thread.sleep(1_000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		String str = "我是一个花费1秒才能供给的字符串";
    		System.out.println("供给字符串:"+str+"----完毕!");
    		return str;
    	}
    	
    	/**
    	 * 消耗3秒的供给方法
    	 * @param str
    	 */
    	private  static String twoStrSupply(){
    		System.out.println("开始供给……");
    		try {
    			Thread.sleep(3_000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		String str = "我是一个花费3秒才能供给的字符串";
    		System.out.println("供给字符串:"+str+"-----完毕!");
    		return str;
    	}
    	
    	/**
    	 * 立即供给的方法
    	 * @param str
    	 */
    	private  static String nowStrSupply(){
    		System.out.println("开始供给……");
    		
    		String str = "我是一个立即供给的字符串";
    	
    		System.out.println("供给字符串:"+str+"-----完毕!");
    		return str;
    	}
    	
    
    	public static void main(String[] args) throws Exception {
    		
    //		//先立即提供一个字符串
    //		CompletableFuture.supplyAsync(CompletableFutureTest05::nowStrSupply)
    //			//同步调用消费方法
    			.thenAccept(CompletableFutureTest05::oneStrPrintln);
    //			//异步调用消费方法
    //			.thenAcceptAsync(CompletableFutureTest05::oneStrPrintln);
    		
    		
    //		System.out.println("主线程在调用方法后的打印!");
    		
    		
    		//先立即提供一个字符串
    		CompletableFuture<String> supplyAsyncFuture 
    				= CompletableFuture.supplyAsync(CompletableFutureTest05::nowStrSupply);
    
    		
    		//同步地调用需要消耗3秒的消费方法消费掉这个字符串
    		supplyAsyncFuture.thenAccept(CompletableFutureTest05::oneStrPrintln)
    		.exceptionally(throwable -> {
    			System.out.println("得到的异常信息是:"+throwable.getMessage());
    			return null;
    		});
    		
    		
    		
    		//来一个死循环等待的特殊写法
    		Thread.currentThread().join();
    		
    		
    		
    		
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126

    三、CompletableFuture的应用

    
    /**
     * CompletableFuture的应用
     * 1、thenAcceptBoth方法:组合,将两个逻辑组合执行,并行执行。
     * 2、acceptEither方法:提供两个逻辑供给(供给对象类型必须相同),
     * 		 然后取其中执行快的去消费。
     * 	总结:acceptXXX都会去接收上一步执行逻辑的返回值
     * 3、runAfterBoth方法:组合,将两个逻辑组合执行,并行执行,
     * 		两个逻辑都执行完毕再执行第二个参数的逻辑(Runable)但是不接收上一步结果。
     * 4、runAfterEither方法:任意一个执行完了就开始执行第二个参数的逻辑(Runable)
     * 		但是不接收上一步结果。
     * 5、thenCombine同thenAcceptBoth方法,
     * 		不同的是可以对两个逻辑的结果还可以进行一个处理逻辑。
     * 6、thenCompose上一个逻辑的出参作为当前逻辑的入参,
     * 		然后再去执行一个消费逻辑。
     * @author Peter
     */
    public class CompletableFutureTest06 {
    	
    	/**
    	 * 消耗3秒的方法
    	 * @param str
    	 */
    	private  static void oneStrPrintln(String str){
    		System.out.println("开始消费字符串:"+str+"……");
    		try {
    			Thread.sleep(3_000);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		System.out.println("消费字符串:"+str+"完毕!");
    	}
    	
    	/**
    	 * 消耗1秒的方法
    	 * @param str
    	 */
    	private  static void twoStrPrintln(String str){
    		System.out.println("开始消费字符串:"+str+"……");
    		try {
    			Thread.sleep(1_000);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    		System.out.println("消费字符串:"+str+"完毕!");
    	}
    	
    	
    	/**
    	 * 消耗1秒的供给方法
    	 * @param str
    	 */
    	private  static String oneStrSupply(){
    		System.out.println("开始供给……");
    		try {
    			Thread.sleep(1_000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		String str = "我是一个花费1秒才能供给的字符串";
    		System.out.println("供给字符串:"+str+"----完毕!");
    		return str;
    	}
    	
    	/**
    	 * 消耗5秒的供给方法
    	 * @param str
    	 */
    	private  static String twoStrSupply(){
    		System.out.println("开始供给……");
    		try {
    			Thread.sleep(5_000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		String str = "我是一个花费5秒才能供给的字符串";
    		System.out.println("供给字符串:"+str+"-----完毕!");
    		return str;
    	}
    
    	public static void main(String[] args) throws Exception {
    		
    //		CompletableFuture.supplyAsync(null).thenCompose(fn)
    
    		
    		
    		//来一个死循环等待的特殊写法
    		Thread.currentThread().join();
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
  • 相关阅读:
    基于粒子群算法优化的lssvm回归预测-附代码
    VUE前端工程化-2
    实现简单BS架构案例
    45 深度学习(九):transformer
    虚树 (模板)
    Soft-NMS – Improving Object Detection With One Line of Code
    融云IM(即时通讯服务)
    使用BiSeNet实现自己的数据集
    windows下C++管道通信
    MYSQL--JDBC优化
  • 原文地址:https://blog.csdn.net/weixin_43333483/article/details/125457535