使用Logstash将TOS上的数据导入到云搜索

容器与中间件中间件技术服务知识库
场景介绍

在 Logstash 中,整个 pipleline 分为三个部分:

  • input 插件用于提取数据。
  • filter 插件用于转换并丰富数据。
  • output 插件会将已处理的事件加载到其他环境中,例如 ElasticSearch 。

本文介绍如何将 TOS 中的文件,通过 Logstash S3 input plugin 导入到云搜索服务中。

运行版本

  • logstash 7.10.2
  • 云搜索 7.10.2
操作步骤

样例数据

需要导入的数据展示如下:student.csv 需要上传到 TOS 中

(base) [root@rudonx logstash-7.10.2]# cat student.csv                     
id,name,age
10,aa,21
11,ab,22
12,ac,19

Logstash 配置

准备 Logstash 配置文件

input {
      s3 {
        access_key_id => "your ak"
        secret_access_key => "your sk"
        bucket => "your bucketname"
        region => "cn-beijing"
        endpoint => "https://tos-s3-cn-beijing.volces.com"
        prefix => "esimport"
      }
    }

filter {
  csv {
    separator => ","
    skip_header => true
    columns => ["id", "name", "age"]
  }

  mutate {
    add_field => { "write_time" => "%{[@timestamp]}" }
    remove_field => ["@version","@timestamp","message"]
  }
}

output {
 elasticsearch {
    hosts => ["https://xxxxxxx.ivolces.com:9200"]
    index => "tosindex"
    user => "username"
    password => "password"
    ssl => false
    ssl_certificate_verification => false
  }
  stdout { codec => rubydebug }
}

配置解读

input

  • 填写 AK,SK,需要确保有正确的权限。
  • 填写 region,endpoint 信息,关于 TOS 更多区域,参考此文档
  • prefix:指定需要访问的文件前缀

filter

  • 使用 csv filter plugin:按照 "," 为分隔符
  • skip_header:跳过 id, name, age 字段,直接读取值
  • add_field:将 logstash 默认添加的 @timestamp 字段复制给新字段 write_time
  • remove_field:去除 logstash 默认添加的 ["@version","@timestamp","message"] 字段

output

  • 指定云搜索服务的主机、索引、用户认证信息及 SSL 验证信息。
  • stdout 输出用于调试,它会输出 Logstash 处理后的事件。

运行 Logstash

运行 Logstsah

(base) [root@rudonx logstash-7.10.2]# bin/logstash -f conf/tos.conf

输出如下:

{
          "name" => "ac",
           "age" => "19",
            "id" => "12",
    "write_time" => "2023-11-21T07:44:23.438Z"
}
{
          "name" => "ab",
           "age" => "22",
            "id" => "11",
    "write_time" => "2023-11-21T07:44:23.438Z"
}
{
          "name" => "aa",
           "age" => "21",
            "id" => "10",
    "write_time" => "2023-11-21T07:44:23.437Z"
}

注意

我们注意到,在 Logstash S3 input plugin 文档中提到:

The S3 input plugin only supports AWS S3. Other S3 compatible storage solutions are not supported

这里和实际测试有一些出入。火山引擎对象存储 TOS是火山引擎提供的海量、安全、低成本、易用、高可靠、高可用的分布式云存储服务,提供了对 AWS S3 协议(以下简称 S3 协议)的兼容性支持。因此可以使用 s3 input plugin 方便的将 TOS 中的数据导入到云搜索服务中。

参考文档
0
0
0
0
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论