Background: why distributed log collection is needed
In a traditional project, the production environment runs on a cluster of many different servers. When you need to track down a bug from the logs, you have to log on to every node and dig through the log files with shell commands one machine at a time, which is extremely inefficient.
We therefore need centralized log management, and ELK (Elasticsearch, Logstash, Kibana) was created for exactly this purpose.
Why combine ELK with Kafka
If we integrate ELK alone, without Kafka, every server node has to run its own Logstash instance doing log read/write IO. That is redundant, wastes resources on every node, and does not perform well. With Kafka in between, the applications simply publish log events to a topic and a single Logstash pipeline consumes them.
Building the ELK + Kafka environment
Building ELK + Kafka with docker-compose
Note: none of the IP addresses below may be localhost or 127.0.0.1; use a routable IP of the Docker host (192.168.137.1 in this article), otherwise clients outside the containers cannot reach Kafka through its advertised listener.
version: '3'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.15.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
    ports:
      - "9200:9200"
      - "9300:9300"
  kibana:
    image: docker.elastic.co/kibana/kibana:7.15.0
    container_name: kibana
    depends_on:
      - elasticsearch
    ports:
      - "5601:5601"
  logstash:
    image: docker.elastic.co/logstash/logstash:7.15.0
    container_name: logstash
    # command: --config /etc/logstash/conf.d/*.conf
    volumes:
      - ./logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    ports:
      - "5044:5044"
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.137.1:9092
      - KAFKA_LISTENERS=PLAINTEXT://:9092
  kafka-manager:
    image: sheepkiller/kafka-manager  # open-source web UI for managing Kafka clusters
    environment:
      ZK_HOSTS: 192.168.137.1:2181    # change to your Docker host IP
    ports:
      - "9001:9000"
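The compose file above mounts ./logstash/logstash.conf into the Logstash pipeline directory, but the contents of that file are not shown. Below is a minimal sketch of what it could look like, assuming Logstash consumes the tiger-log topic that the application writes to later in this article and indexes the JSON events into Elasticsearch; the host IP and the index name pattern are assumptions, adjust them to your environment.

input {
  kafka {
    # must match the advertised listener of the kafka service above
    bootstrap_servers => "192.168.137.1:9092"
    topics => ["tiger-log"]
    group_id => "logstash"
    # the logback encoder used later in this article ships JSON events
    codec => "json"
  }
}
output {
  elasticsearch {
    # inside the compose network the service name elasticsearch:9200 also works
    hosts => ["192.168.137.1:9200"]
    index => "tiger-log-%{+YYYY.MM.dd}"
  }
}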
Verifying the ELK + Kafka environment
ZooKeeper: 127.0.0.1:2181 (ZooKeeper is not an HTTP service; just verify that the port is reachable, for example with echo ruok | nc 127.0.0.1 2181)
Elasticsearch: http://127.0.0.1:9200/
Kibana: http://127.0.0.1:5601/app/kibana#/dev_tools/console?_g=()
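Once the containers are up, two quick checks in the Kibana Dev Tools console confirm that Elasticsearch is answering (the tiger-log index itself only appears after the Spring Boot application below starts shipping logs):

GET /
GET /_cat/indices?v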
Integrating ELK + Kafka into a Spring Boot project
Add the Maven dependencies
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- logstash integration with logback -->
<dependency>
    <groupId>net.logstash.logback</groupId>
    <artifactId>logstash-logback-encoder</artifactId>
    <version>7.4</version>
    <exclusions>
        <exclusion>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<!-- logback integration with kafka -->
<dependency>
    <groupId>com.github.danielwegener</groupId>
    <artifactId>logback-kafka-appender</artifactId>
    <version>0.2.0-RC2</version>
    <scope>runtime</scope>
</dependency>
Configure application.yml
spring:
  kafka:
    listener:
      # Whether records are consumed one at a time (single, the default) or in batches (batch)
      type: single
    # Broker address; must not be localhost/127.0.0.1
    bootstrap-servers: 192.168.137.1:9092
    # Producer configuration
    producer:
      # Number of retries
      retries: 3
      # Acknowledgement level
      # acks=0   the send is considered successful as soon as the message is handed to Kafka
      # acks=1   the send is considered successful once the leader partition has written the message to disk
      # acks=all the send is considered successful only after the leader and its follower replicas have synced the message
      acks: all
      # Maximum batch size, in bytes
      batch-size: 4096
      # Total memory the producer may use to buffer records waiting to be sent, in bytes
      buffer-memory: 33554432
      # Client ID
      client-id: logstash
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Message compression: none, lz4, gzip, snappy; default is none
      compression-type: gzip
      properties:
        linger:
          # Send delay: the producer flushes a batch to Kafka when it reaches batch-size or linger.ms elapses
          ms: 1000
        max:
          block:
            # Maximum time KafkaProducer.send() and partitionsFor() may block, in ms
            ms: 6000
    # Consumer configuration
    consumer:
      # Default consumer group
      group-id: logstash
      # Auto-commit offsets (default true)
      enable-auto-commit: false
      # Auto-commit interval, in ms
      auto-commit-interval: 1000
      # Maximum number of records returned per poll
      max-poll-records: 100
      # Key deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # What to do when Kafka has no initial offset or the offset is out of range
      # earliest: reset to the smallest offset in the partition
      # latest:   reset to the latest offset (only consume data produced after the reset)
      # none:     throw an exception if any partition has no committed offset
      auto-offset-reset: latest
      properties:
        session:
          timeout:
            # Session timeout: if the consumer sends no heartbeat within this time, a rebalance is triggered
            ms: 120000
        request:
          timeout:
            # Request timeout
            ms: 120000
    template:
      default-topic: tiger-log

# Point Spring Boot at the logback config explicitly; because of lookup precedence,
# other dependencies could otherwise cause the custom logback file to be ignored
logging:
  config: classpath:logback-spring.xml
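The producer settings and the default topic above are picked up by Spring Kafka's auto-configuration, so broker connectivity can be verified with a few lines of code before wiring up the log pipeline. A minimal sketch (the class and package names are made up for illustration):

package com.example.log; // hypothetical package

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Sends one message to the default topic (tiger-log) configured under
// spring.kafka.template.default-topic, using the auto-configured KafkaTemplate.
@Component
public class KafkaConnectivityCheck {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaConnectivityCheck(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void send(String message) {
        // sendDefault() publishes to the template's default topic
        kafkaTemplate.sendDefault(message);
    }
}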
Configure logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
    <include resource="org/springframework/boot/logging/logback/console-appender.xml"/>
    <!-- The whole block from the test environment is copied here; it contains the appender that ships logs to Kafka -->
    <springProfile name="dev">
        <!-- Log output patterns -->
        <property name="console.log.pattern"
                  value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) --- [%green(%thread)] %cyan(%-40.40(%logger{40})) : %msg%n"/>
        <property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"/>
        <property name="log_name_prefix" value="tiger-ucenter"/>
        <property name="log.path" value="logs/tiger-ucenter"/>
        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>${console.log.pattern}</pattern>
            </encoder>
            <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                <level>INFO</level>
            </filter>
        </appender>
        <appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <File>${log.path}/${log_name_prefix}-info.log</File>
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>${log.pattern}</pattern>
            </encoder>
            <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                <level>INFO</level>
            </filter>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <FileNamePattern>${log.path}/backup/${log_name_prefix}-info.%d{yyyy-MM-dd}.log</FileNamePattern>
            </rollingPolicy>
        </appender>
        <appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <File>${log.path}/${log_name_prefix}-error.log</File>
            <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
                <pattern>${log.pattern}</pattern>
            </encoder>
            <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
                <level>ERROR</level>
            </filter>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <FileNamePattern>${log.path}/backup/${log_name_prefix}-error.%d{yyyy-MM-dd}.log</FileNamePattern>
            </rollingPolicy>
        </appender>
        <!-- Kafka appender configuration -->
        <!-- The ${...} placeholders in the JSON pattern below (APP, PROFILES_ACTIVE, APP_NAME, ENV_NAME, HOSTNAME,
             CAPTAIN_*, BUILD_*) are assumed to be supplied as environment variables or springProperty definitions -->
        <appender name="kafkaAppender" class="com.github.danielwegener.logback.kafka.KafkaAppender">
            <encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
                <providers class="net.logstash.logback.composite.loggingevent.LoggingEventJsonProviders">
                    <pattern>
                        <pattern>
                            {
                            "app": "${APP}",
                            "profile": "${PROFILES_ACTIVE}",
                            "thread": "%thread",
                            "logger": "%logger{5}",
                            "message": "%msg",
                            "app_name": "${APP_NAME}",
                            "env_name": "${ENV_NAME}",
                            "hostname": "${HOSTNAME}",
                            "captain_seq": "${CAPTAIN_SEQ}",
                            "captain_gen": "${CAPTAIN_GEN}",
                            "build_name": "${BUILD_NAME}",
                            "build_git_version": "${BUILD_GIT_VERSION}",
                            "build_git_hash": "${BUILD_GIT_HASH}",
                            "build_timestamp": "${BUILD_TIMESTAMP}",
                            "date": "%d{yyyy-MM-dd HH:mm:ss.SSS}",
                            "level": "%level",
                            "stack_trace": "%exception"
                            }
                        </pattern>
                    </pattern>
                </providers>
            </encoder>
            <topic>tiger-log</topic>
            <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
            <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
            <!-- producerConfig entries are raw Kafka producer properties -->
            <producerConfig>bootstrap.servers=192.168.137.1:9092</producerConfig>
            <producerConfig>retries=1</producerConfig>
            <producerConfig>batch.size=16384</producerConfig>
            <producerConfig>buffer.memory=33554432</producerConfig>
            <producerConfig>max.request.size=2097152</producerConfig>
            <!-- fallback appender used when the Kafka broker is unreachable -->
            <appender-ref ref="console"/>
        </appender>
        <root level="INFO">
            <appender-ref ref="console"/>
            <appender-ref ref="file_error"/>
            <appender-ref ref="file_info"/>
            <appender-ref ref="kafkaAppender"/>
        </root>
    </springProfile>
</configuration>
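With the appender in place, any log written through SLF4J in the dev profile is shipped to the tiger-log topic, consumed by Logstash, and indexed into Elasticsearch. A small smoke test can confirm the pipeline end to end; the class below is a made-up example, not part of the original project:

package com.example.log; // hypothetical package

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

// Emits one INFO and one ERROR event at startup; with the logback configuration
// above they flow through the kafkaAppender into the tiger-log topic.
@Component
public class LogPipelineSmokeTest implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(LogPipelineSmokeTest.class);

    @Override
    public void run(String... args) {
        log.info("ELK + Kafka pipeline smoke test: info message");
        log.error("ELK + Kafka pipeline smoke test: error message",
                new IllegalStateException("sample exception for the stack_trace field"));
    }
}

After starting the application, the events should show up in Kibana: create an index pattern matching the Logstash output index (tiger-log-* in the sketch earlier) and search for the smoke-test messages in Discover.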
