• 6.824 lab1


    *** Starting wc test.
    --- wc test: PASS
    *** Starting indexer test.
    --- indexer test: PASS
    *** Starting map parallelism test.
    --- map parallelism test: PASS
    *** Starting reduce parallelism test.
    --- reduce parallelism test: PASS
    *** Starting job count test.
    --- job count test: PASS
    *** Starting early exit test.
    --- early exit test: PASS
    *** Starting crash test.
    --- crash test: PASS
    *** PASSED ALL TESTS
    6.824 Schedule: Spring 2021
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    根据课程规定不能公开代码

    前置知识:

    MapReduce paper

    the way to go

    goroutine,chan,mutex,IO,json,rpc,data race

    其中rpc在project已经有写好的架构了,mutex data race在课程也有提到,json lab提示里有

    不懂的地方多看project已经有的源码

    设计思路:

    coordinator 分发task,记录task完成情况,shuffle

    worker 询问task,map/reduce,报告完成

    coordinator为rpc主节点,worker向其询问task,向其报告完成

    一个文件作为一个task

    shuffle时,按照ihash将key分为nReduce个文件,每个文件为一个task

    按照以上思路应该可以完成除了crash test以外的test。(注意不要同时运行多个test)

    crash test crash.go

    func maybeCrash() {
    	max := big.NewInt(1000)
    	rr, _ := crand.Int(crand.Reader, max)
    	if rr.Int64() < 330 {
    		// crash!
    		os.Exit(1)
    	} else if rr.Int64() < 660 {
    		// delay for a while.
    		maxms := big.NewInt(10 * 1000)
    		ms, _ := crand.Int(crand.Reader, maxms)
    		time.Sleep(time.Duration(ms.Int64()) * time.Millisecond)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在map reduce中运行如上函数,使得有 1/3几率直接退出,1/3几率等到0-10s,1/3几率直接开始

    所以在coordinator分发task时,需考虑将超时task重新分发。

    所以综上二者考虑,我们将超过15s的task视为woker宕机,将task重新发放

    在这里,我使用了一种类似LRU、LFU的机制。设定每个task的生命为15,启动一个线程,每隔1s,将task生命-1,当task生命为0,则task重新分配。

    另外,需要注意的是,如果worker1超过15s,但它可能仍旧存活,worker2此时分配得到相同task,他们可能会抢夺相同命名文件。所以lab提示里有:使用tmp文件。使worker1、work2使用不同文件。由coordinator在记录task完成情况时,将文件名修改

    不过此时,运行crash test还是会出问题,我发现原因在测试脚本中

    ( while [ -e $SOCKNAME -a ! -f mr-done ]
      do
        timeout -k 2s 180s ../mrworker ../../mrapps/crash.so
        sleep 1
      done ) &
    
    ( while [ -e $SOCKNAME -a ! -f mr-done ]
      do
        timeout -k 2s 180s ../mrworker ../../mrapps/crash.so
        sleep 1
      done ) &
    
    while [ -e $SOCKNAME -a ! -f mr-done ]
    do
      timeout -k 2s 180s ../mrworker ../../mrapps/crash.so
      sleep 1
    done
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    -e -f检测文件是否存在,-a AND

    这段代码是想让worker数量保持在3个,但是我运行的时候发现,worker数量无法保持在3个。所以我将这部分直接修改为30需根据具体任务数修改

    for i in `seq 1 30` 
    do
    {
    	timeout -k 2s 180s ../mrworker ../../mrapps/crash.so
    }&
    done 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这样修改应该没有改变本意吧

    然后就能通过测试了

    遇到的一些问题

    • 有不懂的地方看源码,看测试脚本,看lab的提示
    • 需写的文件有coordinator.go worker.go rpc.go
    • 中间文件应该以mr-开头,脚本是使用rm -f mr-*指令
    • 不要同时开多个测试
    • os.OpenFile(“mr-tmp-”,os.O_WRONLY|os.O_TRUNC|os.O_CREATE,0777),O_CREATE如果不存在就创建,如果存在就续写;O_TRUNC如果存在就清空。有时只有CREATE会遇到一些问题
    • IO注意是否需要Flush
    • 基本只能用printf调试法
    • 关于go的chan select
    // worker.go
    for{
        go AskForTask(ch1,ch2)
        select {
        case u := <-ch1:
        // ch1为Map
        case u := <-ch2:
        // ch2为Reduce
        }
    }
    
    func AskForTask(ch1 chan *Reply, ch2 chan *Reply) {
    
    	args := Args{}
    
    	reply := Reply{}
    
    	call("Coordinator.Task", &args, &reply)
    	if reply.TaskType == -1 {// 无任务
    
    	} else if reply.TaskType == 0 {// Map
    		ch1 <- &reply
    	} else if reply.TaskType == 2 {// Reduce
    		ch2 <- &reply
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    如果worker没询问到任务,就会一直卡在select中,应改为

    // worker.go
    for{
        go AskForTask(ch1,ch2)
        select {
        case u := <-ch1:
        // ch1为Map
        case u := <-ch2:
        // ch2为Reduce
        default:
        }
        time.Sleep(time.Duration(1) * time.Second)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
  • 相关阅读:
    基于Nodejs的拼车平台的设计和实现
    重学SpringBoot3-@Import注解的作用
    Redis中最简单的存储类型:String
    AWS动手实验 - 在两层架构中部署 WordPress网站
    利用python的强大函数库,实现波形的小波降噪、带通滤波、时阈分析、FFT波形转换
    005:vue2使用vue-type-writer实现打字机效果
    ubuntu|23 安装Gnome主题
    【Azure 应用服务】App Service 开启了私有终结点(Private Endpoint)模式后,如何来实现公网Git部署呢?
    薪资17K,在字节外包工作是一种什么体验...
    【C++】继承 ⑦ ( 继承中的对象模型分析 | 继承中的构造函数和析构函数 )
  • 原文地址:https://blog.csdn.net/weixin_43627561/article/details/126489951