Note: if the JVM has too little memory, reading the data in can fail with errors.
1. Import the pom dependencies
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.0</version>
</dependency>
2. Write the code
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HbaseReadKafka {
    static int i = 0;
    static Connection conn;

    static {
        try {
            Configuration cnf = HBaseConfiguration.create();
            cnf.set("hbase.zookeeper.quorum", "192.168.64.128:2181");
            conn = ConnectionFactory.createConnection(cnf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.64.128:9092");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "cm");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singletonList("user_friends_raw"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(3));
            System.out.println("Fetched data: " + records.count() + "====>" + (++i));
            List<Put> puts = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                // process each record read from Kafka and collect the resulting Puts
                String[] lines = record.value().split(",", -1);
                if (lines.length > 1) {
                    String[] fids = lines[1].split(" ");
                    for (String fid : fids) {
                        Put put = new Put((lines[0] + "-" + fid).getBytes());
                        put.addColumn("base".getBytes(), "userid".getBytes(), lines[0].getBytes());
                        put.addColumn("base".getBytes(), "friendid".getBytes(), fid.getBytes());
                        puts.add(put);
                    }
                }
            }
            // look up the table and write the batch to HBase
            try (Table htable = conn.getTable(TableName.valueOf("userfriends"))) {
                htable.put(puts);
            } catch (IOException e) {
                e.printStackTrace();
            }
            // clear the list for the next batch
            puts.clear();
            // asynchronous commit
            consumer.commitAsync();
            // synchronous commit
            // consumer.commitSync();
        }
    }
}
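The consumer above assumes the userfriends table (with column family base) already exists. If it does not, here is a minimal sketch for creating it with the HBase 1.2 Admin API; the class name CreateUserFriendsTable is made up for illustration, and the quorum address is the same one used above.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateUserFriendsTable {
    public static void main(String[] args) throws IOException {
        Configuration cnf = HBaseConfiguration.create();
        cnf.set("hbase.zookeeper.quorum", "192.168.64.128:2181");
        try (Connection conn = ConnectionFactory.createConnection(cnf);
             Admin admin = conn.getAdmin()) {
            TableName name = TableName.valueOf("userfriends");
            if (!admin.tableExists(name)) {
                // one column family "base", matching the Put calls above
                HTableDescriptor desc = new HTableDescriptor(name);
                desc.addFamily(new HColumnDescriptor("base"));
                admin.createTable(desc);
            }
        }
    }
}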
Type overview

<properties>
    <java.version>1.8</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-shaded-client</artifactId>
        <version>1.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>2.3.2.RELEASE</version>
            <configuration>
                <mainClass>com.kgc.UserinterestApplication</mainClass>
            </configuration>
            <executions>
                <execution>
                    <id>repackage</id>
                    <goals>
                        <goal>repackage</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
spring:
  application:
    name: userinterest
  kafka:
    bootstrap-servers: 192.168.64.128:9092
    consumer:
      # disable auto-commit; offsets are acknowledged manually
      enable-auto-commit: false
      # start reading from the earliest offset
      auto-offset-reset: earliest
      # key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: manual_immediate
server:
  port: 8999
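The spring-boot-maven-plugin in the pom names com.kgc.UserinterestApplication as the main class. A minimal sketch of that entry point, assuming it is the standard Spring Boot boilerplate and nothing more:

package com.kgc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class UserinterestApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserinterestApplication.class, args);
    }
}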
# Write interface 1
public interface FillData<T> {
    List<Put> fillData(List<T> lst);
}

# Write interface 2
/**
 * Automatically fills data from the converted results.
 * @param <T> entity type
 */
public interface FillHbaseData<T> extends FillData<T> {
    List<Put> fillData(List<T> list);
}

# Write interface 3

/**
 * Interface for converting a raw line into entities, one implementation
 * per incoming data format and entity model.
 * @param <T> entity type
 */
public interface StringToEntity<T> {
    List<T> change(String line);
}

# Write interface 4
/**
 * Data conversion interface: turns Kafka data into a common data format.
 * @param <T> entity type
 */
public interface DataChange<T> {
    List<T> change(String line);
}

# Write abstract class 5, implementing interface 4
/**
 * The abstraction role in the bridge pattern.
 */
public abstract class AbstracDataChange<T> implements DataChange<T> {
    protected FillData<T> fillData;
    protected StringToEntity<T> stringToEntity;
    protected Writer writer;

    public AbstracDataChange(FillData<T> fillData, StringToEntity<T> stringToEntity, Writer writer) {
        this.fillData = fillData;
        this.stringToEntity = stringToEntity;
        this.writer = writer;
    }

    public abstract List<T> change(String line);

    public abstract void fill(ConsumerRecord<String, String> record, String tableName);
}
# Write implementation class 1 of interface 2
1.1
/**
 * Handles the eventAttendees data.
 */
public class EventAttendeesFillDataImp implements FillHbaseData<EventAttendees> {
    @Override
    public List<Put> fillData(List<EventAttendees> list) {
        List<Put> puts = new ArrayList<>();
        list.stream().forEach(eventAttendees -> {
            Put put = new Put((eventAttendees.getEventid() + eventAttendees.getUserid()
                    + eventAttendees.getAnswer()).getBytes());
            put.addColumn("base".getBytes(), "eventid".getBytes(), eventAttendees.getEventid().getBytes());
            put.addColumn("base".getBytes(), "userid".getBytes(), eventAttendees.getUserid().getBytes());
            put.addColumn("base".getBytes(), "answer".getBytes(), eventAttendees.getAnswer().getBytes());
            puts.add(put);
        });
        return puts;
    }
}

# Write implementation class 2 of interface 2
1.2
public class EventsFillDataImp implements FillHbaseData<Events> {
    @Override
    public List<Put> fillData(List<Events> list) {
        List<Put> puts = new ArrayList<>();
        list.stream().forEach(events -> {
            Put put = new Put(events.getEventid().getBytes());
            put.addColumn("base".getBytes(), "userid".getBytes(), events.getUserid().getBytes());
            put.addColumn("base".getBytes(), "starttime".getBytes(), events.getStarttime().getBytes());
            put.addColumn("base".getBytes(), "city".getBytes(), events.getCity().getBytes());
            put.addColumn("base".getBytes(), "state".getBytes(), events.getState().getBytes());
            put.addColumn("base".getBytes(), "zip".getBytes(), events.getZip().getBytes());
            put.addColumn("base".getBytes(), "country".getBytes(), events.getCountry().getBytes());
            put.addColumn("base".getBytes(), "lat".getBytes(), events.getLat().getBytes());
            put.addColumn("base".getBytes(), "lng".getBytes(), events.getLng().getBytes());
            puts.add(put);
        });
        return puts;
    }
}

# Write implementation class 3 of interface 2
1.3
/**
 * Converts the list produced from the userfriends topic into a list of Puts.
 */
public class UserFriendsFillDataImp implements FillHbaseData<UserFriends> {
    @Override
    public List<Put> fillData(List<UserFriends> list) {
        List<Put> puts = new ArrayList<>();
        list.stream().forEach(userFriends -> {
            Put put = new Put((userFriends.getUserid() + "-" + userFriends.getFriendid()).getBytes());
            put.addColumn("base".getBytes(), "userid".getBytes(), userFriends.getUserid().getBytes());
            put.addColumn("base".getBytes(), "friendid".getBytes(), userFriends.getFriendid().getBytes());
            puts.add(put);
        });
        return puts;
    }
}
# Write implementation class 1 of interface 3
public class EventAttendeesChangeImp implements StringToEntity<EventAttendees> {
    /**
     * Incoming data layout: eventid, yes, maybe, invited, no
     * ex: 123,112233 34343,234234 45454,112233 23232 234234,343343 34343
     * is converted to: 123 112233 yes, 123 34343 yes, 123 234234 maybe ...
     * @param line
     * @return
     */
    @Override
    public List<EventAttendees> change(String line) {
        String[] infos = line.split(",", -1);
        List<EventAttendees> eas = new ArrayList<>();
        // first collect everyone who answered yes
        if (infos[1] != null && !infos[1].trim().equals("")) {
            Arrays.asList(infos[1].split(" ")).stream()
                    .forEach(yes -> {
                        EventAttendees eventAttendees = EventAttendees.builder()
                                .eventid(infos[0]).userid(yes).answer("yes").build();
                        eas.add(eventAttendees);
                    });
        }
        // collect everyone who answered maybe
        if (infos[2] != null && !infos[2].trim().equals("")) {
            Arrays.asList(infos[2].split(" ")).stream()
                    .forEach(maybe -> {
                        EventAttendees eventAttendees = EventAttendees.builder()
                                .eventid(infos[0]).userid(maybe).answer("maybe").build();
                        eas.add(eventAttendees);
                    });
        }
        // collect everyone who was invited
        if (infos[3] != null && !infos[3].trim().equals("")) {
            Arrays.asList(infos[3].split(" ")).stream()
                    .forEach(invited -> {
                        EventAttendees eventAttendees = EventAttendees.builder()
                                .eventid(infos[0]).userid(invited).answer("invited").build();
                        eas.add(eventAttendees);
                    });
        }
        // collect everyone who answered no
        if (infos[4] != null && !infos[4].trim().equals("")) {
            Arrays.asList(infos[4].split(" ")).stream()
                    .forEach(no -> {
                        EventAttendees eventAttendees = EventAttendees.builder()
                                .eventid(infos[0]).userid(no).answer("no").build();
                        eas.add(eventAttendees);
                    });
        }
        return eas;
    }
}
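A quick way to sanity-check this conversion is to feed it the example line from the javadoc. The smoke test below is hypothetical (the class name EventAttendeesChangeDemo is made up), assuming it sits in the same package as the classes above:

public class EventAttendeesChangeDemo {
    public static void main(String[] args) {
        StringToEntity<EventAttendees> change = new EventAttendeesChangeImp();
        // prints one eventid/userid/answer triple per line:
        // yes for 112233 and 34343, maybe for 234234 and 45454,
        // invited for 112233, 23232 and 234234, no for 343343 and 34343
        change.change("123,112233 34343,234234 45454,112233 23232 234234,343343 34343")
                .forEach(ea -> System.out.println(
                        ea.getEventid() + " " + ea.getUserid() + " " + ea.getAnswer()));
    }
}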
# Write implementation class 2 of interface 3

public class EventsChangeImp implements StringToEntity<Events> {
    @Override
    public List<Events> change(String line) {
        String[] infos = line.split(",", -1);
        List<Events> events = new ArrayList<>();
        Events event = Events.builder().eventid(infos[0]).userid(infos[1]).starttime(infos[2])
                .city(infos[3]).state(infos[4]).zip(infos[5])
                .country(infos[6]).lat(infos[7]).lng(infos[8]).build();
        events.add(event);
        return events;
    }
}

# Write implementation class 3 of interface 3
/**
 * Converts 123123,123435 435455 345345 => 123123,123435 123123,435455 123123,345345
 */
public class UserFriendsChangeImp implements StringToEntity<UserFriends> {
    @Override
    public List<UserFriends> change(String line) {
        String[] infos = line.split(",", -1);
        List<UserFriends> ufs = new ArrayList<>();
        if (infos.length < 2 || infos[1].trim().equals("")) {
            // no friend list: keep the user with an empty friendid
            UserFriends userFriends = UserFriends.builder().userid(infos[0]).friendid("").build();
            ufs.add(userFriends);
        } else {
            Arrays.asList(infos[1].split(" ")).stream()
                    .forEach(fid -> {
                        UserFriends uf = UserFriends.builder().userid(infos[0]).friendid(fid).build();
                        ufs.add(uf);
                    });
        }
        return ufs;
    }
}
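Under the same assumption, a tiny hypothetical check for the user-friends conversion, using the example from the comment above:

public class UserFriendsChangeDemo {
    public static void main(String[] args) {
        StringToEntity<UserFriends> change = new UserFriendsChangeImp();
        // expected: 123123-123435, 123123-435455, 123123-345345
        change.change("123123,123435 435455 345345")
                .forEach(uf -> System.out.println(uf.getUserid() + "-" + uf.getFriendid()));
    }
}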
public class DataChangeFillHbaseDatabase<T> extends AbstracDataChange<T> {
    static int count;

    public DataChangeFillHbaseDatabase(FillData<T> fillData, StringToEntity<T> stringToEntity, Writer writer) {
        super(fillData, stringToEntity, writer);
    }

    @Override
    public List<T> change(String line) {
        return stringToEntity.change(line);
    }

    @Override
    public void fill(ConsumerRecord<String, String> record, String tableName) {
        // convert the ConsumerRecord read from Kafka into a string, then into Puts
        List<Put> puts = fillData.fillData(change(record.value()));
        // puts.forEach(System.out::println);
        // write the batch into the corresponding HBase table
        writer.write(puts, tableName);
        System.out.println("Fetched Kafka data ======>" + count++);
    }
}
# Entity class 1
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class EventAttendees {
    private String eventid;
    private String userid;
    private String answer;
}

# Entity class 2
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Events {
    private String eventid;
    private String userid;
    private String starttime;
    private String city;
    private String state;
    private String zip;
    private String country;
    private String lat;
    private String lng;
}

# Entity class 3

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class UserFriends {
    private String userid;
    private String friendid;
}
# Writer interface

public interface Writer {
    void write(List<Put> puts, String tableName);
}

# Interface implementation class
/**
 * Receives the converted list and writes it into the HBase table.
 */
@Component
public class HbaseWriter implements Writer {
    @Resource
    private Connection connection;

    public void write(List<Put> puts, String tableName) {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(puts);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

# KafkaReader class
@Component
public class KafkaReader {
    @Resource
    private Writer writer;

    @KafkaListener(groupId = "cm", topics = {"events_raw"})
    public void readEventToHbase(ConsumerRecord<String, String> record, Acknowledgment ack) {
        AbstracDataChange<Events> eventHandler = new DataChangeFillHbaseDatabase<>(
                new EventsFillDataImp(),
                new EventsChangeImp(),
                writer
        );
        eventHandler.fill(record, "events");
        ack.acknowledge();
    }

    @KafkaListener(groupId = "cm", topics = {"event_attendees_raw"})
    public void readEventAttendeesToHbase(ConsumerRecord<String, String> record, Acknowledgment ack) {
        AbstracDataChange<EventAttendees> eventHandler = new DataChangeFillHbaseDatabase<>(
                new EventAttendeesFillDataImp(),
                new EventAttendeesChangeImp(),
                writer
        );
        eventHandler.fill(record, "eventsattends");
        ack.acknowledge();
    }

    // @KafkaListener(groupId = "cm", topics = {"user_friends_raw"})
    public void readUserFriendsToHbase(ConsumerRecord<String, String> record, Acknowledgment ack) {
        AbstracDataChange<UserFriends> eventHandler = new DataChangeFillHbaseDatabase<>(
                new UserFriendsFillDataImp(),
                new UserFriendsChangeImp(),
                writer
        );
        eventHandler.fill(record, "userfriends");
        ack.acknowledge();
    }
}
@Configuration
public class HbaseConfig {
@Bean
public org.apache.hadoop.conf.Configuration getConfig(){
org.apache.hadoop.conf.Configuration cfg = HBaseConfiguration.create();
cfg.set(HConstants.ZOOKEEPER_QUORUM,"192.168.64.128:2181");
return cfg;
}
@Bean
@Scope(value = "prototype")
public Connection getConnection(){
Connection connection = null;
try {
connection = ConnectionFactory.createConnection(getConfig());
} catch (IOException e) {
e.printStackTrace();
}
return connection;
}
@Bean
public Supplier<Connection> hBaseConnection(){
return this::getConnection;
}
}
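The Supplier<Connection> bean exists so that callers can pull a fresh prototype-scoped connection on demand instead of sharing the singleton. A sketch of a Writer variant that consumes it (this class is an assumption for illustration; the original HbaseWriter injects Connection directly):

import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;
import javax.annotation.Resource;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.springframework.stereotype.Component;

@Component
public class SupplierHbaseWriter implements Writer {
    @Resource
    private Supplier<Connection> hBaseConnection;

    @Override
    public void write(List<Put> puts, String tableName) {
        // each call obtains its own prototype-scoped connection and closes it when done
        try (Connection connection = hBaseConnection.get();
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(puts);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

HBase connections are heavyweight, so the shared singleton Connection the original uses is usually the better default; the supplier variant only pays off when callers must not share a connection.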