• 此框架是SQL Server增量订阅,用来监听增删改数据库数据变更


    目前仅支持SQL Server,后续会支持MySQL和Oracle,Nuget上可以下载安装

    或者使用Nuget命令添加包

    dotnet add package Kogel.Subscribe.Mssql --version 0.0.0.1

     可以用来处理DB主从同步,跨库同步,数据备份,同步ES,缓存刷新等等

    (一)定义需要监听表的实体类 

     /// 
        /// 
        /// 
        [Display(Rename = "t_oms_order_detail")]
        [ElasticsearchType(RelationName = "t_oms_order_detail", IdProperty = "Id")]
        public class OmsOrderDetail : IBaseEntity
        {
            /// 
            /// 
            /// 
            [Identity]
            [Display(Rename = "id")]
            [Nest.PropertyName("id")]
            public override int Id { get; set; }
    
            /// 
            /// 
            /// 
            [Display(Rename = "name")]
            [Nest.PropertyName("name")]
            public string Name { get; set; }
    
            /// 
            /// 
            /// 
            [Display(Rename = "trade_id")]
            [Nest.PropertyName("trade_id")]
            public int? TradeId { get; set; }
    
            /// 
            /// 
            /// 
            [Display(Rename = "descption")]
            [Nest.PropertyName("descption")]
            public string Descption { get; set; }
    
            /// 
            /// 
            /// 
            [Display(Rename = "create_time")]
            [Nest.PropertyName("create_time")]
            public DateTime CreateTime { get; set; }
        }

    [Display]和[Identity]属于Kogel.Dapper.Extension的特性如果[想了解更多请点击],[ElasticsearchType]和[Nest.PropertyName]属于Elasticsearch特性,如果没用到可以忽略

    (二)定义表订阅

        /// 
        /// 定义表订阅
        /// 
        public class OmsOrderDetailSubscribe : Subscribe
        {
            /// 
            /// 设置连接配置
            /// 
            /// 
            public override void OnConfiguring(OptionsBuilder builder)
            {
                //此连接字符串账号需要有管理员权限
                builder.BuildConnection("数据库连接字符串");
            }
        }

    如果需要此表对应多张分表可以设置

    //配置所有表分片
    builder.BuildShards(new List
                {
                    "t_oms_order_detail_1",
                    "t_oms_order_detail_2",
                    "t_oms_order_detail_3"
                })

    (1).如果想推送订阅到RabbitMQ中

    builder.BuilderRabbitMQ(new RabbitMQ.Client.ConnectionFactory
                {
                    HostName = "localhost",
                    UserName = "guest",
                    Password = "guest"
                })

    可以通过BuildTopic设置交换机名称

    builder.BuildTopic("kogel_subscribe_order_detail")

    (2).如果想推送订阅到Kafka中

    builder.BuildKafka(new ProducerConfig
                {
                    BootstrapServers = "localhost:9092",
                    Acks = Acks.None
                })

    可以通过BuildTopic设置Topic名称

    builder.BuildTopic("kogel_subscribe_order_detail")

    (3).如果想推送订阅到Elasticsearch中

     builder.BuildElasticsearch(new ElasticsearchConfig
                {
                    Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/")),
                })

    如果有设置Basic授权

    builder.BuildElasticsearch(new ElasticsearchConfig
                {
                    Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/"))
                        .BasicAuthentication("账号","密码")
                })

    如果想根据自己定义的分片逻辑插入到多个ES索引中可以通过WriteInterceptor

    /// 
            /// 设置连接配置
            /// 
            /// 
            public override void OnConfiguring(OptionsBuilder builder)
            {
                //此连接字符串账号需要有管理员权限
                builder.BuildConnection("数据库连接字符串");
                //定义推送ES
                builder.BuildElasticsearch(new ElasticsearchConfig
                {
                    Settings = new Nest.ConnectionSettings(new Uri("http://localhost:9200/"))
                        .BasicAuthentication("账号", "密码"),
                    WriteInterceptor = message => WriteInterceptor(message)
                });
            }
    
            /// 
            /// 定义自己的索引逻辑
            /// 
            /// 
            /// 
            private EsSubscribeMessage WriteInterceptor(SubscribeMessage message)
            {
                string esIndexName;
                //这里写自己索引分片的业务逻辑
                if (message.Result.Id % 3 == 0)
                {
                    esIndexName = $"kogel_orders_2";
                }
                else
                {
                    esIndexName = $"kogel_orders_1";
                }
                return message.ToEsSubscribeMessage(esIndexName);
            }

    并且ES索引不存在的时候会动态创建

    (4).如果想自定义实现订阅逻辑,在可以Subscribe订阅类中重写

    /// 
            /// 订阅变更 (每一次sql的执行会触发一次Subscribe)
            /// 
            /// 消息列表表示所有影响到的数据变更(会受BuildLimit限制,没有查询完成的会在下一次查出)
            public override void Subscribes(List> messageList)
            {
                foreach (var message in messageList)
                {
                    Console.WriteLine($"执行动作:{message.Operation},更新的表:{message.TableName},更新的id:{message.Result.GetId()}");
                }
            }

    以上订阅的优先级:

    (三)订阅启动

    启动监听所有继承自Subscribe的类,在应用程序启动时执行即可

    ApplicationProgram.Run();

    启动前需要确保DB已经开启了SQL Server Agent

    windows环境可以通过cmd命令开启

    net start SQLSERVERAGENT

    linux或docker环境可以通过以下命令开启

    /opt/mssql/bin/mssql-conf set sqlagent.enabled true

    如果是基础BaseSubscribe中间基类需要定义成abstract,例如

      /// 
        /// 基础配置类需要定义成abstract
        /// 
        /// 
        public abstract class BaseSubscribe : Subscribe
            where T : class, IBaseEntity
        {
        }

    关闭监听,在应用程序退出时执行即可

    ApplicationProgram.Close();

    (四)其他配置

    builder.BuildCdcConfig(new CdcConfig
                {
                    //扫描间隔(每次扫描变更表的间隔,单位毫秒) 默认10000毫秒/10秒
                    ScanInterval = 10000,
    
                    //变更捕捉文件在DB保存的时间(默认三天)
                    Retention = 60 * 24 * 3,
    
                    //是否首次扫描表全部数据再监听变更(默认false)
                    IsFirstScanFull = false,
    
                    //每次检索的变更量(默认10条)
                    Limit = 10,
    
                    //变更扫描的偏移量位置(默认从最后中止处开始)
                    OffsetPosition = OffsetPositionEnum.Abort
                })
  • 相关阅读:
    RocketMQ的主要组件及其功能
    我给 Apache 顶级项目提了个 Bug
    用DIV+CSS技术设计的凤阳旅游网站(web前端网页制作课作业)HTML+CSS+JavaScript
    uniapp(uncloud) 使用生态开发接口详情2(使用 schema创建数据, schema2code创建页面, iconfont 引入项目)
    Prometheus+Grafana实现监控报警
    fastTEXT入门自然语言处理NLP
    MySQL BufferPool缓存与Redo日志是如何提升事务性能的
    返回二叉树中最大的二叉搜索子树的大小
    Linux简单安装ffmpeg 实现用PHP压缩音频
    离线数仓同步数据2
  • 原文地址:https://blog.csdn.net/sinat_40572875/article/details/128128913