建站不啰嗦,上手跟我做(十八)zookeeper-Kafka 集群搭建
需要三台服务器部署 zookeeper 创建集群
192.168.1.103
192.168.1.104
192.168.1.107
第 1 和 2 步骤三个服务器完全相同
第 3 步骤针对不同 ip 服务器区别配置
第一、安装 zookeeper
1、解压 Zookeeper
[root@localhost Zookeeper]# tar -zxvf zookeeper-3.4.12.tar.gz
2、修改配置文件
[root@localhost Zookeeper]# ls
zookeeper-3.4.12 zookeeper-3.4.12.tar.gz
[root@localhost Zookeeper]# cd zookeeper-3.4.12/
[root@localhost zookeeper-3.4.12]# ls
bin docs NOTICE.txt zookeeper-3.4.12.jar
build.xml ivysettings.xml README.md zookeeper-3.4.12.jar.asc
conf ivy.xml README_packaging.txt zookeeper-3.4.12.jar.md5
contrib lib recipes zookeeper-3.4.12.jar.sha1
dist-maven LICENSE.txt src
[root@localhost zookeeper-3.4.12]# mkdir data
[root@localhost zookeeper-3.4.12]# ls
bin dist-maven LICENSE.txt src
build.xml docs NOTICE.txt zookeeper-3.4.12.jar
conf ivysettings.xml README.md zookeeper-3.4.12.jar.asc
contrib ivy.xml README_packaging.txt zookeeper-3.4.12.jar.md5
data lib recipes zookeeper-3.4.12.jar.sha1
[root@localhost zookeeper-3.4.12]# cd conf
[root@localhost conf]# ls
configuration.xsl log4j.properties zoo_sample.cfg
[root@localhost conf]# vi zoo.cfg
#集群包括三个节点192.168.1.103
#集群包括三个节点192.168.1.104
#集群包括三个节点192.168.1.107
#(1)为什么是三个,因为Zookeeper喜欢奇数不喜欢偶数。
#server.X=A:B:C
#X-代表服务器编号
#A-代表ip
#B和C-代表端口,这个端口用来系统之间通信
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/var/lib/zookeeper/
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=192.168.1.103:2888:3888
server.2=192.168.1.104:2888:3888
server.3=192.168.1.107:2888:3888
3、创建 dataDir 目录,并在该目录下创建 my
首先 修改 dataDir,顾名思义就是【数据目录】了,修改成我们自定义的即可。
vim myid
#之后会产生一个新文件,直接在里面写 X 即可
#比如配置的三个server,myid里面写的X就是server.X=ip:2888:3888 中ip所对应的X
server.1=192.168.1.103:2888:3888【192.168.192.128服务器上面的myid填写1】
server.2=192.168.1.104:2888:3888【192.168.192.129服务器上面的myid填写2】
server.3=192.168.1.107:2888:3888【192.168.192.130服务器上面的myid填写3】
[root@localhost conf]# mkdir -p /var/lib/zookeeper/
[root@localhost zookeeper]# vi myid
1
4、启动 zookeeper
[root@localhost zookeeper-3.4.12]# cd bin
[root@localhost bin]# ls
README.txt zkCli.cmd zkEnv.cmd zkServer.cmd
zkCleanup.sh zkCli.sh zkEnv.sh zkServer.sh
[root@localhost zookeeper-3.4.12]# bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/Zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@localhost zookeeper-3.4.12]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/Zookeeper/zookeeper-3.4.12/bin/../conf/zoo.cfg
Mode: follower
如果启动不成功,查看如下几点
1、防火墙是否开放端口 2888 和 3888 以及 2181
然后开启防火墙
[root@localhost zookeeper-3.4.11]# firewall-cmd --zone=public --add-port=2888/tcp --permanent
success
[root@localhost zookeeper-3.4.11]# firewall-cmd --zone=public --add-port=3888/tcp --permanent
success
[root@localhost zookeeper-3.4.11]# firewall-cmd --zone=public --add-port=2181/tcp --permanent
success
[root@localhost zookeeper-3.4.11]# systemctl restart firewalld
2、是否有程序占用端口 2181
netstat -nltp | grep 2181
存在占用
kill -9 pid杀掉进程后重新启动
zoo.cfg 配置文件配置项解析:
#tickTime:
这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
#initLimit:
这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 5 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 52000=10 秒
#syncLimit:
这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 52000=10 秒
#dataDir:
快照日志的存储路径
#dataLogDir:
事物日志的存储路径,如果不配置这个那么事物日志会默认存储到 dataDir 制定的目录,这样会严重影响 zk 的性能,当 zk 吞吐量较大的时候,产生的事物日志、快照日志太多
#clientPort:
这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。修改他的端口改大点
清理日志
#!/bin/bash
#snapshot file dir
dataDir=/opt/zookeeper/zkdata/version-2
#tran log dir
dataLogDir=/opt/zookeeper/zkdatalog/version-2
#Leave 66 files
count=66
count=$[$count+1]
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f
#以上这个脚本定义了删除对应两个目录中的文件,保留最新的66个文件,可以将他写到crontab中,设置为每天凌晨2点执行一次就可以了。
#zk log dir del the zookeeper log
#logDir=
#ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f
第二:配置 kafka
1、解压 kafka 安装包
[root@localhost usr]# cd kafka
[root@localhost kafka]# ls
kafka_2.12-1.1.0.tgz
[root@localhost kafka]# tar -zxvf kafka_2.12-1.1.0.tgz
2、配置 kafka
[root@localhost kafka_2.12-1.1.0]# cd config
[root@localhost config]# ls
connect-console-sink.properties connect-log4j.properties server.properties
connect-console-source.properties connect-standalone.properties tools-log4j.properties
connect-distributed.properties consumer.properties zookeeper.properties
connect-file-sink.properties log4j.properties
connect-file-source.properties producer.properties
[root@localhost config]# pwd
/usr/kafka/kafka_2.12-1.1.0/config
主要关注:server.properties 这个文件即可,我们可以发现在目录下:
有很多文件,这里可以发现有 Zookeeper 文件,我们可以根据 Kafka 内带的 zk 集群来启动,但是建议使用独立的 zk 集群
server.properties 配置文件
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=19092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.7.100 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #设置zookeeper的连接端口
上面是参数的解释,实际的修改项为:
#broker.id=0 每台服务器的broker.id都不能相同
#hostname
host.name=192.168.1.107
#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
#设置zookeeper的连接端口
zookeeper.connect=192.168.1.103:2181,192.168.1.104:2181,192.168.1.107:2181
3、启动 kafka 并测试
[root@localhost kafka_2.12-1.1.0]# ls
bin config libs LICENSE logs NOTICE site-docs
[root@localhost kafka_2.12-1.1.0]# cd bin
[root@localhost bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@localhost bin]# jps
4009 QuorumPeerMain
4811 Kafka
4893 Jps
4、复制 kafka
复制文件文件夹(从本地/usr/kafka/(192.168.1.107)linux系统复制到远程的192.168.1.104linux系统/usr/目录下)
[root@localhost tmp]# ssh 192.168.1.104
root@192.168.1.110's password:
Last login: Sat May 7 12:53:48 2016 from 192.168.1.107
[root@localhost usr]# scp -r ./kafka root@192.168.1.104:/usr/
5、配置 192.168.1.104 上的 kafka
[root@localhost config]# vi server.properties
broker.id=1 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
[root@localhost config]# cd ../bin
[root@localhost bin]# ./kafka-server-start.sh -daemon ../config/server.properties
[root@localhost bin]# jps
6、创建 Topic 来验证是否创建成功
[root@localhost bin]# ./kafka-topics.sh --create --zookeeper 192.168.1.107:2181 --replication-factor 1 --partitions 1 --topic shuaige
Created topic "shuaige".
7、在 192.168.1.107 服务器上创建发布者
[root@localhost bin]# ./kafka-console-producer.sh --broker-list 192.168.7.107:19092 --topic shuaige
8、在 192.168.1.104 服务器上创建订阅者
[root@loaclhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.7.107:19092 --topic shuaige --from-beginning
9、启动 kafka
[root@localhost kafka_2.12-1.1.0]# bin/kafka-server-start.sh config/server.properties &
[1] 8905
[2018-06-04 17:34:06,729] INFO Kafka commitId : fdcf75ea326b8e07 (org.apache.kafka.common.utils.AppInfoParser)
[2018-06-04 17:34:06,730] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
10、创建一个主题
[root@localhost kafka_2.12-1.1.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
11、查看主题
[root@localhost kafka_2.12-1.1.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
test
12、发送消息
[root@localhost kafka_2.12-1.1.0]# bin/kafka-console-producer.sh --broker-t localhost:9092 --topic test
>this is a message
>this is another message
13、消费消息
[root@localhost kafka_2.12-1.1.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
this is a message
this is another message
上一篇 建站不啰嗦,上手跟我做(十七)Kafka 集群部署
目录
下一篇 建站不啰嗦,上手跟我做(十九)在 Linux 系统安装 Nodejs