• 使用UDP协议实现简单的分布式日志服务, java和python


    使用UDP协议实现简单的分布式日志服务, java和python

    这几天系统出现问题, 需要查原因. 日志分散在各个服务器上, 查起来很要命.
    网上百度了好久, 最后发现, 各种日志的处理方式都略显纷繁复杂了.

    1. 有的是syslog
    2. 有的是用 influxdb 实现日志存储和查询.
    3. 收费的则还有阿里云的SLS https://www.aliyun.com/product/sls/
    4. 有的是用工具链. ELK .其中 Elasticsearch 又是相当巨大的性能开销. 和各种安装配置. 为了一个简单的日志 , 实在是有点浪费资源.

    在我看来,日志最主要的作用还是在出错的时候查找错误信息, 跟踪和分析系统性能. 对于分布式系统来讲又增加了一个 “集中” 日志管理的功能需求.方便查日志. 系统7-8个, 查日志一个文件一个文件的翻很慢的. 对于日志信息, 实际上并无太高要求.当查错误的时候, 能够方便的查询到信息即可. 对于日志的丢失和实时,以及查询性能都无太高要求.

    如果要想基于日志系统进行系统业务逻辑层面得扩展, 那么我认为这种想法是不科学的. 毕竟日志是不严谨的. 而且日志的正则分析和匹配都是相当大的性能开销.

    所以日志就让它回归到日志本身.

    我的设计是, 各个分布式服务, 通过UDP协议(很简单易编程)发送到日志服务. . 日志服务器则直接保存到本地文件中.这个日志服务器只起到了收集日志的作用, 如果要想查询和检索, 则将日志文件通过ssh 复制到本地.然后用vscode进行搜索和查询也是非常方便的(现在谁的电脑上还没有个vscode?), 如果不用则留在服务器上不用管. 只是存在硬盘上. 也不会占用太多资源,

    日志服务端代码

    日志服务本身很简单. 几句话就可以实现.代码如下.
    用的是python语言. java语言的还没实现, 有精力的小伙伴可以实现以下. 实现后联系我.分享一下.

    import logging
    from logging import handlers 
    import asyncio
    import time
    
    # from socket import *
    
    import socket 
    
    
    class LogServer_UDP(object):
          def __init__(self, host, port):
                # PORT = 9000
                ADDR = (host, port) #地址与端口
                self.BUFSIZ = 4096 #接收数据缓冲大小
                self.UDPServeSock=socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建udp服务器套接字
                self.UDPServeSock.bind(ADDR) #套接字与地址绑定-服务端特有
      
          def run(self):
                while True:
                    try: 
                        while True: 
                            msg, addr = self.UDPServeSock.recvfrom(self.BUFSIZ) #接收客户端发来的字节数组,data.decode()='char',data.upper()='bytes'
                            logging.info(addr[0] + ":"+str(addr[1])+"=>" + msg.decode("utf8"))
                            pass
    
                    except Exception as e:
                        time.sleep(1)
                        print(e)
    
                self.UDPServeSock.close() #关闭服务端socket
    
    def namer(filename): 
          return filename
    
    def main_logger():
        # 日志集中处理区,在主程序中调用一次
        
        # handlers配置区,filter可选
        # formatter = logging.Formatter("%(name)s - %(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s")
        formatter = logging.Formatter("%(message)s")
        console = logging.StreamHandler()
        console.setLevel(logging.INFO)
    
          # info日志处理器
        # filename:日志文件名
        # when:日志文件按什么维度切分。'S'-秒;'M'-分钟;'H'-小时;'D'-天;'W'-周
        #       这里需要注意,如果选择 D-天,那么这个不是严格意义上的'天',而是从你
        #       项目启动开始,过了24小时,才会从新创建一个新的日志文件,
        #       如果项目重启,这个时间就会重置。所以这里选择'MIDNIGHT'-是指过了午夜
        #       12点,就会创建新的日志。
        # interval:是指等待多少个单位 when 的时间后,Logger会自动重建文件。
        # backupCount:是保留日志个数。默认的0是不会自动删除掉日志。
        logfile = handlers.TimedRotatingFileHandler(
            './log/udp.log',
            when='D',
            backupCount=10,
            encoding='utf-8')
        logfile.suffix =  "%Y_%m_%d.log"
        logfile.namer = namer
        logfile.setLevel(logging.INFO)
        logfile.setFormatter(formatter)  # add formatter to ch
        
    
        rootlog = logging.getLogger()
        rootlog.setLevel(level=logging.INFO)
        # rootlog.addHandler(log_file_handler)
        # rootlog.addHandler(errorlog_file_handler)
        rootlog.addHandler(console)
        rootlog.addHandler(logfile)
    
        # 设置监听的端口,并传递handlers
        # loggerListener = ZMQListener("tcp://127.0.0.1:6666",*(ch,console))
        
        # loggerListener.start()   # 开启一个子线程处理记录器监听
        # zq_listener = LogServer_UDP("127.0.0.1", 6666)
        udp_listener = LogServer_UDP("0.0.0.0", 11385)
        print("日志服务运行中,监听端口 11385")
        udp_listener.run()
        
        
    # 主进程调用一次,非阻塞
    main_logger()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    日志输出端 java语言

    用的是logback的扩展.UDPLogAppender
    在UDPLogAppender 中实现了数据上传到日志服务端的功能.

    package com.qcd.DDD;
    
    import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
    import ch.qos.logback.classic.spi.ILoggingEvent;
    import ch.qos.logback.core.UnsynchronizedAppenderBase;
    import lombok.extern.slf4j.Slf4j;
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.net.SocketException;
    import java.net.UnknownHostException;
    import java.nio.charset.Charset;
    import java.util.Scanner;
     
     
    @Slf4j
    public class UDPLogAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
     
        // Scanner input = new Scanner(System.in);
        DatagramSocket socket;
        DatagramPacket sendPacked;
    
        // DatagramSocket datagramSocket = new DatagramSocket(); // 创建DatagramSocket
    	// 	DatagramPacket datagramPacket = new DatagramPacket(str.getBytes(),
    	// 			str.getBytes().length, InetAddress.getByName("127.0.0.1"), 1111); // 创建DatagramPacket(要发送的数据,数据的长度,Ip地址,端口)
    	// 	datagramSocket.send(datagramPacket); // 发送
    	// 	datagramSocket.close(); // 关闭
    // ————————————————
    // 版权声明:本文为CSDN博主「梅开二度%」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    // 原文链接:https://blog.csdn.net/s990420/article/details/119335819
      
        @Override
        public void start() {
            super.start();
            try {
                this.socket = new DatagramSocket();            
                String hostaddr  = InetAddress.getByName(this.host).getHostAddress(); 
                
                sendPacked = new DatagramPacket(
                    new byte[2048], 2048,
                    new InetSocketAddress(hostaddr, this.port));
            } 
            catch (UnknownHostException e){
                e.printStackTrace();
            }
            catch (SocketException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
            // encoder.setContext(context);
            encoder.start();
            encoder.setCharset(Charset.forName("utf8"));
        }
    
        String host;
    
        public void setHost(String host) {
            this.host = host;
        }
    
        int port;
    
        public void setPort(int port) {
            this.port = port;
        }
    
          
        PatternLayoutEncoder  encoder;
    
        public  PatternLayoutEncoder  getEncoder() {
            return encoder;
        }
          
        public  void setEncoder(PatternLayoutEncoder encoder) {
            this.encoder = encoder;
        }
    
        @Override
        protected void append(ILoggingEvent eventObject)
        {  
            byte[] bytemsg = this.encoder.encode(eventObject);
    
            //必须先转成字符串, 再转成utf8编码发出去,
            // 注释原因, 后来在上面加了个 encoder.setCharset(Charset.forName("utf8"));
            //String msg =  new String(bytemsg); 
            //String msg =  eventObject.getFormattedMessage();  
    		try { 
                // sendPacked.setData(msg.getBytes("utf8"));
                sendPacked.setData(bytemsg);
                socket.send(sendPacked); // 发送
                // socket.close(); // 关闭
    		} catch (Exception e) {
    			e.printStackTrace();
    		} 
        }
     
    } 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101

    logback-spring.xml 的配置

      
    <configuration>  
      
          
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">  
            <encoder>  
                <pattern>%d{HH:mm:ss} %-5level %logger{12}:%-5line - %msg%npattern>
                
            encoder>
        appender>  
    
        <appender name="udp" class="com.qcd.DDD.UDPLogAppender">
            <encoder>  
                <pattern>%d{HH:mm:ss} %-5level %logger{12}:%-5line - %msg%npattern>
            encoder>
            <host>wuxi.ai.px82.comhost>
            <port>11385port>
        appender>
          
          
        <root level="info">   
            <appender-ref ref="udp"/>        
            <appender-ref ref="STDOUT"/>
             
        root>  
    configuration>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    日志输出端, python语言,

    用的是logging库.
    LogConfig.py 文件内容如下

    import logging
    from logging.handlers import TimedRotatingFileHandler
    from logging.handlers import RotatingFileHandler
    import os 
    import colorlog
    import traceback
    import socket
    import Config 
    
    log_colors_config = {
        # 终端输出日志颜色配置
        'DEBUG': 'white',
        'INFO': 'cyan',
        'WARNING': 'yellow',
        'ERROR': 'red',
        'CRITICAL': 'bold_red',
    }
     
    
    class UDPHandler(logging.Handler):
         def __init__(self, host, port):
            super().__init__() 
            self.host = host
            self.port = port 
            self.ADDR = (host, port)
            # print(ADDR)
            self.UDPCliSock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #创建客户端套接字
      
         def emit(self, record):
             try:  
                 msg = self.format(record) 
                 #  self.zq.send_string(msg, flags=zmq.NOBLOCK)
                 self.UDPCliSock.sendto(bytes(msg,'utf8'), self.ADDR) 
             except Exception:
                traceback.print_exc() 
    
     
    # 滚动日志 
    def backrollLog():
        def namer(filename): 
          return filename
    
        LOG_FORMAT = "%(asctime)s %(levelname)-8s[%(name)s %(filename)s:%(lineno)d] %(message)s"
        formatter = logging.Formatter(LOG_FORMAT)
    
        color_format ='%(log_color)s%(asctime)s-%(name)s-%(filename)s:%(lineno)d-%(levelname)s-[msg]: %(message)s',
     
        color_formater = colorlog.ColoredFormatter(
                  "%(asctime)s %(log_color)s%(levelname)-8s%(reset)s [%(name)s %(filename)s :%(lineno)d] %(message_log_color)s%(message)s",
                  secondary_log_colors={
                    'message': {
                    'ERROR': 'red',
                    'CRITICAL': 'red'
                    }
                  })
    
    
     
    
        # 总开关
        rootlog = logging.getLogger()
        rootlog.setLevel(level=logging.INFO) 
      
        # pid = str(os.getpid())
        #控制台输出INFO级别的信息
        stream_handler = logging.StreamHandler()  # 日志控制台输出 
        stream_handler.setFormatter(formatter)
        stream_handler.setLevel(level=logging.INFO) 
        rootlog.addHandler(stream_handler)
     
    
        IP = socket.gethostbyname(Config.Seting.LogHost)
        udphandler = UDPHandler(IP, Config.Seting.LogPort)
        udphandler.setFormatter(color_formater)
        udphandler.setLevel(logging.INFO)
        rootlog.addHandler(udphandler)
     
        logging.error(" log error test ")
        logging.info(" log info test ")
        logging.warning(" log warning test ")
        logging.debug(" log debug test ")
    
    
    
    
    backrollLog()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    python 语言要求在主程序中import 一次 上面的代码. …
    正常使用

    logtest.py

    import Config
    import LogConfig
    import logging
    import time
    
    
    # LogConfig.backrollLog() , 不需要单独执行, import LogConfig的时候已经执行过
    
    while(True):
        logging.info("hahaha中文")
        time.sleep(1)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
  • 相关阅读:
    【力扣】两数相加
    猿创征文|一文吃透JAVA初学者的开发工具
    Managing Supply and Demand Balance Through Machine Learning-笔记
    Linux的tomcat的shutdown.sh关闭不了进程程
    uni-app跳转到另一个app
    第十四章 报告
    docker centos date syn
    梦笔记2022-1122
    靠这份业界最强算法及数据结构宝典,我挺进字节跳动
    如何才能让用例自动运行完之后,生成一张直观可看易懂的测试报告呢?
  • 原文地址:https://blog.csdn.net/phker/article/details/127861969