前言
PostgreSQL 中的逻辑流复制与消息队列中的发布者/订阅者模型非常相似,在发布者端将 WAL 日志流解析成一定格式的数据流,订阅者节点收到后进行应用,以此来达到数据同步的目的。 本章内容将介绍如何使用逻辑流复制进行数据同步,可用于数据迁移等场景。
先决条件
在源库需要预先配置项如下:
- wal_level = logical
- max_replication_slots: 每个订阅需要消耗一个 slot。建议根据情况来进行设置。
- max_wal_senders,每一个slot要使用一个wal sender。建议根据实际情况来进行设置。
- 保证源库和目标库网络连通性。
- 选择复制的用户需要具有 replication 或 superuser 权限。
关于逻辑复制的相关限制条件,您可以参考文档[1][2]
操作步骤
创建发布订阅的语法
首先我们可以看下创建发布订阅的语法:
# 创建发布任务
CREATE PUBLICATION name
[ FOR TABLE [ ONLY ] table_name [ * ] [, ...]
| FOR ALL TABLES ]
[ WITH ( publication_parameter [= value] [, ... ] ) ]
```
`
````undefined
# 创建订阅任务
CREATE SUBSCRIPTION subscription_name
CONNECTION 'conninfo'
PUBLICATION publication_name [, ...]
[ WITH ( subscription_parameter [= value] [, ... ] ) ]
```
`
### 在源库上执行的操作如下
在源库上创建测试库和表
````undefined
postgres=# create database pubdb;
CREATE DATABASE
postgres=# \c pubdb
You are now connected to database "pubdb" as user "postgres".
pubdb=# create table pub_t(id int,name varchar(10));
CREATE TABLE
```
`
在源库上创建发布任务:
````undefined
pubdb=# create publication pub1 for table pub_t;
CREATE PUBLICATION
pubdb=# select * from pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate
-------+---------+----------+--------------+-----------+-----------+-----------+-------------
16405 | pub1 | 10 | f | t | t | t | t
(1 row)
```
`
### 在目标库上执行操作如下
在目标库上创建需要同步的数据库和表
````undefined
postgres=# create database pubdb;
CREATE DATABASE
postgres=# \c pubdb;
You are now connected to database "pubdb" as user "postgres".
pubdb=# create table pub_t(id int,name varchar(10));
CREATE TABLE
```
`
在目标库上创建订阅任务:
````undefined
pubdb=# create subscription sub1 connection 'host=xxxx port=5433 dbname=pubdb user=repluser password=xxxxx' publication pub1;
NOTICE: created replication slot "sub1" on publisher
CREATE SUBSCRIPTION
pubdb=# select * from pg_subscription;
-[ RECORD 1 ]---+--------------------------------------------------------------------------
oid | 16390
subdbid | 16385
subname | sub1
subowner | 10
subenabled | t
subconninfo | host=xxxxx port=5433 dbname=pubdb user=repluser password=xxxxx
subslotname | sub1
subsynccommit | off
subpublications | {pub1}
```
`
### 在源库上查看并写入数据
我们可以使用如下 SQL 语句进行查询 slot 的状态,并写入数据,然后再从库上进行查询,会发现数据已经同步。
````undefined
pubdb=# SELECT slot_name,plugin,slot_type,database,active,restart_lsn FROM pg_replication_slots where slot_name='sub1';
slot_name | plugin | slot_type | database | active | restart_lsn
-----------+----------+-----------+----------+--------+-------------
sub1 | pgoutput | logical | pubdb | t | 0/501CDA0
(1 row)
pubdb=# insert into pub_t values(1,'rudonx');
INSERT 0 1
```
`
至此,一个逻辑流复制的发布订阅任务配置完成。更多信息,您可以参考文档[3]。
# 参考文档
[1] [https://www.postgresql.org/docs/12/logical-replication-security.html](https://www.postgresql.org/docs/12/logical-replication-security.html)
[2] [https://www.postgresql.org/docs/12/logical-replication-restrictions.html](https://www.postgresql.org/docs/12/logical-replication-restrictions.html)
[3] [https://www.postgresql.org/docs/12/logical-replication.html](https://www.postgresql.org/docs/12/logical-replication.html)
**如果您有其他问题,欢迎您联系火山引擎**[技术支持服务](https://console.volcengine.com/ticket/createTicketV2/)