JDK
中提供了一组实现延迟队列的API
,位于Java.util.concurrent
包下DelayQueue
。
DelayQueue
是一个BlockingQueue
(无界阻塞)队列,它本质就是封装了一个PriorityQueue
(优先队列),PriorityQueue
内部使用完全二叉堆(来实现队列元素排序,我们在向DelayQueue
队列中添加元素时,会给元素一个Delay
(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay
时间才允许从队列中取出。
DelayDemo.java
public class DelayDemo implements Delayed {
/**
* 延迟时间
*/
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private Long time;
/**
* 任务名称
*/
private String name;
public DelayDemo(Long time, String name, TimeUnit unit) {
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
this.name = name;
}
/**
* 获取延迟时间
*
* @param unit unit
* @return long
*/
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
/**
* 对延迟队列中的元素进行排序
*
* @param o
* @return int
*/
@Override
public int compareTo(Delayed o) {
DelayDemo delayDemo = (DelayDemo) o;
return time.compareTo(delayDemo.getTime());
}
public Long getTime() {
return time;
}
public void setTime(Long time) {
this.time = time;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
DelayQueueTest.java
public class DelayQueueTest {
private static final Logger logger = LoggerFactory.getLogger(DelayQueueTest.class);
@Test
public void test() throws InterruptedException {
DelayDemo delayDemo1 = new DelayDemo(5l, "订单1", TimeUnit.SECONDS);
DelayDemo delayDemo2 = new DelayDemo(10l, "订单2", TimeUnit.SECONDS);
DelayDemo delayDemo3 = new DelayDemo(15l, "订单3", TimeUnit.SECONDS);
DelayQueue<DelayDemo> delayQueue = new DelayQueue<>();
delayQueue.add(delayDemo1);
delayQueue.add(delayDemo2);
delayQueue.add(delayDemo3);
logger.info("延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
while (delayQueue.size() != 0) {
DelayDemo delayDemo = delayQueue.poll();
if (Objects.nonNull(delayDemo)) {
logger.info("任务:" + delayDemo.getName() + "被取消, 取消时间: " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
Thread.sleep(1000);
}
}
}
corePoolSize
:核心线程数maxPoolSize
:最大线程数,当阻塞队列满了的时候会使用最大线程数的线程提交任务keepAliveTime
:线程空闲的时候保持活跃的时间ArrayBlockingQueue
:是一个基于数组结构的有界阻塞队列,此队列按 FIFO
(先进先出)原则对元素进行排序LinkedBlockingQueue
一个基于链表结构的阻塞队列,此队列按FIFO
(先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue
。静态工厂方法Executors.newFixedThreadPool()
使用了这个队列SynchronousQueue
:不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue
,静态工厂方法Executors.newCachedThreadPool
使用了这个队列。PriorityBlockingQueue
:具有优先级的无限阻塞队列。当提交的任务数大于队列和最大线程数的时候就会触发线程池的拒绝策略。
AbortPolicy
:ThreadPoolExecutor
中默认的拒绝策略就是AbortPolicy
,直接抛出异常。
CallerRunsPolicy
:使用该策略时线程池饱和后将由调用线程池的主线程自己来执行任务,因此在执行任务的这段时间里主线程无法再提交新任务,从而使线程池中工作线程有时间将正在处理的任务处理完成。
DiscardPolicy
:不做任何处理直接抛弃任务。
DiscardOldestPolicy
:先将阻塞队列中进入最早的任务丢弃,再尝试提交任务。
@Test
public void test2() {
// 核心线程数
int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;
// 最大线程数
int maxPoolSize = Runtime.getRuntime().availableProcessors() << 1;
// 当线程空闲时,保持活跃的时间 1000 毫秒 1s
int keepAliveTime = 1000;
// 阻塞队列大小
int blockQueueSize = 1000;
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(blockQueueSize),
new ThreadPoolExecutor.AbortPolicy());
threadPool.execute(() -> {
logger.info("自定义线程池的使用...");
});
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
logger.error("Error occur:", e);
}
}
DoOneCallable.java
public class DoOneCallable implements Callable<Object> {
private static final Logger logger = LoggerFactory.getLogger(DoOneCallable.class);
@Override
public Object call() {
try {
logger.info("test Thread!");
return 4 / 0;
} catch (Exception e) {
throw e;
}
}
}
CallableTest.java
public class CallableTest {
private static final Logger logger = LoggerFactory.getLogger(CallableTest.class);
/**
* Callable 的使用场景:加入子线程中有错误抛出、或者拿到线程执行的结果
*
* @param args args
*/
public static void main(String[] args) {
Callable<Object> callable = new DoOneCallable();
FutureTask<Object> futureTask = new FutureTask<>(callable);
try {
// run 方法启动线程
futureTask.run();
// get() 获得当前线程的执行结果
futureTask.get();
} catch (Exception e) {
logger.error("Error occur:{}", e.getMessage());
}
logger.info("hello!");
}
/**
* callable 可以拿到线程中的异常
*/
@Test
public void test() {
Callable<Object> callable = new DoOneCallable();
try {
callable.call();
} catch (Exception e) {
logger.error("Error occur:{}", e.getMessage());
}
}
}
Runnable
创建线程可以达到共享资源的目的public class CreateThreadByRunnableTest {
private static final Logger logger = LoggerFactory.getLogger(CreateThreadByRunnableTest.class);
private static final int TICKET_NUM = 11;
/**
* 使用 Runnable 的方式创建代码可以达到相同代码公用共同的资源
*/
@Test
public void test() {
int num = 10;
BuyTicketsRunnable buyTicketsRunnable = new BuyTicketsRunnable(num);
for (int i = 0; i < TICKET_NUM; i++) {
Thread thread = new Thread(buyTicketsRunnable);
thread.start();
if (Thread.holdsLock(Thread.currentThread())) {
logger.info("当前线程持有对象监视器!");
}
}
}
class BuyTicketsRunnable implements Runnable {
private final Logger logger = LoggerFactory.getLogger(BuyTicketsRunnable.class);
private int num;
BuyTicketsRunnable(int ticketNum) {
this.num = ticketNum;
}
@Override
public void run() {
synchronized (this) {
if (num > 0) {
num--;
logger.info("Thread {} 买到一张票 还剩:{} 张票", Thread.currentThread().getId(), num);
} else {
logger.info("Thread {} 没有抢到票 还剩:{} 张票", Thread.currentThread().getId(), num);
}
}
}
}
}
Thread
,重写run
方法,Thread
类实现Runnable
接口class Thread implements Runnable
public class CreateThreadByThreadTest {
private static final Logger logger = LoggerFactory.getLogger(CreateThreadByThreadTest.class);
@Test
public void test() {
new TestExtendsThread().start();
}
class TestExtendsThread extends Thread {
@Override
public void run() {
logger.info("我是Test1ExtendsThread的线程!");
}
}
}
ThreadPoolExecutor
的使用,指定对应参数达到一定的效果newFixedThreadPool(n)
:创建一个指定工作线程数量的线程池,如果线程数量达到初始化大小,则将提交的任务保存到池队列中。提高效率节省开销,不会释放空闲资源。public static void userFixThreadPool() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
int index = i;
executor.execute(() -> {
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
logger.info("{} {}", Thread.currentThread().getName(), index);
});
}
executor.shutdown();
}
newCachedThreadPool()
:缓存线程池,可以灵活收回空闲线程,若无可回收则创建新的。默认为1分钟。private void useCachedThreadPool(int y) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < y; i++) {
int index = i;
Thread.sleep(4000);
executor.execute(() -> logger.info("{} {}", Thread.currentThread().getName(), index));
}
}
newSingThreadExcutor()
:只创建唯一的工作线程来执行任务,保证线程按照指定的书序执行,保证顺序的执行任务。private void useSingleThreadExecutor() {
// 单线程的线程池底层是创建一个核心线程为1最大线程为1的 threadPool
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
int index = i;
executor.execute(() -> {
try {
Thread.sleep(TWO_THOUSAND);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
logger.info("{} {}", Thread.currentThread().getName(), index);
});
}
executor.shutdown();
}
newScheduledThreadPool(n)
:支持定时及周期性任务执行。public static void useScheduledThreadPool() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
executor.scheduleAtFixedRate(() -> {
long start = System.currentTimeMillis();
logger.info("scheduleAtFixedRate 开始执行时间:{}", DateFormat.getTimeInstance().format(new Date()));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
long end = System.currentTimeMillis();
logger.info("scheduleAtFixedRate 执行花费时间={}m", (end - start) / 1000);
logger.info("scheduleAtFixedRate 执行完成时间:{}", DateFormat.getTimeInstance().format(new Date()));
logger.info("======================================");
}, 1, 5, TimeUnit.SECONDS);
}
CountDownLatch
可以使一个获多个线程等待其他线程各自执行完毕后再执行。CountDownLatch
定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0
之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch
可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。public class CountDownLatchTest {
private static final Logger logger = LoggerFactory.getLogger(CountDownLatch.class);
@Test
public void test() {
int num = 12;
CountDownLatch countDownLatch = new CountDownLatch(num);
try {
for (int i = 0; i < num; i++) {
DemoTask demoTask = new DemoTask(countDownLatch);
new Thread(demoTask).start();
}
countDownLatch.await();
logger.info("主线程开始...");
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
}
class DemoTask implements Runnable {
private CountDownLatch countDownLatch;
DemoTask(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
// TODO 在这里处理逻辑
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
countDownLatch.countDown();
logger.info("线程计数器的个数为:{}", countDownLatch.getCount());
}
}
}
}
ThreadLocal
为解决多线程程序的并发问题提供了一种新的思路。使用这个工具类可以很简洁地编写出优美的多线程程序,ThreadLocal
并不是一个Thread
,而是Thread
的局部变量。ThreadLocal
的作用是提供线程范围内的局部变量,这种变量在线程的生命周期内起作用。作用:提供一个线程内公共变量,减少同一个线程内多个函数或者组件之间一些公共变量的传递的复杂度,或者为线程提供一个私有的变量副本,这样每一个线程都可以随意修改自己的变量副本,而不会对其他线程产生影响。public class ThreadLocalTest {
private static final Logger logger = Logger.getLogger(String.valueOf(ThreadLocalTest.class));
/**
* 当使用 ThreadLocal 维护变量时,ThreadLocal 为每个使用该变量的线程提供独立的变量副本,
* 所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
* 为了避免重复创建TSO(thread specific object,即与线程相关的变量) 使用 static final 修饰
*/
private static final ThreadLocal<Map<String, String>> THREAD_LOCAL_MAP = new ThreadLocal<>();
@Test
public void test1() {
Map<String, String> map = new HashMap<>();
map.put("methodTest", "张三");
map.put("test2", "李四");
THREAD_LOCAL_MAP.set(map);
getThreadLocalMap();
THREAD_LOCAL_MAP.remove();
}
private void getThreadLocalMap() {
Map<String, String> map = THREAD_LOCAL_MAP.get();
logger.info(String.valueOf(map));
}
}
有且仅有一个抽象方法的接口,一般会出现一个名词叫做“语法糖*”,即使用更加方便而原理不变的代码语法,如Lambda
可以认为是匿名内部类的语法糖。接口可以包含其他的方法(默认、静态、私有)
Lambda
表达方法:具有延迟加载的特性,前提是存在函数式接口,这样在调用时候,先进性条件判断,然后再进行拼接,否则,函数体内的方法不执行,因此节约了时间开销。
CalculateNum.java
@FunctionalInterface
public interface CalculateNum {
/**
* 加法
*
* @param numA numA
* @param numB numB
* @return int
*/
int add(int numA, int numB);
}
FunctionApiTest.java
public class FunctionApiTest {
private static final Logger logger = LoggerFactory.getLogger(FunctionApiTest.class);
private int calculate(CalculateNum calculateNum, int a, int b) {
return calculateNum.add(a, b);
}
@Test
public void test() {
int addResult = calculate((a, b) -> a - b, 1, 2);
int subResult = calculate(Integer::sum, 1, 2);
logger.info(String.valueOf(addResult));
logger.info(String.valueOf(subResult));
}
}
Lock
锁,可以得到和 synchronized
一样的效果,即实现原子性、有序性和可见性。synchronized
,Lock
锁可手动获取锁和释放锁、可中断的获取锁、超时获取锁。Lock
是一个接口,两个直接实现类:ReentrantLock
(重入锁), ReentrantReadWriteLock
(读写锁)。@Test
public void test1() {
Tickets tickets = new Tickets();
new Thread(tickets, "1号窗口").start();
new Thread(tickets, "2号窗口").start();
new Thread(tickets, "3号窗口").start();
}
static class Tickets implements Runnable {
private final Logger logger = LoggerFactory.getLogger(Tickets.class);
private int tickets = 100;
private final Lock lock = new ReentrantLock();
@Override
public void run() {
while (true) {
/*上Lock锁*/
lock.lock();
try {
Thread.sleep(4000);
if (tickets > 0) {
logger.info(Thread.currentThread().getName() + "======" + "完成售票,余票为" + --tickets);
}
} catch (Exception e) {
logger.error("Error Occur:{}", e.getMessage(), e);
} finally {
/*释放Lock锁避免发生死锁*/
lock.unlock();
}
}
}
}
synchronized
用的锁是存在Java
对象头里的。JVM
基于进入和退出Monitor
对象来实现方法同步和代码块同步。代码块同步是使用monitorenter
和monitorexit
指令实现的,monitorenter
指令是在编译后插入到同步代码块的开始位置,而monitorexit
是插入到方法结束处和异常处。任何对象都有一个monitor
与之关联,当且一个monitor
被持有后,它将处于锁定状态。monitorenter
指令时,首先要去尝试获取对象的锁,如果这个对象没被锁定,或者当前线程已经拥有了那个对象的锁,把锁的计数器加1
;相应地,在执行monitorexit
指令时会将锁计数器减1
,当计数器被减到0
时,锁就释放了。如果获取对象锁失败了,那当前线程就要阻塞等待,直到对象锁被另一个线程释放为止。@Test
public void test1() {
ThreadTest t1 = new ThreadTest("线程1");
ThreadTest t2 = new ThreadTest("线程2");
ThreadTest t3 = new ThreadTest("线程3");
new Thread(t1).start();
new Thread(t2).start();
new Thread(t3).start();
}
static class ThreadTest implements Runnable {
private final Logger logger = LoggerFactory.getLogger(SynchronizedTest.class);
private int ticket = 20;
private final String threadName;
private final Object key = "";
public ThreadTest(String name) {
this.threadName = name;
}
@Override
public void run() {
while (ticket > 0) {
synchronized (key) {
logger.info(threadName + "售出" + ticket + "张票");
ticket--;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.info("Error Occur:{}", e.getCause());
}
}
}
用来替代传统的Object
的wait()
、notify()
实现线程间的协作,相比使用Object
的wait()
、notify()
,使用Condition
的await()
、signal()
这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition
,阻塞队列实际上是使用了Condition
来模拟线程间协作。
Condition
是个接口,基本的方法就是await()
和signal()
方法;
Condition
依赖于Lock
接口,生成一个Condition
的基本代码是lock.newCondition()
调用Condition
的await()
和signal()
方法,都必须在lock
保护之内,就是说必须在lock.lock()
和lock.unlock
之间才可以使用
@Test
public void test1() throws InterruptedException {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
try {
lock.tryLock();
logger.info("线程:{} 等待信号", Thread.currentThread().getId());
condition.await();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
} finally {
logger.info("线程:{} 得到信号", Thread.currentThread().getId());
lock.unlock();
}
}).start();
TimeUnit.MILLISECONDS.sleep(10);
new Thread(() -> {
lock.tryLock();
logger.info("线程:{} 拿到锁", Thread.currentThread().getId());
condition.signal();
logger.info("线程:{} 发出信号", Thread.currentThread().getId());
lock.unlock();
}).start();
TimeUnit.SECONDS.sleep(2);
}
Condition
是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition
),只有当该条件具备( signal
或者 signalAll
方法被带调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。是一种计数器,用来保护一个或者多个共享资源的访问。如果线程要访问一个资源就必须先获得信号量。如果信号量内部计数器大于0
,信号量减1
,然后允许共享这个资源;否则,如果信号量的计数器等于0
,信号量将会把线程置入休眠直至计数器大于0
.当信号量使用完时,必须释放。
可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1
,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1
,当显示屏上的剩余车位为0
时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。
使用示例
public class SemaphoreTest {
private static final Logger logger = LoggerFactory.getLogger(SemaphoreTest.class);
private static final Semaphore semaphore1 = new Semaphore(0);
private static final Semaphore semaphore2 = new Semaphore(0);
class One extends Thread {
@Override
public void run() {
logger.info("=====》One线程执行完成...");
semaphore1.release();
}
}
class Two extends Thread {
@Override
public void run() {
try {
semaphore1.acquire();
logger.info("=====》Two线程执行完成...");
semaphore2.release();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
class Three extends Thread {
@Override
public void run() {
try {
semaphore2.acquire();
logger.info("======》Three线程执行完成...");
semaphore2.release();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
@Test
public void test1() throws InterruptedException {
Thread one = new One();
one.start();
Thread two = new Two();
two.start();
Thread three = new Three();
three.start();
Thread.sleep(5000);
logger.info("=====>三个子线程结束...");
}
}
CyclicBarrier
也叫同步屏障,在JDK1.5
被引入,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,被阻塞的线程才能继续执行。
CyclicBarrier
好比一扇门,默认情况下关闭状态,堵住了线程执行的道路,直到所有线程都就位,门才打开,让所有线程一起通过。
示例代码
private CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
@Test
public void test2() {
List<Athlete> athleteList = new ArrayList<>();
athleteList.add(new Athlete(cyclicBarrier, "博尔特"));
athleteList.add(new Athlete(cyclicBarrier, "鲍威尔"));
athleteList.add(new Athlete(cyclicBarrier, "盖伊"));
Executor executor = Executors.newFixedThreadPool(8);
for (Athlete athlete : athleteList) {
executor.execute(athlete);
}
logger.info("所有运动员就位了...");
}
class Athlete implements Runnable {
private CyclicBarrier cyclicBarrier;
private String name;
public Athlete(CyclicBarrier cyclicBarrier, String name) {
this.cyclicBarrier = cyclicBarrier;
this.name = name;
}
@Override
public void run() {
logger.info("运动员 {} 就位", name);
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
logger.error(e.getMessage(), e);
}
}
}
Object wait()
方法让当前线程进入等待状态。直到其他线程调用此对象的 notify()
或 notifyAll()
public class ObjectWaitTest {
private static final Logger logger = LoggerFactory.getLogger(ObjectWaitTest.class);
private final Object objectOne = new Object();
private final Object objectTwo = new Object();
/**
* 子线程是否运行完成的标志
*/
private static boolean ONE_RUN_FLAG = true;
private static boolean TWO_RUN_FLAG = true;
class One extends Thread {
@Override
public void run() {
synchronized (objectOne) {
logger.info("=====》 我是One线程");
ONE_RUN_FLAG = false;
objectOne.notify();
logger.info("=====》 One线程执行完成...");
}
}
}
class Two extends Thread {
@Override
public void run() {
synchronized (objectOne) {
try {
if (ONE_RUN_FLAG) {
logger.info("=====> 线程One没有执行完成,线程二等待中...");
objectOne.wait();
}
synchronized (objectTwo) {
logger.info("=====》 我是Two线程");
objectTwo.notify();
TWO_RUN_FLAG = false;
logger.info("=====》Two线程执行完成...");
}
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}
}
class Three extends Thread {
@Override
public void run() {
synchronized (objectTwo) {
if (TWO_RUN_FLAG) {
try {
logger.info("=====> 线程Two没有执行完成,线程三等待中...");
objectTwo.wait();
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
logger.info("=====》 我是Three线程");
logger.info("=====》 Three线程执行完成...");
}
}
}
@Test
public void test1() {
Thread one = new One();
one.start();
Thread two = new Two();
two.start();
Thread three = new Three();
three.start();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
logger.info("=====>三个子线程结束...");
}
}
ReentrantLock
是一个可重入的互斥锁,又被称为“独占锁”。ReentrantLock
锁在同一个时间点只能被一个线程锁持有;可重入表示,ReentrantLock
锁可以被同一个线程多次获取。ReentraantLock
是通过一个FIFO
的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。public class ReentrantLockTest {
private static final Logger logger = LoggerFactory.getLogger(ReentrantLock.class);
@Test
public void test1() throws InterruptedException {
// 创建锁对象
ReentrantLock lock = new ReentrantLock();
for (int i = 0; i < 3; i++) {
new Thread(new ReentrantLockThread(lock), "线程" + i).start();
}
Thread.sleep(15000);
}
static class ReentrantLockThread implements Runnable {
private final ReentrantLock lock;
public ReentrantLockThread(ReentrantLock lock) {
this.lock = lock;
}
@Override
public void run() {
try {
// 获取锁
lock.lock();
logger.info("lock threadName {}", Thread.currentThread().getName());
Thread.sleep(5000);
logger.info("unlock threadName {}", Thread.currentThread().getName());
// 释放锁
lock.unlock();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
}
在Java
语言中,++i
和i++
操作并不是线程安全的,在使用的时候,不可避免的会用到synchronized
关键字。而AtomicInteger
则通过一种线程安全的加减操作接口。
示例代码
public class AtomicIntegerTest {
private static final Logger logger = LoggerFactory.getLogger(AtomicIntegerTest.class);
@Test
public void test1() throws InterruptedException {
AtomicInteger num = new AtomicInteger(0);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
for (int j = 0; j < 10; j++) {
dealMethodOne(num);
}
}).start();
}
Thread.sleep(3000);
logger.info(String.valueOf(num.get()));
}
private void dealMethodOne(AtomicInteger num) {
int i = num.incrementAndGet();
logger.info("Current thread {} i value {}", Thread.currentThread().getName(), i);
}
}
CPU
public class IoTest {
private static final Logger logger = LoggerFactory.getLogger(IoTest.class);
private static final String FILE_NAME = "../JavaTest/file/filecopy.txt";
private static final String OUT_PUT_FILE_NAME = "../JavaTest/file/fileoutput.txt";
private static final int BUFFER_SIZE = 1024;
private void createFile() throws IOException {
File file1 = new File(FILE_NAME);
File file2 = new File(OUT_PUT_FILE_NAME);
if (!file1.exists()) {
file1.createNewFile();
}
if (!file2.exists()) {
file2.createNewFile();
}
}
/**
* Io 文件拷贝
*/
@Test
public void test1() throws IOException {
createFile();
long start = System.currentTimeMillis();
try (InputStream inputStream = new FileInputStream(FILE_NAME)) {
OutputStream outputStream = new FileOutputStream(new File(OUT_PUT_FILE_NAME));
byte[] buffer = new byte[BUFFER_SIZE];
int len;
while ((len = inputStream.read(buffer)) > 0) {
outputStream.write(buffer, 0, len);
}
logger.info("IO file copy cost {} msc", System.currentTimeMillis() - start);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
public void test2() {
createFile();
long start = System.currentTimeMillis();
try (FileChannel inputChannel = new FileInputStream(FILE_NAME).getChannel();
FileChannel outputChannel = new FileOutputStream(OUT_PUT_FILE_NAME).getChannel()) {
outputChannel.transferFrom(inputChannel, 0, inputChannel.size());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
logger.info("IO file copy cost {} msc", System.currentTimeMillis() - start);
}
IO
拷贝的效率。加载内存映射文件所使用的内存在Java堆区之外public void test3() {
createFile();
long start = System.currentTimeMillis();
try (FileInputStream fileInputStream = new FileInputStream(FILE_NAME);
FileOutputStream fileOutputStream = new FileOutputStream(OUT_PUT_FILE_NAME)) {
FileChannel inputStreamChannel = fileInputStream.getChannel();
FileChannel outputStreamChannel = fileOutputStream.getChannel();
MappedByteBuffer mappedByteBuffer = inputStreamChannel.map(FileChannel.MapMode.READ_ONLY, 0, inputStreamChannel.size());
outputStreamChannel.write(mappedByteBuffer);
} catch (IOException e) {
logger.error(e.getMessage(), e);
}
logger.info("IO file copy cost {} msc", System.currentTimeMillis() - start);
}