Java的线程调度策略是基于线程优先级的抢占式调度,同优先级的线程的执行顺序是看JVM心情的、不可预测的。那有什么办法来控制多线程按照开发者意愿的顺序执行呢?方法总比问题多:
在网上有些文章说Cyclicbarrier也可以实现控制顺序,但研究下来并不是同意义上的顺序,Cyclicbarrier是标记若干个目标点(目标点有需满足的线程数),在并发时,需等到规定线程数到达标记点,然后才可以继续往下执行。那谁先到达目标点还是要看JVM的心情啊,并且目标点后,所有线程继续执行,那执行顺序也是不受控的。(代码在最后有提供,可参考)
这里先给一个不加任何控制操作的三个线程执行示例,方便对照测试:
public class ThreadSequence {
volatile static Thread t0, t1, t2;
public static void main(String[] args) {
//初始化线程
usePriority();
//启动线程
t0.start();
t1.start();
t2.start();
}
private static void usePriority() {
t0 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "执行完成");
}
});
t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "执行完成");
}
});
t2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "执行完成");
}
});
}
}
可以运行多几次,就会发现会出现顺序错乱的现象。
通过设置线程的优先级来控制执行顺序,但不能设置同一优先级,所以这个方案最多支持10个线程的顺序执行。优先级1-10从高到低顺序执行。【t0->t1->t2】
public class ThreadSequence {
volatile static Thread t0,t1,t2;
public static void main(String[] args) {
usePriority();
t0.setPriority(3);
t1.setPriority(2);
t2.setPriority(1);
t1.start();
t0.start();
t2.start();
}
private static void usePriority(){
t0 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
}
}
Thread类中定义的join()方法,调用此方法使所属的线程对象进入执行run()方法中任务,而当前线程进入无限的阻塞,直到join()的线程执行完成。
在主线程按顺序join()子线程,来控制线程按顺序执行:【t0->t1->t2】
public class ThreadSequence {
volatile static Thread t0,t1,t2;
public static void main(String[] args) {
//使用join
useJoin();
t0.start();
t0.join();
t1.start();
t1.join();
t2.start();
}
//使用join
private static void useJoin(){
t0 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
}
}
在子线程前置子线程的join(),来控制线程按顺序执行:【t2->t1->t0】
public class ThreadSequence {
volatile static Thread t0,t1,t2;
public static void main(String[] args) {
//使用join
useJoin();
t0.start();
t1.start();
t2.start();
}
//使用join
private static void useJoin(){
t0 = new Thread(new Runnable() {
@Override
public void run() {
try {
t1.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
}
}
wait()方法是Object类的方法,该方法用来将当前线程置入“预执行队列”中,并且在wait()所在的代码行处停止执行,直到接到通知或被中断为止。
在调用wait()之前,线程必须获取到该对象的对象级别锁,即只能在同步方法或同步块中调用wait()方法。在执行wait()方法后,当前线程释放锁。【t0->t1->t2】
public class ThreadSequence {
volatile static Thread t0,t1,t2;
static boolean t1Run = false,t2Run = false; //定义n-1个运行条件
static Object lock1 = new Object(),lock2 = new Object(); //定义n-1个锁资源
public static void main(String[] args) {
useWait();
t2.start();
t0.start();
t1.start();
}
private static void useWait(){
t0 = new Thread(new Runnable() {
@Override
public void run() {
//t0执行
synchronized (lock1){
System.out.println(Thread.currentThread().getName()+"执行");
//通知t1执行
t1Run = true;
//释放lock1
lock1.notify();
System.out.println(Thread.currentThread().getName()+"执行完成");
}
}
});
t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock1){
if (!t1Run){
try {
//lock1等待
lock1.wait();
System.out.println(Thread.currentThread().getName()+"进入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+"执行");
synchronized (lock2) {
//通知t2执行
t2Run = true;
//释放lock2
lock2.notify();
}
System.out.println(Thread.currentThread().getName()+"执行完成");
}
}
});
t2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock2){
if (!t2Run){
//t2执行
try {
//lock2等待
lock2.wait();
System.out.println(Thread.currentThread().getName()+"进入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName()+"执行");
System.out.println(Thread.currentThread().getName()+"执行完成");
}
}
});
}
}
Condition可以实现多路通知,也就是在一个Lock对象里面可以创建多个Condition(即对象监视器)实例,线程对象可以注册在指定的Condition中,从而可以有选择性的进行线程通知,在调度线程上更加灵活。在使用notify/notifyAll通知时,被通知的线程却是由JVM随机选择的。【t0->t1->t2】
public class ThreadSequence {
volatile static Thread t0,t1,t2;
static boolean t1Run = false,t2Run = false; //定义n-1个运行条件
static Lock lock = new ReentrantLock();
static Condition condition1 = lock.newCondition();
static Condition condition2 = lock.newCondition();
public static void main(String[] args) {
useCondition();
t2.start();
t0.start();
t1.start();
}
private static void useCondition(){
t0 = new Thread(new Runnable() {
@Override
public void run() {
//t0获取锁
lock.lock();
System.out.println(Thread.currentThread().getName()+"执行");
//设置t1可运行
t1Run = true;
//通知t1运行
condition1.signal();
//t0释放锁
lock.unlock();
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t1 = new Thread(new Runnable() {
@Override
public void run() {
//t1获取锁
lock.lock();
try {
//是否可运行状态
if (!t1Run){
//不可运行状态,进入等待
condition1.await();
System.out.println(Thread.currentThread().getName()+"进入等待");
}
System.out.println(Thread.currentThread().getName()+"执行");
//设置t2为可运行
t2Run = true;
//通知t2运行
condition2.signal();
}catch (Exception e){
e.printStackTrace();
}finally {
//t1释放锁
lock.unlock();
}
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t2 = new Thread(new Runnable() {
@Override
public void run() {
//t2获取锁
lock.lock();
try {
//是否可运行状态
if (!t2Run){
//不可运行状态,进入等待
condition2.await();
System.out.println(Thread.currentThread().getName()+"进入等待");
}
System.out.println(Thread.currentThread().getName()+"执行");
}catch (Exception e){
e.printStackTrace();
}finally {
//t2释放锁
lock.unlock();
}
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
}
}
ountDownLatch 的作用是:当一个线程需要另外一个或多个线程完成后,再开始执行。比如主线程要等待一个子线程完成环境相关配置的加载工作,主线程才继续执行,就可以利用 CountDownLatch 来实现。
用到的方法:
public class ThreadSequence {
volatile static Thread t0,t1,t2;
/**
* 计数器1 用于T0线程通知T1线程
* 计数器2 用于T1线程通知T2线程
* 注意:这里个数都设置成立1 ,当T0执行完成后,执行countDown,来通知T1线程
*/
static CountDownLatch countDownLatch1 = new CountDownLatch(1);
static CountDownLatch countDownLatch2 = new CountDownLatch(1);
public static void main(String[] args) {
useCountDownLatch();
t2.start();
t1.start();
t0.start();
}
private static void useCountDownLatch(){
t0 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"执行");
//唤醒阻塞线程t1
countDownLatch1.countDown();
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
countDownLatch1.await();
System.out.println(Thread.currentThread().getName()+"进入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行");
//唤醒阻塞线程t2
countDownLatch2.countDown();
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
//t2等待
countDownLatch2.await();
System.out.println(Thread.currentThread().getName()+"进入等待");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行");
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
}
}
Semaphore计数信号量,常用于限制可以访问某些资源(物理或逻辑的)线程数目。
用到的方法:
Semaphore(int permits): 构造方法,permits就是允许同时运行的线程数目
public Semaphore(int permits,boolean fair): permits就是允许同时运行的线程数目 ,fair 是否为公平锁,如果是公平锁,那么获得锁的顺序与线程启动顺序有关
void acquire(): 从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
tryAcquire(): 尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
release() : 释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。
【t0->t1->t2】
public class ThreadSequence {
volatile static Thread t0,t1,t2;
static Semaphore semaphore1 = new Semaphore(0);
static Semaphore semaphore2 = new Semaphore(0);
public static void main(String[] args) {
useSemaphore();
t1.start();
t0.start();
t2.start();
}
private static void useSemaphore(){
t0 = new Thread(new Runnable() {
@Override
public void run() {
try {
//释放资源,semaphore1加1
semaphore1.release();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
//获取资源,semaphore1减1,为0时进入阻塞
semaphore1.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//释放资源,semaphore2加1
semaphore2.release();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
//获取资源,semaphore2减1 为0时进入阻塞
semaphore2.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
}
}
使用newSingleThreadExecutor线程池,由于核心线程数只有一个,所以只能按提交顺序执行。【t0->t1->t2】
public class ThreadSequence {
volatile static Thread t0,t1,t2;
//创建单例的线程池
static ExecutorService executorService = Executors.newSingleThreadExecutor();
public static void main(String[] args) {
//按指定顺序提交
useSingleThreadExecutor();
executorService.submit(t0);
executorService.submit(t1);
executorService.submit(t2);
//销毁
executorService.shutdown();
}
//使用单例线程池-newSingleThreadExecutor
private static void useSingleThreadExecutor(){
t0 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"0执行完成");
}
});
t1 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"1执行完成");
}
});
t2 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"2执行完成");
}
});
}
}
public class ThreadSequence {
volatile static Thread t0,t1,t2;
/**
* 设置n-1个等待标志
* 设置n-1个线程互相等待,直到到达同一个同步点,再继续一起执行。T0不执行完,T1就永远不会执行
*/
static CyclicBarrier cyclicBarrier1 = new CyclicBarrier(2);
static CyclicBarrier cyclicBarrier2 = new CyclicBarrier(2);
public static void main(String[] args) {
useCyclicBarrier();
t1.start();
t0.start();
t2.start();
}
//使用Cyclicbarrier
private static void useCyclicBarrier(){
t0 = new Thread(new Runnable() {
@Override
public void run() {
try {
cyclicBarrier1.await();
System.out.println(Thread.currentThread().getName()+"到达目的1,进入等待");
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行");
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
cyclicBarrier1.await();
System.out.println(Thread.currentThread().getName() + "到达目的1,进入等待");
System.out.println(Thread.currentThread().getName() + "执行");
System.out.println(Thread.currentThread().getName() + "执行完成");
cyclicBarrier2.await();
System.out.println(Thread.currentThread().getName() + "到达目的2,进入等待");
} catch (Exception e) {
e.printStackTrace();
}
}
});
t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
cyclicBarrier2.await();
System.out.println(Thread.currentThread().getName()+"到达目的2,进入等待");
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"执行");
System.out.println(Thread.currentThread().getName()+"执行完成");
}
});
}
}