前几天在公司的项目上有个同事使用了多线程统计数据,当时出现了一个用户一直使用服务器首次登录信息作为查询信息。找了半天才发现,线程池资源同步了。后面手动将数据set进去的。
等待线程全部执行完毕,这里使用的是减法计数器,也可而已用加法计数器:CyclicBarrier
package com.quxiao.controller; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; /** * @program: package1 * @author: quxiao * @create: 2023-09-27 15:22 **/ public class t3 { static final ExecutorService service = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(4); FutureTask task1 = new FutureTask<>(new t(2, countDownLatch)); FutureTask task2 = new FutureTask<>(new t(3, countDownLatch)); FutureTask task3 = new FutureTask<>(new t(4, countDownLatch)); FutureTask task4 = new FutureTask<>(new t(5, countDownLatch)); service.execute(task1); service.execute(task2); service.execute(task3); service.execute(task4); countDownLatch.await(); System.out.println(task1.get()); System.out.println(task2.get()); System.out.println(task3.get()); System.out.println(task4.get()); // service.shutdown(); } static class t implements Callable> {
Listlist = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9); int sum = 0; CountDownLatch countDownLatch; public t(int sum, CountDownLatch countDownLatch) { this.sum = sum; this.countDownLatch = countDownLatch; } @Override public Listcall() throws Exception { TimeUnit.SECONDS.sleep(1L); ListreturnList = list.stream().filter(x -> x >= sum).collect(Collectors.toList()); countDownLatch.countDown(); return returnList; } } }限流操作,限制人数访问,超过预定值就等待:
package com.quxiao.controller; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; /** * @program: package1 * @author: quxiao * @create: 2023-09-27 15:22 **/ public class t3 { static final ExecutorService service = Executors.newFixedThreadPool(10); public static void main(String[] args) throws ExecutionException, InterruptedException { Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 9; i++) { final int t = i; new Thread(() -> { try { semaphore.acquire(); TimeUnit.SECONDS.sleep(5L); System.out.println(Thread.currentThread().getName() + "进来了"); System.out.println(Thread.currentThread().getName() + "出去了"); System.out.println(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { semaphore.release(); } }, "" + i).start(); } } }