• 高并发编程:并发容器


    一、概述

     

    常见的容器如下图,我们会挑选高并发中常用的容器进行介绍。

    二、ConcurrentHashMap

    个ConcurrentHashMap提高效率主要提高在读上面,由于它往里插的时候内部又做了各种各样的判断,本来是链表的,到8之后又变成了红黑树,然后里面又做了各种各样的cas的判断,所以他往里插的数据是要更低一些的。HashMap和Hashtable虽然说读的效率会稍微低一些,但是它往里插的时候检查的东西特别的少,就加个锁然后往里一插。所以,关于效率,还是看你实际当中的需求。用几个简单的小程序来给大家列举了这几个不同的区别。

    1. public class T04_TestConcurrentHashMap {
    2. static Map m = new ConcurrentHashMap<>();
    3. static int count = Constants.COUNT;
    4. static UUID[] keys = new UUID[count];
    5. static UUID[] values = new UUID[count];
    6. static final int THREAD_COUNT = Constants.THREAD_COUNT;
    7. static {
    8. for (int i = 0; i < count; i++) {
    9. keys[i] = UUID.randomUUID();
    10. values[i] = UUID.randomUUID();
    11. }
    12. }
    13. static class MyThread extends Thread {
    14. int start;
    15. int gap = count/THREAD_COUNT;
    16. public MyThread(int start) {
    17. this.start = start;
    18. }
    19. @Override
    20. public void run() {
    21. for(int i=start; i
    22. m.put(keys[i], values[i]);
    23. }
    24. }
    25. }
    26. public static void main(String[] args) {
    27. long start = System.currentTimeMillis();
    28. Thread[] threads = new Thread[THREAD_COUNT];
    29. for(int i=0; i
    30. threads[i] =
    31. new MyThread(i * (count/THREAD_COUNT));
    32. }
    33. for(Thread t : threads) {
    34. t.start();
    35. }
    36. for(Thread t : threads) {
    37. try {
    38. t.join();
    39. } catch (InterruptedException e) {
    40. e.printStackTrace();
    41. }
    42. }
    43. long end = System.currentTimeMillis();
    44. System.out.println(end - start);
    45. System.out.println(m.size());
    46. //-----------------------------------
    47. start = System.currentTimeMillis();
    48. for (int i = 0; i < threads.length; i++) {
    49. threads[i] = new Thread(()->{
    50. for (int j = 0; j < 10000000; j++) {
    51. m.get(keys[10]);
    52. }
    53. });
    54. }
    55. for(Thread t : threads) {
    56. t.start();
    57. }
    58. for(Thread t : threads) {
    59. try {
    60. t.join();
    61. } catch (InterruptedException e) {
    62. e.printStackTrace();
    63. }
    64. }
    65. end = System.currentTimeMillis();
    66. System.out.println(end - start);
    67. }
    68. }

    三、BlockingQueue

    BlockingQueue,是我们后面讲线程池需要用到的这方面的内容,是给线程池来做准备的。BlockingQueue的概念重点是在Blocking上,Blocking阻塞,Queue队列,是阻塞队列。他提供了一系列的方法,我们可以在这些方法的基础之上做到让线程实现自动的阻塞。

    我们现在聊的就是这个Queue里面所提供的一些可以给多线程比较友好的接口。他提供了一些什么接口呢,第一个就是offer对应的是原来的那个add,提供了poll取数据,然后提供了peek拿出来这个数据。那么这个是什么意思呢,我们读一下这个offer的概念,offer是往里头添加,加进去没加进去它会给你一个布尔类型的返回值,和原来的add是什么区别呢,add如果加不进去了是会抛异常的。所以一般的情况下我们用的最多的Queue里面都用offer,它会给你一个返回值,peek的概念是去取并不是让你remove掉,poll是取并且remove掉,而且这几个对于BlockingQueue来说也确实是线程安全的一个操作。对于Queue经常用的接口就这么几个,大家了解就可以。

    1. public class T04_ConcurrentQueue {
    2. public static void main(String[] args) {
    3. Queue strs = new ConcurrentLinkedQueue<>();
    4. for(int i=0; i<10; i++) {
    5. strs.offer("a" + i); //add
    6. }
    7. System.out.println(strs);
    8. System.out.println(strs.size());
    9. System.out.println(strs.poll());
    10. System.out.println(strs.size());
    11. System.out.println(strs.peek());
    12. System.out.println(strs.size());
    13. //双端队列Deque
    14. }
    15. }

    四、BlockingQueue

    LinkedBlockingQueue,体现Concurrent的这个点在哪里呢,我们来看这个LinkedBlockingQueue,用链表实现的BlockingQueue,是一个无界队列。就是它可以一直装到你内存满了为止,一直添加。

    来看一下这个小程序,这么一些线程,第一个线程是我往里头加内容,加put。BlockingQueue在Queue的基础上又添加了两个方法,这两个方法一个叫put,一个叫take。这两个方法是真真正正的实现了阻塞。put往里装如果满了的话我这个线程会阻塞住,take往外取如果空了的话线程会阻塞住。所以这个BlockingQueue就实现了生产者消费者里面的那个容器。这个小程序是往里面装了100个字符串,a开头i结尾,每装一个的时候睡1秒钟。然后,后面又启动了5个线程不断的从里面take,空了我就等着,什么时候新加了我就马上给它取出来。这是BlockingQueue和Queue的一个基本的概念。

    1. public class T05_LinkedBlockingQueue {
    2. static BlockingQueue strs = new LinkedBlockingQueue<>();
    3. static Random r = new Random();
    4. public static void main(String[] args) {
    5. new Thread(() -> {
    6. for (int i = 0; i < 100; i++) {
    7. try {
    8. strs.put("a" + i); //如果满了,就会等待
    9. TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
    10. } catch (InterruptedException e) {
    11. e.printStackTrace();
    12. }
    13. }
    14. }, "p1").start();
    15. for (int i = 0; i < 5; i++) {
    16. new Thread(() -> {
    17. for (;;) {
    18. try {
    19. System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. }
    24. }, "c" + i).start();
    25. }
    26. }
    27. }

    五、ArrayBlockingQueue

    ArrayBlockingQueue是有界的,你可以指定它一个固定的值10,它容器就是10,那么当你往里面扔容器的时候,一旦他满了这个put方法就会阻塞住。然后你可以看看用add方法满了之后他会报异常。offer用返回值来判断到底加没加成功,offer还有另外一个写法你可以指定一个时间尝试着往里面加1秒钟,1秒钟之后如果加不进去它就返回了。

    回到那个面试经常被问到的问题,Queue和List的区别到底在哪里,主要就在这里,添加了offer、peek、poll、put、take这些个对线程友好的或者阻塞,或者等待方法。

    1. public class T06_ArrayBlockingQueue {
    2. static BlockingQueue strs = new ArrayBlockingQueue<>(10);
    3. static Random r = new Random();
    4. public static void main(String[] args) throws InterruptedException {
    5. for (int i = 0; i < 10; i++) {
    6. strs.put("a" + i);
    7. }
    8. //strs.put("aaa"); //满了就会等待,程序阻塞
    9. //strs.add("aaa");
    10. //strs.offer("aaa");
    11. strs.offer("aaa", 1, TimeUnit.SECONDS);
    12. System.out.println(strs);
    13. }
    14. }

    六、DelayQueue

    DelayQueue可以实现在时间上的排序,这个DelayQueue能实现按照在里面等待的时间来进行排序。这里我们new了一个DelayQueue,他是BlockingQueue的一种也是用于阻塞的队列,这个阻塞队列装任务的时候要求你必须实现Delayed接口,Delayed往后拖延推迟,Delayed需要做一个比较compareTo,最后这个队列的实现,这个时间等待越短的就会有优先的得到运行,所以你需要做一个比较 ,这里面他就有一个排序了,这个排序是按时间来排的,所以去做好,哪个时间返回什么样的值,不同的内容比较的时候可以按照时间来排序。总而言之,你要实现Comparable接口重写 compareTo方法来确定你这个任务之间是怎么排序的。getDelay去拿到你Delay多长时间了。往里头装任务的时候首先拿到当前时间,在当前时间的基础之上指定在多长时间之后这个任务要运行,添加顺序参看代码,但是当我们去拿的时候,一般的队列是先加那个先往外拿那个,先进先出。这个队列是不一样的,按时间进行排序(按紧迫程度进行排序)。DelayQueue就是按照时间进行任务调度。

    1. public class T07_DelayQueue {
    2. static BlockingQueue tasks = new DelayQueue<>();
    3. static Random r = new Random();
    4. static class MyTask implements Delayed {
    5. String name;
    6. long runningTime;
    7. MyTask(String name, long rt) {
    8. this.name = name;
    9. this.runningTime = rt;
    10. }
    11. @Override
    12. public int compareTo(Delayed o) {
    13. if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
    14. return -1;
    15. else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
    16. return 1;
    17. else
    18. return 0;
    19. }
    20. @Override
    21. public long getDelay(TimeUnit unit) {
    22. return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    23. }
    24. @Override
    25. public String toString() {
    26. return name + " " + runningTime;
    27. }
    28. }
    29. public static void main(String[] args) throws InterruptedException {
    30. long now = System.currentTimeMillis();
    31. MyTask t1 = new MyTask("t1", now + 1000);
    32. MyTask t2 = new MyTask("t2", now + 2000);
    33. MyTask t3 = new MyTask("t3", now + 1500);
    34. MyTask t4 = new MyTask("t4", now + 2500);
    35. MyTask t5 = new MyTask("t5", now + 500);
    36. tasks.put(t1);
    37. tasks.put(t2);
    38. tasks.put(t3);
    39. tasks.put(t4);
    40. tasks.put(t5);
    41. System.out.println(tasks);
    42. for(int i=0; i<5; i++) {
    43. System.out.println(tasks.take());
    44. }
    45. }
    46. }

    DelayQueue本质上用的是一个PriorityQueue,PriorityQueue是从AbstractQueue继承的。PriorityQueue特点是它内部你往里装的时候并不是按顺序往里装的,而是内部进行了一个排序。按照优先级,最小的优先。它内部实现的结构是一个二叉树,这个二叉树可以认为是堆排序里面的那个最小堆值排在最上面。

    1. public class T07_01_PriorityQueque {
    2. public static void main(String[] args) {
    3. PriorityQueue q = new PriorityQueue<>();
    4. q.add("c");
    5. q.add("e");
    6. q.add("a");
    7. q.add("d");
    8. q.add("z");
    9. for (int i = 0; i < 5; i++) {
    10. System.out.println(q.poll());
    11. }
    12. }
    13. }

    七、SynchronousQueue

    SynchronousQueue容量为0,就是这个东西它不是用来装内容的,SynchronousQueue是专门用来两个线程之间传内容的,给线程下达任务的,之前讲过一个容器叫Exchanger,本质上这个容器的概念是一样的。看下面代码,有一个线程起来等着take,里面没有值一定是take不到的,然后就等着。然后当put的时候能取出来,take到了之后能打印出来,最后打印这个容器的size一定是0,打印出aaa来这个没问题。那当把线程注释掉,在运行一下程序就会在这阻塞,永远等着。如果add方法直接就报错,原因是满了,这个容器为0,你不可以往里面扔东西。这个Queue和其他的很重要的区别就是你不能往里头装东西,只能用来阻塞式的put调用,要求是前面得有人等着拿这个东西的时候你才可以往里装,但容量为0,其实说白了就是我要递到另外一个的手里才可以。这个SynchronousQueue看似没有用,其实不然,SynchronousQueue在线程池里用处特别大,很多的线程取任务,互相之间进行任务的一个调度的时候用的都是它。

    1. public class T08_SynchronusQueue { //容量为0
    2. public static void main(String[] args) throws InterruptedException {
    3. BlockingQueue strs = new SynchronousQueue<>();
    4. new Thread(()->{
    5. try {
    6. System.out.println(strs.take());
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }
    10. }).start();
    11. strs.put("aaa"); //阻塞等待消费者消费
    12. //strs.put("bbb");
    13. //strs.add("aaa");
    14. System.out.println(strs.size());
    15. }
    16. }

    八、TransferQueue

    TransferQueue传递,实际上是前面这各种各样Queue的一个组合,它可以给线程来传递任务,以此同时不像是SynchronousQueue只能传递一个,TransferQueue做成列表可以传好多个。比较牛X的是它添加了一个方法叫transfer,如果我们用put就相当于一个线程来了往里一装它就走了。transfer就是装完在这等着,阻塞等有人把它取走我这个线程才回去干我自己的事情。一般使用场景:是我做了一件事情,我这个事情要求有一个结果,有了这个结果之后我可以继续进行我下面的这个事情的时候,比方说我付了钱,这个订单我付账完成了,但是我一直要等这个付账的结果完成才可以给客户反馈。

    1. public class T09_TransferQueue {
    2. public static void main(String[] args) throws InterruptedException {
    3. LinkedTransferQueue strs = new LinkedTransferQueue<>();
    4. new Thread(() -> {
    5. try {
    6. System.out.println(strs.take());
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }
    10. }).start();
    11. strs.transfer("aaa");
    12. //strs.put("aaa");
    13. /*new Thread(() -> {
    14. try {
    15. System.out.println(strs.take());
    16. } catch (InterruptedException e) {
    17. e.printStackTrace();
    18. }
    19. }).start();*/
    20. }
    21. }

  • 相关阅读:
    【C#语言】WinForm窗体
    电力感知边缘计算网关产品设计方案-电力采集
    实践篇1:深度学习之----LetNet之tensorflow2的实现
    搜维尔科技提供电影和动画的动作捕捉解决方案
    百度集团:AI重构,走到哪了?
    Conditional GAN
    力控关节机器人(关节扭矩传感器力控)
    如何实现点击消息并刷新成消息置顶?
    在Linux系统安装Kafka
    6种常见分布式唯一ID生成策略及它们的优缺点对比
  • 原文地址:https://blog.csdn.net/weixin_55229531/article/details/131084680