mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-30 19:41:46 +01:00
1e6c062095
* chore(plugin-server): disrtibute scheduled tasks
Changes I've made here from the original PR:
1. add some logging of task run times
2. add concurrency, except only one task of a plugin will run at a time
3. add a timeout to task run times
This reverts commit 23db43a0dc
.
* chore: add timings for scheduled tasks runtime
* chore: add timeouts for scheduled tasks
* chore: clarify duration unit
* chore: deduplicate tasks in a batch, add partition concurrency
* chore: add flag to switch between old and new behaviour
This defaults to new, but can be set to old by setting environment
variable `USE_KAFKA_FOR_SCHEDULED_TASKS=false`
* fix tests
* enable USE_KAFKA_FOR_SCHEDULED_TASKS in tests
47 lines
1.8 KiB
TypeScript
47 lines
1.8 KiB
TypeScript
import { assert } from 'console'
|
|
import { Kafka, logLevel } from 'kafkajs'
|
|
|
|
import { defaultConfig } from '../src/config/config'
|
|
|
|
export default async function () {
|
|
// Ensure add consumer groups hae zero lag. This is intended to catch cases
|
|
// where we have managed to process a message but then failed to commit the
|
|
// offset, leaving the consumer group in a bad state.
|
|
|
|
const kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING })
|
|
const admin = kafka.admin()
|
|
try {
|
|
await admin.connect()
|
|
const topics = await admin.listTopics()
|
|
const topicOffsets = Object.fromEntries(
|
|
await Promise.all(topics.map(async (topic) => [topic, await admin.fetchTopicOffsets(topic)]))
|
|
)
|
|
|
|
const { groups } = await admin.listGroups()
|
|
const consumerGroupOffsets = await Promise.all(
|
|
groups.map(async ({ groupId }) => [groupId, await admin.fetchOffsets({ groupId })] as const)
|
|
)
|
|
|
|
for (const [groupId, offsets] of consumerGroupOffsets) {
|
|
for (const { topic, partitions } of offsets) {
|
|
for (const { partition, offset } of partitions) {
|
|
console.debug(
|
|
`Checking ${groupId} ${topic} ${partition} ${offset} ${topicOffsets[topic][partition].offset}`
|
|
)
|
|
assert(
|
|
topicOffsets[topic][partition].offset === offset,
|
|
`Consumer group ${groupId} has lag on ${topic}[${partition}]: ${{
|
|
lastOffset: topicOffsets[topic][partition].offset,
|
|
consumerOffset: offset,
|
|
}}`
|
|
)
|
|
}
|
|
}
|
|
}
|
|
} catch (error) {
|
|
throw error
|
|
} finally {
|
|
await admin.disconnect()
|
|
}
|
|
}
|