POSTGRESQL准备
修改参数
修改postgresql.conf文件
#逻辑级别日志 wal_level = logical #设置日志保留个数,设置太少了,日志量太大了日志自动删除后,容易读不到日志 wal_keep_segments = 10 #如果需要远程读日志,需要设置,至少为1,就接受1个读日志线程 max_wal_senders = 10
验证是否成功
select name,setting,category from pg_settings where name in ('wal_level','wal_keep_segments','max_wal_senders');
如果远程读日志,需要设置读日志的权限,修改pg_hba.conf
host replication all 0.0.0.0/32 ident
验证是否成功
select * from pg_hba_file_rules;
- 重启数据库
创建数据库用户
create user zcbus with password 'zcbus';
配置
手动注册(需要超级用户手动执行以下SQL)
创建SHCEMA
\c database_name; create schema zcbus AUTHORIZATION zcbus;
注:schema可以使用其他名称,注意修改以下发布参数涉及的schema即可。
创建ddl_event_tbl表
create table zcbus.ddl_event_tbl (
event text,
tag text,
objid oid,
object_type text,
schema_name text,
object_identity text,
query text,
op_time timestamp default now()
);
- 创建FUNCTION zcbus_func_ddl_command
create or replace function zcbus.zcbus_func_ddl_command() returns event_trigger as $$
declare
v1 text;
r record;
begin
select query into v1 from pg_stat_activity where pid=pg_backend_pid();
-- RAISE NOTICE 'ddl event:%, command:%', tg_event, tg_tag;
if TG_EVENT='ddl_command_end' then
SELECT * into r FROM pg_event_trigger_ddl_commands();
if r.classid > 0 then
insert into zcbus.ddl_event_tbl(event, tag, objid, object_type, schema_name, object_identity, query)
values(TG_EVENT, TG_TAG, r.objid, r.object_type, r.schema_name, r.object_identity, v1);
end if;
end if;
if TG_EVENT='sql_drop' then
if TG_TAG != 'ALTER TABLE' then
SELECT * into r FROM pg_event_trigger_dropped_objects();
insert into zcbus.ddl_event_tbl(event, tag, object_type, schema_name, object_identity, query)
values(TG_EVENT, TG_TAG, r.object_type, r.schema_name, r.object_identity, v1);
end if;
end if;
end;
$$ language plpgsql strict;
- 创建触发器
CREATE EVENT TRIGGER zcbus_et_ddl_command on ddl_command_end EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();
CREATE EVENT TRIGGER zcbus_et_ddl_drop on sql_drop EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();
自动注册
- zcbus用户赋超级用户权限
ALTER USER zcbus WITH SUPERUSER; - web界面自动注册(资源管理—>发布节点—>勾选节点id—>DB配置—>测试连接—>注册)
- zcbus用户收回超级用户权限
ALTER USER zcbus WITH NOSUPERUSER;
分析日志是否采用槽位号模式
select * from pg_replication_slots;
pg_receivewal --drop-slot --slot=my_test
pg_receivewal --create-slot --slot=my_test
切换POSTGRESQL日志操作
旧版本:select pg_switch_xlog();
新版本:select pg_switch_wal();
ZCBUS发布配置增加SLOT_NAME参数,自动匹配指定槽位号进行日志读取。
赋权
grant connect on database databasedbname to zcbus;
grant usage on schema zcbus to zcbus;
grant usage on schema {schema_name} to zcbus; ## {schema_name} 需要复制的schema
grant select,trigger on table zcbus.ddl_event_tbl to zcbus;
grant execute ON FUNCTION zcbus_func_ddl_command to zcbus;
alter user zcbus REPLICATION ;
alter schema zcbus owner to zcbus;
grant select on all tables in schema public to zcbus;
grant select on all tables in schema schema_name to zcbus;
grant select on table pg_hba_file_rules to zcbus;
grant execute on function pg_hba_file_rules to zcbus; #pg10版本以后存在此权限,之前版本不用执行,手动查询pg_hba.conf权限,可忽略
注:
(1)schema_name是指需要发布的相应schema名称
(2)databasedbname是指需要发布的相应数据库名称
权限查询
查看用户的表权限
select * from information_schema.table_privileges where grantee='zcbus';
查看usage权限表
select * from information_schema.usage_privileges where grantee='zcbus';
查看存储过程函数相关权限
select * from information_schema.routine_privileges where grantee='zcbus';
开启表REPLICA IDENTITY参数
开启REPLICA IDENTITY
\c database_name;
ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL;
生成修改表sql语句参考
SELECT 'ALTER TABLE '||nspname||'.'||relname||' REPLICA IDENTITY FULL;' FROM pg_class c,pg_namespace n where relnamespace = n.oid and c.relkind='r' and nspname='public';
注:表所属用户或者超级用户才有开启REPLICA IDENTITY权限,否则报错:must be owner of table
查询REPLICA IDENTITY状态
SELECT c.relreplident FROM pg_class c,pg_namespace n where relnamespace = n.oid and c.relkind='r' and nspname='schema_name' and relname='table_name'
状态说明:
- d = DEFAULT - 更新和删除将包含primary key列的现前值
- n = NOTHING - 更新和删除将不包含任何先前值
- f = FULL - 更新和删除将包含所有列的先前值
- i = INDEX index name -?更新和删除事件将包含名为index name的索引定义中包含的列的先前值。
ZCBUS准备
- 添加发布节点
- 资源管理-发布节点
添加发布节点,选择运行容器位置
配置发布数据库配置信息【POSTGRESQL】即可
序号 | 变量名 | 默认值 | 是否隐藏函数 | 描述 |
---|---|---|---|---|
1 | db_type | postgresql | 不隐藏 | 源数据库类型 |
2 | db_name | zcbus_demo | 不隐藏 | 捕获数据库名 |
3 | host | 172.17.58.146 | 不隐藏 | 数据库IP地址 |
4 | port | 5432 | 不隐藏 | 数据库端口号 |
5 | user | postgres | 不隐藏 | 捕获数据库用户名 |
6 | password | 1qaz!QAZ | 不隐藏 | 捕获数据库密码 |
26 | read_log_from_remote | 1 | 隐藏 | 是否远程 |
30 | log_buffer_len | 隐藏 | MYSQL日志,每轮读取日志最大限制,兆为单位 | |
31 | log_arch_path | 隐藏 | 归档日志路径 | |
31 | full_sync_buffer | 100 | 隐藏 | 全量导出的Buffer |
31 | full_sync_threads | 1 | 隐藏 | 全量并发线程数 |
32 | log_path | /var/lib/pgsql/10/data/pg_wal | 隐藏 | 日志缓存目录 |
33 | event_trigger_table | databus.ddl_event_tbl | 隐藏 | ddl触发器表名 |
33 | max_packet_len | 5242880 | 隐藏 | 发送到kafka的最大包长度(BYTE) |
33 | full_publish_interval | 0 | 隐藏 | 全量发送每个包的间隔(毫秒),0为不限制 |
35 | land_to_file | 0 | 隐藏 | 是否落地缓存,1:是 0:否 |
36 | land_file_reserve_cont | 0 | 隐藏 | 文件发送到kafka之后,保留暂时不删除的文件个数,默认为0,发送成功之后马上删除文件 |
38 | message_encrypt | 0 | 隐藏 | 发送到kafka的消息是否加密,默认为0不加密,1为加密 |
39 | not_compose_trans | 0 | 隐藏 | 是否进行交易合成,0 交易合成,1不合成直接发送,默认为0 ,PG有效 |
40 | auto_add_table_real_rule | 1 | 不隐藏 | 是否自动打开全列附加日志,0 不处理,或者手工处理,1 自动打开全列日志 |
41 | slot_name | 不隐藏 | 是否添加槽位号,指定槽位号名字 |
附: ACL 权限列表
Relacl Code | Privilege Name |
---|---|
a | INSERT |
r | SELECT |
w | UPDATE |
d | DELETE |
D | TRUNCATE |
x | REFERENCES |
t | TRIGGER |
arwdDxt | ALL |
文档更新时间: 2022-03-14 03:11 作者:阿力