0
0
mirror of https://github.com/honojs/hono.git synced 2024-11-21 18:18:57 +01:00

fix(streaming): call stream.abort() explicitly when request is aborted (#3042)

* feat(utils/stream): enable to abort streaming manually

* feat(utils/stream): prevent multiple aborts, and enable to get the abort status

* fix(streaming): call `stream.abort()` explicitly when request is aborted

* test: add tests for streaming

* docs(stream): add comments

* test: add --allow-net to deno test command in ci.yml

* test(streaming): update test code

* test(stream): retry flaky test up to 3 times at "bun"

* test(streaming): refactor test to use afterEach

* fix(streaming): in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming

* refactor(streaming): tweaks code layout
This commit is contained in:
Taku Amano 2024-06-29 07:00:28 +09:00 committed by GitHub
parent a6ad42d03b
commit 2d3bc55954
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 318 additions and 4 deletions

View File

@ -77,7 +77,7 @@ jobs:
- uses: denoland/setup-deno@v1
with:
deno-version: v1.x
- run: env NAME=Deno deno test --coverage=coverage/raw/deno-runtime --allow-read --allow-env --allow-write -c runtime_tests/deno/deno.json runtime_tests/deno
- run: env NAME=Deno deno test --coverage=coverage/raw/deno-runtime --allow-read --allow-env --allow-write --allow-net -c runtime_tests/deno/deno.json runtime_tests/deno
- run: deno test -c runtime_tests/deno-jsx/deno.precompile.json --coverage=coverage/raw/deno-precompile-jsx runtime_tests/deno-jsx
- run: deno test -c runtime_tests/deno-jsx/deno.react-jsx.json --coverage=coverage/raw/deno-react-jsx runtime_tests/deno-jsx
- uses: actions/upload-artifact@v4

View File

@ -12,7 +12,7 @@
"scripts": {
"test": "tsc --noEmit && vitest --run && vitest -c .vitest.config/jsx-runtime-default.ts --run && vitest -c .vitest.config/jsx-runtime-dom.ts --run",
"test:watch": "vitest --watch",
"test:deno": "deno test --allow-read --allow-env --allow-write -c runtime_tests/deno/deno.json runtime_tests/deno && deno test --no-lock -c runtime_tests/deno-jsx/deno.precompile.json runtime_tests/deno-jsx && deno test --no-lock -c runtime_tests/deno-jsx/deno.react-jsx.json runtime_tests/deno-jsx",
"test:deno": "deno test --allow-read --allow-env --allow-write --allow-net -c runtime_tests/deno/deno.json runtime_tests/deno && deno test --no-lock -c runtime_tests/deno-jsx/deno.precompile.json runtime_tests/deno-jsx && deno test --no-lock -c runtime_tests/deno-jsx/deno.react-jsx.json runtime_tests/deno-jsx",
"test:bun": "bun test --jsx-import-source ../../src/jsx runtime_tests/bun/index.test.tsx",
"test:fastly": "vitest --run --config ./runtime_tests/fastly/vitest.config.ts",
"test:node": "vitest --run --config ./runtime_tests/node/vitest.config.ts",

View File

@ -1,4 +1,4 @@
import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'
import { afterAll, afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { serveStatic, toSSG } from '../../src/adapter/bun'
import { createBunWebSocket } from '../../src/adapter/bun/websocket'
import type { BunWebSocketData } from '../../src/adapter/bun/websocket'
@ -11,6 +11,7 @@ import { jsx } from '../../src/jsx'
import { basicAuth } from '../../src/middleware/basic-auth'
import { jwt } from '../../src/middleware/jwt'
import { HonoRequest } from '../../src/request'
import { stream, streamSSE } from '../..//src/helper/streaming'
// Test just only minimal patterns.
// Because others are tested well in Cloudflare Workers environment already.
@ -316,3 +317,74 @@ async function deleteDirectory(dirPath) {
await fs.unlink(dirPath)
}
}
describe('streaming', () => {
const app = new Hono()
let server: ReturnType<typeof Bun.serve>
let aborted = false
app.get('/stream', (c) => {
return stream(c, async (stream) => {
stream.onAbort(() => {
aborted = true
})
return new Promise<void>((resolve) => {
stream.onAbort(resolve)
})
})
})
app.get('/streamSSE', (c) => {
return streamSSE(c, async (stream) => {
stream.onAbort(() => {
aborted = true
})
return new Promise<void>((resolve) => {
stream.onAbort(resolve)
})
})
})
beforeEach(() => {
aborted = false
server = Bun.serve({ port: 0, fetch: app.fetch })
})
afterEach(() => {
server.stop()
})
describe('stream', () => {
it('Should call onAbort', async () => {
const ac = new AbortController()
const req = new Request(`http://localhost:${server.port}/stream`, {
signal: ac.signal,
})
expect(aborted).toBe(false)
const res = fetch(req).catch(() => {})
await new Promise((resolve) => setTimeout(resolve, 10))
ac.abort()
await res
while (!aborted) {
await new Promise((resolve) => setTimeout(resolve))
}
expect(aborted).toBe(true)
})
})
describe('streamSSE', () => {
it('Should call onAbort', async () => {
const ac = new AbortController()
const req = new Request(`http://localhost:${server.port}/streamSSE`, {
signal: ac.signal,
})
const res = fetch(req).catch(() => {})
await new Promise((resolve) => setTimeout(resolve, 10))
ac.abort()
await res
while (!aborted) {
await new Promise((resolve) => setTimeout(resolve))
}
expect(aborted).toBe(true)
})
})
})

View File

@ -0,0 +1,69 @@
import { Hono } from '../../src/hono.ts'
import { assertEquals } from './deps.ts'
import { stream, streamSSE } from '../../src/helper/streaming/index.ts'
Deno.test('Shuld call onAbort via stream', async () => {
const app = new Hono()
let aborted = false
app.get('/stream', (c) => {
return stream(c, async (stream) => {
stream.onAbort(() => {
aborted = true
})
return new Promise<void>((resolve) => {
stream.onAbort(resolve)
})
})
})
const server = Deno.serve({ port: 0 }, app.fetch)
const ac = new AbortController()
const req = new Request(`http://localhost:${server.addr.port}/stream`, {
signal: ac.signal,
})
assertEquals
const res = fetch(req).catch(() => {})
assertEquals(aborted, false)
await new Promise((resolve) => setTimeout(resolve, 10))
ac.abort()
await res
while (!aborted) {
await new Promise((resolve) => setTimeout(resolve))
}
assertEquals(aborted, true)
await server.shutdown()
})
Deno.test('Shuld call onAbort via streamSSE', async () => {
const app = new Hono()
let aborted = false
app.get('/stream', (c) => {
return streamSSE(c, async (stream) => {
stream.onAbort(() => {
aborted = true
})
return new Promise<void>((resolve) => {
stream.onAbort(resolve)
})
})
})
const server = Deno.serve({ port: 0 }, app.fetch)
const ac = new AbortController()
const req = new Request(`http://localhost:${server.addr.port}/stream`, {
signal: ac.signal,
})
assertEquals
const res = fetch(req).catch(() => {})
assertEquals(aborted, false)
await new Promise((resolve) => setTimeout(resolve, 10))
ac.abort()
await res
while (!aborted) {
await new Promise((resolve) => setTimeout(resolve))
}
assertEquals(aborted, true)
await server.shutdown()
})

View File

@ -6,6 +6,7 @@ import { env, getRuntimeKey } from '../../src/helper/adapter'
import { basicAuth } from '../../src/middleware/basic-auth'
import { jwt } from '../../src/middleware/jwt'
import { HonoRequest } from '../../src/request'
import { stream, streamSSE } from '../../src/helper/streaming'
// Test only minimal patterns.
// See <https://github.com/honojs/node-server> for more tests and information.
@ -96,3 +97,69 @@ describe('JWT Auth Middleware', () => {
expect(res.text).toBe('auth')
})
})
describe('stream', () => {
const app = new Hono()
let aborted = false
app.get('/stream', (c) => {
return stream(c, async (stream) => {
stream.onAbort(() => {
aborted = true
})
return new Promise<void>((resolve) => {
stream.onAbort(resolve)
})
})
})
const server = createAdaptorServer(app)
it('Should call onAbort', async () => {
const req = request(server)
.get('/stream')
.end(() => {})
expect(aborted).toBe(false)
await new Promise((resolve) => setTimeout(resolve, 10))
req.abort()
while (!aborted) {
await new Promise((resolve) => setTimeout(resolve))
}
expect(aborted).toBe(true)
})
})
describe('streamSSE', () => {
const app = new Hono()
let aborted = false
app.get('/stream', (c) => {
return streamSSE(c, async (stream) => {
stream.onAbort(() => {
aborted = true
})
return new Promise<void>((resolve) => {
stream.onAbort(resolve)
})
})
})
const server = createAdaptorServer(app)
it('Should call onAbort', async () => {
const req = request(server)
.get('/stream')
.end(() => {})
expect(aborted).toBe(false)
await new Promise((resolve) => setTimeout(resolve, 10))
req.abort()
while (!aborted) {
await new Promise((resolve) => setTimeout(resolve))
}
expect(aborted).toBe(true)
})
})

View File

@ -73,6 +73,33 @@ describe('SSE Streaming helper', () => {
expect(aborted).toBeTruthy()
})
it('Check streamSSE Response if aborted by abort signal', async () => {
const ac = new AbortController()
const req = new Request('http://localhost/', { signal: ac.signal })
const c = new Context(req)
let aborted = false
const res = streamSSE(c, async (stream) => {
stream.onAbort(() => {
aborted = true
})
for (let i = 0; i < 3; i++) {
await stream.writeSSE({
data: `Message ${i}`,
})
await stream.sleep(1)
}
})
if (!res.body) {
throw new Error('Body is null')
}
const reader = res.body.getReader()
const { value } = await reader.read()
expect(value).toEqual(new TextEncoder().encode('data: Message 0\n\n'))
ac.abort()
expect(aborted).toBeTruthy()
})
it('Should include retry in the SSE message', async () => {
const retryTime = 3000 // 3 seconds
const res = streamSSE(c, async (stream) => {

View File

@ -58,6 +58,7 @@ const run = async (
}
}
const contextStash = new WeakMap<ReadableStream, Context>()
export const streamSSE = (
c: Context,
cb: (stream: SSEStreamingApi) => Promise<void>,
@ -66,6 +67,13 @@ export const streamSSE = (
const { readable, writable } = new TransformStream()
const stream = new SSEStreamingApi(writable, readable)
// bun does not cancel response stream when request is canceled, so detect abort by signal
c.req.raw.signal.addEventListener('abort', () => {
stream.abort()
})
// in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming
contextStash.set(stream.responseReadable, c)
c.header('Transfer-Encoding', 'chunked')
c.header('Content-Type', 'text/event-stream')
c.header('Cache-Control', 'no-cache')

View File

@ -46,6 +46,31 @@ describe('Basic Streaming Helper', () => {
expect(aborted).toBeTruthy()
})
it('Check stream Response if aborted by abort signal', async () => {
const ac = new AbortController()
const req = new Request('http://localhost/', { signal: ac.signal })
const c = new Context(req)
let aborted = false
const res = stream(c, async (stream) => {
stream.onAbort(() => {
aborted = true
})
for (let i = 0; i < 3; i++) {
await stream.write(new Uint8Array([i]))
await stream.sleep(1)
}
})
if (!res.body) {
throw new Error('Body is null')
}
const reader = res.body.getReader()
const { value } = await reader.read()
expect(value).toEqual(new Uint8Array([0]))
ac.abort()
expect(aborted).toBeTruthy()
})
it('Check stream Response if error occurred', async () => {
const onError = vi.fn()
const res = stream(

View File

@ -1,6 +1,7 @@
import type { Context } from '../../context'
import { StreamingApi } from '../../utils/stream'
const contextStash = new WeakMap<ReadableStream, Context>()
export const stream = (
c: Context,
cb: (stream: StreamingApi) => Promise<void>,
@ -8,6 +9,13 @@ export const stream = (
): Response => {
const { readable, writable } = new TransformStream()
const stream = new StreamingApi(writable, readable)
// bun does not cancel response stream when request is canceled, so detect abort by signal
c.req.raw.signal.addEventListener('abort', () => {
stream.abort()
})
// in bun, `c` is destroyed when the request is returned, so hold it until the end of streaming
contextStash.set(stream.responseReadable, c)
;(async () => {
try {
await cb(stream)
@ -21,5 +29,6 @@ export const stream = (
stream.close()
}
})()
return c.newResponse(stream.responseReadable)
}

View File

@ -96,4 +96,26 @@ describe('StreamingApi', () => {
expect(handleAbort1).toBeCalled()
expect(handleAbort2).toBeCalled()
})
it('abort()', async () => {
const { readable, writable } = new TransformStream()
const handleAbort1 = vi.fn()
const handleAbort2 = vi.fn()
const api = new StreamingApi(writable, readable)
api.onAbort(handleAbort1)
api.onAbort(handleAbort2)
expect(handleAbort1).not.toBeCalled()
expect(handleAbort2).not.toBeCalled()
expect(api.aborted).toBe(false)
api.abort()
expect(handleAbort1).toHaveBeenCalledOnce()
expect(handleAbort2).toHaveBeenCalledOnce()
expect(api.aborted).toBe(true)
api.abort()
expect(handleAbort1).toHaveBeenCalledOnce()
expect(handleAbort2).toHaveBeenCalledOnce()
expect(api.aborted).toBe(true)
})
})

View File

@ -9,6 +9,10 @@ export class StreamingApi {
private writable: WritableStream
private abortSubscribers: (() => void | Promise<void>)[] = []
responseReadable: ReadableStream
/**
* Whether the stream has been aborted.
*/
aborted: boolean = false
constructor(writable: WritableStream, _readable: ReadableStream) {
this.writable = writable
@ -30,7 +34,7 @@ export class StreamingApi {
done ? controller.close() : controller.enqueue(value)
},
cancel: () => {
this.abortSubscribers.forEach((subscriber) => subscriber())
this.abort()
},
})
}
@ -73,4 +77,15 @@ export class StreamingApi {
onAbort(listener: () => void | Promise<void>) {
this.abortSubscribers.push(listener)
}
/**
* Abort the stream.
* You can call this method when stream is aborted by external event.
*/
abort() {
if (!this.aborted) {
this.aborted = true
this.abortSubscribers.forEach((subscriber) => subscriber())
}
}
}