了解kafka的都知道。kafka是通过Zookeeper实现分布操作的。不管是broker,consumer,还是provide信息都是存储在Zookeeper中的。当broker挂掉都是Zookeeper来进行重新分配选择的。所以实现kafka集群前我们得先实现Zookeeper的集群配置。
首先我们从官网上下载Zookeeper到本地。我这里下载的是Zookeeper-3.4.6.tar.gz版本的。读者可以根据自己情况下载。下载好之后进行文件解压。然后找到conf文件中的zoo_sample.cfg文件。该文件是Zookeeper官网给我们提供的一套样板。我们赋值该文件到同级下并改名为zoo.cfg.如下图
然后我们来看看这个配置文件里面都有些啥
tickTime=2000
initLimit=10
syncLimit=5
dataDir
dataLogDir
clientPort=2181
server.1=192.168.1.130:28881:38881 server.2=192.168.1.130:28882:38882 server.3=192.168.1.130:28883:38883
知道配置文件里的意思应该就知道如何修改了吧
dataDir+dataLogDir+clientPort
上面的格式我们可以简单的总结为 server.num=B:C:D。 num:是正整数代表的服务的唯一标识。这个要和后面说道的myid文件保持一致。
B: 标识Zookeeper集群中某一个服务的ip或者域名 192.168.1.130
C:表示server.num这个服务于集群中leader进行信息交流的端口。在kafka中我们leader和follower需要进行数据备份。具体服务就是通过这个地方制定的端口进行通信的。
D:表示万一leader宕机了,我们就通过这个端口来进行再follower中选举新的leader。
网上的很多教程也就介绍到这里。稍微好点就提了一下创建myid文件的事,我当时就纠结在这里。因为我根本不知道穿件的myid的类型。我就随便创建txt文件。结果是错的。这里我们创建myid我有两种方式。还有myid里面的内容就是我们对应的配置文件中server.num中的num。
第一种就是我们通过cmd窗口到我们要创建myid的文件夹下
执行如下命令
echo 1 > myid
第二种是我们先创建TXT文件将对应的内容写入。然后txt后缀删掉就可以了。
顺便提一下myid应该放在我们conf/zoo.cfg文件中指定的dataDir 的对应的文件路径下。
所谓的集群就是讲上面的Zookeeper复制成多个,将上面提到的几个重要的属性更改掉就行了。
如果你到这一步说明你离成功已经不远了。下面我们只需要开启服务就行了。开启服务在我们解压的bin目录下。
上面我们配置的Zookeeper在开启第一个时候回报错。为什么呢。原因就是我们开启了一个服务,。但是我们的配置文件配置的是集群的信息。这个时候就回去寻找其他服务。但是这个时候其他的服务还没有开启呢。所以这个错误是正常。等我们集群中的所有的服务都开启了就不会报错。这里大家不要被吓到。
除此之外,还有一点就是Zookeeper的安装目录(解压目录)是绝对不能包含汉字的。我上面的截图有汉字那是我计算机上设置的。实际的路径是没有汉字的。不要被上面的图片诱导。
当所有的服务都开启了,我们如何查看我们的服务是否开启成功呢。这很简单。我们重新打开一个新的cmd窗口。直接执行jps就可以看到我们的服务了。QuorumPeerMain就是我们的服务主类
上面我们就完成了Zookeeper的集群的配置。实际上Kafka中就自带有Zookeeper的服务。但是为了数据的高可用性。我们最好选择自己搭建Zookeeper集群。这也是官网上的建议。
这里我的Kafka版本选择的是0.8.1.1。建议单价不要选择太高的版本。刚出的版本可能有未知的bug。
同样这里的集群就是讲Kafka复制多个。这里我选择其中一个进行讲解。其他的都是一样的主要就是讲端口改掉就行了。
将官网下载的Kafka解压改名为kafka1(其他的改名数字递增就行。或者自定义别的名字)。找到config/server.properties文件。
同样的先来了解里面的参数含义吧
broker.id=1
port=9091
host.name=192.168.1.130
advertised.host.name=192.168.1.130
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=\logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
达到要求 都会删除该文件 在创建topic时可以重新制定。若没有.则选取该默认值
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.130:num1,192.168.1.130:num2,192.168.1.130:num3
zookeeper.connection.timeout.ms=1000000
broker.id + port
set ivyPath=%USERPROFILE%\.ivy2\cache set snappy=%ivyPath%/org.xerial.snappy/snappy-java/bundles/snappy-java-1.0.5.jar call :concat %snappy% set library=%ivyPath%/org.scala-lang/scala-library/jars/scala-library-2.8.0.jar call :concat %library% set compiler=%ivyPath%/org.scala-lang/scala-compiler/jars/scala-compiler-2.8.0.jar call :concat %compiler% set log4j=%ivyPath%/log4j/log4j/jars/log4j-1.2.15.jar call :concat %log4j% set slf=%ivyPath%/org.slf4j/slf4j-api/jars/slf4j-api-1.6.4.jar call :concat %slf% set zookeeper=%ivyPath%/org.apache.zookeeper/zookeeper/jars/zookeeper-3.3.4.jar call :concat %zookeeper% set jopt=%ivyPath%/net.sf.jopt-simple/jopt-simple/jars/jopt-simple-3.2.jar call :concat %jopt% for %%i in (%BASE_DIR%\core\target\scala-2.8.0\*.jar) do ( call :concat %%i ) for %%i in (%BASE_DIR%\core\lib\*.jar) do ( call :concat %%i ) for %%i in (%BASE_DIR%\perf\target\scala-2.8.0/kafka*.jar) do ( call :concat %%i )
for %%i in (%BASE_DIR%\libs\*.jar) do ( call :concat %%i )
首先第一行提示 set JMX_PORT to default value 9999 这个错误是因为我没有设置这个值。这倒是小事。但是后面报说找不到或无法加载主类kafka.Kafka这就让我费解。在这里我也是卡了一天了。后来在网上找到了一个方法。我不知道这是不是Kafka的bug。反正用这个方法我是解决了这个错误了。
解决办法就是将kafka-run-class.bat文件中
set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp %CLASSPATH% %*
set COMMAND= %JAVA% %KAFKA_OPTS% %KAFKA_JMX_OPTS% -cp "%CLASSPATH%" %*
到这一步我们离kafka的成功又不远了。我们新开cmd窗口cd到kafka的bin目录中。
但是在执行开启之前我们需要先执行
Set JMX_PORT=19091(每个服务数字不能一样)
kafka-server-start.bat ..\config\server.properties
kafka-run-class.bat kafka.admin.TopicCommand %*
创建Topic
kafka-topics.bat --create --zookeeper 192.168.1.130:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic
查看Topic
kafka-topics.bat --describe --zookeeper 192.168.1.130:2181 --topic my-replicated-topic
生产topic消息
kafka-console-producer.bat --broker-list 192.168.1.130:9093 --topic my-replicated-topic
消费topic消息
kafka-console-consumer.bat --zookeeper 192.168.1.130:2181 --from-beginning --topic my-replicated-topic
秃头哥给大家分享一篇一线开发大牛整理的java高并发核心编程神仙文档,里面主要包含的知识点有:多线程、线程池、内置锁、JMM、CAS、JUC、高并发设计模式、Java异步回调、CompletableFuture类等。
文档地址:一篇神文就把java多线程,锁,JMM,JUC和高并发设计模式讲明白了
码字不易,如果觉得本篇文章对你有用的话,请给我一键三连!关注作者,后续会有更多的干货分享,请持续关注!