在MySQL中建立数据库school,在数据库中建立表student。SQL语句如下:
- create database school;
- use school;
- create table student(
- id int not null,
- name varchar(40),
- age int,
- grade int,
- primary key(id)
- );
请使用Flume实时捕捉MySQL数据库中的记录更新,一旦有新的记录生成,就捕获该记录并显示到控制台。可以使用如下SQL语句模拟MySQL数据库中的记录生成操作。
-
- insert into student(id,name,age,grade)value(1,'Xiaoming',23,98);
-
- insert into student(id,name,age,grade)value(2,'Zhangsan',24,96);
-
- insert into student(id,name,age,grade)value(3,'Lisi',24,93);
-
- insert into student(id,name,age,grade)value(4,'Wangwu',21,91);
-
- insert into student(id,name,age,grade)value(5,'Weiliu',21,91);
要求:
安装好flume-ng-sql-source-1.5.2.jar以及mysql-connector-java-8.0.23.jar
- #设置名称
- a1.sources=r1
- a1.sinks=k1
- a1.channels=c1
-
- #配置Source
- a1.sources.r1.type=org.keedio.flume.source.SQLSource
- a1.sources.r1.hibernate.connection.url=jdbc:mysql://localhost:3306/school
- a1.sources.r1.hibernate.connection.user=root
- a1.sources.r1.hibernate.connection.password=123456
- a1.sources.r1.hibernate.connection.autocommit=true
- a1.sources.r1.table=student
- a1.sources.r1.run.query.delay=5000
- a1.sources.r1.status.file.path=C:/software/apache-flume-1.9.0-bin
- a1.sources.r1.status.file.name=a1.status
-
- #配置Sink
- a1.sinks.k1.type=logger
-
- #配置channels
- a1.channels.c1.type=memory
-
- #绑定sink source到channels上
- a1.sources.r1.channels=c1
- a1.sinks.k1.channel=c1
新建一个cmd窗口,输入如下命令启动Flume:
> cd C:\apache-flume-1.9.0-bin
> .\bin\flume-ng agent --conf .\conf --conf-file .\conf\znconsole.conf --name a1 -property flume.root.logger=INFO,console
建表,插入数据
Flume接收到数据,可以看到我在插入数据时还把自己的名字发过去了