Skip to content

Commit

Permalink
Implement automatic self-referential bindings via ctx.exports.
Browse files Browse the repository at this point in the history
For every top-level export, `ctx.exports` will contain a corresponding property which acts as a binding to that export. If the export is a simple object or `WorkerEntrypoint`, then the `ctx.exports` property is a service binding. If the export is a Durable Object class (and it has been configured with storage), the `ctx.exports` property is a DO namespace binding.

So you can do: `ctx.exports.MyEntrypoint.someRpc()`

And it calls `someRpc()` on the entrypoint named `MyEntrypoint`, as if you had called it over a service binding.

This is marked experimental for now since it won't work on the edge until further changes are made to our control plane.
  • Loading branch information
kentonv committed Feb 5, 2025
1 parent b27f087 commit bdae24f
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 23 deletions.
18 changes: 16 additions & 2 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,12 @@ class TestController: public jsg::Object {

class ExecutionContext: public jsg::Object {
public:
ExecutionContext(jsg::Lock& js): props(js, js.obj()) {}
ExecutionContext(jsg::Lock& js, jsg::JsValue props): props(js, props) {}
ExecutionContext(jsg::Lock& js, jsg::JsValue exports)
: exports(js, exports),
props(js, js.obj()) {}
ExecutionContext(jsg::Lock& js, jsg::JsValue exports, jsg::JsValue props)
: exports(js, exports),
props(js, props) {}

void waitUntil(kj::Promise<void> promise);
void passThroughOnException();
Expand All @@ -214,13 +218,22 @@ class ExecutionContext: public jsg::Object {
// and throwing an error at the client.
void abort(jsg::Lock& js, jsg::Optional<jsg::Value> reason);

jsg::JsValue getExports(jsg::Lock& js) {
return exports.getHandle(js);
}

jsg::JsValue getProps(jsg::Lock& js) {
return props.getHandle(js);
}

JSG_RESOURCE_TYPE(ExecutionContext, CompatibilityFlags::Reader flags) {
JSG_METHOD(waitUntil);
JSG_METHOD(passThroughOnException);
if (flags.getWorkerdExperimental()) {
// TODO(soon): Remove experimental gate as soon as we've wired up the control plane so that
// this works in production.
JSG_LAZY_INSTANCE_PROPERTY(exports, getExports);
}
JSG_LAZY_INSTANCE_PROPERTY(props, getProps);

if (flags.getWorkerdExperimental()) {
Expand All @@ -243,6 +256,7 @@ class ExecutionContext: public jsg::Object {
}

private:
jsg::JsRef<jsg::JsValue> exports;
jsg::JsRef<jsg::JsValue> props;

void visitForGc(jsg::GcVisitor& visitor) {
Expand Down
18 changes: 13 additions & 5 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ struct Worker::Impl {

// The environment blob to pass to handlers.
kj::Maybe<jsg::Value> env;
kj::Maybe<jsg::Value> ctxExports;

struct ActorClassInfo {
EntrypointClass cls;
Expand Down Expand Up @@ -1550,8 +1551,10 @@ kj::Maybe<jsg::JsObject> tryResolveMainModule(jsg::Lock& js,

Worker::Worker(kj::Own<const Script> scriptParam,
kj::Own<WorkerObserver> metricsParam,
kj::FunctionParam<void(jsg::Lock& lock, const Api& api, v8::Local<v8::Object> target)>
compileBindings,
kj::FunctionParam<void(jsg::Lock& lock,
const Api& api,
v8::Local<v8::Object> target,
v8::Local<v8::Object> ctxExports)> compileBindings,
IsolateObserver::StartType startType,
TraceParentContext spans,
LockType lockType,
Expand Down Expand Up @@ -1639,7 +1642,9 @@ Worker::Worker(kj::Own<const Script> scriptParam,
lock.v8Set(bindingsScope, global.name, global.value);
}

compileBindings(lock, script->isolate->getApi(), bindingsScope);
v8::Local<v8::Object> ctxExports = v8::Object::New(lock.v8Isolate);

compileBindings(lock, script->isolate->getApi(), bindingsScope, ctxExports);

// Execute script.
currentSpan = maybeMakeSpan("lw:top_level_execution"_kjc);
Expand All @@ -1656,6 +1661,7 @@ Worker::Worker(kj::Own<const Script> scriptParam,
KJ_IF_SOME(ns,
tryResolveMainModule(lock, mainModule, *jsContext, *script, limitErrorOrTime)) {
impl->env = lock.v8Ref(bindingsScope.As<v8::Value>());
impl->ctxExports = lock.v8Ref(ctxExports.As<v8::Value>());

auto& api = script->isolate->getApi();
auto handlers = api.unwrapExports(lock, ns);
Expand All @@ -1670,7 +1676,7 @@ Worker::Worker(kj::Own<const Script> scriptParam,
// requests. This is weird and obviously wrong but changing it probably
// requires a compat flag. Until then, connection properties will not be
// available for non-class handlers.
obj.ctx = jsg::alloc<api::ExecutionContext>(lock);
obj.ctx = jsg::alloc<api::ExecutionContext>(lock, jsg::JsValue(ctxExports));

impl->namedHandlers.insert(kj::mv(handler.name), kj::mv(obj));
}
Expand Down Expand Up @@ -1974,7 +1980,9 @@ kj::Maybe<kj::Own<api::ExportedHandler>> Worker::Lock::getExportedHandler(
return fakeOwn(h);
} else KJ_IF_SOME(cls, worker.impl->statelessClasses.find(n)) {
jsg::Lock& js = *this;
auto handler = kj::heap(cls(js, jsg::alloc<api::ExecutionContext>(js, props.toJs(js)),
auto handler = kj::heap(cls(js,
jsg::alloc<api::ExecutionContext>(js,
jsg::JsValue(KJ_ASSERT_NONNULL(worker.impl->ctxExports).getHandle(js)), props.toJs(js)),
KJ_ASSERT_NONNULL(worker.impl->env).addRef(js)));

// HACK: We set handler.env and handler.ctx to undefined because we already passed the real
Expand Down
10 changes: 7 additions & 3 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,19 @@ class Worker: public kj::AtomicRefcounted {

explicit Worker(kj::Own<const Script> script,
kj::Own<WorkerObserver> metrics,
kj::FunctionParam<void(jsg::Lock& lock, const Api& api, v8::Local<v8::Object> target)>
compileBindings,
kj::FunctionParam<void(jsg::Lock& lock,
const Api& api,
v8::Local<v8::Object> target,
v8::Local<v8::Object> ctxExports)> compileBindings,
IsolateObserver::StartType startType,
TraceParentContext spans,
LockType lockType,
kj::Maybe<ValidationErrorReporter&> errorReporter = kj::none,
kj::Maybe<kj::Duration&> startupTime = kj::none);
// `compileBindings()` is a callback that constructs all of the bindings and adds them as
// properties to `target`.
// properties to `target`. It also compiles the `ctx.exports` object and writes it to
// `ctxExports`. Note that it is permissible for this callback to save a handle to `ctxExports`
// and fill it in later if needed, as long as it is filled in before any requests are started.

~Worker() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(Worker);
Expand Down
73 changes: 73 additions & 0 deletions src/workerd/server/server-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4006,6 +4006,79 @@ KJ_TEST("Server: Entrypoint binding with props") {
conn.httpGet200("/", "got: 123");
}

KJ_TEST("Server: ctx.exports self-referential bindings") {
TestServer test(R"((
services = [
( name = "hello",
worker = (
compatibilityDate = "2024-02-23",
compatibilityFlags = ["experimental"],
modules = [
( name = "main.js",
esModule =
`import { WorkerEntrypoint, DurableObject } from "cloudflare:workers";
`export default {
` async fetch(request, env, ctx) {
` // First set the actor state the old fashion way, to make sure we get
` // reconnected to the same actor when using self-referential bindings.
` {
` let bindingActor = env.NS.get(env.NS.idFromName("qux"));
` await bindingActor.setValue(234);
` }
`
` let actor = ctx.exports.MyActor.get(ctx.exports.MyActor.idFromName("qux"));
` return new Response([
` await ctx.exports.MyEntrypoint.foo(123),
` await ctx.exports.AnotherEntrypoint.bar(321),
` await actor.baz(),
` "UnconfiguredActor" in ctx.exports, // should be false
` ].join(", "));
` }
`}
`export class MyEntrypoint extends WorkerEntrypoint {
` foo(i) { return `foo: ${i}` }
`}
`export class AnotherEntrypoint extends WorkerEntrypoint {
` bar(i) { return `bar: ${i}` }
`}
`export class MyActor extends DurableObject {
` setValue(i) { this.value = i; }
` baz() { return `baz: ${this.value}` }
`}
`export class UnconfiguredActor extends DurableObject {
` qux(i) { return `qux: ${i}` }
`}
)
],
bindings = [
# A regular binding, just here to make sure it doesn't mess up self-referential
# channel numbers.
( name = "INTERNET", service = "internet" ),
# Similarly, an actor namespace binding.
(name = "NS", durableObjectNamespace = "MyActor")
],
durableObjectNamespaces = [
( className = "MyActor",
uniqueKey = "mykey",
)
],
durableObjectStorage = (inMemory = void)
)
),
],
sockets = [
( name = "main", address = "test-addr", service = "hello" ),
]
))"_kj);

test.server.allowExperimental();
test.start();

auto conn = test.connect("test-addr");
conn.httpGet200("/", "foo: 123, bar: 321, baz: 234, false");
}

// =======================================================================================

// TODO(beta): Test TLS (send and receive)
Expand Down
114 changes: 102 additions & 12 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1856,6 +1856,10 @@ class Server::WorkerService final: public Service,
return kj::heap<EntrypointService>(*this, name, propsJson, *handlers);
}

bool hasDefaultEntrypoint() {
return defaultEntrypointHandlers != kj::none;
}

kj::Array<kj::StringPtr> getEntrypointNames() {
return KJ_MAP(e, namedEntrypoints) -> kj::StringPtr { return e.key; };
}
Expand Down Expand Up @@ -3430,25 +3434,83 @@ kj::Own<Server::Service> Server::makeWorker(kj::StringPtr name,
}
}

jsg::V8Ref<v8::Object> ctxExportsHandle = nullptr;
auto worker = kj::atomicRefcounted<Worker>(kj::mv(script), kj::atomicRefcounted<WorkerObserver>(),
[&](jsg::Lock& lock, const Worker::Api& api, v8::Local<v8::Object> target) {
[&](jsg::Lock& lock, const Worker::Api& api, v8::Local<v8::Object> target,
v8::Local<v8::Object> ctxExports) {
// We can't fill in ctx.exports yet because we need to run the validator first to discover
// entrypoints, which we cannot do until after the Worker constructor completes. We are
// permitted to hold a handle until then, though.
ctxExportsHandle = lock.v8Ref(ctxExports);

return WorkerdApi::from(api).compileGlobals(lock, globals, target, 1);
}, IsolateObserver::StartType::COLD,
},
IsolateObserver::StartType::COLD,
TraceParentContext(nullptr, nullptr), // systemTracer -- TODO(beta): factor out
Worker::Lock::TakeSynchronously(kj::none), errorReporter);

{
worker->runInLockScope(Worker::Lock::TakeSynchronously(kj::none),
[&](Worker::Lock& lock) { lock.validateHandlers(errorReporter); });
worker->runInLockScope(Worker::Lock::TakeSynchronously(kj::none), [&](Worker::Lock& lock) {
lock.validateHandlers(errorReporter);

// Build `ctx.exports` based on the entrypoints reported by `validateHandlers()`.
kj::Vector<Global> ctxExports(
errorReporter.namedEntrypoints.size() + localActorConfigs.size());

// Start numbering loopback channels for stateless entrypoints after the last subrequest
// channel used by bindings.
uint nextSubrequestChannel =
subrequestChannels.size() + IoContext::SPECIAL_SUBREQUEST_CHANNEL_COUNT;
if (errorReporter.defaultEntrypoint != kj::none) {
ctxExports.add(Global{.name = kj::str("default"),
.value = Global::Fetcher{
.channel = nextSubrequestChannel++, .requiresHost = true, .isInHouse = false}});
}
for (auto& ep: errorReporter.namedEntrypoints) {
ctxExports.add(Global{.name = kj::str(ep.key),
.value = Global::Fetcher{
.channel = nextSubrequestChannel++, .requiresHost = true, .isInHouse = false}});
}

// Start numbering loopback channels for actor classes after the last actor channel used by
// bindings.
uint nextActorChannel = actorChannels.size();
for (auto& ns: localActorConfigs) {
decltype(Global::value) value;
KJ_SWITCH_ONEOF(ns.value) {
KJ_CASE_ONEOF(durable, Durable) {
value = Global::DurableActorNamespace{
.actorChannel = nextActorChannel++, .uniqueKey = durable.uniqueKey};
}
KJ_CASE_ONEOF(ephemeral, Ephemeral) {
value = Global::EphemeralActorNamespace{
.actorChannel = nextActorChannel++,
};
}
}
ctxExports.add(Global{.name = kj::str(ns.key), .value = kj::mv(value)});
}

JSG_WITHIN_CONTEXT_SCOPE(lock, lock.getContext(), [&](jsg::Lock& js) {
WorkerdApi::from(worker->getIsolate().getApi())
.compileGlobals(lock, ctxExports, ctxExportsHandle.getHandle(js), 1);
});

// As an optimization, drop this now while we have the lock.
{ auto drop = kj::mv(ctxExportsHandle); }
});
}

auto linkCallback = [this, name, conf, subrequestChannels = kj::mv(subrequestChannels),
actorChannels = kj::mv(actorChannels)](
WorkerService& workerService) mutable {
actorChannels = kj::mv(actorChannels),
&localActorConfigs](WorkerService& workerService) mutable {
WorkerService::LinkedIoChannels result{.alarmScheduler = *alarmScheduler};

auto services = kj::heapArrayBuilder<kj::Own<Service>>(
subrequestChannels.size() + IoContext::SPECIAL_SUBREQUEST_CHANNEL_COUNT);
auto entrypointNames = workerService.getEntrypointNames();

auto services = kj::heapArrayBuilder<kj::Own<Service>>(subrequestChannels.size() +
IoContext::SPECIAL_SUBREQUEST_CHANNEL_COUNT + entrypointNames.size() +
workerService.hasDefaultEntrypoint());

kj::Own<Service> globalService =
lookupService(conf.getGlobalOutbound(), kj::str("Worker \"", name, "\"'s globalOutbound"));
Expand All @@ -3469,26 +3531,54 @@ kj::Own<Server::Service> Server::makeWorker(kj::StringPtr name,
services.add(lookupService(channel.designator, kj::mv(channel.errorContext)));
}

// Link the ctx.exports self-referential channels. Note that it's important these are added
// in exactyl the same order as the channels were allocated earlier when we compiled the
// ctx.exports bindings.
if (workerService.hasDefaultEntrypoint()) {
services.add(KJ_ASSERT_NONNULL(workerService.getEntrypoint(kj::none, kj::none)));
}
for (auto& ep: entrypointNames) {
services.add(KJ_ASSERT_NONNULL(workerService.getEntrypoint(ep, kj::none)));
}

result.subrequest = services.finish();

result.actor = KJ_MAP(channel, actorChannels) -> kj::Maybe<WorkerService::ActorNamespace&> {
auto linkedActorChannels = kj::heapArrayBuilder<kj::Maybe<WorkerService::ActorNamespace&>>(
actorChannels.size() + localActorConfigs.size());

for (auto& channel: actorChannels) {
WorkerService* targetService = &workerService;
if (channel.designator.hasServiceName()) {
auto& svc = KJ_UNWRAP_OR(this->services.find(channel.designator.getServiceName()), {
// error was reported earlier
return kj::none;
linkedActorChannels.add(kj::none);
continue;
});
targetService = dynamic_cast<WorkerService*>(svc.get());
if (targetService == nullptr) {
// error was reported earlier
return kj::none;
linkedActorChannels.add(kj::none);
continue;
}
}

// (If getActorNamespace() returns null, an error was reported earlier.)
return targetService->getActorNamespace(channel.designator.getClassName());
linkedActorChannels.add(targetService->getActorNamespace(channel.designator.getClassName()));
};

// Link the ctx.exports self-referential actor channels. Again, it's important that these
// be added in the same order as before. kj::HashMap iteration order is deterministic, and
// is exactly insertion order as long as no entries have been removed, so we can expect that
// `workerService.getActorNamespaces()` iterates in the same order as `localActorConfigs` did
// earlier.
auto& selfActorNamespaces = workerService.getActorNamespaces();
KJ_ASSERT(selfActorNamespaces.size() == localActorConfigs.size());
for (auto& ns: selfActorNamespaces) {
linkedActorChannels.add(*ns.value);
}

result.actor = linkedActorChannels.finish();

if (conf.hasCacheApiOutbound()) {
result.cache = lookupService(
conf.getCacheApiOutbound(), kj::str("Worker \"", name, "\"'s cacheApiOutbound"));
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ TestFixture::TestFixture(SetupParams&& params)
nullptr)),
worker(kj::atomicRefcounted<Worker>(kj::atomicAddRef(*workerScript),
kj::atomicRefcounted<WorkerObserver>(),
[](jsg::Lock&, const Worker::Api&, v8::Local<v8::Object>) {
[](jsg::Lock&, const Worker::Api&, v8::Local<v8::Object>, v8::Local<v8::Object>) {
// no bindings, nothing to do
},
IsolateObserver::StartType::COLD,
Expand Down
1 change: 1 addition & 0 deletions types/generated-snapshot/experimental/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ interface TestController {}
interface ExecutionContext {
waitUntil(promise: Promise<any>): void;
passThroughOnException(): void;
exports: any;
props: any;
abort(reason?: any): void;
}
Expand Down
1 change: 1 addition & 0 deletions types/generated-snapshot/experimental/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ export interface TestController {}
export interface ExecutionContext {
waitUntil(promise: Promise<any>): void;
passThroughOnException(): void;
exports: any;
props: any;
abort(reason?: any): void;
}
Expand Down

0 comments on commit bdae24f

Please sign in to comment.