注:此次测试所用版本为V7.2.0社区版

1 数据库配置(必选项)

1.1 日志格式配置(如果已经修改过,可忽略)

  • 修改postgresql.conf文件
#逻辑级别日志
wal_level = logical
#如果需要远程读日志,需要设置,至少为1,就接受1个读日志线程
max_wal_senders = 10   #建议大于等于10 
#设置日志保留个数,设置太少了,日志量太大了日志自动删除后,容易读不到日志
wal_keep_segments = 10   #建议大于等于10

# wal_keep_size:要为日志复制进程保留多少日志量。旧版本使用的是wal_keep_segments参数(要为日志复制进程保留多少个日志文件),在pg 13中,该参数已被移除。wal_keep_size = wal_keep_segments * wal_segment_size
# 修改完成后需要重启数据库:pg_ctl -D /var/lib/postgresql/data -l logfile restart
# wal_level=logical 需要重启数据库
  • 验证是否成功
postgres=# select name,setting,category from pg_settings where name in ('wal_level','wal_keep_segments','max_wal_senders','wal_keep_size');
       name        | setting |           category
-------------------+---------+-------------------------------
 max_wal_senders   | 10      | Replication / Sending Servers
 wal_keep_segments | 10      | Replication / Sending Servers
 wal_level         | logical | Write-Ahead Log / Settings
(3 rows)

2 添加槽位号说明(可选操作)

(如发布数据库未配置SLOT_NAME参数,则此步骤添加槽位号跳过,目前软件仅支持添加物理复制槽)

2.1 创建槽位号(SQL/系统命令两种可选)

  • 创建槽位号
  select pg_create_physical_replication_slot('zcbus_slot_name');
  • 删除槽位号
  select pg_drop_replication_slot('zcbus_slot_name');

2.1.1 查看创建槽位号信息

postgres=#  select * from pg_replication_slots;
    slot_name    | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
-----------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
 zcbus_slot_name |        | physical  |        |          | f         | t      |       2495 |      |              | 0/136FA6E0  | 
(1 row)

2.1.2 切换日志测试

  • PG10之前版本
  select pg_switch_xlog();
  • PG10之后版本
  select pg_switch_wal();

2.1.3 删除日志参考:

  • 检查可以删除wal的日志文件
   83e44cfdd569:~/data$ pg_controldata -D /var/lib/postgresql/data/ |grep "REDO WAL file"
   Latest checkpoint's REDO WAL file:    000000010000000100000067
  • 删除日志文件操作
   pg_archivecleanup /var/lib/postgresql/data/pg_wal/ 000000010000000100000067

3 配置解析日志用户(必选项)

  • 如果远程读日志,需要设置读日志的权限,修改pg_hba.conf
host    replication     all             0.0.0.0/32            ident/md5 ###可选

上段用户加密方式可以通过下述SQL查询加密方式,进行配置

  • 查询数据库加密方式
show password_encryption;
  • 修改pg_hba.conf执行pg_ctl命令生效
pg_ctl reload -D 指定pg_hba.conf所在目录
示例:pg_ctl reload -D /opt/postgresql/data
  • 验证是否成功
postgres=# select * from pg_hba_file_rules;
 line_number | type  |   database    | user_name |  address  |                 netmask                 | auth_method | options | error 
-------------+-------+---------------+-----------+-----------+-----------------------------------------+-------------+---------+-------
          95 | local | {replication} | {all}     |           |                                         | trust       |         | 

4 数据表配置(必选项)

4.1 需要同步的表打开补充日志

ALTER TABLE schemaname.tabname REPLICA IDENTITY FULL;

---批量生成打开表补充日志sql语句参考
SELECT 'ALTER TABLE '||nspname||'.'||relname||' REPLICA IDENTITY FULL;' FROM pg_class c,pg_namespace n where relnamespace = n.oid and c.relkind='r' and nspname='public';

---注1:表所属用户或者超级用户才有开启REPLICA IDENTITY权限,否则报错:must be owner of table

4.1.1 分区表检查SQL以及注意事项

源表为分区表开启补充日志
SELECT
   parent.relname AS partitioned_table_name,
   parent.relkind AS partitioned_table_type,
   parent.relpartbound AS partition_bound,
   child.relname,
   child.relkind,
   child.relreplident,
   'ALTER TABLE ' || nspname || '.' || child.relname || ' REPLICA IDENTITY FULL' rif
FROM
 pg_class parent
LEFT JOIN pg_inherits ON pg_inherits.inhparent = parent.oid
LEFT JOIN pg_class child ON pg_inherits.inhrelid = child.oid
LEFT JOIN pg_namespace pns ON child.relnamespace = pns.oid
WHERE
 parent.relkind = 'p'
AND parent.relname = 'tabname';

---该语句能查询出分区表中的分区是否开启补充日志,若没有开启,可通过生成的批量SQL开启表补充日志

备注:

  postgresql只有owner或usersuper权限才能修改表结构或执行drop操作,请在执行操作前确定所同步表的Owner或用Superuser权限用户执行ALTER USER zcbus WITH SUPERUSER;

5 DDL兼容配置(可选项)

  • 添加此功能后,在增量复制过程中,若有表结构修改等操作,软件将自动同步。若未添加此功能,在修改结构时若遇到结构不一致的情况,可能需要人工参与。

5.1 创建ddl_event_tbl表(用于存储DDL语句)

  • 创建表
create schema zcbus;

create table zcbus.ddl_event_tbl (
event text,
tag text,
objid oid,
object_type text,
schema_name text,
object_identity text,
query text,
op_time timestamp default now()
);
  • 检查ddl_event_tbl表是否创建
select * from information_schema.tables where table_type='BASE TABLE' and table_name='ddl_event_tbl';

5.2 创建函数zcbus_func_ddl_command

create or replace function zcbus.zcbus_func_ddl_command() returns event_trigger as $$  
declare  
  v1 text;
  r record;
begin
  select query into v1 from pg_stat_activity where pid=pg_backend_pid();
  -- RAISE NOTICE 'ddl event:%, command:%', tg_event, tg_tag; 
  if TG_EVENT='ddl_command_end' then  
    SELECT * into r FROM pg_event_trigger_ddl_commands();
    if r.classid > 0 then
      insert into zcbus.ddl_event_tbl(event, tag, objid, object_type, schema_name, object_identity, query)  
        values(TG_EVENT, TG_TAG, r.objid, r.object_type, r.schema_name, r.object_identity, v1);
    end if;
  end if;
  if TG_EVENT='sql_drop' then
    if TG_TAG != 'ALTER TABLE' then
      SELECT * into r FROM pg_event_trigger_dropped_objects();
      insert into zcbus.ddl_event_tbl(event, tag, object_type, schema_name, object_identity, query)  
        values(TG_EVENT, TG_TAG, r.object_type, r.schema_name, r.object_identity, v1);
    end if;
  end if;
end;  
$$ language plpgsql strict;
  • 检查function是否创建:
select a.* FROM pg_catalog.pg_proc a JOIN pg_catalog.pg_namespace b ON a.pronamespace = b.oid
WHERE a.proname = 'zcbus_func_ddl_command' ;

5.3 创建事件触发器

CREATE EVENT TRIGGER zcbus_et_ddl_command on ddl_command_end EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();
CREATE EVENT TRIGGER zcbus_et_ddl_drop on sql_drop EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();
  • 检查EVENT TRIGGER是否创建:
select * from  pg_event_trigger  where evtname in ('zcbus_et_ddl_drop','zcbus_et_ddl_command');

6 用户授权(必选项)

6.1 创建用户及赋权

切换到同步库下,执行如下(数据库命令行执行):

create user zcbus with password  '*****';
grant connect on database *** to zcbus;  # ***为需要同步的数据库名称
create schema zcbus authorization zcbus;
grant select,trigger on table zcbus.ddl_event_tbl  to zcbus;
grant execute ON FUNCTION zcbus.zcbus_func_ddl_command to zcbus;
alter user zcbus REPLICATION;
grant select on all tables in schema *** to zcbus;
grant execute on function pg_hba_file_rules to zcbus;
alter table zcbus.ddl_event_tbl owner to zcbus;

以上过程中,web页面不要点击注册按键,所有配置手工完成!

7 补充说明

7.1 添加新的同步表

添加新表,根据schemaname.tablename,确认postgres账号开启表的补充日志模式状态开启成功

select relreplident from pg_class c , pg_tables tab where tab.schemaname = 'schemaname' and c.relname='tablename';

查询结果说明

d = 默认(主键)
n = 无
f = 所有列
i = 索引的indisreplident被设置或者默认
查询结果为f,则此表满足同步条件,开启表的补充日志模式成功

注:每次添加完新的同步表,需要再次对同步用户执行如下权限

grant select on all tables in schema public to zcbus;
grant execute on function pg_hba_file_rules to zcbus;

7.2 数据类型支持列表

 在抓取数据时,我们将antdb数据类型与Zcbus支持的类型相匹配。如果我们不支持某个数据类型,我们会自动将该类型更改为最受支持的类型,或者在某些情况下,根本不加载该数据。我们的系统将会抛出异常、跳过我们不接受或转换的数据类型对应表。

antdb类型 是否支持 描述
smallint 支持
integer 支持
bigint 支持
decimal(m,d) 支持
numeric(m,d) 支持
real 支持
double precision 支持
smallserial 支持
serial 支持
bigserial 支持
money 支持
character varying(n) 支持
varchar(n) 支持
character(n) 支持
char(n) 支持
text 支持
timestamp 支持
tmestamp with time zone 支持
date 支持
time 支持
time with time zone 支持
interval 支持
bit(n) 支持
bit varing(n) 支持
boolean 支持
point 支持
line 支持
lseg 支持
box 支持
path 支持
polygon 支持
circle 支持
cidr 支持
inet 支持
macaddr 支持
tsvector 支持
tsquery 支持
uuid 支持
xml 支持
文档更新时间: 2024-07-26 19:08   作者:操李红