Mirror of https://github.com/PostHog/posthog.git, synced 2024-11-24 18:07:17 +01:00
feat(plugin-server): distribute scheduled tasks i.e. runEveryX (#13124)
* chore(plugin-server): distribute scheduled tasks
Changes I've made here from the original PR:
1. add some logging of task run times
2. add concurrency, except that only one task per plugin will run at a time
3. add a timeout to task run times
This reverts commit 23db43a0dc.
* chore: add timings for scheduled tasks runtime
* chore: add timeouts for scheduled tasks
* chore: clarify duration unit
* chore: deduplicate tasks in a batch, add partition concurrency
* chore: add flag to switch between old and new behaviour
This defaults to the new behaviour; the old behaviour can be restored by setting the environment
variable `USE_KAFKA_FOR_SCHEDULED_TASKS=false`
* fix tests
* enable USE_KAFKA_FOR_SCHEDULED_TASKS in tests
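For orientation, the sketch below (not part of the diff) shows the message contract this change introduces between the Graphile scheduler and the new Kafka consumer; the names mirror `runScheduledTasks` and the consumer's validation further down, but this is illustrative only.

// Sketch only, assuming the shapes used in the diff below.
const taskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const
type TaskType = typeof taskTypes[number]

interface ScheduledTaskMessage {
    taskType: TaskType
    pluginConfigId: number
}

// The producer keys each message by pluginConfigId, so all tasks for one
// plugin config land on the same partition and run one at a time.
function encodeScheduledTask(task: ScheduledTaskMessage): { key: string; value: string } {
    return { key: task.pluginConfigId.toString(), value: JSON.stringify(task) }
}

// The consumer parses and validates; anything that fails ends up on the DLQ topic.
function decodeScheduledTask(value: string): ScheduledTaskMessage | null {
    try {
        const task = JSON.parse(value)
        if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) {
            return null
        }
        return task
    } catch {
        return null
    }
}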
This commit is contained in:
parent 6afb5dc3eb, commit 1e6c062095
plugin-server/functional_tests/jest.global-teardown.ts (new file, 46 lines)
@@ -0,0 +1,46 @@
import { assert } from 'console'
import { Kafka, logLevel } from 'kafkajs'

import { defaultConfig } from '../src/config/config'

export default async function () {
    // Ensure all consumer groups have zero lag. This is intended to catch cases
    // where we have managed to process a message but then failed to commit the
    // offset, leaving the consumer group in a bad state.

    const kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING })
    const admin = kafka.admin()
    try {
        await admin.connect()
        const topics = await admin.listTopics()
        const topicOffsets = Object.fromEntries(
            await Promise.all(topics.map(async (topic) => [topic, await admin.fetchTopicOffsets(topic)]))
        )

        const { groups } = await admin.listGroups()
        const consumerGroupOffsets = await Promise.all(
            groups.map(async ({ groupId }) => [groupId, await admin.fetchOffsets({ groupId })] as const)
        )

        for (const [groupId, offsets] of consumerGroupOffsets) {
            for (const { topic, partitions } of offsets) {
                for (const { partition, offset } of partitions) {
                    console.debug(
                        `Checking ${groupId} ${topic} ${partition} ${offset} ${topicOffsets[topic][partition].offset}`
                    )
                    assert(
                        topicOffsets[topic][partition].offset === offset,
                        `Consumer group ${groupId} has lag on ${topic}[${partition}]: ${JSON.stringify({
                            lastOffset: topicOffsets[topic][partition].offset,
                            consumerOffset: offset,
                        })}`
                    )
                }
            }
        }
    } catch (error) {
        throw error
    } finally {
        await admin.disconnect()
    }
}
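As a concrete (hypothetical) illustration of what this teardown catches: if a consumer processed a message but crashed before committing, the committed offset trails the topic's high watermark and the assertion above fires. The numbers below are made up.

// Hypothetical offsets, not taken from a real run.
const topicHighWatermark = '42' // what admin.fetchTopicOffsets would report for scheduled_tasks[0]
const committedOffset = '41' // what admin.fetchOffsets would report for the consumer group
console.assert(topicHighWatermark === committedOffset, 'consumer group has lag on scheduled_tasks[0]')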
plugin-server/functional_tests/scheduled-tasks-runner.test.ts (new file, 120 lines)
@@ -0,0 +1,120 @@
import Redis from 'ioredis'
import { Consumer, Kafka, KafkaMessage, logLevel, Partitioners, Producer } from 'kafkajs'
import { Pool } from 'pg'
import { v4 as uuidv4 } from 'uuid'

import { defaultConfig } from '../src/config/config'
import { delayUntilEventIngested } from '../tests/helpers/clickhouse'

let producer: Producer
let postgres: Pool // NOTE: a Pool isn't strictly necessary here, but helpers such as `insertRow` expect one.
let kafka: Kafka
let redis: Redis.Redis

beforeAll(async () => {
    // Set up connections to Kafka, Postgres, and Redis.
    postgres = new Pool({
        connectionString: defaultConfig.DATABASE_URL!,
        // We use a pool only for typing's sake, but we don't actually need to,
        // so set max connections to 1.
        max: 1,
    })
    kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING })
    producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })
    await producer.connect()
    redis = new Redis(defaultConfig.REDIS_URL)
})

afterAll(async () => {
    await Promise.all([producer.disconnect(), postgres.end(), redis.disconnect()])
})

// Test out some error cases that we wouldn't be able to handle without
// producing to the jobs queue directly.

let dlq: KafkaMessage[]
let dlqConsumer: Consumer

beforeAll(async () => {
    dlq = []
    dlqConsumer = kafka.consumer({ groupId: 'scheduled-tasks-consumer-test' })
    await dlqConsumer.subscribe({ topic: 'scheduled_tasks_dlq' })
    await dlqConsumer.run({
        eachMessage: ({ message }) => {
            dlq.push(message)
            return Promise.resolve()
        },
    })
})

afterAll(async () => {
    await dlqConsumer.disconnect()
})

test.concurrent(`handles empty messages`, async () => {
    const key = uuidv4()

    await producer.send({
        topic: 'scheduled_tasks',
        messages: [
            {
                key: key,
                value: null,
            },
        ],
    })

    const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
    expect(messages.length).toBe(1)
})

test.concurrent(`handles invalid JSON`, async () => {
    const key = uuidv4()

    await producer.send({
        topic: 'scheduled_tasks',
        messages: [
            {
                key: key,
                value: 'invalid json',
            },
        ],
    })

    const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
    expect(messages.length).toBe(1)
})

test.concurrent(`handles invalid taskType`, async () => {
    const key = uuidv4()

    await producer.send({
        topic: 'scheduled_tasks',
        messages: [
            {
                key: key,
                value: JSON.stringify({ taskType: 'invalidTaskType', pluginConfigId: 1 }),
            },
        ],
    })

    const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
    expect(messages.length).toBe(1)
})

test.concurrent(`handles invalid pluginConfigId`, async () => {
    const key = uuidv4()

    await producer.send({
        topic: 'scheduled_tasks',
        messages: [
            {
                key: key,
                value: JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 'asdf' }),
            },
        ],
    })

    const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
    expect(messages.length).toBe(1)
})
@@ -14,6 +14,7 @@ module.exports = {
    testEnvironment: 'node',
    testMatch: ['<rootDir>/functional_tests/**/*.test.ts'],
    setupFilesAfterEnv: ['<rootDir>/functional_tests/jest.setup.ts'],
    globalTeardown: '<rootDir>/functional_tests/jest.global-teardown.ts',
    testTimeout: 60000,
    maxConcurrency: 10,
    maxWorkers: 6,
@@ -103,6 +103,7 @@ export function getDefaultConfig(): PluginsServerConfig {
        HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER: 1.5,
        APP_METRICS_GATHERED_FOR_ALL: isDevEnv() ? true : false,
        MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: 0,
        USE_KAFKA_FOR_SCHEDULED_TASKS: true,
    }
}
@@ -184,6 +185,7 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
        OBJECT_STORAGE_BUCKET: 'the object storage bucket name',
        HISTORICAL_EXPORTS_ENABLED: 'enables historical exports for export apps',
        APP_METRICS_GATHERED_FOR_ALL: 'whether to gather app metrics for all teams',
        USE_KAFKA_FOR_SCHEDULED_TASKS: 'distribute scheduled tasks across the scheduler workers',
    }
}
@@ -19,4 +19,6 @@ export const KAFKA_INGESTION_WARNINGS = `${prefix}clickhouse_ingestion_warnings$
export const KAFKA_APP_METRICS = `${prefix}clickhouse_app_metrics${suffix}`
export const KAFKA_JOBS = `${prefix}jobs${suffix}`
export const KAFKA_JOBS_DLQ = `${prefix}jobs_dlq${suffix}`
export const KAFKA_SCHEDULED_TASKS = `${prefix}scheduled_tasks${suffix}`
export const KAFKA_SCHEDULED_TASKS_DLQ = `${prefix}scheduled_tasks_dlq${suffix}`
export const KAFKA_METRICS_TIME_TO_SEE_DATA = `${prefix}clickhouse_metrics_time_to_see_data${suffix}`
@@ -1,9 +1,13 @@
import Piscina from '@posthog/piscina'
import { JobHelpers } from 'graphile-worker'

import { KAFKA_SCHEDULED_TASKS } from '../../config/kafka-topics'
import { Hub, PluginConfigId } from '../../types'
import { status } from '../../utils/status'
import { delay } from '../../utils/utils'

type TaskTypes = 'runEveryMinute' | 'runEveryHour' | 'runEveryDay'

export async function loadPluginSchedule(piscina: Piscina, maxIterations = 2000): Promise<Hub['pluginSchedule']> {
    let allThreadsReady = false
    while (maxIterations--) {
@@ -28,10 +32,43 @@ export async function loadPluginSchedule(piscina: Piscina, maxIterations = 2000)
    throw new Error('Could not load plugin schedule in time')
}

export async function runScheduledTasks(server: Hub, piscina: Piscina, taskType: string): Promise<void> {
    for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
        status.info('⏲️', `Running ${taskType} for plugin config with ID ${pluginConfigId}`)
        await piscina.run({ task: taskType, args: { pluginConfigId } })
        server.statsd?.increment('completed_scheduled_task', { taskType })
export async function runScheduledTasks(
    server: Hub,
    piscina: Piscina,
    taskType: TaskTypes,
    helpers: JobHelpers
): Promise<void> {
    // If the task's run_at is older than the grace period, we ignore it. We
    // don't want to end up with old tasks being scheduled if we are backed up.
    if (new Date(helpers.job.run_at).getTime() < Date.now() - gracePeriodMilliSecondsByTaskType[taskType]) {
        status.warn('🔁', 'stale_scheduled_task_skipped', {
            taskType: taskType,
            runAt: helpers.job.run_at,
        })
        server.statsd?.increment('skipped_scheduled_tasks', { taskType })
        return
    }

    if (server.USE_KAFKA_FOR_SCHEDULED_TASKS) {
        for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
            status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId })
            await server.kafkaProducer.producer.send({
                topic: KAFKA_SCHEDULED_TASKS,
                messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
            })
            server.statsd?.increment('queued_scheduled_task', { taskType })
        }
    } else {
        for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
            status.info('⏲️', `Running ${taskType} for plugin config with ID ${pluginConfigId}`)
            await piscina.run({ task: taskType, args: { pluginConfigId } })
            server.statsd?.increment('completed_scheduled_task', { taskType })
        }
    }
}

const gracePeriodMilliSecondsByTaskType = {
    runEveryMinute: 60 * 1000,
    runEveryHour: 60 * 60 * 1000,
    runEveryDay: 24 * 60 * 60 * 1000,
} as const
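To make the grace period concrete, a small illustrative calculation using the constants above (the timestamps are made up):

// A runEveryMinute job due at 09:00:00 that is only picked up at 09:01:30 is
// 90 000 ms late, which exceeds the 60 000 ms grace period, so it is skipped
// and 'skipped_scheduled_tasks' is incremented rather than the task running.
const gracePeriodMs = 60 * 1000 // gracePeriodMilliSecondsByTaskType.runEveryMinute
const runAt = new Date('2023-01-01T09:00:00Z').getTime()
const now = new Date('2023-01-01T09:01:30Z').getTime()
const isStale = runAt < now - gracePeriodMs // true -> stale_scheduled_task_skipped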
@@ -1,5 +1,5 @@
import Piscina from '@posthog/piscina'
import { CronItem, TaskList } from 'graphile-worker'
import { CronItem, JobHelpers, TaskList } from 'graphile-worker'

import { EnqueuedPluginJob, Hub } from '../../types'
import { status } from '../../utils/status'
@@ -78,9 +78,10 @@ export function getPluginJobHandlers(hub: Hub, graphileWorker: GraphileWorker, p

export function getScheduledTaskHandlers(hub: Hub, piscina: Piscina): TaskList {
    const scheduledTaskHandlers: TaskList = {
        runEveryMinute: async () => await runScheduledTasks(hub, piscina, 'runEveryMinute'),
        runEveryHour: async () => await runScheduledTasks(hub, piscina, 'runEveryHour'),
        runEveryDay: async () => await runScheduledTasks(hub, piscina, 'runEveryDay'),
        runEveryMinute: async (_, helpers: JobHelpers) =>
            await runScheduledTasks(hub, piscina, 'runEveryMinute', helpers),
        runEveryHour: async (_, helpers: JobHelpers) => await runScheduledTasks(hub, piscina, 'runEveryHour', helpers),
        runEveryDay: async (_, helpers: JobHelpers) => await runScheduledTasks(hub, piscina, 'runEveryDay', helpers),
    }

    return scheduledTaskHandlers
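For context on the `helpers` argument added here: graphile-worker invokes each task handler as `(payload, helpers)`, and `helpers.job.run_at` is the time the cron item was due, which is what the staleness check in `runScheduledTasks` compares against. A minimal standalone sketch (not PostHog code, only illustrating the graphile-worker handler shape):

import { TaskList } from 'graphile-worker'

export const exampleTasks: TaskList = {
    runEveryMinute: async (_payload, helpers) => {
        // run_at is when the job was due, not when the worker picked it up.
        helpers.logger.info(`runEveryMinute was due at ${new Date(helpers.job.run_at).toISOString()}`)
    },
}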
@@ -0,0 +1,206 @@
import Piscina from '@posthog/piscina'
import { StatsD } from 'hot-shots'
import { Batch, EachBatchHandler, Kafka, Producer } from 'kafkajs'

import { KAFKA_SCHEDULED_TASKS, KAFKA_SCHEDULED_TASKS_DLQ } from '../../config/kafka-topics'
import { DependencyUnavailableError } from '../../utils/db/error'
import { status } from '../../utils/status'
import { instrumentEachBatch, setupEventHandlers } from './kafka-queue'

// The valid task types that can be scheduled.
// TODO: not sure if there is another place that defines these but it would be
// good to unify.
const taskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const

export const startScheduledTasksConsumer = async ({
    kafka,
    piscina,
    producer,
    partitionConcurrency = 3,
    statsd,
}: {
    kafka: Kafka
    piscina: Piscina
    producer: Producer // NOTE: not using KafkaProducerWrapper here to avoid buffering logic
    partitionConcurrency: number
    statsd?: StatsD
}) => {
    /*
        Consumes from the scheduled tasks topic, and executes them within a
        Piscina worker. Some features include:

        1. timing out tasks to ensure we don't end up with backlogs.
        2. retrying on dependency failures (via not committing offsets on some
           failures).
        3. only running one plugin config id task at a time, to avoid
           concurrency issues. This is done via partitioning of tasks in the
           Kafka topic.
        4. ensuring we only run one task per plugin config id per batch, to
           avoid running many tasks back to back in the case that we have a
           backlog of tasks in the topic.

        TODO: add some in-partition concurrency control.
    */

    const consumer = kafka.consumer({ groupId: 'scheduled-tasks-runner' })
    setupEventHandlers(consumer)

    status.info('🔁', 'Starting scheduled tasks consumer')

    const eachBatch: EachBatchHandler = async ({ batch, resolveOffset, heartbeat }) => {
        status.debug('🔁', 'Processing batch', { size: batch.messages.length })

        const tasks = await getTasksFromBatch(batch, producer)

        for (const { taskType, pluginConfigId, message } of tasks) {
            status.info('⏲️', 'running_scheduled_task', {
                taskType,
                pluginConfigId,
            })
            const startTime = performance.now()

            // Make sure tasks can't run forever, according to `taskTimeouts`.
            const abortController = new AbortController()
            const timeout = setTimeout(() => {
                abortController.abort()
                status.warn('⚠️', 'scheduled_task_timed_out', {
                    taskType,
                    pluginConfigId,
                })
            }, taskTimeouts[taskType])

            // Make sure we keep the heartbeat going while the task is
            // running.
            const heartbeatInterval = setInterval(() => heartbeat(), 1000)

            try {
                // The part that actually runs the task.
                await piscina.run(
                    { task: taskType, args: { pluginConfigId: pluginConfigId } },
                    { signal: abortController.signal }
                )

                resolveOffset(message.offset)
                status.info('⏲️', 'finished_scheduled_task', {
                    taskType,
                    pluginConfigId,
                    durationSeconds: (performance.now() - startTime) / 1000,
                })
                statsd?.increment('completed_scheduled_task', { taskType })
            } catch (error) {
                // TODO: figure out a nice way to test this code path.

                if (error instanceof DependencyUnavailableError) {
                    // For errors relating to PostHog dependencies that are unavailable,
                    // e.g. Postgres, Kafka, Redis, we don't want to log the error to Sentry
                    // but rather bubble this up the stack for someone else to decide on
                    // what to do with it.
                    status.warn('⚠️', `dependency_unavailable`, {
                        taskType,
                        pluginConfigId,
                        error: error,
                        stack: error.stack,
                    })
                    statsd?.increment('retriable_scheduled_task', { taskType })
                    throw error
                }

                status.error('⚠️', 'scheduled_task_failed', {
                    taskType: taskType,
                    pluginConfigId,
                    error: error,
                    stack: error.stack,
                })
                resolveOffset(message.offset)
                statsd?.increment('failed_scheduled_tasks', { taskType })
            } finally {
                clearTimeout(timeout)
                clearInterval(heartbeatInterval)
            }

            // After processing each message, we need to heartbeat to ensure
            // we don't get kicked out of the group. Note that although we call
            // this for each message, it's actually a no-op if we're not over
            // the heartbeatInterval.
            await heartbeat()
        }

        status.info('✅', 'processed_batch', { batchSize: batch.messages.length, numberOfTasksExecuted: tasks.length })
    }

    await consumer.connect()
    await consumer.subscribe({ topic: KAFKA_SCHEDULED_TASKS })
    await consumer.run({
        partitionsConsumedConcurrently: partitionConcurrency,
        eachBatch: async (payload) => {
            return await instrumentEachBatch(KAFKA_SCHEDULED_TASKS, eachBatch, payload, statsd)
        },
    })

    return consumer
}

const getTasksFromBatch = async (batch: Batch, producer: Producer) => {
    // In any one batch, we only want to run one task per plugin config id.
    // Hence here we dedupe the tasks by plugin config id and task type.
    const tasksbyTypeAndPluginConfigId = {} as Record<
        typeof taskTypes[number],
        Record<
            number,
            { taskType: typeof taskTypes[number]; pluginConfigId: number; message: typeof batch.messages[number] }
        >
    >

    for (const message of batch.messages) {
        if (!message.value) {
            status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, {
                value: message.value,
            })
            await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
            continue
        }

        let task: {
            taskType: typeof taskTypes[number]
            pluginConfigId: number
        }

        try {
            task = JSON.parse(message.value.toString())
        } catch (error) {
            status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, {
                error: error.stack ?? error,
            })
            await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
            continue
        }

        if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) {
            status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task)
            await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
            continue
        }

        tasksbyTypeAndPluginConfigId[task.taskType] ??= {}
        // It's important that we only keep the latest message for each,
        // such that we commit the offset at the end of the batch.
        tasksbyTypeAndPluginConfigId[task.taskType][task.pluginConfigId] = {
            taskType: task.taskType,
            pluginConfigId: task.pluginConfigId,
            message,
        }
    }

    return Object.values(tasksbyTypeAndPluginConfigId)
        .map((tasksByPluginConfigId) => Object.values(tasksByPluginConfigId))
        .flat()
        .sort((a, b) => Number.parseInt(a.message.offset) - Number.parseInt(b.message.offset))
}

const taskTimeouts = {
    runEveryMinute: 1000 * 60,
    runEveryHour: 1000 * 60 * 5,
    runEveryDay: 1000 * 60 * 5,
} as const
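To illustrate the per-batch dedupe in `getTasksFromBatch` above (offsets and ids below are made up): if a backlog puts two `runEveryMinute` messages for the same plugin config into one batch, only the later one survives, so the task runs once per batch and it is the later offset that gets resolved.

// Illustrative batch contents, not real data.
const batchMessages = [
    { offset: '10', value: JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 1 }) },
    { offset: '11', value: JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 2 }) },
    { offset: '12', value: JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 1 }) },
]
// After dedupe and the final sort by offset, the tasks to execute are
// pluginConfigId 2 (offset '11') followed by pluginConfigId 1 (offset '12').
// The offset-10 message is not executed for this batch, but it is effectively
// covered when offset '12' on the same partition is resolved.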
@@ -23,6 +23,7 @@ import { startAnonymousEventBufferConsumer } from './ingestion-queues/anonymous-
import { startJobsConsumer } from './ingestion-queues/jobs-consumer'
import { IngestionConsumer } from './ingestion-queues/kafka-queue'
import { startQueues } from './ingestion-queues/queue'
import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer'
import { createHttpServer } from './services/http-server'
import { createMmdbServer, performMmdbStalenessCheck, prepareMmdb } from './services/mmdb'

@@ -85,6 +86,7 @@ export async function startPluginsServer(
    // meantime.
    let bufferConsumer: Consumer | undefined
    let jobsConsumer: Consumer | undefined
    let schedulerTasksConsumer: Consumer | undefined

    let httpServer: Server | undefined // healthcheck server
    let mmdbServer: net.Server | undefined // geoip server
@@ -119,6 +121,7 @@ export async function startPluginsServer(
            graphileWorker?.stop(),
            bufferConsumer?.disconnect(),
            jobsConsumer?.disconnect(),
            schedulerTasksConsumer?.disconnect(),
        ])

        await new Promise<void>((resolve, reject) =>
@@ -235,6 +238,16 @@ export async function startPluginsServer(
        await graphileWorker.connectProducer()
        await startGraphileWorker(hub, graphileWorker, piscina)

        if (hub.capabilities.pluginScheduledTasks) {
            schedulerTasksConsumer = await startScheduledTasksConsumer({
                piscina: piscina,
                kafka: hub.kafka,
                producer: hub.kafkaProducer.producer,
                partitionConcurrency: serverConfig.KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY,
                statsd: hub.statsd,
            })
        }

        if (hub.capabilities.processPluginJobs) {
            jobsConsumer = await startJobsConsumer({
                kafka: hub.kafka,
@@ -159,6 +159,7 @@ export interface PluginsServerConfig extends Record<string, any> {
    HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER: number
    APP_METRICS_GATHERED_FOR_ALL: boolean
    MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: number
    USE_KAFKA_FOR_SCHEDULED_TASKS: boolean
}

export interface Hub extends PluginsServerConfig {
@@ -1,5 +1,9 @@
import { Producer } from 'kafkajs'

import { KAFKA_SCHEDULED_TASKS } from '../../../src/config/kafka-topics'
import { runScheduledTasks } from '../../../src/main/graphile-worker/schedule'
import { Hub } from '../../../src/types'
import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper'
import { UUID } from '../../../src/utils/utils'
import { PromiseManager } from '../../../src/worker/vm/promise-manager'

@@ -26,25 +30,133 @@ describe('Graphile Worker schedule', () => {
            runEveryHour: [4, 5, 6],
            runEveryDay: [7, 8, 9],
        },
        kafkaProducer: {
            producer: {
                send: jest.fn(),
            } as unknown as Producer,
        } as KafkaProducerWrapper,
        USE_KAFKA_FOR_SCHEDULED_TASKS: true,
    }

    await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'iDontExist')
    expect(mockPiscina.run).not.toHaveBeenCalled()
    await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryMinute', {
        job: { run_at: new Date() },
    } as any)

    await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryMinute')
    expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(1, {
        topic: KAFKA_SCHEDULED_TASKS,
        messages: [
            {
                key: '1',
                value: JSON.stringify({
                    taskType: 'runEveryMinute',
                    pluginConfigId: 1,
                }),
            },
        ],
    })
    expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(2, {
        topic: KAFKA_SCHEDULED_TASKS,
        messages: [
            {
                key: '2',
                value: JSON.stringify({
                    taskType: 'runEveryMinute',
                    pluginConfigId: 2,
                }),
            },
        ],
    })
    expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(3, {
        topic: KAFKA_SCHEDULED_TASKS,
        messages: [
            {
                key: '3',
                value: JSON.stringify({
                    taskType: 'runEveryMinute',
                    pluginConfigId: 3,
                }),
            },
        ],
    })

    expect(mockPiscina.run).toHaveBeenNthCalledWith(1, { args: { pluginConfigId: 1 }, task: 'runEveryMinute' })
    expect(mockPiscina.run).toHaveBeenNthCalledWith(2, { args: { pluginConfigId: 2 }, task: 'runEveryMinute' })
    expect(mockPiscina.run).toHaveBeenNthCalledWith(3, { args: { pluginConfigId: 3 }, task: 'runEveryMinute' })
    await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryHour', {
        job: { run_at: new Date() },
    } as any)
    expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(4, {
        topic: KAFKA_SCHEDULED_TASKS,
        messages: [
            {
                key: '4',
                value: JSON.stringify({
                    taskType: 'runEveryHour',
                    pluginConfigId: 4,
                }),
            },
        ],
    })
    expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(5, {
        topic: KAFKA_SCHEDULED_TASKS,
        messages: [
            {
                key: '5',
                value: JSON.stringify({
                    taskType: 'runEveryHour',
                    pluginConfigId: 5,
                }),
            },
        ],
    })
    expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(6, {
        topic: KAFKA_SCHEDULED_TASKS,
        messages: [
            {
                key: '6',
                value: JSON.stringify({
                    taskType: 'runEveryHour',
                    pluginConfigId: 6,
                }),
            },
        ],
    })

    await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryHour')
    expect(mockPiscina.run).toHaveBeenNthCalledWith(4, { args: { pluginConfigId: 4 }, task: 'runEveryHour' })
    expect(mockPiscina.run).toHaveBeenNthCalledWith(5, { args: { pluginConfigId: 5 }, task: 'runEveryHour' })
    expect(mockPiscina.run).toHaveBeenNthCalledWith(6, { args: { pluginConfigId: 6 }, task: 'runEveryHour' })

    await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryDay')
    expect(mockPiscina.run).toHaveBeenNthCalledWith(7, { args: { pluginConfigId: 7 }, task: 'runEveryDay' })
    expect(mockPiscina.run).toHaveBeenNthCalledWith(8, { args: { pluginConfigId: 8 }, task: 'runEveryDay' })
    expect(mockPiscina.run).toHaveBeenNthCalledWith(9, { args: { pluginConfigId: 9 }, task: 'runEveryDay' })
    await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryDay', {
        job: { run_at: new Date() },
    } as any)
    expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(7, {
        topic: KAFKA_SCHEDULED_TASKS,
        messages: [
            {
                key: '7',
                value: JSON.stringify({
                    taskType: 'runEveryDay',
                    pluginConfigId: 7,
                }),
            },
        ],
    })
    expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(8, {
        topic: KAFKA_SCHEDULED_TASKS,
        messages: [
            {
                key: '8',
                value: JSON.stringify({
                    taskType: 'runEveryDay',
                    pluginConfigId: 8,
                }),
            },
        ],
    })
    expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(9, {
        topic: KAFKA_SCHEDULED_TASKS,
        messages: [
            {
                key: '9',
                value: JSON.stringify({
                    taskType: 'runEveryDay',
                    pluginConfigId: 9,
                }),
            },
        ],
    })
})
})