0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-12-01 12:21:02 +01:00

fix(exports): hopefully fix onevent stalling by catching job producing errors (#17087)

This commit is contained in:
Xavier Vello 2023-08-18 14:26:29 +02:00 committed by GitHub
parent 9ff3e95aca
commit 938f45beb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1,7 +1,9 @@
import { Plugin, PluginEvent, PluginMeta, ProcessedPluginEvent, RetryError } from '@posthog/plugin-scaffold'
import * as Sentry from '@sentry/node'
import { Counter } from 'prom-client'
import { Hub, PluginConfig, PluginConfigVMInternalResponse, PluginTaskType } from '../../../types'
import { MessageSizeTooLarge } from '../../../utils/db/error'
import { isTestEnv } from '../../../utils/env-utils'
import { status } from '../../../utils/status'
import { stringClamp } from '../../../utils/utils'
@ -9,7 +11,7 @@ import { ExportEventsBuffer } from './utils/export-events-buffer'
export const MAXIMUM_RETRIES = 3
const EXPORT_BUFFER_BYTES_MINIMUM = 1
const EXPORT_BUFFER_BYTES_DEFAULT = 1024 * 1024
const EXPORT_BUFFER_BYTES_DEFAULT = 900 * 1024 // 900 KiB
const EXPORT_BUFFER_BYTES_MAXIMUM = 100 * 1024 * 1024
const EXPORT_BUFFER_SECONDS_MINIMUM = 1
const EXPORT_BUFFER_SECONDS_MAXIMUM = 600
@ -110,10 +112,30 @@ export function upgradeExportEvents(
if (err instanceof RetryError) {
if (payload.retriesPerformedSoFar < MAXIMUM_RETRIES) {
const nextRetrySeconds = 2 ** (payload.retriesPerformedSoFar + 1) * 3
await meta.jobs
.exportEventsWithRetry({ ...payload, retriesPerformedSoFar: payload.retriesPerformedSoFar + 1 })
.runIn(nextRetrySeconds, 'seconds')
try {
await meta.jobs
.exportEventsWithRetry({
...payload,
retriesPerformedSoFar: payload.retriesPerformedSoFar + 1,
})
.runIn(nextRetrySeconds, 'seconds')
} catch (error) {
if (error instanceof MessageSizeTooLarge) {
Sentry.captureException(error, {
tags: {
team_id: pluginConfig.team_id,
plugin_config_id: pluginConfig.id,
},
})
status.error('💥', 'Export buffer too big to fit in a job, dropping it.', {
error,
team_id: pluginConfig.team_id,
plugin_config_id: pluginConfig.id,
})
} else {
throw error
}
}
status.info(
'🚃',
`Enqueued PluginConfig ${pluginConfig.id} (plugin=${pluginConfig.plugin_id}, team=${