0
0
mirror of https://github.com/honojs/hono.git synced 2024-11-29 17:46:30 +01:00
hono/deno_dist/helper/streaming/sse.ts
Sor4chi 7768865e7a
fix: move c.stream* to helper (#1846)
* refactor: move streaming helper to `streaming/sse.ts`

* feat: add streamSSE's export in streaming handler

* feat: move `stream` and `streamText` to streaming helper

* chore: add deprecated expression for `c.stream` and `c.streamText`

* fix: use `stream` helper in streamSSE

* refactor: move `streamText` to `text.ts`

* test: add some case to `stream` and `streamText`

* test: refactor `streamSSE` case for uniformity

* chore: denoify

* fix: update jsdoc's deprecated description, simplify `c.stream` and `c.streamText`

* fix: match the header notation with that of `streamSSE

* chore: denoify

* refactor: remove unnecesary export
2023-12-27 06:55:24 +09:00

61 lines
1.5 KiB
TypeScript

import type { Context } from '../../context.ts'
import { StreamingApi } from '../../utils/stream.ts'
import { stream } from './index.ts'
interface SSEMessage {
data: string
event?: string
id?: string
}
class SSEStreamingApi extends StreamingApi {
constructor(writable: WritableStream) {
super(writable)
}
async writeSSE(message: SSEMessage) {
const data = message.data
.split('\n')
.map((line) => {
return `data: ${line}`
})
.join('\n')
const sseData =
[message.event && `event: ${message.event}`, data, message.id && `id: ${message.id}`]
.filter(Boolean)
.join('\n') + '\n\n'
await this.write(sseData)
}
}
const setSSEHeaders = (context: Context) => {
context.header('Transfer-Encoding', 'chunked')
context.header('Content-Type', 'text/event-stream')
context.header('Cache-Control', 'no-cache')
context.header('Connection', 'keep-alive')
}
export const streamSSE = (c: Context, cb: (stream: SSEStreamingApi) => Promise<void>) => {
return stream(c, async (originalStream: StreamingApi) => {
const { readable, writable } = new TransformStream()
const stream = new SSEStreamingApi(writable)
originalStream.pipe(readable).catch((err) => {
console.error('Error in stream piping: ', err)
stream.close()
})
setSSEHeaders(c)
try {
await cb(stream)
} catch (err) {
console.error('Error during streaming: ', err)
} finally {
await stream.close()
}
})
}