Setting Up ELK Log Collection: A Step-by-Step Tutorial


Background: Why Distributed Log Collection

In a traditional project, the production environment often consists of a cluster of different servers. To track down a bug through the logs, you have to log in to every node and search its log files with command-line tools, which is extremely inefficient.

We therefore need centralized log management, and this is exactly the problem ELK was created to solve.

Why Combine ELK with Kafka

If we integrate only ELK, without Kafka, Logstash has to be installed on every server node to read and write the log files there, which adds I/O overhead on each machine, does not perform well, and is redundant. Putting Kafka in between lets every node simply publish its log events to a topic, with a single Logstash pipeline consuming them.


Building the ELK + Kafka Environment

Building the ELK + Kafka environment with docker-compose

Note: none of the IP addresses below may be localhost or 127.0.0.1. Use the host machine's LAN IP (192.168.137.1 in this example); otherwise the advertised Kafka address will not be reachable from other containers or from the application.

  
version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"

  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.0
    container_name: kibana
    depends_on:
      - elasticsearch
    ports:
      - "5601:5601"

  logstash:
    image: docker.elastic.co/logstash/logstash:7.15.0
    container_name: logstash
    # command: --config /etc/logstash/conf.d/*.conf
    volumes:
      - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    ports:
      - "5044:5044"

  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.137.1:9092
      - KAFKA_LISTENERS=PLAINTEXT://:9092

  kafka-manager:
    # open-source web UI for managing Kafka clusters
    image: sheepkiller/kafka-manager
    environment:
      ZK_HOSTS: 192.168.137.1:2181   # change to your host machine's IP
    ports:
      - "9001:9000"

Verifying the ELK + Kafka Environment

  
ZooKeeper:      127.0.0.1:2181 (plain TCP port, not an HTTP service)
Elasticsearch:  http://127.0.0.1:9200/
Kibana:         http://127.0.0.1:5601/app/kibana#/dev_tools/console?_g=()
Kafka Manager:  http://127.0.0.1:9001/


Integrating ELK + Kafka into a Spring Boot Project

Add the Maven dependencies

  
<!-- Kafka client (spring-kafka) -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- Logstash/Logback integration: JSON encoder for log events -->
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>7.4</version>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<!-- Logback appender that publishes log events to Kafka -->
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.2.0-RC2</version>
    <scope>runtime</scope>
</dependency>

Application configuration (application.yml)

  
spring:
  kafka:
    listener:
      # Listener type: single (one record per call, the default) or batch
      type: single
    # Broker address; must not be localhost or 127.0.0.1
    bootstrap-servers: 192.168.137.1:9092
    # Producer configuration
    producer:
      # Number of retries
      retries: 3
      # Acknowledgement level
      # acks=0   the send is considered successful as soon as the message is handed to Kafka
      # acks=1   the send is considered successful once the leader partition has written the message to disk
      # acks=all the send is considered successful once the leader and its follower replicas have synced the message
      acks: all
      # Maximum batch size, in bytes
      batch-size: 4096
      # Total memory (bytes) the producer may use to buffer records waiting to be sent
      buffer-memory: 33554432
      # Client ID
      client-id: logstash
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Message compression: none, lz4, gzip, snappy (default none)
      compression-type: gzip
      properties:
        linger:
          # Send delay: the producer sends a batch once it reaches batch-size or linger.ms elapses
          ms: 1000
        max:
          block:
            # Maximum blocking time (ms) for KafkaProducer.send() and partitionsFor()
            ms: 6000
    # Consumer configuration
    consumer:
      # Default consumer group
      group-id: logstash
      # Whether to auto-commit offsets (default true)
      enable-auto-commit: false
      # Auto-commit interval, in ms
      auto-commit-interval: 1000
      # Maximum number of records returned per poll
      max-poll-records: 100
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # What to do when Kafka has no initial offset, or the offset is out of range:
      # earliest: reset to the smallest offset in the partition
      # latest:   reset to the newest offset (only consume data produced after the reset)
      # none:     throw an exception if any partition has no committed offset
      auto-offset-reset: latest
      properties:
        session:
          timeout:
            # Session timeout (ms); if the consumer sends no heartbeat within this time, a rebalance is triggered
            ms: 120000
        request:
          timeout:
            # Request timeout (ms)
            ms: 120000
    template:
      default-topic: tiger-log

# Point Spring Boot at the Logback configuration explicitly, so that lookup-order issues
# or other dependencies do not cause the custom Logback file to be skipped
logging:
  config: classpath:logback-spring.xml
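With this configuration in place, Spring Boot auto-configures a KafkaTemplate bound to the default topic tiger-log. The following is a minimal sketch, not part of the original project, that sends one test message on startup; the class name KafkaConnectivityCheck is illustrative. It is a quick way to confirm that bootstrap-servers points to a reachable broker before wiring up the Logback appender.

import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaConnectivityCheck implements CommandLineRunner {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaConnectivityCheck(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @Override
    public void run(String... args) {
        // sendDefault() publishes to spring.kafka.template.default-topic (tiger-log)
        kafkaTemplate.sendDefault("startup check: Kafka producer configuration is working");
    }
}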

Configure logback-spring.xml

  
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
    <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>

    <!-- The whole dev profile is shown here, including the appender that ships log output to Kafka -->
    <springProfile name="dev">

        <!-- Log output patterns -->
        <property name="console.log.pattern"
                  value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) --- [%green(%thread)] %cyan(%-40.40(%logger{40})) : %msg%n"/>

        <property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"/>

        <property name="log_name_prefix" value="tiger-ucenter"/>

        <property name="log.path" value="logs/tiger-ucenter"/>

        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>${console.log.pattern}</pattern>
            </encoder>
            <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                <level>INFO</level>
            </filter>
        </appender>

        <appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <File>${log.path}/${log_name_prefix}-info.log</File>
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>${log.pattern}</pattern>
            </encoder>
            <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                <level>INFO</level>
            </filter>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <FileNamePattern>${log.path}/backup/${log_name_prefix}-info.%d{yyyy-MM-dd}.log</FileNamePattern>
            </rollingPolicy>
        </appender>

        <appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <File>${log.path}/${log_name_prefix}-error.log</File>
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>${log.pattern}</pattern>
            </encoder>
            <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                <level>ERROR</level>
            </filter>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <FileNamePattern>${log.path}/backup/${log_name_prefix}-error.%d{yyyy-MM-dd}.log</FileNamePattern>
            </rollingPolicy>
        </appender>

        <!-- Kafka appender: encodes each log event as JSON and publishes it to the tiger-log topic -->
        <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
            <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
                <providers class="net.logstash.logback.composite.loggingevent.LoggingEventJsonProviders">
                    <!-- ${APP}, ${PROFILES_ACTIVE}, etc. must be supplied as Logback properties or
                         environment variables; otherwise they render as *_IS_UNDEFINED -->
                    <pattern>
                        <pattern>
                            {"app":"${APP}",
                            "profile":"${PROFILES_ACTIVE}",
                            "thread": "%thread",
                            "logger": "%logger{5}",
                            "message":"%msg",
                            "app_name":"${APP_NAME}",
                            "env_name":"${ENV_NAME}",
                            "hostname":"${HOSTNAME}",
                            "captain_seq":"${CAPTAIN_SEQ}",
                            "captain_gen":"${CAPTAIN_GEN}",
                            "build_name":"${BUILD_NAME}",
                            "build_git_version":"${BUILD_GIT_VERSION}",
                            "build_git_hash":"${BUILD_GIT_HASH}",
                            "build_timestamp":"${BUILD_TIMESTAMP}",
                            "date":"%d{yyyy-MM-dd HH:mm:ss.SSS}",
                            "level":"%level",
                            "stack_trace":"%exception"
                            }
                        </pattern>
                    </pattern>
                </providers>
            </encoder>
            <topic>tiger-log</topic>
            <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
            <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
            <!-- Raw Kafka producer settings; note that producer property names use dots -->
            <producerConfig>bootstrap.servers=192.168.137.1:9092</producerConfig>
            <producerConfig>retries=1</producerConfig>
            <producerConfig>batch.size=16384</producerConfig>
            <producerConfig>buffer.memory=33554432</producerConfig>
            <producerConfig>max.request.size=2097152</producerConfig>
            <!-- Fallback appender used when Kafka is unavailable -->
            <appender-ref ref="console"/>
        </appender>

        <root level="INFO">
            <appender-ref ref="console"/>
            <appender-ref ref="file_error"/>
            <appender-ref ref="file_info"/>
            <appender-ref ref="kafkaAppender"/>
        </root>
    </springProfile>

</configuration>
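Once the dev profile is active and the kafkaAppender is attached to the root logger, every ordinary SLF4J logging call is encoded as JSON and published to the tiger-log topic, from which Logstash indexes it into Elasticsearch. The sketch below generates a few test entries; the controller name and endpoint are illustrative, not part of the original project, and assume spring-boot-starter-web is on the classpath.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class LogDemoController {

    private static final Logger log = LoggerFactory.getLogger(LogDemoController.class);

    // Each request produces an INFO and an ERROR event that the kafkaAppender ships to Kafka
    @GetMapping("/log-demo")
    public String logDemo() {
        log.info("hello from tiger-ucenter, this line should end up in Elasticsearch");
        try {
            throw new IllegalStateException("simulated failure for the error log");
        } catch (IllegalStateException e) {
            log.error("something went wrong", e);
        }
        return "logged";
    }
}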

Viewing the collected logs in Kibana

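In the Kibana Dev Tools console (the URL from the verification step) you can query the indexed log events directly. The example below assumes the daily tiger-log-* indices from the Logstash pipeline sketch earlier; if your pipeline writes to a different index, adjust the pattern. To browse the same data in Discover, create an index pattern such as tiger-log-* under Stack Management.

GET tiger-log-*/_search
{
  "size": 10,
  "query": {
    "match": { "level": "ERROR" }
  }
}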
