import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
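/**
 * Consumes a topic with the low-level Kafka 0.8 SimpleConsumer API:
 * partition count and partition leaders are discovered through
 * TopicMetadataRequest, one fetcher thread is started per partition, and
 * offsets are tracked manually, since SimpleConsumer offers no group
 * management or automatic offset commits.
 */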
@Service
public class KafkaSimpleConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaSimpleConsumer.class);
private List<String> m_replicaBrokers = new ArrayList<String>();
private List<Integer> m_replicaPorts = new ArrayList<Integer>();
private ExecutorService executor = null;
@PostConstruct
public void start() {
// Topic to read from
String topic = "page_visits";
// One broker to use for Metadata lookup
List<String> seeds = new ArrayList<String>();
seeds.add("192.168.137.176");
// Port the brokers listen on
List<Integer> ports = new ArrayList<Integer>();
ports.add(9092);
try {
int partitions = getPartitions(seeds, ports, topic);
executor = Executors.newFixedThreadPool(partitions);
// Start one fetcher thread per partition
for (int part = 0; part < partitions; part++) {
executor.submit(new SimpleKafkaConsumerProcesser(this, topic, part, seeds, ports));
}
} catch (Exception e) {
logger.error("Oops:{}", e);
e.printStackTrace();
}
}
@PreDestroy
public void close() {
try {
if (executor != null) {
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
logger.info("Shut down KafkaSimpleConsumer successfully");
executor = null;
}
} catch (Exception e) {
logger.warn("Shutdown of KafkaSimpleConsumer failed", e);
}
}
}
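/**
 * Decodes a ByteBuffer into a UTF-8 String without consuming the buffer,
 * so the payload can still be read afterwards.
 */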
public String getString(ByteBuffer buffer) {
try {
Charset charset = Charset.forName("UTF-8");
CharsetDecoder decoder = charset.newDecoder();
// decoder.decode(buffer) would advance the buffer's position, so a
// second read would see an empty result; decode a read-only copy instead.
CharBuffer charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
return charBuffer.toString();
} catch (Exception ex) {
logger.warn("Failed to decode buffer as UTF-8", ex);
return "";
}
}
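/**
 * Fetch loop for a single partition: locate the leader, start from the
 * earliest available offset, and keep fetching; on fetch errors the
 * consumer is rebuilt against a freshly discovered leader.
 */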
public void run(String a_topic, int a_partition,
List<String> a_seedBrokers, List<Integer> a_ports) throws Exception {
// Find the metadata for the topic and partition we are interested in
PartitionMetadata metadata = findLeader(a_seedBrokers, a_ports, a_topic, a_partition);
if (metadata == null) {
logger.error("Can't find metadata for Topic and Partition. Exiting");
return;
}
if (metadata.leader() == null) {
logger.error("Can't find Leader for Topic and Partition. Exiting");
return;
}
String leadBroker = metadata.leader().host();
int a_port = metadata.leader().port();
String clientName = "Client_" + a_topic + "_" + a_partition;
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
100000, 64 * 1024, clientName);
// kafka.api.OffsetRequest.EarliestTime() finds the beginning of the
// data in the logs and starts streaming from there
long readOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.EarliestTime(), clientName);
int numErrors = 0;
boolean isRunning = true;
while (isRunning) {
if (consumer == null) {
consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
}
// Note: this fetchSize of 100000 might need to be increased if
// large batches are written to Kafka
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(a_topic, a_partition, readOffset, 100000).build();
FetchResponse fetchResponse = consumer.fetch(req);
// Identify and recover from leader changes
if (fetchResponse.hasError()) {
numErrors++;
// Something went wrong!
short code = fetchResponse.errorCode(a_topic, a_partition);
logger.error("Error fetching data from the Broker:{} Reason: ",leadBroker, code);
if (numErrors > 5)
break;
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
// We asked for an invalid offset. For the simple case, reset to
// the latest offset.
readOffset = getLastOffset(consumer, a_topic, a_partition,
kafka.api.OffsetRequest.LatestTime(), clientName);
continue;
}
consumer.close();
consumer = null;
// Find the new leader and reconnect
metadata = findNewLeader(leadBroker, a_topic, a_partition, a_port);
leadBroker = metadata.leader().host();
a_port = metadata.leader().port();
continue;
}
numErrors = 0;
// Fetch the data
long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
long currentOffset = messageAndOffset.offset();
// This is needed since if Kafka is compressing the messages, the
// fetch request will return an entire compressed block even if the
// requested offset isn't the beginning of the compressed block.
if (currentOffset < readOffset) {
logger.error("Found an old offset:{} Expecting: ", currentOffset, readOffset);
continue;
}
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
logger.error("{}: {}", String.valueOf(messageAndOffset.offset()), new String(bytes, "UTF-8"));
numRead++;
// Note: offsets are tracked manually via readOffset; this example does
// not commit them back to Kafka.
}
// If we didn't read anything on the last request we go to sleep for
// a second so we aren't hammering Kafka when there is no data.
if (numRead == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
logger.error("Interrupted while waiting for new data", ie);
Thread.currentThread().interrupt();
isRunning = false;
}
}
}
if (consumer != null)
consumer.close();
}
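/**
 * Asks the broker for an offset boundary of the partition; whichTime is
 * kafka.api.OffsetRequest.EarliestTime() or LatestTime(). Returns 0 if
 * the request fails.
 */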
public static long getLastOffset(SimpleConsumer consumer, String topic,
int partition, long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
if (response.hasError()) {
logger.error("Error fetching data Offset Data the Broker. Reason:{}",
response.errorCode(topic, partition));
return 0;
}
long[] offsets = response.offsets(topic, partition);
return offsets[0];
}
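/**
 * Retries leader discovery against the replica brokers cached by
 * findLeader, up to three attempts with a one-second pause between them.
 * On the first attempt the old leader is rejected to give ZooKeeper time
 * to settle; afterwards any live leader is accepted.
 */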
private PartitionMetadata findNewLeader(String a_oldLeader, String a_topic,
int a_partition, int a_oldLeader_port) throws Exception {
for (int i = 0; i < 3; i++) {
boolean goToSleep = false;
PartitionMetadata metadata = findLeader(m_replicaBrokers,m_replicaPorts, a_topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
&& a_oldLeader_port == metadata.leader().port() && i == 0) {
// first time through if the leader hasn't changed, give
// ZooKeeper a second to recover
// second time, assume the broker did recover before failover,
// or it was a non-Broker issue
//
goToSleep = true;
} else {
return metadata;
}
if (goToSleep) {
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
logger.error("findLeader,topic={},partition={},{}", a_topic, a_partition, ie);
}
}
}
logger.error("Unable to find new leader after Broker failure. Exiting");
throw new Exception("Unable to find new leader after Broker failure. Exiting");
}
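/**
 * Queries the seed brokers for topic metadata and returns the partition
 * count, which determines how many fetcher threads to start.
 */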
private int getPartitions(List<String> a_seedBrokers,List<Integer> a_port, String a_topic) {
int count=0;
loop: for (int i = 0; i < a_seedBrokers.size(); i++) {
String seed = a_seedBrokers.get(i);
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, a_port.get(i), 100000, 4 * 1024, "getPartitions");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
count = item.partitionsMetadata().size();
break loop;
}
} catch (Exception e) {
logger.error("getPartitions{},{},{}", seed, a_topic, e);
} finally {
if (consumer != null)
consumer.close();
}
}
return count;
}
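/**
 * Finds the PartitionMetadata for the given partition by asking each seed
 * broker in turn, and caches the replica hosts and ports for use during
 * leader failover.
 */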
private PartitionMetadata findLeader(List<String> a_seedBrokers,
List<Integer> a_port, String a_topic, int a_partition) {
PartitionMetadata returnMetaData = null;
loop: for (int i = 0; i < a_seedBrokers.size(); i++) {
String seed = a_seedBrokers.get(i);
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(seed, a_port.get(i), 100000,
64 * 1024, "leaderLookup");
List<String> topics = Collections.singletonList(a_topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
if (part.partitionId() == a_partition) {
returnMetaData = part;
break loop;
}
}
}
} catch (Exception e) {
logger.error("findLeader,seed={},topic={},partition={},{}",
seed, a_topic, a_partition, e);
} finally {
if (consumer != null)
consumer.close();
}
}
if (returnMetaData != null) {
m_replicaBrokers.clear();
m_replicaPorts.clear();
for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
m_replicaBrokers.add(replica.host());
m_replicaPorts.add(replica.port());
}
}
return returnMetaData;
}
}
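/**
 * Runnable wrapper that drives KafkaSimpleConsumer.run() for a single
 * partition and restarts it if it returns or throws.
 */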
class SimpleKafkaConsumerProcesser implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(SimpleKafkaConsumerProcesser.class);
KafkaSimpleConsumer consumer;
String a_topic;
int a_partition;
List<String> a_seedBrokers;
List<Integer> a_ports;
public SimpleKafkaConsumerProcesser(KafkaSimpleConsumer consumer, String a_topic, int a_partition,
List<String> a_seedBrokers, List<Integer> a_ports) {
this.consumer = consumer;
this.a_topic = a_topic;
this.a_partition = a_partition;
this.a_seedBrokers = a_seedBrokers;
this.a_ports = a_ports;
}
@Override
public void run() {
for (;;) {
try {
consumer.run(a_topic, a_partition, a_seedBrokers, a_ports);
} catch (Exception e) {
logger.error("SimpleKafkaConsumerProcesser failed; restarting", e);
}
// Pause briefly before retrying so a persistent failure does not
// spin in a tight loop.
try {
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
return;
}
}
}
}