注意 DataSourceTransactionManager.rollback和 TransactionStatus.setRollbackOnly区别
setRollbackOnly | rollback | |
不同点 | 控制范围小,资源释放需要配合自动事物 | 控制范围大,释放不需要自动事物管理 |
相同点 | 能回滚 | 能回滚 |
情况一手动事物死锁
情况二手动事物不锁
情况三自动+手动事物不锁表(注意手动需要开启新事物不然冲突)
情况四自动+手动事物锁表
情况五自动+手动事物锁表
情况六自动+手动事物不锁表
- 查看mysql事物锁
- //当前运行的所有事务 首先我们查看被锁表的进程
- SELECT * FROM information_schema.INNODB_TRX;
- //当前出现的锁
- SELECT * FROM information_schema.INNODB_LOCKs;
- //锁等待的对应关系
- SELECT * FROM information_schema.INNODB_LOCK_waits;
- 解决:kill事务,清理表数据,转移到历史表,检查定时任务
- 然后找到进程号,即 trx_mysql_thread_id
- 然后执行;
- kill 进程号;
注意线程不能用太多,调整mysql最大连接数5.7默认151改成500,根据cpu情况来
推荐最大核心数=cpu*2+1,但是还有其它线程使用sleep和await,所见建议处理大数据量的时候
线程控制在10以内
- @Override
- public void testTranslationOfThreads6() throws SQLException {
- //准备数据50000条
- List
list = new ArrayList<>(); - for (int i = 0; i < 50000; i++) {
- Cdr cdr = new Cdr();
- cdr.setSrc("" + i);
- cdr.setDatetime(DateUtils.formatTime(new Date()));
- list.add(cdr);
- }
- // 获取数据库连接,获取会话(内部自有事务)
- SqlSession sqlSession = sqlContext.getSqlSession();
- Connection connection = sqlSession.getConnection();
- try {
- // 设置手动提交
- connection.setAutoCommit(false);
- //获取mapper
- CdrMapper employeeMapper = sqlSession.getMapper(CdrMapper.class);
- //先做删除操作
- // employeeMapper.remove()
- //获取执行器
- ExecutorService service = ExecutorConfig.getThreadPool();
- List
> callableList = new ArrayList<>(); - //拆分list
- List
> lists = averageAssign(list, 5);
- AtomicBoolean atomicBoolean = new AtomicBoolean(true);
- for (int i = 0; i < lists.size(); i++) {
- if (i == lists.size() - 1) {
- atomicBoolean.set(false);
- }
- List
list1 = lists.get(i); - //使用返回结果的callable去执行,
- Callable
callable = () -> { - //让最后一个线程抛出异常
- if (!atomicBoolean.get()) {
- throw new JeecgBootException(001, "出现异常");
- }
- int insert = employeeMapper.insert(list1.get(0));
-
- return insert;
- };
- callableList.add(callable);
- }
- //执行子线程
- List
> futures = service.invokeAll(callableList); - for (Future
future : futures) { - //如果有一个执行不成功,则全部回滚
- if (future.get() <= 0) {
- connection.rollback();
- return;
- }
- }
- connection.commit();
- System.out.println("添加完毕");
- } catch (Exception e) {
- connection.rollback();
- log.info("error", e);
- throw new JeecgBootException(002, "出现异常");
- } finally {
- connection.close();
- }
-
- }
- @Override
- public void testTranslationOfThreads3() {
- long startTime = System.currentTimeMillis();
- //准备数据50000条
- List
list = new ArrayList<>(); - for (int i = 0; i < 50000; i++) {
- Cdr cdr = new Cdr();
- cdr.setSrc("" + i);
- cdr.setDatetime(DateUtils.formatTime(new Date()));
- list.add(cdr);
- }
- // 线程数量
- final Integer threadCount = 2;
- //每个线程处理的数据量
- final Integer dataPartionLength = (list.size() + threadCount - 1) / threadCount;
- // 创建多线程处理任务
- ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);
- CountDownLatch threadLatchs = new CountDownLatch(threadCount);
- AtomicBoolean isError = new AtomicBoolean(false);
- try {
- for (int i = 0; i < threadCount; i++) {
- // 每个线程处理的数据
- List
threadDatas = list.stream() - .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());
- studentThreadPool.execute(() -> {
- try {
- try {
- this.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);
- } catch (Throwable e) {
- e.printStackTrace();
- isError.set(true);
- } finally {
- threadLatchs.countDown();
- }
- } catch (Exception e) {
- e.printStackTrace();
- isError.set(true);
- }
- });
- }
-
- // 倒计时锁设置超时时间 30s
- boolean await = threadLatchs.await(30, TimeUnit.SECONDS);
- // 判断是否超时
- if (!await) {
- isError.set(true);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- isError.set(true);
- }
- if (!transactionStatuses.isEmpty()) {
- if (isError.get()) {
- transactionStatuses.forEach(s -> transactionManager.rollback(s));
- } else {
- transactionStatuses.forEach(s -> transactionManager.commit(s));
- }
- }
- long endTime = System.currentTimeMillis();
- log.info("共耗时:{}", (endTime - startTime) / 1000 + "秒");
- System.out.println("主线程完成");
- }