• Elasticsearch:运用 Python 实时通过 Logstash 写入日志到 Elasticsearch


    在我之前的文章,我详细地介绍了如何通过 Filebeat 来收集日志并写入到 Elasticsearch。你可以阅读我之前的文章:

    在今天的文章中,我将分享如何使用 Logstash 把日志文件发送到 Elasticsearch。使用 Logstash 的好处是它可以很方便地使用它丰富的过滤器对数据进行清洗以便更好地对数据进行分析。我们使用如下的架构:

     在今天的展示中,我将使用最新的 Elastic Stack 8.4.3 来进行展示。

    安装

    如果你还没有安装好自己的 Elasticsearch,Kibana 及 Logstash,你可以按照如下的文章来进行安装:

    首先,我们参考文章 “Logstash:如何连接到带有 HTTPS 访问的集群” 来生成 truststore.p12 证书文件:

    1. $ pwd
    2. /Users/liuxg/test/elasticsearch-8.4.3/config/certs
    3. $ ls
    4. http.p12 http_ca.crt transport.p12
    5. $ keytool -import -file http_ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12
    6. Certificate was added to keystore
    7. $ ls
    8. http.p12 http_ca.crt transport.p12 truststore.p12

    在上面,我们生产的 truststore.p12 的密码为 password。

    我们针对 Logstash 配置如下的配置文件:

    logstash.conf

    1. input {
    2. udp {
    3. port => 5959
    4. codec => json {
    5. target => "[document]"
    6. }
    7. }
    8. }
    9. output {
    10. stdout {
    11. codec => rubydebug
    12. }
    13. elasticsearch {
    14. index => "logdb"
    15. hosts => ["https://192.168.0.3:9200"]
    16. user => "elastic"
    17. password => "6bTlJp388KkgJKWi+hQr"
    18. ssl_certificate_verification => true
    19. truststore => "/Users/liuxg/test/elasticsearch-8.4.3/config/certs/truststore.p12"
    20. truststore_password => "password"
    21. }
    22. }

    在上面,我们需要根据自己的 Elasticsearch 账号及密码进行修改。另外你也需要根据自己的证书位置进行相应的调整。 上面的 hosts 是我的本地 Elasticsearch 集群的访问地址。你需要根据自己的进行配置。在上面,我们使用 udp input 来收集日志,并传入到 Elasticsearch。在本示例中,我们忽略了 filter 部分,以简化问题的描述。我们可以把这个 logstash.conf 置于 Logstash 的安装根目录中。

    我们可以使用如下的命令来运行:

    Python 日志应用

    我们首先来安装一个叫做 python-logstash 的包:

     pip install python-logstash

    我们设计如下的 Python 应用来通过 Logstash 写入日志:

    app.py

    1. import logging
    2. import logstash
    3. import sys
    4. class Logging(object):
    5. def __init__(self, logger_name='python-logger',
    6. log_stash_host='localhost',
    7. log_stash_upd_port=5959
    8. ):
    9. self.logger_name = logger_name
    10. self.log_stash_host = log_stash_host
    11. self.log_stash_upd_port = log_stash_upd_port
    12. def get(self):
    13. logging.basicConfig(
    14. filename="logfile",
    15. filemode="a",
    16. format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s",
    17. datefmt="%H:%M:%S",
    18. level=logging.INFO,
    19. )
    20. self.stderrLogger = logging.StreamHandler()
    21. logging.getLogger().addHandler(self.stderrLogger)
    22. self.logger = logging.getLogger(self.logger_name)
    23. self.logger.addHandler(logstash.LogstashHandler(self.log_stash_host,
    24. self.log_stash_upd_port,
    25. version=1))
    26. return self.logger
    27. instance = Logging(log_stash_upd_port=5959, log_stash_host='localhost', logger_name='soumil')
    28. logger = instance.get()
    29. count = 0
    30. from time import sleep
    31. while True:
    32. count = count + 1
    33. if count % 2 == 0:
    34. logger.error('Error Message Code Faield :{} '.format(count))
    35. else:
    36. logger.info('python-logstash: test logstash info message:{} '.format(count))

    我们在和 Logstash 运行的同一个机器上运行上面的应用。我们使用如下的方法来运行:

    python app.py

    我们在 Logstash 的 terminal 中可以看到:

    它表明 Logstash 运作正常。

    我们再到 Kibana 中打入如下的命令:

    GET _cat/indices

    从上面的输出中,我们可以看到新生成的 logdb 索引。

    我们可以对这个索引进行搜索:

    我们可以看到日志被正常地解析并可以被搜索。

    采用异步方式写入日志

    在很多的时候如果我们的日志是大量的,那么我们可以采取使用异步的方式来写如日志。这样的好处是应用不用等待数据完全写入后才继续向下执行。我们创建如下的 Logstash 的配置文件:

    logstash_async.conf

    1. input {
    2. tcp {
    3. port => 6000
    4. }
    5. }
    6. output {
    7. stdout {
    8. codec => rubydebug
    9. }
    10. elasticsearch {
    11. index => "logstash_async"
    12. hosts => ["https://192.168.0.3:9200"]
    13. user => "elastic"
    14. password => "6bTlJp388KkgJKWi+hQr"
    15. ssl_certificate_verification => true
    16. truststore => "/Users/liuxg/test/elasticsearch-8.4.3/config/certs/truststore.p12"
    17. truststore_password => "password"
    18. }
    19. }

    我们把这个文件拷贝到 Logstash 的安装根目录下,并进行如下的运行:

     

    我们接下来创建如下的 Python 应用:

    python-logstash-logging.py 

    1. import logging
    2. import time
    3. from logstash_async.handler import AsynchronousLogstashHandler
    4. host = 'localhost'
    5. port = 6000
    6. test_logger = logging.getLogger('python-logging-test')
    7. test_logger.setLevel(logging.DEBUG)
    8. async_handler = AsynchronousLogstashHandler(host, port, database_path = None)
    9. test_logger.addHandler(async_handler)
    10. while True:
    11. test_logger.info("this is an info message at %s", time.time())
    12. time.sleep(0.5)

    在上面,我们每隔 0.5 秒的时间写入一条日志到 TCP 端口 6000,进而通过 Logstash 写入到 Elasticsearch。我们可以通过如下的命令来运行这个 Python 应用:

    python python-logstash-logging.py 

    在 Logstash 运行的 terminal 中,我们可以看到如下的输出:

    在 Kibana 中,我们可以查看索引 logstash_async:

  • 相关阅读:
    小程序 表单当用户修改字段,点击返回检测用户是否有修改
    C# 对象存储
    【C++】类和对象(上)
    Spring Boot - 用JUnit 5构建完美的Spring Boot测试套件
    SpringBoot 学习(十)分布式理论
    linux中的inode文件编号和软硬链接
    [附源码]Python计算机毕业设计Django基于vue的软件谷公共信息平台
    【知识点】深入浅出STL标准模板库
    Pangolin安装报错解决
    Go语句与表达式深度解析:全案例手册
  • 原文地址:https://blog.csdn.net/UbuntuTouch/article/details/127294357