- 1 数据库配置(必选项)
- 1.1 日志格式配置(如果已经修改过,可忽略)
- 2 添加槽位号说明(可选操作)
- 2.1 创建槽位号(SQL/系统命令两种可选)
- 2.1.1 查看创建槽位号信息
- 2.1.2 切换日志测试
- 2.1.3 删除日志参考:
- 3 配置解析日志用户(必选项)
- 4 数据表配置(必选项)
- 4.1 需要同步的表打开补充日志
- 4.1.1 分区表检查SQL以及注意事项
- 源表为分区表开启补充日志
- 5 DDL兼容配置(可选项)
- 5.1 创建ddl_event_tbl表(用于存储DDL语句)
- 5.2 创建函数zcbus_func_ddl_command
- 5.3 创建事件触发器
- 6 用户授权(必选项)
- 6.1 创建用户及赋权
- 7 补充说明
- 7.1 添加新的同步表
- 7.2 数据类型支持列表
1 数据库配置(必选项)
1.1 日志格式配置(如果已经修改过,可忽略)
- 修改postgresql.conf文件
#逻辑级别日志
wal_level = logical
#如果需要远程读日志,需要设置,至少为1,就接受1个读日志线程
max_wal_senders = 10 #建议大于等于10
#设置日志保留个数,设置太少了,日志量太大了日志自动删除后,容易读不到日志
wal_keep_segments = 10 #建议大于等于10
# wal_keep_size:要为日志复制进程保留多少日志量。旧版本使用的是wal_keep_segments参数(要为日志复制进程保留多少个日志文件),在pg 13中,该参数已被移除。wal_keep_size = wal_keep_segments * wal_segment_size
# 修改完成后需要重启数据库:pg_ctl -D /var/lib/postgresql/data -l logfile restart
# wal_level=logical 需要重启数据库
- 验证是否成功
postgres=# select name,setting,category from pg_settings where name in ('wal_level','wal_keep_segments','max_wal_senders','wal_keep_size');
name | setting | category
-------------------+---------+-------------------------------
max_wal_senders | 10 | Replication / Sending Servers
wal_keep_segments | 10 | Replication / Sending Servers
wal_level | logical | Write-Ahead Log / Settings
(3 rows)
2 添加槽位号说明(可选操作)
(如发布数据库未配置SLOT_NAME参数,则此步骤添加槽位号跳过,目前软件仅支持添加物理复制槽)
2.1 创建槽位号(SQL/系统命令两种可选)
- 创建槽位号
select pg_create_physical_replication_slot('zcbus_slot_name');
- 删除槽位号
select pg_drop_replication_slot('zcbus_slot_name');
2.1.1 查看创建槽位号信息
postgres=# select * from pg_replication_slots;
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------------+--------+-----------+--------+----------+-----------+--------+------------+------+--------------+-------------+---------------------
zcbus_slot_name | | physical | | | f | t | 2495 | | | 0/136FA6E0 |
(1 row)
2.1.2 切换日志测试
- PG10之前版本
select pg_switch_xlog();
- PG10之后版本
select pg_switch_wal();
2.1.3 删除日志参考:
- 检查可以删除wal的日志文件
83e44cfdd569:~/data$ pg_controldata -D /var/lib/postgresql/data/ |grep "REDO WAL file"
Latest checkpoint's REDO WAL file: 000000010000000100000067
- 删除日志文件操作
pg_archivecleanup /var/lib/postgresql/data/pg_wal/ 000000010000000100000067
3 配置解析日志用户(必选项)
- 如果远程读日志,需要设置读日志的权限,修改pg_hba.conf
host replication all 0.0.0.0/32 ident/md5 ###可选
上段用户加密方式可以通过下述SQL查询加密方式,进行配置
- 查询数据库加密方式
show password_encryption;
- 修改pg_hba.conf执行pg_ctl命令生效
pg_ctl reload -D 指定pg_hba.conf所在目录
示例:pg_ctl reload -D /opt/postgresql/data
- 验证是否成功
postgres=# select * from pg_hba_file_rules;
line_number | type | database | user_name | address | netmask | auth_method | options | error
-------------+-------+---------------+-----------+-----------+-----------------------------------------+-------------+---------+-------
95 | local | {replication} | {all} | | | trust | |
4 数据表配置(必选项)
4.1 需要同步的表打开补充日志
ALTER TABLE schemaname.tabname REPLICA IDENTITY FULL;
---批量生成打开表补充日志sql语句参考
SELECT 'ALTER TABLE '||nspname||'.'||relname||' REPLICA IDENTITY FULL;' FROM pg_class c,pg_namespace n where relnamespace = n.oid and c.relkind='r' and nspname='public';
---注1:表所属用户或者超级用户才有开启REPLICA IDENTITY权限,否则报错:must be owner of table
4.1.1 分区表检查SQL以及注意事项
源表为分区表开启补充日志
SELECT
parent.relname AS partitioned_table_name,
parent.relkind AS partitioned_table_type,
parent.relpartbound AS partition_bound,
child.relname,
child.relkind,
child.relreplident,
'ALTER TABLE ' || nspname || '.' || child.relname || ' REPLICA IDENTITY FULL' rif
FROM
pg_class parent
LEFT JOIN pg_inherits ON pg_inherits.inhparent = parent.oid
LEFT JOIN pg_class child ON pg_inherits.inhrelid = child.oid
LEFT JOIN pg_namespace pns ON child.relnamespace = pns.oid
WHERE
parent.relkind = 'p'
AND parent.relname = 'tabname';
---该语句能查询出分区表中的分区是否开启补充日志,若没有开启,可通过生成的批量SQL开启表补充日志
备注:
postgresql只有owner或usersuper权限才能修改表结构或执行drop操作,请在执行操作前确定所同步表的Owner或用Superuser权限用户执行ALTER USER zcbus WITH SUPERUSER;
5 DDL兼容配置(可选项)
- 添加此功能后,在增量复制过程中,若有表结构修改等操作,软件将自动同步。若未添加此功能,在修改结构时若遇到结构不一致的情况,可能需要人工参与。
5.1 创建ddl_event_tbl表(用于存储DDL语句)
- 创建表
create schema zcbus;
create table zcbus.ddl_event_tbl (
event text,
tag text,
objid oid,
object_type text,
schema_name text,
object_identity text,
query text,
op_time timestamp default now()
);
- 检查ddl_event_tbl表是否创建
select * from information_schema.tables where table_type='BASE TABLE' and table_name='ddl_event_tbl';
5.2 创建函数zcbus_func_ddl_command
create or replace function zcbus.zcbus_func_ddl_command() returns event_trigger as $$
declare
v1 text;
r record;
begin
select query into v1 from pg_stat_activity where pid=pg_backend_pid();
-- RAISE NOTICE 'ddl event:%, command:%', tg_event, tg_tag;
if TG_EVENT='ddl_command_end' then
SELECT * into r FROM pg_event_trigger_ddl_commands();
if r.classid > 0 then
insert into zcbus.ddl_event_tbl(event, tag, objid, object_type, schema_name, object_identity, query)
values(TG_EVENT, TG_TAG, r.objid, r.object_type, r.schema_name, r.object_identity, v1);
end if;
end if;
if TG_EVENT='sql_drop' then
if TG_TAG != 'ALTER TABLE' then
SELECT * into r FROM pg_event_trigger_dropped_objects();
insert into zcbus.ddl_event_tbl(event, tag, object_type, schema_name, object_identity, query)
values(TG_EVENT, TG_TAG, r.object_type, r.schema_name, r.object_identity, v1);
end if;
end if;
end;
$$ language plpgsql strict;
- 检查function是否创建:
select a.* FROM pg_catalog.pg_proc a JOIN pg_catalog.pg_namespace b ON a.pronamespace = b.oid
WHERE a.proname = 'zcbus_func_ddl_command' ;
5.3 创建事件触发器
CREATE EVENT TRIGGER zcbus_et_ddl_command on ddl_command_end EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();
CREATE EVENT TRIGGER zcbus_et_ddl_drop on sql_drop EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();
- 检查EVENT TRIGGER是否创建:
select * from pg_event_trigger where evtname in ('zcbus_et_ddl_drop','zcbus_et_ddl_command');
6 用户授权(必选项)
6.1 创建用户及赋权
切换到同步库下,执行如下(数据库命令行执行):
create user zcbus with password '*****';
grant connect on database *** to zcbus; # ***为需要同步的数据库名称
create schema zcbus authorization zcbus;
grant select,trigger on table zcbus.ddl_event_tbl to zcbus;
grant execute ON FUNCTION zcbus.zcbus_func_ddl_command to zcbus;
alter user zcbus REPLICATION;
grant select on all tables in schema *** to zcbus;
grant execute on function pg_hba_file_rules to zcbus;
alter table zcbus.ddl_event_tbl owner to zcbus;
以上过程中,web页面不要点击注册按键,所有配置手工完成!
7 补充说明
7.1 添加新的同步表
添加新表,根据schemaname.tablename,确认postgres账号开启表的补充日志模式状态开启成功
select relreplident from pg_class c , pg_tables tab where tab.schemaname = 'schemaname' and c.relname='tablename';
查询结果说明
d = 默认(主键)
n = 无
f = 所有列
i = 索引的indisreplident被设置或者默认
查询结果为f,则此表满足同步条件,开启表的补充日志模式成功
注:每次添加完新的同步表,需要再次对同步用户执行如下权限
grant select on all tables in schema public to zcbus;
grant execute on function pg_hba_file_rules to zcbus;
7.2 数据类型支持列表
在抓取数据时,我们将TELPG数据类型与Zcbus支持的类型相匹配。如果我们不支持某个数据类型,我们会自动将该类型更改为最受支持的类型,或者在某些情况下,根本不加载该数据。我们的系统将会抛出异常、跳过我们不接受或转换的数据类型对应表。
TELPG类型 | 是否支持 | 描述 |
---|---|---|
smallint | 支持 | |
integer | 支持 | |
bigint | 支持 | |
decimal(m,d) | 支持 | |
numeric(m,d) | 支持 | |
real | 支持 | |
double precision | 支持 | |
smallserial | 支持 | |
serial | 支持 | |
bigserial | 支持 | |
money | 支持 | |
character varying(n) | 支持 | |
varchar(n) | 支持 | |
character(n) | 支持 | |
char(n) | 支持 | |
text | 支持 | |
timestamp | 支持 | |
tmestamp with time zone | 支持 | |
date | 支持 | |
time | 支持 | |
time with time zone | 支持 | |
interval | 支持 | |
bit(n) | 支持 | |
bit varing(n) | 支持 | |
boolean | 支持 | |
point | 支持 | |
line | 支持 | |
lseg | 支持 | |
box | 支持 | |
path | 支持 | |
polygon | 支持 | |
circle | 支持 | |
cidr | 支持 | |
inet | 支持 | |
macaddr | 支持 | |
tsvector | 支持 | |
tsquery | 支持 | |
uuid | 支持 | |
xml | 支持 |
文档更新时间: 2024-07-27 20:55 作者:操李红