0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-29 03:04:16 +01:00

shared debounce key for person state

This commit is contained in:
Paul D'Ambra 2024-03-19 10:19:31 +00:00
parent 310a98cadc
commit c0d30028ed
No known key found for this signature in database
2 changed files with 43 additions and 19 deletions

View File

@ -323,19 +323,31 @@ export class PersonState {
return undefined
}
if (isDistinctIdIllegal(mergeIntoDistinctId)) {
await captureIngestionWarning(this.db.kafkaProducer, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: mergeIntoDistinctId,
otherDistinctId: otherPersonDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
teamId,
'cannot_merge_with_illegal_distinct_id',
{
illegalDistinctId: mergeIntoDistinctId,
otherDistinctId: otherPersonDistinctId,
eventUuid: this.event.uuid,
},
(teamId) => `${teamId}:mergeIntoDistinctId`
)
return undefined
}
if (isDistinctIdIllegal(otherPersonDistinctId)) {
await captureIngestionWarning(this.db.kafkaProducer, teamId, 'cannot_merge_with_illegal_distinct_id', {
illegalDistinctId: otherPersonDistinctId,
otherDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
teamId,
'cannot_merge_with_illegal_distinct_id',
{
illegalDistinctId: otherPersonDistinctId,
otherDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
},
(teamId) => `${teamId}:mergeIntoDistinctId`
)
return undefined
}
return promiseRetry(
@ -403,12 +415,17 @@ export class PersonState {
// If merge isn't allowed, we will ignore it, log an ingestion warning and exit
if (!mergeAllowed) {
// TODO: add event UUID to the ingestion warning
await captureIngestionWarning(this.db.kafkaProducer, this.teamId, 'cannot_merge_already_identified', {
sourcePersonDistinctId: otherPersonDistinctId,
targetPersonDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
})
await captureIngestionWarning(
this.db.kafkaProducer,
this.teamId,
'cannot_merge_already_identified',
{
sourcePersonDistinctId: otherPersonDistinctId,
targetPersonDistinctId: mergeIntoDistinctId,
eventUuid: this.event.uuid,
},
(teamId) => `${teamId}:mergeIntoDistinctId`
)
status.warn('🤔', 'refused to merge an already identified user via an $identify or $create_alias call')
return mergeInto // We're returning the original person tied to distinct_id used for the event
}

View File

@ -67,10 +67,17 @@ export async function captureIngestionWarning(
teamId: TeamId,
type: string,
details: Record<string, any>,
debounce_key?: string
/**
* Optional key to debounce the warning. If not provided, the teamId and type will be used as the key.
* If a function is provided, it will be called with the teamId and type and should return a string key.
* If a string is provided it is combined with the teamId and type.
*/
debounceKey?: string | ((teamId: number, type: string) => string)
) {
const limiter_key = `${teamId}:${type}:${debounce_key}`
if (IngestionWarningLimiter.consume(limiter_key, 1)) {
const limiterKey =
typeof debounceKey === 'function' ? debounceKey(teamId, type) : `${teamId}:${type}:${debounceKey}`
if (IngestionWarningLimiter.consume(limiterKey, 1)) {
await kafkaProducer.queueMessage({
topic: KAFKA_INGESTION_WARNINGS,
messages: [