• Observability:使用 Elastic Agent 来收集定制的 TCP 日志


    自定义 TCP 日志包初始化一个侦听 TCP 套接字,该套接字收集接收到的任何 TCP 流量并将每一行作为文档发送到 Elasticsearch。 可以通过将 ingest pipeline 的名字添加到管道配置选项来添加自定义摄取管道,可以通过 API 或摄取节点管道 UI创建自定义摄取管道。

    前提条件

    在进行下面的练习之前,请先阅读我之前的文章 “Observability:使用 Elastic Agent 来进行 Uptime 监控” 来搭建自己的测试环境。我们按照那篇文章的配置来进行,直到我们添加 integration 那一步。

    添加 integration

    为了能够把 TCP 的日志包写入到 Elasticsearch 中,我们可以添加 Custom TCP Logs 集成:

     

    我们保存当前的配置:

    从上面的配置中,我们可以看出来,我们的 tcp-1 集成收集 localhost:9900 端口的 TCP 日志并传入到 Elasticsearch 中。 

    经过上面的配置后,我们可以使用如下的命令来检查当前 Ubuntu 机器上的 9900 端口的使用情况:

    sudo netstat -tulpn | grep LISTEN | grep 9900
    1. liuxg@liuxgu:~$ sudo netstat -tulpn | grep LISTEN | grep 9900
    2. [sudo] password for liuxg:
    3. tcp 0 0 127.0.0.1:9900 0.0.0.0:* LISTEN 10368/filebeat

    我们可以看到当前的 9900 是正在使用的。

    测试集成

    为了能够测试这个集成,我们在 Ubuntu 机器的一个目录中创建如下的一个 logs 文件。它是一个典型的 Syslog 日志:

    1. liuxg@liuxgu:~/data/customtcplogs$ pwd
    2. /home/liuxg/data/customtcplogs
    3. liuxg@liuxgu:~/data/customtcplogs$ ls
    4. logs
    5. liuxg@liuxgu:~/data/customtcplogs$ cat logs
    6. May 4 00:10:36 liuxg xpcproxy[69746]: libcoreservices: _dirhelper_userdir: 557: bootstrap_look_up returned (ipc/send) invalid destination port

    我们使用如下的命令来发送日志到 localhost:9900 的 TCP 端口中:

    head -n 1 logs | nc localhost 9900
    1. liuxg@liuxgu:~/data/customtcplogs$ pwd
    2. /home/liuxg/data/customtcplogs
    3. liuxg@liuxgu:~/data/customtcplogs$ head -n 1 logs | nc localhost 9900

    我们接下去 Kibana 查看一下:

     我们可以看到在过去 15分钟里有一个文档。点击详情:

     从上面的 message 字段中,我们可以看出来我们发送来的一个日志信息。它就是我们之前发送的。

    我们也可以通过如下的命令来发送信息:

     在上面,我们使用 telnet 来发送一个信息到 TCP 端口 9900。同样,我们再去 Kibana 进行查看:

     这次,我们可以看到有两条信息发送进来了。

    从上面显示的信息,我们看出来 message 字段还是一个非结构化的字段。它不便于我们对这个信息的分析。为此,我们希望使用 ingest pipeline 来对这个信息进行结构化。为了下面的步骤的进行,我们先使用如下的命令来删除已经写入的文档:

    1. POST .ds-logs-tcp.generic-*/_delete_by_query
    2. {
    3. "query": {
    4. "match": {
    5. "data_stream.dataset": "tcp.generic"
    6. }
    7. }
    8. }

    结构化输入的信息

    首先,我们使用 ingest pipeline API 来测试我们的 pipelines:

    1. POST _ingest/pipeline/_simulate
    2. {
    3. "pipeline": {
    4. "processors": [
    5. {
    6. "grok": {
    7. "field": "message",
    8. "patterns": [
    9. "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?:%{GREEDYDATA:syslog_message}"
    10. ]
    11. }
    12. }
    13. ]
    14. },
    15. "docs": [
    16. {
    17. "_source": {
    18. "message": "May 4 00:10:36 liuxg xpcproxy[69746]: libcoreservices: _dirhelper_userdir: 557: bootstrap_look_up returned (ipc/send) invalid destination port"
    19. }
    20. }
    21. ]
    22. }

    我们使用上面的测试命令,可以看到如下的结果:

    1. {
    2. "docs": [
    3. {
    4. "doc": {
    5. "_index": "_index",
    6. "_id": "_id",
    7. "_version": "-3",
    8. "_source": {
    9. "syslog_program": "xpcproxy[69746]",
    10. "message": "May 4 00:10:36 liuxg xpcproxy[69746]: libcoreservices: _dirhelper_userdir: 557: bootstrap_look_up returned (ipc/send) invalid destination port",
    11. "syslog_hostname": "liuxg",
    12. "syslog_message": " libcoreservices: _dirhelper_userdir: 557: bootstrap_look_up returned (ipc/send) invalid destination port",
    13. "syslog_timestamp": "May 4 00:10:36"
    14. },
    15. "_ingest": {
    16. "timestamp": "2022-09-21T09:23:14.824894Z"
    17. }
    18. }
    19. }
    20. ]
    21. }

    很显然,我们可以看到结构化的字段,比如 syslog_program, syslog_hostname 等。当然,我们还可以使用另外一个 processor 来删除 message 这个字段。如果你对 ingest pipeline 还不是很熟的话,请阅读我之前的文章 “Elastic:开发者上手指南” 中的 “Ingest pipeline” 章节。

    我们接下来创建一个叫做 structure_message 的 pipeline:

    1. PUT _ingest/pipeline/structure_message
    2. {
    3. "description": "This is used to structure messages",
    4. "processors": [
    5. {
    6. "grok": {
    7. "field": "message",
    8. "patterns": [
    9. "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:[%{POSINT:syslog_pid}])?:%{GREEDYDATA:syslog_message}"
    10. ]
    11. }
    12. }
    13. ]
    14. }

    注意:在我一开始做展示时,是使用 Apache 的一个日志来做练习的。在展示时,使用了像在文章 “Logstash:Logstash 入门教程 (二)” 中的方法来解析 Apache 日志。结果发现在解析的过程中,它生成了一个叫做 agent 的字段, 而这个字段在 Elastic Agent 的摄入中也创建了一个。也就造成了冲突。查看 Elastic Agent 的错误信息,你可以在 /opt/Elastic/Agent/data/elastic-agent-xxxxxx/logs/default 里进行查看。这个是针对 Linux 的安装。针对其它操作系统来说,这个路径会有所不同。

    我们接下来重新来编辑之前的 integration:

    这样,我们就更新成功了。

    接下来,我们再次使用上面的命令来写入一个文档: 

    head -n 1 logs| nc localhost 9900
    1. liuxg@liuxgu:~/data/customtcplogs$ pwd
    2. /home/liuxg/data/customtcplogs
    3. liuxg@liuxgu:~/data/customtcplogs$ head -n 1 logs | nc localhost 9900

    我们回到 Kibana 的界面:

     我们可以看到有一个文档被写入进来了。我们点击上面文档的详情:

     

    从上面,我们可以看出来,我们这次看到的数据是分析好的结构化的数据。这个便于我们对日志数据进行分析和统计。 

  • 相关阅读:
    17.4-8连接数据库&增删改查基本操作(血干JAVA系类)
    贺天下功夫酱酒闪耀亮相2023佛山秋色系列活动
    Avanci与现代汽车和起亚签署专利许可协议
    联盟链学习笔记-网络的创建
    雪花算法及微服务集群唯一ID解决方案
    代码规范:C++函数的高级特性
    数据可视化实战:如何给毛*易的歌曲做词云展示?
    adb调试系统app
    安装 ZooKeeper 并配置服务
    【SpringMVC】springmvc中的数据校验
  • 原文地址:https://blog.csdn.net/UbuntuTouch/article/details/126968858