diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts index 033c2b1fb24..63465e8f827 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts @@ -10,15 +10,17 @@ import { TopicPartition, } from 'node-rdkafka-acosom' import path from 'path' +import { Pool } from 'pg' import { Gauge } from 'prom-client' import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' import { createKafkaProducer, disconnectProducer } from '../../../kafka/producer' -import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, Team } from '../../../types' +import { PipelineEvent, PluginsServerConfig, RawEventMessage, RedisPool, TeamId } from '../../../types' +import { BackgroundRefresher } from '../../../utils/background-refresher' import { status } from '../../../utils/status' -import { TeamManager } from '../../../worker/ingestion/team-manager' +import { fetchTeamTokensWithRecordings } from '../../../worker/ingestion/team-manager' import { ObjectStorage } from '../../services/object_storage' import { eventDroppedCounter } from '../metrics' import { RealtimeManager } from './blob-ingester/realtime-manager' @@ -82,10 +84,11 @@ export class SessionRecordingBlobIngester { flushInterval: NodeJS.Timer | null = null // the time at the most recent message of a particular partition partitionNow: Record = {} + teamsRefresher: BackgroundRefresher> constructor( - private teamManager: TeamManager, private serverConfig: PluginsServerConfig, + private postgres: Pool, private objectStorage: ObjectStorage, private redisPool: RedisPool ) { @@ -100,6 +103,17 @@ export class SessionRecordingBlobIngester { this.redisPool, serverConfig.SESSION_RECORDING_REDIS_OFFSET_STORAGE_KEY ) + + this.teamsRefresher = new BackgroundRefresher(async () => { + try { + status.info('🔁', 'blob_ingester_consumer - refreshing teams in the background') + return await fetchTeamTokensWithRecordings(this.postgres) + } catch (e) { + status.error('🔥', 'blob_ingester_consumer - failed to refresh teams in the background', e) + captureException(e) + throw e + } + }) } public async consume(event: IncomingRecordingMessage, sentrySpan?: Sentry.Span): Promise { @@ -208,33 +222,29 @@ export class SessionRecordingBlobIngester { return statusWarn('no_token') } - let team: Team | null = null + let teamId: TeamId | null = null + const token = messagePayload.token const teamSpan = span?.startChild({ op: 'fetchTeam', }) - if (messagePayload.team_id != null) { - team = await this.teamManager.fetchTeam(messagePayload.team_id) - } else if (messagePayload.token) { - team = await this.teamManager.getTeamByToken(messagePayload.token) + if (token) { + teamId = await this.teamsRefresher.get().then((teams) => teams[token] || null) } teamSpan?.finish() - if (team == null) { - return statusWarn('team_not_found', { - teamId: messagePayload.team_id, - payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown', - }) - } - - if (!team.session_recording_opt_in) { + if (teamId == null) { eventDroppedCounter .labels({ event_type: 'session_recordings_blob_ingestion', - drop_cause: 'disabled', + drop_cause: 'team_missing_or_disabled', }) .inc() - return + + return statusWarn('team_missing_or_disabled', { + teamId: messagePayload.team_id, + payloadTeamSource: messagePayload.team_id ? 'team' : messagePayload.token ? 'token' : 'unknown', + }) } const recordingMessage: IncomingRecordingMessage = { @@ -245,7 +255,7 @@ export class SessionRecordingBlobIngester { timestamp: message.timestamp, }, - team_id: team.id, + team_id: teamId, distinct_id: event.distinct_id, session_id: event.properties?.$session_id, window_id: event.properties?.$window_id, @@ -291,6 +301,8 @@ export class SessionRecordingBlobIngester { throw e } await this.realtimeManager.subscribe() + // Load teams into memory + await this.teamsRefresher.refresh() const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig) this.producer = await createKafkaProducer(connectionConfig) diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 2f21f6c178c..501bb85d529 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -436,14 +436,13 @@ export async function startPluginsServer( if (capabilities.sessionRecordingBlobIngestion) { const blobServerConfig = sessionRecordingBlobConsumerConfig(serverConfig) const postgres = hub?.postgres ?? createPostgresPool(blobServerConfig.DATABASE_URL) - const teamManager = hub?.teamManager ?? new TeamManager(postgres, blobServerConfig) const s3 = hub?.objectStorage ?? getObjectStorage(blobServerConfig) const redisPool = hub?.db.redisPool ?? createRedisPool(blobServerConfig) if (!s3) { throw new Error("Can't start session recording blob ingestion without object storage") } - const ingester = new SessionRecordingBlobIngester(teamManager, blobServerConfig, s3, redisPool) + const ingester = new SessionRecordingBlobIngester(blobServerConfig, postgres, s3, redisPool) await ingester.start() const batchConsumer = ingester.batchConsumer if (batchConsumer) { diff --git a/plugin-server/src/utils/background-refresher.ts b/plugin-server/src/utils/background-refresher.ts new file mode 100644 index 00000000000..c2645d8ba54 --- /dev/null +++ b/plugin-server/src/utils/background-refresher.ts @@ -0,0 +1,42 @@ +import { status } from './status' + +// A background refresher will act like a TTL cache but choosing to refresh the value in the background rather than +// dropping the data or blocking the request. +export class BackgroundRefresher { + private cachedValue: T | undefined = undefined + private cachedValuePromise: Promise | null = null + private lastRefreshTime = 0 + + constructor(private readonly refreshFunction: () => Promise, private readonly maxAgeMs: number = 1000 * 60) {} + + public async refresh(): Promise { + if (this.cachedValuePromise) { + return this.cachedValuePromise + } + try { + this.cachedValuePromise = this.refreshFunction() + this.cachedValue = await this.cachedValuePromise + } catch (e) { + status.error('BackgroundRefresher: Error refreshing background task', e) + throw e + } finally { + this.cachedValuePromise = null + this.lastRefreshTime = Date.now() + } + + return this.cachedValue + } + + public async get(): Promise { + if (!this.cachedValue) { + await this.refresh() + } + + if (Date.now() - this.lastRefreshTime > this.maxAgeMs) { + // We trigger the refresh but we don't use it + void this.refresh() + } + + return this.cachedValue! + } +} diff --git a/plugin-server/src/worker/ingestion/team-manager.ts b/plugin-server/src/worker/ingestion/team-manager.ts index 206a01e6082..ff6aa711571 100644 --- a/plugin-server/src/worker/ingestion/team-manager.ts +++ b/plugin-server/src/worker/ingestion/team-manager.ts @@ -192,3 +192,21 @@ export async function fetchTeamByToken(client: Client | Pool, token: string): Pr ) return selectResult.rows[0] ?? null } + +export async function fetchTeamTokensWithRecordings(client: Client | Pool): Promise> { + const selectResult = await postgresQuery>( + client, + ` + SELECT id, api_token + FROM posthog_team + WHERE session_recording_opt_in = true + `, + [], + 'fetchTeamTokensWithRecordings' + ) + + return selectResult.rows.reduce((acc, row) => { + acc[row.api_token] = row.id + return acc + }, {} as Record) +} diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer-rebalancing.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer-rebalancing.test.ts index 1052e0df425..ee7507df8db 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer-rebalancing.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer-rebalancing.test.ts @@ -48,7 +48,7 @@ describe('ingester rebalancing tests', () => { }) it('rebalances partitions safely from one to two consumers', async () => { - ingesterOne = new SessionRecordingBlobIngester(hub.teamManager, config, hub.objectStorage, hub.redisPool) + ingesterOne = new SessionRecordingBlobIngester(config, hub.postgres, hub.objectStorage, hub.redisPool) await ingesterOne.start() @@ -59,7 +59,7 @@ describe('ingester rebalancing tests', () => { assertIngesterHasExpectedPartitions(ingesterOne, [1]) }) - ingesterTwo = new SessionRecordingBlobIngester(hub.teamManager, config, hub.objectStorage, hub.redisPool) + ingesterTwo = new SessionRecordingBlobIngester(config, hub.postgres, hub.objectStorage, hub.redisPool) await ingesterTwo.start() await waitForExpect(() => { diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer.test.ts index bc11a622b49..6f522480700 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer.test.ts @@ -84,11 +84,11 @@ describe('ingester', () => { // these tests assume that a flush won't run while they run beforeEach(async () => { ingester = new SessionRecordingBlobIngester( - hub.teamManager, { ...defaultConfig, SESSION_RECORDING_REDIS_OFFSET_STORAGE_KEY: keyPrefix, }, + hub.postgres, hub.objectStorage, hub.redisPool ) diff --git a/plugin-server/tests/utils/background-refresher.test.ts b/plugin-server/tests/utils/background-refresher.test.ts new file mode 100644 index 00000000000..66a49a3775d --- /dev/null +++ b/plugin-server/tests/utils/background-refresher.test.ts @@ -0,0 +1,82 @@ +import { BackgroundRefresher } from '../../src/utils/background-refresher' + +const realNow = Date.now + +describe('getNextRetryMs', () => { + jest.useFakeTimers() + jest.setTimeout(1000) + let refreshFunction = jest.fn() + let refresher = new BackgroundRefresher(refreshFunction) + + beforeEach(() => { + refreshFunction = jest.fn() + refresher = new BackgroundRefresher(refreshFunction, 100) + }) + + beforeAll(() => { + global.Date.now = jest.fn(() => new Date('2019-04-07T10:20:30Z').getTime()) + }) + + afterAll(() => { + global.Date.now = realNow + }) + + it('simple gets', async () => { + refreshFunction.mockResolvedValue('foo') + await expect(refresher.get()).resolves.toEqual('foo') + refreshFunction.mockResolvedValue('foo2') + await expect(refresher.get()).resolves.toEqual('foo') + await expect(refresher.refresh()).resolves.toEqual('foo2') + await expect(refresher.get()).resolves.toEqual('foo2') + }) + + it('only one call per refresh', async () => { + refreshFunction.mockImplementation(async () => { + await new Promise((resolve) => setTimeout(resolve, 100)) + return 'foo' + }) + + const promises: Promise[] = [] + + expect(refreshFunction).toHaveBeenCalledTimes(0) + promises.push(refresher.get()) + expect(refreshFunction).toHaveBeenCalledTimes(1) + promises.push(refresher.get()) + promises.push(refresher.get()) + expect(refreshFunction).toHaveBeenCalledTimes(1) + jest.runOnlyPendingTimers() + + expect(await Promise.all(promises)).toEqual(['foo', 'foo', 'foo']) + }) + + it('refreshes in the background', async () => { + let count = 1 + let timeAdavance = 0 + global.Date.now = jest.fn(() => realNow() + timeAdavance) + refresher = new BackgroundRefresher(refreshFunction, 10000) + + refreshFunction.mockImplementation(async () => { + await new Promise((r) => setTimeout(r, 1000)) + return 'foo' + count++ + }) + + // First time we need to trigger the timer before awaiting as it waits for the value + let response = refresher.get() + jest.runOnlyPendingTimers() + await expect(response).resolves.toEqual('foo1') + await expect(refresher.get()).resolves.toEqual('foo1') + expect(refreshFunction).toHaveBeenCalledTimes(1) + // Advance time forward by more than the refresh interval + timeAdavance = 10000 + 1000 + // This will trigger the background refresh + response = refresher.get() + // which we resolve the timeout for + jest.runOnlyPendingTimers() + // the original call gets the old value as it doesn't wait + await expect(response).resolves.toEqual('foo1') + expect(refreshFunction).toHaveBeenCalledTimes(2) + // the next call gets the new value + await expect(refresher.get()).resolves.toEqual('foo2') + expect(refreshFunction).toHaveBeenCalledTimes(2) + }) +})