Distributed ELK + Kafka log collection with docker-compose



    Part 1. Install docker-compose
    1. Download the docker-compose binary
    curl -L https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m) -o /usr/local/bin/docker-compose
    
    2. Make it executable
    chmod +x /usr/local/bin/docker-compose
    
    Part 2. Set up the ELK + Kafka environment

    At least 4 GB of RAM is recommended.

    2.1. Write the docker-compose file
    cd /app/
    mkdir mayiktelkkafka
    

    Upload docker-compose.yml (replace 192.168.122.128 throughout with your host's IP; KAFKA_ADVERTISED_HOST_NAME must be an address that clients can reach):

    version: '2'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"
      kafka:
        image: wurstmeister/kafka
        volumes:
          - /etc/localtime:/etc/localtime
        ports:
          - "9092:9092"
        environment:
           KAFKA_ADVERTISED_HOST_NAME: 192.168.122.128
           KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
           KAFKA_ADVERTISED_PORT: 9092
           KAFKA_LOG_RETENTION_HOURS: 120
           KAFKA_MESSAGE_MAX_BYTES: 10000000
           KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
           KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
           KAFKA_NUM_PARTITIONS: 3
           KAFKA_DELETE_RETENTION_MS: 1000
      kafka-manager:
        image: sheepkiller/kafka-manager
        environment:
          ZK_HOSTS: 192.168.122.128:2181
        ports:
          - "9001:9001"
      elasticsearch:
        image: daocloud.io/library/elasticsearch:6.5.4
        restart: always
        container_name: elasticsearch
        ports:
          - "9200:9200"
      kibana:
        image: daocloud.io/library/kibana:6.5.4
        restart: always
        container_name: kibana
        ports:
          - "5601:5601"
        environment:
           - ELASTICSEARCH_URL=http://192.168.122.128:9200
        depends_on:
          - elasticsearch
    
    2.2. Start docker-compose
    docker-compose up
    


    If you hit a parse error at this point, check whether there are stray trailing spaces after the command; remove them and run it again.

    After a successful start there should be 5 containers. If any are missing, look up the failed container by its ID and check its logs. I ran this in a virtual machine and the elasticsearch container failed to start; its log showed:

    max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]

    The fix is to raise vm.max_map_count on the host (e.g. sysctl -w vm.max_map_count=262144) and restart the container.

    2.3. Verify the setup

    Zookeeper: http://192.168.122.128:2181
    Elasticsearch: http://192.168.122.128:9200
    Kibana: http://192.168.122.128:5601/app/kibana#/home?_g=()

    2.4. Install Logstash

    Install a JDK first; Logstash requires it (see https://blog.csdn.net/weixin_40816738/article/details/108532702).

    Upload or download logstash-6.4.3.tar.gz to the server:

    wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz
    

    Extract it:

    tar -zxvf logstash-6.4.3.tar.gz
    

    Install the input/output plugins:

    cd logstash-6.4.3
    bin/logstash-plugin install logstash-input-kafka
    bin/logstash-plugin install logstash-output-elasticsearch
    

    Create the configuration file:

    cd config
    vim elk-kafka.conf
    

    With the following content:

    input {
      kafka {
        bootstrap_servers => "192.168.122.128:9092"
        topics => ["mayikt-log"]
      }
    }
    filter {
      # Only matched data is sent to output.
    }

    output {
      elasticsearch {
        action => "index"                # the operation on ES
        hosts  => "192.168.122.128:9200" # Elasticsearch host, can be an array
        index  => "mayikt_logs"          # the index to write data to
      }
    }
    

    Start Logstash:

    cd bin
    ./logstash -f ../config/elk-kafka.conf
    
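    Once Logstash is running, you can smoke-test the whole pipeline before touching the Spring service. A minimal sketch (not from the original article; it assumes the kafka-clients library is on the classpath and reuses the broker address from the compose file):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    // Sends one test message to the mayikt-log topic; Logstash should pick it up
    // and index it into the mayikt_logs index in Elasticsearch.
    public class LogPipelineSmokeTest {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.122.128:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("mayikt-log", "{\"log_type\":\"info\",\"msg\":\"smoke test\"}"));
            }
        }
    }

    After running it, querying http://192.168.122.128:9200/mayikt_logs/_search should return the document.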
    Part 3. Sending log messages to Kafka from the WeChat service
    3.1. Add the Kafka dependency
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
    3.2. Configure Kafka

    bootstrap.yml

    spring:
      kafka:
        bootstrap-servers: 192.168.122.128:9092 # Kafka broker address; for a cluster, list several separated by commas
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: default_consumer_group # consumer group ID
          enable-auto-commit: true
          auto-commit-interval: 1000
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    
    
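    With this in place, Spring Boot auto-configures a KafkaTemplate<String, Object> bean. A hypothetical startup check (class name is an assumption, not from the article) to verify broker connectivity:

    package com.mayikt.api.impl.elk.log;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    // Sends a single message on startup; if the broker address in bootstrap.yml
    // is wrong, the failure shows up immediately in the service logs.
    @Component
    public class KafkaConnectivityCheck implements CommandLineRunner {

        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;

        @Override
        public void run(String... args) {
            kafkaTemplate.send("mayikt-log", "{\"log_type\":\"info\",\"msg\":\"startup check\"}");
        }
    }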
    3.3. AOP interception
    package com.mayikt.api.impl.elk.log;
    
    import com.alibaba.fastjson.JSONObject;
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.annotation.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.web.context.request.RequestContextHolder;
    import org.springframework.web.context.request.ServletRequestAttributes;
    
    import javax.servlet.http.HttpServletRequest;
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    import java.util.Date;
    
    /**
     * ELK + Kafka log collection aspect
     */
    @Aspect
    @Component
    public class AopLogAspect {
        @Value("${server.port}")
        private String serverPort;
        @Autowired
        private LogContainer logContainer;
        // Declare a pointcut with an execution expression
        @Pointcut("execution(* com.mayikt.api.impl.*.*.*(..))")
        private void serviceAspect() {
        }
    
    
    
        // Log request details before the target method executes
        @Before(value = "serviceAspect()")
        public void methodBefore(JoinPoint joinPoint) {
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                    .getRequestAttributes();
            HttpServletRequest request = requestAttributes.getRequest();
            JSONObject jsonObject = new JSONObject();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
            jsonObject.put("request_time", df.format(new Date()));
            jsonObject.put("request_url", request.getRequestURL().toString());
            jsonObject.put("request_method", request.getMethod());
            jsonObject.put("signature", joinPoint.getSignature());
            jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
            // IP address info
            jsonObject.put("ip_address", getIpAddr(request) + ":" + serverPort);
            jsonObject.put("log_type", "info");
            JSONObject requestJsonObject = new JSONObject();
            requestJsonObject.put("request", jsonObject);
            // push the log message to Kafka
            String log = requestJsonObject.toJSONString();
            logContainer.putLog(log);
    
        }
    
        // Log the response after the target method returns
        @AfterReturning(returning = "o", pointcut = "serviceAspect()")
        public void methodAfterReturning(Object o) {
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                    .getRequestAttributes();
            HttpServletRequest request = requestAttributes.getRequest();
            JSONObject respJSONObject = new JSONObject();
            JSONObject jsonObject = new JSONObject();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
            jsonObject.put("response_time", df.format(new Date()));
            jsonObject.put("response_content", JSONObject.toJSONString(o));
            // IP address info
            jsonObject.put("ip_address", getIpAddr(request) + ":" + serverPort);
            jsonObject.put("log_type", "info");
            respJSONObject.put("response", jsonObject);
            // push the log message to Kafka
            logContainer.putLog(respJSONObject.toJSONString());
        }
        /**
         * Exception advice: log any exception thrown by the target method
         *
         * @param point the join point
         * @param e     the thrown exception
         */
        @AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
        public void serviceAspect(JoinPoint point, Exception e) {
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                    .getRequestAttributes();
            HttpServletRequest request = requestAttributes.getRequest();
            JSONObject jsonObject = new JSONObject();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // date format
            jsonObject.put("request_time", df.format(new Date()));
            jsonObject.put("request_url", request.getRequestURL().toString());
            jsonObject.put("request_method", request.getMethod());
            jsonObject.put("signature", point.getSignature());
            jsonObject.put("request_args", Arrays.toString(point.getArgs()));
            jsonObject.put("error", e.toString());
            // IP address info
            jsonObject.put("ip_address", getIpAddr(request) + ":" + serverPort);
            jsonObject.put("log_type", "error");
            JSONObject requestJsonObject = new JSONObject();
            requestJsonObject.put("request", jsonObject);
            // push the log message to Kafka
            String log = requestJsonObject.toJSONString();
            logContainer.putLog(log);
        }
        public static String getIpAddr(HttpServletRequest request) {
            // X-Forwarded-For (XFF) is the HTTP header that identifies the originating
            // client IP of a request forwarded through an HTTP proxy or load balancer.
            String ipAddress = request.getHeader("x-forwarded-for");
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getHeader("Proxy-Client-IP");
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getHeader("WL-Proxy-Client-IP");
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getRemoteAddr();
                if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {
                    // fall back to the IP configured on the local network interface
                    InetAddress inet = null;
                    try {
                        inet = InetAddress.getLocalHost();
                    } catch (UnknownHostException e) {
                        e.printStackTrace();
                    }
                    ipAddress = inet.getHostAddress();
                }
            }
            // When the request passed through multiple proxies, the first IP is the client's real IP; IPs are separated by ','
            if (ipAddress != null && ipAddress.length() > 15) { //"***.***.***.***".length() = 15
                if (ipAddress.indexOf(",") > 0) {
                    ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
                }
            }
            return ipAddress;
        }
    }
    
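    For reference, a request log emitted by methodBefore has roughly this shape (field values are illustrative, not taken from the article):

    {
      "request": {
        "request_time": "2022-08-15 10:00:00",
        "request_url": "http://localhost:9000/getWeChat",
        "request_method": "GET",
        "signature": "String com.mayikt.api.weixin.WeChatService.getWeChat(Integer)",
        "request_args": "[123456888]",
        "ip_address": "192.168.1.10:9000",
        "log_type": "info"
      }
    }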
    3.4. Message delivery
    package com.mayikt.api.impl.elk.log;
    
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    import java.util.concurrent.LinkedBlockingDeque;
    
    @Component
    public class LogContainer {
        private LogThread logThread;
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
    
        public LogContainer() {
            logThread = new LogThread();
            logThread.start();
        }
    
        private static LinkedBlockingDeque<String> logs = new LinkedBlockingDeque<>();
    
        /**
         * Put a log message onto the concurrent queue
         *
         * @param log the log message
         */
        public void putLog(String log) {
            logs.offer(log);
        }
    
        /**
         * Async log thread: takes messages off the queue and sends them to Kafka
         */
        class LogThread extends Thread {
            @Override
            public void run() {
                while (true) {
                    try {
                        // take() blocks until a message is available, avoiding the
                        // busy-spin of poll(); a batching variant that sends several
                        // messages per iteration is sketched after this class.
                        String log = logs.take();
                        if (!StringUtils.isEmpty(log)) {
                            // push the message to Kafka
                            kafkaTemplate.send("mayikt-log", log);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    
    
    }
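    The comment in LogThread mentions batching as a possible optimization. A minimal sketch of such a variant (an assumption, not the article's code), defined inside LogContainer next to LogThread; it drains up to 100 queued messages per iteration with LinkedBlockingDeque.drainTo:

    /**
     * Hypothetical batching variant of LogThread: block for the first message,
     * then drain whatever else is queued (up to 99 more) in a single call.
     */
    class BatchLogThread extends Thread {
        @Override
        public void run() {
            java.util.List<String> batch = new java.util.ArrayList<>(100);
            while (true) {
                try {
                    batch.add(logs.take());  // block until at least one message arrives
                    logs.drainTo(batch, 99); // opportunistically grab more without blocking
                    for (String log : batch) {
                        kafkaTemplate.send("mayikt-log", log);
                    }
                    batch.clear();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }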
    
    
    3.5. Test interface
    package com.mayikt.api.weixin;
    
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    
    public interface WeChatService {
    
    /**
     * feign RPC remote call 405
     * @param a
     * @return
     */
    @GetMapping("/getWeChat")
    String getWeChat(@RequestParam("a") Integer a);
    }
    
    
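    The pointcut in section 3.3 matches classes under com.mayikt.api.impl, so any implementation of this interface placed there is logged automatically. A hypothetical implementation (package and class name are assumptions):

    package com.mayikt.api.impl.weixin;

    import com.mayikt.api.weixin.WeChatService;
    import org.springframework.web.bind.annotation.RestController;

    // Lives under com.mayikt.api.impl.*, so every call to getWeChat() is
    // intercepted by AopLogAspect and its request/response logged to Kafka.
    @RestController
    public class WeChatServiceImpl implements WeChatService {

        @Override
        public String getWeChat(Integer a) {
            return "wechat:" + a;
        }
    }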
    3.6. Send a request with Apipost
    http://localhost:9000/getWeChat?a=123456888
    


    3.7. View the logs in Kibana

    Create an index pattern for mayikt_logs in Kibana (Management → Index Patterns); the collected request/response logs should then show up under Discover.

  • Original article: https://blog.csdn.net/weixin_40816738/article/details/126333533