0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-21 13:39:22 +01:00

fix(product-assistant): correctly establish a connection for streaming (#25826)

This commit is contained in:
Georgiy Tarasov 2024-10-25 21:18:54 +02:00 committed by GitHub
parent 6013e43abb
commit 544e239f40
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 46 additions and 55 deletions

View File

@ -79,6 +79,9 @@ class Assistant:
chunks = AIMessageChunk(content="")
# Send a chunk to establish the connection avoiding the worker's timeout.
yield ""
for update in generator:
if is_value_update(update):
_, state_update = update

View File

@ -4,7 +4,7 @@ import { useEffect } from 'react'
import { mswDecorator, useStorybookMocks } from '~/mocks/browser'
import chatResponse from './__mocks__/chatResponse.json'
import { chatResponseChunk } from './__mocks__/chatResponse.mocks'
import { MaxInstance } from './Max'
import { maxLogic } from './maxLogic'
@ -13,7 +13,7 @@ const meta: Meta = {
decorators: [
mswDecorator({
post: {
'/api/environments/:team_id/query/chat/': chatResponse,
'/api/environments/:team_id/query/chat/': (_, res, ctx) => res(ctx.text(chatResponseChunk)),
},
}),
],

View File

@ -0,0 +1,3 @@
import chatResponse from './chatResponse.json'
export const chatResponseChunk = `data: ${JSON.stringify(chatResponse)}\n\n`

View File

@ -1,4 +1,5 @@
import { shuffle } from 'd3'
import { createParser } from 'eventsource-parser'
import { actions, kea, key, listeners, path, props, reducers, selectors } from 'kea'
import { loaders } from 'kea-loaders'
import api from 'lib/api'
@ -118,21 +119,23 @@ export const maxLogic = kea<maxLogicType>([
messages: values.thread.map(({ status, ...message }) => message),
})
const reader = response.body?.getReader()
if (!reader) {
return
}
const decoder = new TextDecoder()
if (reader) {
let firstChunk = true
let firstChunk = true
while (true) {
const { done, value } = await reader.read()
if (done) {
actions.setMessageStatus(newIndex, 'completed')
break
const parser = createParser({
onEvent: (event) => {
const parsedResponse = parseResponse(event.data)
if (!parsedResponse) {
return
}
const text = decoder.decode(value)
const parsedResponse = parseResponse(text)
if (firstChunk) {
firstChunk = false
@ -145,6 +148,17 @@ export const maxLogic = kea<maxLogicType>([
status: 'loading',
})
}
},
})
while (true) {
const { done, value } = await reader.read()
parser.feed(decoder.decode(value))
if (done) {
actions.setMessageStatus(newIndex, 'completed')
break
}
}
} catch {
@ -163,50 +177,11 @@ export const maxLogic = kea<maxLogicType>([
* Parses the generation result from the API. Some generation chunks might be sent in batches.
* @param response
*/
function parseResponse(response: string, recursive = true): RootAssistantMessage | null {
function parseResponse(response: string): RootAssistantMessage | null | undefined {
try {
const parsed = JSON.parse(response)
return parsed as RootAssistantMessage
return parsed as RootAssistantMessage | null | undefined
} catch {
if (!recursive) {
return null
}
const results: [number, number][] = []
let pair: [number, number] = [0, 0]
let seq = 0
for (let i = 0; i < response.length; i++) {
const char = response[i]
if (char === '{') {
if (seq === 0) {
pair[0] = i
}
seq += 1
}
if (char === '}') {
seq -= 1
if (seq === 0) {
pair[1] = i
}
}
if (seq === 0) {
results.push(pair)
pair = [0, 0]
}
}
const lastPair = results.pop()
if (lastPair) {
const [left, right] = lastPair
return parseResponse(response.slice(left, right + 1), false)
}
return null
}
}

View File

@ -124,6 +124,7 @@
"esbuild-plugin-less": "^1.3.1",
"esbuild-plugin-polyfill-node": "^0.3.0",
"esbuild-sass-plugin": "^3.0.0",
"eventsource-parser": "^3.0.0",
"expr-eval": "^2.0.2",
"express": "^4.17.1",
"fast-deep-equal": "^3.1.3",

View File

@ -193,6 +193,9 @@ dependencies:
esbuild-sass-plugin:
specifier: ^3.0.0
version: 3.0.0(esbuild@0.19.8)
eventsource-parser:
specifier: ^3.0.0
version: 3.0.0
expr-eval:
specifier: ^2.0.2
version: 2.0.2
@ -386,7 +389,7 @@ dependencies:
optionalDependencies:
fsevents:
specifier: ^2.3.2
version: 2.3.2
version: 2.3.3
devDependencies:
'@babel/core':
@ -12521,6 +12524,11 @@ packages:
engines: {node: '>=0.8.x'}
dev: true
/eventsource-parser@3.0.0:
resolution: {integrity: sha512-T1C0XCUimhxVQzW4zFipdx0SficT651NnkR0ZSH3yQwh+mFMdLfgjABVi4YtMTtaL4s168593DaoaRLMqryavA==}
engines: {node: '>=18.0.0'}
dev: false
/execa@4.1.0:
resolution: {integrity: sha512-j5W0//W7f8UxAn8hXVnwG8tLwdiUy4FJLcSupCg6maBYZDpyBvTApK7KyuI4bKj8KOh1r2YH+6ucuYtJv1bTZA==}
engines: {node: '>=10'}
@ -13126,6 +13134,7 @@ packages:
engines: {node: ^8.16.0 || ^10.6.0 || >=11.0.0}
os: [darwin]
requiresBuild: true
dev: true
optional: true
/fsevents@2.3.3:

View File

@ -185,7 +185,7 @@ class QueryViewSet(TeamAndOrgViewSetMixin, PydanticModelMixin, viewsets.ViewSet)
last_message = None
for message in assistant.stream(validated_body):
last_message = message
yield last_message
yield f"data: {message}\n\n"
human_message = validated_body.messages[-1].root
if isinstance(human_message, HumanMessage):