0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-22 08:40:03 +01:00
posthog/plugin-server/tests/helpers/kafka.ts

63 lines
2.0 KiB
TypeScript

import { CompressionCodecs, CompressionTypes, Kafka, logLevel } from 'kafkajs'
import SnappyCodec from 'kafkajs-snappy'
import { defaultConfig, overrideWithEnv } from '../../src/config/config'
import {
KAFKA_BUFFER,
KAFKA_EVENTS_JSON,
KAFKA_EVENTS_PLUGIN_INGESTION,
KAFKA_GROUPS,
KAFKA_PERFORMANCE_EVENTS,
KAFKA_PERSON,
KAFKA_PERSON_DISTINCT_ID,
KAFKA_PERSON_UNIQUE_ID,
KAFKA_PLUGIN_LOG_ENTRIES,
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
} from '../../src/config/kafka-topics'
import { PluginsServerConfig } from '../../src/types'
import { KAFKA_EVENTS_DEAD_LETTER_QUEUE } from './../../src/config/kafka-topics'
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
/** Clear the Kafka queue and return Kafka object */
export async function resetKafka(extraServerConfig?: Partial<PluginsServerConfig>): Promise<Kafka> {
const config = { ...overrideWithEnv(defaultConfig, process.env), ...extraServerConfig }
const kafka = new Kafka({
clientId: `plugin-server-test`,
brokers: (config.KAFKA_HOSTS || '').split(','),
logLevel: logLevel.WARN,
})
await createTopics(kafka, [
KAFKA_EVENTS_JSON,
KAFKA_EVENTS_PLUGIN_INGESTION,
KAFKA_BUFFER,
KAFKA_GROUPS,
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
KAFKA_PERFORMANCE_EVENTS,
KAFKA_PERSON,
KAFKA_PERSON_UNIQUE_ID,
KAFKA_PERSON_DISTINCT_ID,
KAFKA_PLUGIN_LOG_ENTRIES,
KAFKA_EVENTS_DEAD_LETTER_QUEUE,
])
return kafka
}
export async function createTopics(kafka: Kafka, topics: string[]) {
const admin = kafka.admin()
await admin.connect()
const existingTopics = await admin.listTopics()
const topicsToCreate = topics.filter((topic) => !existingTopics.includes(topic)).map((topic) => ({ topic }))
if (topicsToCreate.length > 0) {
await admin.createTopics({
waitForLeaders: true,
topics: topicsToCreate,
})
}
await admin.disconnect()
}