在new Worker之后,worker就已经立即执行了,这样一来,会出现一个问题,就是在new之后,我们注册'messge'事件时,可能worker就已经执行完了,这样一来我们注册的'message'事件就不会被触发。
比如下代码:
- const {isMainThread, parentPort, Worker, workerData} = require("worker_threads");
-
- exports.multiInsert = function (target) {
- if (isMainThread) {
- const max = 12
- const min = 1
- let primes = []
-
- const threadCount = +process.argv[2] || 2
- const threads = new Set()
- console.log(`Running with ${threadCount} threads...`)
- const range = Math.ceil((max - min) / threadCount)
- let start = min
-
- for (let i = 0; i < threadCount - 1; i++) {
- const myStart = start
- threads.add(new Worker(__filename, {workerData: {start: myStart, range}}))
- start += range
- }
-
- threads.add(new Worker(__filename, {
- workerData: {
- start,
- range: range + ((max - min + 1) % threadCount)
- }
- }))
-
- setTimeout(function () {
- for (const worker of threads) {
- const workerTmp = worker;//as Worker
- workerTmp.on('error', (err) => {
- throw err
- });
- workerTmp.on('exit', () => {
- threads.delete(worker)
- console.log(`Thread exiting, ${threads.size} running...`)
- if (threads.size === 0) {
- // console.log(primes.join('\n'))
- }
- })
-
- workerTmp.on('message', (msg) => {
- // console.log(" workerTmp.on('message'")
- console.log('onMessage=' + msg);
- primes = primes.concat(msg)
- })
- }
- }, 3000);
-
-
- } else {
- target(workerData.start, workerData.range)
- }
- }
-
- let insert2DB = function (start, range) {
-
- console.log("start=" + start + ",range" + range);
- let arr = [];
- for (let i = 0; i < range; i++) {
- arr[arr.length] = start + i;
- }
- console.log("insert2DB=" + arr.toString());
- parentPort.postMessage(arr)
-
- }
- this.multiInsert(insert2DB);
上述代码中,有两个函数:
multiInsert()一个负责创建线程,并在子线程环境下执行insert2DB()函数。
而insert2DB()函数,就是具体的子线程的执行逻辑(执行任务)。
其中,注意的是:
1. insert2DB()函数的内容是:得出从start值开始,一次加1,放入数组,加够range次后,返回数组。
2. multiInsert中,在创建worker后,注册message事件的代码是被setTimeout包裹的,延时执行3秒。也就是说,创建完worker后,过三秒才注册监听事件。
这时候,运行代码,得到的结果就是:
只有这5行输出。
而在message和exit事件的回调函数中的console.log并没有被触发。
为了更能说明问题,再增加一项实验,见如下代码:
- const {isMainThread, parentPort, Worker, workerData} = require("worker_threads");
-
- exports.multiInsert = function (target) {
- if (isMainThread) {
- const max = 12
- const min = 1
- let primes = []
-
- const threadCount = +process.argv[2] || 2
- const threads = new Set()
- console.log(`Running with ${threadCount} threads...`)
- const range = Math.ceil((max - min) / threadCount)
- let start = min
-
- for (let i = 0; i < threadCount - 1; i++) {
- const myStart = start
- threads.add(new Worker(__filename, {workerData: {start: myStart, range}}))
- start += range
- }
-
- threads.add(new Worker(__filename, {
- workerData: {
- start,
- range: range + ((max - min + 1) % threadCount)
- }
- }))
-
- setTimeout(function () {
- for (const worker of threads) {
- const workerTmp = worker;//as Worker
- workerTmp.on('error', (err) => {
- throw err
- });
- workerTmp.on('exit', () => {
- threads.delete(worker)
- console.log(`Thread exiting, ${threads.size} running...`)
- if (threads.size === 0) {
- // console.log(primes.join('\n'))
- }
- })
-
- workerTmp.on('message', (msg) => {
- // console.log(" workerTmp.on('message'")
- console.log('onMessage=' + msg);
- primes = primes.concat(msg)
- })
- }
- }, 3000);
-
-
- } else {
- target(workerData.start, workerData.range)
- }
- }
-
- let insert2DB = function (start, range) {
- setTimeout(function () {
- console.log("start=" + start + ",range" + range);
- let arr = [];
- for (let i = 0; i < range; i++) {
- arr[arr.length] = start + i;
- }
- console.log("insert2DB=" + arr.toString());
- parentPort.postMessage(arr)
- }, 5000);
- }
- this.multiInsert(insert2DB);
说明:与实例1的代码不同的是,修改了insert2DB函数,把里面的内容也交给了一个setTimeout,设置延时5秒,比注册逻辑部分晚发生2秒。
再次执行,打印出运行结果
(打印效果是,先输出‘Running with 2 threads...’,然后延时一会儿,输出‘start=1,range6
start=7,range6
onMessage=1,2,3,4,5,6
onMessage=7,8,9,10,11,12
insert2DB=1,2,3,4,5,6
insert2DB=7,8,9,10,11,12
Thread exiting, 1 running...
Thread exiting, 0 running...
’)
可以看到,message和exiting事件被触发了。
以上是自己实验的结果,如有错误,请大家指正!