Skip to content

Commit 24a1685

Browse files
committed
backport effect 4 cluster (#5646)
1 parent 2a03c76 commit 24a1685

File tree

74 files changed

+3280
-4594
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+3280
-4594
lines changed

.changeset/loud-cows-prove.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
---
2+
"@effect/platform-node-shared": minor
3+
"@effect/platform-node": minor
4+
"@effect/platform-bun": minor
5+
"@effect/cluster": minor
6+
"@effect/rpc": patch
7+
"@effect/workflow": minor
8+
---
9+
10+
backport @effect/cluster from effect v4
11+
12+
@effect/cluster no longer requires a Shard Manager, and instead relies on the
13+
`RunnerStorage` service to track runner state.
14+
15+
To migrate, remove any Shard Manager deployments and use the updated layers in
16+
`@effect/platform-node` or `@effect/platform-bun`.
17+
18+
# Breaking Changes
19+
20+
- `ShardManager` module has been removed
21+
- `EntityNotManagedByRunner` error has been removed
22+
- Shard locks now use database advisory locks, which requires stable sessions
23+
for database connections. This means load balancers or proxies that rotate
24+
connections may cause issues.
25+
- `@effect/platform-node/NodeClusterSocketRunner` is now
26+
`@effect/cluster/NodeClusterSocket`
27+
- `@effect/platform-node/NodeClusterHttpRunner` is now
28+
`@effect/cluster/NodeClusterHttp`
29+
- `@effect/platform-bun/BunClusterSocketRunner` is now
30+
`@effect/cluster/BunClusterSocket`
31+
- `@effect/platform-bun/BunClusterHttpRunner` is now
32+
`@effect/cluster/BunClusterHttp`
33+
34+
# New Features
35+
36+
- `RunnerHealth.layerK8s` has been added, which uses the Kubernetes API to track
37+
runner health and liveness. To use it, you will need a service account with
38+
permissions to read pod information.

.changeset/petite-signs-thank.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/sql-pg": patch
3+
---
4+
5+
disable pg onnotice by default

.changeset/plenty-bats-ask.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/platform": patch
3+
---
4+
5+
expose Layer output in HttpLayerRouter.serve

.changeset/sad-bags-fall.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@effect/sql-pg": minor
3+
---
4+
5+
Use "pg" npm library for @effect/sql-pg backend

.changeset/warm-aliens-dig.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"effect": minor
3+
---
4+
5+
add experimental HashRing module

packages/cluster/src/ClusterError.ts

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -45,30 +45,6 @@ export class EntityNotAssignedToRunner extends Schema.TaggedError<EntityNotAssig
4545
}
4646
}
4747

48-
/**
49-
* Represents an error that occurs when a Runner receives a message for an entity
50-
* that it is not responsible for managing.
51-
*
52-
* @since 1.0.0
53-
* @category errors
54-
*/
55-
export class EntityNotManagedByRunner extends Schema.TaggedError<EntityNotManagedByRunner>()(
56-
"EntityNotManagedByRunner",
57-
{ address: EntityAddress }
58-
) {
59-
/**
60-
* @since 1.0.0
61-
*/
62-
readonly [TypeId] = TypeId
63-
64-
/**
65-
* @since 1.0.0
66-
*/
67-
static is(u: unknown): u is EntityNotManagedByRunner {
68-
return hasProperty(u, TypeId) && isTagged(u, "EntityNotManagedByRunner")
69-
}
70-
}
71-
7248
/**
7349
* Represents an error that occurs when a message fails to be properly
7450
* deserialized by an entity.

packages/cluster/src/ClusterMetrics.ts

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,6 @@
33
*/
44
import * as Metric from "effect/Metric"
55

6-
/**
7-
* @since 1.0.0
8-
* @category metrics
9-
*/
10-
export const shards = Metric.gauge("effect_cluster_shards")
11-
126
/**
137
* @since 1.0.0
148
* @category metrics
@@ -21,28 +15,30 @@ export const entities = Metric.gauge("effect_cluster_entities", {
2115
* @since 1.0.0
2216
* @category metrics
2317
*/
24-
export const singletons = Metric.gauge("effect_cluster_singletons")
25-
26-
/**
27-
* @since 1.0.0
28-
* @category metrics
29-
*/
30-
export const runners = Metric.gauge("effect_cluster_runners")
18+
export const singletons = Metric.gauge("effect_cluster_singletons", {
19+
bigint: true
20+
})
3121

3222
/**
3323
* @since 1.0.0
3424
* @category metrics
3525
*/
36-
export const assignedShards = Metric.gauge("effect_cluster_shards_assigned")
26+
export const runners = Metric.gauge("effect_cluster_runners", {
27+
bigint: true
28+
})
3729

3830
/**
3931
* @since 1.0.0
4032
* @category metrics
4133
*/
42-
export const unassignedShards = Metric.gauge("effect_cluster_shards_unassigned")
34+
export const runnersHealthy = Metric.gauge("effect_cluster_runners_healthy", {
35+
bigint: true
36+
})
4337

4438
/**
4539
* @since 1.0.0
4640
* @category metrics
4741
*/
48-
export const rebalances = Metric.counter("effect_cluster_rebalances")
42+
export const shards = Metric.gauge("effect_cluster_shards", {
43+
bigint: true
44+
})

packages/cluster/src/ClusterWorkflowEngine.ts

Lines changed: 38 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,27 @@
22
* @since 1.0.0
33
*/
44
import * as Rpc from "@effect/rpc/Rpc"
5+
import * as RpcServer from "@effect/rpc/RpcServer"
56
import { DurableDeferred } from "@effect/workflow"
67
import * as Activity from "@effect/workflow/Activity"
78
import * as DurableClock from "@effect/workflow/DurableClock"
89
import * as Workflow from "@effect/workflow/Workflow"
910
import { WorkflowEngine, WorkflowInstance } from "@effect/workflow/WorkflowEngine"
1011
import * as Arr from "effect/Array"
12+
import * as Cause from "effect/Cause"
1113
import * as Context from "effect/Context"
1214
import * as DateTime from "effect/DateTime"
1315
import * as Duration from "effect/Duration"
1416
import * as Effect from "effect/Effect"
1517
import type * as Exit from "effect/Exit"
1618
import * as Fiber from "effect/Fiber"
19+
import * as FiberId from "effect/FiberId"
1720
import * as Layer from "effect/Layer"
1821
import * as Option from "effect/Option"
1922
import type * as ParseResult from "effect/ParseResult"
2023
import * as PrimaryKey from "effect/PrimaryKey"
2124
import * as RcMap from "effect/RcMap"
22-
import * as Record from "effect/Record"
25+
import type * as Record from "effect/Record"
2326
import * as Runtime from "effect/Runtime"
2427
import * as Schedule from "effect/Schedule"
2528
import * as Schema from "effect/Schema"
@@ -190,16 +193,7 @@ export const make = Effect.gen(function*() {
190193
times: 3,
191194
schedule: Schedule.exponential(250)
192195
}),
193-
Effect.orDie,
194-
(effect, { activity, attempt, executionId }) =>
195-
Effect.withSpan(effect, "WorkflowEngine.resetActivityAttempt", {
196-
captureStackTrace: false,
197-
attributes: {
198-
name: activity.name,
199-
executionId,
200-
attempt
201-
}
202-
})
196+
Effect.orDie
203197
)
204198

205199
const clearClock = Effect.fnUntraced(function*(options: {
@@ -260,13 +254,12 @@ export const make = Effect.gen(function*() {
260254
return {
261255
run: (request: Entity.Request<any>) => {
262256
const instance = WorkflowInstance.initial(workflow, executionId)
263-
let payload = request.payload
257+
const payload = request.payload
264258
let parent: { workflowName: string; executionId: string } | undefined
265259
if (payload[payloadParentKey]) {
266260
parent = payload[payloadParentKey]
267-
payload = Record.remove(payload, payloadParentKey)
268261
}
269-
return execute(payload, executionId).pipe(
262+
return execute(workflow.payloadSchema.make(payload), executionId).pipe(
270263
Effect.ensuring(Effect.suspend(() => {
271264
if (!instance.suspended) {
272265
return parent ? ensureSuccess(sendResumeParent(parent)) : Effect.void
@@ -291,17 +284,17 @@ export const make = Effect.gen(function*() {
291284
) as any
292285
},
293286

294-
activity: Effect.fnUntraced(
295-
function*(request: Entity.Request<any>) {
296-
const activityId = `${executionId}/${request.payload.name}`
287+
activity(request: Entity.Request<any>) {
288+
const activityId = `${executionId}/${request.payload.name}`
289+
const instance = WorkflowInstance.initial(workflow, executionId)
290+
return Effect.gen(function*() {
297291
let entry = activities.get(activityId)
298292
while (!entry) {
299293
const latch = Effect.unsafeMakeLatch()
300294
activityLatches.set(activityId, latch)
301295
yield* latch.await
302296
entry = activities.get(activityId)
303297
}
304-
const instance = WorkflowInstance.initial(workflow, executionId)
305298
const contextMap = new Map(entry.runtime.context.unsafeMap)
306299
contextMap.set(Activity.CurrentAttempt.key, request.payload.attempt)
307300
contextMap.set(WorkflowInstance.key, instance)
@@ -311,23 +304,29 @@ export const make = Effect.gen(function*() {
311304
runtimeFlags: Runtime.defaultRuntimeFlags
312305
})
313306
return yield* entry.activity.executeEncoded.pipe(
314-
Effect.interruptible,
315-
Effect.onInterrupt(() => {
316-
instance.suspended = true
317-
return Effect.void
318-
}),
319-
Workflow.intoResult,
320-
Effect.provide(runtime),
321-
Effect.ensuring(Effect.sync(() => {
322-
activities.delete(activityId)
323-
}))
307+
Effect.provide(runtime)
324308
)
325-
},
326-
Rpc.wrap({
327-
fork: true,
328-
uninterruptible: true
329-
})
330-
),
309+
}).pipe(
310+
Workflow.intoResult,
311+
Effect.catchAllCause((cause) => {
312+
const interruptors = Cause.interruptors(cause)
313+
// we only want to store explicit interrupts
314+
const ids = Array.from(interruptors, (id) => Array.from(FiberId.ids(id))).flat()
315+
const suspend = ids.includes(RpcServer.fiberIdClientInterrupt.id) ||
316+
ids.includes(RpcServer.fiberIdTransientInterrupt.id)
317+
return suspend ? Effect.succeed(new Workflow.Suspended()) : Effect.failCause(cause)
318+
}),
319+
Effect.provideService(WorkflowInstance, instance),
320+
Effect.provideService(Activity.CurrentAttempt, request.payload.attempt),
321+
Effect.ensuring(Effect.sync(() => {
322+
activities.delete(activityId)
323+
})),
324+
Rpc.wrap({
325+
fork: true,
326+
uninterruptible: true
327+
})
328+
)
329+
},
331330

332331
deferred: Effect.fnUntraced(function*(request: Entity.Request<any>) {
333332
yield* ensureSuccess(resume(workflow, executionId))
@@ -407,27 +406,10 @@ export const make = Effect.gen(function*() {
407406
times: 3,
408407
schedule: Schedule.exponential(250)
409408
}),
410-
Effect.orDie,
411-
(effect, workflow, executionId) =>
412-
Effect.withSpan(effect, "WorkflowEngine.interrupt", {
413-
captureStackTrace: false,
414-
attributes: {
415-
name: workflow.name,
416-
executionId
417-
}
418-
})
409+
Effect.orDie
419410
),
420411

421-
resume: (workflow, executionId) =>
422-
ensureSuccess(resume(workflow, executionId)).pipe(
423-
Effect.withSpan("WorkflowEngine.resume", {
424-
captureStackTrace: false,
425-
attributes: {
426-
name: workflow.name,
427-
executionId
428-
}
429-
})
430-
),
412+
resume: (workflow, executionId) => ensureSuccess(resume(workflow, executionId)),
431413

432414
activityExecute: Effect.fnUntraced(
433415
function*({ activity, attempt }) {
@@ -460,15 +442,7 @@ export const make = Effect.gen(function*() {
460442
return result
461443
}
462444
},
463-
Effect.scoped,
464-
(effect, { activity, attempt }) =>
465-
Effect.withSpan(effect, "WorkflowEngine.activityExecute", {
466-
captureStackTrace: false,
467-
attributes: {
468-
name: activity.name,
469-
attempt
470-
}
471-
})
445+
Effect.scoped
472446
),
473447

474448
deferredResult: (deferred) =>
@@ -493,13 +467,7 @@ export const make = Effect.gen(function*() {
493467
times: 3,
494468
schedule: Schedule.exponential(250)
495469
}),
496-
Effect.orDie,
497-
Effect.withSpan("WorkflowEngine.deferredResult", {
498-
captureStackTrace: false,
499-
attributes: {
500-
name: deferred.name
501-
}
502-
})
470+
Effect.orDie
503471
),
504472

505473
deferredDone: Effect.fnUntraced(
@@ -512,15 +480,7 @@ export const make = Effect.gen(function*() {
512480
}, { discard: true })
513481
)
514482
},
515-
Effect.scoped,
516-
(effect, { deferredName, executionId }) =>
517-
Effect.withSpan(effect, "WorkflowEngine.deferredDone", {
518-
captureStackTrace: false,
519-
attributes: {
520-
name: deferredName,
521-
executionId
522-
}
523-
})
483+
Effect.scoped
524484
),
525485

526486
scheduleClock(options) {

packages/cluster/src/Entity.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,7 @@ import * as Predicate from "effect/Predicate"
2424
import type * as Schedule from "effect/Schedule"
2525
import { Scope } from "effect/Scope"
2626
import type * as Stream from "effect/Stream"
27-
import type {
28-
AlreadyProcessingMessage,
29-
EntityNotManagedByRunner,
30-
MailboxFull,
31-
PersistenceError
32-
} from "./ClusterError.js"
27+
import type { AlreadyProcessingMessage, MailboxFull, PersistenceError } from "./ClusterError.js"
3328
import { ShardGroup } from "./ClusterSchema.js"
3429
import { EntityAddress } from "./EntityAddress.js"
3530
import type { EntityId } from "./EntityId.js"
@@ -114,7 +109,7 @@ export interface Entity<
114109
entityId: string
115110
) => RpcClient.RpcClient.From<
116111
Rpcs,
117-
MailboxFull | AlreadyProcessingMessage | PersistenceError | EntityNotManagedByRunner
112+
MailboxFull | AlreadyProcessingMessage | PersistenceError
118113
>,
119114
never,
120115
Sharding

packages/cluster/src/EntityAddress.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,13 @@ export class EntityAddress extends Schema.Class<EntityAddress>(SymbolKey)({
6363
export const EntityAddressFromSelf: Schema.Schema<EntityAddress> = Schema.typeSchema(
6464
EntityAddress
6565
)
66+
67+
/**
68+
* @since 4.0.0
69+
* @category constructors
70+
*/
71+
export const make = (options: {
72+
readonly shardId: ShardId
73+
readonly entityType: EntityType
74+
readonly entityId: EntityId
75+
}): EntityAddress => new EntityAddress(options, { disableValidation: true })

0 commit comments

Comments
 (0)