diff --git a/plugin-server/README.md b/plugin-server/README.md index 0272937b9fe..0603e69d0d0 100644 --- a/plugin-server/README.md +++ b/plugin-server/README.md @@ -61,6 +61,7 @@ testing: DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog \ PLUGINS_DEFAULT_LOG_LEVEL=0 \ RELOAD_PLUGIN_JITTER_MAX_MS=0 \ + PLUGIN_SERVER_MODE=functional-tests \ pnpm start:dev ``` 1. run the tests: diff --git a/plugin-server/bin/ci_functional_tests.sh b/plugin-server/bin/ci_functional_tests.sh index 905dc2caadf..2d89a623624 100755 --- a/plugin-server/bin/ci_functional_tests.sh +++ b/plugin-server/bin/ci_functional_tests.sh @@ -19,6 +19,7 @@ export APP_METRICS_FLUSH_FREQUENCY_MS=0 # Reduce the potential for spurious erro export APP_METRICS_GATHERED_FOR_ALL=true export PLUGINS_DEFAULT_LOG_LEVEL=0 # All logs, as debug logs are used in synchronization barriers export NODE_ENV=production-functional-tests +export PLUGIN_SERVER_MODE=functional-tests # running all capabilities is too slow # Not important at all, but I like to see nice red/green for tests export FORCE_COLOR=true diff --git a/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts b/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts index c1e45d476eb..e51285e44f0 100644 --- a/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts +++ b/plugin-server/functional_tests/analytics-ingestion/happy-path.test.ts @@ -1,5 +1,14 @@ import { UUIDT } from '../../src/utils/utils' -import { capture, createOrganization, createTeam, fetchEvents, fetchGroups, fetchPersons, getMetric } from '../api' +import { + capture, + createOrganization, + createTeam, + fetchEvents, + fetchGroups, + fetchIngestionWarnings, + fetchPersons, + getMetric, +} from '../api' import { waitForExpect } from '../expectations' let organizationId: string @@ -8,6 +17,32 @@ beforeAll(async () => { organizationId = await createOrganization() }) +test.concurrent(`event ingestion: handles $$client_ingestion_warning events`, async () => { + const teamId = await createTeam(organizationId) + const distinctId = new UUIDT().toString() + + await capture({ + teamId, + distinctId, + uuid: new UUIDT().toString(), + event: '$$client_ingestion_warning', + properties: { + $$client_ingestion_warning_message: 'test message', + }, + }) + + await waitForExpect(async () => { + const events = await fetchIngestionWarnings(teamId) + expect(events).toEqual([ + expect.objectContaining({ + type: 'client_ingestion_warning', + team_id: teamId, + details: expect.objectContaining({ message: 'test message' }), + }), + ]) + }) +}) + test.concurrent(`event ingestion: can set and update group properties`, async () => { const teamId = await createTeam(organizationId) const distinctId = new UUIDT().toString() diff --git a/plugin-server/functional_tests/api.ts b/plugin-server/functional_tests/api.ts index 35e728843db..dd9392b726b 100644 --- a/plugin-server/functional_tests/api.ts +++ b/plugin-server/functional_tests/api.ts @@ -64,6 +64,8 @@ export const capture = async ({ $set_once = undefined, topic = ['$performance_event', '$snapshot_items'].includes(event) ? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS + : ['$$client_ingestion_warning'].includes(event) + ? 'client_iwarnings_ingestion' : 'events_plugin_ingestion', }: { teamId: number | null @@ -266,6 +268,16 @@ export const reloadAction = async (teamId: number, actionId: number) => { await redis.publish('reload-action', JSON.stringify({ teamId, actionId })) } +export const fetchIngestionWarnings = async (teamId: number) => { + const queryResult = (await clickHouseClient.querying(` + SELECT *, + FROM ingestion_warnings + WHERE team_id = ${teamId} + ORDER BY timestamp ASC + `)) as unknown as ClickHouse.ObjectQueryResult + return queryResult.data.map((warning) => ({ ...warning, details: JSON.parse(warning.details) })) +} + export const fetchEvents = async (teamId: number, uuid?: string) => { const queryResult = (await clickHouseClient.querying(` SELECT *, diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts index 8f45d91d9cc..9bfe5a64215 100644 --- a/plugin-server/src/capabilities.ts +++ b/plugin-server/src/capabilities.ts @@ -14,6 +14,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin ingestion: true, ingestionOverflow: true, ingestionHistorical: true, + eventsIngestionPipelines: true, // with null PluginServerMode we run all of them pluginScheduledTasks: true, processPluginJobs: true, processAsyncOnEventHandlers: true, @@ -47,6 +48,12 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin ingestionHistorical: true, ...sharedCapabilities, } + case PluginServerMode.events_ingestion: + return { + mmdb: true, + eventsIngestionPipelines: true, + ...sharedCapabilities, + } case PluginServerMode.analytics_ingestion: return { mmdb: true, @@ -99,5 +106,22 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin cdpFunctionOverflow: true, ...sharedCapabilities, } + // This is only for functional tests, which time out if all capabilities are used + // ideally we'd run just the specific capability needed per test, but that's not easy to do atm + case PluginServerMode.functional_tests: + return { + mmdb: true, + ingestion: true, + ingestionHistorical: true, + eventsIngestionPipelines: true, + pluginScheduledTasks: true, + processPluginJobs: true, + processAsyncOnEventHandlers: true, + processAsyncWebhooksHandlers: true, + sessionRecordingBlobIngestion: true, + appManagementSingleton: true, + preflightSchedules: true, + ...sharedCapabilities, + } } } diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index f0a646e47d9..6437dbc56d9 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -119,6 +119,7 @@ export function getDefaultConfig(): PluginsServerConfig { OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password', OBJECT_STORAGE_BUCKET: 'posthog', PLUGIN_SERVER_MODE: null, + PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE: null, PLUGIN_LOAD_SEQUENTIALLY: false, KAFKAJS_LOG_LEVEL: 'WARN', APP_METRICS_GATHERED_FOR_ALL: isDevEnv() ? true : false, diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts index 83ca0f07ea4..2f095e117f7 100644 --- a/plugin-server/src/config/kafka-topics.ts +++ b/plugin-server/src/config/kafka-topics.ts @@ -2,7 +2,7 @@ import { isTestEnv } from '../utils/env-utils' -const suffix = isTestEnv() ? '_test' : '' +export const suffix = isTestEnv() ? '_test' : '' export const prefix = process.env.KAFKA_PREFIX || '' export const KAFKA_EVENTS_JSON = `${prefix}clickhouse_events_json${suffix}` diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 5ab1eeacbef..245c2edc77b 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -33,6 +33,7 @@ export enum IngestionOverflowMode { RerouteRandomly, // discards partition locality ConsumeSplitByDistinctId, ConsumeSplitEvenly, + ConsumeSplitEventlyWithoutIngestionWarning, } type IngestionSplitBatch = { @@ -300,10 +301,12 @@ export function splitIngestionBatch( overflowMode ) - if (overflowMode === IngestionOverflowMode.ConsumeSplitEvenly) { + if ( + overflowMode === IngestionOverflowMode.ConsumeSplitEvenly || + overflowMode === IngestionOverflowMode.ConsumeSplitEventlyWithoutIngestionWarning + ) { /** - * Grouping by distinct_id is inefficient here, because only a few ones are overflowing - * at a time. When messages are sent to overflow, we already give away the ordering guarantee, + * Grouping by distinct_id is not necessary here, we already give away the ordering guarantee, * so we just return batches of one to increase concurrency. * TODO: add a PipelineEvent[] field to IngestionSplitBatch for batches of 1 */ diff --git a/plugin-server/src/main/ingestion-queues/events-ingestion-consumer.ts b/plugin-server/src/main/ingestion-queues/events-ingestion-consumer.ts new file mode 100644 index 00000000000..695185b8074 --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/events-ingestion-consumer.ts @@ -0,0 +1,63 @@ +import { Message } from 'node-rdkafka' + +import { buildStringMatcher } from '../../config/config' +import { prefix as KAFKA_PREFIX, suffix as KAFKA_SUFFIX } from '../../config/kafka-topics' +import { Hub } from '../../types' +import { status } from '../../utils/status' +import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion' +import { IngestionConsumer } from './kafka-queue' + +export type PipelineType = { + topic: string + consumer_group: string +} + +export const PIPELINES: { [key: string]: PipelineType } = { + ingestion_warnings: { + topic: 'client_iwarnings_ingestion', + consumer_group: 'client_iwarnings_ingestion', + }, + heatmaps: { + topic: 'heatmaps_ingestion', + consumer_group: 'heatmaps_ingestion', + }, + exceptions: { + topic: 'exceptions_ingestion', + consumer_group: 'exceptions_ingestion', + }, +} + +export const startEventsIngestionPipelineConsumer = async ({ + hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection. + pipeline, +}: { + hub: Hub + pipeline: PipelineType +}) => { + /* + Consumes events from the topic and consumer passed in. + */ + const kafka_topic = `${KAFKA_PREFIX}${pipeline.topic}${KAFKA_SUFFIX}` + const kafka_consumer = `${KAFKA_PREFIX}${pipeline.consumer_group}` + status.info( + '🔁', + `Starting events ingestion pipeline on topic ${kafka_topic} consumer ${kafka_consumer} with rdkafka` + ) + + const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false) + // No overflow and split all events evenly, i.e. there's no ordering guarantees here. + const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise => { + await eachBatchParallelIngestion( + tokenBlockList, + messages, + queue, + IngestionOverflowMode.ConsumeSplitEventlyWithoutIngestionWarning + ) + } + + const queue = new IngestionConsumer(hub, kafka_topic, kafka_consumer, batchHandler) + + const { isHealthy } = await queue.start() + + return { queue, isHealthy } +} diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 54ddfaff521..d8d619be7e7 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -34,6 +34,11 @@ import { startGraphileWorker } from './graphile-worker/worker-setup' import { startAnalyticsEventsIngestionConsumer } from './ingestion-queues/analytics-events-ingestion-consumer' import { startAnalyticsEventsIngestionHistoricalConsumer } from './ingestion-queues/analytics-events-ingestion-historical-consumer' import { startAnalyticsEventsIngestionOverflowConsumer } from './ingestion-queues/analytics-events-ingestion-overflow-consumer' +import { + PIPELINES, + PipelineType, + startEventsIngestionPipelineConsumer, +} from './ingestion-queues/events-ingestion-consumer' import { startJobsConsumer } from './ingestion-queues/jobs-consumer' import { IngestionConsumer, KafkaJSIngestionConsumer } from './ingestion-queues/kafka-queue' import { @@ -102,6 +107,7 @@ export async function startPluginsServer( let analyticsEventsIngestionConsumer: IngestionConsumer | undefined let analyticsEventsIngestionOverflowConsumer: IngestionConsumer | undefined let analyticsEventsIngestionHistoricalConsumer: IngestionConsumer | undefined + let eventsIngestionConsumer: Map | undefined let onEventHandlerConsumer: KafkaJSIngestionConsumer | undefined let stopWebhooksHandlerConsumer: () => Promise | undefined @@ -149,6 +155,7 @@ export async function startPluginsServer( analyticsEventsIngestionConsumer?.stop(), analyticsEventsIngestionOverflowConsumer?.stop(), analyticsEventsIngestionHistoricalConsumer?.stop(), + ...Array.from(eventsIngestionConsumer?.values() || []).map((consumer) => consumer.stop()), onEventHandlerConsumer?.stop(), stopWebhooksHandlerConsumer?.(), bufferConsumer?.disconnect(), @@ -334,6 +341,36 @@ export async function startPluginsServer( healthChecks['analytics-ingestion-historical'] = isAnalyticsEventsIngestionHistoricalHealthy } + if (capabilities.eventsIngestionPipelines) { + async function start(pipelineKey: string, pipeline: PipelineType) { + ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) + serverInstance = serverInstance ? serverInstance : { hub } + piscina = piscina ?? (await makePiscina(serverConfig, hub)) + const { queue, isHealthy: isHealthy } = await startEventsIngestionPipelineConsumer({ + hub: hub, + pipeline: pipeline, + }) + + eventsIngestionConsumer = eventsIngestionConsumer ?? new Map() + eventsIngestionConsumer.set(pipelineKey, queue) + shutdownOnConsumerExit(eventsIngestionConsumer.get(pipelineKey)!.consumer!) + healthChecks[`events-ingestion-pipeline-${pipelineKey}`] = isHealthy + } + if (serverConfig.PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE === null) { + for (const pipelineKey in PIPELINES) { + await start(pipelineKey, PIPELINES[pipelineKey]) + } + } else { + // Validate we have a valid pipeline + const pipelineKey = serverConfig.PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE + if (pipelineKey === null || !PIPELINES[pipelineKey]) { + throw new Error(`Invalid events ingestion pipeline: ${pipelineKey}`) + } + const pipeline: PipelineType = PIPELINES[pipelineKey] + await start(pipelineKey, pipeline) + } + } + if (capabilities.ingestionOverflow) { ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) serverInstance = serverInstance ? serverInstance : { hub } diff --git a/plugin-server/src/sentry.ts b/plugin-server/src/sentry.ts index 960da7de159..d798820549f 100644 --- a/plugin-server/src/sentry.ts +++ b/plugin-server/src/sentry.ts @@ -39,6 +39,7 @@ export function initSentry(config: PluginsServerConfig): void { tags: { PLUGIN_SERVER_MODE: config.PLUGIN_SERVER_MODE, DEPLOYMENT: config.CLOUD_DEPLOYMENT, + PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE: config.PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE, }, }, release, diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index b0281437d19..762a994d0c8 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -74,6 +74,7 @@ export enum PluginServerMode { ingestion = 'ingestion', ingestion_overflow = 'ingestion-overflow', ingestion_historical = 'ingestion-historical', + events_ingestion = 'events-ingestion', async_onevent = 'async-onevent', async_webhooks = 'async-webhooks', jobs = 'jobs', @@ -84,6 +85,7 @@ export enum PluginServerMode { cdp_processed_events = 'cdp-processed-events', cdp_function_callbacks = 'cdp-function-callbacks', cdp_function_overflow = 'cdp-function-overflow', + functional_tests = 'functional-tests', } export const stringToPluginServerMode = Object.fromEntries( @@ -204,6 +206,7 @@ export interface PluginsServerConfig extends CdpConfig { OBJECT_STORAGE_SECRET_ACCESS_KEY: string OBJECT_STORAGE_BUCKET: string // the object storage bucket name PLUGIN_SERVER_MODE: PluginServerMode | null + PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE: string | null // TODO: shouldn't be a string probably PLUGIN_LOAD_SEQUENTIALLY: boolean // could help with reducing memory usage spikes on startup KAFKAJS_LOG_LEVEL: 'NOTHING' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR' APP_METRICS_GATHERED_FOR_ALL: boolean // whether to gather app metrics for all teams @@ -325,6 +328,7 @@ export interface PluginServerCapabilities { ingestion?: boolean ingestionOverflow?: boolean ingestionHistorical?: boolean + eventsIngestionPipelines?: boolean pluginScheduledTasks?: boolean processPluginJobs?: boolean processAsyncOnEventHandlers?: boolean diff --git a/plugin-server/tests/e2e.timeout.test.ts b/plugin-server/tests/e2e.timeout.test.ts index 222f1718ea5..0bde3412315 100644 --- a/plugin-server/tests/e2e.timeout.test.ts +++ b/plugin-server/tests/e2e.timeout.test.ts @@ -40,7 +40,7 @@ describe('e2e ingestion timeout', () => { } `) await resetTestDatabaseClickhouse(extraServerConfig) - const startResponse = await startPluginsServer(extraServerConfig, makePiscina) + const startResponse = await startPluginsServer(extraServerConfig, makePiscina, { ingestion: true }) hub = startResponse.hub stopServer = startResponse.stop posthog = createPosthog(hub, pluginConfig39) diff --git a/plugin-server/tests/server.test.ts b/plugin-server/tests/server.test.ts index b337691e8f6..3f497be0370 100644 --- a/plugin-server/tests/server.test.ts +++ b/plugin-server/tests/server.test.ts @@ -19,10 +19,7 @@ function numberOfScheduledJobs() { describe('server', () => { let pluginsServer: Partial | null = null - function createPluginServer( - config: Partial = {}, - capabilities: PluginServerCapabilities | undefined = undefined - ) { + function createPluginServer(config: Partial, capabilities: PluginServerCapabilities) { return startPluginsServer( { WORKER_CONCURRENCY: 2, @@ -43,20 +40,94 @@ describe('server', () => { pluginsServer = null }) - test('startPluginsServer does not error', async () => { + // Running all capabilities together takes too long in tests, so they are split up + test('startPluginsServer does not error - ingestion', async () => { const testCode = ` async function processEvent (event) { return event } ` await resetTestDatabase(testCode) - pluginsServer = await createPluginServer() + pluginsServer = await createPluginServer( + {}, + { + http: true, + mmdb: true, + ingestion: true, + ingestionOverflow: true, + ingestionHistorical: true, + appManagementSingleton: true, + preflightSchedules: true, + } + ) + }) + test('startPluginsServer does not error - pipelines', async () => { + const testCode = ` + async function processEvent (event) { + return event + } + ` + await resetTestDatabase(testCode) + pluginsServer = await createPluginServer( + {}, + { + http: true, + eventsIngestionPipelines: true, + } + ) + }) + + test('startPluginsServer does not error - cdp', async () => { + const testCode = ` + async function processEvent (event) { + return event + } + ` + await resetTestDatabase(testCode) + pluginsServer = await createPluginServer( + {}, + { + http: true, + pluginScheduledTasks: true, + processPluginJobs: true, + processAsyncOnEventHandlers: true, + processAsyncWebhooksHandlers: true, + cdpProcessedEvents: true, + cdpFunctionCallbacks: true, + cdpFunctionOverflow: true, + } + ) + }) + + test('startPluginsServer does not error - replay', async () => { + const testCode = ` + async function processEvent (event) { + return event + } + ` + await resetTestDatabase(testCode) + pluginsServer = await createPluginServer( + {}, + { + http: true, + sessionRecordingBlobIngestion: true, + sessionRecordingBlobOverflowIngestion: true, + } + ) }) test('starting and stopping node-schedule scheduled jobs', async () => { expect(numberOfScheduledJobs()).toEqual(0) - pluginsServer = await createPluginServer() + pluginsServer = await createPluginServer( + {}, + { + http: true, + pluginScheduledTasks: true, + processAsyncWebhooksHandlers: true, + preflightSchedules: true, + } + ) expect(numberOfScheduledJobs()).toBeGreaterThan(1) @@ -67,8 +138,11 @@ describe('server', () => { }) describe('plugin-server capabilities', () => { - test('starts all main services by default', async () => { - pluginsServer = await createPluginServer() + test('starts graphile for scheduled tasks capability', async () => { + pluginsServer = await createPluginServer( + {}, + { ingestion: true, pluginScheduledTasks: true, processPluginJobs: true } + ) expect(startGraphileWorker).toHaveBeenCalled() }) diff --git a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts index 63224125133..d2e0657d82f 100644 --- a/plugin-server/tests/worker/ingestion/postgres-parity.test.ts +++ b/plugin-server/tests/worker/ingestion/postgres-parity.test.ts @@ -45,7 +45,7 @@ describe('postgres parity', () => { `) await resetTestDatabaseClickhouse(extraServerConfig) console.log('[TEST] Starting plugins server') - const startResponse = await startPluginsServer(extraServerConfig, makePiscina) + const startResponse = await startPluginsServer(extraServerConfig, makePiscina, { ingestion: true }) hub = startResponse.hub stopServer = startResponse.stop teamId++