
feat: Part 1 of ability to spin up different pipelines (#23601)

Tiina Turban 2024-07-17 14:11:49 +02:00 committed by GitHub
parent 87504a1c62
commit 60ba76d9c4
15 changed files with 272 additions and 16 deletions
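
In short: this PR adds an `eventsIngestionPipelines` capability and an `events-ingestion` server mode. Each entry in a new `PIPELINES` map (ingestion warnings, heatmaps, exceptions) gets its own Kafka topic and consumer group, and a new `PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE` setting selects which pipeline a process runs, with `null` meaning all of them. A minimal sketch of the new configuration surface (names are from this diff; the values and import path are illustrative):

```ts
import { PluginServerMode, PluginsServerConfig } from './types'

// Run only the heatmaps pipeline in this process; leaving the selector
// null would start a consumer for every entry in PIPELINES.
const config: Partial<PluginsServerConfig> = {
    PLUGIN_SERVER_MODE: PluginServerMode.events_ingestion,
    PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE: 'heatmaps',
}
```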

View File

@@ -61,6 +61,7 @@ testing:
DATABASE_URL=postgres://posthog:posthog@localhost:5432/test_posthog \
PLUGINS_DEFAULT_LOG_LEVEL=0 \
RELOAD_PLUGIN_JITTER_MAX_MS=0 \
PLUGIN_SERVER_MODE=functional-tests \
pnpm start:dev
```
1. run the tests:

View File

@@ -19,6 +19,7 @@ export APP_METRICS_FLUSH_FREQUENCY_MS=0 # Reduce the potential for spurious erro
export APP_METRICS_GATHERED_FOR_ALL=true
export PLUGINS_DEFAULT_LOG_LEVEL=0 # All logs, as debug logs are used in synchronization barriers
export NODE_ENV=production-functional-tests
export PLUGIN_SERVER_MODE=functional-tests # running all capabilities is too slow
# Not important at all, but I like to see nice red/green for tests
export FORCE_COLOR=true

View File

@@ -1,5 +1,14 @@
import { UUIDT } from '../../src/utils/utils'
import { capture, createOrganization, createTeam, fetchEvents, fetchGroups, fetchPersons, getMetric } from '../api'
import {
capture,
createOrganization,
createTeam,
fetchEvents,
fetchGroups,
fetchIngestionWarnings,
fetchPersons,
getMetric,
} from '../api'
import { waitForExpect } from '../expectations'
let organizationId: string
@@ -8,6 +17,32 @@ beforeAll(async () => {
organizationId = await createOrganization()
})
test.concurrent(`event ingestion: handles $$client_ingestion_warning events`, async () => {
const teamId = await createTeam(organizationId)
const distinctId = new UUIDT().toString()
await capture({
teamId,
distinctId,
uuid: new UUIDT().toString(),
event: '$$client_ingestion_warning',
properties: {
$$client_ingestion_warning_message: 'test message',
},
})
await waitForExpect(async () => {
const events = await fetchIngestionWarnings(teamId)
expect(events).toEqual([
expect.objectContaining({
type: 'client_ingestion_warning',
team_id: teamId,
details: expect.objectContaining({ message: 'test message' }),
}),
])
})
})
test.concurrent(`event ingestion: can set and update group properties`, async () => {
const teamId = await createTeam(organizationId)
const distinctId = new UUIDT().toString()
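
As an aside, the `$$client_ingestion_warning` event exercised above is the kind a client library can emit when it detects a problem locally. A minimal sketch of producing one, assuming posthog-js (the event name and property key are taken from the test):

```ts
import posthog from 'posthog-js'

// Captured like any other event, but routed to the ingestion-warnings
// pipeline, which stores it as an ingestion_warnings row whose
// details.message carries this string.
posthog.capture('$$client_ingestion_warning', {
    $$client_ingestion_warning_message: 'event buffer overflowed; some events were dropped',
})
```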

View File

@@ -64,6 +64,8 @@ export const capture = async ({
$set_once = undefined,
topic = ['$performance_event', '$snapshot_items'].includes(event)
? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
: ['$$client_ingestion_warning'].includes(event)
? 'client_iwarnings_ingestion'
: 'events_plugin_ingestion',
}: {
teamId: number | null
@@ -266,6 +268,16 @@ export const reloadAction = async (teamId: number, actionId: number) => {
await redis.publish('reload-action', JSON.stringify({ teamId, actionId }))
}
export const fetchIngestionWarnings = async (teamId: number) => {
const queryResult = (await clickHouseClient.querying(`
SELECT *
FROM ingestion_warnings
WHERE team_id = ${teamId}
ORDER BY timestamp ASC
`)) as unknown as ClickHouse.ObjectQueryResult<any>
return queryResult.data.map((warning) => ({ ...warning, details: JSON.parse(warning.details) }))
}
export const fetchEvents = async (teamId: number, uuid?: string) => {
const queryResult = (await clickHouseClient.querying(`
SELECT *,
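
Note that `details` is stored as a JSON string in ClickHouse and parsed by the helper before rows are returned. A sketch of the row shape the tests rely on, inferred from the assertions above (field list not exhaustive):

```ts
// Inferred from the test's expectations; not a complete schema.
type IngestionWarningRow = {
    type: string // e.g. 'client_ingestion_warning'
    team_id: number
    details: { message?: string } & Record<string, unknown> // parsed from the JSON string
    timestamp: string
}
```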

View File

@@ -14,6 +14,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
ingestion: true,
ingestionOverflow: true,
ingestionHistorical: true,
eventsIngestionPipelines: true, // with a null PluginServerMode we run all of them
pluginScheduledTasks: true,
processPluginJobs: true,
processAsyncOnEventHandlers: true,
@@ -47,6 +48,12 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
ingestionHistorical: true,
...sharedCapabilities,
}
case PluginServerMode.events_ingestion:
return {
mmdb: true,
eventsIngestionPipelines: true,
...sharedCapabilities,
}
case PluginServerMode.analytics_ingestion:
return {
mmdb: true,
@@ -99,5 +106,22 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpFunctionOverflow: true,
...sharedCapabilities,
}
// This is only for functional tests, which time out if all capabilities are used.
// Ideally we'd run just the specific capability needed per test, but that's not easy to do at the moment.
case PluginServerMode.functional_tests:
return {
mmdb: true,
ingestion: true,
ingestionHistorical: true,
eventsIngestionPipelines: true,
pluginScheduledTasks: true,
processPluginJobs: true,
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
sessionRecordingBlobIngestion: true,
appManagementSingleton: true,
preflightSchedules: true,
...sharedCapabilities,
}
}
}
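
To illustrate, resolving capabilities for the new mode yields a deliberately narrow set; a sketch (import paths assumed):

```ts
import { getPluginServerCapabilities } from './capabilities'
import { getDefaultConfig } from './config/config'
import { PluginServerMode } from './types'

const capabilities = getPluginServerCapabilities({
    ...getDefaultConfig(),
    PLUGIN_SERVER_MODE: PluginServerMode.events_ingestion,
})
// => { mmdb: true, eventsIngestionPipelines: true, ...sharedCapabilities }
```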

View File

@@ -119,6 +119,7 @@ export function getDefaultConfig(): PluginsServerConfig {
OBJECT_STORAGE_SECRET_ACCESS_KEY: 'object_storage_root_password',
OBJECT_STORAGE_BUCKET: 'posthog',
PLUGIN_SERVER_MODE: null,
PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE: null,
PLUGIN_LOAD_SEQUENTIALLY: false,
KAFKAJS_LOG_LEVEL: 'WARN',
APP_METRICS_GATHERED_FOR_ALL: isDevEnv() ? true : false,

View File

@@ -2,7 +2,7 @@
import { isTestEnv } from '../utils/env-utils'
const suffix = isTestEnv() ? '_test' : ''
export const suffix = isTestEnv() ? '_test' : ''
export const prefix = process.env.KAFKA_PREFIX || ''
export const KAFKA_EVENTS_JSON = `${prefix}clickhouse_events_json${suffix}`
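
Exporting `suffix` alongside the existing `prefix` lets the new consumer compose fully qualified topic names the same way this module does. For example (assuming a test environment with no `KAFKA_PREFIX` set):

```ts
import { prefix, suffix } from './config/kafka-topics'

// Same composition the new events-ingestion consumer uses below:
const topic = `${prefix}client_iwarnings_ingestion${suffix}`
// tests: 'client_iwarnings_ingestion_test'; production: 'client_iwarnings_ingestion'
```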

View File

@@ -33,6 +33,7 @@ export enum IngestionOverflowMode {
RerouteRandomly, // discards partition locality
ConsumeSplitByDistinctId,
ConsumeSplitEvenly,
ConsumeSplitEventlyWithoutIngestionWarning,
}
type IngestionSplitBatch = {
@@ -300,10 +301,12 @@ export function splitIngestionBatch(
overflowMode
)
if (overflowMode === IngestionOverflowMode.ConsumeSplitEvenly) {
if (
overflowMode === IngestionOverflowMode.ConsumeSplitEvenly ||
overflowMode === IngestionOverflowMode.ConsumeSplitEventlyWithoutIngestionWarning
) {
/**
* Grouping by distinct_id is inefficient here, because only a few ones are overflowing
* at a time. When messages are sent to overflow, we already give away the ordering guarantee,
* Grouping by distinct_id is not necessary here, we already give away the ordering guarantee,
* so we just return batches of one to increase concurrency.
* TODO: add a PipelineEvent[] field to IngestionSplitBatch for batches of 1
*/
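
To make the comment concrete: in the split-evenly modes every Kafka message becomes its own single-event batch. A hypothetical distillation (the `parse` callback stands in for however the file converts a message into an event; types simplified):

```ts
import { Message } from 'node-rdkafka'

// Simplified stand-in for the real event type.
type PipelineEvent = Record<string, unknown>

// Each message becomes a batch of one, maximising concurrency; ordering
// was already given up on this path.
function splitEvenly(messages: Message[], parse: (m: Message) => PipelineEvent): PipelineEvent[][] {
    return messages.map((message) => [parse(message)])
}
```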

View File

@@ -0,0 +1,63 @@
import { Message } from 'node-rdkafka'
import { buildStringMatcher } from '../../config/config'
import { prefix as KAFKA_PREFIX, suffix as KAFKA_SUFFIX } from '../../config/kafka-topics'
import { Hub } from '../../types'
import { status } from '../../utils/status'
import { eachBatchParallelIngestion, IngestionOverflowMode } from './batch-processing/each-batch-ingestion'
import { IngestionConsumer } from './kafka-queue'
export type PipelineType = {
topic: string
consumer_group: string
}
export const PIPELINES: { [key: string]: PipelineType } = {
ingestion_warnings: {
topic: 'client_iwarnings_ingestion',
consumer_group: 'client_iwarnings_ingestion',
},
heatmaps: {
topic: 'heatmaps_ingestion',
consumer_group: 'heatmaps_ingestion',
},
exceptions: {
topic: 'exceptions_ingestion',
consumer_group: 'exceptions_ingestion',
},
}
export const startEventsIngestionPipelineConsumer = async ({
hub, // TODO: remove needing to pass in the whole hub and be more selective on dependency injection.
pipeline,
}: {
hub: Hub
pipeline: PipelineType
}) => {
/*
Consumes events from the topic passed in, using the pipeline's consumer group.
*/
const kafka_topic = `${KAFKA_PREFIX}${pipeline.topic}${KAFKA_SUFFIX}`
const kafka_consumer = `${KAFKA_PREFIX}${pipeline.consumer_group}`
status.info(
'🔁',
`Starting events ingestion pipeline on topic ${kafka_topic} consumer ${kafka_consumer} with rdkafka`
)
const tokenBlockList = buildStringMatcher(hub.DROP_EVENTS_BY_TOKEN, false)
// No overflow; split all events evenly, i.e. there are no ordering guarantees here.
const batchHandler = async (messages: Message[], queue: IngestionConsumer): Promise<void> => {
await eachBatchParallelIngestion(
tokenBlockList,
messages,
queue,
IngestionOverflowMode.ConsumeSplitEventlyWithoutIngestionWarning
)
}
const queue = new IngestionConsumer(hub, kafka_topic, kafka_consumer, batchHandler)
const { isHealthy } = await queue.start()
return { queue, isHealthy }
}
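
Usage is then one call per pipeline. A sketch assuming an already-constructed `Hub`, mirroring what the server startup code in this PR does (import paths assumed):

```ts
import { Hub } from '../types'
import { PIPELINES, startEventsIngestionPipelineConsumer } from './ingestion-queues/events-ingestion-consumer'

async function startHeatmapsPipeline(hub: Hub) {
    // queue.stop() shuts the consumer down; isHealthy can back a
    // per-pipeline health check, as pluginsServer.ts does below.
    const { queue, isHealthy } = await startEventsIngestionPipelineConsumer({
        hub,
        pipeline: PIPELINES['heatmaps'],
    })
    return { queue, isHealthy }
}
```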

View File

@@ -34,6 +34,11 @@ import { startGraphileWorker } from './graphile-worker/worker-setup'
import { startAnalyticsEventsIngestionConsumer } from './ingestion-queues/analytics-events-ingestion-consumer'
import { startAnalyticsEventsIngestionHistoricalConsumer } from './ingestion-queues/analytics-events-ingestion-historical-consumer'
import { startAnalyticsEventsIngestionOverflowConsumer } from './ingestion-queues/analytics-events-ingestion-overflow-consumer'
import {
PIPELINES,
PipelineType,
startEventsIngestionPipelineConsumer,
} from './ingestion-queues/events-ingestion-consumer'
import { startJobsConsumer } from './ingestion-queues/jobs-consumer'
import { IngestionConsumer, KafkaJSIngestionConsumer } from './ingestion-queues/kafka-queue'
import {
@@ -102,6 +107,7 @@ export async function startPluginsServer(
let analyticsEventsIngestionConsumer: IngestionConsumer | undefined
let analyticsEventsIngestionOverflowConsumer: IngestionConsumer | undefined
let analyticsEventsIngestionHistoricalConsumer: IngestionConsumer | undefined
let eventsIngestionConsumer: Map<string, IngestionConsumer> | undefined
let onEventHandlerConsumer: KafkaJSIngestionConsumer | undefined
let stopWebhooksHandlerConsumer: () => Promise<void> | undefined
@@ -149,6 +155,7 @@ export async function startPluginsServer(
analyticsEventsIngestionConsumer?.stop(),
analyticsEventsIngestionOverflowConsumer?.stop(),
analyticsEventsIngestionHistoricalConsumer?.stop(),
...Array.from(eventsIngestionConsumer?.values() || []).map((consumer) => consumer.stop()),
onEventHandlerConsumer?.stop(),
stopWebhooksHandlerConsumer?.(),
bufferConsumer?.disconnect(),
@@ -334,6 +341,36 @@ export async function startPluginsServer(
healthChecks['analytics-ingestion-historical'] = isAnalyticsEventsIngestionHistoricalHealthy
}
if (capabilities.eventsIngestionPipelines) {
async function start(pipelineKey: string, pipeline: PipelineType) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
serverInstance = serverInstance ? serverInstance : { hub }
piscina = piscina ?? (await makePiscina(serverConfig, hub))
const { queue, isHealthy } = await startEventsIngestionPipelineConsumer({
hub,
pipeline,
})
eventsIngestionConsumer = eventsIngestionConsumer ?? new Map<string, IngestionConsumer>()
eventsIngestionConsumer.set(pipelineKey, queue)
shutdownOnConsumerExit(eventsIngestionConsumer.get(pipelineKey)!.consumer!)
healthChecks[`events-ingestion-pipeline-${pipelineKey}`] = isHealthy
}
if (serverConfig.PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE === null) {
for (const pipelineKey in PIPELINES) {
await start(pipelineKey, PIPELINES[pipelineKey])
}
} else {
// Ensure the configured pipeline exists
const pipelineKey = serverConfig.PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE
if (pipelineKey === null || !PIPELINES[pipelineKey]) {
throw new Error(`Invalid events ingestion pipeline: ${pipelineKey}`)
}
const pipeline: PipelineType = PIPELINES[pipelineKey]
await start(pipelineKey, pipeline)
}
}
if (capabilities.ingestionOverflow) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
serverInstance = serverInstance ? serverInstance : { hub }
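
The branching above distils to a simple rule: `null` runs every pipeline, a known key runs exactly that pipeline, and anything else fails startup. A compact restatement as a hypothetical helper (same names as the diff):

```ts
import { PIPELINES, PipelineType } from './ingestion-queues/events-ingestion-consumer'

function resolvePipelines(selected: string | null): Array<[string, PipelineType]> {
    if (selected === null) {
        return Object.entries(PIPELINES) // no selector: run all pipelines
    }
    const pipeline = PIPELINES[selected]
    if (!pipeline) {
        throw new Error(`Invalid events ingestion pipeline: ${selected}`)
    }
    return [[selected, pipeline]] // run exactly the selected pipeline
}
```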

View File

@@ -39,6 +39,7 @@ export function initSentry(config: PluginsServerConfig): void {
tags: {
PLUGIN_SERVER_MODE: config.PLUGIN_SERVER_MODE,
DEPLOYMENT: config.CLOUD_DEPLOYMENT,
PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE: config.PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE,
},
},
release,

View File

@@ -74,6 +74,7 @@ export enum PluginServerMode {
ingestion = 'ingestion',
ingestion_overflow = 'ingestion-overflow',
ingestion_historical = 'ingestion-historical',
events_ingestion = 'events-ingestion',
async_onevent = 'async-onevent',
async_webhooks = 'async-webhooks',
jobs = 'jobs',
@@ -84,6 +85,7 @@ export enum PluginServerMode {
cdp_processed_events = 'cdp-processed-events',
cdp_function_callbacks = 'cdp-function-callbacks',
cdp_function_overflow = 'cdp-function-overflow',
functional_tests = 'functional-tests',
}
export const stringToPluginServerMode = Object.fromEntries(
@@ -204,6 +206,7 @@ export interface PluginsServerConfig extends CdpConfig {
OBJECT_STORAGE_SECRET_ACCESS_KEY: string
OBJECT_STORAGE_BUCKET: string // the object storage bucket name
PLUGIN_SERVER_MODE: PluginServerMode | null
PLUGIN_SERVER_EVENTS_INGESTION_PIPELINE: string | null // TODO: shouldn't be a string probably
PLUGIN_LOAD_SEQUENTIALLY: boolean // could help with reducing memory usage spikes on startup
KAFKAJS_LOG_LEVEL: 'NOTHING' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR'
APP_METRICS_GATHERED_FOR_ALL: boolean // whether to gather app metrics for all teams
@@ -325,6 +328,7 @@ export interface PluginServerCapabilities {
ingestion?: boolean
ingestionOverflow?: boolean
ingestionHistorical?: boolean
eventsIngestionPipelines?: boolean
pluginScheduledTasks?: boolean
processPluginJobs?: boolean
processAsyncOnEventHandlers?: boolean
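
Because `stringToPluginServerMode` is built from the enum's string values, the new modes are immediately reachable from the `PLUGIN_SERVER_MODE` environment variable:

```ts
import { PluginServerMode, stringToPluginServerMode } from './types'

// e.g. PLUGIN_SERVER_MODE=events-ingestion
const mode = stringToPluginServerMode['events-ingestion']
console.assert(mode === PluginServerMode.events_ingestion)
```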

View File

@@ -40,7 +40,7 @@ describe('e2e ingestion timeout', () => {
}
`)
await resetTestDatabaseClickhouse(extraServerConfig)
const startResponse = await startPluginsServer(extraServerConfig, makePiscina)
const startResponse = await startPluginsServer(extraServerConfig, makePiscina, { ingestion: true })
hub = startResponse.hub
stopServer = startResponse.stop
posthog = createPosthog(hub, pluginConfig39)

View File

@@ -19,10 +19,7 @@ function numberOfScheduledJobs() {
describe('server', () => {
let pluginsServer: Partial<ServerInstance> | null = null
function createPluginServer(
config: Partial<PluginsServerConfig> = {},
capabilities: PluginServerCapabilities | undefined = undefined
) {
function createPluginServer(config: Partial<PluginsServerConfig>, capabilities: PluginServerCapabilities) {
return startPluginsServer(
{
WORKER_CONCURRENCY: 2,
@@ -43,20 +40,94 @@ describe('server', () => {
pluginsServer = null
})
test('startPluginsServer does not error', async () => {
// Running all capabilities together takes too long in tests, so they are split up
test('startPluginsServer does not error - ingestion', async () => {
const testCode = `
async function processEvent (event) {
return event
}
`
await resetTestDatabase(testCode)
pluginsServer = await createPluginServer()
pluginsServer = await createPluginServer(
{},
{
http: true,
mmdb: true,
ingestion: true,
ingestionOverflow: true,
ingestionHistorical: true,
appManagementSingleton: true,
preflightSchedules: true,
}
)
})
test('startPluginsServer does not error - pipelines', async () => {
const testCode = `
async function processEvent (event) {
return event
}
`
await resetTestDatabase(testCode)
pluginsServer = await createPluginServer(
{},
{
http: true,
eventsIngestionPipelines: true,
}
)
})
test('startPluginsServer does not error - cdp', async () => {
const testCode = `
async function processEvent (event) {
return event
}
`
await resetTestDatabase(testCode)
pluginsServer = await createPluginServer(
{},
{
http: true,
pluginScheduledTasks: true,
processPluginJobs: true,
processAsyncOnEventHandlers: true,
processAsyncWebhooksHandlers: true,
cdpProcessedEvents: true,
cdpFunctionCallbacks: true,
cdpFunctionOverflow: true,
}
)
})
test('startPluginsServer does not error - replay', async () => {
const testCode = `
async function processEvent (event) {
return event
}
`
await resetTestDatabase(testCode)
pluginsServer = await createPluginServer(
{},
{
http: true,
sessionRecordingBlobIngestion: true,
sessionRecordingBlobOverflowIngestion: true,
}
)
})
test('starting and stopping node-schedule scheduled jobs', async () => {
expect(numberOfScheduledJobs()).toEqual(0)
pluginsServer = await createPluginServer()
pluginsServer = await createPluginServer(
{},
{
http: true,
pluginScheduledTasks: true,
processAsyncWebhooksHandlers: true,
preflightSchedules: true,
}
)
expect(numberOfScheduledJobs()).toBeGreaterThan(1)
@@ -67,8 +138,11 @@ describe('server', () => {
})
describe('plugin-server capabilities', () => {
test('starts all main services by default', async () => {
pluginsServer = await createPluginServer()
test('starts graphile for scheduled tasks capability', async () => {
pluginsServer = await createPluginServer(
{},
{ ingestion: true, pluginScheduledTasks: true, processPluginJobs: true }
)
expect(startGraphileWorker).toHaveBeenCalled()
})

View File

@@ -45,7 +45,7 @@ describe('postgres parity', () => {
`)
await resetTestDatabaseClickhouse(extraServerConfig)
console.log('[TEST] Starting plugins server')
const startResponse = await startPluginsServer(extraServerConfig, makePiscina)
const startResponse = await startPluginsServer(extraServerConfig, makePiscina, { ingestion: true })
hub = startResponse.hub
stopServer = startResponse.stop
teamId++