import { KAFKA_EVENTS_PLUGIN_INGESTION } from '../src/config/kafka-topics' import { startPluginsServer } from '../src/main/pluginsServer' import { Hub, LogLevel, PluginsServerConfig } from '../src/types' import { delay, UUIDT } from '../src/utils/utils' import { makePiscina } from '../src/worker/piscina' import { createPosthog, DummyPostHog } from '../src/worker/vm/extensions/posthog' import { delayUntilEventIngested, resetTestDatabaseClickhouse } from './helpers/clickhouse' import { resetKafka } from './helpers/kafka' import { pluginConfig39 } from './helpers/plugins' import { resetTestDatabase } from './helpers/sql' jest.setTimeout(60000) // 60 sec timeout const extraServerConfig: Partial = { TASK_TIMEOUT: 2, WORKER_CONCURRENCY: 2, KAFKA_CONSUMPTION_TOPIC: KAFKA_EVENTS_PLUGIN_INGESTION, LOG_LEVEL: LogLevel.Log, } describe('e2e ingestion timeout', () => { let hub: Hub let stopServer: () => Promise let posthog: DummyPostHog beforeAll(async () => { await resetKafka(extraServerConfig) }) beforeEach(async () => { await resetTestDatabase(` async function processEvent (event) { await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) await new Promise(resolve => __jestSetTimeout(() => resolve(), 800)) event.properties = { passed: true } return event } `) await resetTestDatabaseClickhouse(extraServerConfig) const startResponse = await startPluginsServer(extraServerConfig, makePiscina, { ingestion: true }) hub = startResponse.hub stopServer = startResponse.stop posthog = createPosthog(hub, pluginConfig39) }) afterEach(async () => { await stopServer() }) test('event captured, processed, ingested', async () => { expect((await hub.db.fetchEvents()).length).toBe(0) const uuid = new UUIDT().toString() await posthog.capture('custom event', { name: 'haha', uuid, randomProperty: 'lololo' }) await delayUntilEventIngested(() => hub.db.fetchEvents()) await hub.kafkaProducer.flush() const events = await hub.db.fetchEvents() await delay(1000) expect(events.length).toBe(1) expect(events[0].properties.name).toEqual('haha') expect(events[0].properties.passed).not.toEqual(true) }) })