mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-25 11:17:50 +01:00
30bafdd382
* cleanup: remove unused team arg from registerLastStep * cleanup: rename promises to ackPromises to make it clearer that's what they are * cleanup(plugin-server): make waitForAck explicit/required * add Kafka produce/ack metrics * Clarify Kafka produce metric/labels
53 lines
1.3 KiB
TypeScript
53 lines
1.3 KiB
TypeScript
import { CompressionCodecs, CompressionTypes } from 'kafkajs'
|
|
import SnappyCodec from 'kafkajs-snappy'
|
|
import { HighLevelProducer } from 'node-rdkafka'
|
|
|
|
import { defaultConfig } from '../src/config/config'
|
|
import { produce as defaultProduce } from '../src/kafka/producer'
|
|
|
|
// Register the Snappy implementation in kafkajs' codec table under the Snappy compression type.
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec

// Producer shared by the hooks and helpers in this module; assigned by `beforeAll`
// and by `createKafkaProducer` itself.
let producer: HighLevelProducer

// Connect the shared producer once, before any test in the file runs.
beforeAll(async () => {
    const sharedProducer = await createKafkaProducer()
    producer = sharedProducer
})
|
|
|
|
/**
 * Disconnects the shared producer once every test in the file has finished.
 *
 * Rejects with the error reported by `disconnect`, otherwise resolves with the data it passes
 * to its callback.
 */
afterAll(async () => {
    // If no producer was ever created (for example because `beforeAll` failed) there is nothing
    // to tear down. Without this guard the optional call inside the promise executor would be
    // skipped, neither `resolve` nor `reject` would ever run, and the hook would hang until the
    // test runner's timeout.
    if (!producer) {
        return
    }

    await new Promise((resolve, reject) =>
        producer.disconnect((error, data) => {
            return error ? reject(error) : resolve(data)
        })
    )
})
|
|
|
|
/**
 * Builds a node-rdkafka `HighLevelProducer` for the brokers in `defaultConfig.KAFKA_HOSTS`,
 * stores it in the module-level `producer` variable, and waits for it to connect.
 *
 * @returns the producer, once `connect` has reported success
 * @throws (rejects) with the error passed to the `connect` callback, if any
 */
export async function createKafkaProducer() {
    const producerConfig = {
        'metadata.broker.list': defaultConfig.KAFKA_HOSTS,
        // Per the librdkafka configuration reference, `linger.ms` is how long the producer waits
        // to accumulate messages into a batch; 0 asks it not to wait.
        'linger.ms': 0,
    }
    producer = new HighLevelProducer(producerConfig)

    // Adapt the callback-style `connect` to a promise so callers can simply await it.
    await new Promise((resolve, reject) => {
        producer.connect(undefined, (error, data) => {
            if (error) {
                reject(error)
            } else {
                resolve(data)
            }
        })
    })

    return producer
}
|
|
|
|
/**
 * Sends a single message to Kafka through the shared producer, creating and connecting
 * that producer first if it does not exist yet.
 *
 * @param topic - topic name, forwarded unchanged to the underlying `produce`
 * @param message - payload, forwarded as `value`; may be `null`
 * @param key - message key; converted to a `Buffer` before being forwarded
 * @param waitForAck - forwarded unchanged; presumably controls whether the underlying
 *   `produce` waits for the broker acknowledgement (see `../src/kafka/producer` to confirm)
 */
export async function produce({
    topic,
    message,
    key,
    waitForAck,
}: {
    topic: string
    message: Buffer | null
    key: string
    waitForAck: boolean
}) {
    // Lazily create the producer for callers that run before (or without) `beforeAll`.
    if (producer == null) {
        producer = await createKafkaProducer()
    }

    const keyBytes = Buffer.from(key)
    await defaultProduce({ producer, topic, value: message, key: keyBytes, waitForAck })
}
|