问题现象
如何通过修改 Logstash 配置文件,实现通过 Kafka 协议消费日志到其他业务系统。
问题分析
TLS 日志服务支持通过 Logstash 消费日志数据,您可以通过配置 Logstash 服务内置的 logstash-input-kafka 插件获取日志服务中的日志数据。
解决方案
1.安装 logstash
1.1 下载安装包。 1.2 解压安装包到指定目录。 1.3 查看logstash 版本
[root@lxb-jms ~]# /usr/share/logstash/bin/logstash -V
logstash 7.17.7
建议使用的 Logstash 版本为 7.12~8.8.1,如果其他版本需要根据实际情况调整适配。
2.检查 logstash 是否有内置 kafka 插件
[root@lxb-jms ~]# /usr/share/logstash/bin/logstash-plugin list | grep kafka
logstash-integration-kafka
├── logstash-input-kafka
└── logstash-output-kafka
3.修改 logstash 配置文件
添加 output 配置打印到标准输出,用于调试,实际根据情况对接业务系统。
input {
kafka {
bootstrap_servers => "tls-cn-beijing.ivolces.com:9093"
security_protocol => "SASL_SSL"
sasl_mechanism => "PLAIN"
enable_auto_commit => "false"
topics => "{topicID}"
sasl_jaas_config => ""org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectID}' password='${AK}#${SK}';"
}
}
output {
stdout {}
}
强烈建议不要把 AccessKey ID 和 AccessKey Secret 保存到工程代码里,否则可能导致 AccessKey 泄露,威胁您账号下所有资源的安全, 建议使用 jaas_path
参数配置,示例如下
jaas_path => "/usr/share/logstash/config/kafka-client-jaas.conf"
[root@lxb-jms conf.d]# cat kafka-client-jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="${projectID}"
password="${AK}#${SK}";
};
4.启动 logstash
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-tls-input.conf
参数 | 说明 |
---|---|
连接类型 | 为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥,详细信息请参考示例。 |
username | Kafka SASL 用户名。应配置为日志服务的日志项目 ID。 |
password | Kafka SASL 用户密码。应配置为火山引擎账户密钥。 格式为 ${access-key-id}#${access-key-secret} ,其中: * ${access-key-id} 应替换为您的 AccessKey ID。 * ${access-key-secret} 应替换为您的 AccessKey Secret。 说明 建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action PutLogs 的权限。详细信息请参考可授权的操作。 |
hosts | 初始连接的集群地址,格式为服务地址:端口号 ,例如 tls-cn-beijing.ivolces.com:9093 其中: * 服务地址为当前地域下日志服务的服务入口。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务入口。 * 端口号固定为 9093。 说明 hosts 中的服务地址部分无需指定 https:// 。 |
topic | 配置为日志服务的日志主题 ID。 |
成功消费示例输出如下:
…………
…………
"@version" => "1",
"message" => "{\"__container_ip__\":\"172.27.112.10\",\"__container_name__\":\"filebeat\",\"__container_source__\":\"stderr\",\"__content__\":\" \\\"from\\\": \\\"LogServiceKubernetes\\\",\",\"__image_name__\":\"vke-cn-beijing.cr.volces.com/vke/log-collector-filebeat:v1.2.1\",\"__namespace__\":\"kube-system\",\"__pod_name__\":\"log-collector-z2llr\",\"__pod_uid__\":\"f9002d8b-9f63-451e-9f40-be3e25d2fd5b\",\"__tag____client_ip__\":\"192.168.0.190\",\"__tag____receive_time__\":\"1694605532\",\"cluster_id\":\"ccbp43nnqtofphqub8r90\",\"host_ip\":\"192.168.0.190\"}",
"@timestamp" => 2023-09-13T11:47:03.281Z
}
{
"@version" => "1",
"message" => "{\"__container_ip__\":\"172.27.112.10\",\"__container_name__\":\"filebeat\",\"__container_source__\":\"stderr\",\"__content__\":\" \\\"id\\\": \\\"3980555537189372045\\\",\",\"__image_name__\":\"vke-cn-beijing.cr.volces.com/vke/log-collector-filebeat:v1.2.1\",\"__namespace__\":\"kube-system\",\"__pod_name__\":\"log-collector-z2llr\",\"__pod_uid__\":\"f9002d8b-9f63-451e-9f40-be3e25d2fd5b\",\"__tag____client_ip__\":\"192.168.0.190\",\"__tag____receive_time__\":\"1694605532\",\"cluster_id\":\"ccbp43nnqtofphqub8r90\",\"host_ip\":\"192.168.0.190\"}",
"@timestamp" => 2023-09-13T11:47:03.281Z
…………
…………
问题报错
报错信息:
[[main]<kafka] kafka - Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :cause=>java.lang.SecurityException: java.io.IOException: 配置错误:
行 3: 应为 [option value], 找到 [6bdbbc16-****-****-ae68-07ea9fee997d]}
问题原因:
cat kafka-client-jaas.conf
配置文件中的 option value 需要使用 ""
, 而不是''
,这里与不用 jass_path
参数的方式是有一定区别的。
正确的写法 : username="{projectID}'
参考文档
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html