订阅TCP/IP APIKAFKA

TOPIC命名规则设置

  • 增量TOPIC
    • TOPIC名字为BUS_CLIENT_PRODUCT_OBJECT中对应的TARGETTABLESCHEMA.TARGETTABLENAME组成TOPIC名字和分区名【此时,targettablename必须是数字,如果不是数字,则将分区设置为0号分区】

注:后续所有的TOPIC设置规则为TOPIC_NAME(topic名字):PARTITION_NAME(数字)

  • 全量TOPIC

    1. 当设置BUS_CLIENT_PRODUCT_OBJECT中full_topic,依赖于full_topic对应的参数
    2. 没有设置full_topic,将依赖于增量TOPIC设置到名字作为默认TOPIC名字
      注: TOPIC优先顺序 表full_topic > 表real_topic
  • 是否启用对账TOPIC设置

    • 依赖于zcbus_other中对应的full_check、real_check参数,如果设置为1,则增量和全量TOPIC会生效
  • 增量对账TOPIC

    • 表属性参数中chk_real_topic,依赖于这个参数,如果没有设置,对账参数依赖于chk_topic参数,如果均为参数,可参考ZCBUS_OTHER中的chk_real_topic参数。
    • 如果参数均为设置,则使用默认TOPIC:chk_real_topic
      注: 即优先级顺序表chk_real_topic > 表chk_topic > zcbus_other中的chk_real_topic > zcbus_other中的chk_topic
  • 全量对账TOPIC

    • 表属性参数中chk_full_topic,依赖于这个参数,如果没有设置,对账参数依赖于chk_topic参数,如果均为参数,可参考ZCBUS_OTHER中的chk_full_topic参数。
    • 如果参数均为设置,则使用默认TOPIC:chk_full_topic
      注: 即优先级顺序表chk_full_topic > 表chk_topic > zcbus_other中的chk_full_topic > zcbus_other中的chk_topic

关键字配置说明{zcbus_other} 配置

{
    "keymap":{    #####JSON中关键字翻译
        "loaderTime":"SEND_TIME",         ### 发送时间
        "send_type":"SEND_TYPE",          ### 发送类型
        "columntype":"COLUMN_TYPE",       ### 列类型
        "startTime":"START_TIME",         ### 开始时间
        "stopTime":"END_TIME",            ### 结束时间
        "optype":"OPERATION_TYPE",        ### 操作类型
        "db_type":"DB_TYPE",              ### 数据库类型
        "op_time":"OPERATION_TIME",       ### 源端提交时间
        "db_name":"SCHEMA_NAME",          ### 源端DB名字
        "batch_id":"RECONCILIATION_ID",   ### 发送时间 批次号
        "table_name":"TABLE_NAME",        ### 源端表名
        "insert_count":"INSERT_COUNT",    ### 插入统计关键字
        "update_count":"UPDATE_COUNT",    ### 更新统计关键字
        "delete_count":"DELETE_COUNT",    ### 删除统计关键字
        "ddl_count":"DDL_COUNT",          ### DDL统计关键字
        "schema_name":"SCHEMA_NAME",      ### 源端SCHEMA名字
        "before":"BEFORE",                ### 更新中BEFORE关键字
        "after":"AFTER",                  ### 更新中AFTER关键字
        "primarykey":"PRIMARY_KEY",       ### 主键列关键字
        "columntype":"COLUMN_TYPE",       ### 数据类型关键字
        "values":"SEND_DATA",             ### 删除、插入关键字(20221110-废弃,insert替换为after,delete更新为before)
        "records":"SEND_RECORD",          ### 发送记录书统计关键字
        "ownerTable":"table",            ### 对账batchid信息
        "check_batch_id":"ID",            ### 对账batchid信息
        "oper_count":"OPER_COUNT"         ### 对账统计数量关键字
    },
    "opermap":{
        "fullload":"R",                     ###是否包含全量标签信息,自定义标签
        "insert":"I",                       ###增量数据标签,默认insert
        "update":"U",                       ###增量数据标签,默认update
        "delete":"D",                       ###增量数据标签,默认delete
        "ddl":"DDL"                         ###增量数据标签,默认ddl
    },
    "loaderTime":1,                         ### 是否增加发送时间
    "rid":1,                                ### 标签
    "useupdatemark":1,                      ### 
    "sendtype":1,                           ### 发送类型,批量发送/单条发送
    "ddl":0,                                ### 
    "updatetodeleteandinsert":0,            ### 增量JSON中,更新操作是否更新为DELETE/INSERT操作
    "ColNameCase":0,                        ### 列名大小写配置,0不变,1小写,2大写
    "columnType":1,                         ### JSON中是否含有数据类型列
    "primaryKey":1,                         ### JSON中是否含有主键信息
    "sendHeader":{                          ### 头部补充信息,可以自定义,分为固定值和变量两种模型
        "provcode":"fix:834",               ### fix 代表固定值,provcode 赋予固定值为834
        "srccode":"fix:MSS",                ### 
        "dataacct":"var:CURRENT_DATE(YYYYmmdd)", ### var代表变量,后边支持current_date获取时间关键字,括号内为返回当前时间格式
        "batchnum":"var:NUM(2)",            ### 获取两位数字编码批次号,编码在batchCode关键字中获取
        "filename":""
    },
    "batchCode":1,                          ### 是否生成批次号信息
    "ifddltodict":1,                        ### 在执行DDL的时候,将ddl发送修改为表字典信息推送
    "sendbatch":0,                          ### 是否发送批量对账
    "real_check":1,                         ### 是否进行增量对账
    "full_check":1,                         ### 是否进行全量对账
    "module_check":"check_basic",           ### 数据对账模型
                                            ###check_basic
                                            ###tel_bj_module
    "ifIncludesendDate":0,                  ### 是否发送当天日期,格式yyyyMMdd
    "ifIncludeSend_type":1,                 ### 是否生效Send_type关键字
    "ifIncludeSchemaName":1,                ### 是否包含模式标签
    "ifIncludeTableName":1,                 ### 是否包含表名标签
    "ifIncludeOwnerTable":1,                ### 是否打开ownertable标签,会生成ownerTable标签,值为so1.tb01
    "ifIncludeoptime":0,                    ### 是否包含源端提交时间标签
    "ifIncludedbtype":0,                    ###是否包含源端数据库类型标签
    "ifIncludeprimaryKey":1,                ### 是否包含主键信息(订阅DDL解析生成主键信息)
    "ifIncludeprimaryKeyV2":1,              ### 是否包含主键信息,依赖发布自动生成的主键信息,参考bus_push_table_index表
    "ifIncludeGlobalSequence":0,            ### 是否生成全局序列号,每条记录生成一条唯一值
    "ifIncludeSequence":0,                  ### 是否生成表级别序列号,每个表每条记录生成一条唯一值
    "ifInclueUpdateBefore":1,               ### 更新操作是否包含before数据
    "ifIncludeAppendUpdateBeforeColumnsToNull":1,  ### 将before数据追加到after数据中不存在的列数据信息中去
    "ifIncludeSourceHostPort":1,            ### 是否包含源端IP地址和端口号
    "ifIncludeSharedingKey":1,              ### 是否包含Shareding 分片信息
    "ifIncludeCheckZeroData":0,             ### 对账信息,如果都是0,是否发送,0不发送,1发送对账信息
    "ifTableNameCase":0,                    ### 表名大小写,0 不变,1小写,2大写
    "LoaderTimeFormat":"yyyy-MM-dd HH:mm:ss", ### 发送时间格式,默认为yyyy-MM-dd HH:mm:ss格式,可以自定义,如yyyy-MM-dd'T'HH:mm:ss
    "skipddl":1,                            ### 是否跳过DDL
    "chk_real_topic":"SAPSR3_INC_CHECK:0",  ### 增量对账TOPIC
    "chk_full_topic":"SAPSR3_CHECK:0",      ### 全量对账TOPIC
    "chk_topic":"chk_topic_01",             ### 对账TOPIC
    "ifIncludecolumnType":"1",              ### 是否包含数据类型列
    "ifKeyValueCase":"0"                    ### 参数:控制keymap设置的value大小写(key的大小写,直接设置即可),0不变,1小写,2大写
    "jsontype":"json_module"                ### json格式  zcbus、debezium、json_module 等格式
}

check_basic

{
    "filename":"",                  ### sendHeader 头部信息 
    "dataacct":"20222827",          ### sendHeader 头部信息
    "provcode":"834",               ### sendHeader 头部信息
    "srccode":"MSS",                ### sendHeader 头部信息
    "batchnum":"02",                ### sendHeader 头部信息
    "START_TIME":"2022-07-27 22:28:08", ### 对账启动时间
    "SEND_TYPE":"CHECK",                ### 对账标签类型
    "END_TIME":"2022-07-27 22:28:08",   ### 本轮对账结束时间
    "SEND_TIME":"2022-07-27 22:28:08",  ### 发送时间
    "SEND_DATA":[                      ### 发送统计内容
        {
            "OPER_COUNT":1,                   ### 对账操作总数总计
            "SCHEMA_NAME":"mm",               ### 对账schema名字
            "UPDATE_COUNT":0,                 ### 更新统计总数
            "TABLE_NAME":"tb01",              ### 表名
            "DDL_COUNT":0,                    ### ddl统计总数
            "INSERT_COUNT":1,                 ### 插入统计总数
            "DELETE_COUNT":0                  ### 删除统计总数
        }
    ],                           
    "OPERATION_TIME":"2022-07-27 16:37:03",#### 生产最后提交时间
    "ID":"20220727(22)",                #### 对账批次号
    "SEND_RECORD":1                     #### 对账总记录数
}

jsontype=json_module,demo记录

  • 测试表

    create table test.test123 (id number primary key,col1 varchar2(10),col2 varchar2(10),col3 varchar2(10),col4 varchar2(10),update_time timestamp);
    insert into test.test123 values(1,'test1','test2','test3','test4',sysdate);
    commit;
    update  test.test123 set col1='aaa',col2='bbb',update_time=sysdate;
    commit;
    delete test.test123;
    commit;
    truncate table test.test123 ;
  • zcbus_other

    {
      "keymap":{
          "loaderTime":"loadertime",
          "send_type":"send_type",
          "columntype":"columntype",
          "startTime":"starttime",
          "stopTime":"stoptime",
          "optype":"optype",
          "db_type":"db_type",
          "op_time":"op_time",
          "db_name":"db_name",
          "batchCode":"batchcode",
          "table_name":"table_name",
          "insert_count":"insert_count",
          "update_count":"update_count",
          "delete_count":"delete_count",
          "ddl_count":"ddl_count",
          "schema_name":"schema_name",
          "before":"before",
          "after":"after",
          "primarykey":"primarykey",
          "values":"values",
          "records":"records",
          "oper_count":"oper_count",
          "sendDate":"senddate",
          "shardingKey":"shardingkey"
      },
      "opermap":{
          "fullload":"R",
          "insert":"I",
          "update":"U",
          "delete":"D",
          "ddl":"DDL",
          "real_check":"RC",
          "full_check":"FC"
      },
      "loaderTime":1,
      "rid":1,
      "useupdatemark":1,
      "sendtype":1,
      "ddl":0,
      "updatetodeleteandinsert":0,
      "ColNameCase":1,
      "ifIncludecolumnType":1,
      "ifIncludeprimaryKey":1,
      "sendHeader":{
          "provcode":"fix:834",
          "srccode":"fix:MSS",
          "dataacct":"var:CURRENT_DATE(YYYYmmdd)",
          "batchnum":"var:NUM(2)",
          "filename":""
      },
      "ifIncludebatchCode":0,
      "ifIncludesendDate":1,
      "ifIncludeCheckBatchCode":0,
      "ifIncludeSharedingKey":1,
      "sendbatch":0,
      "real_check":1,
      "full_check":0,
      "module_check":"tel_check_v2",
      "ifddltodict":0,
      "ifIncludeCheckDateRange":0,
      "ifIncludeSend_type":0,
      "ifIncludeoptime":0,
      "ifIncludedbtype":1,
      "ifIncludeSchemaName":0,
      "ifSupplementUpdateAfter":1,
      "ifInclueUpdateBefore":0,
      "ifIncludeSourceHostPort":0,
      "ifIncludeShardingSchemaTable":0,
      "ifIncludeCheckZeroData":0,
      "skipddl":0,
      "jsontype":"json_module"
    }
  • INSERT操作

    {"loadertime":"2024-03-27 11:48:27","filename":"","dataacct":"20244427","provcode":"834","srccode":"MSS","batchnum":"0","db_type":"oracle","columntype":{"update_time":"91","col4":"12","id":"3","col2":"12","col3":"12","col1":"12"},"primarykey":{"id":"1"},"shardingkey":"45d405b4843610b90a0d788a352df36e","senddate":"20240327","table_name":"TEST123","optype":"I","values":{"id":"1","col1":"test1","col2":"test2","col3":"test3","col4":"test4","update_time":"2024-03-27 11:48:14"}}
  • UPDATE操作

    {"loadertime":"2024-03-27 11:48:27","filename":"","dataacct":"20244427","provcode":"834","srccode":"MSS","batchnum":"0","db_type":"oracle","columntype":{"update_time":"91","col4":"12","id":"3","col2":"12","col3":"12","col1":"12"},"primarykey":{"id":"1"},"shardingkey":"45d405b4843610b90a0d788a352df36e","senddate":"20240327","table_name":"TEST123","optype":"U","after":{"col1":"aaa","col2":"bbb","update_time":"2024-03-27 11:48:15","id":"1","col3":"test3","col4":"test4"}}
  • DELETE操作

    {"loadertime":"2024-03-27 11:48:27","filename":"","dataacct":"20244427","provcode":"834","srccode":"MSS","batchnum":"0","db_type":"oracle","columntype":{"update_time":"91","col4":"12","id":"3","col2":"12","col3":"12","col1":"12"},"primarykey":{"id":"1"},"shardingkey":"45d405b4843610b90a0d788a352df36e","senddate":"20240327","table_name":"TEST123","optype":"D","values":{"id":"1","col1":"aaa","col2":"bbb","col3":"test3","col4":"test4","update_time":"2024-03-27 11:48:15"}}
  • DDL操作

    {"loadertime":"2024-03-27 11:50:16","filename":"","dataacct":"20244427","provcode":"834","srccode":"MSS","batchnum":"0","db_type":"oracle","columntype":{"update_time":"91","col4":"12","id":"3","col2":"12","col3":"12","col1":"12"},"primarykey":{},"shardingkey":"45d405b4843610b90a0d788a352df36e","senddate":"20240327","table_name":"TEST123","optype":"ddl","values":"truncate table test.test123;"}

jsontype=simple_json,demo记录 #20240613支持

  • 支持simple_json类型配置(单层json)

  • 仅支持insert操作/update after操作(相关列将从before中补充after不存在的列)

  • 支持添加参数
    db_type 数据库类型 控制参数:ifIncludedbtype
    optype 操作类型 控制参数:ifIncludeOptype 新增参数
    schema_name 模式名 控制参数:ifIncludeSchemaName
    op_time 源端操作时间 控制参数:ifIncludeoptime
    table_name 表名 控制参数:ifIncludeTableName
    batchCode 批量导出编号 控制参数:ifIncludebatchCode
    loaderTime 系统加载时间 控制参数:ifIncludeloaderTime
    ownerTable 用户名.表名 控制参数:ifIncludeOwnerTable
    过滤DDL配置 控制参数:ifddltodict

  • 测试表

  • zcbus_other (添加zcbus_前缀,防止和业务表字段冲突)

    {
      "keymap":{
          "loaderTime":"zcbus_loadertime",
          "send_type":"zcbus_send_type",
          "columntype":"zcbus_columntype",
          "startTime":"zcbus_starttime",
          "stopTime":"zcbus_stoptime",
          "optype":"zcbus_op_type",
          "db_type":"zcbus_db_type",
          "op_time":"zcbus_op_time",
          "db_name":"zcbus_db_name",
          "batchCode":"batchcode",
          "table_name":"zcbus_table_name",
          "insert_count":"insert_count",
          "update_count":"update_count",
          "delete_count":"delete_count",
          "ddl_count":"ddl_count",
          "schema_name":"zcbus_schema_name",
          "insertvalues":"after",
          "deletevalues":"before",
          "updatebefore":"before",
          "updateafter":"after",
          "primarykey":"primarykey",
          "records":"records",
          "oper_count":"oper_count",
          "sendDate":"senddate",
          "shardingKey":"shardingkey"
      },
      "opermap":{
          "fullload":"R",
          "insert":"I",
          "update":"U",
          "delete":"D",
          "ddl":"DDL",
          "real_check":"RC",
          "full_check":"FC"
      },
      "loaderTime":1,
      "rid":1,
      "useupdatemark":1,
      "sendtype":1,
      "ddl":0,
      "updatetodeleteandinsert":0,
      "ColNameCase":1,
      "ifIncludecolumnType":1,
      "ifIncludeprimaryKey":1,
      "ifIncludebatchCode":0,
      "ifIncludesendDate":1,
      "ifIncludeCheckBatchCode":0,
      "ifIncludeSharedingKey":1,
      "sendbatch":0,
      "real_check":1,
      "full_check":0,
      "module_check":"tel_check_v2",
      "ifddltodict":0,
      "ifIncludeCheckDateRange":0,
      "ifIncludeSend_type":0,
      "ifIncludeoptime":0,
      "ifIncludedbtype":1,
      "ifIncludeSchemaName":0,
      "ifSupplementUpdateAfter":1,
      "ifInclueUpdateBefore":0,
      "ifIncludeSourceHostPort":0,
      "ifIncludeShardingSchemaTable":0,
      "ifIncludeCheckZeroData":0,
      "ifIncludeoptime":1,
      "skipddl":0,
      "jsontype":"simple_json"
    }
  • INSERT操作

    {"zcbus_op_time":"2024-06-13 14:08:16","zcbus_op_type":"insert","zcbus_table_name":"test","op_time":"2024-06-13 14:08:16","zcbus_db_type":1,"zcbus_loadertime":"2024-06-13 14:08:27","c1":"121212111","c2":"111111"}
  • UPDATE操作

    {"zcbus_op_time":"2024-06-13 14:08:53","zcbus_op_type":"update","zcbus_table_name":"test","op_time":"2024-06-13 14:08:53","zcbus_db_type":1,"zcbus_loadertime":"2024-06-13 14:09:00","c1":"121212111","c2":"1111111111"}
  • DELETE操作,自动过滤

  • DDL操作,自动过滤

文档更新时间: 2024-06-16 22:56   作者:程少波