可以利用kafka工具查看topic中的数据情况

kafka-console-consumer.sh --bootstrap-server ip:port --topic topic名 --from-beginning
例如:
[root@node ~]# docker exec -it zcbuskafka bash
[root@zcbuskafka kafka]# cd bin
[root@zcbuskafka bin]# ./kafka-console-consumer.sh --bootstrap-server zcbuskafka:9092 --topic test001 --from-beginning
{"op_time":"2024-06-04 10:21:20","db_type":"mysql","schema_name":"testdb","table_name":"test","optype":"insert","values":{"id":"257","name":"sdfrsf","optime":"2023-11-05 07:31:13","col1":"2023-11-05","col2":"zbcsdkseebfbds","col3":"850226","col4":"a1AAieFJMAZA","col5":"4b4DLPNyiOFBpcN0imgmh"}}
{"op_time":"2024-06-04 10:21:20","db_type":"mysql","schema_name":"testdb","table_name":"test","optype":"insert","values":{"id":"258","name":"sdfrsf","optime":"2023-11-05 07:31:13","col1":"2023-11-05","col2":"zbcsdkseebfbds","col3":"850226","col4":"a1AAieFJMAZA","col5":"4b4DLPNyiOFBpcN0imgmh"}}
{"op_time":"2024-06-04 10:21:20","db_type":"mysql","schema_name":"testdb","table_name":"test","optype":"insert","values":{"id":"259","name":"sdfrsf","optime":"2023-11-05 07:31:13","col1":"2023-11-05","col2":"zbcsdkseebfbds","col3":"850226","col4":"a1AAieFJMAZA","col5":"4b4DLPNyiOFBpcN0imgmh"}}
{"op_time":"2024-06-04 10:21:20","db_type":"mysql","schema_name":"testdb","table_name":"test","optype":"insert","values":{"id":"260","name":"sdfrsf","optime":"2023-11-05 07:31:13","col1":"2023-11-05","col2":"zbcsdkseebfbds","col3":"850226","col4":"a1AAieFJMAZA","col5":"4b4DLPNyiOFBpcN0imgmh"}}
……

1 源库配置

1.1 发布选择kafka类型

1.2 配置连接信息

对于填写连接信息时 只需填写topic名以及kafka_bootstrap.servers(kafka连接信息),其他参数默认即可。

序号 配置项 配置参数 描述信息
1 db_type KAFKA 源数据库类型
2 data_format json kafka中数据的格式,支持bsd/json,默认按照bsd类型解析
3 label - 标签
4 topic topic名字 kafka增量数据来源的topic名字
5 kafka_auto.commit.interval.ms 60000 自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)
消费者偏移的频率以毫秒为单位自动提交给Kafka,如果enable.auto.commit设置为true。
6 kafka_auto.offset.reset earliest 这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:?
● earliest: 自动将偏移量重置为最早的偏移量
● latest:自动将偏移量重置为最新的偏移量
● none: 如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。
● anything else: 如果不是上述3种,只抛出异常给consumer。
7 kafka_bootstrap.servers 172.17.58.146:9092 用于建立与kafka集群连接的host/port组。数据将会在所有servers上均衡加载,不管哪些server是指定用于bootstrapping。这个列表仅仅影响初始化的hosts(用于发现全部的servers)。这个列表格式:
host1:port1,host2:port2,…
因为这些server仅仅是用于初始化的连接,以发现集群所有成员关系(可能会动态的变化),这个列表不需要包含所有的servers(你可能想要不止一个server,尽管这样,可能某个server宕机了)。如果没有server在这个列表出现,则发送数据会一直失败,直到列表可用。
list类型
8 kafka_session.timeout.ms 100000 使用Kafka的组管理设施时,用于检测消费者失败的超时。消费者定期发送心跳来向经纪人表明其活跃度。如果代理在该会话超时到期之前没有收到心跳,那么代理将从该组中删除该消费者并启动重新平衡。请注意,该值必须在允许的范围内
Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。
9 kafka_enable.auto.commit false enable.auto.commit 参数用于指定是否自动提交消费者的偏移量。如果设置为 true,消费者会定期自动提交偏移量;如果设置为 false,您需要手动提交偏移量。默认值为 true。
10 source_db_type mysql 源 类型
11 source_db_name kafka 实例名【kafka可以忽略】
12 source_host 127.0.0.1 数据库主机地址
13 source_port 3306 数据库端口号
14 source_user zcbus 数据库用户名
15 source_password 1qaz!QAZ 数据库密码
16 real_send_cache_buffer 0 增量发布,设置cache中缓存buffer大小,单位M,最小50,最大500,如果设置这个值,real_send_queues忽略
17 statistics_into_influxdb 0 是否将增量信息记录到influxdb中 0 不添加 1添加
18 send_sys_time 0 发送系统时间的时间间隔,单位为分钟,设置大于0时,每隔指定的时间间隔发送一次系统时间
19 send_log_position 0 发送增量分析日志点的时间间隔,单位为分钟,默认为0不发送,设置大于0时,每隔指定的时间间隔发送一次增量分析的日志点、日志时间、系统时间
20 max_conf_db_connection 1 zcbus配置库的最大连接数,默认是2

1.3 设置json模版json_template

json转bsd模版:
例子:
{
    "insert_value": "/",
    "delete_value": "/",
    "ddl_value": "values",
    "schema_name": "sc01",
    "table_name": [
      { "name":"appname" },
      "_",
      { "name":"tag"},
      "_001"
    ],
  "columns": [
    { "name": "timestamp", "new_name": "time_stamp","convert": "timestamp_ms_to_date" },
    { "name": "event_time","new_name": "eventtime" ,"convert": "timestamp_ms_to_date" },
    { "name": "switch.kvmsg.SourceIP","new_name": "switch_sourceIP" }
  ],
   "optime": {
        "name": "timestamp",
        "convert":"timestamp_ms_to_date"
    },
    "optype": "insert"
}
  • 参数描述:
    一. schmea名
    比如schema名字是sc01,则配置”schema_name”:”sc01”
    二. table名
    1. 如果是固定的table名字比如tb01,则配置”table_name”:”tb01”
    2. 如果需要根据用户json串中的多个标签的值比如table_tag1和table_tag2来拼接,可以写成数组形式,比如
      “table_name”: [
      { “name”:”table_tag1” },
      “,
      { “name”:”table
      tag2”, “concat”:”“ },
      abc”
      ]
      如果标签table_tag1的值是ss,标签table_tag2的值是数组[“tt1”,”tt2”],数组的每个值之间用”concat”设置的下划线连接,拼接出来的表名是ss_tt1_tt2_abc
      三. 操作时间
    3. 如果是固定的操作时间,可以写”optime”:”2020-02-05 10:21:32”
    4. 如果是从json串中的某个字段比如loadertime取值,可以写成:
      “optime”: {
      “name”: “loadertime”
      }
      此时loadertime的值需要是标准的时间字符串。
    5. 如果loadertime的值是timestamp的数值,需要转成时间字符串,则可以设置”convert”:”timestamptodate”进行转换,如:
      “optime”: {
      “name”: “loadertime”,
      “convert”:”timestamp_to_date\timestamp_ms_to_date”
      }
      四. 操作类型
    6. 如果所有json串都是固定的操作类型,比如都固定转成insert,可以用”optype”:”insert”
    7. 如果操作类型是根据用户json串中的某个标签的值来决定,比如optype_tag标签对应的值作为操作类型,则可以用下面的方式:
      “optype”: {
      “name”: “optype_tag”
      }
      此时optype_tag标签的值,必须是程序默认的操作类型之一,insert/update/delete/ddl。
    8. 如果optype_tag标签的值不是程序默认的操作类型,则可以配置类型映射,比如标签的值”I/U/D/DDL”分别对应的操作类型是insert/update/delete/ddl,如下:
      "optype": {
         "name": "optype_tag",
         "opmap": {
             "I": "insert",
             "U": "update",
             "D": "delete",
             "DDL": "ddl"
             }
      }
      五. 列数据
    9. 可以用”insert_value”: “values_tag”来设置用户json串的values_tag标签下的值是insert的列数据,如果只有一层json,所有标签都是insert的列的话,可以设置”insert_value”: “/“
    10. 可以用”delete_value”: “values_tag”来设置用户json串的values_tag标签下的值是delete的列数据,如果只有一层json,所有标签都是delete的列的话,可以设置”delete_value”: “/“
    11. 可以用”after_value”: “after_tag”来设置用户json串的after_tag标签下的值是update操作after值的列数据
    12. 可以用”before_value”: “before_tag”来设置用户json串的before_tag标签下的值是update操作before值的列数据
    13. 可以用”ddl_value”: “values_tag”来设置用户json串的values_tag标签下的值是ddl操作语句
    14. 如果不设置”insert_value”的话,默认值为”insert_value”: “values”
    15. 如果不设置”delete_value”的话,默认值为”delete_value”: “values”
    16. 如果不设置”after_value”的话,默认值为”after_value”: “after”
    17. 如果不设置”before_value”的话,默认值为”before_value”: “before”
      10.如果不设置”ddl_value”的话,默认值为”ddl_value”: “values”
    18. 如果列信息不需要修改的话,不需要配置”columns”
    19. 如果要修改列名,或者增加列值转换的话,需要配置”columns”
      例如col1列需要改名为col1_new,并且对列值进行timestamp_ms_to_date转换,col2列需要改名为col2_new,则如下配置:
      columns: [
      { “name”: “col1”, “new_name”: “col1_new”, “convert”: “timestamp_ms_to_date” },
      { “name”: “col2”, “new_name”: “col2_new” },
      ]
      六. 其他
    20. 如果列名中有’.’的话,自动替换成”_”。
    21. 如果列值是数组的话,把数组每个成员的值用逗号拼接成一个大字符串。

注意 :
(1) kafka发布 暂时可以识别增量数据内容 ;
(2) 发布时,选择批量导入方式 填写库名.表名;
(3) 订阅到目标数据库时 需手动建表,如有主键则使用主键 没有则需指定apply_by_key参数;

文档更新时间: 2024-06-30 23:04   作者:程少波