.NET Core Unbridled Ideas Series: A Roundup of Fast Bulk Insert Methods for Major Databases


    1. Preface

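    Hi everyone, I'm Sanhe. Why did I decide to write a post about fast bulk inserts? It started with a requirement at work. A scheduled job reads a large amount of data from the database, processes it in the application layer, and then inserts the results back into the database in bulk. The job runs every ten minutes. Sometimes the data volume is so large that inserting the rows one at a time in a loop times out, and the job fails. That is when I started looking into fast bulk inserts. Our database is Oracle, so I studied Oracle first. Then it occurred to me that other databases must have the same need. After going through a lot of material and many rounds of experiments, I got fast bulk inserts working for MySQL, SQL Server and Oracle. SQLite does not offer a comparable bulk-load mechanism, so it is not covered. I have collected the results in this article to share with you.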

    2. Preparation

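    First, add an entity class whose properties cover most common types, so that the bulk insert can be tested thoroughly. This entity class is used for the MySQL and SQL Server tests.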

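    using System;
    using System.ComponentModel;
    using System.ComponentModel.DataAnnotations;
    using System.ComponentModel.DataAnnotations.Schema;
    
    // [DecimalPrecision] is not a standard data annotation (presumably provided by SummerBoot)
    public class NullableTable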
    {
        [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        [Key]
        public int Id { get; set; }
        [Description("Int2")]
        public int? Int2 { get; set; }
        [Description("Long2")]
        public long? Long2 { get; set; }
    
        public float? Float2 { get; set; }
        public double? Double2 { get; set; }
    
        public decimal? Decimal2 { get; set; }
    
        [DecimalPrecision(20,4)]
        public decimal? Decimal3 { get; set; }
    
        public Guid? Guid2 { get; set; }
    
        public short? Short2 { get; set; }
    
        public DateTime? DateTime2 { get; set; }
    
        public bool? Bool2 { get; set; }
    
        public TimeSpan? TimeSpan2 { get; set; }
    
        public byte? Byte2 { get; set; }
    
    
        [StringLength(100)]
        public string String2 { get; set; }
        public string String3 { get; set; }
    
        public Enum2? Enum2 { get; set; }
    
        [Column("TestInt3")]
        [Description("Int2")]
        public int? Int3 { get; set; }
    }
    
    public enum Enum2
    {
        x,
        y
    }
    

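    In Oracle we conventionally use upper-case table and column names, because Oracle folds unquoted identifiers to upper case. The Oracle test entity is therefore defined as follows: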

    [Table("NULLABLETABLE")]
    [Description("NullableTable")]
    public class NullableTable
    {
        [DatabaseGenerated(DatabaseGeneratedOption.Identity)]
        [Key]
        [Column("ID")]
        public int Id { get; set; }
        [Description("Int2")]
        [Column("INT2")]
        public int? Int2 { get; set; }
        [Description("Long2")]
        [Column("LONG2")]
        public long? Long2 { get; set; }
        [Column("FLOAT2")]
        public float? Float2 { get; set; }
        [Column("DOUBLE2")]
        public double? Double2 { get; set; }
        [Column("DECIMAL2")]
        public decimal? Decimal2 { get; set; }
        [Column("DECIMAL3")]
        [DecimalPrecision(20,4)]
        public decimal? Decimal3 { get; set; }
        [Column("GUID2")]
        public Guid? Guid2 { get; set; }
        [Column("SHORT2")]
        public short? Short2 { get; set; }
        [Column("DATETIME2")]
        public DateTime? DateTime2 { get; set; }
        [Column("BOOL2")]
        public bool? Bool2 { get; set; }
        [Column("TIMESPAN2")]
        public TimeSpan? TimeSpan2 { get; set; }
        [Column("BYTE2")]
        public byte? Byte2 { get; set; }
    
        [Column("STRING2")]
        [StringLength(100)]
        public string String2 { get; set; }
        [Column("STRING3")]
        public string String3 { get; set; }
        [Column("ENUM2")]
        public Enum2? Enum2 { get; set; }
    
        [Column("TESTINT3")]
        [Description("Int2")]
        public int? Int3 { get; set; }
    }
    

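    The experiments use a code-first approach. SummerBoot provides a dependency-injectable interface that converts between database tables and C# entity classes, and we first use it to generate the database tables from the entity classes. Each test bulk-inserts 20,000 rows so the timings can be compared. We define a list and fill it with 20,000 items in a loop.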

    var nullableTableList3 = new List<NullableTable>();
    var now = DateTime.Now;
    for (int i = 0; i < 20000; i++)
    {
        var a = new NullableTable()
        {
            Int2 = 2,
            Bool2 = true,
            Byte2 = 1,
            DateTime2 = now,
            Decimal2 = 1m,
            Decimal3 = 1.1m,
            Double2 = 1.1,
            Float2 = (float)1.1,
            Guid2 = Guid.NewGuid(),
            Id = 0,
            Short2 = 1,
            TimeSpan2 = TimeSpan.FromHours(1),
            String2 = "sb",
            String3 = "sb",
            Long2 = 2,
            Enum2 = Model.Enum2.y,
            Int3 = 4
        };
        nullableTableList3.Add(a);
    }
    

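    The database drivers are chosen as follows:
    - SQL Server uses Microsoft's official driver, System.Data.SqlClient.
    - Oracle uses the official driver, Oracle.ManagedDataAccess.Core.
    - MySQL uses the community driver, MySqlConnector.

    Why not the official MySQL driver? Its API is poorly designed, while the community driver supports column-name mapping. The official and community drivers can also coexist in the same project.

    Every fast bulk insert below supports both synchronous and asynchronous calls. Only the synchronous versions are shown; the asynchronous ones are essentially the same.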

    3. Fast bulk insert in SQL Server

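    SQL Server's official bulk-insert mechanism is SqlBulkCopy. Here its WriteToServer method is given a DataTable; overloads for DataRow[] and IDataReader also exist. The native bulk-insert code is shown below.

    Timing works as follows:
    - Time is measured with the Stopwatch class.
    - Before each test, the table is emptied with DELETE FROM NullableTable.
    - Each test runs the insert 5 times, and the total time divided by 5 gives the average.
    - In total, 100,000 rows are inserted.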

    using System.Data.SqlClient;
    using System.Diagnostics;
    
    // connectionString and nullableTableList3 are defined earlier
    var sw = new Stopwatch();
    sw.Start();
    for (int i = 0; i < 5; i++)
    {
        using (var dbConnection = new SqlConnection(connectionString))
        {
            dbConnection.Open();
    
            // SqlBulkCopy is IDisposable; Id is not mapped below, so SQL Server still generates identity values
            using var sqlBulkCopy = new SqlBulkCopy(dbConnection, SqlBulkCopyOptions.KeepIdentity, null);
            sqlBulkCopy.BatchSize = 20000;
            sqlBulkCopy.DestinationTableName = "NullableTable";
            // Map DataTable column names (property names) to destination column names
            sqlBulkCopy.ColumnMappings.Add("Int2", "Int2");
            sqlBulkCopy.ColumnMappings.Add("Bool2", "Bool2");
            sqlBulkCopy.ColumnMappings.Add("Byte2", "Byte2");
            sqlBulkCopy.ColumnMappings.Add("DateTime2", "DateTime2");
            sqlBulkCopy.ColumnMappings.Add("Decimal2", "Decimal2");
            sqlBulkCopy.ColumnMappings.Add("Decimal3", "Decimal3");
            sqlBulkCopy.ColumnMappings.Add("Double2", "Double2");
            sqlBulkCopy.ColumnMappings.Add("Float2", "Float2");
            sqlBulkCopy.ColumnMappings.Add("Guid2", "Guid2");
            sqlBulkCopy.ColumnMappings.Add("Short2", "Short2");
            sqlBulkCopy.ColumnMappings.Add("TimeSpan2", "TimeSpan2");
            sqlBulkCopy.ColumnMappings.Add("String2", "String2");
            sqlBulkCopy.ColumnMappings.Add("String3", "String3");
            sqlBulkCopy.ColumnMappings.Add("Long2", "Long2");
            sqlBulkCopy.ColumnMappings.Add("Enum2", "Enum2");
            // The Int3 property is stored in the TestInt3 column
            sqlBulkCopy.ColumnMappings.Add("Int3", "TestInt3");
            // Convert the entity list to a DataTable (extension method shown in section 4)
            var table = nullableTableList3.ToDataTable();
            sqlBulkCopy.WriteToServer(table);
        }
    }
    sw.Stop();
    var totalTime = sw.ElapsedMilliseconds;
    var avgValue = totalTime / 5;
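    Writing out all 16 column mappings by hand is easy to get wrong when the entity changes. Below is a minimal sketch of deriving them by reflection instead. It assumes the entity marks the identity column with [Key] and renamed columns with [Column], as NullableTable does. The helper ColumnMappingHelper.GetMappings and the SampleTable class are hypothetical; they are not part of SqlClient or SummerBoot.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;
using System.Reflection;

// Hypothetical helper: builds (source column, destination column) pairs for SqlBulkCopy.ColumnMappings
public static class ColumnMappingHelper
{
    public static List<(string Source, string Destination)> GetMappings<T>()
    {
        return typeof(T).GetProperties()
            // Skip unreadable properties and the [Key] column, which the database generates
            .Where(p => p.CanRead && p.GetCustomAttribute<KeyAttribute>() == null)
            // Source = DataTable column (property name); destination = [Column] name if present
            .Select(p => (Source: p.Name, Destination: p.GetCustomAttribute<ColumnAttribute>()?.Name ?? p.Name))
            .ToList();
    }
}

// Trimmed-down example entity (hypothetical) used to exercise the helper
public class SampleTable
{
    [Key]
    public int Id { get; set; }

    public int? Int2 { get; set; }

    [Column("TestInt3")]
    public int? Int3 { get; set; }
}
```

    Each pair could then be passed to SqlBulkCopyColumnMappingCollection.Add(string, string), for example: foreach (var (source, destination) in ColumnMappingHelper.GetMappings<NullableTable>()) sqlBulkCopy.ColumnMappings.Add(source, destination);. The source side uses property names, so this matches a DataTable built by ToDataTable() with its default column names.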
    

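    Results in SQL Server:
    - Fast bulk insert of 100,000 rows took 1858 ms in total, an average of only 371 ms per 20,000 rows.
    - Inserting 100,000 rows one by one with INSERT INTO statements in a loop took 457606 ms in total, an average of 91521 ms per 20,000 rows.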
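    The timing method above (5 runs, total elapsed time divided by 5) repeats in every section. A small hypothetical helper, BenchmarkHelper.Measure, sketches how it could be factored out. It uses only Stopwatch and keeps the article's integer division.

```csharp
using System;
using System.Diagnostics;

// Hypothetical helper mirroring the article's method: run N times, report total and average
public static class BenchmarkHelper
{
    public static (long TotalMs, long AverageMs) Measure(Action action, int runs)
    {
        if (runs <= 0) throw new ArgumentOutOfRangeException(nameof(runs));

        var sw = Stopwatch.StartNew();
        for (int i = 0; i < runs; i++)
        {
            action();
        }
        sw.Stop();

        // Integer division, as in the article (e.g. 1858 / 5 = 371)
        return (sw.ElapsedMilliseconds, sw.ElapsedMilliseconds / runs);
    }
}
```

    Usage would look like BenchmarkHelper.Measure(() => InsertOnce(nullableTableList3), 5), where InsertOnce is a hypothetical method wrapping one iteration of the loop above. As in the article, the table is cleared once before the test, not before every run.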

    4. An extension method for converting an entity list to a DataTable

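    Below is an extension method that converts a list of entities into a DataTable. It builds an expression tree that reads all property values into an object[], compiles it into a delegate once, and caches that delegate. Each row then costs a single delegate call instead of per-property reflection, so it performs well; feel free to use it as a reference. The implementation is as follows.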

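    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.ComponentModel.DataAnnotations.Schema;
    using System.Data;
    using System.Linq;
    using System.Linq.Expressions;
    using System.Reflection;
    
    // Extension methods must live in a static, non-generic class; this class name is illustrative
    public static class DataTableExtensions
    {
        // Compiled delegates, keyed by type name + property names
        public static ConcurrentDictionary<string, object> CacheDictionary = new ConcurrentDictionary<string, object>();
    
        /// <summary>
        /// Builds a delegate that converts an object into a one-dimensional array of its property values
        /// </summary>
        /// <typeparam name="T">Entity type</typeparam>
        /// <param name="propertyInfos">Properties to read, in column order</param>
        /// <returns>A compiled delegate equivalent to model => new object[] { (object)model.P1, (object)model.P2, ... }</returns>
        public static Func<T, object[]> BuildObjectGetValuesDelegate<T>(List<PropertyInfo> propertyInfos) where T : class
        {
            var objParameter = Expression.Parameter(typeof(T), "model");
            var selectExpressions = propertyInfos.Select(it => BuildObjectGetValueExpression(objParameter, it));
            var arrayExpression = Expression.NewArrayInit(typeof(object), selectExpressions);
            var result = Expression.Lambda<Func<T, object[]>>(arrayExpression, objParameter).Compile();
            return result;
        }
    
        /// <summary>
        /// Builds an expression that reads a single property value and boxes it to object
        /// </summary>
        /// <param name="modelExpression">Parameter expression representing the model</param>
        /// <param name="propertyInfo">Property to read</param>
        /// <returns>An expression equivalent to (object)model.Property</returns>
        public static Expression BuildObjectGetValueExpression(ParameterExpression modelExpression, PropertyInfo propertyInfo)
        {
            var propertyExpression = Expression.Property(modelExpression, propertyInfo);
            var convertExpression = Expression.Convert(propertyExpression, typeof(object));
            return convertExpression;
        }
    
        public static DataTable ToDataTable<T>(this IEnumerable<T> source, List<PropertyInfo> propertyInfos = null, bool useColumnAttribute = false) where T : class
        {
            var table = new DataTable("template");
            if (propertyInfos == null || propertyInfos.Count == 0)
            {
                propertyInfos = typeof(T).GetProperties().Where(it => it.CanRead).ToList();
            }
            foreach (var propertyInfo in propertyInfos)
            {
                // Optionally use the [Column] name instead of the property name
                var columnName = useColumnAttribute ? (propertyInfo.GetCustomAttribute<ColumnAttribute>()?.Name ?? propertyInfo.Name) : propertyInfo.Name;
                table.Columns.Add(columnName, ChangeType(propertyInfo.PropertyType));
            }
    
            Func<T, object[]> func;
            var key = typeof(T).FullName + string.Join(",", propertyInfos.Select(it => it.Name));
            if (CacheDictionary.TryGetValue(key, out var cacheFunc))
            {
                func = (Func<T, object[]>)cacheFunc;
            }
            else
            {
                func = BuildObjectGetValuesDelegate<T>(propertyInfos);
                CacheDictionary.TryAdd(key, func);
            }
    
            foreach (var model in source)
            {
                var rowData = func(model);
                table.Rows.Add(rowData);
            }
    
            return table;
        }
    
        // DataColumn does not accept Nullable<T>, so unwrap int? -> int, Guid? -> Guid, etc.
        private static Type ChangeType(Type type)
        {
            return Nullable.GetUnderlyingType(type) ?? type;
        }
    }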
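    The ToDataTable above caches the compiled delegate in a ConcurrentDictionary keyed by type name plus property names. That key is what allows different property subsets for the same type. If you always read all readable properties, a static generic class is a simpler alternative. The runtime initializes RowReader<T> once per closed type, so the delegate is built exactly once, without a string key. This is a sketch of that alternative design, not the article's code; RowReader and SampleRow are illustrative names.

```csharp
using System;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;

// Alternative caching design (sketch): static fields are initialized once per closed type T
public static class RowReader<T> where T : class
{
    // Initialized first (textual order), then used by Build()
    public static readonly PropertyInfo[] Properties =
        typeof(T).GetProperties().Where(p => p.CanRead).ToArray();

    // Compiled once per T: model => new object[] { (object)model.P1, (object)model.P2, ... }
    public static readonly Func<T, object[]> GetValues = Build();

    private static Func<T, object[]> Build()
    {
        var model = Expression.Parameter(typeof(T), "model");
        var values = Properties.Select(p =>
            (Expression)Expression.Convert(Expression.Property(model, p), typeof(object)));
        var body = Expression.NewArrayInit(typeof(object), values);
        return Expression.Lambda<Func<T, object[]>>(body, model).Compile();
    }
}

// Illustrative entity for the usage below
public class SampleRow
{
    public int X { get; set; }
    public int? Y { get; set; }
}
```

    RowReader<NullableTable>.GetValues(model) returns the same object[] shape that table.Rows.Add expects. The trade-off is flexibility: the propertyInfos parameter of ToDataTable has no equivalent here.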
    

    5. Fast bulk insert in Oracle

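    Oracle's official bulk-insert mechanism is array binding via ArrayBindCount. Each parameter's Value is an array holding one column's values, and ArrayBindCount is set to the number of rows. A single ExecuteNonQuery call then inserts all of them. The native bulk-insert code is shown below; timing works the same way as for SQL Server.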

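    using System.Diagnostics;
    using Oracle.ManagedDataAccess.Client;
    
    // "now" comes from the test-data snippet in section 2
    var total = 20000;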
    var sw = new Stopwatch();
    sw.Start();
    for (int i = 0; i < 5; i++)
    {
        // Dispose the connection at the end of each iteration (C# 8 using declaration)
        using var connection = new OracleConnection(connectionString);
        connection.Open();
        int?[] Int2 = new int?[total];
        bool[] Bool2 = new bool[total];
        byte[] Byte2 = new byte[total];
        DateTime[] DateTime2 = new DateTime[total];
        decimal?[] Decimal2 = new decimal?[total];
        decimal[] Decimal3 = new decimal[total];
        double[] Double2 = new double[total];
        float[] Float2 = new float[total];
        Guid?[] Guid2 = new Guid?[total];
        short[] Short2 = new short[total];
        TimeSpan[] TimeSpan2 = new TimeSpan[total];
        string[] String2 = new string[total];
        string[] String3 = new string[total];
        long[] Long2 = new long[total];
        Enum2[] Enum2 = new Enum2[total];
    
        for (int j = 0; j < total; j++)
        {
            Int2[j] = 2;
            Bool2[j] = true;
            Byte2[j] = 1;
            DateTime2[j] = now;
            Decimal2[j] = 1m;
            Decimal3[j] = 1.1m;
            Double2[j] = 1.1;
            Float2[j] = (float) 1.1;
            Guid2[j] = Guid.NewGuid();
            Short2[j] = 1;
            TimeSpan2[j] = TimeSpan.FromHours(1);
            String2[j] = "sb";
            String3[j] = "sb";
            Long2[j] = 2;
            Enum2[j] = Model.Enum2.y;
        }
    
        OracleParameter pInt2 = new OracleParameter();
        pInt2.OracleDbType = OracleDbType.Int32;
        pInt2.Value = Int2;
    
        OracleParameter pBool2 = new OracleParameter();
        pBool2.OracleDbType = OracleDbType.Byte;
        pBool2.Value = Bool2;
    
        OracleParameter pByte2 = new OracleParameter();
        pByte2.OracleDbType = OracleDbType.Byte;
        pByte2.Value = Byte2;
    
        OracleParameter pDateTime2 = new OracleParameter();
        pDateTime2.OracleDbType = OracleDbType.TimeStamp;
        pDateTime2.Value = DateTime2;
    
        OracleParameter pDecimal2 = new OracleParameter();
        pDecimal2.OracleDbType = OracleDbType.Decimal;
        pDecimal2.Value = Decimal2;
    
        OracleParameter pDecimal3 = new OracleParameter();
        pDecimal3.OracleDbType = OracleDbType.Decimal;
        pDecimal3.Value = Decimal3;
    
        OracleParameter pDouble2 = new OracleParameter();
        pDouble2.OracleDbType = OracleDbType.Double;
        pDouble2.Value = Double2;
    
        OracleParameter pFloat2 = new OracleParameter();
        pFloat2.OracleDbType = OracleDbType.BinaryFloat;
        pFloat2.Value = Float2;
    
    
        OracleParameter pGuid2 = new OracleParameter();
        pGuid2.OracleDbType = OracleDbType.Raw;
        pGuid2.Value = Guid2;
    
        OracleParameter pShort2 = new OracleParameter();
        pShort2.OracleDbType = OracleDbType.Int16;
        pShort2.Value = Short2;
    
        OracleParameter pTimeSpan2 = new OracleParameter();
        pTimeSpan2.OracleDbType = OracleDbType.IntervalDS;
        pTimeSpan2.Value = TimeSpan2;
    
        OracleParameter pString2 = new OracleParameter();
        pString2.OracleDbType = OracleDbType.Varchar2;
        pString2.Value = String2;
    
        OracleParameter pString3 = new OracleParameter();
        pString3.OracleDbType = OracleDbType.Varchar2;
        pString3.Value = String3;
    
    
        OracleParameter pLong2 = new OracleParameter();
        // OracleDbType.Long is Oracle's LONG character type; Int64 matches a C# long
        pLong2.OracleDbType = OracleDbType.Int64;
        pLong2.Value = Long2;
    
        OracleParameter pEnum2 = new OracleParameter();
        pEnum2.OracleDbType = OracleDbType.Byte;
        pEnum2.Value = Enum2;
        // Create the command and set its properties; Int3 (TESTINT3) is not bound, so that column stays NULL
        using OracleCommand cmd = connection.CreateCommand();
        cmd.CommandText =
            "INSERT INTO NULLABLETABLE (INT2, LONG2, FLOAT2, DOUBLE2, DECIMAL2, DECIMAL3, GUID2, SHORT2, DATETIME2, BOOL2, TIMESPAN2, BYTE2, STRING2, STRING3,ENUM2) VALUES(:1,:2,:3,:4,:5,:6,:7,:8,:9,:10,:11,:12,:13,:14,:15)";
        cmd.ArrayBindCount = total;
        cmd.Parameters.Add(pInt2);
        cmd.Parameters.Add(pLong2);
        cmd.Parameters.Add(pFloat2);
        cmd.Parameters.Add(pDouble2);
        cmd.Parameters.Add(pDecimal2);
        cmd.Parameters.Add(pDecimal3);
        cmd.Parameters.Add(pGuid2);
        cmd.Parameters.Add(pShort2);
        cmd.Parameters.Add(pDateTime2);
        cmd.Parameters.Add(pBool2);
        cmd.Parameters.Add(pTimeSpan2);
        cmd.Parameters.Add(pByte2);
        cmd.Parameters.Add(pString2);
        cmd.Parameters.Add(pString3);
        cmd.Parameters.Add(pEnum2);
        cmd.ExecuteNonQuery();
    }
    sw.Stop();
    
    var totalTime = sw.ElapsedMilliseconds;
    var avgValue = totalTime / 5;
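    The Oracle code above builds each column array by hand. Array binding always needs the data in column-major form: one array per column, each of length ArrayBindCount. That transposition can therefore be done generically. Below is a minimal reflection-based sketch; ArrayBindHelper.ToColumnArrays and SampleOracleRow are hypothetical names. Each array keeps the property's own type (int? becomes int?[]), as the article's Int2 array does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical helper: turns a row-oriented list into one array per property (column-oriented),
// the shape each OracleParameter.Value takes when array binding is used
public static class ArrayBindHelper
{
    public static Dictionary<string, Array> ToColumnArrays<T>(IReadOnlyList<T> rows)
    {
        var result = new Dictionary<string, Array>();
        foreach (var property in typeof(T).GetProperties().Where(p => p.CanRead))
        {
            // Typed array (e.g. int?[] for an int? property) with one slot per row
            var array = Array.CreateInstance(property.PropertyType, rows.Count);
            for (int i = 0; i < rows.Count; i++)
            {
                array.SetValue(property.GetValue(rows[i]), i);
            }
            result[property.Name] = array;
        }
        return result;
    }
}

// Illustrative entity for the usage below
public class SampleOracleRow
{
    public int? Int2 { get; set; }
    public string String2 { get; set; }
}
```

    Each array would then be assigned to the matching parameter, for example new OracleParameter { OracleDbType = OracleDbType.Int32, Value = columns["Int2"] }, with cmd.ArrayBindCount = rows.Count. The OracleDbType for each column still has to be chosen explicitly. Reflection is slower than the hand-written loops; a compiled-expression approach like the one in section 4 would remove most of that cost.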
    

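    Results in Oracle:
    - Fast bulk insert of 100,000 rows took 2323 ms in total, an average of only 464 ms per 20,000 rows.
    - Inserting 100,000 rows one by one with INSERT INTO statements in a loop took 462837 ms in total, an average of 92567 ms per 20,000 rows.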

    6. Fast bulk insert in MySQL

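    The bulk-insert mechanism provided by the community MySQL driver MySqlConnector is MySqlBulkCopy. It is built on MySQL's own local file upload mechanism (LOAD DATA LOCAL INFILE) and here takes a DataTable. The native bulk-insert code is shown below, and timing works the same way as for SQL Server.

    Two settings are required:
    - The connection string must include ";AllowLoadLocalInfile=true", i.e. it should look like "Server= ;Database=;User ID=;Password=;AllowLoadLocalInfile=true".
    - "set global local_infile=1" must be executed on the MySQL server to enable local file uploads.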

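    using System;
    using System.ComponentModel.DataAnnotations.Schema;
    using System.Diagnostics;
    using System.Linq;
    using System.Reflection;
    using MySqlConnector;
    
    var sw = new Stopwatch();
    sw.Start();
    for (int j = 0; j < 5; j++)
    {
        using (var dbConnection = new MySqlConnection(connectionString))
        {
            dbConnection.Open();
    
            var sqlBulkCopy = new MySqlBulkCopy(dbConnection, null);
            sqlBulkCopy.DestinationTableName = "NullableTable";
            // Properties to upload; anything marked [NotMapped] is skipped
            var properties = typeof(NullableTable).GetProperties()
                .Where(it => it.CanRead && it.GetCustomAttribute<NotMappedAttribute>() == null).ToList();
    
            for (int i = 0; i < properties.Count; i++)
            {
                var property = properties[i];
                // Use the [Column] name if present (e.g. Int3 -> TestInt3)
                var columnName = property.GetCustomAttribute<ColumnAttribute>()?.Name ?? property.Name;
                var propertyType = Nullable.GetUnderlyingType(property.PropertyType) ?? property.PropertyType;
    
                if (propertyType == typeof(Guid))
                {
                    // Load the value into a user variable, then decode it with unhex on the server
                    sqlBulkCopy.ColumnMappings.Add(new MySqlBulkCopyColumnMapping(i, "@tmp",
                        $"{columnName} = unhex(@tmp)"));
                }
                else
                {
                    sqlBulkCopy.ColumnMappings.Add(new MySqlBulkCopyColumnMapping(i, columnName));
                }
            }
    
            // Build the DataTable from the same property list so source column i matches mapping i
            var table = nullableTableList3.ToDataTable(properties);
    
            // Replace Guid columns with byte[] columns (SbUtil.ReplaceDataTableColumnType, shown below)
            SbUtil.ReplaceDataTableColumnType<Guid, byte[]>(table, guid1 => guid1.ToByteArray());
            var result = sqlBulkCopy.WriteToServer(table);
        }
    }
    
    sw.Stop();
    
    var totalTime = sw.ElapsedMilliseconds;
    var avgValue = totalTime / 5;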
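    The client-side requirement mentioned above (AllowLoadLocalInfile=true in the connection string) can be enforced in code instead of relying on every configuration file. Below is a minimal sketch using the generic System.Data.Common.DbConnectionStringBuilder; the helper name MySqlConnectionStringHelper is hypothetical. The server-side "set global local_infile=1" still has to be run separately.

```csharp
using System.Data.Common;

// Hypothetical helper: makes sure AllowLoadLocalInfile=true is set, keeping all other settings
public static class MySqlConnectionStringHelper
{
    public static string EnableLocalInfile(string connectionString)
    {
        var builder = new DbConnectionStringBuilder { ConnectionString = connectionString };
        // Keys are case-insensitive, so an existing "allowloadlocalinfile=false" is overwritten, not duplicated
        builder["AllowLoadLocalInfile"] = "true";
        return builder.ConnectionString;
    }
}
```

    MySqlConnectionStringHelper.EnableLocalInfile(connectionString) returns the same settings with AllowLoadLocalInfile=true added or overwritten. If the project already references MySqlConnector, its own MySqlConnectionStringBuilder is the more strongly typed alternative.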
    

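    Results in MySQL:
    - Fast bulk insert of 100,000 rows took 2350 ms in total, an average of only 470 ms per 20,000 rows.
    - Inserting 100,000 rows one by one with INSERT INTO statements in a loop took 414700 ms in total, an average of 82940 ms per 20,000 rows.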
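    Putting the three native measurements side by side makes the gap clearer. The sketch below only re-computes numbers already reported above: totals in milliseconds for 100,000 rows over 5 runs. BulkInsertResults is an illustrative name.

```csharp
using System;

public static class BulkInsertResults
{
    // (database, bulk total ms, loop total ms) for 100,000 rows, copied from the results above
    public static readonly (string Db, long BulkMs, long LoopMs)[] Measurements =
    {
        ("SQL Server", 1858, 457606),
        ("Oracle", 2323, 462837),
        ("MySQL", 2350, 414700),
    };

    // Average per 20,000-row run; integer division, as in the article
    public static long AveragePerRun(long totalMs, int runs = 5) => totalMs / runs;

    // How many times faster the bulk path was than the INSERT loop
    public static double Speedup(long loopMs, long bulkMs) => (double)loopMs / bulkMs;

    public static void Print()
    {
        foreach (var (db, bulkMs, loopMs) in Measurements)
        {
            Console.WriteLine($"{db}: bulk {AveragePerRun(bulkMs)} ms/run, loop {AveragePerRun(loopMs)} ms/run, ~{Speedup(loopMs, bulkMs):F0}x faster");
        }
    }
}
```

    Print() reports speedups of roughly 246x for SQL Server, 199x for Oracle and 176x for MySQL. The numbers come from a single test setup, so treat them as orders of magnitude rather than exact figures.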

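    MySQL has no native GUID type; here a C# Guid corresponds to a varbinary(16) column. The Guid values in the DataTable must therefore be converted to byte arrays first; otherwise the stored GUID values end up garbled. Once the byte arrays reach the MySQL server, the unhex function in the column mapping decodes them, and the GUID is saved correctly. The method that converts the table's Guid values to byte arrays, SbUtil.ReplaceDataTableColumnType, is implemented as follows: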

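    using System;
    using System.Collections.Generic;
    using System.Data;
    
    public static class SbUtil
    {
        /// <summary>
        /// Replaces every column of type OldType in a DataTable with a column of type NewType,
        /// converting each value with replaceFunc while keeping the column name and position
        /// </summary>
        /// <param name="dt">The DataTable to modify in place</param>
        /// <param name="replaceFunc">Converts an old value to a new one, e.g. Guid => byte[]</param>
        public static void ReplaceDataTableColumnType<OldType, NewType>(DataTable dt, Func<OldType, NewType> replaceFunc)
        {
            var needUpdateColumnIndexList = new List<int>();
            var needUpdateColumnNameList = new List<string>();
    
            for (int i = 0; i < dt.Columns.Count; i++)
            {
                var column = dt.Columns[i];
                var columnType = Nullable.GetUnderlyingType(column.DataType) ?? column.DataType;
                if (columnType == typeof(OldType))
                {
                    needUpdateColumnIndexList.Add(i);
                    needUpdateColumnNameList.Add(column.ColumnName);
                }
            }
    
            if (needUpdateColumnIndexList.Count == 0)
            {
                return;
            }
    
            // Temporary column name -> original column name
            var nameMapping = new Dictionary<string, string>();
            for (int i = 0; i < needUpdateColumnIndexList.Count; i++)
            {
                var oldColumnName = needUpdateColumnNameList[i];
                var newColumnName = Guid.NewGuid().ToString("N");
                nameMapping.Add(newColumnName, oldColumnName);
    
                // Insert the new column at the old column's position (the old column shifts right by one)
                dt.Columns.Add(newColumnName, typeof(NewType)).SetOrdinal(needUpdateColumnIndexList[i]);
                for (int j = 0; j < dt.Rows.Count; j++)
                {
                    var oldValue = dt.Rows[j][oldColumnName];
                    // Keep NULLs as DBNull instead of failing on the cast
                    dt.Rows[j][newColumnName] = oldValue == DBNull.Value
                        ? DBNull.Value
                        : (object)replaceFunc((OldType)oldValue);
                }
                dt.Columns.Remove(oldColumnName);
            }
    
            // Give the new columns their original names back
            for (int i = 0; i < dt.Columns.Count; i++)
            {
                var columnName = dt.Columns[i].ColumnName;
                if (nameMapping.ContainsKey(columnName))
                {
                    dt.Columns[i].ColumnName = nameMapping[columnName];
                }
            }
        }
    }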
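    One detail to keep in mind when reading these GUIDs back: Guid.ToByteArray() writes the first three groups of the GUID in little-endian order. Assume the column receives exactly the bytes from ToByteArray(), as in the code above. Then what HEX(Guid2) shows in MySQL does not read the same as guid.ToString("N"). Converting back with new Guid(byte[]) restores the original value. Below is a minimal sketch; GuidBytes.ToStoredHex is an illustrative name.

```csharp
using System;

public static class GuidBytes
{
    // Hex of the bytes produced by Guid.ToByteArray (uppercase, no dashes),
    // i.e. the byte order that ends up in the varbinary(16) column
    public static string ToStoredHex(Guid guid)
    {
        return BitConverter.ToString(guid.ToByteArray()).Replace("-", "");
    }
}
```

    For the GUID 00112233-4455-6677-8899-aabbccddeeff this yields 33221100554477668899AABBCCDDEEFF. SQL that compares Guid2 with a GUID string therefore has to account for this byte order.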
    

    7. SummerBoot's wrapper for fast bulk insert across databases

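    The native fast-bulk-insert code above is complicated, different for every database, and hard to remember, so SummerBoot wraps it. Following its declarative programming approach, the wrapper needs only 3 steps. SQL Server is used as the example here.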

    7.1 Register the SummerBoot services in StartUp.cs

    services.AddSummerBoot();
    services.AddSummerBootRepository(it =>
    {
        it.DbConnectionType = typeof(SqlConnection);
        it.ConnectionString = connectionString;
    });
    

    7.2 Add a repository interface

    [AutoRepository]
    public interface INullableTableRepository : IBaseRepository<NullableTable>
    {
        
    }
    

    7.3 Inject the repository interface and call FastBatchInsert directly

    var sw = new Stopwatch();
    sw.Start();
    for (int i = 0; i < 5; i++)
    {
        nullableTableRepository.FastBatchInsert(nullableTableList3);
    }
    sw.Stop();
      
    var totalTime = sw.ElapsedMilliseconds;
    var avgValue = totalTime / 5;
    

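    Results in SQL Server:
    - With SummerBoot's unified wrapper, fast bulk insert of 100,000 rows took 3926 ms in total, versus 1858 ms with the native code.
    - That is an average of only 785 ms per 20,000 rows, versus 371 ms native.

    The wrapper roughly doubles the time of the native SqlBulkCopy code. It is still over 100 times faster than looping INSERT statements (457606 ms). For this volume of data, the extra cost is small compared with the development effort it saves.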
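    The 3-step usage above hides the per-database code behind one FastBatchInsert call. The sketch below is not SummerBoot's implementation, whose internals are not shown in this article. It shows one common way such a wrapper can be organized: one strategy per connection type, selected from the configured DbConnectionType. All names are hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical: one implementation per database (SqlBulkCopy, array binding, MySqlBulkCopy, ...)
public interface IBulkInsertStrategy
{
    void FastBatchInsert<T>(IReadOnlyList<T> rows);
}

// Hypothetical dispatcher: picks the strategy registered for the configured connection type
public class BulkInsertDispatcher
{
    private readonly Dictionary<Type, IBulkInsertStrategy> _strategies = new Dictionary<Type, IBulkInsertStrategy>();

    public void Register(Type connectionType, IBulkInsertStrategy strategy)
    {
        _strategies[connectionType] = strategy;
    }

    public void FastBatchInsert<T>(Type connectionType, IReadOnlyList<T> rows)
    {
        if (!_strategies.TryGetValue(connectionType, out var strategy))
        {
            throw new NotSupportedException($"No fast bulk insert registered for {connectionType.Name}");
        }
        strategy.FastBatchInsert(rows);
    }
}

// Stand-in strategy that only counts rows, so the dispatch can be exercised without a database
public class CountingStrategy : IBulkInsertStrategy
{
    public int RowCount { get; private set; }

    public void FastBatchInsert<T>(IReadOnlyList<T> rows)
    {
        RowCount += rows.Count;
    }
}
```

    In practice, each database would get its own strategy wrapping the native code shown earlier:
    - a strategy registered for SqlConnection would wrap the SqlBulkCopy code from section 3;
    - one for OracleConnection would wrap the array binding from section 5;
    - one for MySqlConnection would wrap the MySqlBulkCopy code from section 6.

    CountingStrategy only stands in for them here.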

    Final words

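    SummerBoot is a declarative programming framework that focuses on "what to do" rather than "how to do it". For more usage, see the SummerBoot documentation, or join QQ group 799648362 to give feedback. If you found this article useful, please recommend it, follow me, and give the project a GitHub star.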

  Original article: https://www.cnblogs.com/hezp/p/16519431.html