- package batchinsert;
-
- import com.alibaba.druid.pool.DruidDataSource;
- import lombok.SneakyThrows;
- import org.apache.commons.dbutils.QueryRunner;
-
- import java.sql.Connection;
- import java.sql.PreparedStatement;
- import java.util.Date;
- import java.util.Map;
- import java.util.UUID;
- import java.util.concurrent.CountDownLatch;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
-
- public class BatchInsertRunnable implements Runnable {
-
- CountDownLatch countDownLatch;
-
- int rowCount;
-
- public static DruidDataSource dataSource;
- public static String workId= UUID.randomUUID().toString();
-
- public static int batchSize=30;
-
-
- static{
- dataSource = new DruidDataSource();
- //dataSource.setDriverClassName(driverClassName);//如果不配置druid会根据url自动识别dbType,然后选择相应的driverClassName
- dataSource.setUrl("jdbc:mysql://10.8.4.214:3306/aops_asoe?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC&useLocalSessionState=true&useSSL=false&rewriteBatchedStatements=true");
- dataSource.setUsername("root");
- dataSource.setPassword("root123456");
- dataSource.setValidationQuery("SELECT 1");//用来检测连接是否有效
- dataSource.setTestOnBorrow(false);//借用连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能
- dataSource.setTestOnReturn(false);//归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能
- //连接空闲时检测,如果连接空闲时间大于timeBetweenEvictionRunsMillis指定的毫秒,执行validationQuery指定的SQL来检测连接是否有效
- dataSource.setTestWhileIdle(true);//如果检测失败,则连接将被从池中去除
- dataSource.setTimeBetweenEvictionRunsMillis(60000);//1分钟
- dataSource.setMaxActive(20);
- dataSource.setInitialSize(20);
-
- }
-
- public BatchInsertRunnable(CountDownLatch countDownLatch, int rowCount) {
- this.countDownLatch = countDownLatch;
- this.rowCount = rowCount;
- }
-
- @SneakyThrows
- public void dorun() {
-
- Connection connection = dataSource.getConnection();
- connection.setAutoCommit(false);
- QueryRunner queryRunner=new QueryRunner();
- String sql = "insert into asoe_hi_actinst(id,activity_id,activity_name,activity_type,duration,end_time,exec_id,flow_index,proc_def_id,proc_inst_id,start_time) values (?,?,?,?,?,?,?,?,?,?,?)";
- PreparedStatement preparedStatement = connection.prepareStatement(sql);
-
- for(int i=1;i<=rowCount;++i){
- preparedStatement.setObject(1, UUID.randomUUID().toString());
- preparedStatement.setObject(2, UUID.randomUUID().toString());
- preparedStatement.setObject(3, UUID.randomUUID().toString());
- preparedStatement.setObject(4, UUID.randomUUID().toString());
-
-
- preparedStatement.setObject(5, 0);
- preparedStatement.setObject(6, new Date());
- preparedStatement.setObject(7, UUID.randomUUID().toString());
- preparedStatement.setObject(8, 0);
-
- preparedStatement.setObject(9,"batch");
- preparedStatement.setObject(10,"batch");
- preparedStatement.setObject(11, new Date());
-
- preparedStatement.addBatch();
-
- if(i%batchSize==0){
- preparedStatement.executeBatch();
- preparedStatement.clearBatch();
- }
- }
- if(rowCount%batchSize!=0){
- preparedStatement.executeBatch();
- preparedStatement.clearBatch();
- }
- connection.commit();
- preparedStatement.close();
- connection.close();
- countDownLatch.countDown();
- }
-
- public static void main(String[] args) throws InterruptedException {
-
- // int threadCount=4;
- // int perRowCount=100000;
- //191502
- //batch=1 271851
-
- // int threadCount=8;
- // int perRowCount=50000;
- //158645
- //batch=1 231818
-
- int threadCount=1;
- int perRowCount=400000;
- //167135
- //batch=1 782624
-
- CountDownLatch latch=new CountDownLatch(threadCount);
-
- ThreadPoolExecutor executor=new ThreadPoolExecutor(threadCount,threadCount ,1000 ,TimeUnit.SECONDS,new LinkedBlockingQueue<>());
-
- DruidDataSource dataSource = BatchInsertRunnable.dataSource;
- long start = System.currentTimeMillis();
- for(int i=0;i
- executor.submit(new BatchInsertRunnable(latch, perRowCount));
- }
- latch.await();
- System.out.println("timeuse:"+(System.currentTimeMillis()-start));
- }
-
- @Override
- public void run() {
- try {
- dorun();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
- //timeuse:17268
- //timeuse:18669
- //timeuse:12135
-
-
-
不使用批量插入(batchSize=1)时,插入 400000 行:

    线程数    耗时(ms)
    1         782624
    8         231818

使用批量插入(batchSize=30)时,插入 400000 行:

    线程数    耗时(ms)
    1         167135
    4         191502
    8         158645

结论:不使用批量插入时,多线程确实能显著提高插入速度;使用批量插入后,单线程与多线程耗时相差不大(原理与 Kafka 的批量提交类似——批量本身已摊薄了每行的网络往返开销)。