影刀RPA工程实战:店群数据采集加工流水线与闭环数据治理

影刀RPA工程实战:店群数据采集加工流水线与闭环数据治理

数据采回来了,堆在那里不处理,等于垃圾。
自动化的真正价值,不是替代人工点击,而是让数据自己流动起来。

前面几篇文章,我们把执行层的骨架搭得差不多了——浏览器池、环境隔离、分布式调度、断点续跑、Python插件。这些能力让系统能在几十上百个店铺的规模下稳定运行。但工程化走到这一步,我们会发现一个更深层的问题浮出水面:自动化产生的数据,到底该怎么管?

店群每天通过RPA跑下来的数据量不小。TEMU的全店商品列表、拼多多的订单明细、TikTok Shop的客服消息记录,这些数据如果只是落个Excel丢在网盘里,自动化等于只做了一半。真正的工程化,必须把数据从“采集”到“加工”再到“反哺业务”的整条链路都管起来。

这篇文章就讲我们是怎么在影刀RPA之上,搭起一套数据采集加工流水线,并形成闭环数据治理的。


一、原始数据的混沌现状

picture.image 先看看没有数据治理之前的状态。我们的自动化流程跑完,数据散落在各个角落:

  • 有的流程用影刀内置的“写入Excel”指令,文件存在执行机的某个目录下
  • 有的流程用Python插件直接写MySQL,表结构是临时建的,字段名随意取
  • 有的流程调用第三方API把数据推给业务系统,推送失败就丢了,没有任何重试
  • 日志里嵌着大量有效业务数据,但和日志混在一起,没人去提取

运营同事想拉一份“过去七天所有店铺的退货率趋势”,得手工打开几十个Excel,复制粘贴汇总。这个过程的痛苦程度,让自动化省下来的时间又还回去了。

自动化采集如果不解决数据治理,只是把手工操作的痛苦转移到了数据处理环节。


二、统一数据模型:从源头管住

我们做的第一件事,是定义一套跨平台、跨流程的统一数据模型。

picture.image

店群运营涉及的数据看似五花八门,但抽象出来无非几类:

  • 商品数据:商品ID、标题、价格、库存、状态、主图URL
  • 订单数据:订单ID、商品ID、金额、状态、下单时间、买家信息(脱敏)
  • 客服数据:会话ID、消息内容、发送方、时间、是否已回复
  • 营销数据:活动ID、报名状态、出价、预算、曝光量

picture.image 我们给每类数据定义了一套标准Schema,不绑定任何平台。所有采集流程,不管来自拼多多、TEMU还是TikTok Shop,最终落地的数据结构必须符合这套Schema。

以商品数据为例,标准Schema长这样:

{
  "platform": "temu",
  "shop_id": "temu_045",
  
![picture.image](https://p6-volc-community-sign.byteimg.com/tos-cn-i-tlddhu82om/6d8c7db6cdba40f8b068add21d6befb8~tplv-tlddhu82om-image.image?=&rk3s=8031ce6d&x-expires=1779475038&x-signature=KUKKDHLhCFXWtU0xrZYLSpuqZ%2Bk%3D)
  "product_id": "PD20260518001",
  "title": "夏季透气运动鞋",
  "price": 29.90,
  "currency": "USD",
  "stock": 1200,
  "status": "on_sale",
  "main_image_url": "https://...",
  "collected_at": "2026-05-18T10:23:45Z",
  "batch_id": "batch_20260518_001",
  "trace_id": "abc-123-def"
}

picture.image 所有流程共享这个Schema定义,存在一个独立的Schema Registry里,Python插件在写入数据前做校验。不符合Schema的数据直接拒绝写入并告警,从源头杜绝脏数据。

picture.image

三、采集与加工分离

最开始的自动化流程,边采集边处理——从页面提取出标题后立刻做清洗、分词、写入。看起来效率很高,实际上耦合太紧。一旦清洗逻辑需要调整,就得改流程重新发布。而且采集流程的执行时间被清洗计算拉长,页面可能因为等待而超时。

我们的重构方向很明确:采集和加工分离为两个独立的流水线阶段。

采集阶段只做三件事:

  1. 从页面提取原始数据
  2. 按标准Schema组装
  3. 写入原始数据缓冲区(Redis Stream或数据库原始表)

这个阶段追求的是快和稳,不做任何复杂计算。

加工阶段是独立的Python服务(可以脱离影刀运行),从缓冲区消费原始数据,做清洗、转换、聚合、去重,最终写入业务数据库。

分离后,采集流程变得轻量且稳定,加工逻辑可以独立迭代、独立测试、独立扩缩容。一条原始数据可以触发多条加工流水线,比如一个订单数据同时流向财务报表、客服回访提醒、库存预警三个下游。


四、原始数据缓冲区设计

我们选择Redis Stream作为采集和加工之间的缓冲区。原因有几个:

  • 原生支持消费者组,多实例并行消费不会重复
  • 消息持久化,加工服务挂了重启后可以继续消费
  • 支持消息确认,处理成功后再ACK,失败了数据不丢

采集流程的Python插件写入Stream:

def write_to_raw_buffer(stream_name: str, data: dict, max_len: int = 100000):
    redis.xadd(stream_name, data, maxlen=max_len, approximate=True)

加工服务用消费者组读取:

def consume_raw_data(stream_name: str, group_name: str, consumer_name: str):
    try:
        redis.xgroup_create(stream_name, group_name, mkstream=True)
    except Exception:
        pass  # 消费者组已存在则忽略

    while True:
        messages = redis.xreadgroup(group_name, consumer_name, {stream_name: ">"}, count=10, block=5000)
        for stream, msg_list in messages:
            for msg_id, data in msg_list:
                try:
                    process_raw_data(data)
                    redis.xack(stream_name, group_name, msg_id)
                except Exception as e:
                    logger.error(f"Process failed for {msg_id}: {e}")
                    # 不ACK,消息会被重新投递

这套缓冲区机制让数据采集和加工完全解耦,采集流程不知道数据会被如何处理,加工服务也不知道数据从哪个具体流程来的,只认Schema。


五、数据质量校验的三道防线

原始数据进到缓冲区之后,不是直接写进业务库的。我们设了三道数据质量防线。

第一道:Schema校验。 前面提过,字段类型、必填项、枚举值范围都在Schema Registry里定义,插件在写入Stream前就会校验,不通过的连缓冲区都进不去。

第二道:业务规则校验。 在加工阶段执行。比如商品价格不能为负数、订单金额和商品数量乘起来对不上、采集时间戳和任务执行时间偏差超过一小时——这些都属于业务异常,会被标记为quarantine(隔离),进入一个待审区,人工确认后才能放行。

第三道:跨源对账。 我们定期把RPA采集的数据和平台后台导出的报表数据做抽样比对。如果偏差超过5%,说明采集逻辑可能有问题(比如漏了某个筛选条件、翻页有遗漏)。这个对账目前是半自动化的,Python脚本拉取两边数据做差异分析,结果推送到运营看板。

这三道防线加在一起,让进入业务数据库的数据质量有了底线。不会出现运营拿着自动化产出的数据做了决策,结果发现数据是错的那种灾难。


六、数据生命周期与冷热分离

店群数据有个特点:最近7天的数据高频使用,30天内的偶尔回溯,超过90天的几乎只有归档价值。

我们的业务数据库如果无差别地存所有数据,查询性能会随时间持续下降。所以我们在加工层做了冷热分离:

  • 热数据(近7天):存入MySQL的主表,带索引,查询不做任何限制
  • 温数据(8~30天):存MySQL但取消部分非核心索引,查询有轻微延迟
  • 冷数据(30天以上):迁移到归档库(我们用ClickHouse做分析型查询),MySQL里只保留聚合统计结果

迁移是自动化的,每天凌晨跑一个定时任务,把超过30天的数据从热表挪到冷存储,并重建热表的空间。


七、闭环反哺:让数据回到运营流程

数据治理做到这一步,已经从“采集-存储”进化到了“采集-加工-存储”。但还能再往前走一步——让加工后的数据反哺运营自动化流程。

举几个实际场景:

场景一:商品价格监控与自动调价。 采集加工后的数据里,我们可以在加工层做竞品比价分析,把需要调价的商品清单生成一个任务,推回到Redis任务队列。调度中心拿到这个任务,分发到执行节点,影刀流程自动去后台改价。

场景二:差评闭环处理。 加工层识别出差评后,不是只生成一条报表,而是直接向任务队列投递一个“差评回复”任务。影刀流程收到后,用模板生成回复内容并发送。整个链路从数据采集到问题处理完全闭环,人工只需抽检。

场景三:异常数据触发人工工单。 当数据质检防线发现quarantine数据时,加工层会调用内部工单系统的API,自动生成一条工单,附带异常数据详情和截图,指派给对应运营。这样异常不会石沉大海。

数据不应该是自动化的终点。
它应该是下一轮自动化的起点。


八、工程化边界:哪些数据不进这条流水线

不是什么数据都值得进入流水线。我们定了三条准入标准:

第一条:业务决策依赖的数据。 比如商品价格、库存、订单量,直接影响运营策略,必须进流水线。 第二条:跨流程共享的数据。 比如店铺登录态的有效期,采集流程和操作流程都需要,但这不是业务数据而是环境元数据,走环境管理那条线,不进数据流水线。 第三条:需要长期留痕的数据。 比如客服消息记录,有合规和纠纷举证需求,必须持久化。

临时性的、中间态的数据——比如流程断点Checkpoint、浏览器池状态、任务队列元数据——全部留在Redis里,不进入数据流水线,避免污染业务数据存储。


九、一个完整的串联案例

说一个完整的链路,把前面讲的串起来。

TEMU某店铺的商品采集流程跑完后,Python插件把500条商品原始数据写入Redis Stream。加工服务的消费者组拿到这批数据,做三件事:

  1. 清洗标题中的特殊字符和平台标记
  2. 和上一次采集的快照做diff,识别出新增商品12个、价格变动商品8个、下架商品3个
  3. 把结果写入MySQL的热数据表,同时把价格变动清单推送到调价任务队列

调度中心检测到调价任务队列有新任务,分配到空闲执行节点。影刀调价流程启动,从Checkpoint读取之前的状态,跳过已完成部分,逐条处理价格变动。

整个链路里,采集流程不知道调价流程的存在,调价流程也不知道数据是加工层算出来的。每个环节只关心自己的输入和输出,松耦合、可独立演进。

这套东西跑稳以后,运营同事给我发了条消息,说“现在看数据终于不用再开Excel地狱了”。那一刻我觉得所有的工程化投入都值了。


十、下一步:实时化与流式计算

当前这套加工流水线还是微批处理模式,加工服务每隔几秒拉一批数据处理,延迟在秒级。对于价格监控这类场景勉强够用,但如果未来要介入更实时的场景——比如直播间的实时评论监控——就需要把加工层升级为真正的流式计算。

我们已经在测试用Flink消费Redis Stream,做更高效的流式聚合和窗口计算。不过那是另一篇文章要讲的故事了。

数据治理从来不是一个技术点,而是一条持续建设的工程链路。
把这条链路做扎实了,自动化的投入产出比才会真正释放出来。

作者:林焱
一个总想把数据治理做到自动化系统底层的工程师

0
0
0
0
评论
未登录
暂无评论