影刀RPA多平台店群数据同步架构:最终一致性设计与跨平台对账实战

影刀RPA多平台店群数据同步架构:最终一致性设计与跨平台对账实战

做跨平台店群,最头疼的问题不是上架,而是数据同步

同一个商品,在拼多多、TEMU、TikTok Shop上都要卖。库存是共享的,价格可能同步调整,订单需要在多个平台之间做状态回传。一个平台的库存卖完了,另外两个平台必须及时下架或改库存,否则就会超卖。

我们早期用简单粗暴的方式:每个平台的脚本独立操作数据库。结果经常出现拼多多库存已经扣减了,TEMU还没同步,导致超卖了30单,赔了不少钱。

后来我们设计了一套跨平台数据同步与最终一致性架构,基于事件驱动和定时对账,保证多平台数据最终一致。

这篇文章不讲单个脚本的优化,也不讲调度。专门聊聊店群系统中跨平台数据同步的工程实践:如何避免超卖、如何实现价格统一、如何做跨平台订单状态回传,以及如何用对账发现不一致。

适用场景:同一商品多平台销售、共享库存池的店群项目。 技术栈:事件溯源、本地消息表、跨平台对账、分布式事务补偿。


一、跨平台数据同步的三大挑战

先还原一个真实场景:某商品在拼多多、TEMU同时销售,总库存100件。

挑战一:库存扣减的原子性

picture.image 拼多多上卖出一件,需要扣减总库存1,同时通知TEMU端库存减1。如果拼多多扣减成功,但通知TEMU失败,就产生数据不一致。

挑战二:价格同步的时效性

运营在后台批量修改商品价格,需要同时更新三个平台的售价。若部分平台更新失败,会导致同一商品在不同平台价格不同,引起买家投诉。

picture.image

picture.image 挑战三:订单状态回传的可靠性

订单发货后,需要回传给ERP和财务系统。由于网络抖动,可能丢失回传消息。

解决这些问题的核心思想:放弃强一致性,拥抱最终一致性

picture.image

二、整体架构:本地消息表 + 异步对账

我们采用经典的本地消息表模式,保证每个操作要么全部成功,要么可重试直到成功。

每个平台适配器在执行写操作时(如扣减库存),不直接调用其他平台API,而是向本地消息表插入一条“待同步任务”。后台Worker扫描消息表,异步执行跨平台调用,失败则重试。

picture.image

picture.image 架构图(文字描述):

拼多多订单 -> 扣减总库存 -> 写入本地消息表 {target: "temu", action: "dec_stock", amount:1}

后台Worker:
    - 扫描状态为pending的消息
    - 调用TEMU API扣减库存
    - 成功则更新消息状态为done,失败则增加重试次数,延迟重试

同时,配套定时对账任务:每天凌晨比对各平台的实际库存与中央库存,发现不一致则触发补偿。


三、本地消息表的设计与实现

消息表结构:

CREATE TABLE cross_platform_sync_task (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    task_id VARCHAR(64) NOT NULL COMMENT '全局唯一任务ID',
    source_platform VARCHAR(16) NOT NULL,
    target_platform VARCHAR(16) NOT NULL,
    action_type VARCHAR(32) NOT NULL,   -- dec_stock, sync_price, sync_order_status
    payload JSON NOT NULL,               -- 操作参数,如 {sku_id: "123", delta: -1}
    status VARCHAR(16) DEFAULT 'pending', -- pending, processing, done, failed
    retry_count INT DEFAULT 0,
    max_retries INT DEFAULT 5,
    next_retry_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_status_next_retry (status, next_retry_time)
);

操作示例:拼多多产生订单后,需要扣减TEMU库存。

# 在拼多多订单处理脚本中
def handle_pdd_order(order):
    # 1. 扣减中央库存(数据库原子操作)
    with db.transaction():
        db.execute("UPDATE central_inventory SET stock = stock - 1 WHERE sku_id = %s", sku_id)
        # 2. 插入同步任务
        db.execute("""
            INSERT INTO cross_platform_sync_task 
            (task_id, source_platform, target_platform, action_type, payload, next_retry_time)
            VALUES (%s, %s, %s, %s, %s, NOW())
        """, (uuid4(), "pdd", "temu", "dec_stock", json.dumps({"sku_id": sku_id, "delta": -1})))
    # 事务提交后,Worker异步执行同步

后台Worker(独立服务)轮询任务表:

# sync_worker.py
class SyncWorker:
    def __init__(self):
        self.db = db_connection
        self.adapters = {"temu": TemuAdapter(), "pdd": PddAdapter()}
        
    def run(self):
        while True:
            # 取出待处理任务(使用FOR UPDATE SKIP LOCKED避免多worker冲突)
            tasks = self.db.query("""
                SELECT * FROM cross_platform_sync_task
                WHERE status = 'pending' AND next_retry_time <= NOW()
                ORDER BY created_at
                LIMIT 10
                FOR UPDATE SKIP LOCKED
            """)
            for task in tasks:
                self.process_task(task)
            time.sleep(1)
            
    def process_task(self, task):
        # 标记为processing
        self.db.execute("UPDATE cross_platform_sync_task SET status='processing' WHERE id=%s", task.id)
        try:
            adapter = self.adapters[task.target_platform]
            if task.action_type == "dec_stock":
                payload = json.loads(task.payload)
                adapter.decrement_stock(payload["sku_id"], payload["delta"])
            elif task.action_type == "sync_price":
                # ...
            # 成功,标记done
            self.db.execute("UPDATE cross_platform_sync_task SET status='done' WHERE id=%s", task.id)
        except Exception as e:
            # 失败,增加重试计数
            new_retry = task.retry_count + 1
            if new_retry >= task.max_retries:
                self.db.execute("UPDATE cross_platform_sync_task SET status='failed' WHERE id=%s", task.id)
                self.alert(f"同步任务最终失败: {task.id}")
            else:
                # 指数退避计算下次重试时间
                delay = 2 ** new_retry  # 2,4,8,16,32秒
                next_time = datetime.now() + timedelta(seconds=delay)
                self.db.execute("""
                    UPDATE cross_platform_sync_task 
                    SET retry_count=%s, next_retry_time=%s, status='pending'
                    WHERE id=%s
                """, (new_retry, next_time, task.id))

这套机制保证了即使TEMU API短暂故障,任务会自动重试,最终成功。


四、库存同步的特殊处理:预占与释放

库存同步比普通数据同步更敏感。我们引入了预占机制

当拼多多产生订单时,不立即扣减中央库存,而是先“预占”1件。预占期间,TEMU端仍然可以看到库存(避免买家看到库存不变),但中央库存实际已减少。

预占超时(如30分钟)未确认,则释放回库存。

def reserve_stock(sku_id, quantity):
    with db.transaction():
        # 更新中央库存:可用库存减少,预占库存增加
        db.execute("""
            UPDATE central_inventory 
            SET reserved = reserved + %s, available = available - %s
            WHERE sku_id = %s AND available >= %s
        """, (quantity, quantity, sku_id, quantity))
        if affected_rows == 0:
            raise OutOfStockError()
        # 记录预占单
        db.execute("INSERT INTO stock_reservations (sku_id, quantity, expires_at) VALUES (%s, %s, NOW() + INTERVAL 30 MINUTE)", (sku_id, quantity))

订单支付成功后,将预占转为实际扣减,并触发跨平台同步。

如果订单超时未支付,自动释放预占(库存回滚)。


五、价格同步的版本控制

价格同步的挑战在于:多个操作可能并发修改同一商品价格。我们使用乐观锁 + 版本号。

每个商品在中央数据库有一个price_version。运营修改价格时,版本号+1。同步到各平台时,携带版本号。如果目标平台已有更高版本号,则跳过,避免旧覆盖新。

# 价格更新流程
def update_price(sku_id, new_price, operator):
    with db.transaction():
        old_version = db.query("SELECT price_version FROM central_product WHERE sku_id=%s", sku_id)
        new_version = old_version + 1
        db.execute("UPDATE central_product SET price=%s, price_version=%s WHERE sku_id=%s", (new_price, new_version, sku_id))
        # 写入同步任务,携带版本号
        db.insert("cross_platform_sync_task", {
            "action_type": "sync_price",
            "payload": json.dumps({"sku_id": sku_id, "price": new_price, "version": new_version})
        })

同步到TEMU时,先查询TEMU当前价格版本(需平台支持或本地存储)。若版本小于等于传入版本,则更新;否则跳过。

对于不支持版本查询的平台,我们采用“最后一次成功时间”比较:只同步比上次成功时间更新的价格变更。


六、订单状态跨平台回传

多平台订单需要在ERP系统中统一处理。当订单在ERP中发货后,需要回传到各个平台更新状态。

我们同样使用本地消息表,但增加了“幂等键”,防止重复回传。

{
  "idempotency_key": "order_shipped_12345",
  "platform": "pdd",
  "order_id": "pdd_order_678",
  "action": "ship",
  "carrier": "sf",
  "tracking_no": "SF123456"
}

回传API调用时,在请求头中加入幂等键,平台端(或我们的适配器)保证同一幂等键只处理一次。


七、跨平台对账:发现不一致的兜底

即使有了同步机制,仍可能因网络、bug产生不一致。我们每天凌晨运行对账任务。

对账逻辑:

  • 获取中央库存表所有SKU的库存值
  • 并发调用各平台API,获取各平台的实际库存
  • 比对差异,生成差异报告
  • 自动触发补偿同步(或者人工审核)
# reconciliation.py
def reconcile_inventory():
    central = get_central_inventory()
    for sku in central:
        pdd_stock = get_pdd_stock(sku.sku_id)
        temu_stock = get_temu_stock(sku.sku_id)
        if pdd_stock != sku.available:
            diff = pdd_stock - sku.available
            if abs(diff) <= 2:
                # 小差异自动纠正:以中央为准
                set_pdd_stock(sku.sku_id, sku.available)
            else:
                alert(f"库存差异过大: sku {sku.sku_id}, 中央={sku.available}, 拼多多={pdd_stock}")

对于价格、商品状态等其他数据,采用抽样对账(效率考虑)。

对账结果存储到ClickHouse,生成趋势图,帮助发现长期不一致的模式。


八、真实踩坑记录

坑1:消息表积压导致同步延迟

高峰期订单量暴增,Worker处理不过来,消息表积压了几万条,TEMU库存更新延迟了10分钟,导致超卖。

解决:Worker水平扩展(多实例),每个实例按目标平台分片(shard by target_platform)。同时增加优先级队列:库存同步任务优先级最高,价格同步次之。

坑2:幂等键设计不合理导致重复发货

回传发货状态时,幂等键只用了order_id,没有包含action。第二次重试时,同一订单又触发发货API(平台侧幂等实现不完善),导致重复发货。

解决:幂等键采用order_id:action:timestamp,或使用全局唯一递增版本号。

坑3:对账任务执行时间过长,影响白天业务

全量对账扫描几十万SKU,调用API被限流,持续几个小时。

解决:改为增量对账——只对账最近7天有库存变动的SKU。每周一次全量对账放在周末凌晨。

坑4:跨平台时区问题导致订单状态回传时序错乱

拼多多订单创建时间使用北京时间,TEMU使用UTC,回传时时间比对错误。

解决:统一在中央数据库存储UTC时间,适配器做转换。


九、从最终一致到实时一致(混合架构)

对于库存这种高敏感数据,最终一致性的短暂窗口(秒级到分钟级)仍可能导致超卖。我们引入了一种实时降级方案

当拼多多订单产生时,先尝试实时同步TEMU库存(同步调用,超时1秒)。如果成功,则正常扣减;如果超时,再走异步消息表,但中央库存先扣减(承担风险,概率低)。

同时,在TEMU端设置“安全库存”缓冲(比中央库存少3-5件),即使有短暂不一致,也不会超卖。


十、总结:同步系统的工程原则

跨平台数据同步是店群系统最复杂的部分之一。我们的经验可以总结为几条原则:

  1. 本地消息表是基础:保证至少一次投递,配合幂等实现精确一次。
  2. 异步+重试兜底:临时故障靠重试,永久故障靠人工。
  3. 对账是最后防线:不管设计多好,都要有对账发现漏网之鱼。
  4. 敏感数据用预占:库存先预占再确认,避免超卖。
  5. 版本号解决冲突:价格、商品信息用乐观锁。

不需要一开始就做所有优化。从最简单的本地消息表开始,逐步加上重试、对账、预占。每次扩容遇到问题,再迭代。

希望这篇文章能帮你避免我们在跨平台同步上踩过的坑。


作者:林焱

0
0
0
0
评论
未登录
暂无评论