• MapReduce Common PageRank


    题目描述:

    基于两个输入文本(网页链接关系、初始的网页排名)实现网页链接排名算法(阻尼系数以0.85计算)。 本题对网页排名值的收敛条件做了简化,如果当某一网页当前排名值与上一轮迭代排名值之间差值的绝对值小于1e-6,那么认为该网页的排名值已经收敛。 迭代停止的条件为达到最大迭代次数或某次迭代中所有网页均收敛。 网页总数N在测试阶段由后台自动给出。
    输入格式:文本中的第一列都为网页名,列与列之间用空格分隔。其中,网页链接关系文本中的其他列为出站链接,如A B D表示网页A链向网页B和D(所有网页权重按1.0计算)

    A B D
    B C
    C A B
    D B C

    初始的网页排名文本第二列为该网页的排名值,如 A 1 表示网页A的排名为1

    A 1
    B 1
    C 1
    D 1

    输出格式: 要求分两步完成。第一步连接网页链接关系和初始的网页排名两个文件,输出连接结果:

    A 1 B D
    B 1 C
    C 1 A B
    D 1 B C

    第二步输出网页的链接关系和最终的排名值:

    A 0.21436248817266176 B D
    B 0.3633209225962085 C
    C 0.40833002013844744 A B
    D 0.1302651623462253 B C

    在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankJoinMapperImpl,继承PageRankJoinMapper,实现抽象方;在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankJoinReducerImpl,继承PageRankJoinReducer,实现抽象方法;在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankMapperImpl,继承PageRankMapper,实现抽象方法;在DSPPCode.mapreduce.common_pagerank.impl中创建PageRankReducerImpl,继承PageRankReducer,实现抽象方法。
    注意:连接阶段无需将排名值解析为数值类型, 计算网页排名的阶段请将排名值解析为 double 类型变量进行计算。 输出结果的小数位数无需处理。

    2、代码

    PageRankJoinMapperImpl.java

    package DSPPCode.mapreduce.common_pagerank.impl;
    
    import DSPPCode.mapreduce.common_pagerank.question.PageRankJoinMapper;
    import DSPPCode.mapreduce.common_pagerank.question.utils.ReduceJoinWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Mapper.Context;
    import java.io.IOException;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    
    public class PageRankJoinMapperImpl extends PageRankJoinMapper {
    
      @Override
      public void map(LongWritable key, Text value,
          Mapper<LongWritable, Text, Text, ReduceJoinWritable>.Context context)
          throws IOException, InterruptedException {
        String line = value.toString();
        String[] segments = line.split(" ", 2);
        ReduceJoinWritable val = new ReduceJoinWritable();
        val.setData(segments[1]);
        Pattern pattern = Pattern.compile("[0-9]*\\.?[0-9]+");
        Matcher isNum = pattern.matcher(segments[1]);
        if(isNum.matches()) val.setTag("2");
        else val.setTag("1");
        context.write(new Text(segments[0]), val);
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    PageRankJoinReducerImpl.java

    package DSPPCode.mapreduce.common_pagerank.impl;
    
    import DSPPCode.mapreduce.common_pagerank.question.PageRankJoinReducer;
    import DSPPCode.mapreduce.common_pagerank.question.utils.ReduceJoinWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Reducer.Context;
    import java.io.IOException;
    
    public class PageRankJoinReducerImpl extends PageRankJoinReducer {
    
      @Override
      public void reduce(Text key, Iterable<ReduceJoinWritable> values,
          Reducer<Text, ReduceJoinWritable, Text, NullWritable>.Context context)
          throws IOException, InterruptedException {
        String[] row = new String[2];
        for(ReduceJoinWritable value : values){
          if(value.getTag().equals("2")){
            row[0] = value.getData();
          }
          else if(value.getTag().equals("1")){
            row[1] = value.getData();
          }
        }
        String line = key.toString() + " " + row[0] + " " + row[1];
        context.write(new Text(line), NullWritable.get());
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    PageRankMapperImpl.java

    package DSPPCode.mapreduce.common_pagerank.impl;
    
    import DSPPCode.mapreduce.common_pagerank.question.PageRankMapper;
    
    public class PageRankMapperImpl extends PageRankMapper {
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    PageRankReducerImpl.java

    package DSPPCode.mapreduce.common_pagerank.impl;
    
    import DSPPCode.mapreduce.common_pagerank.question.PageRankReducer;
    import DSPPCode.mapreduce.common_pagerank.question.PageRankRunner;
    import DSPPCode.mapreduce.common_pagerank.question.utils.ReducePageRankWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.Reducer.Context;
    import java.io.IOException;
    
    public class PageRankReducerImpl extends PageRankReducer {
    
      @Override
      public void reduce(Text key, Iterable<ReducePageRankWritable> values,
          Reducer<Text, ReducePageRankWritable, Text, NullWritable>.Context context)
          throws IOException, InterruptedException {
        double pr = 0.0;
        String outPage = "";
        double lastPr = 0.0;
        for(ReducePageRankWritable value : values){
          if(value.getTag().equals(ReducePageRankWritable.PR_L)){
            try {
              pr += Double.parseDouble(value.getData());
            } catch (NumberFormatException e){
              e.printStackTrace();
            }
          }
          else{
            String[] segment = value.getData().split(" ", 3);
            outPage += segment[2];
            lastPr = Double.parseDouble(segment[1]);
          }
        }
        int totalPage = context.getConfiguration().getInt("1", 0);
        pr = 0.85 * pr + (1 - 0.85) / totalPage;
        if(Math.abs(pr - lastPr) < PageRankRunner.DELTA){
          context.getCounter(PageRankRunner.GROUP_NAME, PageRankRunner.COUNTER_NAME).increment(1L);
        }
        String out = key.toString() + " " + String.valueOf(pr) + " " + outPage;
        context.write(new Text(out), NullWritable.get());
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
  • 相关阅读:
    得物极光蓝纸箱尺寸设计实践
    音乐转录(AMT)库Omnizart论文笔记及实践
    LeetCode只出现一次的数字
    ConcurrentLinkedQueue解析
    基于ssm(非maven)学生考勤管理系统
    集群-Nacos-2.2.3、Nginx-1.24.0集群配置
    ubuntu无法virtualenv创建python虚拟环境的解决
    promise实现koa2洋葱中间件模型
    Sentinel源码剖析之常用限流算法原理实现
    MySQL常用指令
  • 原文地址:https://blog.csdn.net/weixin_45975575/article/details/125447257