mirror of https://github.com/PostHog/posthog.git synced 2024-11-25 11:17:50 +01:00

Revert "Revert "Revert "fix(plugin-server): ignore old cron tasks from graphile-worker """ (#13107)

Revert "Revert "Revert "fix(plugin-server): ignore old cron tasks from graphile-worker "" (#13100)"

This reverts commit 8eec4c9346.
This commit is contained in:
Harry Waye 2022-12-03 00:13:27 +00:00 committed by GitHub
parent 8eec4c9346
commit 23db43a0dc
7 changed files with 29 additions and 413 deletions
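For context, the behaviour this commit reverts is a guard in `runScheduledTasks` that skipped cron tasks whose `run_at` fell outside a per-task grace period, so that a backed-up queue would not replay stale work. A minimal TypeScript sketch of that check, reconstructed from the schedule.ts hunk below (nothing new, just the removed logic in isolation):

// Stale-task guard removed by this revert, reconstructed from the diff below.
type TaskTypes = 'runEveryMinute' | 'runEveryHour' | 'runEveryDay'

const gracePeriodMilliSecondsByTaskType: Record<TaskTypes, number> = {
    runEveryMinute: 60 * 1000,
    runEveryHour: 60 * 60 * 1000,
    runEveryDay: 24 * 60 * 60 * 1000,
}

// A task is stale when its scheduled run_at is older than one grace period;
// stale tasks are skipped and counted rather than executed.
function isStaleScheduledTask(taskType: TaskTypes, runAt: string | Date, now = Date.now()): boolean {
    return new Date(runAt).getTime() < now - gracePeriodMilliSecondsByTaskType[taskType]
}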

View File

@@ -1,120 +0,0 @@
import Redis from 'ioredis'
import { Consumer, Kafka, KafkaMessage, logLevel, Partitioners, Producer } from 'kafkajs'
import { Pool } from 'pg'
import { v4 as uuidv4 } from 'uuid'
import { defaultConfig } from '../src/config/config'
import { delayUntilEventIngested } from '../tests/helpers/clickhouse'
let producer: Producer
let postgres: Pool // NOTE: a Pool isn't strictly necessary here, but helpers such as `insertRow` expect one.
let kafka: Kafka
let redis: Redis.Redis
beforeAll(async () => {
// Setup connections to kafka, clickhouse, and postgres
postgres = new Pool({
connectionString: defaultConfig.DATABASE_URL!,
// We use a pool only for typing's sake, but we don't actually need to,
// so set max connections to 1.
max: 1,
})
kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING })
producer = kafka.producer({ createPartitioner: Partitioners.DefaultPartitioner })
await producer.connect()
redis = new Redis(defaultConfig.REDIS_URL)
})
afterAll(async () => {
await Promise.all([producer.disconnect(), postgres.end(), redis.disconnect()])
})
// Test out some error cases that we wouldn't be able to handle without
// producing directly to the scheduled_tasks topic.
let dlq: KafkaMessage[]
let dlqConsumer: Consumer
beforeAll(async () => {
dlq = []
dlqConsumer = kafka.consumer({ groupId: 'scheduled-tasks-consumer-test' })
await dlqConsumer.subscribe({ topic: 'scheduled_tasks_dlq' })
await dlqConsumer.run({
eachMessage: ({ message }) => {
dlq.push(message)
return Promise.resolve()
},
})
})
afterAll(async () => {
await dlqConsumer.disconnect()
})
test.concurrent(`handles empty messages`, async () => {
const key = uuidv4()
await producer.send({
topic: 'scheduled_tasks',
messages: [
{
key: key,
value: null,
},
],
})
const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
expect(messages.length).toBe(1)
})
test.concurrent(`handles invalid JSON`, async () => {
const key = uuidv4()
await producer.send({
topic: 'scheduled_tasks',
messages: [
{
key: key,
value: 'invalid json',
},
],
})
const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
expect(messages.length).toBe(1)
})
test.concurrent(`handles invalid taskType`, async () => {
const key = uuidv4()
await producer.send({
topic: 'scheduled_tasks',
messages: [
{
key: key,
value: JSON.stringify({ taskType: 'invalidTaskType', pluginConfigId: 1 }),
},
],
})
const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
expect(messages.length).toBe(1)
})
test.concurrent(`handles invalid pluginConfigId`, async () => {
const key = uuidv4()
await producer.send({
topic: 'scheduled_tasks',
messages: [
{
key: key,
value: JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 'asdf' }),
},
],
})
const messages = await delayUntilEventIngested(() => dlq.filter((message) => message.key?.toString() === key))
expect(messages.length).toBe(1)
})
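The assertions above rely on `delayUntilEventIngested` from the test helpers to poll until the DLQ consumer has buffered the expected message; its implementation is not part of this diff. A hypothetical polling helper along those lines (a sketch of the assumed behaviour, not the repo's actual helper):

// Hypothetical stand-in for delayUntilEventIngested: repeatedly evaluate a fetch
// callback until it returns at least one item or the timeout elapses.
async function pollUntilPresent<T>(
    fetch: () => T[] | Promise<T[]>,
    timeoutMs = 30000,
    intervalMs = 500
): Promise<T[]> {
    const deadline = Date.now() + timeoutMs
    while (Date.now() < deadline) {
        const results = await fetch()
        if (results.length > 0) {
            return results
        }
        await new Promise((resolve) => setTimeout(resolve, intervalMs))
    }
    throw new Error('Timed out waiting for expected messages')
}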

View File

@@ -19,6 +19,4 @@ export const KAFKA_INGESTION_WARNINGS = `${prefix}clickhouse_ingestion_warnings$
export const KAFKA_APP_METRICS = `${prefix}clickhouse_app_metrics${suffix}`
export const KAFKA_JOBS = `${prefix}jobs${suffix}`
export const KAFKA_JOBS_DLQ = `${prefix}jobs_dlq${suffix}`
export const KAFKA_SCHEDULED_TASKS = `${prefix}scheduled_tasks${suffix}`
export const KAFKA_SCHEDULED_TASKS_DLQ = `${prefix}scheduled_tasks_dlq${suffix}`
export const KAFKA_METRICS_TIME_TO_SEE_DATA = `${prefix}clickhouse_metrics_time_to_see_data${suffix}`

View File

@@ -1,13 +1,9 @@
import Piscina from '@posthog/piscina'
import { JobHelpers } from 'graphile-worker'
import { KAFKA_SCHEDULED_TASKS } from '../../config/kafka-topics'
import { Hub, PluginConfigId } from '../../types'
import { status } from '../../utils/status'
import { delay } from '../../utils/utils'
type TaskTypes = 'runEveryMinute' | 'runEveryHour' | 'runEveryDay'
export async function loadPluginSchedule(piscina: Piscina, maxIterations = 2000): Promise<Hub['pluginSchedule']> {
let allThreadsReady = false
while (maxIterations--) {
@@ -32,30 +28,10 @@ export async function loadPluginSchedule(piscina: Piscina, maxIterations = 2000)
throw new Error('Could not load plugin schedule in time')
}
export async function runScheduledTasks(server: Hub, taskType: TaskTypes, helpers: JobHelpers): Promise<void> {
// If the task's run_at is older than the grace period, we ignore it. We
// don't want to end up with old tasks being scheduled if we are backed up.
if (new Date(helpers.job.run_at).getTime() < Date.now() - gracePeriodMilliSecondsByTaskType[taskType]) {
status.warn('🔁', 'stale_scheduled_task_skipped', {
taskType: taskType,
runAt: helpers.job.run_at,
})
server.statsd?.increment('skipped_scheduled_tasks', { taskType })
return
}
export async function runScheduledTasks(server: Hub, piscina: Piscina, taskType: string): Promise<void> {
for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId })
await server.kafkaProducer.producer.send({
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
})
server.statsd?.increment('queued_scheduled_task', { taskType })
status.info('⏲️', `Running ${taskType} for plugin config with ID ${pluginConfigId}`)
await piscina.run({ task: taskType, args: { pluginConfigId } })
server.statsd?.increment('completed_scheduled_task', { taskType })
}
}
const gracePeriodMilliSecondsByTaskType = {
runEveryMinute: 60 * 1000,
runEveryHour: 60 * 60 * 1000,
runEveryDay: 24 * 60 * 60 * 1000,
} as const
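Because the added/removed markers are stripped in this view, the hunk above interleaves both versions of `runScheduledTasks`: the version being removed publishes one message per plugin config to `KAFKA_SCHEDULED_TASKS`, while the restored version runs each task inline through Piscina. A simplified contrast of the two dispatch paths (structural types narrowed to what the sketch needs; neither function name exists in the repo):

type TaskType = 'runEveryMinute' | 'runEveryHour' | 'runEveryDay'

// Removed by this revert: enqueue the task onto Kafka for a separate consumer to execute.
async function dispatchViaKafka(
    producer: { send: (record: { topic: string; messages: { key: string; value: string }[] }) => Promise<unknown> },
    topic: string,
    taskType: TaskType,
    pluginConfigId: number
): Promise<void> {
    await producer.send({
        topic,
        messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
    })
}

// Restored by this revert: run the task directly on a Piscina worker thread.
async function dispatchViaPiscina(
    piscina: { run: (job: { task: TaskType; args: { pluginConfigId: number } }) => Promise<unknown> },
    taskType: TaskType,
    pluginConfigId: number
): Promise<void> {
    await piscina.run({ task: taskType, args: { pluginConfigId } })
}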

View File

@@ -1,5 +1,5 @@
import Piscina from '@posthog/piscina'
import { CronItem, JobHelpers, TaskList } from 'graphile-worker'
import { CronItem, TaskList } from 'graphile-worker'
import { EnqueuedPluginJob, Hub } from '../../types'
import { status } from '../../utils/status'
@@ -52,7 +52,7 @@ export async function startGraphileWorker(hub: Hub, graphileWorker: GraphileWork
jobHandlers = {
...jobHandlers,
...getScheduledTaskHandlers(hub),
...getScheduledTaskHandlers(hub, piscina),
}
status.info('🔄', 'Graphile Worker: set up scheduled task handlers...')
@@ -76,11 +76,11 @@ export function getPluginJobHandlers(hub: Hub, graphileWorker: GraphileWorker, p
return pluginJobHandlers
}
export function getScheduledTaskHandlers(hub: Hub): TaskList {
export function getScheduledTaskHandlers(hub: Hub, piscina: Piscina): TaskList {
const scheduledTaskHandlers: TaskList = {
runEveryMinute: async (_, helpers: JobHelpers) => await runScheduledTasks(hub, 'runEveryMinute', helpers),
runEveryHour: async (_, helpers: JobHelpers) => await runScheduledTasks(hub, 'runEveryHour', helpers),
runEveryDay: async (_, helpers: JobHelpers) => await runScheduledTasks(hub, 'runEveryDay', helpers),
runEveryMinute: async () => await runScheduledTasks(hub, piscina, 'runEveryMinute'),
runEveryHour: async () => await runScheduledTasks(hub, piscina, 'runEveryHour'),
runEveryDay: async () => await runScheduledTasks(hub, piscina, 'runEveryDay'),
}
return scheduledTaskHandlers

View File

@@ -1,125 +0,0 @@
import Piscina from '@posthog/piscina'
import { StatsD } from 'hot-shots'
import { EachBatchHandler, Kafka, Producer } from 'kafkajs'
import { KAFKA_SCHEDULED_TASKS, KAFKA_SCHEDULED_TASKS_DLQ } from '../../config/kafka-topics'
import { DependencyUnavailableError } from '../../utils/db/error'
import { status } from '../../utils/status'
import { instrumentEachBatch, setupEventHandlers } from './kafka-queue'
// The valid task types that can be scheduled.
// TODO: not sure if there is another place that defines these but it would be
// good to unify.
const taskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const
export const startScheduledTasksConsumer = async ({
kafka,
piscina,
producer,
statsd,
}: {
kafka: Kafka
piscina: Piscina
producer: Producer // NOTE: not using KafkaProducerWrapper here to avoid buffering logic
statsd?: StatsD
}) => {
/*
Consumes from the scheduled tasks topic, and executes them within a
Piscina worker.
*/
const consumer = kafka.consumer({ groupId: 'scheduled-tasks-runner' })
setupEventHandlers(consumer)
status.info('🔁', 'Starting scheduled tasks consumer')
const eachBatch: EachBatchHandler = async ({ batch, resolveOffset, heartbeat }) => {
status.debug('🔁', 'Processing batch', { size: batch.messages.length })
for (const message of batch.messages) {
if (!message.value) {
status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, {
value: message.value,
})
await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
resolveOffset(message.offset)
continue
}
let task: {
taskType: typeof taskTypes[number]
pluginConfigId: number
}
try {
task = JSON.parse(message.value.toString())
} catch (error) {
status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, {
error: error.stack ?? error,
})
await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
resolveOffset(message.offset)
continue
}
if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) {
status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task)
await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
resolveOffset(message.offset)
continue
}
status.debug('⬆️', 'Running scheduled task', task)
try {
status.info('⏲️', 'running_scheduled_task', {
taskType: task.taskType,
pluginConfigId: task.pluginConfigId,
})
await piscina.run({ task: task.taskType, args: { pluginConfigId: task.pluginConfigId } })
resolveOffset(message.offset)
statsd?.increment('completed_scheduled_task', { taskType: task.taskType })
} catch (error) {
if (error instanceof DependencyUnavailableError) {
// For errors relating to PostHog dependencies that are unavailable,
// e.g. Postgres, Kafka, Redis, we don't want to log the error to Sentry
// but rather bubble this up the stack for someone else to decide on
// what to do with it.
status.warn('⚠️', `dependency_unavailable`, {
taskType: task.taskType,
pluginConfigId: task.pluginConfigId,
error: error,
stack: error.stack,
})
statsd?.increment('retriable_scheduled_task', { taskType: task.taskType })
throw error
}
status.error('⚠️', 'scheduled_task_failed', {
taskType: task.taskType,
pluginConfigId: task.pluginConfigId,
error: error,
stack: error.stack,
})
statsd?.increment('failed_scheduled_tasks', { taskType: task.taskType })
}
// After processing each message, we need to heartbeat to ensure
// we don't get kicked out of the group. Note that although we call
// this for each message, it's actually a no-op if we're not over
// the heartbeatInterval.
await heartbeat()
}
status.info('✅', 'Processed batch', { size: batch.messages.length })
}
await consumer.connect()
await consumer.subscribe({ topic: KAFKA_SCHEDULED_TASKS })
await consumer.run({
eachBatch: async (payload) => {
return await instrumentEachBatch(KAFKA_SCHEDULED_TASKS, eachBatch, payload, statsd)
},
})
return consumer
}
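The deleted consumer above routes malformed messages to the dead-letter topic after three checks: a non-null value, parseable JSON, and a recognised taskType with a numeric pluginConfigId. A condensed sketch of that validation as a single type guard (the function name is illustrative, not from the repo):

const validTaskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const
type ScheduledTask = { taskType: typeof validTaskTypes[number]; pluginConfigId: number }

// Returns the parsed task, or null when the message should be sent to the DLQ instead.
function parseScheduledTask(value: Buffer | null): ScheduledTask | null {
    if (!value) {
        return null // empty message
    }
    let task: ScheduledTask
    try {
        task = JSON.parse(value.toString())
    } catch {
        return null // invalid JSON
    }
    if (!validTaskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) {
        return null // unknown task type or non-numeric plugin config id
    }
    return task
}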

View File

@@ -23,7 +23,6 @@ import { startAnonymousEventBufferConsumer } from './ingestion-queues/anonymous-
import { startJobsConsumer } from './ingestion-queues/jobs-consumer'
import { IngestionConsumer } from './ingestion-queues/kafka-queue'
import { startQueues } from './ingestion-queues/queue'
import { startScheduledTasksConsumer } from './ingestion-queues/scheduled-tasks-consumer'
import { createHttpServer } from './services/http-server'
import { createMmdbServer, performMmdbStalenessCheck, prepareMmdb } from './services/mmdb'
@@ -86,7 +85,6 @@ export async function startPluginsServer(
// meantime.
let bufferConsumer: Consumer | undefined
let jobsConsumer: Consumer | undefined
let schedulerTasksConsumer: Consumer | undefined
let httpServer: Server | undefined // healthcheck server
let mmdbServer: net.Server | undefined // geoip server
@@ -121,7 +119,6 @@ export async function startPluginsServer(
graphileWorker?.stop(),
bufferConsumer?.disconnect(),
jobsConsumer?.disconnect(),
schedulerTasksConsumer?.disconnect(),
])
await new Promise<void>((resolve, reject) =>
@@ -238,15 +235,6 @@ export async function startPluginsServer(
await graphileWorker.connectProducer()
await startGraphileWorker(hub, graphileWorker, piscina)
if (hub.capabilities.pluginScheduledTasks) {
schedulerTasksConsumer = await startScheduledTasksConsumer({
piscina: piscina,
kafka: hub.kafka,
producer: hub.kafkaProducer.producer,
statsd: hub.statsd,
})
}
if (hub.capabilities.processPluginJobs) {
jobsConsumer = await startJobsConsumer({
kafka: hub.kafka,

View File

@@ -1,9 +1,5 @@
import { Producer } from 'kafkajs'
import { KAFKA_SCHEDULED_TASKS } from '../../../src/config/kafka-topics'
import { runScheduledTasks } from '../../../src/main/graphile-worker/schedule'
import { Hub } from '../../../src/types'
import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper'
import { UUID } from '../../../src/utils/utils'
import { PromiseManager } from '../../../src/worker/vm/promise-manager'
@@ -19,6 +15,10 @@ describe('Graphile Worker schedule', () => {
})
test('runScheduledTasks()', async () => {
const mockPiscina = {
run: jest.fn(),
}
const mockHubWithPluginSchedule = {
...mockHub,
pluginSchedule: {
@@ -26,126 +26,25 @@ describe('Graphile Worker schedule', () => {
runEveryHour: [4, 5, 6],
runEveryDay: [7, 8, 9],
},
kafkaProducer: {
producer: {
send: jest.fn(),
} as unknown as Producer,
} as KafkaProducerWrapper,
}
await runScheduledTasks(mockHubWithPluginSchedule, 'runEveryMinute', { job: { run_at: new Date() } } as any)
await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'iDontExist')
expect(mockPiscina.run).not.toHaveBeenCalled()
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(1, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
key: '1',
value: JSON.stringify({
taskType: 'runEveryMinute',
pluginConfigId: 1,
}),
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(2, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
key: '2',
value: JSON.stringify({
taskType: 'runEveryMinute',
pluginConfigId: 2,
}),
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(3, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
key: '3',
value: JSON.stringify({
taskType: 'runEveryMinute',
pluginConfigId: 3,
}),
},
],
})
await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryMinute')
await runScheduledTasks(mockHubWithPluginSchedule, 'runEveryHour', { job: { run_at: new Date() } } as any)
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(4, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
key: '4',
value: JSON.stringify({
taskType: 'runEveryHour',
pluginConfigId: 4,
}),
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(5, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
key: '5',
value: JSON.stringify({
taskType: 'runEveryHour',
pluginConfigId: 5,
}),
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(6, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
key: '6',
value: JSON.stringify({
taskType: 'runEveryHour',
pluginConfigId: 6,
}),
},
],
})
expect(mockPiscina.run).toHaveBeenNthCalledWith(1, { args: { pluginConfigId: 1 }, task: 'runEveryMinute' })
expect(mockPiscina.run).toHaveBeenNthCalledWith(2, { args: { pluginConfigId: 2 }, task: 'runEveryMinute' })
expect(mockPiscina.run).toHaveBeenNthCalledWith(3, { args: { pluginConfigId: 3 }, task: 'runEveryMinute' })
await runScheduledTasks(mockHubWithPluginSchedule, 'runEveryDay', { job: { run_at: new Date() } } as any)
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(7, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
key: '7',
value: JSON.stringify({
taskType: 'runEveryDay',
pluginConfigId: 7,
}),
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(8, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
key: '8',
value: JSON.stringify({
taskType: 'runEveryDay',
pluginConfigId: 8,
}),
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(9, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
key: '9',
value: JSON.stringify({
taskType: 'runEveryDay',
pluginConfigId: 9,
}),
},
],
})
await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryHour')
expect(mockPiscina.run).toHaveBeenNthCalledWith(4, { args: { pluginConfigId: 4 }, task: 'runEveryHour' })
expect(mockPiscina.run).toHaveBeenNthCalledWith(5, { args: { pluginConfigId: 5 }, task: 'runEveryHour' })
expect(mockPiscina.run).toHaveBeenNthCalledWith(6, { args: { pluginConfigId: 6 }, task: 'runEveryHour' })
await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryDay')
expect(mockPiscina.run).toHaveBeenNthCalledWith(7, { args: { pluginConfigId: 7 }, task: 'runEveryDay' })
expect(mockPiscina.run).toHaveBeenNthCalledWith(8, { args: { pluginConfigId: 8 }, task: 'runEveryDay' })
expect(mockPiscina.run).toHaveBeenNthCalledWith(9, { args: { pluginConfigId: 9 }, task: 'runEveryDay' })
})
})