JUC是Java并发编程包
wait 和sleep
管程,也就是Monitor,在Java中叫锁,在操作系统中叫监视器,是一种同步机制,保证同一个时间,只有一个线程访问被保护的数据或者代码。
JVM中的同步是基于进入和退出操作实现的,而进入和退出是基于管程对象实现的,每一个对象都有一个管程对象,管程对象会随着Java对象的创建和销毁而创建或者销毁,例如在执行线程操作的时候,首先要持有这个线程的管程对象,然后再执行方法,当你在执行方法的时候,其他线程就不能持有同一个管程对象,当你的 方法执行完毕后,再释放管程对象,交给其他线程处理,通过这种方式来实现多线程中加锁的操作。
用户线程:例如new thread 这就是用户线程
守护线程:例如垃圾回收就是守护线程,在后台默默的执行
public static void main(String[] args) {
Thread thread = new Thread(()->{
//isDaemon方法如果返回true就表示是守护线程,如果是false就表示是用户线程
System.out.println(Thread.currentThread().getName() + "::" + Thread.currentThread().isDaemon());
while (true){
}
},"UserThread");
thread.start();
System.out.println(Thread.currentThread().getName());
}
打印结果:先打印出了main线程,然后userThread 为false表示该线程是用户线程。此时主线程已经结束,但是用户线程还是存活并执行的情况,而且jvm也是在运行中。
那么接下来我们将userThread设置为守护线程看一下,如下,此时我们发现打印结果
public static void main(String[] args) {
Thread thread = new Thread(()->{
//isDaemon方法如果返回true就表示是守护线程,如果是false就表示是用户线程
System.out.println(Thread.currentThread().getName() + "::" + Thread.currentThread().isDaemon());
while (true){
}
},"UserThread");
thread.setDaemon(ture);//设置userThread为守护线程
thread.start();
System.out.println(Thread.currentThread().getName());
}
此时我们发现打印结果如下,并且jvm已经结束
Synchronized 是Java中的关键字,是一种同步锁,它修饰的对象有以下几种
public class Ticket {
Object lock = new Object();
public void blockMethod1(){
synchronized (lock){
for (int i = 0; i < 3; i++) {
System.out.println("generalMethod1 excute"+i+" time");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
虽然可以使用synchronized来定义方法,但是synchronized并不属于方法定义的一部分,因此,synchronized关键字不能被继承。如果在弗雷中的某个方法使用了synchronized关键字,而在子类中覆盖了这个方法,在子类中的这个方法默认情况下并不是同步的,而必须显示的在子类的方法中加上synchronized关键字才行。当然还可以在子类方法中调用父类被synchronized修饰的方法。
public class Ticket {
//一共有一百张票
private int num = 100;
//卖票方法
public synchronized void sell(){
if (num > 0){
num-- ;
System.out.println(Thread.currentThread().getName() + "买了一张,余票还有:"+num);
}
}
public static void main(String[] args) {
Ticket ticket = new Ticket();
Runnable runnable = new Runnable() {
@Override
public void run() {
for (int i = 0; i < 50; i++) {
ticket.sell();
}
}
};
new Thread(runnable, "t1").start();
new Thread(runnable, "t2").start();
new Thread(runnable, "t3").start();
new Thread(runnable, "t4").start();
}
}
最终打印结果如下
java.util.concurrent.locks.Lock 是一个类似于synchronized 块的线程同步机制。但是 Lock比 synchronized 块更加灵活。Lock是个接口,既然是接口那就有实现类,如下图
Lock 和 synchronized 都是可重入锁的一种,只不过synchronized是一种隐式的可重入锁,Lock是一种显示的,这里的ReentrantLock只是Lock接口的一个实现,那么什么是可重入锁?
下面我们会详细介绍。
public class LockTicket {
private final ReentrantLock lock = new ReentrantLock();
private Integer num = 100;//一百张票
public void sell(){
//加锁
lock.lock();
try {
if (num > 0){
num-- ;
System.out.println(Thread.currentThread().getName() + "买了一张,余票还有:"+num);
}
}finally {
//释放锁,finally保证在代码执行过程中抛出异常也可以释放锁
lock.unlock();
}
}
public static void main(String[] args) {
LockTicket lockTicket = new LockTicket();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
lockTicket.sell();
}
},"AA").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
lockTicket.sell();
}
},"BB").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
lockTicket.sell();
}
},"CC").start();
}
}
最终结果:可以看出来在多次执行的过程中线程顺序不一样,执行次数也超过票数,但是依然有序并且保证了不会卖超。在这里有个问题,就是我们new Thread 之后的start方法会不会立即创建一个线程?答案是不一定,查看源码我们就会发现,这里创建线程最终是java代码调用了操作系统资源,如果系统空闲则会立即创建,如果系统繁忙则不一定会立即创建
@FunctionalInterface
注解,这说明这两个接口都是函数式接口,可以使用java8 lambda表达式;在上面的案例中我们提到,new Thread 线程是否会立即创建,并且按照创建的顺序执行?答案是否定的,那么我们如何让线程按照我们指定的顺序去执行?这就是线程间通信!
现在我们有这样一个需求,就是让两个线程分别对同一个初始值为0的变量做+1 和 -1 的操作,重复交替操作很多次。最终实现的效果就是A线程+1 ,变量值为1,B线程-1 变量值回到0 。最终变量的值就是10101010101010… A线程在变量值为0的时候做+1操作,在变量值不为0的时候等待,同理B线程在变量值为1的时候做-1操作,在变量值不为1的时候等待
我们都知道java中Object
类(如下图),这个类中有一些方法,例如wait() notify() 等,我们就使用这个方法来分别使用synchronized和lock实现上面这个需求;
//第一步 创建资源类
class Share{
private int number = 0;
//第二步创建资源操作方法
//加1操作方法
public synchronized void incr() throws InterruptedException {
//判断
if (number != 0){
this.wait();
}
//干活
number++;
System.out.println(Thread.currentThread().getName() + "::" + number);
//通知
this.notifyAll();
}
//减一操作方法
public synchronized void decr() throws InterruptedException {
//判断
if (number != 1){
this.wait();
}
//干活
number--;
System.out.println(Thread.currentThread().getName() + "::" + number);
//通知
this.notifyAll();
}
}
public class ThreadDemo1 {
//第三步创建多个线程调用资源操作方法
public static void main(String[] args) {
Share share = new Share();
new Thread(() -> {
for (int i = 1; i <= 10 ; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(() -> {
for (int i = 1; i <= 10 ; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"BB").start();
}
}
最终效果:
在上面的例子中我们使用了两个线程AA和BB,那么如果我们再多加两个线程CC和DD,让CC加一,DD减一,最终会是什么效果呢?我们用伪代码举例并观察最终结果
//第一步 创建资源类
class Share{
//... 同上
}
public class ThreadDemo1 {
//第三步创建多个线程调用资源操作方法
public static void main(String[] args) {
//... 同上
new Thread(() -> {
//...
},"AA").start();
new Thread(() -> {
//...
},"BB").start();
new Thread(() -> {
//...
},"CC").start();
new Thread(() -> {
//...
},"DD").start();
}
}
最终结果:通过多次执行下图我们可以发现,当增加两个线程后,结果和我们预期的并不相同,并没有出现我们预期的[AA:1] [BB:0] [CC:1] [DD:0] ...
那么是为什么呢?
我们可以看一下官方文档中对于Object类的wait() 方法的解释
在这段文字中有一句话对于某一个参数的版本,实现中断和虚假唤醒是可能的,而且此方法应始终在循环中使用,这其实就是线程的虚假唤醒导致的,wait()方法应该写在while循环中,并且wait() 方法会释放锁
,下面我们用一个表格来描述一下发生这种现象的原因,如下,
我们发现一旦线程在AA和CC或者BB和DD之间重复切换执行,则会导致num大于1或者小于0的情况发生,其原因就是因为wait()方法,该方法的特点就是在那里睡着,唤醒之后就从哪里继续执行
,例如上面例子中 这段代码,当wait之后,下次被唤醒就会直接从wait往下执行,不去执行if判断,这就是线程虚假唤醒导致最终线程执行结果异常的原因。
在上面的关于wait方法的官方文档中其实已经说明,将if判断改为while循环即可,原因就是每次wait之后,不论是什么时候唤醒,都要再执行一次while循环,此时就相当于while的条件就是if判断,我们修改之后再执行看看效果
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
//第一步创建资源类和资源操作方法
class Share1 {
private int number = 0;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
//+1操作
public void incr() throws InterruptedException {
//加锁
lock.lock();
try {
//判断
while (number != 0){
condition.await();
}
//干活
number++;
System.out.println(Thread.currentThread().getName() + "::" + number);
//通知
condition.signalAll();
}finally {
//解锁
lock.unlock();
}
}
//-1操作
public void decr() throws InterruptedException {
//加锁
lock.lock();
try {
//判断
while (number != 1){
condition.await();
}
//干活
number--;
System.out.println(Thread.currentThread().getName() + "::" + number);
//通知
condition.signalAll();
}finally {
//解锁
lock.unlock();
}
}
}
public class ThreadDemo2 {
public static void main(String[] args) {
Share1 share = new Share1();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"BB").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.incr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"CC").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
share.decr();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"DD").start();
}
}
最终结果同样是正常,但是上面的例子我们发现了一个问题,就是我们所有的通知都是使用的All的方式,就是通知所有其他线程,那么可不可以只通知某个线程来执行当前方法呢?欲知后事如何,请看下一小节:线程间定制化通信!
现在有个需求,就是有三个线程,我们需要实现的效果就是 AA线程打印2次,BB线程打印3次,CC线程打印4次,然后循环AA又打印2次,BB又3次,CC又4次… 循环执行5次
大概思路就是在资源类中定义一个变量,然后根据变量的变化来通知对应的线程执行方法
实现效果如下:
//创建资源类及资源类操作方法
class ShareResource{
//标志位
private int flag = 1;
private final Lock lock = new ReentrantLock();
private final Condition c1 = lock.newCondition();
private final Condition c2 = lock.newCondition();
private final Condition c3 = lock.newCondition();
//创建操作资源的方法
public void print2(int loop) throws InterruptedException {
//上锁
lock.lock();
try {
//判断
while (flag != 1){
c1.await();
}
//干活
for (int i = 1; i <= 2; i++) {
System.out.println(Thread.currentThread().getName() + "打印第" + i + "次 第" + loop + "轮打印");
}
//修改标志位,通知第二位线程
flag = 2;
c2.signal();
}finally {
//解锁
lock.unlock();
}
}
public void print3(int loop) throws InterruptedException {
//上锁
lock.lock();
try {
//判断
while (flag != 2){
c2.await();
}
//干活
for (int i = 1; i <= 3; i++) {
System.out.println(Thread.currentThread().getName() + "打印第" + i + "次 第" + loop + "轮打印");
}
//修改标志位,通知第二位线程
flag = 3;
c3.signal();
}finally {
//解锁
lock.unlock();
}
}
public void print4(int loop) throws InterruptedException {
//上锁
lock.lock();
try {
//判断
while (flag != 3){
c3.await();
}
//干活
for (int i = 1; i <= 4; i++) {
System.out.println(Thread.currentThread().getName() + "打印第" + i + "次 第" + loop + "轮打印");
}
//修改标志位,通知第二位线程
flag = 1;
c1.signal();
}finally {
//解锁
lock.unlock();
}
}
}
public class ThreadDemo3 {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareResource.print2(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"AA").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareResource.print3(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"BB").start();
new Thread(() -> {
for (int i = 1; i <= 5; i++) {
try {
shareResource.print4(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"CC").start();
}
}
最终结果
综上,我们可以总结出来多线程编程的步骤
看下面这段代码,在main方法中声明一个ArrayList集合,由于该集合是线程不安全的,因此在多线程环境中,一边放元素,一边取元素的时候,就会出现java.util.ConcurrentModificationException
异常,并发修改异常,
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0; i < 50; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
最终结果
链接: 参考链接
Vector也是List接口的一个实现类,只不过它是线程安全的,只需要将上面代码中new ArrayList()
换成 new Vector()
即可
通过Collections工具类中提供的方法,返回一个线程安全的集合,解决方法也是与上面的类似,将代码中的new ArrayList()
换成 Collections.synchronizedList(new ArrayList<>())
即可
上面两种方法相对来说比较古老,我们用的也比较少,一般情况都是用JUC包中给我们提供的解决方法,也是就是CopyOnWriteArrayList 写时复制技术;
我们在使用CopyOnWriteArrayList的时候,首先它会允许并发读集合,如果有写入,则把当前集合复制一份,然后让单一线程写入复制的集合,等到写入完成之后,将两个集合做一个合并,再让其他线程读这个新集合,通过这种方式支持高并发。这样做的好处就是支持并发读,也照顾到了独立写,就不会存在上面的java.util.ConcurrentModificationException
异常。个人理解就是有一个读集合,这个时候大家都来读,读是不存在问题,这个时候如果A来写,然后A持有了添加方法的锁,其他写入线程智能等待A释放锁,等到A写完之后,原来读数据的线程读到的也就是新数组了,这个时候B再来获取锁,再接着写,再接着合并。
看一下源码:
public static void main(String[] args) throws InterruptedException {
Set<String> set = new HashSet<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(set);
}, String.valueOf(i)).start();
}
Thread.sleep(2000);
System.out.println("最后添加元素个数:" + set.size());
}
与上面的代码一样,只不过是将ArrayList换成了HashSet,同样还是会出现上面的异常,解决方案也类似,是使用Set
在这里复习一下HashSet集合,HashSet的特点就是无序不重复,其底层原理就是使用的HashMap的Key,HashMap的key就是不重复,不重复的原理是计算了Key的HashCode。1.8之后的hashMap已经不是原来的数组+链表了,而是引入了红黑树,有兴趣可以看看 这位老铁的文章链接
public static void main(String[] args) throws InterruptedException {
Set<String> set = new HashMap<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0,8));
System.out.println(set);
}, String.valueOf(i)).start();
}
Thread.sleep(2000);
System.out.println("最后添加元素个数:" + set.size());
}
与上面的解决方案类似,将HashMap换成ConcurrentHashMap即可
class Phone {
public synchronized void sendMsg(){
System.out.println("-----> 发送短信!");
}
public synchronized void sendEmail(){
System.out.println("-----> 发送邮件");
}
public void sayHolle(){
System.out.println("hello");
}
public void block(){
synchronized (this){
//do something...
}
}
}
public class ThreadDemoArray {
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {
phone.sendMsg();
},"AA").start();
Thread.sleep(100);
new Thread(() -> {
phone.sendEmail();
},"BB").start();
}
}
首先看一下上面的代码,有一个类叫phone,里面有两个加了synchronized关键字的方法 sendMsg和sendEmail 还有一个普通方法,然后在main方法中创建两个线程并且让AA线程睡眠一百毫秒
TimeUnit.SECONDS.sleep(4);
,也就是说让AA线程睡四秒,最终打印的结果与上面一样,原因也是一样的,就是AA线程持有了当前对象的锁,其他线程需要等到AA线程释放锁才能执行其他操作。public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
new Thread(() -> {
phone.sendMsg();
},"AA").start();
Thread.sleep(100);
new Thread(() -> {
phone.sayHello();
},"BB").start();
}
public static void main(String[] args) throws InterruptedException {
Phone phone = new Phone();
Phone phone2 = new Phone();
new Thread(() -> {
phone.sendMsg();
},"AA").start();
Thread.sleep(100);
new Thread(() -> {
phone2 .sendEmail();
},"BB").start();
}
总结:对于同步方法,锁的是当前实例对象,也就是this,对于静态同步方法,锁的是当前类的Class对象,对于同步方法块,锁的是synchronized括号里配置的对象
同步方法块的意思就是例子中block方法里的synchronized括号里锁的对象,例如上面例子中synchronized(this) 中这个this就是当前对象,也就相当于是block方法上的synchronized。如果写法改为
public class Phone{
private App app;
public void block(){
synchronized(app){
//do something
}
}
}
则这种情况就相当于是所住了这个App对象,其他线程如果在调用时涉及到Phone中的app对象,则就需要等待其他涉及该对象的线程释放锁才能继续操作Phone对象中的这个app对象
public class LockTicket {
private final ReentrantLock lock = new ReentrantLock();
private Integer num = 50;//一百张票
public void sell(){
//加锁
lock.lock();
try {
if (num > 0){
num-- ;
System.out.println(Thread.currentThread().getName() + "买了一张,余票还有:"+num);
}
}finally {
//释放锁,finally保证在代码执行过程中抛出异常也可以释放锁
lock.unlock();
}
}
public static void main(String[] args) {
LockTicket lockTicket = new LockTicket();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
lockTicket.sell();
}
},"AA").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
lockTicket.sell();
}
},"BB").start();
new Thread(() -> {
for (int i = 0; i < 40; i++) {
lockTicket.sell();
}
},"CC").start();
}
}
这段代码依然是上面使用lock实现卖票的例子,我们可以看到这里我们使用的是可重入锁ReentrantLock ,再看一下最终运行的效果如下图,我们可以看到这个AA线程从第一张票一直买到地10张票,直到它的次数用完了才到BB线程,而CC线程完全没用,这就是非公平锁,意思就是可能存在线程饿死的情况,也就是CC线程这种完全没用的情况,那么如何实现公平锁呢,只需要private final ReentrantLock lock = new ReentrantLock(true);
这样既可,在构造方法中传入参数true就可以了,我们看看效果如下图,可以看出来公平锁的情况三个线程都有参与。
那么公平锁与非公平锁的区别是什么?
//可重入锁的构造器,里面判断是否公平
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
可重入锁又称递归锁,是指同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提是锁对象得是同一个对象),不会因为之前已经获取过锁还没有释放而阻塞。
看代码
示例一:
public class ReentryLock {
public static void main(String[] args) {
Object o = new Object();
new Thread(() -> {
boolean b0 = Thread.holdsLock(o);//synchronized 使用 Thread.holdsLock()判断是否持有锁
System.out.println(Thread.currentThread().getName() + "未进入同步,是否持有object的锁" + b0);
synchronized (o){
System.out.println(Thread.currentThread().getName() + "外层,是否持有object的锁" + Thread.holdsLock(o));
synchronized (o){
System.out.println(Thread.currentThread().getName() + "中层,是否持有object的锁"+ Thread.holdsLock(o));
synchronized (o){
System.out.println(Thread.currentThread().getName() + "内层,是否持有object的锁"+ Thread.holdsLock(o));
}
System.out.println(Thread.currentThread().getName() + "内层出来,是否持有object的锁"+ Thread.holdsLock(o));
}
System.out.println(Thread.currentThread().getName() + "中层出来,是否持有object的锁"+ Thread.holdsLock(o));
}
System.out.println(Thread.currentThread().getName() + "外层出来,是否持有object的锁"+ Thread.holdsLock(o));
},"T1").start();
}
}
最终结果
这个实例的意思就是当前线程只要拿到了最外层的锁,则里面的锁也可以任意进出,根据上面的线程八锁,我们可以看出来,AA线程访问了发送短信方法的时候,其他线程是无法访问发送邮件的方法的,但是AA线程是可以随时访问发送邮件的,这就是可重入锁的一种表现,因为AA线程此时持有锁,而锁的作用范围是当前对象,其他线程要访问发送邮件,首先要持有当前对象的锁
示例二:
class Message{
public synchronized void sendA() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + "发送给A,时间:" + System.currentTimeMillis());
Thread.sleep(5000);
sendB();
}
public synchronized void sendB(){
System.out.println(Thread.currentThread().getName() + "发送给B,时间:" + + System.currentTimeMillis());
}
}
public class ReentryLock1 {
public static void main(String[] args) throws InterruptedException {
Message message = new Message();
new Thread(() -> {
try {
System.out.println( Thread.currentThread().getName() + "开始发送,时间:" + System.currentTimeMillis());
message.sendA();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
Thread.sleep(1000);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "开始发送,时间:" + System.currentTimeMillis());
message.sendB();
},"T2").start();
}
}
最终结果:
这段代码的意思就是main方法开始执行,创建message对象,创建T1线程,T1开始执行,打印开始发送和时间
,同一时间进入sendA方法打印发送给A
,然后T1线程开始睡觉5秒,一秒后创建T2线程并打印开始发送和时间
,同一时间进入sendB方法,但是此刻message对象的锁被T1线程持有,T2线程只能等待T1释放锁,4秒后T1睡觉结束,执行sendB方法,执行结束释放锁,T2线程获取锁,执行sendB并打印。通过这个示例可以看出,T1线程持有了这个对象的锁,便可以任意访问该对象内的同步方法。那么问题来了,如果T1持有的不是对象锁呢?
看示例三
class Message{
public synchronized void sendA() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + "发送给A,时间:" + System.currentTimeMillis());
Thread.sleep(5000);
sendB();
}
public synchronized void sendB(){
System.out.println(Thread.currentThread().getName() + "发送给B,时间:" + + System.currentTimeMillis());
}
}
class Message2{
private Message message = new Message();
public void sendC() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + "发送给C,时间:" + + System.currentTimeMillis());
synchronized (message){
message.sendA();
}
}
public synchronized void sendD(){
System.out.println(Thread.currentThread().getName() + "发送给D,时间:" + + System.currentTimeMillis());
message.sendB();
}
public Message getMessage(){
return this.message;
}
}
public class ReentryLock1 {
public static void main(String[] args) throws InterruptedException {
Message2 message2 = new Message2();
new Thread(() -> {
try {
System.out.println( Thread.currentThread().getName() + "开始发送,时间:" + System.currentTimeMillis());
message2.sendC();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T1").start();
Thread.sleep(1000);
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "开始发送,时间:" + System.currentTimeMillis());
try {
message2.getMessage().sendA();
} catch (InterruptedException e) {
e.printStackTrace();
}
},"T2").start();
}
}
最终结果:
我们来分析一下步骤,首先创建Message2对象,同时创建Message对象,然后线程T1开始创建执行,打印开始发送
,同一时间调用Message2的非同步方法sendC并打印发送给C
,然后锁住message属性调用message的sendA方法打印发送给A
,然后开始睡觉5秒,一秒后T2线程创建并打印开始发送
,然后调用Message的sendA方法,但是此刻的sendA方法所在的对象message是被锁住的,并且持有锁的线程是T1,T2只能等待释放锁,4秒后T1睡觉结束,打印发送给B
后释放锁,T2获取message的锁,调用sendA,然后睡5秒再调用sendB。基本上与上面的结论是一致的,锁的作用范围包括可重入锁的效果都能看出来。那么如果T2线程直接调用sendD方法会是什么效果呢?看一下最终结果
分析一下步骤,首先创建T1线程,打印T1开始发送
,然后调用sendC方法,打印发送给C
,然后进入同步方法调用sendA,打印T1发送给A
,然后开始睡觉5秒,一秒后T2线程创建,打印T2开始发送
,然后调用sendD方法,此时我们可以发现,T2直接调用了sendD,因为T1调用的sendC方法是非同步方法,它也没有锁住当前对象,它只是方法内部锁住了message对象,而非message2对象,如果锁住了message2对象,那么sendD方法是不能执行的。当T2打印完发送给D
的时候,开始调用message的sendB方法的时候,这个时候就出现了争抢锁的问题,此时的message的锁还是在T1手中,所以只能等待T1释放锁。基本上上面的线程8锁已经将这几种情况都说明了。
public class ReentryLock {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "第一层");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "第二层");
}finally {
lock.unlock();
}
}finally {
lock.unlock();
}
},"T1").start();
}
}
最终结果:
这里可以看出来Lock与synchronized的效果一样,都是可重入锁的表现。那么如果我第二个锁不解锁呢?并且如果我有第二个线程要加锁呢?
public class ReentryLock {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
new Thread(() -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "第一层");
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "第二层");
}finally {
//注释掉第二个解锁
//lock.unlock();
}
}finally {
lock.unlock();
}
},"T1").start();
//第二个线程
new Thread(()->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "T2 开始执行");
}finally {
lock.unlock();
}
},"T2").start();
}
}
最终结果
我们可以看到,这T1线程中还是正常执行的,但是T2线程却一直没有执行,因为没有释放锁它就无法获取到锁,所以就一直在等待,因此,加一把锁就要释放一把锁,不论是不是可重入,都要一对一的加锁释放锁
两个或两个以上的线程在执行的过程中,因为争夺资源而导致的互相等待的现象,如果没有外力干涉则无法在继续执行,这种情况就是死锁。
如图,线程A持有锁A,试图获取锁B,线程B持有锁B,试图获取锁A,两个线程都在等待对方释放锁,这就产生了死锁
看代码
public static void main(String[] args) {
Object a = new Object();
Object b = new Object();
new Thread(() -> {
synchronized (a){
System.out.println(Thread.currentThread().getName() + "持有锁A,试图获取锁B");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (b){
System.out.println(Thread.currentThread().getName() + "持有锁B");
}
}
},"T1").start();
//第二个线程
new Thread(()->{
synchronized (b) {
System.out.println(Thread.currentThread().getName() + "持有锁A,试图获取锁B");
synchronized (a) {
System.out.println(Thread.currentThread().getName() + "持有锁B");
}
}
},"T2").start();
}
在这个例子中就是T1线程持有a锁,T2线程持有b锁,然后互相等待对方释放锁,于是死锁
jps -l
命令,如下图中第一行就是我们的代码位置,然后使用第二个命令jstack 15792
这里的15792 就是上面的jps命令中打出来的我们代码死锁的进程号,第二个命令中打出来的tid就是死锁的线程idCallable接口与Runnable接口的区别就是Callable有返回值
Callable<Object> callable = new Callable<Object>() {
@Override
public Object call() throws Exception {
return null;
}
};
对比Runnable可以发现Runnable接口的run方法是没有抛出异常的,而Callable接口的call方法是会抛出异常。看下图,我们在新建的线程中传入的Callable接口时会报错,原因是Thread的构造器中没有Callable的入参,只有Runnable的入参。那么怎么解决呢?只能找一个既和Runnable接口有关系的又和Callable接口有关系的
看一下Runnable接口的文档
在这里有一个FutureTask
的实现类,就用他,这个类的构造方法中有入参是Callable接口的方法
public static void main(String[] args) throws ExecutionException, InterruptedException {
//这里使用匿名实现类的方式创建Callable接口,也可以使用java8lambda表达式简化,Callable接口被函数式接口的注解@FunctionalInterface标注,可以直接使用lambda表达式简化
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 100;
}
});
Integer i = futureTask.get();//获取Callable接口的返回值
boolean done = futureTask.isDone();//计算任务执行完成返回true
}
例如我们在处理某些复杂逻辑或耗时任务时,可以使用FutureTask单开一个线程去处理这些复杂任务,而主线程继续执行,最终汇总结果即可。
例如有4个同学,A同学计算1+1 B同学计算1+2+3+4+5+6+…+500 C同学计算5+5 D同学计算10+10 这样当主线程(老师)去问同学计算结果的时候,很明显B同学一下子算不完,因此可以单开一个线程给B同学,然后老师继续统计C同学D同学,最终将所有结果汇总即可。
看代码:
第一种情况
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName()+"futureTask execute");
return 100;
});
System.out.println(futureTask.get());
new Thread(futureTask,"AA").start();
System.out.println(futureTask.get());
}
在这个例子中我们会发现程序什么都不打印,一直在等待中,这是为什么呢,因为程序运行时先new了一个futureTask对象,然后等待线程调用执行,但是一直没有线程去调用,因此我们在第一个打印中调用get方法的时候,就会一直等待,看一下源码
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
创建FutureTask 的时候,设置当前状态为新建,然后调用get方法,判断当state小于等于completing,s = awaitDone(false,0L)
,直到等待完成,进入report方法,最终返回结果;
第二种情况
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println(Thread.currentThread().getName()+" futureTask execute");
return 100;
});
new Thread(futureTask,"AA").start();
System.out.println(futureTask.get());
new Thread(futureTask,"BB").start();
System.out.println(futureTask.get());
}
最终结果:
从这里我们可以看出来,只有AA线程执行了,BB线程是什么情况,啥也没干?直接返回结果?,这个futureTask只执行了一次,其他调用get方法的时候直接返回结果了,至于为啥直接返回结果,那是因为futureTask对象在第一次执行完之后state就变成了了2,然后在再次执行get方法的时候,直接去了report方法了,然后这个方法直接返回了outCome,这个outCome是从哪里来到我也不知道。这是为啥呢?看了半天源码也没看明白,淦
CountDownLatch 类可以设置一个计数器,然后通过countDown
方法来进行减一的操作,使用await
方法等待计数器不大于0,直到计数器为0才会唤醒await方法之后
ok 思考这样一个业务场景,当班里的同学全部都离开了,班长才能锁门、或者反过来,当班长来了开了门,同学们才能进入教室、当同学全部考完试,老师才能计算总分。当所有车辆把货物运送目的地,才能结算等等。查看
看个小例子
public static void main(String[] args) throws ExecutionException, InterruptedException {
//同学都离开了才能锁门
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println( Thread.currentThread().getName() + " 号同学离开了");
},String.valueOf(i)).start();
}
System.out.println(Thread.currentThread().getName() + "班长锁门了");
}
最终结果:通过这个例子可以发现,在不适用CountDownLatch时,无法保证所有人都离开了才锁门。
使用CountDownLatch后
public static void main(String[] args) throws ExecutionException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
//同学都离开了才能锁门
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println( Thread.currentThread().getName() + " 号同学离开了");
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "班长锁门了");
}
最终结果:
在这里我们会发现,在其他线程中去执行逻辑减Countdown的时候,我们的主线程是一直在等待状态,直到扣减为0的时候,主线程继续执行了,还有一种情况就是多个线程等待,直到某个线程让countdown为0
示例
CountDownLatch countDownLatch = new CountDownLatch(1);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
//准备完毕……运动员都阻塞在这,等待号令
countDownLatch.await();
String parter = "【" + Thread.currentThread().getName() + "】";
System.out.println(parter + "开始执行……");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(2000);// 裁判准备发令
System.out.println("开始执行");
countDownLatch.countDown();// 发令枪:执行发令
这段代码来自 赵彦军歹佬
首先看一下CycleBarrier的构造器
/**
* 创建一个CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,
* 但他不会在启动barrier时执行预定义操作
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
* 创建一个CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动barrier时
* 执行给定的屏障操作,该操作由最后一个进入barrier的线程执行
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
看一个示例代码
public class ReentryLock {
private static final int NUMBER = 7;
public static void main(String[] args) throws ExecutionException, InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(NUMBER,() -> {
try {
System.out.println("先睡五秒再召唤");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("集齐7颗龙珠" + Thread.currentThread().getName() + "号线程召唤神龙");
});
for (int i = 1; i <= 7; i++) {
new Thread(() ->{
System.out.println(Thread.currentThread().getName() + "号线程收集到1颗龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("龙珠集齐,等待结束" + Thread.currentThread().getName() + "号线程开始执行");
},String.valueOf(i)).start();
}
}
}
最终结果:
看这个图就可以发现,当给定数量的参与者处于等待状态时,将激活预定义方法,并且是由最后一个进入barrier的线程执行,并且只有在预定义操作之后其他线程才会并行。
文档解释
一个计数信号量。 在概念上,信号量维持一组许可证。 如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。 每个release()添加许可证,潜在地释放阻塞获取方。 但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。
构造方法
Semaphore(int permits)
创建一个 Semaphore与给定数量的许可证和非公平公平设置。
Semaphore(int permits, boolean fair)
创建一个 Semaphore与给定数量的许可证和给定的公平设置。
通俗解释就是通过这个类颁发一个指定数量的许可证,只有抢占了这个许可证的线程可以继续执行,其它没有获取许可证的线程只能等待。
例如现在一共有六辆车,三个车位,演示一个六辆车抢占三个车位的案例,在下面这个案例中我们先声明了一个拥有三个许可证的Semaphore对象,然后让六个线程去使用这三个许可证,当一开始三个线程抢占了所有许可证之后,剩下的线程就要等待,知道其中某个线程释放了许可证,这个时候就会唤醒正在等待的线程去抢占许可证,如果许可证使用完了其他线程又要继续等待。
public static void main(String[] args) throws ExecutionException, InterruptedException {
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6 ; i++) {
new Thread(()->{
try {
//抢占车位,也就是获取许可
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " -->抢占到了车位");
//随机秒数之后离开车位
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName() + " <--离开了车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放
semaphore.release();
}
},String.valueOf(i)).start();
}
}
最终结果:
悲观锁和乐观锁
表锁与行锁
读写锁
读锁和写锁都会发生死锁,首先说读锁,例如两个线程都在读取某一条数据,同是这两个线程都对这条数据做了修改,这个时候A线程要等待B线程读完才能做修改,而B线程同样要等A线程读完才能做写操作,这个时候就是A等B B也在等A,发生死锁;然后是写锁也就是独占锁,当A操作这条数据的时候就会将这行数据独占,不允许其他数据访问,此时另一个线程B在操作另外一条数据,而此时A也需要操作B锁的这条数据,而B也需要操作A锁住的这条数据,于是双方又陷入了互相等待,发生死锁.
读写锁的使用就是在读的方法上加读锁,在写的方法上加写锁。我们做个小案例,就是模拟在缓存中存取数据,下面是没有锁的情况
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
//放数据
public void put(String key,Object value){
try {
System.out.println(Thread.currentThread().getName() +" 写入数据 " + key);
TimeUnit.MICROSECONDS.sleep(300);
map.put(key,value);
System.out.println(Thread.currentThread().getName() +" 写入数据成功 " + key);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取数据
public Object get(String key){
Object result = null;
try {
System.out.println(Thread.currentThread().getName() +" 读取数据 " + key);
TimeUnit.MICROSECONDS.sleep(100);
result = map.get(key);
System.out.println(Thread.currentThread().getName() +" 读取数据成功 key:" + key + " value:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result;
}
}
public class ReentryLock {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCache cache = new MyCache();
String[] values = {"A","B","C","D","E"};
for (int i = 1; i <= 5; i++) {
final int key = i;
new Thread(() -> {
cache.put(key+"" ,values[key-1]);
},String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
final int key = i;
new Thread(() -> {
cache.get(key+"" );
},String.valueOf(i)).start();
}
}
}
最终结果
可以看出,在没有锁的情况,即便是正在写的过程中,还是有线程是可以读的,在读的过程中也是有线程可以写,例如上面箭头所指的2写入过程中,就有其他线程在读在写,整个过程中各个线程并没有所谓的控制随心所欲的读写。正确的方式应该是读完了再写,写的时候不能读,写完了再读,那么怎么解决呢,就是用下面的这个读写锁,我们将上面的 代码做一下修改
class MyCache{
private volatile Map<String,Object> map = new HashMap<>();
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
//放数据
public void put(String key,Object value){
try {
rwLock.writeLock().lock();
System.out.println(Thread.currentThread().getName() +" 写入数据 " + key);
TimeUnit.MICROSECONDS.sleep(300);
map.put(key,value);
System.out.println(Thread.currentThread().getName() +" 写入数据成功 " + key);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
rwLock.writeLock().unlock();
}
}
//取数据
public Object get(String key){
Object result = null;
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() +" 读取数据 " + key);
TimeUnit.MICROSECONDS.sleep(100);
result = map.get(key);
System.out.println(Thread.currentThread().getName() +" 读取数据成功 key:" + key + " value:" + result);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
rwLock.readLock().unlock();
}
return result;
}
}
最终结果
通过这个结果我们可以发现,读和写都是一起的,要么读完要么写完,并且可以看出来在1、2读的时候存在共享的情况,也就是1和2线程一起在读,因为1读的时候2也在读,2读完了1才读完。
在上面的读写锁阶段,我们可以发现读锁可以共享,而读的时候不能写,必须要等到读完才行,而写的时候是可以读的,这就是一个锁降级的过程;锁降级是如何实现的?首先获取写锁,然后获取读锁,然后释放写锁,然后释放读锁…这??这什么玩意
public static void main(String[] args) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = rwLock.readLock();//读锁
ReentrantReadWriteLock.WriteLock writeLock = rwLock.writeLock();//写锁
//获取写锁
writeLock.lock();
System.out.println("write lock do something...");
//获取读锁
readLock.lock();
System.out.println("read lock do something...");
//释放写锁
writeLock.unlock();
//释放读锁
readLock.unlock();
}
写锁可以降级为读锁,但是读锁不能升级为写锁
我们可以将上面的读写锁换个位置,先获取读锁,再获取写锁,这个时候可以发现,当程序运行到写锁的时候就已经停住了,因为必须要等到读锁释放才能写。
队列:先进先出
栈:先进后出
链接: 看看人这写的多好
当放入元素的时候,如果队列满了则阻塞,当取出元素的时候,如果队列为空,则阻塞,一旦不为空则开始取。
在多线程领域,所谓阻塞,就是在某些情况下会挂起线程(既阻塞),一旦条件满足,被挂起的线程又会被自动唤醒。
数组实现的有界队列
public static void main(String[] args) {
/**
* 创建一个给定(固定)容量和默认访问策略的ArrayBlockingQueue,容量必须大于0
*/
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
//添加元素,只能添加长度内的个数元素,超出元素个数将抛异常
boolean a = queue.add("a");
//添加元素,如果超过长度则返回false
boolean a = queue.offer("a");
//在此队列的尾部插入指定的元素,如果队列已满,则在指定的等待时间内等待空间可用。
boolean a5 = queue.offer("a", 3L, TimeUnit.SECONDS);
//往队列里放元素,如果超过长度,则阻塞
queue.put("a");
//移除队列的头部第一个元素,如果队列为空,则抛出异常
queue.remove();
//从此队列中删除指定元素的单个实例,如果存在一个或多个元素则移除头部1个匹配到的元素并返回true,如果不存在则返回false
boolean remove = queue.remove("a");
//取出队列的头部,如果队列为空则返回null
String poll = queue.poll();
//取出队列的头部,如果需要元素可用,则等待指定的等待时间
String poll = queue.poll(3L, TimeUnit.SECONDS);
//从队列里取出元素,如果队列为空,则阻塞
String take = queue.take();
//检索但不删除此队列的头部。此方法与{@link#peek peek}的不同之处仅在于如果此队列为空则抛出异常
String element = queue.element();
//队列中是否包含该元素
boolean con = queue.contains("a");
}
在上面的例子中,我们可以看出ArrayBlockingQueue的特点,首先是拥有定长,然后是拥有队列先进先出特点,大部分操作都是放入取出,放就是放在尾部,出就是从头部出。
public static void main(String[] args) throws InterruptedException {
//创建固定数量的线程的线程池
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
//创建只有一个线程的线程池
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
//创建一个可缓存的线程池,池中线程数量根据处理数量多少决定,例如十个请求可能会有5个左右的线程,
//20个请求可能会有12个左右的线程
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
try {
for (int i = 0; i < 20; i++) {
final int num = i;
executorService.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 正在处理第" + num + "个任务...");
});
}
}catch (Exception e){
}finally {
//关闭线程池
executorService.shutdown();
}
}
结果:
看一下这三个线程池的源代码
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
我们可以发现,最终使用的还是这个ThreadPoolExecutor这个对象,这个对象的参数有七个,也就是常见的线程池调优7个参数,下面分别介绍这7个参数
工作流程:
如上图,在执行executorService.execute(Runnable.run())
execute方法或者说是执行run方法的时候,我们线程池里的线程才会创建,例如上图中的流程,在执行executer方法后,创建了一个核心线程数为2最大线程数为5的这样一个线程池,然后当第一个第二个任务来临时会优先使用核心线程中的线程去处理,如果这个时候又来了第三第四第五个任务,则会加入到阻塞队列中去等待执行(也就是上图中的黑点队列),如果这个时候又来了第六第七第八个线程,则会新建线程去处理,也就是说第六第七这种新建线程处理的情况会优先于阻塞队列的任务去执行,直到达到最大线程数。如果这个时候第九个线程又来了,而此时线程池已经没有更多资源去处理,则会执行拒绝策略将该任务拒绝掉。
拒绝策略:
RejectedExecutionHandler
在实际开发中我们不会用到上面这几种方式来创建线程池,而是自己定义,原因如下图
public static void main(String[] args) throws InterruptedException {
/**
* 自定义一个线程池
* 核心线程数2
* 最大线程数5
* 存活时间2秒
* 长度为3的有界队列
* 默认的线程工厂
* 线程池线程占用完后抛出异常的拒绝策略
*/
ThreadPoolExecutor customerThreadPool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
try {
for (int i = 0; i < 10; i++) {
final int num = i;
customerThreadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 正在处理" + num + "...");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
customerThreadPool.shutdown();
}
}
最终结果:
在这里我执行了很多遍,每次都是处理7个任务,第八个就开始报错被拒绝了,不知道这是为啥。
需求:将0-100的数相加,如果两个数的差值大于10 则做拆分,如果小于10则相加;
//定义任务类
class MyTask extends RecursiveTask<Integer> {
private static final int VALUE = 10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
//如果两个数的差值大于10 则做拆分,否则相加
if ((end-begin) <= VALUE){
for (int i = begin; i <= end; i++) {
result = result + i;
}
}else{
//拆分
int middle = (end+begin)/2;
final MyTask myTask1 = new MyTask(begin, middle);
final MyTask myTask2 = new MyTask(middle+1, end);
myTask1.fork();
myTask2.fork();
result = myTask1.join() + myTask2.join();
}
return result;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建任务类
MyTask myTask = new MyTask(0,100);
//创建分支合并池
ForkJoinPool forkJoinPool = new ForkJoinPool();
final ForkJoinTask<Integer> submit = forkJoinPool.submit(myTask);
//获取最终合并的结果
final Integer result = submit.get();
System.out.println(result);
//关闭池对象
forkJoinPool.shutdown();
}
同步就是我最终要结果,但是在这之前我要把abcd几个任务都挨个执行完才行,如果c任务执行时间很长,那我就必须等待c任务执行完毕才能到d任务。异步就是我a执行完之后可以直接将bcd这种比较耗时的操作分发出去,我只需要最后在结果的时候汇总。
例如客户在下单之后要通知仓库发货,要计算满减优惠,要增加账户积分,要扣减账户余额,等等这些操作,而这些操作中有些需要顺序执行例如先计算优惠再扣减余额,有些不需要顺序执行,例如通知仓库发货增加账户积分等等,那么在这个过程中我们可以在余额扣减后使用异步任务去通知仓库发货,去增加账户积分等,直接结束当前线程返回给客户响应。如果通知仓库和增加积分失败,也可以重新发起。
通过上面的FutureTask 未来任务,我们可以发现这个异步任务和未来任务很相似,其实看一下CompletableFuture的继承结构就会发现,CompletableFuture也是Future接口的一个实现,可以说是对未来任务的一个增强