github 链接:GitHub - go-mysql-org/go-mysql-elasticsearch: Sync MySQL data into elasticsearch
无需三方工具直接监听 mysql binlog 即可同步数据到 es
所占内存小,CPU 水位不会飙升(相对于使用 Logstash)
服务器配置 2 核 4G,MySQL 和 ES 在同一台服务器上
开启时间 11:00 后
测试数据1万
- git clone https://github.com/go-mysql-org/go-mysql-elasticsearch.git
- [root@hecs-202871 go-mysql-elasticsearch]# pwd
- /home/golang/gopath/go-mysql-elasticsearch
- #安装
- go install
- #编译
- make
- [root@hecs-202871 go-mysql-elasticsearch]# ls
- bin cmd elastic go.mod LICENSE README.md var
- clear_vendor.sh Dockerfile etc go.sum Makefile river
- #进行配置,配置比较重要,在下边将详细讲解
- vim /home/golang/gopath/go-mysql-elasticsearch/etc/river.toml
- #启动
- ./bin/go-mysql-elasticsearch -config=./etc/river.toml
- # MySQL 的相关配置
- # 指定用户必须具备复制权限
- my_addr = "127.0.0.1:3306"
- my_user = "canal"
- my_pass = "123456"
- my_charset = "utf8"
-
- # ES 相关配置
- es_addr = "127.0.0.1:9200"
- es_user = ""
- es_pass = ""
-
- # Inner Http status address 不加会报错
- stat_addr = "127.0.0.1:12800"
- stat_path = "/metrics"
-
- # 数据源配置
- # 以 Slave 模式工作
- server_id = 10001
- # mysql/mariadb
- flavor = "mysql"
-
- # mysqldump 路径,如果为空或者未设置,会跳过这一环节。
- mysqldump = "mysqldump"
- bulk_size = 128
- flush_bulk_time = "200ms"
- skip_no_pk_table = false
-
- [[source]]
- # 数据库名称
- schema = "canal"
- # 数据表同步范围,支持通配符
- tables = ["uuid"]
-
- # 规则定义
- [[rule]]
- # 数据库名称
- schema = "canal"
- # 规则对应的数据表,支持通配符
- table = "uuid"
- # 目标 ES 索引
- index = "uuid"
- # 该规则在 ES 中生成的文档类型
- type = "_doc"
- # 规则对应的数据表,支持通配符,例如 table = "uuid*"
- # MySQL address, user and password
- # user must have replication privilege in MySQL.
- my_addr = "127.0.0.1:3306"
- my_user = "root"
- my_pass = ""
- my_charset = "utf8"
-
- # Set true when elasticsearch use https
- #es_https = false
- # Elasticsearch address
- es_addr = "127.0.0.1:9200"
- # Elasticsearch user and password, maybe set by shield, nginx, or x-pack
- es_user = ""
- es_pass = ""
-
- # Path to store data, like master.info, if not set or empty,
- # we must use this to support breakpoint resume syncing.
- # TODO: support other storage, like etcd.
- data_dir = "./var"
-
- # Inner Http status address
- stat_addr = "127.0.0.1:12800"
- stat_path = "/metrics"
-
- # pseudo server id like a slave
- server_id = 1001
-
- # mysql or mariadb
- flavor = "mysql"
-
- # mysqldump execution path
- # if not set or empty, ignore mysqldump.
- mysqldump = "mysqldump"
-
- # if we have no privilege to use mysqldump with --master-data,
- # we must skip it.
- #skip_master_data = false
-
- # minimal items to be inserted in one bulk
- bulk_size = 128
-
- # force flush the pending requests if we don't have enough items >= bulk_size
- flush_bulk_time = "200ms"
- # Ignore table without primary key
- skip_no_pk_table = false
- # MySQL data source
- [[source]]
- schema = "test"
- # Only below tables will be synced into Elasticsearch.
- # "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023
- # I don't think it is necessary to sync all tables in a database.
- tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]
-
- # Below is for special rule mapping
-
- # Very simple example
- #
- # desc t;
- # +-------+--------------+------+-----+---------+-------+
- # | Field | Type | Null | Key | Default | Extra |
- # +-------+--------------+------+-----+---------+-------+
- # | id | int(11) | NO | PRI | NULL | |
- # | name | varchar(256) | YES | | NULL | |
- # +-------+--------------+------+-----+---------+-------+
- #
- # The table `t` will be synced to ES index `test` and type `t`.
- [[rule]]
- schema = "test"
- table = "t"
- index = "test"
- type = "t"
-
- # Wildcard table rule, the wildcard table must be in source tables
- # All tables which match the wildcard format will be synced to ES index `test` and type `t`.
- # In this example, all tables must have same schema with above table `t`;
- [[rule]]
- schema = "test"
- table = "t_[0-9]{4}"
- index = "test"
- type = "t"
-
- # Simple field rule
- #
- # desc tfield;
- # +----------+--------------+------+-----+---------+-------+
- # | Field | Type | Null | Key | Default | Extra |
- # +----------+--------------+------+-----+---------+-------+
- # | id | int(11) | NO | PRI | NULL | |
- # | tags | varchar(256) | YES | | NULL | |
- # | keywords | varchar(256) | YES | | NULL | |
- # +----------+--------------+------+-----+---------+-------+
- #
- [[rule]]
- schema = "test"
- table = "tfield"
- index = "test"
- type = "tfield"
-
- [rule.field]
- # Map column `id` to ES field `es_id`
- id="es_id"
- # Map column `tags` to ES field `es_tags` with array type
- tags="es_tags,list"
- # Map column `keywords` to ES with array type
- keywords=",list"
-
- # Filter rule
- #
- # desc tfilter;
- # +-------+--------------+------+-----+---------+-------+
- # | Field | Type | Null | Key | Default | Extra |
- # +-------+--------------+------+-----+---------+-------+
- # | id | int(11) | NO | PRI | NULL | |
- # | c1 | int(11) | YES | | 0 | |
- # | c2 | int(11) | YES | | 0 | |
- # | name | varchar(256) | YES | | NULL | |
- # +-------+--------------+------+-----+---------+-------+
- #
- [[rule]]
- schema = "test"
- table = "tfilter"
- index = "test"
- type = "tfilter"
-
- # Only sync following columns
- filter = ["id", "name"]
-
- # id rule
- #
- # desc tid_[0-9]{4};
- # +----------+--------------+------+-----+---------+-------+
- # | Field | Type | Null | Key | Default | Extra |
- # +----------+--------------+------+-----+---------+-------+
- # | id | int(11) | NO | PRI | NULL | |
- # | tag | varchar(256) | YES | | NULL | |
- # | desc | varchar(256) | YES | | NULL | |
- # +----------+--------------+------+-----+---------+-------+
- #
- [[rule]]
- schema = "test"
- table = "tid_[0-9]{4}"
- index = "test"
- type = "t"
- # The es doc's id will be `id`:`tag`
- # It is useful for merging multiple tables into one type while these tables have the same PK
- id = ["id", "tag"]
待续