可以利用kafka工具查看topic中的数据情况
kafka-console-consumer.sh --bootstrap-server ip:port --topic topic名 --from-beginning
例如:
[root@node ~]# docker exec -it zcbuskafka bash
[root@zcbuskafka kafka]# cd bin
[root@zcbuskafka bin]# ./kafka-console-consumer.sh --bootstrap-server zcbuskafka:9092 --topic test001 --from-beginning
{"op_time":"2024-06-04 10:21:20","db_type":"mysql","schema_name":"testdb","table_name":"test","optype":"insert","values":{"id":"257","name":"sdfrsf","optime":"2023-11-05 07:31:13","col1":"2023-11-05","col2":"zbcsdkseebfbds","col3":"850226","col4":"a1AAieFJMAZA","col5":"4b4DLPNyiOFBpcN0imgmh"}}
{"op_time":"2024-06-04 10:21:20","db_type":"mysql","schema_name":"testdb","table_name":"test","optype":"insert","values":{"id":"258","name":"sdfrsf","optime":"2023-11-05 07:31:13","col1":"2023-11-05","col2":"zbcsdkseebfbds","col3":"850226","col4":"a1AAieFJMAZA","col5":"4b4DLPNyiOFBpcN0imgmh"}}
{"op_time":"2024-06-04 10:21:20","db_type":"mysql","schema_name":"testdb","table_name":"test","optype":"insert","values":{"id":"259","name":"sdfrsf","optime":"2023-11-05 07:31:13","col1":"2023-11-05","col2":"zbcsdkseebfbds","col3":"850226","col4":"a1AAieFJMAZA","col5":"4b4DLPNyiOFBpcN0imgmh"}}
{"op_time":"2024-06-04 10:21:20","db_type":"mysql","schema_name":"testdb","table_name":"test","optype":"insert","values":{"id":"260","name":"sdfrsf","optime":"2023-11-05 07:31:13","col1":"2023-11-05","col2":"zbcsdkseebfbds","col3":"850226","col4":"a1AAieFJMAZA","col5":"4b4DLPNyiOFBpcN0imgmh"}}
……
1 源库配置
1.1 发布选择kafka类型
1.2 配置连接信息
对于填写连接信息时 只需填写topic名以及kafka_bootstrap.servers(kafka连接信息),其他参数默认即可。
序号 | 配置项 | 配置参数 | 描述信息 |
---|---|---|---|
1 | db_type | KAFKA | 源数据库类型 |
2 | data_format | json | kafka中数据的格式,支持bsd/json,默认按照bsd类型解析 |
3 | label | - | 标签 |
4 | topic | topic名字 | kafka增量数据来源的topic名字 |
5 | kafka_auto.commit.interval.ms | 60000 | 自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s) 消费者偏移的频率以毫秒为单位自动提交给Kafka,如果enable.auto.commit设置为true。 |
6 | kafka_auto.offset.reset | earliest | 这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:? ● earliest: 自动将偏移量重置为最早的偏移量 ● latest:自动将偏移量重置为最新的偏移量 ● none: 如果边更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。 ● anything else: 如果不是上述3种,只抛出异常给consumer。 |
7 | kafka_bootstrap.servers | 172.17.58.146:9092 | 用于建立与kafka集群连接的host/port组。数据将会在所有servers上均衡加载,不管哪些server是指定用于bootstrapping。这个列表仅仅影响初始化的hosts(用于发现全部的servers)。这个列表格式: host1:port1,host2:port2,… 因为这些server仅仅是用于初始化的连接,以发现集群所有成员关系(可能会动态的变化),这个列表不需要包含所有的servers(你可能想要不止一个server,尽管这样,可能某个server宕机了)。如果没有server在这个列表出现,则发送数据会一直失败,直到列表可用。 list类型 |
8 | kafka_session.timeout.ms | 100000 | 使用Kafka的组管理设施时,用于检测消费者失败的超时。消费者定期发送心跳来向经纪人表明其活跃度。如果代理在该会话超时到期之前没有收到心跳,那么代理将从该组中删除该消费者并启动重新平衡。请注意,该值必须在允许的范围内 Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。 |
9 | kafka_enable.auto.commit | false | enable.auto.commit 参数用于指定是否自动提交消费者的偏移量。如果设置为 true,消费者会定期自动提交偏移量;如果设置为 false,您需要手动提交偏移量。默认值为 true。 |
10 | source_db_type | mysql | 源 类型 |
11 | source_db_name | kafka | 实例名【kafka可以忽略】 |
12 | source_host | 127.0.0.1 | 数据库主机地址 |
13 | source_port | 3306 | 数据库端口号 |
14 | source_user | zcbus | 数据库用户名 |
15 | source_password | 1qaz!QAZ | 数据库密码 |
16 | real_send_cache_buffer | 0 | 增量发布,设置cache中缓存buffer大小,单位M,最小50,最大500,如果设置这个值,real_send_queues忽略 |
17 | statistics_into_influxdb | 0 | 是否将增量信息记录到influxdb中 0 不添加 1添加 |
18 | send_sys_time | 0 | 发送系统时间的时间间隔,单位为分钟,设置大于0时,每隔指定的时间间隔发送一次系统时间 |
19 | send_log_position | 0 | 发送增量分析日志点的时间间隔,单位为分钟,默认为0不发送,设置大于0时,每隔指定的时间间隔发送一次增量分析的日志点、日志时间、系统时间 |
20 | max_conf_db_connection | 1 | zcbus配置库的最大连接数,默认是2 |
1.3 设置json模版json_template
json转bsd模版:
例子:
{
"insert_value": "/",
"delete_value": "/",
"ddl_value": "values",
"schema_name": "sc01",
"table_name": [
{ "name":"appname" },
"_",
{ "name":"tag"},
"_001"
],
"columns": [
{ "name": "timestamp", "new_name": "time_stamp","convert": "timestamp_ms_to_date" },
{ "name": "event_time","new_name": "eventtime" ,"convert": "timestamp_ms_to_date" },
{ "name": "switch.kvmsg.SourceIP","new_name": "switch_sourceIP" }
],
"optime": {
"name": "timestamp",
"convert":"timestamp_ms_to_date"
},
"optype": "insert"
}
- 参数描述:
一. schmea名
比如schema名字是sc01,则配置”schema_name”:”sc01”
二. table名
- 如果是固定的table名字比如tb01,则配置”table_name”:”tb01”
- 如果需要根据用户json串中的多个标签的值比如table_tag1和table_tag2来拼接,可以写成数组形式,比如
“table_name”: [
{ “name”:”table_tag1” },
““,
{ “name”:”tabletag2”, “concat”:”“ },
“abc”
]
如果标签table_tag1的值是ss,标签table_tag2的值是数组[“tt1”,”tt2”],数组的每个值之间用”concat”设置的下划线连接,拼接出来的表名是ss_tt1_tt2_abc
三. 操作时间- 如果是固定的操作时间,可以写”optime”:”2020-02-05 10:21:32”
- 如果是从json串中的某个字段比如loadertime取值,可以写成:
“optime”: {
“name”: “loadertime”
}
此时loadertime的值需要是标准的时间字符串。- 如果loadertime的值是timestamp的数值,需要转成时间字符串,则可以设置”convert”:”timestamptodate”进行转换,如:
“optime”: {
“name”: “loadertime”,
“convert”:”timestamp_to_date\timestamp_ms_to_date”
}
四. 操作类型- 如果所有json串都是固定的操作类型,比如都固定转成insert,可以用”optype”:”insert”
- 如果操作类型是根据用户json串中的某个标签的值来决定,比如optype_tag标签对应的值作为操作类型,则可以用下面的方式:
“optype”: {
“name”: “optype_tag”
}
此时optype_tag标签的值,必须是程序默认的操作类型之一,insert/update/delete/ddl。- 如果optype_tag标签的值不是程序默认的操作类型,则可以配置类型映射,比如标签的值”I/U/D/DDL”分别对应的操作类型是insert/update/delete/ddl,如下:
五. 列数据
"optype": { "name": "optype_tag", "opmap": { "I": "insert", "U": "update", "D": "delete", "DDL": "ddl" } }
- 可以用”insert_value”: “values_tag”来设置用户json串的values_tag标签下的值是insert的列数据,如果只有一层json,所有标签都是insert的列的话,可以设置”insert_value”: “/“
- 可以用”delete_value”: “values_tag”来设置用户json串的values_tag标签下的值是delete的列数据,如果只有一层json,所有标签都是delete的列的话,可以设置”delete_value”: “/“
- 可以用”after_value”: “after_tag”来设置用户json串的after_tag标签下的值是update操作after值的列数据
- 可以用”before_value”: “before_tag”来设置用户json串的before_tag标签下的值是update操作before值的列数据
- 可以用”ddl_value”: “values_tag”来设置用户json串的values_tag标签下的值是ddl操作语句
- 如果不设置”insert_value”的话,默认值为”insert_value”: “values”
- 如果不设置”delete_value”的话,默认值为”delete_value”: “values”
- 如果不设置”after_value”的话,默认值为”after_value”: “after”
- 如果不设置”before_value”的话,默认值为”before_value”: “before”
10.如果不设置”ddl_value”的话,默认值为”ddl_value”: “values”- 如果列信息不需要修改的话,不需要配置”columns”
- 如果要修改列名,或者增加列值转换的话,需要配置”columns”
例如col1列需要改名为col1_new,并且对列值进行timestamp_ms_to_date转换,col2列需要改名为col2_new,则如下配置:
columns: [
{ “name”: “col1”, “new_name”: “col1_new”, “convert”: “timestamp_ms_to_date” },
{ “name”: “col2”, “new_name”: “col2_new” },
]
六. 其他- 如果列名中有’.’的话,自动替换成”_”。
- 如果列值是数组的话,把数组每个成员的值用逗号拼接成一个大字符串。
注意 :
(1) kafka发布 暂时可以识别增量数据内容 ;
(2) 发布时,选择批量导入方式 填写库名.表名;
(3) 订阅到目标数据库时 需手动建表,如有主键则使用主键 没有则需指定apply_by_key参数;
文档更新时间: 2024-06-30 23:04 作者:程少波