一、介绍
ZCBUS平台中,ZDT【ZCBUS DATA TRANSLATE】模块,为运行在软件内部模块,为数据流提供数据过滤,转换,整理,事件触发等多种动作提供数据处理服务的组件,完全在KAFKA内部完成。
ZDT模块自动衍生与ETL组件模块,所以在系统中,ZDT组件也成为ETL组件。
数据处理加工说明:
二、ZDT配置
参考ZCBUS平台中,增加数据订阅服务管理,当选择客户端类型的时候,选择为数据加工客户端,即使用ZDT数据模块。
每个容器内可以含有多个数据加工客户端,可以实现TOPIC的一级、N级数据交换。
三、ZDT操作介绍
1、加工管理
- 加工管理:添加、删除、修改加工客户端
- 数据库管理:在进行数据加工时可以选择是否通过数据库做数据处理操作。非数据库,则全部为基础条件设置,实现数据过滤,转换等规则。如果是数据库,则使流数据可以通过某个SQL与指定数据库关联,实现数据转换规则,当与数据库关联之后,则数据交换服务将会根据数据库的关系实现更为灵活的数据处理处理机制。
- 表属性自动更新:
- 统计:查看订阅通道当中表加工的状态
2、通道配置说明
将赋予数据加工客户端-数据应用中的策略和主控加工通道相关联,合称为准备转换的列表。再在数数据表中,对每个表执行ETL相关配置。
- 配置:将策略与加工通道关联,再在数据表中,对每个表执行ETL相关规则配置
- 新增:当某一通道内处理的表过多时可能会造成数据延时过大,可在该策略下新增多个通道,用以分担压力,新增的通道共用同一通道属性参数
- 解绑:将策略与加工通道解除关联,如需做此操作,请确认加工表的状态已重置,且通道处于停止状态
3、通道操作说明
- 属性:数据库和数据应用策略之间桥梁连接作用
- 数据应用过程中,多线程策略管理服务
- 数据应用过程中,针对整个策略中的数据,在通道层,可以整体配置表附加信息
- 加载过程中,DDL的控制粒度,是否执行DDL等操作
- 加载过程中,是否增加辅助列:合并分区列数据、源端生产操作提交时间、数据最后操作类型、目标端数据加载时间、数据首次插入到目标端需要的时间字段。
- 数据应用过程中,针对整个策略中的数据,在通道层,可以整体配置表附加信息
- 数据加载的过程中,是否完全过滤DDl操作等
- 控制全量、增量数据加载方式
- 数据表:对需要加工的数据表进行加工规则配置及开启数据加工等操作
- 操作类型:操作数据表开启全量订阅及恢复、暂停的状态
- 配置类型:更改数据表的表名大小写及表名、库名等信息
- ETL配置:可对数据表添加数据加工的规则,如数据过滤,数据处理等
- ETL配置SQL关联::
- 通道转移:当同一策略内存在多个加工通道时,可使用此功能将数据表表转移到其他加工通道中
- 异常信息批处理:针对通道可出现多个加工表异常时,可对异常的表进行批量处理
- 监控:监控加工数据表的全量及增量加工状态
- 平衡:当策略内存在多个加工通道时,可根据加工表的繁忙、延时等信息平衡加工通道内的表数量
四、转换规则介绍
数据过滤:
- 数据分流操作类型支持【INSERT/UPDATE/DELETE/ALL】
PASS(SQLTYPE=(ON=INSERT,IGNORE=UPDATE,ON=UPDATE),CONDITION=( id > 10 ))
- 此用例针对操作过程中,INSERT/DELETE按照条件过滤,UPDATE全部忽略,不进行传输 ON=? 代表支持那个那种类型的操作进行配置。INSERT 插入 UPDATE 更新 DELETE 删除 ALL所有操作
- 过滤条件支持WHERE条件组合,以及SQL语句整合
- 条件过滤
SQL语句整合PASS(SQLTYPE=(ON=INSERT,ON=UPDATE,ON=DELETE),CONDITION=(flag=1 and area='Beijing'))
注意:PASS(SQLTYPE=(ON=INSERT,ON=UPDATE,ON=DELETE),SQLCONDITION=(SELECT 1 FROM DEMO.TEST001 WHERE FLAG=#FLAG# AND AREA=#AREA#))
#ZCBUS_SCHEMA# 代表原数据用户名
#ZCBUS_TABLENAME# 代表原数据表名
- 条件过滤
数据处理:
- 复制数据转换,脱敏规则
- 以列为单位进行配置,可以添加、删除、修改、重命名列
- COLUMN_NAME为需要设置到列名
- OPTYPE为ADD/RENAME/MODIFY/DROP操作
- DATA_TYPE=STRING/NUMBER/DATE类型
- FUNCTION分为FUNCTION/SQLFUNCTION两种
- FUNCTION 为函数转换,相对比较固定,简单,易操作;支持函数嵌套组合
- SQLFUNCTION为SQL语句函数转换,可以依赖配置表,多个MAP信息转换等
注:针对SQLFUNCTION的转换,可以持久化内存,亦可以每条数据均通过SQL进行转换
- SQLFUNCTION为SQL语句函数转换,可以依赖配置表,多个MAP信息转换等
- FUNCTION 为函数转换,相对比较固定,简单,易操作;支持函数嵌套组合
- 函数列数据转换
COLMAP(column_name=ob_fundcode_0235,optype=modify,data_type=string,function=@strcat(@COMPUTEFLOAT(id*3.343),'-',@REPLACEALL(ob_object_id,'1','2))
- 通过SQL语句进行数据转换【支持MYSQL/ORACLE/SQLSERVER】
COLMAP(column_name=ob_fundcode_0235,optype=modify,data_type=string,sqlfunction=select abc ob_fundcode_0235 from ob_fund_0032 where id = #id#)
- 修改列名
COLMAP(column_name=c10,optype=rename,data_type=string,after_column_name=col6)
- 注:查询出来的列,必须和COLUMN_NAME列匹配,否则会匹配失败
- 附加:可以支持列和其他字段组合,支持数据库相关的所有的函数调用
COLMAP(column_name=ob_fundcode_0235,optype=modify,data_type=string,sqlfunction=select abc+#codeid# ob_fundcode_0235 from ob_fund_0032 where id = #id#)
- Optype:包括add/modify/drop/rename四种对列操作的类型
事件触发
- 根据传输的数据,支持触发类型
SQL语句、
存储过程、
SHELL、
BAT等 - 触发SQL语句【支持MYSQL/ORACLE/SQLSERVER】
COLTRIGGER(type=sql,text=insert into f003 select * from demo.test004 where id = #ob_fundcode_test_001# and objectid=#ob_object_id# and c2 = #c2# and type=‘001’)
- 触发调用存储过程
COLTRIGGER(type=procedure,text=proc_test(#c2#,#ob_fundcode_test_001#,#ob_object_id#))
- SHELL触发调用
COLTRIGGER(type=shell,text=sh /zbomc/abc/ccc.sh #ob_fundcode_test_001# #ob_object_id#)
- BAT出发调用
COLTRIGGER(type=cmd,text=d:\\adjuct.bat #codeid#)
- 注:上述触发条件中,#字段# 为输入的条件,可以根据实际情况进行参数传递
SQL规则生成TOPIC
根据设定的SQL语句,复制表为主表,形成复制规则,每条记录根据SQL语句查询结果集,将结果集再次输入到指定TOPIC中。
在多表关联的过程中,目前主要是个别表,简单操作为主。
注意:
(1)SQLFUNCTION规则中,sql语句查询结果集中,返回列必须设置别名,作为新的数据流中的列名
(2)需要指定数据库/用户/schemas名称(例如:ds.t1 a, ds.t2 b)
(3)关联的列不要改名,要不update或delete操作找不到对应列,导致执行不成功
NEWDATA(sqlfunction=(select a.id as id,a.name as name,b.dept as dept from ds.t1 a, ds.t2 b where a.id=b.id and a.id = #id#))
五、加工通道参数一览表
变量 | 值 | 描述 | 是否隐藏 |
---|---|---|---|
insert_parallel_load | 1 | 目标端为数据库时,可以控制对于同一张表的insert操作是否可以并发装载,默认为开启(1),可以并发装载,设置为关闭(0)时,取消并发装载 | 是 |
dml_parallel_load | 1 | 设置为开启(1)时,如果表有主键或设置了appbykeys,各kafka包中的dml操作主键不同的话,可以并发装载,默认为关闭(0),不并发装载 | 是 |
kafka_consume_timeout | 1000 | 消费KAFKA集群中消费超时时间,单位毫秒,默认5000毫秒,最小为1000毫秒。设置小于1000毫秒,则设置参数失效 | 是 |
max_consume_message | 64 | 设置一次消费的最大消息数,默认32,最大64,最小为装载线程数 | 是 |
max_message_size | 5242880 | 客户端装载目标是kafka、rocketmq、rabbitmq时,发送的最大的消息长度,单位是字节,默认是5242880,设置为0的话,每行数据都会拆分成单条发送 | 是 |
merge_kafka_message | 1 | 默认0,单包加载处理,设置为1时,增量消费时同一个表连续的小包会进行合并,目前是insert和delete的小包会合并,最多合并成5M | 是 |
no_message_restart_interval | 0 | 可以设置当一段时间内一直消费不到数据,重启通道进程,单位秒,设置重启间隔为0则不会重启进程,最小间隔为60 | 是 |
ignore_full_tag_in_real | 0 | etl订阅客户端专属参数,设置为开启(1)时,etl消费增量数据时忽略掉全量控制信息,不会自动转为消费全量数据,并且全量发布过程中的表也可以进行etl,默认为关闭(0) | 是 |
auto_choose_columns_as_pk | 0 | 设置为开启(1)时,为没有主键的表自动选出可作为主键的列(不会自动建主键,只是作为delete和update的where条件),表有pk/uk或者设置applybykeys时,此参数在该表不起作用,目前支持目标库为oracle/mysql/sqlserver/postgresql/opengauss/dm。 自动选择基本的数字类型和字符串类型(除lob和二进制类型)列作为主键列(目标表的列类型) | 否 |
full_subscribe_first | 1 | 设置为开启(1)时,增量订阅第一次启动时,会先检查所有表的订阅状态,没有需要全量订阅的表时,才会启动增量 | 是 |
update_remove_after_pk | 0 | 设置为开启(1)时,如果update没有修改主键的话,去掉after值中的主键再做update,如果修改了主键的话,进行先删后插,适用于华为DWS数据库等不能更新主键的情况 | 是 |
temp_table_mode | 0 | 设置为开启(1)时,etl开启临时表模式 | 是 |
auto_compare | 0 | 是否开启事务比对服务,0 关闭此功能,1 开启此功能;如果数据经过了数据处理,如需开启此功能则数据处理节点此参数也需要开启;如需开启此功能,发布节点也需要设置auto_compare_table才可以生效 | 是 |
full_subscribe_only | 0 | 默认关闭(0),订阅全量完成,转成增量;设置为开启(1)时,只进行全量订阅 | 是 |
filter_trans_table | 订阅客户端在每次交易开始时先update表zcbus.zcbus_sys_trans,再装数据,zcbus-real分析日志时,发现交易的第一条操作是zcbus.zcbus_sys_trans表的操作,就知道是订阅客户端做的交易了,会过滤掉整个交易。订阅客户端需要是sql绑定模式或者sql语句模式。MYSQL 支持LOAD模式,pg不支持copy模式 | 是 | |
use_zcbus_index | 0 | 是否使用zcbus索引参数 | 是 |
fastest_process | 1 | 设置为开启(1)时,程序将用最快的速度装载数据,大大缩短空闲状态检测的轮询时间 | 是 |
remove_illegal_chars | 0 | 设置为开启(1)时,会过滤掉数据中的一些乱码字符,然后自动替换成? | 是 |
allow_change_type | 0 | 涉及到异构转换,字符长度超过上限的时候,可能会更新类型(比如MYSQL VARCHAR(3000)->ORACLE varchar(4000)可能存储不下,转成clob。有的客户不让转换,就修复成varchar(4000)就固定了) | 否 |
delete_topic_full_sync_end | 0 | 设置为1时,如果zcbusmq_mode不等于0,当表全量装载结束后,会删除本地保存的该表的全量数据 | 否 |
full_load_table_count_at_once | 0 | 全量时同时装载的表个数,默认为0,所有的表一起装载 | 否 |
statistics_into_file | 0 | 设置为1时,会将装载的统计信息写入到统计文件中 | 否 |
update_delete_single_row | 0 | 设置为1时,如果表没有pk/uk或手动指定主键,每条update和delete操作都只更新或删除一行,支持mysql/pg | 否 |
update_columns_complete | 0 | 设置为1时,针对update操作,after值中没有的列使用before值的列进行补全,适用于发送到kafka或者service | 否 |
max_conf_db_connection | 1 | 是否合并管理数据库链接 | 是 |
etl_not_check_column | 0 | 设置为不检查(1)时,etl规则不做列检查,默认为检查(0) | 是 |
etl_change_table_name | 1 | 设置为关闭(0)时,etl处理时不替换原数据中的schema和table,默认为开启(1) | 是 |
文档更新时间: 2024-03-26 03:42 作者:操李红