前面的文章讲的都是单个知识点或单平台方案。今天来一个综合实战——从零搭建一套完整的数据日报系统。
这套系统我实际跑了半年多,每天早上8点自动出报表,发到飞书群。覆盖淘宝、拼多多、小红书三个平台的数据采集、清洗、汇总和推送。全文按实际交付标准写,每一步都有可复用的代码。
先说这套系统能干什么
输入: 三个平台的账号配置 + 待监控的商品/关键词列表 输出: 每天早上8:05,飞书群收到一份包含以下内容的日报:
- 昨日销售额、订单量、访客数(三平台汇总)
- 各平台TOP5热销商品
- 异常告警(价格波动超10%/库存低于安全线)
- 多维表格链接(可查看明细数据)
系统架构总览
【定时触发】每天 08:00

↓
【主调度流程】DailyReport_Main
↓
┌────┼────┐
↓ ↓ ↓

淘宝 拼多多 小红书
采集 采集 采集
↓ ↓ ↓
└────┼────┘
↓

【数据汇总】Clean_MergeData(Python)
↓
【写入飞书多维表格】Write_Feishu
↓
【发送飞书消息】Send_Notification

↓
【归档日志】Archive_Logs
第一步:项目结构搭建
复制模板项目结构(详见上一篇文章),命名为 DailyReport_System/。
DailyReport_System/
├── 主流程/
│ └── DailyReport_Main.flow # 主调度流程
├── 子流程/
│ ├── 采集/
│ │ ├── Fetch_Taobao.flow
│ │ ├── Fetch_Pinduoduo.flow
│ │ └── Fetch_Xiaohongshu.flow
│ ├── 登录/
│ │ ├── Login_Taobao.flow
│ │ ├── Login_Pinduoduo.flow
│ │ └── Login_Xiaohongshu.flow
│ ├── 清洗/
│ │ └── Clean_MergeData.flow # Python合并清洗
│ ├── 输出/
│ │ ├── Write_Feishu.flow
│ │ └── Send_Notification.flow

│ └── 通用/
│ ├── Common_RandomWait.flow
│ ├── Common_RetryClick.flow
│ └── Common_CheckLogin.flow
├── 配置/
│ ├── accounts.xlsx # 各平台账号配置
│ ├── keywords.xlsx # 监控关键词列表
│ └── xpath_library.xlsx # XPath库
├── 数据/
│ ├── raw/ # 原始采集数据(按日期)
│ ├── processed/ # 清洗后数据
│ └── archive/ # 归档(保留90天)
└── 日志/
└── run_log.xlsx # 运行记录
第二步:配置文件设计
accounts.xlsx(账号配置表)
| 平台 | 账号名 | Profile名 | 代理IP | 账号 | 密码 | 是否启用 |
|---|---|---|---|---|---|---|
| 淘宝 | 主号 | Taobao_Main | 1.2.3.4:8080 | user1 | pass1 | 是 |
| 拼多多 | 店铺A | PDD_ShopA | 1.2.3.5:8080 | user2 | pass2 | 是 |
| 小红书 | 主号 | XHS_Main | 1.2.3.6:8080 | user3 | pass3 | 是 |
keywords.xlsx(监控关键词)
| 平台 | 关键词 | 目标价格 | 备注 |
|---|---|---|---|
| 淘宝 | 蓝牙耳机 真无线 | 99 | 竞品监控 |
| 淘宝 | 充电宝 10000mAh | 79 | 竞品监控 |
| 拼多多 | 蓝牙耳机 运动 | 65 | 竞品监控 |
| 小红书 | 蓝牙耳机测评 | - | 内容监控 |
第三步:主调度流程(核心)
# ============================================
# 主流程:DailyReport_Main
# 执行时间:每天 08:00(定时任务配置)
# ============================================
输出日志("===== 日报系统开始执行 =====")
设置变量(系统开始时间 = 获取当前时间())
# ----- 1. 加载配置 -----
输出日志("【1/6】加载配置")
读取Excel文件("配置/accounts.xlsx", 输出变量=账号配置)
读取Excel文件("配置/keywords.xlsx", 输出变量=关键词配置)
读取Excel文件("配置/xpath_library.xlsx", 输出变量=XPath库)
# 过滤出已启用的账号
设置变量(启用账号列表 = [])
按列表循环(账号配置[1:], 循环项=行):
如果 行[6] == "是":
将 行 加入列表(启用账号列表)
结束如果
结束循环
输出日志("共加载 " + 获取列表长度(启用账号列表) + " 个启用账号")
# ----- 2. 初始化输出文件和目录 -----
当前日期 = 获取当前日期()
创建文件夹("数据/raw/" + 当前日期)
创建文件夹("数据/processed/" + 当前日期)
# ----- 3. 按平台循环采集 -----
设置变量(所有平台数据 = [])
按列表循环(启用账号列表, 循环项=账号行):
平台 = 账号行[0]
账号名 = 账号行[1]
Profile名 = 账号行[2]
账号 = 账号行[4]
密码 = 账号行[5]
输出日志("【2/6】开始采集 " + 平台 + " - " + 账号名)
Try:
# 3.1 打开浏览器环境
打开浏览器环境(环境名称=Profile名)
# 3.2 检测登录态
调用子流程(Common_CheckLogin,
输入:
平台=平台,
登录标志XPath=从XPath库获取(平台, "登录标志"),
输出: 登录状态
)
如果 登录状态 == "失效":
输出日志(平台 + " 登录态失效,执行登录")
调用子流程("Login_" + 平台,
输入: 账号=账号, 密码=密码
)
结束如果
# 3.3 执行采集(根据平台调用不同子流程)
调用子流程("Fetch_" + 平台,
输入:
关键词列表=从关键词配置筛选(平台),
账号名=账号名,
输出: 平台数据
)
# 3.4 保存原始数据(备份)
保存路径 = "数据/raw/" + 当前日期 + "/" + 平台 + "_" + 账号名 + ".xlsx"
写入Excel文件(保存路径, 平台数据)
# 3.5 合并到总数据
将 平台数据 合并到 所有平台数据
输出日志(平台 + " 采集完成,共 " + 获取列表长度(平台数据) + " 条")
Catch:
输出日志("【ERROR】" + 平台 + " 采集失败")
截图保存("数据/raw/" + 当前日期 + "/error_" + 平台 + ".png")
# 继续处理下一个平台
Finally:
关闭浏览器环境()
调用子流程(Common_RandomWait)
结束循环
# ----- 4. 数据清洗与汇总(Python)-----
输出日志("【3/6】数据清洗与汇总")
调用子流程(Clean_MergeData,
输入:
原始数据=所有平台数据,
当前日期=当前日期,
输出:
汇总数据=汇总结果,
异常数据=异常列表
)
# 保存清洗后的数据
写入Excel文件("数据/processed/" + 当前日期 + "/汇总数据.xlsx", 汇总结果)
# ----- 5. 写入飞书多维表格 -----
输出日志("【4/6】写入飞书多维表格")
调用子流程(Write_Feishu,
输入:
数据=汇总结果,
表格ID="tbl_xxxxx"
)
# ----- 6. 发送通知 -----
输出日志("【5/6】发送日报通知")
调用子流程(Send_Notification,
输入:
汇总数据=汇总结果,
异常数据=异常列表,
开始时间=系统开始时间
)
# ----- 7. 归档日志 -----
输出日志("【6/6】归档日志")
调用子流程(Archive_Logs,
输入:
执行状态=成功,
总数据量=获取列表长度(汇总结果)
)
输出日志("===== 日报系统执行完成 =====")
第四步:数据清洗与汇总(Python)
# ============================================
# 子流程:Clean_MergeData
# 职责:合并三平台数据,统一格式,计算汇总指标
# ============================================
import pandas as pd
import json
from datetime import datetime
# 输入参数(从影刀传入):
# raw_data_json: 原始数据的JSON字符串
# current_date: 当前日期
# 输出参数:
# merged_data: 汇总数据的JSON字符串
# anomalies: 异常数据的JSON字符串
# ----- 1. 解析输入 -----
raw_list = json.loads(raw_data_json)
df = pd.DataFrame(raw_list)
# 统一列名(各平台采集时已按统一格式输出)
# 预期列:['平台', '商品名', '价格', '销量', '销售额', '采集时间']
# 如果数据为空,生成空报表
if len(df) == 0:
merged_data = json.dumps([])
anomalies = json.dumps([])
# 提前返回(用影刀返回指令)
返回结果(merged_data=merged_data, anomalies=anomalies)
# ----- 2. 数据清洗 -----
# 价格清洗:去除非数字字符
df['价格'] = df['价格'].astype(str).str.replace('¥', '').str.replace(',', '').str.replace('元', '')
df['价格'] = pd.to_numeric(df['价格'], errors='coerce')
# 销量清洗
df['销量'] = pd.to_numeric(df['销量'], errors='coerce').fillna(0)
# 销售额 = 价格 × 销量(如果缺失则计算)
df['销售额'] = df.apply(
lambda row: row['价格'] * row['销量'] if pd.notna(row['价格']) else 0,
axis=1
)
# 去除异常值(价格>10000或<1的可能是脏数据)
df = df[(df['价格'] <= 10000) & (df['价格'] >= 1)]
# 去除重复商品(按商品名去重,保留最新)
df = df.sort_values('采集时间').drop_duplicates('商品名', keep='last')
# ----- 3. 按平台汇总 -----
平台汇总 = df.groupby('平台').agg({
'销售额': 'sum',
'销量': 'sum',
'商品名': 'count'
}).reset_index()
平台汇总.columns = ['平台', '总销售额', '总销量', '商品数']
# ----- 4. 全平台汇总 -----
总销售额 = df['销售额'].sum()
总销量 = df['销量'].sum()
总商品数 = len(df)
# ----- 5. TOP10商品(全平台)-----
top10 = df.nlargest(10, '销售额')[['平台', '商品名', '价格', '销量', '销售额']]
# ----- 6. 异常检测 -----
# 6.1 价格波动超过10%(相比前一天的记录,需要历史数据)
# 这里简化:检测价格>1000或销量为0的商品
异常列表 = []
高价商品 = df[df['价格'] > 1000]
for _, row in 高价商品.iterrows():
异常列表.append({
'平台': row['平台'],
'商品名': row['商品名'],
'异常类型': '高价',
'详情': f"价格 {row['价格']} 元"
})
零销量商品 = df[df['销量'] == 0]
for _, row in 零销量商品.iterrows():
异常列表.append({
'平台': row['平台'],
'商品名': row['商品名'],
'异常类型': '零销量',
'详情': '销量为0'
})
# ----- 7. 组装汇总结果(用于推送)-----
汇总结果 = {
'日期': current_date,
'总销售额': round(总销售额, 2),
'总销量': int(总销量),
'总商品数': int(总商品数),
'平台明细': 平台汇总.to_dict('records'),
'TOP10': top10.to_dict('records')
}
# 输出给影刀
merged_data = json.dumps(汇总结果, ensure_ascii=False)
anomalies = json.dumps(异常列表, ensure_ascii=False)
第五步:飞书通知消息模板
# ============================================
# 子流程:Send_Notification
# 职责:组装日报消息并发送到飞书
# ============================================
# 输入参数:
# summary_json: 汇总数据的JSON字符串
# anomalies_json: 异常数据的JSON字符串
# start_time: 系统开始时间
# ----- 1. 解析数据 -----
summary = JSON解析(summary_json)
anomalies = JSON解析(anomalies_json)
# ----- 2. 计算耗时 -----
耗时秒 = 计算时间差(start_time, 获取当前时间())
耗时分钟 = 取整(耗时秒 / 60)
# ----- 3. 组装消息 -----
消息 = f"""
📊 **数据日报 - {summary['日期']}**
━━━━━━━━━━━━━━━━━━
⏱️ 执行耗时:{耗时分钟} 分钟
📦 总商品数:{summary['总商品数']} 个
💰 总销售额:¥{summary['总销售额']:,.2f}
📈 总销量:{summary['总销量']} 件
📋 **各平台明细**
"""
# 添加平台明细
for item in summary['平台明细']:
消息 += f"\n• {item['平台']}:销售额 ¥{item['总销售额']:,.2f},销量 {item['总销量']} 件"
# 添加TOP10
消息 += "\n\n🏆 **TOP10 热销商品**"
for i, item in enumerate(summary['TOP10'][:5]): # 只显示前5
消息 += f"\n{i+1}. {item['商品名']}({item['平台']})¥{item['销售额']:,.2f}"
# 添加异常告警
if len(anomalies) > 0:
消息 += "\n\n⚠️ **异常告警**"
for item in anomalies[:10]: # 最多显示10条
消息 += f"\n• {item['平台']} - {item['商品名']}:{item['异常类型']}({item['详情']})"
else:
消息 += "\n\n✅ 无异常数据"
消息 += "\n\n📎 查看明细:[飞书多维表格链接]"
消息 += f"\n━━━━━━━━━━━━━━━━━━\n发送时间:{获取当前时间()}"
# ----- 4. 发送 -----
# 有异常时@所有人
是否@所有人 = len(anomalies) > 0
调用子流程(Send_FeishuNotification,
输入参数:
msg_type="info",
title="数据日报 - " + summary['日期'],
content=消息,
mention_all=是否@所有人
)
第六步:定时任务配置
影刀RPA实操路径: 客户端 → 定时任务 → 新建任务。
| 配置项 | 设置值 |
|---|---|
| 任务名称 | 每日数据日报 |
| 选择流程 | DailyReport_Main.flow |
| 触发时间 | 每天 08:00 |
| 重复 | 每天 |
| 超时时间 | 60分钟 |
| 失败重试 | 1次 |
| 通知方式 | 飞书(异常时) |
运行保障机制
| 保障项 | 实现方式 |
|---|---|
| 登录态失效 | 每个平台采集前检测,失效自动登录 |
| 单平台失败 | Try-Catch包裹,不影响其他平台 |
| 数据为空 | Python清洗时判断,输出空报表而非报错 |
| 飞书写入失败 | 重试3次,仍失败则发告警 |
| 电脑休眠 | 电源设置“从不睡眠” |
| 日志留存 | 每次运行写入 run_log.xlsx |
上线检查清单
- 三个平台的账号在对应Profile中已手动登录过一次
- 所有XPath已用XPath Helper验证
- 飞书多维表格已创建,字段名与代码匹配
- 飞书机器人Webhook已配置
- Python依赖(pandas)已在影刀中安装
- 定时任务已创建并测试运行一次
- 电源设置已改为“从不睡眠”
- 运行日志表格已创建
常见问题速查
| 问题 | 原因 | 解决方法 |
|---|---|---|
| 某个平台采集失败 | XPath变动 | 更新XPath库 |
| 汇总数据为0 | 采集无数据或格式错误 | 检查原始数据文件 |
| 飞书写入失败 | 字段名不匹配 | 核对表格字段名 |
| 定时任务没执行 | 客户端未打开 | 设置开机自启 |
| 消息发送超时 | 消息内容过长 | 缩短消息,明细放表格链接 |
推荐资源
- 影刀官方《综合实战案例》课程——社区学院
- 飞书多维表格API文档——官网
- 我的日报系统模板:上述完整项目结构打包,可直接下载使用(需自行替换账号和XPath)
#影刀RPA #RPA自动化 #综合实战 #数据日报 #多平台采集 #飞书
作者:林焱
本文为《影刀RPA学习手册》系列文章之一,内容源于实操经验的整理与分享。
