前言
在逻辑复制过程中,如果在复制过程中出现任何冲突,如主键冲突,逻辑复制都会停止。这时候需要我们手动去解决。
问题分析
修复冲入的方法共有两种:
- 在订阅端手动找到冲突的数据并删除,然后重新让订阅继续。
- 在订阅端调用 pg_replication_origin_advance 函数,跳过有冲突的事务。
问题复现
rudonx=# select * from pgbench_tellers;
tid | bid | tbalance | filler
-----+-----+----------+-----------
7 | 1 | 0 |
8 | 1 | 0 |
9 | 1 | 0 |
10 | 1 | 0 |
11 | 1 | 0 | 100
(5 rows)
```
`
在订阅端插入数据
````undefined
rudonx=# insert into pgbench_tellers values(12,1,0,99);
INSERT 0 1
```
`
在发布端插入数据
````undefined
rudonx=# insert into pgbench_tellers values(12,1,0,99);
INSERT 0 1
rudonx=# insert into pgbench_tellers values(13,1,0,99);
INSERT 0 1
```
`
在订阅端查看数据,发现 tid = 13 并未同步,此时出现了复制冲突。
````undefined
rudonx=# select * from pg_stat_subscription;
-[ RECORD 1 ]---------+----------
subid | 23038
subname | alltabsub
pid |
relid |
received_lsn |
last_msg_send_time |
last_msg_receipt_time |
latest_end_lsn |
latest_end_time |
```
`
在发布端查看 replication slot 状态
````undefined
rudonx=# select * from pg_replication_slots;
-[ RECORD 1 ]-------+-----------
slot_name | data_slot
plugin | pgoutput
slot_type | logical
datoid | 16596
database | rudonx
temporary | f
active | f # 可以看到此 slot 已经为不活跃的状态
active_pid |
xmin |
catalog_xmin | 44750878
restart_lsn | A/912F0C28
confirmed_flush_lsn | A/912F0C60 # 备库收到的 LSN 号也不会向前推进
```
`
在订阅端的错误日志中,会有如下报错:
````undefined
ERROR,23505,duplicate key value violates unique constraint "pgbench_tellers_pkey",Key (tid)=(12) already exists.,,,,,,,_bt_check_unique, nbtinsert.c:570,
```
`
## 解决方法 1
在备库上删除产生主键冲突的数据,然后重新设置订阅为启用状态。
````undefined
rudonx=# delete from pgbench_tellers where tid=12;
DELETE 1
rudonx=# ALTER SUBSCRIPTION alltabsub enable;
ALTER SUBSCRIPTION
rudonx=# select * from pgbench_tellers;
tid | bid | tbalance | filler
-----+-----+----------+-------------
7 | 1 | 0 |
8 | 1 | 0 |
9 | 1 | 0 |
10 | 1 | 0 |
11 | 1 | 0 | 100
12 | 1 | 0 | 99
13 | 1 | 0 | 99
```
`
## 解决方法 2
重新复现 insert 冲突 (插入 tid = 17),查看发布端 replication slot 相关位点信息:
````undefined
rudonx=# select * from pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+----------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
data_slot | pgoutput | logical | 16596 | rudonx | f | f | | | 44750887 | A/912F2C30 | A/912F2C68
(1 row)
```
`
在订阅端查看 pg_replication_origin_status
````undefined
rudonx=# select * from pg_replication_origin_status;
local_id | external_id | remote_lsn | local_lsn
----------+-------------+------------+-------------
1 | pg_23038 | A/912F2B48 | 69/43000278
(1 row)
```
`
克隆一个 slot 名字叫 sub_copy,查看发生冲突时的 LSN
````undefined
rudonx=# SELECT * from pg_logical_slot_peek_changes('sub_copy', null, NULL);
lsn | xid | data
------------+----------+-----------------------------------------------------------
A/912F2C68 | 44750887 | BEGIN 44750887
A/912F2C68 | 44750887 | table public.pgbench_tellers: INSERT: tid[integer]:17 bid[integer]:1 tbalance[integer]:0 filler[character]:'99'
A/912F3590 | 44750887 | COMMIT 44750887
A/912F35C8 | 44750888 | BEGIN 44750888
A/912F35C8 | 44750888 | table public.pgbench_tellers: INSERT: tid[integer]:18 bid[integer]:1 tbalance[integer]:0 filler[character]:'99'
A/912F36D0 | 44750888 | COMMIT 44750888
(6 rows)
```
`
在插入 tid=17 这条数据的时候发生了主键冲突,我们需要找到下一条数据的 LSN,然后执行语句跳过。跳过之后,发现数据已经同步
````undefined
rudonx=# SELECT pg_replication_origin_advance ('pg_23038', 'A/912F35C8'::pg_lsn);
pg_replication_origin_advance
-------------------------------
(1 row)
```
`
# 参考文档
[1] [https://www.postgresql.org/docs/current/logical-replication-conflicts.html](https://www.postgresql.org/docs/current/logical-replication-conflicts.html)
**如果您有其他问题,欢迎您联系火山引擎**[技术支持服务](https://console.volcengine.com/ticket/createTicketV2/)