问题描述
如何将 RocketMQ 元数据中的 Topic 和 Group 的名称导出到 Excel。
解决方案
我们可以使用 API 脚本导出:脚本会为每个实例生成一个以实例 ID 命名的 CSV 文件(可直接用 Excel 打开),您可以参考如下示例:
import datetime
import hashlib
import hmac
import json
from urllib.parse import quote
import pandas as pd
import requests
# Service name and API version; both are used when signing and sending requests.
Service = "rocketmq"
Version = "2021-04-01"
# Region and API host of the target instances.
Region = "cn-beijing"
Host = "rocketmq.volcengineapi.com"
# Access key ID / secret access key of the account. Fill these in before
# running, and avoid committing real keys.
AK = ""
SK = ""
# IDs of the instances to export (the values below look like placeholders).
InstanctID = ['MQ_INST_xxxx','rocketmq-xxxx']
def norm_query(params):
    """Build the canonical query string used in the request signature.

    Keys are emitted in sorted order and both keys and values are
    percent-encoded, leaving only ``-``, ``_``, ``.`` and ``~`` unescaped.

    Args:
        params: Mapping of query parameter names to values. A list or tuple
            value produces one ``key=value`` pair per element (none when it
            is empty). Values that are neither ``str`` nor ``bytes`` (for
            example ints) are converted with ``str()`` before encoding.

    Returns:
        The pairs joined with ``&``, or ``""`` when ``params`` is empty.
    """
    pairs = []
    for key in sorted(params):
        value = params[key]
        items = value if isinstance(value, (list, tuple)) else [value]
        encoded_key = quote(key, safe="-_.~")
        for item in items:
            # quote() only accepts str/bytes; stringify anything else so that
            # numeric parameters do not raise TypeError.
            if not isinstance(item, (str, bytes)):
                item = str(item)
            pairs.append(encoded_key + "=" + quote(item, safe="-_.~"))
    # Kept from the original: spaces must appear as %20, never "+".
    return "&".join(pairs).replace("+", "%20")
# Step 1: helper functions.
# HMAC-SHA256 is a keyed hash (message authentication code), not encryption.
def hmac_sha256(key: bytes, content: str) -> bytes:
    """Return the raw HMAC-SHA256 digest of ``content`` keyed with ``key``.

    Args:
        key: Secret key bytes.
        content: Message to authenticate; encoded as UTF-8 before hashing.

    Returns:
        The 32-byte digest.
    """
    return hmac.new(key, content.encode("utf-8"), hashlib.sha256).digest()
# SHA-256 hash.
def hash_sha256(content: str) -> str:
    """Return the SHA-256 digest of ``content`` as a lowercase hex string.

    Args:
        content: Text to hash; encoded as UTF-8 first.
    """
    return hashlib.sha256(content.encode("utf-8")).hexdigest()
# Step 2: the API request function. The signature is computed inside it.
def request(method, query, header, ak, sk, action, body, timeout=30):
    """Sign one API call, send it, and return the decoded JSON response.

    Args:
        method: HTTP method. It is part of the signature and is also the
            method actually used to send the request.
        query: Extra query parameters, merged after ``Action`` and ``Version``.
        header: Extra HTTP headers; the signing headers override entries
            with the same name.
        ak: Access key ID.
        sk: Secret access key.
        action: API action name, sent as the ``Action`` query parameter.
        body: JSON-serialisable request body.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.RequestException: If the connection fails or times out.
        ValueError: If the response body is not valid JSON.
    """
    # Step 3: build the credential. Service and Region are module-level
    # constants; ak and sk are the AccessKeyID and SecretAccessKey.
    credential = {
        "access_key_id": ak,
        "secret_access_key": sk,
        "service": Service,
        "region": Region,
    }
    # Everything that takes part in the signature is collected here.
    request_param = {
        "body": json.dumps(body),
        "host": Host,
        "path": "/",
        "method": method,
        "content_type": "application/json",
        # Timezone-aware UTC "now"; utcnow() is deprecated.
        "date": datetime.datetime.now(datetime.timezone.utc),
        "query": {"Action": action, "Version": Version, **query},
    }

    # Step 4: prepare the values and the header dict that receive the result.
    x_date = request_param["date"].strftime("%Y%m%dT%H%M%SZ")
    short_x_date = x_date[:8]
    x_content_sha256 = hash_sha256(request_param["body"])
    sign_result = {
        "Host": request_param["host"],
        "X-Content-Sha256": x_content_sha256,
        "X-Date": x_date,
        "Content-Type": request_param["content_type"],
    }

    # Step 5: compute the signature.
    signed_headers_str = ";".join(
        ["content-type", "host", "x-content-sha256", "x-date"]
    )
    canonical_headers = "\n".join(
        [
            "content-type:" + request_param["content_type"],
            "host:" + request_param["host"],
            "x-content-sha256:" + x_content_sha256,
            "x-date:" + x_date,
        ]
    )
    canonical_request_str = "\n".join(
        [
            request_param["method"],
            request_param["path"],
            norm_query(request_param["query"]),
            canonical_headers,
            "",
            signed_headers_str,
            x_content_sha256,
        ]
    )
    hashed_canonical_request = hash_sha256(canonical_request_str)
    credential_scope = "/".join(
        [short_x_date, credential["region"], credential["service"], "request"]
    )
    string_to_sign = "\n".join(
        ["HMAC-SHA256", x_date, credential_scope, hashed_canonical_request]
    )
    # Derive the signing key by chaining HMACs over date, region and service.
    k_date = hmac_sha256(credential["secret_access_key"].encode("utf-8"), short_x_date)
    k_region = hmac_sha256(k_date, credential["region"])
    k_service = hmac_sha256(k_region, credential["service"])
    k_signing = hmac_sha256(k_service, "request")
    signature = hmac_sha256(k_signing, string_to_sign).hex()
    sign_result["Authorization"] = "HMAC-SHA256 Credential={}, SignedHeaders={}, Signature={}".format(
        credential["access_key_id"] + "/" + credential_scope,
        signed_headers_str,
        signature,
    )
    header = {**header, **sign_result}

    # Step 6: put the signature into the HTTP headers and send the request.
    # The method sent must be the method that was signed, and HTTPS keeps the
    # signed request and its body off the wire in cleartext.
    r = requests.request(
        method=method,
        url="https://{}{}".format(request_param["host"], request_param["path"]),
        headers=header,
        params=request_param["query"],
        data=request_param["body"],
        timeout=timeout,
    )
    return r.json()
def _fetch_all_names(action, instance_id, list_key, name_key, page_size=100):
    """Return every name that ``action`` lists for ``instance_id``.

    Pages through the API with ``Offset``/``Limit`` until a short page is
    returned, so instances with more than ``page_size`` entries are exported
    completely.

    Raises:
        RuntimeError: If the response carries no ``Result`` object (for
            example when the API reports an error).
    """
    names = []
    offset = 0
    previous_page = None
    while True:
        request_body = {
            "InstanceId": instance_id,
            "Offset": offset,
            "Limit": page_size,
            "SortField": "Name",
            "SortOrder": "Asc",
        }
        response = request("POST", {}, {}, AK, SK, action, request_body)
        result = response.get("Result") if isinstance(response, dict) else None
        if not isinstance(result, dict):
            raise RuntimeError(
                f"{action} failed for instance {instance_id}: {response!r}"
            )
        page = [item[name_key] for item in result.get(list_key) or []]
        # Guard against a server that does not advance with Offset: stop
        # instead of looping forever on the same page.
        if page and page == previous_page:
            break
        names.extend(page)
        if len(page) < page_size:
            break
        offset += len(page)
        previous_page = page
    return names


def _export_instance(instance_id):
    """Write the group and topic names of one instance to ``<instance_id>.csv``."""
    # Fetch everything first so a failed API call does not leave a
    # half-written file behind.
    groups = _fetch_all_names("ListGroups", instance_id, "Groups", "GroupId")
    topics = _fetch_all_names("ListTopics", instance_id, "Topics", "TopicName")
    # "w" rather than "a": re-running the export replaces the file instead of
    # appending duplicate rows. The context manager closes it on any error.
    with open(f"{instance_id}.csv", "w", encoding="utf-8") as f:
        # One row per kind: a label followed by the names, each followed by a
        # comma (same layout as before, including the trailing comma).
        f.write(",".join(["Group", *groups]) + ",\n")
        f.write(",".join(["Topic", *topics]) + ",\n")


if __name__ == "__main__":
    for inst in InstanctID:
        print(inst)
        _export_instance(inst)