package com.mace.juc2;
/**
* 1.如果不做特殊配置,一般为用户线程,是系统的工作线程,用来完成业务操作。
* 2.守护线程为其他线程服务,完成一些系统服务,如果用户线程全部结束,守护线程也将终止,JVM退出。
*/
public class DemonThread {
public static void main(String[] args) {
//This method must be invoked before the thread is started.
//setDaemon()设置为守护线程时必须在线程启动之前,
//否则:IllegalThreadStateException非法的线程状态异常
boolean daemon = Thread.currentThread().isDaemon();
System.out.println(daemon);
}
}
Future 定义了对异步任务的操作,例如:取消异步任务、获取结果、判断任务是否结束等。
/**
* Future 异步任务顶级接口
* FutureTask implements RunnableFuture 多线程+任务+有返回值
* 线程池+FutureTask 可以显著提升程序处理速度
* get()方法容易阻塞,一般放在程序最后。也可以设置等待时间,过时不候。
* isDone()任务是否完成,轮询判断任务状态,当任务完成后再获取结果。
*/
public class FeatureDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
FutureTask<String> futureTask = new FutureTask<>(new Mythread());
Thread thread = new Thread(futureTask);
thread.start();
String string = futureTask.get();
System.out.println(string);
}
}
class Mythread implements Callable<String>{
@Override
public String call() throws Exception {
// TODO Auto-generated method stub
return "Hello call() "+Thread.currentThread().getName();
}
}
/**
* FutureTask获取结果
*/
public class FutureTaskDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
FutureTask<String> task = new FutureTask<>(()->{
TimeUnit.SECONDS.sleep(5);
return "TASK OVER";
});
Thread thread = new Thread(task);
thread.start();
System.out.println("Main thread over");
while(true) {
if(task.isDone()) {
System.out.println(task.get());
break;
}else {
TimeUnit.MILLISECONDS.sleep(500);
System.out.println("任务未结束,请等待...");
}
}
}
}
CompletableFuture提供了一种观察者类似的模式,可以在任务完成后通知监听的一方。
public class CompletableFuture implements Future, CompletionStage
CompletionStage:代表异步计算过程中的某个阶段,一个阶段完成后可能会触发另外一个阶段。
四个常用静态方法:
public class CompleteablefutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//使用默认的线程池ForkJoinPool.commonPool,无返回值。
CompletableFuture<Void> v1 = CompletableFuture.runAsync(()->{
System.out.println("Hello runAsync");
System.out.println(Thread.currentThread().getName());
});
System.out.println(v1.get());
ExecutorService pool = Executors.newFixedThreadPool(3);
//有返回值且使用自己创建的线程池
CompletableFuture<String> v2 = CompletableFuture.supplyAsync(()->{
System.out.println("Hello supplyAsync");
System.out.println(Thread.currentThread().getName());
return "Hello World";
},pool);
System.out.println(v2.get());
pool.shutdown();
}
}
public class CompletableFuturePro {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//使用默认的线程池,相当于守护线程,主线程结束后自动结束得不到结果。
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " come in");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
int i = ThreadLocalRandom.current().nextInt(10);
System.out.println("计算出结果:" + i);
if(i<4){
int x = i/0;
}
return i;
},executorService).whenComplete((x,e)->{//完成时调用
if(e == null){
System.out.println("获取到结果:"+x);
}
}).exceptionally(e->{//出现异常时调用
e.printStackTrace();
System.out.println(e.getCause()+"\t"+e.getMessage());
return null;
});
System.out.println(Thread.currentThread().getName()+" do other something");
}
private static void taskone() throws InterruptedException, ExecutionException {
CompletableFuture<Integer> task = CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " come in");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
int i = ThreadLocalRandom.current().nextInt(10);
System.out.println("计算出结果:" + i);
return i;
});
System.out.println(Thread.currentThread().getName()+" do other something");
System.out.println(task.get());
}
}
CompletableFuture流式计算,get方法有异常,join方法没有异常。
package com.mace.juc2;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 电商比价案例:
* 1.查询同一个产品在不同电商平台下的价格
*/
public class CompleteableFutureDemo {
public static List<NetMall> malls = Arrays.asList(
new NetMall("jd"),new NetMall("dd"),new NetMall("tb")
);
//step by step
public static List<String> StepByStep(List<NetMall> malls,String name){
return malls.stream().map((n)->{
return String.format("《 %s 》 in %s price is %.2f", name,n.getName(),n.caleMall(name));
}).collect(Collectors.toList());
}
//CompletableFuture异步任务实现
public static List<String> FutureSalce(List<NetMall> malls,String name){
//map映射后必须进行collect收集计算,否则不是异步执行。
return malls.stream().map(mall -> {
return CompletableFuture.supplyAsync(()-> {
return String.format("《 %s 》 in %s price is %.2f", name,mall.getName(),mall.caleMall(name));
}
);
}).collect(Collectors.toList()).stream().map(f -> {
return f.join();
}).collect(Collectors.toList());
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
List<String> stepByStep = StepByStep(malls,"mysql");
System.out.println(stepByStep.toString());
long end = System.currentTimeMillis();
System.out.println("耗时:"+(end-start)+"毫秒");//3513
System.out.println("-----------------");
long start2 = System.currentTimeMillis();
List<String> futureSalce = FutureSalce(malls,"mysql");
System.out.println(futureSalce.toString());
long end2 = System.currentTimeMillis();
System.out.println("耗时:"+(end2-start2)+"毫秒");//1022
}
}
class NetMall{
private String name;
public NetMall(String name) {
super();
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double caleMall(String name) {
double cale =ThreadLocalRandom.current().nextDouble()*2+name.charAt(0);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return cale;
}
}
获得结果和触发计算
public class ComAbleFutureAPI {
public static void main(String[] args) throws Exception{
CompletableFuture<String> task = CompletableFuture.supplyAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "Java";
});
//延时等待任务完成,则正常获取计算结果。
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//直接获取,任务没有计算完成,返回默认值。
boolean complete = task.complete("python");
System.out.println(task.get());
}
}
对计算结果进行处理
public class ComAbleFutureAPI2 {
public static void main(String[] args) throws Exception{
CompletableFuture.supplyAsync(()->{
System.out.println("one step");
return 1;
}).thenApply(v->{
System.out.println("two step");
return v + 1;
}).thenApply(v->{
System.out.println("three step");
return v + 2;
}).whenComplete((v,e)->{
if(e== null) {
System.out.println(v);
}
}).exceptionally(e->{
e.printStackTrace();
return null;
});
}
}
对计算结果进行消费
public class ComAbleFutureAPI3 {
public static void main(String[] args) throws Exception{
CompletableFuture<String> t1 = CompletableFuture.supplyAsync(()->{
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "A";
});
CompletableFuture<String> t2 = CompletableFuture.supplyAsync(()->{
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return "B";
});
CompletableFuture<String> applyToEither = t1.applyToEither(t2, (f)->{
return f + " is fast";
});
System.out.println(applyToEither.join());
System.out.println(Thread.currentThread().getName()+" do something。。。");
}
}
对计算结果进行合并
class Phone{
public synchronized void sendEmail() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("send email");
}
public static synchronized void sendEms() {
System.out.println("send ems");
}
}
public class 对象锁和类锁 {
public static void main(String[] args) {
Phone phone = new Phone();
new Thread(()->{
phone.sendEmail();
}).start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
new Thread(()->{
phone.sendEms();
}).start();
}
}
package com.mace.juc2;
/**
* 公平锁和非公平锁
*/
import java.util.concurrent.locks.ReentrantLock;
class Tickets{
private int ticket = 50;
//ReentrantLock lock = new ReentrantLock();//默认非公平相当于传入false
ReentrantLock lock = new ReentrantLock(true);//公平锁
public void sale() {
lock.lock();
try {
if(ticket > 0) {
System.out.println(Thread.currentThread().getName()+" sale "+ ticket +" ticket");
ticket--;
}
} catch (Exception e) {
// TODO: handle exception
}finally {
lock.unlock();
}
}
}
public class FairLock {
public static void main(String[] args) {
Tickets tickets = new Tickets();
new Thread(()->{
for (int i = 0; i < 60; i++) {
tickets.sale();
}
},"A").start();
new Thread(()->{
for (int i = 0; i < 60; i++) {
tickets.sale();
}
},"B").start();
new Thread(()->{
for (int i = 0; i < 60; i++) {
tickets.sale();
}
},"C").start();
}
}
指同一个线程在外层方法获取锁的时候,再进入该线程的内部方法会自动获取锁(锁对象是同一把锁),不会因为之前已经获取过没有释放而阻塞。ReentrantLock及synchronized都是可重入锁,可重入锁的优点是可一定程度避免死锁。
/**
* 可重入锁
*/
public class SyncReenLock {
public static void main(String[] args) {
Object lock = new Object();
new Thread(()->{
synchronized (lock) {
System.out.println(Thread.currentThread().getName()+" come in 外层");
synchronized (lock) {
System.out.println(Thread.currentThread().getName()+" come in 中层");
synchronized (lock) {
System.out.println(Thread.currentThread().getName()+" come in 内层");
}
}
}
}).start();
}
}
方式一:
概念:用于创建锁和其他同步类的基本线程阻塞原语。
该类与使用它的每个线程关联一个许可证(在Semaphore类的意义上)。 如果许可证可用,将立即返回park ,并在此过程中消费; 否则可能会阻止。 如果尚未提供许可,则致电unpark获得许可。 (与Semaphores不同,许可证不会累积。最多只有一个。)可靠的使用需要使用volatile(或原子)变量来控制何时停放或取消停放。 对于易失性变量访问保持对这些方法的调用的顺序,但不一定是非易失性变量访问。
package juc;
import java.util.concurrent.TimeUnit;
/**
* @Author zhangxuhui
* @Date 2022/7/31
* @email zxh_1633@163.com
*
* waith和sleep的区别
*
* 1、每个对象都有一个锁来控制同步访问,Synchronized关键字可以和对象的锁交互,来实现同步方法或同步块。
* sleep()方法正在执行的线程主动让出CPU(然后CPU就可以去执行其他任务),
* 在sleep指定时间后CPU再回到该线程继续往下执行(注意:sleep方法只让出了CPU,而并不会释放同步资源锁!!!);
* wait()方法则是指当前线程让自己暂时退让出同步资源锁,以便其他正在等待该资源的线程得到该资源进而运行,
* 只有调用了notify()方法,之前调用wait()的线程才会解除wait状态,可以去参与竞争同步资源锁,进而得到执行。
* (注意:notify的作用相当于叫醒睡着的人,而并不会给他分配任务,就是说notify只是让之前调用wait的线程有权利重新参与线程的调度);
*
* 2、sleep()方法可以在任何地方使用;wait()方法则只能在同步方法或同步块中使用;
*
* 3、sleep()是线程线程类(Thread)的方法,调用会暂停此线程指定的时间,但监控依然保持,不会释放对象锁,
* 到时间自动恢复;wait()是Object的方法,调用会放弃对象锁,进入等待队列,
* 待调用notify()/notifyAll()唤醒指定的线程或者所有线程,才会进入锁池,
* 不再次获得对象锁才会进入运行状态;
*/
public class WaitNotifyDemo {
public static void main(String[] args) throws InterruptedException {
Object lock = new Object();
new Thread(()->{
synchronized (lock){
try {
System.out.println(Thread.currentThread().getName()+" come in");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName()+" wait");
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+" over exit");
}
}).start();
TimeUnit.SECONDS.sleep(2);
new Thread(()->{
synchronized (lock){
lock.notify();
System.out.println(Thread.currentThread().getName()+" send notify");
}
}).start();
}
}
package juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @Author zhangxuhui
* @Date 2022/7/31
* @email zxh_1633@163.com
* 必须在lock中且成对出现
*/
public class AwaitSingleDemo {
public static void main(String[] args) throws InterruptedException {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(()->{
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+"\t come in");
condition.await();
System.out.println(Thread.currentThread().getName()+"\t 被唤醒");
}catch (InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
}).start();
TimeUnit.SECONDS.sleep(2);
new Thread(()->{
lock.lock();
try {
condition.signal();
System.out.println(Thread.currentThread().getName()+"\t 发出唤醒通知。。。");
}finally {
lock.unlock();
}
}).start();
}
}
package juc;
import java.util.concurrent.locks.LockSupport;
/**
* @Author zhangxuhui
* @Date 2022/7/31
* @email zxh_1633@163.com
*/
public class LockSupportDemo {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t come in");
LockSupport.park();//检查凭证,有则放行无则阻塞
System.out.println(Thread.currentThread().getName() + "\t over");
});
t.start();
Thread.sleep(2000);
new Thread(()->{
LockSupport.unpark(t);
System.out.println(Thread.currentThread().getName()+"\t 发放通行证");
}).start();
}
}
概念:一个线程不应该由其他线程来强制中断或停止,而是应该由该线程自己停止,在Java中没有办法立即停止一条线程,Java提供了一种用于停止线程的协商机制-中断,也即中断标识协商机制。中断的过程完全需要程序员自己实现,若要中断一个线程,需要手动调用interrupt方法,该方法也仅仅将线程对象的中断标识设为true,后续判断中断标识等处理需手动实现。
public class StopThreadByVoliate {
//线程之间可见
private static volatile boolean isStop = false;
public static void main(String[] args) throws Exception {
new Thread(()->{
while (true){
if(isStop){
System.out.println("isStop="+isStop);
break;
}
System.out.println("thread running...");
}
},"a").start();
TimeUnit.SECONDS.sleep(1);
isStop = true;
}
}
public class StopThreadByAtomicBoolean {
static AtomicBoolean flag = new AtomicBoolean(false);
public static void main(String[] args) throws InterruptedException {
new Thread(()->{
while (true){
if(flag.get()){
System.out.println("flag="+flag);
break;
}
System.out.println("thread running...");
}
},"a").start();
TimeUnit.MILLISECONDS.sleep(1);
flag.set(true);
}
}
public class StopThreadByInterrupt {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()->{
while (true){
if(Thread.currentThread().isInterrupted()){
System.out.println("isInterrupted="+Thread.currentThread().isInterrupted());
break;
}
System.out.println("thread running...");
}
},"a");
t.start();
System.out.println("默认:"+t.isInterrupted());
TimeUnit.MILLISECONDS.sleep(1);
t.interrupt();
}
}
public class TestInterruptException {
public static void main(String[] args) throws Exception {
Thread t = new Thread(() -> {
while (true) {
if(Thread.currentThread().isInterrupted()){
System.out.println("thread is interrupted ");
break;
}
try {
Thread.sleep(200);
} catch (InterruptedException e) {
//如果该线程阻塞的调用wait() , wait(long) ,或wait(long, int)的方法Object类,
// 或的join() , join(long) , join(long, int) , sleep(long) ,或sleep(long, int) ,
// 这个类的方法,那么它的中断状态将被清除,并且将收到InterrupsstedException 。
//清除后需要重新设置
Thread.currentThread().interrupt();
e.printStackTrace();
}
System.out.println("thread running...");
}
});
t.start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
t.interrupt();
}).start();
}
}
总结:
JVM规范中定义了Java内存模型(Java memory model)来屏蔽掉各种硬件和操作系统的内存访问差异,以实现让Java程序在各种平台下都能达到一致的内存访问效果。
JMM本身时一种抽象的概念并不是真实存在它仅仅描述的是一组约定和规范,通过这组规范定义了程序中(尤其是多线程)各个变量的读写方式并决定一个线程对共享变量的写入何时以及如何变成对另一个线程可见,关键技术点都是围绕多线程的原子性、可见性和有序性展开的。
当一个线程修改了某个共享变量的值,其他线程是否能够立即知道该变量,JMM规定了所有的变量都存储在主内存中。
系统主内存共享变量数据修改被写入的时机是不确定的,多线程并发下很可能出现脏读,所以每个线程都有自己的工作内存,线程自己的工作内存中保存了该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作都必须在自己的工作内存中进行,而不能够直接读写主内存中的变量。不同线程之间也无法直接访问对方工作内存中的变量,线程间变量值的传递需要同过主内存来完成。
指一个操作是不可打断的,即多线程环境下,操作不能被其他线程干扰。
在JMM中,如果一个操作执行的结果需要对另一个操作可见性或者代码重排序,那么这两个操作之间必须存在happens-before先行发生原则。