diff --git a/.github/workflows/ci-plugin-server.yml b/.github/workflows/ci-plugin-server.yml index 6d551221340..ffbc5839039 100644 --- a/.github/workflows/ci-plugin-server.yml +++ b/.github/workflows/ci-plugin-server.yml @@ -33,6 +33,8 @@ env: OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password' OBJECT_STORAGE_SESSION_RECORDING_FOLDER: 'session_recordings' OBJECT_STORAGE_BUCKET: 'posthog' + # set the max buffer size small enough that the functional tests behave the same in CI as when running locally + SESSION_RECORDING_MAX_BUFFER_SIZE_KB: 1024 concurrency: group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }} diff --git a/plugin-server/.gitignore b/plugin-server/.gitignore index 854b302b961..4464e35ebcd 100644 --- a/plugin-server/.gitignore +++ b/plugin-server/.gitignore @@ -8,3 +8,4 @@ yalc.lock tmp *.0x coverage/ +.tmp/ diff --git a/plugin-server/functional_tests/session-recordings-blob-ingestion.test.ts b/plugin-server/functional_tests/session-recordings-blob-ingestion.test.ts new file mode 100644 index 00000000000..4cb4f598100 --- /dev/null +++ b/plugin-server/functional_tests/session-recordings-blob-ingestion.test.ts @@ -0,0 +1,150 @@ +import { GetObjectCommand, GetObjectCommandOutput, ListObjectsV2Command, S3Client } from '@aws-sdk/client-s3' +import fs from 'fs' +import { Consumer, Kafka, KafkaMessage, logLevel } from 'kafkajs' +import * as zlib from 'zlib' + +import { defaultConfig } from '../src/config/config' +import { compressToString } from '../src/main/ingestion-queues/session-recording/blob-ingester/utils' +import { getObjectStorage } from '../src/main/services/object_storage' +import { UUIDT } from '../src/utils/utils' +import { capture, createOrganization, createTeam } from './api' +import { waitForExpect } from './expectations' + +let kafka: Kafka +let organizationId: string + +let dlq: KafkaMessage[] +let dlqConsumer: Consumer + +let s3: S3Client + +function generateVeryLongString(length = 1025) { + return 
[...Array(length)].map(() => Math.random().toString(36)[2]).join('') +} + +beforeAll(async () => { + kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING }) + + dlq = [] + dlqConsumer = kafka.consumer({ groupId: 'session_recording_events_test' }) + await dlqConsumer.subscribe({ topic: 'session_recording_events_dlq' }) + await dlqConsumer.run({ + eachMessage: ({ message }) => { + dlq.push(message) + return Promise.resolve() + }, + }) + + organizationId = await createOrganization() + + const objectStorage = getObjectStorage({ + OBJECT_STORAGE_ENDPOINT: defaultConfig.OBJECT_STORAGE_ENDPOINT, + OBJECT_STORAGE_REGION: defaultConfig.OBJECT_STORAGE_REGION, + OBJECT_STORAGE_ACCESS_KEY_ID: defaultConfig.OBJECT_STORAGE_ACCESS_KEY_ID, + OBJECT_STORAGE_SECRET_ACCESS_KEY: defaultConfig.OBJECT_STORAGE_SECRET_ACCESS_KEY, + OBJECT_STORAGE_ENABLED: defaultConfig.OBJECT_STORAGE_ENABLED, + OBJECT_STORAGE_BUCKET: defaultConfig.OBJECT_STORAGE_BUCKET, + }) + if (!objectStorage) { + throw new Error('S3 not configured') + } + s3 = objectStorage.s3 +}) + +afterAll(async () => { + await Promise.all([await dlqConsumer.disconnect()]) +}) + +test.skip(`single recording event writes data to local tmp file`, async () => { + const teamId = await createTeam(organizationId) + const distinctId = new UUIDT().toString() + const uuid = new UUIDT().toString() + const sessionId = new UUIDT().toString() + const veryLongString = generateVeryLongString() + await capture({ + teamId, + distinctId, + uuid, + event: '$snapshot', + properties: { + $session_id: sessionId, + $window_id: 'abc1234', + $snapshot_data: { data: compressToString(veryLongString), chunk_count: 1 }, + }, + }) + + let tempFiles: string[] = [] + + await waitForExpect(async () => { + const files = await fs.promises.readdir(defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY) + tempFiles = files.filter((f) => f.startsWith(`${teamId}.${sessionId}`)) + expect(tempFiles.length).toBe(1) + }) + + await 
waitForExpect(async () => { + const currentFile = tempFiles[0] + + const fileContents = await fs.promises.readFile( + `${defaultConfig.SESSION_RECORDING_LOCAL_DIRECTORY}/${currentFile}`, + 'utf8' + ) + + expect(fileContents).toEqual(`{"window_id":"abc1234","data":"${veryLongString}"}\n`) + }) +}, 40000) + +test.skip(`multiple recording events writes compressed data to s3`, async () => { + const teamId = await createTeam(organizationId) + const distinctId = new UUIDT().toString() + const sessionId = new UUIDT().toString() + + // need to send enough data to trigger the s3 upload exactly once. + // with a buffer of 1024, an estimated gzip compression of 0.1, and 1025 default length for generateAVeryLongString + // we need 25,000 events. + // if any of those things change then the number of events probably needs to change too + const captures = Array.from({ length: 25000 }).map(() => { + return capture({ + teamId, + distinctId, + uuid: new UUIDT().toString(), + event: '$snapshot', + properties: { + $session_id: sessionId, + $window_id: 'abc1234', + $snapshot_data: { data: compressToString(generateVeryLongString()), chunk_count: 1 }, + }, + }) + }) + await Promise.all(captures) + + await waitForExpect(async () => { + const s3Files = await s3.send( + new ListObjectsV2Command({ + Bucket: defaultConfig.OBJECT_STORAGE_BUCKET, + Prefix: `${defaultConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${teamId}/session_id/${sessionId}`, + }) + ) + expect(s3Files.Contents?.length).toBe(1) + + const s3File = s3Files.Contents?.[0] + if (!s3File) { + throw new Error('No s3File') + } + const s3FileContents: GetObjectCommandOutput = await s3.send( + new GetObjectCommand({ + Bucket: defaultConfig.OBJECT_STORAGE_BUCKET, + Key: s3File.Key, + }) + ) + const fileStream = await s3FileContents.Body?.transformToByteArray() + if (!fileStream) { + throw new Error('No fileStream') + } + const text = zlib.gunzipSync(fileStream).toString().trim() + // text contains JSON for { + // "window_id": 
"abc1234", + // "data": "random...string" // thousands of characters + // } + expect(text).toMatch(/{"window_id":"abc1234","data":"\w+"}/) + }) +}, 20000) diff --git a/plugin-server/functional_tests/session-recordings.test.ts b/plugin-server/functional_tests/session-recordings.test.ts index 7f4d43c649d..39903eb68fe 100644 --- a/plugin-server/functional_tests/session-recordings.test.ts +++ b/plugin-server/functional_tests/session-recordings.test.ts @@ -54,6 +54,7 @@ test.concurrent( const teamId = await createTeam(organizationId) const distinctId = new UUIDT().toString() const uuid = new UUIDT().toString() + const sessionId = new UUIDT().toString() await capture({ teamId, @@ -61,7 +62,7 @@ test.concurrent( uuid, event: '$snapshot', properties: { - $session_id: '1234abc', + $session_id: sessionId, $window_id: 'abc1234', $snapshot_data: 'yes way', }, @@ -84,7 +85,7 @@ test.concurrent( has_full_snapshot: 0, keypress_count: 0, last_event_timestamp: null, - session_id: '1234abc', + session_id: sessionId, snapshot_data: 'yes way', team_id: teamId, timestamp: expect.any(String), diff --git a/plugin-server/package.json b/plugin-server/package.json index efab69aa11c..2babe7e1ff9 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -35,6 +35,8 @@ "repository": "https://github.com/PostHog/posthog-plugin-server", "license": "MIT", "dependencies": { + "@aws-sdk/client-s3": "^3.315.0", + "@aws-sdk/lib-storage": "^3.315.0", "@babel/core": "^7.18.10", "@babel/plugin-transform-react-jsx": "^7.18.10", "@babel/preset-env": "^7.18.10", diff --git a/plugin-server/pnpm-lock.yaml b/plugin-server/pnpm-lock.yaml index 6a8e1de5066..33dd4832a08 100644 --- a/plugin-server/pnpm-lock.yaml +++ b/plugin-server/pnpm-lock.yaml @@ -2,6 +2,8 @@ lockfileVersion: 5.4 specifiers: 0x: ^5.5.0 + '@aws-sdk/client-s3': ^3.315.0 + '@aws-sdk/lib-storage': ^3.315.0 '@babel/cli': ^7.18.10 '@babel/core': ^7.18.10 '@babel/plugin-transform-react-jsx': ^7.18.10 @@ -94,6 +96,8 @@ specifiers: 
vm2: 3.9.11 dependencies: + '@aws-sdk/client-s3': 3.315.0 + '@aws-sdk/lib-storage': 3.315.0_@aws-sdk+client-s3@3.315.0 '@babel/core': 7.20.2 '@babel/plugin-transform-react-jsx': 7.19.0_@babel+core@7.20.2 '@babel/preset-env': 7.20.2_@babel+core@7.20.2 @@ -238,6 +242,971 @@ packages: resolution: {integrity: sha512-H71nDOOL8Y7kWRLqf6Sums+01Q5msqBW2KhDUTemh1tvY04eSkSXrK0uj/4mmY0Xr16/3zyZmsrxN7CKuRbNRg==} dev: false + /@aws-crypto/crc32/3.0.0: + resolution: {integrity: sha512-IzSgsrxUcsrejQbPVilIKy16kAT52EwB6zSaI+M3xxIhKh5+aldEyvI+z6erM7TCLB2BJsFrtHjp6/4/sr+3dA==} + dependencies: + '@aws-crypto/util': 3.0.0 + '@aws-sdk/types': 3.310.0 + tslib: 1.14.1 + dev: false + + /@aws-crypto/crc32c/3.0.0: + resolution: {integrity: sha512-ENNPPManmnVJ4BTXlOjAgD7URidbAznURqD0KvfREyc4o20DPYdEldU1f5cQ7Jbj0CJJSPaMIk/9ZshdB3210w==} + dependencies: + '@aws-crypto/util': 3.0.0 + '@aws-sdk/types': 3.310.0 + tslib: 1.14.1 + dev: false + + /@aws-crypto/ie11-detection/3.0.0: + resolution: {integrity: sha512-341lBBkiY1DfDNKai/wXM3aujNBkXR7tq1URPQDL9wi3AUbI80NR74uF1TXHMm7po1AcnFk8iu2S2IeU/+/A+Q==} + dependencies: + tslib: 1.14.1 + dev: false + + /@aws-crypto/sha1-browser/3.0.0: + resolution: {integrity: sha512-NJth5c997GLHs6nOYTzFKTbYdMNA6/1XlKVgnZoaZcQ7z7UJlOgj2JdbHE8tiYLS3fzXNCguct77SPGat2raSw==} + dependencies: + '@aws-crypto/ie11-detection': 3.0.0 + '@aws-crypto/supports-web-crypto': 3.0.0 + '@aws-crypto/util': 3.0.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-locate-window': 3.310.0 + '@aws-sdk/util-utf8-browser': 3.259.0 + tslib: 1.14.1 + dev: false + + /@aws-crypto/sha256-browser/3.0.0: + resolution: {integrity: sha512-8VLmW2B+gjFbU5uMeqtQM6Nj0/F1bro80xQXCW6CQBWgosFWXTx77aeOF5CAIAmbOK64SdMBJdNr6J41yP5mvQ==} + dependencies: + '@aws-crypto/ie11-detection': 3.0.0 + '@aws-crypto/sha256-js': 3.0.0 + '@aws-crypto/supports-web-crypto': 3.0.0 + '@aws-crypto/util': 3.0.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-locate-window': 3.310.0 + '@aws-sdk/util-utf8-browser': 3.259.0 + tslib: 1.14.1 + 
dev: false + + /@aws-crypto/sha256-js/3.0.0: + resolution: {integrity: sha512-PnNN7os0+yd1XvXAy23CFOmTbMaDxgxXtTKHybrJ39Y8kGzBATgBFibWJKH6BhytLI/Zyszs87xCOBNyBig6vQ==} + dependencies: + '@aws-crypto/util': 3.0.0 + '@aws-sdk/types': 3.310.0 + tslib: 1.14.1 + dev: false + + /@aws-crypto/supports-web-crypto/3.0.0: + resolution: {integrity: sha512-06hBdMwUAb2WFTuGG73LSC0wfPu93xWwo5vL2et9eymgmu3Id5vFAHBbajVWiGhPO37qcsdCap/FqXvJGJWPIg==} + dependencies: + tslib: 1.14.1 + dev: false + + /@aws-crypto/util/3.0.0: + resolution: {integrity: sha512-2OJlpeJpCR48CC8r+uKVChzs9Iungj9wkZrl8Z041DWEWvyIHILYKCPNzJghKsivj+S3mLo6BVc7mBNzdxA46w==} + dependencies: + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-utf8-browser': 3.259.0 + tslib: 1.14.1 + dev: false + + /@aws-sdk/abort-controller/3.310.0: + resolution: {integrity: sha512-v1zrRQxDLA1MdPim159Vx/CPHqsB4uybSxRi1CnfHO5ZjHryx3a5htW2gdGAykVCul40+yJXvfpufMrELVxH+g==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/chunked-blob-reader/3.310.0: + resolution: {integrity: sha512-CrJS3exo4mWaLnWxfCH+w88Ou0IcAZSIkk4QbmxiHl/5Dq705OLoxf4385MVyExpqpeVJYOYQ2WaD8i/pQZ2fg==} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/client-s3/3.315.0: + resolution: {integrity: sha512-sE2pCFNrhkn1XdqkHx1GEd4eKg/kITk2zHETpkQCUMAVZ1MDuY/uUZzRjbAn9sm9EsJ03Z/vOuK4DkxlLFY+8g==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-crypto/sha1-browser': 3.0.0 + '@aws-crypto/sha256-browser': 3.0.0 + '@aws-crypto/sha256-js': 3.0.0 + '@aws-sdk/client-sts': 3.315.0 + '@aws-sdk/config-resolver': 3.310.0 + '@aws-sdk/credential-provider-node': 3.315.0 + '@aws-sdk/eventstream-serde-browser': 3.310.0 + '@aws-sdk/eventstream-serde-config-resolver': 3.310.0 + '@aws-sdk/eventstream-serde-node': 3.310.0 + '@aws-sdk/fetch-http-handler': 3.310.0 + '@aws-sdk/hash-blob-browser': 3.310.0 + '@aws-sdk/hash-node': 3.310.0 + '@aws-sdk/hash-stream-node': 3.310.0 + '@aws-sdk/invalid-dependency': 3.310.0 + 
'@aws-sdk/md5-js': 3.310.0 + '@aws-sdk/middleware-bucket-endpoint': 3.310.0 + '@aws-sdk/middleware-content-length': 3.310.0 + '@aws-sdk/middleware-endpoint': 3.310.0 + '@aws-sdk/middleware-expect-continue': 3.310.0 + '@aws-sdk/middleware-flexible-checksums': 3.310.0 + '@aws-sdk/middleware-host-header': 3.310.0 + '@aws-sdk/middleware-location-constraint': 3.310.0 + '@aws-sdk/middleware-logger': 3.310.0 + '@aws-sdk/middleware-recursion-detection': 3.310.0 + '@aws-sdk/middleware-retry': 3.310.0 + '@aws-sdk/middleware-sdk-s3': 3.310.0 + '@aws-sdk/middleware-serde': 3.310.0 + '@aws-sdk/middleware-signing': 3.310.0 + '@aws-sdk/middleware-ssec': 3.310.0 + '@aws-sdk/middleware-stack': 3.310.0 + '@aws-sdk/middleware-user-agent': 3.310.0 + '@aws-sdk/node-config-provider': 3.310.0 + '@aws-sdk/node-http-handler': 3.310.0 + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/signature-v4-multi-region': 3.310.0 + '@aws-sdk/smithy-client': 3.315.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/url-parser': 3.310.0 + '@aws-sdk/util-base64': 3.310.0 + '@aws-sdk/util-body-length-browser': 3.310.0 + '@aws-sdk/util-body-length-node': 3.310.0 + '@aws-sdk/util-defaults-mode-browser': 3.315.0 + '@aws-sdk/util-defaults-mode-node': 3.315.0 + '@aws-sdk/util-endpoints': 3.310.0 + '@aws-sdk/util-retry': 3.310.0 + '@aws-sdk/util-stream-browser': 3.310.0 + '@aws-sdk/util-stream-node': 3.310.0 + '@aws-sdk/util-user-agent-browser': 3.310.0 + '@aws-sdk/util-user-agent-node': 3.310.0 + '@aws-sdk/util-utf8': 3.310.0 + '@aws-sdk/util-waiter': 3.310.0 + '@aws-sdk/xml-builder': 3.310.0 + fast-xml-parser: 4.1.2 + tslib: 2.5.0 + transitivePeerDependencies: + - '@aws-sdk/signature-v4-crt' + - aws-crt + dev: false + + /@aws-sdk/client-sso-oidc/3.315.0: + resolution: {integrity: sha512-OJgtmx6SpCWHBDCxBBi36Ro44uCqZBufGkThP/PVYrgVnRVnJ4V18d2wNGKmS37zKmCHHJPnhMPlGOgE2qyVPQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-crypto/sha256-browser': 3.0.0 + '@aws-crypto/sha256-js': 3.0.0 + '@aws-sdk/config-resolver': 
3.310.0 + '@aws-sdk/fetch-http-handler': 3.310.0 + '@aws-sdk/hash-node': 3.310.0 + '@aws-sdk/invalid-dependency': 3.310.0 + '@aws-sdk/middleware-content-length': 3.310.0 + '@aws-sdk/middleware-endpoint': 3.310.0 + '@aws-sdk/middleware-host-header': 3.310.0 + '@aws-sdk/middleware-logger': 3.310.0 + '@aws-sdk/middleware-recursion-detection': 3.310.0 + '@aws-sdk/middleware-retry': 3.310.0 + '@aws-sdk/middleware-serde': 3.310.0 + '@aws-sdk/middleware-stack': 3.310.0 + '@aws-sdk/middleware-user-agent': 3.310.0 + '@aws-sdk/node-config-provider': 3.310.0 + '@aws-sdk/node-http-handler': 3.310.0 + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/smithy-client': 3.315.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/url-parser': 3.310.0 + '@aws-sdk/util-base64': 3.310.0 + '@aws-sdk/util-body-length-browser': 3.310.0 + '@aws-sdk/util-body-length-node': 3.310.0 + '@aws-sdk/util-defaults-mode-browser': 3.315.0 + '@aws-sdk/util-defaults-mode-node': 3.315.0 + '@aws-sdk/util-endpoints': 3.310.0 + '@aws-sdk/util-retry': 3.310.0 + '@aws-sdk/util-user-agent-browser': 3.310.0 + '@aws-sdk/util-user-agent-node': 3.310.0 + '@aws-sdk/util-utf8': 3.310.0 + tslib: 2.5.0 + transitivePeerDependencies: + - aws-crt + dev: false + + /@aws-sdk/client-sso/3.315.0: + resolution: {integrity: sha512-P3QOOyHQER7EDVCzXOsAaJE2p/qfdsSFsYv8k2S8LqEKGH0fViQ4Ph540uKlmaOt1kEhwH1wI6cLRMJJX9XV4Q==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-crypto/sha256-browser': 3.0.0 + '@aws-crypto/sha256-js': 3.0.0 + '@aws-sdk/config-resolver': 3.310.0 + '@aws-sdk/fetch-http-handler': 3.310.0 + '@aws-sdk/hash-node': 3.310.0 + '@aws-sdk/invalid-dependency': 3.310.0 + '@aws-sdk/middleware-content-length': 3.310.0 + '@aws-sdk/middleware-endpoint': 3.310.0 + '@aws-sdk/middleware-host-header': 3.310.0 + '@aws-sdk/middleware-logger': 3.310.0 + '@aws-sdk/middleware-recursion-detection': 3.310.0 + '@aws-sdk/middleware-retry': 3.310.0 + '@aws-sdk/middleware-serde': 3.310.0 + '@aws-sdk/middleware-stack': 3.310.0 + 
'@aws-sdk/middleware-user-agent': 3.310.0 + '@aws-sdk/node-config-provider': 3.310.0 + '@aws-sdk/node-http-handler': 3.310.0 + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/smithy-client': 3.315.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/url-parser': 3.310.0 + '@aws-sdk/util-base64': 3.310.0 + '@aws-sdk/util-body-length-browser': 3.310.0 + '@aws-sdk/util-body-length-node': 3.310.0 + '@aws-sdk/util-defaults-mode-browser': 3.315.0 + '@aws-sdk/util-defaults-mode-node': 3.315.0 + '@aws-sdk/util-endpoints': 3.310.0 + '@aws-sdk/util-retry': 3.310.0 + '@aws-sdk/util-user-agent-browser': 3.310.0 + '@aws-sdk/util-user-agent-node': 3.310.0 + '@aws-sdk/util-utf8': 3.310.0 + tslib: 2.5.0 + transitivePeerDependencies: + - aws-crt + dev: false + + /@aws-sdk/client-sts/3.315.0: + resolution: {integrity: sha512-e34plg6m0hScADIPiu5kCKoiJVXRLRiAuens+iwMse0oPUmrv41hdjgufwWGA/pcNkEGzMdVS88Z4khxB3LHBw==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-crypto/sha256-browser': 3.0.0 + '@aws-crypto/sha256-js': 3.0.0 + '@aws-sdk/config-resolver': 3.310.0 + '@aws-sdk/credential-provider-node': 3.315.0 + '@aws-sdk/fetch-http-handler': 3.310.0 + '@aws-sdk/hash-node': 3.310.0 + '@aws-sdk/invalid-dependency': 3.310.0 + '@aws-sdk/middleware-content-length': 3.310.0 + '@aws-sdk/middleware-endpoint': 3.310.0 + '@aws-sdk/middleware-host-header': 3.310.0 + '@aws-sdk/middleware-logger': 3.310.0 + '@aws-sdk/middleware-recursion-detection': 3.310.0 + '@aws-sdk/middleware-retry': 3.310.0 + '@aws-sdk/middleware-sdk-sts': 3.310.0 + '@aws-sdk/middleware-serde': 3.310.0 + '@aws-sdk/middleware-signing': 3.310.0 + '@aws-sdk/middleware-stack': 3.310.0 + '@aws-sdk/middleware-user-agent': 3.310.0 + '@aws-sdk/node-config-provider': 3.310.0 + '@aws-sdk/node-http-handler': 3.310.0 + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/smithy-client': 3.315.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/url-parser': 3.310.0 + '@aws-sdk/util-base64': 3.310.0 + '@aws-sdk/util-body-length-browser': 3.310.0 + 
'@aws-sdk/util-body-length-node': 3.310.0 + '@aws-sdk/util-defaults-mode-browser': 3.315.0 + '@aws-sdk/util-defaults-mode-node': 3.315.0 + '@aws-sdk/util-endpoints': 3.310.0 + '@aws-sdk/util-retry': 3.310.0 + '@aws-sdk/util-user-agent-browser': 3.310.0 + '@aws-sdk/util-user-agent-node': 3.310.0 + '@aws-sdk/util-utf8': 3.310.0 + fast-xml-parser: 4.1.2 + tslib: 2.5.0 + transitivePeerDependencies: + - aws-crt + dev: false + + /@aws-sdk/config-resolver/3.310.0: + resolution: {integrity: sha512-8vsT+/50lOqfDxka9m/rRt6oxv1WuGZoP8oPMk0Dt+TxXMbAzf4+rejBgiB96wshI1k3gLokYRjSQZn+dDtT8g==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-config-provider': 3.310.0 + '@aws-sdk/util-middleware': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/credential-provider-env/3.310.0: + resolution: {integrity: sha512-vvIPQpI16fj95xwS7M3D48F7QhZJBnnCgB5lR+b7So+vsG9ibm1mZRVGzVpdxCvgyOhHFbvrby9aalNJmmIP1A==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/property-provider': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/credential-provider-imds/3.310.0: + resolution: {integrity: sha512-baxK7Zp6dai5AGW01FIW27xS2KAaPUmKLIXv5SvFYsUgXXvNW55im4uG3b+2gA0F7V+hXvVBH08OEqmwW6we5w==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/node-config-provider': 3.310.0 + '@aws-sdk/property-provider': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/url-parser': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/credential-provider-ini/3.315.0: + resolution: {integrity: sha512-TZbYNbQkNgANx3KsWmJEyBsnfUBq/XKqYYc/VQf1L4eI+GMUw2eKpNV0MTsyviViy2st7W4SiSgtsvXyeVp9xg==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/credential-provider-env': 3.310.0 + '@aws-sdk/credential-provider-imds': 3.310.0 + '@aws-sdk/credential-provider-process': 3.310.0 + '@aws-sdk/credential-provider-sso': 3.315.0 + '@aws-sdk/credential-provider-web-identity': 3.310.0 + '@aws-sdk/property-provider': 3.310.0 + 
'@aws-sdk/shared-ini-file-loader': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + transitivePeerDependencies: + - aws-crt + dev: false + + /@aws-sdk/credential-provider-node/3.315.0: + resolution: {integrity: sha512-OuzKAIg+xPAzBrb/Big5VKDpJmBhVR+N0Hfflrjj2BunQGWO7zxtkKFCz921MtP9ZunDV+UxzTpar8U5TAPtzA==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/credential-provider-env': 3.310.0 + '@aws-sdk/credential-provider-imds': 3.310.0 + '@aws-sdk/credential-provider-ini': 3.315.0 + '@aws-sdk/credential-provider-process': 3.310.0 + '@aws-sdk/credential-provider-sso': 3.315.0 + '@aws-sdk/credential-provider-web-identity': 3.310.0 + '@aws-sdk/property-provider': 3.310.0 + '@aws-sdk/shared-ini-file-loader': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + transitivePeerDependencies: + - aws-crt + dev: false + + /@aws-sdk/credential-provider-process/3.310.0: + resolution: {integrity: sha512-h73sg6GPMUWC+3zMCbA1nZ2O03nNJt7G96JdmnantiXBwHpRKWW8nBTLzx5uhXn6hTuTaoQRP/P+oxQJKYdMmA==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/property-provider': 3.310.0 + '@aws-sdk/shared-ini-file-loader': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/credential-provider-sso/3.315.0: + resolution: {integrity: sha512-oMDGwT67cLgLiLEj5UwAiOVo7mb0l4vi2nk+5pgPMpC3cBlAfA0y1IJe4FHp+Vz52F0nvURZZbdWhX6RgMMaqQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/client-sso': 3.315.0 + '@aws-sdk/property-provider': 3.310.0 + '@aws-sdk/shared-ini-file-loader': 3.310.0 + '@aws-sdk/token-providers': 3.315.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + transitivePeerDependencies: + - aws-crt + dev: false + + /@aws-sdk/credential-provider-web-identity/3.310.0: + resolution: {integrity: sha512-H4SzuZXILNhK6/IR1uVvsUDZvzc051hem7GLyYghBCu8mU+tq28YhKE8MfSroi6eL2e5Vujloij1OM2EQQkPkw==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/property-provider': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + 
/@aws-sdk/eventstream-codec/3.310.0: + resolution: {integrity: sha512-clIeSgWbZbxwtsxZ/yoedNM0/kJFSIjjHPikuDGhxhqc+vP6TN3oYyVMFrYwFaTFhk2+S5wZcWYMw8Op1pWo+A==} + dependencies: + '@aws-crypto/crc32': 3.0.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-hex-encoding': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/eventstream-serde-browser/3.310.0: + resolution: {integrity: sha512-3S6ziuQVALgEyz0TANGtYDVeG8ArK4Y05mcgrs8qUTmsvlDIXX37cR/DvmVbNB76M4IrsZeSAIajL9644CywkA==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/eventstream-serde-universal': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/eventstream-serde-config-resolver/3.310.0: + resolution: {integrity: sha512-8s1Qdn9STj+sV75nUp9yt0W6fHS4BZ2jTm4Z/1Pcbvh2Gqs0WjH5n2StS+pDW5Y9J/HSGBl0ogmUr5lC5bXFHg==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/eventstream-serde-node/3.310.0: + resolution: {integrity: sha512-kSnRomCgW43K9TmQYuwN9+AoYPnhyOKroanUMyZEzJk7rpCPMj4OzaUpXfDYOvznFNYn7NLaH6nHLJAr0VPlJA==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/eventstream-serde-universal': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/eventstream-serde-universal/3.310.0: + resolution: {integrity: sha512-Qyjt5k/waV5cDukpgT824ISZAz5U0pwzLz5ztR409u85AGNkF/9n7MS+LSyBUBSb0WJ5pUeSD47WBk+nLq9Nhw==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/eventstream-codec': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/fetch-http-handler/3.310.0: + resolution: {integrity: sha512-Bi9vIwzdkw1zMcvi/zGzlWS9KfIEnAq4NNhsnCxbQ4OoIRU9wvU+WGZdBBhxg0ZxZmpp1j1aZhU53lLjA07MHw==} + dependencies: + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/querystring-builder': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-base64': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/hash-blob-browser/3.310.0: + resolution: {integrity: 
sha512-OoR8p0cbypToysLT0v3o2oyjy6+DKrY7GNCAzHOHJK9xmqXCt+DsjKoPeiY7o1sWX2aN6Plmvubj/zWxMKEn/A==} + dependencies: + '@aws-sdk/chunked-blob-reader': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/hash-node/3.310.0: + resolution: {integrity: sha512-NvE2fhRc8GRwCXBfDehxVAWCmVwVMILliAKVPAEr4yz2CkYs0tqU51S48x23dtna07H4qHtgpeNqVTthcIQOEQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-buffer-from': 3.310.0 + '@aws-sdk/util-utf8': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/hash-stream-node/3.310.0: + resolution: {integrity: sha512-ZoXdybNgvMz1Hl6k/e32xVL3jmG5p2IEk5mTtLfFEuskTJ74Z+VMYKkkF1whyy7KQfH83H+TQGnsGtlRCchQKw==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-utf8': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/invalid-dependency/3.310.0: + resolution: {integrity: sha512-1s5RG5rSPXoa/aZ/Kqr5U/7lqpx+Ry81GprQ2bxWqJvWQIJ0IRUwo5pk8XFxbKVr/2a+4lZT/c3OGoBOM1yRRA==} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/is-array-buffer/3.310.0: + resolution: {integrity: sha512-urnbcCR+h9NWUnmOtet/s4ghvzsidFmspfhYaHAmSRdy9yDjdjBJMFjjsn85A1ODUktztm+cVncXjQ38WCMjMQ==} + engines: {node: '>=14.0.0'} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/lib-storage/3.315.0_@aws-sdk+client-s3@3.315.0: + resolution: {integrity: sha512-woIiR5PlTlJwjlkgUDS7YPKLq+1sdIQGkpAWkA8UXuWvZwvAQCT0KD5JXa8RMmta6TJqHm4lRg6VGIkp2Z6+ww==} + engines: {node: '>=14.0.0'} + peerDependencies: + '@aws-sdk/abort-controller': ^3.0.0 + '@aws-sdk/client-s3': ^3.0.0 + dependencies: + '@aws-sdk/client-s3': 3.315.0 + '@aws-sdk/middleware-endpoint': 3.310.0 + '@aws-sdk/smithy-client': 3.315.0 + buffer: 5.6.0 + events: 3.3.0 + stream-browserify: 3.0.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/md5-js/3.310.0: + resolution: {integrity: sha512-x5sRBUrEfLWAS1EhwbbDQ7cXq6uvBxh3qR2XAsnGvFFceTeAadk7cVogWxlk3PC+OCeeym7c3/6Bv2HQ2f1YyQ==} + dependencies: + 
'@aws-sdk/types': 3.310.0 + '@aws-sdk/util-utf8': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-bucket-endpoint/3.310.0: + resolution: {integrity: sha512-uJJfHI7v4AgbJZRLtyI8ap2QRWkBokGc3iyUoQ+dVNT3/CE2ZCu694A6W+H0dRqg79dIE+f9CRNdtLGa/Ehhvg==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-arn-parser': 3.310.0 + '@aws-sdk/util-config-provider': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-content-length/3.310.0: + resolution: {integrity: sha512-P8tQZxgDt6CAh1wd/W6WPzjc+uWPJwQkm+F7rAwRlM+k9q17HrhnksGDKcpuuLyIhPQYdmOMIkpKVgXGa4avhQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-endpoint/3.310.0: + resolution: {integrity: sha512-Z+N2vOL8K354/lstkClxLLsr6hCpVRh+0tCMXrVj66/NtKysCEZ/0b9LmqOwD9pWHNiI2mJqXwY0gxNlKAroUg==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/middleware-serde': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/url-parser': 3.310.0 + '@aws-sdk/util-middleware': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-expect-continue/3.310.0: + resolution: {integrity: sha512-l3d1z2gt+gINJDnPSyu84IxfzjzPfCQrqC1sunw2cZGo/sXtEiq698Q3SiTcO2PGP4LBQAy2RHb5wVBJP708CQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-flexible-checksums/3.310.0: + resolution: {integrity: sha512-5ndnLgzgGVpWkmHBAiYkagHqiSuow8q62J4J6E2PzaQ77+fm8W3nfdy7hK5trHokEyouCZdxT/XK/IRhgj/4PA==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-crypto/crc32': 3.0.0 + '@aws-crypto/crc32c': 3.0.0 + '@aws-sdk/is-array-buffer': 3.310.0 + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-utf8': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-host-header/3.310.0: + resolution: {integrity: 
sha512-QWSA+46/hXorXyWa61ic2K7qZzwHTiwfk2e9mRRjeIRepUgI3qxFjsYqrWtrOGBjmFmq0pYIY8Bb/DCJuQqcoA==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-location-constraint/3.310.0: + resolution: {integrity: sha512-LFm0JTQWwTPWL/tZU2wsQTl8J5PpDEkXjEhaXVKamtyH0xhysRqd+0n92n65dc8oztAuQkb9xUbErGn5b6gsew==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-logger/3.310.0: + resolution: {integrity: sha512-Lurm8XofrASBRnAVtiSNuDSRsRqPNg27RIFLLsLp/pqog9nFJ0vz0kgdb9S5Z+zw83Mm+UlqOe6D8NTUNp4fVg==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-recursion-detection/3.310.0: + resolution: {integrity: sha512-SuB75/xk/gyue24gkriTwO2jFd7YcUGZDClQYuRejgbXSa3CO0lWyawQtfLcSSEBp9izrEVXuFH24K1eAft5nQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-retry/3.310.0: + resolution: {integrity: sha512-oTPsRy2W4s+dfxbJPW7Km+hHtv/OMsNsVfThAq8DDYKC13qlr1aAyOqGLD+dpBy2aKe7ss517Sy2HcHtHqm7/g==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/service-error-classification': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-middleware': 3.310.0 + '@aws-sdk/util-retry': 3.310.0 + tslib: 2.5.0 + uuid: 8.3.2 + dev: false + + /@aws-sdk/middleware-sdk-s3/3.310.0: + resolution: {integrity: sha512-QK9x9g2ksg0hOjjYgqddeFcn5ctUEGdxJVu4OumPXceulefMcSO2jyH2qTybYSA93nqNQFdFmg5wQfvIRUWFCQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-arn-parser': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-sdk-sts/3.310.0: + resolution: {integrity: 
sha512-+5PFwlYNLvLLIfw0ASAoWV/iIF8Zv6R6QGtyP0CclhRSvNjgbQDVnV0g95MC5qvh+GB/Yjlkt8qAjLSPjHfsrQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/middleware-signing': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-serde/3.310.0: + resolution: {integrity: sha512-RNeeTVWSLTaentUeCgQKZhAl+C6hxtwD78cQWS10UymWpQFwbaxztzKUu4UQS5xA2j6PxwPRRUjqa4jcFjfLsg==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-signing/3.310.0: + resolution: {integrity: sha512-f9mKq+XMdW207Af3hKjdTnpNhdtwqWuvFs/ZyXoOkp/g1MY1O6L23Jy6i52m29LxbT4AuNRG1oKODfXM0vYVjQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/property-provider': 3.310.0 + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/signature-v4': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-middleware': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-ssec/3.310.0: + resolution: {integrity: sha512-CnEwNKVpd5bXnrCKPaePF8mWTA9ET21OMBb54y9b0fd8K02zoOcdBz4DWfh1SjFD4HkgCdja4egd8l2ivyvqmw==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-stack/3.310.0: + resolution: {integrity: sha512-010O1PD+UAcZVKRvqEusE1KJqN96wwrf6QsqbRM0ywsKQ21NDweaHvEDlds2VHpgmofxkRLRu/IDrlPkKRQrRg==} + engines: {node: '>=14.0.0'} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/middleware-user-agent/3.310.0: + resolution: {integrity: sha512-x3IOwSwSbwKidlxRk3CNVHVUb06SRuaELxggCaR++QVI8NU6qD/l4VHXKVRvbTHiC/cYxXE/GaBDgQVpDR7V/g==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-endpoints': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/node-config-provider/3.310.0: + resolution: {integrity: sha512-T/Pp6htc6hq/Cq+MLNDSyiwWCMVF6GqbBbXKVlO5L8rdHx4sq9xPdoPveZhGWrxvkanjA6eCwUp6E0riBOSVng==} + engines: {node: '>=14.0.0'} + dependencies: + 
'@aws-sdk/property-provider': 3.310.0 + '@aws-sdk/shared-ini-file-loader': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/node-http-handler/3.310.0: + resolution: {integrity: sha512-irv9mbcM9xC2xYjArQF5SYmHBMu4ciMWtGsoHII1nRuFOl9FoT4ffTvEPuLlfC6pznzvKt9zvnm6xXj7gDChKg==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/abort-controller': 3.310.0 + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/querystring-builder': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/property-provider/3.310.0: + resolution: {integrity: sha512-3lxDb0akV6BBzmFe4nLPaoliQbAifyWJhuvuDOu7e8NzouvpQXs0275w9LePhhcgjKAEVXUIse05ZW2DLbxo/g==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/protocol-http/3.310.0: + resolution: {integrity: sha512-fgZ1aw/irQtnrsR58pS8ThKOWo57Py3xX6giRvwSgZDEcxHfVzuQjy9yPuV++v04fdmdtgpbGf8WfvAAJ11yXQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/querystring-builder/3.310.0: + resolution: {integrity: sha512-ZHH8GV/80+pWGo7DzsvwvXR5xVxUHXUvPJPFAkhr6nCf78igdoF8gR10ScFoEKbtEapoNTaZlKHPXxpD8aPG7A==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-uri-escape': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/querystring-parser/3.310.0: + resolution: {integrity: sha512-YkIznoP6lsiIUHinx++/lbb3tlMURGGqMpo0Pnn32zYzGrJXA6eC3D0as2EcMjo55onTfuLcIiX4qzXes2MYOA==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/service-error-classification/3.310.0: + resolution: {integrity: sha512-PuyC7k3qfIKeH2LCnDwbttMOKq3qAx4buvg0yfnJtQOz6t1AR8gsnAq0CjKXXyfkXwNKWTqCpE6lVNUIkXgsMw==} + engines: {node: '>=14.0.0'} + dev: false + + /@aws-sdk/shared-ini-file-loader/3.310.0: + resolution: {integrity: 
sha512-N0q9pG0xSjQwc690YQND5bofm+4nfUviQ/Ppgan2kU6aU0WUq8KwgHJBto/YEEI+VlrME30jZJnxtOvcZJc2XA==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/signature-v4-multi-region/3.310.0: + resolution: {integrity: sha512-q8W+RIomTS/q85Ntgks/CoDElwqkC9+4OCicee5YznNHjQ4gtNWhUkYIyIRWRmXa/qx/AUreW9DM8FAecCOdng==} + engines: {node: '>=14.0.0'} + peerDependencies: + '@aws-sdk/signature-v4-crt': ^3.118.0 + peerDependenciesMeta: + '@aws-sdk/signature-v4-crt': + optional: true + dependencies: + '@aws-sdk/protocol-http': 3.310.0 + '@aws-sdk/signature-v4': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/signature-v4/3.310.0: + resolution: {integrity: sha512-1M60P1ZBNAjCFv9sYW29OF6okktaeibWyW3lMXqzoHF70lHBZh+838iUchznXUA5FLabfn4jBFWMRxlAXJUY2Q==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/is-array-buffer': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-hex-encoding': 3.310.0 + '@aws-sdk/util-middleware': 3.310.0 + '@aws-sdk/util-uri-escape': 3.310.0 + '@aws-sdk/util-utf8': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/smithy-client/3.315.0: + resolution: {integrity: sha512-qTm0lwTh6IZMiWs3U9k2veoF6gV9yE0B9Z34yMxagOfQFQgxMih0aiH25MD25eRigjJ3sfUeZ+B0mRycmJZdkQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/middleware-stack': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/token-providers/3.315.0: + resolution: {integrity: sha512-EjLUQ9JLqU3eJfJyzpcVjFnuJ1MCCodZaVJmuX/a/as4TK41bKMvkVojjsU7pDSYzl+tuXE+ceivcWK4H0HQdQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/client-sso-oidc': 3.315.0 + '@aws-sdk/property-provider': 3.310.0 + '@aws-sdk/shared-ini-file-loader': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + transitivePeerDependencies: + - aws-crt + dev: false + + /@aws-sdk/types/3.310.0: + resolution: {integrity: 
sha512-j8eamQJ7YcIhw7fneUfs8LYl3t01k4uHi4ZDmNRgtbmbmTTG3FZc2MotStZnp3nZB6vLiPF1o5aoJxWVvkzS6A==} + engines: {node: '>=14.0.0'} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/url-parser/3.310.0: + resolution: {integrity: sha512-mCLnCaSB9rQvAgx33u0DujLvr4d5yEm/W5r789GblwwQnlNXedVu50QRizMLTpltYWyAUoXjJgQnJHmJMaKXhw==} + dependencies: + '@aws-sdk/querystring-parser': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-arn-parser/3.310.0: + resolution: {integrity: sha512-jL8509owp/xB9+Or0pvn3Fe+b94qfklc2yPowZZIFAkFcCSIdkIglz18cPDWnYAcy9JGewpMS1COXKIUhZkJsA==} + engines: {node: '>=14.0.0'} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-base64/3.310.0: + resolution: {integrity: sha512-v3+HBKQvqgdzcbL+pFswlx5HQsd9L6ZTlyPVL2LS9nNXnCcR3XgGz9jRskikRUuUvUXtkSG1J88GAOnJ/apTPg==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/util-buffer-from': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-body-length-browser/3.310.0: + resolution: {integrity: sha512-sxsC3lPBGfpHtNTUoGXMQXLwjmR0zVpx0rSvzTPAuoVILVsp5AU/w5FphNPxD5OVIjNbZv9KsKTuvNTiZjDp9g==} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-body-length-node/3.310.0: + resolution: {integrity: sha512-2tqGXdyKhyA6w4zz7UPoS8Ip+7sayOg9BwHNidiGm2ikbDxm1YrCfYXvCBdwaJxa4hJfRVz+aL9e+d3GqPI9pQ==} + engines: {node: '>=14.0.0'} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-buffer-from/3.310.0: + resolution: {integrity: sha512-i6LVeXFtGih5Zs8enLrt+ExXY92QV25jtEnTKHsmlFqFAuL3VBeod6boeMXkN2p9lbSVVQ1sAOOYZOHYbYkntw==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/is-array-buffer': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-config-provider/3.310.0: + resolution: {integrity: sha512-xIBaYo8dwiojCw8vnUcIL4Z5tyfb1v3yjqyJKJWV/dqKUFOOS0U591plmXbM+M/QkXyML3ypon1f8+BoaDExrg==} + engines: {node: '>=14.0.0'} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-defaults-mode-browser/3.315.0: + resolution: {integrity: 
sha512-5cqNvfGos3FB/MHNl+g2fr+tPY7s3k3+96V3wOPWLOksdACth10OxPpHfboXXZDHHkR0hmyJwJcfgA4uQrUcGg==} + engines: {node: '>= 10.0.0'} + dependencies: + '@aws-sdk/property-provider': 3.310.0 + '@aws-sdk/types': 3.310.0 + bowser: 2.11.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-defaults-mode-node/3.315.0: + resolution: {integrity: sha512-vSPIGpzh6NJIMLoh31p7CczSatN46kJdJBrHfODHaIGe4t156x+LfkkcxGQhtifqxglhL7l+fmn5D1fM5exHuA==} + engines: {node: '>= 10.0.0'} + dependencies: + '@aws-sdk/config-resolver': 3.310.0 + '@aws-sdk/credential-provider-imds': 3.310.0 + '@aws-sdk/node-config-provider': 3.310.0 + '@aws-sdk/property-provider': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-endpoints/3.310.0: + resolution: {integrity: sha512-zG+/d/O5KPmAaeOMPd6bW1abifdT0H03f42keLjYEoRZzYtHPC5DuPE0UayiWGckI6BCDgy0sRKXCYS49UNFaQ==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-hex-encoding/3.310.0: + resolution: {integrity: sha512-sVN7mcCCDSJ67pI1ZMtk84SKGqyix6/0A1Ab163YKn+lFBQRMKexleZzpYzNGxYzmQS6VanP/cfU7NiLQOaSfA==} + engines: {node: '>=14.0.0'} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-locate-window/3.310.0: + resolution: {integrity: sha512-qo2t/vBTnoXpjKxlsC2e1gBrRm80M3bId27r0BRB2VniSSe7bL1mmzM+/HFtujm0iAxtPM+aLEflLJlJeDPg0w==} + engines: {node: '>=14.0.0'} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-middleware/3.310.0: + resolution: {integrity: sha512-FTSUKL/eRb9X6uEZClrTe27QFXUNNp7fxYrPndZwk1hlaOP5ix+MIHBcI7pIiiY/JPfOUmPyZOu+HetlFXjWog==} + engines: {node: '>=14.0.0'} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-retry/3.310.0: + resolution: {integrity: sha512-FwWGhCBLfoivTMUHu1LIn4NjrN9JLJ/aX5aZmbcPIOhZVFJj638j0qDgZXyfvVqBuBZh7M8kGq0Oahy3dp69OA==} + engines: {node: '>= 14.0.0'} + dependencies: + '@aws-sdk/service-error-classification': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-stream-browser/3.310.0: 
+ resolution: {integrity: sha512-bysXZHwFwvbqOTCScCdCnoLk1K3GCo0HRIYEZuL7O7MHrQmfaYRXcaft/p22+GUv9VeFXS/eJJZ5r4u32az94w==} + dependencies: + '@aws-sdk/fetch-http-handler': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-base64': 3.310.0 + '@aws-sdk/util-hex-encoding': 3.310.0 + '@aws-sdk/util-utf8': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-stream-node/3.310.0: + resolution: {integrity: sha512-hueAXFK0GVvnfYFgqbF7587xZfMZff5jlIFZOHqx7XVU7bl7qrRUCnphHk8H6yZ7RoQbDPcfmHJgtEoAJg1T1Q==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/node-http-handler': 3.310.0 + '@aws-sdk/types': 3.310.0 + '@aws-sdk/util-buffer-from': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-uri-escape/3.310.0: + resolution: {integrity: sha512-drzt+aB2qo2LgtDoiy/3sVG8w63cgLkqFIa2NFlGpUgHFWTXkqtbgf4L5QdjRGKWhmZsnqkbtL7vkSWEcYDJ4Q==} + engines: {node: '>=14.0.0'} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-user-agent-browser/3.310.0: + resolution: {integrity: sha512-yU/4QnHHuQ5z3vsUqMQVfYLbZGYwpYblPiuZx4Zo9+x0PBkNjYMqctdDcrpoH9Z2xZiDN16AmQGK1tix117ZKw==} + dependencies: + '@aws-sdk/types': 3.310.0 + bowser: 2.11.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-user-agent-node/3.310.0: + resolution: {integrity: sha512-Ra3pEl+Gn2BpeE7KiDGpi4zj7WJXZA5GXnGo3mjbi9+Y3zrbuhJAbdZO3mO/o7xDgMC6ph4xCTbaSGzU6b6EDg==} + engines: {node: '>=14.0.0'} + peerDependencies: + aws-crt: '>=1.0.0' + peerDependenciesMeta: + aws-crt: + optional: true + dependencies: + '@aws-sdk/node-config-provider': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-utf8-browser/3.259.0: + resolution: {integrity: sha512-UvFa/vR+e19XookZF8RzFZBrw2EUkQWxiBW0yYQAhvk3C+QVGl0H3ouca8LDBlBfQKXwmW3huo/59H8rwb1wJw==} + dependencies: + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-utf8/3.310.0: + resolution: {integrity: sha512-DnLfFT8uCO22uOJc0pt0DsSNau1GTisngBCDw8jQuWT5CqogMJu4b/uXmwEqfj8B3GX6Xsz8zOd6JpRlPftQoA==} + engines: {node: '>=14.0.0'} + dependencies: + 
'@aws-sdk/util-buffer-from': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/util-waiter/3.310.0: + resolution: {integrity: sha512-AV5j3guH/Y4REu+Qh3eXQU9igljHuU4XjX2sADAgf54C0kkhcCCkkiuzk3IsX089nyJCqIcj5idbjdvpnH88Vw==} + engines: {node: '>=14.0.0'} + dependencies: + '@aws-sdk/abort-controller': 3.310.0 + '@aws-sdk/types': 3.310.0 + tslib: 2.5.0 + dev: false + + /@aws-sdk/xml-builder/3.310.0: + resolution: {integrity: sha512-TqELu4mOuSIKQCqj63fGVs86Yh+vBx5nHRpWKNUNhB2nPTpfbziTs5c1X358be3peVWA4wPxW7Nt53KIg1tnNw==} + engines: {node: '>=14.0.0'} + dependencies: + tslib: 2.5.0 + dev: false + /@azure/abort-controller/1.1.0: resolution: {integrity: sha512-TrRLIoSQVzfAJX9H1JeFjzAoDGcoK1IYX1UImfceTZpsyYfWr09Ss1aHW1y5TrrR3iq6RZLBwJ3E24uwPhwahw==} engines: {node: '>=12.0.0'} @@ -3591,6 +4560,10 @@ packages: /bn.js/5.2.1: resolution: {integrity: sha512-eXRvHzWyYPBuB4NBy0cmYQjGitUrtqwbvlzP3G6VFnNRbsZQIxQ10PbKKHt8gZ/HW/D/747aDl+QkDqg3KQLMQ==} + /bowser/2.11.0: + resolution: {integrity: sha512-AlcaJBi/pqqJBIQ8U9Mcpc9i8Aqxn88Skv5d+xBX006BY5u8N3mGLHa5Lgppa7L/HfwgwLgZ6NYs+Ag6uUmJRA==} + dev: false + /brace-expansion/1.1.11: resolution: {integrity: sha512-iCuPHDFgrHX7H2vEI/5xpz07zSHB00TpugqhmYtVmMO6518mCuRMoOYFldEBl0g187ufozdaHgWKcYFb61qGiA==} dependencies: @@ -3796,6 +4769,13 @@ packages: ieee754: 1.2.1 dev: true + /buffer/5.6.0: + resolution: {integrity: sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==} + dependencies: + base64-js: 1.5.1 + ieee754: 1.2.1 + dev: false + /buffer/6.0.3: resolution: {integrity: sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA==} dependencies: @@ -5207,6 +6187,13 @@ packages: resolution: {integrity: sha512-VhXlQgj9ioXCqGstD37E/HBeqEGV/qOD/kmbVG8h5xKBYvM1L3lR1Zn4555cQ8GkYbJa8aJSipLPndE1k6zK2w==} dev: false + /fast-xml-parser/4.1.2: + resolution: {integrity: sha512-CDYeykkle1LiA/uqQyNwYpFbyF6Axec6YapmpUP+/RHWIoR1zKjocdvNaTsxCxZzQ6v9MLXaSYm9Qq0thv0DHg==} + 
hasBin: true + dependencies: + strnum: 1.0.5 + dev: false + /fastq/1.13.0: resolution: {integrity: sha512-YpkpUnK8od0o1hmeSc7UUs/eB/vIPWJYjKck2QKIzAf71Vm1AAQ3EbuZB3g2JIy+pg+ERD0vqI79KyZiB2e2Nw==} dependencies: @@ -8900,7 +9887,6 @@ packages: dependencies: inherits: 2.0.4 readable-stream: 3.6.0 - dev: true /stream-combiner2/1.1.1: resolution: {integrity: sha512-3PnJbYgS56AeWgtKF5jtJRT6uFJe56Z0Hc5Ngg/6sI6rIt8iiMBTa9cvdyFfpMQjaVHr8dusbNeFGIIonxOvKw==} @@ -9036,6 +10022,10 @@ packages: resolution: {integrity: sha512-6fPc+R4ihwqP6N/aIv2f1gMH8lOVtWQHoqC4yK6oSDVVocumAsfCqjkXnqiYMhmMwS/mEHLp7Vehlt3ql6lEig==} engines: {node: '>=8'} + /strnum/1.0.5: + resolution: {integrity: sha512-J8bbNyKKXl5qYcR36TIO8W3mVGVHrmmxsd5PAItGkmyzwJvybiw2IVq5nqd0i4LSNSkB/sx9VHllbfFdr9k1JA==} + dev: false + /stubs/3.0.0: resolution: {integrity: sha512-PdHt7hHUJKxvTCgbKX9C1V/ftOcjJQgz8BZwNfV5c4B6dcGqlpelTbJ999jBGZ2jYiPAwcX5dP6oBwVlBlUbxw==} dev: false @@ -9356,6 +10346,10 @@ packages: /tslib/2.4.1: resolution: {integrity: sha512-tGyy4dAjRIEwI7BzsB0lynWgOpfqjUdq91XXAlIWD2OwKBH7oCl/GZG/HT4BOHrTlPMOASlMQ7veyTqpmRcrNA==} + /tslib/2.5.0: + resolution: {integrity: sha512-336iVw3rtn2BUK7ORdIAHTyxHGRIHVReokCR3XjbckJMK7ms8FysBfhLR8IXnAgy7T0PTPNBWKiH514FOW/WSg==} + dev: false + /tsutils/3.21.0_typescript@4.8.4: resolution: {integrity: sha512-mHKK3iUXL+3UF6xL5k0PEhKRUBKPBCv/+RkEOpjRWxxx27KKRBmmA60A9pgOUvMi8GKhRMPEmjBRPzs2W7O1OA==} engines: {node: '>= 6'} diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts index 7b5e61728df..cb501fc143d 100644 --- a/plugin-server/src/capabilities.ts +++ b/plugin-server/src/capabilities.ts @@ -15,6 +15,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin processPluginJobs: true, processAsyncHandlers: true, sessionRecordingIngestion: true, + sessionRecordingBlobIngestion: true, ...sharedCapabilities, } case 'ingestion': @@ -40,10 +41,14 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): 
Plugin } case 'recordings-ingestion': return { - mmdb: false, sessionRecordingIngestion: true, ...sharedCapabilities, } + case 'recordings-blob-ingestion': + return { + sessionRecordingBlobIngestion: true, + ...sharedCapabilities, + } case 'async': return { diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index b7e7f448c5e..33d6ec46557 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -96,11 +96,11 @@ export function getDefaultConfig(): PluginsServerConfig { BUFFER_CONVERSION_SECONDS: isDevEnv() ? 2 : 60, // KEEP IN SYNC WITH posthog/settings/ingestion.py PERSON_INFO_CACHE_TTL: 5 * 60, // 5 min KAFKA_HEALTHCHECK_SECONDS: 20, - OBJECT_STORAGE_ENABLED: false, + OBJECT_STORAGE_ENABLED: true, OBJECT_STORAGE_ENDPOINT: 'http://localhost:19000', + OBJECT_STORAGE_REGION: 'us-east-1', OBJECT_STORAGE_ACCESS_KEY_ID: 'object_storage_root_user', OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password', - OBJECT_STORAGE_SESSION_RECORDING_FOLDER: 'session_recordings', OBJECT_STORAGE_BUCKET: 'posthog', PLUGIN_SERVER_MODE: null, KAFKAJS_LOG_LEVEL: 'WARN', @@ -112,6 +112,14 @@ export function getDefaultConfig(): PluginsServerConfig { MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: 0, USE_KAFKA_FOR_SCHEDULED_TASKS: true, CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag + + SESSION_RECORDING_BLOB_PROCESSING_TEAMS: '', // TODO: BW Change this to 'all' when we release it fully + SESSION_RECORDING_LOCAL_DIRECTORY: '.tmp/sessions', + SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: 60 * 10, // NOTE: 10 minutes + SESSION_RECORDING_MAX_BUFFER_SIZE_KB: ['dev', 'test'].includes(process.env.NODE_ENV || 'undefined') + ? 
1024 // NOTE: ~1MB in dev or test, so that even with gzipped content we still flush pretty frequently + : 1024 * 50, // ~50MB after compression in prod + SESSION_RECORDING_REMOTE_FOLDER: 'session_recordings', } } diff --git a/plugin-server/src/healthcheck.ts b/plugin-server/src/healthcheck.ts index 5cbdbe8cf25..24e39d3c3f9 100644 --- a/plugin-server/src/healthcheck.ts +++ b/plugin-server/src/healthcheck.ts @@ -1,5 +1,5 @@ import { defaultConfig } from './config/config' -import { connectObjectStorage } from './main/services/object_storage' +import { getObjectStorage } from './main/services/object_storage' import { status } from './utils/status' import { createRedis } from './utils/utils' @@ -23,11 +23,11 @@ const redisHealthcheck = async (): Promise => { } const storageHealthcheck = async (): Promise => { - if (!defaultConfig.OBJECT_STORAGE_ENABLED) { + const storage = getObjectStorage(defaultConfig) + + if (!storage) { return true } - - const storage = connectObjectStorage(defaultConfig) try { const storageHealthy = await storage.healthcheck() if (storageHealthy) { diff --git a/plugin-server/src/kafka/batch-consumer.ts b/plugin-server/src/kafka/batch-consumer.ts index e871aebd599..7e796bba2c3 100644 --- a/plugin-server/src/kafka/batch-consumer.ts +++ b/plugin-server/src/kafka/batch-consumer.ts @@ -1,4 +1,4 @@ -import { GlobalConfig, Message } from 'node-rdkafka' +import { GlobalConfig, KafkaConsumer, Message } from 'node-rdkafka' import { exponentialBuckets, Histogram } from 'prom-client' import { status } from '../utils/status' @@ -11,6 +11,13 @@ import { instrumentConsumerMetrics, } from './consumer' +export interface BatchConsumer { + consumer: KafkaConsumer + join: () => Promise + stop: () => Promise + isHealthy: () => boolean +} + export const startBatchConsumer = async ({ connectionConfig, groupId, @@ -21,6 +28,7 @@ export const startBatchConsumer = async ({ consumerMaxWaitMs, fetchBatchSize, eachBatch, + autoCommit = true, }: { connectionConfig:
GlobalConfig groupId: string @@ -31,7 +39,8 @@ export const startBatchConsumer = async ({ consumerMaxWaitMs: number fetchBatchSize: number eachBatch: (messages: Message[]) => Promise -}) => { + autoCommit?: boolean +}): Promise => { // Starts consuming from `topic` in batches of `fetchBatchSize` messages, // with consumer group id `groupId`. We use `connectionConfig` to connect // to Kafka. We commit offsets after each batch has been processed, @@ -124,6 +133,11 @@ export const startBatchConsumer = async ({ const messages = await consumeMessages(consumer, fetchBatchSize) status.debug('🔁', 'main_loop_consumed', { messagesLength: messages.length }) + if (!messages.length) { + // For now + continue + } + consumerBatchSize.labels({ topic, groupId }).observe(messages.length) for (const message of messages) { consumedMessageSizeBytes.labels({ topic, groupId }).observe(message.size) @@ -133,7 +147,9 @@ export const startBatchConsumer = async ({ // the implementation of `eachBatch`. await eachBatch(messages) - commitOffsetsForMessages(messages, consumer) + if (autoCommit) { + commitOffsetsForMessages(messages, consumer) + } } } catch (error) { status.error('🔁', 'main_loop_error', { error }) @@ -178,7 +194,7 @@ export const startBatchConsumer = async ({ } } - return { isHealthy, stop, join } + return { isHealthy, stop, join, consumer } } export const consumerBatchSize = new Histogram({ diff --git a/plugin-server/src/kafka/consumer.ts b/plugin-server/src/kafka/consumer.ts index 83d2091e124..149f33a2d47 100644 --- a/plugin-server/src/kafka/consumer.ts +++ b/plugin-server/src/kafka/consumer.ts @@ -1,5 +1,6 @@ import { ClientMetrics, + CODES, ConsumerGlobalConfig, KafkaConsumer as RdKafkaConsumer, LibrdKafkaError, @@ -79,10 +80,20 @@ export const instrumentConsumerMetrics = (consumer: RdKafkaConsumer, groupId: st // TODO: add other relevant metrics here // TODO: expose the internal librdkafka metrics as well. 
consumer.on('rebalance', (error: LibrdKafkaError, assignments: TopicPartition[]) => { - if (error) { - status.error('⚠️', 'rebalance_error', { error: error }) + /** + * see https://github.com/Blizzard/node-rdkafka#rebalancing errors are used to signal + * both errors and _not_ errors + * + * When rebalancing starts the consumer receives ERR__REVOKE_PARTITIONS + * And when the balancing is completed the new assignments are received with ERR__ASSIGN_PARTITIONS + */ + if (error.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) { + status.info('📝️', 'librdkafka rebalance, partitions assigned', { assignments }) + } else if (error.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) { + status.info('📝️', 'librdkafka rebalance started, partitions revoked', { assignments }) + } else { - status.info('📝', 'librdkafka rebalance', { assignments: assignments }) + // We had a "real" error + status.error('⚠️', 'rebalance_error', { error }) } latestOffsetTimestampGauge.reset() @@ -160,10 +171,10 @@ export const disconnectConsumer = async (consumer: RdKafkaConsumer) => { await new Promise((resolve, reject) => { consumer.disconnect((error, data) => { if (error) { - status.error('🔥', 'Failed to disconnect session recordings consumer', { error }) + status.error('🔥', 'Failed to disconnect node-rdkafka consumer', { error }) reject(error) } else { - status.info('🔁', 'Disconnected session recordings consumer') + status.info('🔁', 'Disconnected session node-rdkafka consumer') resolve(data) } }) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/offset-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/offset-manager.ts new file mode 100644 index 00000000000..41a64b2c508 --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/offset-manager.ts @@ -0,0 +1,117 @@ +/** + * - A note on Kafka partitioning + * + * We are ingesting events and partitioning based on session_id.
This means that we can have multiple sessions + * going to one partition, but for any given ID they should land all on the same partition. + * + * As we want to buffer events before writing them to S3, we don't auto-commit our kafka consumer offsets. + * Instead, we track all offsets that are "in-flight" and when we flush a buffer to S3 we remove these in-flight offsets + * and write the oldest offset to Kafka. This allows us to resume from the oldest offset in the case of a consumer + * restart or rebalance, even if some of the following offsets have already been written to S3. + * + * The other trick is when a rebalance occurs we need to remove all in-flight sessions for partitions that are no longer + * assigned to us. + * + * This all works based on the idea that there is only one consumer (Orchestrator) per partition, allowing us to + * track everything in this single process + */ + +import { KafkaConsumer } from 'node-rdkafka' + +import { status } from '../../../../utils/status' + +export class OffsetManager { + // We have to track every message's offset so that we can commit them only after they've been written to S3 + offsetsByPartitionTopic: Map = new Map() + + constructor(private consumer: KafkaConsumer) {} + + public addOffset(topic: string, partition: number, offset: number): void { + const key = `${topic}-${partition}` + + if (!this.offsetsByPartitionTopic.has(key)) { + this.offsetsByPartitionTopic.set(key, []) + } + + // TODO: We should parseInt when we handle the message + this.offsetsByPartitionTopic.get(key)?.push(offset) + } + + /** + * When a rebalance occurs we need to remove all in-flight offsets for partitions that are no longer + * assigned to this consumer. 
+ * + * @param topic + * @param assignedPartitions The partitions that are still assigned to this consumer after a rebalance + */ + public cleanPartitions(topic: string, assignedPartitions: number[]): void { + const assignedKeys = assignedPartitions.map((partition) => `${topic}-${partition}`) + + const keysToDelete = new Set() + for (const [key] of this.offsetsByPartitionTopic) { + if (!assignedKeys.includes(key)) { + keysToDelete.add(key) + } + } + + keysToDelete.forEach((key) => { + this.offsetsByPartitionTopic.delete(key) + }) + } + + // TODO: Ensure all offsets passed here are already checked to be part of the same partition + public removeOffsets(topic: string, partition: number, offsets: number[]): number | undefined { + // TRICKY - We want to find the newest offset from the ones being removed that is + // older than the oldest in the list + // e.g. [3, 4, 8, 10] -> removing [3,8] should end up with [4,10] and commit 3 + // e.g. [3, 4, 8, 10 ] -> removing [10] should end up with [3,4,8] and commit nothing + + if (!offsets.length) { + return + } + + let offsetToCommit: number | undefined + const offsetsToRemove = offsets.sort((a, b) => a - b) + + const key = `${topic}-${partition}` + const inFlightOffsets = this.offsetsByPartitionTopic.get(key) + + status.info('💾', `Current offsets: ${inFlightOffsets}`) + status.info('💾', `Removing offsets: ${offsets}`) + + if (!inFlightOffsets) { + // TODO: Add a metric so that we can see if and when this happens + status.warn('💾', `No inflight offsets found for key: ${key}.`) + return + } + + offsetsToRemove.forEach((offset) => { + // Remove from the list. 
If it is the lowest value - set it + const offsetIndex = inFlightOffsets.indexOf(offset) + if (offsetIndex >= 0) { + inFlightOffsets.splice(offsetIndex, 1) + } + + // As the offsets are ordered we can simply check if we are removing from the start + // Higher offsets will update this value + if (offsetIndex === 0) { + offsetToCommit = offset + } + }) + + this.offsetsByPartitionTopic.set(key, inFlightOffsets) + + if (offsetToCommit) { + status.info('💾', `Committing offset ${offsetToCommit} for ${topic}-${partition}`) + this.consumer.commit({ + topic, + partition, + offset: offsetToCommit, + }) + } else { + status.info('💾', `No offset to commit from: ${inFlightOffsets}`) + } + + return offsetToCommit + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-manager.ts b/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-manager.ts new file mode 100644 index 00000000000..b9a59723d79 --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/session-manager.ts @@ -0,0 +1,257 @@ +import { Upload } from '@aws-sdk/lib-storage' +import { captureException } from '@sentry/node' +import { randomUUID } from 'crypto' +import { createReadStream, writeFileSync } from 'fs' +import { appendFile, unlink } from 'fs/promises' +import path from 'path' +import { Counter } from 'prom-client' +import * as zlib from 'zlib' + +import { PluginsServerConfig } from '../../../../types' +import { status } from '../../../../utils/status' +import { ObjectStorage } from '../../../services/object_storage' +import { IncomingRecordingMessage } from './types' +import { convertToPersistedMessage } from './utils' + +export const counterS3FilesWritten = new Counter({ + name: 'recording_s3_files_written', + help: 'Indicates that a given key has overflowed capacity and been redirected to a different topic. 
Value incremented once a minute.', + labelNames: ['partition_key'], +}) + +const ESTIMATED_GZIP_COMPRESSION_RATIO = 0.1 + +// The buffer is a list of messages grouped +type SessionBuffer = { + id: string + count: number + size: number + createdAt: Date + file: string + offsets: number[] +} + +async function deleteFile(file: string) { + try { + await unlink(file) + } catch (err) { + if (err && err.code === 'ENOENT') { + status.warn('⚠️', 'blob_ingester_session_manager failed deleting file ' + file + ', file not found', { + err, + file, + }) + return + } + status.error('🧨', 'blob_ingester_session_manager failed deleting file ' + file, { err, file }) + captureException(err) + throw err + } +} + +export class SessionManager { + chunks: Map = new Map() + buffer: SessionBuffer + flushBuffer?: SessionBuffer + + constructor( + public readonly serverConfig: PluginsServerConfig, + public readonly s3Client: ObjectStorage['s3'], + public readonly teamId: number, + public readonly sessionId: string, + public readonly partition: number, + public readonly topic: string, + private readonly onFinish: (offsetsToRemove: number[]) => void + ) { + this.buffer = this.createBuffer() + + // this.lastProcessedOffset = redis.get(`session-recording-last-offset-${this.sessionId}`) || 0 + } + + public async add(message: IncomingRecordingMessage): Promise { + // TODO: Check that the offset is higher than the lastProcessed + // If not - ignore it + // If it is - update lastProcessed and process it + if (message.chunk_count === 1) { + await this.addToBuffer(message) + } else { + await this.addToChunks(message) + } + + await this.flushIfNeccessary(true) + } + + public get isEmpty(): boolean { + return this.buffer.count === 0 && this.chunks.size === 0 + } + + public async flushIfNeccessary(shouldLog = false): Promise { + const bufferSizeKb = this.buffer.size / 1024 + const gzipSizeKb = bufferSizeKb * ESTIMATED_GZIP_COMPRESSION_RATIO + const gzippedCapacity = gzipSizeKb / 
this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB + + if (shouldLog) { + status.info( + '🚽', + `blob_ingester_session_manager Buffer ${this.sessionId}:: buffer size: ${ + this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB + }kb capacity: ${(gzippedCapacity * 100).toFixed(2)}%: count: ${this.buffer.count} ${Math.round( + bufferSizeKb + )}KB (~ ${Math.round(gzipSizeKb)}KB GZIP) chunks: ${this.chunks.size})`, + { + sizeInBufferKB: bufferSizeKb, + estimatedSizeInGzipKB: gzipSizeKb, + bufferThreshold: this.serverConfig.SESSION_RECORDING_MAX_BUFFER_SIZE_KB, + calculatedCapacity: gzippedCapacity, + } + ) + } + + const overCapacity = gzippedCapacity > 1 + const timeSinceLastFlushTooLong = + Date.now() - this.buffer.createdAt.getTime() >= + this.serverConfig.SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS * 1000 + const readyToFlush = overCapacity || timeSinceLastFlushTooLong + + if (readyToFlush) { + status.info('🚽', `blob_ingester_session_manager Flushing buffer ${this.sessionId}...`) + await this.flush() + } + } + + /** + * Flushing takes the current buffered file and moves it to the flush buffer + * We then attempt to write the events to S3 and if successful, we clear the flush buffer + */ + public async flush(): Promise { + if (this.flushBuffer) { + status.warn('⚠️', "blob_ingester_session_manager Flush called but we're already flushing") + return + } + + // We move the buffer to the flush buffer and create a new buffer so that we can safely write the buffer to disk + this.flushBuffer = this.buffer + this.buffer = this.createBuffer() + + try { + const baseKey = `${this.serverConfig.SESSION_RECORDING_REMOTE_FOLDER}/team_id/${this.teamId}/session_id/${this.sessionId}` + const dataKey = `${baseKey}/data/${this.flushBuffer.createdAt.getTime()}` // TODO: Change to be based on events times + + // TODO should only compress over some threshold? 
Depends how many uncompressed files we see below c200kb + const fileStream = createReadStream(this.flushBuffer.file).pipe(zlib.createGzip()) + + const parallelUploads3 = new Upload({ + client: this.s3Client, + params: { + Bucket: this.serverConfig.OBJECT_STORAGE_BUCKET, + Key: dataKey, + Body: fileStream, + }, + }) + await parallelUploads3.done() + + counterS3FilesWritten.inc(1) + // TODO: Add prometheus metric for the size of the file as well + // counterS3FilesWritten.add(1, { + // bytes: this.flushBuffer.size, // since the file is compressed this is wrong, and we don't know the compressed size 🤔 + // }) + } catch (error) { + // TODO: If we fail to write to S3 we should do something about it + status.error('🧨', 'blob_ingester_session_manager failed writing session recording blob to S3', error) + captureException(error) + } finally { + await deleteFile(this.flushBuffer.file) + + const offsets = this.flushBuffer.offsets + this.flushBuffer = undefined + + status.debug( + '🚽', + `blob_ingester_session_manager Flushed buffer ${this.sessionId} (removing offsets: ${offsets})` + ) + // TODO: Sync the last processed offset to redis + this.onFinish(offsets) + } + } + + private createBuffer(): SessionBuffer { + const id = randomUUID() + const buffer = { + id, + count: 0, + size: 0, + createdAt: new Date(), + file: path.join( + this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY, + `${this.teamId}.${this.sessionId}.${id}.jsonl` + ), + offsets: [], + } + + // NOTE: We can't do this easily async as we would need to handle the race condition of multiple events coming in at once.
+ writeFileSync(buffer.file, '', 'utf-8') + + return buffer + } + + /** + * Full messages (all chunks) are added to the buffer directly + */ + private async addToBuffer(message: IncomingRecordingMessage): Promise { + const content = JSON.stringify(convertToPersistedMessage(message)) + '\n' + this.buffer.count += 1 + this.buffer.size += Buffer.byteLength(content) + this.buffer.offsets.push(message.metadata.offset) + + try { + await appendFile(this.buffer.file, content, 'utf-8') + } catch (e) { + status.error('🧨', 'blob_ingester_session_manager failed writing session recording buffer to disk', e) + captureException(e) + throw e + } + } + + /** + * Chunked messages are added to the chunks map + * Once all chunks are received, the message is added to the buffer + * + */ + private async addToChunks(message: IncomingRecordingMessage): Promise { + // If it is a chunked message we add to the collected chunks + let chunks: IncomingRecordingMessage[] = [] + + if (!this.chunks.has(message.chunk_id)) { + this.chunks.set(message.chunk_id, chunks) + } else { + chunks = this.chunks.get(message.chunk_id) || [] + } + + chunks.push(message) + + if (chunks.length === message.chunk_count) { + // If we have all the chunks, we can add the message to the buffer + // We want to add all the chunk offsets as well so that they are tracked correctly + chunks.forEach((x) => { + this.buffer.offsets.push(x.metadata.offset) + }) + + await this.addToBuffer({ + ...message, + data: chunks + .sort((a, b) => a.chunk_index - b.chunk_index) + .map((c) => c.data) + .join(''), + }) + + this.chunks.delete(message.chunk_id) + } + } + + public async destroy(): Promise { + status.debug('␡', `blob_ingester_session_manager Destroying session manager ${this.sessionId}`) + const filePromises: Promise[] = [this.flushBuffer?.file, this.buffer.file] + .filter((x): x is string => x !== undefined) + .map((x) => deleteFile(x)) + await Promise.all(filePromises) + } +} diff --git 
a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/types.ts b/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/types.ts new file mode 100644 index 00000000000..dc1007ba2ae --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/types.ts @@ -0,0 +1,32 @@ +// This is the incoming message from Kafka +export type IncomingRecordingMessage = { + metadata: { + topic: string + partition: number + offset: number + } + + team_id: number + distinct_id: string + session_id: string + window_id?: string + + // Properties data + chunk_id: string + chunk_index: number + chunk_count: number + data: string + compresssion: string + has_full_snapshot: boolean + events_summary: { + timestamp: number + type: number + data: any + }[] +} + +// This is the incoming message from Kafka +export type PersistedRecordingMessage = { + window_id?: string + data: any +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/utils.ts new file mode 100644 index 00000000000..6fea3650533 --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording/blob-ingester/utils.ts @@ -0,0 +1,23 @@ +import zlib from 'node:zlib' + +import { IncomingRecordingMessage, PersistedRecordingMessage } from './types' + +// NOTE: These functions are meant to be identical to those in posthog/session_recordings/session_recording_helpers.py +export function compressToString(input: string): string { + const compressed_data = zlib.gzipSync(Buffer.from(input, 'utf16le')) + return compressed_data.toString('base64') +} + +export function decompressFromString(input: string): string { + const compressedData = Buffer.from(input, 'base64') + const uncompressed = zlib.gunzipSync(compressedData) + // Trim is quick way to get rid of BOMs created by python + return uncompressed.toString('utf16le').trim() +} + +export const 
convertToPersistedMessage = (message: IncomingRecordingMessage): PersistedRecordingMessage => { + return { + window_id: message.window_id, + data: decompressFromString(message.data), + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts new file mode 100644 index 00000000000..059602131b7 --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-blob-consumer.ts @@ -0,0 +1,295 @@ +import { mkdirSync, rmSync } from 'node:fs' +import { CODES, HighLevelProducer as RdKafkaProducer, Message } from 'node-rdkafka' + +import { KAFKA_SESSION_RECORDING_EVENTS } from '../../../config/kafka-topics' +import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' +import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' +import { createKafkaProducer, disconnectProducer } from '../../../kafka/producer' +import { PipelineEvent, PluginsServerConfig, RawEventMessage, Team } from '../../../types' +import { KafkaConfig } from '../../../utils/db/hub' +import { status } from '../../../utils/status' +import { TeamManager } from '../../../worker/ingestion/team-manager' +import { ObjectStorage } from '../../services/object_storage' +import { OffsetManager } from './blob-ingester/offset-manager' +import { SessionManager } from './blob-ingester/session-manager' +import { IncomingRecordingMessage } from './blob-ingester/types' + +const groupId = 'session-recordings-blob' +const sessionTimeout = 30000 +const fetchBatchSize = 500 + +export class SessionRecordingBlobIngester { + sessions: Map = new Map() + offsetManager?: OffsetManager + batchConsumer?: BatchConsumer + producer?: RdKafkaProducer + lastHeartbeat: number = Date.now() + flushInterval: NodeJS.Timer | null = null + enabledTeams: number[] | null + + constructor( + private teamManager: TeamManager, + private 
serverConfig: PluginsServerConfig, + private objectStorage: ObjectStorage + ) { + const enabledTeamsString = this.serverConfig.SESSION_RECORDING_BLOB_PROCESSING_TEAMS + this.enabledTeams = + enabledTeamsString === 'all' || enabledTeamsString.trim().length === 0 + ? null + : enabledTeamsString.split(',').map(parseInt) + } + + public async consume(event: IncomingRecordingMessage): Promise { + const { team_id, session_id } = event + const key = `${team_id}-${session_id}` + + const { partition, topic, offset } = event.metadata + + if (!this.sessions.has(key)) { + const { partition, topic } = event.metadata + + const sessionManager = new SessionManager( + this.serverConfig, + this.objectStorage.s3, + team_id, + session_id, + partition, + topic, + (offsets) => { + this.offsetManager?.removeOffsets(topic, partition, offsets) + + // If the SessionManager is done (flushed and with no more queued events) then we remove it to free up memory + if (sessionManager.isEmpty) { + this.sessions.delete(key) + } + } + ) + + this.sessions.set(key, sessionManager) + } + + this.offsetManager?.addOffset(topic, partition, offset) + await this.sessions.get(key)?.add(event) + // TODO: If we error here, what should we do...? + // If it is unrecoverable we probably want to remove the offset + // If it is recoverable, we probably want to retry? 
+ } + + public async handleKafkaMessage(message: Message): Promise { + const statusWarn = (reason: string, error?: Error) => { + status.warn('⚠️', 'invalid_message', { + reason, + error, + partition: message.partition, + offset: message.offset, + }) + } + + if (!message.value) { + return statusWarn('empty') + } + + let messagePayload: RawEventMessage + let event: PipelineEvent + + try { + messagePayload = JSON.parse(message.value.toString()) + event = JSON.parse(messagePayload.data) + } catch (error) { + return statusWarn('invalid_json', error) + } + + if (event.event !== '$snapshot') { + status.debug('🙈', 'Received non-snapshot message, ignoring') + return + } + + if (messagePayload.team_id == null && !messagePayload.token) { + return statusWarn('no_token') + } + + let team: Team | null = null + + if (messagePayload.team_id != null) { + team = await this.teamManager.fetchTeam(messagePayload.team_id) + } else if (messagePayload.token) { + team = await this.teamManager.getTeamByToken(messagePayload.token) + } + + if (team == null) { + return statusWarn('team_not_found') + } + + if (this.enabledTeams && !this.enabledTeams.includes(team.id)) { + // NOTE: due to the high volume of hits here we don't log this + return + } + + status.info('⬆️', 'processing_session_recording_blob', { uuid: messagePayload.uuid }) + + const $snapshot_data = event.properties?.$snapshot_data + + const recordingMessage: IncomingRecordingMessage = { + metadata: { + partition: message.partition, + topic: message.topic, + offset: message.offset, + }, + + team_id: team.id, + distinct_id: event.distinct_id, + session_id: event.properties?.$session_id, + window_id: event.properties?.$window_id, + + // Properties data + chunk_id: $snapshot_data.chunk_id, + chunk_index: $snapshot_data.chunk_index, + chunk_count: $snapshot_data.chunk_count, + data: $snapshot_data.data, + compresssion: $snapshot_data.compression, + has_full_snapshot: $snapshot_data.has_full_snapshot, + events_summary: 
$snapshot_data.events_summary, + } + + await this.consume(recordingMessage) + } + + private async handleEachBatch(messages: Message[]): Promise { + status.info('🔁', 'Processing recordings blob batch', { size: messages.length }) + + for (const message of messages) { + await this.handleKafkaMessage(message) + } + } + + public async start(): Promise { + status.info('🔁', 'Starting session recordings blob consumer') + + // Currently we can't reuse any files stored on disk, so we opt to delete them all + rmSync(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true }) + mkdirSync(this.serverConfig.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true }) + + status.info('🔁', 'Starting session recordings consumer') + + const connectionConfig = createRdConnectionConfigFromEnvVars(this.serverConfig as KafkaConfig) + this.producer = await createKafkaProducer(connectionConfig) + + // Create a node-rdkafka consumer that fetches batches of messages, runs + // eachBatchWithContext, then commits offsets for the batch. + this.batchConsumer = await startBatchConsumer({ + connectionConfig, + groupId, + topic: KAFKA_SESSION_RECORDING_EVENTS, + sessionTimeout, + consumerMaxBytes: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES, + consumerMaxBytesPerPartition: this.serverConfig.KAFKA_CONSUMPTION_MAX_BYTES_PER_PARTITION, + consumerMaxWaitMs: this.serverConfig.KAFKA_CONSUMPTION_MAX_WAIT_MS, + fetchBatchSize, + autoCommit: false, + eachBatch: async (messages) => { + return await this.handleEachBatch(messages) + }, + }) + + this.offsetManager = new OffsetManager(this.batchConsumer.consumer) + + this.batchConsumer.consumer.on('rebalance', async (err, assignments) => { + /** + * see https://github.com/Blizzard/node-rdkafka#rebalancing + * + * This event is received when the consumer group starts _or_ finishes rebalancing. 
+ * + * Also, see https://docs.confluent.io/platform/current/clients/librdkafka/html/classRdKafka_1_1RebalanceCb.html + * For eager/non-cooperative partition.assignment.strategy assignors, such as range and roundrobin, + * the application must use assign() to set and unassign() to clear the entire assignment. + * For the cooperative assignors, such as cooperative-sticky, the application must use + * incremental_assign() for ERR__ASSIGN_PARTITIONS and incremental_unassign() for ERR__REVOKE_PARTITIONS. + */ + status.info('🏘️', 'Blob ingestion consumer rebalanced') + if (err.code === CODES.ERRORS.ERR__ASSIGN_PARTITIONS) { + status.info('⚖️', 'Blob ingestion consumer has received assignments', { assignments }) + + const partitions = assignments.map((assignment) => assignment.partition) + + this.offsetManager?.cleanPartitions(KAFKA_SESSION_RECORDING_EVENTS, partitions) + + await Promise.all( + [...this.sessions.values()] + .filter((session) => !partitions.includes(session.partition)) + .map((session) => session.destroy()) + ) + + // Assign partitions to the consumer + // TODO read offset position from partitions so we can read from the correct place + // TODO looking here https://github.com/Blizzard/node-rdkafka/blob/master/lib/kafka-consumer.js#L54 + // TODO we should not need to handle the assignment ourself since rebalance_cb = true + // this.batchConsumer?.consumer.assign(assignments) + } else if (err.code === CODES.ERRORS.ERR__REVOKE_PARTITIONS) { + status.info('⚖️', 'Blob ingestion consumer has had assignments revoked', { assignments }) + /** + * The revoke_partitions event occurs when the Kafka Consumer is part of a consumer group and the group rebalances. + * As a result, some partitions previously assigned to a consumer might be taken away (revoked) and reassigned to another consumer. 
+ * After the revoke_partitions event is handled, the consumer will receive an assign_partitions event, + * which will inform the consumer of the new set of partitions it is responsible for processing. + * + * Depending on why the rebalancing is occurring and the partition.assignment.strategy, + * A partition revoked here, may be assigned back to the same consumer. + * + * This is where we could act to reduce raciness/duplication when partitions are reassigned to different consumers + * e.g. stop the `flushInterval` and wait for the `assign_partitions` event to start it again. + */ + // this.batchConsumer?.consumer.unassign() + } else { + // We had a "real" error + status.error('🔥', 'Blob ingestion consumer rebalancing error', { err }) + // TODO: immediately die? or just keep going? + } + }) + + // Make sure to disconnect the producer after we've finished consuming. + this.batchConsumer.join().finally(async () => { + if (this.producer && this.producer.isConnected()) { + status.debug('🔁', 'disconnecting kafka producer in session recordings batchConsumer finally') + await disconnectProducer(this.producer) + } + }) + + this.batchConsumer.consumer.on('disconnected', async (err) => { + // since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect + // we need to listen to disconnect and make sure we're stopped + status.info('🔁', 'Blob ingestion consumer disconnected, cleaning up', { err }) + await this.stop() + }) + + // We trigger the flushes from this level to reduce the number of running timers + this.flushInterval = setInterval(() => { + this.sessions.forEach((sessionManager) => { + void sessionManager.flushIfNeccessary() + }) + }, 10000) + } + + public async stop(): Promise { + status.info('🔁', 'Stopping session recordings consumer') + + if (this.flushInterval) { + clearInterval(this.flushInterval) + } + + if (this.producer && this.producer.isConnected()) { + status.info('🔁', 'disconnecting kafka producer in session 
recordings batchConsumer stop') + await disconnectProducer(this.producer) + } + await this.batchConsumer?.stop() + + // This is inefficient but currently necessary due to new instances restarting from the committed offset point + const destroyPromises: Promise[] = [] + this.sessions.forEach((sessionManager) => { + destroyPromises.push(sessionManager.destroy()) + }) + + await Promise.allSettled(destroyPromises) + + this.sessions = new Map() + } +} diff --git a/plugin-server/src/main/ingestion-queues/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts similarity index 94% rename from plugin-server/src/main/ingestion-queues/session-recordings-consumer.ts rename to plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 8b468cd3029..3fe62dc36c7 100644 --- a/plugin-server/src/main/ingestion-queues/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -6,18 +6,18 @@ import { KAFKA_PERFORMANCE_EVENTS, KAFKA_SESSION_RECORDING_EVENTS, KAFKA_SESSION_RECORDING_EVENTS_DLQ, -} from '../../config/kafka-topics' -import { startBatchConsumer } from '../../kafka/batch-consumer' -import { createRdConnectionConfigFromEnvVars } from '../../kafka/config' -import { retryOnDependencyUnavailableError } from '../../kafka/error-handling' -import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../kafka/producer' -import { PipelineEvent, RawEventMessage, Team } from '../../types' -import { KafkaConfig } from '../../utils/db/hub' -import { status } from '../../utils/status' -import { createPerformanceEvent, createSessionRecordingEvent } from '../../worker/ingestion/process-event' -import { TeamManager } from '../../worker/ingestion/team-manager' -import { parseEventTimestamp } from '../../worker/ingestion/timestamps' -import { eventDroppedCounter } from './metrics' +} from 
'../../../config/kafka-topics' +import { startBatchConsumer } from '../../../kafka/batch-consumer' +import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' +import { retryOnDependencyUnavailableError } from '../../../kafka/error-handling' +import { createKafkaProducer, disconnectProducer, flushProducer, produce } from '../../../kafka/producer' +import { PipelineEvent, RawEventMessage, Team } from '../../../types' +import { KafkaConfig } from '../../../utils/db/hub' +import { status } from '../../../utils/status' +import { createPerformanceEvent, createSessionRecordingEvent } from '../../../worker/ingestion/process-event' +import { TeamManager } from '../../../worker/ingestion/team-manager' +import { parseEventTimestamp } from '../../../worker/ingestion/timestamps' +import { eventDroppedCounter } from '../metrics' export const startSessionRecordingEventsConsumer = async ({ teamManager, diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 4f27df4ac58..039318da56e 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -27,8 +27,10 @@ import { startJobsConsumer } from './ingestion-queues/jobs-consumer' import { IngestionConsumer } from './ingestion-queues/kafka-queue' import { startOnEventHandlerConsumer } from './ingestion-queues/on-event-handler-consumer' import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer' -import { startSessionRecordingEventsConsumer } from './ingestion-queues/session-recordings-consumer' +import { SessionRecordingBlobIngester } from './ingestion-queues/session-recording/session-recordings-blob-consumer' +import { startSessionRecordingEventsConsumer } from './ingestion-queues/session-recording/session-recordings-consumer' import { createHttpServer } from './services/http-server' +import { getObjectStorage } from './services/object_storage' const { version } = require('../../package.json') @@ -87,7 
+89,9 @@ export async function startPluginsServer( // meantime. let bufferConsumer: Consumer | undefined let stopSessionRecordingEventsConsumer: (() => void) | undefined + let stopSessionRecordingBlobConsumer: (() => void) | undefined let joinSessionRecordingEventsConsumer: ((timeout?: number) => Promise) | undefined + let joinSessionRecordingBlobConsumer: ((timeout?: number) => Promise) | undefined let jobsConsumer: Consumer | undefined let schedulerTasksConsumer: Consumer | undefined @@ -126,6 +130,7 @@ export async function startPluginsServer( bufferConsumer?.disconnect(), jobsConsumer?.disconnect(), stopSessionRecordingEventsConsumer?.(), + stopSessionRecordingBlobConsumer?.(), schedulerTasksConsumer?.disconnect(), ]) @@ -419,6 +424,23 @@ export async function startPluginsServer( healthChecks['session-recordings'] = isSessionRecordingsHealthy } + if (capabilities.sessionRecordingBlobIngestion) { + const postgres = hub?.postgres ?? createPostgresPool(serverConfig.DATABASE_URL) + const teamManager = hub?.teamManager ?? new TeamManager(postgres, serverConfig) + const s3 = hub?.objectStorage ?? getObjectStorage(serverConfig) + if (!s3) { + throw new Error("Can't start session recording blob ingestion without object storage") + } + const ingester = new SessionRecordingBlobIngester(teamManager, serverConfig, s3) + await ingester.start() + const batchConsumer = ingester.batchConsumer + if (batchConsumer) { + stopSessionRecordingBlobConsumer = () => ingester.stop() + joinSessionRecordingBlobConsumer = () => batchConsumer.join() + healthChecks['session-recordings-blob'] = () => batchConsumer.isHealthy() ?? 
false + } + } + if (capabilities.http) { httpServer = createHttpServer(healthChecks, analyticsEventsIngestionConsumer, piscina) } @@ -444,6 +466,9 @@ export async function startPluginsServer( if (joinSessionRecordingEventsConsumer) { joinSessionRecordingEventsConsumer().catch(closeJobs) } + if (joinSessionRecordingBlobConsumer) { + joinSessionRecordingBlobConsumer().catch(closeJobs) + } return serverInstance ?? { stop: closeJobs } } catch (error) { diff --git a/plugin-server/src/main/services/object_storage.ts b/plugin-server/src/main/services/object_storage.ts index b95560c27b3..852bb4f7bd2 100644 --- a/plugin-server/src/main/services/object_storage.ts +++ b/plugin-server/src/main/services/object_storage.ts @@ -1,63 +1,69 @@ +import { HeadBucketCommand, S3Client } from '@aws-sdk/client-s3' + import { PluginsServerConfig } from '../../types' import { status } from '../../utils/status' -const aws = require('aws-sdk') - -let S3: typeof aws.S3 | null = null - export interface ObjectStorage { healthcheck: () => Promise + s3: S3Client } +let objectStorage: ObjectStorage | undefined + // Object Storage added without any uses to flush out deployment concerns. 
// see https://github.com/PostHog/posthog/pull/9901 -export const connectObjectStorage = (serverConfig: Partial): ObjectStorage => { - let storage = { - healthcheck: async () => { - return Promise.resolve(true) // healthy if object storage isn't configured - }, - } - try { - const { - OBJECT_STORAGE_ENDPOINT, - OBJECT_STORAGE_ACCESS_KEY_ID, - OBJECT_STORAGE_SECRET_ACCESS_KEY, - OBJECT_STORAGE_ENABLED, - OBJECT_STORAGE_BUCKET, - } = serverConfig +export const getObjectStorage = (serverConfig: Partial): ObjectStorage | undefined => { + if (!objectStorage) { + try { + const { + OBJECT_STORAGE_ENDPOINT, + OBJECT_STORAGE_REGION, + OBJECT_STORAGE_ACCESS_KEY_ID, + OBJECT_STORAGE_SECRET_ACCESS_KEY, + OBJECT_STORAGE_ENABLED, + OBJECT_STORAGE_BUCKET, + } = serverConfig - if (OBJECT_STORAGE_ENABLED && !S3) { - S3 = new aws.S3({ - endpoint: OBJECT_STORAGE_ENDPOINT, - accessKeyId: OBJECT_STORAGE_ACCESS_KEY_ID, - secretAccessKey: OBJECT_STORAGE_SECRET_ACCESS_KEY, - s3ForcePathStyle: true, // needed with minio? - signatureVersion: 'v4', - }) + if (OBJECT_STORAGE_ENABLED) { + const credentials = + OBJECT_STORAGE_ACCESS_KEY_ID && OBJECT_STORAGE_SECRET_ACCESS_KEY + ? { + accessKeyId: OBJECT_STORAGE_ACCESS_KEY_ID, + secretAccessKey: OBJECT_STORAGE_SECRET_ACCESS_KEY, + } + : undefined - storage = { - healthcheck: async () => { - if (!OBJECT_STORAGE_BUCKET) { - status.error('😢', 'No object storage bucket configured') - return false - } + const S3 = new S3Client({ + region: OBJECT_STORAGE_REGION, + endpoint: OBJECT_STORAGE_ENDPOINT, + credentials, + forcePathStyle: true, // needed with minio? 
+ // signatureVersion: 'v4', + }) - try { - await S3.headBucket({ - Bucket: OBJECT_STORAGE_BUCKET, - }).promise() - return true - } catch (error) { - status.error('💣', 'Could not access bucket:', error) - return false - } - }, + objectStorage = { + healthcheck: async () => { + if (!OBJECT_STORAGE_BUCKET) { + status.error('😢', 'No object storage bucket configured') + return false + } + + try { + await S3.send(new HeadBucketCommand({ Bucket: OBJECT_STORAGE_BUCKET })) + return true + } catch (error) { + status.error('💣', 'Could not access bucket:', error) + return false + } + }, + s3: S3, + } } + } catch (e) { + // only warn here... object storage is not mandatory until after #9901 at the earliest + status.warn('😢', 'could not initialise storage:', e) } - } catch (e) { - // only warn here... object storage is not mandatory until after #9901 at the earliest - status.warn('😢', 'could not initialise storage:', e) } - return storage + return objectStorage } diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 644af118d30..bc102587fff 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -154,10 +154,10 @@ export interface PluginsServerConfig { PERSON_INFO_CACHE_TTL: number KAFKA_HEALTHCHECK_SECONDS: number OBJECT_STORAGE_ENABLED: boolean // Disables or enables the use of object storage. 
It will become mandatory to use object storage - OBJECT_STORAGE_ENDPOINT: string // minio endpoint + OBJECT_STORAGE_REGION: string // s3 region + OBJECT_STORAGE_ENDPOINT: string // s3 endpoint OBJECT_STORAGE_ACCESS_KEY_ID: string OBJECT_STORAGE_SECRET_ACCESS_KEY: string - OBJECT_STORAGE_SESSION_RECORDING_FOLDER: string // the top level folder for storing session recordings inside the storage bucket OBJECT_STORAGE_BUCKET: string // the object storage bucket name PLUGIN_SERVER_MODE: | 'ingestion' @@ -168,6 +168,7 @@ export interface PluginsServerConfig { | 'scheduler' | 'analytics-ingestion' | 'recordings-ingestion' + | 'recordings-blob-ingestion' | null KAFKAJS_LOG_LEVEL: 'NOTHING' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR' HISTORICAL_EXPORTS_ENABLED: boolean // enables historical exports for export apps @@ -180,6 +181,12 @@ export interface PluginsServerConfig { EVENT_OVERFLOW_BUCKET_CAPACITY: number EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: number CLOUD_DEPLOYMENT: string + + SESSION_RECORDING_BLOB_PROCESSING_TEAMS: string + SESSION_RECORDING_LOCAL_DIRECTORY: string + SESSION_RECORDING_MAX_BUFFER_AGE_SECONDS: number + SESSION_RECORDING_MAX_BUFFER_SIZE_KB: number + SESSION_RECORDING_REMOTE_FOLDER: string } export interface Hub extends PluginsServerConfig { @@ -236,6 +243,7 @@ export interface PluginServerCapabilities { processPluginJobs?: boolean processAsyncHandlers?: boolean sessionRecordingIngestion?: boolean + sessionRecordingBlobIngestion?: boolean http?: boolean mmdb?: boolean } diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index f0e6774c3c0..28a1c63fade 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -15,7 +15,7 @@ import { getPluginServerCapabilities } from '../../capabilities' import { defaultConfig } from '../../config/config' import { KAFKAJS_LOG_LEVEL_MAPPING } from '../../config/constants' import { KAFKA_JOBS } from '../../config/kafka-topics' -import { connectObjectStorage } from 
'../../main/services/object_storage' +import { getObjectStorage } from '../../main/services/object_storage' import { EnqueuedPluginJob, Hub, @@ -160,11 +160,12 @@ export async function createHub( status.info('👍', `Redis ready`) status.info('🤔', `Connecting to object storage...`) - try { - connectObjectStorage(serverConfig) + + const objectStorage = getObjectStorage(serverConfig) + if (objectStorage) { status.info('👍', 'Object storage ready') - } catch (e) { - status.warn('🪣', `Object storage could not be created: ${e}`) + } else { + status.warn('🪣', `Object storage could not be created`) } const promiseManager = new PromiseManager(serverConfig, statsd) @@ -232,6 +233,7 @@ export async function createHub( kafkaProducer, statsd, enqueuePluginJob, + objectStorage: objectStorage, plugins: new Map(), pluginConfigs: new Map(), diff --git a/plugin-server/src/utils/status.ts b/plugin-server/src/utils/status.ts index 57771e2a87e..8eb92072047 100644 --- a/plugin-server/src/utils/status.ts +++ b/plugin-server/src/utils/status.ts @@ -94,6 +94,8 @@ function promptForMode(mode: PluginsServerConfig['PLUGIN_SERVER_MODE']): string return 'INGESTION-OVERFLOW' case 'recordings-ingestion': return 'RECORDINGS-INGESTION' + case 'recordings-blob-ingestion': + return 'RECORDINGS-BLOB-INGESTION' case 'async': return 'ASYNC' case 'exports': diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/offset-manager.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/offset-manager.test.ts new file mode 100644 index 00000000000..c6a544be0c5 --- /dev/null +++ b/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/offset-manager.test.ts @@ -0,0 +1,105 @@ +import { KafkaConsumer } from 'node-rdkafka' + +import { OffsetManager } from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/offset-manager' + +describe('offset-manager', () => { + const TOPIC = 'test-session-recordings' + + let 
offsetManager: OffsetManager + const mockConsumer = { + commit: jest.fn(() => Promise.resolve()), + } + + beforeEach(() => { + mockConsumer.commit.mockClear() + offsetManager = new OffsetManager(mockConsumer as unknown as KafkaConsumer) + }) + + it('collects new offsets', () => { + offsetManager.addOffset(TOPIC, 1, 1) + offsetManager.addOffset(TOPIC, 2, 1) + offsetManager.addOffset(TOPIC, 3, 4) + offsetManager.addOffset(TOPIC, 1, 2) + offsetManager.addOffset(TOPIC, 1, 5) + offsetManager.addOffset(TOPIC, 3, 4) + + expect(offsetManager.offsetsByPartitionTopic).toEqual( + new Map([ + ['test-session-recordings-1', [1, 2, 5]], + ['test-session-recordings-2', [1]], + ['test-session-recordings-3', [4, 4]], + ]) + ) + }) + + it('removes offsets', () => { + offsetManager.addOffset(TOPIC, 1, 1) + offsetManager.addOffset(TOPIC, 2, 1) + offsetManager.addOffset(TOPIC, 3, 4) + offsetManager.addOffset(TOPIC, 1, 2) + offsetManager.addOffset(TOPIC, 1, 5) + offsetManager.addOffset(TOPIC, 3, 4) + + offsetManager.removeOffsets(TOPIC, 1, [1, 2]) + + expect(offsetManager.offsetsByPartitionTopic).toEqual( + new Map([ + ['test-session-recordings-1', [5]], + ['test-session-recordings-2', [1]], + ['test-session-recordings-3', [4, 4]], + ]) + ) + }) + + it.each([ + [[1], 1], + [[2, 5, 10], undefined], + [[1, 2, 3, 9], 3], + ])('commits the appropriate offset ', (removals: number[], expectation: number | null | undefined) => { + ;[1, 2, 3, 4, 5, 6, 7, 8, 9, 10].forEach((offset) => { + offsetManager.addOffset(TOPIC, 1, offset) + }) + + const result = offsetManager.removeOffsets(TOPIC, 1, removals) + + expect(result).toEqual(expectation) + if (result === undefined) { + expect(mockConsumer.commit).toHaveBeenCalledTimes(0) + } else { + expect(mockConsumer.commit).toHaveBeenCalledTimes(1) + expect(mockConsumer.commit).toHaveBeenCalledWith({ + offset: result, + partition: 1, + topic: 'test-session-recordings', + }) + } + }) + + it('does not commits revoked partition offsets ', () => { + ;[1, 2, 3, 
4, 5, 6, 7, 8, 9, 10].forEach((offset) => {
+            offsetManager.addOffset(TOPIC, 1, offset)
+        })
+
+        offsetManager.addOffset(TOPIC, 1, 1)
+        offsetManager.addOffset(TOPIC, 2, 2)
+        offsetManager.addOffset(TOPIC, 3, 3)
+
+        offsetManager.cleanPartitions(TOPIC, [2, 3])
+
+        const resultOne = offsetManager.removeOffsets(TOPIC, 1, [1])
+        expect(resultOne).toEqual(undefined)
+        expect(mockConsumer.commit).toHaveBeenCalledTimes(0)
+
+        mockConsumer.commit.mockClear()
+
+        const resultTwo = offsetManager.removeOffsets(TOPIC, 2, [2])
+        expect(resultTwo).toEqual(2)
+        expect(mockConsumer.commit).toHaveBeenCalledTimes(1)
+
+        mockConsumer.commit.mockClear()
+
+        const resultThree = offsetManager.removeOffsets(TOPIC, 3, [3])
+        expect(resultThree).toEqual(3)
+        expect(mockConsumer.commit).toHaveBeenCalledTimes(1)
+    })
+})
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/session-manager.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/session-manager.test.ts
new file mode 100644
index 00000000000..ffe45e978bd
--- /dev/null
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/session-manager.test.ts
@@ -0,0 +1,105 @@
+import fs from 'node:fs'
+
+import { defaultConfig } from '../../../../../src/config/config'
+import { SessionManager } from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/session-manager'
+import { compressToString } from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/utils'
+import { createChunkedIncomingRecordingMessage, createIncomingRecordingMessage } from '../fixtures'
+
+describe('session-manager', () => {
+    let sessionManager: SessionManager
+    const mockFinish = jest.fn()
+    const mockS3Client: any = {
+        send: jest.fn(),
+    }
+
+    beforeEach(() => {
+        sessionManager = new SessionManager(defaultConfig, mockS3Client, 1, 'session_id_1', 1, 'topic', mockFinish)
+        mockFinish.mockClear()
+    })
+
+    it('adds a message', async () => {
+        const payload = JSON.stringify([{ simple: 'data' }])
+        const event = createIncomingRecordingMessage({
+            data: compressToString(payload),
+        })
+        await sessionManager.add(event)
+
+        expect(sessionManager.buffer).toEqual({
+            count: 1,
+            createdAt: expect.any(Date),
+            file: expect.any(String),
+            id: expect.any(String),
+            size: 61, // The size of the event payload - this may change when test data changes
+            offsets: [1],
+        })
+        const fileContents = JSON.parse(fs.readFileSync(sessionManager.buffer.file, 'utf-8'))
+        expect(fileContents.data).toEqual(payload)
+    })
+
+    it('flushes messages', async () => {
+        const event = createIncomingRecordingMessage()
+        await sessionManager.add(event)
+        expect(sessionManager.buffer.count).toEqual(1)
+        const file = sessionManager.buffer.file
+        expect(fs.existsSync(file)).toEqual(true)
+
+        // Deliberately not awaited yet: the next assertions inspect the state while the flush is still pending
+        const afterResumeFlushPromise = sessionManager.flush()
+
+        expect(sessionManager.buffer.count).toEqual(0)
+        expect(sessionManager.flushBuffer?.count).toEqual(1)
+
+        await afterResumeFlushPromise
+
+        expect(sessionManager.flushBuffer).toEqual(undefined)
+        expect(mockFinish).toBeCalledTimes(1)
+        expect(fs.existsSync(file)).toEqual(false)
+    })
+
+    it('flushes messages and whilst collecting new ones', async () => {
+        const event = createIncomingRecordingMessage()
+        const event2 = createIncomingRecordingMessage()
+        await sessionManager.add(event)
+        expect(sessionManager.buffer.count).toEqual(1)
+
+        const flushPromise = sessionManager.flush()
+        // Added while the flush is still pending; expected to be counted in the new buffer, not the flushing one
+        await sessionManager.add(event2)
+
+        expect(sessionManager.buffer.count).toEqual(1)
+        expect(sessionManager.flushBuffer?.count).toEqual(1)
+
+        await flushPromise
+
+        expect(sessionManager.flushBuffer).toEqual(undefined)
+        expect(sessionManager.buffer.count).toEqual(1)
+    })
+
+    it('chunks incoming messages', async () => {
+        const events = createChunkedIncomingRecordingMessage(3, {
+            data: compressToString(JSON.stringify([{ simple: 'data' }])),
+        })
+
+        expect(events.length).toEqual(3)
+
+        expect(events[0].data.length).toBeGreaterThan(1)
+        expect(events[1].data.length).toBeGreaterThan(1)
+        expect(events[2].data.length).toBeGreaterThan(1)
+
+        await sessionManager.add(events[0])
+        expect(sessionManager.buffer.count).toEqual(0)
+        expect(sessionManager.chunks.size).toEqual(1)
+
+        // Chunks are fed out of order (0, 2, 1); nothing should reach the buffer until the last one arrives
+        await sessionManager.add(events[2])
+        expect(sessionManager.buffer.count).toEqual(0)
+        expect(sessionManager.chunks.size).toEqual(1)
+
+        await sessionManager.add(events[1])
+        expect(sessionManager.buffer.count).toEqual(1)
+        expect(sessionManager.chunks.size).toEqual(0)
+
+        const fileContents = JSON.parse(fs.readFileSync(sessionManager.buffer.file, 'utf-8'))
+        expect(fileContents.data).toEqual('[{"simple":"data"}]')
+    })
+})
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/utils.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/utils.test.ts
new file mode 100644
index 00000000000..192b27e211f
--- /dev/null
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/blob-ingester/utils.test.ts
@@ -0,0 +1,22 @@
+import {
+    compressToString,
+    decompressFromString,
+} from '../../../../../src/main/ingestion-queues/session-recording/blob-ingester/utils'
+
+// NOTE: This was copied from the output of the related python ingestion code
+const compressedData =
+    'H4sIAFUyHGQC/1WOwQ6CMBBE36cYzh5QFMWf8AOMB6KQ9IASC4nE+OvqlK4kpNl2OjO7O9/PiRcJHQMtldCBBRlL3QlXSimlscHnudPz4DJ5V+ZtpXic/E7oJhz1OP9pv5wdhXUMxgUmNc5p5zxDmNdo25Faxwt15kh5c1bNfX5M3CjPP1/cudVbsFGtNTtjP3b/AMlNphkAAQAA'
+
+describe('compression', () => {
+    it('should compress and decompress a string consistently', () => {
+        const compressedAndDecompressed = decompressFromString(compressToString('hello world'))
+        expect(compressedAndDecompressed).toEqual('hello world')
+    })
+
+    it('should decompress string from the python version', () => {
+        const decompressed = decompressFromString(compressedData)
+        expect(decompressed).toEqual(
+            `[{"type": 3, "data": 
{"source": 1, "positions": [{"x": 679, "y": 790, "id": 3, "timeOffset": 0}]}, "timestamp": 1679569492338}]` + ) + }) +}) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/data/snapshot-full.json b/plugin-server/tests/main/ingestion-queues/session-recording/data/snapshot-full.json new file mode 100644 index 00000000000..4d4edce44e3 --- /dev/null +++ b/plugin-server/tests/main/ingestion-queues/session-recording/data/snapshot-full.json @@ -0,0 +1,316 @@ +{ + "event": "$snapshot", + "properties": { + "$snapshot_data": { + "type": 2, + "data": { + "node": { + "type": 0, + "childNodes": [ + { "type": 1, "name": "html", "publicId": "", "systemId": "", "id": 2 }, + { + "type": 2, + "tagName": "html", + "attributes": { "lang": "en" }, + "childNodes": [ + { + "type": 2, + "tagName": "head", + "attributes": {}, + "childNodes": [ + { + "type": 2, + "tagName": "style", + "attributes": { "data-next-hide-fouc": "true" }, + "childNodes": [ + { + "type": 3, + "textContent": "body { display: none; }", + "isStyle": true, + "id": 6 + } + ], + "id": 5 + }, + { + "type": 2, + "tagName": "noscript", + "attributes": { "data-next-hide-fouc": "true" }, + "childNodes": [ + { + "type": 3, + "textContent": "", + "id": 8 + } + ], + "id": 7 + }, + { + "type": 2, + "tagName": "meta", + "attributes": { "charset": "utf-8" }, + "childNodes": [], + "id": 9 + }, + { + "type": 2, + "tagName": "title", + "attributes": {}, + "childNodes": [{ "type": 3, "textContent": "PostHog", "id": 11 }], + "id": 10 + }, + { + "type": 2, + "tagName": "meta", + "attributes": { + "name": "viewport", + "content": "width=device-width, initial-scale=1" + }, + "childNodes": [], + "id": 12 + }, + { + "type": 2, + "tagName": "meta", + "attributes": { "name": "next-head-count", "content": "3" }, + "childNodes": [], + "id": 13 + }, + { + "type": 2, + "tagName": "noscript", + "attributes": { "data-n-css": "" }, + "childNodes": [], + "id": 14 + }, + { + "type": 2, + "tagName": "script", + "attributes": { + 
"defer": "", + "nomodule": "", + "src": "http://localhost:3001/_next/static/chunks/polyfills.js?ts=1679568313753" + }, + "childNodes": [], + "id": 15 + }, + { + "type": 2, + "tagName": "script", + "attributes": { + "src": "http://localhost:3001/_next/static/chunks/webpack.js?ts=1679568313753", + "defer": "" + }, + "childNodes": [], + "id": 16 + }, + { + "type": 2, + "tagName": "script", + "attributes": { + "src": "http://localhost:3001/_next/static/chunks/main.js?ts=1679568313753", + "defer": "" + }, + "childNodes": [], + "id": 17 + }, + { + "type": 2, + "tagName": "script", + "attributes": { + "src": "http://localhost:3001/_next/static/chunks/pages/_app.js?ts=1679568313753", + "defer": "" + }, + "childNodes": [], + "id": 18 + }, + { + "type": 2, + "tagName": "script", + "attributes": { + "src": "http://localhost:3001/_next/static/chunks/pages/index.js?ts=1679568313753", + "defer": "" + }, + "childNodes": [], + "id": 19 + }, + { + "type": 2, + "tagName": "script", + "attributes": { + "src": "http://localhost:3001/_next/static/development/_buildManifest.js?ts=1679568313753", + "defer": "" + }, + "childNodes": [], + "id": 20 + }, + { + "type": 2, + "tagName": "script", + "attributes": { + "src": "http://localhost:3001/_next/static/development/_ssgManifest.js?ts=1679568313753", + "defer": "" + }, + "childNodes": [], + "id": 21 + }, + { + "type": 2, + "tagName": "style", + "attributes": {}, + "childNodes": [ + { + "type": 3, + "textContent": "main { margin: 0px auto; max-width: 1200px; padding: 2rem; font-family: helvetica, arial, sans-serif; }.buttons { display: flex; gap: 0.5rem; }", + "isStyle": true, + "id": 23 + } + ], + "id": 22 + }, + { + "type": 2, + "tagName": "noscript", + "attributes": { "id": "__next_css__DO_NOT_USE__" }, + "childNodes": [], + "id": 24 + } + ], + "id": 4 + }, + { + "type": 2, + "tagName": "body", + "attributes": {}, + "childNodes": [ + { + "type": 2, + "tagName": "div", + "attributes": { "id": "__next" }, + "childNodes": [ + { + "type": 2, 
+ "tagName": "main", + "attributes": {}, + "childNodes": [ + { + "type": 2, + "tagName": "h1", + "attributes": {}, + "childNodes": [ + { "type": 3, "textContent": "PostHog React", "id": 29 } + ], + "id": 28 + }, + { + "type": 2, + "tagName": "div", + "attributes": { "class": "buttons" }, + "childNodes": [ + { + "type": 2, + "tagName": "button", + "attributes": {}, + "childNodes": [ + { + "type": 3, + "textContent": "Capture event", + "id": 32 + } + ], + "id": 31 + }, + { + "type": 2, + "tagName": "button", + "attributes": { "data-attr": "autocapture-button" }, + "childNodes": [ + { + "type": 3, + "textContent": "Autocapture buttons", + "id": 34 + } + ], + "id": 33 + }, + { + "type": 2, + "tagName": "button", + "attributes": { + "class": "ph-no-capture", + "rr_width": "0px", + "rr_height": "0px" + }, + "childNodes": [], + "id": 35 + } + ], + "id": 30 + }, + { + "type": 2, + "tagName": "p", + "attributes": {}, + "childNodes": [ + { + "type": 3, + "textContent": "Feature flag response: ", + "id": 37 + } + ], + "id": 36 + } + ], + "id": 27 + } + ], + "id": 26 + }, + { + "type": 2, + "tagName": "script", + "attributes": { + "type": "text/javascript", + "src": "http://localhost:8000/static/recorder.js?v=1.51.2" + }, + "childNodes": [], + "id": 38 + }, + { + "type": 2, + "tagName": "script", + "attributes": { + "src": "http://localhost:3001/_next/static/chunks/react-refresh.js?ts=1679568313753" + }, + "childNodes": [], + "id": 39 + }, + { + "type": 2, + "tagName": "script", + "attributes": { "id": "__NEXT_DATA__", "type": "application/json" }, + "childNodes": [ + { "type": 3, "textContent": "SCRIPT_PLACEHOLDER", "id": 41 } + ], + "id": 40 + } + ], + "id": 25 + } + ], + "id": 3 + } + ], + "id": 1 + }, + "initialOffset": { "left": 0, "top": 0 } + }, + "timestamp": 1679568314158 + }, + "$session_id": "1870e0e4f40d73-07e335ea680392-1e525634-384000-1870e0e4f412a52", + "$window_id": "1870e0e4f422d99-042f3b7593666e-1e525634-384000-1870e0e4f432e59", + "token": 
"phc_6L6EWZMlGPXWTSbwpqChJM6IuR0XV4I98wMjgLD8Yds",
+        "distinct_id": "64H9Wag8cxxCaBtb4tTG1ENFQ8K9fqIQPm2zEq6JOgg"
+    },
+    "offset": 2984
+}
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/data/snapshot-initial.json b/plugin-server/tests/main/ingestion-queues/session-recording/data/snapshot-initial.json
new file mode 100644
index 00000000000..0b42e0f5a5c
--- /dev/null
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/data/snapshot-initial.json
@@ -0,0 +1,15 @@
+{
+    "event": "$snapshot",
+    "properties": {
+        "$snapshot_data": {
+            "type": 4,
+            "data": { "href": "http://localhost:3001/", "width": 2560, "height": 832 },
+            "timestamp": 1679568314155
+        },
+        "$session_id": "1870e0e4f40d73-07e335ea680392-1e525634-384000-1870e0e4f412a52",
+        "$window_id": "1870e0e4f422d99-042f3b7593666e-1e525634-384000-1870e0e4f432e59",
+        "token": "phc_6L6EWZMlGPXWTSbwpqChJM6IuR0XV4I98wMjgLD8Yds",
+        "distinct_id": "64H9Wag8cxxCaBtb4tTG1ENFQ8K9fqIQPm2zEq6JOgg"
+    },
+    "offset": 2987
+}
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts
new file mode 100644
index 00000000000..797526b897c
--- /dev/null
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts
@@ -0,0 +1,77 @@
+import { randomUUID } from 'node:crypto'
+
+import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/blob-ingester/types'
+import { compressToString } from '../../../../src/main/ingestion-queues/session-recording/blob-ingester/utils'
+import jsonFullSnapshot from './data/snapshot-full.json'
+
+/**
+ * Builds a complete, single-chunk recording message for use in tests.
+ *
+ * @param data - field overrides; spread last, so they replace the defaults below
+ * @returns the default message with any overrides applied
+ */
+export function createIncomingRecordingMessage(data: Partial<IncomingRecordingMessage> = {}): IncomingRecordingMessage {
+    return {
+        metadata: {
+            topic: 'session_recording_events',
+            partition: 1,
+            offset: 1,
+        },
+
+        team_id: 1,
+        distinct_id: 'distinct_id',
+        session_id: 'session_id_1',
+        window_id: 'window_id_1',
+
+        // Properties data
+        chunk_id: 'chunk_id_1',
+        chunk_index: 0,
+        chunk_count: 1,
+        data: compressToString(JSON.stringify(jsonFullSnapshot)),
+        // NOTE(review): spelled with three s's; presumably matches the field on IncomingRecordingMessage - confirm before renaming
+        compresssion: 'gzip-base64',
+        has_full_snapshot: true,
+        events_summary: [
+            {
+                timestamp: 1679568043305,
+                type: 4,
+                data: { href: 'http://localhost:3001/', width: 2560, height: 1304 },
+            },
+        ],
+        ...data,
+    }
+}
+
+/**
+ * Splits one recording message into `chunks` messages sharing a freshly generated chunk id.
+ *
+ * @param chunks - how many messages to produce
+ * @param data - field overrides, forwarded to createIncomingRecordingMessage
+ * @returns one message per chunk, in index order, each carrying a contiguous slice of the data
+ */
+export function createChunkedIncomingRecordingMessage(
+    chunks: number,
+    data: Partial<IncomingRecordingMessage> = {}
+): IncomingRecordingMessage[] {
+    const chunkId = randomUUID()
+    const coreMessage = createIncomingRecordingMessage(data)
+    const totalLength = coreMessage.data.length
+    const messages: IncomingRecordingMessage[] = []
+
+    // Clone the core message once per chunk, giving each clone its slice of the data.
+    // Boundaries use integer arithmetic: multiplying a fractional chunk length back up can
+    // round just below the payload length and silently drop the final character.
+    for (let i = 0; i < chunks; i++) {
+        const start = Math.floor((i * totalLength) / chunks)
+        const end = Math.floor(((i + 1) * totalLength) / chunks)
+        messages.push({
+            ...coreMessage,
+            chunk_id: chunkId,
+            chunk_index: i,
+            chunk_count: chunks,
+            data: coreMessage.data.slice(start, end),
+        })
+    }
+
+    return messages
+}
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer-rebalancing.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer-rebalancing.test.ts
new file mode 100644
index 00000000000..d8387f6dfc8
--- /dev/null
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer-rebalancing.test.ts
@@ -0,0 +1,60 @@
+import { waitForExpect } from '../../../../functional_tests/expectations'
+import { defaultConfig } from '../../../../src/config/config'
+import { SessionRecordingBlobIngester } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer'
+import { Hub } from '../../../../src/types'
+import { createHub } from '../../../../src/utils/db/hub'
+import { UUIDT } from '../../../../src/utils/utils'
+import { createIncomingRecordingMessage } from './fixtures'
+
+function assertIngesterHasExpectedPartitions(ingester: 
SessionRecordingBlobIngester, expectedPartitions: number[]) {
+    const partitions: Set<number> = new Set() // distinct partitions seen across the ingester's sessions
+    ingester.sessions.forEach((session) => {
+        partitions.add(session.partition)
+    })
+    expect(Array.from(partitions)).toEqual(expectedPartitions) // order-sensitive: Set iteration follows insertion order
+}
+
+describe('ingester rebalancing tests', () => {
+    let ingesterOne: SessionRecordingBlobIngester
+    let ingesterTwo: SessionRecordingBlobIngester
+
+    let hub: Hub
+    let closeHub: () => Promise<void>
+
+    beforeEach(async () => {
+        ;[hub, closeHub] = await createHub()
+    })
+
+    afterEach(async () => {
+        await ingesterOne?.stop() // optional chaining: a test can fail before either ingester is assigned
+        await ingesterTwo?.stop()
+        await closeHub()
+    })
+
+    it('rebalances partitions safely from one to two consumers', async () => {
+        ingesterOne = new SessionRecordingBlobIngester(hub.teamManager, defaultConfig, hub.objectStorage)
+
+        await ingesterOne.start()
+
+        await ingesterOne.consume(
+            createIncomingRecordingMessage({ session_id: new UUIDT().toString(), chunk_count: 2 })
+        )
+        await ingesterOne.consume(
+            createIncomingRecordingMessage({ session_id: new UUIDT().toString(), chunk_count: 2 })
+        )
+
+        await waitForExpect(() => {
+            assertIngesterHasExpectedPartitions(ingesterOne, [1])
+        })
+
+        ingesterTwo = new SessionRecordingBlobIngester(hub.teamManager, defaultConfig, hub.objectStorage)
+        await ingesterTwo.start()
+
+        await waitForExpect(() => {
+            assertIngesterHasExpectedPartitions(ingesterOne, [1])
+
+            // only one partition so nothing for the new consumer to do
+            assertIngesterHasExpectedPartitions(ingesterTwo, [])
+        })
+    })
+})
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer.test.ts
new file mode 100644
index 00000000000..27038e893ff
--- /dev/null
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-blob-consumer.test.ts
@@ -0,0 +1,50 @@
+import { defaultConfig } from '../../../../src/config/config'
+import { 
SessionRecordingBlobIngester } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-blob-consumer'
+import { Hub } from '../../../../src/types'
+import { createHub } from '../../../../src/utils/db/hub'
+import { createIncomingRecordingMessage } from './fixtures'
+
+describe('ingester', () => {
+    let ingester: SessionRecordingBlobIngester
+
+    let hub: Hub
+    let closeHub: () => Promise<void>
+
+    beforeEach(async () => {
+        ;[hub, closeHub] = await createHub()
+    })
+
+    afterEach(async () => {
+        await closeHub()
+    })
+
+    beforeEach(() => {
+        ingester = new SessionRecordingBlobIngester(hub.teamManager, defaultConfig, hub.objectStorage) // declared after the hub hook, so `hub` is already set
+    })
+
+    it('creates a new session manager if needed', async () => {
+        const event = createIncomingRecordingMessage()
+        await ingester.consume(event)
+        expect(ingester.sessions.size).toBe(1)
+        expect(ingester.sessions.has('1-session_id_1')).toEqual(true) // key looks like `${team_id}-${session_id}` from the fixture defaults - confirm against the ingester
+    })
+
+    it('handles multiple incoming sessions', async () => {
+        const event = createIncomingRecordingMessage()
+        const event2 = createIncomingRecordingMessage({
+            session_id: 'session_id_2',
+        })
+        await Promise.all([ingester.consume(event), ingester.consume(event2)])
+        expect(ingester.sessions.size).toBe(2)
+        expect(ingester.sessions.has('1-session_id_1')).toEqual(true)
+        expect(ingester.sessions.has('1-session_id_2')).toEqual(true)
+    })
+
+    it('destroys a session manager if finished', async () => {
+        const event = createIncomingRecordingMessage()
+        await ingester.consume(event)
+        expect(ingester.sessions.has('1-session_id_1')).toEqual(true)
+        await ingester.sessions.get('1-session_id_1')?.flush()
+        expect(ingester.sessions.has('1-session_id_1')).toEqual(false)
+    })
+})
diff --git a/plugin-server/tests/main/ingestion-queues/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
similarity index 87%
rename from plugin-server/tests/main/ingestion-queues/session-recordings-consumer.test.ts
rename to plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index c28c96694dc..dd2654750ce 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -1,10 +1,10 @@ import LibrdKafkaError from 'node-rdkafka/lib/error' import { Pool } from 'pg' -import { defaultConfig } from '../../../src/config/config' -import { eachBatch } from '../../../src/main/ingestion-queues/session-recordings-consumer' -import { TeamManager } from '../../../src/worker/ingestion/team-manager' -import { createOrganization, createTeam } from '../../helpers/sql' +import { defaultConfig } from '../../../../src/config/config' +import { eachBatch } from '../../../../src/main/ingestion-queues/session-recording/session-recordings-consumer' +import { TeamManager } from '../../../../src/worker/ingestion/team-manager' +import { createOrganization, createTeam } from '../../../helpers/sql' describe('session-recordings-consumer', () => { const producer = {