• Kafka读取数据到Hbase数据库2种方式 使用桥梁模式


    读入数据时 注意服务器可用内存大小，可用如下命令查看：

    如果内存太小会出现读入报错的情况

    free -h   （df 查看的是磁盘空间，查看内存应使用 free）

    方法一

    1.导入pom依赖

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>

    2.编写代码

    public class HbaseReadKafka{
    
        // Poll-batch counter, used only for progress logging.
        static int i = 0;
        // Shared HBase connection, created once in the static initializer.
        static Connection conn;
    
        static {
            try {
                Configuration cnf = HBaseConfiguration.create();
                cnf.set("hbase.zookeeper.quorum", "192.168.64.128:2181");
                conn = ConnectionFactory.createConnection(cnf);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * Consumes the user_friends_raw topic ("userid,fid1 fid2 ...") and
         * writes one row per (userid, friendid) pair into the userfriends table.
         * Offsets are committed manually after each batch is written.
         */
        public static void main(String[] args) {
    
            Properties prop = new Properties();
            prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.128:9092");
            prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            prop.put(ConsumerConfig.GROUP_ID_CONFIG, "cm");
            prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Auto-commit disabled: offsets are committed below after the write.
            prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            // Parameterized types instead of raw KafkaConsumer/ConsumerRecords.
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
            consumer.subscribe(Arrays.asList("user_friends_raw"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
                System.out.println("获取数据:" + records.count() + "====>" + (++i));
                List<Put> puts = new ArrayList<>();
                for (ConsumerRecord<String, String> record : records) {
                    // Parse "userid,fid1 fid2 ..." into one Put per friend id.
                    String[] lines = record.value().split(",", -1);
                    // Fix: also skip an empty friend list, which previously
                    // produced a bogus "userid-" row with an empty friendid.
                    if (lines.length > 1 && !lines[1].trim().isEmpty()) {
                        String[] fids = lines[1].split(" ");
                        for (String fid : fids) {
                            Put put = new Put((lines[0] + "-" + fid).getBytes());
                            put.addColumn("base".getBytes(), "userid".getBytes(), lines[0].getBytes());
                            put.addColumn("base".getBytes(), "friendid".getBytes(), fid.getBytes());
                            puts.add(put);
                        }
                    }
                }
                // Fix: close the Table handle (it was leaked before) and skip
                // the HBase round-trip entirely for empty batches.
                if (!puts.isEmpty()) {
                    try (Table htable = conn.getTable(TableName.valueOf("userfriends"))) {
                        htable.put(puts);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                // puts is re-created each iteration, so no clear() is needed.
                // Asynchronous offset commit; use commitSync() for stronger guarantees.
                consumer.commitAsync();
    //            consumer.commitSync();
            }
        }
    }

    方法二

    类型总览!

     

    1 导入pom

    1. <properties>
    2. <java.version>1.8</java.version>
    3. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    4. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    5. <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
    6. </properties>
    7. <dependencies>
    8. <dependency>
    9. <groupId>org.springframework.boot</groupId>
    10. <artifactId>spring-boot-starter</artifactId>
    11. </dependency>
    12. <dependency>
    13. <groupId>org.springframework.kafka</groupId>
    14. <artifactId>spring-kafka</artifactId>
    15. </dependency>
    16. <dependency>
    17. <groupId>org.apache.hbase</groupId>
    18. <artifactId>hbase-shaded-client</artifactId>
    19. <version>1.2.0</version>
    20. </dependency>
    21. <dependency>
    22. <groupId>org.projectlombok</groupId>
    23. <artifactId>lombok</artifactId>
    24. </dependency>
    25. <dependency>
    26. <groupId>org.springframework.boot</groupId>
    27. <artifactId>spring-boot-starter-test</artifactId>
    28. <scope>test</scope>
    29. <exclusions>
    30. <exclusion>
    31. <groupId>org.junit.vintage</groupId>
    32. <artifactId>junit-vintage-engine</artifactId>
    33. </exclusion>
    34. </exclusions>
    35. </dependency>
    36. <dependency>
    37. <groupId>org.springframework.kafka</groupId>
    38. <artifactId>spring-kafka-test</artifactId>
    39. <scope>test</scope>
    40. </dependency>
    41. </dependencies>
    42. <dependencyManagement>
    43. <dependencies>
    44. <dependency>
    45. <groupId>org.springframework.boot</groupId>
    46. <artifactId>spring-boot-dependencies</artifactId>
    47. <version>${spring-boot.version}</version>
    48. <type>pom</type>
    49. <scope>import</scope>
    50. </dependency>
    51. </dependencies>
    52. </dependencyManagement>
    53. <build>
    54. <plugins>
    55. <plugin>
    56. <groupId>org.apache.maven.plugins</groupId>
    57. <artifactId>maven-compiler-plugin</artifactId>
    58. <version>3.8.1</version>
    59. <configuration>
    60. <source>1.8</source>
    61. <target>1.8</target>
    62. <encoding>UTF-8</encoding>
    63. </configuration>
    64. </plugin>
    65. <plugin>
    66. <groupId>org.springframework.boot</groupId>
    67. <artifactId>spring-boot-maven-plugin</artifactId>
    68. <version>2.3.2.RELEASE</version>
    69. <configuration>
    70. <mainClass>com.kgc.UserinterestApplication</mainClass>
    71. </configuration>
    72. <executions>
    73. <execution>
    74. <id>repackage</id>
    75. <goals>
    76. <goal>repackage</goal>
    77. </goals>
    78. </execution>
    79. </executions>
    80. </plugin>
    81. </plugins>
    82. </build>

    2 配置yml

    spring:
      application:
        name: userinterest
      kafka:
        bootstrap-servers: 192.168.64.128:9092
        consumer:
            #关闭自动提交（改为手动提交偏移量）
          enable-auto-commit: false
            #从首位置读取文件
          auto-offset-reset: earliest
           #序列化编码
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        listener:
          ack-mode: manual_immediate
    server:
      port: 8999
    

    3 配置桥接模式

    # 编写接口1

    /**
     * Implementor side of the bridge pattern: converts a list of parsed
     * entities into a list of storage-ready objects.
     * NOTE(review): generics appear stripped by the blog formatting —
     * presumably {@code List<T>} in / {@code List<Put>} out; confirm.
     */
    public interface FillData{
        List fillData (List lst);
    }
    

    # 编写接口2

    /**
     * Automatically fills data based on the parsed result.
     * HBase specialization of {@link FillData}: implementations turn a list
     * of parsed entities into a list of HBase Put objects.
     */
    
    public interface FillHbaseData extends FillData {
        List fillData(List list);
    }
    
    # 编写接口3
    
    /**
     * Converts one raw Kafka line into a list of entity objects; each
     * implementation handles a different topic / entity model.
     */
    public interface StringToEntity{
        List change(String line);
    }
    

    # 编写接口 4

    /**
     * Data-conversion interface: turns one raw Kafka record value (a line of
     * text) into a list of commonly used entity objects.
     */
    public interface DataChange {
        List change(String line);
    }
    

    # 编写抽象类 5 实现接口4

    /**
     * Abstraction role of the bridge pattern: holds the three collaborators
     * (line parser, entity-to-Put filler, writer) that concrete subclasses
     * wire together.
     */
    public abstract class AbstracDataChange implements DataChange {
    
        // Converts entity lists into storage-ready objects (HBase Puts).
        protected FillData fillData;
        // Parses one raw Kafka line into entity objects.
        protected StringToEntity stringToEntity;
        // Persists the prepared objects into the target table.
        protected Writer writer;
    
        public AbstracDataChange(FillData fillData, StringToEntity stringToEntity, Writer writer) {
            this.fillData = fillData;
            this.stringToEntity = stringToEntity;
            this.writer = writer;
        }
    
    
        // Parses one raw line into entities (delegated by subclasses).
        public abstract List change(String line);
    
        // Consumes one Kafka record and writes the derived rows to tableName.
        public abstract  void  fill(ConsumerRecord record,String tableName);
    }
    

     

    # 编写接口实现类1  实现接口 2

    1.1

    /**
     * Converts EventAttendees entities into HBase Puts for the "base" column
     * family. Row key: eventid + userid + answer concatenated.
     */
    public class EventAttendeesFillDataImp implements FillHbaseData {
    
        @Override
        public List fillData(List list) {
            List rows = new ArrayList<>();
            byte[] family = "base".getBytes();
            for (Object item : list) {
                EventAttendees ea = (EventAttendees) item;
                Put row = new Put((ea.getEventid() + ea.getUserid() + ea.getAnswer()).getBytes());
                row.addColumn(family, "eventid".getBytes(), ea.getEventid().getBytes());
                row.addColumn(family, "userid".getBytes(), ea.getUserid().getBytes());
                row.addColumn(family, "answer".getBytes(), ea.getAnswer().getBytes());
                rows.add(row);
            }
            return rows;
        }
    }

    # 编写接口实现类2  实现接口 2

    1.2

    /**
     * Converts Events entities into HBase Puts for the "base" column family.
     * Row key: the event id.
     */
    public class EventsFillDataImp implements FillHbaseData {
        @Override
        public List fillData(List list) {
            List rows = new ArrayList<>();
            byte[] family = "base".getBytes();
            for (Object item : list) {
                Events ev = (Events) item;
                Put row = new Put(ev.getEventid().getBytes());
                row.addColumn(family, "userid".getBytes(), ev.getUserid().getBytes());
                row.addColumn(family, "starttime".getBytes(), ev.getStarttime().getBytes());
                row.addColumn(family, "city".getBytes(), ev.getCity().getBytes());
                row.addColumn(family, "state".getBytes(), ev.getState().getBytes());
                row.addColumn(family, "zip".getBytes(), ev.getZip().getBytes());
                row.addColumn(family, "country".getBytes(), ev.getCountry().getBytes());
                row.addColumn(family, "lat".getBytes(), ev.getLat().getBytes());
                row.addColumn(family, "lng".getBytes(), ev.getLng().getBytes());
                rows.add(row);
            }
            return rows;
        }
    }
    

    # 编写接口实现类3 实现接口 2

    1.3

    /**
     * Converts UserFriends entities (parsed from the userfriends queue) into
     * HBase Puts. Row key: "userid-friendid".
     */
    public class UserFriendsFillDataImp implements FillHbaseData {
        @Override
        public List fillData(List list) {
            List rows = new ArrayList<>();
            for (Object item : list) {
                UserFriends uf = (UserFriends) item;
                Put row = new Put((uf.getUserid() + "-" + uf.getFriendid()).getBytes());
                row.addColumn("base".getBytes(), "userid".getBytes(), uf.getUserid().getBytes());
                row.addColumn("base".getBytes(), "friendid".getBytes(), uf.getFriendid().getBytes());
                rows.add(row);
            }
            return rows;
        }
    }
    

    # 编写接口实现类1  实现接口 3

    public class EventAttendeesChangeImp implements StringToEntity {
        /**
         * Input: eventid,yes-list,maybe-list,invited-list,no-list
         * ex: 123,112233 34343,234234 45454,112233 23232 234234,343343 34343
         * Output: one entity per (eventid, userid, answer) triple,
         * e.g. 123 112233 yes, 123 34343 yes, 123 234234 maybe ...
         *
         * Fixes: the original tested {@code infos[i] != null} only AFTER
         * calling {@code infos[i].trim()} (a useless check), and indexed
         * columns 1..4 without verifying the array length. The four identical
         * per-answer loops are factored into one helper.
         *
         * @param line one raw CSV line from the event_attendees_raw topic
         * @return list of EventAttendees entities (possibly empty)
         */
        @Override
        public List change(String line) {
            String[] infos = line.split(",", -1);
            List eas = new ArrayList<>();
            addAnswers(eas, infos, 1, "yes");
            addAnswers(eas, infos, 2, "maybe");
            addAnswers(eas, infos, 3, "invited");
            addAnswers(eas, infos, 4, "no");
            return eas;
        }
    
        // Adds one entity per user id found in column idx, tagged with answer.
        private void addAnswers(List eas, String[] infos, int idx, String answer) {
            if (infos.length > idx && infos[idx] != null && !infos[idx].trim().isEmpty()) {
                for (String uid : infos[idx].split(" ")) {
                    eas.add(EventAttendees.builder()
                            .eventid(infos[0]).userid(uid).answer(answer).build());
                }
            }
        }
    }
    

    # 编写接口实现类2   实现接口 3

    public class EventsChangeImp implements StringToEntity {
        /**
         * Maps one CSV line of the events_raw topic onto a single Events entity
         * (eventid, userid, starttime, city, state, zip, country, lat, lng).
         *
         * Fix: guard against short/malformed lines, which previously threw
         * ArrayIndexOutOfBoundsException; such lines now yield an empty list.
         *
         * @param line one raw CSV line with at least 9 comma-separated fields
         * @return a one-element list, or an empty list for malformed input
         */
        @Override
        public List change(String line) {
            String[] infos = line.split(",", -1);
            List events = new ArrayList<>();
            if (infos.length >= 9) {
                Events event = Events.builder().eventid(infos[0]).userid(infos[1]).starttime(infos[2])
                        .city(infos[3]).state(infos[4]).zip(infos[5])
                        .country(infos[6]).lat(infos[7]).lng(infos[8]).build();
                events.add(event);
            }
            return events;
        }
    }
    

    # 编写接口实现类 3  实现接口 3

    /**
     * Expands 123123,123435 435455 345345 => (123123,123435) (123123,435455) (123123,345345)
     */
    public class UserFriendsChangeImp implements StringToEntity {
        /**
         * Fix: the original branch condition {@code infos.length >= 1} is always
         * true after split(), so the friend-expansion branch was dead code and
         * every record produced a single entity with an empty friendid. Records
         * that genuinely have no friend list still yield one entity with
         * friendid "".
         *
         * @param line "userid,fid1 fid2 ..." from the user_friends_raw topic
         * @return one UserFriends entity per friend id (or one with friendid "")
         */
        @Override
        public List change(String line) {
            String[] infos = line.split(",",-1);
            List ufs = new ArrayList<>();
            if (infos.length < 2 || infos[1].trim().isEmpty()) {
                UserFriends userFriends = UserFriends.builder().userid(infos[0]).friendid("").build();
                ufs.add(userFriends);
            } else {
                for (String fid : infos[1].split(" ")) {
                    ufs.add(UserFriends.builder().userid(infos[0]).friendid(fid).build());
                }
            }
            return ufs;
        }
    }

    3.1 配置

    DataChangeFillHbaseDatabase实体类 完成接口整合

    /**
     * Refined abstraction of the bridge: wires a concrete parser and filler
     * together and pushes the resulting Puts through the injected Writer.
     */
    public class DataChangeFillHbaseDatabase extends AbstracDataChange {
        // Processed-record counter for progress logging.
        // NOTE(review): static and non-atomic — shared across all instances and
        // may miss increments under concurrent listeners; confirm intent.
        static int count;
    
        public DataChangeFillHbaseDatabase(FillData fillData, StringToEntity stringToEntity, Writer writer) {
            super(fillData, stringToEntity, writer);
        }
    
        // Delegates raw-line parsing to the bridged StringToEntity implementor.
        @Override
        public List change(String line) {
            return stringToEntity.change(line);
        }
    
        @Override
        public void fill(ConsumerRecord record,String tableName) {
            // Parse the Kafka record value, then convert the entities to Puts.
            List puts = fillData.fillData(change(record.value()));
    
    //        puts.forEach( System.out::println);
            // Write the prepared batch into the target HBase table.
            writer.write(puts,tableName);
            System.out.println("hu获取到kafka数据======>"+count++);
    
        }
    }
    

    3.2 配置实体类

    # 实体类 1

    // Entity for one (event, user, answer) attendance record parsed from the
    // event_attendees_raw topic; all fields kept as raw CSV strings.
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public class EventAttendees {
        private String eventid;
        private String userid;
        private String answer; // one of: yes / maybe / invited / no
    }
    

    # 实体类 2

    // Entity for one event record parsed from the events_raw topic; fields map
    // 1:1 onto the CSV columns and are kept as raw strings (lat/lng included).
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public class Events {
        private String eventid;
        private String userid;
        private String starttime;
        private String city;
        private String state;
        private String zip;
        private String country;
        private String lat;
        private String lng;
    }
    
    # 实体类 3
    
    // Entity for one (userid, friendid) pair expanded from the
    // user_friends_raw topic.
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @Builder
    public class UserFriends {
        private String userid;
        private String friendid;
    }
    

     3.3 配置读写文件service层

    # 接口 writer
    // Persists a batch of prepared rows (HBase Puts) into the named table.
    public interface Writer {
        void write(List puts, String tableName);
    }
    

    #接口实现类

    /**
     * Receives the converted Put batches and writes them into HBase.
     */
    @Component
    public class HbaseWriter implements Writer {
        @Resource
        private Connection connection;
    
        /**
         * Writes the given Puts into {@code tableName}.
         * Fix: the Table handle was never closed, leaking one table resource
         * per batch — Table is AutoCloseable, so use try-with-resources.
         */
        public void write(List puts,String tableName){
            try (Table table = connection.getTable(TableName.valueOf(tableName))) {
                table.put(puts);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    #KafkaReader 类

    // Kafka entry point: one listener per raw topic, each wiring the matching
    // parser + filler into the bridge and acking manually after the write.
    @Component
    public class KafkaReader {
        // Bridge implementor that persists prepared Puts into HBase.
        @Resource
        private Writer writer;
    
        // Consumes events_raw into the "events" table.
        // NOTE(review): a new handler is built per record; it holds no
        // per-record state and could be hoisted to a field — confirm.
        @KafkaListener(groupId = "cm", topics = {"events_raw"})
        public void readEventToHbase(ConsumerRecord record, Acknowledgment ack) {
            AbstracDataChange eventHandler = new DataChangeFillHbaseDatabase(
                    new EventsFillDataImp(),
                    new EventsChangeImp(),
                    writer
            );
            eventHandler.fill(record,"events");
            // Manual ack (listener ack-mode: manual_immediate) after the write.
            ack.acknowledge();
        }
    
        // Consumes event_attendees_raw into the "eventsattends" table.
        @KafkaListener(groupId = "cm", topics = {"event_attendees_raw"})
        public void readEventAttendeesToHbase(ConsumerRecord record, Acknowledgment ack) {
            AbstracDataChange eventHandler = new DataChangeFillHbaseDatabase(
                    new EventAttendeesFillDataImp(),
                    new EventAttendeesChangeImp(),
                    writer
            );
            eventHandler.fill(record,"eventsattends");
            ack.acknowledge();
        }
    
        // Listener annotation left commented out: user_friends_raw ingestion is
        // disabled here (method one handles that topic) — confirm before enabling.
    //    @KafkaListener(groupId = "cm", topics = {"user_friends_raw"})
        public void readUserFriendsToHbase(ConsumerRecord record, Acknowledgment ack) {
            AbstracDataChange eventHandler = new DataChangeFillHbaseDatabase(
                    new UserFriendsFillDataImp(),
                    new UserFriendsChangeImp(),
                    writer
            );
            eventHandler.fill(record,"userfriends");
            ack.acknowledge();
        }
    }

    3.4  hbase配置 

    // Spring configuration exposing HBase connectivity beans.
    @Configuration
    public class HbaseConfig {
        // Base HBase client configuration pointing at the ZooKeeper quorum.
        @Bean
        public org.apache.hadoop.conf.Configuration getConfig(){
            org.apache.hadoop.conf.Configuration cfg = HBaseConfiguration.create();
            cfg.set(HConstants.ZOOKEEPER_QUORUM,"192.168.64.128:2181");
            return cfg;
        }
        // NOTE(review): an HBase Connection is heavyweight and thread-safe;
        // prototype scope creates one per injection point — confirm this is
        // intended rather than a shared singleton.
        @Bean
        @Scope(value = "prototype")
        public Connection getConnection(){
            Connection connection = null;
            try {
                connection = ConnectionFactory.createConnection(getConfig());
            } catch (IOException e) {
                // NOTE(review): failure is swallowed and a null Connection bean
                // is returned, which will NPE at first use — confirm handling.
                e.printStackTrace();
            }
            return connection;
        }
        // Lazily supplies connections (raw Supplier — presumably
        // Supplier<Connection>; generics likely stripped by blog formatting).
        @Bean
        public Supplier hBaseConnection(){
            return this::getConnection;
        }
    }

  • 相关阅读:
    vue-json-editor
    Scala的基本使用
    MATLAB神经网络编程(八)——BP神经网络的限制与改进
    概率论与数理统计(第一章 概率论的基本概念)
    【路径规划】如何给路径增加运动对象
    Nacos配置的加载规则详解(spring cloud 组件教程大全六)
    C++STL详解(六)unordered_set&unordered_map介绍
    仪表基础知识培训
    vue+element实现多级表头加树结构
    逆向案例二:关键字密文解密,自定义的加密解密。基于企名片科技的爬取。
  • 原文地址:https://blog.csdn.net/just_learing/article/details/126444470