阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

PostgreSQL逻辑复制之pglogical篇

260次阅读
没有评论

共计 13869 个字符,预计需要花费 35 分钟才能阅读完成。

一、pglogical 介绍

pglogical 是 PostgreSQL 的拓展模块, 为 PostgreSQL 数据库提供了逻辑流复制发布和订阅的功能。pglogical 重用了 BDR 项目中的一部分相关技术。pglogical 是一个完全作为 PostgreSQL 扩展实现的逻辑复制系统。完全集成,它不需要触发器或外部程序。这种物理复制的替代方法是使用发布 / 订阅模型复制数据以进行选择性复制的一种高效方法。支持 PG10、9.6、9.5、9.4,提供比 Slony、Bucardo 或 Londiste 更快的复制速度,以及跨版本升级。
我们使用的下列术语来描述节点和数据流之间的关系,重用了一些早期的 Slony 技术中的术语:

  • 节点 – PostgreSQL 数据库实例
  • 发布者和订阅者 – 节点的角色名称
  • 复制集 – 关系表的集合

pglogical 是新技术组件,使用了最新的 PostgreSQL 数据库中的一些核心功能,所以存在一些数据库版本限制:

  • 数据源发布和订阅节点需要运行 PostgreSQL 9.4 +
  • 复制源过滤和冲突检测需要 PostgreSQL 9.5 +

支持的使用场景:

  • 主版本数据库之间的升级(存在上述的版本限制)
  • 完整的数据库复制
  • 利用复制集,选择性的筛选的关系表
  • 可从多个上游服务器, 做数据的聚集和合并

二、安装操作

本节介绍了 pglogical 扩展模块复制的基本用法。
下载地址,安装步骤

tar -zxvf pglogical-REL2_2_0.tar.gz 
cd pglogical-REL2_2_0
. /home/postgres/.bash_profile
pg_config
USE_PGXS=1 make clean
USE_PGXS=1 make
USE_PGXS=1 make install

首先 PostgreSQL 服务器必须正确配置才能够支持逻辑解码︰

wal_level = 'logical'
# one per database needed on (provider/subscriber)provider node
max_worker_processes = 10  
# one per node needed on provider node
max_replication_slots = 10  
# one per node needed on provider node
max_wal_senders = 10 
shared_preload_libraries = 'pglogical'

如果你想要处理解决与上一次 / 第一次更新之间的冲突 wins(参阅冲突章节), 你的数据库版本需要为 PostgreSQL 9.5+ (在 9.4 中无效) 您可以向 PostgreSQL.conf 添加此额外的选项:

# needed for last/first update wins conflict resolution property available in Postgre
track_commit_timestamp = on

pg_hba.conf 需要配置成允许从本地主机复制,用户拥有有复制权限, 连接权限;并重启数据库服务

host    replication     postgres        网段 ip/24           trust

在所有节点上所对应数据库安装 pglogical 拓展模块:

CREATE EXTENSION pglogical;

三、pglogical 复制配置

现有实验环境

数据库版本 IP 角色
psql (PostgreSQL) 9.6.0 192.168.1.221 provider
psql (PostgreSQL) 10.5 192.168.1.235 subscriber

3.1、时间同步

服务器时间同步(主备库都需操作)

echo "*/20 * * * * /usr/sbin/ntpdate -u ntp.api.bz >/dev/null" >> /var/spool/cron/root

3.2、提供者节点配置

1、创建节点

在一个数据库里创建提供者节点

# 创建节点
SELECT pglogical.create_node(node_name := 'provider1',
    dsn := 'host=192.168.1.221 port=5432 dbname=lottu'
);

2、创建复制集

将 public 架构中的所有表添加到 default 复制集中

SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);

复制集 default 的表都必需要 primary key

3.3、订阅者节点配置

1、创建节点

在另一个数据库创建订阅者节点

SELECT pglogical.create_node(node_name := 'subscriber1',
dsn := 'host=192.168.1.235 port=5432 dbname=lottu'
);

2、创建订阅

订阅提供者节点,该订阅将在后台启动同步和复制过程

SELECT pglogical.create_subscription(subscription_name := 'subscription1',
provider_dsn := 'host=192.168.1.221 port=5432 dbname=lottu'
);

3.4、验证复制

前面我们已经完成安装 / 配置 pglogical 操作。

1、创建测试表

create table tbl_lottu01(id int primary key, name text, reg_time timestamp);

由于需要验证 insert/update/delete/truncate 操作是否同步;所以创建的表要有主键。当然只对发布者必须要主键约束。

2、添加测试数据

lottu=# insert into tbl_lottu01 select generate_series(1,10000),'lottu',now();
INSERT 0 10000

3、将表添加对应的复制集

对新建的表;并没有为其分配对应的复制集;需要手动添加。当然可以利用触发器自动添加;后续补充。

lottu=# select * from pglogical.replication_set_table ;
 set_id | set_reloid | set_att_list | set_row_filter 
--------+------------+--------------+----------------
(0 rows)
  • 方法 1:

前面讲解创建复制集中;3.2.2 中“将 public 架构中的所有表添加到 default 复制集中”

SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);
  • 方法二:

将表添加到对应的复制集中;详细介绍可以查看前面文档。

pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text [],row_filter text)

两种方法都可以;我们采用第二种方法。

lottu=# select pglogical.replication_set_add_table(set_name := 'default', relation := 'tbl_lottu01',synchronize_data := true);
 replication_set_add_table 
---------------------------
 t
(1 row)

我们查看复制集

lottu=# select * from pglogical.replication_set_table ;
  set_id   | set_reloid  | set_att_list | set_row_filter 
-----------+-------------+--------------+----------------
 290045701 | tbl_lottu01 |              | 
(1 row)

同时,数据也同步到 subscriber 节点。因为在第二种方法有 同步 的操作。若使用第一种方法;还需要在 subscriber 节点同步表的操作。

#重新同步一个表
pglogical.alter_subscription_resynchronize_table(subscription_name name, relation regclass) 
#将所有的表都同步
pglogical.alter_subscription_synchronize(subscription_name name, truncate bool)

4、查看 subscriber 节点

查看表 tbl_lottu01 信息

lottu=# select * from pglogical.show_subscription_table('subscription1','tbl_lottu01');
 nspname |   relname   |    status    
---------+-------------+--------------
 public  | tbl_lottu01 | synchronized
(1 row)

lottu=# select count(1) from tbl_lottu01;
 count 
-------
 10000
(1 row)

在复制集 default 中:update/delete/truncate 操作也是同步复制。不作演示

复制集 INSERT UPDATE DELETE TRUNCATE
default
default_insert_only × × ×

四、复制特性扩展

4.1、延迟复制

pglogical.create_subscription(subscription_name name, provider_dsn text, replication_sets text[], synchronize_structure boolean, synchronize_data boolean, forward_origins text[], apply_delay interval)

参数:

  • subscription_name – 订阅的名称,必须是唯一的
  • provider_dsn – 提供者的连接字符串
  • replication_sets – 要订阅的复制集数组,这些必须已存在,默认为“{default,default_insert_only,ddl_sql}”
  • synchronize_structure – 指定是否将提供者与订阅者之间的结构同步,默认为 false
  • synchronize_data – 指定是否将数据从提供者同步到订阅者,默认为 true
  • forward_origins – 要转发的原始名称数组,当前只支持的值是空数组,意味着不转发任何不是源自提供者节点的更改,或“{all}”这意味着复制所有更改,无论它们的来源是什么,默认是全部}”
  • apply_delay – 延迟复制多少,默认为 0 秒

示例:数据表结构同步;且延迟复制 1 分钟

SELECT pglogical.create_subscription(subscription_name := 'subscription1',
provider_dsn := 'host=192.168.1.221 port=5432 dbname=lottu',
synchronize_structure := true,
apply_delay := '00:01:00'::interval
);

4.2、对源端进行 行 / 列 过滤

过滤机制需要 PostgreSQL 9.5 +

pglogical.replication_set_add_table(set_name name, relation regclass, synchronize_data boolean, columns text [],row_filter text)

参数:

  • set_name – 现有复制集的名称
  • relation – 要添加到集合中的表的名称或 OID
  • synchronize_data – 如果为 true,则表数据将在订阅给定复制集的所有订户上同步,默认为 false
  • columns – 要复制的列的列表。通常,当应复制所有列时,这将设置为 NULL,这是默认值
  • row_filter – 行过滤表达式,默认为 NULL(无过滤),有关详细信息,请参阅(行过滤)。警告:在使用有效行筛选器同步数据时要小心。使用 synchronize_data=true 有效 row_filter 就像对表的一次性操作。使用修改后再次执行它将 row_filter 不会将数据同步到订户。订阅者可能需要 pglogical.alter_subscription_resynchronize_table()来修复它。

**

示例:对表 tbl_lottu02 中字段{id, name, job} 字段列过滤;且对条件‘id > 10’进行行过滤 **

# provider 节点 创建表并插入测试数据
create table tbl_lottu02 (id int primary key, name text, job text, reg_time timestamp);
insert into tbl_lottu02 select generate_series(1,20) id,'lottu'||generate_series(1,20),'pg', now();

# subscriber 节点创建表;可以只创建复制的列的数据表
create table tbl_lottu02 (id int primary key, name text, job text, reg_time timestamp);
# or
create table tbl_lottu02 (id int primary key, name text, job text);

#provider 节点 将表加入复制集中;并同步记录
lottu=# select pglogical.replication_set_add_table(set_name := 'default', relation := 'tbl_lottu02', synchronize_data := true, columns := '{id, name, job}',row_filter := 'id < 10');
 replication_set_add_table 
---------------------------
 t
(1 row)

# subscriber 节点查看表 tbl_lottu02 记录
lottu=# select * from tbl_lottu02;
 id |  name  | job 
----+--------+-----
  1 | lottu1 | pg
  2 | lottu2 | pg
  3 | lottu3 | pg
  4 | lottu4 | pg
  5 | lottu5 | pg
  6 | lottu6 | pg
  7 | lottu7 | pg
  8 | lottu8 | pg
  9 | lottu9 | pg
(9 rows)

4.3、为新表自动分配复制集

事件触发器工具可用于描述为新创建的表定义复制集的规则。

CREATE OR REPLACE FUNCTION pglogical_assign_repset()
RETURNS event_trigger AS $$
DECLARE obj record;
BEGIN
    FOR obj IN SELECT * FROM pg_event_trigger_ddl_commands()
    LOOP
        IF obj.object_type = 'table' THEN
            IF obj.schema_name = 'config' THEN
                PERFORM pglogical.replication_set_add_table('configuration', obj.objid);
            ELSIF NOT obj.in_extension THEN
                PERFORM pglogical.replication_set_add_table('default', obj.objid);
            END IF;
        END IF;
    END LOOP;
END;
$$ LANGUAGE plpgsql;

CREATE EVENT TRIGGER pglogical_assign_repset_trg
    ON ddl_command_end
    WHEN TAG IN ('CREATE TABLE', 'CREATE TABLE AS')
    EXECUTE PROCEDURE pglogical_assign_repset();

4.4、冲突检测

冲突检测需要 PostgreSQL 9.5 +
如果节点订阅多个提供程序,或当本地写入在订阅服务器上发生,可能会发生冲突,尤其是对传入的变化。这些都自动检测,并可以就此采取行动取决于配置。
解决冲突的办法是通过配置 pglogical.conflict_resolution 参数。
pglogical.conflict_resolution 支持的配置参数选项为︰

  • error – 复制将停止上错误如果检测到冲突和手动操作需要解决
  • apply_remote – 总是应用与本地数据有冲突的更改,这是默认值
  • keep_local – 保留数据的本地版本,并忽略来自远程节点相互冲突的更改
  • last_update_wins – 时间戳为提交最新的版本 (newest commit timestamp) 的数据将会被保存(这可以是本地或远程版本)
  • first_update_wins – 时间戳为最旧的版本 (oldest timestamp) 的数据将会被保存(这可以是本地或远程版本)

当参数 track_commit_timestamp 被禁用时,唯一允许的配置值是 apply_remote。PostgreSQL 9.4 不支持 track_commit_timestamp 配置参数只能配置参数 apply_remote(该参数是默认值)。

# 在 订阅者 节点配置; 我们保留最新的数据
track_commit_timestamp = on
pglogical.conflict_resolution = 'last_update_wins'

# 在 订阅者 节点创建测试表 tbl_lottu03
lottu=# create table tbl_lottu03(id int primary key, name text);
CREATE TABLE
lottu=# insert into tbl_lottu03 values (1001,'subscriber');
INSERT 0 1

# 在 发布者 节点 创建测试表
create table tbl_lottu03(id int primary key, name text);
select pglogical.replication_set_add_table(set_name := 'default', relation := 'tbl_lottu03',synchronize_data := true);
insert into tbl_lottu03 values (1001,'provider');

# 在 订阅者 节点 查看数据
lottu=# select * from tbl_lottu03;
  id  |   name   
------+----------
 1001 | provider

后记:在订阅者的表需要主键约束;不然检测不到冲突;是否需要主键约束当然这个也是根据需求而定。

五、场景介绍

5.1、可从多个上游服务器, 做数据的聚集和合并

发布者跟订阅者的关系;一个发布者可以被多个订阅者订阅。多个发布者可以被同一个订阅者订阅。

数据库版本 IP 数据库 角色
psql (PostgreSQL) 9.6.0 192.168.1.221 lottu provider1
psql (PostgreSQL) 9.6.0 192.168.1.221 lottu02 provider2
psql (PostgreSQL) 10.5 192.168.1.235 lottu subscriber

为了加以区分;我们定制 SQL 提示符;例如

lottu=# \set PROMPT1 '%`echo provider1=`'
provider1=

5.1.1、创建测试表

# 每个节点创建测试表; 订阅者创建的表可以无主键;若订阅者有主键,可利用序列自增来解决冲突。(例如:本例是两个发布者,则发布者 1 可取奇数; 发布者二可取偶数)。若无主键; 数据不受影响。
provider1=create table tbl_lottu05(id int primary key,name text);
CREATE TABLE
provider1=CREATE SEQUENCE seq_lottu05_id INCREMENT BY 2 START WITH 1;
CREATE SEQUENCE

provider2=create table tbl_lottu05(id int primary key,name text);
CREATE TABLE
provider2=CREATE SEQUENCE seq_lottu05_id INCREMENT BY 2 START WITH 2;
CREATE SEQUENCE

subscriber=create table tbl_lottu05(id int primary key,name text);
CREATE TABLE

5.1.2、搭建模拟场景

更多介绍查看第三节;或者查考《PostgreSQL 逻辑复制文档 (pglogical 文档)》

# provider 节点 1 
provider1=SELECT pglogical.create_node(node_name := 'provider1', dsn := 'host=192.168.1.221 port=5432 dbname=lottu');
 create_node 
-------------
  2976894835
  
provider1=select pglogical.replication_set_add_table(set_name := 'default', relation := 'tbl_lottu05',synchronize_data := true);
 replication_set_add_table 
---------------------------
 t

# provider 节点 2 
provider2=SELECT pglogical.create_node(node_name := 'provider2', dsn := 'host=192.168.1.221 port=5432 dbname=lottu02');
 create_node 
-------------
  1828187473

provider2=select pglogical.replication_set_add_table(set_name := 'default', relation := 'tbl_lottu05',synchronize_data := true);
 replication_set_add_table 
---------------------------
 t

# subscriber 节点
subscriber=SELECT pglogical.create_node(node_name := 'subscriber', dsn := 'host=192.168.1.235 port=5432 dbname=lottu');
 create_node 
-------------
  2941155235
  
subscriber=SELECT pglogical.create_subscription(subscription_name := 'subscription1', provider_dsn := 'host=192.168.1.221 port=5432 dbname=lottu');
 create_subscription 
---------------------
          1763399739
        
subscriber=SELECT pglogical.create_subscription(subscription_name := 'subscription2', provider_dsn := 'host=192.168.1.221 port=5432 dbname=lottu02'); 
create_subscription 
---------------------
          1871150101

5.1.3、插入数据验证

provider1=insert into tbl_lottu05 select nextval('seq_lottu05_id'),'lottu' || generate_series(1,10,2);
INSERT 0 5

provider2=insert into tbl_lottu05 select nextval('seq_lottu05_id'),'lottu' || generate_series(1,10,2);
INSERT 0 5

subscriber=select * from tbl_lottu05;
 id |  name  
----+--------
  1 | lottu1
  3 | lottu3
  5 | lottu5
  7 | lottu7
  9 | lottu9
  2 | lottu1
  4 | lottu3
  6 | lottu5
  8 | lottu7
 10 | lottu9
(10 rows)

5.2、数据库版本升级

pglogical 对 PostgreSQL 版本升级是一个很实用的工具。能实现以几乎为零的停机时间迁移和升级 PostgreSQL。局限性在于 pglogical 支持的 PostgreSQL 版本。
本例简单模拟下 pglogical 对 PostgreSQL 版本升级;忽略插件、存储空间、表空间、以及业务 SQL 和自定义函数创建。

数据库版本 IP 数据库 角色
psql (PostgreSQL) 9.6.0 192.168.1.221 lottu provider
psql (PostgreSQL) 10.5 192.168.1.235 lottu subscriber

5.2.1、新建升级数据库

以一个全新的数据库进行操作

PG10-235=drop database if exists  lottu;
NOTICE:  database "lottu" does not exist, skipping
DROP DATABASE
PG10-235=create database lottu owner lottu;
CREATE DATABASE

5.2.2、pglogical 插件安装

本环境已经安装 pglogical;只要到对应数据库创建 pglogical 插件即可

PG10-235=CREATE EXTENSION pglogical;
CREATE EXTENSION
PG10-235=\dx
                   List of installed extensions
   Name    | Version |   Schema   |          Description           
-----------+---------+------------+--------------------------------
 pglogical | 2.2.0   | pglogical  | PostgreSQL Logical Replication
 plpgsql   | 1.0     | pg_catalog | PL/pgSQL procedural language
(2 rows)

5.2.3、配置 pglogical

  • 发布者节点

这个要根据真实环境来设置;考虑到真实环境数据库中表不一���都有主键约束,可将表放到复制集 “default_insert_only”

PG96-221=SELECT pglogical.create_node(node_name := 'provider', dsn := 'host=192.168.1.221 port=5432 dbname=lottu');
 create_node 
-------------
  3171898924
(1 row)

PG96-221=SELECT pglogical.replication_set_add_all_tables('default_insert_only', ARRAY['public']);
 replication_set_add_all_tables 
--------------------------------
 t
(1 row)

该函数可实现主键和非主键分别放到 ’default’ 和 ’default_insert_only’ 复制集

CREATE OR REPLACE FUNCTION "public"."pglogical_relhaspkey_repset"()
  RETURNS "pg_catalog"."void" AS $BODY$ DECLARE obj record;
BEGIN
  FOR obj IN (SELECT n.nspname, c.relname, c.relhaspkey
                FROM pg_catalog.pg_class c
                LEFT JOIN pg_catalog.pg_namespace n
                  ON n.oid = c.relnamespace
               WHERE c.relkind = 'r'
                 AND n.nspname <> 'pg_catalog'
                 AND n.nspname <> 'information_schema'
                 AND n.nspname !~ '^pg_toast'
                 AND pg_catalog.pg_table_is_visible(c.oid)
               ORDER BY 1, 2) LOOP
      IF obj.relhaspkey THEN
        PERFORM pglogical.replication_set_add_table(set_name := 'default', relation := obj.relname :: regclass);
      ELSE
        PERFORM pglogical.replication_set_add_table(set_name := 'default_insert_only', relation :=  obj.relname :: regclass);
      END IF;
  END LOOP;
END; $BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100
  • 订阅者节点
PG10-235=SELECT pglogical.create_node(node_name := 'subscriber', dsn := 'host=192.168.1.235 port=5432 dbname=lottu');
 create_node 
-------------
  2941155235

5.2.4、迁移 DDL

pglogical 可以同步表 / 序列结构;在创建订阅者 ‘pglogical.create_subscription’ ; 里面参数 synchronize_structure – 指定是否将提供者与订阅者之间的结构同步,默认为 false。可以同步表 / 序列 / 索引。

PG10-235=SELECT pglogical.create_subscription(subscription_name := 'subscription', provider_dsn := 'host=192.168.1.221 port=5432 dbname=lottu', synchronize_structure := true, synchronize_data := false);
 create_subscription 
---------------------
          2875150205
(1 row)

5.2.5、业务代码改写优化

上一步我们没同步数据。所以参数 synchronize_data 我们选择 false。虽然把表 / 序列 / 索引结构同步过来;但是业务代码 (函数 / 插件) 没同步过来;还要考虑这些业务代码是否需要改写优化。因为新的版本往往有新特性。

5.2.6、全量复制

pglogical 有将所有未同步表都在单个操作中同步
语法:

pglogical.alter_subscription_synchronize(subscription_name name, truncate bool)

参数:

  • subscription_name – 现有订阅的名称
  • truncate – 如果为 true,表将在复制前被截断,默认为 false
PG10-235=SELECT pglogical.alter_subscription_synchronize(subscription_name := 'subscription', truncate := false);
 alter_subscription_synchronize 
--------------------------------
 t
(1 row)

5.2.7、比对数据一致

经过上一步,两个数据库数据达到一致。

  • 查看表同步状态
PG10-235=select * from pglogical.show_subscription_table(subscription_name := 'subscription', relation := 'tbl_lottu01'::regclass);
 nspname |   relname   |    status    
---------+-------------+--------------
 public  | tbl_lottu01 | synchronized
(1 row)
  • 比对两个数据库表的数据
PG96-221=select count(1) from tbl_lottu01;
 count 
-------
 10000
(1 row)

PG10-235=select count(1) from tbl_lottu01;
 count 
-------
 10000
(1 row)

5.2.8、业务切换

比对数据一致;可以将业务切换到升级后的数据库。

5.2.9、删除 pglogical 配置

这步是可选的;保证升级后的数据库正常支持业务。不存在数据丢失的情况下。可以删除 pglogical 配置。
删除步骤:

  • 删除订阅信息
  • 删除两个数据库 pglogical 节点
PG10-235=select pglogical.drop_subscription(subscription_name := 'subscription',ifexists := true);
 drop_subscription 
-------------------
                 1
(1 row)

PG10-235=select pglogical.drop_node(node_name := 'subscriber', ifexists := true);
 drop_node 
-----------
 t
(1 row)

PG96-221=select pglogical.drop_node(node_name := 'provider', ifexists := true);
 drop_node 
-----------
 t
(1 row)

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-22发表,共计13869字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中