From c0d30028edbc53e61d2b16737e5621d2b5360942 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 19 Mar 2024 10:19:31 +0000 Subject: [PATCH] shared debounce key for person state --- .../src/worker/ingestion/person-state.ts | 49 +++++++++++++------ plugin-server/src/worker/ingestion/utils.ts | 13 +++-- 2 files changed, 43 insertions(+), 19 deletions(-) diff --git a/plugin-server/src/worker/ingestion/person-state.ts b/plugin-server/src/worker/ingestion/person-state.ts index 53daf093ef9..61b0c42ce36 100644 --- a/plugin-server/src/worker/ingestion/person-state.ts +++ b/plugin-server/src/worker/ingestion/person-state.ts @@ -323,19 +323,31 @@ export class PersonState { return undefined } if (isDistinctIdIllegal(mergeIntoDistinctId)) { - await captureIngestionWarning(this.db.kafkaProducer, teamId, 'cannot_merge_with_illegal_distinct_id', { - illegalDistinctId: mergeIntoDistinctId, - otherDistinctId: otherPersonDistinctId, - eventUuid: this.event.uuid, - }) + await captureIngestionWarning( + this.db.kafkaProducer, + teamId, + 'cannot_merge_with_illegal_distinct_id', + { + illegalDistinctId: mergeIntoDistinctId, + otherDistinctId: otherPersonDistinctId, + eventUuid: this.event.uuid, + }, + (teamId) => `${teamId}:mergeIntoDistinctId` + ) return undefined } if (isDistinctIdIllegal(otherPersonDistinctId)) { - await captureIngestionWarning(this.db.kafkaProducer, teamId, 'cannot_merge_with_illegal_distinct_id', { - illegalDistinctId: otherPersonDistinctId, - otherDistinctId: mergeIntoDistinctId, - eventUuid: this.event.uuid, - }) + await captureIngestionWarning( + this.db.kafkaProducer, + teamId, + 'cannot_merge_with_illegal_distinct_id', + { + illegalDistinctId: otherPersonDistinctId, + otherDistinctId: mergeIntoDistinctId, + eventUuid: this.event.uuid, + }, + (teamId) => `${teamId}:mergeIntoDistinctId` + ) return undefined } return promiseRetry( @@ -403,12 +415,17 @@ export class PersonState { // If merge isn't allowed, we will ignore it, log an ingestion warning and exit if (!mergeAllowed) { - // TODO: add event UUID to the ingestion warning - await captureIngestionWarning(this.db.kafkaProducer, this.teamId, 'cannot_merge_already_identified', { - sourcePersonDistinctId: otherPersonDistinctId, - targetPersonDistinctId: mergeIntoDistinctId, - eventUuid: this.event.uuid, - }) + await captureIngestionWarning( + this.db.kafkaProducer, + this.teamId, + 'cannot_merge_already_identified', + { + sourcePersonDistinctId: otherPersonDistinctId, + targetPersonDistinctId: mergeIntoDistinctId, + eventUuid: this.event.uuid, + }, + (teamId) => `${teamId}:mergeIntoDistinctId` + ) status.warn('🤔', 'refused to merge an already identified user via an $identify or $create_alias call') return mergeInto // We're returning the original person tied to distinct_id used for the event } diff --git a/plugin-server/src/worker/ingestion/utils.ts b/plugin-server/src/worker/ingestion/utils.ts index 0ac139dfb13..66b7935400b 100644 --- a/plugin-server/src/worker/ingestion/utils.ts +++ b/plugin-server/src/worker/ingestion/utils.ts @@ -67,10 +67,17 @@ export async function captureIngestionWarning( teamId: TeamId, type: string, details: Record, - debounce_key?: string + /** + * Optional key to debounce the warning. If not provided, the teamId and type will be used as the key. + * If a function is provided, it will be called with the teamId and type and should return a string key. + * If a string is provided it is combined with the teamId and type. + */ + debounceKey?: string | ((teamId: number, type: string) => string) ) { - const limiter_key = `${teamId}:${type}:${debounce_key}` - if (IngestionWarningLimiter.consume(limiter_key, 1)) { + const limiterKey = + typeof debounceKey === 'function' ? debounceKey(teamId, type) : `${teamId}:${type}:${debounceKey}` + + if (IngestionWarningLimiter.consume(limiterKey, 1)) { await kafkaProducer.queueMessage({ topic: KAFKA_INGESTION_WARNINGS, messages: [