Mirror of https://github.com/PostHog/posthog.git, synced 2024-11-24 09:14:46 +01:00
Retry queue (#325)
* extract redlock from schedule
* implement generic retrying
* capture console.log in tests via a temp file
* add graphile queue
* make it prettier and safe
* style fixes
* fix some tests
* release if there
* split postgres tests
* don't make a graphile worker in all tests
* revert "split postgres tests"
* skip retries if pluginConfig not found
* reset graphile schema before test
* fix failing tests by clearing the retry consumer redlock
* bust github actions cache
* slight cleanup
* fix github/eslint complaining about an `any`
* separate url for graphile retry queue, otherwise use existing postgres pool (fixes helm connection string issue)
* convert startRedlock params to options object
* move type around
* use an enum
* update typo in comment

Co-authored-by: Michael Matloka <dev@twixes.com>
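For plugin authors, the user-visible surface of this change is a new `retry` helper on `meta` plus an `onRetry` export. The sketch below mirrors the plugin code used in the new tests; the `exportEvent` type string and the failing side effect are illustrative, not part of the commit:

    // A plugin using the retry API added in this commit: processEvent schedules
    // a retry 60 seconds out, and onRetry receives it once it becomes due.
    export async function onRetry(type: string, payload: any, meta: any): Promise<void> {
        if (type === 'exportEvent') {
            // try the flaky work again with the original payload
            console.log('retrying export of', payload.event)
        }
    }

    export async function processEvent(event: any, meta: any): Promise<any> {
        try {
            // some unreliable side effect, e.g. an export to a third party
            throw new Error('downstream unavailable')
        } catch {
            // arguments: type, payload, retry_in (seconds, 30..86400 outside tests)
            await meta.retry('exportEvent', event, 60)
        }
        return event
    }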
This commit is contained in: parent f2145e3b48, commit b83c84419d
.gitignore (vendored, 1 line changed)
@@ -6,3 +6,4 @@ dist/
 yalc.lock
 .yalc/
 src/idl/protos.*
+tmp
@@ -73,7 +73,7 @@ describe('e2e kafka & clickhouse benchmark', () => {

         // hope that 5sec is enough to load kafka with all the events (posthog.capture can't be awaited)
         await delay(5000)
-        queue.resume()
+        await queue.resume()

         console.log('Starting timer')
         const startTime = performance.now()
@@ -71,7 +71,7 @@ describe('e2e kafka processing timeout benchmark', () => {

         // hope that 5sec is enough to load kafka with all the events (posthog.capture can't be awaited)
         await delay(5000)
-        queue.resume()
+        await queue.resume()

         console.log('Starting timer')
         const startTime = performance.now()
@@ -73,7 +73,7 @@ describe('e2e celery & postgres benchmark', () => {
         }
         await delay(3000)
         expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toEqual(count)
-        queue.resume()
+        await queue.resume()

         console.log('Starting timer')
         const startTime = performance.now()
@@ -63,6 +63,7 @@
         "fast-deep-equal": "^3.1.3",
         "fastify": "^3.8.0",
         "generic-pool": "^3.7.1",
+        "graphile-worker": "^0.11.0",
         "hot-shots": "^8.2.1",
         "ioredis": "^4.19.2",
         "kafkajs": "^1.15.0",
@@ -10,9 +10,10 @@ import { defaultConfig } from '../shared/config'
 import { createServer } from '../shared/server'
 import { status } from '../shared/status'
 import { createRedis, delay, getPiscinaStats } from '../shared/utils'
-import { PluginsServer, PluginsServerConfig, Queue, ScheduleControl } from '../types'
+import { PluginsServer, PluginsServerConfig, Queue, RetryQueueConsumerControl, ScheduleControl } from '../types'
 import { createMmdbServer, performMmdbStalenessCheck, prepareMmdb } from './mmdb'
 import { startQueue } from './queue'
+import { startRetryQueueConsumer } from './services/retry-queue-consumer'
 import { startSchedule } from './services/schedule'
 import { startFastifyInstance, stopFastifyInstance } from './web/server'

@@ -46,6 +47,7 @@ export async function startPluginsServer(
     let statsJob: schedule.Job | undefined
     let piscina: Piscina | undefined
     let queue: Queue | undefined
+    let retryQueueConsumer: RetryQueueConsumerControl | undefined
     let closeServer: () => Promise<void> | undefined
     let scheduleControl: ScheduleControl | undefined
     let mmdbServer: net.Server | undefined

@@ -73,6 +75,7 @@ export async function startPluginsServer(
         await pubSub?.quit()
         pingJob && schedule.cancelJob(pingJob)
         statsJob && schedule.cancelJob(statsJob)
+        await retryQueueConsumer?.stop()
         await scheduleControl?.stopSchedule()
         await new Promise<void>((resolve, reject) =>
             !mmdbServer

@@ -129,9 +132,12 @@ export async function startPluginsServer(
         }

         scheduleControl = await startSchedule(server, piscina)
+        retryQueueConsumer = await startRetryQueueConsumer(server, piscina)
+
         queue = await startQueue(server, piscina)
         piscina.on('drain', () => {
-            queue?.resume()
+            void queue?.resume()
+            void retryQueueConsumer?.resume()
         })

         // use one extra connection for redis pubsub
@@ -14,9 +14,13 @@ export type WorkerMethods = {
     ingestEvent: (event: PluginEvent) => Promise<IngestEventResponse>
 }

-function pauseQueueIfWorkerFull(queue: Queue | undefined, server: PluginsServer, piscina?: Piscina) {
-    if (queue && (piscina?.queueSize || 0) > (server.WORKER_CONCURRENCY || 4) * (server.WORKER_CONCURRENCY || 4)) {
-        void queue.pause()
+export function pauseQueueIfWorkerFull(
+    pause: undefined | (() => void | Promise<void>),
+    server: PluginsServer,
+    piscina?: Piscina
+): void {
+    if (pause && (piscina?.queueSize || 0) > (server.WORKER_CONCURRENCY || 4) * (server.WORKER_CONCURRENCY || 4)) {
+        void pause()
     }
 }

@@ -81,10 +85,10 @@ function startQueueRedis(server: PluginsServer, piscina: Piscina | undefined, wo
                     ...data,
                 } as PluginEvent)
                 try {
-                    pauseQueueIfWorkerFull(celeryQueue, server, piscina)
+                    pauseQueueIfWorkerFull(() => celeryQueue.pause(), server, piscina)
                     const processedEvent = await workerMethods.processEvent(event)
                     if (processedEvent) {
-                        pauseQueueIfWorkerFull(celeryQueue, server, piscina)
+                        pauseQueueIfWorkerFull(() => celeryQueue.pause(), server, piscina)
                         await workerMethods.ingestEvent(processedEvent)
                     }
                 } catch (e) {
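The backpressure rule above pauses a source once Piscina's backlog exceeds the square of the worker concurrency (default 4, i.e. more than 16 queued tasks). A minimal standalone sketch of just that check, with stubbed server and pool objects (the stub names are illustrative, not from the codebase):

    // Standalone sketch of the pauseQueueIfWorkerFull logic; types are trimmed
    // to the two fields the check actually reads.
    type ServerLike = { WORKER_CONCURRENCY?: number }
    type PiscinaLike = { queueSize: number }

    function pauseQueueIfWorkerFull(
        pause: undefined | (() => void | Promise<void>),
        server: ServerLike,
        piscina?: PiscinaLike
    ): void {
        const concurrency = server.WORKER_CONCURRENCY || 4
        // pause once the backlog exceeds concurrency squared, e.g. 16 for the default of 4
        if (pause && (piscina?.queueSize || 0) > concurrency * concurrency) {
            void pause()
        }
    }

    let paused = false
    const fakeServer: ServerLike = { WORKER_CONCURRENCY: 2 }
    const fakePiscina: PiscinaLike = { queueSize: 5 } // 5 > 2 * 2, so we pause

    pauseQueueIfWorkerFull(() => { paused = true }, fakeServer, fakePiscina)
    console.log(paused) // true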
src/main/retry/fs-queue.ts (new file, 75 lines)

@@ -0,0 +1,75 @@
import { EnqueuedRetry, OnRetryCallback, RetryQueue } from '../../types'
import Timeout = NodeJS.Timeout
import * as fs from 'fs'
import * as path from 'path'

export class FsQueue implements RetryQueue {
    paused: boolean
    started: boolean
    interval: Timeout | null
    filename: string

    constructor(filename?: string) {
        if (process.env.NODE_ENV !== 'test') {
            throw new Error('Can not use FsQueue outside tests')
        }
        this.paused = false
        this.started = false
        this.interval = null
        this.filename = filename || path.join(process.cwd(), 'tmp', 'fs-queue.txt')

        fs.mkdirSync(path.dirname(this.filename), { recursive: true })
        fs.writeFileSync(this.filename, '')
    }

    enqueue(retry: EnqueuedRetry): Promise<void> | void {
        fs.appendFileSync(this.filename, `${JSON.stringify(retry)}\n`)
    }

    quit(): void {
        // nothing to do
    }

    startConsumer(onRetry: OnRetryCallback): void {
        fs.writeFileSync(this.filename, '')
        this.started = true
        this.interval = setInterval(() => {
            if (this.paused) {
                return
            }
            const timestamp = new Date().valueOf()
            const queue = fs
                .readFileSync(this.filename)
                .toString()
                .split('\n')
                .filter((a) => a)
                .map((s) => JSON.parse(s) as EnqueuedRetry)

            const newQueue = queue.filter((element) => element.timestamp < timestamp)
            if (newQueue.length > 0) {
                const oldQueue = queue.filter((element) => element.timestamp >= timestamp)
                fs.writeFileSync(this.filename, `${oldQueue.map((q) => JSON.stringify(q)).join('\n')}\n`)

                void onRetry(newQueue)
            }
        }, 1000)
    }

    stopConsumer(): void {
        this.started = false
        this.interval && clearInterval(this.interval)
        fs.unlinkSync(this.filename)
    }

    pauseConsumer(): void {
        this.paused = true
    }

    isConsumerPaused(): boolean {
        return this.paused
    }

    resumeConsumer(): void {
        this.paused = false
    }
}
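FsQueue polls its backing file once per second and dispatches every retry whose timestamp has passed, rewriting the file with whatever is not yet due. A hedged usage sketch (NODE_ENV must be 'test', since the constructor enforces that; the import path matches the new file above but is otherwise illustrative):

    // Illustrative only: exercises FsQueue's enqueue/consume cycle.
    import { FsQueue } from './src/main/retry/fs-queue'

    process.env.NODE_ENV = 'test' // FsQueue refuses to run outside tests

    const queue = new FsQueue()
    queue.startConsumer((retries) => {
        // called at most once per second, with all retries that are now due
        console.log('due retries:', retries.map((r) => r.type))
        queue.stopConsumer()
    })

    // schedule a retry 1.5 seconds from now (field values are illustrative)
    void queue.enqueue({
        type: 'processEvent',
        payload: { hello: 'world' },
        timestamp: Date.now() + 1500,
        pluginConfigId: 39,
        pluginConfigTeam: 2,
    })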
src/main/retry/graphile-queue.ts (new file, 94 lines)

@@ -0,0 +1,94 @@
import { makeWorkerUtils, run, Runner, WorkerUtils, WorkerUtilsOptions } from 'graphile-worker'

import { EnqueuedRetry, OnRetryCallback, PluginsServer, RetryQueue } from '../../types'

export class GraphileQueue implements RetryQueue {
    pluginsServer: PluginsServer
    started: boolean
    paused: boolean
    onRetry: OnRetryCallback | null
    runner: Runner | null
    workerUtils: WorkerUtils | null

    constructor(pluginsServer: PluginsServer) {
        this.pluginsServer = pluginsServer
        this.started = false
        this.paused = false
        this.onRetry = null
        this.runner = null
        this.workerUtils = null
    }

    async enqueue(retry: EnqueuedRetry): Promise<void> {
        if (!this.workerUtils) {
            this.workerUtils = await makeWorkerUtils(
                this.pluginsServer.RETRY_QUEUE_GRAPHILE_URL
                    ? {
                          connectionString: this.pluginsServer.RETRY_QUEUE_GRAPHILE_URL,
                      }
                    : ({
                          pgPool: this.pluginsServer.postgres,
                      } as WorkerUtilsOptions)
            )
            await this.workerUtils.migrate()
        }
        await this.workerUtils.addJob('retryTask', retry, { runAt: new Date(retry.timestamp), maxAttempts: 1 })
    }

    async quit(): Promise<void> {
        const oldWorkerUtils = this.workerUtils
        this.workerUtils = null
        await oldWorkerUtils?.release()
    }

    async startConsumer(onRetry: OnRetryCallback): Promise<void> {
        this.started = true
        this.onRetry = onRetry
        await this.syncState()
    }

    async stopConsumer(): Promise<void> {
        this.started = false
        await this.syncState()
    }

    async pauseConsumer(): Promise<void> {
        this.paused = true
        await this.syncState()
    }

    isConsumerPaused(): boolean {
        return this.paused
    }

    async resumeConsumer(): Promise<void> {
        this.paused = false
        await this.syncState()
    }

    async syncState(): Promise<void> {
        if (this.started && !this.paused) {
            if (!this.runner) {
                this.runner = await run({
                    connectionString: this.pluginsServer.DATABASE_URL,
                    concurrency: 1,
                    // Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc
                    noHandleSignals: false,
                    pollInterval: 100,
                    // you can set the taskList or taskDirectory but not both
                    taskList: {
                        retryTask: (payload) => {
                            void this.onRetry?.([payload as EnqueuedRetry])
                        },
                    },
                })
            }
        } else {
            if (this.runner) {
                const oldRunner = this.runner
                this.runner = null
                await oldRunner?.stop()
            }
        }
    }
}
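GraphileQueue leans on graphile-worker's own scheduling: addJob(..., { runAt }) persists the job in Postgres, and the runner only hands it to retryTask once runAt has passed, so this class carries no polling logic of its own beyond starting and stopping the runner. A minimal standalone sketch of that primitive, assuming a placeholder connection string:

    import { makeWorkerUtils, run } from 'graphile-worker'

    const connectionString = 'postgres://localhost/retry_demo' // placeholder

    async function demo(): Promise<void> {
        // enqueue a job that becomes runnable 5 seconds from now
        const utils = await makeWorkerUtils({ connectionString })
        await utils.migrate()
        await utils.addJob(
            'retryTask',
            { type: 'processEvent', payload: {} },
            { runAt: new Date(Date.now() + 5000), maxAttempts: 1 } // maxAttempts 1: the plugin server does its own retrying
        )
        await utils.release()

        // a runner polling every 100ms, the same settings GraphileQueue.syncState uses
        const runner = await run({
            connectionString,
            concurrency: 1,
            pollInterval: 100,
            taskList: {
                retryTask: async (payload) => {
                    console.log('retry became due:', payload)
                },
            },
        })
        await runner.promise
    }

    void demo()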
src/main/retry/retry-queue-manager.ts (new file, 80 lines)

@@ -0,0 +1,80 @@
import * as Sentry from '@sentry/node'

import { EnqueuedRetry, OnRetryCallback, PluginsServer, RetryQueue } from '../../types'
import { FsQueue } from './fs-queue'
import { GraphileQueue } from './graphile-queue'

enum QueueType {
    FS = 'fs',
    Graphile = 'graphile',
}

const queues: Record<QueueType, (server: PluginsServer) => RetryQueue> = {
    fs: () => new FsQueue(),
    graphile: (pluginsServer: PluginsServer) => new GraphileQueue(pluginsServer),
}

export class RetryQueueManager implements RetryQueue {
    pluginsServer: PluginsServer
    retryQueues: RetryQueue[]

    constructor(pluginsServer: PluginsServer) {
        this.pluginsServer = pluginsServer

        this.retryQueues = pluginsServer.RETRY_QUEUES.split(',')
            .map((q) => q.trim() as QueueType)
            .filter((q) => !!q)
            .map(
                (queue): RetryQueue => {
                    if (queues[queue]) {
                        return queues[queue](pluginsServer)
                    } else {
                        throw new Error(`Unknown retry queue "${queue}"`)
                    }
                }
            )
    }

    async enqueue(retry: EnqueuedRetry): Promise<void> {
        for (const retryQueue of this.retryQueues) {
            try {
                await retryQueue.enqueue(retry)
                return
            } catch (error) {
                // if one fails, take the next queue
                Sentry.captureException(error, {
                    extra: {
                        retry: JSON.stringify(retry),
                        queue: retryQueue.toString(),
                        queues: this.retryQueues.map((q) => q.toString()),
                    },
                })
            }
        }
        throw new Error('No RetryQueue available')
    }

    async quit(): Promise<void> {
        await Promise.all(this.retryQueues.map((r) => r.quit()))
    }

    async startConsumer(onRetry: OnRetryCallback): Promise<void> {
        await Promise.all(this.retryQueues.map((r) => r.startConsumer(onRetry)))
    }

    async stopConsumer(): Promise<void> {
        await Promise.all(this.retryQueues.map((r) => r.stopConsumer()))
    }

    async pauseConsumer(): Promise<void> {
        await Promise.all(this.retryQueues.map((r) => r.pauseConsumer()))
    }

    isConsumerPaused(): boolean {
        return !!this.retryQueues.find((r) => r.isConsumerPaused())
    }

    async resumeConsumer(): Promise<void> {
        await Promise.all(this.retryQueues.map((r) => r.resumeConsumer()))
    }
}
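RetryQueueManager's enqueue is a fallback chain: the queues are tried in the order given by RETRY_QUEUES, and the first successful enqueue wins. A sketch of just that control flow, with two stubbed queues (the stubs are illustrative, not real engines):

    // Stub queues demonstrating the first-success-wins enqueue loop.
    type Retry = { type: string; timestamp: number }
    type QueueStub = { name: string; enqueue: (r: Retry) => Promise<void> }

    const flaky: QueueStub = {
        name: 'graphile',
        enqueue: async () => {
            throw new Error('postgres unavailable')
        },
    }
    const fallback: QueueStub = {
        name: 'fs',
        enqueue: async (r) => console.log('fs queue accepted', r.type),
    }

    async function enqueueWithFallback(queues: QueueStub[], retry: Retry): Promise<void> {
        for (const queue of queues) {
            try {
                await queue.enqueue(retry)
                return // the first queue that accepts the retry wins
            } catch {
                // swallow and fall through to the next queue,
                // as RetryQueueManager does (it also reports the error to Sentry)
            }
        }
        throw new Error('No RetryQueue available')
    }

    // with RETRY_QUEUES='graphile,fs', the fs queue catches what graphile drops
    void enqueueWithFallback([flaky, fallback], { type: 'processEvent', timestamp: Date.now() })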
src/main/services/redlock.ts (new file, 100 lines)

@@ -0,0 +1,100 @@
import * as Sentry from '@sentry/node'
import Redlock from 'redlock'

import { status } from '../../shared/status'
import { createRedis } from '../../shared/utils'
import { PluginsServer } from '../../types'

type RedlockOptions = {
    server: PluginsServer
    resource: string
    onLock: () => Promise<void> | void
    onUnlock: () => Promise<void> | void
    ttl: number
}

export async function startRedlock({
    server,
    resource,
    onLock,
    onUnlock,
    ttl,
}: RedlockOptions): Promise<() => Promise<void>> {
    status.info('⏰', `Starting redlock "${resource}" ...`)

    let stopped = false
    let weHaveTheLock = false
    let lock: Redlock.Lock
    let lockTimeout: NodeJS.Timeout

    const lockTTL = ttl * 1000 // 60 sec if default passed in
    const retryDelay = lockTTL / 10 // 6 sec
    const extendDelay = lockTTL / 2 // 30 sec

    // use another redis connection for redlock
    const redis = await createRedis(server)

    const redlock = new Redlock([redis], {
        // we handle retries ourselves to have a way to cancel the promises on quit
        // without this, the `await redlock.lock()` code will remain inflight and cause issues
        retryCount: 0,
    })

    redlock.on('clientError', (error) => {
        if (stopped) {
            return
        }
        status.error('🔴', `Redlock "${resource}" client error occurred:\n`, error)
        Sentry.captureException(error, { extra: { resource } })
    })

    const tryToGetTheLock = async () => {
        try {
            lock = await redlock.lock(resource, lockTTL)
            weHaveTheLock = true

            status.info('🔒', `Redlock "${resource}" acquired!`)

            const extendLock = async () => {
                if (stopped) {
                    return
                }
                try {
                    lock = await lock.extend(lockTTL)
                    lockTimeout = setTimeout(extendLock, extendDelay)
                } catch (error) {
                    status.error('🔴', `Redlock cannot extend lock "${resource}":\n`, error)
                    Sentry.captureException(error, { extra: { resource } })
                    weHaveTheLock = false
                    lockTimeout = setTimeout(tryToGetTheLock, 0)
                }
            }

            lockTimeout = setTimeout(extendLock, extendDelay)

            await onLock?.()
        } catch (error) {
            if (stopped) {
                return
            }
            weHaveTheLock = false
            if (error instanceof Redlock.LockError) {
                lockTimeout = setTimeout(tryToGetTheLock, retryDelay)
            } else {
                Sentry.captureException(error, { extra: { resource } })
                status.error('🔴', `Redlock "${resource}" error:\n`, error)
            }
        }
    }

    lockTimeout = setTimeout(tryToGetTheLock, 0)

    return async () => {
        stopped = true
        lockTimeout && clearTimeout(lockTimeout)

        await lock?.unlock().catch(Sentry.captureException)
        await redis.quit()
        await onUnlock?.()
    }
}
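All three timing constants derive from the single ttl option: with the default SCHEDULE_LOCK_TTL of 60 seconds, the lock is taken for 60s, a failed acquisition is retried every 6s (ttl/10), and a held lock is extended every 30s (ttl/2), so extension always happens well before expiry. In numbers (a worked sketch, assuming ttl = 60):

    const ttl = 60 // seconds, e.g. server.SCHEDULE_LOCK_TTL
    const lockTTL = ttl * 1000 // 60_000 ms: how long each acquisition is valid
    const retryDelay = lockTTL / 10 // 6_000 ms between acquisition attempts
    const extendDelay = lockTTL / 2 // 30_000 ms between extensions of a held lock

    // extendDelay < lockTTL, so a healthy holder renews twice per TTL window
    console.log({ lockTTL, retryDelay, extendDelay })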
src/main/services/retry-queue-consumer.ts (new file, 38 lines)

@@ -0,0 +1,38 @@
import Piscina from '@posthog/piscina'

import { status } from '../../shared/status'
import { OnRetryCallback, PluginsServer, RetryQueueConsumerControl } from '../../types'
import { pauseQueueIfWorkerFull } from '../queue'
import { startRedlock } from './redlock'

export const LOCKED_RESOURCE = 'plugin-server:locks:retry-queue-consumer'

export async function startRetryQueueConsumer(
    server: PluginsServer,
    piscina: Piscina
): Promise<RetryQueueConsumerControl> {
    status.info('🔄', 'Starting retry queue consumer, trying to get lock...')

    const onRetry: OnRetryCallback = async (retries) => {
        pauseQueueIfWorkerFull(server.retryQueueManager.pauseConsumer, server, piscina)
        for (const retry of retries) {
            await piscina.runTask({ task: 'retry', args: { retry } })
        }
    }

    const unlock = await startRedlock({
        server,
        resource: LOCKED_RESOURCE,
        onLock: async () => {
            status.info('🔄', 'Retry queue consumer lock acquired')
            await server.retryQueueManager.startConsumer(onRetry)
        },
        onUnlock: async () => {
            status.info('🔄', 'Stopping retry queue consumer')
            await server.retryQueueManager.stopConsumer()
        },
        ttl: server.SCHEDULE_LOCK_TTL,
    })

    return { stop: () => unlock(), resume: () => server.retryQueueManager.resumeConsumer() }
}
@@ -1,12 +1,11 @@
 import Piscina from '@posthog/piscina'
 import * as Sentry from '@sentry/node'
 import * as schedule from 'node-schedule'
-import Redlock from 'redlock'

 import { processError } from '../../shared/error'
 import { status } from '../../shared/status'
-import { createRedis, delay } from '../../shared/utils'
+import { delay } from '../../shared/utils'
 import { PluginConfigId, PluginsServer, ScheduleControl } from '../../types'
+import { startRedlock } from './redlock'

 export const LOCKED_RESOURCE = 'plugin-server:locks:schedule'

@@ -19,74 +18,9 @@ export async function startSchedule(

     let stopped = false
     let weHaveTheLock = false
-    let lock: Redlock.Lock
-    let lockTimeout: NodeJS.Timeout
-
-    const lockTTL = server.SCHEDULE_LOCK_TTL * 1000 // 60 sec
-    const retryDelay = lockTTL / 10 // 6 sec
-    const extendDelay = lockTTL / 2 // 30 sec
-
-    // use another redis connection for redlock
-    const redis = await createRedis(server)
-
-    const redlock = new Redlock([redis], {
-        // we handle retires ourselves to have a way to cancel the promises on quit
-        // without this, the `await redlock.lock()` code will remain inflight and cause issues
-        retryCount: 0,
-    })
-
-    redlock.on('clientError', (error) => {
-        if (stopped) {
-            return
-        }
-        status.error('🔴', 'Redlock client error occurred:\n', error)
-        Sentry.captureException(error)
-    })
-
-    const tryToGetTheLock = async () => {
-        try {
-            lock = await redlock.lock(LOCKED_RESOURCE, lockTTL)
-            weHaveTheLock = true
-
-            status.info('🔒', 'Scheduler lock acquired!')
-
-            const extendLock = async () => {
-                if (stopped) {
-                    return
-                }
-                try {
-                    lock = await lock.extend(lockTTL)
-                    lockTimeout = setTimeout(extendLock, extendDelay)
-                } catch (error) {
-                    status.error('🔴', 'Redlock cannot extend lock:\n', error)
-                    Sentry.captureException(error)
-                    weHaveTheLock = false
-                    lockTimeout = setTimeout(tryToGetTheLock, 0)
-                }
-            }
-
-            lockTimeout = setTimeout(extendLock, extendDelay)
-
-            onLock?.()
-        } catch (error) {
-            if (stopped) {
-                return
-            }
-            weHaveTheLock = false
-            if (error instanceof Redlock.LockError) {
-                lockTimeout = setTimeout(tryToGetTheLock, retryDelay)
-            } else {
-                Sentry.captureException(error)
-                status.error('🔴', 'Redlock error:\n', error)
-            }
-        }
-    }
-
     let pluginSchedulePromise = loadPluginSchedule(piscina)
     server.pluginSchedule = await pluginSchedulePromise

-    lockTimeout = setTimeout(tryToGetTheLock, 0)
-
     const runEveryMinuteJob = schedule.scheduleJob('* * * * *', async () => {
         !stopped &&
             weHaveTheLock &&

@@ -106,15 +40,26 @@ export async function startSchedule(
         runTasksDebounced(server!, piscina!, 'runEveryDay')
     })

+    const unlock = await startRedlock({
+        server,
+        resource: LOCKED_RESOURCE,
+        onLock: () => {
+            weHaveTheLock = true
+            onLock?.()
+        },
+        onUnlock: () => {
+            weHaveTheLock = false
+        },
+        ttl: server.SCHEDULE_LOCK_TTL,
+    })
+
    const stopSchedule = async () => {
         stopped = true
-        lockTimeout && clearTimeout(lockTimeout)
         runEveryDayJob && schedule.cancelJob(runEveryDayJob)
         runEveryHourJob && schedule.cancelJob(runEveryHourJob)
         runEveryMinuteJob && schedule.cancelJob(runEveryMinuteJob)

-        await lock?.unlock().catch(Sentry.captureException)
-        await redis.quit()
+        await unlock()
         await waitForTasksToFinish(server!)
     }
@@ -59,6 +59,8 @@ export function getDefaultConfig(): PluginsServerConfig {
         DISTINCT_ID_LRU_SIZE: 10000,
         INTERNAL_MMDB_SERVER_PORT: 0,
         PLUGIN_SERVER_IDLE: false,
+        RETRY_QUEUES: '',
+        RETRY_QUEUE_GRAPHILE_URL: '',
         ENABLE_PERSISTENT_CONSOLE: false, // TODO: remove when persistent console ships in main repo
         STALENESS_RESTART_SECONDS: 0,
     }

@@ -102,6 +104,8 @@ export function getConfigHelp(): Record<keyof PluginsServerConfig, string> {
         DISTINCT_ID_LRU_SIZE: 'size of persons distinct ID LRU cache',
         INTERNAL_MMDB_SERVER_PORT: 'port of the internal server used for IP location (0 means random)',
         PLUGIN_SERVER_IDLE: 'whether to disengage the plugin server, e.g. for development',
+        RETRY_QUEUES: 'retry queue engine and fallback queues',
+        RETRY_QUEUE_GRAPHILE_URL: 'use a different postgres connection in the graphile retry queue',
         STALENESS_RESTART_SECONDS: 'trigger a restart if no event ingested for this duration',
     }
 }
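RETRY_QUEUES takes a comma-separated, ordered list of engines (graphile, plus fs, which FsQueue restricts to tests), and RETRY_QUEUE_GRAPHILE_URL is only needed when retry jobs should live in a different Postgres than the main database. A hypothetical startup snippet, mirroring how the new tests pass config to startPluginsServer (the connection string is a placeholder):

    import { startPluginsServer } from './src/main/pluginsServer'
    import { makePiscina } from './src/worker/piscina'

    // Hypothetical deployment config: use the graphile (Postgres) retry queue,
    // backed by a dedicated database rather than the main connection pool.
    void startPluginsServer(
        {
            RETRY_QUEUES: 'graphile', // e.g. 'graphile,fs' in tests, where fs acts as a fallback
            RETRY_QUEUE_GRAPHILE_URL: 'postgres://retry-db.example.com/retries', // placeholder URL
        },
        makePiscina
    )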
@@ -10,6 +10,7 @@ import * as path from 'path'
 import { types as pgTypes } from 'pg'
 import { ConnectionOptions } from 'tls'

+import { RetryQueueManager } from '../main/retry/retry-queue-manager'
 import { PluginsServer, PluginsServerConfig } from '../types'
 import { EventsProcessor } from '../worker/ingestion/process-event'
 import { defaultConfig } from './config'

@@ -164,9 +165,11 @@ export async function createServer(

     // :TODO: This is only used on worker threads, not main
     server.eventsProcessor = new EventsProcessor(server as PluginsServer)
+    server.retryQueueManager = new RetryQueueManager(server as PluginsServer)

     const closeServer = async () => {
         server.mmdbUpdateJob?.cancel()
+        await server.retryQueueManager?.quit()
         if (kafkaProducer) {
             clearInterval(kafkaProducer.flushInterval)
             await kafkaProducer.flush()
@@ -455,12 +455,11 @@ export enum NodeEnv {
     Test = 'test',
 }

-export function stringToBoolean(value: any): boolean {
+export function stringToBoolean(value: unknown): boolean {
     if (!value) {
         return false
     }
-    value = String(value)
-    return ['y', 'yes', 't', 'true', 'on', '1'].includes(value.toLowerCase())
+    return ['y', 'yes', 't', 'true', 'on', '1'].includes(String(value).toLowerCase())
 }

 export function determineNodeEnv(): NodeEnv {
src/types.ts (38 lines)

@@ -1,4 +1,3 @@
 import { ReaderModel } from '@maxmind/geoip2-node'
 import ClickHouse from '@posthog/clickhouse'
 import { PluginAttachment, PluginConfigSchema, PluginEvent, Properties } from '@posthog/plugin-scaffold'
 import { Pool as GenericPool } from 'generic-pool'
@@ -6,7 +5,6 @@ import { StatsD } from 'hot-shots'
 import { Redis } from 'ioredis'
 import { Kafka } from 'kafkajs'
 import { DateTime } from 'luxon'
 import { Job } from 'node-schedule'
 import { Pool } from 'pg'
 import { VM } from 'vm2'

@@ -73,6 +71,8 @@ export interface PluginsServerConfig extends Record<string, any> {
     DISTINCT_ID_LRU_SIZE: number
     INTERNAL_MMDB_SERVER_PORT: number
     PLUGIN_SERVER_IDLE: boolean
+    RETRY_QUEUES: string
+    RETRY_QUEUE_GRAPHILE_URL: string
     ENABLE_PERSISTENT_CONSOLE: boolean
     STALENESS_RESTART_SECONDS: number
 }

@@ -94,24 +94,40 @@ export interface PluginsServer extends PluginsServerConfig {
     pluginSchedule: Record<string, PluginConfigId[]> | null
     pluginSchedulePromises: Record<string, Record<PluginConfigId, Promise<any> | null>>
     eventsProcessor: EventsProcessor
+    retryQueueManager: RetryQueue
     // diagnostics
     lastActivity: number
     lastActivityType: string
 }

 export interface Pausable {
-    pause: () => Promise<void>
-    resume: () => void
+    pause: () => Promise<void> | void
+    resume: () => Promise<void> | void
     isPaused: () => boolean
 }

 export interface Queue extends Pausable {
-    start: () => Promise<void>
-    stop: () => Promise<void>
+    start: () => Promise<void> | void
+    stop: () => Promise<void> | void
 }

-export interface Queue {
-    stop: () => Promise<void>
-}
+export type OnRetryCallback = (queue: EnqueuedRetry[]) => Promise<void> | void
+
+export interface EnqueuedRetry {
+    type: string
+    payload: Record<string, any>
+    timestamp: number
+    pluginConfigId: number
+    pluginConfigTeam: number
+}
+
+export interface RetryQueue {
+    startConsumer: (onRetry: OnRetryCallback) => Promise<void> | void
+    stopConsumer: () => Promise<void> | void
+    pauseConsumer: () => Promise<void> | void
+    resumeConsumer: () => Promise<void> | void
+    isConsumerPaused: () => boolean
+    enqueue: (retry: EnqueuedRetry) => Promise<void> | void
+    quit: () => Promise<void> | void
+}

@@ -219,6 +235,7 @@ export interface PluginConfigVMReponse {
         teardownPlugin: () => Promise<void>
         processEvent: (event: PluginEvent) => Promise<PluginEvent>
         processEventBatch: (batch: PluginEvent[]) => Promise<PluginEvent[]>
+        onRetry: (task: string, payload: Record<string, any>) => Promise<void>
     }
     tasks: Record<string, PluginTask>
 }

@@ -409,6 +426,11 @@ export interface ScheduleControl {
     reloadSchedule: () => Promise<void>
 }

+export interface RetryQueueConsumerControl {
+    stop: () => Promise<void>
+    resume: () => Promise<void> | void
+}
+
 export type IngestEventResponse = { success?: boolean; error?: string }

 export interface EventDefinitionType {
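Every retry that travels through a queue is a plain EnqueuedRetry object; note that timestamp is an absolute due time in epoch milliseconds, not a delay. A sketch of one such object (import path and field values are illustrative):

    import { EnqueuedRetry } from './src/types'

    // Roughly what meta.retry('processEvent', event, 60) would enqueue,
    // assuming a plugin config with id 39 on team 2 (illustrative values).
    const retry: EnqueuedRetry = {
        type: 'processEvent', // the plugin's onRetry receives this as its first argument
        payload: { event: 'my event', properties: { hi: 'ha' } },
        timestamp: Date.now() + 60 * 1000, // due one minute from now
        pluginConfigId: 39,
        pluginConfigTeam: 2,
    }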
@@ -1,7 +1,8 @@
 import { PluginEvent } from '@posthog/plugin-scaffold'
+import * as Sentry from '@sentry/node'

 import { processError } from '../../shared/error'
-import { PluginConfig, PluginsServer } from '../../types'
+import { EnqueuedRetry, PluginConfig, PluginsServer } from '../../types'

 export async function runPlugins(server: PluginsServer, event: PluginEvent): Promise<PluginEvent | null> {
     const pluginsToRun = getPluginsForTeam(server, event.team_id)

@@ -115,3 +116,25 @@ export async function runPluginTask(server: PluginsServer, taskName: string, plu
 function getPluginsForTeam(server: PluginsServer, teamId: number): PluginConfig[] {
     return server.pluginConfigsPerTeam.get(teamId) || []
 }
+
+export async function runOnRetry(server: PluginsServer, retry: EnqueuedRetry): Promise<any> {
+    const timer = new Date()
+    let response
+    const pluginConfig = server.pluginConfigs.get(retry.pluginConfigId)
+    if (pluginConfig) {
+        try {
+            const task = await pluginConfig.vm?.getOnRetry()
+            response = await task?.(retry.type, retry.payload)
+        } catch (error) {
+            await processError(server, pluginConfig, error)
+            server.statsd?.increment(`plugin.retry.${retry.type}.${retry.pluginConfigId}.ERROR`)
+        }
+    } else {
+        server.statsd?.increment(`plugin.retry.${retry.type}.${retry.pluginConfigId}.SKIP`)
+        Sentry.captureMessage(`Retrying for plugin config ${retry.pluginConfigId} that does not exist`, {
+            extra: { retry: JSON.stringify(retry) },
+        })
+    }
+    server.statsd?.timing(`plugin.retry.${retry.type}.${retry.pluginConfigId}`, timer)
+    return response
+}
src/worker/vm/extensions/retry.ts (new file, 23 lines)

@@ -0,0 +1,23 @@
import { PluginConfig, PluginsServer } from '../../../types'

const minRetry = process.env.NODE_ENV === 'test' ? 1 : 30

// TODO: add type to scaffold
export function createRetry(
    server: PluginsServer,
    pluginConfig: PluginConfig
): (type: string, payload: any, retry_in?: number) => Promise<void> {
    return async (type: string, payload: any, retry_in = 30) => {
        if (retry_in < minRetry || retry_in > 86400) {
            throw new Error(`Retries must happen between ${minRetry} seconds and 24 hours from now`)
        }
        const timestamp = new Date().valueOf() + retry_in * 1000
        await server.retryQueueManager.enqueue({
            type,
            payload,
            timestamp,
            pluginConfigId: pluginConfig.id,
            pluginConfigTeam: pluginConfig.team_id,
        })
    }
}
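retry_in is a delay in seconds, converted to an absolute due timestamp at enqueue time, and clamped to [30, 86400] outside tests (tests lower the floor to 1 second so they run quickly). A worked sketch of the validation arithmetic; the helper name dueTimestamp is illustrative:

    const minRetry = process.env.NODE_ENV === 'test' ? 1 : 30
    const maxRetry = 86400 // 24 hours, the ceiling createRetry enforces

    function dueTimestamp(retry_in: number, now = Date.now()): number {
        if (retry_in < minRetry || retry_in > maxRetry) {
            throw new Error(`Retries must happen between ${minRetry} seconds and 24 hours from now`)
        }
        return now + retry_in * 1000 // seconds in, epoch milliseconds out
    }

    // e.g. with the default retry_in of 30:
    console.log(dueTimestamp(30) - Date.now()) // 30000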
src/worker/vm/extensions/test-utils.ts (new file, 31 lines)

@@ -0,0 +1,31 @@
import fs from 'fs'
import path from 'path'

const consoleFile = path.join(process.cwd(), 'tmp', 'test-console.txt')

export const writeToFile = {
    console: {
        log: (...args: any[]): void => {
            fs.appendFileSync(consoleFile, `${JSON.stringify(args)}\n`)
        },
        reset(): void {
            fs.mkdirSync(path.join(process.cwd(), 'tmp'), { recursive: true })
            fs.writeFileSync(consoleFile, '')
        },
        read(): any[] {
            try {
                return fs
                    .readFileSync(consoleFile)
                    .toString()
                    .split('\n')
                    .filter((str) => !!str)
                    .map((part) => JSON.parse(part))
            } catch (error) {
                if (error.code === 'ENOENT') {
                    return []
                }
                throw error
            }
        },
    },
}
@@ -7,6 +7,8 @@ import fetch from 'node-fetch'
 import snowflake from 'snowflake-sdk'
 import * as zlib from 'zlib'

+import { writeToFile } from './extensions/test-utils'
+
 export const imports = {
     crypto: crypto,
     zlib: zlib,

@@ -16,4 +18,9 @@ export const imports = {
     '@google-cloud/bigquery': { BigQuery },
     '@posthog/plugin-contrib': contrib,
     'aws-sdk': AWS,
+    ...(process.env.NODE_ENV === 'test'
+        ? {
+              'test-utils/write-to-file': writeToFile,
+          }
+        : {}),
 }
@@ -70,6 +70,10 @@ export class LazyPluginVM {
         return (await this.resolveInternalVm)?.methods.teardownPlugin || null
     }

+    async getOnRetry(): Promise<PluginConfigVMReponse['methods']['onRetry'] | null> {
+        return (await this.resolveInternalVm)?.methods.onRetry || null
+    }
+
     async getTask(name: string): Promise<PluginTask | null> {
         return (await this.resolveInternalVm)?.tasks[name] || null
     }
@@ -7,6 +7,7 @@ import { createConsole } from './extensions/console'
 import { createGeoIp } from './extensions/geoip'
 import { createGoogle } from './extensions/google'
 import { createPosthog } from './extensions/posthog'
+import { createRetry } from './extensions/retry'
 import { createStorage } from './extensions/storage'
 import { imports } from './imports'
 import { transformCode } from './transforms'

@@ -65,6 +66,7 @@ export async function createPluginConfigVM(
             attachments: pluginConfig.attachments,
             storage: createStorage(server, pluginConfig),
             geoip: createGeoIp(server),
+            retry: createRetry(server, pluginConfig),
         },
         '__pluginHostMeta'
     )

@@ -137,6 +139,7 @@ export async function createPluginConfigVM(
                 teardownPlugin: __asyncFunctionGuard(__bindMeta('teardownPlugin')),
                 processEvent: __asyncFunctionGuard(__bindMeta('processEvent')),
                 processEventBatch: __asyncFunctionGuard(__bindMeta('processEventBatch')),
+                onRetry: __asyncFunctionGuard(__bindMeta('onRetry')),
             };

             // gather the runEveryX commands and export in __tasks
@@ -6,7 +6,7 @@ import { status } from '../shared/status'
 import { cloneObject } from '../shared/utils'
 import { PluginsServer, PluginsServerConfig } from '../types'
 import { ingestEvent } from './ingestion/ingest-event'
-import { runPlugins, runPluginsOnBatch, runPluginTask } from './plugins/run'
+import { runOnRetry, runPlugins, runPluginsOnBatch, runPluginTask } from './plugins/run'
 import { loadSchedule, setupPlugins } from './plugins/setup'
 import { teardownPlugins } from './plugins/teardown'

@@ -46,6 +46,9 @@ export const createTaskRunner = (server: PluginsServer): TaskWorker => async ({
         // must clone the object, as we may get from VM2 something like { ..., properties: Proxy {} }
         response = cloneObject(processedEvents as any[])
     }
+    if (task === 'retry') {
+        response = await runOnRetry(server, args.retry)
+    }
     if (task === 'getPluginSchedule') {
         response = cloneObject(server.pluginSchedule)
     }
tests/helpers/graphile.ts (new file, 23 lines)

@@ -0,0 +1,23 @@
import { makeWorkerUtils } from 'graphile-worker'
import { Pool } from 'pg'

import { defaultConfig } from '../../src/shared/config'
import { status } from '../../src/shared/status'

export async function resetGraphileSchema(): Promise<void> {
    const db = new Pool({ connectionString: defaultConfig.DATABASE_URL })

    try {
        await db.query('DROP SCHEMA graphile_worker CASCADE')
    } catch (e) {
        status.error('😱', `Could not drop graphile_worker schema: ${e.message}`)
    } finally {
        await db.end()
    }

    const workerUtils = await makeWorkerUtils({
        connectionString: defaultConfig.DATABASE_URL,
    })
    await workerUtils.migrate()
    await workerUtils.release()
}
@@ -69,7 +69,13 @@ test('setupPlugins and runPlugins', async () => {
     })
     expect(pluginConfig.vm).toBeDefined()
     const vm = await pluginConfig.vm!.resolveInternalVm
-    expect(Object.keys(vm!.methods)).toEqual(['setupPlugin', 'teardownPlugin', 'processEvent', 'processEventBatch'])
+    expect(Object.keys(vm!.methods).sort()).toEqual([
+        'onRetry',
+        'processEvent',
+        'processEventBatch',
+        'setupPlugin',
+        'teardownPlugin',
+    ])

     expect(clearError).toHaveBeenCalledWith(mockServer, pluginConfig)

@@ -122,6 +128,7 @@ test('plugin meta has what it should have', async () => {
         'config',
         'geoip',
         'global',
+        'retry',
         'storage',
     ])
     expect(returnedEvent!.properties!['attachments']).toEqual({
@@ -84,7 +84,7 @@ test('pause and resume queue', async () => {

     expect(await redis.llen(server.PLUGINS_CELERY_QUEUE)).toBe(pluginQueue)

-    queue.resume()
+    await queue.resume()

     await delay(500)
@@ -39,6 +39,7 @@ test('empty plugins', async () => {

     expect(Object.keys(vm).sort()).toEqual(['methods', 'tasks', 'vm'])
     expect(Object.keys(vm.methods).sort()).toEqual([
+        'onRetry',
         'processEvent',
         'processEventBatch',
         'setupPlugin',
tests/retry.test.ts (new file, 104 lines)

@@ -0,0 +1,104 @@
import { startPluginsServer } from '../src/main/pluginsServer'
import { LOCKED_RESOURCE } from '../src/main/services/retry-queue-consumer'
import { createServer } from '../src/shared/server'
import { delay } from '../src/shared/utils'
import { LogLevel } from '../src/types'
import { makePiscina } from '../src/worker/piscina'
import { createPosthog } from '../src/worker/vm/extensions/posthog'
import { imports } from '../src/worker/vm/imports'
import { resetGraphileSchema } from './helpers/graphile'
import { pluginConfig39 } from './helpers/plugins'
import { resetTestDatabase } from './helpers/sql'

jest.mock('../src/shared/sql')
jest.setTimeout(60000) // 60 sec timeout

const { console: testConsole } = imports['test-utils/write-to-file']

describe('retry queues', () => {
    beforeEach(async () => {
        testConsole.reset()

        const [server, stopServer] = await createServer()
        const redis = await server.redisPool.acquire()
        await redis.del(LOCKED_RESOURCE)
        await server.redisPool.release(redis)
        await stopServer()
    })

    describe('fs queue', () => {
        test('onRetry gets called', async () => {
            const testCode = `
                import { console } from 'test-utils/write-to-file'

                export async function onRetry (type, payload, meta) {
                    console.log('retrying event!', type)
                }
                export async function processEvent (event, meta) {
                    if (event.properties?.hi === 'ha') {
                        console.log('processEvent')
                        meta.retry('processEvent', event, 1)
                    }
                    return event
                }
            `
            await resetTestDatabase(testCode)
            const server = await startPluginsServer(
                {
                    WORKER_CONCURRENCY: 2,
                    LOG_LEVEL: LogLevel.Debug,
                    RETRY_QUEUES: 'fs',
                },
                makePiscina
            )
            const posthog = createPosthog(server.server, pluginConfig39)

            posthog.capture('my event', { hi: 'ha' })
            await delay(10000)

            expect(testConsole.read()).toEqual([['processEvent'], ['retrying event!', 'processEvent']])

            await server.stop()
        })
    })

    describe('graphile', () => {
        beforeEach(async () => {
            await resetGraphileSchema()
        })

        test('graphile retry queue', async () => {
            const testCode = `
                import { console } from 'test-utils/write-to-file'

                export async function onRetry (type, payload, meta) {
                    console.log('retrying event!', type)
                }
                export async function processEvent (event, meta) {
                    if (event.properties?.hi === 'ha') {
                        console.log('processEvent')
                        meta.retry('processEvent', event, 1)
                    }
                    return event
                }
            `
            await resetTestDatabase(testCode)
            const server = await startPluginsServer(
                {
                    WORKER_CONCURRENCY: 2,
                    LOG_LEVEL: LogLevel.Debug,
                    RETRY_QUEUES: 'graphile',
                },
                makePiscina
            )
            const posthog = createPosthog(server.server, pluginConfig39)

            posthog.capture('my event', { hi: 'ha' })
            await delay(5000)

            expect(testConsole.read()).toEqual([['processEvent'], ['retrying event!', 'processEvent']])

            await server.stop()
        })
    })
})
yarn.lock (119 lines)

@@ -1200,6 +1200,11 @@
   resolved "https://registry.yarnpkg.com/@google-cloud/promisify/-/promisify-2.0.3.tgz#f934b5cdc939e3c7039ff62b9caaf59a9d89e3a8"
   integrity sha512-d4VSA86eL/AFTe5xtyZX+ePUjE8dIFu2T8zmdeNBSa5/kNgXPCx/o/wbFNHAGLJdGnk1vddRuMESD9HbOC8irw==

+"@graphile/logger@^0.2.0":
+  version "0.2.0"
+  resolved "https://registry.yarnpkg.com/@graphile/logger/-/logger-0.2.0.tgz#e484ec420162157c6e6f0cfb080fa29ef3a714ba"
+  integrity sha512-jjcWBokl9eb1gVJ85QmoaQ73CQ52xAaOCF29ukRbYNl6lY+ts0ErTaDYOBlejcbUs2OpaiqYLO5uDhyLFzWw4w==
+
 "@istanbuljs/load-nyc-config@^1.0.0":
   version "1.1.0"
   resolved "https://registry.yarnpkg.com/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz#fd3db1d59ecf7cf121e80650bb86712f9b55eced"

@@ -1660,6 +1665,11 @@
   resolved "https://registry.yarnpkg.com/@types/cookiejar/-/cookiejar-2.1.2.tgz#66ad9331f63fe8a3d3d9d8c6e3906dd10f6446e8"
   integrity sha512-t73xJJrvdTjXrn4jLS9VSGRbz0nUY3cl2DMGDU48lKl+HR9dbbjW2A9r3g40VA++mQpy6uuHg33gy7du2BKpog==

+"@types/debug@^4.1.2":
+  version "4.1.5"
+  resolved "https://registry.yarnpkg.com/@types/debug/-/debug-4.1.5.tgz#b14efa8852b7768d898906613c23f688713e02cd"
+  integrity sha512-Q1y515GcOdTHgagaVFhHnIFQ38ygs/kmxdNpvpou+raI9UO3YZcHDngBSYKQklcKlvA7iuQlmIKbzvmxcOE9CQ==
+
 "@types/generic-pool@^3.1.9":
   version "3.1.9"
   resolved "https://registry.yarnpkg.com/@types/generic-pool/-/generic-pool-3.1.9.tgz#cc82ee0d92561fce713f8f9a7b2380eda8a89dcb"

@@ -1773,6 +1783,15 @@
   resolved "https://registry.yarnpkg.com/@types/pg-types/-/pg-types-1.11.5.tgz#1eebbe62b6772fcc75c18957a90f933d155e005b"
   integrity sha512-L8ogeT6vDzT1vxlW3KITTCt+BVXXVkLXfZ/XNm6UqbcJgxf+KPO7yjWx7dQQE8RW07KopL10x2gNMs41+IkMGQ==

+"@types/pg@^7.14.3":
+  version "7.14.11"
+  resolved "https://registry.yarnpkg.com/@types/pg/-/pg-7.14.11.tgz#daf5555504a1f7af4263df265d91f140fece52e3"
+  integrity sha512-EnZkZ1OMw9DvNfQkn2MTJrwKmhJYDEs5ujWrPfvseWNoI95N8B4HzU/Ltrq5ZfYxDX/Zg8mTzwr6UAyTjjFvXA==
+  dependencies:
+    "@types/node" "*"
+    pg-protocol "^1.2.0"
+    pg-types "^2.2.0"
+
 "@types/pg@^7.14.6":
   version "7.14.7"
   resolved "https://registry.yarnpkg.com/@types/pg/-/pg-7.14.7.tgz#b25532a424f58e70432ac31c77507dfb7b9349a8"

@@ -2757,6 +2776,15 @@ cliui@^6.0.0:
     strip-ansi "^6.0.0"
     wrap-ansi "^6.2.0"

+cliui@^7.0.2:
+  version "7.0.4"
+  resolved "https://registry.yarnpkg.com/cliui/-/cliui-7.0.4.tgz#a0265ee655476fc807aea9df3df8df7783808b4f"
+  integrity sha512-OcRE68cOsVMXp1Yvonl/fzkQOyjLSu/8bhPDfQt0e0/Eb283TKP20Fs2MqoPsr9SwA595rRCA+QMzYc9nBP+JQ==
+  dependencies:
+    string-width "^4.2.0"
+    strip-ansi "^6.0.0"
+    wrap-ansi "^7.0.0"
+
 cluster-key-slot@^1.1.0:
   version "1.1.0"
   resolved "https://registry.yarnpkg.com/cluster-key-slot/-/cluster-key-slot-1.1.0.tgz#30474b2a981fb12172695833052bc0d01336d10d"

@@ -3953,7 +3981,7 @@ get-caller-file@^1.0.2:
   resolved "https://registry.yarnpkg.com/get-caller-file/-/get-caller-file-1.0.3.tgz#f978fa4c90d1dfe7ff2d6beda2a515e713bdcf4a"
   integrity sha512-3t6rVToeoZfYSGd8YoLFR2DJkiQrIiUrGcjvFX2mDw3bn6k2OtwHN0TNCLbBO+w8qTvimhDkv+LSscbJY1vE6w==

-get-caller-file@^2.0.1:
+get-caller-file@^2.0.1, get-caller-file@^2.0.5:
   version "2.0.5"
   resolved "https://registry.yarnpkg.com/get-caller-file/-/get-caller-file-2.0.5.tgz#4f94412a82db32f36e3b0b9741f8a97feb031f7e"
   integrity sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg==

@@ -4086,6 +4114,21 @@ graceful-fs@^4.1.11, graceful-fs@^4.1.2, graceful-fs@^4.2.4:
   resolved "https://registry.yarnpkg.com/graceful-fs/-/graceful-fs-4.2.4.tgz#2256bde14d3632958c465ebc96dc467ca07a29fb"
   integrity sha512-WjKPNJF79dtJAVniUlGGWHYGz2jWxT6VhN/4m1NdkbZ2nOsEF+cI1Edgql5zCRhs/VsQYRvrXctxktVXZUkixw==

+graphile-worker@^0.11.0:
+  version "0.11.0"
+  resolved "https://registry.yarnpkg.com/graphile-worker/-/graphile-worker-0.11.0.tgz#eb9984440ebff76f0dd1d3d61c03664064ca255d"
+  integrity sha512-t3GHGQnafZEWNgBoPaAs0FcR4n4V9uGIY+OlmgBtVl+3zHwC80yBcjW0/yMuCGhOvdiC7Yw9MYlPXcodlypGYA==
+  dependencies:
+    "@graphile/logger" "^0.2.0"
+    "@types/debug" "^4.1.2"
+    "@types/pg" "^7.14.3"
+    chokidar "^3.4.0"
+    cosmiconfig "^7.0.0"
+    json5 "^2.1.3"
+    pg ">=6.5 <9"
+    tslib "^2.1.0"
+    yargs "^16.2.0"
+
 growly@^1.3.0:
   version "1.3.0"
   resolved "https://registry.yarnpkg.com/growly/-/growly-1.3.0.tgz#f10748cbe76af964b7c96c93c6bcc28af120c081"

@@ -5185,6 +5228,13 @@ json5@^1.0.1:
   dependencies:
     minimist "^1.2.0"

+json5@^2.1.3:
+  version "2.2.0"
+  resolved "https://registry.yarnpkg.com/json5/-/json5-2.2.0.tgz#2dfefe720c6ba525d9ebd909950f0515316c89a3"
+  integrity sha512-f+8cldu7X/y7RAJurMEJmdoKXGB/X550w2Nr3tTbezL6RwEE/iMcm+tZnXeoZtKuOq6ft8+CqzEkrIgx1fPoQA==
+  dependencies:
+    minimist "^1.2.5"
+
 jsonwebtoken@^8.5.1:
   version "8.5.1"
   resolved "https://registry.yarnpkg.com/jsonwebtoken/-/jsonwebtoken-8.5.1.tgz#00e71e0b8df54c2121a1f26137df2280673bcc0d"

@@ -6249,6 +6299,11 @@ pg-connection-string@^2.4.0:
   resolved "https://registry.yarnpkg.com/pg-connection-string/-/pg-connection-string-2.4.0.tgz#c979922eb47832999a204da5dbe1ebf2341b6a10"
   integrity sha512-3iBXuv7XKvxeMrIgym7njT+HlZkwZqqGX4Bu9cci8xHZNT+Um1gWKqCsAzcC0d95rcKMU5WBg6YRUcHyV0HZKQ==

+pg-connection-string@^2.5.0:
+  version "2.5.0"
+  resolved "https://registry.yarnpkg.com/pg-connection-string/-/pg-connection-string-2.5.0.tgz#538cadd0f7e603fc09a12590f3b8a452c2c0cf34"
+  integrity sha512-r5o/V/ORTA6TmUnyWZR9nCj1klXCO2CEKNRlVuJptZe85QuhFayC7WeMic7ndayT5IRIR0S0xFxFi2ousartlQ==
+
 pg-int8@1.0.1:
   version "1.0.1"
   resolved "https://registry.yarnpkg.com/pg-int8/-/pg-int8-1.0.1.tgz#943bd463bf5b71b4170115f80f8efc9a0c0eb78c"

@@ -6259,12 +6314,22 @@ pg-pool@^3.2.2:
   resolved "https://registry.yarnpkg.com/pg-pool/-/pg-pool-3.2.2.tgz#a560e433443ed4ad946b84d774b3f22452694dff"
   integrity sha512-ORJoFxAlmmros8igi608iVEbQNNZlp89diFVx6yV5v+ehmpMY9sK6QgpmgoXbmkNaBAx8cOOZh9g80kJv1ooyA==

+pg-pool@^3.3.0:
+  version "3.3.0"
+  resolved "https://registry.yarnpkg.com/pg-pool/-/pg-pool-3.3.0.tgz#12d5c7f65ea18a6e99ca9811bd18129071e562fc"
+  integrity sha512-0O5huCql8/D6PIRFAlmccjphLYWC+JIzvUhSzXSpGaf+tjTZc4nn+Lr7mLXBbFJfvwbP0ywDv73EiaBsxn7zdg==
+
+pg-protocol@^1.2.0, pg-protocol@^1.5.0:
+  version "1.5.0"
+  resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.5.0.tgz#b5dd452257314565e2d54ab3c132adc46565a6a0"
+  integrity sha512-muRttij7H8TqRNu/DxrAJQITO4Ac7RmX3Klyr/9mJEOBeIpgnF8f9jAfRz5d3XwQZl5qBjF9gLsUtMPJE0vezQ==
+
 pg-protocol@^1.4.0:
   version "1.4.0"
   resolved "https://registry.yarnpkg.com/pg-protocol/-/pg-protocol-1.4.0.tgz#43a71a92f6fe3ac559952555aa3335c8cb4908be"
   integrity sha512-El+aXWcwG/8wuFICMQjM5ZSAm6OWiJicFdNYo+VY3QP+8vI4SvLIWVe51PppTzMhikUJR+PsyIFKqfdXPz/yxA==

-pg-types@^2.1.0:
+pg-types@^2.1.0, pg-types@^2.2.0:
   version "2.2.0"
   resolved "https://registry.yarnpkg.com/pg-types/-/pg-types-2.2.0.tgz#2d0250d636454f7cfa3b6ae0382fdfa8063254a3"
   integrity sha512-qTAAlrEsl8s4OiEQY69wDvcMIdQN6wdz5ojQiOy6YRMuynxenON0O5oCpJI6lshc6scgAY8qvJ2On/p+CXY0GA==

@@ -6275,6 +6340,19 @@ pg-types@^2.1.0:
     postgres-date "~1.0.4"
     postgres-interval "^1.1.0"

+"pg@>=6.5 <9":
+  version "8.6.0"
+  resolved "https://registry.yarnpkg.com/pg/-/pg-8.6.0.tgz#e222296b0b079b280cce106ea991703335487db2"
+  integrity sha512-qNS9u61lqljTDFvmk/N66EeGq3n6Ujzj0FFyNMGQr6XuEv4tgNTXvJQTfJdcvGit5p5/DWPu+wj920hAJFI+QQ==
+  dependencies:
+    buffer-writer "2.0.0"
+    packet-reader "1.0.0"
+    pg-connection-string "^2.5.0"
+    pg-pool "^3.3.0"
+    pg-protocol "^1.5.0"
+    pg-types "^2.1.0"
+    pgpass "1.x"
+
 pg@^8.4.2:
   version "8.5.1"
   resolved "https://registry.yarnpkg.com/pg/-/pg-8.5.1.tgz#34dcb15f6db4a29c702bf5031ef2e1e25a06a120"

@@ -7771,6 +7849,11 @@ tslib@^1.8.1, tslib@^1.9.0, tslib@^1.9.3:
   resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00"
   integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==

+tslib@^2.1.0:
+  version "2.2.0"
+  resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.2.0.tgz#fb2c475977e35e241311ede2693cee1ec6698f5c"
+  integrity sha512-gS9GVHRU+RGn5KQM2rllAlR3dU6m7AcpJKdtH8gFvQiC4Otgk98XnmMU+nZenHt/+VhnBPWwgrJsyrdcw6i23w==
+
 tsutils@^3.17.1:
   version "3.17.1"
   resolved "https://registry.yarnpkg.com/tsutils/-/tsutils-3.17.1.tgz#ed719917f11ca0dee586272b2ac49e015a2dd759"

@@ -8089,6 +8172,15 @@ wrap-ansi@^6.2.0:
     string-width "^4.1.0"
     strip-ansi "^6.0.0"

+wrap-ansi@^7.0.0:
+  version "7.0.0"
+  resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43"
+  integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==
+  dependencies:
+    ansi-styles "^4.0.0"
+    string-width "^4.1.0"
+    strip-ansi "^6.0.0"
+
 wrappy@1:
   version "1.0.2"
   resolved "https://registry.yarnpkg.com/wrappy/-/wrappy-1.0.2.tgz#b5243d8f3ec1aa35f1364605bc0d1036e30ab69f"

@@ -8142,6 +8234,11 @@ y18n@^4.0.0:
   resolved "https://registry.yarnpkg.com/y18n/-/y18n-4.0.1.tgz#8db2b83c31c5d75099bb890b23f3094891e247d4"
   integrity sha512-wNcy4NvjMYL8gogWWYAO7ZFWFfHcbdbE57tZO8e4cbpj8tfUcwrwqSl3ad8HxpYWCdXcJUCeKKZS62Av1affwQ==

+y18n@^5.0.5:
+  version "5.0.8"
+  resolved "https://registry.yarnpkg.com/y18n/-/y18n-5.0.8.tgz#7f4934d0f7ca8c56f95314939ddcd2dd91ce1d55"
+  integrity sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==
+
 yallist@^4.0.0:
   version "4.0.0"
   resolved "https://registry.yarnpkg.com/yallist/-/yallist-4.0.0.tgz#9bb92790d9c0effec63be73519e11a35019a3a72"

@@ -8165,6 +8262,11 @@ yargs-parser@^18.1.2:
     camelcase "^5.0.0"
     decamelize "^1.2.0"

+yargs-parser@^20.2.2:
+  version "20.2.7"
+  resolved "https://registry.yarnpkg.com/yargs-parser/-/yargs-parser-20.2.7.tgz#61df85c113edfb5a7a4e36eb8aa60ef423cbc90a"
+  integrity sha512-FiNkvbeHzB/syOjIUxFDCnhSfzAL8R5vs40MgLFBorXACCOAEaWu0gRZl14vG8MR9AOJIZbmkjhusqBYZ3HTHw==
+
 yargs@^15.4.1:
   version "15.4.1"
   resolved "https://registry.yarnpkg.com/yargs/-/yargs-15.4.1.tgz#0d87a16de01aee9d8bec2bfbf74f67851730f4f8"

@@ -8182,6 +8284,19 @@ yargs@^15.4.1:
     y18n "^4.0.0"
     yargs-parser "^18.1.2"

+yargs@^16.2.0:
+  version "16.2.0"
+  resolved "https://registry.yarnpkg.com/yargs/-/yargs-16.2.0.tgz#1c82bf0f6b6a66eafce7ef30e376f49a12477f66"
+  integrity sha512-D1mvvtDG0L5ft/jGWkLpG1+m0eQxOfaBvTNELraWj22wSVUMWxZUvYgJYcKh6jGGIkJFhH4IZPQhR4TKpc8mBw==
+  dependencies:
+    cliui "^7.0.2"
+    escalade "^3.1.1"
+    get-caller-file "^2.0.5"
+    require-directory "^2.1.1"
+    string-width "^4.2.0"
+    y18n "^5.0.5"
+    yargs-parser "^20.2.2"
+
 yn@3.1.1:
   version "3.1.1"
   resolved "https://registry.yarnpkg.com/yn/-/yn-3.1.1.tgz#1e87401a09d767c1d5eab26a6e4c185182d2eb50"