1 数据库配置(必选项)

1.1 日志格式配置(如果已经修改过,可忽略)

  • 修改postgresql.conf文件
#逻辑级别日志
wal_level = logical
#如果需要远程读日志,需要设置,至少为1,就接受1个读日志线程
max_wal_senders = 10   #建议大于等于10 
#设置日志保留个数,设置太少了,日志量太大了日志自动删除后,容易读不到日志
wal_keep_segments = 10   #建议大于等于10

# wal_keep_size:要为日志复制进程保留多少日志量。旧版本使用的是wal_keep_segments参数(要为日志复制进程保留多少个日志文件),在pg 13中,该参数已被移除。wal_keep_size = wal_keep_segments * wal_segment_size(默认16MB)
# 修改完成后需要重启数据库:pg_ctl -D /var/lib/postgresql/data -l logfile restart
# wal_level=logical 需要重启数据库
  • 验证是否成功
postgres=# select name,setting,category from pg_settings where name in ('wal_level','wal_keep_segments','max_wal_senders','wal_keep_size');
       name        | setting |           category
-------------------+---------+-------------------------------
 max_wal_senders   | 10      | Replication / Sending Servers
 wal_keep_segments | 10      | Replication / Sending Servers
 wal_level         | logical | Write-Ahead Log / Settings
(3 rows)

2 添加槽位号说明(可选操作)

(如发布数据库未配置SLOT_NAME参数,则此步骤添加槽位号跳过,目前软件仅支持添加物理复制槽)

2.1 创建槽位号(SQL/系统命令两种可选)

  • 创建槽位号
  select pg_create_physical_replication_slot('zcbus_slot_name');
  • 删除槽位号
  select pg_drop_replication_slot('zcbus_slot_name');

2.1.1 查看创建槽位号信息

postgres=#  select * from pg_replication_slots;
    slot_name    | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn 
-----------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
 zcbus_slot_name |        | physical  |        |          | f         | t      |       2495 |      |              | 0/136FA6E0  | 
(1 row)

2.1.2 切换日志测试

  • PG10之前版本
  select pg_switch_xlog();
  • PG10之后版本
    select pg_switch_wal();

2.1.3 删除日志参考:

  • 检查可以删除wal的日志文件

     83e44cfdd569:~/data$ pg_controldata -D /var/lib/postgresql/data/ |grep "REDO WAL file"
     Latest checkpoint's REDO WAL file:    000000010000000100000067
  • 删除日志文件操作

     pg_archivecleanup /var/lib/postgresql/data/pg_wal/ 000000010000000100000067

    3 配置解析日志用户(必选项)

  • 如果远程读日志,需要设置读日志的权限,修改pg_hba.conf

host    replication     all             0.0.0.0/32            ident/md5 ###可选

上段用户加密方式可以通过下述SQL查询加密方式,进行配置

  • 查询数据库或者zcbus用户加密方式
show password_encryption;
select rolname,rolpassword from pg_authid where rolname='zcbus';
  • 修改pg_hba.conf执行pg_ctl命令生效
pg_ctl reload -D 指定pg_hba.conf所在目录
示例:pg_ctl reload -D /opt/postgresql/data
  • 验证是否成功
postgres=# select * from pg_hba_file_rules;
 line_number | type  |   database    | user_name |  address  |                 netmask                 | auth_method | options | error 
-------------+-------+---------------+-----------+-----------+-----------------------------------------+-------------+---------+-------
          95 | local | {replication} | {all}     |           |                                         | trust       |         | 

以下过程中,web页面不要点击注册按键,所有配置手工完成!

1.2 创建用户及赋权

1.2.1 用postgres系统账号登录pg库

切换到同步库下,执行如下(数据库命令行执行):

create user zcbus with password  '*****';
grant connect on database *** to zcbus;  # ***为需要同步的数据库名称
create schema zcbus authorization zcbus;
GRANT ALL PRIVILEGES ON DATABASE testdb TO zcbus;
grant all on schema zcbus to zcbus;
grant all on schema public to zcbus;

备注:

  postgresql只有owner或usersuper权限才能修改表结构或执行drop操作,请在执行操作前确定所同步表的Owner或用Superuser权限用户执行ALTER USER zcbus WITH SUPERUSER;

1.2.2 创建ddl_event_tbl表

create table zcbus.ddl_event_tbl (
event text,
tag text,
objid oid,
object_type text,
schema_name text,
object_identity text,
query text,
op_time timestamp default now()
);

alter table zcbus.ddl_event_tbl owner to zcbus;
  • 检查ddl_event_tbl表是否创建
select * from information_schema.tables where table_schema='zcbus' and table_type='BASE TABLE' and table_name='ddl_event_tbl';

1.2.3 创建函数zcbus_func_ddl_command

create or replace function zcbus.zcbus_func_ddl_command() returns event_trigger as $$  
declare  
  v1 text;
  r record;
begin
  select query into v1 from pg_stat_activity where pid=pg_backend_pid();
  -- RAISE NOTICE 'ddl event:%, command:%', tg_event, tg_tag; 
  if TG_EVENT='ddl_command_end' then  
    SELECT * into r FROM pg_event_trigger_ddl_commands();
    if r.classid > 0 then
      insert into zcbus.ddl_event_tbl(event, tag, objid, object_type, schema_name, object_identity, query)  
        values(TG_EVENT, TG_TAG, r.objid, r.object_type, r.schema_name, r.object_identity, v1);
    end if;
  end if;
  if TG_EVENT='sql_drop' then
    if TG_TAG != 'ALTER TABLE' then
      SELECT * into r FROM pg_event_trigger_dropped_objects();
      insert into zcbus.ddl_event_tbl(event, tag, object_type, schema_name, object_identity, query)  
        values(TG_EVENT, TG_TAG, r.object_type, r.schema_name, r.object_identity, v1);
    end if;
  end if;
end;  
$$ language plpgsql strict;
  • 检查function是否创建:
select a.* FROM pg_catalog.pg_proc a JOIN pg_catalog.pg_namespace b ON a.pronamespace = b.oid
WHERE a.proname = 'zcbus_func_ddl_command' and b.nspname='zcbus';

1.2.4 创建事件触发器

CREATE EVENT TRIGGER zcbus_et_ddl_command on ddl_command_end EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();
CREATE EVENT TRIGGER zcbus_et_ddl_drop on sql_drop EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();
  • 检查EVENT TRIGGER是否创建:
select * from  pg_event_trigger  where evtname in ('zcbus_et_ddl_drop','zcbus_et_ddl_command');

1.2.5 为需要同步的表打开补充日志

ALTER TABLE schemaname.tabname REPLICA IDENTITY FULL;

---批量生成打开表补充日志sql语句参考
SELECT 'ALTER TABLE '||nspname||'.'||relname||' REPLICA IDENTITY FULL;' FROM pg_class c,pg_namespace n where relnamespace = n.oid and c.relkind='r' and nspname='public';

---注1:表所属用户或者超级用户才有开启REPLICA IDENTITY权限,否则报错:must be owner of table
---注2:关于分区表开启补充日志以及添加新的同步表,请查看本页2.1,2.2章节说明

1.2.6 用户授权

grant select,trigger on table zcbus.ddl_event_tbl  to zcbus;
grant execute ON FUNCTION zcbus.zcbus_func_ddl_command to zcbus;
alter user zcbus REPLICATION;
grant select on all tables in schema public to zcbus;
grant execute on function pg_hba_file_rules to zcbus;

2 补充说明

2.1 添加新的同步表

添加新表,根据schemaname.tablename,确认postgres账号开启表的补充日志模式状态开启成功

select relreplident from pg_class c , pg_tables tab where tab.schemaname = 'schemaname' and c.relname='tablename';

查询结果说明

d = 默认(主键)
n = 无
f = 所有列
i = 索引的indisreplident被设置或者默认
查询结果为f,则此表满足同步条件,开启表的补充日志模式成功

注:每次添加完新的同步表,需要再次对同步用户执行如下权限

grant select on all tables in schema public to zcbus;
grant execute on function pg_hba_file_rules to zcbus;

2.2 源表为分区表开启补充日志

SELECT
   parent.relname AS partitioned_table_name,
   parent.relkind AS partitioned_table_type,
   parent.relpartbound AS partition_bound,
   child.relname,
   child.relkind,
   child.relreplident,
   'ALTER TABLE ' || nspname || '.' || child.relname || ' REPLICA IDENTITY FULL' rif
FROM
 pg_class parent
LEFT JOIN pg_inherits ON pg_inherits.inhparent = parent.oid
LEFT JOIN pg_class child ON pg_inherits.inhrelid = child.oid
LEFT JOIN pg_namespace pns ON child.relnamespace = pns.oid
WHERE
 parent.relkind = 'p'
AND parent.relname = 'tabname';

---该语句能查询出分区表中的分区是否开启补充日志,若没有开启,可通过生成的批量SQL开启表补充日志
文档更新时间: 2024-07-26 22:28   作者:王增轩