KINGBASE开启增量流程

前提条件:源库参数配置

修改kingbase.conf文件

#逻辑级别日志
wal_level = logical

#设置日志保留个数,设置太少了,日志量太大了日志自动删除后,容易读不到日志
wal_keep_segments = 10

#如果需要远程读日志,需要设置,至少为1,就接受1个读日志线程
max_wal_senders = 10

#验证是否成功
select name,setting,category from pg_settings where name in (‘wal_level’,’wal_keep_segments’,’max_wal_senders’);

如果远程读日志,需要设置读日志的权限,修改sys_hba.conf

host replication all 0.0.0.0/32 ident

#验证是否成功
select * from pg_hba_file_rules;

1、创建用户

用system账号登陆kingbase数据库,执行如下命令:

create user zcbus with password ‘zcbus’;
grant connect on database *** to zcbus;
create schema zcbus AUTHORIZATION zcbus;

2、创建DDL事件记录表

create table zcbus.ddl_event_tbl (
event text,
tag text,
objid oid,
object_type text,
schema_name text,
object_identity text,
query text,
op_time timestamp default now()
);

#查看zcbus.ddl_event_tbl是否创建成功
select * from information_schema.tables where table_schema=’zcbus’ and table_type=’BASE TABLE’ and table_name=’ddl_event_tbl’;

#开启REPLICA IDENTITY参数(对于没有主键的表,如果要涉及到复制的话,就需要开启此参数,为了能执行更新和删除操作)
alter table “zcbus”.”ddl_event_tbl” REPLICA IDENTITY FULL;

3、创建函数zcbus_func_ddl_command

create or replace function zcbus.zcbus_func_ddl_command() returns event_trigger as $$
declare
  v1 text;
  r record;
begin
  select query into v1 from pg_stat_activity where pid=pg_backend_pid();
  -- RAISE NOTICE 'ddl event:%, command:%', tg_event, tg_tag; 
  if TG_EVENT='ddl_command_end' then  
    SELECT * into r FROM pg_event_trigger_ddl_commands();
    if r.classid > 0 then
      insert into zcbus.ddl_event_tbl(event, tag, objid, object_type, schema_name, object_identity, query)  
        values(TG_EVENT, TG_TAG, r.objid, r.object_type, r.schema_name, r.object_identity, v1);
    end if;
  end if;
  if TG_EVENT='sql_drop' then
    if TG_TAG != 'ALTER TABLE' then
      SELECT * into r FROM pg_event_trigger_dropped_objects();
      insert into zcbus.ddl_event_tbl(event, tag, object_type, schema_name, object_identity, query)  
        values(TG_EVENT, TG_TAG, r.object_type, r.schema_name, r.object_identity, v1);
    end if;
  end if;
end;  
$$ language plpgsql strict;

检查function是否创建:

select a.*
FROM pg_catalog.pg_proc a
JOIN pg_catalog.pg_namespace b ON a.pronamespace = b.oid
WHERE a.proname = ‘zcbus_func_ddl_command’ and b.nspname=’zcbus’;

4、创建事件触发器

CREATE EVENT TRIGGER zcbus_et_ddl_command on ddl_command_end EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();
CREATE EVENT TRIGGER zcbus_et_ddl_drop on sql_drop EXECUTE PROCEDURE zcbus.zcbus_func_ddl_command();

检查EVENT TRIGGER是否创建:

select * from pg_event_trigger where evtname in (‘zcbus_et_ddl_drop’,’zcbus_et_ddl_command’);

5、用户授权

grant select,trigger on table zcbus.ddl_event_tbl to zcbus;
grant execute ON FUNCTION zcbus.zcbus_func_ddl_command to zcbus;
alter user zcbus REPLICATION;
grant select on all tables in schema public to zcbus;
grant execute on function pg_hba_file_rules to zcbus;

注:对于发布库添加的新表,需要再次执行授权命令如下:
alter table public.table_name REPLICA IDENTITY FULL;
grant select on all tables in schema public to zcbus;
grant execute on function pg_hba_file_rules to zcbus;

6、开启REPLICA IDENTITY(建议对所有需要同步的表开启全列补充日志)

\c database_name;
ALTER TABLE schema_name.table_name REPLICA IDENTITY FULL;

生成修改表sql语句参考

SELECT ‘ALTER TABLE ‘||nspname||’.’||relname||’ REPLICA IDENTITY FULL;’ FROM pg_class c,pg_namespace n where relnamespace = n.oid and c.relkind=’r’ and nspname=’public’;
注:表所属用户或者超级用户才有开启REPLICA IDENTITY权限,否则报错:must be owner of table

查询REPLICA IDENTITY状态

SELECT c.relreplident FROM pg_class c,pg_namespace n where relnamespace = n.oid and c.relkind=’r’ and nspname=’schema_name’ and relname=’table_name’
状态说明:
d = DEFAULT - 更新和删除将包含primary key列的现前值
n = NOTHING - 更新和删除将不包含任何先前值
f = FULL - 更新和删除将包含所有列的先前值
i = INDEX index name -?更新和删除事件将包含名为index name的索引定义中包含的列的先前值。

文档更新时间: 2024-01-29 21:16   作者:阿力