• 关于 Spring Boot 项目中创建 Kafka Topic


    下面提供一个工具类,方法名见名知意,基于 Kafka 的 AdminClient(管理客户端)实现主题的创建、删除、列表与详情查询。

    
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.common.KafkaFuture;
    
    import java.util.*;
    import java.util.concurrent.ExecutionException;
    
    
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.ListTopicsOptions;
    import org.apache.kafka.clients.admin.TopicDescription;
    import org.apache.kafka.common.KafkaFuture;
    import org.apache.kafka.common.TopicPartitionInfo;
    
    import java.util.Properties;
    import java.util.Set;
    import java.util.concurrent.ExecutionException;
    /**
     * Small utility for managing Kafka topics through the Kafka {@code AdminClient}:
     * creating, deleting, listing and describing topics on the broker configured in
     * {@link #ip}.
     *
     * <p>Every operation opens its own short-lived {@code AdminClient} and closes it
     * before returning. Failures are reported by printing the stack trace; they are not
     * rethrown to the caller.
     *
     * <p>Created 2024/6/24 16:37.
     *
     * @author zhoumo
     */
    public class KafkaTopicInfo {

        /** Bootstrap server address ({@code host:port}) used by every operation in this class. */
        final static String ip="127.0.0.1:9090";

        /** Number of partitions used by {@link #createTopic(String)}. */
        private static final int DEFAULT_PARTITIONS = 2;

        /** Replication factor used by {@link #createTopic(String)}. */
        private static final short DEFAULT_REPLICATION_FACTOR = 1;

        /**
         * Entry point: prints every topic together with its partition layout.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            getListDetail();
        }

        /**
         * Creates a topic with the default layout (2 partitions, replication factor 1).
         *
         * @param topicName name of the topic to create
         * @throws ExecutionException   declared for source compatibility; failures are
         *                              reported via stack trace instead of being thrown
         * @throws InterruptedException declared for source compatibility; an interrupt is
         *                              reported and the thread's interrupt flag is restored
         */
        public static void createTopic(String topicName) throws ExecutionException, InterruptedException {
            createTopic(topicName, DEFAULT_PARTITIONS, DEFAULT_REPLICATION_FACTOR);
        }

        /**
         * Creates a topic with an explicit partition count and replication factor and waits
         * for the broker to confirm the creation.
         *
         * @param topicName         name of the topic to create
         * @param numPartitions     number of partitions for the new topic
         * @param replicationFactor replication factor for the new topic
         * @throws ExecutionException   declared for parity with {@link #createTopic(String)};
         *                              failures are reported via stack trace
         * @throws InterruptedException declared for parity with {@link #createTopic(String)};
         *                              an interrupt is reported and the flag is restored
         */
        public static void createTopic(String topicName, int numPartitions, short replicationFactor)
                throws ExecutionException, InterruptedException {
            try (AdminClient adminClient = AdminClient.create(adminProperties())) {
                NewTopic newTopic = new NewTopic(topicName, numPartitions, replicationFactor);

                // Block until the broker has acknowledged the creation.
                adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                System.out.println("Topic created successfully: " + topicName);
            } catch (Exception e) {
                // Best-effort by design: any failure is reported, never propagated.
                reportFailure(e);
            }
        }

        /**
         * Deletes a topic and waits for the broker to confirm the deletion.
         *
         * @param topicName name of the topic to delete
         */
        public static void deleteTopic(String topicName) {
            try (AdminClient adminClient = AdminClient.create(adminProperties())) {
                DeleteTopicsResult deleteResult = adminClient.deleteTopics(Collections.singletonList(topicName));
                deleteResult.all().get();

                System.out.println("Topic deleted successfully: " + topicName);
            } catch (ExecutionException | InterruptedException e) {
                reportFailure(e);
            }
        }

        /**
         * Prints the names of all topics, internal topics included.
         */
        public static void getList() {
            try (AdminClient adminClient = AdminClient.create(adminProperties())) {
                ListTopicsOptions options = new ListTopicsOptions();
                // Internal topics are excluded by default; include them explicitly.
                options.listInternal(true);
                ListTopicsResult topicsResult = adminClient.listTopics(options);
                Set<String> topics = topicsResult.names().get();

                System.out.println("Existing topics:");
                for (String topic : topics) {
                    System.out.println(topic);
                }
            } catch (ExecutionException | InterruptedException e) {
                reportFailure(e);
            }
        }

        /**
         * Prints every topic (internal topics included) followed by its partition details.
         *
         * <p>All topics are described with a single batched request rather than one
         * request per topic.
         */
        public static void getListDetail() {
            try (AdminClient adminClient = AdminClient.create(adminProperties())) {
                ListTopicsOptions options = new ListTopicsOptions();
                // Internal topics are excluded by default; include them explicitly.
                options.listInternal(true);
                KafkaFuture<Set<String>> topicsFuture = adminClient.listTopics(options).names();

                System.out.println("Existing topics:");
                Set<String> topics = topicsFuture.get();

                // One describe request for all names; skipped when there is nothing to describe.
                Map<String, KafkaFuture<TopicDescription>> descriptions = topics.isEmpty()
                        ? Collections.<String, KafkaFuture<TopicDescription>>emptyMap()
                        : adminClient.describeTopics(topics).values();

                for (String topic : topics) {
                    System.out.println(topic);
                    printTopicDetails(descriptions.get(topic).get());
                }
            } catch (ExecutionException | InterruptedException e) {
                reportFailure(e);
            }
        }

        /**
         * Prints the name of a topic and one line per partition with its leader, replicas
         * and in-sync replicas.
         *
         * @param topicDescription description of the topic to print
         */
        private static void printTopicDetails(TopicDescription topicDescription) {
            System.out.println("Topic: " + topicDescription.name());
            System.out.println("Partitions:");

            for (TopicPartitionInfo partition : topicDescription.partitions()) {
                // leader() is null while a partition has no leader (e.g. its broker is down);
                // print a placeholder instead of failing with a NullPointerException.
                String leaderId = partition.leader() == null
                        ? "none"
                        : String.valueOf(partition.leader().id());
                System.out.printf("  Partition %d, Leader: %s, Replicas: %s, Isrs: %s%n",
                        partition.partition(),
                        leaderId,
                        partition.replicas(),
                        partition.isr());
            }
            System.out.println();
        }

        /** Builds the client configuration pointing at the bootstrap server in {@link #ip}. */
        private static Properties adminProperties() {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, ip);
            return props;
        }

        /** Prints the failure and, for an interrupt, restores the thread's interrupt flag. */
        private static void reportFailure(Exception e) {
            if (e instanceof InterruptedException) {
                // Catching InterruptedException clears the flag; set it again for callers.
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
        }
    }
    
    
  • 相关阅读:
    13Java数组与数组内存图
    mysql8.0笔记
    Java设计模式8,校验、审批流程改善神器,责任链模式
    开放之光——湖北电大搜题助力学习之旅
    地下室选择笔记
    怎么解决Spring的循环依赖
    Python 深度学习入门之CNN
    从零开始搭建 Hexo 个人博客
    【译】Based:简单线性注意力语言模型平衡召回-吞吐量权衡
    Inno Setup安装中文语言
  • 原文地址:https://blog.csdn.net/zhougubei/article/details/139931132