• # Java 常用代码片段


    Java 常用代码片段

    延迟队列

    • JDK 中提供了一组实现延迟队列的API,位于Java.util.concurrent包下DelayQueue
      DelayQueue是一个BlockingQueue(无界阻塞)队列,它本质就是封装了一个PriorityQueue(优先队列),PriorityQueue内部使用完全二叉堆(来实现队列元素排序,我们在向DelayQueue队列中添加元素时,会给元素一个Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了Delay时间才允许从队列中取出。

    • DelayDemo.java

    public class DelayDemo implements Delayed {
    
        /**
         * 延迟时间
         */
        @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
        private Long time;
    
        /**
         * 任务名称
         */
        private String name;
    
    
        public DelayDemo(Long time, String name, TimeUnit unit) {
            this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
            this.name = name;
        }
    
        /**
         * 获取延迟时间
         *
         * @param unit unit
         * @return long
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return time - System.currentTimeMillis();
        }
    
        /**
         * 对延迟队列中的元素进行排序
         *
         * @param o
         * @return int
         */
        @Override
        public int compareTo(Delayed o) {
            DelayDemo delayDemo = (DelayDemo) o;
            return time.compareTo(delayDemo.getTime());
        }
    
        public Long getTime() {
            return time;
        }
    
        public void setTime(Long time) {
            this.time = time;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • DelayQueueTest.java
    public class DelayQueueTest {
    
        private static final Logger logger = LoggerFactory.getLogger(DelayQueueTest.class);
    
        @Test
        public void test() throws InterruptedException {
            DelayDemo delayDemo1 = new DelayDemo(5l, "订单1", TimeUnit.SECONDS);
            DelayDemo delayDemo2 = new DelayDemo(10l, "订单2", TimeUnit.SECONDS);
            DelayDemo delayDemo3 = new DelayDemo(15l, "订单3", TimeUnit.SECONDS);
    
            DelayQueue<DelayDemo> delayQueue = new DelayQueue<>();
            delayQueue.add(delayDemo1);
            delayQueue.add(delayDemo2);
            delayQueue.add(delayDemo3);
    
            logger.info("延迟队列开始时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            while (delayQueue.size() != 0) {
                DelayDemo delayDemo = delayQueue.poll();
                if (Objects.nonNull(delayDemo)) {
                    logger.info("任务:" + delayDemo.getName() + "被取消, 取消时间: " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                }
                Thread.sleep(1000);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    线程池 ThreadPoolExecutor

    参数说明

    • corePoolSize:核心线程数
    • maxPoolSize:最大线程数,当阻塞队列满了的时候会使用最大线程数的线程提交任务
    • keepAliveTime:线程空闲的时候保持活跃的时间

    队列

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序
    • LinkedBlockingQueue一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
    • SynchronousQueue:不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:具有优先级的无限阻塞队列。

    拒绝策略

    • 当提交的任务数大于队列和最大线程数的时候就会触发线程池的拒绝策略。

    • AbortPolicyThreadPoolExecutor中默认的拒绝策略就是AbortPolicy,直接抛出异常。

    • CallerRunsPolicy :使用该策略时线程池饱和后将由调用线程池的主线程自己来执行任务,因此在执行任务的这段时间里主线程无法再提交新任务,从而使线程池中工作线程有时间将正在处理的任务处理完成。

    • DiscardPolicy :不做任何处理直接抛弃任务。

    • DiscardOldestPolicy :先将阻塞队列中进入最早的任务丢弃,再尝试提交任务。

    示例代码

    @Test
    public void test2() {
        // 核心线程数
        int corePoolSize = Runtime.getRuntime().availableProcessors() + 1;
        // 最大线程数
        int maxPoolSize = Runtime.getRuntime().availableProcessors() << 1;
        // 当线程空闲时,保持活跃的时间 1000 毫秒 1s
        int keepAliveTime = 1000;
        // 阻塞队列大小
        int blockQueueSize = 1000;
    
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize,
                                                               maxPoolSize,
                                                               keepAliveTime,
                                                               TimeUnit.MILLISECONDS,
                                                               new LinkedBlockingQueue<>(blockQueueSize),
                                                               new ThreadPoolExecutor.AbortPolicy());
    
        threadPool.execute(() -> {
            logger.info("自定义线程池的使用...");
        });
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            logger.error("Error occur:", e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    创建线程

    获取线程执行结果(Callable)

    • 可以拿到线程中抛出的异常。
    • 线程执行完成后可以返回结果。
    示例
    • DoOneCallable.java
    public class DoOneCallable implements Callable<Object> {
    
        private static final Logger logger = LoggerFactory.getLogger(DoOneCallable.class);
    
        @Override
        public Object call() {
            try {
                logger.info("test Thread!");
                return 4 / 0;
            } catch (Exception e) {
                throw e;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • CallableTest.java
    public class CallableTest {
    
        private static final Logger logger = LoggerFactory.getLogger(CallableTest.class);
    
        /**
         * Callable 的使用场景:加入子线程中有错误抛出、或者拿到线程执行的结果
         *
         * @param args args
         */
        public static void main(String[] args) {
            Callable<Object> callable = new DoOneCallable();
            FutureTask<Object> futureTask = new FutureTask<>(callable);
            try {
                // run 方法启动线程
                futureTask.run();
                // get() 获得当前线程的执行结果
                futureTask.get();
            } catch (Exception e) {
                logger.error("Error occur:{}", e.getMessage());
            }
            logger.info("hello!");
        }
    
        /**
         * callable 可以拿到线程中的异常
         */
        @Test
        public void test() {
            Callable<Object> callable = new DoOneCallable();
            try {
                callable.call();
            } catch (Exception e) {
                logger.error("Error occur:{}", e.getMessage());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    实现Runnable

    • 使用Runnable创建线程可以达到共享资源的目的
    • 外部无法捕获线程中的异常和结果
    示例
    public class CreateThreadByRunnableTest {
    
        private static final Logger logger = LoggerFactory.getLogger(CreateThreadByRunnableTest.class);
        private static final int TICKET_NUM = 11;
    
        /**
         * 使用 Runnable 的方式创建代码可以达到相同代码公用共同的资源
         */
        @Test
        public void test() {
            int num = 10;
            BuyTicketsRunnable buyTicketsRunnable = new BuyTicketsRunnable(num);
            for (int i = 0; i < TICKET_NUM; i++) {
                Thread thread = new Thread(buyTicketsRunnable);
                thread.start();
                if (Thread.holdsLock(Thread.currentThread())) {
                    logger.info("当前线程持有对象监视器!");
                }
            }
        }
    
        class BuyTicketsRunnable implements Runnable {
    
            private final Logger logger = LoggerFactory.getLogger(BuyTicketsRunnable.class);
    
            private int num;
    
            BuyTicketsRunnable(int ticketNum) {
                this.num = ticketNum;
            }
    
            @Override
            public void run() {
                synchronized (this) {
                    if (num > 0) {
                        num--;
                        logger.info("Thread {} 买到一张票 还剩:{} 张票", Thread.currentThread().getId(), num);
                    } else {
                        logger.info("Thread {} 没有抢到票 还剩:{} 张票", Thread.currentThread().getId(), num);
                    }
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    继承Thread重写run()

    • 继承Thread,重写run方法,Thread类实现Runnable接口
    • class Thread implements Runnable
    示例代码
    public class CreateThreadByThreadTest {
    
        private static final Logger logger = LoggerFactory.getLogger(CreateThreadByThreadTest.class);
    
        @Test
        public void test() {
            new TestExtendsThread().start();
        }
    
        class TestExtendsThread extends Thread {
            @Override
            public void run() {
                logger.info("我是Test1ExtendsThread的线程!");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    Executors四种类型的线程池

    • 通过源码分析得出还是对ThreadPoolExecutor的使用,指定对应参数达到一定的效果

    newFixedThreadPool

    • newFixedThreadPool(n):创建一个指定工作线程数量的线程池,如果线程数量达到初始化大小,则将提交的任务保存到池队列中。提高效率节省开销,不会释放空闲资源。
    public static void userFixThreadPool() throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            int index = i;
            executor.execute(() -> {
                try {
                    Thread.sleep(2 * 1000);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
                logger.info("{}  {}", Thread.currentThread().getName(), index);
            });
        }
        executor.shutdown();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    newCachedThreadPool

    • newCachedThreadPool():缓存线程池,可以灵活收回空闲线程,若无可回收则创建新的。默认为1分钟。
    private void useCachedThreadPool(int y) throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < y; i++) {
            int index = i;
            Thread.sleep(4000);
            executor.execute(() -> logger.info("{}  {}", Thread.currentThread().getName(), index));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    newSingThreadExcutor

    • newSingThreadExcutor():只创建唯一的工作线程来执行任务,保证线程按照指定的书序执行,保证顺序的执行任务。
    private void useSingleThreadExecutor() {
        // 单线程的线程池底层是创建一个核心线程为1最大线程为1的 threadPool
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            int index = i;
            executor.execute(() -> {
                try {
                    Thread.sleep(TWO_THOUSAND);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                    Thread.currentThread().interrupt();
                }
                logger.info("{}   {}", Thread.currentThread().getName(), index);
            });
        }
        executor.shutdown();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    newScheduledThreadPool

    • newScheduledThreadPool(n):支持定时及周期性任务执行。
    public static void useScheduledThreadPool() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);
        executor.scheduleAtFixedRate(() -> {
            long start = System.currentTimeMillis();
            logger.info("scheduleAtFixedRate 开始执行时间:{}", DateFormat.getTimeInstance().format(new Date()));
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
            long end = System.currentTimeMillis();
            logger.info("scheduleAtFixedRate 执行花费时间={}m", (end - start) / 1000);
            logger.info("scheduleAtFixedRate 执行完成时间:{}", DateFormat.getTimeInstance().format(new Date()));
            logger.info("======================================");
        }, 1, 5, TimeUnit.SECONDS);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    线程计数器CountDownLatch

    • CountDownLatch可以使一个获多个线程等待其他线程各自执行完毕后再执行。
    • CountDownLatch 定义了一个计数器,和一个阻塞队列, 当计数器的值递减为0之前,阻塞队列里面的线程处于挂起状态,当计数器递减到0时会唤醒阻塞队列所有线程,这里的计数器是一个标志,可以表示一个任务一个线程,也可以表示一个倒计时器,CountDownLatch可以解决那些一个或者多个线程在执行之前必须依赖于某些必要的前提业务先执行的场景。
    public class CountDownLatchTest {
    
        private static final Logger logger = LoggerFactory.getLogger(CountDownLatch.class);
    
        @Test
        public void test() {
            int num = 12;
            CountDownLatch countDownLatch = new CountDownLatch(num);
            try {
                for (int i = 0; i < num; i++) {
                    DemoTask demoTask = new DemoTask(countDownLatch);
                    new Thread(demoTask).start();
                }
                countDownLatch.await();
                logger.info("主线程开始...");
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
                Thread.currentThread().interrupt();
            }
        }
    
        class DemoTask implements Runnable {
    
            private CountDownLatch countDownLatch;
    
            DemoTask(CountDownLatch countDownLatch) {
                this.countDownLatch = countDownLatch;
            }
    
            @Override
            public void run() {
                try {
                    // TODO 在这里处理逻辑
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
                    countDownLatch.countDown();
                    logger.info("线程计数器的个数为:{}", countDownLatch.getCount());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    ThreadLocal 使用

    • ThreadLocal为解决多线程程序的并发问题提供了一种新的思路。使用这个工具类可以很简洁地编写出优美的多线程程序,ThreadLocal并不是一个Thread,而是Thread的局部变量。ThreadLocal的作用是提供线程范围内的局部变量,这种变量在线程的生命周期内起作用。作用:提供一个线程内公共变量,减少同一个线程内多个函数或者组件之间一些公共变量的传递的复杂度,或者为线程提供一个私有的变量副本,这样每一个线程都可以随意修改自己的变量副本,而不会对其他线程产生影响。
    public class ThreadLocalTest {
    
        private static final Logger logger = Logger.getLogger(String.valueOf(ThreadLocalTest.class));
    
        /**
         * 当使用 ThreadLocal 维护变量时,ThreadLocal 为每个使用该变量的线程提供独立的变量副本,
         * 所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
         * 为了避免重复创建TSO(thread specific object,即与线程相关的变量) 使用 static final 修饰
         */
        private static final ThreadLocal<Map<String, String>> THREAD_LOCAL_MAP = new ThreadLocal<>();
    
        @Test
        public void test1() {
            Map<String, String> map = new HashMap<>();
            map.put("methodTest", "张三");
            map.put("test2", "李四");
            THREAD_LOCAL_MAP.set(map);
            getThreadLocalMap();
            THREAD_LOCAL_MAP.remove();
        }
    
        private void getThreadLocalMap() {
            Map<String, String> map = THREAD_LOCAL_MAP.get();
            logger.info(String.valueOf(map));
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    函数式接口

    • 有且仅有一个抽象方法的接口,一般会出现一个名词叫做“语法糖*”,即使用更加方便而原理不变的代码语法,如Lambda可以认为是匿名内部类的语法糖。接口可以包含其他的方法(默认、静态、私有)

    • Lambda表达方法:具有延迟加载的特性,前提是存在函数式接口,这样在调用时候,先进性条件判断,然后再进行拼接,否则,函数体内的方法不执行,因此节约了时间开销。

    • CalculateNum.java

    @FunctionalInterface
    public interface CalculateNum {
    
        /**
         * 加法
         *
         * @param numA numA
         * @param numB numB
         * @return int
         */
        int add(int numA, int numB);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • FunctionApiTest.java
    public class FunctionApiTest {
    
        private static final Logger logger = LoggerFactory.getLogger(FunctionApiTest.class);
    
        private int calculate(CalculateNum calculateNum, int a, int b) {
            return calculateNum.add(a, b);
        }
    
        @Test
        public void test() {
            int addResult = calculate((a, b) -> a - b, 1, 2);
            int subResult = calculate(Integer::sum, 1, 2);
            logger.info(String.valueOf(addResult));
            logger.info(String.valueOf(subResult));
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    Lock 的使用

    • Lock锁,可以得到和 synchronized一样的效果,即实现原子性、有序性和可见性。
    • 相较于synchronizedLock锁可手动获取锁和释放锁、可中断的获取锁、超时获取锁。
    • Lock 是一个接口,两个直接实现类:ReentrantLock(重入锁), ReentrantReadWriteLock(读写锁)。
    @Test
    public void test1() {
        Tickets tickets = new Tickets();
        new Thread(tickets, "1号窗口").start();
        new Thread(tickets, "2号窗口").start();
        new Thread(tickets, "3号窗口").start();
    }
    
    static class Tickets implements Runnable {
    
        private final Logger logger = LoggerFactory.getLogger(Tickets.class);
        private int tickets = 100;
        private final Lock lock = new ReentrantLock();
    
        @Override
        public void run() {
            while (true) {
                /*上Lock锁*/
                lock.lock();
                try {
                    Thread.sleep(4000);
                    if (tickets > 0) {
                        logger.info(Thread.currentThread().getName() + "======" + "完成售票,余票为" + --tickets);
                    }
                } catch (Exception e) {
                    logger.error("Error Occur:{}", e.getMessage(), e);
                } finally {
                    /*释放Lock锁避免发生死锁*/
                    lock.unlock();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    Synchronized使用

    • synchronized用的锁是存在Java对象头里的。
    • JVM基于进入和退出Monitor对象来实现方法同步和代码块同步。代码块同步是使用monitorentermonitorexit指令实现的,monitorenter指令是在编译后插入到同步代码块的开始位置,而monitorexit是插入到方法结束处和异常处。任何对象都有一个monitor与之关联,当且一个monitor被持有后,它将处于锁定状态。
    • 根据虚拟机规范的要求,在执行monitorenter指令时,首先要去尝试获取对象的锁,如果这个对象没被锁定,或者当前线程已经拥有了那个对象的锁,把锁的计数器加1;相应地,在执行monitorexit指令时会将锁计数器减1,当计数器被减到0时,锁就释放了。如果获取对象锁失败了,那当前线程就要阻塞等待,直到对象锁被另一个线程释放为止。
    @Test
    public void test1() {
        ThreadTest t1 = new ThreadTest("线程1");
        ThreadTest t2 = new ThreadTest("线程2");
        ThreadTest t3 = new ThreadTest("线程3");
        new Thread(t1).start();
        new Thread(t2).start();
        new Thread(t3).start();
    }
    
    static class ThreadTest implements Runnable {
    
        private final Logger logger = LoggerFactory.getLogger(SynchronizedTest.class);
        private int ticket = 20;
        private final String threadName;
        private final Object key = "";
    
        public ThreadTest(String name) {
            this.threadName = name;
        }
    
        @Override
        public void run() {
            while (ticket > 0) {
                synchronized (key) {
                    logger.info(threadName + "售出" + ticket + "张票");
                    ticket--;
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    logger.info("Error Occur:{}", e.getCause());
                }
            
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    Condition

    • 用来替代传统的Objectwait()notify()实现线程间的协作,相比使用Objectwait()notify(),使用Conditionawait()signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,阻塞队列实际上是使用了Condition来模拟线程间协作。

    • Condition是个接口,基本的方法就是await()signal()方法;

    • Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()

    • 调用Conditionawait()signal()方法,都必须在lock保护之内,就是说必须在lock.lock()lock.unlock之间才可以使用

    @Test
    public void test1() throws InterruptedException {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        new Thread(() -> {
            try {
                lock.tryLock();
                logger.info("线程:{} 等待信号", Thread.currentThread().getId());
                condition.await();
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            } finally {
                logger.info("线程:{} 得到信号", Thread.currentThread().getId());
                lock.unlock();
            }
        }).start();
        TimeUnit.MILLISECONDS.sleep(10);
        new Thread(() -> {
            lock.tryLock();
            logger.info("线程:{} 拿到锁", Thread.currentThread().getId());
            condition.signal();
            logger.info("线程:{} 发出信号", Thread.currentThread().getId());
            lock.unlock();
        }).start();
        TimeUnit.SECONDS.sleep(2);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被带调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。

    信号量(Semaphore)

    • 是一种计数器,用来保护一个或者多个共享资源的访问。如果线程要访问一个资源就必须先获得信号量。如果信号量内部计数器大于0,信号量减1,然后允许共享这个资源;否则,如果信号量的计数器等于0,信号量将会把线程置入休眠直至计数器大于0.当信号量使用完时,必须释放。

    • 可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止。

    • 使用示例

    public class SemaphoreTest {
    
        private static final Logger logger = LoggerFactory.getLogger(SemaphoreTest.class);
    
        private static final Semaphore semaphore1 = new Semaphore(0);
        private static final Semaphore semaphore2 = new Semaphore(0);
    
        class One extends Thread {
            @Override
            public void run() {
                logger.info("=====》One线程执行完成...");
                semaphore1.release();
            }
        }
    
        class Two extends Thread {
            @Override
            public void run() {
                try {
                    semaphore1.acquire();
                    logger.info("=====》Two线程执行完成...");
                    semaphore2.release();
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    
        class Three extends Thread {
            @Override
            public void run() {
                try {
                    semaphore2.acquire();
                    logger.info("======》Three线程执行完成...");
                    semaphore2.release();
                } catch (InterruptedException e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    
        @Test
        public void test1() throws InterruptedException {
            Thread one = new One();
            one.start();
            Thread two = new Two();
            two.start();
            Thread three = new Three();
            three.start();
            Thread.sleep(5000);
            logger.info("=====>三个子线程结束...");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    同步屏障(CyclicBarrier)

    • CyclicBarrier也叫同步屏障,在JDK1.5被引入,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,被阻塞的线程才能继续执行。

    • CyclicBarrier好比一扇门,默认情况下关闭状态,堵住了线程执行的道路,直到所有线程都就位,门才打开,让所有线程一起通过。

    • 示例代码

    private CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
    
    @Test
    public void test2() {
        List<Athlete> athleteList = new ArrayList<>();
        athleteList.add(new Athlete(cyclicBarrier, "博尔特"));
        athleteList.add(new Athlete(cyclicBarrier, "鲍威尔"));
        athleteList.add(new Athlete(cyclicBarrier, "盖伊"));
        Executor executor = Executors.newFixedThreadPool(8);
        for (Athlete athlete : athleteList) {
            executor.execute(athlete);
        }
        logger.info("所有运动员就位了...");
    }
    
    class Athlete implements Runnable {
    
        private CyclicBarrier cyclicBarrier;
        private String name;
    
        public Athlete(CyclicBarrier cyclicBarrier, String name) {
            this.cyclicBarrier = cyclicBarrier;
            this.name = name;
        }
    
        @Override
        public void run() {
            logger.info("运动员 {} 就位", name);
            try {
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    Object wait()

    • Object wait() 方法让当前线程进入等待状态。直到其他线程调用此对象的 notify()notifyAll()
    • 当前线程必须是此对象的监视器所有者,否则还是会发生 IllegalMonitorStateException 异常。
    • 如果当前线程在等待之前或在等待时被任何线程中断,则会抛出 InterruptedException 异常。
    public class ObjectWaitTest {
    
        private static final Logger logger = LoggerFactory.getLogger(ObjectWaitTest.class);
    
        private final Object objectOne = new Object();
        private final Object objectTwo = new Object();
    
        /**
         * 子线程是否运行完成的标志
         */
        private static boolean ONE_RUN_FLAG = true;
        private static boolean TWO_RUN_FLAG = true;
    
        class One extends Thread {
            @Override
            public void run() {
                synchronized (objectOne) {
                    logger.info("=====》 我是One线程");
                    ONE_RUN_FLAG = false;
                    objectOne.notify();
                    logger.info("=====》 One线程执行完成...");
                }
            }
        }
    
        class Two extends Thread {
            @Override
            public void run() {
                synchronized (objectOne) {
                    try {
                        if (ONE_RUN_FLAG) {
                            logger.info("=====> 线程One没有执行完成,线程二等待中...");
                            objectOne.wait();
                        }
                        synchronized (objectTwo) {
                            logger.info("=====》 我是Two线程");
                            objectTwo.notify();
                            TWO_RUN_FLAG = false;
                            logger.info("=====》Two线程执行完成...");
                        }
                    } catch (InterruptedException e) {
                        logger.error(e.getMessage(), e);
                    }
                }
            }
        }
    
        class Three extends Thread {
            @Override
            public void run() {
                synchronized (objectTwo) {
                    if (TWO_RUN_FLAG) {
                        try {
                            logger.info("=====> 线程Two没有执行完成,线程三等待中...");
                            objectTwo.wait();
                        } catch (InterruptedException e) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    logger.info("=====》 我是Three线程");
                    logger.info("=====》 Three线程执行完成...");
                }
            }
        }
    
    
        @Test
        public void test1() {
            Thread one = new One();
            one.start();
            Thread two = new Two();
            two.start();
            Thread three = new Three();
            three.start();
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
            logger.info("=====>三个子线程结束...");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82

    独占锁(ReentrantLock)

    • ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。
    • ReentrantLock锁在同一个时间点只能被一个线程锁持有;可重入表示,ReentrantLock锁可以被同一个线程多次获取。
    • ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
    public class ReentrantLockTest {
    
        private static final Logger logger = LoggerFactory.getLogger(ReentrantLock.class);
    
        @Test
        public void test1() throws InterruptedException {
            // 创建锁对象
            ReentrantLock lock = new ReentrantLock();
            for (int i = 0; i < 3; i++) {
                new Thread(new ReentrantLockThread(lock), "线程" + i).start();
            }
            Thread.sleep(15000);
        }
    
        static class ReentrantLockThread implements Runnable {
    
            private final ReentrantLock lock;
    
            public ReentrantLockThread(ReentrantLock lock) {
                this.lock = lock;
            }
    
            @Override
            public void run() {
                try {
                    //  获取锁
                    lock.lock();
                    logger.info("lock threadName {}", Thread.currentThread().getName());
                    Thread.sleep(5000);
                    logger.info("unlock threadName {}", Thread.currentThread().getName());
                    // 释放锁
                    lock.unlock();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    原子操作(AtomicInteger)

    • Java语言中,++ii++操作并不是线程安全的,在使用的时候,不可避免的会用到synchronized关键字。而AtomicInteger则通过一种线程安全的加减操作接口。

    • 示例代码

    public class AtomicIntegerTest {
    
        private static final Logger logger = LoggerFactory.getLogger(AtomicIntegerTest.class);
    
        @Test
        public void test1() throws InterruptedException {
            AtomicInteger num = new AtomicInteger(0);
            for (int i = 0; i < 5; i++) {
                new Thread(() -> {
                    for (int j = 0; j < 10; j++) {
                        dealMethodOne(num);
                    }
                }).start();
    
            }
            Thread.sleep(3000);
            logger.info(String.valueOf(num.get()));
        }
    
        private void dealMethodOne(AtomicInteger num) {
            int i = num.incrementAndGet();
            logger.info("Current thread {} i value {}", Thread.currentThread().getName(), i);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    IO 拷贝

    • 从磁盘读数据到缓冲区,该过程不依赖CPU
    • 将数据从内核缓冲区拷贝到用户空间缓冲区
    • 用户进程从用户缓冲区读取数据
    public class IoTest {
    
        private static final Logger logger = LoggerFactory.getLogger(IoTest.class);
    
        private static final String FILE_NAME = "../JavaTest/file/filecopy.txt";
        private static final String OUT_PUT_FILE_NAME = "../JavaTest/file/fileoutput.txt";
        private static final int BUFFER_SIZE = 1024;
    
        private void createFile() throws IOException {
            File file1 = new File(FILE_NAME);
            File file2 = new File(OUT_PUT_FILE_NAME);
            if (!file1.exists()) {
                file1.createNewFile();
            }
            if (!file2.exists()) {
                file2.createNewFile();
            }
        }
    
        /**
         * Io 文件拷贝
         */
        @Test
        public void test1() throws IOException {
            createFile();
            long start = System.currentTimeMillis();
            try (InputStream inputStream = new FileInputStream(FILE_NAME)) {
                OutputStream outputStream = new FileOutputStream(new File(OUT_PUT_FILE_NAME));
                byte[] buffer = new byte[BUFFER_SIZE];
                int len;
                while ((len = inputStream.read(buffer)) > 0) {
                    outputStream.write(buffer, 0, len);
                }
                logger.info("IO file copy cost {} msc", System.currentTimeMillis() - start);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    NIO拷贝

    管道方式

    public void test2() {
        createFile();
        long start = System.currentTimeMillis();
        try (FileChannel inputChannel = new FileInputStream(FILE_NAME).getChannel();
             FileChannel outputChannel = new FileOutputStream(OUT_PUT_FILE_NAME).getChannel()) {
            outputChannel.transferFrom(inputChannel, 0, inputChannel.size());
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
        logger.info("IO file copy cost {} msc", System.currentTimeMillis() - start);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    文件内存映射

    • 把内核空间地址与用户空间的虚拟地址映射到同一个物理地址,用户进程直接从内存中读取文件内容,应用只需要和内存打交道,不需要进行缓冲区来回拷贝,大大提高了IO拷贝的效率。加载内存映射文件所使用的内存在Java堆区之外
    public void test3() {
        createFile();
        long start = System.currentTimeMillis();
        try (FileInputStream fileInputStream = new FileInputStream(FILE_NAME);
             FileOutputStream fileOutputStream = new FileOutputStream(OUT_PUT_FILE_NAME)) {
            FileChannel inputStreamChannel = fileInputStream.getChannel();
            FileChannel outputStreamChannel = fileOutputStream.getChannel();
            MappedByteBuffer mappedByteBuffer = inputStreamChannel.map(FileChannel.MapMode.READ_ONLY, 0, inputStreamChannel.size());
            outputStreamChannel.write(mappedByteBuffer);
        } catch (IOException e) {
            logger.error(e.getMessage(), e);
        }
        logger.info("IO file copy cost {} msc", System.currentTimeMillis() - start);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    测试PySpark
    马斯克雷军竞速「机器人」背后,一场机器革命正在发生 | 幂集创新
    HCNP Routing&Switching之DHCP中继
    JavaScript检测窗口是否滚动到底部
    电力系统可视化——比PowerWorld还精彩(Matlab实现)
    第二次修有关路基和隧道的CASIO 5800P 万能曲线计算程序可以正反标
    vue3状态管理工具pinia的插件书写,pinia全局错误处理插件安排
    隐私计算头条周刊(9.4-9.10)
    无痕 PS、读得懂文字,OpenAI 的二代 DALL·E 惊艳亮相
    Java筑基35-反射(框架底层,必须掌握)
  • 原文地址:https://blog.csdn.net/qq_37248504/article/details/126412947