mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-29 03:04:16 +01:00
feat(ingestion): use kafka message ts when rate-limiting to overflow (#18119)
This commit is contained in:
parent
7ff535e3c9
commit
9179e65f88
@ -313,7 +313,10 @@ export function splitIngestionBatch(
|
||||
}
|
||||
const pluginEvent = formPipelineEvent(message)
|
||||
const eventKey = computeKey(pluginEvent)
|
||||
if (overflowMode === IngestionOverflowMode.Reroute && !ConfiguredLimiter.consume(eventKey, 1)) {
|
||||
if (
|
||||
overflowMode === IngestionOverflowMode.Reroute &&
|
||||
!ConfiguredLimiter.consume(eventKey, 1, message.timestamp)
|
||||
) {
|
||||
// Local overflow detection triggering, reroute to overflow topic too
|
||||
message.key = null
|
||||
ingestionPartitionKeyOverflowed.labels(`${pluginEvent.team_id ?? pluginEvent.token}`).inc()
|
||||
|
@ -21,22 +21,20 @@ export class Storage {
|
||||
}
|
||||
|
||||
replenish(key: string, now?: number): void {
|
||||
if (typeof now === 'undefined') {
|
||||
now = Date.now()
|
||||
}
|
||||
|
||||
if (this.buckets.has(key) === false) {
|
||||
this.buckets.set(key, [this.bucketCapacity, now])
|
||||
const replenish_timestamp: number = now ?? Date.now()
|
||||
const bucket = this.buckets.get(key)
|
||||
if (bucket === undefined) {
|
||||
this.buckets.set(key, [this.bucketCapacity, replenish_timestamp])
|
||||
return
|
||||
}
|
||||
|
||||
// We have checked the key exists already, so this cannot be undefined
|
||||
const bucket: Bucket = this.buckets.get(key)!
|
||||
|
||||
// replenishRate is per second, but timestamps are in milliseconds
|
||||
const replenishedTokens = this.replenishRate * ((now - bucket[1]) / 1000) + bucket[0]
|
||||
bucket[0] = Math.min(replenishedTokens, this.bucketCapacity)
|
||||
bucket[1] = now
|
||||
// Replenish the bucket if replenish_timestamp is higher than lastReplenishedTimestamp
|
||||
const secondsToReplenish = (replenish_timestamp - bucket[1]) / 1000
|
||||
if (secondsToReplenish > 0) {
|
||||
bucket[0] += this.replenishRate * secondsToReplenish
|
||||
bucket[0] = Math.min(bucket[0], this.bucketCapacity)
|
||||
bucket[1] = replenish_timestamp
|
||||
}
|
||||
}
|
||||
|
||||
consume(key: string, tokens: number): boolean {
|
||||
|
@ -4,6 +4,7 @@ import {
|
||||
IngestionOverflowMode,
|
||||
} from '../../../src/main/ingestion-queues/batch-processing/each-batch-ingestion'
|
||||
import { ConfiguredLimiter } from '../../../src/utils/token-bucket'
|
||||
import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner'
|
||||
import { captureIngestionWarning } from './../../../src/worker/ingestion/utils'
|
||||
|
||||
jest.mock('../../../src/utils/status')
|
||||
@ -12,7 +13,6 @@ jest.mock('./../../../src/worker/ingestion/utils')
|
||||
jest.mock('./../../../src/worker/ingestion/event-pipeline/runner', () => ({
|
||||
runEventPipeline: jest.fn().mockResolvedValue('default value'),
|
||||
}))
|
||||
import { runEventPipeline } from './../../../src/worker/ingestion/event-pipeline/runner'
|
||||
|
||||
const captureEndpointEvent = {
|
||||
uuid: 'uuid1',
|
||||
@ -94,14 +94,16 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
|
||||
})
|
||||
|
||||
it('reroutes excess events to OVERFLOW topic', async () => {
|
||||
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent])
|
||||
const now = Date.now()
|
||||
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now)
|
||||
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => false)
|
||||
|
||||
await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Reroute)
|
||||
|
||||
expect(consume).toHaveBeenCalledWith(
|
||||
captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'],
|
||||
1
|
||||
1,
|
||||
now
|
||||
)
|
||||
expect(captureIngestionWarning).not.toHaveBeenCalled()
|
||||
expect(queue.pluginsServer.kafkaProducer.produce).toHaveBeenCalledWith({
|
||||
@ -118,14 +120,16 @@ describe('eachBatchParallelIngestion with overflow reroute', () => {
|
||||
})
|
||||
|
||||
it('does not reroute if not over capacity limit', async () => {
|
||||
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent])
|
||||
const now = Date.now()
|
||||
const batch = createBatchWithMultipleEventsWithKeys([captureEndpointEvent], now)
|
||||
const consume = jest.spyOn(ConfiguredLimiter, 'consume').mockImplementation(() => true)
|
||||
|
||||
await eachBatchParallelIngestion(batch, queue, IngestionOverflowMode.Reroute)
|
||||
|
||||
expect(consume).toHaveBeenCalledWith(
|
||||
captureEndpointEvent['team_id'] + ':' + captureEndpointEvent['distinct_id'],
|
||||
1
|
||||
1,
|
||||
now
|
||||
)
|
||||
expect(captureIngestionWarning).not.toHaveBeenCalled()
|
||||
expect(queue.pluginsServer.kafkaProducer.produce).not.toHaveBeenCalled()
|
||||
|
@ -59,6 +59,10 @@ describe('Storage', () => {
|
||||
expect(storage.buckets.get(key)![0]).toEqual(10)
|
||||
expect(storage.buckets.get(key)![1]).toEqual(now.valueOf())
|
||||
|
||||
// get two tokens to be replenished
|
||||
storage.consume(key, 2)
|
||||
expect(storage.buckets.get(key)![0]).toEqual(8)
|
||||
|
||||
// 20 seconds would exceed capacity of 10 tokens at 1 token/sec.
|
||||
storage.replenish(key, now.valueOf() + 20000)
|
||||
|
||||
@ -66,6 +70,27 @@ describe('Storage', () => {
|
||||
expect(storage.buckets.get(key)![0]).toEqual(10)
|
||||
expect(storage.buckets.get(key)![1]).toEqual(now.valueOf() + 20000)
|
||||
})
|
||||
|
||||
it('does not add if now is in the past', () => {
|
||||
const key = 'test'
|
||||
const storage = new Storage(10, 1)
|
||||
const now = new Date('2023-02-08T08:00:00')
|
||||
|
||||
storage.replenish(key, now.valueOf())
|
||||
expect(storage.buckets.get(key)![0]).toEqual(10)
|
||||
expect(storage.buckets.get(key)![1]).toEqual(now.valueOf())
|
||||
|
||||
// get two tokens to be replenished
|
||||
storage.consume(key, 2)
|
||||
expect(storage.buckets.get(key)![0]).toEqual(8)
|
||||
|
||||
// Will be a no-op due to a lower now value
|
||||
storage.replenish(key, now.valueOf() - 20000)
|
||||
|
||||
expect(storage.buckets.has(key)).toEqual(true)
|
||||
expect(storage.buckets.get(key)![0]).toEqual(8)
|
||||
expect(storage.buckets.get(key)![1]).toEqual(now.valueOf())
|
||||
})
|
||||
})
|
||||
|
||||
describe('consume()', () => {
|
||||
|
Loading…
Reference in New Issue
Block a user