一、订阅配置

1.1 创建订阅客户端

如已存在普通订阅客户端,可忽略此步骤,进入下一步
进入 订阅 页面,再点击订阅页面左上角 订阅管理 按钮
进入 订阅管理 页面,点击右上角 添加 按钮,在弹出的新增页面中,输入

  • 客户端名称:可自行填写
  • 客户端类型:选择为普通客户端
  • 容器:可自行选择,一般默认为zcbus
  • 状态:默认即可
    点击 保存 按钮,即可在 订阅管理 页面中看到新增的订阅客户端

1.2 创建KAFKA api客户端

进入订阅-订阅管理页面,新增一个新的KAFKA API客户端

  • 客户端名称:可自行填写
  • 客户端类型:选择API客户端
  • 容器:可自行选择,一般默认为ZCBUS
  • 状态:默认为启动即可,也可进入 容器-详情 页面,进行手动启停
  • API类型:选择类型为KAFKA

注:将数据订阅到KAFKA时,创建的API客户端与普通客户端应在同一个容器内,此处的KAFKA API客户端对应配置参数当中的service_host和service_port两项

1.3 订阅数据库管理

进入 订阅 页面,选择普通订阅客户端后,点击左上角 数据库管理 按钮,新增一个新的数据库,类型选择为 KAFKA

  • host: 一般可为 软件登陆页面地址,如 http://192.168.37.4:8890
  • service_host: 为API客户端所在的主机地址,service_host一般是API所在主机的名字或IP
  • service_port: 可自选,一般选择为在本页1.2章节中所创建的API客户端
  • kafka_bootstrap.servers:为KAFKA地址和端口
  • zcbus_other: 默认即可,参数具体内容,可参考本页 补充说明-zcbus_other参数说明
  • 高级设置:如无特殊配置,默认即可,具体参数内窜,可参考本页 补充说明-目标数据库参数列表
    测试连接完成后点击 提交 按钮即可

1.4 订阅数据表配置

进入 订阅 页面,再点击订阅页面 数据表 按钮

  • 点击操作类型,选择批量修改库名,修改库名为目标端topic名
  • 点击操作类型,选择批量修改表名,修改表名为目标端partition编号,一般情况下设置为0即可
  • 源端多个表可以指定目标端相同topic的partition,如新增一张表,可以指定partition=1

二、补充说明

2.1 zcbus_other参数说明

{
    "keymap":{                            ### JSON中关键字翻译
        "loaderTime":"SEND_TIME",         ### 发送时间
        "send_type":"SEND_TYPE",          ### 发送类型
        "columntype":"COLUMN_TYPE",       ### 列类型
        "startTime":"START_TIME",         ### 开始时间
        "stopTime":"END_TIME",            ### 结束时间
        "optype":"OPERATION_TYPE",        ### 操作类型
        "db_type":"DB_TYPE",              ### 数据库类型
        "op_time":"OPERATION_TIME",       ### 源端提交时间
        "db_name":"SCHEMA_NAME",          ### 源端DB名字
        "batch_id":"RECONCILIATION_ID",   ### 发送时间 批次号
        "table_name":"TABLE_NAME",        ### 源端表名
        "insert_count":"INSERT_COUNT",    ### 插入统计关键字
        "update_count":"UPDATE_COUNT",    ### 更新统计关键字
        "delete_count":"DELETE_COUNT",    ### 删除统计关键字
        "ddl_count":"DDL_COUNT",          ### DDL统计关键字
        "schema_name":"SCHEMA_NAME",      ### 源端SCHEMA名字
        "before":"BEFORE",                ### 更新中BEFORE关键字
        "after":"AFTER",                  ### 更新中AFTER关键字
        "insertvalues":"INSERT",          ### 插入数据关键字
        "deletevalues":"DELETE",          ### 删除数据关键字
        "primarykey":"PRIMARY_KEY",       ### 主键列关键字
        "columntype":"COLUMN_TYPE",       ### 数据类型关键字
        "values":"DATALOAD",              ### 删除、插入关键字(20221110-废弃,insert替换为after,delete更新为before)
        "records":"SEND_RECORD",          ### 发送记录书统计关键字
        "ownerTable":"table",             ### 表owner信息
        "check_batch_id":"ID",            ### 对账batchid信息
        "oper_count":"OPER_COUNT"         ### 对账统计数量关键字
    },
    "opermap":{
        "fullload":"R",                     ### 是否包含全量标签信息,自定义标签
        "insert":"I",                       ### 增量数据标签,默认insert
        "update":"U",                       ### 增量数据标签,默认update
        "delete":"D",                       ### 增量数据标签,默认delete
        "ddl":"DDL"                         ### 增量数据标签,默认ddl
    },
    "loaderTime":1,                         ### 是否增加发送时间
    "rid":1,                                ### 标签
    "useupdatemark":1,                      ### 
    "sendtype":1,                           ### 发送类型,批量发送/单条发送
    "ddl":0,                                ### 
    "updatetodeleteandinsert":0,            ### 增量JSON中,更新操作是否更新为DELETE/INSERT操作
    "ColNameCase":0,                        ### 列名大小写配置,0不变,1小写,2大写
    "columnType":1,                         ### JSON中是否含有数据类型列
    "primaryKey":1,                         ### JSON中是否含有主键信息
    "sendHeader":{                          ### 头部补充信息,可以自定义,分为固定值和变量两种模型
        "provcode":"fix:834",               ### fix 代表固定值,provcode 赋予固定值为834
        "srccode":"fix:MSS",                ### 
        "dataacct":"var:CURRENT_DATE(YYYYmmdd)", ### var代表变量,后边支持current_date获取时间关键字,括号内为返回当前时间格式
        "batchnum":"var:NUM(2)",            ### 获取两位数字编码批次号,编码在batchCode关键字中获取
        "filename":""
    },
    "batchCode":1,                          ### 是否生成批次号信息
    "ifddltodict":1,                        ### 在执行DDL的时候,将ddl发送修改为表字典信息推送
    "sendbatch":0,                          ### 是否发送批量对账
    "real_check":1,                         ### 是否进行增量对账
    "full_check":1,                         ### 是否进行全量对账
    "module_check":"check_basic",           ### 数据对账模型
                                            ###check_basic
                                            ###tel_bj_module
    "ifIncludesendDate":0,                  ### 是否发送当天日期,格式yyyyMMdd
    "ifIncludeSend_type":1,                 ### 是否生效Send_type关键字
    "ifIncludeSchemaName":1,                ### 是否包含模式标签
    "ifIncludeTableName":1,                 ### 是否包含表名标签
    "ifIncludeOwnerTable":1,                ### 是否打开ownertable标签,会生成ownerTable标签,值为so1.tb01
    "ifIncludeoptime":0,                    ### 是否包含源端提交时间标签
    "ifIncludedbtype":0,                    ###是否包含源端数据库类型标签
    "ifIncludeprimaryKey":1,                ### 是否包含主键信息(订阅DDL解析生成主键信息)
    "ifIncludeprimaryKeyV2":1,              ### 是否包含主键信息,依赖发布自动生成的主键信息,参考bus_push_table_index表
    "ifIncludeGlobalSequence":0,            ### 是否生成全局序列号,每条记录生成一条唯一值
    "ifIncludeSequence":0,                  ### 是否生成表级别序列号,每个表每条记录生成一条唯一值
    "ifInclueUpdateBefore":1,               ### 更新操作是否包含before数据
    "ifIncludeAppendUpdateBeforeColumnsToNull":1,  ### 将before数据追加到after数据中不存在的列数据信息中去
    "ifIncludeSourceHostPort":1,            ### 是否包含源端IP地址和端口号
    "ifIncludeSharedingKey":1,              ### 是否包含Shareding 分片信息
    "ifIncludeCheckZeroData":0,             ### 对账信息,如果都是0,是否发送,0不发送,1发送对账信息
    "ifTableNameCase":0,                    ### 表名大小写,0 不变,1小写,2大写
    "LoaderTimeFormat":"yyyy-MM-dd HH:mm:ss", ### 发送时间格式,默认为yyyy-MM-dd HH:mm:ss格式,可以自定义,如yyyy-MM-dd'T'HH:mm:ss
    "skipddl":1,                            ### 是否跳过DDL
    "chk_real_topic":"SAPSR3_INC_CHECK:0",  ### 增量对账TOPIC
    "chk_full_topic":"SAPSR3_CHECK:0",      ### 全量对账TOPIC
    "chk_topic":"chk_topic_01",             ### 对账TOPIC
    "ifIncludecolumnType":"1",              ### 是否包含数据类型列
    "ifFilterDelete":1,                     ### 是否过滤delete操作
    "ifFilterInsert":1,                     ### 是否过滤insert操作
    "ifFilterUpdate":1,                     ### 是否过滤update操作
    "jsontype":"json_module"                ### json格式  zcbus、debezium、json_module 等格式
}

2.2 KAFKA兼容的安全认证方式

  • SSL方式
    • 通过TLS/SSL协议提供了一种安全的通信机制,可以保护Kafka客户端和Kafka集群之间的通信安全。
    • 需要关注kafka_ssl.key.password,kafka_ssl.keystore.location,kafka_ssl.keystore.password,kafka_ssl.truststore.locationkafka_ssl.truststore.password,kafka_ssl.enabled.protocols,kafka_ssl.keystore.type,kafka_ssl.protocol,kafka_ssl.provider,kafka_ssl.truststore.type,kafka_ssl.cipher.suites,kafka_ssl.endpoint.identification.algorithm,kafka_ssl.keymanager.algorithm,kafka_ssl.secure.random.implementation,kafka_ssl.trustmanager.algorithm参数项,详情请参考描述内容。
  • SASL方式
    • SASL是一种通用的认证框架,Kafka使用它来支持多种认证机制,包括PLAIN、GSSAPI和kerberos。通过SASL认证,Kafka客户端可以使用用户名和密码或者其他凭据来进行身份验证。
    • 需要关注kafka_sasl.jaas.config,kafka_sasl.mechanism,kafka_security.protocol, kafka_sasl.kerberos.kinit.cmd参数项,详情请参考描述内容。

2.3 目标数据库参数列表

序号 参数名 参考值 是否隐藏 描述
0 label - NO 标签
1 db_type kafka NO 输送目标类型API类型
2 service_host 172.17.58.146 NO 用户登录的url,如http://xxx.xxx.xxx.xxx:xxxx/sys/authentication
3 service_port 10000 NO 发送实际数据的url,如http://xxx.xxx.xxx.xxx:xxxx/api/data
4 api_type kafka NO API推送类型
5 zcbus_other 参考值详见下方 NO 关键字映射
6 kafka_bootstrap.servers 172.17.58.146:9092 NO KAFKA IP地址
7 kafka_max.request.size 10485760 YES 请求的最大字节数。这也是对最大消息大小的有效限制。注意:server具有自己对消息大小的限制,这些大小和这个设置不同。此项设置将会限制producer每次批量发送请求的数目,以防发出巨量的请求。
8 kafka_compression.type snappy YES 数据压缩的类型。默认为空(就是不压缩)。有效的值有 none,gzip,snappy, 或 lz4。压缩全部的数据批,因此批的效果也将影响压缩的比率(更多的批次意味着更好的压缩)。
string类型
9 kafka_value.serializer org.apache.kafka.common.serialization.StringSerializer YES value的序列化类(实现序列化接口)
class类型
10 kafka_key.serializer org.apache.kafka.common.serialization.StringSerializer YES key的序列化类(实现序列化接口)
class类型
11 kafka_client.id ZbomcClient YES 当发出请求时传递给服务器的id字符串。这样做的目的是允许服务器请求记录记录这个【逻辑应用名】,这样能够追踪请求的源,而不仅仅只是ip/prot。
12 kafka_producer.type async YES producer模式,分为同步和异步模式(sync/async)
13 kafka_acks all YES 生产者需要leader确认请求完成之前接收的应答数。此配置控制了发送消息的耐用性,支持以下配置:
acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。
acks=1,这意味着leader写入消息到本地日志就立即响应,而不等待所有follower应答。在这种情况下,如果响应消息之后但follower还未复制之前leader立即故障,那么消息将会丢失。
acks=all 这意味着leader将等待所有副本同步后应答消息。此配置保障消息不会丢失(只要至少有一个同步的副本或者)。这是最强壮的可用性保障。等价于acks=-1。
string 类型
14 kafka_buffer.memory 33554432 YES 生产者用来缓存等待发送到服务器的消息的内存总字节数。如果消息发送比可传递到服务器的快,生产者将阻塞max.block.ms之后,抛出异常。
此设置应该大致的对应生产者将要使用的总内存,但不是硬约束,因为生产者所使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启动压缩),以及用于保持发送中的请求。
long类型
15 kafka_retries 0 YES 设置一个比零大的值,客户端如果发送失败则会重新发送。注意,这个重试功能和客户端在接到错误之后重新发送没什么不同。如果max.in.flight.requests.per.connection没有设置为1,有可能改变消息发送的顺序,因为如果2个批次发送到一个分区中,并第一个失败了并重试,但是第二个成功了,那么第二个批次将超过第一个。
int 类型
16 kafka_ssl.key.password - YES 密钥仓库文件中的私钥的密码。
password类型
17 kafka_ssl.keystore.location - YES 密钥仓库文件的位置。可用于客户端的双向认证。
string类型
18 kafka_ssl.keystore.password - YES 密钥仓库文件的仓库密码。只有配置了ssl.keystore.location时才需要。
19 kafka_ssl.truststore.location - YES 信任仓库的位置
string类型
20 kafka_ssl.truststore.password - YES 信任仓库文件的密码
password类型
21 kafka_batch.size 262144 YES 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位):
不会打包大于此配置大小的消息。
发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。
较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。
int类型
22 kafka_connections.max.idle.ms 540000 YES 多少毫秒之后关闭闲置的连接。
long类型
23 kafka_max.block.ms 60000 YES 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。
24 kafka_receive.buffer.bytes 65536 YES 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。如果值为-1,则将使用OS默认值。
int类型
25 kafka_request.timeout.ms 300000 YES 该配置控制客户端等待请求响应的最长时间。如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。
int类型
26 kafka_sasl.jaas.config - YES JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。这里描述JAAS配置文件格式。该值的格式为:’(=)*;’
27 kafka_sasl.kerberos.service.name - YES Kafka运行的Kerberos主体名称。可以在Kafka的JAAS配置或Kafka的配置中定义。
string类型
28 kafka_sasl.mechanism GSSAPI YES SASL机制用于客户端连接。这是安全提供者可用与任何机制。GSSAPI是默认机制。
29 kafka_security.protocol PLAINTEXT YES 用于与broker通讯的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。
string类型
30 kafka_send.buffer.bytes 131072 YES 发送数据时,用于TCP发送缓存(SO_SNDBUF)的大小。如果值为 -1,将默认使用系统的。
int类型
31 kafka_ssl.enabled.protocols TLSv1.2,TLSv1.1,TLSv1 YES 启用SSL连接的协议列表。
list类型
32 kafka_ssl.keystore.type JKS YES 密钥存储文件的文件格式。对于客户端是可选的。
string类型
33 kafka_ssl.protocol TLS YES 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。
string类型
34 kafka_ssl.provider - YES 用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供程序。
string类型
35 kafka_ssl.truststore.type JKS YES 信任仓库文件的文件格式。
36 kafka_enable.idempotence false YES 当设置为‘true’,生产者将确保每个消息正好一次复制写入到stream。如果‘false’,由于broker故障,生产者重试。即,可以在流中写入重试的消息。此设置默认是‘false’。请注意,启用幂等式需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零。另外acks必须设置为“全部”。如果这些值保持默认值,我们将覆盖默认值。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。
boolean 类型
37 kafka_interceptor.classes - YES 实现ProducerInterceptor接口,你可以在生产者发布到Kafka群集之前拦截(也可变更)生产者收到的消息。默认情况下没有拦截器。
list类型
38 kafka_max.in.flight.requests.per.connection 5 YES 阻塞之前,客户端单个连接上发送的未应答请求的最大数量。注意,如果此设置设置大于1且发送失败,则会由于重试(如果启用了重试)会导致消息重新排序的风险。
int类型
39 kafka_metadata.max.age.ms 300000 YES 在一段时间段之后(以毫秒为单位),强制更新元数据,即使我们没有看到任何分区leader的变化,也会主动去发现新的broker或分区。
long类型
40 kafka_metric.reporters - YES 用作metrics reporters(指标记录员)的类的列表。实现MetricReporter接口,将受到新增加的度量标准创建类插入的通知。 JmxReporter始终包含在注册JMX统计信息中。
list类型
41 kafka_metrics.num.samples 2 YES 维护用于计算度量的样例数量。
int
42 kafka_metrics.recording.level INFO YES 指标的最高记录级别。
string类型
43 kafka_metrics.sample.window.ms 30000 YES 度量样例计算上
long类型
44 kafka_reconnect.backoff.max.ms 1000 YES 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。
long类型
45 kafka_reconnect.backoff.ms 20000 YES 尝试重新连接到给定主机之前等待的基本时间量。这避免了在循环中高频率的重复连接到主机。这种回退适应于客户端对broker的所有连接尝试。
long类型
46 kafka_retry.backoff.ms 20000 YES 尝试重试指定topic分区的失败请求之前等待的时间。这样可以避免在某些故障情况下高频次的重复发送请求。
long类型
47 kafka_sasl.kerberos.kinit.cmd /usr/bin/kinit YES Kerberos kinit 命令路径。
string类型
48 kafka_sasl.kerberos.min.time.before.relogin 60000 YES Login线程刷新尝试之间的休眠时间。
long类型
49 kafka_sasl.kerberos.ticket.renew.jitter 0.05 YES 添加更新时间的随机抖动百分比。
double类型
50 kafka_sasl.kerberos.ticket.renew.window.factor 0.8 YES 登录线程将睡眠,直到从上次刷新ticket到期时间的指定窗口因子为止,此时将尝试续订ticket。
double类型
51 kafka_ssl.cipher.suites - YES 密码套件列表。这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。默认情况下,支持所有可用的密码套件。
list类型
52 kafka_ssl.endpoint.identification.algorithm - YES 使用服务器证书验证服务器主机名的端点识别算法。
string类型
53 kafka_ssl.keymanager.algorithm SunX509 YES 用于SSL连接的密钥管理因子算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。
string类型
54 kafka_ssl.secure.random.implementation - YES 用于SSL加密操作的SecureRandom PRNG实现。
string类型
55 kafka_ssl.trustmanager.algorithm PKIX YES 用于SSL连接的信任管理因子算法。默认值是JAVA虚拟机配置的信任管理工厂算法。
string类型
56 kafka_transaction.timeout.ms 60000 YES 生产者在主动中止正在进行的交易之前,交易协调器等待事务状态更新的最大时间(以ms为单位)。如果此值大于broker中的max.transaction.timeout.ms设置,则请求将失败,并报“InvalidTransactionTimeout”错误。
int类型
57 kafka_linger.ms 100 YES 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位):
不会打包大于此配置大小的消息。
发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。
较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。
int类型
58 label - NO 为数据库添加标签,备注说明
59 password_encrypt 0 YES 设置为1时,password和source_password参数使用密文保存,默认为0
文档更新时间: 2024-10-23 23:48   作者:操李红