0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-25 02:49:32 +01:00

chore: Nuke promiseManager (#19094)

This commit is contained in:
Tiina Turban 2023-12-05 16:47:41 +01:00 committed by GitHub
parent f0aeefbb38
commit 8f81c31d21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 3 additions and 443 deletions

View File

@ -13,7 +13,6 @@ import { Pool } from 'pg'
import { EnqueuedJob, Hub } from '../../types'
import { instrument } from '../../utils/metrics'
import { runRetriableFunction } from '../../utils/retries'
import { status } from '../../utils/status'
import { createPostgresPool } from '../../utils/utils'
import { graphileEnqueueJobCounter } from './metrics'
@ -60,12 +59,7 @@ export class GraphileWorker {
await this.migrate()
}
async enqueue(
jobName: string,
job: EnqueuedJob,
instrumentationContext?: InstrumentationContext,
retryOnFailure = false
): Promise<void> {
async enqueue(jobName: string, job: EnqueuedJob, instrumentationContext?: InstrumentationContext): Promise<void> {
const jobType = 'type' in job ? job.type : 'buffer'
let jobPayload: Record<string, any> = {}
@ -73,23 +67,7 @@ export class GraphileWorker {
jobPayload = job.payload
}
let enqueueFn = () => this._enqueue(jobName, job)
// This branch will be removed once we implement a Kafka queue for all jobs
// as we've done for buffer events (see e.g. anonymous-event-buffer-consumer.ts)
if (retryOnFailure) {
enqueueFn = () =>
runRetriableFunction({
hub: this.hub,
metricName: `job_queues_enqueue_${jobName}`,
maxAttempts: 10,
retryBaseMs: 6000,
retryMultiplier: 2,
tryFn: async () => this._enqueue(jobName, job),
catchFn: () => status.error('🔴', 'Exhausted attempts to enqueue job.'),
payload: job,
})
}
const enqueueFn = () => this._enqueue(jobName, job)
await instrument(
this.hub.statsd,

View File

@ -31,7 +31,6 @@ import { TeamManager } from './worker/ingestion/team-manager'
import { PluginsApiKeyManager } from './worker/vm/extensions/helpers/api-key-manager'
import { RootAccessManager } from './worker/vm/extensions/helpers/root-acess-manager'
import { LazyPluginVM } from './worker/vm/lazy'
import { PromiseManager } from './worker/vm/promise-manager'
export { Element } from '@posthog/plugin-scaffold' // Re-export Element from scaffolding, for backwards compat.
@ -263,7 +262,6 @@ export interface Hub extends PluginsServerConfig {
organizationManager: OrganizationManager
pluginsApiKeyManager: PluginsApiKeyManager
rootAccessManager: RootAccessManager
promiseManager: PromiseManager
eventsProcessor: EventsProcessor
appMetrics: AppMetrics
// geoip database, setup in workers

View File

@ -33,7 +33,6 @@ import { status } from '../status'
import { createRedisPool, UUIDT } from '../utils'
import { PluginsApiKeyManager } from './../../worker/vm/extensions/helpers/api-key-manager'
import { RootAccessManager } from './../../worker/vm/extensions/helpers/root-acess-manager'
import { PromiseManager } from './../../worker/vm/promise-manager'
import { DB } from './db'
import { KafkaProducerWrapper } from './kafka-producer-wrapper'
import { PostgresRouter } from './postgres'
@ -135,8 +134,6 @@ export async function createHub(
status.warn('🪣', `Object storage could not be created`)
}
const promiseManager = new PromiseManager(serverConfig)
const db = new DB(
postgres,
redisPool,
@ -195,7 +192,6 @@ export async function createHub(
organizationManager,
pluginsApiKeyManager,
rootAccessManager,
promiseManager,
conversionBufferEnabledTeams,
pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true),
poeEmbraceJoinForTeams: buildIntegerMatcher(process.env.POE_EMBRACE_JOIN_FOR_TEAMS, true),

View File

@ -1,9 +1,4 @@
import { RetryError } from '@posthog/plugin-scaffold'
import { runInTransaction } from '../sentry'
import { Hub } from '../types'
import { status } from '../utils/status'
import { AppMetricIdentifier, ErrorWithContext } from '../worker/ingestion/app-metrics'
import { sleep } from './utils'
// Simple retries in our code
@ -39,116 +34,6 @@ export function getNextRetryMs(baseMs: number, multiplier: number, attempt: numb
return baseMs * multiplier ** (attempt - 1)
}
export interface RetriableFunctionDefinition {
payload: Record<string, any>
tryFn: () => void | Promise<void>
catchFn?: (error: Error | RetryError) => void | Promise<void>
finallyFn?: (attempts: number) => void | Promise<void>
}
export interface RetryParams {
maxAttempts: number
retryBaseMs: number
retryMultiplier: number
}
export interface MetricsDefinition {
metricName: string
appMetric?: AppMetricIdentifier
appMetricErrorContext?: Omit<ErrorWithContext, 'error'>
}
export type RetriableFunctionPayload = RetriableFunctionDefinition &
Partial<RetryParams> &
MetricsDefinition & { hub: Hub }
function iterateRetryLoop(retriableFunctionPayload: RetriableFunctionPayload, attempt = 1): Promise<void> {
const {
metricName,
hub,
payload,
tryFn,
catchFn,
finallyFn,
maxAttempts = process.env.PLUGINS_RETRY_ATTEMPTS ? parseInt(process.env.PLUGINS_RETRY_ATTEMPTS) : 3,
retryBaseMs = 3000,
retryMultiplier = 2,
appMetric,
appMetricErrorContext,
} = retriableFunctionPayload
return runInTransaction(
{
name: 'retryLoop',
op: metricName,
description: '?',
data: {
metricName,
payload,
attempt,
},
},
async () => {
let nextIterationPromise: Promise<void> | undefined
try {
await tryFn()
if (appMetric) {
await hub.appMetrics.queueMetric({
...appMetric,
successes: attempt == 1 ? 1 : 0,
successesOnRetry: attempt == 1 ? 0 : 1,
})
}
} catch (error) {
if (error instanceof RetryError) {
error._attempt = attempt
error._maxAttempts = maxAttempts
}
if (error instanceof RetryError && attempt < maxAttempts) {
const nextRetryMs = getNextRetryMs(retryBaseMs, retryMultiplier, attempt)
nextIterationPromise = new Promise((resolve, reject) =>
setTimeout(() => {
// This is not awaited directly so that attempts beyond the first one don't stall the payload queue
iterateRetryLoop(retriableFunctionPayload, attempt + 1)
.then(resolve)
.catch(reject)
}, nextRetryMs)
)
hub.promiseManager.trackPromise(nextIterationPromise, 'retries')
await hub.promiseManager.awaitPromisesIfNeeded()
} else {
await catchFn?.(error)
if (appMetric) {
await hub.appMetrics.queueError(
{
...appMetric,
failures: 1,
},
{
error,
...appMetricErrorContext,
}
)
}
}
}
if (!nextIterationPromise) {
await finallyFn?.(attempt)
}
}
)
}
/** Run function with `RetryError` handling. */
export async function runRetriableFunction(retriableFunctionPayload: RetriableFunctionPayload): Promise<void> {
const { finallyFn } = retriableFunctionPayload
await iterateRetryLoop({
...retriableFunctionPayload,
finallyFn: async (attempts) => {
await finallyFn?.(attempts)
},
})
}
/**
* Retry a function, respecting `error.isRetriable`.
*/

View File

@ -1,35 +0,0 @@
import { PluginsServerConfig } from '../../types'
import { status } from '../../utils/status'
export class PromiseManager {
pendingPromises: Set<Promise<any>>
config: PluginsServerConfig
constructor(config: PluginsServerConfig) {
this.pendingPromises = new Set()
this.config = config
}
public trackPromise(promise: Promise<any>, key: string): void {
if (typeof promise === 'undefined') {
return
}
status.info('🤝', `Tracking promise ${key} count = ${this.pendingPromises.size}`)
this.pendingPromises.add(promise)
promise.finally(() => {
this.pendingPromises.delete(promise)
})
status.info('✅', `Tracking promise finished ${key}`)
}
public async awaitPromisesIfNeeded(): Promise<void> {
const startTime = performance.now()
while (this.pendingPromises.size > this.config.MAX_PENDING_PROMISES_PER_WORKER) {
status.info('🤝', `looping in awaitPromise since ${startTime} count = ${this.pendingPromises.size}`)
await Promise.race(this.pendingPromises)
}
status.info('🕐', `Finished awaiting promises ${performance.now() - startTime}`)
}
}

View File

@ -1,8 +1,6 @@
import { GraphileWorker } from '../../../src/main/graphile-worker/graphile-worker'
import { EnqueuedJob, Hub, JobName } from '../../../src/types'
import { runRetriableFunction } from '../../../src/utils/retries'
import { UUID } from '../../../src/utils/utils'
import { PromiseManager } from '../../../src/worker/vm/promise-manager'
jest.mock('../../../src/utils/retries')
jest.mock('../../../src/utils/status')
@ -20,7 +18,6 @@ jest.mock('graphile-worker', () => {
const mockHub: Hub = {
instanceId: new UUID('F8B2F832-6639-4596-ABFC-F9664BC88E84'),
promiseManager: new PromiseManager({ MAX_PENDING_PROMISES_PER_WORKER: 1 } as any),
JOB_QUEUES: 'fs',
} as Hub
@ -36,22 +33,8 @@ describe('graphileWorker', () => {
jest.spyOn(graphileWorker, '_enqueue').mockImplementation(() => Promise.resolve())
await graphileWorker.enqueue(JobName.PLUGIN_JOB, { type: 'foo' } as EnqueuedJob)
expect(runRetriableFunction).not.toHaveBeenCalled()
expect(graphileWorker._enqueue).toHaveBeenCalledWith(JobName.PLUGIN_JOB, { type: 'foo' })
})
it('calls runRetriableFunction with the correct parameters if retryOnFailure=true', async () => {
jest.spyOn(graphileWorker, '_enqueue').mockImplementation(() => Promise.resolve())
await graphileWorker.enqueue(JobName.PLUGIN_JOB, { type: 'foo' } as EnqueuedJob, undefined, true)
expect(runRetriableFunction).toHaveBeenCalled()
const runRetriableFunctionArgs = jest.mocked(runRetriableFunction).mock.calls[0][0]
expect(runRetriableFunctionArgs.metricName).toEqual('job_queues_enqueue_pluginJob')
expect(runRetriableFunctionArgs.payload).toEqual({ type: 'foo' })
expect(runRetriableFunctionArgs.tryFn).not.toBeUndefined()
expect(runRetriableFunctionArgs.catchFn).not.toBeUndefined()
expect(runRetriableFunctionArgs.finallyFn).toBeUndefined()
})
})
describe('syncState()', () => {

View File

@ -3,11 +3,9 @@ import { runScheduledTasks } from '../../../src/main/graphile-worker/schedule'
import { Hub } from '../../../src/types'
import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper'
import { UUID } from '../../../src/utils/utils'
import { PromiseManager } from '../../../src/worker/vm/promise-manager'
const mockHub: Hub = {
instanceId: new UUID('F8B2F832-6639-4596-ABFC-F9664BC88E84'),
promiseManager: new PromiseManager({ MAX_PENDING_PROMISES_PER_WORKER: 1 } as any),
JOB_QUEUES: 'fs',
} as Hub

View File

@ -1,39 +1,8 @@
import { ProcessedPluginEvent, RetryError } from '@posthog/plugin-scaffold'
import { Hub } from '../../src/types'
import { getNextRetryMs, runRetriableFunction } from '../../src/utils/retries'
import { UUID } from '../../src/utils/utils'
import { AppMetricIdentifier } from '../../src/worker/ingestion/app-metrics'
import { PromiseManager } from '../../src/worker/vm/promise-manager'
import { getNextRetryMs } from '../../src/utils/retries'
jest.useFakeTimers()
jest.spyOn(global, 'setTimeout')
const mockHub: Hub = {
instanceId: new UUID('F8B2F832-6639-4596-ABFC-F9664BC88E84'),
promiseManager: new PromiseManager({ MAX_PENDING_PROMISES_PER_WORKER: 1 } as any),
appMetrics: {
queueMetric: jest.fn(),
queueError: jest.fn(),
},
} as unknown as Hub
const testEvent: ProcessedPluginEvent = {
uuid: '4CCCB5FD-BD27-4D6C-8737-88EB7294C437',
distinct_id: 'my_id',
ip: '127.0.0.1',
team_id: 3,
timestamp: '2023-04-01T00:00:00.000Z',
event: 'default event',
properties: {},
}
const appMetric: AppMetricIdentifier = {
teamId: 2,
pluginConfigId: 3,
category: 'processEvent',
}
describe('getNextRetryMs', () => {
it('returns the correct number of milliseconds with a multiplier of 1', () => {
expect(getNextRetryMs(500, 1, 1)).toBe(500)
@ -56,175 +25,3 @@ describe('getNextRetryMs', () => {
expect(() => getNextRetryMs(4000, 2, -1)).toThrowError('Attempts are indexed starting with 1')
})
})
describe('runRetriableFunction', () => {
it('runs the function once if it resolves on first try', async () => {
const tryFn = jest.fn().mockResolvedValue('Guten Abend')
const catchFn = jest.fn()
const finallyFn = jest.fn()
const promise = new Promise<number>((resolve) => {
finallyFn.mockImplementation((attempt: number) => resolve(attempt))
void runRetriableFunction({
metricName: 'plugin.on_foo',
hub: mockHub,
payload: testEvent,
tryFn,
catchFn,
finallyFn,
appMetric,
})
})
jest.runAllTimers()
await expect(promise).resolves.toEqual(1)
expect(tryFn).toHaveBeenCalledTimes(1)
expect(catchFn).toHaveBeenCalledTimes(0)
expect(finallyFn).toHaveBeenCalledTimes(1)
expect(setTimeout).not.toHaveBeenCalled()
expect(mockHub.appMetrics.queueMetric).toHaveBeenCalledWith({
...appMetric,
successes: 1,
successesOnRetry: 0,
})
})
it('catches non-RetryError error', async () => {
const tryFn = jest.fn().mockImplementation(() => {
// Faulty plugin code might look like this
let bar
bar.baz = 123
})
const catchFn = jest.fn()
const finallyFn = jest.fn()
const promise = new Promise<number>((resolve) => {
finallyFn.mockImplementation((attempt: number) => resolve(attempt))
void runRetriableFunction({
metricName: 'plugin.on_foo',
hub: mockHub,
payload: testEvent,
tryFn,
catchFn,
finallyFn,
appMetric,
appMetricErrorContext: {
event: testEvent,
},
})
})
jest.runAllTimers()
await expect(promise).resolves.toEqual(1)
expect(tryFn).toHaveBeenCalledTimes(1)
expect(catchFn).toHaveBeenCalledTimes(1)
expect(catchFn).toHaveBeenCalledWith(expect.any(TypeError))
expect(finallyFn).toHaveBeenCalledTimes(1)
expect(setTimeout).not.toHaveBeenCalled()
expect(mockHub.appMetrics.queueError).toHaveBeenCalledWith(
{
...appMetric,
failures: 1,
},
{
error: expect.any(TypeError),
event: testEvent,
}
)
})
it('catches RetryError error and retries up to 3 times', async () => {
const tryFn = jest.fn().mockImplementation(() => {
throw new RetryError()
})
const catchFn = jest.fn()
const finallyFn = jest.fn()
const promise = new Promise<number>((resolve) => {
finallyFn.mockImplementation((attempt: number) => resolve(attempt))
void runRetriableFunction({
metricName: 'plugin.on_foo',
hub: mockHub,
payload: testEvent,
tryFn,
catchFn,
finallyFn,
appMetric,
appMetricErrorContext: {
event: testEvent,
},
})
})
expect(tryFn).toHaveBeenCalledTimes(1)
expect(finallyFn).toHaveBeenCalledTimes(0)
expect(setTimeout).toHaveBeenCalledTimes(1)
jest.runAllTimers()
await expect(promise).resolves.toEqual(3)
expect(tryFn).toHaveBeenCalledTimes(3)
expect(catchFn).toHaveBeenCalledTimes(1)
expect(catchFn).toHaveBeenCalledWith(expect.any(RetryError))
expect(finallyFn).toHaveBeenCalledTimes(1)
expect(setTimeout).toHaveBeenCalledTimes(2)
expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), 3_000)
expect(setTimeout).toHaveBeenNthCalledWith(2, expect.any(Function), 6_000)
expect(mockHub.appMetrics.queueError).toHaveBeenCalledWith(
{
...appMetric,
failures: 1,
},
{
error: expect.any(RetryError),
event: testEvent,
}
)
})
it('catches RetryError error and allow the function to succeed on 3rd attempt', async () => {
const tryFn = jest
.fn()
.mockImplementationOnce(() => {
throw new RetryError()
})
.mockImplementationOnce(() => {
throw new RetryError()
})
.mockResolvedValue('Gute Nacht')
const catchFn = jest.fn()
const finallyFn = jest.fn()
const promise = new Promise<number>((resolve) => {
finallyFn.mockImplementation((attempt: number) => resolve(attempt))
void runRetriableFunction({
metricName: 'plugin.on_foo',
hub: mockHub,
payload: testEvent,
tryFn,
catchFn,
finallyFn,
appMetric,
})
})
expect(tryFn).toHaveBeenCalledTimes(1)
expect(finallyFn).toHaveBeenCalledTimes(0)
expect(setTimeout).toHaveBeenCalledTimes(1)
jest.runAllTimers()
await expect(promise).resolves.toEqual(3)
expect(tryFn).toHaveBeenCalledTimes(3)
expect(catchFn).toHaveBeenCalledTimes(0)
expect(finallyFn).toHaveBeenCalledTimes(1)
expect(setTimeout).toHaveBeenCalledTimes(2)
expect(setTimeout).toHaveBeenNthCalledWith(1, expect.any(Function), 3_000)
expect(setTimeout).toHaveBeenNthCalledWith(2, expect.any(Function), 6_000)
expect(mockHub.appMetrics.queueMetric).toHaveBeenCalledWith({
...appMetric,
successes: 0,
successesOnRetry: 1,
})
})
})

View File

@ -1,39 +0,0 @@
import { delay } from '../../src/utils/utils'
import { PromiseManager } from '../../src/worker/vm/promise-manager'
jest.setTimeout(100000)
describe('PromiseManager', () => {
let promiseManager: PromiseManager
beforeEach(() => {
promiseManager = new PromiseManager({ MAX_PENDING_PROMISES_PER_WORKER: 1 } as any)
})
afterEach(async () => {
await Promise.all(promiseManager.pendingPromises)
})
test('promise manager awaits promises if above limit', async () => {
const hello = jest.fn()
const promise = async () => {
await delay(3000)
hello()
}
// we track the promise but don't await it
promiseManager.trackPromise(promise())
expect(promiseManager.pendingPromises.size).toEqual(1)
expect(hello).not.toHaveBeenCalled()
// we add another promise above the limit
promiseManager.trackPromise(promise())
expect(promiseManager.pendingPromises.size).toEqual(2)
expect(hello).not.toHaveBeenCalled()
// we chop one promise off by awaiting it
await promiseManager.awaitPromisesIfNeeded()
expect(hello).toHaveBeenCalled()
expect(promiseManager.pendingPromises.size).toEqual(1)
})
})

View File

@ -25,7 +25,6 @@ describe('captureIngestionWarning()', () => {
it('can read own writes', async () => {
await captureIngestionWarning(hub.db, 2, 'some_type', { foo: 'bar' })
await hub.promiseManager.awaitPromisesIfNeeded()
const warnings = await delayUntilEventIngested(fetchWarnings)
expect(warnings).toEqual([