作者:Grey
原文地址:
顺序(sequential)用于表示多个操作『依次』处理。比如把十个操作交给一个人处理时,这个人要一个一个地按顺序来处理。
并行(parallel)用于表示多个操作『同时』处理”。比如十个操作分给两个人处理时,这两个人会并行来处理。
并发(concurrent)相对于顺序和并行来说比较抽象,用于表示『将一个操作分割成多个部分并且允许无序处理』。比如将十个操作分成相对独立的两类,这样便可以开始并发处理了。如果一个人来处理,这个人就是顺序处理分开的并发操作,而如果是两个人。这两个人就可以并行处理同一操作。
如果 CPU 只有一个,那么并发处理就是顺序执行的,而如果有多个 CPU,那么并发处理就可能会并行运行。
程序是计算机的可执行文件;
进程是计算机资源分配的基本单位;
线程是资源调度执行的基本单位,也可以说:线程是一个程序里面不同的执行路径,多个线程共享进程中的资源;
协程是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。协程在子程序内部可中断的,然后转而执行别的子程序,在适当的时候再返回来接着执行。
协程的特点在于是一个线程执行,那和多线程比,协程有如下优势:
优势一:极高的执行效率:因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销,和多线程比,线程数量越多,协程的性能优势就越明显;
优势二:不需要多线程的锁机制:因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不加锁,只需要判断状态就好了,所以执行效率比多线程高很多。
注意:协程避免了无意义的调度,由此可以提高性能,但是程序员必须自己承担调度的责任,同时,协程也失去了标准线程使用多 CPU 的能力。
一个简单的协程示例, 代码如下:
注:
需要引入quasar-core依赖包。
如果在 Java SE 16 以及更高版本上运行,需要增加如下参数
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED
package git.snippets.juc;
import co.paralleluniverse.fibers.Fiber;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;
import java.util.concurrent.ExecutionException;
/**
* Java协程示例
* JDK 11 ~ JDK 15 没问题,
*
* JDK 16 开始,需要增加如下参数
*
* --add-exports=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-exports=jdk.unsupported/sun.misc=ALL-UNNAMED
*
* @since jdk11
* 需要引入:quasar-core依赖包
*/
public class FiberSample {
private static void printer(Channel<Integer> in) throws SuspendExecution, InterruptedException {
Integer v;
while ((v = in.receive()) != null) {
System.out.println(v);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException, SuspendExecution {
//定义两个Channel
try (Channel<Integer> naturals = Channels.newChannel(-1); Channel<Integer> squares = Channels.newChannel(-1)) {
//运行两个Fiber实现.
new Fiber(() -> {
for (int i = 0; i < 10; i++) {
naturals.send(i);
}
naturals.close();
}).start();
new Fiber(() -> {
Integer v;
while ((v = naturals.receive()) != null) {
squares.send(v * v);
}
squares.close();
}).start();
printer(squares);
}
}
}
线程就是轻量级进程,是程序执行的最小单位。
多进程的方式也可以实现并发,为什么我们要使用多线程?主要是基于以下两方面的原因:
共享资源在线程间的通信比较容易。
线程开销更小。
进程是一个独立的运行环境,而线程是在进程中执行的一个任务。他们两个本质的区别在于是否单独占有内存地址空间及其它系统资源。
进程是操作系统进行资源分配的基本单位,而线程是操作系统进行调度的基本单位,即 CPU 分配时间的单位。
进程单独占有一定的内存地址空间,所以进程间存在内存隔离,数据是分开的,数据共享复杂但是同步简单,各个进程之间互不干扰;而线程共享所属进程占有的内存地址空间和资源,数据共享简单,但是同步复杂。
进程单独占有一定的内存地址空间,一个进程出现问题不会影响其他进程,不影响主程序的稳定性,可靠性高;一个线程崩溃可能影响整个程序的稳定性,可靠性较低。
进程的创建和销毁不仅需要保存寄存器和栈信息,还需要资源的分配回收以及页调度,开销较大;线程只需要保存寄存器和栈信息,开销较小。
类变量(类里面 static 修饰的变量)保存在“方法区”
实例变量(类里面的普通变量)保存在“堆”
局部变量(方法里声明的变量)“虚拟机栈”
“方法区”和“堆”都属于线程共享数据区,“虚拟机栈”属于线程私有数据区。
因此,局部变量是不能多个线程共享的,而类变量和实例变量是可以多个线程共享的。事实上,在 Java 中,多线程间进行通信的唯一途径就是通过类变量和实例变量。也就是说,如果一段多线程程序中如果没有类变量和实例变量,那么这段多线程程序就一定是线程安全的。
开发过程中,为了解决线程安全问题,有如下角度可以考虑:
第一种方案:尽量使用局部变量,代替实例变量和静态变量。
第二种方案:如果必须是实例变量,那么可以考虑创建多个对象,这样实例变量的内存就不共享了( 1 个线程对应 1 个对象,100 个对象对应 100 个对象,对象不共享,就没有数据安全问题了)
第三种方案:如果不使用局部变量。对象也不能创建多个。这个时候,就只能选择syncharonized
了。
其中共享资源包括:
进程代码段
进程的公有数据
进程打开的文件描述符、信号的处理器、进程的当前目录和进程用户 ID 与进程组 ID。
独有资源包括:
线程ID:每个线程都有自己的线程 ID,这个 ID 在本进程中是唯一的。进程用此来标识线程。
寄存器组的值:由于线程间是并发运行的,每个线程有自己不同的运行线索,当从一个线程切换到另一个线程上时,必须将原有的线程的寄存器集合的状态保存,以便将来该线程在被重新切换到时能得以恢复。
线程的堆栈:堆栈是保证线程独立运行所必须的。线程函数可以调用函数,而被调用函数中又是可以层层嵌套的,所以线程必须拥有自己的函数堆栈, 使得函数调用可以正常执行,不受其他线程的影响。
错误返回码:由于同一个进程中有很多个线程在同时运行,可能某个线程进行系统调用后设置了 err no 值,而在该线程还没有处理这个错误,另外一个线程就在此时被调度器投入运行,这样错误值就有可能被修改。所以,不同的线程应该拥有自己的错误返回码变量。
线程的信号屏蔽码:由于每个线程所感兴趣的信号不同,所以线程的信号屏蔽码应该由线程自己管理。但所有的线程都共享同样的信号处理器。
线程的优先级:由于线程需要像进程那样能够被调度,那么就必须要有可供调度使用的参数,这个参数就是线程的优先级。
从底层角度上看,CPU 主要由如下三部分组成,分别是:
ALU: 计算单元
Registers: 寄存器组
PC:存储到底执行到哪条指令
T1 线程在执行的时候,将 T1 线程的指令放在 PC,数据放在 Registers,假设此时要切换成 T2 线 程,T1 线程的指令和数据放 cache,然后把 T2 线程的指令放 PC,数据放 Registers,执行 T2 线程即可。
以上的整个过程是通过操作系统来调度的,且线程的调度是要消耗资源的,所以,线程不是设置越多越好。
示例:
单线程和多线程来累加 1 亿个数。 示例代码如下
package git.snippets.juc;
import java.text.DecimalFormat;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
/**
* 多线程求1亿个Double类型的数据
*
* @author Grey
* @date 2021/7/7
* @since
*/
public class CountSum {
private static final double[] NUMS = new double[1_0000_0000];
private static final Random R = new Random();
private static final DecimalFormat FORMAT = new DecimalFormat("0.00");
static {
for (int i = 0; i < NUMS.length; i++) {
NUMS[i] = R.nextDouble();
}
}
static double result1 = 0.0, result2 = 0.0, result = 0.0;
public static void rand() {
for (int i = 0; i < NUMS.length; i++) {
NUMS[i] = R.nextDouble();
}
}
/**
* 单线程计算一亿个Double类型的数据之和
*
* @return
*/
public static String m1() {
long start = System.currentTimeMillis();
double result = 0.0;
for (double num : NUMS) {
result += num;
}
long end = System.currentTimeMillis();
System.out.println("计算1亿个随机Double类型数据之和[单线程], 结果是:result = " + FORMAT.format(result) + " 耗时 : " + (end - start) + "ms");
return String.valueOf(FORMAT.format(result));
}
/**
* 两个线程计算一亿个Double类型的数据之和
*
* @return
*/
private static String m2() throws Exception {
long start = System.currentTimeMillis();
result1 = 0.0;
result2 = 0.0;
int len = (NUMS.length >> 1);
Thread t1 = new Thread(() -> {
for (int i = 0; i < len; i++) {
result1 += NUMS[i];
}
});
Thread t2 = new Thread(() -> {
for (int i = len; i < NUMS.length; i++) {
result2 += NUMS[i];
}
});
t1.start();
t2.start();
t1.join();
t2.join();
result = result1 + result2;
long end = System.currentTimeMillis();
System.out.println("计算1亿个随机Double类型数据之和[2个线程], 结果是:result = " + FORMAT.format(result) + " 耗时 : " + (end - start) + "ms");
return String.valueOf(FORMAT.format(result));
}
/**
* 10个线程计算一亿个Double类型的数据之和
*
* @return
*/
private static String m3() throws Exception {
long start = System.currentTimeMillis();
final int threadCount = 10;
Thread[] threads = new Thread[threadCount];
double[] results = new double[threadCount];
final int segmentCount = NUMS.length / threadCount;
CountDownLatch latch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
int m = i;
threads[i] = new Thread(() -> {
for (int j = m * segmentCount; j < (m + 1) * segmentCount && j < NUMS.length; j++) {
results[m] += NUMS[j];
}
latch.countDown();
});
}
double resultM3 = 0.0;
for (Thread t : threads) {
t.start();
}
latch.await();
for (double v : results) {
resultM3 += v;
}
long end = System.currentTimeMillis();
System.out.println("计算1亿个随机Double类型数据之和[10个线程], 结果是:result = " + FORMAT.format(resultM3) + " 耗时 : " + (end - start) + "ms");
return String.valueOf(FORMAT.format(resultM3));
}
public static void main(String[] args) throws Exception {
int testCount = 10;
boolean correct = true;
for (int i = 0; i < testCount; i++) {
rand();
String s = m1();
String s1 = m2();
String s2 = m3();
if (!s1.equals(s2) || !s1.equals(s)) {
System.out.println("oops!");
System.out.println(s1);
System.out.println(s2);
System.out.println(s);
correct = false;
break;
}
}
if (correct) {
System.out.println("test finished");
}
}
}
运行结果
……
计算1亿个随机Double类型数据之和[单线程], 结果是:result = 49998124.71 耗时 : 114ms
计算1亿个随机Double类型数据之和[2个线程], 结果是:result = 49998124.71 耗时 : 53ms
计算1亿个随机Double类型数据之和[10个线程], 结果是:result = 49998124.71 耗时 : 54ms
计算1亿个随机Double类型数据之和[单线程], 结果是:result = 50000309.80 耗时 : 102ms
计算1亿个随机Double类型数据之和[2个线程], 结果是:result = 50000309.80 耗时 : 53ms
计算1亿个随机Double类型数据之和[10个线程], 结果是:result = 50000309.80 耗时 : 35ms
计算1亿个随机Double类型数据之和[单线程], 结果是:result = 50001943.57 耗时 : 108ms
计算1亿个随机Double类型数据之和[2个线程], 结果是:result = 50001943.57 耗时 : 58ms
计算1亿个随机Double类型数据之和[10个线程], 结果是:result = 50001943.57 耗时 : 41ms
计算1亿个随机Double类型数据之和[单线程], 结果是:result = 49997176.44 耗时 : 102ms
计算1亿个随机Double类型数据之和[2个线程], 结果是:result = 49997176.44 耗时 : 53ms
计算1亿个随机Double类型数据之和[10个线程], 结果是:result = 49997176.44 耗时 : 29ms
……
可以看到结果中,创建 10 个线程 不一定会比创建 2 个线程要执行更快。
有意义,因为线程的操作中可能有不消耗 CPU 的操作,比如:等待网络的传输,或者线程 sleep,此时就可以让出 CPU 去执行其他线程。可以充分利用 CPU 资源。
N = Ncpu * Ucpu * (1 + W/C)
其中:
Ncpu 是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()
得到
Ucpu 是期望的 CPU 利用率(该值应该介于 0 和 1 之间)
W/C 是等待时间和计算时间的比率。
更深入的分析,可以参考这篇文章
使用如下代码:
public class HowManyThreadHelloWorld {
public static void main(String[] args) {
Thread t = Thread.currentThread();
System.out.println("\n线程:" + t.getName() + "\n");
System.out.println("hello world!");
for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
Thread thread = entry.getKey();
StackTraceElement[] stackTraceElements = entry.getValue();
if (thread.equals(Thread.currentThread())) {
continue;
}
System.out.println("\n线程: " + thread.getName() + "\n");
for (StackTraceElement element : stackTraceElements) {
System.out.println("\t" + element + "\n");
}
}
}
}
在 Java SE 11 下执行,可以看到,有如下线程信息
线程:main
线程: Reference Handler
线程: Signal Dispatcher
线程: Finalizer
线程: Common-Cleaner
线程: Attach Listener
在 Java SE 8 下执行,有如下线程信息
线程:main
线程: Finalizer
线程: Attach Listener
线程: Signal Dispatcher
线程: Reference Handler
其中
Reference Handler:处理引用对象本身的垃圾回收
Finalizer:处理用户的 Finalizer 方法
Signal Dispatcher:外部 jvm 命令的转发器
Attach Listener: jvm 提供一种 jvm 进程间通信的能力,能让一个进程传命令给另外一个进程
Common-Cleaner: 该线程是 Java SE 9 之后新增的守护线程,用来更高效的处理垃圾回收
继承Thread
类,重写run
方法。
实现Runnable
接口,实现run
方法,这比方式 1 更好,因为一个类实现了Runnable
以后,还可以继承其他类
通过线程池创建。
在需要返回值的时候,可以通过Callable
、Future
与FutureTask
来创建。
示例代码如下
package git.snippets.juc;
import java.util.concurrent.*;
/**
* 创建线程的方式
*
* @author Grey
* @date 2021/7/7
* @since 1.8
*/
public class HelloThread {
public static void main(String[] args) throws Exception {
MyFirstThread t1 = new MyFirstThread();
Thread t2 = new Thread(new MySecondThread());
Thread t3 = new Thread(new FutureTask<>(new CallableThreadTest()));
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(() -> System.out.println("方式3:使用线程池来创建线程。"));
t1.start();
t2.start();
t3.start();
executor.shutdown();
boolean b = executor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println(b ? "停止成功" : "停止失败");
}
static class MyFirstThread extends Thread {
@Override
public void run() {
System.out.println("方式1:继承Thread类并重写run方法来创建线程");
}
}
/**
* 方式二, 实现Runnable接口来创建线程
*/
static class MySecondThread implements Runnable {
@Override
public void run() {
System.out.println("方式2:实现Runnable方式来创建线程");
}
}
static class CallableThreadTest implements Callable<Integer> {
@Override
public Integer call() {
int i;
for (i = 0; i < 10; i++) {
i++;
}
System.out.println("方式4,实现Callable接口方式来创建有返回值的线程,返回值是:" + i);
return i;
}
}
}
NEW:线程刚刚创建,还没有启动,New Thread 的时候,还没有调用start
方法时候,就是这个状态
RUNNABLE:可运行状态,由线程调度器可以安排执行,包括以下两种情况:
READY
RUNNING
READY 和 RUNNING 通过yield
方法来切换
WAITING:等待被唤醒
TIMED_WAITING:隔一段时间后自动唤醒
BLOCKED:被阻塞,正在等待锁,只有在synchronized
的时候在会进入BLOCKED
状态
TERMINATED:线程执行完毕后,是这个状态
各个线程状态切换如下
sleep:当前线程睡一段时间
yield:这是一个静态方法,一旦执行,它会使当前线程让出一下 CPU。但要注意,让出 CPU 并不表示当前线程不执行了。当前线程在让出 CPU 后,还会进行 CPU 资源的争夺,但是是否能够再次被分配到就不一定了。
join:等待另外一个线程的结束,当前线程才会运行,示例代码如下:
public class ThreadBasicOperation {
static volatile int sum = 0;
public static void main(String[] args) throws Exception {
Thread t = new Thread(() -> {
for (int i = 1; i <= 100; i++) {
sum += i;
}
});
t.start();
// join 方法表示主线程愿意等待子线程执行完毕后才继续执行
// 如果不使用join方法,那么sum输出的可能是一个很小的值,因为还没等子线程
// 执行完毕后,主线程就已经执行了打印sum的操作
t.join();
System.out.println(sum);
}
}
interrupt:打断线程执行,有三个方法。
// 打断某个线程(设置标志位)
interrupt()
// 查询某线程是否被打断过(查询标志位)
isInterrupted()
// 查询当前线程是否被打断过,并重置打断标志位
Thread.interrupted()
示例代码如下
package git.snippets.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* interrupt示例
*
* @author Grey
* @since 1.8
*/
public class ThreadInterrupt {
private static final ReentrantLock LOCK = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(() -> {
for (; ; ) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("t thread interrupted");
System.out.println(Thread.currentThread().isInterrupted());
break;
}
}
});
t.start();
TimeUnit.SECONDS.sleep(3);
t.interrupt();
Thread t2 = new Thread(() -> {
for (; ; ) {
if (Thread.interrupted()) {
System.out.println("t2 thread interrupted");
// Thread.interrupted()会将线程中断状态置为false
System.out.println(Thread.currentThread().isInterrupted());
break;
}
}
});
t2.start();
TimeUnit.SECONDS.sleep(3);
t2.interrupt();
Thread t3 = new Thread(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
System.out.println("t3 interrupted");
// 如果不加上这一句,那么Thread.currentThread().isInterrupted()将会都是false,因为在捕捉到InterruptedException异常的时候就会自动的中断标志置为了false
Thread.currentThread().interrupt();
System.out.println(Thread.currentThread().isInterrupted());
}
});
t3.start();
TimeUnit.SECONDS.sleep(3);
t3.interrupt();
final Object o = new Object();
Thread t4 = new Thread(() -> {
synchronized (o) {
try {
o.wait();
} catch (InterruptedException e) {
System.out.println("t4 interrupted!");
Thread.currentThread().interrupt();
System.out.println(Thread.currentThread().isInterrupted());
}
}
});
t4.start();
TimeUnit.SECONDS.sleep(10);
t4.interrupt();
Thread t5 = new Thread(() -> {
synchronized (o) {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t5.start();
TimeUnit.SECONDS.sleep(1);
Thread t6 = new Thread(() -> {
synchronized (o) {
}
System.out.println("t6 finished");
});
t6.start();
t6.interrupt();
Thread t7 = new Thread(() -> {
LOCK.lock();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
System.out.println("t7 end");
});
t7.start();
TimeUnit.SECONDS.sleep(1);
Thread t8 = new Thread(() -> {
LOCK.lock();
try {
} finally {
LOCK.unlock();
}
System.out.println("t8 end");
});
t8.start();
TimeUnit.SECONDS.sleep(1);
t8.interrupt();
Thread t9 = new Thread(() -> {
LOCK.lock();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
LOCK.unlock();
}
System.out.println("t7 end");
});
t9.start();
TimeUnit.SECONDS.sleep(1);
Thread t10 = new Thread(() -> {
System.out.println("t10 start");
try {
LOCK.lockInterruptibly();
} catch (InterruptedException e) {
System.out.println("t10 interrupted");
} finally {
LOCK.unlock();
}
System.out.println("t8 end");
});
t10.start();
TimeUnit.SECONDS.sleep(1);
t10.interrupt();
}
}
问题1:反复调用同一个线程的start()
方法是否可行?
问题2:假如一个线程执行完毕(此时处于 TERMINATED 状态),再次调用这个线程的start()
方法是否可行?
两个问题的答案都是不可行,在调用一次start()
之后,threadStatus
的值会改变(threadStatus !=0
),此时再次调用start()
方法会抛出IllegalThreadStateException
异常。
不推荐的方式
stop
方法
suspend
结合resume
方法
以上两种方式都不建议使用, 因为会释放所有的锁, 所以容易产生数据不一致的问题。
优雅的方式
如果不依赖循环的具体次数或者中间状态, 可以通过设置标志位的方式来控制。
如果要依赖循环的具体次数或者中间状态, 则可以用interrupt
方法。
上述四种方式的示例代码如下:
package git.snippets.juc;
import java.util.concurrent.TimeUnit;
/**
* 如何结束一个线程
*
* @author Grey
* @since 1.8
*/
public class ThreadFinished {
private static volatile boolean flag = true;
public static void main(String[] args) throws InterruptedException {
// 推荐方式:设置标志位
useVolatile();
// 推荐方式:使用interrupt
useInterrupt();
// 使用stop方法来结束线程,不推荐
useStop();
// 使用suspend/resume方法来结束线程,不推荐
useResumeAndSuspend();
}
private static void useResumeAndSuspend() throws InterruptedException {
Thread t2 = new Thread(() -> {
System.out.println("t2 start");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// e.printStackTrace();
}
System.out.println("t2 finished");
});
t2.start();
TimeUnit.SECONDS.sleep(1);
t2.suspend();
TimeUnit.SECONDS.sleep(1);
t2.resume();
}
private static void useStop() throws InterruptedException {
Thread t = new Thread(() -> {
System.out.println("t start");
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
// e.printStackTrace();
}
System.out.println("t finished");
});
t.start();
TimeUnit.SECONDS.sleep(1);
t.stop();
}
private static void useInterrupt() throws InterruptedException {
Thread t4 = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
}
System.out.println("t4 end");
});
t4.start();
TimeUnit.SECONDS.sleep(1);
t4.interrupt();
}
private static void useVolatile() throws InterruptedException {
Thread t3 = new Thread(() -> {
long i = 0L;
while (flag) {
i++;
}
System.out.println("count sum i = " + i);
});
t3.start();
TimeUnit.SECONDS.sleep(1);
flag = false;
}
}
本文涉及到的所有代码和图例
更多内容见:Java 多线程