linux 怎样查看kafka的某 topic数据
发布网友
发布时间:2022-04-24 04:30
我来回答
共3个回答
热心网友
时间:2022-04-14 06:57
1、创建一个需要增加备份因子的topic列表的文件,文件格式是json格式的。
2、使用kafka官方提供的工具拿到上面topic的partions 分布情况,并重定向到文件中。
3、修改ressgintopic.conf 文件的,手动分配新增加的partion 备份因子。
4、通过下面命令执行备份因子扩容过程,bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json。
5、最后查看kafka的某 topic数据如图。
注意事项:
Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
热心网友
时间:2022-04-14 08:15
查看topic分布情况kafka-list-topic.sh:
bin/kafka-list-topic.sh-zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分区情况)
bin/kafka-list-topic.sh-zookeeper 192.168.197.170:2181,192.168.197.171:2181-topic test (查看test的分区情况)
创建topickafka-create-topic.sh:
bin/kafka-create-topic.sh-replica 2-partition 8-topic test-zookeeper 192.168.197.170:2181,192.168.197.171:2181
创建名为test的topic,8个分区分别存放数据,数据备份总共2份。
参考资料
kafka 创建topic,查看topic - CSDN博客.blog.csdn.[引用时间2017-12-29]
热心网友
时间:2022-04-14 09:50
基于0.8.0版本。
##查看topic分布情况kafka-list-topic.sh
bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (列出所有topic的分区情况)
bin/kafka-list-topic.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --topic test (查看test的分区情况)
其实kafka-list-topic.sh里面就一句
exec $(dirname $0)/kafka-run-class.sh kafka.admin.ListTopicCommand $@
实际是通过
kafka-run-class.sh脚本执行的包kafka.admin下面的类
##创建TOPIC kafka-create-topic.sh
bin/kafka-create-topic.sh --replica 2 --partition 8 --topic test --zookeeper 192.168.197.170:2181,192.168.197.171:2181
创建名为test的topic, 8个分区分别存放数据,数据备份总共2份
bin/kafka-create-topic.sh --replica 1 --partition 1 --topic test2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181
结果 topic: test2 partition: 0 leader: 170 replicas: 170 isr: 170
##重新分配分区kafka-reassign-partitions.sh
这个命令可以分区指定到想要的--broker-list上
bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "171" --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --execute
cat topic-to-move.json
{"topics":
[{"topic": "test2"}],
"version":1
}
##为Topic增加 partition数目kafka-add-partitions.sh
bin/kafka-add-partitions.sh --topic test --partition 2 --zookeeper 192.168.197.170:2181,192.168.197.171:2181 (为topic test增加2个分区)
##控制台接收消息
bin/kafka-console-consumer.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --from-beginning --topic test
##控制台发送消息
bin/kafka-console-procer.sh --broker-list 192.168.197.170:9092,192.168.197.171: 9092 --topic test
##手动均衡topic, kafka-preferred-replica-election.sh
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.197.170:2181,192.168.197.171:2181 --path-to-json-file preferred-click.json
cat preferred-click.json
{
"partitions":
[
{"topic": "click", "partition": 0},
{"topic": "click", "partition": 1},
{"topic": "click", "partition": 2},
{"topic": "click", "partition": 3},
{"topic": "click", "partition": 4},
{"topic": "click", "partition": 5},
{"topic": "click", "partition": 6},
{"topic": "click", "partition": 7},
{"topic": "play", "partition": 0},
{"topic": "play", "partition": 1},
{"topic": "play", "partition": 2},
{"topic": "play", "partition": 3},
{"topic": "play", "partition": 4},
{"topic": "play", "partition": 5},
{"topic": "play", "partition": 6},
{"topic": "play", "partition": 7}
]
}
##删除topic,慎用,只会删除zookeeper中的元数据,消息文件须手动删除
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 192.168.197.170:2181 ,192.168.197.171:2181
linux 怎样查看kafka的某 topic数据
1、创建一个需要增加备份因子的topic列表的文件,文件格式是json格式的。2、使用kafka官方提供的工具拿到上面topic的partions 分布情况,并重定向到文件中。3、修改ressgintopic.conf 文件的,手动分配新增加的partion 备份因子。4、通过下面命令执行备份因子扩容过程,bin/kafka-reassign-partitions.sh --zoo...
如何获取kafka某topic的Logsize
创建TOPIC kafka-create-topic.sh bin/kafka-create-topic.sh --replica 2 --partition 8 --topic test --zookeeper 192.168.197.170:2181,192.168.197.171:2181 创建名为test的topic, 8个分区分别存放数据,数据备份总共2份 bin/kafka-create-topic.sh --replica 1 --partition 1 -...
Kafka查看topic、consumer group状态命令
1,查看kafka topic列表,使用--list参数 bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list __consumer_offsets lx_test_topic test 2,查看kafka特定topic的详情,使用--topic与--describe参数 bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic lx_test_topic --describe T...
Kafka创建、查看topic,发送消息和接收消息
创建主题后,你可以通过Kafka的命令行工具或编程语言的API来查看主题,以确认主题是否已成功创建。例如,你可以在命令行界面中输入以下命令来查看主题:接下来,启动一个Producer,并发送消息。默认情况下,每行会单独算作一次消息发出。你可以使用命令行工具或编程语言的API来实现这个步骤。例如,使用命令行...
kafka查询和修改topic的offset
输出 输出 注意如果你的kafka设置了zookeeper root,比如为/kafka,那么命令应该改为:重启相关的应用程序,就可以从设置的offset开始读数据了。手动更新Kafka存在Zookeeper中的偏移量。我们有时候需要手动将某个主题的偏移量设置成某个值,这时候我们就需要更新Zookeeper中的数据了。Kafka内置为我们提供了修改...
如何获取kafka某一topic中最新的offset
如果你在0.9版本以上,可以用最新的Consumer client 客户端,有consumer.seekToEnd() / consumer.position() 可以用于得到当前最新的offset:{log.dirs}/replication-offset-checkpoint
如何获取kafka某一topic中最新的offset
选中某一行,var record = grid.getSelectionModel().getSelection();一行的所有数据都在record里面 具体某一列:record.get("列名-dataIndex")
Kafka shell 查看指定topic partition offset的信息
有时需要简单的用shell去检查一个topic下边某一个partition的某个offset的消息。之前一直用 kafka.tools.ConsoleConsumer, 用的心酸。后来发现一个稍强大的工具。https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example sample usage:其他命令:
Kafka的Topic配置详解
(A)创建topic时配置参数 (B)修改topic时配置参数 覆盖已经有topic参数,下面例子修改"my-topic"的max message属性 (C)删除topic级别配置参数 注:配置的kafka集群的根目录为/config/mobile/mq/mafka02,因此所有节点信息都在此目录下。cleanup.policy delete.retention.ms delete.retention.ms flush....
java api如何获取kafka所有Topic列表(TopicCommand,只打印),需要List...
在运行/调试设置中,编辑配置对话框中有“Main”这个选项卡,我们可以勾选“Stop in main”这个复选框。如果选中,那么在调试一个基于main方法的Java程序时,程序会在main方法第一行位置便停止执行。