技术标签: kafka
最近学校project项目需要一个消息系统,所以尝试搭了一个集群版kafka,过程中踩了非常多坑。。。这里和大家分享一下搭建过程以及踩过的坑。
kafka使用zookeeper保存元数据,kafka自带了zookeeper,不过为了后续方便管理,我还是自己搭建了一个zookeeper集群。
首先将下载好的zookeeper解压
tar -zxf zookeeper-3.4.14.tar.gz
之后将zookeeper自带的配置文件复制一份
进入zookeeper的/config目录,然后执行复制命令
cp zoo_sample.cfg zoo.cfg
这时可以看到已经有zoo.cfg这个文件
root@VM-0-17-ubuntu:/usr/zookeeper/zookeeper-3.4.14/conf# ls
configuration.xsl log4j.properties zoo.cfg zoo_sample.cfg
打开它,修改配置
vim zoo.cfg
我是用两台服务器搭建集群,这是server2的配置,切记这里server.2一定要写成0.0.0.0::的形式不然server1是连接不上这个节点的,同理,第一台的配置中server.1也要写成0.0.0.0::。
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/usr/zookeeper/data
# the port at which the clients will connect
clientPort=2181
server.1=server1:2888:3888 #server1为第一台服务器的ip
server.2=0.0.0.0:2888:3888
# the maximum number of client connections.
# increase this if you need to handle more clients
maxClientCnxns=60
#
写好配置文件后:wq保存退出,然后进入dataDir指定的目录,这里我是/usr/zookeeper/data。然后创建一个名为myid的文件,里面只有一个数字,当前服务器是第几个server就写几,例如,我当前的服务器是server.2,就写2
echo "2" > myid
配置好了这些以后就可以启动zookeeper了,进入zookeeper的/bin目录,修改一下环境配置
vim zkEnv.sh
将ZOO_LOG_DIR写成你生成日志的目录
if [ "x${ZOO_LOG_DIR}" = "x" ]
then
ZOO_LOG_DIR="/usr/zookeeper/log"
fi
之后启动zookeeper,在/bin目录下执行
./zkServer.sh start
然后执行
./zkServer.sh status
查看zookeeper节点状况
root@VM-0-17-ubuntu:/usr/zookeeper/zookeeper-3.4.14/bin# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/zookeeper/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: follower
可以看到启动成功,当前节点为follower节点。按之前的步骤,修改一下必要配置后启动server1。
至此,zookeeper集群搭建完毕,之后开始搭建kafka集群。
首先解压kafka,
tar -zxf kafka_2.11-2.1.1.tgz
进入kafka的/config目录,打开server.properties文件
vim server.properties
修改为如下配置,这里是server1的配置,注意borker.id每一个节点必须为唯一值,同时zookeeper.connect里填写你zookeeper集群的ip和端口号。advertised.listeners=PLAINTEXT://server.ip:9092这里server.ip为你当前节点的公网ip,以便于客户端连接
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
listeners = PLAINTEXT://0.0.0.0:9092
#server.ip为你当前服务器的公网ip,如果不填写公网ip将导致客户端无法连接kafka
advertised.listeners=PLAINTEXT://server.ip:9092
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/usr/kafka/log
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# server1为节点1的ip,server2为节点2的ip
zookeeper.connect=server1:2181,server2:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
配置好以后进入kafka的/bin目录,启动节点
./kafak-server-start.sh -daemon ../config/server.properties
查看是否启动
netstat -anp | grep 9092
按以上的步骤修改一下配置,逐一启动其他节点。所有节点启动完成后就可以测试集群了。进入/bin目录,创建topic,这里因为我的集群是两个节点,所以设置复制因子为2,分区为2
./kafka-topics.sh --create --replication-factor 2 --partitions 2 --topic cluster-test --zookeeper zookeeper.server:2181
之后查看topic状态
root@VM-0-13-ubuntu:/usr/kafka/kafka_2.11-2.1.1/bin# ./kafka-topics.sh --describe --topic cluster-test --zookeeper localhost:2181
Topic:cluster-test PartitionCount:2 ReplicationFactor:2 Configs:
Topic: cluster-test Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: cluster-test Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
可以看到该topic有两个分区,各自的leader分别是节点1和节点2,各个分区的从节点为节点2和节点1。
至此,kafka集群搭建完成。最后通过java客户端测试生产者消费者功能。
我使用的是springboot整合spring-kafka来测试,这里贴上生产者和消费者的关键代码
@Component
@Slf4j
public class KafkaSender {
private final KafkaTemplate kafkaTemplate;
@Autowired
public KafkaSender(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void send(Object data,String topic){
KafkaProducerMsg msg = new KafkaProducerMsg(data);
log.info("----------send message to topic :{}-------------",topic);
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, JSON.toJSONString(msg));
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable throwable) {
log.info("--------------Fail to send message---------------");
}
@Override
public void onSuccess(SendResult<String, String> stringStringSendResult) {
log.info("-----------------success in sending message:{}-----------------------"
,stringStringSendResult.toString());
}
});
}
}
通过spring-test模块发送消息
@Test
public void testSendMsgToKafka(){
for (int i = 0; i < 10 ; i++) {
Msg msg = new Msg();
msg.setMessage("test send msg to Kafka test1: "+i);
msg.setCode("0");
kafkaSender.send(msg,"cluster-test");
}
}
通过kafka客户端提供的回调方法将发送结果输出到日志,测试成功。
2019-09-25 15:42:48.820 INFO 11608 --- [ad | producer-1] c.c.ksl.heart.service.impl.KafkaSender : -----------------success in sending message:SendResult [producerRecord=ProducerRecord(topic=cluster-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={
"data":{
"code":"0","message":"test send msg to Kafka cluster: 0"},"id":1569397368180,"sendDate":"2019-09-25 15:42:48"}, timestamp=null), recordMetadata=cluster-test-0@5]-----------------------
2019-09-25 15:42:48.824 INFO 11608 --- [ad | producer-1] c.c.ksl.heart.service.impl.KafkaSender : -----------------success in sending message:SendResult [producerRecord=ProducerRecord(topic=cluster-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={
"data":{
"code":"0","message":"test send msg to Kafka cluster: 2"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"}, timestamp=null), recordMetadata=cluster-test-0@6]-----------------------
2019-09-25 15:42:48.824 INFO 11608 --- [ad | producer-1] c.c.ksl.heart.service.impl.KafkaSender : -----------------success in sending message:SendResult [producerRecord=ProducerRecord(topic=cluster-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={
"data":{
"code":"0","message":"test send msg to Kafka cluster: 4"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"}, timestamp=null), recordMetadata=cluster-test-0@7]-----------------------
2019-09-25 15:42:48.824 INFO 11608 --- [ad | producer-1] c.c.ksl.heart.service.impl.KafkaSender : -----------------success in sending message:SendResult [producerRecord=ProducerRecord(topic=cluster-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={
"data":{
"code":"0","message":"test send msg to Kafka cluster: 6"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"}, timestamp=null), recordMetadata=cluster-test-0@8]-----------------------
2019-09-25 15:42:48.824 INFO 11608 --- [ad | producer-1] c.c.ksl.heart.service.impl.KafkaSender : -----------------success in sending message:SendResult [producerRecord=ProducerRecord(topic=cluster-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={
"data":{
"code":"0","message":"test send msg to Kafka cluster: 8"},"id":1569397368608,"sendDate":"2019-09-25 15:42:48"}, timestamp=null), recordMetadata=cluster-test-0@9]-----------------------
2019-09-25 15:42:48.824 INFO 11608 --- [ad | producer-1] c.c.ksl.heart.service.impl.KafkaSender : -----------------success in sending message:SendResult [producerRecord=ProducerRecord(topic=cluster-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={
"data":{
"code":"0","message":"test send msg to Kafka cluster: 1"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"}, timestamp=null), recordMetadata=cluster-test-1@5]-----------------------
2019-09-25 15:42:48.824 INFO 11608 --- [ad | producer-1] c.c.ksl.heart.service.impl.KafkaSender : -----------------success in sending message:SendResult [producerRecord=ProducerRecord(topic=cluster-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={
"data":{
"code":"0","message":"test send msg to Kafka cluster: 3"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"}, timestamp=null), recordMetadata=cluster-test-1@6]-----------------------
2019-09-25 15:42:48.824 INFO 11608 --- [ad | producer-1] c.c.ksl.heart.service.impl.KafkaSender : -----------------success in sending message:SendResult [producerRecord=ProducerRecord(topic=cluster-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={
"data":{
"code":"0","message":"test send msg to Kafka cluster: 5"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"}, timestamp=null), recordMetadata=cluster-test-1@7]-----------------------
2019-09-25 15:42:48.824 INFO 11608 --- [ad | producer-1] c.c.ksl.heart.service.impl.KafkaSender : -----------------success in sending message:SendResult [producerRecord=ProducerRecord(topic=cluster-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={
"data":{
"code":"0","message":"test send msg to Kafka cluster: 7"},"id":1569397368608,"sendDate":"2019-09-25 15:42:48"}, timestamp=null), recordMetadata=cluster-test-1@8]-----------------------
2019-09-25 15:42:48.828 INFO 11608 --- [ad | producer-1] c.c.ksl.heart.service.impl.KafkaSender : -----------------success in sending message:SendResult [producerRecord=ProducerRecord(topic=cluster-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value={
"data":{
"code":"0","message":"test send msg to Kafka cluster: 9"},"id":1569397368608,"sendDate":"2019-09-25 15:42:48"}, timestamp=null), recordMetadata=cluster-test-1@9]-----------------------
之后测试消费者,
@Component
@Slf4j
public class KafkaReceiver {
@KafkaListener(topics = {
"cluster-test"})
public void listen(@Payload String record,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId){
Optional<?> messageValue = Optional.ofNullable(record);
if(messageValue.isPresent()){
log.info("----------success in getting message " +
"from kafka topic:{},record:{},with partitionId:{}",topic,record,partitionId);
KafkaProducerMsg data = JSON.parseObject(record,KafkaProducerMsg.class);
System.out.println(data.toString());
}else{
log.info("---------Failure in getting message from kafka");
}
}
}
以上是消费者关键代码,它会持续监听kafka,有消息就会通过轮询方法拉下来。启动应用,查看测试结果
2019-09-25 15:45:47.074 INFO 2576 --- [ntainer#0-0-C-1] c.c.k.heart.service.impl.KafkaReceiver : ----------success in getting message from kafka topic:cluster-test,record:{
"data":{
"code":"0","message":"test send msg to Kafka cluster: 1"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"},with partitionId:1
KafkaProducerMsg(id=1569397547170, data={
"code":"0","message":"test send msg to Kafka cluster: 1"}, sendDate=2019-09-25 15:45:47)
2019-09-25 15:45:47.174 INFO 2576 --- [ntainer#0-0-C-1] c.c.k.heart.service.impl.KafkaReceiver : ----------success in getting message from kafka topic:cluster-test,record:{
"data":{
"code":"0","message":"test send msg to Kafka cluster: 3"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"},with partitionId:1
KafkaProducerMsg(id=1569397547174, data={
"code":"0","message":"test send msg to Kafka cluster: 3"}, sendDate=2019-09-25 15:45:47)
2019-09-25 15:45:47.174 INFO 2576 --- [ntainer#0-0-C-1] c.c.k.heart.service.impl.KafkaReceiver : ----------success in getting message from kafka topic:cluster-test,record:{
"data":{
"code":"0","message":"test send msg to Kafka cluster: 5"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"},with partitionId:1
KafkaProducerMsg(id=1569397547174, data={
"code":"0","message":"test send msg to Kafka cluster: 5"}, sendDate=2019-09-25 15:45:47)
2019-09-25 15:45:47.174 INFO 2576 --- [ntainer#0-0-C-1] c.c.k.heart.service.impl.KafkaReceiver : ----------success in getting message from kafka topic:cluster-test,record:{
"data":{
"code":"0","message":"test send msg to Kafka cluster: 7"},"id":1569397368608,"sendDate":"2019-09-25 15:42:48"},with partitionId:1
KafkaProducerMsg(id=1569397547174, data={
"code":"0","message":"test send msg to Kafka cluster: 7"}, sendDate=2019-09-25 15:45:47)
2019-09-25 15:45:47.174 INFO 2576 --- [ntainer#0-0-C-1] c.c.k.heart.service.impl.KafkaReceiver : ----------success in getting message from kafka topic:cluster-test,record:{
"data":{
"code":"0","message":"test send msg to Kafka cluster: 9"},"id":1569397368608,"sendDate":"2019-09-25 15:42:48"},with partitionId:1
KafkaProducerMsg(id=1569397547174, data={
"code":"0","message":"test send msg to Kafka cluster: 9"}, sendDate=2019-09-25 15:45:47)
2019-09-25 15:45:47.178 INFO 2576 --- [ntainer#0-0-C-1] c.c.k.heart.service.impl.KafkaReceiver : ----------success in getting message from kafka topic:cluster-test,record:{
"data":{
"code":"0","message":"test send msg to Kafka cluster: 0"},"id":1569397368180,"sendDate":"2019-09-25 15:42:48"},with partitionId:0
KafkaProducerMsg(id=1569397547178, data={
"code":"0","message":"test send msg to Kafka cluster: 0"}, sendDate=2019-09-25 15:45:47)
2019-09-25 15:45:47.178 INFO 2576 --- [ntainer#0-0-C-1] c.c.k.heart.service.impl.KafkaReceiver : ----------success in getting message from kafka topic:cluster-test,record:{
"data":{
"code":"0","message":"test send msg to Kafka cluster: 2"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"},with partitionId:0
KafkaProducerMsg(id=1569397547178, data={
"code":"0","message":"test send msg to Kafka cluster: 2"}, sendDate=2019-09-25 15:45:47)
2019-09-25 15:45:47.178 INFO 2576 --- [ntainer#0-0-C-1] c.c.k.heart.service.impl.KafkaReceiver : ----------success in getting message from kafka topic:cluster-test,record:{
"data":{
"code":"0","message":"test send msg to Kafka cluster: 4"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"},with partitionId:0
KafkaProducerMsg(id=1569397547178, data={
"code":"0","message":"test send msg to Kafka cluster: 4"}, sendDate=2019-09-25 15:45:47)
2019-09-25 15:45:47.178 INFO 2576 --- [ntainer#0-0-C-1] c.c.k.heart.service.impl.KafkaReceiver : ----------success in getting message from kafka topic:cluster-test,record:{
"data":{
"code":"0","message":"test send msg to Kafka cluster: 6"},"id":1569397368604,"sendDate":"2019-09-25 15:42:48"},with partitionId:0
KafkaProducerMsg(id=1569397547178, data={
"code":"0","message":"test send msg to Kafka cluster: 6"}, sendDate=2019-09-25 15:45:47)
2019-09-25 15:45:47.178 INFO 2576 --- [ntainer#0-0-C-1] c.c.k.heart.service.impl.KafkaReceiver : ----------success in getting message from kafka topic:cluster-test,record:{
"data":{
"code":"0","message":"test send msg to Kafka cluster: 8"},"id":1569397368608,"sendDate":"2019-09-25 15:42:48"},with partitionId:0
KafkaProducerMsg(id=1569397547178, data={
"code":"0","message":"test send msg to Kafka cluster: 8"}, sendDate=2019-09-25 15:45:47)
可以看到消费者收到了之前生产者发送到消息系统的数据。
至此,kafka集群的测试结束。
如果你也刚开始学习kafka,希望这篇文章可以帮到你。
题目 1518: [蓝桥杯][算法提高VIP]寻找三位数 时间限制: 1Sec 内存限制: 128MB 提交: 1053 解决: 573 题目描述 将1,2,…,9共9个数分成三组,分别组成三个三位数,且使这三个三位数构成 1:2:3的比例,试求出所有满足条件的三个三位数。例如_二分查找 - 3个数和为0 时间限制: 1 sec 内存限制: 128 mb 提交: 516 解决: 388 [
首先,大体了解PX4IO 与PX4FMU各自的任务PX4IO(STM32F100)为PIXHAWK 中专用于处理输入输出的部分,输入为支持的各类遥控器(PPM,SPKT/DSM,SBUS), 输出为电调的PWM 驱动信号, 它与PX4FMU(STM32F427)通过串口进行通信PX4FMU :各种传感器数据读取、姿态解算、PWM控制量的计算、与PX4IO通信进入主题PX_px4模块代码启动
持续更新,敬请期待!Linux任督二脉之进程管理郭健: Linux进程调度技术的前世今生之“前世”郭...
注:非Vmware安装全篇都在root权限下运行1.修改主机名1.查看主机名:hostname2.修改主机名:hostnamectl set-hostname master或者修改:/etc/hostname3.将主机名写入/etc/hostsIP地址 主机名2.关闭防火墙1.查看防火墙状态:firewalld-cmd --state | system status firewalld2.关闭防火墙:system stop firewalld3.ssh主节点生成密钥:ssh._ dfs.replication 2
安装VMware Workstation 15声明Windows 1908 版本 支持15.3之后版本首先下载 windows的安装包VMware 官网https://www.vmware.com/选着专业版 选着windows 进行下载这里推荐使用 迅雷下载开始安装跟计算机硬件有关-------需要等待根据向导------下一步接受条款选择安装位置...
直接上步骤$ python -V$ sudo yum install epel-release$ yum install epel-release$ yum search python3$ yum install python3# 安装pip3curl -O https://bootstrap.pypa.io/get-pip.pypython3 get-pip.py_centos 7.2 安装pip3
安装jdk 随意选择目录 只需把默认安装目录 \java 之前的目录修改即可安装jre→更改→ \java 之前目录和安装 jdk 目录相同即可注:若无安装目录要求,可全默认设置。无需做任何修改,两次均直接点下一步。安装完JDK后配置环境变量 计算机→属性→高级系统设置→高级→环境变量系统变量→新建 JAVA_HOME 变量 。变量值填写jdk的安装目录(本人是 E:\Java\jdk1.7.0..._java安装 2个环境 如何默认
CInstantCameraParams_Params相机参数类,是CInstantCamera基类,以接口形式声明了很多成员变量。class CInstantCameraParams_Params { protected: CInstantCameraParams_Params(void); ~CInstantCamer...
1、生活中的异常2、异常程序中出现的错误被称为异常。异常可分为两大类:编译时异常和运行时异常。异常就是在程序的运行过程中所发生的不正常的事件,它会中断正在运行的程序。编译时异常一般是指语法错误,可以通过编译器的提示加以修正;运行时异常包括:运行错误:如数组下标越界,除数为0等;逻辑错误:如年龄超过200岁等。3、发生异常的原因多种多样,主要有:系统资源不可用:如内存分配失败,文件打开失败,数据源连..._控制台下的除数为0的异常程序首先建立一个空项目命名为exception
感觉在介绍之前有必要阐述一下连接池的几个概念,有助于后边一些文字的理解。最原始的数据库使用就是打开一个连接并进行使用,使用过后一定要关闭连接释放资源。由于频繁的打开和关闭连接对jvm包括数据库都有一定的资源负荷,尤其应用压力较大时资源占用比较多容易产生性能问题。由此使用连接池的作用就显现出来,他的原理其实不复杂:先打开一定数量的数据库连接,当使用的时候分配给调用者,调用完毕后返回给
关于RestfulToolkit的RestServices面板不显示接口的问题_restservices插件
一、Tomcat结合memc[root@server2 ~]# lftp 172.25.254.250lftp 172.25.254.250:/> cd pub/docs/lamp/lftp 172.25.254.250:/pub/docs/lamp> get apache-tomcat-7.0.37.tar.gz ##下载tomcat7814288 bytes transferred lftp 17_tomcat memcache