一、TiCDC 的安装部署
https://pingkai.cn/download#tidb-community
1.1 环境初始化
创建标准化工作目录,实现程序、数据、日志的规范化存储:
mkdir -p /data/ticdc/{bin,conf,data,log,scripts}
1.2 程序部署
从工具包中提取 cdc 二进制组件:
# 1. 解压工具集总包
tar -zxvf tidb-community-toolkit-v8.5.4-linux-amd64.tar.gz
cd tidb-community-toolkit-v8.5.4-linux-amd64/
# 2. 部署二进制文件至目标路径
tar -zxvf cdc-v8.5.4-linux-amd64.tar.gz
mv cdc /data/ticdc/bin/
1.3 服务启动配置
创建基础配置文件(如无特殊参数调整,保持为空即可):
touch /data/ticdc/conf/cdc.toml
创建服务启动脚本 /data/ticdc/scripts/start_cdc.sh:
#!/bin/bash
set -e
DEPLOY_DIR=/data/ticdc
cd "${DEPLOY_DIR}" || exit 1
exec bin/cdc server \
--addr "0.0.0.0:8300" \
--advertise-addr "192.168.2.55:8300" \
--pd "http://192.168.2.53:2379" \
--data-dir="/data/ticdc/data" \
--gc-ttl 86400 \
--config conf/cdc.toml \
--log-file "/data/ticdc/log/cdc.log" 2>> "/data/ticdc/log/cdc_stderr.log"
关键参数说明:
--advertise-addr: TiCDC 节点对外的服务地址,填写部署本组件的服务器 IP。--pd: 源端 TiDB 集群的 PD 节点地址,多个节点间使用逗号分隔。--gc-ttl: 服务在 PD 预留的 GC 存活时间(单位:秒),默认86400即 24 小时。
执行脚本并挂载至后台运行:
nohup sh /data/ticdc/scripts/start_cdc.sh &
二、创建 Changefeed 同步数据到 Kafka
2.1 编写同步配置文件
通过配置文件可以灵活定义路由规则(如表过滤、Topic 分发等)。创建 /data/ticdc/conf/changefeed.conf:
# --- 全局参数区 (必须在最前面) ---
force-replicate = true
# --- 过滤规则区 ---
[filter]
rules = ['testdb.*']
# --- 路由分发区 ---
[sink]
# 路由分配器:支持根据 schema/table 匹配发送至特定的 Topic 或 Partition
dispatchers = [
{ matcher = ['testdb.*'],
topic = "testdb_cdc",
partition = "table" }
]
关键参数说明:
force-replicate: 强制同步开关。设为
true可同步没有主键或唯一索引的表,确保同步链路不中断。rules: 库表过滤规则。定义需要监听的源端库表,支持
*通配符进行批量匹配。matcher: 路由匹配规则。用于指定该分发策略适用的库表范围,需与过滤规则相对应。
topic: 目标 Kafka Topic 名称。支持固定名称或使用
{schema}_{table}表达式实现动态分发。partition: 分区路由策略。常用
index-value(按主键 Hash)或table(按表名 Hash)来保证数据变更的顺序性。
更多 Changefeed 参数配置请参考官方文档:TiCDC Changefeed 命令行参数和配置参数 | TiDB 文档中心
2.2 创建同步任务(Changefeed)
使用 --config 参数加载上述配置,启动增量同步任务:
/data/ticdc/bin/cdc cli changefeed create --pd=http://192.168.2.53:2379 --sink-uri="kafka://192.168.2.56:9092/testdb_cdc?protocol=canal-json&partition-num=1" --config="/data/ticdc/conf/changefeed.conf" --changefeed-id testdb-replica-task
关键参数说明:
–pd: 指定集群 PD 节点的地址。TiCDC 通过 PD 获取 TiDB 集群的元数据及授时服务。
–sink-uri: 定义下游接收端地址。在本例中为 Kafka 地址,包含协议格式(
canal-json)与强制分区数(partition-num=1)。–config: 指定配置文件路径。用于引入详细的过滤规则(Filter)和路由分发策略(Dispatcher)。
–changefeed-id: 定义同步任务的唯一名称。指定后可方便后续对该任务进行启动、停止或查询操作。
protocol: 传输协议。
canal-json是 zcbus 适配最常用的格式,包含完整的增量数据变更信息。partition-num: 强制指定分区数。设为
1可确保所有变更进入 Kafka 同一个分区,从而实现全局绝对时序。
2.3 任务日常维护
执行维护操作时,需指定 TiCDC 服务地址 (--server)。
| 操作类型 | 命令示例 |
|---|---|
| 查看任务列表 | /data/ticdc/bin/cdc cli changefeed list --server=http://192.168.2.55:8300 |
| 查询任务细节 | /data/ticdc/bin/cdc cli changefeed query --server=http://192.168.2.55:8300 --changefeed-id [ID] |
| 暂停同步任务 | /data/ticdc/bin/cdc cli changefeed pause --server=http://192.168.2.55:8300 --changefeed-id [ID] |
| 恢复同步任务 | /data/ticdc/bin/cdc cli changefeed resume --server=http://192.168.2.55:8300 --changefeed-id [ID] |
| 更新任务配置 | /data/ticdc/bin/cdc cli changefeed update --server=http://192.168.2.55:8300 --changefeed-id [ID] --config="[PATH]" |
| 删除同步任务 | /data/ticdc/bin/cdc cli changefeed remove --server=http://192.168.2.55:8300 --changefeed-id [ID] |
三、附录:Kafka 消息格式实例(Canal-JSON)
TiCDC 使用 canal-json 协议进行数据传输,每条消息均包含元数据及业务数据。以下为典型场景下的数据示例:
/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.56:9092 --topic testdb_cdc --from-beginning
3.1 INSERT(插入操作)
新行插入时,type 为 INSERT,old 字段为 null。
{
"id": 0,
"database": "ticdc_test",
"table": "sync_demo",
"pkNames": [
"id"
],
"isDdl": false,
"type": "INSERT",
"es": 1767603969383,
"ts": 1767603970457,
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"age": 4,
"created_at": 93
},
"mysqlType": {
"age": "int",
"created_at": "timestamp",
"id": "int",
"name": "varchar"
},
"old": null,
"data": [
{
"id": "1",
"name": "Alice",
"age": "25",
"created_at": "2026-01-05 17:06:09"
}
]
}
3.2 UPDATE(更新操作)
当行数据发生变更时,data 字段包含新值,old 字段包含变更前的值。
{
"id": 0,
"database": "ticdc_test",
"table": "sync_demo",
"pkNames": [
"id"
],
"isDdl": false,
"type": "UPDATE",
"es": 1767603987584,
"ts": 1767603988456,
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"age": 4,
"created_at": 93
},
"mysqlType": {
"id": "int",
"name": "varchar",
"age": "int",
"created_at": "timestamp"
},
"old": [
{
"id": "1",
"name": "Alice",
"age": "25",
"created_at": "2026-01-05 17:06:09"
}
],
"data": [
{
"id": "1",
"name": "Alice",
"age": "26",
"created_at": "2026-01-05 17:06:09"
}
]
}
3.3 DELETE(删除操作)
行删除时,type 为 DELETE,data 字段包含被删除行的完整快照。
{
"id": 0,
"database": "ticdc_test",
"table": "sync_demo",
"pkNames": [
"id"
],
"isDdl": false,
"type": "DELETE",
"es": 1767603996884,
"ts": 1767603998056,
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"age": 4,
"created_at": 93
},
"mysqlType": {
"id": "int",
"name": "varchar",
"age": "int",
"created_at": "timestamp"
},
"old": null,
"data": [
{
"id": "1",
"name": "Alice",
"age": "26",
"created_at": "2026-01-05 17:06:09"
}
]
}
3.4 DDL(结构变更操作)
当数据库执行 ALTER/CREATE/DROP 时,isDdl 标记为 true,并包含具体的 SQL 语句。
{
"id": 0,
"database": "ticdc_test",
"table": "sync_demo",
"pkNames": null,
"isDdl": true,
"type": "ALTER",
"es": 1767604004334,
"ts": 1767604005265,
"sql": "ALTER TABLE `sync_demo` ADD COLUMN `email` VARCHAR(100)",
"sqlType": null,
"mysqlType": null,
"data": null,
"old": null
}
3.5 核心字段说明
es: 业务发生时间(Epoch timestamp in milliseconds)。ts: TiCDC 成功解析并发送至 Kafka 的时间。pkNames: 主键列名,用于下游 zcbus 定位唯一记录。mysqlType: 原始字段类型,辅助下游进行精确的数据类型转换。