kafka 清理数据

kafka 全部数据清空的步骤

  • 停止每台机器上的 kafka;
  • 删除 kafka 存储目录(server.properties 文件 log.dirs 配置,默认为“/tmp/kafka-logs”)全部 topic 的数据目录;
  • 删除 zookeeper 上与 kafka 相关的 znode 节点;
  • 重启 kafka;如果已删除的 topic 仍然存在,则还需要重启 zookeeper;

注意:本文操作环境的 kafka 版本为 kafka_2.12-1.1.0(见下文各命令输出中的安装路径)

停止每台机器上的 kafka

[root@localhost bin]# ls
connect-distributed.sh        kafka-consumer-perf-test.sh          kafka-replay-log-producer.sh        kafka-verifiable-consumer.sh
connect-standalone.sh         kafka-delegation-tokens.sh           kafka-replica-verification.sh       kafka-verifiable-producer.sh
kafka-acls.sh                 kafka-delete-records.sh              kafka-run-class.sh                  trogdor.sh
kafka-broker-api-versions.sh  kafka-log-dirs.sh                    kafka-server-start.sh               windows
kafka-configs.sh              kafka-mirror-maker.sh                kafka-server-stop.sh                zookeeper-security-migration.sh
kafka-console-consumer.sh     kafka-preferred-replica-election.sh  kafka-simple-consumer-shell.sh      zookeeper-server-start.sh
kafka-console-producer.sh     kafka-producer-perf-test.sh          kafka-streams-application-reset.sh  zookeeper-server-stop.sh
kafka-consumer-groups.sh      kafka-reassign-partitions.sh         kafka-topics.sh                     zookeeper-shell.sh
[root@localhost bin]# ./kafka-server-stop.sh
[root@localhost bin]# pwd
/usr/local/kafka/kafka_2.12-1.1.0/bin

删除 kafka 存储目录

在 kafka 安装目录的 config 文件夹下 server.properties 中查看存储目录

[root@localhost kafka_2.12-1.1.0]# vim ./config/server.properties 

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

删除该目录所有数据:

[root@localhost kafka-logs]# cd /tmp/kafka-logs
[root@localhost kafka-logs]# ls
cleaner-offset-checkpoint  __consumer_offsets-21  __consumer_offsets-35  __consumer_offsets-49                 delivery_plan_machine_demo_001-0
__consumer_offsets-0       __consumer_offsets-22  __consumer_offsets-36  __consumer_offsets-5                  delivery_plan_machine_demo_001-1
__consumer_offsets-1       __consumer_offsets-23  __consumer_offsets-37  __consumer_offsets-6                  delivery_plan_machine_demo_001-2
__consumer_offsets-10      __consumer_offsets-24  __consumer_offsets-38  __consumer_offsets-7                  log-start-offset-checkpoint
__consumer_offsets-11      __consumer_offsets-25  __consumer_offsets-39  __consumer_offsets-8                  meta.properties
__consumer_offsets-12      __consumer_offsets-26  __consumer_offsets-4   __consumer_offsets-9                  order-0
__consumer_offsets-13      __consumer_offsets-27  __consumer_offsets-40  delivery_plan_device_demo_kafka-0     order-1
__consumer_offsets-14      __consumer_offsets-28  __consumer_offsets-41  delivery_plan_device_demo_kafka003-0  order-2
__consumer_offsets-15      __consumer_offsets-29  __consumer_offsets-42  delivery_plan_device_demo_kafka004-0  recovery-point-offset-checkpoint
__consumer_offsets-16      __consumer_offsets-3   __consumer_offsets-43  delivery_plan_device_demo_kafka005-0  replication-offset-checkpoint
__consumer_offsets-17      __consumer_offsets-30  __consumer_offsets-44  delivery_plan_device_demo_kafka006-0  topicName-0
__consumer_offsets-18      __consumer_offsets-31  __consumer_offsets-45  delivery_plan_device_demo_kafka006-1  zhang6-0
__consumer_offsets-19      __consumer_offsets-32  __consumer_offsets-46  delivery_plan_device_demo_kafka006-2  zhang6-1
__consumer_offsets-2       __consumer_offsets-33  __consumer_offsets-47  delivery_plan_device_demo_kafka-1     zhang6-2
__consumer_offsets-20      __consumer_offsets-34  __consumer_offsets-48  delivery_plan_device_demo_kafka-2     zhang6-3
[root@localhost kafka-logs]# rm -rf ./__consumer_offsets*
[root@localhost kafka-logs]# ls
cleaner-offset-checkpoint             delivery_plan_device_demo_kafka006-0  delivery_plan_machine_demo_001-0  order-0                           topicName-0
delivery_plan_device_demo_kafka-0     delivery_plan_device_demo_kafka006-1  delivery_plan_machine_demo_001-1  order-1                           zhang6-0
delivery_plan_device_demo_kafka003-0  delivery_plan_device_demo_kafka006-2  delivery_plan_machine_demo_001-2  order-2                           zhang6-1
delivery_plan_device_demo_kafka004-0  delivery_plan_device_demo_kafka-1     log-start-offset-checkpoint       recovery-point-offset-checkpoint  zhang6-2
delivery_plan_device_demo_kafka005-0  delivery_plan_device_demo_kafka-2     meta.properties                   replication-offset-checkpoint     zhang6-3
[root@localhost kafka-logs]# rm -rf ./delivery_plan_device*
[root@localhost kafka-logs]# ls
cleaner-offset-checkpoint         delivery_plan_machine_demo_001-2  order-0  recovery-point-offset-checkpoint  zhang6-0  zhang6-3
delivery_plan_machine_demo_001-0  log-start-offset-checkpoint       order-1  replication-offset-checkpoint     zhang6-1
delivery_plan_machine_demo_001-1  meta.properties                   order-2  topicName-0                       zhang6-2
[root@localhost kafka-logs]# rm -rf ./delivery_plan_machine*
[root@localhost kafka-logs]# ls
cleaner-offset-checkpoint    meta.properties  order-1  recovery-point-offset-checkpoint  topicName-0  zhang6-1  zhang6-3
log-start-offset-checkpoint  order-0          order-2  replication-offset-checkpoint     zhang6-0     zhang6-2
[root@localhost kafka-logs]# rm -rf ./order*
[root@localhost kafka-logs]# rm -rf ./topicName*
[root@localhost kafka-logs]# rm -rf zhang*
[root@localhost kafka-logs]# ls
cleaner-offset-checkpoint  log-start-offset-checkpoint  meta.properties  recovery-point-offset-checkpoint  replication-offset-checkpoint

删除 zookeeper 上与 kafka 相关的 znode 节点

zookeeper 上面保存着 kafka 的所有 topic 及其消费信息,故需要删除与 kafka 相关的 znode 节点:

进入 zookeeper 的 shell 界面:

[root@localhost bin]# ./zkCli.sh -server localhost:2181
Connecting to localhost:2181
2021-08-15 10:15:28,686 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.12-e5259e437540f349646870ea94dc2658c4e44b3b, built on 03/27/2018 03:55 GMT
2021-08-15 10:15:28,693 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=localhost
2021-08-15 10:15:28,693 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.8.0_181
2021-08-15 10:15:28,700 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2021-08-15 10:15:28,700 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/local/java/jdk1.8.0_181/jre
2021-08-15 10:15:28,700 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/home/beijing/zookeeper-3.4.12/bin/../build/classes:/home/beijing/zookeeper-3.4.12/bin/../build/lib/*.jar:/home/beijing/zookeeper-3.4.12/bin/../lib/slf4j-log4j12-1.7.25.jar:/home/beijing/zookeeper-3.4.12/bin/../lib/slf4j-api-1.7.25.jar:/home/beijing/zookeeper-3.4.12/bin/../lib/netty-3.10.6.Final.jar:/home/beijing/zookeeper-3.4.12/bin/../lib/log4j-1.2.17.jar:/home/beijing/zookeeper-3.4.12/bin/../lib/jline-0.9.94.jar:/home/beijing/zookeeper-3.4.12/bin/../lib/audience-annotations-0.5.0.jar:/home/beijing/zookeeper-3.4.12/bin/../zookeeper-3.4.12.jar:/home/beijing/zookeeper-3.4.12/bin/../src/java/lib/*.jar:/home/beijing/zookeeper-3.4.12/bin/../conf:.:/usr/local/java/jdk1.8.0_181/lib/dt.jar:/usr/local/java/jdk1.8.0_181/lib/tools.jar
2021-08-15 10:15:28,701 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2021-08-15 10:15:28,701 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2021-08-15 10:15:28,701 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
2021-08-15 10:15:28,701 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
2021-08-15 10:15:28,701 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
2021-08-15 10:15:28,701 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=3.10.0-957.el7.x86_64
2021-08-15 10:15:28,701 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root
2021-08-15 10:15:28,702 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root
2021-08-15 10:15:28,702 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/home/beijing/zookeeper-3.4.12/bin
2021-08-15 10:15:28,704 [myid:] - INFO  [main:ZooKeeper@441] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@5c29bfd
Welcome to ZooKeeper!
2021-08-15 10:15:28,756 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1028] - Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2021-08-15 10:15:28,967 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@878] - Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
2021-08-15 10:15:29,008 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1302] - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x100000cc1e80001, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0]

查看与 kafka 相关的 znode 节点:

[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, brokers, zookeeper, dubbo, admin, isr_change_notification, log_dir_event_notification, controller_epoch, consumers, latest_producer_id_block, config]

在上面的 znode 节点中,除了 zookeeper 节点(zk 自身的保留节点,不能删除)之外,其余与 kafka 相关的 znode 节点都需要删除。注意:/dubbo 是 Dubbo 注册中心的节点,并非 kafka 创建的;若环境中仍在使用 Dubbo,请勿删除该节点(本例中一并删除仅因该环境不再使用)。

[zk: localhost:2181(CONNECTED) 2] rmr /cluster
[zk: localhost:2181(CONNECTED) 3] rmr /brokers
[zk: localhost:2181(CONNECTED) 5] rmr /dubbo
[zk: localhost:2181(CONNECTED) 10] rmr /admin
[zk: localhost:2181(CONNECTED) 12] rmr /isr_change_notification
[zk: localhost:2181(CONNECTED) 15] rmr /log_dir_event_notification
[zk: localhost:2181(CONNECTED) 18] rmr /controller_epoch
[zk: localhost:2181(CONNECTED) 19] ls /
[zookeeper, consumers, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 20] rmr /consumers
[zk: localhost:2181(CONNECTED) 21] rmr /latest_producer_id_block
[zk: localhost:2181(CONNECTED) 22] rmr /config
[zk: localhost:2181(CONNECTED) 23] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 25] quit

重启 kafka

执行如下命令启动 kafka:

[root@localhost bin]# /usr/local/kafka/kafka_2.12-1.1.0/bin/kafka-server-start.sh /usr/local/kafka/kafka_2.12-1.1.0/config/server.properties >/dev/null 2>&1 &
[1] 4729

jps 命令查看 kafka 的启动情况:

[root@localhost bin]# jps
4729 Kafka
5084 Jps
3918 QuorumPeerMain

最后在查看 kafka 上面是否还有 topic 存在:

[root@localhost bin]# /usr/local/kafka/kafka_2.12-1.1.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
[root@localhost bin]#

可以看到 topic 及其相关数据已被清空删除

向 kafka 生产和消费消息

图片.png

某一 topic 数据清空

查看当前所有 topic

[root@localhost logs]# /usr/local/kafka/kafka_2.12-1.1.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
topicName

比如目前需要删除 topicName 这一 topic。Kafka 1.0.0 及以上版本默认 delete.topic.enable=true(本环境为 1.1.0,满足条件),即可直接使用如下命令删除:

[root@localhost logs]# /usr/local/kafka/kafka_2.12-1.1.0/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicName
Topic topicName is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

该命令将会在 zookeeper 中删除与 topicName 这一 topic 相关的 znode 节点(包括该 topic 的详细信息、生产数据、消费数据的节点),并在 kafka 的存储目录 /tmp/kafka-logs/(即 server.properties 中 log.dirs 配置的目录)下把与 topicName 这一 topic 相关的存储数据目录标记为待删除,稍后 kafka 会真正删除这些待删除的目录,如下:

使用 kafka-topics.sh 查看 topicName 在 zookeeper 中相关 znode 节点信息是否已被删除

[root@localhost logs]# /usr/local/kafka/kafka_2.12-1.1.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets

在 /tmp/kafka-logs 存储目录下查看 topicName 相关存储目录是否已被标记删除(目录名带 -delete 后缀)

[root@localhost kafka-logs]# ls
cleaner-offset-checkpoint  __consumer_offsets-19  __consumer_offsets-3   __consumer_offsets-40  __consumer_offsets-7
__consumer_offsets-0       __consumer_offsets-2   __consumer_offsets-30  __consumer_offsets-41  __consumer_offsets-8
__consumer_offsets-1       __consumer_offsets-20  __consumer_offsets-31  __consumer_offsets-42  __consumer_offsets-9
__consumer_offsets-10      __consumer_offsets-21  __consumer_offsets-32  __consumer_offsets-43  log-start-offset-checkpoint
__consumer_offsets-11      __consumer_offsets-22  __consumer_offsets-33  __consumer_offsets-44  meta.properties
__consumer_offsets-12      __consumer_offsets-23  __consumer_offsets-34  __consumer_offsets-45  recovery-point-offset-checkpoint
__consumer_offsets-13      __consumer_offsets-24  __consumer_offsets-35  __consumer_offsets-46  replication-offset-checkpoint
__consumer_offsets-14      __consumer_offsets-25  __consumer_offsets-36  __consumer_offsets-47  topicName-0.261071af154c4c16a8fe63ce11294cc8-delete
__consumer_offsets-15      __consumer_offsets-26  __consumer_offsets-37  __consumer_offsets-48  topicName-1.d3b6a23af1974355a712155f5d3a9ff7-delete
__consumer_offsets-16      __consumer_offsets-27  __consumer_offsets-38  __consumer_offsets-49  topicName-2.9d3ef2768b3c495da592528933b4ea4e-delete
__consumer_offsets-17      __consumer_offsets-28  __consumer_offsets-39  __consumer_offsets-5
__consumer_offsets-18      __consumer_offsets-29  __consumer_offsets-4   __consumer_offsets-6

稍后再次在 /tmp/kafka-logs 存储目录下查看,可见 topicName 相关存储目录已被真正删除

[root@localhost kafka-logs]# ls
cleaner-offset-checkpoint  __consumer_offsets-17  __consumer_offsets-26  __consumer_offsets-35  __consumer_offsets-44  __consumer_offsets-9
__consumer_offsets-0       __consumer_offsets-18  __consumer_offsets-27  __consumer_offsets-36  __consumer_offsets-45  log-start-offset-checkpoint
__consumer_offsets-1       __consumer_offsets-19  __consumer_offsets-28  __consumer_offsets-37  __consumer_offsets-46  meta.properties
__consumer_offsets-10      __consumer_offsets-2   __consumer_offsets-29  __consumer_offsets-38  __consumer_offsets-47  recovery-point-offset-checkpoint
__consumer_offsets-11      __consumer_offsets-20  __consumer_offsets-3   __consumer_offsets-39  __consumer_offsets-48  replication-offset-checkpoint
__consumer_offsets-12      __consumer_offsets-21  __consumer_offsets-30  __consumer_offsets-4   __consumer_offsets-49
__consumer_offsets-13      __consumer_offsets-22  __consumer_offsets-31  __consumer_offsets-40  __consumer_offsets-5
__consumer_offsets-14      __consumer_offsets-23  __consumer_offsets-32  __consumer_offsets-41  __consumer_offsets-6
__consumer_offsets-15      __consumer_offsets-24  __consumer_offsets-33  __consumer_offsets-42  __consumer_offsets-7
__consumer_offsets-16      __consumer_offsets-25  __consumer_offsets-34  __consumer_offsets-43  __consumer_offsets-8

图片.png

  • kafka 全部数据清空步骤比较繁琐,借鉴某一 topic 数据清空的方式,可以通过使用 kafka-topics.sh --delete 命令逐个删除所有的 topic,达到清空 kafka 全部 topic 数据的目的,不足的是 topic“__consumer_offsets”无法删除,不过不碍事。