个人主页: 【⭐️个人主页】
需要您的【💖 点赞+关注】支持 💯
JAVA并发系列文章
利用不可变对象解决并发问题的模式,就是不可变模式。
如果对象一旦被创建,状态就不会再发生任何变化,并且只允许存在只读方法,这个对象就是不可变对象。
要实现Immuatability
模式很简单,将一个类本身及其所有的属性都设为 final ,并且方法都是只读的,需要注意的是,如果类的属性也是引用类型,那么其对应的类也要满足不可变的特性。
final 应该都很熟悉了,用它来修饰类和方法,分别表示类不可继承、属性不可改变。
Java 中具备不可变性的类型包括:
• String
• final 修饰的基本数据类型
• Integer、Long、Double 等基本数据类型的包装类
• Collections 中的不可变集合
经常用到的String对象和各种基础类型的包装类,比如,Long,Integer都具备不可变性。更进一步,基本数据类型的包装类都用到了享元模式(Flyweight Pattern), 利用享元模式(Flyweight Pattern)可以减少创建对象的数量,从而减少内存占用。Java 语言里面 Long、Integer、Short、Byte 等这些基本数据类型的包装类都用到了享元模式。
但是在使用不可变模式时,一定要搞清楚特定不可变对象的边界在哪里。
😒: 比如,一个final类C的final成员变量a,当a的内部存在非final的其他对象时,并且C中存在着get_a的public接口,那么C就不是线程安全的。
写时复制
(Copy-on-Write
, COW或者CoW)是一种延时策略,只要在真正需要复制的时候才复制,而不是提前复制好。同时还支持按需复制
,COW
通常用于操作系统领域提升性能。
在操作系统领域,除了创建进程用到了 Copy-on-Write,很多文件系统也同样用到了,例如 Btrfs (B-Tree File System)、aufs(advanced multi-layered unification filesystem)等。相比较而言,Java 提供的 Copy-on-Write 容器,由于在修改的同时会复制整个容器,所以在提升读操作性能的同时,是以内存复制为代价的
。除了上面我们说的 Java 领域、操作系统领域,很多其他领域也都能看到 Copy-on-Write 的身影:Docker 容器镜像的设计是 Copy-on-Write,甚至分布式源码管理系统 Git 背后的设计思想都有 Copy-on-Write
Copy-on-Write模式适用于对数据的实时性不敏感,读多写少且对读性能要求极为苛刻的小数据场景。
具体的实现也很简单,当数据需要修改时,先复制一份出来,在复制的数据上进行修改,并发读还是在旧的数据上,当数据修改完成后,再将老数据替换为修改后的新数据即可。但需要注意的是,当发生并发写时,可以使用CAS的策略来完成。
CopyOnWriteArrayList
和CopyOnWriteArraySet
这两个 Copy-on-Write 容器在修改的时候会复制整个数组,所以如果容器经常被修改或者这个数组本身就非常大的时候,是不建议使用
的。反之,如果是修改非常少、数组数量也不大,并且对读性能要求苛刻
的场景,使用 Copy-on-Write 容器效果就非常好了。
🌴 总而言之,Copy-on-Write适合对读的性能要求很高,读多写少,弱一致性场景。如Dubbo中的路由表。
线程本地存储模式(ThreadLocal)
大多数并发问题都是由于变量的共享导致的,多个线程同时读写同一变量便会出现原子性,可见性等问题。局部变量是线程安全的,本质上也是由于各个线程各自拥有自己的变量,避免了变量的共享。
Java 中使用了 ThreadLocal
来实现避免变量共享的方案。ThreadLocal 保证在线程访问变量时,会创建一个这个变量的副本,这样每个线程都有自己的变量值,没有共享,从而避免了线程不安全的问题。
线程封闭的本质就是
避免共享
,除了局部变量,还有Java提供的线程本地存储
(ThreadLocal)也可以实现。
Java中的设计方案:
类Thread
拥有threadLocals
属性,threadLocals
属性的类型是ThreadLocalMap容器
,ThreadLocalMap容器
中以ThreadLocal
为key
,维护不同类型的value。ThreadLocalMap 里对 ThreadLocal 的引用是弱引用
(WeakReference),所以只要== Thread 对象可以被回收,那么 ThreadLocalMap 就能被回收==。这样不容易产生内存泄露
。
在线程池中使用ThreadLocal
很容易产生内存泄漏
,原因在于线程池中线程的存活时间过长
,往往都是和程序同生共死的。.
这意味着Thread持有的ThreadLocalMap一直都不会被回收,再加上ThreadLocalMap中的Entry对ThreadLocal是弱引用(WeakReference),所以只要ThreadLocal结束了自己的生命周期就可以被回收掉。但Entry中的Value是被Entry强引用的,所以即便Value的生命周期结束了,Value也是无法被回收的,从而导致内存泄漏。
那么在线程池中,我们需要自己手动释放对Value的强引用,可以使用try{}finally{}方案。
通过ThreadLocal创建的线程变量,其子线程是无法继承的,也就是通过ThreadLocal创建了线程变量V,后续该线程创建了子线程,而这个子线程中无法通过访问ThreadLocal来访问父线程的线程变量V。
Java提供了InheritableThreadLocal
来支持这种特性,InheritableThreadLocal
是ThreadLocal的子类,所以用法和ThreadLocal相同。我完全不建议你在线程池中使用 InheritableThreadLocal,不仅仅是因为它具有 ThreadLocal 相同的缺点——可能导致内存泄露,更重要的原因是:线程池中线程的创建是动态的,很容易导致继承关系错乱,如果你的业务逻辑依赖 InheritableThreadLocal,那么很可能导致业务逻辑计算错误,而这个错误往往比内存泄露更要命。
public class ThreadLocalTest {
private static final ThreadLocal<SimpleDateFormat> threadLocal =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
public static SimpleDateFormat safeDateFormat() {
return threadLocal.get();
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<SimpleDateFormat> task1 = new FutureTask<>(ThreadLocalTest::safeDateFormat);
FutureTask<SimpleDateFormat> task2 = new FutureTask<>(ThreadLocalTest::safeDateFormat);
Thread t1 = new Thread(task1);
Thread t2 = new Thread(task2);
t1.start();
t2.start();
System.out.println(task1.get() == task2.get());//返回false,表示两个对象不相等
}
}
Suspension 是“挂起”、“暂停”的意思,而 Guarded 则是“担保”的意思,连在一起 就是
确保挂起
。
当线程在访问某个对象时,发现条件不满足,就暂时挂起等待条件满足时 再次访问。
保护性暂停设计模式(Guarded Suspension) ,一般适用于 一个线程等待另外一个线程的执行结果,两个线程一一对应,可以划分为同步的设计模式.
Guarded Suspension 设计模式是很多设计模式的基础,比如生产者消费者模式,同样在 Java 并发包中的 BlockingQueue 中也大量使用到了 Guarded Suspension 设计模式。
如果你需要一个结果要源源不断的从一个线程到另外一个线程,那就需要使用生产者消费者模式->即消息队列.
Guarded Suspension 模式本质上是一种等待唤醒机制的实现,只不过 Guarded Suspension 模式将其规范化了。规范化的好处是你无需重头思考如何实现,也无需担心实现程序的可理解性问题,同时也能避免一不小心写出个 Bug 来。但 Guarded Suspension 模式在解决实际问题的时候,往往还是需要扩展的,扩展的方式有很多,本篇文章就直接对 GuardedObject 的功能进行了增强, 当然,你也可以创建新的类来实现对 Guarded Suspension 模式的扩展。Guarded Suspension 模式也常被称作 Guarded Wait 模式、Spin Lock 模式(因为使用了 while 循环去等待)。
Java的API很多都按照保护性暂停这种设计模式来的,比如 join,future.
Guarded Suspension 模式的结构图,非常简单,一个对象 GuardedObject
,内部有一个成员变量——受保护的对象,以及两个成员方法——get(Predicate p)
和onChanged(T obj)
方法。
class GuardedObject<T>{
//受保护的对象
T obj;
final Lock lock = new ReentrantLock();
final Condition done = lock.newCondition();
final int timeout=1;
//获取受保护对象
T get(Predicate<T> p) {
lock.lock();
try {
//MESA管程推荐写法
while(p.test(obj)){
done.await(timeout, TimeUnit.SECONDS);
}
}catch(InterruptedException e){
throw new RuntimeException(e);
}finally{
lock.unlock();
}
//返回非空的受保护对象
return obj;
}
//事件通知方法
void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
done.signalAll();
} finally {
lock.unlock();
}
}
// TEST
@Test
public void testGuardedObject(){
SimpleThreadUtils.newLoopThread(2,"gs",1,(t)->{
if (t.getNo() % 2 != 0){
System.out.println("准备获取");
String s = go.get(Objects::isNull);
System.out.println("获取OK " + s );
}else {
try {
Thread.sleep(2000);
go.onChanged("hello world");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}) ;
SimpleThreadUtils.wait(1000,()->{});
}
}
GuardedObject类中增加如下代码,扩展GuardedSupsensionObject 管理功能
//保存所有GuardedObject
final static Map<Object, GuardedObject> gos=new ConcurrentHashMap<>();
//静态方法创建GuardedObject
static <K> GuardedObject create(K key){
GuardedObject go=new GuardedObject();
gos.put(key, go);
return go;
}
static <K, T> void fireEvent(K key, T obj){
GuardedObject go=gos.remove(key);
if (go != null){
go.onChanged(obj);
}
}
在Guarded Suspension模式中,如果消息未获得返回,消费者会始终阻塞在等待条件上,但需要快速放弃也是一个常见的需求。比如,自动存盘需求中,如果文件没有改变,就无需磁盘操作。又一个更常见的需求是,单次初始化操作时,使用init变量来控制。
通常会使用一个状态变量status
来控制是否存在改变,如果对原子性有要求,可以使用互斥锁,如果对原子性无特殊要求,直接使用
volatile`即可。
public class Balking {
/**
* 已编辑
*/
private volatile boolean status = false;
private String buffer;
private String file;
public synchronized void save() {
System.out.println("开始保存。。。");
if (!status) {
System.out.println("取消保存,已经处理了");
return;
}
writeToFile(buffer);
status = false;
}
public synchronized void edit(String changeContent) {
this.buffer = changeContent;
status = true;
}
/**
* 模拟写到文件中
*/
private void writeToFile(String newContent) {
this.file = newContent;
}
public void print(){
System.out.println("> file : " + this.file);
}
public static void main(String[] args) {
Balking balking = new Balking();
SimpleThreadUtils.newLoopThread(3,"balk",100,(a)->{
if (a.getNo() % 3 == 0){
balking.edit("hello world " + a.getName() +":"+ Math.floor(Math.random() * 10 ));
}else {
balking.save();
balking.print();
}
});
SimpleThreadUtils.wait(1000,()->{});
}
}
所谓Per, 就是 “每~” 的意思, 因此, Thread-Per-Message
模式直译过来就是 “每一个消息一个线程” 的意思, Message 在这里可以理解为 “请求” 或 “命令”, 为每个命令或请求分配一个新的线程
, 由这个新线程来执行处理- 这就是 Thread-Per-Message
模式, 在该模式中, 消息的 "委托端"
和 "执行端"
是不同的线程, 消息的委托端线程会告诉执行端的线程 “这项任务交给你执行了”
public class ThreadPerMessage {
public static class Host {
Helper helper = new Helper();
public void request(String request) {
createThreadRun(() -> {
helper.run(request);
});
}
private void createThreadRun(Runnable o) {
// 一般我们不直接new Thread,因为线程创建管理再java中很重,容易导致内存泄漏问题
new Thread(o, UUID.randomUUID().toString()).start();
// 可以使用线程池技术 juc包中的一些线程池或线程工厂进行线程的创建管理
// threadFactory.newThread(o).start();
}
}
public static class Helper {
public void run(String request) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName()+" : 执行请求 " + request);
}
}
public static void main(String[] args) {
Host host = new Host();
for (int i = 0; i < 100; i++) {
host.request("hello");
}
SimpleThreadUtils.wait(1000,()->{});
}
}
线程创建
也叫ThreadPool(线程池模式)
Java语言中,·Thread-Per-Message
·毕竟还是一种高消耗
的并发模式,如果可以使用阻塞队列
做任务池,再创建固定数量
的线程
消费队列中的任务,这种方式也是SDK并发工具包里提供的线程池方案
。
线程池有很多优点,比如,避免重复创建
,销毁线程
,能够限制创建线程的上限 等等。但需要重点关注并 设置阻塞队列 ,比如,无界的阻塞队列
可能在高并发下导致OOM
,不建议
使用,而有界的阻塞队列需要配合合理的拒绝策略
。即,在创建线程池时,建议使用有界队列,并清晰的指明拒绝策略,并且,最好还能为线程赋予一个业务相关的名字,便于调试。
更重要的是,在Worker Thread模式的线程池方案中,需要时刻关注提交到线程池中的任务是否相互独立,如果任务间存在依赖关系,则可能导致线程死锁。死锁产生的原因通常是,线程池中存在相互依赖的任务。比如,线程池中的全部线程都在执行任务1,并且阻塞等待任务2的完成,当提交任务2后,当前并没有任何空闲的线程去调度执行,于是世界静止,死锁产生。
import java.util.Date;
/**
* @author 孔翔
* @since 2023-11-07
* copyright for author : 孔翔 at 2023-11-07
* java-study
*/
public class WorkerThread {
/**
* 请求任务
*/
public static class RequestTask {
private final String taskName;
private final Date taskDate;
RequestTask(String taskName, Date taskDate) {
this.taskDate = taskDate;
this.taskName = taskName;
}
/**
* Runs this operation.
*/
public void runTask() {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(this.taskName + " : 任务执行完成" + ": by " + Thread.currentThread().getName());
}
}
/**
* 任务工厂处理
*/
public static class Channel {
public final static int MAX_WORKER = 10;
/**
* 任务流水线。使用队列
* 这里直接使用工具类,如果自己编写,可以参考
*
* @see GuardedSuspension
*/
private RequestTask[] requestTasks = new RequestTask[MAX_WORKER];
/**
* 线程池,使用可复用,数量可控的线程池降低线程开销
*/
private Worker[] workerThreadPool = new Worker[MAX_WORKER];
private volatile int queueSize ;
Channel(){
initWorkPool();
}
private void initWorkPool() {
for (int i = 0; i < MAX_WORKER; i++) {
workerThreadPool[i] = new Worker(this,"Worker-"+ i);
workerThreadPool[i].start();
}
}
public synchronized void putRequest(RequestTask requestTask) throws InterruptedException {
while (queueSize == MAX_WORKER){
wait();
}
System.out.println("-> "+ requestTask.taskName);
// 通知其他线程,queue可以put数据
requestTasks[queueSize] = requestTask;
queueSize++;
notifyAll();// 通知消费take
}
public synchronized RequestTask takeRequest() throws InterruptedException {
while (queueSize == 0 ){
wait();
}
queueSize--;
System.out.println("<- "+ requestTasks[queueSize].taskName);
notifyAll();
return requestTasks[queueSize];
}
}
/**
* 主要处理任务
*/
public static class Worker extends Thread {
private Channel channel;
public Worker(Channel channel ,String string) {
super( string);
this.channel = channel;
}
@Override
public void run() {
while (true){
try {
// 直接使用了阻塞队列。如果自己实现这里请增加阻塞代码
/*
lock.lock();
RequestTask request task = channel.taskRequest();
while(Task = channel.taskRequest() == null){
condition.await();
}
RequestTask requestTask = channel.taskRequest();
requestTask.runTask();
finally{
lock.unlock();
}
*/
RequestTask requestTask = channel.takeRequest();
requestTask.runTask();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
public static void main(String[] args) throws InterruptedException {
Channel channel = new Channel();
for (int i = 0; i < 100; i++) {
channel.putRequest(new RequestTask("task:"+ i , new Date()));
}
SimpleThreadUtils.holdOn();
}
}
Two-Phase Termination Patter
两阶段终止模式(Two-Phase Termination)是一种用于优雅终止线程或关闭应用程序的设计模式。它包含两个阶段:准备阶段
和终止阶段
。
前面所有的内容都是论述,如何启动多线程去执行一个异步任务。那么,如何优雅的终止线程呢?业界也有一套成熟的方案,叫做“两阶段终止”模式。即,将终止过程分成两个阶段,第一阶段由线程T1向线程T2发送终止指令,第二阶段是由线程T2响应终止指令。
在Java语言中,Java线程存在4种状态:初始状态(NEW)
,可运行/运行状态(RUNNABLE)
,休眠状态(BLOCKED,WAITING,TIMED_WAITING)
和终止状态(TERMINATED)
。如果要让一个线程优雅的进入终止状态,一个大前提是让线程首先进入RUNNABLE
状态,即,将线程从休眠状态中唤醒至可运行/运行状态,这个可以通过interrupt()
方法来实现。
实现上,可以使用一个状态标志位status
,在线程T1里设置T2.status
后,调用T2.interrupt
,将T2从可能的休眠状态中唤醒,T2检查status是否符合终止条件,如果符合,则尝试完成正确的收尾,并优雅退出。
好处:
线程安全
:两阶段终止模式可以确保在终止过程中不会发生并发问题,保证线程安全。可控性
:通过两个明确的阶段,可以更好地控制终止过程,确保在终止前完成必要的清理和资源释放操作。可靠性
:两阶段终止模式可以处理异常情况,即使在终止过程中出现异常,也能够正常地完成终止操作。总的来说,两阶段终止模式适用于需要优雅地终止线程或关闭应用程序,并确保在终止过程中完成必要的清理和资源释放操作的场景。
/**
* @author 孔翔
* @since 2023-11-07
* copyright for author : 孔翔 at 2023-11-07
* java-study
*/
public class TwoPhaseTermination {
// 两阶段终止分为两个阶段
// 1. 发送终止指令
// 》
// 2. 执行终止操作
public static class TwoPhaseThread implements Runnable {
private volatile boolean stopFlag;
// 1. 发送命令
public void terminal(){
this.stopFlag = true;
}
@Override
public void run() {
while (!stopFlag) {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "任务线程正在运行...");
} catch (InterruptedException e) {
// 执行命令
Thread.currentThread().interrupt();
System.out.println(Thread.currentThread().getName() + "任务线程收到终止信号,执行终止操作");
}
}
}
}
public static void main(String[] args) {
TwoPhaseThread twoPhaseThread = new TwoPhaseThread();
Thread thread = new Thread(twoPhaseThread);
thread.start();
// 发送终止线程
new Thread(()->{
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("发送执行终止线程命令");
stopThread(thread,twoPhaseThread);
}).start();
SimpleThreadUtils.holdOn();
}
private static void stopThread(Thread thread, TwoPhaseThread twoPhaseThread) {
twoPhaseThread.terminal();
thread.interrupt();
}
}
两个线程池通过一个阻塞队列连接起来,一个生产者线程池向队列添加任务,另一个消费者线程池从队列中消费任务,两个线程池并不知道对方的存在,符合架构设计上的“解耦”,支持异步,并可以通过控制两个线程池的线程数目,平衡生产者和消费者的速度差异。
并且,通过分析任务的类型,可以在消费端更精细化的进行控制。比如,如果执行单个任务的消耗与多个任务批量执行的消耗类似,则可以在大并发的场景下,消费端批量处理或者分阶段的批量处理来提高效率。
这个比较简单
import com.beust.ah.A;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* 生产者-消费者模式
*
* @author 孔翔
* @since 2023-11-07
* copyright for author : 孔翔 at 2023-11-07
* java-study
*/
public class ProductConsumer {
// 核心 : 阻塞队列连接
public static class ConsumerRunnable<T> implements Runnable {
private final BlockingQueue<T> queue;
ConsumerRunnable(BlockingQueue<T> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
try {
T take = queue.take();
handle(take);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private void handle(T take) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("---> 执行任务" + take.toString());
}
}
public static class Product<T> {
private final BlockingQueue<T> queue;
Product(BlockingQueue<T> queue) {
this.queue = queue;
}
public void create(T t ){
try {
queue.put(t);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
ConsumerRunnable<String> consumerRunnable = new ConsumerRunnable<>(queue);
Product<String> product = new Product<>(queue);
new Thread(consumerRunnable).start();
new Thread(()->{
for (int i = 0; i < 1000; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
product.create("i="+ i );
}
}).start();
SimpleThreadUtils.holdOn();
}
}
https://www.jianshu.com/p/18a07025e7e0