Elastic 的 beat 家族有很多著名的产品比如 filebeat、packetbeat 等,都支持扩展开发,项目使用 go 语言,这里演示开发环境的搭建。
设置下载源:
go env -w GO111MODULE=on
go env -w GOPROXY=https://goproxy.io,direct
安装go,设置好gopath和goroot,go的版本最好和 mod 文件一致。
安装 winpcap,底层的网卡抓包支持:
https://www.winpcap.org/install/bin/WinPcap_4_1_3.exe
安装 elasticsearch 和 kibana,这里使用 packetbeat-8.2,那么对应的 es 和 kibana 也使用 8.2 的版本。
【1】开发环境搭建
拉取源代码:
GitHub - elastic/beats: Beats - Lightweight shippers for Elasticsearch & Logstash
所有的 beat 项目都在这里面,例如常用的 filebeat、packetbeat 等。
packetbeat整合在beats项目中,其中还包括topbeat以及filebeat,现简要介绍beats源码框架内容如下:
* /libbeat:公共依赖库
* /filebeat:logstash升级版,处理日志类型数据
* /packbeat:网络抓包
* /topbeat:监控系统性能;
* /vendor:依赖的第三方库(如dns开源库或者其他协议栈)
* /tests:用于测试的pcamp抓包文件
* /scripts:测试脚本
packetbeat项目源码:
注意 gopath 下面的mod文件标注有需要的go版本:
然后编译:
会进行相关依赖的下载:
注意分支的选择,这里选择 8.2 进行开发测试。
开发教程
/packetbeat/main.Go: 启动入口,里面没有什么逻辑;
/packetbeat/beat/: 里面就一个packetbeat.go文件,packetbeat主程序,处理配置和命令行参数,协议需要在这里进行注册;
/packetbeat/config/: 里面就一个config.go文件,定义了所有的配置相关的struct结构体,新协议需要在这里定义其配置的结构体;
/packetbeat/debian/: debian打包相关;
/packetbeat/decoder/: 解码类,网络传输层包的解码;
/packetbeat/docs/: 项目的相关文档;
/packetbeat/etc/: 示例配置文件;
/packetbeat/procs/: 获取系统内核运作状态与进程信息的工具类;
/packetbeat/protos/:自定义协议类,每个目录对应一个应用协议,我们需要在此新增我们的协议,如SMTP;
/packetbeat/sniffer/: 三种不同抓包方式的实现:pcap、af_packet、pf_ring,关于这三者的区别,请参照文档: Traffic Capturing
/packetbeat/tests/: 测试相关的文件,里面有每一个协议的pcab抓包样板,还有一堆Python测试脚本;
开发原理:
每一个协议都有一个或者多个固定的端口用于通信,开发者要做的事情就是定义协议端口,然后按照TCP以及UDP实现对应的接口,Packetbeat会捕获到指定端口的数据包,然后交给开发者定义的方法来解析,如TCP对应的是Parse,UDP是ParseUdp.解析出来的结构化数据封装成Json,插入到Elasticsearch中,后续便可使用Elasticsearch的搜索和数据统计能力进行应用层数据分析。
TcpProtocolPlugin:TCP协议插件的接口定义,依次是:Parse() 解析Packet,ReceivedFin()处理TCP断开连接,GapInStream()处理空包丢包,ConnectionTimeout()超时时间;
UdpProtocolPlugin: UDP协议的接口定义,UDP协议是不需要握手和保障数据可靠性的,扔出去就结束,速度快,不保证数据可靠送达,所以只有ParseUdp一个方法需要实现,比较简单;
ProtocolPlugin:TCP和UDP都需要实现ProtocolPlugin的基础接口,其实就定义了获取端口和初始化接口。
packetbeat/protos/registry.go 里面定义了 tcp 和 udp 抓包接口、用户只需要实现改接口,抓包的数据会自动传进接口里:
//tcp抓包回调接口
type TCPPlugin interface {
Plugin
//抓包的消息
Parse(pkt *Packet, tcptuple *common.TCPTuple,dir uint8, private ProtocolData) ProtocolData
//处理链接断开
ReceivedFin(tcptuple *common.TCPTuple, dir uint8,private ProtocolData) ProtocolData
//处理丢包空包
GapInStream(tcptuple *common.TCPTuple, dir uint8, nbytes int,private ProtocolData) (priv ProtocolData, drop bool)
//处理超时
ConnectionTimeout() time.Duration
}
//udp抓包回调接口
type UDPPlugin interface {
Plugin
//抓包的消息
ParseUDP(pkt *Packet)
}
数据包 Packet 结构:
消息数据的结构:
消息结构体: