From 1e6c062095c31c69cd66dfcf8375554cb31c5181 Mon Sep 17 00:00:00 2001
From: Harry Waye
Date: Mon, 5 Dec 2022 12:30:52 +0000
Subject: [PATCH] feat(plugin-server): distribute scheduled tasks i.e. runEveryX (#13124)

* chore(plugin-server): distribute scheduled tasks

Changes I've made here from the original PR:

 1. add some logging of task run times
 2. add concurrency, except only one task of a plugin will run at a time
 3. add a timeout to task run times

This reverts commit 23db43a0dc0706a8c39ca63a576e0ec02dea7189.

* chore: add timings for scheduled tasks runtime

* chore: add timeouts for scheduled tasks

* chore: clarify duration unit

* chore: deduplicate tasks in a batch, add partition concurrency

* chore: add flag to switch between old and new behaviour

This defaults to new, but can be set to old by setting environment
variable `USE_KAFKA_FOR_SCHEDULED_TASKS=false`

* fix tests

* enable USE_KAFKA_FOR_SCHEDULED_TASKS in tests
---
 .../functional_tests/jest.global-teardown.ts  |  46 ++++
 .../scheduled-tasks-runner.test.ts            | 120 ++++++++++
 plugin-server/jest.config.functional.js       |   1 +
 plugin-server/src/config/config.ts            |   2 +
 plugin-server/src/config/kafka-topics.ts      |   2 +
 .../src/main/graphile-worker/schedule.ts      |  47 +++-
 .../src/main/graphile-worker/worker-setup.ts  |   9 +-
 .../scheduled-tasks-consumer.ts               | 206 ++++++++++++++++++
 plugin-server/src/main/pluginsServer.ts       |  13 ++
 plugin-server/src/types.ts                    |   1 +
 .../tests/main/jobs/schedule.test.ts          | 142 ++++++++++--
 11 files changed, 565 insertions(+), 24 deletions(-)
 create mode 100644 plugin-server/functional_tests/jest.global-teardown.ts
 create mode 100644 plugin-server/functional_tests/scheduled-tasks-runner.test.ts
 create mode 100644 plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts

diff --git a/plugin-server/functional_tests/jest.global-teardown.ts b/plugin-server/functional_tests/jest.global-teardown.ts
new file mode 100644
index 00000000000..aa18fcd3fee
--- /dev/null
+++ b/plugin-server/functional_tests/jest.global-teardown.ts
@@ -0,0 +1,46 @@
+import { assert } from 'console'
+import { Kafka, logLevel } from 'kafkajs'
+
+import { defaultConfig } from '../src/config/config'
+
+export default async function () {
+    // Ensure all consumer groups have zero lag. This is intended to catch cases
+    // where we have managed to process a message but then failed to commit the
+    // offset, leaving the consumer group in a bad state.
+
+    const kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING })
+    const admin = kafka.admin()
+    try {
+        await admin.connect()
+        const topics = await admin.listTopics()
+        const topicOffsets = Object.fromEntries(
+            await Promise.all(topics.map(async (topic) => [topic, await admin.fetchTopicOffsets(topic)]))
+        )
+
+        const { groups } = await admin.listGroups()
+        const consumerGroupOffsets = await Promise.all(
+            groups.map(async ({ groupId }) => [groupId, await admin.fetchOffsets({ groupId })] as const)
+        )
+
+        for (const [groupId, offsets] of consumerGroupOffsets) {
+            for (const { topic, partitions } of offsets) {
+                for (const { partition, offset } of partitions) {
+                    console.debug(
+                        `Checking ${groupId} ${topic} ${partition} ${offset} ${topicOffsets[topic][partition].offset}`
+                    )
+                    assert(
+                        topicOffsets[topic][partition].offset === offset,
+                        `Consumer group ${groupId} has lag on ${topic}[${partition}]: ${{
+                            lastOffset: topicOffsets[topic][partition].offset,
+                            consumerOffset: offset,
+                        }}`
+                    )
+                }
+            }
+        }
+    } catch (error) {
+        throw error
+    } finally {
+        await admin.disconnect()
+    }
+}
diff --git a/plugin-server/functional_tests/scheduled-tasks-runner.test.ts b/plugin-server/functional_tests/scheduled-tasks-runner.test.ts
new file mode 100644
index 00000000000..1e1f098c8f7
--- /dev/null
+++ b/plugin-server/functional_tests/scheduled-tasks-runner.test.ts
@@ -0,0 +1,120 @@
+import Redis from 'ioredis'
+import { Consumer, Kafka, KafkaMessage, logLevel, Partitioners, Producer } from 'kafkajs'
+import { Pool } from 'pg'
+import { v4 as uuidv4 } from 'uuid'
+
+import { defaultConfig } from '../src/config/config'
+import { delayUntilEventIngested } from '../tests/helpers/clickhouse'
+
+let producer: Producer
+let postgres: Pool // NOTE: a Pool here probably isn't necessary, but for instance `insertRow` uses a Pool.
+let kafka: Kafka
+let redis: Redis.Redis
+
+beforeAll(async () => {
+    // Set up connections to kafka, postgres, and redis
+    postgres = new Pool({
+        connectionString: defaultConfig.DATABASE_URL!,
+        // We use a pool only for typings' sake, but we don't actually need to,
+        // so set max connections to 1.
+        max: 1,
+    })
+    kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING })
+    producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })
+    await producer.connect()
+    redis = new Redis(defaultConfig.REDIS_URL)
+})
+
+afterAll(async () => {
+    await Promise.all([producer.disconnect(), postgres.end(), redis.disconnect()])
+})
+
+// Test out some error cases that we wouldn't be able to handle without
+// producing to the scheduled tasks topic directly.
+ +let dlq: KafkaMessage[] +let dlqConsumer: Consumer + +beforeAll(async () => { + dlq = [] + dlqConsumer = kafka.consumer({ groupId: 'scheduled-tasks-consumer-test' }) + await dlqConsumer.subscribe({ topic: 'scheduled_tasks_dlq' }) + await dlqConsumer.run({ + eachMessage: ({ message }) => { + dlq.push(message) + return Promise.resolve() + }, + }) +}) + +afterAll(async () => { + await dlqConsumer.disconnect() +}) + +test.concurrent(`handles empty messages`, async () => { + const key = uuidv4() + + await producer.send({ + topic: 'scheduled_tasks', + messages: [ + { + key: key, + value: null, + }, + ], + }) + + const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key)) + expect(messages.length).toBe(1) +}) + +test.concurrent(`handles invalid JSON`, async () => { + const key = uuidv4() + + await producer.send({ + topic: 'scheduled_tasks', + messages: [ + { + key: key, + value: 'invalid json', + }, + ], + }) + + const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key)) + expect(messages.length).toBe(1) +}) + +test.concurrent(`handles invalid taskType`, async () => { + const key = uuidv4() + + await producer.send({ + topic: 'scheduled_tasks', + messages: [ + { + key: key, + value: JSON.stringify({ taskType: 'invalidTaskType', pluginConfigId: 1 }), + }, + ], + }) + + const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key)) + expect(messages.length).toBe(1) +}) + +test.concurrent(`handles invalid pluginConfigId`, async () => { + const key = uuidv4() + + await producer.send({ + topic: 'scheduled_tasks', + messages: [ + { + key: key, + value: JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 'asdf' }), + }, + ], + }) + + const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key)) + expect(messages.length).toBe(1) +}) diff --git a/plugin-server/jest.config.functional.js b/plugin-server/jest.config.functional.js index 237b6e7915d..44c1a77ebe2 100644 --- a/plugin-server/jest.config.functional.js +++ b/plugin-server/jest.config.functional.js @@ -14,6 +14,7 @@ module.exports = { testEnvironment: 'node', testMatch: ['/functional_tests/**/*.test.ts'], setupFilesAfterEnv: ['/functional_tests/jest.setup.ts'], + globalTeardown: '/functional_tests/jest.global-teardown.ts', testTimeout: 60000, maxConcurrency: 10, maxWorkers: 6, diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index 8295beee156..9faaf6267ee 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -103,6 +103,7 @@ export function getDefaultConfig(): PluginsServerConfig { HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER: 1.5, APP_METRICS_GATHERED_FOR_ALL: isDevEnv() ? 
true : false,
         MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: 0,
+        USE_KAFKA_FOR_SCHEDULED_TASKS: true,
     }
 }
 
@@ -184,6 +185,7 @@ export function getConfigHelp(): Record {
         OBJECT_STORAGE_BUCKET: 'the object storage bucket name',
         HISTORICAL_EXPORTS_ENABLED: 'enables historical exports for export apps',
         APP_METRICS_GATHERED_FOR_ALL: 'whether to gather app metrics for all teams',
+        USE_KAFKA_FOR_SCHEDULED_TASKS: 'distribute scheduled tasks across the scheduler workers',
     }
 }
 
diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts
index c6c80be4ef5..b6ec898e56b 100644
--- a/plugin-server/src/config/kafka-topics.ts
+++ b/plugin-server/src/config/kafka-topics.ts
@@ -19,4 +19,6 @@ export const KAFKA_INGESTION_WARNINGS = `${prefix}clickhouse_ingestion_warnings${suffix}`
 export const KAFKA_APP_METRICS = `${prefix}clickhouse_app_metrics${suffix}`
 export const KAFKA_JOBS = `${prefix}jobs${suffix}`
 export const KAFKA_JOBS_DLQ = `${prefix}jobs_dlq${suffix}`
+export const KAFKA_SCHEDULED_TASKS = `${prefix}scheduled_tasks${suffix}`
+export const KAFKA_SCHEDULED_TASKS_DLQ = `${prefix}scheduled_tasks_dlq${suffix}`
 export const KAFKA_METRICS_TIME_TO_SEE_DATA = `${prefix}clickhouse_metrics_time_to_see_data${suffix}`
diff --git a/plugin-server/src/main/graphile-worker/schedule.ts b/plugin-server/src/main/graphile-worker/schedule.ts
index 50e0936c941..9a21a101c41 100644
--- a/plugin-server/src/main/graphile-worker/schedule.ts
+++ b/plugin-server/src/main/graphile-worker/schedule.ts
@@ -1,9 +1,13 @@
 import Piscina from '@posthog/piscina'
+import { JobHelpers } from 'graphile-worker'
 
+import { KAFKA_SCHEDULED_TASKS } from '../../config/kafka-topics'
 import { Hub, PluginConfigId } from '../../types'
 import { status } from '../../utils/status'
 import { delay } from '../../utils/utils'
 
+type TaskTypes = 'runEveryMinute' | 'runEveryHour' | 'runEveryDay'
+
 export async function loadPluginSchedule(piscina: Piscina, maxIterations = 2000): Promise {
     let allThreadsReady = false
     while (maxIterations--) {
@@ -28,10 +32,43 @@ export async function loadPluginSchedule(piscina: Piscina, maxIterations = 2000)
     throw new Error('Could not load plugin schedule in time')
 }
 
-export async function runScheduledTasks(server: Hub, piscina: Piscina, taskType: string): Promise<void> {
-    for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
-        status.info('⏲️', `Running ${taskType} for plugin config with ID ${pluginConfigId}`)
-        await piscina.run({ task: taskType, args: { pluginConfigId } })
-        server.statsd?.increment('completed_scheduled_task', { taskType })
+export async function runScheduledTasks(
+    server: Hub,
+    piscina: Piscina,
+    taskType: TaskTypes,
+    helpers: JobHelpers
+): Promise<void> {
+    // If the task's run_at is older than the grace period, we ignore it. We
+    // don't want to end up running stale tasks if we are backed up.
+ if (new Date(helpers.job.run_at).getTime() < Date.now() - gracePeriodMilliSecondsByTaskType[taskType]) { + status.warn('🔁', 'stale_scheduled_task_skipped', { + taskType: taskType, + runAt: helpers.job.run_at, + }) + server.statsd?.increment('skipped_scheduled_tasks', { taskType }) + return + } + + if (server.USE_KAFKA_FOR_SCHEDULED_TASKS) { + for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) { + status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId }) + await server.kafkaProducer.producer.send({ + topic: KAFKA_SCHEDULED_TASKS, + messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }], + }) + server.statsd?.increment('queued_scheduled_task', { taskType }) + } + } else { + for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) { + status.info('⏲️', `Running ${taskType} for plugin config with ID ${pluginConfigId}`) + await piscina.run({ task: taskType, args: { pluginConfigId } }) + server.statsd?.increment('completed_scheduled_task', { taskType }) + } } } + +const gracePeriodMilliSecondsByTaskType = { + runEveryMinute: 60 * 1000, + runEveryHour: 60 * 60 * 1000, + runEveryDay: 24 * 60 * 60 * 1000, +} as const diff --git a/plugin-server/src/main/graphile-worker/worker-setup.ts b/plugin-server/src/main/graphile-worker/worker-setup.ts index 771cc05c09a..7351a4fbebc 100644 --- a/plugin-server/src/main/graphile-worker/worker-setup.ts +++ b/plugin-server/src/main/graphile-worker/worker-setup.ts @@ -1,5 +1,5 @@ import Piscina from '@posthog/piscina' -import { CronItem, TaskList } from 'graphile-worker' +import { CronItem, JobHelpers, TaskList } from 'graphile-worker' import { EnqueuedPluginJob, Hub } from '../../types' import { status } from '../../utils/status' @@ -78,9 +78,10 @@ export function getPluginJobHandlers(hub: Hub, graphileWorker: GraphileWorker, p export function getScheduledTaskHandlers(hub: Hub, piscina: Piscina): TaskList { const scheduledTaskHandlers: TaskList = { - runEveryMinute: async () => await runScheduledTasks(hub, piscina, 'runEveryMinute'), - runEveryHour: async () => await runScheduledTasks(hub, piscina, 'runEveryHour'), - runEveryDay: async () => await runScheduledTasks(hub, piscina, 'runEveryDay'), + runEveryMinute: async (_, helpers: JobHelpers) => + await runScheduledTasks(hub, piscina, 'runEveryMinute', helpers), + runEveryHour: async (_, helpers: JobHelpers) => await runScheduledTasks(hub, piscina, 'runEveryHour', helpers), + runEveryDay: async (_, helpers: JobHelpers) => await runScheduledTasks(hub, piscina, 'runEveryDay', helpers), } return scheduledTaskHandlers diff --git a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts new file mode 100644 index 00000000000..6b99263b9ab --- /dev/null +++ b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts @@ -0,0 +1,206 @@ +import Piscina from '@posthog/piscina' +import { StatsD } from 'hot-shots' +import { Batch, EachBatchHandler, Kafka, Producer } from 'kafkajs' + +import { KAFKA_SCHEDULED_TASKS, KAFKA_SCHEDULED_TASKS_DLQ } from '../../config/kafka-topics' +import { DependencyUnavailableError } from '../../utils/db/error' +import { status } from '../../utils/status' +import { instrumentEachBatch, setupEventHandlers } from './kafka-queue' + +// The valid task types that can be scheduled. +// TODO: not sure if there is another place that defines these but it would be +// good to unify. 
const taskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const
+
+export const startScheduledTasksConsumer = async ({
+    kafka,
+    piscina,
+    producer,
+    partitionConcurrency = 3,
+    statsd,
+}: {
+    kafka: Kafka
+    piscina: Piscina
+    producer: Producer // NOTE: not using KafkaProducerWrapper here to avoid buffering logic
+    partitionConcurrency: number
+    statsd?: StatsD
+}) => {
+    /*
+
+        Consumes from the scheduled tasks topic, and executes the tasks within
+        a Piscina worker. Some features include:
+
+        1. timing out tasks to ensure we don't end up with backlogs.
+        2. retrying on dependency failures (via not committing offsets on some
+           failures).
+        3. only running one plugin config id task at a time, to avoid
+           concurrency issues. This is done via partitioning of tasks in the
+           Kafka topic.
+        4. ensuring we only run one task per plugin config id per batch, to
+           avoid running many tasks back to back in the case that we have a
+           backlog of tasks in the topic.
+
+        TODO: add in some in-partition concurrency control.
+
+    */
+
+    const consumer = kafka.consumer({ groupId: 'scheduled-tasks-runner' })
+    setupEventHandlers(consumer)
+
+    status.info('🔁', 'Starting scheduled tasks consumer')
+
+    const eachBatch: EachBatchHandler = async ({ batch, resolveOffset, heartbeat }) => {
+        status.debug('🔁', 'Processing batch', { size: batch.messages.length })
+
+        const tasks = await getTasksFromBatch(batch, producer)
+
+        for (const { taskType, pluginConfigId, message } of tasks) {
+            status.info('⏲️', 'running_scheduled_task', {
+                taskType,
+                pluginConfigId,
+            })
+            const startTime = performance.now()
+
+            // Make sure tasks can't run forever, according to `taskTimeouts`.
+            const abortController = new AbortController()
+            const timeout = setTimeout(() => {
+                abortController.abort()
+                status.warn('⚠️', 'scheduled_task_timed_out', {
+                    taskType,
+                    pluginConfigId,
+                })
+            }, taskTimeouts[taskType])
+
+            // Make sure we keep the heartbeat going while the task is
+            // running.
+            const heartbeatInterval = setInterval(() => heartbeat(), 1000)
+
+            try {
+                // The part that actually runs the task.
+                await piscina.run(
+                    { task: taskType, args: { pluginConfigId: pluginConfigId } },
+                    { signal: abortController.signal }
+                )
+
+                resolveOffset(message.offset)
+                status.info('⏲️', 'finished_scheduled_task', {
+                    taskType,
+                    pluginConfigId,
+                    durationSeconds: (performance.now() - startTime) / 1000,
+                })
+                statsd?.increment('completed_scheduled_task', { taskType })
+            } catch (error) {
+                // TODO: figure out a nice way to test this code path.
+
+                if (error instanceof DependencyUnavailableError) {
+                    // For errors relating to PostHog dependencies that are unavailable,
+                    // e.g. Postgres, Kafka, Redis, we don't want to log the error to Sentry
+                    // but rather bubble this up the stack for someone else to decide on
+                    // what to do with it.
+                    status.warn('⚠️', `dependency_unavailable`, {
+                        taskType,
+                        pluginConfigId,
+                        error: error,
+                        stack: error.stack,
+                    })
+                    statsd?.increment('retriable_scheduled_task', { taskType })
+                    throw error
+                }
+
+                status.error('⚠️', 'scheduled_task_failed', {
+                    taskType: taskType,
+                    pluginConfigId,
+                    error: error,
+                    stack: error.stack,
+                })
+                resolveOffset(message.offset)
+                statsd?.increment('failed_scheduled_tasks', { taskType })
+            } finally {
+                clearTimeout(timeout)
+                clearInterval(heartbeatInterval)
+            }
+
+            // After processing each message, we need to heartbeat to ensure
+            // we don't get kicked out of the group.
Note that although we call + // this for each message, it's actually a no-op if we're not over + // the heartbeatInterval. + await heartbeat() + } + + status.info('✅', 'processed_batch', { batchSize: batch.messages.length, numberOfTasksExecuted: tasks.length }) + } + + await consumer.connect() + await consumer.subscribe({ topic: KAFKA_SCHEDULED_TASKS }) + await consumer.run({ + partitionsConsumedConcurrently: partitionConcurrency, + eachBatch: async (payload) => { + return await instrumentEachBatch(KAFKA_SCHEDULED_TASKS, eachBatch, payload, statsd) + }, + }) + + return consumer +} + +const getTasksFromBatch = async (batch: Batch, producer: Producer) => { + // In any one batch, we only want to run one task per plugin config id. + // Hence here we dedupe the tasks by plugin config id and task type. + const tasksbyTypeAndPluginConfigId = {} as Record< + typeof taskTypes[number], + Record< + number, + { taskType: typeof taskTypes[number]; pluginConfigId: number; message: typeof batch.messages[number] } + > + > + + for (const message of batch.messages) { + if (!message.value) { + status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, { + value: message.value, + }) + await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] }) + continue + } + + let task: { + taskType: typeof taskTypes[number] + pluginConfigId: number + } + + try { + task = JSON.parse(message.value.toString()) + } catch (error) { + status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, { + error: error.stack ?? error, + }) + await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] }) + continue + } + + if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) { + status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task) + await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] }) + continue + } + + tasksbyTypeAndPluginConfigId[task.taskType] ??= {} + // It's important that we only keep the latest message for each, + // such that we commit the offset at the end of the batch. + tasksbyTypeAndPluginConfigId[task.taskType][task.pluginConfigId] = { + taskType: task.taskType, + pluginConfigId: task.pluginConfigId, + message, + } + } + + return Object.values(tasksbyTypeAndPluginConfigId) + .map((tasksByPluginConfigId) => Object.values(tasksByPluginConfigId)) + .flat() + .sort((a, b) => Number.parseInt(a.message.offset) - Number.parseInt(b.message.offset)) +} + +const taskTimeouts = { + runEveryMinute: 1000 * 60, + runEveryHour: 1000 * 60 * 5, + runEveryDay: 1000 * 60 * 5, +} as const diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 329ca2e8468..d94ed5930b3 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -23,6 +23,7 @@ import { startAnonymousEventBufferConsumer } from './ingestion-queues/anonymous- import { startJobsConsumer } from './ingestion-queues/jobs-consumer' import { IngestionConsumer } from './ingestion-queues/kafka-queue' import { startQueues } from './ingestion-queues/queue' +import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer' import { createHttpServer } from './services/http-server' import { createMmdbServer, performMmdbStalenessCheck, prepareMmdb } from './services/mmdb' @@ -85,6 +86,7 @@ export async function startPluginsServer( // meantime. 
let bufferConsumer: Consumer | undefined let jobsConsumer: Consumer | undefined + let schedulerTasksConsumer: Consumer | undefined let httpServer: Server | undefined // healthcheck server let mmdbServer: net.Server | undefined // geoip server @@ -119,6 +121,7 @@ export async function startPluginsServer( graphileWorker?.stop(), bufferConsumer?.disconnect(), jobsConsumer?.disconnect(), + schedulerTasksConsumer?.disconnect(), ]) await new Promise((resolve, reject) => @@ -235,6 +238,16 @@ export async function startPluginsServer( await graphileWorker.connectProducer() await startGraphileWorker(hub, graphileWorker, piscina) + if (hub.capabilities.pluginScheduledTasks) { + schedulerTasksConsumer = await startScheduledTasksConsumer({ + piscina: piscina, + kafka: hub.kafka, + producer: hub.kafkaProducer.producer, + partitionConcurrency: serverConfig.KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY, + statsd: hub.statsd, + }) + } + if (hub.capabilities.processPluginJobs) { jobsConsumer = await startJobsConsumer({ kafka: hub.kafka, diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 06296a7cb8e..838e8897a06 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -159,6 +159,7 @@ export interface PluginsServerConfig extends Record { HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER: number APP_METRICS_GATHERED_FOR_ALL: boolean MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: number + USE_KAFKA_FOR_SCHEDULED_TASKS: boolean } export interface Hub extends PluginsServerConfig { diff --git a/plugin-server/tests/main/jobs/schedule.test.ts b/plugin-server/tests/main/jobs/schedule.test.ts index 8341a8ebe75..0b4a1cc66d9 100644 --- a/plugin-server/tests/main/jobs/schedule.test.ts +++ b/plugin-server/tests/main/jobs/schedule.test.ts @@ -1,5 +1,9 @@ +import { Producer } from 'kafkajs' + +import { KAFKA_SCHEDULED_TASKS } from '../../../src/config/kafka-topics' import { runScheduledTasks } from '../../../src/main/graphile-worker/schedule' import { Hub } from '../../../src/types' +import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper' import { UUID } from '../../../src/utils/utils' import { PromiseManager } from '../../../src/worker/vm/promise-manager' @@ -26,25 +30,133 @@ describe('Graphile Worker schedule', () => { runEveryHour: [4, 5, 6], runEveryDay: [7, 8, 9], }, + kafkaProducer: { + producer: { + send: jest.fn(), + } as unknown as Producer, + } as KafkaProducerWrapper, + USE_KAFKA_FOR_SCHEDULED_TASKS: true, } - await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'iDontExist') - expect(mockPiscina.run).not.toHaveBeenCalled() + await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryMinute', { + job: { run_at: new Date() }, + } as any) - await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryMinute') + expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(1, { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '1', + value: JSON.stringify({ + taskType: 'runEveryMinute', + pluginConfigId: 1, + }), + }, + ], + }) + expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(2, { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '2', + value: JSON.stringify({ + taskType: 'runEveryMinute', + pluginConfigId: 2, + }), + }, + ], + }) + expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(3, { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '3', + value: JSON.stringify({ + taskType: 
'runEveryMinute', + pluginConfigId: 3, + }), + }, + ], + }) - expect(mockPiscina.run).toHaveBeenNthCalledWith(1, { args: { pluginConfigId: 1 }, task: 'runEveryMinute' }) - expect(mockPiscina.run).toHaveBeenNthCalledWith(2, { args: { pluginConfigId: 2 }, task: 'runEveryMinute' }) - expect(mockPiscina.run).toHaveBeenNthCalledWith(3, { args: { pluginConfigId: 3 }, task: 'runEveryMinute' }) + await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryHour', { + job: { run_at: new Date() }, + } as any) + expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(4, { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '4', + value: JSON.stringify({ + taskType: 'runEveryHour', + pluginConfigId: 4, + }), + }, + ], + }) + expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(5, { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '5', + value: JSON.stringify({ + taskType: 'runEveryHour', + pluginConfigId: 5, + }), + }, + ], + }) + expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(6, { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '6', + value: JSON.stringify({ + taskType: 'runEveryHour', + pluginConfigId: 6, + }), + }, + ], + }) - await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryHour') - expect(mockPiscina.run).toHaveBeenNthCalledWith(4, { args: { pluginConfigId: 4 }, task: 'runEveryHour' }) - expect(mockPiscina.run).toHaveBeenNthCalledWith(5, { args: { pluginConfigId: 5 }, task: 'runEveryHour' }) - expect(mockPiscina.run).toHaveBeenNthCalledWith(6, { args: { pluginConfigId: 6 }, task: 'runEveryHour' }) - - await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryDay') - expect(mockPiscina.run).toHaveBeenNthCalledWith(7, { args: { pluginConfigId: 7 }, task: 'runEveryDay' }) - expect(mockPiscina.run).toHaveBeenNthCalledWith(8, { args: { pluginConfigId: 8 }, task: 'runEveryDay' }) - expect(mockPiscina.run).toHaveBeenNthCalledWith(9, { args: { pluginConfigId: 9 }, task: 'runEveryDay' }) + await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryDay', { + job: { run_at: new Date() }, + } as any) + expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(7, { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '7', + value: JSON.stringify({ + taskType: 'runEveryDay', + pluginConfigId: 7, + }), + }, + ], + }) + expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(8, { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '8', + value: JSON.stringify({ + taskType: 'runEveryDay', + pluginConfigId: 8, + }), + }, + ], + }) + expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(9, { + topic: KAFKA_SCHEDULED_TASKS, + messages: [ + { + key: '9', + value: JSON.stringify({ + taskType: 'runEveryDay', + pluginConfigId: 9, + }), + }, + ], + }) }) })