做跨平台店群,最头疼的问题不是上架,而是数据同步。
同一个商品,在拼多多、TEMU、TikTok Shop上都要卖。库存是共享的,价格可能同步调整,订单需要在多个平台之间做状态回传。一个平台的库存卖完了,另外两个平台必须及时下架或改库存,否则就会超卖。
我们早期用简单粗暴的方式:每个平台的脚本独立操作数据库。结果经常出现拼多多库存已经扣减了,TEMU还没同步,导致超卖了30单,赔了不少钱。
后来我们设计了一套跨平台数据同步与最终一致性架构,基于事件驱动和定时对账,保证多平台数据最终一致。
这篇文章不讲单个脚本的优化,也不讲调度。专门聊聊店群系统中跨平台数据同步的工程实践:如何避免超卖、如何实现价格统一、如何做跨平台订单状态回传,以及如何用对账发现不一致。
适用场景:同一商品多平台销售、共享库存池的店群项目。 技术栈:事件溯源、本地消息表、跨平台对账、分布式事务补偿。
一、跨平台数据同步的三大挑战
先还原一个真实场景:某商品在拼多多、TEMU同时销售,总库存100件。
挑战一:库存扣减的原子性
拼多多上卖出一件,需要扣减总库存1,同时通知TEMU端库存减1。如果拼多多扣减成功,但通知TEMU失败,就产生数据不一致。
挑战二:价格同步的时效性
运营在后台批量修改商品价格,需要同时更新三个平台的售价。若部分平台更新失败,会导致同一商品在不同平台价格不同,引起买家投诉。
挑战三:订单状态回传的可靠性
订单发货后,需要回传给ERP和财务系统。由于网络抖动,可能丢失回传消息。
解决这些问题的核心思想:放弃强一致性,拥抱最终一致性。
二、整体架构:本地消息表 + 异步对账
我们采用经典的本地消息表模式,保证每个操作要么全部成功,要么可重试直到成功。
每个平台适配器在执行写操作时(如扣减库存),不直接调用其他平台API,而是向本地消息表插入一条“待同步任务”。后台Worker扫描消息表,异步执行跨平台调用,失败则重试。
架构图(文字描述):
拼多多订单 -> 扣减总库存 -> 写入本地消息表 {target: "temu", action: "dec_stock", amount:1}
后台Worker:
- 扫描状态为pending的消息
- 调用TEMU API扣减库存
- 成功则更新消息状态为done,失败则增加重试次数,延迟重试
同时,配套定时对账任务:每天凌晨比对各平台的实际库存与中央库存,发现不一致则触发补偿。
三、本地消息表的设计与实现
消息表结构:
CREATE TABLE cross_platform_sync_task (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
task_id VARCHAR(64) NOT NULL COMMENT '全局唯一任务ID',
source_platform VARCHAR(16) NOT NULL,
target_platform VARCHAR(16) NOT NULL,
action_type VARCHAR(32) NOT NULL, -- dec_stock, sync_price, sync_order_status
payload JSON NOT NULL, -- 操作参数,如 {sku_id: "123", delta: -1}
status VARCHAR(16) DEFAULT 'pending', -- pending, processing, done, failed
retry_count INT DEFAULT 0,
max_retries INT DEFAULT 5,
next_retry_time DATETIME DEFAULT CURRENT_TIMESTAMP,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_status_next_retry (status, next_retry_time)
);
操作示例:拼多多产生订单后,需要扣减TEMU库存。
# 在拼多多订单处理脚本中
def handle_pdd_order(order):
# 1. 扣减中央库存(数据库原子操作)
with db.transaction():
db.execute("UPDATE central_inventory SET stock = stock - 1 WHERE sku_id = %s", sku_id)
# 2. 插入同步任务
db.execute("""
INSERT INTO cross_platform_sync_task
(task_id, source_platform, target_platform, action_type, payload, next_retry_time)
VALUES (%s, %s, %s, %s, %s, NOW())
""", (uuid4(), "pdd", "temu", "dec_stock", json.dumps({"sku_id": sku_id, "delta": -1})))
# 事务提交后,Worker异步执行同步
后台Worker(独立服务)轮询任务表:
# sync_worker.py
class SyncWorker:
def __init__(self):
self.db = db_connection
self.adapters = {"temu": TemuAdapter(), "pdd": PddAdapter()}
def run(self):
while True:
# 取出待处理任务(使用FOR UPDATE SKIP LOCKED避免多worker冲突)
tasks = self.db.query("""
SELECT * FROM cross_platform_sync_task
WHERE status = 'pending' AND next_retry_time <= NOW()
ORDER BY created_at
LIMIT 10
FOR UPDATE SKIP LOCKED
""")
for task in tasks:
self.process_task(task)
time.sleep(1)
def process_task(self, task):
# 标记为processing
self.db.execute("UPDATE cross_platform_sync_task SET status='processing' WHERE id=%s", task.id)
try:
adapter = self.adapters[task.target_platform]
if task.action_type == "dec_stock":
payload = json.loads(task.payload)
adapter.decrement_stock(payload["sku_id"], payload["delta"])
elif task.action_type == "sync_price":
# ...
# 成功,标记done
self.db.execute("UPDATE cross_platform_sync_task SET status='done' WHERE id=%s", task.id)
except Exception as e:
# 失败,增加重试计数
new_retry = task.retry_count + 1
if new_retry >= task.max_retries:
self.db.execute("UPDATE cross_platform_sync_task SET status='failed' WHERE id=%s", task.id)
self.alert(f"同步任务最终失败: {task.id}")
else:
# 指数退避计算下次重试时间
delay = 2 ** new_retry # 2,4,8,16,32秒
next_time = datetime.now() + timedelta(seconds=delay)
self.db.execute("""
UPDATE cross_platform_sync_task
SET retry_count=%s, next_retry_time=%s, status='pending'
WHERE id=%s
""", (new_retry, next_time, task.id))
这套机制保证了即使TEMU API短暂故障,任务会自动重试,最终成功。
四、库存同步的特殊处理:预占与释放
库存同步比普通数据同步更敏感。我们引入了预占机制。
当拼多多产生订单时,不立即扣减中央库存,而是先“预占”1件。预占期间,TEMU端仍然可以看到库存(避免买家看到库存不变),但中央库存实际已减少。
预占超时(如30分钟)未确认,则释放回库存。
def reserve_stock(sku_id, quantity):
with db.transaction():
# 更新中央库存:可用库存减少,预占库存增加
db.execute("""
UPDATE central_inventory
SET reserved = reserved + %s, available = available - %s
WHERE sku_id = %s AND available >= %s
""", (quantity, quantity, sku_id, quantity))
if affected_rows == 0:
raise OutOfStockError()
# 记录预占单
db.execute("INSERT INTO stock_reservations (sku_id, quantity, expires_at) VALUES (%s, %s, NOW() + INTERVAL 30 MINUTE)", (sku_id, quantity))
订单支付成功后,将预占转为实际扣减,并触发跨平台同步。
如果订单超时未支付,自动释放预占(库存回滚)。
五、价格同步的版本控制
价格同步的挑战在于:多个操作可能并发修改同一商品价格。我们使用乐观锁 + 版本号。
每个商品在中央数据库有一个price_version。运营修改价格时,版本号+1。同步到各平台时,携带版本号。如果目标平台已有更高版本号,则跳过,避免旧覆盖新。
# 价格更新流程
def update_price(sku_id, new_price, operator):
with db.transaction():
old_version = db.query("SELECT price_version FROM central_product WHERE sku_id=%s", sku_id)
new_version = old_version + 1
db.execute("UPDATE central_product SET price=%s, price_version=%s WHERE sku_id=%s", (new_price, new_version, sku_id))
# 写入同步任务,携带版本号
db.insert("cross_platform_sync_task", {
"action_type": "sync_price",
"payload": json.dumps({"sku_id": sku_id, "price": new_price, "version": new_version})
})
同步到TEMU时,先查询TEMU当前价格版本(需平台支持或本地存储)。若版本小于等于传入版本,则更新;否则跳过。
对于不支持版本查询的平台,我们采用“最后一次成功时间”比较:只同步比上次成功时间更新的价格变更。
六、订单状态跨平台回传
多平台订单需要在ERP系统中统一处理。当订单在ERP中发货后,需要回传到各个平台更新状态。
我们同样使用本地消息表,但增加了“幂等键”,防止重复回传。
{
"idempotency_key": "order_shipped_12345",
"platform": "pdd",
"order_id": "pdd_order_678",
"action": "ship",
"carrier": "sf",
"tracking_no": "SF123456"
}
回传API调用时,在请求头中加入幂等键,平台端(或我们的适配器)保证同一幂等键只处理一次。
七、跨平台对账:发现不一致的兜底
即使有了同步机制,仍可能因网络、bug产生不一致。我们每天凌晨运行对账任务。
对账逻辑:
- 获取中央库存表所有SKU的库存值
- 并发调用各平台API,获取各平台的实际库存
- 比对差异,生成差异报告
- 自动触发补偿同步(或者人工审核)
# reconciliation.py
def reconcile_inventory():
central = get_central_inventory()
for sku in central:
pdd_stock = get_pdd_stock(sku.sku_id)
temu_stock = get_temu_stock(sku.sku_id)
if pdd_stock != sku.available:
diff = pdd_stock - sku.available
if abs(diff) <= 2:
# 小差异自动纠正:以中央为准
set_pdd_stock(sku.sku_id, sku.available)
else:
alert(f"库存差异过大: sku {sku.sku_id}, 中央={sku.available}, 拼多多={pdd_stock}")
对于价格、商品状态等其他数据,采用抽样对账(效率考虑)。
对账结果存储到ClickHouse,生成趋势图,帮助发现长期不一致的模式。
八、真实踩坑记录
坑1:消息表积压导致同步延迟
高峰期订单量暴增,Worker处理不过来,消息表积压了几万条,TEMU库存更新延迟了10分钟,导致超卖。
解决:Worker水平扩展(多实例),每个实例按目标平台分片(shard by target_platform)。同时增加优先级队列:库存同步任务优先级最高,价格同步次之。
坑2:幂等键设计不合理导致重复发货
回传发货状态时,幂等键只用了order_id,没有包含action。第二次重试时,同一订单又触发发货API(平台侧幂等实现不完善),导致重复发货。
解决:幂等键采用order_id:action:timestamp,或使用全局唯一递增版本号。
坑3:对账任务执行时间过长,影响白天业务
全量对账扫描几十万SKU,调用API被限流,持续几个小时。
解决:改为增量对账——只对账最近7天有库存变动的SKU。每周一次全量对账放在周末凌晨。
坑4:跨平台时区问题导致订单状态回传时序错乱
拼多多订单创建时间使用北京时间,TEMU使用UTC,回传时时间比对错误。
解决:统一在中央数据库存储UTC时间,适配器做转换。
九、从最终一致到实时一致(混合架构)
对于库存这种高敏感数据,最终一致性的短暂窗口(秒级到分钟级)仍可能导致超卖。我们引入了一种实时降级方案。
当拼多多订单产生时,先尝试实时同步TEMU库存(同步调用,超时1秒)。如果成功,则正常扣减;如果超时,再走异步消息表,但中央库存先扣减(承担风险,概率低)。
同时,在TEMU端设置“安全库存”缓冲(比中央库存少3-5件),即使有短暂不一致,也不会超卖。
十、总结:同步系统的工程原则
跨平台数据同步是店群系统最复杂的部分之一。我们的经验可以总结为几条原则:
- 本地消息表是基础:保证至少一次投递,配合幂等实现精确一次。
- 异步+重试兜底:临时故障靠重试,永久故障靠人工。
- 对账是最后防线:不管设计多好,都要有对账发现漏网之鱼。
- 敏感数据用预占:库存先预占再确认,避免超卖。
- 版本号解决冲突:价格、商品信息用乐观锁。
不需要一开始就做所有优化。从最简单的本地消息表开始,逐步加上重试、对账、预占。每次扩容遇到问题,再迭代。
希望这篇文章能帮你避免我们在跨平台同步上踩过的坑。
作者:林焱
