POSTGRESQL准备

修改参数

  • 修改postgresql.conf文件

    #逻辑级别日志
    wal_level = logical
    #设置日志保留个数,设置太少了,日志量太大了日志自动删除后,容易读不到日志
    wal_keep_segments = 10
    #如果需要远程读日志,需要设置,至少为1,就接受1个读日志线程
    max_wal_senders = 10

    验证是否成功

    select name,setting,category from pg_settings where name in ('wal_level','wal_keep_segments','max_wal_senders');
  • 如果远程读日志,需要设置读日志的权限,修改pg_hba.conf

host    replication     all             0.0.0.0/32            ident

验证是否成功

select * from pg_hba_file_rules;
  • 重启数据库

创建数据库用户

create user zcbus with password  'zcbus';

配置

手动注册(需要超级用户手动执行以下SQL)

  • 创建SHCEMA

    \c database_name;
    create schema zcbus AUTHORIZATION zcbus;

    注:schema可以使用其他名称,注意修改以下发布参数涉及的schema即可。

  • 创建ddl_event_tbl表

create table zcbus.ddl_event_tbl (  
  event text,
  tag text, 
  objid oid,
  object_type text,  
  schema_name text,  
  object_identity text,
  query text,
  op_time timestamp default now()
);
  • 创建FUNCTION zcbus_func_ddl_command
create or replace function zcbus.zcbus_func_ddl_command() returns event_trigger as $$  
declare  
  v1 text;
  r record;
begin
  select query into v1 from pg_stat_activity where pid=pg_backend_pid();
  -- RAISE NOTICE 'ddl event:%, command:%', tg_event, tg_tag; 
  if TG_EVENT='ddl_command_end' then  
    SELECT * into r FROM pg_event_trigger_ddl_commands();
    if r.classid > 0 then
      insert into zcbus.ddl_event_tbl(event, tag, objid, object_type, schema_name, object_identity, query)  
        values(TG_EVENT, TG_TAG, r.objid, r.object_type, r.schema_name, r.object_identity, v1);
    end if;
  end if;
  if TG_EVENT='sql_drop' then
    if TG_TAG != 'ALTER TABLE' then
      SELECT * into r FROM pg_event_trigger_dropped_objects();
      insert into zcbus.ddl_event_tbl(event, tag, object_type, schema_name, object_identity, query)  
        values(TG_EVENT, TG_TAG, r.object_type, r.schema_name, r.object_identity, v1);
    end if;
  end if;
end;  
$$ language plpgsql strict;
  • 创建触发器
CREATE EVENT TRIGGER zcbus_et_ddl_command on ddl_command_end EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();  
CREATE EVENT TRIGGER zcbus_et_ddl_drop on sql_drop EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();

自动注册

  • zcbus用户赋超级用户权限
    ALTER USER zcbus WITH SUPERUSER;
  • web界面自动注册(资源管理—>发布节点—>勾选节点id—>DB配置—>测试连接—>注册)
  • zcbus用户收回超级用户权限
    ALTER USER zcbus WITH NOSUPERUSER;

分析日志是否采用槽位号模式

select * from pg_replication_slots;
pg_receivewal --drop-slot --slot=my_test
pg_receivewal --create-slot --slot=my_test

切换POSTGRESQL日志操作
旧版本:select pg_switch_xlog();
新版本:select pg_switch_wal();
ZCBUS发布配置增加SLOT_NAME参数,自动匹配指定槽位号进行日志读取。

赋权

grant connect on database databasedbname to zcbus;
grant usage on schema zcbus to zcbus;
grant usage on schema {schema_name} to zcbus;  ## {schema_name} 需要复制的schema
grant select,trigger on table zcbus.ddl_event_tbl  to zcbus;
grant execute ON FUNCTION zcbus_func_ddl_command to zcbus;
alter user zcbus REPLICATION ;
alter schema zcbus owner to zcbus;
grant select on all tables in schema public to zcbus;
grant select on all tables in schema schema_name to zcbus;
grant select on table pg_hba_file_rules to zcbus;
grant execute on function pg_hba_file_rules to zcbus;  #pg10版本以后存在此权限,之前版本不用执行,手动查询pg_hba.conf权限,可忽略

注:
(1)schema_name是指需要发布的相应schema名称
(2)databasedbname是指需要发布的相应数据库名称

权限查询

查看用户的表权限

select * from information_schema.table_privileges where grantee='zcbus';

查看usage权限表

select * from information_schema.usage_privileges where grantee='zcbus';

查看存储过程函数相关权限

select * from information_schema.routine_privileges where grantee='zcbus';

开启表REPLICA IDENTITY参数

开启REPLICA IDENTITY

\c database_name;
ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL;

生成修改表sql语句参考

SELECT 'ALTER TABLE '||nspname||'.'||relname||' REPLICA IDENTITY FULL;' FROM pg_class c,pg_namespace n where relnamespace = n.oid and c.relkind='r' and nspname='public';

注:表所属用户或者超级用户才有开启REPLICA IDENTITY权限,否则报错:must be owner of table

查询REPLICA IDENTITY状态

SELECT c.relreplident FROM pg_class c,pg_namespace n where relnamespace = n.oid and c.relkind='r' and nspname='schema_name' and relname='table_name'

状态说明:

  • d = DEFAULT - 更新和删除将包含primary key列的现前值
  • n = NOTHING - 更新和删除将不包含任何先前值
  • f = FULL - 更新和删除将包含所有列的先前值
  • i = INDEX index name -?更新和删除事件将包含名为index name的索引定义中包含的列的先前值。

ZCBUS准备

  1. 添加发布节点
  • 资源管理-发布节点
      添加发布节点,选择运行容器位置
      配置发布数据库配置信息【POSTGRESQL】即可
序号 变量名 默认值 是否隐藏函数 描述
1 db_type postgresql 不隐藏 源数据库类型
2 db_name zcbus_demo 不隐藏 捕获数据库名
3 host 172.17.58.146 不隐藏 数据库IP地址
4 port 5432 不隐藏 数据库端口号
5 user postgres 不隐藏 捕获数据库用户名
6 password 1qaz!QAZ 不隐藏 捕获数据库密码
26 read_log_from_remote 1 隐藏 是否远程
30 log_buffer_len 隐藏 MYSQL日志,每轮读取日志最大限制,兆为单位
31 log_arch_path 隐藏 归档日志路径
31 full_sync_buffer 100 隐藏 全量导出的Buffer
31 full_sync_threads 1 隐藏 全量并发线程数
32 log_path /var/lib/pgsql/10/data/pg_wal 隐藏 日志缓存目录
33 event_trigger_table databus.ddl_event_tbl 隐藏 ddl触发器表名
33 max_packet_len 5242880 隐藏 发送到kafka的最大包长度(BYTE)
33 full_publish_interval 0 隐藏 全量发送每个包的间隔(毫秒),0为不限制
35 land_to_file 0 隐藏 是否落地缓存,1:是 0:否
36 land_file_reserve_cont 0 隐藏 文件发送到kafka之后,保留暂时不删除的文件个数,默认为0,发送成功之后马上删除文件
38 message_encrypt 0 隐藏 发送到kafka的消息是否加密,默认为0不加密,1为加密
39 not_compose_trans 0 隐藏 是否进行交易合成,0 交易合成,1不合成直接发送,默认为0 ,PG有效
40 auto_add_table_real_rule 1 不隐藏 是否自动打开全列附加日志,0 不处理,或者手工处理,1 自动打开全列日志
41 slot_name 不隐藏 是否添加槽位号,指定槽位号名字

附: ACL 权限列表

Relacl Code Privilege Name
a INSERT
r SELECT
w UPDATE
d DELETE
D TRUNCATE
x REFERENCES
t TRIGGER
arwdDxt ALL
文档更新时间: 2022-03-14 03:11   作者:阿力