影刀RPA店群自动化:跨平台数据一致性与事务补偿机制实战

影刀RPA店群自动化:跨平台数据一致性与事务补偿机制实战

店群自动化做到多平台、多店铺时,会遇到一个很隐蔽的问题:

数据不一致。

同一个商品,在拼多多上已经下架了,但TEMU上还是上架状态。订单已经在ERP里发货了,但店铺后台没同步。批量改价改了50个商品,第30个时脚本报错退出,前29个改了,后面21个没改。

这些问题比脚本报错更麻烦。报错你能看见,数据不一致你可能过好几天才发现。

我们早期处理这些问题的思路是“尽量避免”。后来发现避免不了——网络会抖、平台会限流、节点会宕机。

于是我们设计了一套数据一致性保障与事务补偿机制

这篇文章不讲调度,也不讲监控。专门聊聊如何让店群系统在出现异常后,能够自动发现不一致并修复,最终达到最终一致性。

picture.image

适用场景:多平台、多店铺的批量操作、订单同步、库存管理。 技术栈:Python + Redis + 对账任务 + 补偿脚本 + 操作日志。


一、数据不一致是怎么产生的?

picture.image

先列举几种典型场景。

场景一:批量操作中途失败

批量修改100个商品的价格。脚本循环调用API,第50个时网络超时,脚本退出。前49个成功,后50个没执行。没有事务回滚。

场景二:跨平台操作,部分成功

picture.image

TEMU上架商品成功后,需要同步在拼多多也上架。TEMU成功了,拼多多接口报错。两边状态不一致。

场景三:异步回调丢失

订单发货后,平台会回调通知。回调丢了,系统以为没发货,重复发一次。

场景四:节点宕机导致状态未更新

picture.image

任务正在执行,节点突然宕机。任务标记为“运行中”,实际上已经部分完成但状态没更新到数据库。

这些问题的共同点是:自动化系统假设每个操作都是原子且可靠的,但现实不是。

解决方案不是让每个操作都成功,而是让系统能检测到不一致自动补偿


picture.image

picture.image

二、设计原则:最终一致性 + 可补偿操作

我们放弃了强一致性(实时事务),因为跨系统、跨平台不可能做到。

采用最终一致性:允许短暂的不一致,但系统会通过异步对账和补偿机制,在可接受的时间窗口内(通常是几分钟到几小时)修复。

核心要求:

  1. 每个写操作都要有幂等键(之前文章讲过)
  2. 每个批量操作都要有“快照”:记录操作前状态和目标状态
  3. 每个状态变更都要有可审计的日志:知道谁、什么时候、把什么从A改成了B
  4. 补偿操作必须是可逆的:能回滚或能正向修复

三、操作日志与快照设计

所有对店铺数据的写操作(上架、下架、改价、修改库存、发货),都先记录一条操作日志。

# operation_log.py
from datetime import datetime
import json

class OperationLog:
    def __init__(self, db_conn):
        self.db = db_conn
        
    def create(self, op_id: str, shop_id: str, op_type: str, target_ids: list, before_state: dict, after_state: dict):
        """记录一个操作的前后状态"""
        sql = """
        INSERT INTO operation_logs 
        (op_id, shop_id, op_type, target_ids, before_state, after_state, status, created_at)
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """
        self.db.execute(sql, (
            op_id, shop_id, op_type, 
            json.dumps(target_ids),
            json.dumps(before_state),
            json.dumps(after_state),
            'pending',
            datetime.now()
        ))
        self.db.commit()
        
    def update_status(self, op_id: str, status: str, error_msg: str = None):
        self.db.execute(
            "UPDATE operation_logs SET status = %s, error_msg = %s, finished_at = %s WHERE op_id = %s",
            (status, error_msg, datetime.now(), op_id)
        )
        self.db.commit()
        
    def get_pending_ops(self, older_than_minutes=5):
        """获取长时间未完成的操作(可能节点挂了)"""
        sql = """
        SELECT * FROM operation_logs 
        WHERE status = 'pending' AND created_at < %s
        """
        cutoff = datetime.now() - timedelta(minutes=older_than_minutes)
        return self.db.query(sql, (cutoff,))

执行批量操作时:

def batch_update_prices(shop_id, product_ids, new_price):
    op_id = str(uuid.uuid4())
    # 记录操作前的价格快照
    before = {pid: get_current_price(shop_id, pid) for pid in product_ids}
    after = {pid: new_price for pid in product_ids}
    op_log.create(op_id, shop_id, 'batch_update_price', product_ids, before, after)
    
    success_list = []
    fail_list = []
    for pid in product_ids:
        try:
            update_price(shop_id, pid, new_price)
            success_list.append(pid)
        except Exception as e:
            fail_list.append((pid, str(e)))
            
    if fail_list:
        # 部分失败,更新操作日志状态为 partial
        op_log.update_status(op_id, 'partial', json.dumps(fail_list))
        # 触发补偿:回滚已成功的部分?
        # 我们选择不自动回滚,而是让对账任务去修复
    else:
        op_log.update_status(op_id, 'success')

有了操作日志,对账任务就可以知道哪些操作没有完全成功。


四、对账任务:发现不一致

我们设计了多种对账任务,定时执行。

类型一:操作日志对账

扫描状态为pendingpartial的操作日志,如果创建时间超过5分钟且没有新的重试,认为可能失败。触发补偿重试或人工介入。

类型二:数量对账

比较本地数据库和平台API返回的商品数量、订单数量。如果数量不一致,触发全量同步。

类型三:字段对账

随机抽取部分商品,比对本地数据库中的价格、库存与平台实际值。发现差异则告警。

类型四:订单状态对账

本地ERP中的订单状态与店铺后台的状态对比。最常见的场景:本地标记“已发货”,店铺后台还是“待发货”。

# reconciliation.py
class OrderReconciler:
    def __init__(self, erp_api, shop_api):
        self.erp = erp_api
        self.shop = shop_api
        
    def reconcile_orders(self, shop_id, date_from, date_to):
        # 获取ERP中的订单状态
        erp_orders = self.erp.get_orders(shop_id, date_from, date_to)
        # 获取店铺后台的订单状态
        shop_orders = self.shop.get_orders(shop_id, date_from, date_to)
        
        mismatches = []
        for order_id, erp_status in erp_orders.items():
            shop_status = shop_orders.get(order_id)
            if shop_status != erp_status:
                mismatches.append({
                    "order_id": order_id,
                    "erp_status": erp_status,
                    "shop_status": shop_status
                })
        return mismatches

对账任务发现不一致后,不是直接修复,而是生成一个“差异报告”,由补偿策略决定如何处理。


五、补偿机制设计

补偿分为三种级别:

自动正向补偿:对于明确可自动修复的差异,系统自动执行补偿操作。

自动逆向补偿:回滚已执行的错误操作。

人工介入补偿:系统无法确定的,生成工单等待人工处理。

正向补偿示例:订单状态不一致

# compensator.py
class OrderStatusCompensator:
    def __init__(self, erp_api, shop_api):
        self.erp = erp_api
        self.shop = shop_api
        
    def compensate(self, mismatches):
        actions = []
        for m in mismatches:
            # 策略:以ERP状态为准,更新店铺状态
            if m["erp_status"] == "shipped" and m["shop_status"] == "pending":
                actions.append(("ship_order", m["order_id"]))
            elif m["erp_status"] == "delivered" and m["shop_status"] == "shipped":
                # 不需要操作,平台会自动更新
                pass
            elif m["erp_status"] == "cancelled" and m["shop_status"] != "cancelled":
                actions.append(("cancel_order", m["order_id"]))
        # 执行补偿操作(带幂等)
        for action, order_id in actions:
            self._execute_compensation(action, order_id)

逆向补偿示例:批量改价部分成功

如果一个批量改价操作失败了30%,我们选择不自动回滚已成功的部分(因为回滚可能引起价格波动),而是:

  1. 标记该操作日志为“需人工审核”
  2. 将失败的商品列表单独拿出来,重试改价
  3. 如果重试仍然失败,发出告警,由运营决定是继续手动改还是全部回滚

这里的关键是:补偿不是简单的“撤销”,而是“修复到期望状态”


六、分布式事务的简化版:Saga模式

跨平台的组合操作,比如“在TEMU上架商品,同时在拼多多上架”。我们实现了Saga模式的简化版。

Saga将一个长事务拆分成多个本地事务,每个事务有对应的补偿事务。

# saga.py
class SagaOrchestrator:
    def __init__(self):
        self.steps = []  # (action, compensate_action)
        
    def add_step(self, action, compensate):
        self.steps.append((action, compensate))
        
    def execute(self):
        executed = []
        for i, (action, compensate) in enumerate(self.steps):
            try:
                result = action()
                executed.append((i, compensate, result))
            except Exception as e:
                # 执行失败,开始补偿已执行的步骤(逆序)
                for j in range(len(executed)-1, -1, -1):
                    _, comp, _ = executed[j]
                    try:
                        comp()
                    except Exception as comp_e:
                        # 补偿失败,记录到死信队列
                        self._handle_compensation_failure(comp, comp_e)
                raise e
        return True
        
# 使用示例
saga = SagaOrchestrator()
saga.add_step(
    action=lambda: temu_api.upload_product(data),
    compensate=lambda: temu_api.delete_product(product_id)
)
saga.add_step(
    action=lambda: pdd_api.upload_product(data),
    compensate=lambda: pdd_api.delete_product(product_id)
)
saga.execute()

实际操作中,补偿不一定是删除(因为可能已经产生了订单),可能只是标记为“不同步”或下架。我们根据业务敏感度设计补偿逻辑。


七、对账任务的调度与避让

对账任务不能影响正常业务。我们做了几个设计:

  • 低峰期执行:对账任务默认在凌晨2点-5点执行
  • 随机延迟:避免所有店铺同时触发对账打爆API
  • 限流:每秒最多10个请求
  • 熔断:如果对账任务连续失败3次,暂停该店铺对账并告警
class ReconciliationScheduler:
    def __init__(self):
        self.rate_limiter = RateLimiter(max_qps=10)
        
    def run_for_shop(self, shop_id):
        if not self._is_low_peak():
            return
        if self._is_recently_failed(shop_id):
            return
        with self.rate_limiter:
            mismatches = self.reconciler.reconcile(shop_id)
            if mismatches:
                self.compensator.handle(shop_id, mismatches)

我们还区分了“关键对账”和“常规对账”。订单状态对账每1小时执行一次;商品数量对账每24小时一次;价格对账每6小时抽样一次。


八、真实踩坑记录

坑1:补偿操作没有幂等,导致重复补偿

自动补偿执行时,如果补偿脚本本身没有幂等,可能出现多次补偿把状态改乱。比如重复发货、重复退款。

解决:所有补偿操作必须检查当前状态,只在不一致时执行。并且补偿操作自身也要有幂等键。

坑2:对账任务误报

平台有缓存,刚改完价格立即去查,返回的还是旧值。对账任务误以为不一致。

解决:对账任务在查询前,对于刚完成的写操作(5分钟内)跳过不比对。或者使用平台的实时查询接口绕过缓存。

坑3:补偿操作触发新的补偿,形成循环

订单状态补偿将ERP状态同步到店铺,但店铺的回调又更新ERP,两边互相覆盖。

解决:在操作日志中记录来源,补偿操作更新的数据不再触发新的对账(设置一个标志位)。

坑4:操作日志表无限增长

每天产生几十万条操作日志,半年后几千万条。

解决:定期归档。成功的日志保留30天,失败的保留90天。归档到对象存储后删除原表数据。


九、数据一致性巡检看板

我们把对账结果做成一个看板,运营可以直观看到:

  • 各店铺的数据一致性得分(一致订单数/总订单数)
  • 待补偿的差异数量及类型分布
  • 补偿操作的成功率与耗时
  • 操作日志中pending/partial的任务列表

当某个店铺的一致性得分低于95%时,自动标记为“需要关注”。运营可以直接在界面上触发手动对账或手动补偿。

有一次运营发现一个店铺的订单状态一致率只有60%,点进去看,发现是ERP的物流同步接口坏了三天。如果不是这个看板,可能客户投诉了才发现。


十、总结:从“执行正确”到“结果正确”

自动化系统的目标不是“执行正确”,而是“结果正确”。

执行正确要求每一步都不出错,这在复杂环境中几乎做不到。结果正确允许中间出错,但最终通过检测和补偿达到一致。

店群自动化引入数据一致性体系后,我们最大的变化是:

  • 不再追求单个操作的100%成功率
  • 把精力花在对账和补偿的健壮性上
  • 运营可以接受短暂的不一致,只要知道系统会自动修复

这套体系不是银弹,它增加了系统复杂度(操作日志、对账任务、补偿脚本)。但对于多平台、多店铺的规模化运营,这笔投入是值得的。

如果你在运行超过20个店铺,建议至少实现操作日志和订单对账。成本不高,但能省下大量人工排查数据不一致的时间。


作者:林焱

0
0
0
0
评论
未登录
暂无评论