-
- //报错 java.util.ConcurrentModificationException
- public class ListTest {
- public static void main(String[] args) {
- List
list= new CopyOnWriteArrayList<>(); - //并发下Arrayist边读边写会不安全的
- /**
- * 解决方案:
- * 1.List
list= new Vector<>(); - * 2.List
list= Collections.synchronizedList(new ArrayList<>()); - * 3.List
list= new CopyOnWriteArrayList<>(); - */
- for(int i=0;i<50;i++){
- new Thread(()->{
- list.add(UUID.randomUUID().toString().substring(0,5));
- System.out.println(list);
- },String.valueOf(i)).start();
- }
- }
- }
上面这个多个线程边读边写时会出现如下报错
java.util.ConcurrentModificationException

在CopyOnWriteArrayList的底层用一个这样的数据实现
volatile内存,模型中规定保证线程间的可见性,但不保证原子性

CopyOnWrite使用的是lock锁,Vertor使用的是synchronized,有sync都会很慢。

list的解决方案有使用vector这个安全类和工具类和 juc.
工具类是将其变成synchronized,但很慢,juc是用写入时复制。
set和List是一个同级的关系,都是Collection的子类。
所以set在边读边写时也会有java.util.ConcurrentModificationException报错。
但是set没有vector,只有工具类和juc的解决方案。
- public class SetList {
- public static void main(String[] args) {
- Set
set = new HashSet<>(); - // Set
set = Collections.synchronizedSet(new HashSet<>()); - // Set
set = new CopyOnWriteArraySet<>(); - for(int i=0;i<50;i++){
- new Thread(()->{
- set.add(UUID.randomUUID().toString().substring(0,5));
- System.out.println(set);
- },String.valueOf(i)).start();
- }
- }
- }
hashset底层就是hashmap。

hashset的add方法就是hashmap的put方法。

这个也有" java.util.ConcurrentModificationException报错
这里的解决方案是juc下的ConcurrentHashMap。
- public class MapList {
- public static void main(String[] args) {
- //加载因子,初始化容量 0.75和16
- // Map
map=new HashMap<>(); - Map
map=new ConcurrentHashMap<>(); -
- for(int i=0;i<50;i++){
- new Thread(()->{
-
- map.put(Thread.currentThread().getName(),UUID.randomUUID().toString().substring(0,5));
- System.out.println(map);
- },String.valueOf(i)).start();
- }
- }
- }
callable接口就类似于runnable接口,然而runnable接口不会返回结果也不会抛出异常,callable就可以。一个是call()方法,一个是run()方法.
callable接口需要提供一个泛型,泛型的参数等于方法的返回值。

如何用new Thread().start接收callable接口并开启线程
Thread()只能接收runnable参数,不认识callable()参数,所以callable要通过runnable去做一个桥梁,在runnable里面有如下的一些实现。

其中FutureTask这个实现类与Callable有联系,如下所示,有一个构造参数就是Callable

这里用到的应该就是适配器模式,这里的futuretask就是一个适配器。
1.两个同样的callable实现类开启的线程内的输出结果会被缓存。
2.结果可能会等待,会阻塞。
-
- public class CallavkeTest {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- // new Thread(new Runnable()).start(); //传统方式
- // new Thread(new FutureTask
()).start(); - // new Thread(new FutureTask
(Callable)).start(); - mythread mythread=new mythread(); //callable接口的实现类
- FutureTask futureTask = new FutureTask(mythread); //callable接口的适配类
- new Thread(futureTask,"A").start();
- new Thread(futureTask,"B").start(); //只会输出一个call,结果被缓存了,达到提高效率的目的
- String str=(String)futureTask.get(); //获取callable的返回结果,get方法会等待结果,可能产生阻塞,要将其放在最后
- //或者通过异步通信来处理!
- System.out.println(str);
- }
- }
-
- class mythread implements Callable
{ - @Override
- public String call() throws Exception {
- System.out.println("call方法被调用");
- //耗时操作
- return "1230";
- }
- }
-
- //class mythread implements Runnable{
- //
- // @Override
- // public void run() {
- //
- // }
- //}


原理:
countDownLatch.countDown(); //-1
countDownLatch.await(); //等待计数器归零再向下执行
- //计数器
- public class CountDownLatchDemo {
- public static void main(String[] args) throws InterruptedException {
- //总数是6
- CountDownLatch countDownLatch = new CountDownLatch(6);
- for(int i=0;i<6;i++){
- new Thread(()->{
- System.out.println(Thread.currentThread().getName()+"go out");
- countDownLatch.countDown(); //-1
-
- },String.valueOf(i)).start();
- }
- countDownLatch.await(); //等待计数器归零再向下执行
- System.out.println("Close door");
- // countDownLatch.countDown(); //-1
- }
- }
加法计数器

有两种构造参数,一个是传个计数,一个是传个计数完之后要执行的线程。
-
- public class CyclicBarrierDemo {
- public static void main(String[] args) {
-
- CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
- System.out.println("g盖亚!!!");
- });
-
- for(int i=0;i<7;i++){
-
- final int temp=i;
- //lamda表达式能操作到i吗?
- new Thread(()->{
- System.out.println(Thread.currentThread().getName()+"收集"+temp+"个"); //间接获得
- try {
- cyclicBarrier.await(); //等待
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (BrokenBarrierException e) {
- throw new RuntimeException(e);
- }
- }).start();
- }
- }
- }
如果计数为8但是线程只有7个的话,就会永远卡死在一个地方。


这个 的实现也有两种参数

- public class SemaphoreDemo {
- public static void main(String[] args) {
- //线程数量:停车位
- Semaphore semaphore = new Semaphore(3);
- for(int i=1;i<=6;i++){
- new Thread(()->{
- //acquire()得到
- try {
- semaphore.acquire();
- System.out.println(Thread.currentThread().getName()+"抢到车位");
- TimeUnit.SECONDS.sleep(2);
- System.out.println(Thread.currentThread().getName()+"离开车位 ");
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }finally {
- //realease() 释放
- semaphore.release();
- }
-
- },String.valueOf(i)).start();
-
- }
- }
- }
一开始只有三个进去了,后面三个都出去了,后三个才能进来,这里的主要应用场景就是限流。

原理:
semaphore.acquire(); //获取,假设已经的满了,就等待到资源被释放为止。
semaphore.release(); //释放,将当前信号量释放+1,然后唤醒等待线程。
作用:多个共享资源的互斥使用。并发限流,控制最大线程数。
readwritelock只有一个实现类,可重入的读写锁. 读的时候可以多个线程同时读,但是写的时候只能一个线程在写。
如下所示的一个自定义缓存类读写操作
- /**
- * readwritelock
- */
- public class readwritelockdemo {
- public static void main(String[] args) {
- MyCache myCache=new MyCache();
- //写入
- for(int i=0;i<5;i++){
- final int temp=i;
- new Thread(()->{
- myCache.put(temp+"",temp+"");
- },String.valueOf(i)).start();
- }
- //读取
- for(int i=0;i<5;i++){
- final int temp=i;
- new Thread(()->{
- myCache.get(temp+"");
- },String.valueOf(i)).start();
- }
- }
- }
-
-
- /**
- * 自定义缓存
- */
- class MyCache{
- private volatile Map
map=new HashMap<>(); -
-
- //存入
- public void put(String key,Object value){
- System.out.println(Thread.currentThread().getName()+"写入"+key);
- map.put(key,value);
- System.out.println(Thread.currentThread().getName()+"写入完毕");
- }
- //读取
- public void get(String key){
- System.out.println(Thread.currentThread().getName()+"读取"+key);
- Object o=map.get(key);
- System.out.println(Thread.currentThread().getName()+"读取完毕");
- }
- }
输出如下,在一个线程写入的过程中另一个线程也在写入,这种情况是不能发生的

使用了读写锁之后,写操作只会允许一个线程执行,读操作则会有多个线程同时进行.。
-
- /**
- * readwritelock
- */
- public class readwritelockdemo {
- public static void main(String[] args) {
- // MyCache myCache=new MyCache();
- MyCacheLock myCacheLock=new MyCacheLock();
-
- //写入
- for(int i=0;i<5;i++){
- final int temp=i;
- new Thread(()->{
- myCacheLock.put(temp+"",temp+"");
- },String.valueOf(i)).start();
- }
- //读取
- for(int i=0;i<5;i++){
- final int temp=i;
- new Thread(()->{
- myCacheLock.get(temp+"");
- },String.valueOf(i)).start();
- }
- }
- }
- /**
- * 加上锁之后
- */
- class MyCacheLock{
- private volatile Map
map=new HashMap<>(); - //读写锁,可以更加细力度的控制
- private ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
-
- //存入,只有一个线程写
- public void put(String key,Object value){
- readWriteLock.writeLock().lock();
- try{
- System.out.println(Thread.currentThread().getName()+"写入"+key);
- map.put(key,value);
- System.out.println(Thread.currentThread().getName()+"写入完毕");
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- readWriteLock.writeLock().unlock();
- }
- }
- //读取,所有线程都能读
- public void get(String key){
- readWriteLock.readLock().lock();
- try{
- System.out.println(Thread.currentThread().getName()+"读取"+key);
- Object o=map.get(key);
- System.out.println(Thread.currentThread().getName()+"读取完毕");
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- readWriteLock.readLock().unlock();
- }
-
- }
- }


JUC有这样一个阻塞队列的接口,t它的实现有一个SynchronousQueue同步队列,还有一些数组阻塞队列,和链表阻塞队列等等


可以看见Queue和List和Set是同一级别的,在Queue接口下有这个BlockingQueue接口和Deque和AbstractQueue。

典型使用场景: 多线程并发处理,线程池,生产者消费者。
1.抛出异常2.不会抛出异常3.阻塞等待4.超时等待
| 方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
| 添加 | add | offer() | put() | offer() |
| 移除 | remove | poll() | take() | poll() |
| 检测队首元素 | element | peek |
- public class Test {
- public static void main(String[] args) throws InterruptedException {
- // test1();
- test4();
- }
- /**
- * 抛出异常
- */
- public static void test1(){
- //设置队列大小
- ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
- System.out.println(blockingQueue.add("a"));
- System.out.println(blockingQueue.add("b"));
- System.out.println(blockingQueue.add("c"));
- //Exception in thread "main" java.lang.IllegalStateException: Queue full
- // blockingQueue.add("d");
- System.out.println("_______________");
- System.out.println(blockingQueue.remove());
- System.out.println(blockingQueue.remove());
- System.out.println(blockingQueue.remove());
- //Exception in thread "main" java.util.NoSuchElementException
- // System.out.println(blockingQueue.remove());
- }
-
-
- /**
- * 有返回值,不抛出异常
- */
- public static void test2(){
- ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
- System.out.println(blockingQueue.offer("a"));
- System.out.println(blockingQueue.offer("b"));
- System.out.println(blockingQueue.offer("c"));
- System.out.println(blockingQueue.offer("d")); //不抛出异常,返回false
- System.out.println("_______________————————————————————");
- System.out.println(blockingQueue.poll());
- System.out.println(blockingQueue.poll());
- System.out.println(blockingQueue.poll());
- System.out.println(blockingQueue.poll()); //也不抛出异常,返回null
- }
-
-
- /**
- * 等待,阻塞(一直阻塞)
- */
- public static void test3() throws InterruptedException {
- ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
- blockingQueue.put("a");
- blockingQueue.put("b");
- blockingQueue.put("c");
- // blockingQueue.put("d"); //一直阻塞
- System.out.println(blockingQueue.take());
- System.out.println(blockingQueue.take());
- System.out.println(blockingQueue.take());
- System.out.println(blockingQueue.take());//也是一直阻塞
- }
-
- /**
- * 等待,阻塞(等待超时)
- */
- public static void test4() throws InterruptedException {
- ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
- blockingQueue.offer("a");
- blockingQueue.offer("b");
- blockingQueue.offer("c");
- blockingQueue.offer("d",2, TimeUnit.SECONDS); //超时时间和单位
- System.out.println(blockingQueue.poll());
- System.out.println(blockingQueue.poll());
- System.out.println(blockingQueue.poll());
- System.out.println(blockingQueue.poll(2, TimeUnit.SECONDS));
- }
- }
没有容量,进去一个元素必须等待取出来之后,才能再往里面放一个元素。
- /**
- * 同步队列
- */
- public class SynchronousQueueDemo {
- public static void main(String[] args) {
- SynchronousQueue
blockingQueue = new SynchronousQueue<>();//同步队列 - new Thread(()->{
- try {
- System.out.println(Thread.currentThread().getName()+"put 1");
- blockingQueue.put("1");
- System.out.println(Thread.currentThread().getName()+"put 2");
- blockingQueue.put("2");
- System.out.println(Thread.currentThread().getName()+"put 3");
- blockingQueue.put("3");
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- },"t1").start();
- new Thread(()->{
- try {
- TimeUnit.SECONDS.sleep(3);
- System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
- TimeUnit.SECONDS.sleep(3);
- System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
- TimeUnit.SECONDS.sleep(3);
- System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- },"t2").start();
- }
- }
线程池:三大方法,7大参数,4中拒绝策略。
池化技术:事先准备好资源,来人就用,用完放回。
程序的运行,本质:占用系统的资源!优化资源的使用!=>池化技术
线程池,连接池,内存池,对象池。
1、降低资源的消耗
2、提高响应的速度
3、方便管理。
线程复用、可以控制最大并发数、管理线程

- //Executors 工具类、三大方法。
- public class Demo01 {
- public static void main(String[] args) {
- // ExecutorService threadPoll = Executors.newSingleThreadExecutor();//单个线程
- // ExecutorService threadPoll =Executors.newFixedThreadPool(5); //创建一个固定线程池的大小
- ExecutorService threadPoll =Executors.newCachedThreadPool(); //可伸缩
-
- try {
- for(int i=0;i<10;i++){
- //使用了线程池后,使用线程池创建线程
- threadPoll.execute(()->{
- System.out.println(Thread.currentThread().getName()+" OK");
- });
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }finally {
- //线程池用完,程序结束,关闭线程池
- threadPoll.shutdown();
- }
- }
- }
源码分析
第一个方法的源码
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue
())); - }
第二个方法的源码
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue
()); - }
第三个方法的源码
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue
()); - }
可以看见三种开启方法都是用的ThreadPoolExecutor,它的源码如下
可以看见有7个参数
1.核心线程池大小2.最大核心线程池大小3.存活时间,4.超时单位5.阻塞队列6.线程工厂,用于创建线程7.拒绝策略。
- public ThreadPoolExecutor(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue
workQueue, - ThreadFactory threadFactory,
- RejectedExecutionHandler handler) {
- if (corePoolSize < 0 ||
- maximumPoolSize <= 0 ||
- maximumPoolSize < corePoolSize ||
- keepAliveTime < 0)
- throw new IllegalArgumentException();
- if (workQueue == null || threadFactory == null || handler == null)
- throw new NullPointerException();
- this.acc = System.getSecurityManager() == null ?
- null :
- AccessController.getContext();
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
前面三个方法的前两个参数分别是1,1 5,5 0,Inter.MaxValue。(21亿大小)
因此阿里巴巴的手册里面才会这样写。
四大策略 核心线程池大小为2,最大为5,一开始只有2个,但是阻塞队列里面满了之后又来人了会开放剩下三个,又慢了之后就不给进了,这就是拒绝策略。
等到了那三个队列空闲后后,经过了超时时间就会关闭释放。

四个实现类对应四大策略
自定义线程池、会抛出异常.
- //Executors 工具类、三大方法。
- public class Demo01 {
- public static void main(String[] args) {
- ExecutorService threadPoll =new ThreadPoolExecutor(
- 2,
- 5,
- 3,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(3),
- Executors.defaultThreadFactory(), //一般不变
- new ThreadPoolExecutor.AbortPolicy()); //该拒绝策略是银行满了,还有人进来时就不处理该人并抛出异常。
-
- try {
- for(int i=0;i<10;i++){
- //使用了线程池后,使用线程池创建线程
- threadPoll.execute(()->{
- System.out.println(Thread.currentThread().getName()+" OK");
- });
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }finally {
- //线程池用完,程序结束,关闭线程池
- threadPoll.shutdown();
- }
- }
- }

哪来的回哪里去,由原本的线程来执行。
- //Executors 工具类、三大方法。
- public class Demo01 {
- public static void main(String[] args) {
- ExecutorService threadPoll =new ThreadPoolExecutor(
- 2,
- 5,
- 3,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(3),
- Executors.defaultThreadFactory(), //一般不变
- new ThreadPoolExecutor.CallerRunsPolicy()); //哪来的去哪里!
-
- try {
- for(int i=0;i<10;i++){
- //使用了线程池后,使用线程池创建线程
- threadPoll.execute(()->{
- System.out.println(Thread.currentThread().getName()+" OK");
- });
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }finally {
- //线程池用完,程序结束,关闭线程池
- threadPoll.shutdown();
- }
- }
- }

队列满了不会抛出异常。会直接丢掉任务。
- package com.yhy.pool;
-
- import java.util.concurrent.*;
-
- /**
- * new ThreadPoolExecutor.AbortPolicy()); //该拒绝策略是银行满了,还有人进来时就不处理该人并抛出异常。
- * new ThreadPoolExecutor.CallerRunsPolicy()); //哪来的去哪里!
- * new ThreadPoolExecutor.DiscardPolicy()); //队列满了就踢了并且不抛出异常。
- * new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了不会抛出异常,尝试去和最早的竞争,也不会抛出异常!
- */
- public class Demo01 {
- public static void main(String[] args) {
-
-
- ExecutorService threadPoll =new ThreadPoolExecutor(
- 2,
- 5,
- 3,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(3),
- Executors.defaultThreadFactory(), //一般不变
- new ThreadPoolExecutor.DiscardPolicy());
-
- try {
- for(int i=0;i<10;i++){
- //使用了线程池后,使用线程池创建线程
- threadPoll.execute(()->{
- System.out.println(Thread.currentThread().getName()+" OK");
- });
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }finally {
- //线程池用完,程序结束,关闭线程池
- threadPoll.shutdown();
- }
- }
- }
可以看见只有8条输出,有两条被踢了。

- //Executors 工具类、三大方法。
- public class Demo01 {
- public static void main(String[] args) {
- ExecutorService threadPoll =new ThreadPoolExecutor(
- 2,
- 5,
- 3,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(3),
- Executors.defaultThreadFactory(), //一般不变
- new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了不会抛出异常,尝试去和最早的竞争,也不会抛出异常!
-
- try {
- for(int i=0;i<10;i++){
- //使用了线程池后,使用线程池创建线程
- threadPoll.execute(()->{
- System.out.println(Thread.currentThread().getName()+" OK");
- });
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }finally {
- //线程池用完,程序结束,关闭线程池
- threadPoll.shutdown();
- }
- }
- }
经常会被问,池的最大大小如何去设置。
CPU密集型: 12核的CPU最多12条线程同时执行,多少核就
IO密集型: 程序 15个大型任务 IO十分占用资源 ,一般设置为两倍30个线程。
- /**
- * new ThreadPoolExecutor.AbortPolicy()); //该拒绝策略是银行满了,还有人进来时就不处理该人并抛出异常。
- * new ThreadPoolExecutor.CallerRunsPolicy()); //哪来的去哪里!
- * new ThreadPoolExecutor.DiscardPolicy()); //队列满了就踢了并且不抛出异常。
- * new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了不会抛出异常,尝试去和最早的竞争,也不会抛出异常!
- */
- public class Demo01 {
- public static void main(String[] args) {
-
- //自定义线程池! 工作 ThreadPoolExecutor
- //最大线程池如何定义
- //1、CPU 密集型,几核,就是几,可以保CPu的效率最高!
- //2、IO密集型
- // 程序 15个大型任务 IO十分占用资源
-
- //获取CPU核数
- System.out.println(Runtime.getRuntime().availableProcessors());
-
- ExecutorService threadPoll =new ThreadPoolExecutor(
- 2,
- Runtime.getRuntime().availableProcessors(),
- 3,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(3),
- Executors.defaultThreadFactory(), //一般不变
- new ThreadPoolExecutor.DiscardOldestPolicy()); //队列满了不会抛出异常,尝试去和最早的竞争,也不会抛出异常!
-
- try {
- for(int i=0;i<10;i++){
- //使用了线程池后,使用线程池创建线程
- threadPoll.execute(()->{
- System.out.println(Thread.currentThread().getName()+" OK");
- });
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }finally {
- //线程池用完,程序结束,关闭线程池
- threadPoll.shutdown();
- }
- }
- }