0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-25 11:17:50 +01:00
posthog/plugin-server/functional_tests/kafka.ts
Brett Hoerner 30bafdd382
chore(plugin-server): kafka ack cleanup and metric (#21111)
* cleanup: remove unused team arg from registerLastStep

* cleanup: rename promises to ackPromises to make it more clear thats what they are

* cleanup(plugin-server): make waitForAck explicit/required

* add Kafka produce/ack metrics

* Clarify Kafka produce metric/labels
2024-03-25 13:01:15 +00:00

53 lines
1.3 KiB
TypeScript

import { CompressionCodecs, CompressionTypes } from 'kafkajs'
import SnappyCodec from 'kafkajs-snappy'
import { HighLevelProducer } from 'node-rdkafka'
import { defaultConfig } from '../src/config/config'
import { produce as defaultProduce } from '../src/kafka/producer'
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
let producer: HighLevelProducer
beforeAll(async () => {
producer = await createKafkaProducer()
})
afterAll(async () => {
await new Promise((resolve, reject) =>
producer?.disconnect((error, data) => {
return error ? reject(error) : resolve(data)
})
)
})
export async function createKafkaProducer() {
producer = new HighLevelProducer({
'metadata.broker.list': defaultConfig.KAFKA_HOSTS,
'linger.ms': 0,
})
await new Promise((resolve, reject) =>
producer.connect(undefined, (error, data) => {
return error ? reject(error) : resolve(data)
})
)
return producer
}
export async function produce({
topic,
message,
key,
waitForAck,
}: {
topic: string
message: Buffer | null
key: string
waitForAck: boolean
}) {
producer = producer ?? (await createKafkaProducer())
await defaultProduce({ producer, topic, value: message, key: Buffer.from(key), waitForAck })
}