Debezium日常分享系列之:Debezium2.4版本之用于 MongoDB的Debezium 连接器_debezium 2.4-程序员宅基地

技术标签: Debezium2.4版本  Debezium日常分享系列  1024程序员节  日常分享专栏  MongoDB  Debezium 连接器  

Debezium打通Mongodb数据库数据采集系列文章:

更多debezium技术文章请阅读博主专栏:

Debezium 的 MongoDB 连接器跟踪 MongoDB 副本集或 MongoDB 分片集群以获取数据库和集合中的文档更改,并将这些更改记录为 Kafka 主题中的事件。连接器自动处理分片集群中分片的添加或删除、每个副本集成员资格的更改、每个副本集中的选举以及等待通信问题的解决。

一、综述

MongoDB 的复制机制提供了冗余和高可用性,是在生产环境中运行 MongoDB 的首选方式。 MongoDB 连接器捕获副本集或分片集群中的更改。

MongoDB 副本集由一组服务器组成,这些服务器都具有相同数据的副本,并且复制可确保客户端对副本集主服务器上的文档所做的所有更改都正确应用于其他副本集的服务器(称为辅助服务器)。 MongoDB 复制的工作原理是让主数据库记录其 oplog(或操作日志)中的更改,然后每个辅助数据库读取主数据库的 oplog 并按顺序将所有操作应用到自己的文档中。将新服务器添加到副本集时,该服务器首先对主服务器上的所有数据库和集合执行快照,然后读取主服务器的 oplog 以应用自开始快照以来可能进行的所有更改。当这个新服务器赶上主服务器 oplog 的尾部时,它就成为辅助服务器(并且能够处理查询)。

二、改变流

尽管 Debezium MongoDB 连接器不会成为副本集的一部分,但它使用类似的复制机制来获取 oplog 数据。主要区别在于连接器不直接读取 oplog。相反,它将 oplog 数据的捕获和解码委托给 MongoDB 更改流功能。通过更改流,MongoDB 服务器将集合中发生的更改公开为事件流。 Debezium 连接器监视流,然后将更改传递给下游。连接器第一次检测到副本集时,它会检查 oplog 以获取最后记录的事务,然后执行主数据库和集合的快照。连接器完成数据复制后,它会从之前读取的 oplog 位置开始创建更改流。

当 MongoDB 连接器处理更改时,它会定期记录 oplog 流中事件起源的位置。当连接器停止时,它会记录它处理的最后一个 oplog 流位置,以便在重新启动后它可以从该位置恢复流式传输。换句话说,连接器可以停止、升级或维护,并在一段时间后重新启动,并且始终准确地从中断处继续,而不会丢失任何事件。当然,MongoDB oplog 通常有最大大小上限,因此如果连接器长时间停止,oplog 中的操作可能会在连接器有机会读取它们之前被清除。在这种情况下,重新启动后,连接器会检测到丢失的 oplog 操作,执行快照,然后继续流式传输更改。

MongoDB 连接器还能够很好地容忍副本集的成员资格和领导权的变化、分片集群中分片的添加或删除以及可能导致通信失败的网络问题。连接器始终使用副本集的主节点来流式传输更改,因此当副本集进行选举并且不同的节点成为主节点时,连接器将立即停止流式传输更改,连接到新的主节点,并开始使用新的主节点流式传输更改节点。同样,如果连接器无法与主副本集通信,它会尝试重新连接(使用指数退避,以免淹没网络或副本集)。重新建立连接后,连接器将继续传输其捕获的最后一个事件的更改。通过这种方式,连接器可以动态调整以适应副本集成员资格的变化,并自动处理通信中断。

其他资源:

  • 复制机制
  • 副本集
  • 副本集选举
  • 分片集群
  • 分片添加
  • 分片清除
  • 改变流

三、阅读偏好

可以在连接器属性中指定连接的 MongoDB 读取首选项。用于设置读取首选项的方法取决于 MongoDB 拓扑和 mongodb.connection.mode。

副本集拓扑:

  • 在 mongodb.connection.string 中设置读取首选项。

分片集群拓扑:

  • 根据连接方式设置读取优先级,如下表:

表 1. 根据 mongodb.connection.mode 设置分片集群的读取首选项

连接方式 用于指定读取首选项的属性
sharded mongodb.connection.string
replica_set mongodb.connection.string.shard.params

在分片集群中,连接器首先启动与 mongodb.connection.string 中指定的 mongos 路由器的连接。对于该初始连接,无论连接模式如何,连接器都会遵循 mongodb.connection.string 中指定的读取首选项。当连接模式设置为replica_set时,连接器建立初始路由器连接后,会从路由器的config.shards中检索拓扑信息。然后,它使用检索到的分片地址连接到集群中的各个分片,构建使用 mongodb.connection.string.shard.params 中的连接参数的连接字符串。对于特定于分片的连接,连接器会忽略 mongodb.connection.string 中设置的读取首选项。

四、MongoDB 连接器的工作原理

连接器支持的 MongoDB 拓扑概述对于规划您的应用程序非常有用。

配置和部署 MongoDB 连接器时,它首先连接到种子地址处的 MongoDB 服务器,并确定有关每个可用副本集的详细信息。由于每个副本集都有自己独立的oplog,因此连接器将尝试为每个副本集使用单独的任务。连接器可以限制它将使用的最大任务数,如果没有足够的任务可用,连接器将为每个任务分配多个副本集,尽管该任务仍将为每个副本集使用单独的线程。

针对分片集群运行连接器时,请使用大于副本集数量的tasks.max 值。这将允许连接器为每个副本集创建一个任务,并让 Kafka Connect 协调、分配和管理所有可用工作进程中的任务。

五、支持的 MongoDB 拓扑

MongoDB 连接器支持以下 MongoDB 拓扑:

  • MongoDB 副本集

    • Debezium MongoDB 连接器可以捕获单个 MongoDB 副本集的更改。生产副本集至少需要三个成员。
    • 要将 MongoDB 连接器与副本集结合使用,必须将连接器配置中的mongodb.connection.string 属性的值设置为副本集连接字符串。当连接器准备好开始从 MongoDB 更改流捕获更改时,它会启动连接任务。然后,连接任务使用指定的连接字符串建立与可用副本集成员的连接。

MongoDB 分片集群

  • MongoDB 分片集群包括:

    • 一个或多个分片,每个分片部署为一个副本集;
    • 充当集群配置服务器的单独副本集
    • 客户端连接的一个或多个路由器(也称为 mongos),并将请求路由到适当的分片
    • 要将 MongoDB 连接器与分片集群结合使用,请在连接器配置中,将 mongodb.connection.string 属性的值设置为分片集群连接字符串。

mongodb.connection.string 属性替换了已删除的 mongodb.hosts 属性,该属性用于为早期版本的连接器提供配置服务器副本的主机地址。在当前版本中,使用 mongodb.connection.string 为连接器提供 MongoDB 路由器(也称为 mongos)的地址。

当连接器连接到分片集群时,它会发现有关代表集群中分片的每个副本集的信息。连接器使用单独的任务来捕获每个分片的更改。当在集群中添加或删除分片时,连接器会动态调整任务数量以补偿变化。

MongoDB 独立服务器:

  • MongoDB 连接器无法监视独立 MongoDB 服务器的更改,因为独立服务器没有 oplog。如果独立服务器转换为具有一名成员的副本集,则连接器将起作用。

MongoDB 不建议在生产中运行独立服务器。

六、所需的用户权限

为了从 MongoDB 捕获数据,Debezium 以 MongoDB 用户身份连接到数据库。您为 Debezium 创建的 MongoDB 用户帐户需要特定的数据库权限才能从数据库中读取数据。连接器用户需要以下权限:

  • 从数据库中读取。
  • 运行 ping 命令。

连接器用户可能还需要以下权限:

  • 从 config.shards 系统集合中读取。

数据库读取权限

连接器用户必须能够从所有数据库读取,或从特定数据库读取,具体取决于连接器的 capture.scope 属性的值。根据 capture.scope 设置,向用户分配以下权限之一:

  • capture.scope 设置为部署:授予用户读取任何数据库的权限。
  • capture.scope 设置为数据库:授予用户读取连接器的 capture.target 属性指定的数据库的权限。

使用 MongoDB ping 命令的权限

  • 无论 capture.scope 设置如何,用户都需要权限才能运行 MongoDB ping 命令。

读取 config.shards 集合的权限

  • 对于从分片 MongoDB 集群集群更改且 mongodb.connection.mode 属性设置为replica_set 的连接器,必须配置用户读取 config.shards 系统集合的权限。

七、逻辑连接器名称

连接器配置属性 topic.prefix 用作 MongoDB 副本集或分片集群的逻辑名称。连接器以多种方式使用逻辑名称:作为所有主题名称的前缀,以及在记录每个副本集的更改流位置时作为唯一标识符。

应该为每个 MongoDB 连接器指定一个唯一的逻辑名称,以有意义地描述源 MongoDB 系统。我们建议逻辑名称以字母或下划线字符开头,其余字符为字母数字或下划线。

八、执行快照

当 Debezium 任务开始使用副本集时,它使用连接器的逻辑名称和副本集名称来查找描述连接器先前停止读取更改的位置的偏移量。如果可以找到偏移量并且它仍然存在于 oplog 中,则任务立即从记录的偏移位置开始继续进行流式更改。

但是,如果未找到偏移量,或者 oplog 不再包含该位置,则任务必须首先通过执行快照来获取副本集内容的当前状态。该过程首先记录 oplog 的当前位置并将其记录为偏移量(以及表示快照已启动的标志)。然后,该任务继续复制每个集合,生成尽可能多的线程(最多为 snapshot.max.threads 配置属性的值)以并行执行此工作。连接器为其看到的每个文档记录一个单独的读取事件。每个读取事件都包含对象的标识符、对象的完整状态以及有关找到该对象的 MongoDB 副本集的源信息。源信息还包括一个标志,表示该事件是在快照期间生成的。

此快照将继续,直到复制了与连接器的过滤器匹配的所有集合。如果连接器在任务快照完成之前停止,则连接器重新启动后将再次开始快照。

当连接器执行任何副本集的快照时,尽量避免任务重新分配和重新配置。连接器生成日志消息来报告快照的进度。为了提供最大程度的控制,请为每个连接器运行单独的 Kafka Connect 集群。

九、临时快照

默认情况下,连接器仅在首次启动后运行初始快照操作。在此初始快照之后,在正常情况下,连接器不会重复快照过程。连接器捕获的任何未来更改事件数据仅通过流处理传入。

但是,在某些情况下,连接器在初始快照期间获取的数据可能会过时、丢失或不完整。为了提供重新捕获收集数据的机制,Debezium 包含一个执行临时快照的选项。在 Debezium 环境中发生以下任何更改后,可能需要执行临时快照:

  • 修改连接器配置以捕获一组不同的集合。
  • Kafka 主题已删除,必须重建。
  • 由于配置错误或其他一些问题,会发生数据损坏。

可以通过启动所谓的临时快照为之前捕获快照的集合重新运行快照。特别快照需要使用信令集合。可以通过向 Debezium 信令集合发送信号请求来启动临时快照。

当启动现有集合的临时快照时,连接器会将内容附加到该集合已存在的主题中。如果删除了先前存在的主题,并且启用了自动主题创建,Debezium 可以自动创建主题。

即席快照信号指定要包含在快照中的集合。快照可以捕获数据库的全部内容,也可以仅捕获数据库中集合的子集。

可以通过向信令集合发送执行快照消息来指定要捕获的集合。将执行快照信号的类型设置为增量或阻塞,并提供要包含在快照中的集合的名称,如下表所述:

表 2. 即席执行快照信号记录示例

字段 默认值
type incremental 指定要运行的快照的类型。目前,您可以请求增量快照或阻塞快照。
data-collections N/A 包含与要快照的集合的完全限定名称匹配的正则表达式的数组。名称的格式与 signal.data.collection 配置选项相同。

触发临时增量快照

您可以通过将具有执行快照信号类型的条目添加到信令集合来启动临时增量快照。连接器处理消息后,开始快照操作。快照进程读取第一个和最后一个主键值,并将这些值用作每个集合的起点和终点。根据集合中的条目数和配置的块大小,Debezium 将集合划分为块,并继续对每个块进行快照,一次一个。

触发临时阻塞快照

您可以通过将具有执行快照信号类型的条目添加到信令集合来启动临时阻塞快照。连接器处理消息后,开始快照操作。连接器暂时停止流式传输,然后启动指定集合的​​快照,遵循初始快照期间使用的相同过程。快照完成后,连接器将恢复流式传输。

十、增量快照

为了提供管理快照的灵活性,Debezium 包含一个补充快照机制,称为增量快照。增量快照依赖 Debezium 机制向 Debezium 连接器发送信号。增量快照基于DDD-3设计文档。

在增量快照中,Debezium 不像初始快照那样一次性捕获数据库的完整状态,而是以一系列可配置块的形式分阶段捕获每个集合。您可以指定希望快照捕获的集合以及每个块的大小。块大小确定快照在数据库上的每个提取操作期间收集的行数。增量快照的默认块大小为 1024 行。

随着增量快照的进行,Debezium 使用水印来跟踪其进度,维护其捕获的每个集合行的记录。与标准初始快照过程相比,这种分阶段捕获数据的方法具有以下优点:

  • 您可以与流数据捕获并行运行增量快照,而不是推迟流数据直到快照完成。连接器在整个快照过程中持续从更改日志中捕获近乎实时的事件,并且两个操作都不会阻塞另一个操作。
  • 如果增量快照的进度中断,您可以恢复增量快照而不会丢失任何数据。进程恢复后,快照会从停止点开始,而不是从头开始重新捕获集合。
  • 您可以随时按需运行增量快照,并根据需要重复该过程以适应数据库更新。例如,您可以在修改连接器配置以将集合添加到其 collection.include.list 属性后重新运行快照。

1.增量快照流程

当您运行增量快照时,Debezium 按主键对每个集合进行排序,然后根据配置的块大小将集合拆分为块。逐块工作,然后捕获块中的每个集合行。对于它捕获的每一行,快照都会发出一个 READ 事件。该事件表示块快照开始时行的值。

随着快照的进行,其他进程可能会继续访问数据库,从而可能修改集合记录。为了反映此类更改,INSERT、UPDATE 或 DELETE 操作将照常提交到事务日志。同样,正在进行的 Debezium 流处理继续检测这些更改事件并将相应的更改事件记录发送到 Kafka。

2.Debezium 如何解决具有相同主键的记录之间的冲突

在某些情况下,流处理发出的 UPDATE 或 DELETE 事件的接收顺序不正确。也就是说,在快照捕获包含该行的 READ 事件的块之前,流处理可能会发出一个修改集合行的事件。当快照最终发出该行相应的 READ 事件时,其值已被取代。为了确保按正确的逻辑顺序处理不按顺序到达的增量快照事件,Debezium 采用缓冲方案来解决冲突。仅当快照事件和流式事件之间的冲突得到解决后,Debezium 才会向 Kafka 发送事件记录。

3.快照窗口

为了帮助解决迟到的 READ 事件和修改同一集合行的流式事件之间的冲突,Debezium 采用了所谓的快照窗口。快照窗口划定了增量快照捕获指定收集块的数据的时间间隔。在块的快照窗口打开之前,Debezium 会遵循其通常的行为,并将事件从事务日志直接向下游发送到目标 Kafka 主题。但从特定块的快照打开的那一刻起,直到其关闭,Debezium 都会执行重复数据删除步骤来解决具有相同主键的事件之间的冲突。

对于每个数据收集,Debezium 会发出两种类型的事件,并将它们的记录存储在单个目标 Kafka 主题中。它直接从表中捕获的快照记录作为 READ 操作发出。同时,随着用户继续更新数据集合中的记录,并且更新事务日志以反映每次提交,Debezium 会针对每次更改发出 UPDATE 或 DELETE 操作。

当快照窗口打开时,Debezium 开始处理快照块,它将快照记录传送到内存缓冲区。在快照窗口期间,缓冲区中 READ 事件的主键与传入流事件的主键进行比较。如果未找到匹配项,则流式事件记录将直接发送到 Kafka。如果 Debezium 检测到匹配,它会丢弃缓冲的 READ 事件,并将流式记录写入目标主题,因为流式事件在逻辑上取代静态快照事件。块的快照窗口关闭后,缓冲区仅包含不存在相关事务日志事件的 READ 事件。 Debezium 将这些剩余的 READ 事件发送到集合的 Kafka 主题。

连接器对每个快照块重复该过程。

增量快照需要主键稳定有序。但是,字符串可能无法保证稳定的排序,因为编码和特殊字符可能会导致意外行为(Mongo 排序字符串)。执行增量快照时请考虑使用其他类型的主键。

4.分片集群的增量快照

要将增量快照与分片 MongoDB 集群一起使用,您必须为以下属性设置特定值:

  • 将 mongodb.connection.mode 设置为分片。
  • 将incremental.snapshot.chunk.size设置为足够高的值,以补偿变更流管道增加的复杂性。

5.触发增量快照

目前,启动增量快照的唯一方法是将临时快照信号发送到源数据库上的信令集合。

您可以使用 MongoDB insert() 方法向信令集合提交信号。

Debezium 检测到信号集合中的变化后,它会读取信号,并运行请求的快照操作。

您提交的查询指定要包含在快照中的集合,并且可以选择指定快照操作的类型。目前,快照操作的唯一有效选项是默认值增量。

要指定要包含在快照中的集合,请提供一个列出集合的数据集合数组或用于匹配集合的正则表达式数组,例如,{“data-collections”: [“public.Collection1”, “public.Collection2”]}

增量快照信号的数据收集数组没有默认值。如果数据收集数组为空,Debezium 会检测到不需要执行任何操作,并且不会执行快照。

如果要包含在快照中的集合的名称在数据库、架构或表的名称中包含点 (.),则要将该集合添加到数据集合数组中,必须转义该集合的每个部分名称用双引号引起来。

例如,要包含公共数据库中存在且名称为 My.Collection 的数据集合,请使用以下格式:“public”.“My.Collection”。

先决条件

  • 信令已启用。
    • 源数据库中存在信令数据集合。
    • 信令数据收集在 signal.data.collection 属性中指定。

使用源信令通道触发增量快照

  • 将快照信号文档插入到信令集合中:
<signalDataCollection>.insert({
    "id" : _<idNumber>,"type" : <snapshotType>, "data" : {
    "data-collections" ["<collectionName>", "<collectionName>"],"type": <snapshotType>}});

例如,

db.debeziumSignal.insert({
     
"type" : "execute-snapshot",  
"data" : {
    
"data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 
"type": "incremental"} 
});

命令中的id、type、data参数的取值与信令集合的字段相对应。

示例中的参数说明如下表:

表 3. 用于将增量快照信号发送到信令集合的 MongoDB insert() 命令中的字段描述

1 db.debeziumSignal 指定源数据库上信令集合的完全限定名称。
2 null _id 参数指定指定为信号请求的 id 标识符的任意字符串。前面示例中的 insert 方法省略了可选 _id 参数的使用。由于文档没有显式地为参数分配值,因此 MongoDB 自动分配给文档的任意 id 就成为信号请求的 id 标识符。使用此字符串来标识信令集合中条目的日志消息。 Debezium 不使用此标识符字符串。相反,在快照期间,Debezium 会生成自己的 id 字符串作为水印信号。
3 execute-snapshot 指定类型参数指定信号要触发的操作。
4 data-collections 信号数据字段的必需组件,指定集合名称或正则表达式的数组,以匹配要包含在快照中的集合名称。该数组列出了通过完全限定名称匹配集合的正则表达式,使用与在 signal.data.collection 配置属性中指定连接器信令集合名称相同的格式。
5 incremental 信号数据字段的可选类型组件,指定要运行的快照操作的类型。目前,唯一有效的选项是默认值增量。如果不指定值,连接器将运行增量快照。

以下示例显示了连接器捕获的增量快照事件的 JSON。

示例:增量快照事件消息

{
    
    "before":null,
    "after": {
    
        "pk":"1",
        "value":"New data"
    },
    "source": {
    
        ...
        "snapshot":"incremental"  1
    },
    "op":"r", 2
    "ts_ms":"1620393591654",
    "transaction":null
}
选项 字段名称 描述
1 snapshot 指定要运行的快照操作的类型。目前,唯一有效的选项是默认值增量。在提交到信令集合的 SQL 查询中指定类型值是可选的。如果不指定值,连接器将运行增量快照。
2 op 指定事件类型。快照事件的值为r,表示READ操作。

6.使用Kafka信令通道触发增量快照

您可以向配置的 Kafka 主题发送消息,请求连接器运行临时增量快照。

Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。

消息的值是一个带有类型和数据字段的 JSON 对象。

信号类型为execute-snapshot,数据字段必须有以下字段:

表 4. 执行快照数据字段

字段 默认值
type incremental 要执行的快照的类型。目前 Debezium 仅支持增量类型。
data-collections N/A 一组以逗号分隔的正则表达式,与要包含在快照中的表的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。
additional-condition N/A 一个可选字符串,指定连接器评估的条件,以指定要包含在快照中的记录子集。此属性已弃用,应由附加条件属性替换。
additional-conditions N/A 附加条件的可选数组,指定连接器评估的条件以指定要包含在快照中的记录子集。每个附加条件都是一个对象,指定过滤临时快照捕获的数据的条件。您可以为每个附加条件设置以下参数: data-collection:: 过滤器应用到的集合的完全限定名称。您可以对每个集合应用不同的过滤器。 filter:: 指定数据库记录中必须存在的列值,快照才能包含该列值,例如“color=‘blue’”。您分配给过滤器参数的值与您在为阻塞快照设置 snapshot.select.statement.overrides 属性时可能在 SELECT 语句的 WHERE 子句中指定的值类型相同。在早期 Debezium 版本中,没有为快照信号定义显式过滤器参数;相反,过滤条件是由为现已弃用的附加条件参数指定的值隐含的。

执行快照 Kafka 消息的示例:

Key = `test_connector`

Value = `{
     "type":"execute-snapshot","data": {
     "data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

7.具有附加条件的临时增量快照

Debezium 使用附加条件字段来选择集合内容的子集。

通常,当 Debezium 运行快照时,它会运行 SQL 查询,例如:

SELECT * FROM <tableName> …​.

当快照请求包含附加条件属性时,该属性的数据收集和过滤参数将附加到 SQL 查询中,例如:

SELECT * FROM <data-collection> WHERE <filter> …​.

例如,给定一个包含 id(主键)、颜色和品牌列的产品集合,如果您希望快照仅包含 color=‘blue’ 的内容,则当您请求快照时,您可以添加附加 -用于过滤内容的条件属性:

Key = `test_connector`

Value = `{
     "type":"execute-snapshot","data": {
     "data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{
     "data-collection": "schema1.products" ,"filter":"color='blue'"}]}}`

您可以使用additional-conditions 属性来传递基于多列的条件。例如,使用与上一示例中相同的产品集合,如果您希望快照仅包含产品集合中 color=‘blue’、brand=‘MyBrand’ 的内容,则可以发送以下请求:

Key = `test_connector`

Value = `{
     "type":"execute-snapshot","data": {
     "data-collections": ["schema1.products"], "type": "INCREMENTAL", "additional-conditions": [{
     "data-collection": "schema1.products" ,"filter":"color='blue' AND brand='MyBrand'"}]}}`

8.停止增量快照

您还可以通过向源数据库上的集合发送信号来停止增量快照。您可以通过将文档插入信号集合来提交停止快照信号。 Debezium 检测到信号集合中的变化后,会读取信号,并停止正在进行的增量快照操作。

您提交的查询指定增量快照操作,以及(可选)要删除的当前运行快照的集合。

先决条件

  • 信令已启用。
    • 源数据库中存在信令数据集合。
    • 信令数据收集在 signal.data.collection 属性中指定。

使用源信令通道停止增量快照

  • 将停止快照信号文档插入到信号集合中:
<signalDataCollection>.insert({
    "id" : _<idNumber>,"type" : "stop-snapshot", "data" : {
    "data-collections" ["<collectionName>", "<collectionName>"],"type": "incremental"}});

例如,

db.debeziumSignal.insert({
     
"type" : "stop-snapshot",  
"data" : {
    
"data-collections" ["\"public\".\"Collection1\"", "\"public\".\"Collection2\""], 
"type": "incremental"} 
});

signal命令中的id、type、data参数的取值对应于信令集合的字段。

示例中的参数说明如下表:

表5 向信令集合发送停止增量快照文档的插入命令字段说明

选项 描述
1 db.debeziumSignal 指定源数据库上信令集合的完全限定名称。
2 null 前面示例中的 insert 方法省略了可选 _id 参数的使用。由于文档没有显式地为参数分配值,因此 MongoDB 自动分配给文档的任意 id 就成为信号请求的 id 标识符。使用此字符串来标识信令集合中条目的日志消息。 Debezium 不使用此标识符字符串。
3 stop-snapshot 类型参数指定信号要触发的操作。
4 data-collections 信号数据字段的可选组件,指定集合名称或正则表达式的数组,以匹配要从快照中删除的集合名称。该数组列出了通过完全限定名称匹配集合的正则表达式,使用与在 signal.data.collection 配置属性中指定连接器信令集合名称相同的格式。如果省略数据字段的该组成部分,则该信号将停止正在进行的整个增量快照。
5 incremental 信号数据字段的必需组成部分,指定要停止的快照操作类型。目前,唯一有效的选项是增量选项。如果不指定类型值,则信号无法停止增量快照。

9.使用Kafka信令通道停止增量快照

您可以向配置的 Kafka 信令主题发送信号消息以停止即席增量快照。

Kafka 消息的键必须与 topic.prefix 连接器配置选项的值匹配。

消息的值是一个带有类型和数据字段的 JSON 对象。

信号类型为stop-snapshot,数据字段必须有以下字段:

表 6. 执行快照数据字段

字段 默认值
type incremental 要执行的快照的类型。目前 Debezium 仅支持增量类型。
data-collections N/A 一个可选的逗号分隔正则表达式数组,与要包含在快照中的表的完全限定名称相匹配。使用与 signal.data.collection 配置选项所需的格式相同的格式指定名称。

以下示例显示了典型的停止快照 Kafka 消息:

Key = `test_connector`

Value = `{
     "type":"stop-snapshot","data": {
     "data-collections": ["schema1.table1", "schema1.table2"], "type": "INCREMENTAL"}}`

10.阻止快照

为了在管理快照方面提供更大的灵活性,Debezium 包含一个补充的临时快照机制,称为阻塞快照。阻止快照依赖 Debezium 机制向 Debezium 连接器发送信号。

阻塞快照的行为就像初始快照一样,只是您可以在运行时触发它。

在以下情况下,您可能希望运行阻塞快照而不是使用标准初始快照进程:

  • 您添加了一个新集合,并且希望在连接器运行时完成快照。
  • 您添加了一个大型集合,并且希望快照在比增量快照更短的时间内完成。

11.阻塞快照进程

当您运行阻塞快照时,Debezium 会停止流式传输,然后启动指定集合的​​快照,遵循初始快照期间使用的相同流程。快照完成后,流将恢复。

12.配置快照

您可以在信号的数据组件中设置以下属性:

  • data-collections:指定哪些集合必须是快照
  • 附加条件:您可以为不同的集合指定不同的过滤器。
    • data-collection 属性是要应用过滤器的集合的完全限定名称。
    • 过滤器属性将具有与 snapshot.select.statement.overrides 中使用的相同值

例如:

{
    "type": "blocking", "data-collections": ["schema1.table1", "schema1.table2"], "additional-conditions": [{
    "data-collection": "schema1.table1", "filter": "SELECT * FROM [schema1].[table1] WHERE column1 = 0 ORDER BY column2 DESC"}, {
    "data-collection": "schema1.table2", "filter": "SELECT * FROM [schema1].[table2] WHERE column2 > 0"}]}

13.可能重复

发送信号以触发快照的时间与流停止和快照开始的时间之间可能存在延迟。由于此延迟,在快照完成后,连接器可能会发出一些与快照捕获的记录重复的事件记录。

十一、流变化

副本集的连接器任务记录偏移量后,它使用该偏移量来确定 oplog 中应开始流式传输更改的位置。然后,该任务(取决于配置)要么连接到副本集的主节点,要么连接到副本集范围的更改流并从该位置开始流式传输更改。它处理所有创建、插入和删除操作,并将它们转换为 Debezium 更改事件。每个更改事件都包含 oplog 中找到操作的位置,并且连接器会定期将其记录为最近的偏移量。记录偏移量的时间间隔由 offset.flush.interval.ms 控制,这是 Kafka Connect 工作线程配置属性。

当连接器正常停止时,会记录最后处理的偏移量,以便在重新启动时,连接器将准确地从其停止的位置继续。但是,如果连接器的任务意外终止,则任务可能在最后记录偏移量之后但在记录最后偏移量之前处理并生成事件;重新启动后,连接器从最后记录的偏移量开始,可能会生成一些与崩溃之前生成的事件相同的事件。

注意:当 Kafka 管道中的所有组件正常运行时,Kafka 消费者只会接收每条消息一次。然而,当出现问题时,Kafka 只能保证消费者至少收到每条消息一次。为了避免意外结果,消费者必须能够处理重复的消息。

如前所述,连接器任务始终使用副本集的主节点来传输来自 oplog 的更改,从而确保连接器尽可能看到最新的操作,并且能够以比辅助节点更低的延迟捕获更改。代替使用。当副本集选择新的主节点时,连接器立即停止流式传输更改,连接到新的主节点,并开始从新主节点的同一位置流式传输更改。同样,如果连接器在与副本集成员通信时遇到任何问题,它会尝试使用指数退避来重新连接,以免淹没副本集,并且一旦连接,它就会从上次停止的位置继续流式传输更改。通过这种方式,连接器能够动态调整以适应副本集成员资格的变化并自动处理通信故障。

总而言之,MongoDB 连接器在大多数情况下都会继续运行。通信问题可能会导致连接器等待问题解决。

十二、原像支持

在 MongoDB 6.0 及更高版本中,您可以配置更改流以发出文档的原像状态,以填充 MongoDB 更改事件的 before 字段。要在 MongoDB 中使用原像,您必须使用 db.createCollection()、create 或 collMod 为集合设置changeStreamPreAndPostImages。要使 Debezium MongoDB 能够在更改事件中包含原像,请将连接器的 capture.mode 设置为 *_with_pre_image 选项之一。

注意:MongoDB 更改流事件的大小限制

MongoDB 更改流事件的大小限制为 16 MB。因此,原像的使用增加了超过该阈值的可能性,这可能导致失败。

十三、主题名称

MongoDB 连接器将每个集合中文档的所有插入、更新和删除操作的事件写入单个 Kafka 主题。 Kafka 主题的名称始终采用逻辑名称.数据库名称.集合名称的形式,其中逻辑名称是使用 topic.prefix 配置属性指定的连接器的逻辑名称,数据库名称是发生操作的数据库的名称,集合名称是受影响文档所在的 MongoDB 集合的名称。

例如,考虑一个 MongoDB 副本集,其库存数据库包含四个集合:产品、现有产品、客户和订单。如果监控此数据库的连接器被赋予了fulfillment的逻辑名称,那么连接器将生成关于这四个 Kafka 主题的事件:

  • fulfillment.inventory.products
  • fulfillment.inventory.products_on_hand
  • fulfillment.inventory.customers
  • fulfillment.inventory.orders

请注意,主题名称不包含副本集名称或分片名称。因此,对分片集合(其中每个分片包含集合文档的子集)的所有更改都将转到同一个 Kafka 主题。

您可以将 Kafka 设置为根据需要自动创建主题。如果没有,则必须在启动连接器之前使用 Kafka 管理工具创建主题。

十四、分区

MongoDB 连接器不会明确确定如何对事件主题进行分区。相反,它允许 Kafka 根据事件键确定如何对主题进行分区。您可以通过在 Kafka Connect 工作配置中定义 Partitioner 实现的名称来更改 Kafka 的分区逻辑。

Kafka 仅维护写入单个主题分区的事件的总顺序。按键对事件进行分区确实意味着具有相同键的所有事件始终进入同一分区。这可确保特定文档的所有事件始终完全有序。

十五、交易元数据

Debezium 可以生成代表事务元数据边界的事件并丰富变更数据事件消息。

Debezium 接收交易元数据的时间限制
Debezium 仅注册和接收部署连接器后发生的事务的元数据。部署连接器之前发生的事务的元数据不可用。

对于每笔交易的 BEGIN 和 END,Debezium 都会生成一个包含以下字段的事件:

状态:

  • 开始或结束

ID:

  • 唯一交易标识符的字符串表示形式。

event_count(对于 END 事件):

  • 事务发出的事件总数。

data_collections(对于 END 事件):

  • data_collection 和 event_count 对的数组,提供源自给定数据集合的更改所发出的事件数。

以下示例显示了一条典型消息:

{
    
  "status": "BEGIN",
  "id": "1462833718356672513",
  "event_count": null,
  "data_collections": null
}

{
    
  "status": "END",
  "id": "1462833718356672513",
  "event_count": 2,
  "data_collections": [
    {
    
      "data_collection": "rs0.testDB.collectiona",
      "event_count": 1
    },
    {
    
      "data_collection": "rs0.testDB.collectionb",
      "event_count": 1
    }
  ]
}

除非通过 topic.transaction 选项覆盖,否则事务事件将写入名为 <topic.prefix>.transaction 的主题。

十六、变更数据事件丰富

启用事务元数据后,数据消息信封将通过新的事务字段进行丰富。该字段以字段组合的形式提供有关每个事件的信息:

ID:

  • 唯一交易标识符的字符串表示形式。

总订单数:

  • 该事件在事务生成的所有事件中的绝对位置。

数据收集顺序:

  • 事件在事务发出的所有事件中的每个数据收集位置。

以下是消息的示例:

{
    
  "after": "{
    \"_id\" : {
    \"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"[email protected]\"}",
  "source": {
    
...
  },
  "op": "c",
  "ts_ms": "1580390884335",
  "transaction": {
    
    "id": "1462833718356672513",
    "total_order": "1",
    "data_collection_order": "1"
  }
}

十七、数据变更事件

Debezium MongoDB 连接器为插入、更新或删除数据的每个文档级操作生成数据更改事件。每个事件都包含一个键和一个值。键和值的结构取决于更改的集合。

Debezium 和 Kafka Connect 是围绕连续的事件消息流而设计的。然而,这些事件的结构可能会随着时间的推移而改变,这对消费者来说可能很难处理。为了解决这个问题,每个事件都包含其内容的架构,或者,如果您使用架构注册表,则还包含消费者可用于从注册表获取架构的架构 ID。这使得每个事件都是独立的。

以下 JSON 框架显示了更改事件的基本四个部分。但是,您选择在应用程序中使用的 Kafka Connect 转换器的配置方式决定了这四个部分在更改事件中的表示。仅当您配置转换器来生成模式字段时,模式字段才会处于更改事件中。同样,仅当您配置转换器来生成事件键和事件负载时,事件键和事件负载才会出现在更改事件中。如果您使用 JSON 转换器并将其配置为生成所有四个基本更改事件部分,则更改事件具有以下结构:

{
    
 "schema": {
     1
   ...
  },
 "payload": {
     2
   ...
 },
 "schema": {
     3
   ...
 },
 "payload": {
     4
   ...
 },
}

表 7. 变更事件基本内容概述

选项 字段名称 描述
1 schema 第一个架构字段是事件键的一部分。它指定了一个 Kafka Connect 架构,该架构描述了事件键的有效负载部分中的内容。换句话说,第一个模式字段描述了已更改的文档的键的结构。
2 payload 第一个有效负载字段是事件键的一部分。它具有先前架构字段描述的结构,并且包含已更改文档的键。
3 schema 第二个架构字段是事件值的一部分。它指定 Kafka Connect 架构,该架构描述事件值的有效负载部分中的内容。换句话说,第二个架构描述了已更改的文档的结构。通常,此模式包含嵌套模式。
4 payload 第二个有效负载字段是事件值的一部分。它具有先前架构字段描述的结构,并且包含已更改文档的实际数据。

默认情况下,连接器将事件记录流更改为名称与事件的原始集合相同的主题。

MongoDB 连接器确保所有 Kafka Connect 架构名称都遵循 Avro 架构名称格式。这意味着逻辑服务器名称必须以拉丁字母或下划线开头,即 a-z、A-Z 或 _。逻辑服务器名称中的每个剩余字符以及数据库和集合名称中的每个字符都必须是拉丁字母、数字或下划线,即 a-z、A-Z、0-9 或 _。如果存在无效字符,则将其替换为下划线字符。

如果逻辑服务器名称、数据库名称或集合名称包含无效字符,并且唯一区分名称的字符无效并用下划线替换,则可能会导致意外冲突。

十八、更改事件键

更改事件的键包含已更改文档的键的架构和已更改文档的实际键。对于给定的集合,模式及其相应的负载都包含单个 id 字段。该字段的值是文档的标识符,表示为从 MongoDB 扩展 JSON 序列化严格模式派生的字符串。

考虑一个逻辑名称为“fulfillment”的连接器、一个包含库存数据库的副本集以及一个包含如下文档的客户集合。

示例文档

{
    
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "[email protected]"
}

更改事件键示例

捕获客户集合更改的每个更改事件都具有相同的事件键架构。只要客户集合具有先前的定义,捕获客户集合更改的每个更改事件都具有以下关键结构。在 JSON 中,它看起来像这样:

{
    
  "schema": {
     1
    "type": "struct",
    "name": "fulfillment.inventory.customers.Key", 2
    "optional": false, 3
    "fields": [ 4
      {
    
        "field": "id",
        "type": "string",
        "optional": false
      }
    ]
  },
  "payload": {
     5
    "id": "1004"
  }
}

表 8. 更改事件键的说明

选项 字段 描述
1 schema 密钥的架构部分指定 Kafka Connect 架构,该架构描述密钥的有效负载部分中的内容。
2 fulfillment.inventory.customers.Key 定义密钥有效负载结构的架构名称。此架构描述了已更改文档的键的结构。键架构名称的格式为连接器名称.数据库名称.集合名称.Key。在这个例子中:fulfillment 是生成此事件的连接器的名称。inventory 是包含已更改的集合的数据库。customers 是包含已更新文档的集合。
3 optional 指示事件键是否必须在其负载字段中包含值。在此示例中,需要密钥有效负载中的值。当文档没有密钥时,密钥有效负载字段中的值是可选的。
4 fields 指定负载中预期的每个字段,包括每个字段的名称、类型以及是否必需。
5 payload 包含为其生成此更改事件的文档的键。在此示例中,键包含一个字符串类型的 id 字段,其值为 1004。

此示例使用带有整数标识符的文档,但任何有效的 MongoDB 文档标识符都以相同的方式工作,包括文档标识符。对于文档标识符,事件键的 Payload.id 值是一个字符串,表示更新文档的原始 _id 字段,作为使用严格模式的 MongoDB 扩展 JSON 序列化。下表提供了如何表示不同类型的 _id 字段的示例。

表 9. 事件密钥有效负载中表示文档 _id 字段的示例

类型 MongoDB _id Value Key’s payload
Integer 1234 { “id” : “1234” }
Float 12.34 { “id” : “12.34” }
String “1234” { “id” : "“1234"” }
Document { “hi” : “kafka”, “nums” : [10.0, 100.0, 1000.0] } { “id” : “{“hi” : “kafka”, “nums” : [10.0, 100.0, 1000.0]}” }
ObjectId ObjectId(“596e275826f08b2730779e1f”) { “id” : “{”$oid" : “596e275826f08b2730779e1f”}" }
Binary BinData(“a2Fma2E=”,0) { “id” : “{“KaTeX parse error: Expected group as argument to '\"' at position 9: binary\" ̲: \"a2Fma2E=\",…type” : “00”}” }

十九、更改事件值

更改事件中的值比键稍微复杂一些。与键一样,值也具有模式部分和有效负载部分。模式部分包含描述有效负载部分的 Envelope 结构的模式,包括其嵌套字段。创建、更新或删除数据的操作的更改事件都具有带有信封结构的值有效负载。

考虑用于显示更改事件键示例的相同示例文档:

示例文档

{
    
  "_id": 1004,
  "first_name": "Anne",
  "last_name": "Kretchmar",
  "email": "[email protected]"
}

针对每种事件类型描述了对此文档的更改的更改事件的值部分:

  • 创建事件
  • 更新事件
  • 删除事件
  • 墓碑事件

二十、创建事件

以下示例显示连接器为在客户集合中创建数据的操作生成的更改事件的值部分:

{
    
    "schema": {
     1
      "type": "struct",
      "fields": [
        {
    
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json", 2
          "version": 1,
          "field": "after"
        },
        {
    
          "type": "string",
          "optional": true,
          "name": "io.debezium.data.Json", 
          "version": 1,
          "field": "patch"
        },
        {
    
          "type": "struct",
          "fields": [
            {
    
              "type": "string",
              "optional": false,
              "field": "version"
            },
            {
    
              "type": "string",
              "optional": false,
              "field": "connector"
            },
            {
    
              "type": "string",
              "optional": false,
              "field": "name"
            },
            {
    
              "type": "int64",
              "optional": false,
              "field": "ts_ms"
            },
            {
    
              "type": "boolean",
              "optional": true,
              "default": false,
              "field": "snapshot"
            },
            {
    
              "type": "string",
              "optional": false,
              "field": "db"
            },
            {
    
              "type": "string",
              "optional": false,
              "field": "rs"
            },
            {
    
              "type": "string",
              "optional": false,
              "field": "collection"
            },
            {
    
              "type": "int32",
              "optional": false,
              "field": "ord"
            },
            {
    
              "type": "int64",
              "optional": true,
              "field": "h"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.mongo.Source", 3
          "field": "source"
        },
        {
    
          "type": "string",
          "optional": true,
          "field": "op"
        },
        {
    
          "type": "int64",
          "optional": true,
          "field": "ts_ms"
        }
      ],
      "optional": false,
      "name": "dbserver1.inventory.customers.Envelope" 4
      },
    "payload": {
     5
      "after": "{
    \"_id\" : {
    \"$numberLong\" : \"1004\"},\"first_name\" : \"Anne\",\"last_name\" : \"Kretchmar\",\"email\" : \"[email protected]\"}", 6
      "source": {
     7
        "version": "2.4.0.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 31,
        "h": 1546547425148721999
      },
      "op": "c",        8
      "ts_ms": 1558965515240      9
    }
  }

表 10. 创建事件值字段的描述

选项 字段名称 描述
1 schema 值的架构,描述值有效负载的结构。连接器为特定集合生成的每个更改事件中,更改事件的值架构都是相同的。
2 name 在架构部分中,每个名称字段指定值的有效负载中字段的架构。io.debezium.data.Json 是有效负载的 after、patch 和 filter 字段的架构。此架构特定于客户集合。创建事件是唯一一种包含 after 字段的事件。更新事件包含过滤字段和补丁字段。删除事件包含过滤字段,但不包含 after 字段或 patch 字段。
3 name io.debezium.connector.mongo.Source 是有效负载源字段的架构。此架构特定于 MongoDB 连接器。连接器将其用于它生成的所有事件。
4 name dbserver1.inventory.customers.Envelope 是负载整体结构的架构,其中 dbserver1 是连接器名称,inventory 是数据库,customers 是集合。此架构特定于集合。
5 payload 该值是实际数据。这是更改事件提供的信息。事件的 JSON 表示形式可能比它们描述的文档大得多。这是因为 JSON 表示必须包含消息的架构和有效负载部分。但是,通过使用 Avro 转换器,您可以显着减小连接器流式传输到 Kafka 主题的消息的大小。
6 after 一个可选字段,指定事件发生后文档的状态。在此示例中,after 字段包含新文档的 _id、first_name、last_name 和 email 字段的值。之后的值始终是一个字符串。按照惯例,它包含文档的 JSON 表示形式。当 capture.mode 选项设置为change_streams_update_full时,MongoDB oplog条目仅包含_create_事件和更新事件的文档的完整状态;换句话说,无论 capture.mode 选项如何,create 事件是唯一包含 after 字段的事件。
7 source 描述事件源元数据的必填字段。此字段包含可用于将此事件与其他事件进行比较的信息,包括事件的来源、事件发生的顺序以及事件是否属于同一事务的一部分。源元数据包括:debezium版本。生成事件的连接器的名称。MongoDB 副本集的逻辑名称,它形成生成事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。包含新文档的集合和数据库的名称。如果事件是快照的一部分。数据库中发生更改的时间戳以及时间戳内事件的序号。MongoDB操作的唯一标识符(oplog事件中的h字段)。MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符,以防在事务内执行更改(仅限更改流捕获模式)。
8 op 强制字符串,描述导致连接器生成事件的操作类型。在此示例中,c 表示该操作创建了一个文档。有效值为:c=创建u=更新d = 删除r = 读取(仅适用于快照)
9 ts_ms 显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。

二十一、更新事件

更改流捕获模式

示例客户集合中更新的更改事件的值与该集合的创建事件具有相同的架构。同样,事件值的有效负载具有相同的结构。但是,事件值有效负载在更新事件中包含不同的值。仅当 capture.mode 选项设置为change_streams_update_full 时,更新事件才包含后值。如果 capture.mode 选项设置为 *_with_pre_image 选项之一,则提供之前值。在本例中,有一个新的结构化字段 updateDescription 以及一些附加字段:

  • UpdatedFields 是一个字符串字段,其中包含已更新文档字段及其值的 JSON 表示形式
  • removedFields 是从文档中删除的字段名称的列表
  • truncatedArrays 是文档中被截断的数组列表

以下是连接器为客户集合中的更新生成的事件中的更改事件值的示例:

{
    
    "schema": {
     ... },
    "payload": {
    
      "op": "u",  1
      "ts_ms": 1465491461815, 2
      "before":"{
    \"_id\": {
    \"$numberLong\": \"1004\"},\"first_name\": 3\"unknown\",\"last_name\": \"Kretchmar\",\"email\": \"[email protected]\"}", 
      "after":"{
    \"_id\": {
    \"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"[email protected]\"}", 4
      "updateDescription": {
    
        "removedFields": null,
        "updatedFields": "{
    \"first_name\": \"Anne Marie\"}", 5
        "truncatedArrays": null
      },
      "source": {
     6
        "version": "2.4.0.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": false,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 1,
        "h": null,
        "tord": null,
        "stxnid": null,
        "lsid":"{
    \"id\": {
    \"$binary\": \"FA7YEzXgQXSX9OxmzllH2w==\",\"$type\": \"04\"},\"uid\": {
    \"$binary\": \"47DEQpj8HBSa+/TImW+5JCeuQeRkm5NMpJWZG3hSuFU=\",\"$type\": \"00\"}}",
        "txnNumber":1
      }
    }
  }

表 11. 更新事件值字段说明

选项 字段名 描述
1 op 强制字符串,描述导致连接器生成事件的操作类型。在此示例中,u 表示该操作更新了文档。
2 ts_ms 显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。
3 before 包含更改前实际 MongoDB 文档的 JSON 字符串表示形式。 + 如果捕获模式未设置为 *_with_preimage 选项之一,则更新事件值不包含 before 字段。
4 after 包含实际 MongoDB 文档的 JSON 字符串表示形式。如果捕获模式未设置为change_streams_update_full,则更新事件值不包含after字段
5 updatedFields 包含文档更新的字段值的 JSON 字符串表示形式。在此示例中,更新将first_name 字段更改为新值。
6 source 描述事件源元数据的必填字段。该字段包含与同一集合的创建事件相同的信息,但值不同,因为该事件来自 oplog 中的不同位置。源元数据包括:debezium版本。生成事件的连接器的名称。MongoDB 副本集的逻辑名称,它形成生成事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。包含更新文档的集合和数据库的名称。如果事件是快照的一部分。数据库中发生更改的时间戳以及时间戳内事件的序号。MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符(以防在事务内执行更改)。

注意:
事件中的后值应作为文档的时间点值进行处理。该值不是动态计算的,而是从集合中获取的。因此,如果多个更新紧密地一个接一个地进行,则所有更新更新事件将包含相同的后值,该后值将表示存储在文档中的最后一个值。

如果您的应用程序依赖于渐变演变,那么您应该仅依赖 updateDescription。

二十二、删除事件

删除更改事件中的值与同一集合的创建和更新事件具有相同的架构部分。删除事件中的有效负载部分包含与同一集合的创建和更新事件不同的值。特别是,删除事件既不包含后值也不包含 updateDescription 值。以下是客户集合中文档的删除事件的示例:

{
    
    "schema": {
     ... },
    "payload": {
    
      "op": "d", 1
      "ts_ms": 1465495462115, 2
      "before":"{
    \"_id\": {
    \"$numberLong\": \"1004\"},\"first_name\": \"Anne Marie\",\"last_name\": \"Kretchmar\",\"email\": \"[email protected]\"}", 3
      "source": {
     4
        "version": "2.4.0.Final",
        "connector": "mongodb",
        "name": "fulfillment",
        "ts_ms": 1558965508000,
        "snapshot": true,
        "db": "inventory",
        "rs": "rs0",
        "collection": "customers",
        "ord": 6,
        "h": 1546547425148721999
      }
    }
  }

表 12. 删除事件值字段说明

选项 字段值 描述
1 op 描述操作类型的强制字符串。 op字段值为d,表示该文档已被删除。
2 ts_ms 显示连接器处理事件的时间的可选字段。该时间基于运行 Kafka Connect 任务的 JVM 中的系统时钟。在源对象中,ts_ms 表示数据库中进行更改的时间。通过将payload.source.ts_ms的值与payload.ts_ms的值进行比较,您可以确定源数据库更新与Debezium之间的滞后。
3 before 包含更改前实际 MongoDB 文档的 JSON 字符串表示形式。 + 如果捕获模式未设置为 *_with_preimage 选项之一,则更新事件值不包含 before 字段。
4 source 描述事件源元数据的必填字段。该字段包含与同一集合的创建或更新事件相同的信息,但值不同,因为该事件来自 oplog 中的不同位置。源元数据包括:debezium版本。生成事件的连接器的名称。MongoDB 副本集的逻辑名称,它形成生成事件的命名空间,并在连接器写入的 Kafka 主题名称中使用。包含已删除文档的集合和数据库的名称。如果事件是快照的一部分。数据库中发生更改的时间戳以及时间戳内事件的序号。MongoDB操作的唯一标识符(oplog事件中的h字段)。MongoDB 会话 lsid 和事务号 txnNumber 的唯一标识符,以防在事务内执行更改(仅限更改流捕获模式)。

二十三、墓碑事件

MongoDB 连接器事件旨在与 Kafka 日志压缩配合使用。只要保留每个键的最新消息,日志压缩就可以删除一些较旧的消息。这使得 Kafka 可以回收存储空间,同时确保主题包含完整的数据集并且可用于重新加载基于键的状态。

二十四、设置mongodb

MongoDB 连接器使用 MongoDB 的更改流来捕获更改,因此该连接器仅适用于 MongoDB 副本集或分片集群,其中每个分片都是一个单独的副本集。有关设置副本集或分片集群的信息,请参阅 MongoDB 文档。另外,请务必了解如何使用副本集启用访问控制和身份验证。

您还必须有一个 MongoDB 用户,该用户具有适当的角色来读取可以读取操作日志的管理数据库。此外,用户还必须能够读取分片集群的配置服务器中的配置数据库,并且必须具有 listDatabases 权限操作。当使用变更流(默认)时,用户还必须具有集群范围的权限操作 find 和 changeStream。

当您打算使用原像并填充 before 字段时,您需要首先使用 db.createCollection()、create 或 collMod 为集合启用changeStreamPreAndPostImages。

云中的 MongoDB

您可以将 MongoDB 的 Debezium 连接器与 MongoDB Atlas 结合使用。请注意,MongoDB Atlas 仅支持通过 SSL 的安全连接,即 +mongodb.ssl.enabled 连接器选项必须设置为 true。

二十五、最佳 Oplog 配置

Debezium MongoDB 连接器读取更改流以获取副本集的 oplog 数据。因为 oplog 是一个固定大小、有上限的集合,所以如果它超过其最大配置大小,它就会开始覆盖其最旧的条目。如果连接器因任何原因停止,当它重新启动时,它会尝试从最后一个 oplog 流位置恢复流式传输。但是,如果从 oplog 中删除了最后一个流位置,则根据连接器的 snapshot.mode 属性中指定的值,连接器可能无法启动,并报告无效的恢复令牌错误。如果发生故障,您必须创建一个新的连接器,以使 Debezium 能够继续从数据库捕获记录。如果 snapshot.mode 设置为初始,连接器在长时间停止后会失败。

为了确保 oplog 保留 Debezium 恢复流式传输所需的偏移值,您可以使用以下方法之一:

  • 增加 oplog 的大小。根据您的典型工作负载,将 oplog 大小设置为大于每小时 oplog 条目峰值数的值。
  • 增加 oplog 条目保留的最短小时数(MongoDB 4.4 及更高版本)。此设置是基于时间的,这样即使 oplog 达到其最大配置大小,也保证过去 n 小时内的条目可用。尽管这通常是首选选项,但对于具有接近容量的高工作负载的集群,请指定最大 oplog 大小。

为了帮助防止与丢失 oplog 条目相关的故障,跟踪报告复制行为的指标并优化 oplog 大小以支持 Debezium 非常重要。特别是,您应该监视 Oplog GB/Hour 和 Replication Oplog Window 的值。如果 Debezium 离线的时间间隔超过了复制 oplog 窗口的值,并且主 oplog 的增长速度快于 Debezium 消耗条目的速度,则可能会导致连接器故障。

有关如何监控这些指标的信息,请参阅 MongoDB 文档。

最好将最大 oplog 大小设置为基于 oplog 的预期每小时增长(Oplog GB/小时)的值,乘以解决 Debezium 故障可能需要的时间。

那是,

Oplog GB/Hour X average reaction time to Debezium failure

例如,如果oplog大小限制设置为1GB,并且oplog每小时增长3GB,则oplog条目每小时会被清除3次。如果 Debezium 在这段时间内失败,它的最后一个 oplog 位置可能会被删除。

如果 oplog 以 3GB/小时的速度增长,并且 Debezium 离线两个小时,那么您可以将 oplog 大小设置为 3GB/小时 X 2 小时,即 6GB。

二十六、部署

下载mongodb数据库的debezium2.4版本插件:

部署加载mongodb数据库debezium2.4版本的详细步骤:

二十七、MongoDB 连接器配置示例

以下是连接器实例的配置示例,该实例从 192.168.99.100 端口 27017 上的 MongoDB 副本集 rs0 捕获数据,我们在逻辑上将其命名为 fullfillment。通常,您可以通过设置连接器可用的配置属性在 JSON 文件中配置 Debezium MongoDB 连接器。

您可以选择为特定 MongoDB 副本集或分片集群生成事件。或者,您可以过滤掉不需要的集合。

{
    
  "name": "inventory-connector", 1
  "config": {
    
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", 2
    "mongodb.connection.string": "mongodb://192.168.99.100:27017/?replicaSet=rs0", 3
    "topic.prefix": "fullfillment", 4
    "collection.include.list": "inventory[.]*" 5
  }
}
  • 1.当我们向 Kafka Connect 服务注册连接器时的名称。
  • 2.MongoDB 连接器类的名称。
  • 3.用于连接到 MongoDB 副本集的连接字符串。
  • 4.MongoDB 副本集的逻辑名称,它形成生成事件的命名空间,并用于连接器写入的所有 Kafka 主题的名称、Kafka Connect 架构名称以及 Avro 启动时对应的 Avro 架构的命名空间使用转换器。
  • 5.与要监视的所有集合的集合命名空间(例如,.)匹配的正则表达式列表。这是可选的。

您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。该服务记录配置并启动一个执行以下操作的连接器任务:

  • 连接到 MongoDB 副本集或分片集群。
  • 为每个副本集分配任务。
  • 如有必要,执行快照。
  • 读取变更流。
  • 流将事件记录更改为 Kafka 主题。

完整案例:

二十八、连接器属性

Debezium MongoDB 连接器具有许多配置属性,您可以使用它们来为您的应用程序实现正确的连接器行为。许多属性都有默认值。有关属性的信息组织如下:

  • 必需的 Debezium MongoDB 连接器配置属性
  • 高级 Debezium MongoDB 连接器配置属性

除非有默认值,否则需要以下配置属性。

表 13. 所需的 Debezium MongoDB 连接器配置属性

属性 默认值 描述
name 无默认值 连接器的唯一名称。尝试使用相同名称再次注册将会失败。 (所有 Kafka Connect 连接器都需要此属性。)
connector.class 无默认值 连接器的 Java 类的名称。始终对 MongoDB 连接器使用 io.debezium.connector.mongodb.MongoDbConnector 值。
mongodb.connection.string 无默认值 指定连接器用于连接到 MongoDB 副本集的连接字符串。此属性替换了 MongoDB 连接器早期版本中提供的 mongodb.hosts 属性。仅当 mongodb.connection.mode 设置为replica_set 时,从分片 MongoDB 集群捕获更改的连接器仅在初始分片发现过程中使用此连接字符串。在初始发现过程之后,将为每个单独的分片生成连接字符串。
mongodb.connection.string.shard.params 无默认值 指定连接器用于连接到 MongoDB 分片集群的各个分片的连接字符串的 URL 参数,包括读取首选项。仅当 mongodb.connection.mode 设置为replica_set 时,此属性才适用。
mongodb.connection.mode replica_set 指定连接器连接到分片 MongoDB 集群时使用的策略。将此属性设置为以下值之一:副本集,连接器为每个分片建立与副本集的单独连接。分片的,连接器根据 mongodb.connection.string 的值建立与数据库的单个连接。副本集选项允许连接器跨多个连接器任务分配分片处理。但是,在此配置中,连接器在连接到各个分片时会绕过 MongoDB 路由器,MongoDB 不建议这样做。连接模式之间的切换会使存储的偏移量失效,从而触发新的快照。
topic.prefix 无默认值 标识该连接器监控的连接器和/或 MongoDB 副本集或分片集群的唯一名称。每台服务器最多应由一个 Debezium 连接器监控,因为该服务器名称为来自 MongoDB 副本集或集群的所有持久 Kafka 主题添加前缀。仅使用字母数字字符、连字符、点和下划线来构成名称。逻辑名称在所有其他连接器中应该是唯一的,因为该名称用作命名从该连接器接收记录的 Kafka 主题的前缀。不要更改此属性的值。如果您更改名称值,则在重新启动后,连接器不会继续向原始主题发出事件,而是向名称基于新值的主题发出后续事件。
mongodb.authentication.class DefaultMongoDbAuthProvider 完整的 Java 类名,是 io.debezium.connector.mongodb.connection.MongoDbAuthProvider 接口的实现。此类处理 MongoDB 连接上的凭据设置(在每次应用程序启动时调用)。默认行为根据每个文档使用 mongodb.user、mongodb.password 和 mongodb.authsource 属性,但其他实现可能会以不同方式使用它们或完全忽略它们。请注意,mongodb.connection.string 中的任何设置都将覆盖此类设置的设置
mongodb.user 无默认值 使用默认 mongodb.authentication.class 时:连接到 MongoDB 时使用的数据库用户的名称。仅当 MongoDB 配置为使用身份验证时才需要这样做。
mongodb.password No default 使用默认 mongodb.authentication.class 时:连接到 MongoDB 时使用的密码。仅当 MongoDB 配置为使用身份验证时才需要这样做。
mongodb.authsource admin 使用默认 mongodb.authentication.class 时:包含 MongoDB 凭据的数据库(身份验证源)。仅当 MongoDB 配置为使用 admin 之外的其他身份验证数据库进行身份验证时,才需要这样做。
mongodb.ssl.enabled false 连接器将使用 SSL 连接到 MongoDB 实例。
mongodb.ssl.invalid.hostname.allowed false 启用 SSL 后,此设置控制是否在连接阶段禁用严格的主机名检查。如果为 true,连接将不会阻止中间人攻击。
filters.match.mode regex 用于根据包含/排除的数据库和集合名称来匹配事件的模式。将属性设置为以下值之一:正则表达式,数据库和集合包含/排除被评估为逗号分隔的正则表达式列表。文字,数据库和集合包含/排除被评估为逗号分隔的字符串文字列表。这些文字周围的空白字符将被删除。
database.include.list empty string 与要监视的数据库名称匹配的可选逗号分隔正则表达式或文字列表。默认情况下,所有数据库都会受到监控。设置database.include.list 后,连接器仅监视该属性指定的数据库。其他数据库被排除在监控之外。为了匹配数据库的名称,Debezium 根据filters.match.mode 属性的值执行以下操作之一应用您指定为锚定正则表达式的正则表达式。也就是说,指定的表达式与数据库的整个名称字符串进行匹配;它与数据库名称中可能存在的子字符串不匹配。将您指定的文字与数据库的整个名称字符串进行比较如果您在配置中包含此属性,则不要同时设置database.exclude.list 属性。
database.exclude.list empty string 可选的以逗号分隔的正则表达式或文字列表,与要从监视中排除的数据库名称相匹配。设置 database.exclude.list 后,连接器将监视除该属性指定的数据库之外的每个数据库。为了匹配数据库的名称,Debezium 根据filters.match.mode 属性的值执行以下操作之一应用您指定为锚定正则表达式的正则表达式。也就是说,指定的表达式与数据库的整个名称字符串进行匹配;它与数据库名称中可能存在的子字符串不匹配。将您指定的文字与数据库的整个名称字符串进行比较,如果您在配置中包含此属性,请不要设置database.include.list 属性。
collection.include.list empty string 可选的以逗号分隔的正则表达式或文字列表,与要监视的 MongoDB 集合的完全限定命名空间相匹配。默认情况下,连接器监视除本地和管理数据库中的集合之外的所有集合。设置 collection.include.list 后,连接器仅监视该属性指定的集合。其他集合被排除在监控之外。集合标识符的格式为databaseName.collectionName。为了匹配命名空间的名称,Debezium 根据filters.match.mode 属性的值执行以下操作之一应用您指定为锚定正则表达式的正则表达式。也就是说,指定的表达式与命名空间的整个名称字符串进行匹配;它与名称中的子字符串不匹配。将您指定的文字与命名空间的整个名称字符串进行比较如果您在配置中包含此属性,则不要同时设置 collection.exclude.list 属性。
collection.exclude.list empty string 可选的以逗号分隔的正则表达式或文字列表,与要从监视中排除的 MongoDB 集合的完全限定命名空间相匹配。设置 collection.exclude.list 后,连接器将监视除该属性指定的集合之外的每个集合。集合标识符的格式为databaseName.collectionName。为了匹配命名空间的名称,Debezium 根据filters.match.mode 属性的值执行以下操作之一应用您指定为锚定正则表达式的正则表达式。也就是说,指定的表达式与命名空间的整个名称字符串进行匹配;它与数据库名称中可能存在的子字符串不匹配。将您指定的文字与命名空间的整个名称字符串进行比较,如果您在配置中包含此属性,请不要设置 collection.include.list 属性。
capture.mode change_streams_update_full 指定连接器用于从 MongoDB 服务器捕获更新事件更改的方法。将此属性设置为以下值之一:change_streams:更新事件消息不包含完整文档。消息不包含表示更改之前文档状态的字段。change_streams_update_full:更新事件消息包括完整的文档。消息不包含表示更新之前文档状态的 before 字段。事件消息在 after 字段中返回文档的完整状态。在某些情况下,当 capture.mode 配置为返回完整文档时,更新事件消息的 updateDescription 和 after 字段可能会报告不一致的值。对文档快速连续应用多个更新后,可能会导致此类差异。连接器仅在收到事件 updateDescription 字段中描述的更新后才从 MongoDB 数据库请求完整文档。如果稍后的更新在连接器从数据库中检索源文档之前修改了源文档,则连接器会收到由该后续更新修改的文​​档。change_streams_update_full_with_pre_image:更新事件事件消息包括完整文档,并包括表示更改之前文档状态的字段。change_streams_with_pre_image:更新事件不包括完整文档,但包括表示更改之前文档状态的字段。
capture.scope deployment 指定连接器打开的变更流的范围。将此属性设置为以下值之一:deployment打开部署(副本集或分片集群)的更改流游标,以监视所有数据库(管理、本地和配置除外)中所有非系统集合的更改。database打开单个数据库的更改流游标以监视其所有非系统集合的更改。要支持 Debezium 信令,如果将 capture.scope 设置为数据库,则信令数据集合必须驻留在 capture.target 属性指定的数据库中。
capture.target 指定连接器监视更改的数据库。仅当 capture.scope 设置为数据库时,此属性才适用。
field.exclude.list empty string 应从更改事件消息值中排除的字段的完全限定名称的可选逗号分隔列表。字段的完全限定名称的格式为databaseName.collectionName.fieldName.nestedFieldName,其中databaseName 和collectionName 可以包含与任何字符匹配的通配符(*)。
field.renames empty string 字段的完全限定替换的可选逗号分隔列表,应用于重命名更改事件消息值中的字段。字段的完全限定替换的形式为databaseName.collectionName.fieldName.nestedFieldName:newNestedFieldName,其中databaseName和collectionName可以包含匹配任何字符的通配符(*),冒号字符(:)用于确定字段的重命名映射。下一个字段替换将应用于列表中上一个字段替换的结果,因此在重命名同一路径中的多个字段时请记住这一点。
tasks.max 1 指定连接器用于连接到分片集群的最大任务数。当您将连接器与单个 MongoDB 副本集一起使用时,默认值是可以接受的。但是,当集群包含多个分片时,为了使 Kafka Connect 能够为每个副本集分配工作,请指定一个等于或大于集群中分片数量的值。然后,MongoDB 连接器可以使用单独的任务连接到集群中每个分片的副本集。仅当连接器连接到分片 MongoDB 集群并且 mongodb.connection.mode 属性设置为replica_set 时,此属性才有效。当 mongodb.connection.mode 设置为 sharded 时,或者连接器连接到未分片的 MongoDB 副本集部署时,连接器会忽略此设置,并默认仅使用单个任务。
tombstones.on.delete true 控制删除事件后是否发生逻辑删除事件。true - 删除操作由删除事件和后续逻辑删除事件表示。false - 仅发出删除事件。删除源记录后,如果为主题启用了日志压缩,则发出逻辑删除事件(默认行为)允许 Kafka 完全删除与已删除行的键相关的所有事件。
schema.name.adjustment.mode none 指定应如何调整架构名称以与连接器使用的消息转换器兼容。可能的设置:none 不应用任何调整。avro 将 Avro 类型名称中不能使用的字符替换为下划线。avro_unicode 将 Avro 类型名称中不能使用的下划线或字符替换为相应的 unicode,如 _uxxxx。注意:_是转义序列,类似于Java中的反斜杠
field.name.adjustment.mode none 指定应如何调整字段名称以与连接器使用的消息转换器兼容。可能的设置:none 不应用任何调整。avro 将 Avro 类型名称中不能使用的字符替换为下划线。avro_unicode 将 Avro 类型名称中不能使用的下划线或字符替换为相应的 unicode,如 _uxxxx。注意:_是转义序列,类似于Java中的反斜杠

以下高级配置属性具有良好的默认值,适用于大多数情况,因此很少需要在连接器的配置中指定。

表 14. Debezium MongoDB 连接器高级配置属性

属性 默认值 描述
max.batch.size 2048 正整数值,指定在此连接器的每次迭代期间应处理的每批事件的最大大小。默认为 2048。
max.queue.size 8192 正整数值,指定阻塞队列可以保存的最大记录数。当 Debezium 读取从数据库流式传输的事件时,它会将事件放入阻塞队列中,然后再将其写入 Kafka。如果连接器摄取消息的速度快于将消息写入 Kafka 的速度,或者当 Kafka 不可用时,阻塞队列可以为从数据库读取更改事件提供反压。当连接器定期记录偏移量时,队列中保存的事件将被忽略。始终将 max.queue.size 的值设置为大于 max.batch.size 的值。
max.queue.size.in.bytes 0 一个长整数值,指定阻塞队列的最大容量(以字节为单位)。默认情况下,没有为阻塞队列指定卷限制。要指定队列可以使用的字节数,请将此属性设置为正 long 值。如果还设置了 max.queue.size,则当队列大小达到任一属性指定的限制时,写入队列将被阻止。例如,如果设置 max.queue.size=1000,并且 max.queue.size.in.bytes=5000,则在队列包含 1000 条记录后或在队列中的记录量达到后,写入队列将被阻止达到5000字节。
poll.interval.ms 1000 正整数值,指定连接器在每次迭代期间应等待新更改事件出现的毫秒数。默认为 500 毫秒或 0.5 秒。
connect.backoff.initial.delay.ms 1000 正整数值,指定在第一次连接尝试失败后或没有可用的主节点时尝试重新连接到主节点时的初始延迟。默认为 1 秒(1000 毫秒)。
connect.backoff.max.delay.ms 1000 正整数值,指定在重复尝试连接失败后或没有可用的主节点时尝试重新连接到主节点时的最大延迟。默认为 120 秒(120,000 毫秒)。
connect.max.attempts 16 正整数值,指定在发生异常和任务中止之前尝试连接到主副本集失败的最大次数。默认为 16,使用 connect.backoff.initial.delay.ms 和 connect.backoff.max.delay.ms 的默认值会导致尝试 20 多分钟后失败。
source.struct.version v2 CDC 事件中源块的架构版本。 Debezium 0.10 引入了一些突破更改源块的结构,以便统一所有连接器上的公开结构。通过将此选项设置为 v1,可以生成早期版本中使用的结构。请注意,不建议使用此设置,并计划在未来的 Debezium 版本中删除此设置。
heartbeat.interval.ms 0 控制发送心跳消息的频率。此属性包含一个时间间隔(以毫秒为单位),用于定义连接器将消息发送到检测信号主题的频率。这可用于监视连接器是否仍在接收来自数据库的更改事件。如果仅非捕获集合中的记录在较长时间内发生更改,您还应该利用心跳消息。在这种情况下,连接器将继续从数据库读取 oplog/更改流,但不会将任何更改消息发送到 Kafka,这又意味着不会向 Kafka 提交任何偏移更新。这将导致 oplog 文件被轮换,但连接器不会注意到它,因此重新启动时某些事件不再可用,这导致需要重新执行初始快照。将此参数设置为 0 则根本不发送心跳消息。默认禁用。
skipped.operations t 在流式传输过程中将跳过的以逗号分隔的操作类型列表。这些操作包括:c 表示插入/创建,u 表示更新/替换,d 表示删除,t 表示截断,none 表示不跳过任何上述操作。默认情况下,为了与其他 Debezium 连接器保持一致,会跳过截断操作(此连接器不发出)。但是,由于 MongoDB 不支持截断更改事件,因此这实际上与指定 none 相同。
snapshot.collection.filter.overrides No default 控制快照中包含哪些集合项。此属性仅影响快照。以databaseName.collectionName 的形式指定以逗号分隔的集合名称列表。对于您指定的每个集合,还指定另一个配置属性:snapshot.collection.filter.overrides.databaseName.collectionName。例如,其他配置属性的名称可能是:snapshot.collection.filter.overrides.customers.orders。将此属性设置为有效的筛选表达式,该表达式仅检索快照中所需的项目。当连接器执行快照时,它仅检索与过滤器表达式匹配的项目。
snapshot.delay.ms No default 连接器在启动后拍摄快照之前应等待的时间间隔(以毫秒为单位);可用于避免在集群中启动多个连接器时发生快照中断,这可能会导致连接器重新平衡。
snapshot.fetch.size 0 指定在拍摄快照时应从每个集合一次性读取的最大文档数。连接器将按此大小分批读取集合内容。默认为 0,表示服务器选择合适的获取大小。
snapshot.include.collection.list collection.include.list 中指定的所有集合 可选的、以逗号分隔的正则表达式列表,与要包含在快照中的架构的完全限定名称 (.) 相匹配。指定的项目必须在连接器的 collection.include.list 属性中命名。仅当连接器的 snapshot.mode 属性设置为 never 以外的值时,此属性才会生效。此属性不会影响增量快照的行为。为了匹配模式的名称,Debezium 应用您指定为锚定正则表达式的正则表达式。也就是说,指定的表达式与模式的整个名称字符串进行匹配;它与模式名称中可能存在的子字符串不匹配。
snapshot.max.threads 1 正整数值,指定用于执行副本集中集合的初始同步的最大线程数。默认为 1。
snapshot.mode initial 指定连接器启动时执行快照的条件。将属性设置为以下值之一:initial当连接器启动时,如果它没有检测到其 offsets 主题中的值,它将执行数据库快照。never当连接器启动时,它会跳过快照过程并立即开始将数据库记录到 oplog 的操作的更改事件流式传输。
provide.transaction.metadata false 当设置为 true 时,Debezium 会生成具有事务边界的事件,并使用事务元数据丰富数据事件包络。
retriable.restart.connector.wait.ms 10000 (10 seconds) 发生可重试错误后重新启动连接器之前要等待的毫秒数。
mongodb.poll.interval.ms 30000 连接器轮询新的、已删除的或更改的副本集的时间间隔。
mongodb.connect.timeout.ms 10000 (10 seconds) 中止新连接尝试之前驱动程序将等待的毫秒数。
mongodb.heartbeat.frequency.ms 10000 (10 seconds) 集群监视器尝试访问每台服务器的频率。
mongodb.socket.timeout.ms 0 发生超时之前套接字上发送/接收所需的毫秒数。值为 0 会禁用此行为。
mongodb.server.selection.timeout.ms 30000 (30 seconds) 驱动程序在超时并抛出错误之前等待选择服务器的毫秒数。
cursor.pipeline 无默认值 当流变化时,此设置将处理更改流事件作为标准 MongoDB 聚合流管道的一部分。管道是 MongoDB 聚合管道,由对数据库进行过滤或转换数据的指令组成。这可用于自定义连接器消耗的数据。此属性的值必须是 JSON 格式的允许聚合管道阶段的数组。请注意,它附加在用于支持连接器的内部管道之后(例如过滤操作类型、数据库名称、集合名称等)。
cursor.pipeline.order internal_first 用于构造有效的 MongoDB 聚合流管道的顺序。将属性设置为以下值之一:internal_first首先应用由连接器定义的内部阶段。这意味着只有应该由连接器捕获的事件才会被馈送到用户定义的阶段(通过设置cursor.pipeline进行配置)。user_first首先应用由“cursor.pipeline”属性定义的阶段。在此模式下,所有事件(包括连接器未捕获的事件)都将馈送到用户定义的管道阶段。如果cursor.pipeline 的值包含复杂的操作,此模式可能会对性能产生负面影响。user_only由“cursor.pipeline”属性定义的阶段将替换由连接器定义的内部阶段。此模式仅适用于专家用户,因为所有事件仅由用户定义的管道阶段处理。此模式会对连接器的性能和整体功能产生负面影响!
cursor.oversize.handling.mode fail 用于处理超过指定 BSON 大小的文档的更改事件的策略。将属性设置为以下值之一:fail如果更改事件的总大小超过最大 BSON 大小,连接器将失败。skip超过最大大小(由cursor.oversize.skip.threshold 属性指定)的文档的任何更改事件都将被忽略
cursor.oversize.skip.threshold 0 处理更改事件的存储文档的最大允许大小(以字节为单位)。这包括数据库操作之前和之后的大小,更具体地说,这限制了 MongoDB 更改事件的 fullDocument 和 fullDocumentBeforeChange 字段的大小。
cursor.max.await.time.ms 0 指定 oplog/更改流游标在导致执行超时异常之前等待服务器生成结果的最大毫秒数。值 0 表示使用服务器/驱动程序默认等待超时。
signal.data.collection 无默认值 用于向连接器发送信号的数据集合的完全限定名称。使用以下格式指定集合名称:<数据库名称>.<集合名称>
signal.enabled.channels source 为连接器启用的信令通道名称列表。默认情况下,以下通道可用:source、kafka、file、jmx 您还可以选择实现自定义信号通道。
notification.enabled.channels 无默认值 为连接器启用的通知通道名称列表。默认情况下,以下通道可用:sink、log、jmx 您还可以选择实现自定义通知通道。
incremental.snapshot.chunk.size 1024 连接器在增量快照块期间获取并读入内存的最大文档数。增加块大小可以提高效率,因为快照运行的快照查询数量越大,查询的数量就越少。然而,较大的块大小也需要更多的内存来缓冲快照数据。将块大小调整为可在您的环境中提供最佳性能的值。
topic.naming.strategy io.debezium.schema.DefaultTopicNamingStrategy TopicNamingStrategy 类的名称应用于确定数据更改、架构更改、事务、心跳事件等的主题名称,默认为DefaultTopicNamingStrategy。
topic.delimiter . 指定主题名称的分隔符,默认为…
topic.cache.size 10000 用于在有界并发哈希图中保存主题名称的大小。该缓存将有助于确定与给定数据集合对应的主题名称。
topic.heartbeat.prefix __debezium-heartbeat 控制连接器向其发送心跳消息的主题的名称。主题名称具有以下模式:topic.heartbeat.prefix.topic.prefix,例如,如果主题前缀为fulfillment,则默认主题名称为__debezium-heartbeat.fulfillment。
topic.transaction transaction 控制连接器向其发送事务元数据消息的主题的名称。主题名称具有以下模式:topic.prefix.topic.transaction例如,如果主题前缀是fulfillment,则默认主题名称是fulfillment.transaction。
custom.metric.tags No default 自定义指标标签将接受键值对来自定义 MBean 对象名称,该名称应附加在常规名称的末尾,每个键代表 MBean 对象名称的一个标签,相应的值将是该标签的值关键群岛。例如:k1=v1,k2=v2。
errors.max.retries -1 失败前可重试错误(例如连接错误)的最大重试次数(-1 = 无限制,0 = 禁用,> 0 = 重试次数)。

二十九、Debezium 连接器 Kafka 信号配置属性

Debezium 提供了一组 signal.* 属性,用于控制连接器如何与 Kafka 信号主题交互。

下表描述了 Kafka 信号属性。

表 15. Kafka 信号配置属性

属性 默认值 描述
signal.kafka.topic <topic.prefix>-signal 连接器监视临时信号的 Kafka 主题的名称。如果禁用自动创建主题,则必须手动创建所需的信令主题。需要一个信令主题来保留信号顺序。信令主题必须具有单个分区。
signal.kafka.groupId kafka-signal Kafka 消费者使用的组 ID 的名称。
signal.kafka.bootstrap.servers No default 连接器用于建立与 Kafka 集群的初始连接的主机/端口对列表。每对都引用 Debezium Kafka Connect 进程使用的 Kafka 集群。
signal.kafka.poll.timeout.ms 100 一个整数值,指定连接器在轮询信号时等待的最大毫秒数。

Debezium 连接器传递信号 Kafka 消费者客户端配置属性

Debezium 连接器提供信号 Kafka 消费者的直通配置。直通信号属性以前缀 Signals.consumer.* 开头。例如,连接器将 signal.consumer.security.protocol=SSL 等属性传递给 Kafka 使用者。

Debezium 在将属性传递给 Kafka 信号使用者之前会去除属性中的前缀。

Debezium 连接器接收器通知配置属性

下表描述了通知属性。

表 16. 接收器通知配置属性

属性 默认值 描述
notification.sink.topic.name No default 从 Debezium 接收通知的主题的名称。当您配置 notification.enabled.channels 属性以将接收器包含为启用的通知通道之一时,需要此属性。

三十、监控

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zhengzaifeidelushang/article/details/134018934

智能推荐

OpenHarmony语言基础类库【@ohos.url (URL字符串解析)】

本模块首批接口从API version 7开始支持。后续版本的新增接口,采用上角标单独标记接口的起始版本。

浏览器安全之同源策略_浏览器同源策略-程序员宅基地

文章浏览阅读1.2k次。明确定义集成系统之间的接口和端点。确定HTTP请求和响应的格式,包括使用的数据编码格式(如JSON或XML),以及请求和响应的头部信息。定义HTTP的接口,首先应该确定接口功能和目标,明确接口的目的和提供的功能。确定接口所要实现的业务逻辑或服务,并理解它在整个系统中的角色和作用。然后选择HTTP方法和端点,根据接口的功能,选择合适的HTTP方法(如GET、POST、PUT、DELETE等)来表示接口的操作类型。同时,定义接口的端点(Endpoint),即接口的URL路径,例如:/api/users。_浏览器同源策略

【ARM 裸机】BSP 工程管理

ARM 裸机,BSP 工程管理,之后的工程结构一目了然,层次分明,不再乱乱了

三 STM32F4使用Sys_Tick 实现微秒定时器和延时_stm32如何实现定时器-程序员宅基地

文章浏览阅读3.3k次,点赞46次,收藏48次。时钟是由电路产生的周期性的脉冲信号,相当于单片机的心脏。_stm32如何实现定时器

Python爱心代码

【代码】Python爱心代码。

CSS-文本换行处理-white-space_white-space;normal-程序员宅基地

文章浏览阅读2.3k次,点赞4次,收藏3次。简介:本文主要介绍通过设置CSS的white-space属性来处理元素内的空白、空白符,以实现文本的不换行、自动换行、空白保留或合并。详情参考:https://timor419.github.io/2021/04/07/CSS-white-space/这边先列一下white-space可以设置的值,及其作用:一、normal作用:默认,空白会被浏览器忽略。HTML<div class="normal">这是一些文本。这是一些文本。这是一些文本。</div>_white-space;normal

随便推点

ORACLE 11G利用 ORDS+pljson来实现json_table 效果_oracle pljson-程序员宅基地

文章浏览阅读3.4k次,点赞2次,收藏4次。Oracle 在12.1中引入了对json的支持,可以利用sql来查询json字段,对于11G的版本,例如EBS环境,可以利用开源软件pljson 来实现。json数据源实例下面这一段就是要处理的json,是多层次的机构{ "PONumber": 1608, "Requestor": "Alexis Bull", "CostCenter": "A50", "Address": { "street": "200 Sporting Green", "city": "South San ._oracle pljson

centos 解决python3.7 安装时No module named _ssl_centos 7.6 python3.8 no module named 'xml.etree-程序员宅基地

文章浏览阅读2.6k次。转载自:https://www.jianshu.com/p/3ec24f563b81_centos 7.6 python3.8 no module named 'xml.etree

Numpy/Padas/Scipy/Matplotlib/sklearn在Ubuntu16.04下的安装-程序员宅基地

文章浏览阅读149次。本文主要介绍在Ubuntu16.04下安装基于Python3的机器学习开发环境所需要的一些Python插件,具体包括常用的Numpy,Pandas,Scipy,Matplotlib,sklearn等,由于Ubuntu16.4系统默认的Python版本是python2.7.12,但是目前主流的机器学习开发环境是基于Python3的,所以本文以Python3为例,介绍机器学习开发环境的搭建,..._在虚拟环境内安装扩展库numpy、pandas、matplotlib、sklearn

ChatGPT 网络安全秘籍(一)

在不断发展的网络安全领域中,由 OpenAI 推出的 ChatGPT 所代表的生成式人工智能和大型语言模型LLMs)的出现,标志着一个重大的飞跃。本书致力于探索 ChatGPT 在网络安全领域的应用,从这个工具作为基本聊天界面的萌芽阶段开始,一直到它如今作为重塑网络安全方法论的先进平台的地位。最初构想为通过分析用户交互来辅助 AI 研究,ChatGPT 从其于 2022 年底的首次发布到如今的形态,仅一年多的时间就经历了一次非凡的演变。

ChatGPT向付费用户推“记忆”功能,可记住用户喜好 | 最新快讯

最初,只有“一小部分”用户能够使用此功能,但现在,除欧洲和韩国以外的所有ChatGPT Plus付费用户都能使用“记忆”功能。公司表示,将向ChatGPT Enterprise及ChatGPT Teams的订阅用户推出“记忆”功能,并计划将其在GPT Store商店上线,但未具体说明时间。用户可以通过查看聊天机器人从对话中提取的内容来管理ChatGPT的“记忆”,甚至可以指示ChatGPT“忘记”不再需要的细节。- 用户是一名拥有25名学生的幼儿园老师,更喜欢50分钟的课程和随后的活动。

Git Hooks的使用_git git-hocks使用-程序员宅基地

文章浏览阅读2.7k次。Git Hooks的使用1.git默认提供2.自定义hook操作(1)直接修改(2)链接自定义文件有时我们想要在git操作时候进行一些定制化操作,比如在git commit时候检查一下提交内容是否合规、git push时候检查一下资源文件大小等等,这些功能需要我们可以在git命令执行前后进行拦截,git hooks提供了这样的能力。1.git默认提供我们每个通过git管理的项目,在.git/hooks/文件夹中,会提供一些默认的git hooks文件,比如pre-commit.sample pre-p_git git-hocks使用