Skip to content

Flink CDC 接入 Doris 实践

Posted on:2022年3月2日

运行环境

Flink version:1.13.3
Flink CDC version:2.2.0
Doris version:PALO-0.15.1-rc09
doris-flink-connector version:1.13.5-2.12-SNAPSHOT

同步方案

选择 DataStream 开发,根据表 Schema 将如下 changelog 转换为 RowData

{
  "before": null,
  "after": {
    "id": 1,
    "name": "name1",
    "age": "1"
  },
  "source": {
    "version": "1.5.4.Final",
    "connector": "mysql",
    "name": "mysql_binlog_source",
    "ts_ms": 1689245998000,
    "snapshot": "false",
    "db": "cc",
    "sequence": null,
    "table": "student",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000001",
    "pos": 2781,
    "row": 0,
    "thread": null,
    "query": null
  },
  "op": "c",
  "ts_ms": 1689246005928,
  "transaction": null
}
// get data by option
if ("d".equals(op)) {
    data = object.getJSONObject("before");
    rowData.setRowKind(RowKind.DELETE);
} else {
    data = object.getJSONObject("after");
    rowData.setRowKind(RowKind.INSERT);
}

问题列表

  1. Doris 数据中文乱码问题

    # 任务提交参数指定
    -Denv.java.opts="-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8"
  2. A slave with the same server_uuid/server_id as this slave has connected to the maste

    作业里使用的 server id 和其他作业或其他同步工具使用的 server id 冲突了,server id 需要全局唯一,根据并行度设置,比如作业的 source 设置成了四个并发,可以配置 ‘server-id’ = ‘5001-5004’

  3. MySql 数据类型 bigint 和 decimal 乱码

    // 需要设置 debezium 参数
    properties.setProperty("bigint.unsigned.handling.mode", "long");
    properties.setProperty("decimal.handling.mode", "string");
  4. CDC 已经进入增量阶段,需要新增加表(2.2.0 支持)

    MySqlSource.<String>builder().scanNewlyAddedTableEnabled(true)
  5. Doris 连接出现权限问题,无法插入数据

    DorisSink 连接用户至少需要 show backends 权限,随机选择 BE

  6. 导入数据报错 “The connection property zeroDateTimeBehavior only accepts values of the form: exception, round or convertToNull.The value CONVERT_TO_NULL is not in this set”

    修改源码 PooledDataSourceFactory.javaCONVERT_TO_NULL 改为 convertToNull

  7. Source Coordinator Thread already exists. There should never be more than one thread driving the actions of a Source Coordinator

    原因是 jm 的 java.lang.OutOfMemoryError: Java heap space。即使在全量阶段加大资源,进入增量后如果缩小 jm 资源,问题还是存在。参考 jira issue

  8. Failed to deserialize data of EventHeaderV4{timestamp=1542193955000, eventType=GTID, serverId=91111, headerLength=19, dataLength=46, nextPosition=1058898202, flags=0}

    set global slave_net_timeout = 120; (default was 30sec)
    set global thread_pool_idle_timeout = 120;
  9. Producer attempted an operation with an old epoch.Either there is a newer producer with the same transactionalId, or the producer’s transaction has been expired by the broker.

    • 每个算子使用不同的 transactionalId, 每次都给算子指定不会与别人冲突的 name 和 uid 值。 参考
    • 排查是否由于负载太高导致事务超时。
  10. org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs …, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.

    常见原因参考 阿里云 Flink CDC 常见问题 ,还有一种是 Some of the GTIDs needed to replicate have been already purged 导致,这个参考 github issue, 升级 Flink-CDC 版本解决。


参考

  1. Palo 文档 PALO - 预编译版本下载 | Doris (baidu.com)

  2. FAQ(中文) · ververica/flink-cdc-connectors

  3. 阿里云 Flink CDC 常见问题

  4. Debezium FAQ


Next Post
统计 HDFS 小文件