• nodejs worker_threads的事件监听问题


    注册事件的时间有效性

    在new Worker之后,worker就已经立即执行了,这样一来,会出现一个问题,就是在new之后,我们注册'messge'事件时,可能worker就已经执行完了,这样一来我们注册的'message'事件就不会被触发。

    验证实例 1

    比如下代码:

    1. const {isMainThread, parentPort, Worker, workerData} = require("worker_threads");
    2. exports.multiInsert = function (target) {
    3. if (isMainThread) {
    4. const max = 12
    5. const min = 1
    6. let primes = []
    7. const threadCount = +process.argv[2] || 2
    8. const threads = new Set()
    9. console.log(`Running with ${threadCount} threads...`)
    10. const range = Math.ceil((max - min) / threadCount)
    11. let start = min
    12. for (let i = 0; i < threadCount - 1; i++) {
    13. const myStart = start
    14. threads.add(new Worker(__filename, {workerData: {start: myStart, range}}))
    15. start += range
    16. }
    17. threads.add(new Worker(__filename, {
    18. workerData: {
    19. start,
    20. range: range + ((max - min + 1) % threadCount)
    21. }
    22. }))
    23. setTimeout(function () {
    24. for (const worker of threads) {
    25. const workerTmp = worker;//as Worker
    26. workerTmp.on('error', (err) => {
    27. throw err
    28. });
    29. workerTmp.on('exit', () => {
    30. threads.delete(worker)
    31. console.log(`Thread exiting, ${threads.size} running...`)
    32. if (threads.size === 0) {
    33. // console.log(primes.join('\n'))
    34. }
    35. })
    36. workerTmp.on('message', (msg) => {
    37. // console.log(" workerTmp.on('message'")
    38. console.log('onMessage=' + msg);
    39. primes = primes.concat(msg)
    40. })
    41. }
    42. }, 3000);
    43. } else {
    44. target(workerData.start, workerData.range)
    45. }
    46. }
    47. let insert2DB = function (start, range) {
    48. console.log("start=" + start + ",range" + range);
    49. let arr = [];
    50. for (let i = 0; i < range; i++) {
    51. arr[arr.length] = start + i;
    52. }
    53. console.log("insert2DB=" + arr.toString());
    54. parentPort.postMessage(arr)
    55. }
    56. this.multiInsert(insert2DB);

    上述代码中,有两个函数:

    multiInsert()一个负责创建线程,并在子线程环境下执行insert2DB()函数。

    而insert2DB()函数,就是具体的子线程的执行逻辑(执行任务)。

    其中,注意的是:

    1. insert2DB()函数的内容是:得出从start值开始,一次加1,放入数组,加够range次后,返回数组。

    2. multiInsert中,在创建worker后,注册message事件的代码是被setTimeout包裹的,延时执行3秒。也就是说,创建完worker后,过三秒才注册监听事件。

    这时候,运行代码,得到的结果就是:

    只有这5行输出。 

    而在message和exit事件的回调函数中的console.log并没有被触发。

    为了更能说明问题,再增加一项实验,见如下代码:

    验证实例 2

    1. const {isMainThread, parentPort, Worker, workerData} = require("worker_threads");
    2. exports.multiInsert = function (target) {
    3. if (isMainThread) {
    4. const max = 12
    5. const min = 1
    6. let primes = []
    7. const threadCount = +process.argv[2] || 2
    8. const threads = new Set()
    9. console.log(`Running with ${threadCount} threads...`)
    10. const range = Math.ceil((max - min) / threadCount)
    11. let start = min
    12. for (let i = 0; i < threadCount - 1; i++) {
    13. const myStart = start
    14. threads.add(new Worker(__filename, {workerData: {start: myStart, range}}))
    15. start += range
    16. }
    17. threads.add(new Worker(__filename, {
    18. workerData: {
    19. start,
    20. range: range + ((max - min + 1) % threadCount)
    21. }
    22. }))
    23. setTimeout(function () {
    24. for (const worker of threads) {
    25. const workerTmp = worker;//as Worker
    26. workerTmp.on('error', (err) => {
    27. throw err
    28. });
    29. workerTmp.on('exit', () => {
    30. threads.delete(worker)
    31. console.log(`Thread exiting, ${threads.size} running...`)
    32. if (threads.size === 0) {
    33. // console.log(primes.join('\n'))
    34. }
    35. })
    36. workerTmp.on('message', (msg) => {
    37. // console.log(" workerTmp.on('message'")
    38. console.log('onMessage=' + msg);
    39. primes = primes.concat(msg)
    40. })
    41. }
    42. }, 3000);
    43. } else {
    44. target(workerData.start, workerData.range)
    45. }
    46. }
    47. let insert2DB = function (start, range) {
    48. setTimeout(function () {
    49. console.log("start=" + start + ",range" + range);
    50. let arr = [];
    51. for (let i = 0; i < range; i++) {
    52. arr[arr.length] = start + i;
    53. }
    54. console.log("insert2DB=" + arr.toString());
    55. parentPort.postMessage(arr)
    56. }, 5000);
    57. }
    58. this.multiInsert(insert2DB);

    说明:与实例1的代码不同的是,修改了insert2DB函数,把里面的内容也交给了一个setTimeout,设置延时5秒,比注册逻辑部分晚发生2秒。

    再次执行,打印出运行结果

    打印效果是,先输出‘Running with 2 threads...’,然后延时一会儿,输出‘start=1,range6
    start=7,range6
    onMessage=1,2,3,4,5,6
    onMessage=7,8,9,10,11,12
    insert2DB=1,2,3,4,5,6
    insert2DB=7,8,9,10,11,12
    Thread exiting, 1 running...
    Thread exiting, 0 running...

    ’)

    可以看到,message和exiting事件被触发了。

    以上是自己实验的结果,如有错误,请大家指正!

  • 相关阅读:
    ip 网段设置 --chatGPT
    葡萄酒质量预测python
    day2:算法之美|打开算法之门与算法复杂性
    js基础知识整理之 —— 求最大值 最小值 平均数的几种方法
    编程扎记01
    B/S网络架构概述
    HTML爱心特效代码
    数据库系统工程师------流水线
    调用okhttp3的案例代码
    微信小程序开发 开启
  • 原文地址:https://blog.csdn.net/jfqqqqq/article/details/126221010