
fix: push the buffer files storage down a level (#15295)

Authored by Paul D'Ambra on 2023-05-02 08:12:52 +01:00; committed by GitHub
parent b08238b340
commit 359177127d
5 changed files with 19 additions and 10 deletions
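In short: session recording buffer files were previously written directly into SESSION_RECORDING_LOCAL_DIRECTORY; after this change they are nested one level down in a session-buffer-files subdirectory via a new bufferFileDir helper, so the startup cleanup can wipe only that subdirectory rather than the configured path, which may be a volume mount. A minimal sketch of the idea; the root path, team id and session id below are placeholder values, not PostHog defaults:

import path from 'path'

// The new helper exported from the blob consumer (see the diff below); every
// buffer file now lives one level below the configured directory.
export const bufferFileDir = (root: string) => path.join(root, 'session-buffer-files')

// Before: <root>/<teamId>.<sessionId>.<id>.jsonl
// After:  <root>/session-buffer-files/<teamId>.<sessionId>.<id>.jsonl
const bufferFile = path.join(bufferFileDir('/tmp/recordings'), '2.some-session-id.1.jsonl')

The remaining changes below route every read, write and cleanup of these files through that helper.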

@@ -10,9 +10,10 @@
<env name="CLICKHOUSE_SECURE" value="False" />
<env name="DATABASE_URL" value="postgres://posthog:posthog@localhost:5432/posthog" />
<env name="KAFKA_HOSTS" value="localhost:9092" />
<env name="WORKER_CONCURRENCY" value="2" />
<env name="OBJECT_STORAGE_ENABLED" value="True" />
<env name="WORKER_CONCURRENCY" value="2" />
<env name="SESSION_RECORDING_BLOB_PROCESSING_TEAMS" value="all" />
</envs>
<method v="2" />
</configuration>
</component>

@@ -1,9 +1,11 @@
import { GetObjectCommand, GetObjectCommandOutput, ListObjectsV2Command, S3Client } from '@aws-sdk/client-s3'
import fs from 'fs'
import { readdir, readFile } from 'fs/promises'
import { join } from 'path'
import * as zlib from 'zlib'
import { defaultConfig } from '../src/config/config'
import { compressToString } from '../src/main/ingestion-queues/session-recording/blob-ingester/utils'
import { bufferFileDir } from '../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer'
import { getObjectStorage } from '../src/main/services/object_storage'
import { UUIDT } from '../src/utils/utils'
import { capture, createOrganization, createTeam } from './api'
@@ -58,7 +60,7 @@ test.skip(`single recording event writes data to local tmp file`, async () => {
let tempFiles: string[] = []
await waitForExpect(async () => {
const files = await fs.promises.readdir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY)
const files = await readdir(bufferFileDir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY))
tempFiles = files.filter((f) => f.startsWith(`${teamId}.${sessionId}`))
expect(tempFiles.length).toBe(1)
})
@@ -66,8 +68,8 @@ test.skip(`single recording event writes data to local tmp file`, async () => {
await waitForExpect(async () => {
const currentFile = tempFiles[0]
const fileContents = await fs.promises.readFile(
`${defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY}/${currentFile}`,
const fileContents = await readFile(
join(bufferFileDir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY), currentFile),
'utf8'
)
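For reference, a standalone sketch of the lookup the updated test performs, using the same fs/promises APIs; readBufferFile is a made-up name and the arguments are placeholders, not part of the PostHog codebase:

import { readdir, readFile } from 'fs/promises'
import { join } from 'path'

// List the nested buffer directory and read back the first file matching a
// team/session pair; file names follow `${teamId}.${sessionId}.${id}.jsonl`.
async function readBufferFile(localDirectory: string, teamId: number, sessionId: string): Promise<string> {
    const dir = join(localDirectory, 'session-buffer-files')
    const files = await readdir(dir)
    const match = files.find((f) => f.startsWith(`${teamId}.${sessionId}`))
    if (!match) {
        throw new Error(`no buffer file yet for ${teamId}.${sessionId}`)
    }
    return readFile(join(dir, match), 'utf8')
}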

@@ -10,6 +10,7 @@ import * as zlib from 'zlib'
import { PluginsServerConfig } from '../../../../types'
import { status } from '../../../../utils/status'
import { ObjectStorage } from '../../../services/object_storage'
import { bufferFileDir } from '../session-recordings-blob-consumer'
import { IncomingRecordingMessage } from './types'
import { convertToPersistedMessage } from './utils'
@@ -175,7 +176,7 @@ export class SessionManager {
size: 0,
createdAt: new Date(),
file: path.join(
this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY,
bufferFileDir(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY),
`${this.teamId}.${this.sessionId}.${id}.jsonl`
),
offsets: [],

@@ -1,5 +1,6 @@
import { mkdirSync, rmSync } from 'node:fs'
import { CODES, HighLevelProducer as RdKafkaProducer, Message } from 'node-rdkafka-acosom'
import path from 'path'
import { KAFKA_SESSION_RECORDING_EVENTS } from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
@@ -18,6 +19,8 @@ const groupId = 'session-recordings-blob'
const sessionTimeout = 30000
const fetchBatchSize = 500
export const bufferFileDir = (root: string) => path.join(root, 'session-buffer-files')
export class SessionRecordingBlobIngester {
sessions: Map<string, SessionManager> = new Map()
offsetManager?: OffsetManager
@@ -33,7 +36,8 @@ export class SessionRecordingBlobIngester {
private objectStorage: ObjectStorage
) {
const enabledTeamsString = this.serverConfig.SESSION_RECORDING_BLOB_PROCESSING_TEAMS
this.enabledTeams = enabledTeamsString === 'all' ? null : enabledTeamsString.split(',').map(parseInt)
this.enabledTeams =
enabledTeamsString === 'all' ? null : enabledTeamsString.split(',').filter(Boolean).map(parseInt)
}
public async consume(event: IncomingRecordingMessage): Promise<void> {
@@ -165,8 +169,8 @@ export class SessionRecordingBlobIngester {
status.info('🔁', 'Starting session recordings blob consumer')
// Currently we can't reuse any files stored on disk, so we opt to delete them all
rmSync(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true })
mkdirSync(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true })
rmSync(bufferFileDir(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY), { recursive: true, force: true })
mkdirSync(bufferFileDir(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY), { recursive: true })
status.info('🔁', 'Starting session recordings consumer')
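The comment above ("we can't reuse any files stored on disk") explains the delete-and-recreate on startup; scoping it to the nested directory matters because, per the config comment in the next file, SESSION_RECORDING_LOCAL_DIRECTORY may be a volume mount that should not itself be removed. A hypothetical helper capturing that startup step, assuming the same Node fs APIs used above:

import { mkdirSync, rmSync } from 'node:fs'
import path from 'path'

const bufferFileDir = (root: string) => path.join(root, 'session-buffer-files')

// Wipe and recreate only the nested buffer directory; the configured root
// (possibly a volume mount) is left in place.
export function resetBufferDir(localDirectory: string): string {
    const dir = bufferFileDir(localDirectory)
    rmSync(dir, { recursive: true, force: true })
    mkdirSync(dir, { recursive: true })
    return dir
}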

@@ -183,6 +183,7 @@ export interface PluginsServerConfig {
CLOUD_DEPLOYMENT: string
SESSION_RECORDING_BLOB_PROCESSING_TEAMS: string
// local directory might be a volume mount or a directory on disk (e.g. in local dev)
SESSION_RECORDING_LOCAL_DIRECTORY: string
SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number
SESSION_RECORDING_MAX_BUFFER_SIZE_KB: number