Apache Kafka-Spring Kafka生产消费@KafkaListener源码解析_spring kafka生产者源码-程序员宅基地

技术标签: spring  KafkaListener  【MQ-Apache Kafka】  kafka  


在这里插入图片描述


概述

【依赖】

	<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
	</dependency>

【配置】

#kafka
spring.kafka.bootstrap-servers=10.11.114.247:9092
spring.kafka.producer.acks=1
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer


spring.kafka.consumer.group-id=zfprocessor_group
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=com.artisan.common.entity.messages
spring.kafka.consumer.max-poll-records=500
spring.kafka.consumer.fetch-min-size=10
spring.kafka.consumer.fetch-max-wait=10000ms

spring.kafka.listener.missing-topics-fatal=false
spring.kafka.listener.type=batch
spring.kafka.listener.ack-mode=manual


logging.level.org.springframework.kafka=ERROR
logging.level.org.apache.kafka=ERROR

Spring-kafka生产者源码流程

ListenableFuture<SendResult<Object, Object>> result = kafkaTemplate.send(TOPICA.TOPIC, messageMock);

主要的源码流程如下

在这里插入图片描述


Spring-kafka消费者源码流程(@EnableKafka@KafkaListener

消费的话,比较复杂

 @KafkaListener(topics = TOPICA.TOPIC ,groupId = CONSUMER_GROUP_PREFIX + TOPICA.TOPIC)
    public void onMessage(MessageMock messageMock){
    
        logger.info("【接受到消息][线程:{} 消息内容:{}]", Thread.currentThread().getName(), messageMock);
    }

划重点,主要关注

在这里插入图片描述


Flow

在这里插入图片描述

作为引子,我们继续来梳理下源码

在这里插入图片描述

继续

在这里插入图片描述
继续

在这里插入图片描述
KafkaBootstrapConfiguration的主要功能是创建两个bean

KafkaListenerAnnotationBeanPostProcessor

实现了如下接口

implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton

主要功能就是监听@KafkaListener注解 。 bean的后置处理器 需要重写 postProcessAfterInitialization

@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
    
		   // 获取对应的class
			Class<?> targetClass = AopUtils.getTargetClass(bean);
			// 查找类是否有@KafkaListener注解
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
			final List<Method> multiMethods = new ArrayList<>();
			// 查找类中方法上是否有对应的@KafkaListener注解,
			Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
    
						Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
						return (!listenerMethods.isEmpty() ? listenerMethods : null);
					});
			if (hasClassLevelListeners) {
    
				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
						(ReflectionUtils.MethodFilter) method ->
								AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
				multiMethods.addAll(methodsWithHandler);
			}
			if (annotatedMethods.isEmpty()) {
    
				this.nonAnnotatedClasses.add(bean.getClass());
				this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
			}
			else {
    
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
    
					Method method = entry.getKey();
					for (KafkaListener listener : entry.getValue()) {
    
						// 处理@KafkaListener注解   重点看 
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
			}
			if (hasClassLevelListeners) {
    
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}

重点方法

	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    
		Method methodToUse = checkProxy(method, bean);
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setMethod(methodToUse);
		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	}

继续 processListener

protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
			Object bean, Object adminTarget, String beanName) {
    

		String beanRef = kafkaListener.beanRef();
		if (StringUtils.hasText(beanRef)) {
    
			this.listenerScope.addListener(beanRef, bean);
		}
		// 构建 endpoint
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(kafkaListener));
		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
		endpoint.setTopics(resolveTopics(kafkaListener));
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
		String group = kafkaListener.containerGroup();
		if (StringUtils.hasText(group)) {
    
			Object resolvedGroup = resolveExpression(group);
			if (resolvedGroup instanceof String) {
    
				endpoint.setGroup((String) resolvedGroup);
			}
		}
		String concurrency = kafkaListener.concurrency();
		if (StringUtils.hasText(concurrency)) {
    
			endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
		}
		String autoStartup = kafkaListener.autoStartup();
		if (StringUtils.hasText(autoStartup)) {
    
			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
		}
		resolveKafkaProperties(endpoint, kafkaListener.properties());
		endpoint.setSplitIterables(kafkaListener.splitIterables());

		KafkaListenerContainerFactory<?> factory = null;
		String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
		if (StringUtils.hasText(containerFactoryBeanName)) {
    
			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
			try {
    
				factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
			}
			catch (NoSuchBeanDefinitionException ex) {
    
				throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
						+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
						+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
			}
		}

		endpoint.setBeanFactory(this.beanFactory);
		String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
		if (StringUtils.hasText(errorHandlerBeanName)) {
    
			endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
		}
		// 将endpoint注册到registrar
		this.registrar.registerEndpoint(endpoint, factory);
		if (StringUtils.hasText(beanRef)) {
    
			this.listenerScope.removeListener(beanRef);
		}
	}

继续看 registerEndpoint

	public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    
		Assert.notNull(endpoint, "Endpoint must be set");
		Assert.hasText(endpoint.getId(), "Endpoint id must be set");
		// Factory may be null, we defer the resolution right before actually creating the container
		// 把endpoint封装为KafkaListenerEndpointDescriptor
		KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
		synchronized (this.endpointDescriptors) {
    
			if (this.startImmediately) {
     // Register and start immediately
				this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
						resolveContainerFactory(descriptor), true);
			}
			else {
    
			   // 将descriptor添加到endpointDescriptors
				this.endpointDescriptors.add(descriptor);
			}
		}
	}

总的来看: 得到一个含有KafkaListener基本信息的Endpoint,将Endpoint被封装到KafkaListenerEndpointDescriptor,KafkaListenerEndpointDescriptor被添加到KafkaListenerEndpointRegistrar.endpointDescriptors中,至此这部分的流程结束了,感觉没有下文呀。

在这里插入图片描述


KafkaListenerEndpointRegistrar.endpointDescriptors 这个List中的数据怎么用呢?

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
    }

KafkaListenerEndpointRegistrar 实现了 InitializingBean 接口,重写 afterPropertiesSet,该方法会在bean实例化完成后执行

	@Override
	public void afterPropertiesSet() {
    
		registerAllEndpoints();
	}

继续 registerAllEndpoints();

	protected void registerAllEndpoints() {
    
		synchronized (this.endpointDescriptors) {
    
		// 遍历KafkaListenerEndpointDescriptor 
			for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
    
			   	// 注册 
				this.endpointRegistry.registerListenerContainer(
						descriptor.endpoint, resolveContainerFactory(descriptor));
			}
			this.startImmediately = true;  // trigger immediate startup
		}
	}

继续

	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    
		registerListenerContainer(endpoint, factory, false);
	}

go

public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
			boolean startImmediately) {
    

		Assert.notNull(endpoint, "Endpoint must not be null");
		Assert.notNull(factory, "Factory must not be null");

		String id = endpoint.getId();
		Assert.hasText(id, "Endpoint id must not be empty");
		synchronized (this.listenerContainers) {
    
			Assert.state(!this.listenerContainers.containsKey(id),
					"Another endpoint is already registered with id '" + id + "'");   
	      // 创建Endpoint对应的MessageListenerContainer,将创建好的MessageListenerContainer放入listenerContainers
			MessageListenerContainer container = createListenerContainer(endpoint, factory);
			this.listenerContainers.put(id, container);
			// 如果KafkaListener注解中有对应的group信息,则将container添加到对应的group中
			if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
    
				List<MessageListenerContainer> containerGroup;
				if (this.applicationContext.containsBean(endpoint.getGroup())) {
    
					containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
				}
				else {
    
					containerGroup = new ArrayList<MessageListenerContainer>();
					this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
				}
				containerGroup.add(container);
			}
			if (startImmediately) {
    
				startIfNecessary(container);
			}
		}
	}

在这里插入图片描述

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/yangshangwei/article/details/115359111

智能推荐

小程序video问题_小程序video show-play-btn-程序员宅基地

文章浏览阅读2.2k次。fullscreenchange: function (e){ console.log('fullscreenchange退出全屏',e); let that = this; console.log(that.video); let shipin = that.data.shipin; let play = that.data.play; if (shipi..._小程序video show-play-btn

mac终端新建标签/窗口ssh重复输入密码问题_为啥xshell每次新建窗口都要重新输入密码-程序员宅基地

文章浏览阅读1.3w次,点赞2次,收藏4次。mac的终端默认在打开一个新的tab/window的时候需要重新输入ssh的密码, 很不方便。本文完成在mac中设置,实现secureCRT/xshell里的克隆会话功能, 即新开一个terminal进行ssh连接无需重新输入密码。原理很简单,开一个ssh连接在后台放着,以后再有需要用到ssh到同样主机的时候,直接使用这个连接的socket文件,不用再创建连接了,同理,也不需要再进行用户身份验证。默_为啥xshell每次新建窗口都要重新输入密码

【Learing OpenCV】win7+vs2017+OpenCV4.0.1环境配置_opencv4 win7配置-程序员宅基地

文章浏览阅读3.4k次,点赞3次,收藏17次。这里写自定义目录标题欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必不可少的KaTeX数学公式新的甘特图功能,丰富你的文章UML 图表FLowchart流程图导出与导入导出导入欢迎使用Ma..._opencv4 win7配置

【C语言】将一个3X2的矩阵(3行2列)的矩阵存入一个3X2的二维数组中,并输出矩阵。同时,找出矩阵中的最大值以及最大值所在的行下标和列下标,输出最大值所在的行下标和列下标及最大值。_将一个3x2的矩阵(3行2列)的矩阵存入一个3x2的二维数组中,并输出矩阵。同时,找出矩-程序员宅基地

文章浏览阅读4.6k次,点赞3次,收藏9次。【问题描述】 将一个3X2的矩阵(3行2列)的矩阵存入一个3X2的二维数组中,并输出矩阵。同时,找出矩阵中的最大值以及最大值所在的行下标和列下标,输出最大值所在的行下标和列下标及最大值。【输入输出样例】【样例说明】输入提示符中冒号为英文符号,后面无空格,需换行。输出矩阵时整数按照%4d格式输出。最后输出结束不换行。..._将一个3x2的矩阵(3行2列)的矩阵存入一个3x2的二维数组中,并输出矩阵。同时,找出矩

使用 React Testing Library 和 Jest 完成单元测试-程序员宅基地

文章浏览阅读1.2k次。引言在2020的今天,构建一个 web 应用对于我们来说,并非什么难事。因为有很多足够多优秀的的前端框架(比如 React,Vue 和 Angular);以及一些易用且强大的UI库(比如 Ant Design)为我们保驾护航,极大地缩短了应用构建的周期。但是,互联网时代也急剧地改变了许多软件设计,开发和发布的方式。开发者面临的问题是,需求越来越多,应用越来越复杂,时不时会有一种失控的的感觉,并在心...

利用css 动画实现节流_css动画forwards-程序员宅基地

文章浏览阅读2.7w次。css节流_css动画forwards

随便推点

drtek收音机使用说明_根德收音机使用说明书-程序员宅基地

文章浏览阅读627次。根德E5中波、调频、短波单边带收音机用户手册如果您需要任何帮助,请联系我们:邮政地址:EtónCorporation,1015CorporationWay,PaloAlto,CA94303,USA.客服电话号码:1-800-872-2228*(美国);1-800-637-1648(加拿大);650-903-3866(全球客服);星期一到星期五,早八点半到下午四点,太平洋标准时间:...

【JY】ABAQUS混凝土CDP插件分享-程序员宅基地

文章浏览阅读2.1k次,点赞3次,收藏23次。导读:为简便钢筋混凝土构件或者结构的本构模型设置,本期给大家推荐一款Abaqus混凝土CDP模型插件,供大家应用参考。这个插件无需繁琐的Excel操作,仅需选择混凝土等级即可在Abaqus..._abaqus混凝土cdp模型参数

python二进制转字符串_Python 二进制串转字符串例子-程序员宅基地

文章浏览阅读421次。0x01 问题我现在有一串 0和1组成的字符串,就像这样的110011011011001100001110011111110111010111011000010101110101010110011011101011101110110111011110011111101我把它叫做二进制串,我怎么能把它转成我能看懂的字符串呢?0x02 思路一个ascii码是8位,但是一般都用7位来表示,所以我可以把字..._python 0x01转为字符串

手把手教你写!Android平台HTTPS抓包解决方案及问题分析,年薪50W_android 怎么写一个抓包-程序员宅基地

文章浏览阅读165次。前言最近经常被朋友问到的两个问题。问题一: “从事IT工作3年了,做技术好累啊,是不是做到30岁就不能继续往下做啊?”问题二: “我已经30岁了,还能不能学编程?”我给出的答案是:只要你兴趣还在,可以一直做,什么时候都不会晚;种一棵树最好的时间是十年前,其次是现在。本人目前在 IT 行业工作了 6 年,做过大大小小的项目,虽然跟网上那些大牛比还差很远。但也确实经历过同样困惑,也迷茫过,踩过很多坑,今后也会继续踩。所以关于这一类问题,也有心得体验,在此算是做个人分享吧。学会深入思考,总结沉淀我想_android 怎么写一个抓包

滴滴夜莺:从监控告警系统向运维平台演化-程序员宅基地

文章浏览阅读5.1k次,点赞7次,收藏68次。简述滴滴夜莺(Nightingale)是一款经过大规模生产环境验证的、分布式高性能的运维监控系统。基于Open-Falcon,结合滴滴内部的最佳实践,在性能、可维护性、易用性方面做了大量的改进,支撑了滴滴内部数十亿监控指标,覆盖了从系统、容器、到应用等各层面的监控需求。夜莺于2020年3月底开源至今,GitHub Star已突破2000,并且于9月底发布了最新的3.0版本。本次更新夜莺被拆成了四个子系统,分别是:用户资源中心(RDB)平台底座,所有的运维系统,都需要依赖这个,内置用户、权限、角色、_滴滴夜莺

linux socket bind 内核详解,Linux内核Socket实现之------Socket绑定bind(3)-程序员宅基地

文章浏览阅读793次。socket系列文章都是承接第一篇socket创建,因此这里的编号和内核版本都继承了第一篇文章。2. SYSCALL_DEFINE3函数Bind系统调用通过SYSCALL_DEFINE3调用各个协议不同的bind函数,SYSCALL_DEFINE3(bind,int, fd, struct sockaddr __user *, umyaddr, int, addrlen){structsocke..._sock->ops->bind

推荐文章

热门文章

相关标签