es服务器版本必须和logstash版本一致 7.9.2
在/usr/local/src/下新建logstash文件夹,解压
下载logstash后查看是否安装成功,在logstash的bin目录下输入指令:
./logstash -e 'input { stdin { } } output { stdout {} }'
进入logstash的bin目录下,新建文件夹mysql,在这个文件夹中存放配置信息
将mysql-connect-java.jar放入mysql中,版本要和mysql一致,用来进行数据库连接
创建mysql.conf,注意使用UTF-8字符集编码,否则报错
- input {
- jdbc {
- # mysql 数据库链接,shop为数据库名
- jdbc_connection_string => "jdbc:mysql://43.143.207.96:3306/db0616?serverTimezone=Asia/Shanghai&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
- # 用户名和密码
- jdbc_user => "root"
- jdbc_password => "qcBY@2021"
-
- # 驱动(即是数据库连接驱动jar包的路径)
- jdbc_driver_library => "/usr/local/src/logstash/logstash-7.9.2/bin/mysql/mysql-connector-java-5.1.38.jar"
-
- # 驱动类名
- jdbc_driver_class => "com.mysql.jdbc.Driver"
- jdbc_paging_enabled => "true"
- jdbc_page_size => "50000"
- # 执行的sql 文件路径+名称
- statement_filepath => "/usr/local/src/logstash/logstash-7.9.2/bin/mysql/jdbc.sql"
-
- # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
- #schedule => "* * * * *"
-
- # 索引类型
- type => "cm"
- }
- }
-
- filter {
- json {
- source => "message"
- remove_field => ["message"]
- }
- }
-
- output {
- elasticsearch {
- # ES的IP地址及端口
- hosts => ["43.143.207.96:9200"]
- # 索引名称
- index => "myuser"
- # 需要关联的数据库中有有一个id字段,对应类型中的id
- document_id => "%{id}"
- }
- stdout {
- # JSON格式输出
- codec => json_lines
- }
- }
创建jdbc.sql,在文件中编写sql,这条sql会作为logstash执行sql进行同步
(sql语句不用写分号,写了会报错)
select * from student
./logstash -f mysql/mysql.conf
同步成功:
使用elasticsearch-head查看,其安装本地下载zip压缩包解压就可以