Mirror of https://github.com/PostHog/posthog.git, synced 2024-11-25 11:17:50 +01:00
985148ee7e
feat: buffer 2.0 proposal

* add tests
* prevent infinite retrying
* perf
* updates
* tweaks
* Update latest_migrations.manifest
* Update plugin-server/src/main/ingestion-queues/buffer.ts
* update
* updates
* fix migrations issue
* reliability updates
* fix tests
* test fix
* e2e test
* test
* test
* ??
* cleanup
98 lines
3.5 KiB
TypeScript

import IORedis from 'ioredis'

import { ONE_HOUR } from '../src/config/constants'
import { startPluginsServer } from '../src/main/pluginsServer'
import { Hub, LogLevel, PluginsServerConfig } from '../src/types'
import { UUIDT } from '../src/utils/utils'
import { makePiscina } from '../src/worker/piscina'
import { createPosthog, DummyPostHog } from '../src/worker/vm/extensions/posthog'
import { writeToFile } from '../src/worker/vm/extensions/test-utils'
import { delayUntilEventIngested, resetTestDatabaseClickhouse } from './helpers/clickhouse'
import { resetKafka } from './helpers/kafka'
import { pluginConfig39 } from './helpers/plugins'
import { resetTestDatabase } from './helpers/sql'
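
// E2E coverage for the conversion buffer: a captured event is routed through
// the buffer and held for BUFFER_CONVERSION_SECONDS before being processed by
// the plugin and written to ClickHouse.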

const { console: testConsole } = writeToFile

jest.mock('../src/utils/status')
jest.setTimeout(60000) // 60 sec timeout

const extraServerConfig: Partial<PluginsServerConfig> = {
    WORKER_CONCURRENCY: 2,
    LOG_LEVEL: LogLevel.Log,
    KAFKA_PRODUCER_MAX_QUEUE_SIZE: 100, // the default in tests is 0, but here we specifically want to test batching
    KAFKA_FLUSH_FREQUENCY_MS: 5000, // same as above, but with time
    BUFFER_CONVERSION_SECONDS: 3, // we want to test the delay mechanism, but with a much lower delay than in prod
    CONVERSION_BUFFER_ENABLED: true,
}
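
// Test plugin source. processEvent mutates the event's properties, while
// onEvent logs to the shared testConsole, so the test can assert that both
// hooks ran and in which order.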
const indexJs = `
import { console as testConsole } from 'test-utils/write-to-file'

export async function processEvent (event) {
    testConsole.log('processEvent')
    console.info('amogus')
    event.properties.processed = 'hell yes'
    event.properties.upperUuid = event.properties.uuid?.toUpperCase()
    event.properties['$snapshot_data'] = 'no way'
    return event
}

export function onEvent (event, { global }) {
    // we use this to mock setupPlugin being
    // run after some events were already ingested
    global.timestampBoundariesForTeam = {
        max: new Date(),
        min: new Date(Date.now() - ${ONE_HOUR})
    }
    testConsole.log('onEvent', event.event)
}`

describe('E2E with buffer enabled', () => {
    let hub: Hub
    let stopServer: () => Promise<void>
    let posthog: DummyPostHog
    let redis: IORedis.Redis

    beforeEach(async () => {
        testConsole.reset()
        await resetTestDatabase(indexJs)
        await resetTestDatabaseClickhouse(extraServerConfig)
        await resetKafka(extraServerConfig)
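        // boot the full plugin server (Piscina worker pool, Kafka producers/consumers)
        // so the captured event takes the real ingestion path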
        const startResponse = await startPluginsServer(extraServerConfig, makePiscina)
        hub = startResponse.hub
        stopServer = startResponse.stop
        redis = await hub.redisPool.acquire()
        posthog = createPosthog(hub, pluginConfig39)
    })

    afterEach(async () => {
        await hub.redisPool.release(redis)
        await stopServer()
    })

    describe('ClickHouse ingestion', () => {
        test('event captured, processed, ingested', async () => {
            expect((await hub.db.fetchEvents()).length).toBe(0)

            const uuid = new UUIDT().toString()

            await posthog.capture('custom event via buffer', { name: 'hehe', uuid })
            await hub.kafkaProducer.flush()
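
            // the event sits in the conversion buffer for BUFFER_CONVERSION_SECONDS,
            // so poll ClickHouse until it lands instead of asserting immediately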
            await delayUntilEventIngested(() => hub.db.fetchEvents(), undefined, undefined, 500)
            const events = await hub.db.fetchEvents()

            expect(events.length).toBe(1)

            // processEvent ran and modified the event
            expect(events[0].properties.processed).toEqual('hell yes')
            expect(events[0].properties.upperUuid).toEqual(uuid.toUpperCase())

            // onEvent ran
            expect(testConsole.read()).toEqual([['processEvent'], ['onEvent', 'custom event via buffer']])
        })
    })
})