0
0
mirror of https://github.com/honojs/hono.git synced 2024-12-01 11:51:01 +01:00
hono/deno_dist/helper/streaming/index.ts
watany 7adae90fd7
fix(helper): Stream SSE Helper Non-Closure (#1650)
* fix(helper): Stream SSE Helper Non-Closure

* denoify

* test fixed.
2023-11-08 20:54:02 +09:00

60 lines
1.4 KiB
TypeScript

import type { Context } from '../../context.ts'
import { StreamingApi } from '../../utils/stream.ts'
interface SSEMessage {
data: string
event?: string
id?: string
}
class SSEStreamingApi extends StreamingApi {
constructor(writable: WritableStream) {
super(writable)
}
async writeSSE(message: SSEMessage) {
const data = message.data
.split('\n')
.map((line) => {
return `data: ${line}`
})
.join('\n')
const sseData =
[message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`]
.filter(Boolean)
.join('\n') + '\n\n'
await this.write(sseData)
}
}
const setSSEHeaders = (context: Context) => {
context.header('Transfer-Encoding', 'chunked')
context.header('Content-Type', 'text/event-stream')
context.header('Cache-Control', 'no-cache')
context.header('Connection', 'keep-alive')
}
export const streamSSE = (c: Context, cb: (stream: SSEStreamingApi) => Promise<void>) => {
return c.stream(async (originalStream: StreamingApi) => {
const { readable, writable } = new TransformStream()
const stream = new SSEStreamingApi(writable)
originalStream.pipe(readable).catch((err) => {
console.error('Error in stream piping: ', err)
stream.close()
})
setSSEHeaders(c)
try {
await cb(stream)
} catch (err) {
console.error('Error during streaming: ', err)
} finally {
await stream.close()
}
})
}