5. Kafka Operations

Published: 2025/5/22


#1. Via shell scripts

  • List all topics on the current server

    bin/kafka-topics.sh --list --zookeeper zk01:2181

  • Create a topic

    bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test

  • Delete a topic

    bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test

    This requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion (or you can restart the brokers directly).

  • Produce data

    kafka-console-producer.sh --broker-list kafka01:9092 --topic itheima

    Variant (no separate data-collection tool needed): tail -F /root/log.log | kafka-console-producer.sh --broker-list hadoop1:9092 --topic accesslogs

  • Consume messages

    sh bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test1

  • Check the consumer position

    sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
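
    The checker's key output column is the lag: for each partition, how far the group's committed offset trails the log-end offset. The subtraction it performs can be sketched in plain Java (the offsets below are made up for illustration, not taken from a real cluster):

    ```java
    public class LagSketch {
        // Lag for one partition: messages written but not yet consumed.
        public static long lag(long logEndOffset, long committedOffset) {
            return logEndOffset - committedOffset;
        }

        public static void main(String[] args) {
            // Hypothetical offsets for two partitions of one topic.
            System.out.println("partition 0 lag = " + lag(1000, 950));
            System.out.println("partition 1 lag = " + lag(800, 800));
        }
    }
    ```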

  • Describe a topic

    sh kafka-topics.sh --topic test --describe --zookeeper zk01:2181

  • Change the number of partitions

    kafka-topics.sh --zookeeper zk01 --alter --partitions 15 --topic utopic

    Note that Kafka only allows increasing the partition count, never decreasing it, and increasing it changes which partition a keyed message hashes to.

#2. Via the Java API

Producer API

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducerSimple {
    public static void main(String[] args) throws InterruptedException {
        String TOPIC = "test9";
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "kafka01:9092");
        props.put("request.required.acks", "1");
        // Register a custom partitioner
        props.put("partitioner.class", "cn.itcast.storm.kafka.producer.MyLogPartitioner");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        int messageNo = 0;
        while (true) {
            String messageStr = "produce data";
            KeyedMessage<String, String> keyedMessage =
                    new KeyedMessage<String, String>(TOPIC, messageNo + "", messageStr);
            producer.send(keyedMessage);
            messageNo += 1;
        }
    }
}
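
The producer registers a custom partitioner class, MyLogPartitioner, whose source is not shown here. In this 0.8-era API a partitioner implements kafka.producer.Partitioner with a partition(Object key, int numPartitions) method. A self-contained sketch of the likely logic (hypothetical, not the actual cn.itcast class, and kept free of the Kafka dependency so it compiles on its own):

```java
public class MyLogPartitionerSketch {
    // Likely logic: route by the key's hash, masking the sign bit so the
    // result is a valid partition index; null keys fall back to partition 0.
    public int partition(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        return (key.toString().hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        MyLogPartitionerSketch partitioner = new MyLogPartitionerSketch();
        // The same key always maps to the same partition.
        System.out.println(partitioner.partition("17", 4));
    }
}
```

Because routing is a pure function of the key and the partition count, messages with the same key keep their relative order within one partition.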

Consumer API

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class KafkaConsumerSimple implements Runnable {
    public String title;
    public KafkaStream<byte[], byte[]> stream;

    public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
        this.title = title;
        this.stream = stream;
    }

    public void run() {
        System.out.println("Starting " + title);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        /*
         * Keep reading newly arrived messages from the stream; hasNext() blocks
         * while waiting for new messages. If ConsumerConnector#shutdown is
         * called, hasNext() returns false.
         */
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> data = it.next();
            String topic = data.topic();
            int partition = data.partition();
            long offset = data.offset();
            String msg = new String(data.message());
            System.out.println(String.format(
                    "Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]",
                    title, topic, partition, offset, msg));
        }
        System.err.println(String.format("Consumer: [%s] exiting ...", title));
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("group.id", "testGroup");
        props.put("zookeeper.connect", "zk01:2181,zk02:2181,zk03:2181");
        props.put("auto.offset.reset", "largest");
        props.put("auto.commit.interval.ms", "1000");
        props.put("partition.assignment.strategy", "roundrobin");
        ConsumerConfig config = new ConsumerConfig(props);
        String topic = "test9";
        // As long as the ConsumerConnector is alive, the consumer keeps waiting
        // for new messages and never exits on its own.
        ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
        // Build the topicCountMap: topic -> number of streams
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 9);
        Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap =
                consumerConn.createMessageStreams(topicCountMap);
        // Get the streams for the topic
        List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);
        // Create a fixed thread pool of 9 threads
        ExecutorService executor = Executors.newFixedThreadPool(9);
        // Start one consumer thread per stream
        for (int i = 0; i < streams.size(); i++)
            executor.execute(new KafkaConsumerSimple("Consumer " + (i + 1), streams.get(i)));
    }
}
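
The fan-out at the end of the consumer — one worker thread per KafkaStream — can be exercised without a broker. This sketch replaces the streams with plain strings (hypothetical stand-ins) and keeps the same ExecutorService wiring:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConsumerPoolSketch {
    public static Queue<String> run(List<String> streams) {
        Queue<String> consumed = new ConcurrentLinkedQueue<String>();
        // One worker per stream, mirroring the Kafka consumer above.
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        for (final String stream : streams) {
            executor.execute(new Runnable() {
                public void run() {
                    consumed.add("consumed " + stream);
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("stream-1", "stream-2", "stream-3")));
    }
}
```

Sizing the pool to streams.size() (rather than a hard-coded 9) keeps the thread count and the stream count from drifting apart.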

Reposted from: https://my.oschina.net/liufukin/blog/800434
