0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-25 02:49:32 +01:00
posthog/plugin-server/functional_tests/jobs-consumer.test.ts
Harry Waye 0869801a8e
chore(plugin-server): split functional tests into feature based files (#13031)
* chore(plugin-server): split functional tests into feature based files

This is intended to make it more obvious what we are testing, and to try
and identify the major themes of the plugin-server functionality.

As a by product it should make things more parallelizable for jest as
the tests in different files will be isolated, runnable in separate
workers.

* use random api token, avoid db constraints

* make tests silent

* format

* chore: set number of jest workers

These tests should be pretty light given they just hit other APIs and
don't do much themselves. Memory could be an issue on constrained
environments. We shall see.
2022-11-30 12:49:17 +00:00

87 lines
2.5 KiB
TypeScript

import Redis from 'ioredis'
import { Consumer, Kafka, KafkaMessage, logLevel, Partitioners, Producer } from 'kafkajs'
import { Pool } from 'pg'
import { v4 as uuidv4 } from 'uuid'
import { defaultConfig } from '../src/config/config'
import { delayUntilEventIngested } from '../tests/helpers/clickhouse'
let producer: Producer
let postgres: Pool // NOTE: we use a Pool here but it's probably not necessary, but for instance `insertRow` uses a Pool.
let kafka: Kafka
let redis: Redis.Redis
beforeAll(async () => {
// Setup connections to kafka, clickhouse, and postgres
postgres = new Pool({
connectionString: defaultConfig.DATABASE_URL!,
// We use a pool only for typings sake, but we don't actually need to,
// so set max connections to 1.
max: 1,
})
kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING })
producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })
await producer.connect()
redis = new Redis(defaultConfig.REDIS_URL)
})
afterAll(async () => {
await Promise.all([producer.disconnect(), postgres.end(), redis.disconnect()])
})
// Test out some error cases that we wouldn't be able to handle without
// producing to the jobs queue directly.
let dlq: KafkaMessage[]
let dlqConsumer: Consumer
beforeAll(async () => {
dlq = []
dlqConsumer = kafka.consumer({ groupId: 'jobs-consumer-test' })
await dlqConsumer.subscribe({ topic: 'jobs_dlq' })
await dlqConsumer.run({
eachMessage: ({ message }) => {
dlq.push(message)
return Promise.resolve()
},
})
})
afterAll(async () => {
await dlqConsumer.disconnect()
})
test.concurrent(`jobs-consumer: handles empty messages`, async () => {
const key = uuidv4()
await producer.send({
topic: 'jobs',
messages: [
{
key: key,
value: null,
},
],
})
const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
expect(messages.length).toBe(1)
})
test.concurrent(`jobs-consumer: handles invalid JSON`, async () => {
const key = uuidv4()
await producer.send({
topic: 'jobs',
messages: [
{
key: key,
value: 'invalid json',
},
],
})
const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
expect(messages.length).toBe(1)
})