diff --git a/.run/Plugin Server.run.xml b/.run/Plugin Server.run.xml index 0fd84dbd7e7..cfa9467dac7 100644 --- a/.run/Plugin Server.run.xml +++ b/.run/Plugin Server.run.xml @@ -10,9 +10,10 @@ - + + - + \ No newline at end of file diff --git a/plugin-server/functional_tests/session-recordings-blob-ingestion.test.ts b/plugin-server/functional_tests/session-recordings-blob-ingestion.test.ts index 5e43a993fbc..2ad539c3410 100644 --- a/plugin-server/functional_tests/session-recordings-blob-ingestion.test.ts +++ b/plugin-server/functional_tests/session-recordings-blob-ingestion.test.ts @@ -1,9 +1,11 @@ import { GetObjectCommand, GetObjectCommandOutput, ListObjectsV2Command, S3Client } from '@aws-sdk/client-s3' -import fs from 'fs' +import { readdir, readFile } from 'fs/promises' +import { join } from 'path' import * as zlib from 'zlib' import { defaultConfig } from '../src/config/config' import { compressToString } from '../src/main/ingestion-queues/session-recording/blob-ingester/utils' +import { bufferFileDir } from '../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer' import { getObjectStorage } from '../src/main/services/object_storage' import { UUIDT } from '../src/utils/utils' import { capture, createOrganization, createTeam } from './api' @@ -58,7 +60,7 @@ test.skip(`single recording event writes data to local tmp file`, async () => { let tempFiles: string[] = [] await waitForExpect(async () => { - const files = await fs.promises.readdir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY) + const files = await readdir(bufferFileDir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY)) tempFiles = files.filter((f) => f.startsWith(`${teamId}.${sessionId}`)) expect(tempFiles.length).toBe(1) }) @@ -66,8 +68,8 @@ test.skip(`single recording event writes data to local tmp file`, async () => { await waitForExpect(async () => { const currentFile = tempFiles[0] - const fileContents = await fs.promises.readFile( - `${defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY}/${currentFile}`, + const fileContents = await readFile( + join(bufferFileDir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY), currentFile), 'utf8' ) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-manager.ts index edfa3291ac3..9d43951c6c9 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-manager.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-manager.ts @@ -10,6 +10,7 @@ import * as zlib from 'zlib' import { PluginsServerConfig } from '../../../../types' import { status } from '../../../../utils/status' import { ObjectStorage } from '../../../services/object_storage' +import { bufferFileDir } from '../session-recordings-blob-consumer' import { IncomingRecordingMessage } from './types' import { convertToPersistedMessage } from './utils' @@ -175,7 +176,7 @@ export class SessionManager { size: 0, createdAt: new Date(), file: path.join( - this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY, + bufferFileDir(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY), `${this.teamId}.${this.sessionId}.${id}.jsonl` ), offsets: [], diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts index 89292aef5e2..7a19153f201 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts @@ -1,5 +1,6 @@ import { mkdirSync, rmSync } from 'node:fs' import { CODES, HighLevelProducer as RdKafkaProducer, Message } from 'node-rdkafka-acosom' +import path from 'path' import { KAFKA_SESSION_RECORDING_EVENTS } from '../../../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' @@ -18,6 +19,8 @@ const groupId = 'session-recordings-blob' const sessionTimeout = 30000 const fetchBatchSize = 500 +export const bufferFileDir = (root: string) => path.join(root, 'session-buffer-files') + export class SessionRecordingBlobIngester { sessions: Map = new Map() offsetManager?: OffsetManager @@ -33,7 +36,8 @@ export class SessionRecordingBlobIngester { private objectStorage: ObjectStorage ) { const enabledTeamsString = this.serverConfig.SESSION_RECORDING_BLOB_PROCESSING_TEAMS - this.enabledTeams = enabledTeamsString === 'all' ? null : enabledTeamsString.split(',').map(parseInt) + this.enabledTeams = + enabledTeamsString === 'all' ? null : enabledTeamsString.split(',').filter(Boolean).map(parseInt) } public async consume(event: IncomingRecordingMessage): Promise { @@ -165,8 +169,8 @@ export class SessionRecordingBlobIngester { status.info('🔁', 'Starting session recordings blob consumer') // Currently we can't reuse any files stored on disk, so we opt to delete them all - rmSync(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true }) - mkdirSync(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true }) + rmSync(bufferFileDir(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY), { recursive: true, force: true }) + mkdirSync(bufferFileDir(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY), { recursive: true }) status.info('🔁', 'Starting session recordings consumer') diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index bc102587fff..77fd06abd9d 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -183,6 +183,7 @@ export interface PluginsServerConfig { CLOUD_DEPLOYMENT: string SESSION_RECORDING_BLOB_PROCESSING_TEAMS: string + // local directory might be a volume mount or a directory on disk (e.g. in local dev) SESSION_RECORDING_LOCAL_DIRECTORY: string SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number SESSION_RECORDING_MAX_BUFFER_SIZE_KB: number