建站不啰嗦,上手跟我做(十八)zookeeper-Kafka 集群搭建

需要三台服务器部署 zookeeper 创建集群
192.168.1.103
192.168.1.104
192.168.1.107
第 1 和 2 步骤三个服务器完全相同
第 3 步骤针对不同 ip 服务器区别配置

第一、安装 zookeeper

1、解压 Zookeeper

[root@localhost Zookeeper]# tar -zxvf zookeeper-3.4.12.tar.gz

2、修改配置文件

[root@localhost Zookeeper]# ls
zookeeper-3.4.12  zookeeper-3.4.12.tar.gz
[root@localhost Zookeeper]# cd zookeeper-3.4.12/
[root@localhost zookeeper-3.4.12]# ls
bin         docs             NOTICE.txt            zookeeper-3.4.12.jar
build.xml   ivysettings.xml  README.md             zookeeper-3.4.12.jar.asc
conf        ivy.xml          README_packaging.txt  zookeeper-3.4.12.jar.md5
contrib     lib              recipes               zookeeper-3.4.12.jar.sha1
dist-maven  LICENSE.txt      src
[root@localhost zookeeper-3.4.12]# mkdir data
[root@localhost zookeeper-3.4.12]# ls
bin        dist-maven       LICENSE.txt           src
build.xml  docs             NOTICE.txt            zookeeper-3.4.12.jar
conf       ivysettings.xml  README.md             zookeeper-3.4.12.jar.asc
contrib    ivy.xml          README_packaging.txt  zookeeper-3.4.12.jar.md5
data       lib              recipes               zookeeper-3.4.12.jar.sha1
[root@localhost zookeeper-3.4.12]# cd conf
[root@localhost conf]# ls
configuration.xsl  log4j.properties  zoo_sample.cfg
[root@localhost conf]# vi zoo.cfg
#集群包括三个节点192.168.1.103
#集群包括三个节点192.168.1.104
#集群包括三个节点192.168.1.107
#(1)为什么是三个,因为Zookeeper喜欢奇数不喜欢偶数。
#server.X=A:B:C
#X-代表服务器编号
#A-代表ip
#B和C-代表端口,这个端口用来系统之间通信

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/var/lib/zookeeper/
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

server.1=192.168.1.103:2888:3888
 
server.2=192.168.1.104:2888:3888
 
server.3=192.168.1.107:2888:3888

3、创建 dataDir 目录,并在该目录下创建 my

首先 修改 dataDir,顾名思义就是【数据目录】了,修改成我们自定义的即可。
vim myid
#之后会产生一个新文件,直接在里面写 X 即可
#比如配置的三个server,myid里面写的X就是server.X=ip:2888:3888 中ip所对应的X
server.1=192.168.1.103:2888:3888【192.168.192.128服务器上面的myid填写1】
server.2=192.168.1.104:2888:3888【192.168.192.129服务器上面的myid填写2】
server.3=192.168.1.107:2888:3888【192.168.192.130服务器上面的myid填写3】
[root@localhost conf]# mkdir -p /var/lib/zookeeper/
[root@localhost zookeeper]# vi myid
1

4、启动 zookeeper

[root@localhost zookeeper-3.4.12]# cd bin
[root@localhost bin]# ls
README.txt    zkCli.cmd  zkEnv.cmd  zkServer.cmd
zkCleanup.sh  zkCli.sh   zkEnv.sh   zkServer.sh
[root@localhost zookeeper-3.4.12]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/Zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@localhost zookeeper-3.4.12]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/Zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower

如果启动不成功,查看如下几点

1、防火墙是否开放端口 2888 和 3888 以及 2181

然后开启防火墙

[root@localhost zookeeper-3.4.11]# firewall-cmd --zone=public --add-port=2888/tcp --permanent
success
[root@localhost zookeeper-3.4.11]# firewall-cmd --zone=public --add-port=3888/tcp --permanent
success
[root@localhost zookeeper-3.4.11]# firewall-cmd --zone=public --add-port=2181/tcp --permanent
success
[root@localhost zookeeper-3.4.11]# systemctl restart firewalld

2、是否有程序占用端口 2181

netstat -nltp | grep 2181
存在占用
kill -9 pid杀掉进程后重新启动
zoo.cfg 配置文件配置项解析:

#tickTime:
这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
#initLimit:
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 52000=10 秒
#syncLimit:
这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 5
2000=10 秒
#dataDir:
快照日志的存储路径
#dataLogDir:
事物日志的存储路径,如果不配置这个那么事物日志会默认存储到 dataDir 制定的目录,这样会严重影响 zk 的性能,当 zk 吞吐量较大的时候,产生的事物日志、快照日志太多
#clientPort:
这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点

清理日志
#!/bin/bash 
 
#snapshot file dir 
dataDir=/opt/zookeeper/zkdata/version-2
#tran log dir 
dataLogDir=/opt/zookeeper/zkdatalog/version-2

#Leave 66 files 
count=66 
count=$[$count+1] 
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f 
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f 


#以上这个脚本定义了删除对应两个目录中的文件,保留最新的66个文件,可以将他写到crontab中,设置为每天凌晨2点执行一次就可以了。


#zk log dir   del the zookeeper log
#logDir=
#ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f

第二:配置 kafka

1、解压 kafka 安装包

[root@localhost usr]# cd kafka
[root@localhost kafka]# ls
kafka_2.12-1.1.0.tgz
[root@localhost kafka]# tar -zxvf kafka_2.12-1.1.0.tgz

2、配置 kafka

[root@localhost kafka_2.12-1.1.0]#  cd config
[root@localhost config]# ls
connect-console-sink.properties    connect-log4j.properties       server.properties
connect-console-source.properties  connect-standalone.properties  tools-log4j.properties
connect-distributed.properties     consumer.properties            zookeeper.properties
connect-file-sink.properties       log4j.properties
connect-file-source.properties     producer.properties
[root@localhost config]# pwd
/usr/kafka/kafka_2.12-1.1.0/config

主要关注:server.properties 这个文件即可,我们可以发现在目录下:
有很多文件,这里可以发现有 Zookeeper 文件,我们可以根据 Kafka 内带的 zk 集群来启动,但是建议使用独立的 zk 集群

server.properties 配置文件
broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
上面是参数的解释,实际的修改项为:
#broker.id=0  每台服务器的broker.id都不能相同


#hostname
host.name=192.168.1.107

#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#设置zookeeper的连接端口
zookeeper.connect=192.168.1.103:2181,192.168.1.104:2181,192.168.1.107:2181

3、启动 kafka 并测试

[root@localhost kafka_2.12-1.1.0]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@localhost kafka_2.12-1.1.0]# cd bin
[root@localhost bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@localhost bin]# jps
4009 QuorumPeerMain
4811 Kafka
4893 Jps

4、复制 kafka

复制文件文件夹(从本地/usr/kafka/(192.168.1.107)linux系统复制到远程的192.168.1.104linux系统/usr/目录下)
[root@localhost tmp]# ssh 192.168.1.104
root@192.168.1.110's password: 
Last login: Sat May  7 12:53:48 2016 from 192.168.1.107
[root@localhost usr]# scp -r ./kafka root@192.168.1.104:/usr/

5、配置 192.168.1.104 上的 kafka

[root@localhost config]# vi server.properties 
broker.id=1  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
[root@localhost config]# cd ../bin
[root@localhost bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@localhost bin]# jps

6、创建 Topic 来验证是否创建成功

[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.107:2181 --replication-factor 1 --partitions 1 --topic shuaige
Created topic "shuaige".

7、在 192.168.1.107 服务器上创建发布者

[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.7.107:19092 --topic shuaige

8、在 192.168.1.104 服务器上创建订阅者

[root@loaclhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.7.107:19092 --topic shuaige --from-beginning

9、启动 kafka

[root@localhost kafka_2.12-1.1.0]# bin/kafka-server-start.sh config/server.properties &
[1] 8905
[2018-06-04 17:34:06,729] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2018-06-04 17:34:06,730] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)

10、创建一个主题

[root@localhost kafka_2.12-1.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

11、查看主题

[root@localhost kafka_2.12-1.1.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
test

12、发送消息

[root@localhost kafka_2.12-1.1.0]# bin/kafka-console-producer.sh --broker-t localhost:9092 --topic test
>this is a message
>this is another message

13、消费消息

[root@localhost kafka_2.12-1.1.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
this is a message
this is another message

上一篇 建站不啰嗦,上手跟我做(十七)Kafka 集群部署
目录
下一篇 建站不啰嗦,上手跟我做(十九)在 Linux 系统安装 Nodejs