八线程完全指南,进度与线程

2020-03-08 作者:网站首页   |   浏览(188)

时间: 2019-12-30阅读: 45标签: 线程本文首发于政采云前端团队博客:浅析 Node 进程与线程

时间: 2019-03-26阅读: 291标签: 线程

进程与线程是操作系统中两个重要的角色,它们维系着不同程序的执行流程,通过系统内核的调度,完成多任务执行。今天我们从 Node.js(以下简称 Node)的角度来一起学习相关知识,通过本文读者将了解 Node 进程与线程的特点、代码层面的使用以及它们之间的通信。

很多人都想知道单线程的Node.js怎么能与多线程后端竞争。考虑到其所谓的单线程特性,许多大公司选择 Node 作为其后端似乎违反直觉。要想知道原因,必须理解其单线程的真正含义。

概念

JavaScript 的设计非常适合在网上做比较简单的事情,比如验证表单,或者说创建彩虹色的鼠标轨迹。在2009年,Node.js的创始人 Ryan Dahl使开发人员可以用该语言编写后端代码。

首先,我们还是回顾一下相关的定义:

通常支持多线程的后端语言具有各种机制,用于在线程和其他面向线程的功能之间同步数据。要向 JavaScript 添加对此类功能的支持,需要修改整个语言,这不是 Dahl 的目标。为了让纯 JavaScript 支持多线程,他必须想一个变通方法。接下来让我们探索一下其中的奥秘……

进程是一个具有一定独立功能的程序在一个数据集上的一次动态执行的过程,是操作系统进行资源分配和调度的一个独立单位,是应用程序运行的载体。

Node.js 是如何工作的

线程是程序执行中一个单一的顺序控制流,它存在于进程之中,是比进程更小的能独立运行的基本单位。

Node.js 使用两种线程:event loop处理的主线程和worker pool中的几个辅助线程。

早期在单核 CPU 的系统中,为了实现多任务的运行,引入了进程的概念,不同的程序运行在数据与指令相互隔离的进程中,通过时间片轮转调度执行,由于 CPU 时间片切换与执行很快,所以看上去像是在同一时间运行了多个程序。

事件循环是一种机制,它采用回调(函数)并注册它们,准备在将来的某个时刻执行。它与相关的 JavaScript 代码在同一个线程中运行。当 JavaScript 操作阻塞线程时,事件循环也会被阻止。

由于进程切换时需要保存相关硬件现场、进程控制块等信息,所以系统开销较大。为了进一步提高系统吞吐率,在同一进程执行时更充分的利用 CPU 资源,引入了线程的概念。线程是操作系统调度执行的最小单位,它们依附于进程中,共享同一进程中的资源,基本不拥有或者只拥有少量系统资源,切换开销极小。

工作池是一种执行模型,它产生并处理单独的线程,然后同步执行任务,并将结果返回到事件循环。事件循环使用返回的结果执行提供的回调。

单线程?

简而言之,它负责异步 I/O操作 —— 主要是与系统磁盘和网络的交互。它主要由诸如fs(I/O 密集)或crypto(CPU 密集)等模块使用。工作池用libuv实现,当 Node 需要在 JavaScript 和 C 之间进行内部通信时,会导致轻微的延迟,但这几乎不可察觉。

我们常常听到有开发者说 “ Node.js 是单线程的”,那么 Node 确实是只有一个线程在运行吗?

基于这两种机制,我们可以编写如下代码:

首先,在终行以下 Node 代码(示例一):

fs.readFile(path.join(__dirname, './package.json'), (err, content) = { if (err) { return null; } console.log(content.toString());});
# 示例一require('http').createServer((req, res) = { res.writeHead(200); res.end('Hello World');}).listen(8000);console.log('process id', process.pid);

前面提到的fs模块告诉工作池使用其中一个线程来读取文件的内容,并在完成后通知事件循环。然后事件循环获取提供的回调函数,并用文件的内容执行它。

Node 内建模块 http 创建了一个监听 8000 端口的服务,并打印出该服务运行进程的 pid,控制台输出 pid 为 35919(可变),然后我们通过命令top -pid 35919查看进程的详细信息,如下所示:

以上是非阻塞代码的示例,我们不必同步等待某事的发生。只需告诉工作池去读取文件,并用结果去调用提供的函数即可。由于工作池有自己的线程,因此事件循环可以在读取文件时继续正常执行。

PID COMMAND %CPU TIME #TH #WQ #POR MEM PURG CMPRS PGRP PPID STATE BOOSTS %CPU_ME35919 node 0.0 00:00.09 7 0 35 8564K 0B 8548K 35919 35622 sleeping *0[1] 0.00000

在不需要同步执行某些复杂操作时,这一切都相安无事:任何运行时间太长的函数都会阻塞线程。如果应用程序中有大量这类功能,就可能会明显降低服务器的吞吐量,甚至完全冻结它。在这种情况下,无法继续将工作委派给工作池。

我们看到#TH(threads 线程) 这一列显示此进程中包含 7 个线程,说明 Node 进程中并非只有一个线程。事实上一个 Node 进程通常包含:1 个 Javascript 执行主线程;1 个 watchdog 监控线程用于处理调试信息;1 个 v8 task scheduler 线程用于调度任务优先级,加速延迟敏感任务执行;4 个 v8 线程(可参考以下代码),主要用来执行代码调优与 GC 等后台任务;以及用于异步 I / O 的 libuv 线程池。

在需要对数据进行复杂的计算时(如AI、机器学习或大数据)无法真正有效地使用 Node.js,因为操作阻塞了主(且唯一)线程,使服务器无响应。在 Node.js v10.5.0 发布之前就是这种情况,在这一版本增加了对多线程的支持。

// v8 初始化线程const int thread_pool_size = 4; // 默认 4 个线程default_platform = v8::platform::CreateDefaultPlatform(thread_pool_size);V8::InitializePlatform(default_platform);V8::Initialize();

简介:worker_threads

其中异步 I/O 线程池,如果执行程序中不包含 I/O 操作如文件读写等,则默认线程池大小为 0,否则 Node 会初始化大小为 4 的异步 I/O 线程池,当然我们也可以通过process.env.UV_THREADPOOL_SIZE自己设定线程池大小。需要注意的是在 Node 中网络 I/O 并不占用线程池。

worker_threads模块允许我们创建功能齐全的多线程 Node.js 程序。

下图为 Node 的进程结构图:

thread worker 是在单独的线程中生成的一段代码(通常从文件中取出)。

为了验证上述分析,我们运行示例二的代码,加入文件 I/O 操作:

注意,术语thread workerworkerthread经常互换使用,他们都指的是同一件事。

# 示例二require('fs').readFile('./test.log', err = { if (err) { console.log(err); process.exit(); } else { console.log(Date.now(), 'Read File I/O'); }});console.log(process.pid);

要想使用 thread worker,必须导入worker_threads模块。让我们先写一个函数来帮助我们生成这些thread worker,然后再讨论它们的属性。

然后得到如下结果:

type WorkerCallback = (err: any, result?: any) = any;export function runWorker(path: string, cb: WorkerCallback, workerData: object | null = null) { const worker = new Worker(path, { workerData }); worker.on('message', cb.bind(null, null)); worker.on('error', cb); worker.on('exit', (exitCode) = { if (exitCode === 0) { return null; } return cb(new Error(`Worker has stopped with code ${exitCode}`)); }); return worker;}
PID COMMAND %CPU TIME #TH #WQ #POR MEM PURG CMPR PGRP PPID STATE BOOSTS %CPU_ME %CPU_OTHRS39443 node 0.0 00:00.10 11 0 39 8088K 0B 0B 39443 35622 sleeping *0[1] 0.00000 0.00000

要创建一个 worker,首先必须创建一个Worker类的实例。它的第一个参数提供了包含 worker 的代码的文件的路径;第二个参数提供了一个名为workerData的包含一个属性的对象。这是我们希望线程在开始运行时可以访问的数据。

此时#TH一栏的线程数变成了 11,即大小为 4 的 I/O 线程池被创建。至此,我们针对段首的问题心里有了答案,Node 严格意义讲并非只有一个线程,通常说的 “Node 是单线程” 其实是指 JS 的执行主线程只有一个

请注意:不管你是用的是 JavaScript, 还是最终要转换为 JavaScript 的语言(例如,TypeScript),路径应该始终引用带有.js或.mjs扩展名的文件。

事件循环

我还想指出为什么使用回调方法,而不是返回在触发message事件时将解决的 promise。这是因为 worker 可以发送许多message事件,而不是一个。

既然 JS 执行线程只有一个,那么 Node 为什么还能支持较高的并发?

正如你在上面的例子中所看到的,线程间的通信是基于事件的,这意味着我们设置了 worker 在发送给定事件后调用的侦听器。

从上文异步 I/O 我们也能获得一些思路,Node 进程中通过 libuv 实现了一个事件循环机制(uv_event_loop),当执主程发生阻塞事件,如 I/O 操作时,主线程会将耗时的操作放入事件队列中,然后继续执行后续程序。

以下是最常见的事件:

uv_event_loop 尝试从 libuv 的线程池(uv_thread_pool)中取出一个空闲线程去执行队列中的操作,执行完毕获得结果后,通知主线程,主线程执行相关回调,并且将线程实例归还给线程池。通过此模式循环往复,来保证非阻塞 I/O,以及主线程的高效执行。

worker.on('error',(error)={});

相关流程可参照下图:

只要 worker 中有未捕获的异常,就会发出error事件。然后终止 worker,错误可以作为提供的回调中的第一个参数。

子进程

worker.on('exit',(exitCode)={});

通过事件循环机制,Node 实现了在 I/O 密集型(I/O-Sensitive)场景下的高并发,但是如果代码中遇到 CPU 密集场景(CPU-Sensitive)的场景,那么主线程将长时间阻塞,无法处理额外的请求。为了应对 CPU-Sensitive 场景,以及充分发挥 CPU 多核性能,Node 提供了 child_process 模块(官方文档)进行进程的创建、通信、销毁等等。

在 worker 退出时会发出exit事件。如果在worker中调用了process.exit(),那么exitCode将被提供给回调。如果 worker 以worker.terminate()终止,则代码为1。

创建

worker.on('online',()={});

child_process 模块提供了 4 种异步创建 Node 进程的方法,具体可参考 child_process API,这里做一下简要介绍。

只要 worker 停止解析 JavaScript 代码并开始执行,就会发出online事件。它不常用,但在特定情况下可以提供信息。

spawn 以主命令加参数数组的形式创建一个子进程,子进程以流的形式返回 data 和 error 信息。exec 是对 spawn 的封装,可直接传入命令行执行,以 callback 形式返回 error stdout stderr 信息execFile 类似于 exec 函数,但默认不会创建命令行环境,将直接以传入的文件创建新的进程,性能略微优于 execfork 是 spawn 的特殊场景,只能用于创建 node 程序的子进程,默认会建立父子进程的 IPC 信道来传递消息通信

worker.on('message',(data)={});

在 Linux 系统中,可以通过管道、消息队列、信号量、共享内存、Socket 等手段来实现进程通信。在 Node 中,父子进程可通过 IPC(Inter-Process Communication) 信道收发消息,IPC 由 libuv 通过管道 pipe 实现。一旦子进程被创建,并设置父子进程的通信方式为 IPC(参考 stdio 设置),父子进程即可双向通信。

只要 worker 将数据发送到父线程,就会发出message事件。

进程之间通过process.send发送消息,通过监听message事件接收消息。当一个进程发送消息时,会先序列化为字符串,送入 IPC 信道的一端,另一个进程在另一端接收消息内容,并且反序列化,因此我们可以在进程之间传递对象。

现在让我们来看看如何在线程之间共享数据。

示例

在线程之间交换数据

以下是 Node.js 创建进程和通信的一个基础示例,主进程创建一个子进程并将计算斐波那契数列的第 44 项这一 CPU 密集型的任务交给子进程,子进程执行完成后通过 IPC 信道将结果发送给主进程:

要将数据发送到另一个线程,可以用port.postMessage()方法。它的原型如下:

main_process.js

port.postMessage(data[, transferList])
# 主进程const { fork } = require('child_process');const child = fork('./fib.js'); // 创建子进程child.send({ num: 44 }); // 将任务执行数据通过信道发送给子进程child.on('message', message = { console.log('receive from child process, calculate result: ', message.data); child.kill();});child.on('exit', () = { console.log('child process exit');});setInterval(() = { // 主进程继续执行 console.log('continue excute javascript code', new Date().getSeconds());}, 1000);

port 对象可以是parentPort,也可以是MessagePort的实例 —— 稍后会详细讲解。

fib.js

数据参数

# 子进程 fib.js// 接收主进程消息,计算斐波那契数列第 N 项,并发送结果给主进程// 计算斐波那契数列第 n 项function fib(num) { if (num === 0) return 0; if (num === 1) return 1; return fib(num - 2)   fib(num - 1);}process.on('message', msg = { // 获取主进程传递的计算数据 console.log('child pid', process.pid); const { num } = msg; const data = fib(num); process.send({ data }); // 将计算结果发送主进程});// 收到 kill 信息,进程退出process.on('SIGHUP', function() { process.exit();});

第一个参数 —— 这里被称为data—— 是一个被复制到另一个线程的对象。它可以是复制算法所支持的任何内容。

结果:

数据由结构化克隆算法进行复制。引用自 Mozilla:

child pid 39974continue excute javascript code 41continue excute javascript code 42continue excute javascript code 43continue excute javascript code 44receive from child process, calculate result: 1134903170child process exit

它通过递归输入对象来进行克隆,同时保持之前访问过的引用的映射,以避免无限遍历循环。

集群模式

该算法不复制函数、错误、属性描述符或原型链。还需要注意的是,以这种方式复制对象与使用 JSON 不同,因为它可以包含循环引用和类型化数组,而 JSON 不能。

为了更加方便的管理进程、负载均衡以及实现端口复用,Node 在 v0.6 之后引入了 cluster 模块(官方文档),相对于子进程模块,cluster 实现了单 master 主控节点和多 worker 执行节点的通用集群模式。cluster master 节点可以创建销毁进程并与子进程通信,子进程之间不能直接通信;worker 节点则负责执行耗时的任务。

由于能够复制类型化数组,该算法可以在线程之间共享内存。

cluster 模块同时实现了负载均衡调度算法,在类 unix 系统中,cluster 使用轮转调度(round-robin),node 中维护一个可用 worker 节点的队列 free,和一个任务队列 handles。当一个新的任务到来时,节点队列队首节点出队,处理该任务,并返回确认处理标识,依次调度执行。而在 win 系统中,Node 通过 Shared Handle 来处理负载,通过将文件描述符、端口等信息传递给子进程,子进程通过信息创建相应的 SocketHandle / ServerHandle,然后进行相应的端口绑定和监听,处理请求。

在线程之间共享内存

cluster 大大的简化了多进程模型的使用,以下是使用示例:

人们可能会说像cluster或child_process这样的模块在很久以前就开始使用线程了。这话对,也不对。

# 计算斐波那契数列第 43 / 44 项const cluster = require('cluster');// 计算斐波那契数列第 n 项function fib(num) { if (num === 0) return 0; if (num === 1) return 1; return fib(num - 2)   fib(num - 1);}if (cluster.isMaster) { // 主控节点逻辑 for (let i = 43; i  45; i  ) { const worker = cluster.fork() // 启动子进程 // 发送任务数据给执行进程,并监听子进程回传的消息 worker.send({ num: i }); worker.on('message', message = { console.log(`receive fib(${message.num}) calculate result ${message.data}`) worker.kill(); }); } // 监听子进程退出的消息,直到子进程全部退出 cluster.on('exit', worker = { console.log('worker '   worker.process.pid   ' killed!'); if (Object.keys(cluster.workers).length === 0) { console.log('calculate main process end'); } });} else { // 子进程执行逻辑 process.on('message', message = { // 监听主进程发送的信息 const { num } = message; console.log('child pid', process.pid, 'receive num', num); const data = fib(num); process.send({ data, num }); // 将计算结果发送给主进程 })}

cluster模块可以创建多个节点实例,其中一个主进程在它们之间对请求进行路由。集群能够有效地增加服务器的吞吐量;但是我们不能用cluster模块生成一个单独的线程。

工作线程

人们倾向于用 PM2 这样的工具来集中管理他们的程序,而不是在自己的代码中手动执行,如果你有兴趣,可以研究一下如何使用cluster模块。

在 Node v10 以后,为了减小 CPU 密集型任务计算的系统开销,引入了新的特性:工作线程 worker_threads(官方文档)。通过 worker_threads 可以在进程内创建多个线程,主线程与 worker 线程使用 parentPort 通信,worker 线程之间可通过 MessageChannel 直接通信。

child_process模块可以生成任何可执行文件,无论它是否是用 JavaScript 写的。它和worker_threads非常相似,但缺少后者的几个重要功能。

创建

具体来说 thread workers 更轻量,并且与其父线程共享相同的进程 ID。它们还可以与父线程共享内存,这样可以避免对大的数据负载进行序列化,从而更有效地来回传递数据。

通过 worker_threads 模块中的 Worker 类我们可以通过传入执行文件的路径创建线程。

现在让我们看一下如何在线程之间共享内存。为了共享内存,必须将ArrayBuffer或SharedArrayBuffer的实例作为数据参数发送到另一个线程。

const { Worker } = require('worker_threads');...const worker = new Worker(filepath);

这是一个与其父线程共享内存的 worker:

通信使用 parentPort 进行父子线程通信

import { parentPort } from 'worker_threads';parentPort.on('message', () = { const numberOfElements = 100; const sharedBuffer = new SharedArrayBuffer(Int32Array.BYTES_PER_ELEMENT * numberOfElements); const arr = new Int32Array(sharedBuffer); for (let i = 0; i  numberOfElements; i  = 1) { arr[i] = Math.round(Math.random() * 30); } parentPort.postMessage({ arr });});

worker_threads 中使用了 MessagePort(继承于 EventEmitter,参考)来实现线程通信。worker 线程实例上有 parentPort 属性,是 MessagePort 类型的一个实例,子线程可利用 postMessage 通过 parentPort 向父线程传递数据,示例如下:

首先,我们创建一个SharedArrayBuffer,其内存需要包含100个32位整数。接下来创建一个Int32Array实例,它将用缓冲区来保存其结构,然后用一些随机数填充数组并将其发送到父线程。

const { Worker, isMainThread, parentPort } = require('worker_threads');// 计算斐波那契数列第 n 项function fib(num) { if (num === 0) return 0; if (num === 1) return 1; return fib(num - 2)   fib(num - 1);}if (isMainThread) { // 主线程执行函数 const worker = new Worker(__filename); worker.once('message', (message) = { const { num, result } = message; console.log(`Fibonacci(${num}) is ${result}`); process.exit(); }); worker.postMessage(43); console.log('start calculate Fibonacci'); // 继续执行后续的计算程序 setInterval(() = { console.log(`continue execute code ${new Date().getSeconds()}`); }, 1000);} else { // 子线程执行函数 parentPort.once('message', (message) = { const num = message; const result = fib(num); // 子线程执行完毕,发消息给父线程 parentPort.postMessage({ num, result }); });}

在父线程中:

结果:

import path from 'path';import { runWorker } from '../run-worker';const worker = runWorker(path.join(__dirname, 'worker.js'), (err, { arr }) = { if (err) { return null; } arr[0] = 5;});worker.postMessage({});
start calculate Fibonaccicontinue execute code 8continue execute code 9continue execute code 10continue execute code 11Fibonacci(43) is 433494437

把arr [0]的值改为5,实际上会在两个线程中修改它。

使用 MessageChannel 实现线程间通信

当然,通过共享内存,我们冒险在一个线程中修改一个值,同时也在另一个线程中进行了修改。但是我们在这个过程中也得到了一个好处:该值不需要进行序列化就可以另一个线程中使用,这极大地提高了效率。只需记住管理数据正确的引用,以便在完成数据处理后对其进行垃圾回收。

worker_threads 还可以支持线程间的直接通信,通过两个连接在一起的 MessagePort 端口,worker_threads 实现了双向通信的 MessageChannel。线程间可通过 postMessage 相互通信,示例如下:

共享一个整数数组固然很好,但我们真正感兴趣的是共享对象 —— 这是存储信息的默认方式。不幸的是,没有SharedObjectBuffer或类似的东西,但我们可以自己创建一个类似的结构。

const { isMainThread, parentPort, threadId, MessageChannel, Worker} = require('worker_threads'); if (isMainThread) { const worker1 = new Worker(__filename); const worker2 = new Worker(__filename); // 创建通信信道,包含 port1 / port2 两个端口 const subChannel = new MessageChannel(); // 两个子线程绑定各自信道的通信入口 worker1.postMessage({ port: subChannel.port1 }, [ subChannel.port1 ]); worker2.postMessage({ port: subChannel.port2 }, [ subChannel.port2 ]);} else { parentPort.once('message', value = { value.port.postMessage(`Hi, I am thread${threadId}`); value.port.on('message', msg = { console.log(`thread${threadId} receive: ${msg}`); }); });}

transferList参数

结果:

transferList中只能包含ArrayBuffer和MessagePort。一旦它们被传送到另一个线程,就不能再次被传送了;因为内存里的内容已经被移动到了另一个线程。

thread2 receive: Hi, I am thread1thread1 receive: Hi, I am thread2

目前,还不能通过transferList(可以使用child_process模块)来传输网络套接字。

注意

创建通信渠道

worker_threads 只适用于进程内部 CPU 计算密集型的场景,而不适合于 I/O 密集场景,针对后者,官方建议使用进程的 event_loop 机制,将会更加高效可靠。

线程之间的通信是通过 port 进行的,port 是MessagePort类的实例,并启用基于事件的通信。

总结

使用 port 在线程之间进行通信的方法有两种。第一个是默认值,这个方法比较容易。在 worker 的代码中,我们从worker_threads模块导入一个名为parentPort的对象,并使用对象的.postMessage()方法将消息发送到父线程。

Node.js 本身设计为单线程执行语言,通过 libuv 的线程池实现了高效的非阻塞异步 I/O,保证语言简单的特性,尽量减少编程复杂度。但是也带来了在多核应用以及 CPU 密集场景下的劣势,为了补齐这块短板,Node 可通过内建模块 child_process 创建额外的子进程来发挥多核的能力,以及在不阻塞主进程的前提下处理 CPU 密集任务。

这是一个例子:

为了简化开发者使用多进程模型以及端口复用,Node 又提供了 cluster 模块实现主-从节点模式的进程管理以及负载调度。由于进程创建、销毁、切换时系统开销较大,worker_threads 模块又随之推出,在保持轻量的前提下,可以利用更少的系统资源高效地处理 进程内 CPU 密集型任务,如数学计算、加解密,进一步提高进程的吞吐率。因篇幅有限,本次分享到此为止,诸多细节期待与大家相互探讨,共同钻研。

import { parentPort } from 'worker_threads';const data = { // ...};parentPort.postMessage(data);

parentPort是 Node.js 在幕后创建的MessagePort实例,用于与父线程进行通信。这样就可以用parentPort和worker对象在线程之间进行通信。

线程间的第二种通信方式是创建一个MessageChannel并将其发送给 worker。以下代码是如何创建一个新的MessagePort并与我们的 worker 共享它:

import path from 'path';import { Worker, MessageChannel } from 'worker_threads';const worker = new Worker(path.join(__dirname, 'worker.js'));const { port1, port2 } = new MessageChannel();port1.on('message', (message) = { console.log('message from worker:', message);});worker.postMessage({ port: port2 }, [port2]);

在创建port1和port2之后,我们在port1上设置事件监听器并将port2发送给 worker。我们必须将它包含在transferList中,以便将其传输给 worker 。

在 worker 内部:

import { parentPort, MessagePort } from 'worker_threads';parentPort.on('message', (data) = { const { port }: { port: MessagePort } = data; port.postMessage('heres your message!');});

这样,我们就能使用父线程发送的 port 了。

使用parentPort不一定是错误的方法,但最好用MessageChannel的实例创建一个新的MessagePort,然后与生成的 worker 共享它。

请注意,在后面的例子中,为了简便起见,我用了parentPort。

使用 worker 的两种方式

可以通过两种方式使用 worker。第一种是生成一个 worker,然后执行它的代码,并将结果发送到父线程。通过这种方法,每当出现新任务时,都必须重新创建一个工作者。

第二种方法是生成一个 worker 并为message事件设置监听器。每次触发message时,它都会完成工作并将结果发送回父线程,这会使 worker 保持活动状态以供以后使用。

Node.js 文档推荐第二种方法,因为在创建 thread worker 时需要创建虚拟机并解析和执行代码,这会产生比较大的开销。所以这种方法比不断产生新 worker 的效率更高。

这种方法被称为工作池,因为我们创建了一个工作池并让它们等待,在需要时调度message事件来完成工作。

以下是一个产生、执行然后关闭 worker 例子:

import { parentPort } from 'worker_threads';const collection = [];for (let i = 0; i  10; i  = 1) { collection[i] = i;}parentPort.postMessage(collection);

将collection发送到父线程后,它就会退出。

下面是一个 worker 的例子,它可以在给定任务之前等待很长一段时间:

import { parentPort } from 'worker_threads';parentPort.on('message', (data: any) = { const result = doSomething(data); parentPort.postMessage(result);});

worker_threads 模块中可用的重要属性

worker_threads模块中有一些可用的属性:

isMainThread

当不在工作线程内操作时,该属性为true。如果你觉得有必要,可以在 worker 文件的开头包含一个简单的if语句,以确保它只作为 worker 运行。

import { isMainThread } from 'worker_threads';if (isMainThread) { throw new Error('Its not a worker');}

workerData

产生线程时包含在 worker 的构造函数中的数据。

constworker =newWorker(path, { workerData });

在工作线程中:

import { workerData } from 'worker_threads';console.log(workerData.property);

parentPort

前面提到的MessagePort实例,用于与父线程通信。

threadId

分配给 worker 的唯一标识符。

现在我们知道了技术细节,接下来实现一些东西并在实践中检验学到的知识。

实现setTimeout

setTimeout是一个无限循环,顾名思义,用来检测程序运行时间是否超时。它在循环中检查起始时间与给定毫秒数之和是否小于实际日期。

import { parentPort, workerData } from 'worker_threads';const time = Date.now();while (true) { if (time   workerData.time = Date.now()) { parentPort.postMessage({}); break; }}

这个特定的实现产生一个线程,然后执行它的代码,最后在完成后退出。

接下来实现使用这个 worker 的代码。首先创建一个状态,用它来跟踪生成的 worker:

consttimeoutState: { [key:string]: Worker } = {};

然后时负责创建 worker 并将其保存到状态的函数:

export function setTimeout(callback: (err: any) = any, time: number) { const id = uuidv4(); const worker = runWorker( path.join(__dirname, './timeout-worker.js'), (err) = { if (!timeoutState[id]) { return null; } timeoutState[id] = null; if (err) { return callback(err); } callback(null); }, { time, }, ); timeoutState[id] = worker; return id;}

首先,我们使用 UUID 包为 worker 创建一个唯一的标识符,然后用先前定义的函数runWorker来获取 worker。我们还向 worker 传入一个回调函数,一旦 worker 发送了数据就会被触发。最后,把 worker 保存在状态中并返回id。

在回调函数中,我们必须检查该 worker 是否仍然存在于该状态中,因为有可能会cancelTimeout(),这将会把它删除。如果确实存在,就把它从状态中删除,并调用传给setTimeout函数的callback。

cancelTimeout函数使用.terminate()方法强制 worker 退出,并从该状态中删除该这个worker:

export function cancelTimeout(id: string) { if (timeoutState[id]) { timeoutState[id].terminate(); timeoutState[id] = undefined; return true; } return false;}

如果你有兴趣,我也实现了setInterval,代码在这里,但因为它对线程什么都没做(我们重用setTimeout的代码),所以我决定不在这里进行解释。

我已经创建了一个短小的测试代码,目的是检查这种方法与原生方法的不同之处。你可以在这里找到代码。这些是结果:

native setTimeout { ms: 7004, averageCPUCost: 0.1416 }worker setTimeout { ms: 7046, averageCPUCost: 0.308 }

我们可以看到setTimeout有一点延迟 - 大约40ms - 这时 worker 被创建时的消耗。平均 CPU 成本也略高,但没什么难以忍受的(CPU 成本是整个过程持续时间内 CPU 使用率的平均值)。

如果我们可以重用 worker,就能够降低延迟和 CPU 使用率,这就是要实现工作池的原因。

实现工作池

如上所述,工作池是给定数量的被事先创建的 worker,他们保持空闲并监听message事件。一旦message事件被触发,他们就会开始工作并发回结果。

为了更好地描述我们将要做的事情,下面我们来创建一个由八个 thread worker 组成的工作池:

constpool =newWorkerPool(path.join(__dirname,'./test-worker.js'),8);

如果你熟悉限制并发操作,那么你在这里看到的逻辑几乎相同,只是一个不同的用例。

如上面的代码片段所示,我们把指向 worker 的路径和要生成的 worker 数量传给了WorkerPool的构造函数。

export class WorkerPoolT, N { private queue: QueueItemT, N[] = []; private workersById: { [key: number]: Worker } = {}; private activeWorkersById: { [key: number]: boolean } = {}; public constructor(public workerPath: string, public numberOfThreads: number) { this.init(); }}

这里还有其他一些属性,如workersById和activeWorkersById,我们可以分别保存现有的 worker 和当前正在运行的 worker 的 ID。还有queue,我们可以使用以下结构来保存对象:

type QueueCallbackN = (err: any, result?: N) = void;interface QueueItemT, N { callback: QueueCallbackN; getData: () = T;}

callback只是默认的节点回调,第一个参数是错误,第二个参数是可能的结果。getData是传递给工作池.run()方法的函数(如下所述),一旦项目开始处理就会被调用。getData函数返回的数据将传给工作线程。

在.init()方法中,我们创建了 worker 并将它们保存在以下状态中:

private init() { if (this.numberOfThreads  1) { return null; } for (let i = 0; i  this.numberOfThreads; i  = 1) { const worker = new Worker(this.workerPath); this.workersById[i] = worker; this.activeWorkersById[i] = false; }}

为避免无限循环,我们首先要确保线程数 1。然后创建有效的 worker 数,并将它们的索引保存在workersById状态。我们在activeWorkersById状态中保存了它们当前是否正在运行的信息,默认情况下该状态始终为false。

现在我们必须实现前面提到的.run()方法来设置一个 worker 可用的任务。

public run(getData: () = T) { return new PromiseN((resolve, reject) = { const availableWorkerId = this.getInactiveWorkerId(); const queueItem: QueueItemT, N = { getData, callback: (error, result) = { if (error) { return reject(error); }return resolve(result); }, }; if (availableWorkerId === -1) { this.queue.push(queueItem); return null; } this.runWorker(availableWorkerId, queueItem); });}

在 promise 函数里,我们首先通过调用.getInactiveWorkerId()来检查是否存在空闲的 worker 可以来处理数据:

private getInactiveWorkerId(): number { for (let i = 0; i  this.numberOfThreads; i  = 1) { if (!this.activeWorkersById[i]) { return i; } } return -1;}

接下来,我们创建一个queueItem,在其中保存传递给.run()方法的getData函数以及回调。在回调中,我们要么resolve或者rejectpromise,这取决于 worker 是否将错误传递给回调。

如果availableWorkerId的值是 -1,意味着当前没有可用的 worker,我们将queueItem添加到queue。如果有可用的 worker,则调用.runWorker()方法来执行 worker。

在.runWorker()方法中,我们必须把当前 worker 的activeWorkersById设置为使用状态;为message和error事件设置事件监听器(并在之后清理它们);最后将数据发送给 worker。

private async runWorker(workerId: number, queueItem: QueueItemT, N) { const worker = this.workersById[workerId]; this.activeWorkersById[workerId] = true; const messageCallback = (result: N) = { queueItem.callback(null, result); cleanUp(); }; const errorCallback = (error: any) = { queueItem.callback(error); cleanUp(); }; const cleanUp = () = { worker.removeAllListeners('message'); worker.removeAllListeners('error'); this.activeWorkersById[workerId] = false; if (!this.queue.length) { return null; } this.runWorker(workerId, this.queue.shift()); }; worker.once('message', messageCallback); worker.once('error', errorCallback); worker.postMessage(await queueItem.getData());}

首先,通过使用传递的workerId,我们从workersById中获得 worker 引用。然后,在activeWorkersById中,将[workerId]属性设置为true,这样我们就能知道在 worker 在忙,不要运行其他任务。

接下来,分别创建messageCallback和errorCallback用来在消息和错误事件上调用,然后注册所述函数来监听事件并将数据发送给 worker。

在回调中,我们调用queueItem的回调,然后调用cleanUp函数。在cleanUp函数中,要删除事件侦听器,因为我们会多次重用同一个 worker。如果没有删除监听器的话就会发生内存泄漏,内存会被慢慢耗尽。

在activeWorkersById状态中,我们将[workerId]属性设置为false,并检查队列是否为空。如果不是,就从queue中删除第一个项目,并用另一个queueItem再次调用 worker。

接着创建一个在收到message事件中的数据后进行一些计算的 worker:

import { isMainThread, parentPort } from 'worker_threads';if (isMainThread) { throw new Error('Its not a worker');}const doCalcs = (data: any) = { const collection = []; for (let i = 0; i  1000000; i  = 1) { collection[i] = Math.round(Math.random() * 100000); } return collection.sort((a, b) = { if (a  b) { return 1; } return -1; });};parentPort.on('message', (data: any) = { const result = doCalcs(data); parentPort.postMessage(result);});

worker 创建了一个包含 100 万个随机数的数组,然后对它们进行排序。只要能够多花费一些时间才能完成,做些什么事情并不重要。

以下是工作池简单用法的示例:

const pool = new WorkerPool{ i: number }, number(path.join(__dirname, './test-worker.js'), 8);const items = [...new Array(100)].fill(null);Promise.all( items.map(async (_, i) = { await pool.run(() = ({ i })); console.log('finished', i); }),).then(() = { console.log('finished all');});

首先创建一个由八个 worker 组成的工作池。然后创建一个包含 100 个元素的数组,对于每个元素,我们在工作池中运行一个任务。开始运行后将立即执行八个任务,其余任务被放入队列并逐个执行。通过使用工作池,我们不必每次都创建一个 worker,从而大大提高了效率。

结论

worker_threads提供了一种为程序添加多线程支持的简单的方法。通过将繁重的 CPU 计算委托给其他线程,可以显着提高服务器的吞吐量。通过官方线程支持,我们可以期待更多来自AI、机器学习和大数据等领域的开发人员和工程师使用 Node.js.

本文首发微信公众号:jingchengyideng

翻译:疯狂的技术宅原文:-...

本文由yzc216亚洲城发布于网站首页,转载请注明出处:八线程完全指南,进度与线程

关键词: yzc216亚洲城