Kafka Admin

查看Topic



        /*
        bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
        */
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
             ListTopicsResult result = admin.listTopics();
            result.namesToListings().whenComplete(((stringTopicListingMap, throwable) -> {

                for (Map.Entry<String, TopicListing> entity:stringTopicListingMap.entrySet()) {
                    System.out.println(entity.getKey());
                    System.out.println(entity.getValue().topicId());
                    System.out.println(entity.getValue().name());
                    System.out.println(entity.getValue());

                    System.out.println("------------------------------------------");
                }
            }));
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }


创建Topic



    /**
     * bin/kafka-topics.sh --create \
     *     --bootstrap-server localhost:9092 \
     *     --replication-factor 1 \
     *     --partitions 1 \
     *     --topic streams-wordcount-output \
     *     --config cleanup.policy=compact
     */
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            String topicName = "create-topic-with-java";
            int partitions = 1;
            short replicationFactor = 1;
            // Create a compacted topic
            CreateTopicsResult result = admin.createTopics(Collections.singleton(
                    new NewTopic(topicName, partitions, replicationFactor)
                            .configs(Collections.singletonMap(TopicConfig.CLEANUP_POLICY_CONFIG,
                                    TopicConfig.CLEANUP_POLICY_COMPACT))));

            // Call values() to get the result for a specific topic
            KafkaFuture<Void> future = result.values().get(topicName);

            // Call get() to block until the topic creation is complete or has failed
            // if creation failed the ExecutionException wraps the underlying cause.
            future.get();
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }


删除Topic




        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {

            TopicCollection topicCollection = TopicCollection.ofTopicNames(
                    Collections.singleton("create-topic-with-java"));
            DeleteTopicsResult result = admin.deleteTopics(topicCollection);
            result.all().whenComplete((r,t) ->{
               if (t != null) {
                   t.printStackTrace();
               }
                System.out.println("delete done!");
            });
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }