如何使用逻辑流复制来迁移数据

数据库关系型数据库技术服务知识库
前言

PostgreSQL 中的逻辑流复制与消息队列中的发布者/订阅者模型非常相似,在发布者端将 WAL 日志流解析成一定格式的数据流,订阅者节点收到后进行应用,以此来达到数据同步的目的。 本章内容将介绍如何使用逻辑流复制进行数据同步,可用于数据迁移等场景。

先决条件

在源库需要预先配置项如下:

  1. wal_level = logical
  2. max_replication_slots: 每个订阅需要消耗一个 slot。建议根据情况来进行设置。
  3. max_wal_senders,每一个slot要使用一个wal sender。建议根据实际情况来进行设置。
  4. 保证源库和目标库网络连通性。
  5. 选择复制的用户需要具有 replication 或 superuser 权限。

关于逻辑复制的相关限制条件,您可以参考文档[1][2]

操作步骤

创建发布订阅的语法

首先我们可以看下创建发布订阅的语法:

#  创建发布任务
CREATE PUBLICATION name
    [ FOR TABLE [ ONLY ] table_name [ * ] [, ...]
      | FOR ALL TABLES ]
    [ WITH ( publication_parameter [= value] [, ... ] ) ]

```
`
````undefined
# 创建订阅任务
CREATE SUBSCRIPTION subscription_name
    CONNECTION 'conninfo'
    PUBLICATION publication_name [, ...]
    [ WITH ( subscription_parameter [= value] [, ... ] ) ]
```
`
### 在源库上执行的操作如下
在源库上创建测试库和表
````undefined
postgres=# create database pubdb;
CREATE DATABASE

postgres=# \c pubdb
You are now connected to database "pubdb" as user "postgres".

pubdb=# create table pub_t(id int,name varchar(10));
CREATE TABLE

```
`
在源库上创建发布任务:
````undefined

pubdb=# create publication pub1 for table pub_t;
CREATE PUBLICATION

pubdb=# select * from pg_publication;
  oid  | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate 
-------+---------+----------+--------------+-----------+-----------+-----------+-------------
 16405 | pub1    |       10 | f            | t         | t         | t         | t
(1 row)
```
`
### 在目标库上执行操作如下
在目标库上创建需要同步的数据库和表
````undefined
postgres=# create database pubdb;
CREATE DATABASE

postgres=# \c pubdb;
You are now connected to database "pubdb" as user "postgres".

pubdb=# create table pub_t(id int,name varchar(10));
CREATE TABLE
```
`
在目标库上创建订阅任务:
````undefined
pubdb=# create subscription sub1 connection 'host=xxxx port=5433 dbname=pubdb user=repluser password=xxxxx' publication pub1;
NOTICE:  created replication slot "sub1" on publisher
CREATE SUBSCRIPTION

pubdb=# select * from pg_subscription;
-[ RECORD 1 ]---+--------------------------------------------------------------------------
oid             | 16390
subdbid         | 16385
subname         | sub1
subowner        | 10
subenabled      | t
subconninfo     | host=xxxxx port=5433 dbname=pubdb user=repluser password=xxxxx
subslotname     | sub1
subsynccommit   | off
subpublications | {pub1}
```
`
### 在源库上查看并写入数据
我们可以使用如下 SQL 语句进行查询 slot 的状态,并写入数据,然后再从库上进行查询,会发现数据已经同步。
````undefined
pubdb=# SELECT slot_name,plugin,slot_type,database,active,restart_lsn FROM pg_replication_slots where slot_name='sub1';
 slot_name |  plugin  | slot_type | database | active | restart_lsn 
-----------+----------+-----------+----------+--------+-------------
 sub1      | pgoutput | logical   | pubdb    | t      | 0/501CDA0
(1 row)


pubdb=# insert into pub_t values(1,'rudonx');
INSERT 0 1

```
`
至此,一个逻辑流复制的发布订阅任务配置完成。更多信息,您可以参考文档[3]。
# 参考文档
[1] [https://www.postgresql.org/docs/12/logical-replication-security.html](https://www.postgresql.org/docs/12/logical-replication-security.html)
[2] [https://www.postgresql.org/docs/12/logical-replication-restrictions.html](https://www.postgresql.org/docs/12/logical-replication-restrictions.html)
[3] [https://www.postgresql.org/docs/12/logical-replication.html](https://www.postgresql.org/docs/12/logical-replication.html)
**如果您有其他问题,欢迎您联系火山引擎**[技术支持服务](https://console.volcengine.com/ticket/createTicketV2/)

43
0
0
0
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论