• windows环境下安装logstash同步数据,注册系统服务


    windows环境下安装logstash同步数据,注册系统服务

    此方法适用于Windows环境,同一个配置文件配置多个管道,并且配置系统服务,防止程序被杀进程

    一、安装logstash

    (1)下载压缩包,解压后修改config文件夹中的logstash-sample.conf文件

    # Sample Logstash configuration for creating a simple
    # Beats -> Logstash -> Elasticsearch pipeline.
    
    input {
    # 日志列表
    jdbc {
        jdbc_driver_library => "D:\logstash-7.2.0\logstash-7.2.0\lib\ojdbc8-19.3.0.0.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@127.0.0.1:1521:orcl"
        jdbc_user => "admin"
        jdbc_password => "123456"
        # statement => "SELECT * FROM tb_hotel"
        statement_filepath => "D:\logstash-7.2.0\logstash-7.2.0\sql\admin_gatelog.sql"
        jdbc_paging_enabled => true
        jdbc_page_size => 10000
        jdbc_default_timezone => "Asia/Shanghai"
        schedule => "* * * * *"
        # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
        record_last_run => true
        # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值
        use_column_value => true
        # 记录上一次追踪的结果值
    	last_run_metadata_path => "D:\logstash-7.2.0\logstash-7.2.0\sync\last_run_gatelog.yml"
    	# 如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间
    	tracking_column => "crtTime"
    	# tracking_column 对应字段的类型
    	tracking_column_type => "timestamp"
    	# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录
    	clean_run => false
      }
    # 文件管理
    jdbc {
        jdbc_driver_library => "D:\logstash-7.2.0\logstash-7.2.0\lib\ojdbc8-19.3.0.0.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@127.0.0.1:1521:orcl"
        jdbc_user => "mbbb"
        jdbc_password => "123456"
        # statement => "SELECT * FROM tb_hotel"
        statement_filepath => "D:\logstash-7.2.0\logstash-7.2.0\sql\admin_fileManage.sql"
        jdbc_paging_enabled => true
        jdbc_page_size => 10000
        jdbc_default_timezone => "Asia/Shanghai"
        schedule => "* * * * *"
        # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
        record_last_run => true
        # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值
        use_column_value => true
        # 记录上一次追踪的结果值
    	last_run_metadata_path => "D:\logstash-7.2.0\logstash-7.2.0\sync\last_run_file_manage.yml"
    	# 如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间
    	tracking_column => "create_time"
    	# tracking_column 对应字段的类型
    	tracking_column_type => "timestamp"
    	# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录
    	clean_run => false
    }
    # 消息审计
    jdbc {
        jdbc_driver_library => "D:\logstash-7.2.0\logstash-7.2.0\lib\ojdbc8-19.3.0.0.jar"
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
        jdbc_connection_string => "jdbc:oracle:thin:@127.0.0.1:1521:orcl"
        jdbc_user => "admin"
        jdbc_password => "123456"
        # statement => "SELECT * FROM tb_hotel"
        statement_filepath => "D:\logstash-7.2.0\logstash-7.2.0\sql\admin_messageLog.sql"
        jdbc_paging_enabled => true
        jdbc_page_size => 10000
        jdbc_default_timezone => "Asia/Shanghai"
        schedule => "* * * * *"
        # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件
        record_last_run => true
        # 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值
        use_column_value => true
        # 记录上一次追踪的结果值
    	last_run_metadata_path => "D:\logstash-7.2.0\logstash-7.2.0\sync\last_run_message_log.yml"
    	# 如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间
    	tracking_column => "createtime"
    	# tracking_column 对应字段的类型
    	tracking_column_type => "timestamp"
    	# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录
    	clean_run => false
    }
    
      beats {
        port => 5044
      }
      stdin{
      }
    }
    
    
    filter {
            
     
    }
    
    output {
      elasticsearch {
        hosts => ["http://127.0.0.1:9201","http://127.0.0.1:9202","http://127.0.0.1:9203"]
        # index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        index => "%{es_index}_index"
        document_type => "doc"
        document_id => "%{id}"
        user => "elastic"
        password => "elastic"
      }
      stdout{
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110

    (2)在sql文件夹中创建admin_gatelog.sql文件

    select 'gate_log'                                 as es_index,  # 必须
           ROWNUM,   # 必须
           ID,   # 必须
           MENU,
           OPT,
           URI,
           to_char(CRT_TIME, 'YYYY-MM-DD HH24:MI:SS') as crtTime, # 必须  和tracking_column对应,任何时间字段都可
           CRT_USER                                   as crtUser,
           CRT_NAME                                   as crtName,
           CRT_HOST                                   as crtHost,
           IS_SUCCESS                                 as isSuccess,
           P_ID                                       as pId,
           OPT_INFO                                   as optInfo,
           ORG_NAME                                   as orgName,
           ORG_CODE                                   as orgCode,
           PATH_CODE                                  as pathCode,
           PATH_NAME                                  as pathName
    from TABLE
    where to_timestamp(to_char(CRT_TIME, 'YYYY-MM-DD HH24:MI:SS'), 'YYYY-MM-DD HH24:MI:SS') > :sql_last_value
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    (3)在symc文件夹中创建 某某.yml文件用来记录数据中最后同步的日期

    (4)修改jvm.options的

    -Xms5g
    -Xmx5g

    (5)查看lib 、sql、sync文件夹的配置(没有这些文件夹的请创建),其中:

    lib里面放着oracle数据库连接的驱动。(mysql也可以)

    sql里面放着配置的要执行的sql文件。

    sync里面放着每条sql查询数据全部同步完成后的最新数据的的日期(指定字段的最新日期)

    (6)进入到bin目录启动logstash

    D:\logstash-7.2.0\logstash-7.2.0\bin>logstash -f D:\logstash-7.2.0\logstash-7.2.0\config\logstash-sample.conf

    (7)windows环境下配置系统服务

    详细配置请查看附属文档nssm将logstash注册系统服务.pdf

    二、附加

    影响插入性能的主要因素如下:

    一. 服务端

    1、磁盘写入速度:越快越好,最好是SSD以上。
    2、CPU运算性能:越快越好,多核心可能有益处。
    3、内存用量:内存用量大,分配更大容量较好。
    4、分片数:与节点数相关,默认5个分片较好。尽量不要让主分片分布在低速节点上。若集群中节点是高低性能搭配的情况,则只分配1个主分片到高速节点上较好,低速节点可用于添加副本。初始导入大量数据前设置索引的副本数为0,导入完成后再开启期望副本数,副本的存在对导入效率的影响成倍提高。
    二. 客户端

    客户端提交数据采用bulk方式效率高,transport-client略快于rest-client。异步提交比同步提交更能压榨服务器性能。综合比较建议采用bulk+rest+async组合。
    bulk批次大小,一般单笔1000条以上或传输数据量在5M-15M左右。实测当中,主要观察服务器处理能力,若服务器处理时间较长导致异步提交响应超时,则要降低单笔传送数据量。

    关于分片和副本
    1、若索引的分片数(shards)大于1,则索引数据将自动分布在多个分片上,若每个分片分布在不同的实体节点上,则大批量提交数据时可将写入操作分布到不同节点并发进行,从而会提高写入速度。分片数这个参数在创建索引时就固化下来,以后难以更改。
    2、副本(replica)是分片的数据备份,同时也提供查询功能。若设置副本数为1,则每个分片都会有1个副本。大批量提交数据时,副本的存在会使得写入数据时间加倍。所幸副本数这个参数可以随时调整,在大批量导入操作开始时将其设置为0,待导入数据完成再修改为1即可,ES引擎会自动处理副本数据。

    三、解决elasticsearch限制:

    logstash.outputs.elasticsearch] retrying failed action with response code: 429 ({"type"=>"circuit_breaking_exception", "reason"=>"[parent] Data too large, data for [] would be [255226018/243.4mb], which is larger than the limit of [246546432/235.1mb], real usage: [255206808/243.3mb], new bytes reserved: [19210/18.7kb]", "bytes_wanted"=>255226018, "bytes_limit"=>246546432, "durability"=>"PERMANENT"})
    
    • 1

    添加或修改相关配置项:在 elasticsearch.yml 文件中,你可以添加或修改与数据大小限制有关的配置项。以下是一些常用的配置项:

    • indices.breaker.total.limit:这个配置项用于设置整个索引的数据大小限制。你可以将其设置为你希望的限制值。例如,将限制设置为 1GB:

      indices.breaker.total.limit: 1gb
      
      • 1
    • indices.breaker.total.use_real_memory:默认情况下,Elasticsearch 使用 Java 运行时环境的堆内存大小来计算数据大小限制。你可以将此配置项设置为 false,以使用 Elasticsearch 自己的内存计算,这有助于更精确地控制限制。例如:

      indices.breaker.total.use_real_memory: false
      
      • 1

    四、修改Logstash的配置

    在这里插入图片描述
    一般pipeline.workers 不做更改,直接使用机器的最大线程。
    如果有需要请修改pipeline.batch.size: ##
    pipeline.batch.delay:##

  • 相关阅读:
    MATLAB 不同的surface图需要一个统一的colorbar
    Linux系统硬盘读写慢,如何排查
    腾讯地图开发填坑总结
    PostgreSQL sequence cache 参数导致的数值序列不连续 有间隔 gap
    蓝桥杯嵌入式STM32G431RBT6竞赛指南与模板——最后的绝唱
    linux命令—awk(二)
    Springboot毕设项目基于Java的Cisco网络安全设备采购平台wl7jy(java+VUE+Mybatis+Maven+Mysql)
    使用VC++实现分段线性变换,直方图均衡化、锐化处理(使用拉普拉斯算子)
    Django之同时新增数据到两个数据库表与同时返回两个表的数据(插拔式)
    阿里云轻量应用服务器有月流量限制吗?
  • 原文地址:https://blog.csdn.net/qq_41978323/article/details/133132937