    HBase Table Data Read/Write and Comprehensive Operations

    I. Experiment Objectives

    1. Become proficient in designing HBase table structures with HBase shell commands.
    2. Master creating, deleting, and modifying HBase tables, and viewing tables and their structure, through HBase programming.
    3. Master reading and writing HBase table data through HBase programming.

    II. Experiment Requirements and Notes

    1. Provide the main steps, implementation code, and screenshots of the test results for each task.
    2. Give a thorough summary and analysis of this experiment.
    3. It is recommended that project, class, package, and table names include your student ID or name.

    III. Experiment Content and Steps

    Task 1: Use MapReduce to bulk-export data from an HBase table to HDFS. The table name and its data are up to you.

    Main implementation steps and screenshots:

    Complete program

    WjwReadMapper:

    package hbase;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.io.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.*;
    import org.apache.hadoop.io.*;

    public class WjwReadMapper extends TableMapper<Writable, Writable> {
        private Text k = new Text();
        private Text v = new Text();
        // Separator used to join family/qualifier/value/timestamp in the output value.
        public static final String F1 = "\u0001";

        @Override
        public void map(ImmutableBytesWritable row, Result r, Context c) {
            String value = null;
            String rk = new String(row.get());   // row key of the current record
            byte[] family = null;
            byte[] column = null;
            long ts = 0L;
            try {
                // Emit one line per cell: rowkey \t family^Aqualifier^Avalue^Atimestamp
                for (KeyValue kv : r.list()) {
                    value = Bytes.toStringBinary(kv.getValue());
                    family = kv.getFamily();
                    column = kv.getQualifier();
                    ts = kv.getTimestamp();
                    k.set(rk);
                    v.set(Bytes.toString(family) + F1 + Bytes.toString(column) + F1 + value + F1 + ts);
                    c.write(k, v);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
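
    For reference, each cell scanned from the source table becomes one output line: the row key, a tab, then family, qualifier, value, and timestamp joined by the \u0001 separator (shown as ^A below). The row key and values in this sample line are hypothetical:

    row1    cf1^Akeyword^Ahadoop^A1699000000000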
    
    WjwReadMain:

    package hbase;
    import java.io.IOException;
    import org.apache.hadoop.io.*;
    import org.apache.commons.logging.*;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.*;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.util.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.output.*;

    public class WjwReadMain {
        public static final Log LOG = LogFactory.getLog(WjwReadMain.class);
        public static final String NAME = "Member Test1";
        // HDFS directory that receives the exported table data.
        public static final String TEMP_INDEX_PATH = "hdfs://master:9000/tmp/tb_wjw";
        // Source HBase table.
        public static String inputTable = "tb_wjw";

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = HBaseConfiguration.create();

            // Scan only cf1:keyword, all versions, limited to the last three days.
            Scan scan = new Scan();
            scan.setBatch(0);
            scan.setCaching(10000);
            scan.setMaxVersions();
            scan.setTimeRange(System.currentTimeMillis() - 3*24*3600*1000L, System.currentTimeMillis());
            scan.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("keyword"));

            conf.setBoolean("mapred.map.tasks.speculative.execution", false);
            conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

            // Remove the output directory if it already exists.
            Path tmpIndexPath = new Path(TEMP_INDEX_PATH);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(tmpIndexPath)) {
                fs.delete(tmpIndexPath, true);
            }

            Job job = new Job(conf, NAME);
            job.setJarByClass(WjwReadMain.class);
            // Map-only job: the table mapper writes Text key/value pairs straight to HDFS.
            TableMapReduceUtil.initTableMapperJob(inputTable, scan, WjwReadMapper.class, Text.class, Text.class, job);
            job.setNumReduceTasks(0);
            job.setOutputFormatClass(TextOutputFormat.class);
            FileOutputFormat.setOutputPath(job, tmpIndexPath);
            boolean success = job.waitForCompletion(true);
            System.exit(success ? 0 : 1);
        }
    }
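
    One way to submit this job (a sketch: the jar name is an assumption, while `hbase classpath` simply puts the HBase client jars on the Hadoop classpath):

    export HADOOP_CLASSPATH=$(hbase classpath)
    hadoop jar wjw-hbase.jar hbase.WjwReadMain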
    
    
    Run results

    Create the source table whose data will later be exported to HDFS (an HBase shell sketch follows the screenshot).

    (Figure 1-1)
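
    A minimal HBase shell sketch of this step; only the table name tb_wjw and the column cf1:keyword come from the code above, and the row keys and values are illustrative:

    create 'tb_wjw', 'cf1'
    put 'tb_wjw', 'row1', 'cf1:keyword', 'hadoop'
    put 'tb_wjw', 'row2', 'cf1:keyword', 'hbase'
    scan 'tb_wjw'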

    Run the map job to export the table data to HDFS, then check that the export succeeded (see the commands after the screenshot).

    (Figure 1-2)
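
    The export can be checked by listing the output directory configured in TEMP_INDEX_PATH and printing the part file (part-m-00000 is the file later used as input in task 2):

    hdfs dfs -ls /tmp/tb_wjw
    hdfs dfs -cat /tmp/tb_wjw/part-m-00000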

    Task 2: Use MapReduce to bulk-import data from HDFS into an HBase table. The table name and data are up to you; it is recommended that they include your student ID or name. Create and delete the table with Java; the table name and column family are up to you.

    Main implementation steps and screenshots:

    Complete program

    WjwWriteMapper:

    package hbase;
    import java.io.*;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.*;

    public class WjwWriteMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
        private byte[] family = null;
        private byte[] qualifier = null;
        private byte[] val = null;
        private String rk = null;
        private long ts = System.currentTimeMillis();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws InterruptedException, IOException {
            try {
                // Each input line has the form produced by task 1:
                // rowkey \t family^Aqualifier^Avalue^Atimestamp
                String line = value.toString();
                String[] arr = line.split("\t", -1);
                if (arr.length == 2) {
                    rk = arr[0];
                    String[] vals = arr[1].split("\u0001", -1);
                    if (vals.length == 4) {
                        family = vals[0].getBytes();
                        qualifier = vals[1].getBytes();
                        val = vals[2].getBytes();
                        ts = Long.parseLong(vals[3]);
                        // Rebuild the cell and emit it as a Put keyed by the row key.
                        Put put = new Put(rk.getBytes(), ts);
                        put.add(family, qualifier, val);
                        context.write(new ImmutableBytesWritable(rk.getBytes()), put);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    WjwWriteMain:

    package hbase;
    import org.apache.hadoop.util.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.*;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.io.*;
    import org.apache.hadoop.hbase.mapreduce.*;
    import org.apache.hadoop.io.Writable;
    import java.io.IOException;
    import org.apache.commons.logging.*;

    public class WjwWriteMain extends Configured implements Tool {
        static final Log LOG = LogFactory.getLog(WjwWriteMain.class);

        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                LOG.info("2 parameters needed!");
            }
            // Input is the file exported by task 1; output goes to the HBase table tb_wjw01.
            String input = "hdfs://master:9000/tmp/tb_wjw/part-m-00000";
            String table = "tb_wjw01";
            Configuration conf = HBaseConfiguration.create();
            Job job = new Job(conf, "Input from file " + input + " into table " + table);
            job.setJarByClass(WjwWriteMain.class);
            job.setMapperClass(WjwWriteMapper.class);
            // Write the mapper's Put objects directly into the target table.
            job.setOutputFormatClass(TableOutputFormat.class);
            job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
            job.setOutputKeyClass(ImmutableBytesWritable.class);
            job.setOutputValueClass(Writable.class);
            job.setNumReduceTasks(0);
            FileInputFormat.addInputPath(job, new Path(input));
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            try {
                System.out.println(ToolRunner.run(conf, new WjwWriteMain(), otherArgs));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
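
    As in task 1, the job can be submitted from the command line once YARN is up (a sketch: the jar name is an assumption, and the two positional arguments are only counted, not used, since the input path and table name are hard-coded):

    export HADOOP_CLASSPATH=$(hbase classpath)
    hadoop jar wjw-hbase.jar hbase.WjwWriteMain /tmp/tb_wjw/part-m-00000 tb_wjw01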
    
    Run results

    Create an empty table tb_wjw01 that will later receive the data from tb_wjw (a shell sketch follows the screenshot).

    (Figure 2-1)
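
    A minimal HBase shell sketch of this step, using the table and column-family names from the code:

    create 'tb_wjw01', 'cf1'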

    Configure YARN and run the map job.

    (Figure 2-2)
    (Figure 2-3)

    Check the exported tb_wjw data in HDFS.

    (Figure 2-4)

    Import the tb_wjw data from HDFS into the HBase table tb_wjw01 (a verification command follows the screenshot).

    (Figure 2-5)
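
    Once the job finishes, the import can be verified from the HBase shell:

    scan 'tb_wjw01'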

    Task 3: Building on tasks 1 and 2, use HBase programming to create an HBase table, modify it (add and delete column families), write data to it, read data from it, and list all tables and table structures in the HBase database. It is recommended to implement these functions as separate methods of a single class and verify each of them. The table name and data are up to you.

    Main implementation steps and screenshots:

    Complete program

    package hbase;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class WjwHbase{
        private static Configuration conf = HBaseConfiguration.create();
    
        public static void createTable(String tableName, String[] families)
                throws IOException {
            Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                System.out.println("Table already exists!");
            } else {
                HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
                for (String family : families) {
                    tableDesc.addFamily(new HColumnDescriptor(family));
                }
                admin.createTable(tableDesc);
                System.out.println("Table created successfully!");
            }
            admin.close();
            conn.close();
        }
    
        public static void addRecord(String tableName, String rowKey,
                                      String family, String qualifier, String value) throws IOException {
            Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(TableName.valueOf(tableName));
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
            table.put(put);
            System.out.println("Record added successfully!");
            table.close();
            conn.close();
        }
    
        public static void deleteRecord(String tableName, String rowKey,
                                         String family, String qualifier) throws IOException {
            Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(TableName.valueOf(tableName));
            Delete delete = new Delete(Bytes.toBytes(rowKey));
            delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            table.delete(delete);
            System.out.println("Record deleted successfully!");
            table.close();
            conn.close();
        }
    
        public static void deleteTable(String tableName) throws IOException {
            Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                admin.disableTable(TableName.valueOf(tableName));
                admin.deleteTable(TableName.valueOf(tableName));
                System.out.println("Table deleted successfully!");
            } else {
                System.out.println("Table does not exist!");
            }
            admin.close();
            conn.close();
        }
    
        public static void addColumnFamily(String tableName, String columnFamily) throws IOException {
            Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                HColumnDescriptor columnDesc = new HColumnDescriptor(columnFamily);
                admin.addColumn(TableName.valueOf(tableName), columnDesc);
                System.out.println("Column family added successfully!");
            } else {
                System.out.println("Table does not exist!");
            }
            admin.close();
            conn.close();
        }
    
        public static void deleteColumnFamily(String tableName, String columnFamily) throws IOException {
            Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin();
            if (admin.tableExists(TableName.valueOf(tableName))) {
                admin.deleteColumn(TableName.valueOf(tableName), Bytes.toBytes(columnFamily));
                System.out.println("Column family deleted successfully!");
            } else {
                System.out.println("Table does not exist!");
            }
            admin.close();
            conn.close();
        }
    
        public static void getRecord(String tableName, String rowKey,
                                      String family, String qualifier) throws IOException {
            Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(TableName.valueOf(tableName));
            Get get = new Get(Bytes.toBytes(rowKey));
            Result result = table.get(get);
            byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(qualifier));
            System.out.println("Result: " + Bytes.toString(value));
            table.close();
            conn.close();
        }
    
        public static void scanTable(String tableName) throws IOException {
            Connection conn = ConnectionFactory.createConnection(conf);
            Table table = conn.getTable(TableName.valueOf(tableName));
            Scan scan = new Scan();
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                System.out.println("Result: " + result);
            }
            table.close();
            conn.close();
        }
    
        public static void listTables() throws IOException {
            Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin();
            HTableDescriptor[] tableDescs = admin.listTables();
            List<String> tableNames = new ArrayList<String>();
            for (HTableDescriptor tableDesc : tableDescs) {
                tableNames.add(tableDesc.getNameAsString());
            }
            System.out.println("Tables: " + tableNames);
            admin.close();
            conn.close();
        }
    
        public static void describeTable(String tableName) throws IOException {
            Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin();
            HTableDescriptor tableDesc = admin.getTableDescriptor(TableName.valueOf(tableName));
            System.out.println("Table structure: " + tableDesc);
            admin.close();
            conn.close();
        }
    
        public static void main(String[] args) throws IOException {
            String tableName = "wjwtest";
            String rowKey = "row1";
            String family = "cf1";
            String qualifier = "q1";
            String value = "this is wjw!";
            String columnFamily = "cf2";
            String[] families = {family};
            createTable(tableName, families);
            addRecord(tableName, rowKey, family, qualifier, value);
            getRecord(tableName, rowKey, family, qualifier);
            scanTable(tableName);
            addColumnFamily(tableName, columnFamily);
            describeTable(tableName);
            deleteColumnFamily(tableName, columnFamily);
            deleteRecord(tableName, rowKey, family, qualifier);
            deleteTable(tableName);
            listTables();
        }
    }
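
    One way to compile and run this class outside an IDE (a sketch: the source layout and the classes output directory are assumptions, while `hbase classpath` supplies the HBase client jars):

    mkdir -p classes
    javac -cp "$(hbase classpath)" -d classes hbase/WjwHbase.java
    java -cp "classes:$(hbase classpath)" hbase.WjwHbase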
    

    Run results

    (Figures 3-1 through 3-6)

    Appendix: Articles in This Series

    Experiment article index (direct links):
    Experiment 01: Hadoop installation and deployment — https://want595.blog.csdn.net/article/details/132767284
    Experiment 02: Common HDFS shell commands — https://want595.blog.csdn.net/article/details/132863345
    Experiment 03: Reading files with Hadoop — https://want595.blog.csdn.net/article/details/132912077
    Experiment 04: HDFS file creation and writing — https://want595.blog.csdn.net/article/details/133168180
    Experiment 05: Creating, deleting, and querying HDFS directories and files — https://want595.blog.csdn.net/article/details/133168734
    Experiment 06: SequenceFile, metadata operations, and MapReduce word count — https://want595.blog.csdn.net/article/details/133926246
    Experiment 07: MapReduce programming: data filtering and saving, UID deduplication — https://want595.blog.csdn.net/article/details/133947981
    Experiment 08: MapReduce programming: retrieving search records of a specific group and custom splits — https://want595.blog.csdn.net/article/details/133948849
    Experiment 09: MapReduce programming: join and aggregation operations — https://want595.blog.csdn.net/article/details/133949148
    Experiment 10: MapReduce programming: custom partitioners and custom counters — https://want595.blog.csdn.net/article/details/133949522