Logstash 如何通过 Kafka 协议消费 TLS 日志

容器与中间件中间件技术服务知识库
问题现象

如何通过修改 Logstash 配置文件,实现通过 Kafka 协议消费日志到其他业务系统。

问题分析

TLS 日志服务支持通过 Logstash 消费日志数据,您可以通过配置 Logstash 服务内置的 logstash-input-kafka 插件获取日志服务中的日志数据。

解决方案

1.安装 logstash

1.1 下载安装包。 1.2 解压安装包到指定目录。 1.3 查看logstash 版本

[root@lxb-jms ~]# /usr/share/logstash/bin/logstash -V
logstash 7.17.7

建议使用的 Logstash 版本为 7.12~8.8.1,如果其他版本需要根据实际情况调整适配。

2.检查 logstash 是否有内置 kafka 插件

[root@lxb-jms ~]# /usr/share/logstash/bin/logstash-plugin list | grep kafka
logstash-integration-kafka
 ├── logstash-input-kafka
 └── logstash-output-kafka

3.修改 logstash 配置文件

添加 output 配置打印到标准输出,用于调试,实际根据情况对接业务系统。

input {
        kafka {
                bootstrap_servers => "tls-cn-beijing.ivolces.com:9093"
                security_protocol => "SASL_SSL"
                sasl_mechanism => "PLAIN"
                enable_auto_commit => "false"
                topics => "{topicID}"
                sasl_jaas_config => ""org.apache.kafka.common.security.plain.PlainLoginModule required username='${projectID}' password='${AK}#${SK}';"
        }
}
output {
  stdout {}
}

强烈建议不要把 AccessKey ID 和 AccessKey Secret 保存到工程代码里,否则可能导致 AccessKey 泄露,威胁您账号下所有资源的安全, 建议使用 jaas_path 参数配置,示例如下

jaas_path => "/usr/share/logstash/config/kafka-client-jaas.conf"
[root@lxb-jms conf.d]# cat kafka-client-jaas.conf
KafkaClient {
        org.apache.kafka.common.security.plain.PlainLoginModule required 
        username="${projectID}" 
        password="${AK}#${SK}";
};

4.启动 logstash

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/logstash-tls-input.conf
参数说明
连接类型为保证日志传输的安全性,必须使用 SASL_SSL 连接协议。对应的用户名为日志服务项目 ID,密码为火山引擎账号密钥,详细信息请参考示例
usernameKafka SASL 用户名。应配置为日志服务的日志项目 ID。
passwordKafka SASL 用户密码。应配置为火山引擎账户密钥。
格式为 ${access-key-id}#${access-key-secret},其中:

* ${access-key-id} 应替换为您的 AccessKey ID。
* ${access-key-secret} 应替换为您的 AccessKey Secret。

说明
建议使用 IAM 用户的 AK,且 IAM 用户应具备 Action PutLogs 的权限。详细信息请参考可授权的操作
hosts初始连接的集群地址,格式为服务地址:端口号,例如
tls-cn-beijing.ivolces.com:9093其中:

* 服务地址为当前地域下日志服务的服务入口。请根据地域和网络类型选择正确的服务入口,详细信息请参见服务入口
* 端口号固定为 9093。

说明
hosts 中的服务地址部分无需指定 https://
topic配置为日志服务的日志主题 ID。

成功消费示例输出如下:

…………
…………
      "@version" => "1",
       "message" => "{\"__container_ip__\":\"172.27.112.10\",\"__container_name__\":\"filebeat\",\"__container_source__\":\"stderr\",\"__content__\":\"  \\\"from\\\": \\\"LogServiceKubernetes\\\",\",\"__image_name__\":\"vke-cn-beijing.cr.volces.com/vke/log-collector-filebeat:v1.2.1\",\"__namespace__\":\"kube-system\",\"__pod_name__\":\"log-collector-z2llr\",\"__pod_uid__\":\"f9002d8b-9f63-451e-9f40-be3e25d2fd5b\",\"__tag____client_ip__\":\"192.168.0.190\",\"__tag____receive_time__\":\"1694605532\",\"cluster_id\":\"ccbp43nnqtofphqub8r90\",\"host_ip\":\"192.168.0.190\"}",
    "@timestamp" => 2023-09-13T11:47:03.281Z
}
{
      "@version" => "1",
       "message" => "{\"__container_ip__\":\"172.27.112.10\",\"__container_name__\":\"filebeat\",\"__container_source__\":\"stderr\",\"__content__\":\"  \\\"id\\\": \\\"3980555537189372045\\\",\",\"__image_name__\":\"vke-cn-beijing.cr.volces.com/vke/log-collector-filebeat:v1.2.1\",\"__namespace__\":\"kube-system\",\"__pod_name__\":\"log-collector-z2llr\",\"__pod_uid__\":\"f9002d8b-9f63-451e-9f40-be3e25d2fd5b\",\"__tag____client_ip__\":\"192.168.0.190\",\"__tag____receive_time__\":\"1694605532\",\"cluster_id\":\"ccbp43nnqtofphqub8r90\",\"host_ip\":\"192.168.0.190\"}",
    "@timestamp" => 2023-09-13T11:47:03.281Z
    
 …………
 …………
问题报错

报错信息:

[[main]<kafka] kafka - Unable to create Kafka consumer from given configuration {:kafka_error_message=>org.apache.kafka.common.KafkaException: Failed to construct kafka consumer, :cause=>java.lang.SecurityException: java.io.IOException: 配置错误:
        行 3: 应为 [option value], 找到 [6bdbbc16-****-****-ae68-07ea9fee997d]}

问题原因:

cat kafka-client-jaas.conf 配置文件中的 option value 需要使用 "", 而不是'' ,这里与不用 jass_path 参数的方式是有一定区别的。
正确的写法 : username="projectID"错误的写法:username={projectID}" 错误的写法: username='{projectID}'

参考文档

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

48
0
0
0
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论