Java的序列化:将对象写入到文件中
Student.java
/**
 * A student record that can be written with Java native serialization.
 *
 * <p>A class must implement {@link Serializable} before its instances can be
 * passed to {@code ObjectOutputStream.writeObject}.
 */
public class Student implements Serializable {
    /**
     * Explicit stream version. Without it the JVM derives one from the class
     * shape, so recompiling a slightly changed class can make previously
     * written files unreadable.
     */
    private static final long serialVersionUID = 1L;

    /** Student number. */
    private int stuID;
    /** Student name. */
    private String stuName;

    /** Returns the student number. */
    public int getStuID() {
        return stuID;
    }

    /** Sets the student number. */
    public void setStuID(int stuID) {
        this.stuID = stuID;
    }

    /** Returns the student name. */
    public String getStuName() {
        return stuName;
    }

    /** Sets the student name. */
    public void setStuName(String stuName) {
        this.stuName = stuName;
    }
}
TestStudent.java
/**
 * Demonstrates Java native serialization by writing a {@code Student} object to a file.
 */
public class TestStudent {
    /**
     * Creates one student and serializes it to {@code F:\temp\student.txt}.
     *
     * @param args unused
     * @throws Exception if the file cannot be opened or the object cannot be written
     */
    public static void main(String[] args) throws Exception {
        // Create a student object.
        Student student = new Student();
        student.setStuID(1);
        student.setStuName("Tom");

        // Serialization: save the object to a file.
        // try-with-resources closes both streams even when writeObject throws,
        // which the previous explicit close() calls did not guarantee.
        try (OutputStream out = new FileOutputStream("F:\\temp\\student.txt");
             ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(student);
        }
    }
}
核心:接口 Writable
如果一个类实现了 Hadoop 的序列化机制(接口:Writable),这个类的对象就可以作为输入和输出的值。
案例1: 读取员工数据,生成员工的对象,直接输出到HDFS
Emp.java
/**
 * One employee record that can be serialized through the {@code Writable} interface.
 *
 * <p>Sample input line: {@code 7654,MARTIN,SALESMAN,7698,1998/9/29,1250,1400,30}
 */
public class Emp implements Writable {
    /** Employee number. */
    private int empno;
    /** Employee name. */
    private String ename;
    /** Job title. */
    private String job;
    /** Employee number of the manager. */
    private int mgr;
    /** Hire date, kept as text. */
    private String hiredate;
    /** Monthly salary. */
    private int sal;
    /** Bonus. */
    private int comm;
    /** Department number. */
    private int deptno;

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }

    /**
     * Serialization: writes every field to the output stream.
     * The field order here must match the order used by {@link #readFields}.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(empno);
        dataOutput.writeUTF(ename);
        dataOutput.writeUTF(job);
        dataOutput.writeInt(mgr);
        dataOutput.writeUTF(hiredate);
        dataOutput.writeInt(sal);
        dataOutput.writeInt(comm);
        dataOutput.writeInt(deptno);
    }

    /**
     * Deserialization: restores every field from the input stream,
     * in the same order in which {@link #write} emitted them.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        empno = dataInput.readInt();
        ename = dataInput.readUTF();
        job = dataInput.readUTF();
        mgr = dataInput.readInt();
        hiredate = dataInput.readUTF();
        sal = dataInput.readInt();
        comm = dataInput.readInt();
        deptno = dataInput.readInt();
    }

    /** Renders the employee number, name, salary and department number. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Emp{");
        text.append("empno=").append(empno);
        text.append(", ename='").append(ename).append('\'');
        text.append(", sal=").append(sal);
        text.append(", deptno=").append(deptno);
        text.append('}');
        return text.toString();
    }
}
EmpInforMapper.java
/**
 * Parses one comma-separated employee line into an {@code Emp} and emits it
 * keyed by the employee number.
 */
public class EmpInforMapper extends Mapper<LongWritable, Text, IntWritable, Emp> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        // Sample line: 7654,MARTIN,SALESMAN,7698,1998/9/29,1250,1400,30
        final String[] columns = value1.toString().split(",");

        final Emp employee = new Emp();
        employee.setEmpno(Integer.parseInt(columns[0]));
        employee.setEname(columns[1]);
        employee.setJob(columns[2]);
        employee.setMgr(Integer.parseInt(columns[3]));
        employee.setHiredate(columns[4]);
        employee.setSal(Integer.parseInt(columns[5]));
        employee.setComm(Integer.parseInt(columns[6]));
        employee.setDeptno(Integer.parseInt(columns[7]));

        // k2: employee number, v2: the employee object
        final IntWritable employeeNumber = new IntWritable(employee.getEmpno());
        context.write(employeeNumber, employee);
    }
}
EmpInfoMain.java
/**
 * Driver for the job that reads employee lines and writes the resulting
 * {@code Emp} objects to the output path.
 */
public class EmpInfoMain {
    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws Exception if the job cannot be created, configured or executed
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the job and specify its entry point.
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(EmpInfoMain.class);

        // 2. Specify the mapper and the types of the map output.
        job.setMapperClass(EmpInforMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Emp.class);

        // 3. Specify the types of the job output. No reducer is registered, so
        //    the values that reach the output are the Emp objects emitted by the
        //    mapper; the declared value class must therefore be Emp, not IntWritable.
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Emp.class);

        // 4. Specify the input path and the output path.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. Run the job and report failure through the process exit code
        //    instead of silently discarding the result.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
导出 jar 包,上传虚拟机,执行命令
案例2: 使用MapReduce序列化重写“求每个部门的工资总额”
SalaryTotalMapper.java
/**
 * Parses one comma-separated employee line into an {@code Emp} and emits it
 * keyed by the department number.
 */
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Emp> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        // Sample line: 7654,MARTIN,SALESMAN,7698,1998/9/29,1250,1400,30
        final Emp employee = toEmp(value1.toString().split(","));
        // k2: department number, v2: the employee object
        context.write(new IntWritable(employee.getDeptno()), employee);
    }

    /** Builds an employee from the split columns, reading them in file order. */
    private static Emp toEmp(String[] columns) {
        final Emp employee = new Emp();
        employee.setEmpno(Integer.parseInt(columns[0]));
        employee.setEname(columns[1]);
        employee.setJob(columns[2]);
        employee.setMgr(Integer.parseInt(columns[3]));
        employee.setHiredate(columns[4]);
        employee.setSal(Integer.parseInt(columns[5]));
        employee.setComm(Integer.parseInt(columns[6]));
        employee.setDeptno(Integer.parseInt(columns[7]));
        return employee;
    }
}
SalaryTotalReducer.java
/**
 * Adds up the salaries of all employees that share one key (the department
 * number) and emits the total for that department.
 */
public class SalaryTotalReducer extends Reducer<IntWritable, Emp, IntWritable, IntWritable> {
    @Override
    protected void reduce(IntWritable key3, Iterable<Emp> values3, Context context) throws IOException, InterruptedException {
        // Sum the salaries carried by v3.
        int departmentTotal = 0;
        for (Emp member : values3) {
            departmentTotal = departmentTotal + member.getSal();
        }

        // Output k4: department, v4: total salary of the department.
        final IntWritable result = new IntWritable(departmentTotal);
        context.write(key3, result);
    }
}
SalaryTotalMain.java
/**
 * Driver for the "total salary per department" job.
 */
public class SalaryTotalMain {
    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the job and specify its entry point.
        final Configuration conf = new Configuration();
        final Job task = Job.getInstance(conf);
        task.setJarByClass(SalaryTotalMain.class);

        // 2. Mapper and the types of the map output.
        task.setMapperClass(SalaryTotalMapper.class);
        task.setMapOutputKeyClass(IntWritable.class);
        task.setMapOutputValueClass(Emp.class);

        // 3. Reducer and the types of the reduce output.
        task.setReducerClass(SalaryTotalReducer.class);
        task.setOutputKeyClass(IntWritable.class);
        task.setOutputValueClass(IntWritable.class);

        // 4. Input path and output path.
        final Path source = new Path(args[0]);
        FileInputFormat.setInputPaths(task, source);
        final Path target = new Path(args[1]);
        FileOutputFormat.setOutputPath(task, target);

        // 5. Run the job.
        task.waitForCompletion(true);
    }
}
导出 jar 包,上传虚拟机,执行命令
规则:按照Key2排序
基本数据类型
默认升序。可以改变默认的排序规则(创建自己的比较器即可),以MapReduce案例2——统计每个部门的工资总额,举例如下。
其他程序不变,仅新建一个自己的比较规则类,以及修改main方法,指定自己的比较规则。
MyNumbercomparator.java
/**
 * Custom comparison rule for numeric keys: returns the negation of the result
 * produced by {@code IntWritable.Comparator}, which reverses the sort order.
 */
public class MyNumbercomparator extends IntWritable.Comparator {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        final int defaultOrder = super.compare(b1, s1, l1, b2, s2, l2);
        return -defaultOrder;
    }
}
SalaryTotalMain.java
/**
 * Driver for the "total salary per department" job, with the custom
 * {@code MyNumbercomparator} installed as the sort comparator.
 */
public class SalaryTotalMain {
    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the job and specify its entry point.
        final Configuration conf = new Configuration();
        final Job task = Job.getInstance(conf);
        task.setJarByClass(SalaryTotalMain.class);

        // 2. Mapper and the types of the map output.
        task.setMapperClass(SalaryTotalMapper.class);
        task.setMapOutputKeyClass(IntWritable.class);
        task.setMapOutputValueClass(Emp.class);

        // Install the custom comparison rule.
        task.setSortComparatorClass(MyNumbercomparator.class);

        // 3. Reducer and the types of the reduce output.
        task.setReducerClass(SalaryTotalReducer.class);
        task.setOutputKeyClass(IntWritable.class);
        task.setOutputValueClass(IntWritable.class);

        // 4. Input path and output path.
        final Path source = new Path(args[0]);
        FileInputFormat.setInputPaths(task, source);
        final Path target = new Path(args[1]);
        FileOutputFormat.setOutputPath(task, target);

        // 5. Run the job.
        task.waitForCompletion(true);
    }
}
默认字典顺序。可以改变默认的排序规则(创建自己的比较器即可),以WordCount程序举例如下。
其他程序不变,仅新建一个自己的比较规则类,以及修改main方法,指定自己的比较规则。
MyTextComparator.java
/**
 * Custom comparison rule for {@code Text} keys: returns the negation of the
 * result produced by {@code Text.Comparator}, which reverses the sort order.
 */
public class MyTextComparator extends Text.Comparator {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        final int defaultOrder = super.compare(b1, s1, l1, b2, s2, l2);
        return -defaultOrder;
    }
}
WordCountMain.java
/**
 * Driver for the WordCount job, with the custom {@code MyTextComparator}
 * installed as the sort comparator.
 */
public class WordCountMain {
    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. Create the job and specify its entry point.
        final Configuration conf = new Configuration();
        final Job task = Job.getInstance(conf);
        task.setJarByClass(WordCountMain.class);

        // 2. Mapper and the types of the map output.
        task.setMapperClass(WordCountMap.class);
        task.setMapOutputKeyClass(Text.class);
        task.setMapOutputValueClass(IntWritable.class);

        // Install the custom comparison rule.
        task.setSortComparatorClass(MyTextComparator.class);

        // 3. Reducer and the types of the reduce output.
        task.setReducerClass(WordCountReduce.class);
        task.setOutputKeyClass(Text.class);
        task.setOutputValueClass(IntWritable.class);

        // 4. Input path and output path.
        final Path source = new Path(args[0]);
        FileInputFormat.setInputPaths(task, source);
        final Path target = new Path(args[1]);
        FileOutputFormat.setOutputPath(task, target);

        // 5. Run the job.
        task.waitForCompletion(true);
    }
}
order by (后面) + 列名、表达式、别名、序号
-- Sort by a column name.
select * from emp order by sal;
-- Sort by an expression.
select empno, ename, sal, sal * 12 from emp order by sal * 12 desc;
-- Sort by a column alias.
select empno, ename, sal, sal * 12 annlsal from emp order by annlsal desc;
-- Sort by the position of a column in the select list (here the 4th column).
select empno, ename, sal, sal * 12 annlsal from emp order by 4 desc;
desc 只作用于最近的一列,下述语句仅对 sal 排序起作用。
-- Sort by sal in descending order.
select * from emp order by sal desc;
-- desc applies only to sal, the column closest to it; deptno is not affected by it.
select * from emp order by deptno, sal desc;
补充知识:在Oracle数据库中,查询的结果不是原来的表,是Oracle创建的临时表数据
Java 的对象排序:实现 java.lang.Comparable 接口
Student.java
/学生对象:按照学生的age年龄进行排序
public class Student implements Comparable<Student>{
private int stuID;
private String stuName;
private int age;
public Student(int stuID, String stuName, int age) {
this.stuID = stuID;
this.stuName = stuName;
this.age = age;
}
@Override
public String toString() {
return "Student{" +
"stuID=" + stuID +
", stuName='" + stuName + '\'' +
", age=" + age +
'}';
}
@Override
public int compareTo(Student o) {
//定义排序规则:按照学生的age年龄进行排序
if (this.age >= o.getAge()){
return 1;
}else {
return -1;
}
}
public int getStuID() {
return stuID;
}
public void setStuID(int stuID) {
this.stuID = stuID;
}
public String getStuName() {
return stuName;
}
public void setStuName(String stuName) {
this.stuName = stuName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
}
StudentMain.java
/**
 * Sorts a few students with {@code Arrays.sort} and prints them one per line.
 */
public class StudentMain {
    public static void main(String[] args) {
        // Create the students directly inside the array that will be sorted.
        final Student[] roster = {
            new Student(1, "Tom", 24),
            new Student(2, "Mary", 26),
            new Student(3, "Mike", 25),
        };

        // The ordering comes from Student.compareTo.
        Arrays.sort(roster);

        for (int i = 0; i < roster.length; i++) {
            System.out.println(roster[i]);
        }
    }
}
前提:
案例1:实现员工表一个列的mr排序
Emp.java
/**
 * One employee record that can be serialized and used as a sortable key.
 *
 * <p>Sample input line: {@code 7654,MARTIN,SALESMAN,7698,1998/9/29,1250,1400,30}
 */
public class Emp implements WritableComparable<Emp> {
    /** Employee number. */
    private int empno;
    /** Employee name. */
    private String ename;
    /** Job title. */
    private String job;
    /** Employee number of the manager. */
    private int mgr;
    /** Hire date, kept as text. */
    private String hiredate;
    /** Monthly salary. */
    private int sal;
    /** Bonus. */
    private int comm;
    /** Department number. */
    private int deptno;

    /**
     * Custom sorting rule on a single column: salary, ascending.
     *
     * <p>The earlier version returned 1 when the salaries were equal, so an
     * object compared with itself was "greater", which breaks the
     * {@code Comparable} contract. Equal salaries now fall back to the
     * employee number: the ordering is consistent, and two different
     * employees with the same salary still never compare as equal.
     *
     * @param o the employee to compare with
     * @return a negative value, zero, or a positive value when this employee
     *         sorts before, equal to, or after {@code o}
     */
    @Override
    public int compareTo(Emp o) {
        int bySalary = Integer.compare(this.sal, o.getSal());
        if (bySalary != 0) {
            return bySalary;
        }
        return Integer.compare(this.empno, o.getEmpno());
    }

    /** Renders the employee number, name, salary and department number. */
    @Override
    public String toString() {
        return "Emp{" +
                "empno=" + empno +
                ", ename='" + ename + '\'' +
                ", sal=" + sal +
                ", deptno=" + deptno +
                '}';
    }

    /**
     * Serialization: writes every field to the output stream.
     * The field order here must match the order used by {@link #readFields}.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(this.empno);
        dataOutput.writeUTF(this.ename);
        dataOutput.writeUTF(this.job);
        dataOutput.writeInt(this.mgr);
        dataOutput.writeUTF(this.hiredate);
        dataOutput.writeInt(this.sal);
        dataOutput.writeInt(this.comm);
        dataOutput.writeInt(this.deptno);
    }

    /**
     * Deserialization: restores every field from the input stream,
     * in the same order in which {@link #write} emitted them.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.empno = dataInput.readInt();
        this.ename = dataInput.readUTF();
        this.job = dataInput.readUTF();
        this.mgr = dataInput.readInt();
        this.hiredate = dataInput.readUTF();
        this.sal = dataInput.readInt();
        this.comm = dataInput.readInt();
        this.deptno = dataInput.readInt();
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}
EmpSortMapper.java
/**
 * Emits each parsed employee as key2 so that the employee object itself is
 * the key. There is no meaningful value2, so {@code NullWritable} is written
 * in its place.
 */
public class EmpSortMapper extends Mapper<LongWritable, Text, Emp, NullWritable> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        // Sample line: 7654,MARTIN,SALESMAN,7698,1998/9/29,1250,1400,30
        final String line = value1.toString();
        final String[] columns = line.split(",");

        final Emp employee = new Emp();
        employee.setEmpno(Integer.parseInt(columns[0]));
        employee.setEname(columns[1]);
        employee.setJob(columns[2]);
        employee.setMgr(Integer.parseInt(columns[3]));
        employee.setHiredate(columns[4]);
        employee.setSal(Integer.parseInt(columns[5]));
        employee.setComm(Integer.parseInt(columns[6]));
        employee.setDeptno(Integer.parseInt(columns[7]));

        final NullWritable noValue = NullWritable.get();
        context.write(employee, noValue);
    }
}
EmpSortMain.java
/**
 * Driver for the employee sorting job. Only a mapper is registered here;
 * no reducer class is set.
 */
public class EmpSortMain {
    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the job and specify its entry point.
        final Configuration conf = new Configuration();
        final Job task = Job.getInstance(conf);
        task.setJarByClass(EmpSortMain.class);

        // 2. Mapper and the types of the map output.
        task.setMapperClass(EmpSortMapper.class);
        task.setMapOutputKeyClass(Emp.class);             // k2 is the employee object
        task.setMapOutputValueClass(NullWritable.class);  // v2 is the null placeholder

        // 3. Types of the job output.
        task.setOutputKeyClass(Emp.class);
        task.setOutputValueClass(NullWritable.class);

        // 4. Input path and output path.
        final Path source = new Path(args[0]);
        FileInputFormat.setInputPaths(task, source);
        final Path target = new Path(args[1]);
        FileOutputFormat.setOutputPath(task, target);

        // 5. Run the job.
        task.waitForCompletion(true);
    }
}
案例2:实现员工表多个列的mr排序
仅修改mr对象中的排序规则,规则代码如下
/**
 * Custom sorting rule on several columns: department number first, then
 * salary, both ascending.
 *
 * <p>The earlier version returned 1 when department and salary were both
 * equal, so an object compared with itself was "greater", which breaks the
 * {@code Comparable} contract. Full ties now fall back to the employee
 * number: the ordering is consistent, and two different employees still
 * never compare as equal.
 *
 * @param o the employee to compare with
 * @return a negative value, zero, or a positive value when this employee
 *         sorts before, equal to, or after {@code o}
 */
@Override
public int compareTo(Emp o) {
    // First sort by department number.
    int byDepartment = Integer.compare(this.deptno, o.getDeptno());
    if (byDepartment != 0) {
        return byDepartment;
    }
    // Then sort by salary.
    int bySalary = Integer.compare(this.sal, o.getSal());
    if (bySalary != 0) {
        return bySalary;
    }
    // Deterministic tie-breaker.
    return Integer.compare(this.empno, o.getEmpno());
}
什么是分区(partition)? 结合关系型数据库Oracle说明
1、默认情况下,MapReduce只有一个分区(只有一个输出文件)
2、MapReduce的分区根据 Map 的输出<key2, value2>进行分区
3、自定义分区:
Demo: 按照员工的部门号进行分区,相同部门号的员工输出到一个分区中
Emp.java 与 6.3.1.2 Mapreduce的序列化中的Emp对象一样
MyPartitioner.java
/**
 * Custom partitioning rule: partitions by department number.
 * k2 is the department number and v2 is the employee object.
 */
public class MyPartitioner extends Partitioner<IntWritable, Emp> {
    /**
     * Chooses the partition for one record from the employee's department number.
     *
     * @param k2      department number
     * @param v2      employee object
     * @param numTask number of partitions
     * @return partition number
     */
    @Override
    public int getPartition(IntWritable k2, Emp v2, int numTask) {
        // Department 10 maps to slot 1, department 20 to slot 2, everything else to slot 3.
        final int slot;
        switch (v2.getDeptno()) {
            case 10:
                slot = 1;
                break;
            case 20:
                slot = 2;
                break;
            default:
                slot = 3;
                break;
        }
        // The modulo keeps the result below the number of partitions.
        return slot % numTask;
    }
}
MyPartitionMapper.java
/**
 * Parses one comma-separated employee line into an {@code Emp} and emits it
 * keyed by the department number.
 */
public class MyPartitionMapper extends Mapper<LongWritable, Text, IntWritable, Emp> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        // Sample line: 7654,MARTIN,SALESMAN,7698,1998/9/29,1250,1400,30
        final String[] columns = value1.toString().split(",");

        final Emp employee = new Emp();
        employee.setEmpno(Integer.parseInt(columns[0]));
        employee.setEname(columns[1]);
        employee.setJob(columns[2]);
        employee.setMgr(Integer.parseInt(columns[3]));
        employee.setHiredate(columns[4]);
        employee.setSal(Integer.parseInt(columns[5]));
        employee.setComm(Integer.parseInt(columns[6]));
        employee.setDeptno(Integer.parseInt(columns[7]));

        // Emit the employee object; k2 is the department number.
        final IntWritable department = new IntWritable(employee.getDeptno());
        context.write(department, employee);
    }
}
MyPartitionReducer.java
/**
 * Writes every employee it receives back out under the same key, unchanged.
 */
public class MyPartitionReducer extends Reducer<IntWritable, Emp, IntWritable, Emp> {
    @Override
    protected void reduce(IntWritable key3, Iterable<Emp> values3, Context context) throws IOException, InterruptedException {
        final java.util.Iterator<Emp> cursor = values3.iterator();
        while (cursor.hasNext()) {
            final Emp current = cursor.next();
            context.write(key3, current);
        }
    }
}
MyPartitionMain.java
/**
 * Driver for the job that splits employees into partitions by department number.
 */
public class MyPartitionMain {
    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws Exception if the job cannot be created, configured or executed
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the job and specify its entry point.
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(MyPartitionMain.class);

        // 2. Specify the mapper and the types of the map output.
        job.setMapperClass(MyPartitionMapper.class);
        job.setMapOutputKeyClass(IntWritable.class); // k2 is the department number
        job.setMapOutputValueClass(Emp.class);       // v2 is the employee object

        // Install the partitioning rule.
        job.setPartitionerClass(MyPartitioner.class);
        // Number of partitions (one reduce task per partition).
        job.setNumReduceTasks(3);

        // 3. Specify the reducer and the types of the job output.
        //    MyPartitionReducer was defined for this job but was never registered.
        job.setReducerClass(MyPartitionReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Emp.class);

        // 4. Specify the input path and the output path.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. Run the job and report failure through the process exit code
        //    instead of silently discarding the result.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
执行程序
1、合并(Combiner)是一种特殊的 reducer
2、作用:合并是在 Map 端执行一次合并,用于减少 Map 输出到 Reducer 的数据量,可以提高效率。
3、举例:以 WordCount 为例
仅需在 **6.1.1.2 开发程序** 小节的 WordCount.java 程序中加入 Combiner,如下
使用 Combiner 的注意事项:
正常情况:没有引入Combiner
AvgSalaryMapper.java
/**
 * Emits the salary column of every employee line under the single constant
 * key {@code "salary"}.
 */
public class AvgSalaryMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        final String[] columns = value1.toString().split(",");
        final Text sharedKey = new Text("salary");
        // The salary is the sixth comma-separated column (index 5).
        final int salary = Integer.parseInt(columns[5]);
        context.write(sharedKey, new IntWritable(salary));
    }
}
AvgSalaryReducer.java
/**
 * Computes the average of all salaries received for a key.
 *
 * <p>The input value type ({@code IntWritable}) differs from the output value
 * type ({@code DoubleWritable}), so this class does not fit the map output
 * types and is not suitable as a combiner.
 */
public class AvgSalaryReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(Text key3, Iterable<IntWritable> values3, Context context) throws IOException, InterruptedException {
        long total = 0; // long, so that summing many int salaries cannot overflow
        long count = 0;
        for (IntWritable salary : values3) {
            total += salary.get(); // sum of salaries
            count++;               // one more employee
        }
        if (count == 0) {
            // Defensive: nothing to average, and dividing would yield NaN.
            return;
        }
        // Divide as double. The earlier int / int division truncated the
        // average before it was wrapped in DoubleWritable.
        context.write(new Text("The avg salary is :"), new DoubleWritable((double) total / count));
    }
}
AvgSalaryMain.java
/**
 * Driver for the average salary job in its normal form, without a combiner.
 */
public class AvgSalaryMain {
    /**
     * Configures and runs the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws Exception if the job cannot be created, configured or executed
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the job and specify its entry point.
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(AvgSalaryMain.class);

        // 2. Specify the mapper and the types of the map output.
        job.setMapperClass(AvgSalaryMapper.class);
        job.setMapOutputKeyClass(Text.class);          // k2 is the constant key "salary"
        job.setMapOutputValueClass(IntWritable.class); // v2 is one employee's salary

        // 3. Specify the reducer and the types of the job output.
        //    AvgSalaryReducer is registered as the reducer, not as a combiner:
        //    it consumes IntWritable values but emits DoubleWritable values, so
        //    its output does not match the map output types declared above.
        job.setReducerClass(AvgSalaryReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // 4. Specify the input path and the output path.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 5. Run the job and report failure through the process exit code
        //    instead of silently discarding the result.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
异常情况:引入Combiner
在 map 和 Reducer 的中间添加 Combiner
//2、指定任务的map和map输出的数据类型
...
//加入combiner
job.setCombinerClass(AvgSalaryReducer.class);
//4、指定任务的输入路径、任务的输出路径
...
参数文件 | 配置参数 | 参考值 |
---|---|---|
yarn-site.xml | yarn.nodemanager.aux-services | mapreduce_shuffle |