一、订阅配置
1.1 创建订阅客户端
如已存在普通订阅客户端,可忽略此步骤,进入下一步
进入 订阅 页面,再点击订阅页面左上角 订阅管理 按钮
进入 订阅管理 页面,点击右上角 添加 按钮,在弹出的新增页面中,输入
- 客户端名称:可自行填写
- 客户端类型:选择为普通客户端
- 容器:可自行选择,一般默认为zcbus
- 状态:默认即可
点击 保存 按钮,即可在 订阅管理 页面中看到新增的订阅客户端
1.2 创建KAFKA api客户端
进入订阅-订阅管理页面,新增一个新的KAFKA API客户端
- 客户端名称:可自行填写
- 客户端类型:选择API客户端
- 容器:可自行选择,一般默认为ZCBUS
- 状态:默认为启动即可,也可进入 容器-详情 页面,进行手动启停
- API类型:选择类型为KAFKA
注:将数据订阅到KAFKA时,创建的API客户端与普通客户端应在同一个容器内,此处的KAFKA API客户端对应配置参数当中的service_host和service_port两项
1.3 订阅数据库管理
进入 订阅 页面,选择普通订阅客户端后,点击左上角 数据库管理 按钮,新增一个新的数据库,类型选择为 KAFKA
- host: 一般可为 软件登陆页面地址,如 http://192.168.37.4:8890
- service_host: 为API客户端所在的主机地址,service_host一般是API所在主机的名字或IP
- service_port: 可自选,一般选择为在本页1.2章节中所创建的API客户端
- kafka_bootstrap.servers:为KAFKA地址和端口
- zcbus_other: 默认即可,参数具体内容,可参考本页 补充说明-zcbus_other参数说明
- 高级设置:如无特殊配置,默认即可,具体参数内窜,可参考本页 补充说明-目标数据库参数列表
测试连接完成后点击 提交 按钮即可
1.4 订阅数据表配置
进入 订阅 页面,再点击订阅页面 数据表 按钮
- 点击操作类型,选择批量修改库名,修改库名为目标端topic名
- 点击操作类型,选择批量修改表名,修改表名为目标端partition编号,一般情况下设置为0即可
- 源端多个表可以指定目标端相同topic的partition,如新增一张表,可以指定partition=1
二、补充说明
2.1 zcbus_other参数说明
{
"keymap":{ ### JSON中关键字翻译
"loaderTime":"SEND_TIME", ### 发送时间
"send_type":"SEND_TYPE", ### 发送类型
"columntype":"COLUMN_TYPE", ### 列类型
"startTime":"START_TIME", ### 开始时间
"stopTime":"END_TIME", ### 结束时间
"optype":"OPERATION_TYPE", ### 操作类型
"db_type":"DB_TYPE", ### 数据库类型
"op_time":"OPERATION_TIME", ### 源端提交时间
"db_name":"SCHEMA_NAME", ### 源端DB名字
"batch_id":"RECONCILIATION_ID", ### 发送时间 批次号
"table_name":"TABLE_NAME", ### 源端表名
"insert_count":"INSERT_COUNT", ### 插入统计关键字
"update_count":"UPDATE_COUNT", ### 更新统计关键字
"delete_count":"DELETE_COUNT", ### 删除统计关键字
"ddl_count":"DDL_COUNT", ### DDL统计关键字
"schema_name":"SCHEMA_NAME", ### 源端SCHEMA名字
"before":"BEFORE", ### 更新中BEFORE关键字
"after":"AFTER", ### 更新中AFTER关键字
"insertvalues":"INSERT", ### 插入数据关键字
"deletevalues":"DELETE", ### 删除数据关键字
"primarykey":"PRIMARY_KEY", ### 主键列关键字
"columntype":"COLUMN_TYPE", ### 数据类型关键字
"values":"DATALOAD", ### 删除、插入关键字(20221110-废弃,insert替换为after,delete更新为before)
"records":"SEND_RECORD", ### 发送记录书统计关键字
"ownerTable":"table", ### 表owner信息
"check_batch_id":"ID", ### 对账batchid信息
"oper_count":"OPER_COUNT" ### 对账统计数量关键字
},
"opermap":{
"fullload":"R", ### 是否包含全量标签信息,自定义标签
"insert":"I", ### 增量数据标签,默认insert
"update":"U", ### 增量数据标签,默认update
"delete":"D", ### 增量数据标签,默认delete
"ddl":"DDL" ### 增量数据标签,默认ddl
},
"loaderTime":1, ### 是否增加发送时间
"rid":1, ### 标签
"useupdatemark":1, ###
"sendtype":1, ### 发送类型,批量发送/单条发送
"ddl":0, ###
"updatetodeleteandinsert":0, ### 增量JSON中,更新操作是否更新为DELETE/INSERT操作
"ColNameCase":0, ### 列名大小写配置,0不变,1小写,2大写
"columnType":1, ### JSON中是否含有数据类型列
"primaryKey":1, ### JSON中是否含有主键信息
"sendHeader":{ ### 头部补充信息,可以自定义,分为固定值和变量两种模型
"provcode":"fix:834", ### fix 代表固定值,provcode 赋予固定值为834
"srccode":"fix:MSS", ###
"dataacct":"var:CURRENT_DATE(YYYYmmdd)", ### var代表变量,后边支持current_date获取时间关键字,括号内为返回当前时间格式
"batchnum":"var:NUM(2)", ### 获取两位数字编码批次号,编码在batchCode关键字中获取
"filename":""
},
"batchCode":1, ### 是否生成批次号信息
"ifddltodict":1, ### 在执行DDL的时候,将ddl发送修改为表字典信息推送
"sendbatch":0, ### 是否发送批量对账
"real_check":1, ### 是否进行增量对账
"full_check":1, ### 是否进行全量对账
"module_check":"check_basic", ### 数据对账模型
###check_basic
###tel_bj_module
"ifIncludesendDate":0, ### 是否发送当天日期,格式yyyyMMdd
"ifIncludeSend_type":1, ### 是否生效Send_type关键字
"ifIncludeSchemaName":1, ### 是否包含模式标签
"ifIncludeTableName":1, ### 是否包含表名标签
"ifIncludeOwnerTable":1, ### 是否打开ownertable标签,会生成ownerTable标签,值为so1.tb01
"ifIncludeoptime":0, ### 是否包含源端提交时间标签
"ifIncludedbtype":0, ###是否包含源端数据库类型标签
"ifIncludeprimaryKey":1, ### 是否包含主键信息(订阅DDL解析生成主键信息)
"ifIncludeprimaryKeyV2":1, ### 是否包含主键信息,依赖发布自动生成的主键信息,参考bus_push_table_index表
"ifIncludeGlobalSequence":0, ### 是否生成全局序列号,每条记录生成一条唯一值
"ifIncludeSequence":0, ### 是否生成表级别序列号,每个表每条记录生成一条唯一值
"ifInclueUpdateBefore":1, ### 更新操作是否包含before数据
"ifIncludeAppendUpdateBeforeColumnsToNull":1, ### 将before数据追加到after数据中不存在的列数据信息中去
"ifIncludeSourceHostPort":1, ### 是否包含源端IP地址和端口号
"ifIncludeSharedingKey":1, ### 是否包含Shareding 分片信息
"ifIncludeCheckZeroData":0, ### 对账信息,如果都是0,是否发送,0不发送,1发送对账信息
"ifTableNameCase":0, ### 表名大小写,0 不变,1小写,2大写
"LoaderTimeFormat":"yyyy-MM-dd HH:mm:ss", ### 发送时间格式,默认为yyyy-MM-dd HH:mm:ss格式,可以自定义,如yyyy-MM-dd'T'HH:mm:ss
"skipddl":1, ### 是否跳过DDL
"chk_real_topic":"SAPSR3_INC_CHECK:0", ### 增量对账TOPIC
"chk_full_topic":"SAPSR3_CHECK:0", ### 全量对账TOPIC
"chk_topic":"chk_topic_01", ### 对账TOPIC
"ifIncludecolumnType":"1", ### 是否包含数据类型列
"ifFilterDelete":1, ### 是否过滤delete操作
"ifFilterInsert":1, ### 是否过滤insert操作
"ifFilterUpdate":1, ### 是否过滤update操作
"jsontype":"json_module" ### json格式 zcbus、debezium、json_module 等格式
}
2.2 KAFKA兼容的安全认证方式
- SSL方式
- 通过TLS/SSL协议提供了一种安全的通信机制,可以保护Kafka客户端和Kafka集群之间的通信安全。
- 需要关注kafka_ssl.key.password,kafka_ssl.keystore.location,kafka_ssl.keystore.password,kafka_ssl.truststore.locationkafka_ssl.truststore.password,kafka_ssl.enabled.protocols,kafka_ssl.keystore.type,kafka_ssl.protocol,kafka_ssl.provider,kafka_ssl.truststore.type,kafka_ssl.cipher.suites,kafka_ssl.endpoint.identification.algorithm,kafka_ssl.keymanager.algorithm,kafka_ssl.secure.random.implementation,kafka_ssl.trustmanager.algorithm参数项,详情请参考描述内容。
- SASL方式
- SASL是一种通用的认证框架,Kafka使用它来支持多种认证机制,包括PLAIN、GSSAPI和kerberos。通过SASL认证,Kafka客户端可以使用用户名和密码或者其他凭据来进行身份验证。
- 需要关注kafka_sasl.jaas.config,kafka_sasl.mechanism,kafka_security.protocol, kafka_sasl.kerberos.kinit.cmd参数项,详情请参考描述内容。
2.3 目标数据库参数列表
序号 | 参数名 | 参考值 | 是否隐藏 | 描述 |
---|---|---|---|---|
0 | label | - | NO | 标签 |
1 | db_type | kafka | NO | 输送目标类型API类型 |
2 | service_host | 172.17.58.146 | NO | 用户登录的url,如http://xxx.xxx.xxx.xxx:xxxx/sys/authentication |
3 | service_port | 10000 | NO | 发送实际数据的url,如http://xxx.xxx.xxx.xxx:xxxx/api/data |
4 | api_type | kafka | NO | API推送类型 |
5 | zcbus_other | 参考值详见下方 | NO | 关键字映射 |
6 | kafka_bootstrap.servers | 172.17.58.146:9092 | NO | KAFKA IP地址 |
7 | kafka_max.request.size | 10485760 | YES | 请求的最大字节数。这也是对最大消息大小的有效限制。注意:server具有自己对消息大小的限制,这些大小和这个设置不同。此项设置将会限制producer每次批量发送请求的数目,以防发出巨量的请求。 |
8 | kafka_compression.type | snappy | YES | 数据压缩的类型。默认为空(就是不压缩)。有效的值有 none,gzip,snappy, 或 lz4。压缩全部的数据批,因此批的效果也将影响压缩的比率(更多的批次意味着更好的压缩)。 string类型 |
9 | kafka_value.serializer | org.apache.kafka.common.serialization.StringSerializer | YES | value的序列化类(实现序列化接口) class类型 |
10 | kafka_key.serializer | org.apache.kafka.common.serialization.StringSerializer | YES | key的序列化类(实现序列化接口) class类型 |
11 | kafka_client.id | ZbomcClient | YES | 当发出请求时传递给服务器的id字符串。这样做的目的是允许服务器请求记录记录这个【逻辑应用名】,这样能够追踪请求的源,而不仅仅只是ip/prot。 |
12 | kafka_producer.type | async | YES | producer模式,分为同步和异步模式(sync/async) |
13 | kafka_acks | all | YES | 生产者需要leader确认请求完成之前接收的应答数。此配置控制了发送消息的耐用性,支持以下配置: acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。 acks=1,这意味着leader写入消息到本地日志就立即响应,而不等待所有follower应答。在这种情况下,如果响应消息之后但follower还未复制之前leader立即故障,那么消息将会丢失。 acks=all 这意味着leader将等待所有副本同步后应答消息。此配置保障消息不会丢失(只要至少有一个同步的副本或者)。这是最强壮的可用性保障。等价于acks=-1。 string 类型 |
14 | kafka_buffer.memory | 33554432 | YES | 生产者用来缓存等待发送到服务器的消息的内存总字节数。如果消息发送比可传递到服务器的快,生产者将阻塞max.block.ms之后,抛出异常。 此设置应该大致的对应生产者将要使用的总内存,但不是硬约束,因为生产者所使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启动压缩),以及用于保持发送中的请求。 long类型 |
15 | kafka_retries | 0 | YES | 设置一个比零大的值,客户端如果发送失败则会重新发送。注意,这个重试功能和客户端在接到错误之后重新发送没什么不同。如果max.in.flight.requests.per.connection没有设置为1,有可能改变消息发送的顺序,因为如果2个批次发送到一个分区中,并第一个失败了并重试,但是第二个成功了,那么第二个批次将超过第一个。 int 类型 |
16 | kafka_ssl.key.password | - | YES | 密钥仓库文件中的私钥的密码。 password类型 |
17 | kafka_ssl.keystore.location | - | YES | 密钥仓库文件的位置。可用于客户端的双向认证。 string类型 |
18 | kafka_ssl.keystore.password | - | YES | 密钥仓库文件的仓库密码。只有配置了ssl.keystore.location时才需要。 |
19 | kafka_ssl.truststore.location | - | YES | 信任仓库的位置 string类型 |
20 | kafka_ssl.truststore.password | - | YES | 信任仓库文件的密码 password类型 |
21 | kafka_batch.size | 262144 | YES | 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位): 不会打包大于此配置大小的消息。 发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。 较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。 int类型 |
22 | kafka_connections.max.idle.ms | 540000 | YES | 多少毫秒之后关闭闲置的连接。 long类型 |
23 | kafka_max.block.ms | 60000 | YES | 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。 |
24 | kafka_receive.buffer.bytes | 65536 | YES | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。如果值为-1,则将使用OS默认值。 int类型 |
25 | kafka_request.timeout.ms | 300000 | YES | 该配置控制客户端等待请求响应的最长时间。如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。 int类型 |
26 | kafka_sasl.jaas.config | - | YES | JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。这里描述JAAS配置文件格式。该值的格式为:’(=)*;’ |
27 | kafka_sasl.kerberos.service.name | - | YES | Kafka运行的Kerberos主体名称。可以在Kafka的JAAS配置或Kafka的配置中定义。 string类型 |
28 | kafka_sasl.mechanism | GSSAPI | YES | SASL机制用于客户端连接。这是安全提供者可用与任何机制。GSSAPI是默认机制。 |
29 | kafka_security.protocol | PLAINTEXT | YES | 用于与broker通讯的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 string类型 |
30 | kafka_send.buffer.bytes | 131072 | YES | 发送数据时,用于TCP发送缓存(SO_SNDBUF)的大小。如果值为 -1,将默认使用系统的。 int类型 |
31 | kafka_ssl.enabled.protocols | TLSv1.2,TLSv1.1,TLSv1 | YES | 启用SSL连接的协议列表。 list类型 |
32 | kafka_ssl.keystore.type | JKS | YES | 密钥存储文件的文件格式。对于客户端是可选的。 string类型 |
33 | kafka_ssl.protocol | TLS | YES | 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 string类型 |
34 | kafka_ssl.provider | - | YES | 用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供程序。 string类型 |
35 | kafka_ssl.truststore.type | JKS | YES | 信任仓库文件的文件格式。 |
36 | kafka_enable.idempotence | false | YES | 当设置为‘true’,生产者将确保每个消息正好一次复制写入到stream。如果‘false’,由于broker故障,生产者重试。即,可以在流中写入重试的消息。此设置默认是‘false’。请注意,启用幂等式需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零。另外acks必须设置为“全部”。如果这些值保持默认值,我们将覆盖默认值。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。 boolean 类型 |
37 | kafka_interceptor.classes | - | YES | 实现ProducerInterceptor接口,你可以在生产者发布到Kafka群集之前拦截(也可变更)生产者收到的消息。默认情况下没有拦截器。 list类型 |
38 | kafka_max.in.flight.requests.per.connection | 5 | YES | 阻塞之前,客户端单个连接上发送的未应答请求的最大数量。注意,如果此设置设置大于1且发送失败,则会由于重试(如果启用了重试)会导致消息重新排序的风险。 int类型 |
39 | kafka_metadata.max.age.ms | 300000 | YES | 在一段时间段之后(以毫秒为单位),强制更新元数据,即使我们没有看到任何分区leader的变化,也会主动去发现新的broker或分区。 long类型 |
40 | kafka_metric.reporters | - | YES | 用作metrics reporters(指标记录员)的类的列表。实现MetricReporter接口,将受到新增加的度量标准创建类插入的通知。 JmxReporter始终包含在注册JMX统计信息中。 list类型 |
41 | kafka_metrics.num.samples | 2 | YES | 维护用于计算度量的样例数量。 int |
42 | kafka_metrics.recording.level | INFO | YES | 指标的最高记录级别。 string类型 |
43 | kafka_metrics.sample.window.ms | 30000 | YES | 度量样例计算上 long类型 |
44 | kafka_reconnect.backoff.max.ms | 1000 | YES | 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 long类型 |
45 | kafka_reconnect.backoff.ms | 20000 | YES | 尝试重新连接到给定主机之前等待的基本时间量。这避免了在循环中高频率的重复连接到主机。这种回退适应于客户端对broker的所有连接尝试。 long类型 |
46 | kafka_retry.backoff.ms | 20000 | YES | 尝试重试指定topic分区的失败请求之前等待的时间。这样可以避免在某些故障情况下高频次的重复发送请求。 long类型 |
47 | kafka_sasl.kerberos.kinit.cmd | /usr/bin/kinit | YES | Kerberos kinit 命令路径。 string类型 |
48 | kafka_sasl.kerberos.min.time.before.relogin | 60000 | YES | Login线程刷新尝试之间的休眠时间。 long类型 |
49 | kafka_sasl.kerberos.ticket.renew.jitter | 0.05 | YES | 添加更新时间的随机抖动百分比。 double类型 |
50 | kafka_sasl.kerberos.ticket.renew.window.factor | 0.8 | YES | 登录线程将睡眠,直到从上次刷新ticket到期时间的指定窗口因子为止,此时将尝试续订ticket。 double类型 |
51 | kafka_ssl.cipher.suites | - | YES | 密码套件列表。这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。默认情况下,支持所有可用的密码套件。 list类型 |
52 | kafka_ssl.endpoint.identification.algorithm | - | YES | 使用服务器证书验证服务器主机名的端点识别算法。 string类型 |
53 | kafka_ssl.keymanager.algorithm | SunX509 | YES | 用于SSL连接的密钥管理因子算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。 string类型 |
54 | kafka_ssl.secure.random.implementation | - | YES | 用于SSL加密操作的SecureRandom PRNG实现。 string类型 |
55 | kafka_ssl.trustmanager.algorithm | PKIX | YES | 用于SSL连接的信任管理因子算法。默认值是JAVA虚拟机配置的信任管理工厂算法。 string类型 |
56 | kafka_transaction.timeout.ms | 60000 | YES | 生产者在主动中止正在进行的交易之前,交易协调器等待事务状态更新的最大时间(以ms为单位)。如果此值大于broker中的max.transaction.timeout.ms设置,则请求将失败,并报“InvalidTransactionTimeout”错误。 int类型 |
57 | kafka_linger.ms | 100 | YES | 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位): 不会打包大于此配置大小的消息。 发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。 较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。 int类型 |
58 | label | - | NO | 为数据库添加标签,备注说明 |
59 | password_encrypt | 0 | YES | 设置为1时,password和source_password参数使用密文保存,默认为0 |
文档更新时间: 2024-10-23 23:48 作者:操李红