简介
ZCBUS平台中,ZDT【ZCBUS DATA TRANSLATE】模块,为运行在软件内部模块,为数据流提供数据过滤,转换,整理,事件触发等多种动作提供数据处理服务的组件,完全在KAFKA内部完成。
ZDT模块自动衍生与ETL组件模块,所以在系统中,ZDT组件也成为ETL组件。
ZDT配置
参考ZCBUS平台中,增加数据订阅服务管理,当选择客户端类型的时候,选择为ETL组件,即使用ZDT数据模块。
每个容器内可以含有多个ETL订阅客户端,可以实现TOPIC的一级、N级数据交换。
ZDT操作介绍
增加传输通道
将赋予ZDT订阅客户端-数据应用中的策略和传输通道相关联,合称为准备转换的列表。再在数数据表中,对每个表执行ETL相关配置。
配置说明
订阅管理-传输通道
增加传输通道标签订阅管理-数据应用菜单中
根据实际分配的策略,将策略与传输通道标签绑定
在启动之前开始进行表级别的数据过滤,转换等配置。
操作配置如下:订阅管理-数据应用-数据表-选择需要操作的列表进行ETL配置
进入配置页面:
添加规则:
说明:ETL模板:重复使用的ETL规则,可以定义成模板以后方便使用。直接引用即可 数据库配置:可以选择是否通过数据库做数据处理操作。非数据库,则全部为基础条件设置,实现数据过滤,转换等规则。如果是数据库,则使流数据可以通过某个SQL与指定数据库关联,实现数据转换规则,当与数据库关联之后,则数据交换服务将会根据数据库的关系实现更为灵活的数据处理处理机制。
数据过滤:
- 数据分流操作类型支持【INSERT/UPDATE/DELETE/ALL】
PASS(SQLTYPE=(ON=INSERT,IGNORE=UPDATE,ON=UPDATE),CONDITION=( id > 10 ))
- 此用例针对操作过程中,INSERT/DELETE按照条件过滤,UPDATE全部忽略,不进行传输 ON=? 代表支持那个那种类型的操作进行配置。INSERT 插入 UPDATE 更新 DELETE 删除 ALL所有操作
- 过滤条件支持WHERE条件组合,以及SQL语句整合
- 条件过滤
SQL语句整合PASS(SQLTYPE=(ON=INSERT,ON=UPDATE,ON=DELETE),CONDITION=(flag=1 and area='Beijing'))
注意:PASS(SQLTYPE=(ON=INSERT,ON=UPDATE,ON=DELETE),SQLCONDITION=(SELECT 1 FROM DEMO.TEST001 WHERE FLAG=#FLAG# AND AREA=#AREA#))
#ZCBUS_SCHEMA# 代表原数据用户名
#ZCBUS_TABLENAME# 代表原数据表名
- 条件过滤
数据处理:
- 复制数据转换,脱敏规则
- 以列为单位进行配置,可以添加、删除、修改、重命名列
- COLUMN_NAME为需要设置到列名
- OPTYPE为ADD/RENAME/MODIFY/DROP操作
- DATA_TYPE=STRING/NUMBER/DATE类型
- FUNCTION分为FUNCTION/SQLFUNCTION两种
- FUNCTION 为函数转换,相对比较固定,简单,易操作;支持函数嵌套组合
- SQLFUNCTION为SQL语句函数转换,可以依赖配置表,多个MAP信息转换等
注:针对SQLFUNCTION的转换,可以持久化内存,亦可以每条数据均通过SQL进行转换
- SQLFUNCTION为SQL语句函数转换,可以依赖配置表,多个MAP信息转换等
- FUNCTION 为函数转换,相对比较固定,简单,易操作;支持函数嵌套组合
- 函数列数据转换
COLMAP(column_name=ob_fundcode_0235,optype=modify,data_type=string,function=@strcat(@COMPUTEFLOAT(id*3.343),'-',@REPLACEALL(ob_object_id,'1','2))
- 通过SQL语句进行数据转换【支持MYSQL/ORACLE/SQLSERVER】
COLMAP(column_name=ob_fundcode_0235,optype=modify,data_type=string,sqlfunction=select abc ob_fundcode_0235 from ob_fund_0032 where id = #id#)
- 修改列名
COLMAP(column_name=c10,optype=rename,data_type=string,after_column_name=col6)
- 注:查询出来的列,必须和COLUMN_NAME列匹配,否则会匹配失败
- 附加:可以支持列和其他字段组合,支持数据库相关的所有的函数调用
COLMAP(column_name=ob_fundcode_0235,optype=modify,data_type=string,sqlfunction=select abc+#codeid# ob_fundcode_0235 from ob_fund_0032 where id = #id#)
- Optype:包括add/modify/drop/rename四种对列操作的类型
事件触发
- 根据传输的数据,支持触发类型
SQL语句、
存储过程、
SHELL、
BAT等 - 触发SQL语句【支持MYSQL/ORACLE/SQLSERVER】
COLTRIGGER(type=sql,text=insert into f003 select * from demo.test004 where id = #ob_fundcode_test_001# and objectid=#ob_object_id# and c2 = #c2# and type=‘001’)
- 触发调用存储过程
COLTRIGGER(type=procedure,text=proc_test(#c2#,#ob_fundcode_test_001#,#ob_object_id#))
- SHELL触发调用
COLTRIGGER(type=shell,text=sh /zbomc/abc/ccc.sh #ob_fundcode_test_001# #ob_object_id#)
- BAT出发调用
COLTRIGGER(type=cmd,text=d:\\adjuct.bat #codeid#)
- 注:上述触发条件中,#字段# 为输入的条件,可以根据实际情况进行参数传递
SQL规则生成TOPIC
根据设定的SQL语句,复制表为主表,形成复制规则,每条记录根据SQL语句查询结果集,将结果集再次输入到指定TOPIC中。
在多表关联的过程中,目前主要是个别表,简单操作为主。
注意:
(1)SQLFUNCTION规则中,sql语句查询结果集中,返回列必须设置别名,作为新的数据流中的列名
(2)需要指定数据库/用户/schemas名称(例如:ds.t1 a, ds.t2 b)
(3)关联的列不要改名,要不update或delete操作找不到对应列,导致执行不成功
NEWDATA(sqlfunction=(select a.id as id,a.name as name,b.dept as dept from ds.t1 a, ds.t2 b where a.id=b.id and a.id = #id#))
附加
字符串函数
@LTRIM ##取消字符串左侧空格 一个参数
@RTRIM ##取消字符串右侧空格 一个参数
@TRIM ##取消字符串两侧空格 一个参数
@UPPER ##字符串转换为大写 一个参数
@LOWER ##字符串转换为小写 一个参数
@SUBSTR ##字符串截取
语法:SUBSTR(string,start, [length])
string:表示源字符串,即要截取的字符串。
start:开始位置,从1开始查找。如果start是负数,则从string字符串末尾开始算起。
length:可选项,表示截取字符串长度。
示例:@SUBSTR('',0) result[1]: '' @SUBSTR('abcd',0) result[1]: '' @SUBSTR('abcd',1) result[4]: 'abcd' @SUBSTR('abcd',5) result[1]: '' @SUBSTR('abcd',1,2) result[2]: 'ab' @SUBSTR('abcd',1,5) result[4]: 'abcd' @SUBSTR('abcd',1,0) result[1]: '' @SUBSTR('abcd',1,-1) result[1]: '' @SUBSTR('abcd',-1) result[1]: 'd' @SUBSTR('abcd',-5) result[1]: '' @SUBSTR('abcd',-1,2) result[1]: 'd' @SUBSTR('abcd',-1,0) result[1]: '' @SUBSTR('abcd',-1,-1) result[1]: ''
@CONCAT ##链接字符串,支持多个参数,比如@CONCAT(‘1’,’2’,’3’), 最少2个参数,最多16个参数
@STRCMP ##比对两边的字符串, 两个参数, 两边字符串相同,返回结果为0,如果左边大于右边,返回1,如果左边小于右边,返回-1
示例:@STRCMP('','') result[1]: '0' @STRCMP('','asd') result[2]: '-1' @STRCMP('asd','') result[1]: '1' @STRCMP('asb','abc') result[1]: '1' @STRCMP('asb','asbf') result[2]: '-1' @STRCMP('asb','as') result[1]: '1' @STRCMP('asb','asb') result[1]: '0'
@LENGTH ##获取字符串长度 一个参数
@INSTR ##在string中查找child_string ,从start给出的数值开始在string中检索,检索在string中第show_time次出现child_string起始位置。
语法:INSTR(string,child_string,[start],[show_time]) string:表示源字符串。 child_string:子字符串,即要查找的字符串。 start:可选项,开始位置,默认从1开始。如果为负数,则从右向左检索。 show_time:可选项,表示子字符串第几次出现在源字符串当中,默认第1次,负数或0则报错。
示例:
@INSTR('abcdbc', 'bc') result[1]: '2' @INSTR('abcdbc', 'bc',0) result[1]: '0' @INSTR('abcdbc', 'bc',2) result[1]: '2' @INSTR('abcdbc', 'bc',2,1) result[1]: '2' @INSTR('abcdbc', 'bc',2,2) result[1]: '5' @INSTR('abcdbc', 'bc',2,3) result[1]: '0' @INSTR('abcdbc', 'bc',3) result[1]: '5' @INSTR('abcdbc', 'bc',3,2) result[1]: '0' @INSTR('abcdbc', 'bc',6) result[1]: '0' @INSTR('abcdbc', 'bc',8) result[1]: '0' @INSTR('abcdbc', 'bc',-1) result[1]: '5' @INSTR('abcdbc', 'bc',-2) result[1]: '5' @INSTR('abcdbc', 'bc',-2,1) result[1]: '5' @INSTR('abcdbc', 'bc',-2,2) result[1]: '2' @INSTR('abcdbc', 'bc',-2,3) result[1]: '0' @INSTR('abcdbc', 'bc',-5) result[1]: '2' @INSTR('abcdbc', 'bc',-5,2) result[1]: '0' @INSTR('abcdbc', 'bc',-8) result[1]: '0' @INSTR('abcdbc', '') result[1]: '0' @INSTR('', 'abcde') result[1]: '0'
@REPLACE ##替換字符串中匹配到的第一個字符串
语法: @REPLACE(COLSTR,SOURCESTRING,TARGETSTRING) @REPLACE(COLSTR,s1,t1,s2,t2,s3,t3,....)
@REPLACEALL ##替換字符串中匹配到所有的字符串
语法: @REPLACE(COLSTR,SOURCESTRING,TARGETSTRING) @REPLACE(COLSTR,s1,t1,s2,t2,s3,t3,....)
@RANDOMSTRING ##获取随机字符串 @RANDOMSTRING(5) 获取5位长度的随机字符串
语法:RANDOM([length]) length:可选项,指定生成随机字符串的长度,最大长度8000。如果不指定长度,则生成一个长度20以内的随机字符串。
示例:
@RANDOMSTRING() result[10]: '6KA1kJR5Cp' @RANDOMSTRING(-1) result[1]: '' @RANDOMSTRING(0) result[1]: '' @RANDOMSTRING(20) result[20]: 'I0bSY2EEf3bm06ZKTTow'
@STRINGREVERSE #内容倒序函数
示例:@STRINGREVERSE('123456789') result[9]: '987654321'
@STRSUBPOINT #字符串指定位置替换 @STRSUBPOINT(colstr,3,5,targetstr)
语法:STRSUBPOINT(string,start, end, targetstr) string:表示源字符串。 start:替换开始位置,最小为1,即从头开始替换,如果start小于1的话,自动从1开始。 end: 替换结束位置,如果小于start,则不进行替换。 targetstr:要替换的字符串。
示例:
@STRSUBPOINT('abcdefg',2,6,'tyt') result[5]: 'atytg' @STRSUBPOINT('abcdefg',3,4,'tyt') result[8]: 'abtytefg' @STRSUBPOINT('abcdefg',3,3,'tyt') result[9]: 'abtytdefg' @STRSUBPOINT('abcdefg',6,10,'tyt') result[8]: 'abcdetyt' @STRSUBPOINT('abcdefg',3,5,'') result[4]: 'abfg' @STRSUBPOINT('abcdefg',3,1,'tyt') result[7]: 'abcdefg' @STRSUBPOINT('abcdefg',-1,2,'tyt') result[8]: 'tytcdefg' @STRSUBPOINT('abcdefg',3,-1,'tyt') result[7]: 'abcdefg' @STRSUBPOINT('',1,2,'sd') result[1]: ''
数字函数
@COMPUTE ##返回计算的值 @COMPUTE(12+(5-4)/2) 支持+-/%五种通用计算方式,支持括号、函数嵌套,支持整数,浮点数等
示例:@COMPUTE(10 +20*(-2-4)%(3--6)/-2) result[21]: 16.666666666666667851 @COMPUTE(1+ @ABS(@COMPUTE(10 +20*-2-4%3--6/-2)) + 2) result[21]: 37.333333333333328596
@RANDOM ##获取随机数 @RANDOM(3) 3为长度的随机数
语法:RANDOM([length]) length:可选项,指定生成随机数的长度,最大长度20。
示例:
@RANDOM() result[10]: 1799709735 @RANDOM(-1) result[1]: 0 @RANDOM(0) result[1]: 0 @RANDOM(5) result[5]: 17997 @RANDOM(20) result[20]: 17997097357313528046 @RANDOM(25) result[20]: 17997097357313528046
@ABS(X) X的绝对值 ABS(-3)=3
@CEIL(X) 大于或等于X的最小值
示例:@CEIL(1.2) result[1]: 2 @CEIL(10) result[2]: 10 @CEIL(-1.2) result[2]: -1 @CEIL(-0.0) result[1]: 0 @CEIL(0) result[1]: 0
@FLOOR(X) 小于或等于X的最大值
示例:@FLOOR(1.2) result[1]: 1 @FLOOR(10) result[2]: 10 @FLOOR(-1.2) result[2]: -2 @FLOOR(-0.0) result[1]: 0 @FLOOR(0) result[1]: 0
@MOD(X,Y) X除以Y的余数 MOD(8,3)=2
@ROUND(X[,Y]) X在第Y位四舍五入 ROUND(3.456,2)=3.46
语法: X 要处理的数 Y 可选项,指保留几位小数,默认为0。Y可以是负数,这时是指定小数点左边的Y位整数位为0,同时小数位均为0。
示例:
@ROUND(0.0) result[1]: 0 @ROUND(-0.0) result[1]: 0 @ROUND(1.435) result[1]: 1 @ROUND(1.567) result[1]: 2 @ROUND(1.435,0) result[1]: 1 @ROUND(1.435,1) result[3]: 1.4 @ROUND(1.435,2) result[4]: 1.44 @ROUND(-1.435,3) result[6]: -1.435 @ROUND(1.435,5) result[5]: 1.435 @ROUND(1.495,2) result[4]: 1.50 @ROUND(1.995,2) result[4]: 2.00 @ROUND(-9.995,2) result[6]: -10.00 @ROUND(20.2) result[2]: 20 @ROUND(-9.995,-1) result[3]: -10 @ROUND(111.995,-1) result[3]: 110 @ROUND(-141.995,-2) result[4]: -100 @ROUND(-451.995,-2) result[4]: -500 @ROUND(-451.995,-3) result[1]: 0 @ROUND(-551.995,-3) result[5]: -1000 @ROUND(-951.995,-4) result[1]: 0
@TRUNC(X[,Y]) X在第Y位截断 TRUNC(3.456,2)=3.45
语法同@ROUND,但是不进行四舍五入,直接舍掉。
@POWER(X,Y) X的Y次幂 POWER(2,3)=8
示例:@POWER(0,0) result[1]: 1 @POWER(0,1) result[1]: 0 @POWER(1.654,0) result[1]: 1 @POWER(4,2) result[2]: 16 @POWER(4,-2) result[6]: 0.0625 @POWER(-4.678,2) result[21]: 21.883683999999998804
@SQRT(X) X的平方根 SQRT(4)=2
@LOG(X,Y) X为底Y的对数 LOG(2,4)=2
@SIN(X) X的正弦值 SIN(1)=0.84147098480789650488
@COS(X) X的余弦值 COS(1)=0.54030230586814
@ASIN(X) X的反正弦 ASIN(1)=1.570796326794896558
@ACOS(X) X的反余弦 ACOS(1)=0
时间函数
-
@DATE() result[10]: '2020-03-06'
@DATENOW ##返回系统时间 @DATENOW()
示例:@DATENOW() result[19]: '2020-03-06 10:00:44'
@DATEDIFF ##返回时间比对结果 @DATEDIFF(FIRST,SECOND,FORMAT,UNID)
第一个时间,减去第二个时间,采用FORMAT格式,返回单位为D/H/M/S 天/小时/分/秒 语法:RANDOM([length]) FIRST: 第一个时间 SECOND:第二个时间 FORMAT:FIRST和SECOND采用的时间格式,如果设置为'',默认采用'yyyy-mm-dd HH:mm:ss'的格式 UNID: 返回值的单位, D/H/M/S 天/小时/分/秒
示例:
@DATEDIFF('2020-03-07 08:58:35','2020-03-06 08:58:35','yyyy-mm-dd HH:mm:ss','d') result[1]: 1 @DATEDIFF('2020-03-07 08:58:35','2020-03-06 08:58:35','yyyy-mm-dd HH:mm:ss','h') result[2]: 24 @DATEDIFF('2020-03-07 08:58:35','2020-03-06 08:58:35','yyyy-mm-dd HH:mm:ss','m') result[4]: 1440 @DATEDIFF('2020-03-07 08:58:35','2020-03-06 08:58:35','yyyy-mm-dd HH:mm:ss','s') result[5]: 86400 @DATEDIFF('2020/03/07 08:58:35','2020/03/06 08:58:35','yyyy/mm/dd HH:mm:ss','d') result[1]: 1 @DATEDIFF('20/3/7 08:58:35','20/3/8 07:58:35','yy/m/d HH:mm:ss','d') result[1]: 0 @DATEDIFF('20/3/7 08:58:35','20/3/8 07:58:35','yy/m/d HH:mm:ss','m') result[5]: -1380 @DATEDIFF('20/3/7 08:58:35','20/3/8 09:58:35','yy/m/d HH:mm:ss','d') result[2]: -1 @DATEDIFF('2020-03-07 08:58:35','2020-03-08 08:58:35','','d') result[2]: -1
@DATEADD ##时间加减 @DATEADD(SOURCE_TIME,INTERVAL)
源时间增加加时间间隔,返回新时间,返回格式为’yyyy-mm-dd HH:mm:ss’
语法:
SOURCE_TIME: 源时间,采用’yyyy-mm-dd HH:mm:ss’的格式
INTERVAL: 时间间隔,支持间隔类型DAY、HOUR、MINUTE、SECOND,间隔可为负数示例:
@DATEADD('2022-01-31 23:58:35', 'INTERVAL 1 HOUR') result[19]: '2022-02-01 00:58:35' @DATEADD('2022-01-31 00:58:35', 'INTERVAL -1 HOUR') result[19]: '2022-01-30 23:58:35' @DATEADD('2022-03-31 08:59:59', 'INTERVAL 1 MINUTE') result[19]: '2022-03-31 09:00:59' @DATEADD('2022-03-31 08:58:59', 'INTERVAL 12 SECOND') result[19]: '2022-03-31 08:59:11' @DATEADD('2023-02-16 11:08:02', 'INTERVAL 365 DAY') result[19]: '2024-02-16 11:08:02'