• 性能优化-如何爽玩多线程来开发


    前言

    多线程大家肯定都不陌生,理论滚瓜烂熟,八股天花乱坠,但是大家有多少在代码中实践过呢?很多人在实际开发中可能就用用@Async,new Thread()。线程池也很少有人会自己去建,默认的随便用用。在工作中大家对于多线程开发,大多是用在异步,比如发消息,但是对于提效这块最重要的优势却很少有人涉及。因此本篇文章会结合我自己的工作场景带大家去发掘项目中的多线程场景,让你的代码快如闪电。

    多线程普及

    多线程解决了什么问题?带来了什么问题?

    Cpu为了均衡与内存的速度差异,增加了缓存--导致了可见性问题

    操作系统增加了进程和线程,分时复用CPU,进而均衡CPU与IO设备的速度差异--导致了原子性问题

    编译程序优化指令排序(JVM指令重排序)--导致了有序性问题

    可见性问题--线程A修改共享变量,修改后CPU缓存中的数据没有及时同步到内存,线程B读取了内存中老数据

    原子性问题--多个线程增加数据,有几个线程挂了,这数据就少了

    有序性问题--经典的对象创建三步,堆中分配内存-->初始化-->变量指向内存地址,如果排序重排会出现132,导致没有初始化的对象被创建

    JVM提供了什么工具去解决线程不安全问题?Java代码有哪些实现思路?

    JVM提供了三个关键词,synchronized、volatile、final和JMM(线程操作内存规范,例如8个happen before原则)

    Java代码实践可从三方面入手

    1. 同步:synchronized和ReentrantLock
    2. 非同步:CAS
    3. 线程安全:局部变量(虚拟机栈或者本地方法栈,线程私有)和ThreadLocal(本地线程变量副本,空间换安全,每个线程一份)

    如何开启线程?

    基础的Thread、runable、callable,进阶的ThreadExecutor和Future,以及JDK8的终极武器CompletableFuture

    线程间如何协作?

    基础三件套

    1. join()--Thread的join方法,在b线程中调用a.join(),b会等a执行完毕
    2. wait() notify() notifyAll()--Object类自带,等待与唤醒
    3. await() signal() signalAll()--JUC包中Condition类自带,与wait类似,但是增加了条件参数更加自由

    进阶的有JDK5开始提供的Semaphore(信号量)、CyclicBarrier、CountDownLatch以及JDK8的CompletableFuture

    场景实战

    多线程处理场景

    并行聚合处理数据

    以下案例主要运用CompletableFuture.allOf()方法,将原本串行的操作改为并行。本案例相对比较常规,算是CompletableFuture的基本操作,其他特性就不一一介绍了。

    1. AtomicReference<List<SellOrderList>> orderLists = new AtomicReference<>();
    2. AtomicReference<List<InternationalSalesList>> salesLists = new AtomicReference<>();
    3. AtomicReference<Map<String, BigDecimal>> productMap = new AtomicReference<>();
    4. .........
    5. //逻辑A
    6. CompletableFuture<Void> orderListCom =
    7. CompletableFuture.runAsync(() -> {
    8. orderLists.set(sellOrderListService.lambdaQuery()
    9. .ge(SellOrderList::getOrderCreateDate, startDate)
    10. .le(SellOrderList::getOrderCreateDate, endDate)
    11. .eq(SellOrderList::getIsDelete, 0).list());
    12. });
    13. CompletableFuture<Void> productCom = CompletableFuture.runAsync(() -> {
    14. //逻辑B});
    15. CompletableFuture<Void> euLineCom = CompletableFuture.runAsync(() -> {
    16. //逻辑C});
    17. //汇总线程操作
    18. CompletableFuture.allOf(orderListCom, productCom, euCloudCom).handle((res, e) -> {
    19. if (e != null) {
    20. log.error("客户订单定时任务聚合数据异常", e);
    21. } else {
    22. try {
    23. //获取全部数据后处理数据
    24. aggregateData(customerList, saleMonth, orderLists, salesLists, productMap, euLineList, asLineList,
    25. euCloudList, asCloudList, itemMap, deliveryMap, parities);
    26. } catch (Exception ex) {
    27. log.error("客户订单处理数据异常", ex);
    28. }
    29. }
    30. return null;
    31. });
    32. 复制代码

    修改for循环为并行操作

    这里借鉴了parallelStream流的思路,将串行的for循环分割成多个集合后,对分割后的集合进行循环。这应该是最普遍的多线程应用场景了,需要注意的是线程池需要自定义大小、不安全的集合例如ArrayList并行add时需要加锁,加好日志就完事了。

    1. //自建线程池,ForkJoinPool默认的太小,一般是逻辑CPU数量-1
    2. int logicCpus = Runtime.getRuntime().availableProcessors();
    3. ForkJoinPool forkJoinPool = new ForkJoinPool(logicCpus * 80);
    4. //指定集合大小,避免频繁扩容
    5. List<RedundantErpSl> slAddList = new ArrayList<>(50000);
    6. //谷歌提供工具类切分集合--import com.google.common.collect.Lists;
    7. List<List<SlErpDTO>> partition = Lists.partition(slErpList, 1000);
    8. int finalLastStatus = lastStatus;
    9. CompletableFuture<Void> handle = CompletableFuture.allOf(partition.stream().map(addPartitionList ->
    10. CompletableFuture.runAsync(() -> {
    11. for (SlErpDTO slErp : addPartitionList) {
    12. //TODO 逻辑处理
    13. synchronized (slAddList) {
    14. //ArrayList线程不安全,多线程会出现数据覆盖,体现为数据丢失
    15. slAddList.add(sl);
    16. }
    17. }
    18. }, forkJoinPool)).toArray(CompletableFuture[]::new))
    19. .whenComplete((res, e) -> {
    20. if (e != null) {
    21. log.error("多线程组装数据失败", e);
    22. } else {
    23. try {
    24. //进一步处理循环后的结果
    25. slService.batchSchedule(versionNum, slAddList);
    26. } catch (Exception ex) {
    27. log.error("批量插入失败", ex);
    28. }
    29. }
    30. });
    31. handle.join();
    32. 复制代码

    多线程新增

    我个人在开发中会使用一些小工具来提高开发效率,接下来公开一个我常用的批量插入的小工具,这个小工具最开始是同事给我的,然后我做了优化和扩充,主要是扩充了多线程以及service块的代码。

    总览

    该工具类用于生成复制可用的代码,这里需要提前指定一些固定变量。除了entity和serviceName需要根据实际情况变化之外,方法名和参数名可以不变。生成了四个方法,分别是mapper类中的方法、mapper.xml中的foreach批量插入代码、普通无事务的多线程批量插入代码、多线程事务代码

    1. //批量方法名,对应mapper和xml中id
    2. String methodName = "batchSchedule";
    3. //mapper参数名称
    4. String paramName = "addList";
    5. //实际代码里面的service命名
    6. String serviceName = "baseInfoService";
    7. Class<?> entity = BudgetBase.class;
    8. //批量插入
    9. printMapper(entity.getSimpleName(), methodName, paramName);
    10. printXml(entity, methodName, paramName);
    11. //普通多线程批量插入,无事务
    12. printSave(entity.getSimpleName(), serviceName, paramName, 1000);
    13. //多线程事务,慎用
    14. printAddTransaction(entity.getSimpleName(), paramName, 1000);
    15. 复制代码

    mapper方法

    xml批量插入语句

    多线程批量插入

    这个多线程插入其实就是我上面多线程处理场景中for循环改造的变种,将集合拆分进行并行批量插入

    1. if (CollectionUtils.isNotEmpty(addList)) {
    2. List<List<BudgetBase>> partition = Lists.partition(addList, 1000);
    3. CompletableFuture.allOf(partition.stream().map(addPartitionList ->
    4. CompletableFuture.runAsync(() -> baseInfoService.getBaseMapper().batchSchedule(addPartitionList)))
    5. .toArray(CompletableFuture[]::new))
    6. .exceptionally(e -> {
    7. log.error("多线程处理异常", e);
    8. return null;
    9. });
    10. }
    11. 复制代码

    花里胡哨-多线程事务提交

    这个才是本文的重点,接下来我会详细介绍我在开发中遇到的坑和知识点,敲黑板了啊,重点来了!

    我写的这个多线程事务本质就是根据2PC理论手写了一个分布式事务,涉及到多线程、Spring事务、ThreadLocal、LockSupport这些知识点,在线上一定要慎重使用,最好不用,可作炫技用,秀就完了。

    深刻理解Spring事务、ThreadLocal

    从头说起,既然是多线程事务,那自然不能使用注解@Transactional去开启事务,Spring事务采用ThreadLocal来做线程隔离,ThreadLocalMap内部key为当前线程的ThreadLocal对象,也可以当作以当前线程为key,value也是个map,看源码可以知道,map里面key为数据源,value为数据库连接。

    当然上来看源码,肯定认识不够深刻,接下来是一段错误代码示范,充分展示了理解上面那段话的重要性。我的第一次失败就是如下一段代码,首先肯定是能运行的,不能运行的例子我就不给了,先来看看这段代码。

    1. //存储事务集合
    2. List<TransactionStatus> traStatusList = new ArrayList<>();
    3. //最外部更新或者删除时手动创建一个新事务
    4. DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    5. def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    6. def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
    7. TransactionStatus statusStart = transactionManager.getTransaction(def);
    8. traStatusList.add(statusStart);
    9. //外部DML操作
    10. lambdaUpdate().set(RedundantErpSl::getIsDelete, 1).set(RedundantErpSl::getUpdateTime, new Date())
    11. .eq(RedundantErpSl::getVersionNum, versionNumber).eq(RedundantErpSl::getIsDelete, 0).update();
    12. List<List<RedundantErpSl>> partition = Lists.partition(RedundantErpSlList, 1000);
    13. try {
    14. CompletableFuture<Void> future = CompletableFuture.allOf(partition.stream().map(addPartitionList ->
    15. CompletableFuture.runAsync(() -> {
    16. //Spring事务内部由ThreadLocal存储事务绑定信息,因此需要每个线程新开一个事务
    17. DefaultTransactionDefinition defGo = new DefaultTransactionDefinition();
    18. defGo.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    19. defGo.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED);
    20. TransactionStatus statusGo = transactionManager.getTransaction(defGo);
    21. //ArrayList线程不安全,多线程会出现数据覆盖,体现为数据丢失
    22. synchronized (traStatusList) {
    23. traStatusList.add(statusGo);
    24. }
    25. getBaseMapper().batchSchedule(addPartitionList);
    26. })).toArray(CompletableFuture[]::new))
    27. .exceptionally(e -> {
    28. log.error("批量导入出现异常", e);
    29. //向外抛出异常,确保最外面catch回滚
    30. throw new PtmException(e.getMessage());
    31. });
    32. future.join();
    33. for (TransactionStatus status : traStatusList) {
    34. transactionManager.commit(status);
    35. }
    36. } catch (Exception e) {
    37. log.error("批量导入出现异常回滚开始", e);
    38. for (TransactionStatus status : traStatusList) {
    39. transactionManager.rollback(status);
    40. }
    41. }
    42. 复制代码

    先说说这个错误例子我当时开发的思路,手动开启事务后,在每个线程操作开始的时候都创建一个事务,Spring事务传播级别用的TransactionDefinition.PROPAGATION_REQUIRES_NEW,即默认创建新事务。隔离级别一开始没改,然后我就尝试着操作了一下,好家伙批量新增的时候直接锁了。

    查看正在锁的事务 SELECT * FROM INFORMATION_SCHEMA.INNODB_LOCKS;

    查看等待锁的事务 SELECT * FROM INFORMATION_SCHEMA.INNODB_LOCK_WAITS;

    异常如下图,锁超时异常

    第一次看见下图这个错的时候,我是疑惑的,没有当回事,以为是多数据源的问题。我项目里有直连Oracle和MySQL两种关系型数据库,当时怀疑是多数据源事务没有正确解绑导致的问题。

    PS:事实上这个坑给足了我提示,根本原因就是多线程事务解绑失败,但是我理解出现了偏差,为后文埋下了伏笔。

    我当时一看有锁,这我能惯着,马上修改事务隔离级别为TransactionDefinition.ISOLATION_READ_COMMITTED读已提交(MySQL默认事务隔离级别为可重复读,这里下调了一级,总共四级)。

    顺利插入但还是报上面这个错,错误位置是下面这个循环提交时报的,第二次循环的时候一定会报错。

    1. for (TransactionStatus status : traStatusList) {
    2. transactionManager.commit(status);
    3. }
    4. 复制代码

    当时一度以为是多数据源的问题,但是Debug后发现resource里面只有一个数据源key,解绑一次后就没了,第二个循环解绑的时候就报上面这个错,因为找不到可以解绑的key了。我就很疑惑,为啥就一个数据源key,我不是在别的线程开了事务嘛,按理说开了多少个线程就有多少个事务,这个问题困扰了我大概一天左右的时间。然后我想到了Spring事务的实现原理ThreadLocal,然后联想到我的多线程开启事务,再看到我在主线程里面进行傻叉循环解绑,我瞬间为梦想窒息。

    所以破案了,我在主线程是操作不了子线程事务,这也是代码报key找不到的原因,因为用主线程做key在ThreadLocal里肯定是拿不到子线程信息的,只能拿到主线程自己的。

    多线程事务提交方案

    因此解决方案就很简单,子线程的事务自己操作,那么多线程事务处理哪家强,JDK里找CompletableFuture!当然这里使用CountDownLatch也是可行的,网上也有案例。多线程事务在处理逻辑上其实和分布式事务很像,因此我这里采用2PC的思想,一阶段所有子线程全部开启事务并执行SQL,然后阻塞等待,二阶段判断是否全部成功,是就唤醒所有线程提交事务,否就全部回滚。

    1. -----------需要注入Bean,一个是Spring Boot事务管理,一个是线程池-----------
    2. @Autowired
    3. private PlatformTransactionManager transactionManager;
    4. @Autowired
    5. @Qualifier("ioDenseExecutor")
    6. private ThreadPoolTaskExecutor ioDense;
    7. -----------多线程事务新增操作-----------
    8. private void batchSchedule(List<BudgetBase> addList) {
    9. if (!CollectionUtils.isEmpty(addList)) {
    10. //定义局部变量,是否成功、顺序标识、等待线程队列
    11. AtomicBoolean isSuccess = new AtomicBoolean(true);
    12. AtomicInteger cur = new AtomicInteger(1);
    13. List<Thread> unfinishedList = new ArrayList<>();
    14. //切分新增集合
    15. List<List<BudgetBase>> partition = Lists.partition(addList, 1000);
    16. int totalSize = partition.size();
    17. //多线程处理开始
    18. CompletableFuture<Void> future =
    19. CompletableFuture.allOf(partition.stream().map(addPartitionList -> CompletableFuture.runAsync(() -> {
    20. //Spring事务内部由ThreadLocal存储事务绑定信息,因此需要每个线程新开一个事务
    21. DefaultTransactionDefinition defGo = new DefaultTransactionDefinition();
    22. defGo.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    23. TransactionStatus statusGo = transactionManager.getTransaction(defGo);
    24. int curInt = cur.getAndIncrement();
    25. try {
    26. log.info("当前是第{}个线程开始启动,线程名={}", curInt, Thread.currentThread().getName());
    27. baseInfoService.getBaseMapper().batchSchedule(addPartitionList);
    28. log.info("当前是第{}个线程完成批量插入,开始加入等待队列,线程名={}", curInt, Thread.currentThread().getName());
    29. //ArrayList线程不安全,多线程会出现数据覆盖,体现为数据丢失
    30. synchronized (unfinishedList) {
    31. unfinishedList.add(Thread.currentThread());
    32. }
    33. log.info("当前是第{}个线程已加入队列,开始休眠,线程名={}", curInt, Thread.currentThread().getName());
    34. notifyAllThread(unfinishedList, totalSize, false);
    35. LockSupport.park();
    36. if (isSuccess.get()) {
    37. log.info("当前是第{}个线程提交,线程名={}", curInt, Thread.currentThread().getName());
    38. transactionManager.commit(statusGo);
    39. } else {
    40. log.info("当前是第{}个线程回滚,线程名={}", curInt, Thread.currentThread().getName());
    41. transactionManager.rollback(statusGo);
    42. }
    43. } catch (Exception e) {
    44. log.error("当前是第{}个线程出现异常,线程名={}", curInt, Thread.currentThread().getName(), e);
    45. transactionManager.rollback(statusGo);
    46. isSuccess.set(false);
    47. notifyAllThread(unfinishedList, totalSize, true);
    48. }
    49. }, ioDense)).toArray(CompletableFuture[]::new));
    50. future.join();
    51. }
    52. }
    53. private void notifyAllThread(List<Thread> unfinishedList, int totalSize, boolean isForce) {
    54. if (isForce || unfinishedList.size() >= totalSize) {
    55. log.info("唤醒当前所有休眠线程,线程数={},总线程数={},是否强制={}", unfinishedList.size(), totalSize, isForce);
    56. for (Thread thread : unfinishedList) {
    57. log.info("当前线程={}被唤醒", thread.getName());
    58. LockSupport.unpark(thread);
    59. }
    60. }
    61. }
    62. 复制代码

    方案详解

    为什么用LockSupport的park()和unpark()而不用Thread.sleep()、Object.wait()、Condition.await()?

    1. 更简单,不需要获取锁,能直接阻塞线程。
    2. 更直观,以thread为操作对象更符合阻塞线程的直观定义;
    3. 更精确,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程);
    4. 更灵活 ,unpark方法可以在park方法前调用。

    第4点很重要,如果不能提前使用unpark()的话,按照代码逻辑最后一个线程会被永久阻塞。

    为什么要自建线程池?

    CompletableFuture默认的线程池ForkJoinPool.commonPool()偏向于计算密集型任务处理,核心线程数和逻辑CPU数少1,对于多线程事务这种IO密集型任务来说核心线程数偏少。并且上述方法在操作中都是阻塞线程,无法一次性开启全部线程的话,会导致notifyAllThread方法无法执行,老线程阻塞新线程无法开启,就尬住了。

    ForkJoinPool基于工作窃取算法,所以最适合的是计算密集型任务,这里我们开启一个参数调整为IO密集型(多核心少队列)的ThreadPoolTaskExecutor线程池即可。

    注意MySQL/Druid等数据库的最大连接数

    使用多线程的时候也别忘了调整其他组件的最大连接数。Druid线程池这个代码配置可以调,MySQL5.7默认151得用配置文件调整。MySQL最大连接数调整的方法之前从零开始的SQL修炼手册-实战篇有讲解过,欢迎读者们翻翻我之前写的干货。

    写在最后

    排除掉聊聊Java的两篇学习总结,这算是我正式写的第十篇原创文章了,值得纪念。本期干货满满,开篇普及了一些多线程的基础知识,再配合场景进行一波实战,有详细代码、有流程图、有方法详解,自我感觉是比较清晰的文章了。希望我在工作中的小技巧,能给大家提提效,能有所收获。

    接下来聊点别的,讲讲这三个多月写文章的感受。总计十篇原创文章,3个多月的跨度,差不多一周半一篇,说实话这速度我个人来说是比较满意的。因为工作实在是太紧张了,我只能抽工作日晚上,周末时间来写文章,原创文章其实很麻烦,选题、构思、成文、查资料、总结一堆事。我想写原创文章的初衷来源于我想建立自己的知识体系,顺带总结下我的学习和实践经验,每一篇文章都是成体系的知识,我不会拆分成几篇来写,并且会尽量把相关的知识也融入进来。

    这十篇文章里面,我个人觉得写的最好的是Filebeat+Kafka+数据处理服务+Elasticsearch+Kibana+Skywalking日志收集系统、从零开始的SQL修炼手册以及这篇性能优化-如何爽玩多线程来开发。目前从数据来看,日志收集系统无疑是最好的,我第一篇破千阅读的文章,这篇文章是我写的第二篇文章也是特别用心的一篇文章,从架构的角度剖析了我在设计日志收集系统的三次迭代过程。但是从比例来看,从零开始的SQL修炼手册的数据是最离谱的,500多的阅读量竟然有20的收藏数,哈哈。我也对我之前的文章会时不时的更新一波,期待大家回顾哈。

    也回顾下之前的其他文章吧,不能厚此薄彼。

    • 第一篇原创文章Spring Boot Starter开发指北(案例+代码地址),算是开了个引子,第一次写文章,碎碎念很多,现在看也是感慨良多。
    • 后端思想-如何抽取业务组件这一篇没上掘金推荐,我个人感觉写的一般,但是想写点东西但是没写出来。
    • 后端思想-单点登录组件的设计与思考这一篇挺让我意外的,上推荐了并且数据还不错,但是细细回想,这一篇我没有普及什么知识,只是详细的把我设计和开发过程给描述了一遍,流程图也是之前给同事讲的时候画的,这一篇文章写的很顺,没有额外查什么资料。
    • 从零开始的SQL修炼手册-理论篇这一篇其实也是知识的总结,但是可能理论的东西太多了吧,噎住了,实际上我是想和实战篇放一块的,但是篇幅太长掘金放不下,只能拆开了。
    • 后端思想-如何设计一个操作和管理Excel的业务模块这一篇是业务组件那一篇的拓展,真实记录了我在工作中设计的excel模块的全部流程和操作,很奇怪这篇为啥没上掘金推荐。
    • 分布式事务Seata-1.5.2使用全路线指北这一篇是我第一次写配置说明书,也是我那会正好升级seata1.5.2,看网上没人写我就写了,第一个吃螃蟹就得冲冲冲。
    • 设计方案-定时任务接口数据存储及更新策略这一篇也是很意外,因为我在构想时,这篇文章实际上是我对本篇多线程的一个铺垫,没想到大家这么喜欢,说实话让我捉摸不透了,懵圈了。

    总结完毕,接下来我会继续更新好的文章,提升自我的同时也让大家一起提升,UPUP。我在工作中也经常和同事说让他们自己写点文章去总结自己的知识,因为我觉得这是一个很好的总结和提升自我的方式,一方面锻炼了文笔,一方面也加深了自己对所学知识的理解。最后,希望读者大大们看了我的文章后,能有所收获,评论互动大大的欢迎。

  • 相关阅读:
    Spring之IOC
    【pandas小技巧】--字符串转数值
    视频调整帧率、分辨率+音画同步
    InputMan12.0J
    电脑入门:电脑键位中英文对照表、电脑开始菜单运行里常用的命令
    知识经验分享——YOLOv5-6.0训练出错及解决方法(RuntimeError)
    前端|babel升级
    Linux命令type和which的区别
    MinIO使用
    《Video MAE》何恺明团队提出视频版本的MAE,进行高效视频预训练!Mask Ratio高达90%时效果很好!...
  • 原文地址:https://blog.csdn.net/m0_73311735/article/details/126745389