Flink调用dubbo接口写入kafka时遇到的一些问题解决_cannot load user class: org.apache.flink.connector-程序员宅基地

技术标签: flink  kafka  

Flink调用dubbo接口写入kafka时遇到的一些问题解决

  1. org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:416)
    	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:288)
    	at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:123)
    	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initProducer(FlinkKafkaProducer011.java:983)
    	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initNonTransactionalProducer(FlinkKafkaProducer011.java:979)
    	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:717)
    	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:97)
    	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
    	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
    	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:868)
    	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
    	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
    	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:106)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:258)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:479)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    	at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.kafka.common.KafkaException: org.apache.flink.kafka011.shaded.org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
    	at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
    	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:327)
    	... 22 more
    

    上述问题是kafka版本冲突的问题,通常在本地调试的时候不会出现问题,但是线上一般都会有相应的依赖,所以在maven依赖中需要加如下限定

<scope>provided</scope>

2

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:471)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:459)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:393)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:155)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:528)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.InvalidClassException: org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper; local class incompatible: stream classdesc serialVersionUID = -45234324918511287, local class serialVersionUID = -2852600975509513916
	at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:616)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1623)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.readObject(FlinkKafkaProducer.java:1288)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
	at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
	... 12 more

这个问题属于序列化的问题,解决方式就是在实体类中添加对应的序列化版本号

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/wangxingxingalulu/article/details/119002927

智能推荐

一次APM32替换STM32的经历分享_amp32-程序员宅基地

文章浏览阅读1.6w次,点赞55次,收藏101次。系列文章目录这几年相信大家知道STM32系列的芯片价格翻倍的涨,自己玩都快玩不起了,要是用于生产,这得多掏多少钱!所以现在大家都选择了国产芯片,哈哈不能说多差吧!价格你没得说。 这是我的一次APM32代替STM32的经历,你是不是也会遇到这样的坑呢?文章目录系列文章目录一、开始替换(流程)1.首先第一步找一个简单的工程,保证没有错误。警告没问题。2.寻找APM32芯片替换STM32芯片3.修改错误4.重点来了二、测试方法与结果1.测试2. SPI怎么测?总结一、开始替换(流程)&g_amp32

java中的进制转换及转换函数_java将八进制转五进制函数-程序员宅基地

文章浏览阅读4.4k次,点赞4次,收藏21次。Java的进制转换 进制转换原理 十进制 转二进制: 原理: 对十进制数进行除2 运算取余。 6 --> 110 二进制 转十进制 原理: 二进制 乘以 2 的n次幂 的过程 110 ->0*20 + 1*21 + 1 * 22 0 + 2 + 4=6 _java将八进制转五进制函数

Effective Java学习笔记--2017年5月_但构造完毕时,我们可以通过手工冻结对象-程序员宅基地

文章浏览阅读1.1k次。作为自己在大四最后一年时间学习书籍中的一本,EffectiveJava通过一周时间的完整阅读,根据自身的条件记录下重点,供自身以后学习参考借鉴_但构造完毕时,我们可以通过手工冻结对象

IOS:简单说一下MVVM与MVC的优缺点和使用_ios mvc和mvvm优缺点-程序员宅基地

文章浏览阅读2.2k次。MVC :M是数据模型V是视图C是控制器Model和View是相互独立的。View只负责页面展示,Model只是数据的存储,那么也就达到了解耦和重用的目的。MVVM的优点:方便测试 便于代码的移植兼容MVC,缺点:类会增多,viewModel会越来越庞大,调用复杂度增加MVVM什么时候使用:mvvm其实是mvc的变种而已。mvvm只是帮mvc中controller做瘦身,就是把一些逻辑代码和_ios mvc和mvvm优缺点

在Java中轻松将HTML格式文本转换为纯文本(保留换行)_把html代码转换成java string格式,遇到换行加上\n-程序员宅基地

文章浏览阅读1.3w次。第一步:引入lang和lang3的依赖:这两个包里有转换所需的工具类<dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version></dependen..._把html代码转换成java string格式,遇到换行加上\n

关于现代OS(操作系统)的四种基本观点_os有哪3种观点-程序员宅基地

文章浏览阅读1.2k次。文章目录1 从外部看OS1.1 计算机的普通使用者1.2 普通应用程序员的观点2 从内部看OS2.1 资源管理2.2 作业组织每天我们都在和手机,和平板,和电脑打交道。大部分人都听到过 “操作系统” 这几个字。我们的电子设备一开机就会进入到操作系统中。那么,究竟什么是操作系统呢,我们是怎样定义操作系统的呢?在现代观点中,我们主要针对不同的角度与不同的用户来定义操作系统。我们会从讲个角度来定义操作系统,内部,和外部。1 从外部看OS1.1 计算机的普通使用者对于普通的计算机用户来说,操作操作系统就_os有哪3种观点

随便推点

《Three.js 开发指南》源码示例说明以及在线demo(原书第二版)附第三版的代码下载_threejs开发指南第三版 pdf-程序员宅基地

文章浏览阅读3.9k次,点赞4次,收藏33次。1. 用Three.js创建你的第一个三维场景1.1 具有所有基本元素的hello world示例src/chapter-01/06-screen-size-change.html2. 使用构建Three.js场景的基本组件2.1 添加、删除、枚举、通过名字获取场景中的对象src/chapter-02/01-basic-scene.html2.2 雾化效果src/chapter-02..._threejs开发指南第三版 pdf

开发语言的选择_开发语言应首选-程序员宅基地

文章浏览阅读5.7k次,点赞4次,收藏2次。在软件这个行业里,怕是没有任何一个其话题域像开发语言这样引起争议了。对开发语言是非的争论,不单旷日持久,且深度亦是与时俱进。实现要强调下的是,在这里我们要专注的是开发语言的选择而非开发语言的优劣。从不同的视角对开发语言进行选择,其结论可能大相径庭。从项目的角度看,语言自身特性的多少,强弱往往并不成为一个关键选择因素。好比说某语言支持多重继承,而某语言不支持多重继承,但对大多项目而言多重继承这一语言_开发语言应首选

clickhouse(十二、踩坑之路)_attempt to read after eof: cannot parse int32 from-程序员宅基地

文章浏览阅读2.1w次,点赞11次,收藏36次。Q1DB::Exception: Cannot create table from metadata file /data/clickhouse/metadata/default/dwd_test.sql, error: DB::Exception: The local set of parts of table default.dwd_test doesn’t look like the set of parts in ZooKeeper: 65.88 million rows of 85.04 mi._attempt to read after eof: cannot parse int32 from string, because value is

python基础教程-数字与表达式——浮点数_python表达浮点数的两种方式-程序员宅基地

文章浏览阅读621次。1、python的加减乘数与计算机的加减乘除几乎差不多 + - * /2、如果参数除法中有一个为浮点数结果 亦为浮点数 >>> 1.0 / 2.0 0.5 >>> 1/2.0 0.53 、 双斜线 // 实现整除的操作符 >>> 1.0 // 2.0 0.0 >>> 1 // _python表达浮点数的两种方式

西瓜书《机器学习》课后答案——Chapter6_6.3_实验二、自主选择两个uci数据集,分别用高斯核训练svm分类器以及bp神经网络进行分-程序员宅基地

文章浏览阅读8.2k次,点赞5次,收藏45次。6.3 选择两个UCI数据集,分别用线性核和高斯核训练一个SVM,并与BP神经网络和C4.5决策树进行实验比较。 解答: (1) 准备libsvm的训练数据与测试数据从UCI网站上选择了Iris数据集,这个数据集总共分为3类,每类50个样本,每个实例有四个属性。数据保存在bezdekIris.txt文件中,举一个样本为例:5.1,3.5,1.4,0.2,Iris-setosa书中也没有介绍解决多_实验二、自主选择两个uci数据集,分别用高斯核训练svm分类器以及bp神经网络进行分

HBase-2.4.6安装教程 附常见错误解决_hbase2.4.6-程序员宅基地

文章浏览阅读741次。我这里采用了jdk1.8.0_301+hadoop-3.3.1+zookeeper-3.6.3+hbase-2.4.6的版本不同版本可能不能兼容,兼容性问题可以去官网查看http://hbase.apache.org/book.html#_preface我这里有三台虚拟机,hadoop102,hadoop103,hadoop1041、zookeeper正常部署首先保证三台机器的zookeeper正常启动[user@hadoop102 zookeeper-3.6.3]$ bin/zkServer.s_hbase2.4.6

推荐文章

热门文章

相关标签