一、介绍
在数据库和策略之间,进行管理,调整的一个桥梁。为数据应用提供一个更为合理,灵活的框架,保证软件能够进行多数据库配置。
- 作用
为企业数据应用,提供一个桥梁作用,操作方便快捷,简单。 - 功能:
- 数据库和数据应用策略之间桥梁连接作用
- 数据应用过程中,多线程策略管理服务
- 数据应用过程中,针对整个策略中的数据,在通道层,可以整体配置表附加信息
- 加载过程中,DDL的控制粒度,是否执行DDL等操作
- 加载过程中,是否增加辅助列:合并分区列数据、源端生产操作提交时间、数据最后操作类型、目标端数据加载时间、数据首次插入到目标端需要的时间字段。
- 数据应用过程中,针对整个策略中的数据,在通道层,可以整体配置表附加信息
- 数据加载的过程中,是否完全过滤DDl操作等
- 控制全量、增量数据加载方式
二、操作说明
1、配置主控通道
进入订阅页面,在左上角的订阅下拉框中选中一个订阅客户端,可以看到该订阅客户端拥有的所有策略(如页面为空,可能是该订阅客户端没有被分配策略,请参考策略分配)
- 配置:将策略与主控通道绑定
- 新增:当某一通道内处理的表过多时可能会造成数据延时过大或者加载慢等问题,可在该策略下新增多个通道,用以分担压力,新增的通道共用同一通道属性参数
- 解绑:将策略与加工通道解除关联,如需做此操作,请确认加工表的状态已重置,且通道处于停止状态,解绑后该策略可与其他主控通道绑定
选中一个策略,点击其右侧的配置按钮,在弹出的新页面中完善如下信息
- 类型:目标数据库类型,有PG,ORACLE,MYSQL等等
- 主控:即为订阅的通道,包含有通道属性(可通过点击右侧的新增按钮新增主控)
- 数据库:在确认主控后会根据类型及主控信息自动补充,无需输入
- 通道数:通道的数量,一般默认即可
- 库名表名:在目标库的库名表名大小写设置,一般当目标库为ORACLE或者达梦时,需要设置为大写
- 启动全量:默认即可
点击下一步,即可完成通道配置
2、配置参数说明
变量 | 值 | 描述 | 是否隐藏 |
---|---|---|---|
IfChangeSameDatabaseDatatype | 0 | 是否针对同种数据库类型,转换为ZCBUS数据类型,如果为源数据库类型(1),则返回源数据的SQL语句,如果为ZCBUS数据库类型(0),则返回ZCBUS翻译的SQL语句 | 是 |
full_load_with_pk | 1 | 设置为开启(1)时,全量装载时先建主键和唯一索引再装载数据,默认为关闭(0),装载完数据之后再建主键和其他索引 | 是 |
ifRepairDataForUpdate | 1 | 是否启用自动修复数据功能 1 允许启用更新失败自动修复, 0 不启用自动修复,兼容JAVA/C | 是 |
ZCBUS_SOURCE_PART | 是否添加辅助列,空,则不添加,否则为全量期间,自动增加该字段,兼容C 如果内容含冒号:colname:value格式,则显示列名和列对应的值。 如果没有冒号,则为value,列名为ZCBUS_SOURCE_PART |
是 | |
ZCBUS_SOURCE_OPTIME | 0 | 是否添加源端commit时间信息列,0不添加,1添加,兼容C | 是 |
ZCBUS_SOURCE_OPTYPE | 0 | 是否添加操作类型列,全量期间默认为INSERT/增量跟进实际操作类型记录,0不添加,1添加,兼容C | 是 |
ZCBUS_TARGET_OPTIME | 0 | 是否增加操作时间列,0不增加,1增加,兼容C | 是 |
ZCBUS_INSERT_OPTIME | 0 | 记录全量和增量插入时间的字段,兼容C | 是 |
ifFilterDDL | 0 | 设置为开启(1)时,过滤掉ddl操作,忽略ifFilterFullDDL和ifFilterRealDDL的设置,默认为关闭(0) | 否 |
tablespacename | 表存储 表空间 | 是 | |
indexspacename | 索引存储表空间[ORACLE使用] | 是 | |
kafka_consume_timeout | 1000 | 消费KAFKA集群中消费超时时间,单位毫秒,默认5000毫秒,最小为1000毫秒。设置小于1000毫秒,则设置参数失效 | 是 |
RepairDataofNULL | 如果NOT NULL列插入NULL,替换成设置的字符串 | 是 | |
max_consume_message | 64 | 设置一次消费的最大消息数,默认32,最大64,最小为装载线程数 | 是 |
no_message_restart_interval | 0 | 可以设置当一段时间内一直消费不到数据,重启通道进程,单位秒,设置重启间隔为0则不会重启进程,最小间隔为60 | 是 |
IfUpdateSplitDeleteInsert | 0 | 设置为开启(1)时,将update转成delete+insert装载,默认为关闭(0) | 是 |
IfSplitUpdateMessage | 0 | 是否拆分update message,默认为关闭(0),设置为开启(1)时,如果update可并发,会拆分并发装载 | 是 |
full_sql_mode | 0 | 全量装载时使用SQL模式,0位load方式装载,1位sql绑定方式加载,2位直接使用sql语句加载,默认为0,load方式加载 | 否 |
real_sql_mode | 0 | 增量装载时使用sql模式,0为load方式装载,1为sql绑定方式装载,2为直接使用sql方式装载,默认为1,sql绑定方式加载 | 否 |
ifFilterInsert | 0 | 设置为开启(1)时,增量装载过程中过滤掉insert操作,默认为关闭(0)不过滤 | 否 |
ifFilterUpdate | 0 | 设置为开启(1)时,增量装载过程中过滤掉update操作,默认为关闭(0)不过滤 | 否 |
ifFilterDelete | 0 | 设置为开启(1)时,增量装载过程中过滤掉delete操作,默认为关闭(0)不过滤 | 否 |
real_delay_load_min | 0 | 增量延迟加载最小时间,单位为分钟,默认为0,不延迟 | 是 |
real_delay_load_max | 0 | 增量延迟加载最大时间,单位为分钟,默认为0,不延迟。real_delay_load_min和real_delay_load_max必须同时设置,而且real_delay_load_max需要大于real_delay_load_min。 如果(当前时间-kafka里数据的时间) <= real_delay_load_min,则暂时不加载,等到(当前时间-kafka里数据的时间) > real_delay_load_max启动加载,直到(当前时间-kafka里数据的时间) <= real_delay_load_min暂停加载,进入下一轮等待 |
是 |
IfChangeColumnNameCase | 0 | 是否修改目标端列名的大小写,默认为不修改(0),按照原来的大小写 | 是 |
max_sql_length | 5242880 | SQL_MODE=2的情况下生效,直接sql语句装载时,如果目标库支持一条insert可以包含多个values的模式,该参数可以设置为一条insert的sql语句的最大长度,默认为0,一次插入一条数据 | 是 |
zcbusmq_mode | 0 | 消费数据时是否从本地zcbus消息队列读取: 0:从kafka消费,默认值 1:只从本地zcbus消息队列消费全量数据 2:从本地zcbus消息队列消费全量和增量数据 |
是 |
type_map | 类型映射,格式 源类型1:目标类型1,源类型2:目标类型2 | 是 | |
full_error_ignore | 0 | 全量用sql绑定方式加载时,如果此时表没有主键的话,设置为开启(1),可以忽略全量装载期间的错误 | 是 |
max_message_size | 5242880 | 客户端装载目标是kafka、rocketmq、rabbitmq时,发送的最大的消息长度,单位是字节,默认是5242880,设置为0的话,每行数据都会拆分成单条发送 | 是 |
update_to_insert_cols_diff_skip | 0 | update转为delete+insert时,如果update的after和before数据的列组合起来仍然不够实际列数时,是否跳过,默认为0,不跳过 | 是 |
load_with_tag | 0 | 双向复制时使用此参数,设置为1时, 如果目标库是mysql,装载时不写binlog日志 如果目标库是oracle,装载时打上tag 如果目标库是sqlserver,装载时加交易名 |
是 |
statistics_into_influxdb | 0 | 是否将增量信息记录到influxdb中 0 不添加 1添加 | 是 |
merge_kafka_message | 1 | 默认0,单包加载处理,设置为1时,增量消费时同一个表连续的小包会进行合并,目前是insert和delete的小包会合并,最多合并成5M | 是 |
output_data_format | 0 | 目标为kafka、rocketmq、rabbitmq时,输出的内容格式:JSON(0): 输出json格式;SQL(1): 输出sql语句格式 | 是 |
compress_level | 0 | 目标为rocketmq时,设置压缩级别,范围是0 - 9 | 是 |
delete_before_insert | 0 | 设置为开启(1)时,如果bsd包里的操作全是insert的时候,会先批量删除,再执行插入 | 是 |
load_error_retry_times | 3 | 加载出现错误后,重试次数,最小三次,可以设置处理N次 | 是 |
count_after_full_sync_end | 1 | 设置为开启(1)时,全量装载完成时对表进行select count操作,统计目标库的总记录数,默认为开启(1) | 是 |
compatible_with_old_bsdata | 1 | 设置为开启(1)时,对旧版本的bsd数据里的列类型转换为新类型后装载 | 是 |
insert_parallel_load | 1 | 目标端为数据库时,可以控制对于同一张表的insert操作是否可以并发装载,默认为开启(1),可以并发装载,设置为关闭(0)时,取消并发装载 | 是 |
delete_parallel_load | 0 | 目标端为数据库时,可以控制对于同一张表的删除操作是否可以并发装载,默认为1,可以并发装载,设置为0时,取消并发装载 | 是 |
audit_mode | 0 | 设置为开启(1)时,开启审计模式: 1.自动添加ZCBUS_SOURCE_OPTYPE列记录操作类型 2.全量只建表不建主键和索引,增量ddl全部过滤 3.所有dml操作自动转为insert操作,insert操作的类型是insert;update操作拆分为2条,before值的操作类型是updatebefore,after值的操作类型为update;delete操作的类型是delete |
否 |
dml_error_skip | 0 | 设置为开启(1)时,可以跳过出错的dml操作,目标为restapi时,可以拆分发送,跳过出错的内容 | 是 |
database_update_interval | 0 | 配置数据库统计信息和offset更新的间隔时间,单位为分钟,默认为0,装载完之后马上更新配置表 | 是 |
multi_table_merge_mode | 多表合一参数,设置为1时,该通道使用多表合一模式,该功能主要用于对分布式数据库的分表数据进行合并装载,去掉多余的ddl操作,通道内配置的多个表需要源端表结构完全一致,多个表发起订阅时,需要所有表一起发起。 此参数修改,需要重启服务才能生效 |
是 | |
hash_partition | 设置之后,在创建表的时候,可以将表自动建立成hash分区表,目前支持ORACLE/MYSQL/OPENGAUSS/MOGDB,可以设置为数字或者指定某列进行分区: 1. 只设置分区个数,比如3,则根据主键进行自动分区,数字为分区数量,在MYSQL中,则按照主键中第一列作为分区列 2. 指定列进行分区,比如col1:5或者col1,col2:5 注:如果设置表级别的hash_partition,则忽略通道级别的hash_partition |
是 | |
ignore_trail_spaces | 0 | oracle专属参数,装载时是否去掉字符串尾部空格,默认为开启(1)去掉空格,设置为关闭(0)时,会按照原始数据装载 | 是 |
ignore_full_tag_in_real | 0 | etl订阅客户端专属参数,设置为开启(1)时,etl消费增量数据时忽略掉全量控制信息,不会自动转为消费全量数据,并且全量发布过程中的表也可以进行etl,默认为关闭(0) | 是 |
auto_choose_columns_as_pk | 0 | 设置为开启(1)时,为没有主键的表自动选出可作为主键的列(不会自动建主键,只是作为delete和update的where条件),表有pk/uk或者设置applybykeys时,此参数在该表不起作用,目前支持目标库为oracle/mysql/sqlserver/postgresql/opengauss/dm。 自动选择基本的数字类型和字符串类型(除lob和二进制类型)列作为主键列(目标表的列类型) | 否 |
parallel_by_target_table | 0 | 如果通道内的部分或全部表的目标表相同,且不同表的数据可能存在主键冲突或者有ddl的情况,可以设置该参数为1,程序将按照目标表是否相同来判断是否可以并发装载,减少不同表的数据之间的主键冲突 | 是 |
full_subscribe_first | 1 | 设置为开启(1)时,增量订阅第一次启动时,会先检查所有表的订阅状态,没有需要全量订阅的表时,才会启动增量 | 是 |
update_merge | 0 | 设置为开启(1)时,可以将一个消息包内相同主键的update合并之后再装载,如果update小包比较多,可以配合merge_kafka_message使用 | 是 |
update_remove_after_pk | 0 | 设置为开启(1)时,如果update没有修改主键的话,去掉after值中的主键再做update,如果修改了主键的话,进行先删后插,适用于华为DWS数据库等不能更新主键的情况 | 是 |
year_error_replace | 0 | 默认为0,如果设置为非0值,可以将数据中的格式不正确的年月日进行替换,例如设置为1900时,进行如下替换:1. 0000-00-00 00:00:00 –> 1900-01-01 00:00:00;2. 0001-00-00 00:00:00 –> 0001-01-01 00:00:00。目前支持如下情形进行替换:1. 目标库为oracle,列类型为date时;2. 目标库为pg,bsd里的数据类型为datetime/date/timestamp时;3. 目标为SqlServer,列类型为datetime类型 | 是 |
filter_index | 0 | 1 过滤索引, 0 不过滤索引 | 是 |
temp_table_mode | 0 | 设置为开启(1)时,etl开启临时表模式 | 是 |
ifFilterFullDDL | 0 | 设置为开启(1)时,全量装载时过滤掉ddl操作,默认为关闭(0) | 否 |
ifFilterRealDDL | 0 | 设置为开启(1)时,增量装载时过滤掉ddl操作,默认为关闭(0) | 否 |
full_subscribe_only | 0 | 默认关闭(0),订阅全量完成,转成增量;设置为开启(1)时,只进行全量订阅 | 是 |
filter_trans_table | 订阅客户端在每次交易开始时先update表zcbus.zcbus_sys_trans,再装数据,zcbus-real分析日志时,发现交易的第一条操作是zcbus.zcbus_sys_trans表的操作,就知道是订阅客户端做的交易了,会过滤掉整个交易。订阅客户端需要是sql绑定模式或者sql语句模式。MYSQL 支持LOAD模式,pg不支持copy模式 | 是 | |
column_case_sensitive | 0 | 设置为开启(1)时,bsd数据中记录的列名和目标表列名严格匹配大小写,适合有多列的列名忽略大小写一致的情况,默认为关闭(0) | 是 |
file_column_separator | , | db_type为file时,列分隔符,默认为逗号 如果有特殊字符,写法如下: 换行符 \n 制表符 \t 回车符 \r 反斜杠 \ 自定义 \xA1 A1为字符asc码的16进制值,严格两位,不够两位用0补充,比如x0B 如果有多个字符,可以连续写,比如 \t\r\n |
是 |
file_row_separator | \n | db_type为file时,行分隔符,默认为 特殊字符写法参考file_column_separator |
是 |
file_column_enclose | db_type为file时,每列数据可以用指定的字符串括起来,CSV类型默认为”特殊字符写法参考file_column_separator | 是 | |
file_escape_string | db_type为file时,需要进行转义的所有字符,CSV类型默认为”特殊字符写法参考file_column_separator | 是 | |
file_escape_add_char | db_type为file时,需要进行转义的字符前面增加的字符,TXT类型默认为反斜杠,CSV类型默认为”特殊字符写法参考file_column_separator | 是 | |
file_remove_string | db_type为file时,可以对指定的字符进行删除特殊字符写法参考file_column_separator | 是 | |
load_timeout_restart_interval | 0 | 可以设置当超过指定时间没装载完一批数据时,重启通道进程,单位秒,设置重启间隔为0则不会重启进程,最小间隔为60 | 是 |
ZCBUS_SEQUENCE_NO | 0 | 是否增加订阅程序插入或更新行的序号列,格式: 1. 0为不添加列,1为添加列,使ZCBUS_SEQUENCE_NO默认列名 2. 填写除了0和1之外的值均为添加列,所填值设置为自定义列名 |
是 |
etl_change_table_name | 1 | 设置为关闭(0)时,etl处理时不替换原数据中的schema和table,默认为开启(1) | 是 |
use_schema_name_in_data | 0 | 设置为开启(1)时,装载到的目标表的schema名使用bsd数据中的schema,默认为关闭(0) | 是 |
use_table_name_in_data | 0 | 设置为开启(1)时,装载到的目标表的table名使用bsd数据中的table,默认为关闭(0) | 是 |
use_zcbus_index | 0 | 是否使用zcbus索引参数 | 是 |
fastest_process | 1 | 设置为开启(1)时,程序将用最快的速度装载数据,大大缩短空闲状态检测的轮询时间 | 是 |
remove_illegal_chars | 0 | 设置为开启(1)时,会过滤掉数据中的一些乱码字符,然后自动替换成? | 是 |
tableownername | ddl转换时,目标表owner的名字,pg适用 | 是 | |
max_conf_db_connection | 1 | 是否合并管理数据库链接 | 是 |
auto_compare | 0 | 是否开启事务比对服务,0 关闭此功能,1 开启此功能;如果数据经过了数据处理,如需开启此功能则数据处理节点此参数也需要开启;如需开启此功能,发布节点也需要设置auto_compare_table才可以生效 | 是 |
load_mode_for_update | 0 | 设置为开启(1)时,如果一个bsd包中全是update,并且有全列数据,则会改用load模式加载(主键冲突覆盖的方式),适用mysql | 是 |
dml_parallel_load | 1 | 设置为开启(1)时,如果表有主键或设置了appbykeys,各kafka包中的dml操作主键不同的话,可以并发装载,默认为关闭(0),不并发装载 | 是 |
audit_op_type | 审计模式时(audit_mode=1),ZCBUS_SOURCE_OPTYPE列填充的操作类型映射,格式:insert:I,delete:D,update:U,updatebefore:UB | 是 | |
use client dict | 0 | 设置为开启(1)时,可以在bus_client product_object_dict 中自定义目标表的字典 use client dict 此参数修改,需要重启服务才能生效 |
是 |
do_not_fix_insert_data | 0 | 设置为开启(1)时,如果批量insert出错,直接改成单条插入,如果单条插入报错,根据dml_error_skip之类的设置判断跳过还是报错退出 | 是 |
error_skip_keywords | 发生错误时,设置跳过包含指定关键字的错误语句,如果是dml,错误跳过检查的检查顺序是:dml_error_skip->error_skip_keywords->表的skip_dml_cnt只要错误满足上述任何一个参数,都会跳过此条操作可以设置多个错误关键字,用逗号隔开,如:ORA-00001;unique constraint关键字匹配时忽略大小写 | 是 | |
use_client_dict | 0 | 设置为开启(1)时,可以在bus_client_product_object_dict中自定义目标表的字典,此参数修改,需要重启服务才能生效 | 是 |
ZCBUS_SOURCE_USER_NAME | 0 | 是否增加源端数据库操作用户列,格式:1. 0为不添加列,1为添加列,使用ZCBUS_SOURCE_USER_NAME默认列名;2. 填写除了0和1之外的值均为添加列,所填值设置为自定义列名。注:需要发布端开启parse_with_user_info参数,暂时只支持发布端为oracle的类型 | 是 |
ZCBUS_SOURCE_OS_USER_NAME | 0 | 是否增加源端操作系统用户列,格式:1. 0为不添加列,1为添加列,使用ZCBUS_SOURCE_OS_USER_NAME默认列名;2. 填写除了0和1之外的值均为添加列,所填值设置为自定义列名。注:需要发布端开启parse_with_user_info参数,暂时只支持发布端为oracle的类型 | 是 |
ZCBUS_SOURCE_OPTIME_DATE | 0 | 是否添加源端commit时间信息列(只包含日期信息),格式:1. 0为不添加列,1为添加列,使用ZCBUS_SOURCE_OPTIME_DATE默认列名;2. 填写除了0和1之外的值均为添加列,所填值设置为自定义列名 | 是 |
ZCBUS_SOURCE_OPTIME_TIME | 0 | 是否添加源端commit时间信息列(只包含时分秒信息),格式:1. 0为不添加列,1为添加列,使用ZCBUS_SOURCE_OPTIME_TIME默认列名;2. 填写除了0和1之外的值均为添加列,所填值设置为自定义列名 | 是 |
ZCBUS_TARGET_OPTIME_DATE | 0 | 是否增加订阅程序插入或更新行的时间列(只包含日期信息),格式:1. 0为不添加列,1为添加列,使用ZCBUS_TARGET_OPTIME_DATE默认列名;2. 填写除了0和1之外的值均为添加列,所填值设置为自定义列名 | 是 |
ZCBUS_TARGET_OPTIME_TIME | 0 | 是否增加订阅程序插入或更新行的时间列(只包含时分秒信息),格式:1. 0为不添加列,1为添加列,使用ZCBUS_TARGET_OPTIME_TIME默认列名;2. 填写除了0和1之外的值均为添加列,所填值设置为自定义列名 | 是 |
ZCBUS_INSERT_OPTIME_DATE | 0 | 是否增加第一次插入时的操作时间列(只包含日期信息),格式:1. 0为不添加列,1为添加列,使用ZCBUS_INSERT_OPTIME_DATE默认列名;2. 填写除了0和1之外的值均为添加列,所填值设置为自定义列名 | 是 |
ZCBUS_INSERT_OPTIME_TIME | 0 | 是否增加第一次插入时的操作时间列(只包含时分秒信息),格式:1. 0为不添加列,1为添加列,使用ZCBUS_INSERT_OPTIME_TIME默认列名;2. 填写除了0和1之外的值均为添加列,所填值设置为自定义列名 | 是 |
merge_kafka_message_max_len | 0 | merge_kafka_message设置为1时,可以在这里设置merge后最大的包大小,单位M,默认是5,最小是5,最大上限根据数据库类型不同:hive: 1024;其他类型: 10,此参数修改,需要重启服务才能生效 | 是 |
update_to_insert | 0 | 设置为开启(1)时,将update转成insert装载,默认为关闭(0) | 是 |
etl_not_check_column | 0 | 设置为不检查(1)时,etl规则不做列检查,默认为检查(0) | 是 |
max_batch_bind_rows | 2048 | 一次性批量装载的最大行数,最小1,最大2048,默认2048,如果想每行数据单独装载的话,设置1注:load模式装载不生效 | 是 |
allow_change_type | 0 | 涉及到异构转换,字符长度超过上限的时候,可能会更新类型(比如MYSQL VARCHAR(3000)->ORACLE varchar(4000)可能存储不下,转成clob。有的客户不让转换,就修复成varchar(4000)就固定了) | 否 |
ZCBUS_SOURCE_ROWID | 0 | 是否增加源端rowid列,格式1. 0为不添加列,1为添加列,使用ZCBUS_SOURCE_ROWID默认列名;2. 填写除了0和1之外的值均为添加列,所填值设置为自定义列名 | 是 |
delete_topic_full_sync_end | 0 | 设置为1时,如果zcbusmq_mode不等于0,当表全量装载结束后,会删除本地保存的该表的全量数据 | 否 |
full_load_table_count_at_once | 0 | 全量时同时装载的表个数,默认为0,所有的表一起装载 | 否 |
statistics_into_file | 0 | 设置为1时,会将装载的统计信息写入到统计文件中 | 否 |
update_delete_single_row | 0 | 设置为1时,如果表没有pk/uk或手动指定主键,每条update和delete操作都只更新或删除一行,支持mysql/pg | 否 |
update_columns_complete | 0 | 设置为1时,针对update操作,after值中没有的列使用before值的列进行补全,适用于发送到kafka或者service | 否 |
partition_pattern | hive分区键和分区方式,比如分区字段是dt,按天分区的话,可以设置dt=’YYYY-MM-DD’ | 是 |
文档更新时间: 2024-01-28 22:37 作者:操李红