一、TiCDC 的安装部署

https://pingkai.cn/download#tidb-community

1.1 环境初始化

创建标准化工作目录,实现程序、数据、日志的规范化存储:

mkdir -p /data/ticdc/{bin,conf,data,log,scripts}

1.2 程序部署

从工具包中提取 cdc 二进制组件:

# 1. 解压工具集总包
tar -zxvf tidb-community-toolkit-v8.5.4-linux-amd64.tar.gz
cd tidb-community-toolkit-v8.5.4-linux-amd64/

# 2. 部署二进制文件至目标路径
tar -zxvf cdc-v8.5.4-linux-amd64.tar.gz
mv cdc /data/ticdc/bin/

1.3 服务启动配置

创建基础配置文件(如无特殊参数调整,保持为空即可):

touch /data/ticdc/conf/cdc.toml

创建服务启动脚本 /data/ticdc/scripts/start_cdc.sh

#!/bin/bash
set -e

DEPLOY_DIR=/data/ticdc
cd "${DEPLOY_DIR}" || exit 1

exec bin/cdc server \
    --addr "0.0.0.0:8300" \
    --advertise-addr "192.168.2.55:8300" \
    --pd "http://192.168.2.53:2379" \
    --data-dir="/data/ticdc/data" \
    --gc-ttl 86400 \
    --config conf/cdc.toml \
    --log-file "/data/ticdc/log/cdc.log" 2>> "/data/ticdc/log/cdc_stderr.log"

关键参数说明:

  • --advertise-addr: TiCDC 节点对外的服务地址,填写部署本组件的服务器 IP。
  • --pd: 源端 TiDB 集群的 PD 节点地址,多个节点间使用逗号分隔。
  • --gc-ttl: 服务在 PD 预留的 GC 存活时间(单位:秒),默认 86400 即 24 小时。

执行脚本并挂载至后台运行:

nohup sh /data/ticdc/scripts/start_cdc.sh &

二、创建 Changefeed 同步数据到 Kafka

2.1 编写同步配置文件

通过配置文件可以灵活定义路由规则(如表过滤、Topic 分发等)。创建 /data/ticdc/conf/changefeed.conf

# --- 全局参数区 (必须在最前面) ---
force-replicate = true

# --- 过滤规则区 ---
[filter]
rules = ['testdb.*']

# --- 路由分发区 ---
[sink]
# 路由分配器:支持根据 schema/table 匹配发送至特定的 Topic 或 Partition
dispatchers = [
  { matcher = ['testdb.*'],
    topic = "testdb_cdc",
    partition = "table" }
]

关键参数说明:

  • force-replicate: 强制同步开关。设为 true 可同步没有主键或唯一索引的表,确保同步链路不中断。

  • rules: 库表过滤规则。定义需要监听的源端库表,支持 * 通配符进行批量匹配。

  • matcher: 路由匹配规则。用于指定该分发策略适用的库表范围,需与过滤规则相对应。

  • topic: 目标 Kafka Topic 名称。支持固定名称或使用 {schema}_{table} 表达式实现动态分发。

  • partition: 分区路由策略。常用 index-value(按主键 Hash)或 table(按表名 Hash)来保证数据变更的顺序性。

更多 Changefeed 参数配置请参考官方文档:TiCDC Changefeed 命令行参数和配置参数 | TiDB 文档中心

2.2 创建同步任务(Changefeed)

使用 --config 参数加载上述配置,启动增量同步任务:

/data/ticdc/bin/cdc cli changefeed create --pd=http://192.168.2.53:2379 --sink-uri="kafka://192.168.2.56:9092/testdb_cdc?protocol=canal-json&partition-num=1" --config="/data/ticdc/conf/changefeed.conf" --changefeed-id testdb-replica-task

关键参数说明:

  • –pd: 指定集群 PD 节点的地址。TiCDC 通过 PD 获取 TiDB 集群的元数据及授时服务。

  • –sink-uri: 定义下游接收端地址。在本例中为 Kafka 地址,包含协议格式(canal-json)与强制分区数(partition-num=1)。

  • –config: 指定配置文件路径。用于引入详细的过滤规则(Filter)和路由分发策略(Dispatcher)。

  • –changefeed-id: 定义同步任务的唯一名称。指定后可方便后续对该任务进行启动、停止或查询操作。

  • protocol: 传输协议。canal-json 是 zcbus 适配最常用的格式,包含完整的增量数据变更信息。

  • partition-num: 强制指定分区数。设为 1 可确保所有变更进入 Kafka 同一个分区,从而实现全局绝对时序。

2.3 任务日常维护

执行维护操作时,需指定 TiCDC 服务地址 (--server)。

操作类型 命令示例
查看任务列表 /data/ticdc/bin/cdc cli changefeed list --server=http://192.168.2.55:8300
查询任务细节 /data/ticdc/bin/cdc cli changefeed query --server=http://192.168.2.55:8300 --changefeed-id [ID]
暂停同步任务 /data/ticdc/bin/cdc cli changefeed pause --server=http://192.168.2.55:8300 --changefeed-id [ID]
恢复同步任务 /data/ticdc/bin/cdc cli changefeed resume --server=http://192.168.2.55:8300 --changefeed-id [ID]
更新任务配置 /data/ticdc/bin/cdc cli changefeed update --server=http://192.168.2.55:8300 --changefeed-id [ID] --config="[PATH]"
删除同步任务 /data/ticdc/bin/cdc cli changefeed remove --server=http://192.168.2.55:8300 --changefeed-id [ID]

三、附录:Kafka 消息格式实例(Canal-JSON)

TiCDC 使用 canal-json 协议进行数据传输,每条消息均包含元数据及业务数据。以下为典型场景下的数据示例:

/data/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.56:9092 --topic testdb_cdc --from-beginning

3.1 INSERT(插入操作)

新行插入时,typeINSERTold 字段为 null

{
    "id": 0,
    "database": "ticdc_test",
    "table": "sync_demo",
    "pkNames": [
        "id"
    ],
    "isDdl": false,
    "type": "INSERT",
    "es": 1767603969383,
    "ts": 1767603970457,
    "sql": "",
    "sqlType": {
        "id": 4,
        "name": 12,
        "age": 4,
        "created_at": 93
    },
    "mysqlType": {
        "age": "int",
        "created_at": "timestamp",
        "id": "int",
        "name": "varchar"
    },
    "old": null,
    "data": [
        {
            "id": "1",
            "name": "Alice",
            "age": "25",
            "created_at": "2026-01-05 17:06:09"
        }
    ]
}

3.2 UPDATE(更新操作)

当行数据发生变更时,data 字段包含新值,old 字段包含变更前的值。

{
    "id": 0,
    "database": "ticdc_test",
    "table": "sync_demo",
    "pkNames": [
        "id"
    ],
    "isDdl": false,
    "type": "UPDATE",
    "es": 1767603987584,
    "ts": 1767603988456,
    "sql": "",
    "sqlType": {
        "id": 4,
        "name": 12,
        "age": 4,
        "created_at": 93
    },
    "mysqlType": {
        "id": "int",
        "name": "varchar",
        "age": "int",
        "created_at": "timestamp"
    },
    "old": [
        {
            "id": "1",
            "name": "Alice",
            "age": "25",
            "created_at": "2026-01-05 17:06:09"
        }
    ],
    "data": [
        {
            "id": "1",
            "name": "Alice",
            "age": "26",
            "created_at": "2026-01-05 17:06:09"
        }
    ]
}

3.3 DELETE(删除操作)

行删除时,typeDELETEdata 字段包含被删除行的完整快照。

{
    "id": 0,
    "database": "ticdc_test",
    "table": "sync_demo",
    "pkNames": [
        "id"
    ],
    "isDdl": false,
    "type": "DELETE",
    "es": 1767603996884,
    "ts": 1767603998056,
    "sql": "",
    "sqlType": {
        "id": 4,
        "name": 12,
        "age": 4,
        "created_at": 93
    },
    "mysqlType": {
        "id": "int",
        "name": "varchar",
        "age": "int",
        "created_at": "timestamp"
    },
    "old": null,
    "data": [
        {
            "id": "1",
            "name": "Alice",
            "age": "26",
            "created_at": "2026-01-05 17:06:09"
        }
    ]
}

3.4 DDL(结构变更操作)

当数据库执行 ALTER/CREATE/DROP 时,isDdl 标记为 true,并包含具体的 SQL 语句。

{
    "id": 0,
    "database": "ticdc_test",
    "table": "sync_demo",
    "pkNames": null,
    "isDdl": true,
    "type": "ALTER",
    "es": 1767604004334,
    "ts": 1767604005265,
    "sql": "ALTER TABLE `sync_demo` ADD COLUMN `email` VARCHAR(100)",
    "sqlType": null,
    "mysqlType": null,
    "data": null,
    "old": null
}

3.5 核心字段说明

  • es: 业务发生时间(Epoch timestamp in milliseconds)。
  • ts: TiCDC 成功解析并发送至 Kafka 的时间。
  • pkNames: 主键列名,用于下游 zcbus 定位唯一记录。
  • mysqlType: 原始字段类型,辅助下游进行精确的数据类型转换。
文档更新时间: 2026-01-08 22:33   作者:wangyanjie