mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-21 21:49:51 +01:00
fix: only use stringy distinct ids (#17255)
* fix: only use stringy distinct ids * fix
This commit is contained in:
parent
6b40fee757
commit
c3ec949474
@ -13,12 +13,13 @@
|
||||
<env name="KAFKA_HOSTS" value="localhost" />
|
||||
<env name="KEA_VERBOSE_LOGGING" value="false" />
|
||||
<env name="PRINT_SQL" value="1" />
|
||||
<env name="PYDEVD_USE_CYTHON" value="NO" />
|
||||
<env name="PYTHONUNBUFFERED" value="1" />
|
||||
<env name="SESSION_RECORDING_KAFKA_COMPRESSION" value="gzip" />
|
||||
<env name="SESSION_RECORDING_KAFKA_HOSTS" value="localhost" />
|
||||
<env name="SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES" value="524288" />
|
||||
<env name="SKIP_SERVICE_VERSION_REQUIREMENTS" value="1" />
|
||||
<env name="PYDEVD_USE_CYTHON" value="NO" />
|
||||
<env name="REPLAY_EVENTS_NEW_CONSUMER_RATIO" value="1" />
|
||||
</envs>
|
||||
<option name="SDK_HOME" value="$PROJECT_DIR$/env/bin/python" />
|
||||
<option name="SDK_NAME" value="Python 3.10 (posthog)" />
|
||||
|
@ -208,7 +208,10 @@ export class SessionRecordingIngesterV2 {
|
||||
// If it is recoverable, we probably want to retry?
|
||||
}
|
||||
|
||||
public async parseKafkaMessage(message: Message): Promise<IncomingRecordingMessage | void> {
|
||||
public async parseKafkaMessage(
|
||||
message: Message,
|
||||
getTeamFn: (s: string) => Promise<TeamId | null>
|
||||
): Promise<IncomingRecordingMessage | void> {
|
||||
const statusWarn = (reason: string, extra?: Record<string, any>) => {
|
||||
status.warn('⚠️', 'invalid_message', {
|
||||
reason,
|
||||
@ -246,7 +249,7 @@ export class SessionRecordingIngesterV2 {
|
||||
const token = messagePayload.token
|
||||
|
||||
if (token) {
|
||||
teamId = await this.teamsRefresher.get().then((teams) => teams[token] || null)
|
||||
teamId = await getTeamFn(token)
|
||||
}
|
||||
|
||||
if (teamId == null) {
|
||||
@ -272,7 +275,7 @@ export class SessionRecordingIngesterV2 {
|
||||
},
|
||||
|
||||
team_id: teamId,
|
||||
distinct_id: event.properties.distinct_id,
|
||||
distinct_id: messagePayload.distinct_id,
|
||||
session_id: event.properties?.$session_id,
|
||||
window_id: event.properties?.$window_id,
|
||||
events: event.properties.$snapshot_items,
|
||||
@ -321,7 +324,9 @@ export class SessionRecordingIngesterV2 {
|
||||
}
|
||||
}
|
||||
|
||||
const recordingMessage = await this.parseKafkaMessage(message)
|
||||
const recordingMessage = await this.parseKafkaMessage(message, (token) =>
|
||||
this.teamsRefresher.get().then((teams) => teams[token] || null)
|
||||
)
|
||||
if (recordingMessage) {
|
||||
recordingMessages.push(recordingMessage)
|
||||
}
|
||||
|
@ -1,4 +1,5 @@
|
||||
import { mkdirSync, rmSync } from 'node:fs'
|
||||
import { Message } from 'node-rdkafka-acosom'
|
||||
import path from 'path'
|
||||
|
||||
import { waitForExpect } from '../../../../functional_tests/expectations'
|
||||
@ -146,6 +147,78 @@ describe('ingester', () => {
|
||||
expect(ingester.sessions['1-session_id_1']).not.toBeDefined()
|
||||
})
|
||||
|
||||
describe('parsing the message', () => {
|
||||
it('can handle numeric distinct_ids', async () => {
|
||||
const numeric_id = 12345
|
||||
|
||||
const parsedMessage = await ingester.parseKafkaMessage(
|
||||
{
|
||||
value: Buffer.from(
|
||||
JSON.stringify({
|
||||
uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
|
||||
distinct_id: String(numeric_id),
|
||||
ip: '127.0.0.1',
|
||||
site_url: 'http://127.0.0.1:8000',
|
||||
data: JSON.stringify({
|
||||
uuid: '018a47df-a0f6-7761-8635-439a0aa873bb',
|
||||
event: '$snapshot_items',
|
||||
properties: {
|
||||
distinct_id: numeric_id,
|
||||
$session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
|
||||
$window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
|
||||
$snapshot_items: [
|
||||
{
|
||||
type: 6,
|
||||
data: {
|
||||
plugin: 'rrweb/console@1',
|
||||
payload: {
|
||||
level: 'log',
|
||||
trace: [
|
||||
'HedgehogActor.setAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105543:17)',
|
||||
'HedgehogActor.setRandomAnimation (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105550:14)',
|
||||
'HedgehogActor.update (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105572:16)',
|
||||
'loop (http://127.0.0.1:8000/static/toolbar.js?_ts=1693421010000:105754:15)',
|
||||
],
|
||||
payload: ['"Hedgehog: Will \'jump\' for 2916.6666666666665ms"'],
|
||||
},
|
||||
},
|
||||
timestamp: 1693422950693,
|
||||
},
|
||||
],
|
||||
$snapshot_consumer: 'v2',
|
||||
},
|
||||
offset: 2187,
|
||||
}),
|
||||
now: '2023-08-30T19:15:54.887316+00:00',
|
||||
sent_at: '2023-08-30T19:15:54.882000+00:00',
|
||||
token: 'the_token',
|
||||
})
|
||||
),
|
||||
timestamp: 1,
|
||||
size: 1,
|
||||
topic: 'the_topic',
|
||||
offset: 1,
|
||||
partition: 1,
|
||||
} satisfies Message,
|
||||
() => Promise.resolve(1)
|
||||
)
|
||||
expect(parsedMessage).toEqual({
|
||||
distinct_id: '12345',
|
||||
events: expect.any(Array),
|
||||
metadata: {
|
||||
offset: 1,
|
||||
partition: 1,
|
||||
timestamp: 1,
|
||||
topic: 'the_topic',
|
||||
},
|
||||
replayIngestionConsumer: 'v2',
|
||||
session_id: '018a47c2-2f4a-70a8-b480-5e51d8b8d070',
|
||||
team_id: 1,
|
||||
window_id: '018a47c2-2f4a-70a8-b480-5e52f5480448',
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// NOTE: Committing happens by the parent
|
||||
describe('offset committing', () => {
|
||||
const metadata = {
|
||||
|
Loading…
Reference in New Issue
Block a user