Flink消费kafka的offset设置_flink kafka offset-程序员宅基地

技术标签: spark  Kafka  Spark  

1.问题问题简介及背景

在使用Flink自带的Kafka消费API时,我们可以像单纯的使用Kafka消费对象API对其进行相应的属性设置,例如,读取offset的方式、设置offset的方式等。但是,Flink具有checkpoint功能,保存各运算算子的状态,也包括消费kafka时的offset,那这两种情况分别在什么时候起作用呢?

2. Flink checkpoint设置

flink并不依赖kafka或zookeeper保证容错,其保存offset只是为了外部来查询监视kafka数据的消费情况。但其提供了提交消费kafka数据的offset给Kafka或者zookeeper(kafka0.8之前)的配置,因此最终是由kafka自动设置offset,还是由flink的checkpoint机制进行最终的offset设置,取决于开发过程中的相关设置。

配置offset的提交方式取决于是否为job设置开启checkpoint,可以使用env.enableCheckpointing(milliseconds)来设置开启checkpoint。

如果禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。

如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。

    // checkpoint设置
    // 每隔1s进行启动一个检查点【设置checkpoint的周期】
    env.enableCheckpointing(1000);
    // 设置模式为:exactly_one,仅一次语义
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // 确保检查点之间有1s的时间间隔【checkpoint最小间隔】
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
    // 检查点必须在1s之内完成,或者被丢弃【checkpoint超时时间】
    env.getCheckpointConfig().setCheckpointTimeout(1000);
    // 同一时间只允许进行一次检查点
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    //表示一旦Flink程序被cancel后,会保留checkpoint数据,以便根据实际需要恢复到指定的checkpoint
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    //设置statebackend,将检查点保存在hdfs上面,默认保存在内存中。这里先保存到本地
    // env.setStateBackend(new FsStateBackend("file:///F:/kafkaTool/aaa"));

    // Kafka相关设置
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop:9091,hadoop:9092,hadoop:9093")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // properties.setProperty("auto.offset.reset", "latest") // 默认是latest,当第一次运行,会读取最近提交的offset
    // properties.setProperty("enable.auto.commit", "true") // 默认是true

    val flinkKafkaConsumer011 = new FlinkKafkaConsumer011[String]("GMALL_EVENT_0105", new SimpleStringSchema(), properties)
    // 设置根据程序checkpoint进行offset提交
    flinkKafkaConsumer011.setCommitOffsetsOnCheckpoints(true)
    flinkKafkaConsumer011.setStartFromGroupOffsets()

无论是否设置checkpoint,auto.offset.reset都默认为latest,enable.auto.commit都默认为true,第一次启动程序,会根据FlinkKafkaConsumer011对象的设置读取相应的offsets。

  1. setStartFromEarliest():从消息最开始消费;
  2. setStartFromLatest():从消息的最后开始消费;
  3. setStartFromTimestamp(long startupOffsetsTimestamp):从设定的时间戳消费;
  4. setStartFromGroupOffsets():从kafka保存的消费者组的offsets消费,如果没有,会根据auto.offset.reset设定进行消费(auto.offset.reset默认为latest),也就是说我第一次启动程序的时候从哪里消费也需要考虑;
    setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets):从特定offset消费;
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/pageniao/article/details/124279617

智能推荐

第三十六篇、基于Arduino uno,获取红外寻迹传感器的原始值——结果导向_怎么检测红外寻迹模块返回值-程序员宅基地

文章浏览阅读385次。基于Arduino uno,获取红外寻迹传感器的原始值_怎么检测红外寻迹模块返回值

基于单片机的无线投票显示系统设计-程序员宅基地

文章浏览阅读494次,点赞5次,收藏9次。单片机(Microcontroller)是一种集成了微处理器核心、存储器、输入/输出接口和定时器等功能模块的集成电路芯片,具有体积小、功耗低、性价比高等特点,被广泛应用于各个领域。单片机的发展历史可以追溯到20世纪70年代,当时的单片机功能有限,主要用于简单的控制任务。

生成对抗网络GAN_生成对抗网络 python代码-程序员宅基地

文章浏览阅读412次。https://zhuanlan.zhihu.com/p/54096381_生成对抗网络 python代码

html——网页上添加表格_怎样在网站中添加表格别人可以下载-程序员宅基地

文章浏览阅读5.2k次,点赞7次,收藏18次。有时候我们需要在网页上展示一些数据,如某公司想在网页上展示公司的库存清单。如下表:想在网页上展示上述表格效果可以使用以下代码:创建表格的四个元素:table、tbody、tr、th、td1、…:整个表格以标记开始、标记结束。2、…:当表格内容非常多时,表格会下载一点显示一点,但如果加上标签后,这个表格就要等表格内容全部下载完才会显示。如右侧代码编辑器中的代码。3、…_怎样在网站中添加表格别人可以下载

《Qt MOOC系列教程》第五章第三节:创建新的QML类型_qmlregisteruncreatabletype-程序员宅基地

文章浏览阅读770次。到目前为止,我们已经讨论了如何将对象实例公开给QML上下文。有时我们还希望在QML中可以使用注册类本身。注册允许将类当作QML中的数据类型来使用。此外,注册还可以提供其他功能,比如允许在QML中将类用作可实例化的QML对象类型,或者允许在QML中导入和使用类的单例实例。通常我们使用Q_OBJECT宏注册从QObject派生的类,也可以用Q_GADGET宏声明一个比QObject“更轻”的版本。在这些更轻的类中,我们可以访问它们的属性、枚举和可调用的方法,但不能使用信号槽系统,我们稍后会进行介绍。1. 注_qmlregisteruncreatabletype

头文件与命名空间的关系_c#中命名空间和c语言中头文件之间的关系-程序员宅基地

文章浏览阅读2.1k次,点赞7次,收藏15次。头文件与命名空间的关系 Q:有些书说有些头文件不在std里是什么意思?std里包含些什么?为什么不用std就不能使用cout?头文件中声明的东西为什么在使用的时候需要先using namespace std;一下?如果我不用#include和其他头文件。只用using namespace std 的话,是不能用cout的。这说明cout是在iostream里声明_c#中命名空间和c语言中头文件之间的关系

随便推点

python实现矩阵乘法(实现文件读写操作)_python 读取csv矩阵乘法-程序员宅基地

文章浏览阅读1.2k次,点赞2次,收藏8次。注释dtype=np.int代表导入数据的格式为整数delimiter=’,'代表原始数据的存储格式为以‘,’为间隔原始文件中以‘#’开头的行代表被注释,不会被np.loadtxt读取通过[[0] * b for i in range(a)]的方式初始化一个x[a][b]的二维数组np.savetxt()函数可以用来保存数据,第一个参数为保存数据的路径,其中C是自定义的文件名,如果该文..._python 读取csv矩阵乘法

《军团要塞2》绘画渲染_军团要塞画师-程序员宅基地

文章浏览阅读1.4k次。军团要塞2绘画渲染(a)美术概念 (b)游戏内玩家看到的角色摘要在《军团要塞2》中我们提出了一整套美术方案和新的实时渲染技术,这种技术能实现出一种独一无二的渲染风格。《军团要塞2》由美术和程序基于20世纪初时商业插画中的传统风格合作完成。在这篇论文中,我们会结合美术方向与技术选择,来讨论如何支持美术目标和玩法限制。除了实现一种有冲击力的风格外,我们也设计了边缘光照和亮度与色调变化的着色器技..._军团要塞画师

【数字图像处理实验二】:RGB图3个通道的提取、RGB图转灰度图、图片反转、图片亮度调整、直方图显示_jupter rgb灰度直方图提取-程序员宅基地

文章浏览阅读9.6k次,点赞8次,收藏65次。这里介绍:RGB图3个通道的提取、RGB图转灰度图、图片反转、图片亮度调整具体操作,需导入的库如下:原图如下:结果如下,从左到右分别是:Red,Green,Blue这里借助skimage库中的exposure函数来进行图像亮度的调整结果如下:........._jupter rgb灰度直方图提取

2023年地级、省级、县级、国界、九段线的shp数据_九段线shp数据-程序员宅基地

文章浏览阅读931次。2023年地级、省级、县级、国界、九段线的shp数据_九段线shp数据

python高校本科生学习成长记录系统的设计与实现flask-django-php-nodejs-程序员宅基地

文章浏览阅读797次,点赞16次,收藏19次。二十一世纪我们的社会进入了信息时代,信息管理系统的建立,大大提高了人们信息化水平。传统的管理方式对时间、地点的限制太多,而在线管理系统刚好能满足这些需求,在线管理系统突破了传统管理方式的局限性。于是本文针对这一需求设计并实现了一个基于django高校本科生学习成长记录系统,为了简捷并有效的解决学习各方面的问题。

redis实现分布式session共享_redis分布式session共享-程序员宅基地

文章浏览阅读7.7k次。为什么要共享session?我们使用单台Tomcat的时候不会有共享sesssion的疑虑,只要使用Tomcat的默认配置即可,session即可存储在Tomcat上。但是随着业务的扩大,增加Tomcat节点构成Tomcat集群大势所趋,分布式带来了增加更大规模并发请求的优势,但是也随之到来了一个问题,每个Tomcat只存储来访问自己的请求产生的session,如果Tomcat-A已经为客..._redis分布式session共享

推荐文章

热门文章

相关标签