From 23db43a0dc0706a8c39ca63a576e0ec02dea7189 Mon Sep 17 00:00:00 2001
From: Harry Waye
Date: Sat, 3 Dec 2022 00:13:27 +0000
Subject: [PATCH] Revert "Revert "Revert "fix(plugin-server): ignore old cron
 tasks from graphile-worker """ (#13107)

Revert "Revert "Revert "fix(plugin-server): ignore old cron tasks from graphile-worker "" (#13100)"

This reverts commit 8eec4c9346967add7e2544939452e993c5cd60ab.
---
 .../scheduled-tasks-runner.test.ts            | 120 ---------------
 plugin-server/src/config/kafka-topics.ts      |   2 -
 .../src/main/graphile-worker/schedule.ts      |  32 +---
 .../src/main/graphile-worker/worker-setup.ts  |  12 +-
 .../scheduled-tasks-consumer.ts               | 125 ----------------
 plugin-server/src/main/pluginsServer.ts       |  12 --
 .../tests/main/jobs/schedule.test.ts          | 139 +++--------------
 7 files changed, 29 insertions(+), 413 deletions(-)
 delete mode 100644 plugin-server/functional_tests/scheduled-tasks-runner.test.ts
 delete mode 100644 plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts

diff --git a/plugin-server/functional_tests/scheduled-tasks-runner.test.ts b/plugin-server/functional_tests/scheduled-tasks-runner.test.ts
deleted file mode 100644
index 1e1f098c8f7..00000000000
--- a/plugin-server/functional_tests/scheduled-tasks-runner.test.ts
+++ /dev/null
@@ -1,120 +0,0 @@
-import Redis from 'ioredis'
-import { Consumer, Kafka, KafkaMessage, logLevel, Partitioners, Producer } from 'kafkajs'
-import { Pool } from 'pg'
-import { v4 as uuidv4 } from 'uuid'
-
-import { defaultConfig } from '../src/config/config'
-import { delayUntilEventIngested } from '../tests/helpers/clickhouse'
-
-let producer: Producer
-let postgres: Pool // NOTE: we use a Pool here, though it's probably not necessary; for instance `insertRow` uses a Pool.
-let kafka: Kafka
-let redis: Redis.Redis
-
-beforeAll(async () => {
-    // Set up connections to kafka, clickhouse, and postgres
-    postgres = new Pool({
-        connectionString: defaultConfig.DATABASE_URL!,
-        // We use a pool only for typing's sake, but we don't actually need to,
-        // so set max connections to 1.
-        max: 1,
-    })
-    kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING })
-    producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })
-    await producer.connect()
-    redis = new Redis(defaultConfig.REDIS_URL)
-})
-
-afterAll(async () => {
-    await Promise.all([producer.disconnect(), postgres.end(), redis.disconnect()])
-})
-
-// Test out some error cases that we wouldn't be able to handle without
-// producing to the scheduled tasks topic directly.
-
-let dlq: KafkaMessage[]
-let dlqConsumer: Consumer
-
-beforeAll(async () => {
-    dlq = []
-    dlqConsumer = kafka.consumer({ groupId: 'scheduled-tasks-consumer-test' })
-    await dlqConsumer.subscribe({ topic: 'scheduled_tasks_dlq' })
-    await dlqConsumer.run({
-        eachMessage: ({ message }) => {
-            dlq.push(message)
-            return Promise.resolve()
-        },
-    })
-})
-
-afterAll(async () => {
-    await dlqConsumer.disconnect()
-})
-
-test.concurrent(`handles empty messages`, async () => {
-    const key = uuidv4()
-
-    await producer.send({
-        topic: 'scheduled_tasks',
-        messages: [
-            {
-                key: key,
-                value: null,
-            },
-        ],
-    })
-
-    const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
-    expect(messages.length).toBe(1)
-})
-
-test.concurrent(`handles invalid JSON`, async () => {
-    const key = uuidv4()
-
-    await producer.send({
-        topic: 'scheduled_tasks',
-        messages: [
-            {
-                key: key,
-                value: 'invalid json',
-            },
-        ],
-    })
-
-    const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
-    expect(messages.length).toBe(1)
-})
-
-test.concurrent(`handles invalid taskType`, async () => {
-    const key = uuidv4()
-
-    await producer.send({
-        topic: 'scheduled_tasks',
-        messages: [
-            {
-                key: key,
-                value: JSON.stringify({ taskType: 'invalidTaskType', pluginConfigId: 1 }),
-            },
-        ],
-    })
-
-    const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
-    expect(messages.length).toBe(1)
-})
-
-test.concurrent(`handles invalid pluginConfigId`, async () => {
-    const key = uuidv4()
-
-    await producer.send({
-        topic: 'scheduled_tasks',
-        messages: [
-            {
-                key: key,
-                value: JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 'asdf' }),
-            },
-        ],
-    })
-
-    const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
-    expect(messages.length).toBe(1)
-})
diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts
index b6ec898e56b..c6c80be4ef5 100644
--- a/plugin-server/src/config/kafka-topics.ts
+++ b/plugin-server/src/config/kafka-topics.ts
@@ -19,6 +19,4 @@ export const KAFKA_INGESTION_WARNINGS = `${prefix}clickhouse_ingestion_warnings$
 export const KAFKA_APP_METRICS = `${prefix}clickhouse_app_metrics${suffix}`
 export const KAFKA_JOBS = `${prefix}jobs${suffix}`
 export const KAFKA_JOBS_DLQ = `${prefix}jobs_dlq${suffix}`
-export const KAFKA_SCHEDULED_TASKS = `${prefix}scheduled_tasks${suffix}`
-export const KAFKA_SCHEDULED_TASKS_DLQ = `${prefix}scheduled_tasks_dlq${suffix}`
 export const KAFKA_METRICS_TIME_TO_SEE_DATA = `${prefix}clickhouse_metrics_time_to_see_data${suffix}`
diff --git a/plugin-server/src/main/graphile-worker/schedule.ts b/plugin-server/src/main/graphile-worker/schedule.ts
index 506431460ba..50e0936c941 100644
--- a/plugin-server/src/main/graphile-worker/schedule.ts
+++ b/plugin-server/src/main/graphile-worker/schedule.ts
@@ -1,13 +1,9 @@
 import Piscina from '@posthog/piscina'
-import { JobHelpers } from 'graphile-worker'
 
-import { KAFKA_SCHEDULED_TASKS } from '../../config/kafka-topics'
 import { Hub, PluginConfigId } from '../../types'
 import { status } from '../../utils/status'
 import { delay } from '../../utils/utils'
 
-type TaskTypes = 'runEveryMinute' | 'runEveryHour' | 'runEveryDay'
-
 export async function loadPluginSchedule(piscina: Piscina, maxIterations = 2000): Promise<Record<string, PluginConfigId[]>> {
     let allThreadsReady = false
     while (maxIterations--) {
@@ -32,30 +28,10 @@ export async function loadPluginSchedule(piscina: Piscina, maxIterations = 2000)
     throw new Error('Could not load plugin schedule in time')
 }
 
-export async function runScheduledTasks(server: Hub, taskType: TaskTypes, helpers: JobHelpers): Promise<void> {
-    // If the task's run_at is older than the grace period, we ignore it. We
-    // don't want to end up with old tasks being scheduled if we are backed up.
-    if (new Date(helpers.job.run_at).getTime() < Date.now() - gracePeriodMilliSecondsByTaskType[taskType]) {
-        status.warn('🔁', 'stale_scheduled_task_skipped', {
-            taskType: taskType,
-            runAt: helpers.job.run_at,
-        })
-        server.statsd?.increment('skipped_scheduled_tasks', { taskType })
-        return
-    }
-
+export async function runScheduledTasks(server: Hub, piscina: Piscina, taskType: string): Promise<void> {
     for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
-        status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId })
-        await server.kafkaProducer.producer.send({
-            topic: KAFKA_SCHEDULED_TASKS,
-            messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
-        })
-        server.statsd?.increment('queued_scheduled_task', { taskType })
+        status.info('⏲️', `Running ${taskType} for plugin config with ID ${pluginConfigId}`)
+        await piscina.run({ task: taskType, args: { pluginConfigId } })
+        server.statsd?.increment('completed_scheduled_task', { taskType })
     }
 }
-
-const gracePeriodMilliSecondsByTaskType = {
-    runEveryMinute: 60 * 1000,
-    runEveryHour: 60 * 60 * 1000,
-    runEveryDay: 24 * 60 * 60 * 1000,
-} as const
diff --git a/plugin-server/src/main/graphile-worker/worker-setup.ts b/plugin-server/src/main/graphile-worker/worker-setup.ts
index a24258300ed..771cc05c09a 100644
--- a/plugin-server/src/main/graphile-worker/worker-setup.ts
+++ b/plugin-server/src/main/graphile-worker/worker-setup.ts
@@ -1,5 +1,5 @@
 import Piscina from '@posthog/piscina'
-import { CronItem, JobHelpers, TaskList } from 'graphile-worker'
+import { CronItem, TaskList } from 'graphile-worker'
 
 import { EnqueuedPluginJob, Hub } from '../../types'
 import { status } from '../../utils/status'
@@ -52,7 +52,7 @@ export async function startGraphileWorker(hub: Hub, graphileWorker: GraphileWork
 
     jobHandlers = {
         ...jobHandlers,
-        ...getScheduledTaskHandlers(hub),
+        ...getScheduledTaskHandlers(hub, piscina),
     }
 
     status.info('🔄', 'Graphile Worker: set up scheduled task handlers...')
@@ -76,11 +76,11 @@ export function getPluginJobHandlers(hub: Hub, graphileWorker: GraphileWorker, p
     return pluginJobHandlers
 }
 
-export function getScheduledTaskHandlers(hub: Hub): TaskList {
+export function getScheduledTaskHandlers(hub: Hub, piscina: Piscina): TaskList {
     const scheduledTaskHandlers: TaskList = {
-        runEveryMinute: async (_, helpers: JobHelpers) => await runScheduledTasks(hub, 'runEveryMinute', helpers),
-        runEveryHour: async (_, helpers: JobHelpers) => await runScheduledTasks(hub, 'runEveryHour', helpers),
-        runEveryDay: async (_, helpers: JobHelpers) => await runScheduledTasks(hub, 'runEveryDay', helpers),
+        runEveryMinute: async () => await runScheduledTasks(hub, piscina, 'runEveryMinute'),
+        runEveryHour: async () => await runScheduledTasks(hub, piscina, 'runEveryHour'),
+        runEveryDay: async () => await runScheduledTasks(hub, piscina, 'runEveryDay'),
     }
 
     return scheduledTaskHandlers
diff --git a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts
deleted file mode 100644
index f2569fca075..00000000000
--- a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts
+++ /dev/null
@@ -1,125 +0,0 @@
-import Piscina from '@posthog/piscina'
-import { StatsD } from 'hot-shots'
-import { EachBatchHandler, Kafka, Producer } from 'kafkajs'
-
-import { KAFKA_SCHEDULED_TASKS, KAFKA_SCHEDULED_TASKS_DLQ } from '../../config/kafka-topics'
-import { DependencyUnavailableError } from '../../utils/db/error'
-import { status } from '../../utils/status'
-import { instrumentEachBatch, setupEventHandlers } from './kafka-queue'
-
-// The valid task types that can be scheduled.
-// TODO: not sure if there is another place that defines these but it would be
-// good to unify.
-const taskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const
-
-export const startScheduledTasksConsumer = async ({
-    kafka,
-    piscina,
-    producer,
-    statsd,
-}: {
-    kafka: Kafka
-    piscina: Piscina
-    producer: Producer // NOTE: not using KafkaProducerWrapper here to avoid buffering logic
-    statsd?: StatsD
-}) => {
-    /*
-        Consumes from the scheduled tasks topic, and executes them within a
-        Piscina worker.
-    */
-
-    const consumer = kafka.consumer({ groupId: 'scheduled-tasks-runner' })
-    setupEventHandlers(consumer)
-
-    status.info('🔁', 'Starting scheduled tasks consumer')
-
-    const eachBatch: EachBatchHandler = async ({ batch, resolveOffset, heartbeat }) => {
-        status.debug('🔁', 'Processing batch', { size: batch.messages.length })
-        for (const message of batch.messages) {
-            if (!message.value) {
-                status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, {
-                    value: message.value,
-                })
-                await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
-                resolveOffset(message.offset)
-                continue
-            }
-
-            let task: {
-                taskType: typeof taskTypes[number]
-                pluginConfigId: number
-            }
-
-            try {
-                task = JSON.parse(message.value.toString())
-            } catch (error) {
-                status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, {
-                    error: error.stack ?? error,
-                })
-                await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
-                resolveOffset(message.offset)
-                continue
-            }
-
-            if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) {
-                status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task)
-                await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
-                resolveOffset(message.offset)
-                continue
-            }
-
-            status.debug('⬆️', 'Running scheduled task', task)
-
-            try {
-                status.info('⏲️', 'running_scheduled_task', {
-                    taskType: task.taskType,
-                    pluginConfigId: task.pluginConfigId,
-                })
-                await piscina.run({ task: task.taskType, args: { pluginConfigId: task.pluginConfigId } })
-                resolveOffset(message.offset)
-                statsd?.increment('completed_scheduled_task', { taskType: task.taskType })
-            } catch (error) {
-                if (error instanceof DependencyUnavailableError) {
-                    // For errors relating to PostHog dependencies that are unavailable,
-                    // e.g. Postgres, Kafka, Redis, we don't want to log the error to Sentry
-                    // but rather bubble this up the stack for someone else to decide on
-                    // what to do with it.
-                    status.warn('⚠️', `dependency_unavailable`, {
-                        taskType: task.taskType,
-                        pluginConfigId: task.pluginConfigId,
-                        error: error,
-                        stack: error.stack,
-                    })
-                    statsd?.increment('retriable_scheduled_task', { taskType: task.taskType })
-                    throw error
-                }
-
-                status.error('⚠️', 'scheduled_task_failed', {
-                    taskType: task.taskType,
-                    pluginConfigId: task.pluginConfigId,
-                    error: error,
-                    stack: error.stack,
-                })
-                statsd?.increment('failed_scheduled_tasks', { taskType: task.taskType })
-            }
-
-            // After processing each message, we need to heartbeat to ensure
-            // we don't get kicked out of the group. Note that although we call
-            // this for each message, it's actually a no-op if we're not over
-            // the heartbeatInterval.
-            await heartbeat()
-        }
-
-        status.info('✅', 'Processed batch', { size: batch.messages.length })
-    }
-
-    await consumer.connect()
-    await consumer.subscribe({ topic: KAFKA_SCHEDULED_TASKS })
-    await consumer.run({
-        eachBatch: async (payload) => {
-            return await instrumentEachBatch(KAFKA_SCHEDULED_TASKS, eachBatch, payload, statsd)
-        },
-    })
-
-    return consumer
-}
diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index e4de5058066..329ca2e8468 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -23,7 +23,6 @@ import { startAnonymousEventBufferConsumer } from './ingestion-queues/anonymous-
 import { startJobsConsumer } from './ingestion-queues/jobs-consumer'
 import { IngestionConsumer } from './ingestion-queues/kafka-queue'
 import { startQueues } from './ingestion-queues/queue'
-import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer'
 import { createHttpServer } from './services/http-server'
 import { createMmdbServer, performMmdbStalenessCheck, prepareMmdb } from './services/mmdb'
 
@@ -86,7 +85,6 @@ export async function startPluginsServer(
     // meantime.
 
     let bufferConsumer: Consumer | undefined
     let jobsConsumer: Consumer | undefined
-    let schedulerTasksConsumer: Consumer | undefined
     let httpServer: Server | undefined // healthcheck server
     let mmdbServer: net.Server | undefined // geoip server
@@ -121,7 +119,6 @@ export async function startPluginsServer(
         graphileWorker?.stop(),
         bufferConsumer?.disconnect(),
         jobsConsumer?.disconnect(),
-        schedulerTasksConsumer?.disconnect(),
     ])
 
     await new Promise((resolve, reject) =>
@@ -238,15 +235,6 @@ export async function startPluginsServer(
     await graphileWorker.connectProducer()
     await startGraphileWorker(hub, graphileWorker, piscina)
 
-    if (hub.capabilities.pluginScheduledTasks) {
-        schedulerTasksConsumer = await startScheduledTasksConsumer({
-            piscina: piscina,
-            kafka: hub.kafka,
-            producer: hub.kafkaProducer.producer,
-            statsd: hub.statsd,
-        })
-    }
-
     if (hub.capabilities.processPluginJobs) {
         jobsConsumer = await startJobsConsumer({
             kafka: hub.kafka,
diff --git a/plugin-server/tests/main/jobs/schedule.test.ts b/plugin-server/tests/main/jobs/schedule.test.ts
index a809cd988e6..8341a8ebe75 100644
--- a/plugin-server/tests/main/jobs/schedule.test.ts
+++ b/plugin-server/tests/main/jobs/schedule.test.ts
@@ -1,9 +1,5 @@
-import { Producer } from 'kafkajs'
-
-import { KAFKA_SCHEDULED_TASKS } from '../../../src/config/kafka-topics'
 import { runScheduledTasks } from '../../../src/main/graphile-worker/schedule'
 import { Hub } from '../../../src/types'
-import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper'
 import { UUID } from '../../../src/utils/utils'
 
 import { PromiseManager } from '../../../src/worker/vm/promise-manager'
@@ -19,6 +15,10 @@ describe('Graphile Worker schedule', () => {
     })
 
     test('runScheduledTasks()', async () => {
+        const mockPiscina = {
+            run: jest.fn(),
+        }
+
         const mockHubWithPluginSchedule = {
             ...mockHub,
             pluginSchedule: {
@@ -26,126 +26,25 @@ describe('Graphile Worker schedule', () => {
                 runEveryMinute: [1, 2, 3],
                 runEveryHour: [4, 5, 6],
                 runEveryDay: [7, 8, 9],
             },
-            kafkaProducer: {
-                producer: {
-                    send: jest.fn(),
-                } as unknown as Producer,
-            } as KafkaProducerWrapper,
         }
 
-        await runScheduledTasks(mockHubWithPluginSchedule, 'runEveryMinute', { job: { run_at: new Date() } } as any)
+        await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'iDontExist')
+        expect(mockPiscina.run).not.toHaveBeenCalled()
 
-        expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(1, {
-            topic: KAFKA_SCHEDULED_TASKS,
-            messages: [
-                {
-                    key: '1',
-                    value: JSON.stringify({
-                        taskType: 'runEveryMinute',
-                        pluginConfigId: 1,
-                    }),
-                },
-            ],
-        })
-        expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(2, {
-            topic: KAFKA_SCHEDULED_TASKS,
-            messages: [
-                {
-                    key: '2',
-                    value: JSON.stringify({
-                        taskType: 'runEveryMinute',
-                        pluginConfigId: 2,
-                    }),
-                },
-            ],
-        })
-        expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(3, {
-            topic: KAFKA_SCHEDULED_TASKS,
-            messages: [
-                {
-                    key: '3',
-                    value: JSON.stringify({
-                        taskType: 'runEveryMinute',
-                        pluginConfigId: 3,
-                    }),
-                },
-            ],
-        })
+        await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryMinute')
 
-        await runScheduledTasks(mockHubWithPluginSchedule, 'runEveryHour', { job: { run_at: new Date() } } as any)
-        expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(4, {
-            topic: KAFKA_SCHEDULED_TASKS,
-            messages: [
-                {
-                    key: '4',
-                    value: JSON.stringify({
-                        taskType: 'runEveryHour',
-                        pluginConfigId: 4,
-                    }),
-                },
-            ],
-        })
-        expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(5, {
-            topic: KAFKA_SCHEDULED_TASKS,
-            messages: [
-                {
-                    key: '5',
-                    value: JSON.stringify({
-                        taskType: 'runEveryHour',
-                        pluginConfigId: 5,
-                    }),
-                },
-            ],
-        })
-        expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(6, {
-            topic: KAFKA_SCHEDULED_TASKS,
-            messages: [
-                {
-                    key: '6',
-                    value: JSON.stringify({
-                        taskType: 'runEveryHour',
-                        pluginConfigId: 6,
-                    }),
-                },
-            ],
-        })
+        expect(mockPiscina.run).toHaveBeenNthCalledWith(1, { args: { pluginConfigId: 1 }, task: 'runEveryMinute' })
+        expect(mockPiscina.run).toHaveBeenNthCalledWith(2, { args: { pluginConfigId: 2 }, task: 'runEveryMinute' })
+        expect(mockPiscina.run).toHaveBeenNthCalledWith(3, { args: { pluginConfigId: 3 }, task: 'runEveryMinute' })
 
-        await runScheduledTasks(mockHubWithPluginSchedule, 'runEveryDay', { job: { run_at: new Date() } } as any)
-        expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(7, {
-            topic: KAFKA_SCHEDULED_TASKS,
-            messages: [
-                {
-                    key: '7',
-                    value: JSON.stringify({
-                        taskType: 'runEveryDay',
-                        pluginConfigId: 7,
-                    }),
-                },
-            ],
-        })
-        expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(8, {
-            topic: KAFKA_SCHEDULED_TASKS,
-            messages: [
-                {
-                    key: '8',
-                    value: JSON.stringify({
-                        taskType: 'runEveryDay',
-                        pluginConfigId: 8,
-                    }),
-                },
-            ],
-        })
-        expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(9, {
-            topic: KAFKA_SCHEDULED_TASKS,
-            messages: [
-                {
-                    key: '9',
-                    value: JSON.stringify({
-                        taskType: 'runEveryDay',
-                        pluginConfigId: 9,
-                    }),
-                },
-            ],
-        })
+        await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryHour')
+        expect(mockPiscina.run).toHaveBeenNthCalledWith(4, { args: { pluginConfigId: 4 }, task: 'runEveryHour' })
+        expect(mockPiscina.run).toHaveBeenNthCalledWith(5, { args: { pluginConfigId: 5 }, task: 'runEveryHour' })
+        expect(mockPiscina.run).toHaveBeenNthCalledWith(6, { args: { pluginConfigId: 6 }, task: 'runEveryHour' })
+
+        await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryDay')
+        expect(mockPiscina.run).toHaveBeenNthCalledWith(7, { args: { pluginConfigId: 7 }, task: 'runEveryDay' })
+        expect(mockPiscina.run).toHaveBeenNthCalledWith(8, { args: { pluginConfigId: 8 }, task: 'runEveryDay' })
+        expect(mockPiscina.run).toHaveBeenNthCalledWith(9, { args: { pluginConfigId: 9 }, task: 'runEveryDay' })
     })
 })
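
For reference when reviewing this revert: below is the staleness guard the patch removes, reassembled from the deleted hunk in plugin-server/src/main/graphile-worker/schedule.ts into a self-contained TypeScript sketch. The JobHelpers type is graphile-worker's, exactly as the deleted code imported it; the helper name isStaleScheduledTask is introduced here purely for illustration, while the constants and the comparison are lifted from the hunk above.

    import { JobHelpers } from 'graphile-worker'

    type TaskTypes = 'runEveryMinute' | 'runEveryHour' | 'runEveryDay'

    // Grace period per task type: one full scheduling interval.
    const gracePeriodMilliSecondsByTaskType = {
        runEveryMinute: 60 * 1000,
        runEveryHour: 60 * 60 * 1000,
        runEveryDay: 24 * 60 * 60 * 1000,
    } as const

    // True when the job's run_at is older than the grace period for its task
    // type, i.e. the queue is backed up and this run has been superseded.
    // (Illustrative helper; the removed code inlined this check in runScheduledTasks.)
    function isStaleScheduledTask(taskType: TaskTypes, helpers: JobHelpers): boolean {
        return new Date(helpers.job.run_at).getTime() < Date.now() - gracePeriodMilliSecondsByTaskType[taskType]
    }

The grace period equals one scheduling interval, so a run more than one interval old has already been superseded by a newer cron tick; skipping it kept a backed-up graphile-worker queue from replaying stale tasks, per the deleted comment above.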