diff --git a/plugin-server/src/main/graphile-worker/graphile-worker.ts b/plugin-server/src/main/graphile-worker/graphile-worker.ts index 041819a8fa8..02a4d028f75 100644 --- a/plugin-server/src/main/graphile-worker/graphile-worker.ts +++ b/plugin-server/src/main/graphile-worker/graphile-worker.ts @@ -13,7 +13,6 @@ import { Pool } from 'pg' import { EnqueuedJob, Hub } from '../../types' import { instrument } from '../../utils/metrics' -import { runRetriableFunction } from '../../utils/retries' import { status } from '../../utils/status' import { createPostgresPool } from '../../utils/utils' import { graphileEnqueueJobCounter } from './metrics' @@ -60,12 +59,7 @@ export class GraphileWorker { await this.migrate() } - async enqueue( - jobName: string, - job: EnqueuedJob, - instrumentationContext?: InstrumentationContext, - retryOnFailure = false - ): Promise { + async enqueue(jobName: string, job: EnqueuedJob, instrumentationContext?: InstrumentationContext): Promise { const jobType = 'type' in job ? job.type : 'buffer' let jobPayload: Record = {} @@ -73,23 +67,7 @@ export class GraphileWorker { jobPayload = job.payload } - let enqueueFn = () => this._enqueue(jobName, job) - - // This branch will be removed once we implement a Kafka queue for all jobs - // as we've done for buffer events (see e.g. anonymous-event-buffer-consumer.ts) - if (retryOnFailure) { - enqueueFn = () => - runRetriableFunction({ - hub: this.hub, - metricName: `job_queues_enqueue_${jobName}`, - maxAttempts: 10, - retryBaseMs: 6000, - retryMultiplier: 2, - tryFn: async () => this._enqueue(jobName, job), - catchFn: () => status.error('🔴', 'Exhausted attempts to enqueue job.'), - payload: job, - }) - } + const enqueueFn = () => this._enqueue(jobName, job) await instrument( this.hub.statsd, diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 68bef340d1e..b29ed86d68b 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -31,7 +31,6 @@ import { TeamManager } from './worker/ingestion/team-manager' import { PluginsApiKeyManager } from './worker/vm/extensions/helpers/api-key-manager' import { RootAccessManager } from './worker/vm/extensions/helpers/root-acess-manager' import { LazyPluginVM } from './worker/vm/lazy' -import { PromiseManager } from './worker/vm/promise-manager' export { Element } from '@posthog/plugin-scaffold' // Re-export Element from scaffolding, for backwards compat. @@ -263,7 +262,6 @@ export interface Hub extends PluginsServerConfig { organizationManager: OrganizationManager pluginsApiKeyManager: PluginsApiKeyManager rootAccessManager: RootAccessManager - promiseManager: PromiseManager eventsProcessor: EventsProcessor appMetrics: AppMetrics // geoip database, setup in workers diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index a2212628c08..b161de1873f 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -33,7 +33,6 @@ import { status } from '../status' import { createRedisPool, UUIDT } from '../utils' import { PluginsApiKeyManager } from './../../worker/vm/extensions/helpers/api-key-manager' import { RootAccessManager } from './../../worker/vm/extensions/helpers/root-acess-manager' -import { PromiseManager } from './../../worker/vm/promise-manager' import { DB } from './db' import { KafkaProducerWrapper } from './kafka-producer-wrapper' import { PostgresRouter } from './postgres' @@ -135,8 +134,6 @@ export async function createHub( status.warn('🪣', `Object storage could not be created`) } - const promiseManager = new PromiseManager(serverConfig) - const db = new DB( postgres, redisPool, @@ -195,7 +192,6 @@ export async function createHub( organizationManager, pluginsApiKeyManager, rootAccessManager, - promiseManager, conversionBufferEnabledTeams, pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true), poeEmbraceJoinForTeams: buildIntegerMatcher(process.env.POE_EMBRACE_JOIN_FOR_TEAMS, true), diff --git a/plugin-server/src/utils/retries.ts b/plugin-server/src/utils/retries.ts index fe7fe22f932..8107f5af0f0 100644 --- a/plugin-server/src/utils/retries.ts +++ b/plugin-server/src/utils/retries.ts @@ -1,9 +1,4 @@ -import { RetryError } from '@posthog/plugin-scaffold' - -import { runInTransaction } from '../sentry' -import { Hub } from '../types' import { status } from '../utils/status' -import { AppMetricIdentifier, ErrorWithContext } from '../worker/ingestion/app-metrics' import { sleep } from './utils' // Simple retries in our code @@ -39,116 +34,6 @@ export function getNextRetryMs(baseMs: number, multiplier: number, attempt: numb return baseMs * multiplier ** (attempt - 1) } -export interface RetriableFunctionDefinition { - payload: Record - tryFn: () => void | Promise - catchFn?: (error: Error | RetryError) => void | Promise - finallyFn?: (attempts: number) => void | Promise -} - -export interface RetryParams { - maxAttempts: number - retryBaseMs: number - retryMultiplier: number -} - -export interface MetricsDefinition { - metricName: string - appMetric?: AppMetricIdentifier - appMetricErrorContext?: Omit -} - -export type RetriableFunctionPayload = RetriableFunctionDefinition & - Partial & - MetricsDefinition & { hub: Hub } - -function iterateRetryLoop(retriableFunctionPayload: RetriableFunctionPayload, attempt = 1): Promise { - const { - metricName, - hub, - payload, - tryFn, - catchFn, - finallyFn, - maxAttempts = process.env.PLUGINS_RETRY_ATTEMPTS ? parseInt(process.env.PLUGINS_RETRY_ATTEMPTS) : 3, - retryBaseMs = 3000, - retryMultiplier = 2, - appMetric, - appMetricErrorContext, - } = retriableFunctionPayload - return runInTransaction( - { - name: 'retryLoop', - op: metricName, - description: '?', - data: { - metricName, - payload, - attempt, - }, - }, - async () => { - let nextIterationPromise: Promise | undefined - try { - await tryFn() - if (appMetric) { - await hub.appMetrics.queueMetric({ - ...appMetric, - successes: attempt == 1 ? 1 : 0, - successesOnRetry: attempt == 1 ? 0 : 1, - }) - } - } catch (error) { - if (error instanceof RetryError) { - error._attempt = attempt - error._maxAttempts = maxAttempts - } - if (error instanceof RetryError && attempt < maxAttempts) { - const nextRetryMs = getNextRetryMs(retryBaseMs, retryMultiplier, attempt) - nextIterationPromise = new Promise((resolve, reject) => - setTimeout(() => { - // This is not awaited directly so that attempts beyond the first one don't stall the payload queue - iterateRetryLoop(retriableFunctionPayload, attempt + 1) - .then(resolve) - .catch(reject) - }, nextRetryMs) - ) - hub.promiseManager.trackPromise(nextIterationPromise, 'retries') - await hub.promiseManager.awaitPromisesIfNeeded() - } else { - await catchFn?.(error) - if (appMetric) { - await hub.appMetrics.queueError( - { - ...appMetric, - failures: 1, - }, - { - error, - ...appMetricErrorContext, - } - ) - } - } - } - if (!nextIterationPromise) { - await finallyFn?.(attempt) - } - } - ) -} - -/** Run function with `RetryError` handling. */ -export async function runRetriableFunction(retriableFunctionPayload: RetriableFunctionPayload): Promise { - const { finallyFn } = retriableFunctionPayload - await iterateRetryLoop({ - ...retriableFunctionPayload, - finallyFn: async (attempts) => { - await finallyFn?.(attempts) - }, - }) -} - /** * Retry a function, respecting `error.isRetriable`. */ diff --git a/plugin-server/src/worker/vm/promise-manager.ts b/plugin-server/src/worker/vm/promise-manager.ts deleted file mode 100644 index d6825b9efb3..00000000000 --- a/plugin-server/src/worker/vm/promise-manager.ts +++ /dev/null @@ -1,35 +0,0 @@ -import { PluginsServerConfig } from '../../types' -import { status } from '../../utils/status' - -export class PromiseManager { - pendingPromises: Set> - config: PluginsServerConfig - - constructor(config: PluginsServerConfig) { - this.pendingPromises = new Set() - this.config = config - } - - public trackPromise(promise: Promise, key: string): void { - if (typeof promise === 'undefined') { - return - } - - status.info('🤝', `Tracking promise ${key} count = ${this.pendingPromises.size}`) - this.pendingPromises.add(promise) - - promise.finally(() => { - this.pendingPromises.delete(promise) - }) - status.info('✅', `Tracking promise finished ${key}`) - } - - public async awaitPromisesIfNeeded(): Promise { - const startTime = performance.now() - while (this.pendingPromises.size > this.config.MAX_PENDING_PROMISES_PER_WORKER) { - status.info('🤝', `looping in awaitPromise since ${startTime} count = ${this.pendingPromises.size}`) - await Promise.race(this.pendingPromises) - } - status.info('🕐', `Finished awaiting promises ${performance.now() - startTime}`) - } -} diff --git a/plugin-server/tests/main/jobs/graphile-worker.test.ts b/plugin-server/tests/main/jobs/graphile-worker.test.ts index 081faeac01c..f46817606a0 100644 --- a/plugin-server/tests/main/jobs/graphile-worker.test.ts +++ b/plugin-server/tests/main/jobs/graphile-worker.test.ts @@ -1,8 +1,6 @@ import { GraphileWorker } from '../../../src/main/graphile-worker/graphile-worker' import { EnqueuedJob, Hub, JobName } from '../../../src/types' -import { runRetriableFunction } from '../../../src/utils/retries' import { UUID } from '../../../src/utils/utils' -import { PromiseManager } from '../../../src/worker/vm/promise-manager' jest.mock('../../../src/utils/retries') jest.mock('../../../src/utils/status') @@ -20,7 +18,6 @@ jest.mock('graphile-worker', () => { const mockHub: Hub = { instanceId: new UUID('F8B2F832-6639-4596-ABFC-F9664BC88E84'), - promiseManager: new PromiseManager({ MAX_PENDING_PROMISES_PER_WORKER: 1 } as any), JOB_QUEUES: 'fs', } as Hub @@ -36,22 +33,8 @@ describe('graphileWorker', () => { jest.spyOn(graphileWorker, '_enqueue').mockImplementation(() => Promise.resolve()) await graphileWorker.enqueue(JobName.PLUGIN_JOB, { type: 'foo' } as EnqueuedJob) - expect(runRetriableFunction).not.toHaveBeenCalled() expect(graphileWorker._enqueue).toHaveBeenCalledWith(JobName.PLUGIN_JOB, { type: 'foo' }) }) - - it('calls runRetriableFunction with the correct parameters if retryOnFailure=true', async () => { - jest.spyOn(graphileWorker, '_enqueue').mockImplementation(() => Promise.resolve()) - await graphileWorker.enqueue(JobName.PLUGIN_JOB, { type: 'foo' } as EnqueuedJob, undefined, true) - expect(runRetriableFunction).toHaveBeenCalled() - const runRetriableFunctionArgs = jest.mocked(runRetriableFunction).mock.calls[0][0] - - expect(runRetriableFunctionArgs.metricName).toEqual('job_queues_enqueue_pluginJob') - expect(runRetriableFunctionArgs.payload).toEqual({ type: 'foo' }) - expect(runRetriableFunctionArgs.tryFn).not.toBeUndefined() - expect(runRetriableFunctionArgs.catchFn).not.toBeUndefined() - expect(runRetriableFunctionArgs.finallyFn).toBeUndefined() - }) }) describe('syncState()', () => { diff --git a/plugin-server/tests/main/jobs/schedule.test.ts b/plugin-server/tests/main/jobs/schedule.test.ts index 6c280d5e96b..150d171f97d 100644 --- a/plugin-server/tests/main/jobs/schedule.test.ts +++ b/plugin-server/tests/main/jobs/schedule.test.ts @@ -3,11 +3,9 @@ import { runScheduledTasks } from '../../../src/main/graphile-worker/schedule' import { Hub } from '../../../src/types' import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper' import { UUID } from '../../../src/utils/utils' -import { PromiseManager } from '../../../src/worker/vm/promise-manager' const mockHub: Hub = { instanceId: new UUID('F8B2F832-6639-4596-ABFC-F9664BC88E84'), - promiseManager: new PromiseManager({ MAX_PENDING_PROMISES_PER_WORKER: 1 } as any), JOB_QUEUES: 'fs', } as Hub diff --git a/plugin-server/tests/utils/retries.test.ts b/plugin-server/tests/utils/retries.test.ts index 15193e8ad82..6bd6e8b40be 100644 --- a/plugin-server/tests/utils/retries.test.ts +++ b/plugin-server/tests/utils/retries.test.ts @@ -1,39 +1,8 @@ -import { ProcessedPluginEvent, RetryError } from '@posthog/plugin-scaffold' - -import { Hub } from '../../src/types' -import { getNextRetryMs, runRetriableFunction } from '../../src/utils/retries' -import { UUID } from '../../src/utils/utils' -import { AppMetricIdentifier } from '../../src/worker/ingestion/app-metrics' -import { PromiseManager } from '../../src/worker/vm/promise-manager' +import { getNextRetryMs } from '../../src/utils/retries' jest.useFakeTimers() jest.spyOn(global, 'setTimeout') -const mockHub: Hub = { - instanceId: new UUID('F8B2F832-6639-4596-ABFC-F9664BC88E84'), - promiseManager: new PromiseManager({ MAX_PENDING_PROMISES_PER_WORKER: 1 } as any), - appMetrics: { - queueMetric: jest.fn(), - queueError: jest.fn(), - }, -} as unknown as Hub - -const testEvent: ProcessedPluginEvent = { - uuid: '4CCCB5FD-BD27-4D6C-8737-88EB7294C437', - distinct_id: 'my_id', - ip: '127.0.0.1', - team_id: 3, - timestamp: '2023-04-01T00:00:00.000Z', - event: 'default event', - properties: {}, -} - -const appMetric: AppMetricIdentifier = { - teamId: 2, - pluginConfigId: 3, - category: 'processEvent', -} - describe('getNextRetryMs', () => { it('returns the correct number of milliseconds with a multiplier of 1', () => { expect(getNextRetryMs(500, 1, 1)).toBe(500) @@ -56,175 +25,3 @@ describe('getNextRetryMs', () => { expect(() => getNextRetryMs(4000, 2, -1)).toThrowError('Attempts are indexed starting with 1') }) }) - -describe('runRetriableFunction', () => { - it('runs the function once if it resolves on first try', async () => { - const tryFn = jest.fn().mockResolvedValue('Guten Abend') - const catchFn = jest.fn() - const finallyFn = jest.fn() - - const promise = new Promise((resolve) => { - finallyFn.mockImplementation((attempt: number) => resolve(attempt)) - void runRetriableFunction({ - metricName: 'plugin.on_foo', - hub: mockHub, - payload: testEvent, - tryFn, - catchFn, - finallyFn, - appMetric, - }) - }) - jest.runAllTimers() - - await expect(promise).resolves.toEqual(1) - expect(tryFn).toHaveBeenCalledTimes(1) - expect(catchFn).toHaveBeenCalledTimes(0) - expect(finallyFn).toHaveBeenCalledTimes(1) - expect(setTimeout).not.toHaveBeenCalled() - expect(mockHub.appMetrics.queueMetric).toHaveBeenCalledWith({ - ...appMetric, - successes: 1, - successesOnRetry: 0, - }) - }) - - it('catches non-RetryError error', async () => { - const tryFn = jest.fn().mockImplementation(() => { - // Faulty plugin code might look like this - let bar - bar.baz = 123 - }) - const catchFn = jest.fn() - const finallyFn = jest.fn() - - const promise = new Promise((resolve) => { - finallyFn.mockImplementation((attempt: number) => resolve(attempt)) - void runRetriableFunction({ - metricName: 'plugin.on_foo', - hub: mockHub, - payload: testEvent, - tryFn, - catchFn, - finallyFn, - appMetric, - appMetricErrorContext: { - event: testEvent, - }, - }) - }) - jest.runAllTimers() - - await expect(promise).resolves.toEqual(1) - expect(tryFn).toHaveBeenCalledTimes(1) - expect(catchFn).toHaveBeenCalledTimes(1) - expect(catchFn).toHaveBeenCalledWith(expect.any(TypeError)) - expect(finallyFn).toHaveBeenCalledTimes(1) - expect(setTimeout).not.toHaveBeenCalled() - expect(mockHub.appMetrics.queueError).toHaveBeenCalledWith( - { - ...appMetric, - failures: 1, - }, - { - error: expect.any(TypeError), - event: testEvent, - } - ) - }) - - it('catches RetryError error and retries up to 3 times', async () => { - const tryFn = jest.fn().mockImplementation(() => { - throw new RetryError() - }) - const catchFn = jest.fn() - const finallyFn = jest.fn() - - const promise = new Promise((resolve) => { - finallyFn.mockImplementation((attempt: number) => resolve(attempt)) - void runRetriableFunction({ - metricName: 'plugin.on_foo', - hub: mockHub, - payload: testEvent, - tryFn, - catchFn, - finallyFn, - appMetric, - appMetricErrorContext: { - event: testEvent, - }, - }) - }) - - expect(tryFn).toHaveBeenCalledTimes(1) - expect(finallyFn).toHaveBeenCalledTimes(0) - expect(setTimeout).toHaveBeenCalledTimes(1) - - jest.runAllTimers() - - await expect(promise).resolves.toEqual(3) - expect(tryFn).toHaveBeenCalledTimes(3) - expect(catchFn).toHaveBeenCalledTimes(1) - expect(catchFn).toHaveBeenCalledWith(expect.any(RetryError)) - expect(finallyFn).toHaveBeenCalledTimes(1) - expect(setTimeout).toHaveBeenCalledTimes(2) - expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), 3_000) - expect(setTimeout).toHaveBeenNthCalledWith(2, expect.any(Function), 6_000) - expect(mockHub.appMetrics.queueError).toHaveBeenCalledWith( - { - ...appMetric, - failures: 1, - }, - { - error: expect.any(RetryError), - event: testEvent, - } - ) - }) - - it('catches RetryError error and allow the function to succeed on 3rd attempt', async () => { - const tryFn = jest - .fn() - .mockImplementationOnce(() => { - throw new RetryError() - }) - .mockImplementationOnce(() => { - throw new RetryError() - }) - .mockResolvedValue('Gute Nacht') - const catchFn = jest.fn() - const finallyFn = jest.fn() - - const promise = new Promise((resolve) => { - finallyFn.mockImplementation((attempt: number) => resolve(attempt)) - void runRetriableFunction({ - metricName: 'plugin.on_foo', - hub: mockHub, - payload: testEvent, - tryFn, - catchFn, - finallyFn, - appMetric, - }) - }) - - expect(tryFn).toHaveBeenCalledTimes(1) - expect(finallyFn).toHaveBeenCalledTimes(0) - expect(setTimeout).toHaveBeenCalledTimes(1) - - jest.runAllTimers() - - await expect(promise).resolves.toEqual(3) - expect(tryFn).toHaveBeenCalledTimes(3) - expect(catchFn).toHaveBeenCalledTimes(0) - expect(finallyFn).toHaveBeenCalledTimes(1) - expect(setTimeout).toHaveBeenCalledTimes(2) - expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), 3_000) - expect(setTimeout).toHaveBeenNthCalledWith(2, expect.any(Function), 6_000) - expect(mockHub.appMetrics.queueMetric).toHaveBeenCalledWith({ - ...appMetric, - successes: 0, - successesOnRetry: 1, - }) - }) -}) diff --git a/plugin-server/tests/worker/buffer.test.ts b/plugin-server/tests/worker/buffer.test.ts deleted file mode 100644 index 90100a2bac4..00000000000 --- a/plugin-server/tests/worker/buffer.test.ts +++ /dev/null @@ -1,39 +0,0 @@ -import { delay } from '../../src/utils/utils' -import { PromiseManager } from '../../src/worker/vm/promise-manager' - -jest.setTimeout(100000) - -describe('PromiseManager', () => { - let promiseManager: PromiseManager - - beforeEach(() => { - promiseManager = new PromiseManager({ MAX_PENDING_PROMISES_PER_WORKER: 1 } as any) - }) - - afterEach(async () => { - await Promise.all(promiseManager.pendingPromises) - }) - - test('promise manager awaits promises if above limit', async () => { - const hello = jest.fn() - const promise = async () => { - await delay(3000) - hello() - } - - // we track the promise but don't await it - promiseManager.trackPromise(promise()) - expect(promiseManager.pendingPromises.size).toEqual(1) - expect(hello).not.toHaveBeenCalled() - - // we add another promise above the limit - promiseManager.trackPromise(promise()) - expect(promiseManager.pendingPromises.size).toEqual(2) - expect(hello).not.toHaveBeenCalled() - - // we chop one promise off by awaiting it - await promiseManager.awaitPromisesIfNeeded() - expect(hello).toHaveBeenCalled() - expect(promiseManager.pendingPromises.size).toEqual(1) - }) -}) diff --git a/plugin-server/tests/worker/ingestion/utils.test.ts b/plugin-server/tests/worker/ingestion/utils.test.ts index 3e65b096459..6144c68fa24 100644 --- a/plugin-server/tests/worker/ingestion/utils.test.ts +++ b/plugin-server/tests/worker/ingestion/utils.test.ts @@ -25,7 +25,6 @@ describe('captureIngestionWarning()', () => { it('can read own writes', async () => { await captureIngestionWarning(hub.db, 2, 'some_type', { foo: 'bar' }) - await hub.promiseManager.awaitPromisesIfNeeded() const warnings = await delayUntilEventIngested(fetchWarnings) expect(warnings).toEqual([