import lombok.AllArgsConstructor;
import org.testng.collections.Maps;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* 多线程事务
*
* @author zhs
* @date 2023/9/4 15:40
*/
public class MultiThreadTransaction {
/**
* 线程池
*/
public static final ExecutorService threadPool =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
/**
* jdbc工具类
*/
static class JdbcUtil {
/**
* 数据库类型枚举
*/
@AllArgsConstructor
static enum DatabaseTypeEnum {
MYSQL("com.mysql.cj.jdbc.Driver", "jdbc:mysql://"),
ORACLE("oracle.jdbc.driver.OracleDriver", "jdbc:oracle:thin:@");
String driverName;
String prefix;
}
public static Connection getDefaultConnection() {
String url = DatabaseTypeEnum.MYSQL.prefix + "127.0.0.1:3306/my-test-database";
String username = "root";
String password = "1";
return getConnection(DatabaseTypeEnum.MYSQL, url, username, password, true);
}
/**
* 要使用JDBC获取数据源连接,你需要执行以下步骤:
* 1. 导入JDBC驱动程序:首先,你需要将数据库的JDBC驱动程序添加到你的项目中。这通常是通过将驱动程序的JAR文件添加到项目的类路径中来实现的。不同的数据库有不同的JDBC驱动程序,你需要根据你使用的数据库选择相应的驱动程序。
* 2. 加载驱动程序:在你的Java代码中,使用 Class.forName() 方法加载数据库驱动程序。例如,对于MySQL数据库,可以使用以下代码加载驱动程序:
* 3. 建立连接:使用 DriverManager.getConnection() 方法建立与数据库的连接。你需要提供数据库的连接URL、用户名和密码。以下是一个示例:
*
* @date 2023/9/4 15:32
* @param url
* @param userName
* @param password
* @param isEnableAutoCommit 是否启用事务
* @return java.sql.Connection
*/
public static Connection getConnection(DatabaseTypeEnum datasourceType, String url, String userName, String password, boolean isEnableAutoCommit) {
// 1. 导入JDBC驱动程序
try {
Class.forName(datasourceType.driverName);
} catch (ClassNotFoundException e) {
e.printStackTrace();
throw new RuntimeException("load driver error!");
}
// 2. 建立连接
Connection connection;
try {
connection = DriverManager.getConnection(url, userName, password);
if (isEnableAutoCommit) {
// 开启事务, 设置为手动提交
connection.setAutoCommit(false);
}
return connection;
} catch (SQLException e) {
e.printStackTrace();
throw new RuntimeException("get jdbc connection error!");
}
}
}
// 阻塞主线程
static CountDownLatch childCountDownLatch = new CountDownLatch(3);
// 阻塞所有子线程
static CountDownLatch parentCountDownLatch = new CountDownLatch(1);
// 是否首次拿到行锁
static AtomicBoolean isFirst = new AtomicBoolean(false);
// 是否等待超时 (volatile失效线程可见, 线程可以获取到主内存的最新数据)
static volatile Boolean isExpire = false;
/**
* 记录子线程执行状态
*
* 由于需要线程可见性, 所以使用volatile修饰
*
*/
static volatile Map<Thread, Boolean> childThreadExecuteFlagHolder =
Maps.newLinkedHashMap();
public static void main(String[] args) throws InterruptedException {
/*
* 1、如果update 不同id, 是否可以实现多线程事务的“原子性”?
* 2、如果update 同一个id, 持有相同的行锁, 会不会导致java代码死锁?
* 3、如果死锁产生, 并且主线程等待超时(childCountDownLatch), 要如何处理每个子线程的后续事务? 同时回滚?
*/
// 开启三个线程, 分别执行一条sql
new Thread(() -> {
execute(1, null);
}, "线程1").start();
new Thread(() -> {
execute(1, () -> {
//throw new RuntimeException("测试线程2手动抛出一个异常!");
});
}, "线程2").start();
new Thread(() -> {
execute(1, null);
}, "线程3").start();
// 阻塞主线程, 所有子线程的sql部分执行完毕, 才能继续往下执行
if (!childCountDownLatch.await(5, TimeUnit.SECONDS)) {
// 如果等待超时, 则将状态标志位置为false, 全局回滚
isExpire = true;
}
System.out.println("所有子线程的sql部分执行完毕, 开始处理事务, 主线程通知中...");
// 通知子线程, 执行最后的事务处理
parentCountDownLatch.countDown();
System.out.println("多线程事务处理完毕...");
Thread.sleep(200);
System.out.println("---------------------");
System.out.println("线程名称 | 线程执行状态");
System.out.println("---------------------");
childThreadExecuteFlagHolder.forEach((k, v) -> {
System.out.println(k.getName() + " | " + v);
});
System.out.println("---------------------");
}
private static void execute(Integer id, Runnable doSomething) {
// 获取数据库连接
Connection connection = JdbcUtil.getDefaultConnection();
System.out.println("线程[" + Thread.currentThread().getName() + "]获取到连接...");
// 初始化所有子线程的执行状态为false, 防止出现sql加到同一个行锁, 而导致java代码死锁 (childCountDownLatch)
// 当childCountDownLatch等待超时后, 默认执行的都是false, 则所有子线程都会回滚
childThreadExecuteFlagHolder.put(Thread.currentThread(), false);
try {
Thread.sleep(100);
// 执行sql
executeSql(id, connection);
// doSomething, 可以抛异常
Optional.ofNullable(doSomething).ifPresent(Runnable::run);
// 执行成功, 将当前线程的执行状态更新为成功
childThreadExecuteFlagHolder.put(Thread.currentThread(), true);
} catch (Exception e) {
e.printStackTrace();
// 执行失败, 将当前线程的执行状态更新为失败
childThreadExecuteFlagHolder.put(Thread.currentThread(), false);
System.out.println(String.format("线程[%s]捕获到异常, 则将状态设置为失败!", Thread.currentThread().getName()));
} finally {
// (表示当前子线程的sql部分已经执行完毕) 当所有子线程都执行了countDown, 使得主线程可以往下执行
childCountDownLatch.countDown();
try {
// 【阻塞】等待所有子线程执行完毕, 主线程会通知这里, 解除阻塞
parentCountDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 最后处理线程事务回滚、提交
transactionProcess(connection);
}
}
/**
* 执行sql
*
* @param id
* @param connection
*/
private static void executeSql(Integer id, Connection connection) throws SQLException {
// connection.setSchema("my-test-database");
String sql = "update BAS_USER set is_deleted = 1 where id = ?";
// 发送一次请求, 执行预编译, 防止sql注入
PreparedStatement preparedStatement = connection.prepareStatement(sql);
// 设置sql中占位符的具体值
preparedStatement.setInt(1, id);
// 执行sql
preparedStatement.executeUpdate();
// 首个拿到行锁的线程, 打印出线程信息
if (isFirst.compareAndSet(false, true)){
// 如果是抛出异常的线程, 率先拿到锁的话, 无法失效多线程事务原子性
System.out.println(String.format("--------> %s 首先拿到行锁了", Thread.currentThread().getName()));
}
}
/**
* 处理事务回滚、提交
*
* @date 2023/9/4 16:13
* @param connection
*/
private static void transactionProcess(Connection connection) {
try {
// 如果是等待超时, 则回滚所有线程事务
if (isExpire) {
System.out.println("等待超时, 回滚所有线程事务!");
connection.rollback();
} else {
// 如果有一个子线程失败, 则全部回滚;
if (childThreadExecuteFlagHolder.values().stream().anyMatch(e -> !e)) {
connection.rollback();
System.out.println(String.format("线程[%s]回滚事务!", Thread.currentThread().getName()));
} else {
connection.commit();
System.out.println(String.format("线程[%s]成功提交事务!", Thread.currentThread().getName()));
}
}
} catch (SQLException e) {
e.printStackTrace();
}finally {
// 关闭连接资源
try {
connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}