目录
HashMap 与 ConcurrentHashMap 演示
Java程序员进行并发编程时,相比于其他语言的程序员而言要倍感幸福,因为并发大师Doug Lea(道格·利)不遗余力地为 Java 开发者提供了非常多的并发容器和框架。
ConcurrentHashMap是线程安全且高效的HashMap,使用它一般基于以下两个原因:
- 常规的HashMap在并发情况下不安全(JDK1.7还有死链问题)
- 线程安全的HashTable效率非常低下(使用synchronized关键字保证线程安全)
在ConcurrentHashMap中,首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的似乎还,其他段的数据也能被其他线程访问。
图片来源:百度
如上图所示,JDK1.7中ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry 则用于存储键值对数据。一个ConcurrentHashMap里包含一个 Segment 数组。Segment 的结构和HashMap类似,是一种数组和链表结构。一个 Segment 里包含一个HashEntry数组,每个 HashEntry 是一个链表结构的元素,每个 Segment 守护着一个HashEntry数组里的元素,当对 HashEntry 数组的数据进行修改时,必须先获得与它对应的 Segment 锁。 而JDK1.8中将锁的粒度更为细化,相当于对每一个Node节点添加了一把与之对应的锁,另外在Node节点基础上引入了TreeBin。
- public class ConcurrentHashMapDemo {
-
- static final Map
hashMap = new HashMap<>(); - static final Map
concurrentHashMap = new ConcurrentHashMap<>(); - static AtomicInteger count = new AtomicInteger(0);
-
- public static void main(String[] args) throws InterruptedException {
- testHashMap();
- testConcurrentHashMap();
- // 睡眠3s保证所有线程已运行完毕,确保拿到的size是最终结果
- Thread.sleep(3000);
- System.out.println("HashMap最终元素个数: " + hashMap.size());
- System.out.println("ConcurrentHashMap最终元素个数: " + concurrentHashMap.size());
- }
-
- public static void testHashMap() {
- for (int i = 0; i < 100; i++) {
- new Thread(() -> {
- for (int j = 0; j < 500; j++) {
- hashMap.put("hello" + count.getAndIncrement(), "world");
- }
- }).start();
- }
- }
-
- public static void testConcurrentHashMap() {
- for (int i = 0; i < 100; i++) {
- new Thread(() -> {
- for (int j = 0; j < 500; j++) {
- concurrentHashMap.put("hello" + count.getAndIncrement(), "world");
- }
- }).start();
- }
- }
-
- }
可以发现 HashMap 在多线程下出现了节点覆盖现象,而ConcurrentHashMap没有。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列:
- 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满
- 支持阻塞的移除方法:意思是当队列空时,获取元素的线程会等待队列变为非空
阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。
图片来源:百度
注意:如果是无界阻塞队列,队列不可能会出现满的情况,所以使用 put 或 offer 方法永远不会被阻塞,而且使用 offer 方法时永远返回 true。
一个由数组结构组成的有界阻塞队列,按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。
LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
两个使用场景:
- 缓存系统的设计:如果能从DelayQueue中取到数据说明缓存已过期
- 定时任务调度:如果能从DelayQueue中取到数据说明可以开始执行任务
SynchronousQueue 是一个不存储元素的阻塞队列。每一个put 操作必须等待一个 take 操作,否则不能继续添加元素。默认情况下线程采用非公平性策略访问队列,同时它支持公平性访问队列。
LinkedTransferQueue 是一个由链表结构组成的无界阻塞TransferQueue队列。相较于其它队列,它多了 transfer 和 tryTransfer 两个方法。
- transfer:如果当前有消费者正在等待接收元素,该方法可以把生产者传入的元素立刻传输给消费者。如果没有,则会将元素放在队列的 tail 节点。
- tryTransfer:用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。与transfer相比少了放入队列的过程。
LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移除元素。
Fork/Join 框架是Java7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
图片来源:百度
工作窃取算法是指某个线程从其他队列里窃取任务来执行。通常会使用双端队列来完成此算法,被窃取的任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
优点:充分利用线程进行并行计算,减少了线程间的竞争。缺点:在双端队列里只有一个任务时存在竞争。并且该算法消耗了更多的系统资源,比如创建了多个线程和多个双端队列。
使用Fork/Join框架计算 0 - 100 的和
- public class ForkJoinDemo extends RecursiveTask
{ -
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- ForkJoinPool forkJoinPool = new ForkJoinPool();
- // 生成一个计算任务,负责计算 0 - 100的和
- ForkJoinDemo task = new ForkJoinDemo(0, 100);
- ForkJoinTask
result = forkJoinPool.submit(task); - Integer sum = result.get();
- System.out.println("0 - 100 的和为: " + sum);
- }
-
- // 任务执行阈值
- private static final int THRESHOLD = 2;
-
- private int start;
- private int end;
- public ForkJoinDemo(int start, int end) {
- this.start = start;
- this.end = end;
- }
-
- @Override
- protected Integer compute() {
- int sum = 0;
- // 两数之差小于等于阈值就计算任务
- boolean canCompute = (end - start) <= THRESHOLD;
- if (canCompute) {
- for (int i = start; i <= end; i++) {
- sum += i;
- }
- } else {
- // 否则分解任务
- int middle = (start + end) / 2;
- ForkJoinDemo leftTask = new ForkJoinDemo(start, middle);
- ForkJoinDemo rightTask = new ForkJoinDemo(middle + 1, end);
- // 执行子任务
- leftTask.fork();
- rightTask.fork();
- // 等待子任务执行完成
- Integer leftResult = leftTask.join();
- Integer rightResult = rightTask.join();
- // 合并子任务
- sum = leftResult + rightResult;
- }
-
- return sum;
- }
- }