Asynchronous context tracking
Stability: 2 - Stable
Introduction
These classes are used to associate state and propagate it throughout callbacks and promise chains. They allow storing data throughout the lifetime of a web request or any other asynchronous duration. It is similar to thread-local storage in other languages.
The AsyncLocalStorage and AsyncResource classes are part of the node:async_hooks module:
import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks';
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');
Class: AsyncLocalStorage
This class creates stores that stay coherent through asynchronous operations.
While you can create your own implementation on top of the node:async_hooks module, AsyncLocalStorage should be preferred because it is a performant and memory-safe implementation that involves significant optimizations that are non-obvious to implement.
The following example uses AsyncLocalStorage to build a simple logger that assigns IDs to incoming HTTP requests and includes them in messages logged within each request.
import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';

const asyncLocalStorage = new AsyncLocalStorage();

function logWithId(msg) {
  const id = asyncLocalStorage.getStore();
  console.log(`${id !== undefined ? id : '-'}:`, msg);
}

let idSeq = 0;
http.createServer((req, res) => {
  asyncLocalStorage.run(idSeq++, () => {
    logWithId('start');
    // Imagine any chain of async operations here
    setImmediate(() => {
      logWithId('finish');
      res.end();
    });
  });
}).listen(8080);

http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
//   0: start
//   1: start
//   0: finish
//   1: finish
const http = require('node:http');
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

function logWithId(msg) {
  const id = asyncLocalStorage.getStore();
  console.log(`${id !== undefined ? id : '-'}:`, msg);
}

let idSeq = 0;
http.createServer((req, res) => {
  asyncLocalStorage.run(idSeq++, () => {
    logWithId('start');
    // Imagine any chain of async operations here
    setImmediate(() => {
      logWithId('finish');
      res.end();
    });
  });
}).listen(8080);

http.get('http://localhost:8080');
http.get('http://localhost:8080');
// Prints:
//   0: start
//   1: start
//   0: finish
//   1: finish
Each instance of AsyncLocalStorage maintains an independent storage context. Multiple instances can safely exist simultaneously without risk of interfering with each other's data.
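As a minimal sketch of that isolation, the snippet below nests two stores and reads both back. The instance names requestStorage and tenantStorage and the stored values are illustrative assumptions, not part of the API.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

// Two independent instances (hypothetical names).
const requestStorage = new AsyncLocalStorage();
const tenantStorage = new AsyncLocalStorage();

// Nest one run() inside the other; each instance keeps its own store.
const seen = requestStorage.run('req-1', () =>
  tenantStorage.run('tenant-a', () => ({
    request: requestStorage.getStore(),
    tenant: tenantStorage.getStore(),
  })));

console.log(seen); // Prints: { request: 'req-1', tenant: 'tenant-a' }
console.log(requestStorage.getStore()); // Prints: undefined
```

Entering tenantStorage does not replace or hide the value held by requestStorage, and neither store is visible once both run() calls have returned.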
new AsyncLocalStorage()
Creates a new instance of AsyncLocalStorage. A store is only provided within a run() call or after an enterWith() call.
Static method: AsyncLocalStorage.bind(fn)
Stability: 1 - Experimental
- fn {Function} The function to bind to the current execution context.
- Returns: {Function} A new function that calls fn within the captured execution context.
Binds the given function to the current execution context.
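A minimal sketch of the binding behavior, assuming a Node.js version that ships this experimental static method. The store values 'captured' and 'other' and the name readStore are illustrative only.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

// Bind a function while the store is 'captured'.
const readStore = asyncLocalStorage.run('captured', () =>
  AsyncLocalStorage.bind(() => asyncLocalStorage.getStore()));

// Call the bound function from a different context.
const boundResult = asyncLocalStorage.run('other', () => readStore());

console.log(boundResult); // Prints: captured
```

The bound function sees the store that was active when bind() was called, not the one that is active at the call site.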
Static method: AsyncLocalStorage.snapshot()
Stability: 1 - Experimental
- Returns: {Function} A new function with the signature (fn: (...args) : R, ...args) : R.
Captures the current execution context and returns a function that accepts a function as an argument. Whenever the returned function is called, it calls the function passed to it within the captured context.
const asyncLocalStorage = new AsyncLocalStorage();
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot());
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()));
console.log(result); // returns 123
AsyncLocalStorage.snapshot() can replace the use of AsyncResource for simple async context tracking purposes, for example:
class Foo {
  #runInAsyncScope = AsyncLocalStorage.snapshot();

  get() { return this.#runInAsyncScope(() => asyncLocalStorage.getStore()); }
}

const foo = asyncLocalStorage.run(123, () => new Foo());
console.log(asyncLocalStorage.run(321, () => foo.get())); // returns 123
asyncLocalStorage.disable()
Stability: 1 - Experimental
Disables the instance of AsyncLocalStorage. All subsequent calls to asyncLocalStorage.getStore() will return undefined until asyncLocalStorage.run() or asyncLocalStorage.enterWith() is called again.
When calling asyncLocalStorage.disable(), all current contexts linked to the instance will be exited.
Calling asyncLocalStorage.disable() is required before the asyncLocalStorage can be garbage collected. This does not apply to stores provided by the asyncLocalStorage, as those objects are garbage collected along with the corresponding async resources.
Use this method when the asyncLocalStorage is not in use anymore in the current process.
asyncLocalStorage.getStore()
- Returns: {any}
Returns the current store.
If called outside of an asynchronous context initialized by calling asyncLocalStorage.run() or asyncLocalStorage.enterWith(), it returns undefined.
asyncLocalStorage.enterWith(store)
Stability: 1 - Experimental
- store {any}
Transitions into the context for the remainder of the current synchronous execution and then persists the store through any following asynchronous calls.
Example:
const store = { id: 1 };
// Replaces previous store with the given store object
asyncLocalStorage.enterWith(store);
asyncLocalStorage.getStore(); // Returns the store object
someAsyncOperation(() => {
  asyncLocalStorage.getStore(); // Returns the same object
});
This transition will continue for the entire synchronous execution. This means that if, for example, the context is entered within an event handler, subsequent event handlers will also run within that context unless specifically bound to another context with an AsyncResource. That is why run() should be preferred over enterWith() unless there are strong reasons to use the latter method.
const store = { id: 1 };

emitter.on('my-event', () => {
  asyncLocalStorage.enterWith(store);
});
emitter.on('my-event', () => {
  asyncLocalStorage.getStore(); // Returns the same object
});

asyncLocalStorage.getStore(); // Returns undefined
emitter.emit('my-event');
asyncLocalStorage.getStore(); // Returns the same object
asyncLocalStorage.run(store, callback[, ...args])
- store {any}
- callback {Function}
- ...args {any}
Runs a function synchronously within a context and returns its return value. The store is not accessible outside of the callback function. The store is accessible to any asynchronous operations created within the callback.
The optional args are passed to the callback function.
If the callback function throws an error, the error is thrown by run() too. The stacktrace is not impacted by this call and the context is exited.
Example:
const store = { id: 2 };
try {
  asyncLocalStorage.run(store, () => {
    asyncLocalStorage.getStore(); // Returns the store object
    setTimeout(() => {
      asyncLocalStorage.getStore(); // Returns the store object
    }, 200);
    throw new Error();
  });
} catch (e) {
  asyncLocalStorage.getStore(); // Returns undefined
  // The error will be caught here
}
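The argument forwarding and return value described above can be sketched as follows. The store shape ({ user: 'alice' }) and the numbers passed are illustrative assumptions.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

// Extra arguments after the callback are forwarded to it, and run()
// returns whatever the callback returns.
const greeting = asyncLocalStorage.run({ user: 'alice' }, (a, b) => {
  const { user } = asyncLocalStorage.getStore();
  return `${user}:${a + b}`;
}, 2, 3);

console.log(greeting); // Prints: alice:5
console.log(asyncLocalStorage.getStore()); // Prints: undefined
```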
asyncLocalStorage.exit(callback[, ...args])
Stability: 1 - Experimental
- callback {Function}
- ...args {any}
Runs a function synchronously outside of a context and returns its return value. The store is not accessible within the callback function or the asynchronous operations created within the callback. Any getStore() call done within the callback function will always return undefined.
The optional args are passed to the callback function.
If the callback function throws an error, the error is thrown by exit() too. The stacktrace is not impacted by this call and the context is re-entered.
Example:
// Within a call to run
try {
  asyncLocalStorage.getStore(); // Returns the store object or value
  asyncLocalStorage.exit(() => {
    asyncLocalStorage.getStore(); // Returns undefined
    throw new Error();
  });
} catch (e) {
  asyncLocalStorage.getStore(); // Returns the same object or value
  // The error will be caught here
}
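For a self-contained variant of the same idea, the following sketch wraps the exit() call in an explicit run(). The store value 'outer' and the name observed are assumptions made for illustration.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

const observed = asyncLocalStorage.run('outer', () => {
  // Inside exit(), the store is hidden from the callback.
  const inside = asyncLocalStorage.exit(() => asyncLocalStorage.getStore());
  // Once exit() returns, the surrounding context is active again.
  const after = asyncLocalStorage.getStore();
  return { inside, after };
});

console.log(observed); // Prints: { inside: undefined, after: 'outer' }
```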
Usage with async/await
If, within an async function, only one await call is to run within a context, the following pattern should be used:
async function fn() {
  await asyncLocalStorage.run(new Map(), () => {
    asyncLocalStorage.getStore().set('key', value);
    return foo(); // The return value of foo will be awaited
  });
}
In this example, the store is only available in the callback function and the functions called by foo. Outside of run, calling getStore will return undefined.
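A runnable sketch of this pattern follows. The helper readKey() is a hypothetical stand-in for foo(), and the key and value are made up for the example.

```javascript
const { AsyncLocalStorage } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

// Hypothetical async helper standing in for foo().
async function readKey() {
  await null; // Yield once so the read happens after an await.
  return asyncLocalStorage.getStore().get('key');
}

async function fn() {
  const value = await asyncLocalStorage.run(new Map(), () => {
    asyncLocalStorage.getStore().set('key', 'some value');
    return readKey(); // The returned promise is awaited by fn().
  });
  // Back outside run(): no store is active here.
  return { value, outside: asyncLocalStorage.getStore() };
}

const pending = fn();
pending.then((result) => console.log(result));
```

The helper still sees the Map after its own await, while the code in fn() that follows the awaited run() call sees no store.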
Troubleshooting: Context loss
In most cases, AsyncLocalStorage works without issues. In rare situations, the current store is lost in one of the asynchronous operations.
If your code is callback-based, it is enough to promisify it with util.promisify() so it starts working with native promises.
If you need to use a callback-based API or your code assumes a custom thenable implementation, use the AsyncResource class to associate the asynchronous operation with the correct execution context.
Find the function call responsible for the context loss by logging the content of asyncLocalStorage.getStore() after the calls you suspect are responsible for the loss. When the code logs undefined, the last callback called is probably responsible for the context loss.
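The sketch below reproduces a context loss and one possible fix with AsyncResource. The queue and drain() helper are hypothetical stand-ins for a callback-based library that invokes callbacks later from its own context; the resource type name 'QueuedCallback' is likewise an assumption.

```javascript
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();

// Hypothetical callback queue: callbacks are invoked by whoever drains it.
const queue = [];
const drain = () => {
  while (queue.length > 0) queue.shift()();
};

const seen = [];
asyncLocalStorage.run('request-1', () => {
  // Plain callback: it runs in the drainer's context, so the store is lost.
  queue.push(() => seen.push(['plain', asyncLocalStorage.getStore()]));

  // Callback tied to an AsyncResource created inside run(): the store is kept.
  const resource = new AsyncResource('QueuedCallback');
  queue.push(() => {
    resource.runInAsyncScope(() => {
      seen.push(['tracked', asyncLocalStorage.getStore()]);
    });
    resource.emitDestroy();
  });
});

drain(); // Called outside of any run() context.
console.log(seen);
// Prints: [ [ 'plain', undefined ], [ 'tracked', 'request-1' ] ]
```

Logging getStore() in each callback, as suggested above, is what reveals that the plain callback is the one losing the store.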
Class: AsyncResource
The class AsyncResource is designed to be extended by the embedder's async resources. Using this, users can easily trigger the lifetime events of their own resources.
The init hook will trigger when an AsyncResource is instantiated.
The following is an overview of the AsyncResource API.
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hooks.executionAsyncId() is used.
const asyncResource = new AsyncResource(
  type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
const { AsyncResource, executionAsyncId } = require('node:async_hooks');
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hooks.executionAsyncId() is used.
const asyncResource = new AsyncResource(
  type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
new AsyncResource(type[, options])
- type {string} The type of async event.
- options {Object}
  - triggerAsyncId {number} The ID of the execution context that created this async event. Default: executionAsyncId().
  - requireManualDestroy {boolean} If set to true, disables emitDestroy when the object is garbage collected. This usually does not need to be set (even if emitDestroy is called manually), unless the resource's asyncId is retrieved and the sensitive API's emitDestroy is called with it. When set to false, the emitDestroy call on garbage collection will only take place if there is at least one active destroy hook. Default: false.
Example usage:
class DBQuery extends AsyncResource {
  constructor(db) {
    super('DBQuery');
    this.db = db;
  }

  getInfo(query, callback) {
    this.db.get(query, (err, data) => {
      this.runInAsyncScope(callback, null, err, data);
    });
  }

  close() {
    this.db = null;
    this.emitDestroy();
  }
}
Static method: AsyncResource.bind(fn[, type[, thisArg]])
- fn {Function} The function to bind to the current execution context.
- type {string} An optional name to associate with the underlying AsyncResource.
- thisArg {any}
Binds the given function to the current execution context.
asyncResource.bind(fn[, thisArg])
- fn {Function} The function to bind to the current AsyncResource.
- thisArg {any}
Binds the given function to execute in this AsyncResource's scope.
asyncResource.runInAsyncScope(fn[, thisArg, ...args])
- fn {Function} The function to call in the execution context of this async resource.
- thisArg {any} The receiver to be used for the function call.
- ...args {any} Optional arguments to pass to the function.
Call the provided function with the provided arguments in the execution context of the async resource. This will establish the context, trigger the AsyncHooks before callbacks, call the function, trigger the AsyncHooks after callbacks, and then restore the original execution context.
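As a small sketch of these steps, the snippet below checks the execution context inside and after the call. The resource type 'DemoResource', the receiver object, and the arguments are illustrative assumptions.

```javascript
const { AsyncResource, executionAsyncId } = require('node:async_hooks');

const resource = new AsyncResource('DemoResource');
const outsideId = executionAsyncId();

// A regular function (not an arrow function) is used so that thisArg applies.
const result = resource.runInAsyncScope(function(a, b) {
  return {
    sum: a + b,
    label: this.label,
    inScope: executionAsyncId() === resource.asyncId(),
  };
}, { label: 'receiver' }, 2, 3);

// After the call returns, the original execution context is active again.
const restored = executionAsyncId() === outsideId;

console.log(result); // Prints: { sum: 5, label: 'receiver', inScope: true }
console.log(restored); // Prints: true
```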
asyncResource.emitDestroy()
- Returns: {AsyncResource} A reference to asyncResource.
Call all destroy hooks. This should only ever be called once. An error will be thrown if it is called more than once. This must be manually called. If the resource is left to be collected by the GC then the destroy hooks will never be called.
asyncResource.asyncId()
- Returns: {number} The unique asyncId assigned to the resource.
asyncResource.triggerAsyncId()
- Returns: {number} The same triggerAsyncId that is passed to the AsyncResource constructor.
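The relationship between the two IDs can be sketched like this. The resource type names 'ParentResource' and 'ChildResource' are hypothetical; the first check relies on the documented default of executionAsyncId() when no triggerAsyncId option is given.

```javascript
const { AsyncResource, executionAsyncId } = require('node:async_hooks');

const currentId = executionAsyncId();
const parent = new AsyncResource('ParentResource');
// Explicitly record the parent as the trigger of the child.
const child = new AsyncResource('ChildResource', {
  triggerAsyncId: parent.asyncId(),
});

const ids = {
  // No option given: the documented default is executionAsyncId().
  parentDefaultsToCurrent: parent.triggerAsyncId() === currentId,
  childTriggeredByParent: child.triggerAsyncId() === parent.asyncId(),
  idsAreDistinct: parent.asyncId() !== child.asyncId(),
};

console.log(ids);
```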
Using AsyncResource for a Worker thread pool
The following example shows how to use the AsyncResource class to properly provide async tracking for a Worker pool. Other resource pools, such as database connection pools, can follow a similar model.
Assume that the task is adding two numbers and that it lives in a file named task_processor.js with the following content:
import { parentPort } from 'node:worker_threads';
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});

const { parentPort } = require('node:worker_threads');
parentPort.on('message', (task) => {
  parentPort.postMessage(task.a + task.b);
});
A Worker pool around it could use the following structure:
import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import path from 'node:path';
import { Worker } from 'node:worker_threads';

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy(); // `TaskInfo`s are used only once.
  }
}

export default class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift();
        this.runTask(task, callback);
      }
    });
  }

  addNewWorker() {
    const worker = new Worker(new URL('task_processor.js', import.meta.url));
    worker.on('message', (result) => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo])
        worker[kTaskInfo].done(err, null);
      else
        this.emit('error', err);
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1);
      this.addNewWorker();
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback });
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  close() {
    for (const worker of this.workers) worker.terminate();
  }
}
const { AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');
const path = require('node:path');
const { Worker } = require('node:worker_threads');

const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');

class WorkerPoolTaskInfo extends AsyncResource {
  constructor(callback) {
    super('WorkerPoolTaskInfo');
    this.callback = callback;
  }

  done(err, result) {
    this.runInAsyncScope(this.callback, null, err, result);
    this.emitDestroy(); // `TaskInfo`s are used only once.
  }
}

class WorkerPool extends EventEmitter {
  constructor(numThreads) {
    super();
    this.numThreads = numThreads;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];

    for (let i = 0; i < numThreads; i++)
      this.addNewWorker();

    // Any time the kWorkerFreedEvent is emitted, dispatch
    // the next task pending in the queue, if any.
    this.on(kWorkerFreedEvent, () => {
      if (this.tasks.length > 0) {
        const { task, callback } = this.tasks.shift();
        this.runTask(task, callback);
      }
    });
  }

  addNewWorker() {
    const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
    worker.on('message', (result) => {
      // In case of success: Call the callback that was passed to `runTask`,
      // remove the `TaskInfo` associated with the Worker, and mark it as free
      // again.
      worker[kTaskInfo].done(null, result);
      worker[kTaskInfo] = null;
      this.freeWorkers.push(worker);
      this.emit(kWorkerFreedEvent);
    });
    worker.on('error', (err) => {
      // In case of an uncaught exception: Call the callback that was passed to
      // `runTask` with the error.
      if (worker[kTaskInfo])
        worker[kTaskInfo].done(err, null);
      else
        this.emit('error', err);
      // Remove the worker from the list and start a new Worker to replace the
      // current one.
      this.workers.splice(this.workers.indexOf(worker), 1);
      this.addNewWorker();
    });
    this.workers.push(worker);
    this.freeWorkers.push(worker);
    this.emit(kWorkerFreedEvent);
  }

  runTask(task, callback) {
    if (this.freeWorkers.length === 0) {
      // No free threads, wait until a worker thread becomes free.
      this.tasks.push({ task, callback });
      return;
    }

    const worker = this.freeWorkers.pop();
    worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
    worker.postMessage(task);
  }

  close() {
    for (const worker of this.workers) worker.terminate();
  }
}

module.exports = WorkerPool;
Without the explicit tracking added by the WorkerPoolTaskInfo objects, it would appear that the callbacks are associated with the individual Worker objects. However, the creation of the Workers is not associated with the creation of the tasks and does not provide information about when tasks were scheduled.
This pool could be used as follows:
import WorkerPool from './worker_pool.js';
import os from 'node:os';

const pool = new WorkerPool(os.availableParallelism());

let finished = 0;
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result);
    if (++finished === 10)
      pool.close();
  });
}
const WorkerPool = require('./worker_pool.js');
const os = require('node:os');

const pool = new WorkerPool(os.availableParallelism());

let finished = 0;
for (let i = 0; i < 10; i++) {
  pool.runTask({ a: 42, b: 100 }, (err, result) => {
    console.log(i, err, result);
    if (++finished === 10)
      pool.close();
  });
}
Integrating AsyncResource with EventEmitter
Event listeners triggered by an EventEmitter may be run in a different execution context than the one that was active when eventEmitter.on() was called.
The following example shows how to use the AsyncResource class to properly associate an event listener with the correct execution context. The same approach can be applied to a Stream or a similar event-driven class.
import { createServer } from 'node:http';
import { AsyncResource, executionAsyncId } from 'node:async_hooks';

const server = createServer((req, res) => {
  req.on('close', AsyncResource.bind(() => {
    // Execution context is bound to the current outer scope.
  }));
  req.on('close', () => {
    // Execution context is bound to the scope that caused 'close' to emit.
  });
  res.end();
}).listen(3000);
const { createServer } = require('node:http');
const { AsyncResource, executionAsyncId } = require('node:async_hooks');

const server = createServer((req, res) => {
  req.on('close', AsyncResource.bind(() => {
    // Execution context is bound to the current outer scope.
  }));
  req.on('close', () => {
    // Execution context is bound to the scope that caused 'close' to emit.
  });
  res.end();
}).listen(3000);
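The difference between the two listeners can be made visible without an HTTP server by pairing a plain EventEmitter with an AsyncLocalStorage. This is only a sketch: the event name 'ping' and the store values 'registration' and 'emission' are assumptions chosen for the example.

```javascript
const { EventEmitter } = require('node:events');
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');

const asyncLocalStorage = new AsyncLocalStorage();
const emitter = new EventEmitter();
const seen = [];

asyncLocalStorage.run('registration', () => {
  // Bound listener: keeps the context that was active at registration time.
  emitter.on('ping', AsyncResource.bind(() => {
    seen.push(asyncLocalStorage.getStore());
  }));
  // Plain listener: runs in the context of whoever calls emit().
  emitter.on('ping', () => {
    seen.push(asyncLocalStorage.getStore());
  });
});

asyncLocalStorage.run('emission', () => emitter.emit('ping'));

console.log(seen); // Prints: [ 'registration', 'emission' ]
```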