订阅RESTAPIKAFKA
  1. 配置KAFKA模式
变量 序号 描述
api_type kafka 6 API推送类型
db_name http://172.17.58.146:8085/api/data 3 发送实际数据的url,如http://xxx.xxx.xxx.xxx:xxxx/api/data
db_type RESTAPI 1 输送目标类型API类型
host http://172.17.58.146:8085/sys/authentication 2 用户登录的url,如http://xxx.xxx.xxx.xxx:xxxx/sys/authentication
kafka_acks all 16 生产者需要leader确认请求完成之前接收的应答数。此配置控制了发送消息的耐用性,支持以下配置:

acks=0 如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。

acks=1,这意味着leader写入消息到本地日志就立即响应,而不等待所有follower应答。在这种情况下,如果响应消息之后但follower还未复制之前leader立即故障,那么消息将会丢失。

acks=all 这意味着leader将等待所有副本同步后应答消息。此配置保障消息不会丢失(只要至少有一个同步的副本或者)。这是最强壮的可用性保障。等价于acks=-1。

string 类型
kafka_batch.size 16384 24 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位):

不会打包大于此配置大小的消息。

发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。

较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。

int类型
kafka_bootstrap.servers 172.17.58.146:9092 9 KAFKA IP地址
kafka_buffer.memory 33554432 17 生产者用来缓存等待发送到服务器的消息的内存总字节数。如果消息发送比可传递到服务器的快,生产者将阻塞max.block.ms之后,抛出异常。

此设置应该大致的对应生产者将要使用的总内存,但不是硬约束,因为生产者所使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启动压缩),以及用于保持发送中的请求。

long类型
kafka_client.id ZbomcClient 14 当发出请求时传递给服务器的id字符串。这样做的目的是允许服务器请求记录记录这个【逻辑应用名】,这样能够追踪请求的源,而不仅仅只是ip/prot。
kafka_compression.type snappy 11 数据压缩的类型。默认为空(就是不压缩)。有效的值有 none,gzip,snappy, 或 lz4。压缩全部的数据批,因此批的效果也将影响压缩的比率(更多的批次意味着更好的压缩)。

string类型
kafka_connections.max.idle.ms 540000 25 多少毫秒之后关闭闲置的连接。

long类型
kafka_enable.idempotence false 39 当设置为‘true’,生产者将确保每个消息正好一次复制写入到stream。如果‘false’,由于broker故障,生产者重试。即,可以在流中写入重试的消息。此设置默认是‘false’。请注意,启用幂等式需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零。另外acks必须设置为“全部”。如果这些值保持默认值,我们将覆盖默认值。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。

boolean 类型
kafka_key.serializer org.apache.kafka.common.serialization.StringSerializer 13 key的序列化类(实现序列化接口)

class类型
kafka_linger.ms 1 60 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位):

不会打包大于此配置大小的消息。

发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。

较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。

int类型
kafka_max.block.ms 60000 26 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。
kafka_max.in.flight.requests.per.connection 5 41 阻塞之前,客户端单个连接上发送的未应答请求的最大数量。注意,如果此设置设置大于1且发送失败,则会由于重试(如果启用了重试)会导致消息重新排序的风险。

int类型
kafka_max.request.size 10485760 10 请求的最大字节数。这也是对最大消息大小的有效限制。注意:server具有自己对消息大小的限制,这些大小和这个设置不同。此项设置将会限制producer每次批量发送请求的数目,以防发出巨量的请求。
kafka_metadata.max.age.ms 300000 42 在一段时间段之后(以毫秒为单位),强制更新元数据,即使我们没有看到任何分区leader的变化,也会主动去发现新的broker或分区。

long类型
kafka_metrics.num.samples 2 44 维护用于计算度量的样例数量。

int
kafka_metrics.recording.level INFO 45 指标的最高记录级别。

string类型
kafka_metrics.sample.window.ms 30000 46 度量样例计算上

long类型
kafka_producer.type async 15 producer模式,分为同步和异步模式(sync/async)
kafka_receive.buffer.bytes 65536 27 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。如果值为-1,则将使用OS默认值。

int类型
kafka_reconnect.backoff.max.ms 1000 47 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。

long类型
kafka_reconnect.backoff.ms 20000 48 尝试重新连接到给定主机之前等待的基本时间量。这避免了在循环中高频率的重复连接到主机。这种回退适应于客户端对broker的所有连接尝试。

long类型
kafka_request.timeout.ms 30000 28 该配置控制客户端等待请求响应的最长时间。如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。

int类型
kafka_retries 0 18 设置一个比零大的值,客户端如果发送失败则会重新发送。注意,这个重试功能和客户端在接到错误之后重新发送没什么不同。如果max.in.flight.requests.per.connection没有设置为1,有可能改变消息发送的顺序,因为如果2个批次发送到一个分区中,并第一个失败了并重试,但是第二个成功了,那么第二个批次将超过第一个。

int 类型
kafka_retry.backoff.ms 20000 49 尝试重试指定topic分区的失败请求之前等待的时间。这样可以避免在某些故障情况下高频次的重复发送请求。

long类型
kafka_sasl.kerberos.kinit.cmd /usr/bin/kinit 50 Kerberos kinit 命令路径。

string类型
kafka_sasl.kerberos.min.time.before.relogin 60000 51 Login线程刷新尝试之间的休眠时间。

long类型
kafka_sasl.kerberos.ticket.renew.jitter 0.05 52 添加更新时间的随机抖动百分比。

double类型
kafka_sasl.kerberos.ticket.renew.window.factor 0.8 53 登录线程将睡眠,直到从上次刷新ticket到期时间的指定窗口因子为止,此时将尝试续订ticket。

double类型
kafka_sasl.mechanism GSSAPI 31 SASL机制用于客户端连接。这是安全提供者可用与任何机制。GSSAPI是默认机制。
kafka_security.protocol PLAINTEXT 32 用于与broker通讯的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。

string类型
kafka_send.buffer.bytes 131072 33 发送数据时,用于TCP发送缓存(SO_SNDBUF)的大小。如果值为 -1,将默认使用系统的。

int类型
kafka_ssl.enabled.protocols TLSv1.2,TLSv1.1,TLSv1 34 启用SSL连接的协议列表。

list类型
kafka_ssl.keymanager.algorithm SunX509 56 用于SSL连接的密钥管理因子算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。

string类型
kafka_ssl.keystore.type JKS 35 密钥存储文件的文件格式。对于客户端是可选的。

string类型
kafka_ssl.protocol TLS 36 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。

string类型
kafka_ssl.trustmanager.algorithm PKIX 58 用于SSL连接的信任管理因子算法。默认值是JAVA虚拟机配置的信任管理工厂算法。

string类型
kafka_ssl.truststore.type JKS 38 信任仓库文件的文件格式。
kafka_transaction.timeout.ms 60000 59 生产者在主动中止正在进行的交易之前,交易协调器等待事务状态更新的最大时间(以ms为单位)。如果此值大于broker中的max.transaction.timeout.ms设置,则请求将失败,并报“InvalidTransactionTimeout”错误。

int类型
kafka_value.serializer org.apache.kafka.common.serialization.StringSerializer 12 value的序列化类(实现序列化接口)

class类型
password 123456 5 数据库用户密码
user admin 4 数据库用户
zcbus_other {
“keymap”:{
“optype”:”opertype”,
“db_type”:”opertype”,
“op_time”:”opTs”,
“db_name”:”database_name”,
“batchCode”:”batchCode”,
“table_name”:”tableName”,
“before”:”beforecolinfo”,
“after”:”aftercolinfo”,
“insert”:”insertCount”,
“update”:”updateCount”,
“delete”:”deleteCount”,
“schema_name”:”tableOwner”,
“linkstatus”:”dsgstatus”,
“errcount”:”errorCount”
},
“opermap”:{
“insert”:”I”,
“update”:”U”,
“delete”:”D”,
“ddl”:”DDL”
},
“loaderTime”:1,
“rid”:1,
“useupdatemark”:1,
“sendtype”:1,
“ddl”:0,
“updatetodeleteandinsert”:0,
“ColNameCase”:0,
“jsontype”:”debezium”
}
8 关键字映射
文档更新时间: 2022-11-09 20:40   作者:阿力