diff --git a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts index 47db9a4ad93..b4293e2f538 100644 --- a/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts +++ b/plugin-server/src/main/ingestion-queues/batch-processing/each-batch-ingestion.ts @@ -313,7 +313,10 @@ export function splitIngestionBatch( } const pluginEvent = formPipelineEvent(message) const eventKey = computeKey(pluginEvent) - if (overflowMode === IngestionOverflowMode.Reroute && !ConfiguredLimiter.consume(eventKey, 1)) { + if ( + overflowMode === IngestionOverflowMode.Reroute && + !ConfiguredLimiter.consume(eventKey, 1, message.timestamp) + ) { // Local overflow detection triggering, reroute to overflow topic too message.key = null ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc() diff --git a/plugin-server/src/utils/token-bucket.ts b/plugin-server/src/utils/token-bucket.ts index 30f9e1e846e..962383554c4 100644 --- a/plugin-server/src/utils/token-bucket.ts +++ b/plugin-server/src/utils/token-bucket.ts @@ -21,22 +21,20 @@ export class Storage { } replenish(key: string, now?: number): void { - if (typeof now === 'undefined') { - now = Date.now() - } - - if (this.buckets.has(key) === false) { - this.buckets.set(key, [this.bucketCapacity, now]) + const replenish_timestamp: number = now ?? Date.now() + const bucket = this.buckets.get(key) + if (bucket === undefined) { + this.buckets.set(key, [this.bucketCapacity, replenish_timestamp]) return } - // We have checked the key exists already, so this cannot be undefined - const bucket: Bucket = this.buckets.get(key)! - - // replenishRate is per second, but timestamps are in milliseconds - const replenishedTokens = this.replenishRate * ((now - bucket[1]) / 1000) + bucket[0] - bucket[0] = Math.min(replenishedTokens, this.bucketCapacity) - bucket[1] = now + // Replenish the bucket if replenish_timestamp is higher than lastReplenishedTimestamp + const secondsToReplenish = (replenish_timestamp - bucket[1]) / 1000 + if (secondsToReplenish > 0) { + bucket[0] += this.replenishRate * secondsToReplenish + bucket[0] = Math.min(bucket[0], this.bucketCapacity) + bucket[1] = replenish_timestamp + } } consume(key: string, tokens: number): boolean { diff --git a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts index cbb3d06e6a2..462677a9b46 100644 --- a/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/analytics-events-ingestion-consumer.test.ts @@ -4,6 +4,7 @@ import { IngestionOverflowMode, } from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion' import { ConfiguredLimiter } from '../../../src/utils/token-bucket' +import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner' import { captureIngestionWarning } from './../../../src/worker/ingestion/utils' jest.mock('../../../src/utils/status') @@ -12,7 +13,6 @@ jest.mock('./../../../src/worker/ingestion/utils') jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({ runEventPipeline: jest.fn().mockResolvedValue('default value'), })) -import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner' const captureEndpointEvent = { uuid: 'uuid1', @@ -94,14 +94,16 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { }) it('reroutes excess events to OVERFLOW topic', async () => { - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent]) + const now = Date.now() + const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now) const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false) await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Reroute) expect(consume).toHaveBeenCalledWith( captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'], - 1 + 1, + now ) expect(captureIngestionWarning).not.toHaveBeenCalled() expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({ @@ -118,14 +120,16 @@ describe('eachBatchParallelIngestion with overflow reroute', () => { }) it('does not reroute if not over capacity limit', async () => { - const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent]) + const now = Date.now() + const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now) const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true) await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Reroute) expect(consume).toHaveBeenCalledWith( captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'], - 1 + 1, + now ) expect(captureIngestionWarning).not.toHaveBeenCalled() expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled() diff --git a/plugin-server/tests/utils/token-bucket.test.ts b/plugin-server/tests/utils/token-bucket.test.ts index 0b1fe853436..8b12ccdf29d 100644 --- a/plugin-server/tests/utils/token-bucket.test.ts +++ b/plugin-server/tests/utils/token-bucket.test.ts @@ -59,6 +59,10 @@ describe('Storage', () => { expect(storage.buckets.get(key)![0]).toEqual(10) expect(storage.buckets.get(key)![1]).toEqual(now.valueOf()) + // get two tokens to be replenished + storage.consume(key, 2) + expect(storage.buckets.get(key)![0]).toEqual(8) + // 20 seconds would exceed capacity of 10 tokens at 1 token/sec. storage.replenish(key, now.valueOf() + 20000) @@ -66,6 +70,27 @@ describe('Storage', () => { expect(storage.buckets.get(key)![0]).toEqual(10) expect(storage.buckets.get(key)![1]).toEqual(now.valueOf() + 20000) }) + + it('does not add if now is in the past', () => { + const key = 'test' + const storage = new Storage(10, 1) + const now = new Date('2023-02-08T08:00:00') + + storage.replenish(key, now.valueOf()) + expect(storage.buckets.get(key)![0]).toEqual(10) + expect(storage.buckets.get(key)![1]).toEqual(now.valueOf()) + + // get two tokens to be replenished + storage.consume(key, 2) + expect(storage.buckets.get(key)![0]).toEqual(8) + + // Will be a no-op due to a lower now value + storage.replenish(key, now.valueOf() - 20000) + + expect(storage.buckets.has(key)).toEqual(true) + expect(storage.buckets.get(key)![0]).toEqual(8) + expect(storage.buckets.get(key)![1]).toEqual(now.valueOf()) + }) }) describe('consume()', () => {