店群自动化做到多平台、多店铺时,会遇到一个很隐蔽的问题:
数据不一致。
同一个商品,在拼多多上已经下架了,但TEMU上还是上架状态。订单已经在ERP里发货了,但店铺后台没同步。批量改价改了50个商品,第30个时脚本报错退出,前29个改了,后面21个没改。
这些问题比脚本报错更麻烦。报错你能看见,数据不一致你可能过好几天才发现。
我们早期处理这些问题的思路是“尽量避免”。后来发现避免不了——网络会抖、平台会限流、节点会宕机。
于是我们设计了一套数据一致性保障与事务补偿机制。
这篇文章不讲调度,也不讲监控。专门聊聊如何让店群系统在出现异常后,能够自动发现不一致并修复,最终达到最终一致性。
适用场景:多平台、多店铺的批量操作、订单同步、库存管理。 技术栈:Python + Redis + 对账任务 + 补偿脚本 + 操作日志。
一、数据不一致是怎么产生的?
先列举几种典型场景。
场景一:批量操作中途失败
批量修改100个商品的价格。脚本循环调用API,第50个时网络超时,脚本退出。前49个成功,后50个没执行。没有事务回滚。
场景二:跨平台操作,部分成功
TEMU上架商品成功后,需要同步在拼多多也上架。TEMU成功了,拼多多接口报错。两边状态不一致。
场景三:异步回调丢失
订单发货后,平台会回调通知。回调丢了,系统以为没发货,重复发一次。
场景四:节点宕机导致状态未更新
任务正在执行,节点突然宕机。任务标记为“运行中”,实际上已经部分完成但状态没更新到数据库。
这些问题的共同点是:自动化系统假设每个操作都是原子且可靠的,但现实不是。
解决方案不是让每个操作都成功,而是让系统能检测到不一致并自动补偿。
二、设计原则:最终一致性 + 可补偿操作
我们放弃了强一致性(实时事务),因为跨系统、跨平台不可能做到。
采用最终一致性:允许短暂的不一致,但系统会通过异步对账和补偿机制,在可接受的时间窗口内(通常是几分钟到几小时)修复。
核心要求:
- 每个写操作都要有幂等键(之前文章讲过)
- 每个批量操作都要有“快照”:记录操作前状态和目标状态
- 每个状态变更都要有可审计的日志:知道谁、什么时候、把什么从A改成了B
- 补偿操作必须是可逆的:能回滚或能正向修复
三、操作日志与快照设计
所有对店铺数据的写操作(上架、下架、改价、修改库存、发货),都先记录一条操作日志。
# operation_log.py
from datetime import datetime
import json
class OperationLog:
def __init__(self, db_conn):
self.db = db_conn
def create(self, op_id: str, shop_id: str, op_type: str, target_ids: list, before_state: dict, after_state: dict):
"""记录一个操作的前后状态"""
sql = """
INSERT INTO operation_logs
(op_id, shop_id, op_type, target_ids, before_state, after_state, status, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
self.db.execute(sql, (
op_id, shop_id, op_type,
json.dumps(target_ids),
json.dumps(before_state),
json.dumps(after_state),
'pending',
datetime.now()
))
self.db.commit()
def update_status(self, op_id: str, status: str, error_msg: str = None):
self.db.execute(
"UPDATE operation_logs SET status = %s, error_msg = %s, finished_at = %s WHERE op_id = %s",
(status, error_msg, datetime.now(), op_id)
)
self.db.commit()
def get_pending_ops(self, older_than_minutes=5):
"""获取长时间未完成的操作(可能节点挂了)"""
sql = """
SELECT * FROM operation_logs
WHERE status = 'pending' AND created_at < %s
"""
cutoff = datetime.now() - timedelta(minutes=older_than_minutes)
return self.db.query(sql, (cutoff,))
执行批量操作时:
def batch_update_prices(shop_id, product_ids, new_price):
op_id = str(uuid.uuid4())
# 记录操作前的价格快照
before = {pid: get_current_price(shop_id, pid) for pid in product_ids}
after = {pid: new_price for pid in product_ids}
op_log.create(op_id, shop_id, 'batch_update_price', product_ids, before, after)
success_list = []
fail_list = []
for pid in product_ids:
try:
update_price(shop_id, pid, new_price)
success_list.append(pid)
except Exception as e:
fail_list.append((pid, str(e)))
if fail_list:
# 部分失败,更新操作日志状态为 partial
op_log.update_status(op_id, 'partial', json.dumps(fail_list))
# 触发补偿:回滚已成功的部分?
# 我们选择不自动回滚,而是让对账任务去修复
else:
op_log.update_status(op_id, 'success')
有了操作日志,对账任务就可以知道哪些操作没有完全成功。
四、对账任务:发现不一致
我们设计了多种对账任务,定时执行。
类型一:操作日志对账
扫描状态为pending或partial的操作日志,如果创建时间超过5分钟且没有新的重试,认为可能失败。触发补偿重试或人工介入。
类型二:数量对账
比较本地数据库和平台API返回的商品数量、订单数量。如果数量不一致,触发全量同步。
类型三:字段对账
随机抽取部分商品,比对本地数据库中的价格、库存与平台实际值。发现差异则告警。
类型四:订单状态对账
本地ERP中的订单状态与店铺后台的状态对比。最常见的场景:本地标记“已发货”,店铺后台还是“待发货”。
# reconciliation.py
class OrderReconciler:
def __init__(self, erp_api, shop_api):
self.erp = erp_api
self.shop = shop_api
def reconcile_orders(self, shop_id, date_from, date_to):
# 获取ERP中的订单状态
erp_orders = self.erp.get_orders(shop_id, date_from, date_to)
# 获取店铺后台的订单状态
shop_orders = self.shop.get_orders(shop_id, date_from, date_to)
mismatches = []
for order_id, erp_status in erp_orders.items():
shop_status = shop_orders.get(order_id)
if shop_status != erp_status:
mismatches.append({
"order_id": order_id,
"erp_status": erp_status,
"shop_status": shop_status
})
return mismatches
对账任务发现不一致后,不是直接修复,而是生成一个“差异报告”,由补偿策略决定如何处理。
五、补偿机制设计
补偿分为三种级别:
自动正向补偿:对于明确可自动修复的差异,系统自动执行补偿操作。
自动逆向补偿:回滚已执行的错误操作。
人工介入补偿:系统无法确定的,生成工单等待人工处理。
正向补偿示例:订单状态不一致
# compensator.py
class OrderStatusCompensator:
def __init__(self, erp_api, shop_api):
self.erp = erp_api
self.shop = shop_api
def compensate(self, mismatches):
actions = []
for m in mismatches:
# 策略:以ERP状态为准,更新店铺状态
if m["erp_status"] == "shipped" and m["shop_status"] == "pending":
actions.append(("ship_order", m["order_id"]))
elif m["erp_status"] == "delivered" and m["shop_status"] == "shipped":
# 不需要操作,平台会自动更新
pass
elif m["erp_status"] == "cancelled" and m["shop_status"] != "cancelled":
actions.append(("cancel_order", m["order_id"]))
# 执行补偿操作(带幂等)
for action, order_id in actions:
self._execute_compensation(action, order_id)
逆向补偿示例:批量改价部分成功
如果一个批量改价操作失败了30%,我们选择不自动回滚已成功的部分(因为回滚可能引起价格波动),而是:
- 标记该操作日志为“需人工审核”
- 将失败的商品列表单独拿出来,重试改价
- 如果重试仍然失败,发出告警,由运营决定是继续手动改还是全部回滚
这里的关键是:补偿不是简单的“撤销”,而是“修复到期望状态”。
六、分布式事务的简化版:Saga模式
跨平台的组合操作,比如“在TEMU上架商品,同时在拼多多上架”。我们实现了Saga模式的简化版。
Saga将一个长事务拆分成多个本地事务,每个事务有对应的补偿事务。
# saga.py
class SagaOrchestrator:
def __init__(self):
self.steps = [] # (action, compensate_action)
def add_step(self, action, compensate):
self.steps.append((action, compensate))
def execute(self):
executed = []
for i, (action, compensate) in enumerate(self.steps):
try:
result = action()
executed.append((i, compensate, result))
except Exception as e:
# 执行失败,开始补偿已执行的步骤(逆序)
for j in range(len(executed)-1, -1, -1):
_, comp, _ = executed[j]
try:
comp()
except Exception as comp_e:
# 补偿失败,记录到死信队列
self._handle_compensation_failure(comp, comp_e)
raise e
return True
# 使用示例
saga = SagaOrchestrator()
saga.add_step(
action=lambda: temu_api.upload_product(data),
compensate=lambda: temu_api.delete_product(product_id)
)
saga.add_step(
action=lambda: pdd_api.upload_product(data),
compensate=lambda: pdd_api.delete_product(product_id)
)
saga.execute()
实际操作中,补偿不一定是删除(因为可能已经产生了订单),可能只是标记为“不同步”或下架。我们根据业务敏感度设计补偿逻辑。
七、对账任务的调度与避让
对账任务不能影响正常业务。我们做了几个设计:
- 低峰期执行:对账任务默认在凌晨2点-5点执行
- 随机延迟:避免所有店铺同时触发对账打爆API
- 限流:每秒最多10个请求
- 熔断:如果对账任务连续失败3次,暂停该店铺对账并告警
class ReconciliationScheduler:
def __init__(self):
self.rate_limiter = RateLimiter(max_qps=10)
def run_for_shop(self, shop_id):
if not self._is_low_peak():
return
if self._is_recently_failed(shop_id):
return
with self.rate_limiter:
mismatches = self.reconciler.reconcile(shop_id)
if mismatches:
self.compensator.handle(shop_id, mismatches)
我们还区分了“关键对账”和“常规对账”。订单状态对账每1小时执行一次;商品数量对账每24小时一次;价格对账每6小时抽样一次。
八、真实踩坑记录
坑1:补偿操作没有幂等,导致重复补偿
自动补偿执行时,如果补偿脚本本身没有幂等,可能出现多次补偿把状态改乱。比如重复发货、重复退款。
解决:所有补偿操作必须检查当前状态,只在不一致时执行。并且补偿操作自身也要有幂等键。
坑2:对账任务误报
平台有缓存,刚改完价格立即去查,返回的还是旧值。对账任务误以为不一致。
解决:对账任务在查询前,对于刚完成的写操作(5分钟内)跳过不比对。或者使用平台的实时查询接口绕过缓存。
坑3:补偿操作触发新的补偿,形成循环
订单状态补偿将ERP状态同步到店铺,但店铺的回调又更新ERP,两边互相覆盖。
解决:在操作日志中记录来源,补偿操作更新的数据不再触发新的对账(设置一个标志位)。
坑4:操作日志表无限增长
每天产生几十万条操作日志,半年后几千万条。
解决:定期归档。成功的日志保留30天,失败的保留90天。归档到对象存储后删除原表数据。
九、数据一致性巡检看板
我们把对账结果做成一个看板,运营可以直观看到:
- 各店铺的数据一致性得分(一致订单数/总订单数)
- 待补偿的差异数量及类型分布
- 补偿操作的成功率与耗时
- 操作日志中pending/partial的任务列表
当某个店铺的一致性得分低于95%时,自动标记为“需要关注”。运营可以直接在界面上触发手动对账或手动补偿。
有一次运营发现一个店铺的订单状态一致率只有60%,点进去看,发现是ERP的物流同步接口坏了三天。如果不是这个看板,可能客户投诉了才发现。
十、总结:从“执行正确”到“结果正确”
自动化系统的目标不是“执行正确”,而是“结果正确”。
执行正确要求每一步都不出错,这在复杂环境中几乎做不到。结果正确允许中间出错,但最终通过检测和补偿达到一致。
店群自动化引入数据一致性体系后,我们最大的变化是:
- 不再追求单个操作的100%成功率
- 把精力花在对账和补偿的健壮性上
- 运营可以接受短暂的不一致,只要知道系统会自动修复
这套体系不是银弹,它增加了系统复杂度(操作日志、对账任务、补偿脚本)。但对于多平台、多店铺的规模化运营,这笔投入是值得的。
如果你在运行超过20个店铺,建议至少实现操作日志和订单对账。成本不高,但能省下大量人工排查数据不一致的时间。
作者:林焱
