• Dlang 并行化


    Dlang 并行化

    好难受,dlang 生态太差,没办法,学了半天才明白。

    我尽量以精炼的语言解释。

    采用 定义,例子(代码),解释 的步骤讲解。

    所以你可能看到很多代码,一点解释……

    我会省略一些 import,让代码短一些

    parallelism 并行

    感觉好废物,这一小部分了解即可。

    这部分只需要会 parallelmap & amap 其实就差不多了。

    介绍比较实用的几种方法。

    parallel 迭代

    foreach (i; parallel(range, work_uint_size = 100)) {
        // do something here
    }
    

    其中 work_unit_size 表示最多同时运行的数量。

    例子

    import std.stdio, std.parallelism;
    import core.thread;
    
    struct Producer {
        void produce() {
            Thread.sleep(1.seconds);
    
            writeln("Process +1");
        }
    };
    
    void main() {
        auto prods = new Producer[](10);
    
        foreach (prod; parallel(prods)) {
            prod.produce();
        }
    }
    

    Task

    创建任务:

    auto theTask = task!anOperation(arguments);
    // or
    auto theTask = task(&someFunction, parameters...)
    

    运行任务:theTask.executeInNewThread()

    查看是否完成:if (theTask.done) { ... }

    获取结果:auto result = theTask.yeildForce()


    asyncBuf

    感觉没啥用。

    并行保存多个需要长时间制作的元素。还需要保证使用的长时间的……

    例子:

    struct Producer {
        int i, total;
    
        bool empty() const {
            return total <= i;
        }
    
        int front() const {
            return i;
        }
    
        void popFront() {
            writefln("Producing product ID: %d", i);
            Thread.sleep(1.seconds / 2);
            ++i;
        }
    };
    
    void main() {
        auto prods = Producer(0, 10);
        foreach (prod; taskPool.asyncBuf(prods, 3)) {
            writef("Got product id: %d\n", prod);
            Thread.sleep(1.seconds);
            writeln("Used product...");
        }
    }
    

    map & amap

    先看例子:

    int increase(int x) {
        Thread.sleep(500.msecs);
        return x + 3;
    }
    
    void main() {
        int[] nums;
        foreach (i; 0 .. 10) {
            nums ~= i;
        }
    
        // auto results = taskPool.map!increase(nums);
        auto results = taskPool.amap!increase(nums);
        foreach (result; results) {
            writeln(result);
        }
    }
    

    可以类比 python 中的 map

    两者的区别:

    • map 可以指定同时运行的数量,而 amap 是有多少运行多少。

    • map 会一定程度上按顺序执行,而 amap 并不是顺序执行,它依靠 RandomAccessRange,也就是随机顺序执行。


    消息并发

    我不知道怎么翻译,反正就是 Message Passing Concurrency

    核心方法: spawn (唤起)

    我们可以形象的认为,spawn 方法可以唤起一个新的工人(线程)来为我们工作。

    并且这个工人与主线程是分开的(先看代码后面解释):

    import std.stdio;
    import std.concurrency;
    import core.thread;
    void worker() {
        foreach (i; 0 .. 5) {
            Thread.sleep(500.msecs);
            writeln(i, " (worker) in ", thisTid);
    
        }
    
    }
    void main() {
        Tid myWorkerTid = spawn(&worker);
        foreach (i; 0 .. 5) {
            Thread.sleep(300.msecs);
            writeln(i, " (main) in ", thisTid);
    
        }
    
        writeln("main is done!");
    }
    

    最终输出:

    0 (main) in Tid(7f0eb19bc0b0)
    0 (worker) in Tid(7f0eb19bc000)
    1 (main) in Tid(7f0eb19bc0b0)
    2 (main) in Tid(7f0eb19bc0b0)
    1 (worker) in Tid(7f0eb19bc000)
    3 (main) in Tid(7f0eb19bc0b0)
    2 (worker) in Tid(7f0eb19bc000)
    4 (main) in Tid(7f0eb19bc0b0)
    main is done!
    3 (worker) in Tid(7f0eb19bc000)
    4 (worker) in Tid(7f0eb19bc000)
    

    实际输出可能略有差异。

    解释

    • spawn(&worker) 唤起了一个新的线程运行 worker 函数,并返回了新的线程的 id 是一个结构体 Tid

    • thisTid 类似于一个宏,用于获取当前所在线程的 id


    发送消息

    先看代码后解释:

    void worker() {
        int value = 0;
        while (value >= 0) {
            value = receiveOnly!int();
            double result = cast(double)value / 7;
            ownerTid.send(result);
        }
    }
    
    void main() {
        Tid myWorker = spawn(&worker);
    
        foreach (val; 0 .. 10) {
            myWorker.send(val);
            double result = receiveOnly!double();
            writefln("Send %s got %s", val, result);
        }
    
        myWorker.send(-1); // terminate worker process
    }
    

    最终输出:

    Send 0 got 0
    Send 1 got 0.142857
    Send 2 got 0.285714
    Send 3 got 0.428571
    Send 4 got 0.571429
    Send 5 got 0.714286
    Send 6 got 0.857143
    Send 7 got 1
    Send 8 got 1.14286
    Send 9 got 1.28571
    

    解释

    • ownerTid 类似于一个宏,用于取得唤醒自己的线程的 Tid,从而发送消息。

    • Tid.send(...) 可以向 Tid 代表的那个线程发送一条消息。

      • 如果同时要发送多个东西,在发送的地方是 Tid.send(a, b, c, ...)

      • 在接受的地方要变化为 receiveOnly!(typeof(a), typeof(b), typeof(c), ...),最终得到的是一个 tuple,可以通过下标访问。

    • receiveOnly!type() 表示只接受类型为 type 的消息。

    • 最后 myWorker.send(-1) 是根据代码逻辑结束的,并不属于通法。

    如果我们需要更灵活的接受方法怎么办?

    void workerFunc() {
        bool isDone = false;
        while (!isDone) {
            void intHandler(int message) {
                writeln("handling int message: ", message);
    
                if (message == -1) {
                    writeln("exiting");
                    isDone = true;
                }
            }
    
            void stringHandler(string message) {
                writeln("handling string message: ", message);
            }
            
            receive(&intHandler, &stringHandler);
        }    
    }
    

    我们可以指定多种 Handler 以处理不同的数据类型。利用 receive 注册 到处理类型消息的函数中。


    更优雅的方式

    处理更多的类型:

    struct Exit {}
    
    void worker() {
        bool done = false;
    
        while (!done) {
            receive(
                (int message) {
                    writeln("int message ", message);
                },
    
                (string message) {
                    writeln("string message", message);
                },
    
                (Exit message) {
                    writeln("Exit message");
                    done = true;
                },
    
                (Variant message) {
                    writeln("Unexpected message: ", message);
                }
            );
        }
    }
    
    void main() {
        Tid myWorker = spawn(&worker);
    
        myWorker.send(10);
        myWorker.send("hello");
        myWorker.send(10.1);
        myWorker.send(Exit());
    }
    

    主要是使用了匿名函数……

    解释

    • 利用 std.variant.Variant 以接收任何类型的数据。但是需要保证,处理所有类型数据的方法应该放在最后面,不然会导致全部被判断成 Variant

    超时接受

    我们可以定一个超时时间,超过这个时间就直接返回。

    先看代码:

    struct Exit {}
    
    void worker() {
        bool done = false;
    
        while (!done) {
            bool received = receiveTimeout(600.msecs,
                (Exit message) {
                    writeln("Exit message");
                    done = true;
                },
    
                (Variant message) {
                    writeln("Some message: ", message);
                }
            );
            if (!received) {
                writeln("no message yet...");
            }
        }
    }
    
    void main() {
        Tid myWorker = spawn(&worker);
    
        myWorker.send(10);
        myWorker.send("hello");
        Thread.sleep(1.seconds);
        myWorker.send(10.1);
        myWorker.send(Exit());
    }
    

    最终输出

    Some message: 10
    Some message: hello
    no message yet...
    Some message: 10.1
    Exit message
    

    解释

    • receiveTimeout 只比 recieve 多了一个参数,用于指定超时时间。

    • 返回一个 bool 变量,如果为 false 则没有接收到任何消息。


    等待所有线程结束thread_joinAll()

    一般来说放在需要放的地方……即可。


    数据共享

    终于讲到这里了。

    我们先考虑一个程序:

    import std.stdio;
    import std.concurrency;
    import core.thread;
    
    int variable;
    
    void printInfo(string message) {
        writefln("%s: %s (@%s)", message, variable, &variable);
    }
    
    void worker() {
        variable = 42;
        printInfo("Before the worker is terminated");
    }
    
    void main() {
        spawn(&worker);
        thread_joinAll();
        printInfo("After the worker is terminated");
    }
    

    其输出是这样的:

    Before the worker is terminated: 42 (@7F308C88C530)
    After the worker is terminated: 0 (@7F308C98D730)
    

    可以发现,同样的变量在不同的线程里面地址是不一样的,也就是说数据是独立的,所以要有共享。

    此时我们只需要修改:

    shared int variable;
    

    即可。

    实际上写为 shared(int) variable; 会更标准,但是好麻烦……

    当然,不得不说,有了消息传递,那么数据共享就是备用的方案了。


    Data Race

    数据竞争是一个很常见的问题。

    例子

    void worker(shared int* i) {
        foreach (t; 0 .. 200000) {
            *i = *i + 1;
        }
    }
    
    void main() {
        shared int i = 0;
    
        foreach (id; 0 .. 10) {
            spawn(&worker, &i);
        }
    
        thread_joinAll();
        writeln("after i to ", i);
    }
    

    期望输出 2000000,但是实际输出可能远小于此。

    所以我们要考虑同步:

    void worker(shared int* i) {
        foreach (t; 0 .. 200000) {
            synchronized {
                *i = *i + 1;
            }
        }
    }
    

    解释

    • synchronized 会隐式地创建一个锁,保证只有一个线程会持有这个锁,并且执行这些操作。

    • 有些时候,synchronized 会使得因为等待锁的额外开销使得程序变慢。但有些时候,我们可以通过更好的方法避免等待的开销,例如使用原子操作。

    • synchronized 创建的锁只会对于这一个代码块生效,不会影响到其他的代码块。


    共用锁

    void increase(shared int* i) {
        foreach (t; 0 .. 200000) {
            synchronized {
                *i = *i + 1;
            }
        }
    }
    
    void decrese(shared int* i) {
        foreach (t; 0 .. 200000) {
            synchronized {
                *i = *i - 1;
            }
        }
    }
    
    void main() {
        shared int i = 0;
    
        foreach (id; 0 .. 10) {
            if (id & 1) spawn(&increase, &i);
            else spawn(&decrese, &i);
        }
    
        thread_joinAll();
        writeln("after i to ", i);
    }
    

    期望输出 0 但是实际输出……不知道。所以我们需要共用锁:

    synchronized (lock_object) {
        // ...
    }
    

    修改后的代码

    class Lock {}
    shared Lock lock = new Lock();
    
    void increase(shared int* i) {
        foreach (t; 0 .. 200000) {
            synchronized (lock) {
                *i = *i + 1;
            }
        }
    }
    
    void decrese(shared int* i) {
        foreach (t; 0 .. 200000) {
            synchronized (lock) {
                *i = *i - 1;
            }
        }
    }
    

    现在就可以得到正确的答案了。


    同步类

    我们可以使用 synchronized 修饰一个类。这相当于在每一个代码块里面嵌套一个 synchronzied

    synchronized class Cls {
        void func() {
            // ...
        }
    }
    

    上面的等价于:

    class Cls {
        void func() {
            synchronized (this) {
                // ...
            }
        }
    }
    

    同步初始化

    我们考虑这份代码:

    static this() {
        writeln("executing static this()");
    }
    
    void worker() {
    }
    void main() {
        spawn(&worker);
        thread_joinAll();
    }
    

    最终会输出两次 executing static this()

    如果我们修改为 shared static this() { ... },那么最终只会输出一次。


    原子操作

    需要用到 core.atomic 库。

    有代码:

    atomic!"+="(var, x);
    atomic!"-="(var, x);
    // ... like *= /= ^= ...
    

    这些都是原子操作。

    有方法:

    shared(int) *value;
    bool is_mutated = cas(value, currentValue, newValue);
    

    如果返回 true,那么值会改变,否则没有。

    原子操作一般来说快于 synchronized

    同时,原子操作也可以作用于结构体上,这里不作为讲解。

    更多操作可以参考标准库:

    • core.sync.barrier

    • core.sync.condition

    • core.sync.config

    • core.sync.exception

    • core.sync.mutex

    • core.sync.rwmutex

    • core.sync.semaphore

  • 相关阅读:
    华为云云耀云服务器L实例评测|教你如何使用云服务器L实例
    Mysql 按照每小时,每天,每月,每年,不存在数据也显示
    CSS:line-height是什么?,height是什么?
    分割链表[先取next再斩断链表防止断链]
    R16 Type II量化反馈码本的产生
    Kafka源码简要分析
    JavaScript高级,ES6 笔记 第一天
    既然有了量化交易,技术分析还有存在的必要么?有专门收割自动交易系统的策略吗?
    [附源码]Python计算机毕业设计Django绿色生鲜
    .NET 缓存:内存缓存 IMemoryCache、分布式缓存 IDistributedCache(Redis)
  • 原文地址:https://www.cnblogs.com/jeefy/p/17512683.html