Flink CDC 的 debezium-json 格式和 debezium 原生格式是一回事吗?_flinkcdc 与debezium-程序员宅基地

技术标签: flink  CDC数据入湖方案 • 合集  debezium-json  cdc  debezium  大数据专题  格式  

《大数据平台架构与原型实现:数据中台建设实战》 博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

这是一个很容易混淆和误解的问题,值得拿出来讨论对比一下。我们知道 Debezium 是专门用于捕获 CDC 数据的开源框架,它对接了多种数据库,同时也定义了自己的 CDC 数据交换格式,也就是常说的 debezium 格式。而Flink CDC 复用了 Debezium 的部分功能,也就是说:Debezium 是 Flink CDC 的底层采集工具,Flink CDC 的工程依赖会用使用到 Debezium 的 Jar 包,然后 Flink CDC 在 Debezium 基础之上,封装了额外的功能,例如:无锁读取,并发读取(全量数据的读取性能可以水平扩展),断点续传,这些功能是 Debezium 所不具备的,也是 Flink CDC 存在的意义。同时,Flink 还有一种专门的数据格式 debezium-json,从名称上看,它似乎就是 debezium 格式的 json 表达形式,那 debezium-json 格式和 debezium 原生格式是一回事吗?

首先,我们要注意到这样一个细节:当我们使用 Flink SQL 声明一张 mysql-cdc 的源表, mysql-cdc 作为一个 source connector,并不要求指定 format,实际上,它的 format 是不可配置的,因为 Flink CDC 在内部实现依赖 debezium,获得的原始的数据格式就是 debezium 格式。它的格式是不可配置的,也不可见,只有向下游传递数据时,才会涉及到解析和转换的问题。

但这并不意味着 Flink CDC 抽取的原始的 CDC 数据是不可见的,如果我们使用 Flink CDC 的 API 去提取一个数据库的 CDC 数据并直接 Sink 到 Kafka 上(不是用 Flink SQL 定义的一张 Sink 表)时,就可以清楚得看到 Flink CDC 输出的原始数据了。关于这一场景的具体实现,请参考:,而 Flink CDC 提供两种内置的消息反序列化器,分别是:

  • com.ververica.cdc.debezium.StringDebeziumDeserializationSchema
  • com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema

在作用 API 实现时,如果 deserializer 选择了 JsonDebeziumDeserializationSchema,那么输出到 Kafka 中的消息就是标准的 debezium 格式!但要注意区分的是:这是向 Kafka 中写原生的 debezium 数据,而不是向一个 debezium-json 格式的 Flink SQL 表中写数据。

其次,我们还要先澄清一种误解:debezium-json 并不是跟 Flink CDC(例如mysql-cdc)绑定在一起的,作为一种独立的、可描述 changelog 的格式,实际上,它可以应用到任何动态表上,例如:如果上游表是:connector=upsert-kafka,format=json,下游依旧可以使用: connector=kafka,format=debezium-json,关于这一点,可以参考本文的实测 《Flink SQL:debezium-json 格式的表一定是数据库的 CDC 数据吗?》,这个测试给出了这样一个非常明确的结论:

使用 debezium-json 格式的表不一定是数据库的 CDC 数据,但一定是上游动态表的 changelog,然后使用 debezium-json 格式描述。

Flink CDC 从数据库 binlog 中提取数据时使用了 debezium,获得的原始的数据格式也是 debezium 格式,如果是通过 Flink CDC API 直接写到 Kafka 中,是可以看到原生的 debezium 格式的数据的,但如果写入的是一张由 Flink SQL 定义的 debezium 格式的动态表,那么,是看不到原生的 debezium 格式的,向下游写入的格式转换(原生 debezium => debezium-json ) 也是透明的,是根据目标表 DDL 中定义的 Schema 自动地隐式地完成的。

我们还是靠举例和试验来说明这个问题吧。再次看一下 《Flink CDC 与 Kafka 集成:Snapshot 还是 Changelog?Upsert Kafka 还是 Kafka?》 一文的 ”测试组合(1):connector=kafka,format=debezium-json“ 一节给出的案例。

原生 Debezium 格式(样例)


如前所述,只有通过 Flink CDC 的 API 将从数据库中提取的数据 Sink 到 Kafka 中时,才会看到原生的 debezium 格式的数据!这个例子,请移步《Flink CDC 整库 / 多表同步至 Kafka 方案(附源码)》了解详细的实现,得到的数据是这样的:

{
    
  "before": null,
  "after": {
    
    "osci.mysql-server-3.inventory.orders.Value": {
    
      "order_number": 10006,
      "order_date": 16852,
      "purchaser": 1003,
      "quantity": 1,
      "product_id": 107
    }
  },
  "source": {
    
    "version": "2.2.0.Final",
    "connector": "mysql",
    "name": "osci.mysql-server-3",
    "ts_ms": 1705645511000,
    "snapshot": {
    
      "string": "false"
    },
    "db": "inventory",
    "sequence": null,
    "table": {
    
      "string": "orders"
    },
    "server_id": 223344,
    "gtid": null,
    "file": "mysql-bin.000004",
    "pos": 640,
    "row": 0,
    "thread": {
    
      "long": 10
    },
    "query": null
  },
  "op": "c",
  "ts_ms": {
    
    "long": 1705645511455
  },
  "transaction": null
}

使用如下 SQL 创建一个 mysql-cdc 的源表:

SET 'sql-client.execution.result-mode' = 'TABLEAU';

DROP TABLE IF EXISTS orders_mysql_cdc;

CREATE TABLE IF NOT EXISTS orders_mysql_cdc (
    `order_number` INT NOT NULL,
    `order_date` DATE NOT NULL,
    `purchaser` INT NOT NULL,
    `quantity` INT NOT NULL,
    `product_id` INT NOT NULL,
    CONSTRAINT `PRIMARY` PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '10.0.13.30',
    'port' = '3307',
    'username' = 'root',
    'password' = 'Admin1234!',
    'database-name' = 'inventory',
    'table-name' = 'orders'
);

那从 Flink CDC 源表提取出来的原始 debezium 数据会在写入下游表时,根据下游表的格式进行透明的自动转换了,所以在整个 pipeline 中是见不到原始 debezium 格式的数据的。

debezium-json 格式(样例)


如果接着上面定义的如下的 orders_mysql_cdc 表,用 Flink SQL 在 Kafka 上再创建了一个 debezium-json 格式的目标表,然后使用 INSERT INTO ... SELECT ... 把源表和目标表的数据流驱动起来:

DROP TABLE IF EXISTS orders_kafka_debezium_json;

CREATE TABLE IF NOT EXISTS orders_kafka_debezium_json (
    order_number int,
    order_date   date,
    purchaser    int,
    quantity     int,
    product_id   int
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders_kafka_debezium_json',
    'properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092',
    'properties.group.id' = 'orders_kafka_debezium_json',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'debezium-json'
);

-- 提交持续查询,驱动整个 Pipeline

insert into orders_kafka_debezium_json select * from orders_mysql_cdc;

这时,写入 Kafka 中的 debezium-json 格式的数据是这样的:

{
    
  "before": {
    
    "order_number": 10003,
    "order_date": "2016-02-19",
    "purchaser": 1002,
    "quantity": 2,
    "product_id": 106
  },
  "after": null,
  "op": "d"
}

结论


比较上述两种消息格式就能看出:

debezium-json 格式并不等于原生的 debezium 格式,两者有很多相似之处,都有 before,after,op,debezium-json 格式可用于表达任何动态表的 changelog,与数据库 CDC 数据已无必然的绑定关系。


关联阅读

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/bluishglc/article/details/137651670

智能推荐

课后习题1-数据结构_简述逻辑结构的四种基本关系并画出它们的关系图-程序员宅基地

文章浏览阅读4k次,点赞11次,收藏26次。1.概念:数据:Data,是客观事物的符号表示,是所有能输入到计算机中并被计算机程序处理的符号的总称。数据元素:Data Element,是数据的基本单位,在计算机中常作为一个整体进行考虑和处理,用于完整的描述一个对象。数据项:Data Item,是组成数据元素的、有独立含义的、不可分割的最小单位。数据对象:Data Object,是性质相同的数据元素的集合,是数据的一个子集。数据结构:Data Structure,是相互之间存在一种或多种特定关系的数据元素的集合。逻辑结构:从具体问题抽象出来的_简述逻辑结构的四种基本关系并画出它们的关系图

Compute Shader 语法及函数 Reference for HLSL_hlsl compute shader-程序员宅基地

文章浏览阅读887次。///////////////////////////////////////变量语法使用以下语法规则声明 HLSL 变量。[Storage_Class] [Type_Modifier]Type Name[Index] [: Semantic] [: Packoffset] [: Register]; [Annotations] [= Initial_Value]参数存储 _班级可选的存储类修饰符,它们为编译器提示指定变量范围和生存期;可以按任意顺序指定修饰符。值 说明._hlsl compute shader

一文彻底搞懂 Alertmanager 的告警抑制与静默_alertmanager告警-程序员宅基地

文章浏览阅读4.3k次,点赞4次,收藏16次。一文彻底搞懂 Alertmanager 的告警抑制与静默。_alertmanager告警

如何进行性能优化?这篇360°全方面性能调优(含一线大厂Android端性能优化方案),帮您分分钟解决APP卡顿_论软件的性能优化设计-程序员宅基地

文章浏览阅读5.2k次,点赞8次,收藏8次。什么是性能调优?性能调优就是对计算机硬件、操作系统和应用有相当深入的了解,调节三者之间的关系,实现整个系统(包括硬件、操作系统、应用)的性能最大化,并能不断的满足现有的业务需求。性能优化的目的流畅(解决:卡顿)稳定(解决:内存溢出、崩溃)低耗损(解决:耗电快、流量大、网络慢)小安装包(解决:APK过大)性能优化原则:先优化瓶颈问题;方案简单,尽量不引入更多复杂性,尽量不降低业务体验;满足系统性能要求即可,不引入新的bug。为什么需要性能调优?为了获得更好的系统性能(就是你现_论软件的性能优化设计

实战BULK COLLECT(成批聚合类型)和数组集合type类型is table of 表%rowtype index by binary_integer_bulk collect into 写表-程序员宅基地

文章浏览阅读1.3w次,点赞2次,收藏26次。例1: 批量 查询部门号为 "10" 号的并把它们打印出来 . DECLARE TYPE emp_table_type IS TABLE OF my_emp%ROWTYPE INDEX BY BINARY_INTEGER; v_emp_table emp_table_type; BEGIN SELECT * BULK COLLECT INTO v_emp__bulk collect into 写表

Python网络爬虫使用教程_python爬虫教程-程序员宅基地

文章浏览阅读1.4w次,点赞12次,收藏120次。python爬虫资源抓取--urllib/requests/requests-html、正则表达式、数据解析-Beautiful Soup/lxml/selectolax、自动化爬虫--selenium、爬虫框架--Scrapy/pyspider、模拟登录与验证码识别、autoscraper_python爬虫教程

随便推点

华为实习面试(二)_华为实习业务主管面试-程序员宅基地

文章浏览阅读2.1w次,点赞9次,收藏7次。这是来自一位学长的 (业务主管综合面)4.29下午2:10分,全程20min学长的视角:主管也是真的很nice!我每次回答问题后都给我说谢谢,搞得我都不好意思了,整个过程非常随和,完全没有架子,很耐心的给我解释问题,最后退出还说非常感谢面试华为,体验超好,面完五分钟官网刷新通过,十分钟短信通过。总结一下吧,总的来说,我这次华子的面试准备了很多东西,但是基本没问…整个过程体验非常好,不会让你尴尬的,面试官都大赞!给大家分享面筋,希望对还没面试的小伙伴提供参考,不过目前进了池子,得等很久才能出结果,许愿offe_华为实习业务主管面试

word2vec & 相关系数_word2vec文本相关性-程序员宅基地

文章浏览阅读674次。对文本进行提取,利用结巴分词进行分词,然后进行word2vec训练(维度设置为100),得到每个词的词向量.对于每一个用户,通过其发表的内容,得到用户所使用的词汇,然后求得用户的平均词向量.(词向量和除以词的数量)通过训练集,分别对用户地区,年龄,性别进行建模2017CSDN用户画像竞赛用户内容主题词生成:给定若干用户文档(博客或帖子),为每一篇文档生成3个最合适的主题词。要求生成的主题..._word2vec文本相关性

ubuntu kylin优麒麟中开发c/c++程序-程序员宅基地

文章浏览阅读1.5k次。开发工具:visual studio code(vs code) 一、从官网下载app https://code.visualstudio.com/ 执行安装。 sudo dpkg -i code*.deb 从开始菜单启动程序。 二、设置界面为中文 同时按下ctrl ..._麒麟系统c++编辑器中文版

windows Elasticsearch启动报此处不应有Files\elascsearch-7.8.0\jdk解决办法一_启动elasticsearch报错usage 0f java_home-程序员宅基地

文章浏览阅读779次。这个问题是因为没有配置JAVA_HOME系统变量1、在环境变量里面新建系统变量一般最新的elk里面带有java在jdk或者也可以使用自己安装的,配置完成重启命令行就行了_启动elasticsearch报错usage 0f java_home

【AXI】解读AXI协议原子化访问_axi 原子访问-程序员宅基地

文章浏览阅读5.7k次,点赞20次,收藏80次。解读AXI协议原子化访问的部分,涵盖排他性操作的过程,信号列表,访问要求和为什么从AXI3到AXI4取消了Locked Type等内容。_axi 原子访问

HTML翻页按钮教程:如何用CSS实现漂亮的分页按钮-程序员宅基地

文章浏览阅读1.7k次。CSS实现的一个漂亮分页按钮样式.pagination{overflow:hidden;margin:0;padding:10px 10px 6px 10px;border-top:1px solid #f60;_zoom:1;}.pagination *{display:inline;float:left;margin:0;padding:0;font-size:12px;}.paginatio..._html翻页按钮怎么写

推荐文章

热门文章

相关标签