diff --git a/.run/PostHog.run.xml b/.run/PostHog.run.xml
index 7dedccabd3b..df41d468add 100644
--- a/.run/PostHog.run.xml
+++ b/.run/PostHog.run.xml
@@ -13,12 +13,13 @@
+
-
+
diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
index 369e7b295c8..be9ddf7bc21 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer-v2.ts
@@ -208,7 +208,10 @@ export class SessionRecordingIngesterV2 {
// If it is recoverable, we probably want to retry?
}
- public async parseKafkaMessage(message: Message): Promise {
+ public async parseKafkaMessage(
+ message: Message,
+ getTeamFn: (s: string) => Promise
+ ): Promise {
const statusWarn = (reason: string, extra?: Record) => {
status.warn('⚠️', 'invalid_message', {
reason,
@@ -246,7 +249,7 @@ export class SessionRecordingIngesterV2 {
const token = messagePayload.token
if (token) {
- teamId = await this.teamsRefresher.get().then((teams) => teams[token] || null)
+ teamId = await getTeamFn(token)
}
if (teamId == null) {
@@ -272,7 +275,7 @@ export class SessionRecordingIngesterV2 {
},
team_id: teamId,
- distinct_id: event.properties.distinct_id,
+ distinct_id: messagePayload.distinct_id,
session_id: event.properties?.$session_id,
window_id: event.properties?.$window_id,
events: event.properties.$snapshot_items,
@@ -321,7 +324,9 @@ export class SessionRecordingIngesterV2 {
}
}
- const recordingMessage = await this.parseKafkaMessage(message)
+ const recordingMessage = await this.parseKafkaMessage(message, (token) =>
+ this.teamsRefresher.get().then((teams) => teams[token] || null)
+ )
if (recordingMessage) {
recordingMessages.push(recordingMessage)
}
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts
index e3021bbf33a..87b66a7210f 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v2.test.ts
@@ -1,4 +1,5 @@
import { mkdirSync, rmSync } from 'node:fs'
+import { Message } from 'node-rdkafka-acosom'
import path from 'path'
import { waitForExpect } from '../../../../functional_tests/expectations'
@@ -146,6 +147,78 @@ describe('ingester', () => {
expect(ingester.sessions['1-session_id_1']).not.toBeDefined()
})
+ describe('parsing the message', () => {
+ it('can handle numeric distinct_ids', async () => {
+ const numeric_id = 12345
+
+ const parsedMessage = await ingester.parseKafkaMessage(
+ {
+ value: Buffer.from(
+ JSON.stringify({
+ uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
+ distinct_id: String(numeric_id),
+ ip: '127.0.0.1',
+ site_url: 'http://127.0.0.1:8000',
+ data: JSON.stringify({
+ uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
+ event: '$snapshot_items',
+ properties: {
+ distinct_id: numeric_id,
+ $session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
+ $window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
+ $snapshot_items: [
+ {
+ type: 6,
+ data: {
+ plugin: 'rrweb/console@1',
+ payload: {
+ level: 'log',
+ trace: [
+ 'HedgehogActor.setAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105543:17)',
+ 'HedgehogActor.setRandomAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105550:14)',
+ 'HedgehogActor.update (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105572:16)',
+ 'loop (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105754:15)',
+ ],
+ payload: ['"Hedgehog: Will \'jump\' for 2916.6666666666665ms"'],
+ },
+ },
+ timestamp: 1693422950693,
+ },
+ ],
+ $snapshot_consumer: 'v2',
+ },
+ offset: 2187,
+ }),
+ now: '2023-08-30T19:15:54.887316+00:00',
+ sent_at: '2023-08-30T19:15:54.882000+00:00',
+ token: 'the_token',
+ })
+ ),
+ timestamp: 1,
+ size: 1,
+ topic: 'the_topic',
+ offset: 1,
+ partition: 1,
+ } satisfies Message,
+ () => Promise.resolve(1)
+ )
+ expect(parsedMessage).toEqual({
+ distinct_id: '12345',
+ events: expect.any(Array),
+ metadata: {
+ offset: 1,
+ partition: 1,
+ timestamp: 1,
+ topic: 'the_topic',
+ },
+ replayIngestionConsumer: 'v2',
+ session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
+ team_id: 1,
+ window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
+ })
+ })
+ })
+
// NOTE: Committing happens by the parent
describe('offset committing', () => {
const metadata = {