Kafka消费的几种方式--low-level SimpleConsumer-程序员宅基地

技术标签: python  java  大数据  

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

@Service
public class KafkaSimpleConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSimpleConsumer.class);

    private List<String> m_replicaBrokers = new ArrayList<String>();
    private List<Integer> m_replicaPorts = new ArrayList<Integer>();
    
    private  ExecutorService executor=null;


    @PostConstruct
    public  void start() {        
        // Topic to read from
        String topic = "page_visits";
        // One broker to use for Metadata lookup
        List<String> seeds = new ArrayList<String>();
        seeds.add("192.168.137.176");
        // Port the brokers listen on
        List<Integer> ports = new ArrayList<Integer>();
        ports.add(9092);
        
        
    
        try {
            int partitions = getPartitions(seeds,ports,topic);
            executor = Executors.newFixedThreadPool(partitions);            
            
            for (int part=0;part>partitions;part++){
                executor.submit(new SimpleKafkaConsumerProcesser(this,topic,part,seeds, ports));
            }
        } catch (Exception e) {
            logger.error("Oops:{}", e);
            e.printStackTrace();
        }
    }
    
    @PreDestroy
    public void close(){
        try {
            if (executor != null) {
                executor.shutdown();
                executor.awaitTermination(5, TimeUnit.SECONDS);
                logger.info("shutdown KafkaSimpleConsumer successfully");                
                executor=null;
            }
        } catch (Exception e) {
            logger.warn("shutdown KafkaSimpleConsumer failed", e);
        }
    }
    
     public  String getString(ByteBuffer buffer)  
        {  
            Charset charset = null;  
            CharsetDecoder decoder = null;  
            CharBuffer charBuffer = null;  
            try  
            {  
                charset = Charset.forName("UTF-8");  
                decoder = charset.newDecoder();  
                // charBuffer = decoder.decode(buffer);//用这个的话,只能输出来一次结果,第二次显示为空  
                charBuffer = decoder.decode(buffer.asReadOnlyBuffer());  
                return charBuffer.toString();  
            }  
            catch (Exception ex)  
            {  
                ex.printStackTrace();  
                return "";  
            }  
        }  

    public void run(String a_topic, int a_partition,
            List<String> a_seedBrokers, List<Integer> a_ports) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_ports,    a_topic, a_partition);
        if (metadata == null) {
            logger.error("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            logger.error("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        int a_port = metadata.leader().port();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
                100000, 64 * 1024, clientName);
        // kafka.api.OffsetRequest.EarliestTime() finds the beginning of the
        // data in the logs and starts streaming from there
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.EarliestTime(), clientName);

        int numErrors = 0;
        boolean isRunning=true;
        while (isRunning) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000,64 * 1024, clientName);
            }
            // Note: this fetchSize of 100000 might need to be increased if
            // large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();

            FetchResponse fetchResponse = consumer.fetch(req);

            // Identify and recover from leader changes
            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                logger.error("Error fetching data from the Broker:{}  Reason: ",leadBroker, code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                // 查找新的leader
                metadata = findNewLeader(leadBroker, a_topic, a_partition,    a_port);
                leadBroker = metadata.leader().host();
                a_port = metadata.leader().port();
                continue;
            }
            numErrors = 0;

            // Fetch the data
            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {

                long currentOffset = messageAndOffset.offset();
                // This is needed since if Kafka is compressing the
                // messages,
                // the fetch request will return an entire compressed block
                // even if the requested offset isn't the beginning of the
                // compressed block.
                if (currentOffset < readOffset) {
                    logger.error("Found an old offset:{}  Expecting: ",    currentOffset, readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                logger.error("{}: {}",    String.valueOf(messageAndOffset.offset()), new String(bytes, "UTF-8"));
                numRead++;
                
                consumer.commitOffsets(request)

            }

            // If we didn't read anything on the last request we go to sleep for
            // a second so we aren't hammering Kafka when there is no data.
            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    logger.error("InterruptedException:{}",ie);                    
                    if (consumer != null)
                        consumer.close();
                }
            }
        }
        
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic,    partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            logger.error("Error fetching data Offset Data the Broker. Reason:{}",
                    response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private PartitionMetadata findNewLeader(String a_oldLeader, String a_topic,
            int a_partition, int a_oldLeader_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers,m_replicaPorts, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
                    && a_oldLeader_port == metadata.leader().port() && i == 0) {
                // first time through if the leader hasn't changed, give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata;
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    logger.error("findLeader,topic={},partition={},{}",    a_topic, a_partition, ie);
                }
            }
        }
        logger.error("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private int getPartitions(List<String> a_seedBrokers,List<Integer> a_port, String a_topic) {
        int count=0;
        loop: for (int i = 0; i < a_seedBrokers.size(); i++) {
            String seed = a_seedBrokers.get(i);
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port.get(i), 100000,    4 * 1024, "getPartitions");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                     count=item.partitionsMetadata().size();
                     break loop;
                }
            } catch (Exception e) {
                logger.error("getPartitions{},{},{}",    seed, a_topic, e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        return count;
    }
    private PartitionMetadata findLeader(List<String> a_seedBrokers,
            List<Integer> a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (int i = 0; i < a_seedBrokers.size(); i++) {
            String seed = a_seedBrokers.get(i);
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port.get(i), 100000,
                        64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("findLeader,seed={},topic={},partition={},{}",
                        seed, a_topic, a_partition, e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            m_replicaPorts.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
                m_replicaPorts.add(replica.port());
            }
        }
        return returnMetaData;
    }
}

 class SimpleKafkaConsumerProcesser implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSimpleConsumer.class);
        
     KafkaSimpleConsumer consumer;     
     String a_topic;
     int a_partition;
     List<String> a_seedBrokers;
     List<Integer> a_ports;
    
    public SimpleKafkaConsumerProcesser(KafkaSimpleConsumer consumer, String a_topic, int a_partition,
            List<String> a_seedBrokers, List<Integer> a_ports) {
        this.consumer=consumer;
        this.a_topic=a_topic;
        this.a_partition=a_partition;
        this.a_seedBrokers=a_seedBrokers;
        this.a_ports=a_ports;
    }

    @Override
    public void run() {
        for(;;){
            try {
                consumer.run(a_topic, a_partition, a_seedBrokers, a_ports);
            } catch (Exception e) {
                logger.error("SimpleKafkaConsumerProcesser Oops:{}", e);
                e.printStackTrace();
            } 
        }
    }

}

 

转载于:https://my.oschina.net/u/778683/blog/1828564

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_33752045/article/details/92014615

智能推荐

c# 调用c++ lib静态库_c#调用lib-程序员宅基地

文章浏览阅读2w次,点赞7次,收藏51次。四个步骤1.创建C++ Win32项目动态库dll 2.在Win32项目动态库中添加 外部依赖项 lib头文件和lib库3.导出C接口4.c#调用c++动态库开始你的表演...①创建一个空白的解决方案,在解决方案中添加 Visual C++ , Win32 项目空白解决方案的创建:添加Visual C++ , Win32 项目这......_c#调用lib

deepin/ubuntu安装苹方字体-程序员宅基地

文章浏览阅读4.6k次。苹方字体是苹果系统上的黑体,挺好看的。注重颜值的网站都会使用,例如知乎:font-family: -apple-system, BlinkMacSystemFont, Helvetica Neue, PingFang SC, Microsoft YaHei, Source Han Sans SC, Noto Sans CJK SC, W..._ubuntu pingfang

html表单常见操作汇总_html表单的处理程序有那些-程序员宅基地

文章浏览阅读159次。表单表单概述表单标签表单域按钮控件demo表单标签表单标签基本语法结构<form action="处理数据程序的url地址“ method=”get|post“ name="表单名称”></form><!--action,当提交表单时,向何处发送表单中的数据,地址可以是相对地址也可以是绝对地址--><!--method将表单中的数据传送给服务器处理,get方式直接显示在url地址中,数据可以被缓存,且长度有限制;而post方式数据隐藏传输,_html表单的处理程序有那些

PHP设置谷歌验证器(Google Authenticator)实现操作二步验证_php otp 验证器-程序员宅基地

文章浏览阅读1.2k次。使用说明:开启Google的登陆二步验证(即Google Authenticator服务)后用户登陆时需要输入额外由手机客户端生成的一次性密码。实现Google Authenticator功能需要服务器端和客户端的支持。服务器端负责密钥的生成、验证一次性密码是否正确。客户端记录密钥后生成一次性密码。下载谷歌验证类库文件放到项目合适位置(我这边放在项目Vender下面)https://github.com/PHPGangsta/GoogleAuthenticatorPHP代码示例://引入谷_php otp 验证器

【Python】matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距-程序员宅基地

文章浏览阅读4.3k次,点赞5次,收藏11次。matplotlib.plot画图横坐标混乱及间隔处理_matplotlib更改横轴间距

docker — 容器存储_docker 保存容器-程序员宅基地

文章浏览阅读2.2k次。①Storage driver 处理各镜像层及容器层的处理细节,实现了多层数据的堆叠,为用户 提供了多层数据合并后的统一视图②所有 Storage driver 都使用可堆叠图像层和写时复制(CoW)策略③docker info 命令可查看当系统上的 storage driver主要用于测试目的,不建议用于生成环境。_docker 保存容器

随便推点

网络拓扑结构_网络拓扑csdn-程序员宅基地

文章浏览阅读834次,点赞27次,收藏13次。网络拓扑结构是指计算机网络中各组件(如计算机、服务器、打印机、路由器、交换机等设备)及其连接线路在物理布局或逻辑构型上的排列形式。这种布局不仅描述了设备间的实际物理连接方式,也决定了数据在网络中流动的路径和方式。不同的网络拓扑结构影响着网络的性能、可靠性、可扩展性及管理维护的难易程度。_网络拓扑csdn

JS重写Date函数,兼容IOS系统_date.prototype 将所有 ios-程序员宅基地

文章浏览阅读1.8k次,点赞5次,收藏8次。IOS系统Date的坑要创建一个指定时间的new Date对象时,通常的做法是:new Date("2020-09-21 11:11:00")这行代码在 PC 端和安卓端都是正常的,而在 iOS 端则会提示 Invalid Date 无效日期。在IOS年月日中间的横岗许换成斜杠,也就是new Date("2020/09/21 11:11:00")通常为了兼容IOS的这个坑,需要做一些额外的特殊处理,笔者在开发的时候经常会忘了兼容IOS系统。所以就想试着重写Date函数,一劳永逸,避免每次ne_date.prototype 将所有 ios

如何将EXCEL表导入plsql数据库中-程序员宅基地

文章浏览阅读5.3k次。方法一:用PLSQL Developer工具。 1 在PLSQL Developer的sql window里输入select * from test for update; 2 按F8执行 3 打开锁, 再按一下加号. 鼠标点到第一列的列头,使全列成选中状态,然后粘贴,最后commit提交即可。(前提..._excel导入pl/sql

Git常用命令速查手册-程序员宅基地

文章浏览阅读83次。Git常用命令速查手册1、初始化仓库git init2、将文件添加到仓库git add 文件名 # 将工作区的某个文件添加到暂存区 git add -u # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,不处理untracked的文件git add -A # 添加所有被tracked文件中被修改或删除的文件信息到暂存区,包括untracked的文件...

分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120-程序员宅基地

文章浏览阅读202次。分享119个ASP.NET源码总有一个是你想要的_千博二手车源码v2023 build 1120

【C++缺省函数】 空类默认产生的6个类成员函数_空类默认产生哪些类成员函数-程序员宅基地

文章浏览阅读1.8k次。版权声明:转载请注明出处 http://blog.csdn.net/irean_lau。目录(?)[+]1、缺省构造函数。2、缺省拷贝构造函数。3、 缺省析构函数。4、缺省赋值运算符。5、缺省取址运算符。6、 缺省取址运算符 const。[cpp] view plain copy_空类默认产生哪些类成员函数

推荐文章

热门文章

相关标签