mirror of
https://github.com/honojs/hono.git
synced 2024-11-21 18:18:57 +01:00
feat(adaptor): Support AWS Function URL Streaming (#1625)
* 0.1 * lint * stream * type safe * ignore * lint * test Node.writablestream * 1.0 * fixed pipeline * fixed
This commit is contained in:
parent
88e89a46a8
commit
8b4392fa36
@ -1,18 +1,53 @@
|
||||
import { Readable } from 'stream'
|
||||
import type {
|
||||
ApiGatewayRequestContext,
|
||||
LambdaFunctionUrlRequestContext,
|
||||
} from '../../src/adapter/aws-lambda/custom-context'
|
||||
import { handle } from '../../src/adapter/aws-lambda/handler'
|
||||
import { handle, streamHandle } from '../../src/adapter/aws-lambda/handler'
|
||||
import type { LambdaContext } from '../../src/adapter/aws-lambda/types'
|
||||
import { getCookie, setCookie } from '../../src/helper/cookie'
|
||||
import { streamSSE } from '../../src/helper/streaming'
|
||||
import { Hono } from '../../src/hono'
|
||||
import { basicAuth } from '../../src/middleware/basic-auth'
|
||||
import './mock'
|
||||
|
||||
type Bindings = {
|
||||
lambdaContext: LambdaContext
|
||||
requestContext: ApiGatewayRequestContext | LambdaFunctionUrlRequestContext
|
||||
}
|
||||
|
||||
const testLambdaFunctionUrlRequestContext = {
|
||||
accountId: '123456789012',
|
||||
apiId: 'urlid',
|
||||
authentication: null,
|
||||
authorizer: {
|
||||
iam: {
|
||||
accessKey: 'AKIA...',
|
||||
accountId: '111122223333',
|
||||
callerId: 'AIDA...',
|
||||
cognitoIdentity: null,
|
||||
principalOrgId: null,
|
||||
userArn: 'arn:aws:iam::111122223333:user/example-user',
|
||||
userId: 'AIDA...',
|
||||
},
|
||||
},
|
||||
domainName: 'example.com',
|
||||
domainPrefix: '<url-id>',
|
||||
http: {
|
||||
method: 'POST',
|
||||
path: '/my/path',
|
||||
protocol: 'HTTP/1.1',
|
||||
sourceIp: '123.123.123.123',
|
||||
userAgent: 'agent',
|
||||
},
|
||||
requestId: 'id',
|
||||
routeKey: '$default',
|
||||
stage: '$default',
|
||||
time: '12/Mar/2020:19:03:58 +0000',
|
||||
timeEpoch: 1583348638390,
|
||||
customProperty: 'customValue',
|
||||
}
|
||||
|
||||
describe('AWS Lambda Adapter for Hono', () => {
|
||||
const app = new Hono<{ Bindings: Bindings }>()
|
||||
|
||||
@ -112,38 +147,6 @@ describe('AWS Lambda Adapter for Hono', () => {
|
||||
customProperty: 'customValue',
|
||||
}
|
||||
|
||||
const testLambdaFunctionUrlRequestContext = {
|
||||
accountId: '123456789012',
|
||||
apiId: 'urlid',
|
||||
authentication: null,
|
||||
authorizer: {
|
||||
iam: {
|
||||
accessKey: 'AKIA...',
|
||||
accountId: '111122223333',
|
||||
callerId: 'AIDA...',
|
||||
cognitoIdentity: null,
|
||||
principalOrgId: null,
|
||||
userArn: 'arn:aws:iam::111122223333:user/example-user',
|
||||
userId: 'AIDA...',
|
||||
},
|
||||
},
|
||||
domainName: 'example.com',
|
||||
domainPrefix: '<url-id>',
|
||||
http: {
|
||||
method: 'POST',
|
||||
path: '/my/path',
|
||||
protocol: 'HTTP/1.1',
|
||||
sourceIp: '123.123.123.123',
|
||||
userAgent: 'agent',
|
||||
},
|
||||
requestId: 'id',
|
||||
routeKey: '$default',
|
||||
stage: '$default',
|
||||
time: '12/Mar/2020:19:03:58 +0000',
|
||||
timeEpoch: 1583348638390,
|
||||
customProperty: 'customValue',
|
||||
}
|
||||
|
||||
it('Should handle a GET request and return a 200 response', async () => {
|
||||
const event = {
|
||||
httpMethod: 'GET',
|
||||
@ -347,7 +350,7 @@ describe('AWS Lambda Adapter for Hono', () => {
|
||||
expect(JSON.parse(response.body).callbackWaitsForEmptyEventLoop).toEqual(false)
|
||||
})
|
||||
|
||||
it('Shoul handle a POST request and return a 200 response with cookies set (APIGatewayProxyEvent V1 and V2)', async () => {
|
||||
it('Should handle a POST request and return a 200 response with cookies set (APIGatewayProxyEvent V1 and V2)', async () => {
|
||||
const apiGatewayEvent = {
|
||||
httpMethod: 'POST',
|
||||
headers: { 'content-type': 'text/plain' },
|
||||
@ -384,7 +387,7 @@ describe('AWS Lambda Adapter for Hono', () => {
|
||||
])
|
||||
})
|
||||
|
||||
it('Shoul handle a POST request and return a 200 response if cookies match (APIGatewayProxyEvent V1 and V2)', async () => {
|
||||
it('Should handle a POST request and return a 200 response if cookies match (APIGatewayProxyEvent V1 and V2)', async () => {
|
||||
const apiGatewayEvent = {
|
||||
httpMethod: 'GET',
|
||||
headers: {
|
||||
@ -423,3 +426,68 @@ describe('AWS Lambda Adapter for Hono', () => {
|
||||
expect(apiGatewayResponseV2.isBase64Encoded).toBe(false)
|
||||
})
|
||||
})
|
||||
|
||||
describe('streamHandle function', () => {
|
||||
const app = new Hono<{ Bindings: Bindings }>()
|
||||
|
||||
app.get('/', (c) => {
|
||||
return c.text('Hello Lambda!')
|
||||
})
|
||||
|
||||
app.get('/stream/text', async (c) => {
|
||||
return c.streamText(async (stream) => {
|
||||
for (let i = 0; i < 3; i++) {
|
||||
await stream.writeln(`${i}`)
|
||||
await stream.sleep(1)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
app.get('/sse', async (c) => {
|
||||
return streamSSE(c, async (stream) => {
|
||||
let id = 0
|
||||
const maxIterations = 2
|
||||
|
||||
while (id < maxIterations) {
|
||||
const message = `Message\nIt is ${id}`
|
||||
await stream.writeSSE({ data: message, event: 'time-update', id: String(id++) })
|
||||
await stream.sleep(10)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
const handler = streamHandle(app)
|
||||
|
||||
it('Should streamHandle a GET request and return a 200 response (LambdaFunctionUrlEvent)', async () => {
|
||||
const event = {
|
||||
headers: { 'content-type': ' binary/octet-stream' },
|
||||
rawPath: '/stream/text',
|
||||
rawQueryString: '',
|
||||
body: null,
|
||||
isBase64Encoded: false,
|
||||
requestContext: testLambdaFunctionUrlRequestContext,
|
||||
}
|
||||
|
||||
testLambdaFunctionUrlRequestContext.http.method = 'GET'
|
||||
|
||||
const mockReadableStream = new Readable({
|
||||
// eslint-disable-next-line @typescript-eslint/no-empty-function
|
||||
read() {},
|
||||
})
|
||||
|
||||
mockReadableStream.push('0\n')
|
||||
mockReadableStream.push('1\n')
|
||||
mockReadableStream.push('2\n')
|
||||
mockReadableStream.push('3\n')
|
||||
mockReadableStream.push(null) // EOF
|
||||
|
||||
const res = await handler(event, mockReadableStream)
|
||||
|
||||
const chunks = []
|
||||
for await (const chunk of mockReadableStream) {
|
||||
chunks.push(chunk)
|
||||
}
|
||||
console.log(res)
|
||||
expect(chunks.join('')).toContain('0\n1\n2\n3\n')
|
||||
})
|
||||
})
|
||||
|
42
runtime_tests/lambda/mock.ts
Normal file
42
runtime_tests/lambda/mock.ts
Normal file
@ -0,0 +1,42 @@
|
||||
import { vi } from 'vitest'
|
||||
import type {
|
||||
APIGatewayProxyEvent,
|
||||
APIGatewayProxyEventV2,
|
||||
LambdaFunctionUrlEvent,
|
||||
} from '../../src/adapter/aws-lambda/handler'
|
||||
import type { LambdaContext } from '../../src/adapter/aws-lambda/types'
|
||||
|
||||
type StreamifyResponseHandler = (
|
||||
handlerFunc: (
|
||||
event: APIGatewayProxyEvent | APIGatewayProxyEventV2 | LambdaFunctionUrlEvent,
|
||||
responseStream: NodeJS.WritableStream,
|
||||
context: LambdaContext
|
||||
) => Promise<void>
|
||||
) => (event: APIGatewayProxyEvent, context: LambdaContext) => Promise<NodeJS.WritableStream>
|
||||
|
||||
const mockStreamifyResponse: StreamifyResponseHandler = (handlerFunc) => {
|
||||
return async (event, context) => {
|
||||
const mockWritableStream: NodeJS.WritableStream = new (require('stream').Writable)({
|
||||
write(chunk, encoding, callback) {
|
||||
console.log('Writing chunk:', chunk.toString())
|
||||
callback()
|
||||
},
|
||||
final(callback) {
|
||||
console.log('Finalizing stream.')
|
||||
callback()
|
||||
},
|
||||
})
|
||||
mockWritableStream.on('finish', () => {
|
||||
console.log('Stream has finished')
|
||||
})
|
||||
await handlerFunc(event, mockWritableStream, context)
|
||||
mockWritableStream.end()
|
||||
return mockWritableStream
|
||||
}
|
||||
}
|
||||
|
||||
const awslambda = {
|
||||
streamifyResponse: mockStreamifyResponse,
|
||||
}
|
||||
|
||||
vi.stubGlobal('awslambda', awslambda)
|
@ -5,6 +5,6 @@ export default defineConfig({
|
||||
test: {
|
||||
globals: true,
|
||||
include: ['**/runtime_tests/lambda/**/*.+(ts|tsx|js)'],
|
||||
exclude: ['**/runtime_tests/lambda/vitest.config.ts'],
|
||||
exclude: ['**/runtime_tests/lambda/vitest.config.ts', '**/runtime_tests/lambda/mock.ts'],
|
||||
},
|
||||
})
|
||||
|
16
src/adapter/aws-lambda/awslambda.d.ts
vendored
Normal file
16
src/adapter/aws-lambda/awslambda.d.ts
vendored
Normal file
@ -0,0 +1,16 @@
|
||||
// @denoify-ignore
|
||||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||
|
||||
import type { LambdaContext, Handler } from './types'
|
||||
|
||||
declare global {
|
||||
namespace awslambda {
|
||||
function streamifyResponse(
|
||||
f: (
|
||||
event: any,
|
||||
responseStream: NodeJS.WritableStream,
|
||||
context: LambdaContext
|
||||
) => Promise<void>
|
||||
): Handler
|
||||
}
|
||||
}
|
@ -12,7 +12,7 @@ import type { LambdaContext } from './types'
|
||||
globalThis.crypto ??= crypto
|
||||
|
||||
// When calling Lambda directly through function urls
|
||||
interface APIGatewayProxyEventV2 {
|
||||
export interface APIGatewayProxyEventV2 {
|
||||
httpMethod: string
|
||||
headers: Record<string, string | undefined>
|
||||
cookies?: string[]
|
||||
@ -24,7 +24,7 @@ interface APIGatewayProxyEventV2 {
|
||||
}
|
||||
|
||||
// When calling Lambda through an API Gateway or an ELB
|
||||
interface APIGatewayProxyEvent {
|
||||
export interface APIGatewayProxyEvent {
|
||||
httpMethod: string
|
||||
headers: Record<string, string | undefined>
|
||||
multiValueHeaders?: {
|
||||
@ -38,7 +38,7 @@ interface APIGatewayProxyEvent {
|
||||
}
|
||||
|
||||
// When calling Lambda through an Lambda Function URLs
|
||||
interface LambdaFunctionUrlEvent {
|
||||
export interface LambdaFunctionUrlEvent {
|
||||
headers: Record<string, string | undefined>
|
||||
rawPath: string
|
||||
rawQueryString: string
|
||||
@ -64,6 +64,59 @@ const getRequestContext = (
|
||||
return event.requestContext
|
||||
}
|
||||
|
||||
const streamToNodeStream = async (
|
||||
reader: ReadableStreamDefaultReader<Uint8Array>,
|
||||
writer: NodeJS.WritableStream
|
||||
) => {
|
||||
let readResult = await reader.read()
|
||||
while (!readResult.done) {
|
||||
writer.write(readResult.value)
|
||||
readResult = await reader.read()
|
||||
}
|
||||
writer.end()
|
||||
}
|
||||
|
||||
export const streamHandle = <
|
||||
E extends Env = Env,
|
||||
S extends Schema = {},
|
||||
BasePath extends string = '/'
|
||||
>(
|
||||
app: Hono<E, S, BasePath>
|
||||
) => {
|
||||
return awslambda.streamifyResponse(
|
||||
async (
|
||||
event: APIGatewayProxyEvent | APIGatewayProxyEventV2 | LambdaFunctionUrlEvent,
|
||||
responseStream: NodeJS.WritableStream,
|
||||
context: LambdaContext
|
||||
) => {
|
||||
try {
|
||||
const req = createRequest(event)
|
||||
const requestContext = getRequestContext(event)
|
||||
|
||||
const res = await app.fetch(req, {
|
||||
requestContext,
|
||||
context,
|
||||
})
|
||||
|
||||
// Check content type
|
||||
const contentType = res.headers.get('content-type')
|
||||
if (!contentType) {
|
||||
console.warn('Content Type is not set in the response.')
|
||||
}
|
||||
|
||||
if (res.body) {
|
||||
await streamToNodeStream(res.body.getReader(), responseStream)
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Error processing request:', error)
|
||||
responseStream.write('Internal Server Error')
|
||||
} finally {
|
||||
responseStream.end()
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Accepts events from API Gateway/ELB(`APIGatewayProxyEvent`) and directly through Function Url(`APIGatewayProxyEventV2`)
|
||||
*/
|
||||
|
@ -1,4 +1,4 @@
|
||||
// @denoify-ignore
|
||||
export { handle } from './handler'
|
||||
export { handle, streamHandle } from './handler'
|
||||
export type { ApiGatewayRequestContext, LambdaFunctionUrlRequestContext } from './custom-context'
|
||||
export type { LambdaContext } from './types'
|
||||
|
@ -1,4 +1,6 @@
|
||||
// @denoify-ignore
|
||||
/* eslint-disable @typescript-eslint/no-explicit-any */
|
||||
|
||||
export interface CognitoIdentity {
|
||||
cognitoIdentityId: string
|
||||
cognitoIdentityPoolId: string
|
||||
@ -45,3 +47,11 @@ export interface LambdaContext {
|
||||
|
||||
getRemainingTimeInMillis(): number
|
||||
}
|
||||
|
||||
type Callback<TResult = any> = (error?: Error | string | null, result?: TResult) => void
|
||||
|
||||
export type Handler<TEvent = any, TResult = any> = (
|
||||
event: TEvent,
|
||||
context: LambdaContext,
|
||||
callback: Callback<TResult>
|
||||
) => void | Promise<TResult>
|
||||
|
@ -20,6 +20,7 @@
|
||||
},
|
||||
"include": [
|
||||
"src/**/*.ts",
|
||||
"src/**/*.d.ts",
|
||||
"src/**/*.mts",
|
||||
"src/**/*.test.ts",
|
||||
"src/**/*.test.tsx"
|
||||
|
Loading…
Reference in New Issue
Block a user