• 浅谈并发容器


    Hashtable
    HashTable实现了Map接口,所有的方法都被synchronized修饰,自带锁线程安全,工作中我基本没用过,了解即可。
    以下小程序,验证了Hashtable是线程安全的,定义一个常量类,两个属性分别是10000个数量和100个线程数,启动100个线程往Hashtable中写10000个键值对,验证多线程环境下插入后的结果。

    package com.my.controller;
    
    public class Constants {
        public static final int COUNT = 10000;
        public static final int THREAD_COUNT = 100;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    package com.my.controller;
    
    import java.util.Hashtable;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    public class TestHashTable {
        static Hashtable m = new Hashtable<>();
    
        static int count = Constants.COUNT;
        static UUID[] keys = new UUID[count];
        static UUID[] values = new UUID[count];
        static final int THREAD_COUNT = Constants.THREAD_COUNT;
    
        static {
            for (int i = 0; i < count; i++) {
                keys[i] = UUID.randomUUID();
                values[i] = UUID.randomUUID();
            }
        }
    
        static class MyThread extends Thread {
            int start;
            int gap = count/THREAD_COUNT;
    
            public MyThread(int start) {
                this.start = start;
            }
    
            @Override
            public void run() {
                for(int i=start; i{
                    for (int j = 0; j < 10000000; j++) {
                        m.get(keys[10]);
                    }
                });
            }
    
            for(Thread t : threads) {
                t.start();
            }
    
            for(Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102

    在这里插入图片描述

    Hashmap
    上面的小程序如果使用的是Hashmap,那么毫无疑问,最后容器中的元素数量是小于10000的,因为Hashmap线程不安全,多线程同时操作下,势必会有问题。
    在这里插入图片描述

    SynchronizedHashMap
    这个容器可以理解为是Hashmap加锁的版本,它的源码自己做了一个Object作为锁对象,然后每次都是Synchronized(Object),严格来讲他和那个Hashtable效率上区别不大,锁的粒度要比Hashtable要小一些,因为Hashtable对外提供的方法都是被synchronized修饰,而synchronizedHashmap提供的方法内部使用synchronized(obj),相比之下,锁的粒度较前者小

    package com.my.controller;
    
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.UUID;
    
    public class TestSynchronizedHashMap {
        static Map m = Collections.synchronizedMap(new HashMap());
    
        static int count = Constants.COUNT;
        static UUID[] keys = new UUID[count];
        static UUID[] values = new UUID[count];
        static final int THREAD_COUNT = Constants.THREAD_COUNT;
    
        static {
            for (int i = 0; i < count; i++) {
                keys[i] = UUID.randomUUID();
                values[i] = UUID.randomUUID();
            }
        }
    
        static class MyThread extends Thread {
            int start;
            int gap = count/THREAD_COUNT;
    
            public MyThread(int start) {
                this.start = start;
            }
    
            @Override
            public void run() {
                for(int i=start; i{
                    for (int j = 0; j < 10000000; j++) {
                        m.get(keys[10]);
                    }
                });
            }
    
            for(Thread t : threads) {
                t.start();
            }
    
            for(Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    在这里插入图片描述
    ConcurrentHashMap
    ConcurrentHashMap是用hash表实现的这样一个高并发容器,是多线程里面真正用的,以后我们多线程用的基本就是它。这个ConcurrentHashMap提高效率主要提高在读上面,由于它往里插的时候内部又做了各种各样的判断,本来是链表的,到8之后又变成了红黑树,然后里面又做了各种各样的cas的判断,所以他往里插的数据是要更低一些的。HashMap和Hashtable虽然说读的效率会稍微低一些,但是它往里插的时候检查的东西特别的少,就加个锁然后往里一插。所以,关于效率,还是看你实际当中的需求,理论上没有绝对一说。

    package com.my.controller;
    
    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class TestConcurrentHashMap {
        static Map m = new ConcurrentHashMap<>();
    
        static int count = Constants.COUNT;
        static UUID[] keys = new UUID[count];
        static UUID[] values = new UUID[count];
        static final int THREAD_COUNT = Constants.THREAD_COUNT;
    
        static {
            for (int i = 0; i < count; i++) {
                keys[i] = UUID.randomUUID();
                values[i] = UUID.randomUUID();
            }
        }
    
        static class MyThread extends Thread {
            int start;
            int gap = count/THREAD_COUNT;
    
            public MyThread(int start) {
                this.start = start;
            }
    
            @Override
            public void run() {
                for(int i=start; i{
                    for (int j = 0; j < 10000000; j++) {
                        m.get(keys[10]);
                    }
                });
            }
    
            for(Thread t : threads) {
                t.start();
            }
    
            for(Thread t : threads) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    在这里插入图片描述
    ArrayList
    我们在认识一下Vector到Queue的发展历程,下面有这样一个小程序叫TicketSeller给票做销售的这么一个小程序,写法比较简单,我们先来用一个List把这些票全装进去,往里面装一万张票,然后10个线程也就是10个窗口对外销售,只要size大于零,只要还有剩余的票时我就往外卖,取一张往外卖remove。大家想象一下到最后一张票的时候,好几个线程执行到这里所以线程都发现了size大于零,所有线程都往外买了一张票,那么会发生什么情形,只有一个线程拿到了这张票,其他的拿到的都是空值,就是超卖的现象。没有加锁,线程不安全。

    package com.my.controller;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * 有N张火车票,每张票都有一个编号
     * 同时有10个窗口对外售票
     * 请写一个模拟程序
     *
     * 分析下面的程序可能会产生哪些问题?
     * 重复销售?超量销售?
     *
     *
     * @author Young
     */
    public class TicketSeller1 {
        static List list = new ArrayList<>();
        static {
            for (int i = 0; i < 10000; i++) {
                list.add("编号:"+i);
            }
        }
    
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    while(list.size() > 0){
                        System.out.println(Thread.currentThread().getName()+"销售了----"+list.remove(0));
                    }
                }).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    在这里插入图片描述
    Vector
    我们来看最早的这个容器Vector,内部是自带锁的,你去读它的时候就会看到很多方法synchronized二话不说先加上锁在说,所以你用Vector的时候请放心它一定是线程安全的。100张票,10个窗口,读这个程序还是有问题的,还是不对。锁为了线程的安全,就是当我们调用size方法的时候它加锁了,调用remove的时候它也加锁了,可是很不幸的是在你这两步中间它没有加锁,也就是说没有保证这两步之间是原子操作,那么,好多个线程还会判断依然这个size还是大于0的,大家伙又超卖了

     package com.my.controller;
    
    import java.util.Vector;
    import java.util.concurrent.TimeUnit;
    
    
    public class TicketSeller2 {
        static Vector tickets = new Vector<>();
    //    static Vector tickets1 = new Vector<>();
    
    
        static {
            for(int i=0; i<1000; i++) tickets.add("票编号" + i);
        }
    
        public static void main(String[] args) {
    //        vector里边没有元素的时候,调用remove()会报错Array index out of range: 0
    //        System.out.println("-------"+tickets1.remove(0));
    
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    while(tickets.size() > 0) {
    
                        try {
                            TimeUnit.MILLISECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
    
    
                        System.out.println("销售了--" + tickets.remove(0));
                    }
                }).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    在这里插入图片描述
    LinkedList
    虽然你用了这个加锁的容器了,由于在你调用这个并发容器的时候,你是调用了其中的两个原子方法,所以你在外层还得在加一把锁synchronized(tickets),继续判断size,售出去不断的remove,这个就没有问题了,它会踏踏实实的往外销售,但不是效率最高的方案

    package com.my.controller;
    
    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Vector;
    import java.util.concurrent.TimeUnit;
    
    public class TicketSeller3 {
        static List tickets = new LinkedList<>();
    //    static List tickets = new ArrayList<>();
    //    static Vector tickets = new Vector<>();
    
        static {
            for(int i=0; i<1000; i++) tickets.add("票编号" + i);
        }
    
        public static void main(String[] args) {
    
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    //一个线程拿到锁后,其他线程要不断的尝试拿锁
                    while(true) {
                        synchronized(tickets) {
                            if(tickets.size() <= 0) break;
    
                            try {
                                TimeUnit.MILLISECONDS.sleep(10);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
    
                            System.out.println(Thread.currentThread().getName()+"销售了--" + tickets.remove(0));
                        }
                    }
                }).start();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    Queue
    效率最高的就是这个Queue,这是最新的一个接口,他的主要目标就是为了高并发用的,就是为了多线程用的。所以,以后考虑多线程这种单个元素的时候多考虑Queue。看程序前面初始化不说了,这个使用的是ConcurrentLinkedQueue,然后里面并没有说加锁,我就直接调用了一个方法叫poll,poll的意思就是我从tickets去取值,这个值什么时候取空了就说明里面的值已经没了,所以这个while(true)不断的往外销售,一直到他突然发现伸手去取票的时候这里面没了,那我这个窗口就可以关了不用卖票了。poll的意思取一下去得到我们这个queue上的头元素,得到并且去除掉这里面这个值,如果这个已经是空我就返回null值。

    package com.my.controller;
    
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class TicketSeller4 {
        static Queue tickets = new ConcurrentLinkedQueue<>();
        
    
        static {
            for (int i = 0; i < 10; i++) tickets.add("票编号" + i);
        }
    
        public static void main(String[] args) {
    
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    while (true) {
                        String s = tickets.poll();
                        if (s == null) {
                            break;
                        } else {
                            System.out.println(Thread.currentThread().getName() + "销售了--" + s);
                        }
                    }
                }).start();
    
            }
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    所以刚才讲的这八个小程序,主要是呈现容器演化的过程,从Map这个角度来讲最早是从Hashtable,二话不说先加锁到HashMap去除掉锁,再到synchronizedHashMap加一个带锁的版本,到ConcurrentHashMap多线程专用;Vector自带锁到多线程环境下友好的Queue

    多线程环境下经常使用的map

    1. ConcurrentHashMap,用hash表实现的这样一个高并发容器。
    2. ConcurrentSkipListMap,通过跳表来实现的高并发容器并且这个Map是有排序的.跳表是什么样的结构呢?底层本身存储的元素一个链表,它是排好顺序的,大家知道当一个链表排好顺序的时候往里插入是特别困难的,查找的时候也特别麻烦,因为你得从头去遍历查找这个元素到底在哪里,所以就出现了这个跳表的结构,底层是一个链表,链表查找的时候比较困难怎么办,那么我们在这些链表的基础上在拿出一些关键元素来,在上面做一层,那这个关键元素的这一层也是一个链表,那这个数量特别大的话在这个基础之上在拿一层出来再做一个链表,每层链表的数据越来越少,而且它是分层,在我们查找的时候从顶层往下开始查找,所以呢,查找容易了很多,同时它无锁的实现难度比TreeMap又容易很多,因此在JUC里面提供了ConcurrentSkipListMap这个类。
    3. 他们两个的区别一个是有序的一个是无序的,同时都支持并发的操作。
    4. 以下小程序是对多线程环境下,多种线程安全map写效率的对比
    package com.my.controller;
    
    import org.apache.commons.lang3.time.StopWatch;
    
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.CountDownLatch;
    import java.util.stream.Collectors;
    
    /**
     * 对比多种线程安全的map类型的容器效率
     */
    public class TestConcurrentMap {
        public static void main(String[] args) {
            StopWatch stopWatch = new StopWatch();
    //        ConcurrentHashMap map = new ConcurrentHashMap<>();// 6728ms
    //        Map map = new ConcurrentSkipListMap<>(); //高并发并且排序  7252ms
    //        Map map = Collections.synchronizedMap(new HashMap<>());//带锁Hashmap  6582ms
            Hashtable map = new Hashtable<>(); // 7570ms
            Thread[] ths = new Thread[100];
            CountDownLatch latch = new CountDownLatch(ths.length);
            stopWatch.start();
            for(int i=0; i{
                    for(int j=0; j<10000; j++){
                        System.out.println(Thread.currentThread().getName()+ j);
                        //此处要加"-"分隔一下,否则会导致key重复,造成线程不安全的假象
                        map.put(Thread.currentThread().getName() +"-"+ j, Thread.currentThread().getName()+ j);
                    }
                    latch.countDown();
                });
            }
            Arrays.asList(ths).forEach(t->t.start());
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(stopWatch.getTime());
            System.out.println(map.size());
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    CopyOnWrite
    再来说一个在并发的时候经常使用的一个类,这个类叫CopyOnWrite。CopyOnWriteList、CopyOnWriteSet有两个。CopyOnWrite的意思叫写时复制。我们看这个小程序,用了一个容器,这个容器是List,一个一个元素往里装,每100个线程往里面装1000个字符串,各种各样的实现,可以用ArrayList、Vector,但是ArrayList会出并发问题,因为多线程访问没有锁,可以用CopyOnWriteArrayList。这个CopyOnWrite解释一下,你通过这个名字进行分析一下,当Write的时候我们要进行复制,写时复制,写的时候进行复制。这个原理非常简单,当我们需要往里面加元素的时候你把里面的元素得复制出来。在很多情况下,写的时候特别少,读的时候很多。在这个时候就可以考虑CopyOnWrite这种方式来提高效率,CopyOnWrite为什么会提高效率呢,是因为我写的时候不加锁,大家知道Vector写的时候加锁,读的时候也加锁。那么用CopyOnWriteList的时候我读的时候不加锁,写的时候会在原来的基础上拷贝一个,拷贝的时候扩展出一个新元素来,然后把你新添加的这个元素扔到最后这个位置上,于此同时把指向老的容器的一个引用指向新的,这个写法就是写时复制。我这里只是写了一个写线程,没有模拟读线程,这个写时复制,写的效率比较低,因为每次写都要复制。在读比较多写比较少的情况下使用CopyOnWrite

    package com.my.controller;
    
    import java.util.*;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    /**
     * 使用copyOnWriteList,写的效率会低一些,读的效率很高
     * 适用于读操作比较多,写操作比较少的场景
     *
     * vector和synchronizedList的读写效率差别不大,但是二者写的效率都略高于copyOnWriteList(经过下面小程序实测)
     */
    public class TestCopyOnWriteList {
        public static void main(String[] args) {
            List lists =
    //                new ArrayList<>(); //这个会出并发问题!
    //                new Vector();
                        new CopyOnWriteArrayList<>();
    //                  Collections.synchronizedList(list);
            Random r = new Random();
            Thread[] ths = new Thread[100];
    
            for(int i=0; i{
                long start1 = System.currentTimeMillis();
                for (int i = 0; i < 1000000000; i++) {
                    lists.get(1000);
                }
                long end1 = System.currentTimeMillis();
                System.out.println("读取消耗时间为:"+ (end1 - start1));
            }).start();
    
        }
    
        static void runAndComputeTime(Thread[] ths) {
            long s1 = System.currentTimeMillis();
            Arrays.asList(ths).forEach(t->t.start());
            Arrays.asList(ths).forEach(t->{
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            long s2 = System.currentTimeMillis();
            System.out.println(s2 - s1);
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    BlockingQueue
    BlockingQueue阻塞队列,是给线程池做准备的,他提供了一系列的方法,我们可以在这些方法的基础之上做到让线程实现自动的阻塞。对于Queue经常用的接口就这么几个,以下小程序中列举到

    package com.my.controller;
    
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    /**
     * blockingQueue中常用的方法
     * offer()/add()、poll()/peek()
     */
    public class TestConcurrentQueue {
        public static void main(String[] args) {
            Queue strs = new ConcurrentLinkedQueue<>();
    
            for (int i = 0; i < 10; i++) {
                // offer添加元素到队列,伴有返回值,添加成功true,否则false
                boolean offer = strs.offer("a" + i);
                // add添加元素到队列,如果添加失败会抛异常出来
                // strs.add("a" + i);
            }
    
            System.out.println(strs);
    
            System.out.println(strs.size());
            // 取出元素并在队列中删除
            System.out.println(strs.poll());
            System.out.println(strs.size());
            // 去除元素队列中不删除
            System.out.println(strs.peek());
            System.out.println(strs.size());
    
            //双端队列Deque 自行了解
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    LinkedBlockingQueue
    用链表实现的BlockingQueue,是一个无界队列(Integer.MAX_VALUE)。BlockingQueue在Queue的基础上又添加了两个方法,这两个方法一个叫put,一个叫take。这两个方法是真真正正的实现了阻塞。put往队列里装如果满了的话我这个线程会阻塞住,take往外取如果空了的话线程会阻塞住。所以这个BlockingQueue就实现了生产者消费者里面的那个容器。
    看以下小程序,这个小程序是往队列里面装了100个字符串,a开头i结尾,每装一个的时候睡1秒钟。然后,后面又启动了5个线程不断的从里面take,空了我就等着,什么时候新加了我就马上给它取出来。

    package com.my.controller;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class TestLinkedBlockingQueue {
    // 队列初始化的时候如果给定容量,就固定容量;如果为给定,默认容量Integer.MAX_VALUE.
    // 	static BlockingQueue strs = new LinkedBlockingQueue<>();
        static BlockingQueue strs = new LinkedBlockingQueue<>(50);
    
        static Random r = new Random();
    
        public static void main(String[] args) {
            new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    try {
    //                    使用add方法,如果队列满的情况下,继续add是会报错的java.lang.IllegalStateException: Queue full
    //                    strs.add("a" + i);
                        //使用put()方法如果队列满了,就会等待
                        strs.put("a" + i);
                        System.out.println(i);
    //                    boolean offer = strs.offer("a" + i);
    //                    boolean offer = strs.offer("a" + i,5,TimeUnit.MILLISECONDS);
    //                    System.out.println(offer);
    //                    TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "p1").start();
    
            for (int i = 0; i < 5; i++) {
                new Thread(() -> {
                    for (; ; ) {
                        try {
    //                        strs.poll();
    //                        strs.peek();
                            System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, "c" + i).start();
    
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    ArrayBlockingQueue
    ArrayBlockingQueue是有界的,必须指定容量,你可以指定它一个固定的值11,它容量就是11,那么当你往里面扔元素的时候,一旦他满了这个put方法就会阻塞住。然后你可以看看用add方法满了之后他会报异常。offer用返回值来判断到底加没加成功,offer还有另外一个写法你可以指定一个时间尝试着往里面加1秒钟,1秒钟之后如果加不进去它就返回了。

    回到那个面试经常被问到的问题,Queue和List的区别到底在哪里,主要就在这里,添加了offer、peek、poll、put、take这些个对线程友好的或者阻塞,或者等待方法。

    package com.my.controller;
    
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class TestArrayBlockingQueue {
        static BlockingQueue strs = new ArrayBlockingQueue<>(11);
    
        static Random r = new Random();
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 10; i++) {
                strs.put("a" + i);
            }
    
            //strs.put("aaa"); //满了就会等待,程序阻塞
            //strs.add("aaa");
            //strs.offer("aaa");
            boolean aaa = strs.offer("aaa", 1, TimeUnit.SECONDS);
            System.out.println(aaa);
            System.out.println(strs);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    DelayQueue
    DelayQueue可以实现在时间上的排序,这个DelayQueue能实现按照在里面等待的时间来进行排序。这里我们new了一个DelayQueue,他是BlockingQueue的一种也是用于阻塞的队列,这个阻塞队列装任务的时候要求你必须实现Delayed接口,Delayed往后拖延推迟,Delayed需要做一个比较compareTo,最后这个队列的实现,这个时间等待越短的就会有优先的得到运行,所以你需要做一个比较 ,这里面他就有一个排序了,这个排序是按时间来排的,所以去做好,哪个时间返回什么样的值,不同的内容比较的时候可以按照时间来排序。总而言之,你要实现Comparable接口重写 compareTo方法来确定你这个任务之间是怎么排序的。getDelay去拿到你Delay多长时间了。往里头装任务的时候首先拿到当前时间,在当前时间的基础之上指定在多长时间之后这个任务要运行,但是当我们去拿的时候,一般的队列是先加哪个先往外拿哪个,先进先出。这个队列是不一样的,按时间进行排序(按紧迫程度进行排序)。DelayQueue就是按照时间进行任务调度。

    package com.my.controller;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class TestDelayQueue {
        static BlockingQueue tasks = new DelayQueue<>();
    
        static class MyTask implements Delayed {
            String name;
            long runningTime;
    
            MyTask(String name, long rt) {
                this.name = name;
                this.runningTime = rt;
            }
    
            @Override
            public int compareTo(Delayed o) {
                if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                    return -1;
                else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
                    return 1;
                else
                    return 0;
    //            return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
            }
    
            @Override
            public long getDelay(TimeUnit unit) {
    
                return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
    
    
            @Override
            public String toString() {
                return name + " " + runningTime;
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            long now = System.currentTimeMillis();
            MyTask t1 = new MyTask("t1", now + 1000);
            MyTask t2 = new MyTask("t2", now + 2000);
            MyTask t3 = new MyTask("t3", now + 1500);
            MyTask t4 = new MyTask("t4", now + 2500);
            MyTask t5 = new MyTask("t5", now + 500);
    
            tasks.put(t1);
            tasks.put(t2);
            tasks.put(t3);
            tasks.put(t4);
            tasks.put(t5);
    
            System.out.println(tasks);
    
            for(int i=0; i<5; i++) {
                System.out.println(tasks+"--");
                System.out.println(tasks.take());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    疑问:看下面的控制台,t2任务的时间小于t4,不是应该排在t4的前面吗?待解决
    在这里插入图片描述
    DelayQueue本质上用的是一个PriorityQueue,PriorityQueue是从AbstractQueue继承的。PriorityQueue特点是它内部你往里装的时候并不是按顺序往里装的,而是内部进行了一个排序。按照优先级,最小的优先。它内部实现的结构是一个二叉树,这个二叉树可以认为是堆排序里面的那个最小堆值排在最上面。

    package com.my.controller;
    
    import java.util.PriorityQueue;
    
    public class TestPriorityQueue {
        public static void main(String[] args) {
            PriorityQueue q = new PriorityQueue<>();
    
            q.add("c");
            q.add("e");
            q.add("a");
            q.add("d");
            q.add("z");
            System.out.println(q);
            for (int i = 0; i < 5; i++) {
                System.out.println(q.poll());
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这里插入图片描述
    SynchronousQueue
    SynchronousQueue容量为0,就是这个东西它不是用来装内容的,SynchronousQueue是专门用来两个线程之间传内容的,给线程下达任务的,本质上Exchanger与这个容器的概念是一样的。看下面代码,有一个线程起来等着take,里面没有值一定是take不到的,然后就等着。然后当put的时候能取出来,take到了之后能打印出来,最后打印这个容器的size一定是0,打印出aaa来这个没问题。那当把线程注释掉,在运行一下程序就会在这阻塞,永远等着。如果add方法直接就报错,原因是满了,这个容器为0,你不可以往里面扔东西。这个Queue和其他的很重要的区别就是你不能往里头装东西,只能用来阻塞式的put调用,要求是前面得有人等着拿这个东西的时候你才可以往里装,但容量为0,其实说白了就是我要递到另外一个的手里才可以。这个SynchronousQueue看似没有用,其实不然,SynchronousQueue在线程池里用处特别大,很多的线程取任务,互相之间进行任务的一个调度的时候用的都是它。

    package com.my.controller;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    
    public class TestSynchronusQueue {
        public static void main(String[] args) throws InterruptedException {
            // 容量为0
            BlockingQueue strs = new SynchronousQueue<>();
    
            new Thread(()->{
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
    //        strs.put("aaa"); //阻塞等待消费者消费
            strs.put("bbb");
            System.out.println(strs.size());
            // 如果没有线程提前等着取,再add是会报错的,再put会阻塞
            strs.add("aaa"); //报错
            strs.put("bbb"); //阻塞
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    在这里插入图片描述
    TransferQueue
    TransferQueue传递,实际上是前面各种各样Queue的一个组合,它可以给线程来传递任务,与此同时不像是SynchronousQueue只能传递一个,TransferQueue做成列表可以传好多个。比较牛X的是它添加了一个方法叫transfer,如果我们用put就相当于一个线程来了往里一装就不管了。transfer就是装完在这等着,阻塞等有人把它取走我这个线程才回去干我自己的事情。一般使用场景:是我做了一件事情,我这个事情要求有一个结果,有了这个结果之后我才可以继续进行我下面的这个事情,比方说我付了钱,这个订单我付账完成了,但是我一直要等这个付账的结果完成才可以给客户反馈。

    package com.my.controller;
    
    import java.util.concurrent.LinkedTransferQueue;
    
    public class TestTransferQueue {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue strs = new LinkedTransferQueue<>();
    
            new Thread(() -> {
                while (!strs.isEmpty()){
                    try {
                        System.out.println(strs.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
            }).start();
            // transfer提交完任务我得等着它被消费掉,我才肯罢休
            strs.transfer("aaa");
            strs.transfer("bbb");
    
            //strs.put("aaa");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    内容总结:

    1. 从Hashtable一直到这个ConcurrentHashMap,这些不是一个替代的关系,它们各自有各自的用途
    2. Vector到Queue的这样的一个过程,List和Queue区别主要就是Queue添加了许多对线程友好的API,offer、peek、poll,他的一个子类型叫BlockingQueue对线程友好的API又添加了put和take,这两个实现了阻塞操作。
  • 相关阅读:
    Intel oneAPI笔记(2)--jupyter官方文档(oneAPI_Intro)学习笔记
    STM32实现USB转TTL串口工具
    一个移动应用程序是如何让我被动地赚到 500 美元以上的?
    在阿里云上配置开放端口的详细教程
    maven工程结构搭建
    JavaScript命名冲突不可避免?冲突源有哪些
    网课查题公众号快速搭建法 内含接口及新手教程
    Docker命令大全
    如何做一场高质量故障复盘
    为什么说 HashMap 是无序的
  • 原文地址:https://blog.csdn.net/qq_36184384/article/details/126699325