0
0
mirror of https://github.com/nodejs/node.git synced 2024-11-21 13:09:21 +01:00

lib: rewrite AsyncLocalStorage without async_hooks

PR-URL: https://github.com/nodejs/node/pull/48528
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Rafael Gonzaga <rafael.nunu@hotmail.com>
Reviewed-By: Chengzhong Wu <legendecas@gmail.com>
Reviewed-By: Santiago Gimeno <santiago.gimeno@gmail.com>
Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de>
This commit is contained in:
Stephen Belanger 2024-08-02 12:44:20 -07:00 committed by GitHub
parent 0c1877a82a
commit d1229eeca4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
29 changed files with 658 additions and 173 deletions

View File

@ -38,9 +38,11 @@ function runInAsyncScopes(resourceCount, cb, i = 0) {
function main({ n, resourceCount }) {
const store = new AsyncLocalStorage();
runInAsyncScopes(resourceCount, () => {
bench.start();
runBenchmark(store, n);
bench.end(n);
store.run({}, () => {
runInAsyncScopes(resourceCount, () => {
bench.start();
runBenchmark(store, n);
bench.end(n);
});
});
}

View File

@ -14,7 +14,7 @@ const { AsyncLocalStorage } = require('async_hooks');
* - AsyncLocalStorage1.getStore()
*/
const bench = common.createBenchmark(main, {
sotrageCount: [1, 10, 100],
storageCount: [1, 10, 100],
n: [1e4],
});
@ -34,8 +34,8 @@ function runStores(stores, value, cb, idx = 0) {
}
}
function main({ n, sotrageCount }) {
const stores = new Array(sotrageCount).fill(0).map(() => new AsyncLocalStorage());
function main({ n, storageCount }) {
const stores = new Array(storageCount).fill(0).map(() => new AsyncLocalStorage());
const contextValue = {};
runStores(stores, contextValue, () => {

View File

@ -886,6 +886,21 @@ and `"` are usable.
It is possible to run code containing inline types by passing
[`--experimental-strip-types`][].
### `--experimental-async-context-frame`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
Enables the use of AsyncLocalStorage backed by AsyncContextFrame rather than
the default implementation which relies on async\_hooks. This new model is
implemented very differently and so could have differences in how context data
flows within the application. As such, it is presently recommended to be sure
your application behaviour is unaffected by this change before using it in
production.
### `--experimental-default-type=type`
<!-- YAML
@ -2893,6 +2908,7 @@ one is included in the list below.
* `--enable-network-family-autoselection`
* `--enable-source-maps`
* `--experimental-abortcontroller`
* `--experimental-async-context-frame`
* `--experimental-default-type`
* `--experimental-detect-module`
* `--experimental-eventsource`

View File

@ -10,7 +10,6 @@ const {
NumberIsSafeInteger,
ObjectDefineProperties,
ObjectFreeze,
ObjectIs,
ReflectApply,
Symbol,
} = primordials;
@ -30,6 +29,8 @@ const {
} = require('internal/validators');
const internal_async_hooks = require('internal/async_hooks');
const AsyncContextFrame = require('internal/async_context_frame');
// Get functions
// For userland AsyncResources, make sure to emit a destroy event when the
// resource gets gced.
@ -158,6 +159,7 @@ function createHook(fns) {
// Embedder API //
const destroyedSymbol = Symbol('destroyed');
const contextFrameSymbol = Symbol('context_frame');
class AsyncResource {
constructor(type, opts = kEmptyObject) {
@ -177,6 +179,8 @@ class AsyncResource {
throw new ERR_INVALID_ASYNC_ID('triggerAsyncId', triggerAsyncId);
}
this[contextFrameSymbol] = AsyncContextFrame.current();
const asyncId = newAsyncId();
this[async_id_symbol] = asyncId;
this[trigger_async_id_symbol] = triggerAsyncId;
@ -201,12 +205,12 @@ class AsyncResource {
const asyncId = this[async_id_symbol];
emitBefore(asyncId, this[trigger_async_id_symbol], this);
const contextFrame = this[contextFrameSymbol];
const prior = AsyncContextFrame.exchange(contextFrame);
try {
const ret =
ReflectApply(fn, thisArg, args);
return ret;
return ReflectApply(fn, thisArg, args);
} finally {
AsyncContextFrame.set(prior);
if (hasAsyncIdStack())
emitAfter(asyncId);
}
@ -270,110 +274,15 @@ class AsyncResource {
}
}
const storageList = [];
const storageHook = createHook({
init(asyncId, type, triggerAsyncId, resource) {
const currentResource = executionAsyncResource();
// Value of currentResource is always a non null object
for (let i = 0; i < storageList.length; ++i) {
storageList[i]._propagate(resource, currentResource, type);
}
},
});
class AsyncLocalStorage {
constructor() {
this.kResourceStore = Symbol('kResourceStore');
this.enabled = false;
}
static bind(fn) {
return AsyncResource.bind(fn);
}
static snapshot() {
return AsyncLocalStorage.bind((cb, ...args) => cb(...args));
}
disable() {
if (this.enabled) {
this.enabled = false;
// If this.enabled, the instance must be in storageList
ArrayPrototypeSplice(storageList,
ArrayPrototypeIndexOf(storageList, this), 1);
if (storageList.length === 0) {
storageHook.disable();
}
}
}
_enable() {
if (!this.enabled) {
this.enabled = true;
ArrayPrototypePush(storageList, this);
storageHook.enable();
}
}
// Propagate the context from a parent resource to a child one
_propagate(resource, triggerResource, type) {
const store = triggerResource[this.kResourceStore];
if (this.enabled) {
resource[this.kResourceStore] = store;
}
}
enterWith(store) {
this._enable();
const resource = executionAsyncResource();
resource[this.kResourceStore] = store;
}
run(store, callback, ...args) {
// Avoid creation of an AsyncResource if store is already active
if (ObjectIs(store, this.getStore())) {
return ReflectApply(callback, null, args);
}
this._enable();
const resource = executionAsyncResource();
const oldStore = resource[this.kResourceStore];
resource[this.kResourceStore] = store;
try {
return ReflectApply(callback, null, args);
} finally {
resource[this.kResourceStore] = oldStore;
}
}
exit(callback, ...args) {
if (!this.enabled) {
return ReflectApply(callback, null, args);
}
this.disable();
try {
return ReflectApply(callback, null, args);
} finally {
this._enable();
}
}
getStore() {
if (this.enabled) {
const resource = executionAsyncResource();
return resource[this.kResourceStore];
}
}
}
// Placing all exports down here because the exported classes won't export
// otherwise.
module.exports = {
// Public API
AsyncLocalStorage,
get AsyncLocalStorage() {
return AsyncContextFrame.enabled ?
require('internal/async_local_storage/native') :
require('internal/async_local_storage/async_hooks');
},
createHook,
executionAsyncId,
triggerAsyncId,

View File

@ -0,0 +1,50 @@
'use strict';
const {
getContinuationPreservedEmbedderData,
setContinuationPreservedEmbedderData,
} = internalBinding('async_context_frame');
let enabled_;
class AsyncContextFrame extends Map {
constructor(store, data) {
super(AsyncContextFrame.current());
this.set(store, data);
}
static get enabled() {
enabled_ ??= require('internal/options')
.getOptionValue('--experimental-async-context-frame');
return enabled_;
}
static current() {
if (this.enabled) {
return getContinuationPreservedEmbedderData();
}
}
static set(frame) {
if (this.enabled) {
setContinuationPreservedEmbedderData(frame);
}
}
static exchange(frame) {
const prior = this.current();
this.set(frame);
return prior;
}
static disable(store) {
const frame = this.current();
frame?.disable(store);
}
disable(store) {
this.delete(store);
}
}
module.exports = AsyncContextFrame;

View File

@ -0,0 +1,117 @@
'use strict';
const {
ArrayPrototypeIndexOf,
ArrayPrototypePush,
ArrayPrototypeSplice,
ObjectIs,
ReflectApply,
Symbol,
} = primordials;
const {
AsyncResource,
createHook,
executionAsyncResource,
} = require('async_hooks');
const storageList = [];
const storageHook = createHook({
init(asyncId, type, triggerAsyncId, resource) {
const currentResource = executionAsyncResource();
// Value of currentResource is always a non null object
for (let i = 0; i < storageList.length; ++i) {
storageList[i]._propagate(resource, currentResource, type);
}
},
});
class AsyncLocalStorage {
constructor() {
this.kResourceStore = Symbol('kResourceStore');
this.enabled = false;
}
static bind(fn) {
return AsyncResource.bind(fn);
}
static snapshot() {
return AsyncLocalStorage.bind((cb, ...args) => cb(...args));
}
disable() {
if (this.enabled) {
this.enabled = false;
// If this.enabled, the instance must be in storageList
const index = ArrayPrototypeIndexOf(storageList, this);
ArrayPrototypeSplice(storageList, index, 1);
if (storageList.length === 0) {
storageHook.disable();
}
}
}
_enable() {
if (!this.enabled) {
this.enabled = true;
ArrayPrototypePush(storageList, this);
storageHook.enable();
}
}
// Propagate the context from a parent resource to a child one
_propagate(resource, triggerResource, type) {
const store = triggerResource[this.kResourceStore];
if (this.enabled) {
resource[this.kResourceStore] = store;
}
}
enterWith(store) {
this._enable();
const resource = executionAsyncResource();
resource[this.kResourceStore] = store;
}
run(store, callback, ...args) {
// Avoid creation of an AsyncResource if store is already active
if (ObjectIs(store, this.getStore())) {
return ReflectApply(callback, null, args);
}
this._enable();
const resource = executionAsyncResource();
const oldStore = resource[this.kResourceStore];
resource[this.kResourceStore] = store;
try {
return ReflectApply(callback, null, args);
} finally {
resource[this.kResourceStore] = oldStore;
}
}
exit(callback, ...args) {
if (!this.enabled) {
return ReflectApply(callback, null, args);
}
this.disable();
try {
return ReflectApply(callback, null, args);
} finally {
this._enable();
}
}
getStore() {
if (this.enabled) {
const resource = executionAsyncResource();
return resource[this.kResourceStore];
}
}
}
module.exports = AsyncLocalStorage;

View File

@ -0,0 +1,47 @@
'use strict';
const {
ReflectApply,
} = primordials;
const AsyncContextFrame = require('internal/async_context_frame');
const { AsyncResource } = require('async_hooks');
class AsyncLocalStorage {
static bind(fn) {
return AsyncResource.bind(fn);
}
static snapshot() {
return AsyncLocalStorage.bind((cb, ...args) => cb(...args));
}
disable() {
AsyncContextFrame.disable(this);
}
enterWith(data) {
const frame = new AsyncContextFrame(this, data);
AsyncContextFrame.set(frame);
}
run(data, fn, ...args) {
const prior = AsyncContextFrame.current();
this.enterWith(data);
try {
return ReflectApply(fn, null, args);
} finally {
AsyncContextFrame.set(prior);
}
}
exit(fn, ...args) {
return this.run(undefined, fn, ...args);
}
getStore() {
return AsyncContextFrame.current()?.get(this);
}
}
module.exports = AsyncLocalStorage;

View File

@ -38,6 +38,8 @@ const {
} = require('internal/async_hooks');
const { isErrorStackTraceLimitWritable } = require('internal/errors');
const AsyncContextFrame = require('internal/async_context_frame');
// *Must* match Environment::TickInfo::Fields in src/env.h.
const kHasRejectionToWarn = 1;
@ -260,6 +262,7 @@ function unhandledRejection(promise, reason) {
uid: ++lastPromiseId,
warned: false,
domain: process.domain,
contextFrame: AsyncContextFrame.current(),
});
setHasRejectionToWarn(true);
}
@ -466,9 +469,12 @@ function processPromiseRejections() {
);
}
const { contextFrame } = promiseInfo;
const priorContextFrame = AsyncContextFrame.exchange(contextFrame);
try {
needPop = unhandledRejectionsMode(promise, promiseInfo, promiseAsyncId);
} finally {
AsyncContextFrame.set(priorContextFrame);
needPop &&
promiseAsyncId !== undefined &&
popAsyncContext(promiseAsyncId);

View File

@ -3,6 +3,7 @@
const {
Array,
FunctionPrototypeBind,
Symbol,
} = primordials;
const {
@ -41,6 +42,10 @@ const {
const { AsyncResource } = require('async_hooks');
const AsyncContextFrame = require('internal/async_context_frame');
const async_context_frame = Symbol('asyncContextFrame');
// *Must* match Environment::TickInfo::Fields in src/env.h.
const kHasTickScheduled = 0;
@ -68,6 +73,9 @@ function processTicksAndRejections() {
let tock;
do {
while ((tock = queue.shift()) !== null) {
const priorContextFrame =
AsyncContextFrame.exchange(tock[async_context_frame]);
const asyncId = tock[async_id_symbol];
emitBefore(asyncId, tock[trigger_async_id_symbol], tock);
@ -91,6 +99,8 @@ function processTicksAndRejections() {
}
emitAfter(asyncId);
AsyncContextFrame.set(priorContextFrame);
}
runMicrotasks();
} while (!queue.isEmpty() || processPromiseRejections());
@ -125,6 +135,7 @@ function nextTick(callback) {
const tickObject = {
[async_id_symbol]: asyncId,
[trigger_async_id_symbol]: triggerAsyncId,
[async_context_frame]: AsyncContextFrame.current(),
callback,
args,
};

View File

@ -122,6 +122,10 @@ let debug = require('internal/util/debuglog').debuglog('timer', (fn) => {
debug = fn;
});
const AsyncContextFrame = require('internal/async_context_frame');
const async_context_frame = Symbol('asyncContextFrame');
// *Must* match Environment::ImmediateInfo::Fields in src/env.h.
const kCount = 0;
const kRefCount = 1;
@ -162,6 +166,7 @@ function initAsyncResource(resource, type) {
const asyncId = resource[async_id_symbol] = newAsyncId();
const triggerAsyncId =
resource[trigger_async_id_symbol] = getDefaultTriggerAsyncId();
resource[async_context_frame] = AsyncContextFrame.current();
if (initHooksExist())
emitInit(asyncId, type, triggerAsyncId, resource);
}
@ -494,6 +499,9 @@ function getTimerCallbacks(runNextTicks) {
prevImmediate = immediate;
const priorContextFrame =
AsyncContextFrame.exchange(immediate[async_context_frame]);
const asyncId = immediate[async_id_symbol];
emitBefore(asyncId, immediate[trigger_async_id_symbol], immediate);
@ -513,6 +521,8 @@ function getTimerCallbacks(runNextTicks) {
}
emitAfter(asyncId);
AsyncContextFrame.set(priorContextFrame);
}
if (queue === outstandingQueue)
@ -587,6 +597,9 @@ function getTimerCallbacks(runNextTicks) {
continue;
}
const priorContextFrame =
AsyncContextFrame.exchange(timer[async_context_frame]);
emitBefore(asyncId, timer[trigger_async_id_symbol], timer);
let start;
@ -620,6 +633,8 @@ function getTimerCallbacks(runNextTicks) {
}
emitAfter(asyncId);
AsyncContextFrame.set(priorContextFrame);
}
// If `L.peek(list)` returned nothing, the list was either empty or we have

View File

@ -68,6 +68,7 @@
'src/api/exceptions.cc',
'src/api/hooks.cc',
'src/api/utils.cc',
'src/async_context_frame.cc',
'src/async_wrap.cc',
'src/base_object.cc',
'src/cares_wrap.cc',
@ -185,6 +186,7 @@
'src/aliased_buffer-inl.h',
'src/aliased_struct.h',
'src/aliased_struct-inl.h',
'src/async_context_frame.h',
'src/async_wrap.h',
'src/async_wrap-inl.h',
'src/base_object.h',

View File

@ -1,5 +1,6 @@
#include "node.h"
#include "async_context_frame.h"
#include "env-inl.h"
#include "node.h"
namespace node {
@ -18,36 +19,55 @@ AsyncResource::AsyncResource(Isolate* isolate,
: env_(Environment::GetCurrent(isolate)),
resource_(isolate, resource) {
CHECK_NOT_NULL(env_);
async_context_ = EmitAsyncInit(isolate, resource, name,
trigger_async_id);
env_->SetAsyncResourceContextFrame(
reinterpret_cast<std::uintptr_t>(this),
{isolate, async_context_frame::current(isolate)});
async_context_ = EmitAsyncInit(isolate, resource, name, trigger_async_id);
}
AsyncResource::~AsyncResource() {
CHECK_NOT_NULL(env_);
env_->RemoveAsyncResourceContextFrame(reinterpret_cast<std::uintptr_t>(this));
EmitAsyncDestroy(env_, async_context_);
}
MaybeLocal<Value> AsyncResource::MakeCallback(Local<Function> callback,
int argc,
Local<Value>* argv) {
return node::MakeCallback(env_->isolate(), get_resource(),
callback, argc, argv,
async_context_);
auto isolate = env_->isolate();
auto context_frame =
env_->GetAsyncResourceContextFrame(reinterpret_cast<std::uintptr_t>(this))
.Get(isolate);
async_context_frame::Scope async_context_frame_scope(isolate, context_frame);
return node::MakeCallback(
isolate, get_resource(), callback, argc, argv, async_context_);
}
MaybeLocal<Value> AsyncResource::MakeCallback(const char* method,
int argc,
Local<Value>* argv) {
return node::MakeCallback(env_->isolate(), get_resource(),
method, argc, argv,
async_context_);
auto isolate = env_->isolate();
auto context_frame =
env_->GetAsyncResourceContextFrame(reinterpret_cast<std::uintptr_t>(this))
.Get(isolate);
async_context_frame::Scope async_context_frame_scope(isolate, context_frame);
return node::MakeCallback(
isolate, get_resource(), method, argc, argv, async_context_);
}
MaybeLocal<Value> AsyncResource::MakeCallback(Local<String> symbol,
int argc,
Local<Value>* argv) {
return node::MakeCallback(env_->isolate(), get_resource(),
symbol, argc, argv,
async_context_);
auto isolate = env_->isolate();
auto context_frame =
env_->GetAsyncResourceContextFrame(reinterpret_cast<std::uintptr_t>(this))
.Get(isolate);
async_context_frame::Scope async_context_frame_scope(isolate, context_frame);
return node::MakeCallback(
isolate, get_resource(), symbol, argc, argv, async_context_);
}
Local<Object> AsyncResource::get_resource() {

View File

@ -1,6 +1,7 @@
#include "node.h"
#include "async_context_frame.h"
#include "async_wrap-inl.h"
#include "env-inl.h"
#include "node.h"
#include "v8.h"
namespace node {
@ -14,6 +15,7 @@ using v8::Local;
using v8::MaybeLocal;
using v8::Object;
using v8::String;
using v8::Undefined;
using v8::Value;
CallbackScope::CallbackScope(Isolate* isolate,
@ -38,21 +40,23 @@ CallbackScope::~CallbackScope() {
}
InternalCallbackScope::InternalCallbackScope(AsyncWrap* async_wrap, int flags)
: InternalCallbackScope(async_wrap->env(),
async_wrap->object(),
{ async_wrap->get_async_id(),
async_wrap->get_trigger_async_id() },
flags) {}
: InternalCallbackScope(
async_wrap->env(),
async_wrap->object(),
{async_wrap->get_async_id(), async_wrap->get_trigger_async_id()},
flags,
async_wrap->context_frame()) {}
InternalCallbackScope::InternalCallbackScope(Environment* env,
Local<Object> object,
const async_context& asyncContext,
int flags)
: env_(env),
async_context_(asyncContext),
object_(object),
skip_hooks_(flags & kSkipAsyncHooks),
skip_task_queues_(flags & kSkipTaskQueues) {
int flags,
v8::Local<v8::Value> context_frame)
: env_(env),
async_context_(asyncContext),
object_(object),
skip_hooks_(flags & kSkipAsyncHooks),
skip_task_queues_(flags & kSkipTaskQueues) {
CHECK_NOT_NULL(env);
env->PushAsyncCallbackScope();
@ -76,6 +80,9 @@ InternalCallbackScope::InternalCallbackScope(Environment* env,
isolate->SetIdle(false);
prior_context_frame_.Reset(
isolate, async_context_frame::exchange(isolate, context_frame));
env->async_hooks()->push_async_context(
async_context_.async_id, async_context_.trigger_async_id, object);
@ -117,9 +124,12 @@ void InternalCallbackScope::Close() {
AsyncWrap::EmitAfter(env_, async_context_.async_id);
}
if (pushed_ids_)
if (pushed_ids_) {
env_->async_hooks()->pop_async_context(async_context_.async_id);
async_context_frame::exchange(isolate, prior_context_frame_.Get(isolate));
}
if (failed_) return;
if (env_->async_callback_scope_depth() > 1 || skip_task_queues_) {
@ -173,7 +183,8 @@ MaybeLocal<Value> InternalMakeCallback(Environment* env,
const Local<Function> callback,
int argc,
Local<Value> argv[],
async_context asyncContext) {
async_context asyncContext,
Local<Value> context_frame) {
CHECK(!recv.IsEmpty());
#ifdef DEBUG
for (int i = 0; i < argc; i++)
@ -194,7 +205,8 @@ MaybeLocal<Value> InternalMakeCallback(Environment* env,
async_hooks->fields()[AsyncHooks::kUsesExecutionAsyncResource] > 0;
}
InternalCallbackScope scope(env, resource, asyncContext, flags);
InternalCallbackScope scope(
env, resource, asyncContext, flags, context_frame);
if (scope.Failed()) {
return MaybeLocal<Value>();
}
@ -270,6 +282,17 @@ MaybeLocal<Value> MakeCallback(Isolate* isolate,
int argc,
Local<Value> argv[],
async_context asyncContext) {
return InternalMakeCallback(
isolate, recv, callback, argc, argv, asyncContext, Undefined(isolate));
}
MaybeLocal<Value> InternalMakeCallback(Isolate* isolate,
Local<Object> recv,
Local<Function> callback,
int argc,
Local<Value> argv[],
async_context asyncContext,
Local<Value> context_frame) {
// Observe the following two subtleties:
//
// 1. The environment is retrieved from the callback function's context.
@ -281,8 +304,8 @@ MaybeLocal<Value> MakeCallback(Isolate* isolate,
Environment::GetCurrent(callback->GetCreationContextChecked());
CHECK_NOT_NULL(env);
Context::Scope context_scope(env->context());
MaybeLocal<Value> ret =
InternalMakeCallback(env, recv, recv, callback, argc, argv, asyncContext);
MaybeLocal<Value> ret = InternalMakeCallback(
env, recv, recv, callback, argc, argv, asyncContext, context_frame);
if (ret.IsEmpty() && env->async_callback_scope_depth() == 0) {
// This is only for legacy compatibility and we may want to look into
// removing/adjusting it.
@ -315,9 +338,14 @@ MaybeLocal<Value> MakeSyncCallback(Isolate* isolate,
// This is a toplevel invocation and the caller (intentionally)
// didn't provide any async_context to run in. Install a default context.
MaybeLocal<Value> ret =
InternalMakeCallback(env, env->process_object(), recv, callback, argc, argv,
async_context{0, 0});
MaybeLocal<Value> ret = InternalMakeCallback(env,
env->process_object(),
recv,
callback,
argc,
argv,
async_context{0, 0},
v8::Undefined(isolate));
return ret;
}

View File

@ -0,0 +1,89 @@
#include "async_context_frame.h" // NOLINT(build/include_inline)
#include "env-inl.h"
#include "node_errors.h"
#include "node_external_reference.h"
#include "tracing/traced_value.h"
#include "util-inl.h"
#include "debug_utils-inl.h"
#include "v8.h"
using v8::Context;
using v8::Isolate;
using v8::Local;
using v8::Object;
using v8::String;
using v8::Value;
namespace node {
namespace async_context_frame {
//
// Scope helper
//
Scope::Scope(Isolate* isolate, Local<Value> object) : isolate_(isolate) {
auto prior = exchange(isolate, object);
prior_.Reset(isolate, prior);
}
Scope::~Scope() {
auto value = prior_.Get(isolate_);
set(isolate_, value);
}
Local<Value> current(Isolate* isolate) {
return isolate->GetContinuationPreservedEmbedderData();
}
void set(Isolate* isolate, Local<Value> value) {
auto env = Environment::GetCurrent(isolate);
if (!env->options()->async_context_frame) {
return;
}
isolate->SetContinuationPreservedEmbedderData(value);
}
// NOTE: It's generally recommended to use async_context_frame::Scope
// but sometimes (such as enterWith) a direct exchange is needed.
Local<Value> exchange(Isolate* isolate, Local<Value> value) {
auto prior = current(isolate);
set(isolate, value);
return prior;
}
void CreatePerContextProperties(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv) {
Environment* env = Environment::GetCurrent(context);
Local<String> getContinuationPreservedEmbedderData = FIXED_ONE_BYTE_STRING(
env->isolate(), "getContinuationPreservedEmbedderData");
Local<String> setContinuationPreservedEmbedderData = FIXED_ONE_BYTE_STRING(
env->isolate(), "setContinuationPreservedEmbedderData");
// Grab the intrinsics from the binding object and expose those to our
// binding layer.
Local<Object> binding = context->GetExtrasBindingObject();
target
->Set(context,
getContinuationPreservedEmbedderData,
binding->Get(context, getContinuationPreservedEmbedderData)
.ToLocalChecked())
.Check();
target
->Set(context,
setContinuationPreservedEmbedderData,
binding->Get(context, setContinuationPreservedEmbedderData)
.ToLocalChecked())
.Check();
}
} // namespace async_context_frame
} // namespace node
NODE_BINDING_CONTEXT_AWARE_INTERNAL(
async_context_frame, node::async_context_frame::CreatePerContextProperties)

33
src/async_context_frame.h Normal file
View File

@ -0,0 +1,33 @@
#ifndef SRC_ASYNC_CONTEXT_FRAME_H_
#define SRC_ASYNC_CONTEXT_FRAME_H_
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include "base_object.h"
#include "v8.h"
#include <cstdint>
namespace node {
namespace async_context_frame {
class Scope {
public:
explicit Scope(v8::Isolate* isolate, v8::Local<v8::Value> object);
~Scope();
private:
v8::Isolate* isolate_;
v8::Global<v8::Value> prior_;
};
v8::Local<v8::Value> current(v8::Isolate* isolate);
void set(v8::Isolate* isolate, v8::Local<v8::Value> value);
v8::Local<v8::Value> exchange(v8::Isolate* isolate, v8::Local<v8::Value> value);
} // namespace async_context_frame
} // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#endif // SRC_ASYNC_CONTEXT_FRAME_H_

View File

@ -49,6 +49,9 @@ inline double AsyncWrap::get_trigger_async_id() const {
return trigger_async_id_;
}
inline v8::Local<v8::Value> AsyncWrap::context_frame() const {
return context_frame_.Get(env()->isolate());
}
inline v8::MaybeLocal<v8::Value> AsyncWrap::MakeCallback(
const v8::Local<v8::String> symbol,

View File

@ -20,6 +20,7 @@
// USE OR OTHER DEALINGS IN THE SOFTWARE.
#include "async_wrap.h" // NOLINT(build/include_inline)
#include "async_context_frame.h"
#include "async_wrap-inl.h"
#include "env-inl.h"
#include "node_errors.h"
@ -513,8 +514,9 @@ AsyncWrap::AsyncWrap(Environment* env,
}
AsyncWrap::AsyncWrap(Environment* env, Local<Object> object)
: BaseObject(env, object) {
}
: BaseObject(env, object),
context_frame_(env->isolate(),
async_context_frame::current(env->isolate())) {}
// This method is necessary to work around one specific problem:
// Before the init() hook runs, if there is one, the BaseObject() constructor
@ -606,8 +608,9 @@ void AsyncWrap::AsyncReset(Local<Object> resource, double execution_async_id,
: execution_async_id;
trigger_async_id_ = env()->get_default_trigger_async_id();
Isolate* isolate = env()->isolate();
{
HandleScope handle_scope(env()->isolate());
HandleScope handle_scope(isolate);
Local<Object> obj = object();
CHECK(!obj.IsEmpty());
if (resource != obj) {
@ -637,6 +640,8 @@ void AsyncWrap::AsyncReset(Local<Object> resource, double execution_async_id,
UNREACHABLE();
}
context_frame_.Reset(isolate, async_context_frame::current(isolate));
if (silent) return;
EmitAsyncInit(env(), resource,
@ -680,8 +685,15 @@ MaybeLocal<Value> AsyncWrap::MakeCallback(const Local<Function> cb,
ProviderType provider = provider_type();
async_context context { get_async_id(), get_trigger_async_id() };
MaybeLocal<Value> ret = InternalMakeCallback(
env(), object(), object(), cb, argc, argv, context);
MaybeLocal<Value> ret =
InternalMakeCallback(env(),
object(),
object(),
cb,
argc,
argv,
context,
context_frame_.Get(env()->isolate()));
// This is a static call with cached values because the `this` object may
// no longer be alive at this point.

View File

@ -187,6 +187,8 @@ class AsyncWrap : public BaseObject {
inline double get_async_id() const;
inline double get_trigger_async_id() const;
inline v8::Local<v8::Value> context_frame() const;
void AsyncReset(v8::Local<v8::Object> resource,
double execution_async_id = kInvalidAsyncId,
bool silent = false);
@ -239,6 +241,8 @@ class AsyncWrap : public BaseObject {
// Because the values may be Reset(), cannot be made const.
double async_id_ = kInvalidAsyncId;
double trigger_async_id_ = kInvalidAsyncId;
v8::Global<v8::Value> context_frame_;
};
} // namespace node

View File

@ -934,6 +934,26 @@ inline void Environment::RemoveHeapSnapshotNearHeapLimitCallback(
heap_limit);
}
inline void Environment::SetAsyncResourceContextFrame(
std::uintptr_t async_resource_handle,
v8::Global<v8::Value>&& context_frame) {
async_resource_context_frames_.emplace(
std::make_pair(async_resource_handle, std::move(context_frame)));
}
inline const v8::Global<v8::Value>& Environment::GetAsyncResourceContextFrame(
std::uintptr_t async_resource_handle) {
auto&& async_resource_context_frame =
async_resource_context_frames_.find(async_resource_handle);
CHECK_NE(async_resource_context_frame, async_resource_context_frames_.end());
return async_resource_context_frame->second;
}
inline void Environment::RemoveAsyncResourceContextFrame(
std::uintptr_t async_resource_handle) {
async_resource_context_frames_.erase(async_resource_handle);
}
} // namespace node
// These two files depend on each other. Including base_object-inl.h after this

View File

@ -596,7 +596,7 @@ v8::Maybe<ExitCode> EmitProcessExitInternal(Environment* env);
* environment. Each environment has a principal realm. An environment can
* create multiple subsidiary synthetic realms.
*/
class Environment : public MemoryRetainer {
class Environment final : public MemoryRetainer {
public:
Environment(const Environment&) = delete;
Environment& operator=(const Environment&) = delete;
@ -1059,6 +1059,14 @@ class Environment : public MemoryRetainer {
v8::Global<v8::Module> temporary_required_module_facade_original;
void SetAsyncResourceContextFrame(std::uintptr_t async_resource_handle,
v8::Global<v8::Value>&&);
const v8::Global<v8::Value>& GetAsyncResourceContextFrame(
std::uintptr_t async_resource_handle);
void RemoveAsyncResourceContextFrame(std::uintptr_t async_resource_handle);
private:
inline void ThrowError(v8::Local<v8::Value> (*fun)(v8::Local<v8::String>,
v8::Local<v8::Value>),
@ -1231,6 +1239,9 @@ class Environment : public MemoryRetainer {
// track of the BackingStore for a given pointer.
std::unordered_map<char*, std::unique_ptr<v8::BackingStore>>
released_allocated_buffers_;
std::unordered_map<std::uintptr_t, v8::Global<v8::Value>>
async_resource_context_frames_;
};
} // namespace node

View File

@ -1,3 +1,4 @@
#include "async_context_frame.h"
#include "async_wrap-inl.h"
#include "env-inl.h"
#define NAPI_EXPERIMENTAL
@ -545,7 +546,9 @@ class AsyncContext {
: env_(env) {
async_id_ = node_env()->new_async_id();
trigger_async_id_ = node_env()->get_default_trigger_async_id();
resource_.Reset(node_env()->isolate(), resource_object);
v8::Isolate* isolate = node_env()->isolate();
resource_.Reset(isolate, resource_object);
context_frame_.Reset(isolate, node::async_context_frame::current(isolate));
lost_reference_ = false;
if (externally_managed_resource) {
resource_.SetWeak(
@ -571,13 +574,15 @@ class AsyncContext {
int argc,
v8::Local<v8::Value> argv[]) {
EnsureReference();
return node::InternalMakeCallback(node_env(),
resource(),
recv,
callback,
argc,
argv,
{async_id_, trigger_async_id_});
return node::InternalMakeCallback(
node_env(),
resource(),
recv,
callback,
argc,
argv,
{async_id_, trigger_async_id_},
context_frame_.Get(node_env()->isolate()));
}
inline napi_callback_scope OpenCallbackScope() {
@ -633,6 +638,7 @@ class AsyncContext {
double trigger_async_id_;
v8::Global<v8::Object> resource_;
bool lost_reference_;
v8::Global<v8::Value> context_frame_;
};
} // end of anonymous namespace

View File

@ -29,6 +29,7 @@
// The binding IDs that start with 'internal_only' are not exposed to the user
// land even from internal/test/binding module under --expose-internals.
#define NODE_BUILTIN_STANDARD_BINDINGS(V) \
V(async_context_frame) \
V(async_wrap) \
V(blob) \
V(block_list) \

View File

@ -212,7 +212,17 @@ v8::MaybeLocal<v8::Value> InternalMakeCallback(
const v8::Local<v8::Function> callback,
int argc,
v8::Local<v8::Value> argv[],
async_context asyncContext);
async_context asyncContext,
v8::Local<v8::Value> context_frame);
v8::MaybeLocal<v8::Value> InternalMakeCallback(
v8::Isolate* isolate,
v8::Local<v8::Object> recv,
const v8::Local<v8::Function> callback,
int argc,
v8::Local<v8::Value> argv[],
async_context asyncContext,
v8::Local<v8::Value> context_frame);
v8::MaybeLocal<v8::Value> MakeSyncCallback(v8::Isolate* isolate,
v8::Local<v8::Object> recv,
@ -232,10 +242,13 @@ class InternalCallbackScope {
// compatibility issues, but it shouldn't.)
kSkipTaskQueues = 2
};
InternalCallbackScope(Environment* env,
v8::Local<v8::Object> object,
const async_context& asyncContext,
int flags = kNoFlags);
InternalCallbackScope(
Environment* env,
v8::Local<v8::Object> object,
const async_context& asyncContext,
int flags = kNoFlags,
v8::Local<v8::Value> context_frame = v8::Local<v8::Value>());
// Utility that can be used by AsyncWrap classes.
explicit InternalCallbackScope(AsyncWrap* async_wrap, int flags = 0);
~InternalCallbackScope();
@ -253,6 +266,7 @@ class InternalCallbackScope {
bool failed_ = false;
bool pushed_ids_ = false;
bool closed_ = false;
v8::Global<v8::Value> prior_context_frame_;
};
class DebugSealHandleScope {

View File

@ -487,6 +487,10 @@ EnvironmentOptionsParser::EnvironmentOptionsParser() {
AddOption(
"--experimental-wasi-unstable-preview1", "", NoOp{}, kAllowedInEnvvar);
AddOption("--expose-gc", "expose gc extension", V8Option{}, kAllowedInEnvvar);
AddOption("--experimental-async-context-frame",
"Improve AsyncLocalStorage performance with AsyncContextFrame",
&EnvironmentOptions::async_context_frame,
kAllowedInEnvvar);
AddOption("--expose-internals", "", &EnvironmentOptions::expose_internals);
AddOption("--frozen-intrinsics",
"experimental frozen intrinsics support",

View File

@ -136,6 +136,7 @@ class EnvironmentOptions : public Options {
bool allow_worker_threads = false;
bool experimental_repl_await = true;
bool experimental_vm_modules = false;
bool async_context_frame = false;
bool expose_internals = false;
bool force_node_api_uncaught_exceptions_policy = false;
bool frozen_intrinsics = false;

View File

@ -1,7 +1,7 @@
Error: test
at one (*fixtures*async-error.js:4:9)
at two (*fixtures*async-error.js:17:9)
at process.processTicksAndRejections (node:internal*process*task_queues:95:5)
at process.processTicksAndRejections (node:internal*process*task_queues:105:5)
at async three (*fixtures*async-error.js:20:3)
at async four (*fixtures*async-error.js:24:3)
at async main (*async_error_nexttick_main.js:7:5)

View File

@ -0,0 +1,59 @@
import { isWindows } from '../common/index.mjs';
import { spawn } from 'node:child_process';
import { once } from 'node:events';
import { opendir } from 'node:fs/promises';
import { fileURLToPath } from 'node:url';
import { describe, it } from 'node:test';
import { sep } from 'node:path';
const python = process.env.PYTHON || (isWindows ? 'python' : 'python3');
const testRunner = fileURLToPath(
new URL('../../tools/test.py', import.meta.url)
);
const setNames = ['async-hooks', 'parallel'];
// Get all test names for each set
const testSets = await Promise.all(setNames.map(async (name) => {
const path = fileURLToPath(new URL(`../${name}`, import.meta.url));
const dir = await opendir(path);
const tests = [];
for await (const entry of dir) {
if (entry.name.startsWith('test-async-local-storage-')) {
tests.push(entry.name);
}
}
return {
name,
tests
};
}));
// Merge test sets with set name prefix
const tests = testSets.reduce((m, v) => {
for (const test of v.tests) {
m.push(`${v.name}${sep}${test}`);
}
return m;
}, []);
describe('AsyncContextFrame', {
concurrency: tests.length
}, () => {
for (const test of tests) {
it(test, async () => {
const proc = spawn(python, [
testRunner,
'--node-args=--experimental-async-context-frame',
test,
], {
stdio: ['ignore', 'ignore', 'inherit'],
});
await once(proc, 'exit');
});
}
});

View File

@ -5,20 +5,23 @@ const { AsyncLocalStorage } = require('async_hooks');
const als = new AsyncLocalStorage();
// Make sure _propagate function exists.
assert.ok(typeof als._propagate === 'function');
// The als instance should be getting removed from the storageList in
// lib/async_hooks.js when exit(...) is called, therefore when the nested runs
// are called there should be no copy of the als in the storageList to run the
// _propagate method on.
als._propagate = common.mustNotCall('_propagate() should not be called');
// The _propagate function only exists on the old JavaScript implementation.
if (typeof als._propagate === 'function') {
// The als instance should be getting removed from the storageList in
// lib/async_hooks.js when exit(...) is called, therefore when the nested runs
// are called there should be no copy of the als in the storageList to run the
// _propagate method on.
als._propagate = common.mustNotCall('_propagate() should not be called');
}
const done = common.mustCall();
const data = true;
function run(count) {
if (count === 0) return done();
als.run({}, () => {
assert.notStrictEqual(als.getStore(), data);
als.run(data, () => {
als.exit(run, --count);
});
}

View File

@ -30,6 +30,8 @@ expected.beforePreExec = new Set([
'NativeModule internal/errors',
'Internal Binding config',
'Internal Binding timers',
'Internal Binding async_context_frame',
'NativeModule internal/async_context_frame',
'Internal Binding async_wrap',
'Internal Binding task_queue',
'Internal Binding symbols',