Skip to content

Commit 1608849

Browse files
committed
🎉 feat: generator stream await
1 parent dfbf6d3 commit 1608849

File tree

7 files changed

+160
-25
lines changed

7 files changed

+160
-25
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
Feature:
33
- [#1453](https://github.com/elysiajs/elysia/issues/1453) add `allowUnsafeValidationDetails` for disabling unsafe validation details in production mode
44
- allow rapid stream in non browser mode or `ELYSIA_RAPID_STREAM` is set
5+
- `afterResponse` now wait for generator stream to finish
6+
- trace of `handle`, and `afterResponse` now wait for generator stream to finish
57

68
Bug fix:
79
- [#1502](https://github.com/elysiajs/elysia/issues/1502), [#1501](https://github.com/elysiajs/elysia/issues/1501) afterHandle doesn't update status

example/a.ts

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,22 @@
11
import { Elysia, file } from '../src'
2+
import { delay, req } from '../test/utils'
23

3-
const app = new Elysia({
4-
allowUnsafeValidationDetails: true
5-
})
6-
.get('/', file('test/images/aris-yuzu.jpg'))
4+
const app = new Elysia()
5+
.trace(({ onHandle, onAfterResponse }) => {
6+
onHandle(({ onStop }) => {
7+
onStop(({ error }) => {
8+
console.log("DONE")
9+
})
10+
})
11+
12+
onAfterResponse(() => {
13+
console.log("DONE 2")
14+
})
15+
})
16+
.get('/', async function* () {
17+
for (let i = 0; i < 1000; i++) {
18+
yield `${i}`
19+
await delay(1)
20+
}
21+
})
722
.listen(3000)

src/adapter/utils.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,3 +397,37 @@ export const createResponseHandler = (handler: CreateHandlerParameter) => {
397397
return response
398398
}
399399
}
400+
401+
export async function tee<T>(
402+
source: AsyncIterable<T>,
403+
branches = 2
404+
): Promise<AsyncIterableIterator<T>[]> {
405+
const buffer: T[] = []
406+
let done = false
407+
let waiting: { resolve: () => void }[] = []
408+
409+
;(async () => {
410+
for await (const value of source) {
411+
buffer.push(value)
412+
waiting.forEach((w) => w.resolve())
413+
waiting = []
414+
}
415+
done = true
416+
waiting.forEach((w) => w.resolve())
417+
})()
418+
419+
async function* makeIterator(): AsyncIterableIterator<T> {
420+
let i = 0
421+
while (true) {
422+
if (i < buffer.length) {
423+
yield buffer[i++]
424+
} else if (done) {
425+
return
426+
} else {
427+
await new Promise<void>((resolve) => waiting.push({ resolve }))
428+
}
429+
}
430+
}
431+
432+
return Array.from({ length: branches }, makeIterator)
433+
}

src/compose.ts

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ import type {
6262
LifeCycleStore,
6363
SchemaValidator
6464
} from './types'
65+
import { tee } from './adapter/utils'
6566

6667
const allocateIf = (value: string, condition: unknown) =>
6768
condition ? value : ''
@@ -108,15 +109,19 @@ const createReport = ({
108109
`let trace${i}=${context}[ELYSIA_TRACE]?.[${i}]??trace[${i}](${context});\n`
109110
)
110111

112+
// const aliases: string[] = []
113+
111114
return (
112115
event: TraceEvent,
113116
{
114117
name,
115-
total = 0
118+
total = 0,
119+
alias
116120
}: {
117121
name?: string
118122
attribute?: string
119123
total?: number
124+
alias?: string
120125
} = {}
121126
) => {
122127
// ? For debug specific event
@@ -132,9 +137,9 @@ const createReport = ({
132137

133138
const reporter = event === 'error' ? 'reportErr' : 'report'
134139

135-
for (let i = 0; i < trace.length; i++)
140+
for (let i = 0; i < trace.length; i++) {
136141
addFn(
137-
`${reporter}${i} = trace${i}.${event}({` +
142+
`${alias ? 'const ' : ''}${alias ?? reporter}${i}=trace${i}.${event}({` +
138143
`id,` +
139144
`event:'${event}',` +
140145
`name:'${name}',` +
@@ -143,13 +148,29 @@ const createReport = ({
143148
`})\n`
144149
)
145150

151+
if (alias) addFn(`${reporter}${i}=${alias}${i}\n`)
152+
}
153+
154+
// if (event === 'error')
155+
// for (const alias of aliases)
156+
// for (let i = 0; i < trace.length; i++)
157+
// addFn(
158+
// `const ${alias}Err${i}=trace${i}.${event}({` +
159+
// `id,` +
160+
// `event:'${event}',` +
161+
// `name:'${name}',` +
162+
// `begin:performance.now(),` +
163+
// `total:${total}` +
164+
// `})\n`
165+
// )
166+
146167
return {
147168
resolve() {
148169
for (let i = 0; i < trace.length; i++)
149-
addFn(`${reporter}${i}.resolve()\n`)
170+
addFn(`${alias ?? reporter}${i}.resolve()\n`)
150171
},
151172
resolveChild(name: string) {
152-
for (let i = 0; i < trace.length; i++)
173+
for (let i = 0; i < trace.length; i++) {
153174
addFn(
154175
`${reporter}Child${i}=${reporter}${i}.resolveChild?.shift()?.({` +
155176
`id,` +
@@ -158,6 +179,7 @@ const createReport = ({
158179
`begin:performance.now()` +
159180
`})\n`
160181
)
182+
}
161183

162184
return (binding?: string) => {
163185
for (let i = 0; i < trace.length; i++) {
@@ -773,9 +795,8 @@ export const composeHandler = ({
773795
`\nsetImmediate(async()=>{` +
774796
`if(c.responseValue){` +
775797
`if(c.responseValue instanceof ElysiaCustomStatusResponse) c.set.status=c.responseValue.code\n` +
776-
`else if(c.responseValue[Symbol.iterator]) for (const v of c.responseValue) { }` +
777-
`else if(c.responseValue[Symbol.asyncIterator]) for await (const v of c.responseValue) { }` +
778-
`}`
798+
`if(afterHandlerStreamListener)for await(const v of afterHandlerStreamListener){}\n` +
799+
`}\n`
779800

780801
const reporter = createReport({
781802
trace: hooks.trace,
@@ -819,6 +840,8 @@ export const composeHandler = ({
819840
: ''
820841

821842
if (hasTrace || inference.route) fnLiteral += `c.route=\`${path}\`\n`
843+
if (hasTrace || hooks.afterResponse?.length)
844+
fnLiteral += 'let afterHandlerStreamListener\n'
822845

823846
const parseReporter = report('parse', {
824847
total: hooks.parse?.length
@@ -1707,11 +1730,38 @@ export const composeHandler = ({
17071730
reporter.resolve()
17081731
}
17091732

1710-
if (hooks.afterHandle?.length || hasTrace) {
1733+
function reportHandler(name: string | undefined) {
17111734
const handleReporter = report('handle', {
1712-
name: isHandleFn ? (handler as Function).name : undefined
1735+
name,
1736+
alias: 'reportHandler'
17131737
})
17141738

1739+
return () => {
1740+
if (hasTrace) {
1741+
fnLiteral +=
1742+
`if(r&&(r[Symbol.iterator]||r[Symbol.asyncIterator])&&typeof r.next==="function"){` +
1743+
(maybeAsync ? '' : `(async()=>{`) +
1744+
`const stream=await tee(r,3)\n` +
1745+
`r=stream[0]\n` +
1746+
`const listener=stream[1]\n` +
1747+
(hasTrace || hooks.afterResponse?.length
1748+
? `afterHandlerStreamListener=stream[2]\n`
1749+
: '') +
1750+
`setImmediate(async ()=>{` +
1751+
`if(listener)for await(const v of listener){}\n`
1752+
handleReporter.resolve()
1753+
fnLiteral += `})` + (maybeAsync ? '' : `})()`) + `}else{`
1754+
handleReporter.resolve()
1755+
fnLiteral += '}\n'
1756+
}
1757+
}
1758+
}
1759+
1760+
if (hooks.afterHandle?.length || hasTrace) {
1761+
const resolveHandler = reportHandler(
1762+
isHandleFn ? (handler as Function).name : undefined
1763+
)
1764+
17151765
if (hooks.afterHandle?.length)
17161766
fnLiteral += isAsyncHandler
17171767
? `let r=c.response=c.responseValue=await ${handle}\n`
@@ -1721,7 +1771,7 @@ export const composeHandler = ({
17211771
? `let r=await ${handle}\n`
17221772
: `let r=${handle}\n`
17231773

1724-
handleReporter.resolve()
1774+
resolveHandler()
17251775

17261776
const reporter = report('afterHandle', {
17271777
total: hooks.afterHandle?.length
@@ -1795,16 +1845,16 @@ export const composeHandler = ({
17951845

17961846
fnLiteral += mapResponse()
17971847
} else {
1798-
const handleReporter = report('handle', {
1799-
name: isHandleFn ? (handler as Function).name : undefined
1800-
})
1848+
const resolveHandler = reportHandler(
1849+
isHandleFn ? (handler as Function).name : undefined
1850+
)
18011851

18021852
if (validator.response || hooks.mapResponse?.length || hasTrace) {
18031853
fnLiteral += isAsyncHandler
18041854
? `let r=await ${handle}\n`
18051855
: `let r=${handle}\n`
18061856

1807-
handleReporter.resolve()
1857+
resolveHandler()
18081858

18091859
if (validator.response) fnLiteral += validation.response()
18101860

@@ -1856,7 +1906,7 @@ export const composeHandler = ({
18561906
? `let r=await ${handle}\n`
18571907
: `let r=${handle}\n`
18581908

1859-
handleReporter.resolve()
1909+
resolveHandler()
18601910

18611911
const mapResponseReporter = report('mapResponse', {
18621912
total: hooks.mapResponse?.length
@@ -1884,7 +1934,7 @@ export const composeHandler = ({
18841934

18851935
fnLiteral += encodeCookie() + mapResponse()
18861936
} else {
1887-
handleReporter.resolve()
1937+
resolveHandler()
18881938

18891939
const handled = isAsyncHandler ? `await ${handle}` : handle
18901940

@@ -2019,6 +2069,7 @@ export const composeHandler = ({
20192069
`fileType,` +
20202070
`schema,` +
20212071
`definitions,` +
2072+
`tee,` +
20222073
`ERROR_CODE,` +
20232074
allocateIf(`parseCookie,`, hasCookie) +
20242075
allocateIf(`signCookie,`, hasCookie) +
@@ -2073,6 +2124,7 @@ export const composeHandler = ({
20732124
schema: app.router.history,
20742125
// @ts-expect-error
20752126
definitions: app.definitions.type,
2127+
tee,
20762128
ERROR_CODE,
20772129
parseCookie: hasCookie ? parseCookie : undefined,
20782130
signCookie: hasCookie ? signCookie : undefined,

test/lifecycle/before-handle.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ describe('Before Handle', () => {
283283
beforeHandle: ({ status }) =>
284284
status(401, 'unauthorized beforeHandle'),
285285
afterResponse: ({ responseValue }) => {
286+
console.log("Q")
286287
hasAfterResponseResponse = !!responseValue
287288
}
288289
}

test/tracer/timing.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Elysia } from '../../src'
33
import { describe, expect, it } from 'bun:test'
44
import { post, req } from '../utils'
55

6-
const delay = (delay = 10) =>
6+
const delay = (delay = 7) =>
77
new Promise((resolve) => setTimeout(resolve, delay))
88

99
describe('Trace Timing', async () => {

test/tracer/trace.test.ts

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { describe, expect, it } from 'bun:test'
22
import { Elysia, type TraceProcess, type TraceEvent } from '../../src'
3-
import { req } from '../utils'
3+
import { delay, req } from '../utils'
44

55
describe('trace', () => {
66
it('inherits plugin', async () => {
@@ -84,7 +84,7 @@ describe('trace', () => {
8484
'beforeHandle',
8585
'handle',
8686
'afterHandle',
87-
'mapResponse',
87+
'mapResponse'
8888
// afterResponse is being called so we can't check it yet
8989
])
9090
})
@@ -154,7 +154,7 @@ describe('trace', () => {
154154
'beforeHandle',
155155
'handle',
156156
'afterHandle',
157-
'mapResponse',
157+
'mapResponse'
158158
// afterResponse is being called so we can't check it yet
159159
])
160160
})
@@ -475,4 +475,35 @@ describe('trace', () => {
475475

476476
expect(route).toBe('/id/:id')
477477
})
478+
479+
it('defers stream for onHandle, and onAfterResponse', async () => {
480+
const order = <string[]>[]
481+
482+
const app = new Elysia()
483+
.trace(({ onHandle, onAfterResponse }) => {
484+
onHandle(({ onStop }) => {
485+
onStop(({ error }) => {
486+
order.push('HANDLE')
487+
})
488+
})
489+
490+
onAfterResponse(() => {
491+
order.push('AFTER')
492+
})
493+
})
494+
.get('/', async function* () {
495+
for (let i = 0; i < 5; i++) {
496+
yield `${i}`
497+
await delay(1)
498+
}
499+
})
500+
501+
expect(order).toEqual([])
502+
503+
await app.handle(req('/'))
504+
expect(order).toEqual([])
505+
506+
await delay(10)
507+
expect(order).toEqual(['HANDLE', 'AFTER'])
508+
})
478509
})

0 commit comments

Comments
 (0)