简介

 ZCBUS平台中,ZDT【ZCBUS DATA TRANSLATE】模块,为运行在软件内部模块,为数据流提供数据过滤,转换,整理,事件触发等多种动作提供数据处理服务的组件,完全在KAFKA内部完成。
ZDT模块自动衍生与ETL组件模块,所以在系统中,ZDT组件也成为ETL组件。

数据处理加工说明

数据处理12345ZCBUS数据缓存区数据加工服务SDK服务数据加工服务数据过滤数据加工事件触发SQL关联数据缓存区其他源数据源SDK服务发布服务数据订阅SDK目标

ZDT配置

 参考ZCBUS平台中,增加数据订阅服务管理,当选择客户端类型的时候,选择为ETL组件,即使用ZDT数据模块。

 每个容器内可以含有多个ETL订阅客户端,可以实现TOPIC的一级、N级数据交换。

ZDT操作介绍

 增加传输通道

 将赋予ZDT订阅客户端-数据应用中的策略和传输通道相关联,合称为准备转换的列表。再在数数据表中,对每个表执行ETL相关配置。

配置说明

  • 订阅管理-传输通道
      增加传输通道标签

  • 订阅管理-数据应用菜单中
      根据实际分配的策略,将策略与传输通道标签绑定

    在启动之前开始进行表级别的数据过滤,转换等配置。
    操作配置如下:

  • 订阅管理-数据应用-数据表-选择需要操作的列表进行ETL配置

    进入配置页面:

    添加规则:
    说明:

      ETL模板:重复使用的ETL规则,可以定义成模板以后方便使用。直接引用即可
      数据库配置:可以选择是否通过数据库做数据处理操作。非数据库,则全部为基础条件设置,实现数据过滤,转换等规则。如果是数据库,则使流数据可以通过某个SQL与指定数据库关联,实现数据转换规则,当与数据库关联之后,则数据交换服务将会根据数据库的关系实现更为灵活的数据处理处理机制。

数据过滤:

  • 数据分流操作类型支持【INSERT/UPDATE/DELETE】
     PASS(SQLTYPE=(ON=INSERT,IGNORE=UPDATE,ON=DELETE),CONDITION=( id > 10 ))
  • 此用例针对操作过程中,INSERT/DELETE按照条件过滤,UPDATE全部忽略,不进行传输 ON=? 代表支持那个那种类型的操作进行配置。INSERT 插入 UPDATE 更新 DELETE 删除 ALL所有操作
  • 过滤条件支持WHERE条件组合,以及SQL语句整合
    • 条件过滤
      PASS(SQLTYPE=(ON=INSERT,ON=UPDATE,ON=DELETE),CONDITION=(flag=1 and area='Beijing'))
      SQL语句整合
      PASS(SQLTYPE=(ON=INSERT,ON=UPDATE,ON=DELETE),SQLCONDITION=(SELECT 1 FROM DEMO.TEST001 WHERE FLAG=#FLAG# AND AREA=#AREA#))
      注意:
      #ZCBUS_SCHEMA# 代表原数据用户名
      #ZCBUS_TABLENAME# 代表原数据表名

数据处理:

  • 复制数据转换,脱敏规则
    • 以列为单位进行配置,可以添加、删除、修改、重命名列
    • COLUMN_NAME为需要设置到列名
    • OPTYPE为ADD/RENAME/MODIFY/DROP操作
    • DATA_TYPE=STRING/NUMBER/DATE类型
    • FUNCTION分为FUNCTION/SQLFUNCTION两种
      • FUNCTION 为函数转换,相对比较固定,简单,易操作;支持函数嵌套组合
        • SQLFUNCTION为SQL语句函数转换,可以依赖配置表,多个MAP信息转换等
          注:针对SQLFUNCTION的转换,可以持久化内存,亦可以每条数据均通过SQL进行转换
  • 函数列数据转换
    COLMAP(column_name=ob_fundcode_0235,optype=modify,data_type=string,function=(@strcat(@COMPUTEFLOAT(id*3.343),'-',@REPLACEALL(ob_object_id,'1','2')))
  • 通过SQL语句进行数据转换【支持MYSQL/ORACLE/SQLSERVER】
    COLMAP(column_name=ob_fundcode_0235,optype=modify,data_type=string,sqlfunction=(select abc ob_fundcode_0235 from ob_fund_0032 where id = #id#))
  • 修改列名
    COLMAP(column_name=c10,optype=rename,data_type=string,after_column_name=col6)
  • 注:查询出来的列,必须和COLUMN_NAME列匹配,否则会匹配失败
  • 附加:可以支持列和其他字段组合,支持数据库相关的所有的函数调用
    COLMAP(column_name=ob_fundcode_0235,optype=modify,data_type=string,sqlfunction=(select abc+#codeid# ob_fundcode_0235 from ob_fund_0032 where id = #id#))
  • Optype:包括add/modify/drop/rename四种对列操作的类型

事件触发

  • 前期过滤,在事件操作过滤等操作完成之后,是否需要透传数据的问题,INSERT/UPDATE/DELETE,以及UPDATE操作的BEFORE/AFTER等操作,均可以单独配置
    PASS(SQLTYPE=(IGNORE=INSERT,IGNORE=UPDATE,IGNORE=UPDATEBEFORE,IGNORE=UPDATEAFTER,IGNORE=DELETE))

  • 根据传输的数据,支持触发类型
    SQL语句、
    存储过程、
    SHELL、
    BAT等
  • 触发SQL语句【支持MYSQL/ORACLE/SQLSERVER】
    TRIGGER(type=sql,text=(insert into f003 select * from demo.test004 where id = #ob_fundcode_test_001# and objectid=#ob_object_id# and c2 = #c2# and type=‘001’))
  • 触发调用存储过程
    TRIGGER(type=procedure,text=(call proc_test(#c2#,#ob_fundcode_test_001#,#ob_object_id#)))
  • SHELL触发调用
    TRIGGER(type=shell,text=(sh /zbomc/abc/ccc.sh #ob_fundcode_test_001# #ob_object_id#))
  • BAT出发调用
    TRIGGER(type=cmd,text=(d:\\adjuct.bat #codeid#))
  • 注:上述触发条件中,#字段# 为输入的条件,可以根据实际情况进行参数传递

  • 过滤操作(事件触发之后,后边所有的操作均不透传)

    TRIGGER(type=procedure,text=(call proc_test(#c2#,#ob_fundcode_test_001#,#ob_object_id#))),
    PASS(SQLTYPE=(IGNORE=INSERT,IGNORE=UPDATE,IGNORE=UPDATEBEFORE,IGNORE=UPDATEAFTER,IGNORE=DELETE))

SQL规则生成TOPIC

根据设定的SQL语句,复制表为主表,形成复制规则,每条记录根据SQL语句查询结果集,将结果集再次输入到指定TOPIC中。
在多表关联的过程中,目前主要是个别表,简单操作为主。

注意:
(1)SQLFUNCTION规则中,sql语句查询结果集中,返回列必须设置别名,作为新的数据流中的列名
(2)需要指定数据库/用户/schemas名称(例如:ds.t1 a, ds.t2 b)
(3)关联的列不要改名,要不update或delete操作找不到对应列,导致执行不成功

NEWDATA(sqlfunction=(select a.id as id,a.name as name,b.dept as dept from ds.t1 a, ds.t2 b where a.id=b.id and a.id = #id#))

文档更新时间: 2023-10-22 14:45   作者:操李红