
chore(plugin-server): use librdkafka producer everywhere (#15314)

* chore(plugin-server): use librdkafka producer everywhere

We saw some 10x improvements in throughput for session recordings.
Hopefully there will be more improvements here as well, although those are a
little less clear-cut.

I don't try to provide any improvements in guarantees around message
production here.

* we still need to enable snappy for kafkajs
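
The heart of the change is a small helper that hides node-rdkafka's callback-based
HighLevelProducer behind a promise, so callers can simply await delivery. The sketch
below consolidates the `produce` helper from the src/kafka/producer.ts diff further
down; the signature matches what the diff shows, but the surrounding module details
are assumptions.

import { HighLevelProducer } from 'node-rdkafka-acosom'

// Resolve once librdkafka acknowledges delivery of this single message.
export const produce = async ({
    producer,
    topic,
    value,
    key,
}: {
    producer: HighLevelProducer
    topic: string
    value: Buffer | null
    key: Buffer | null
}): Promise<number | null | undefined> =>
    await new Promise((resolve, reject) =>
        // HighLevelProducer.produce(topic, partition, message, key, timestamp, cb)
        // calls back with (error, offset) once the broker has acked the write.
        producer.produce(topic, null, value, key, Date.now(), (error: any, offset: number | null | undefined) =>
            error ? reject(error) : resolve(offset)
        )
    )

Call sites then switch from positional arguments to `produce({ producer, topic, value, key })`,
which is the mechanical change visible in most of the files below.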
Harry Waye 2023-05-04 14:02:44 +01:00 committed by GitHub
parent 6c68566ca2
commit 2f9e2928fe
16 changed files with 184 additions and 662 deletions

View File

@ -275,8 +275,8 @@
"prettier --write"
],
"(plugin-server/**).{js,jsx,mjs,ts,tsx}": [
"eslint -c plugin-server/.eslintrc.js --fix",
"prettier --write"
"pnpm --dir plugin-server exec eslint --fix",
"pnpm --dir plugin-server exec prettier --write"
],
"!(posthog/hogql/grammar/*)*.{py,pyi}": [
"black",

View File

@ -3,6 +3,7 @@ import SnappyCodec from 'kafkajs-snappy'
import { HighLevelProducer } from 'node-rdkafka-acosom'
import { defaultConfig } from '../src/config/config'
import { produce as defaultProduce } from '../src/kafka/producer'
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
@ -23,6 +24,7 @@ afterAll(async () => {
export async function createKafkaProducer() {
producer = new HighLevelProducer({
'metadata.broker.list': defaultConfig.KAFKA_HOSTS,
'linger.ms': 0,
})
await new Promise((resolve, reject) =>
@ -36,22 +38,5 @@ export async function createKafkaProducer() {
export async function produce({ topic, message, key }: { topic: string; message: Buffer | null; key: string }) {
producer = producer ?? (await createKafkaProducer())
await new Promise((resolve, reject) =>
producer.produce(topic, undefined, message, Buffer.from(key), Date.now(), (err, offset) => {
if (err) {
reject(err)
} else {
resolve(offset)
}
})
)
await new Promise((resolve, reject) =>
producer.flush(10000, (err) => {
if (err) {
reject(err)
} else {
resolve(null)
}
})
)
await defaultProduce({ producer, topic, value: message, key: Buffer.from(key) })
}

View File

@ -176,7 +176,7 @@ test.concurrent(`recording events not ingested to ClickHouse if team is opted ou
})
// NOTE: we're assuming that we have a single partition for the Kafka topic,
// and that the consumer produces messages in the order they are consumed.
// and that the consumer produceAndFlushs messages in the order they are consumed.
// TODO: add some side-effect we can assert on rather than relying on the
// partitioning / ordering setup e.g. an ingestion warning.
const events = await fetchSessionRecordingsEvents(teamOptedOutId, uuidOptedOut)

View File

@ -59,12 +59,17 @@ export const createKafkaProducer = async (config: ProducerGlobalConfig) => {
return producer
}
export const produce = async (
producer: RdKafkaProducer,
topic: string,
value: Buffer | null,
export const produce = async ({
producer,
topic,
value,
key,
}: {
producer: RdKafkaProducer
topic: string
value: Buffer | null
key: Buffer | null
): Promise<number | null | undefined> => {
}): Promise<number | null | undefined> => {
status.debug('📤', 'Producing message', { topic: topic })
return await new Promise((resolve, reject) =>
producer.produce(topic, null, value, key, Date.now(), (error: any, offset: NumberNullUndefined) => {

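The KafkaProducerWrapper further down also imports `flushProducer` and `disconnectProducer`
from this module. Their bodies are not part of this diff, so the following is only a guess
at their shape, modelled on the callback-style flush the test helper above used to do inline:

import { HighLevelProducer } from 'node-rdkafka-acosom'

// Assumed implementations: thin promise wrappers over librdkafka's callback APIs.
export const flushProducer = async (producer: HighLevelProducer) =>
    await new Promise((resolve, reject) =>
        producer.flush(10000, (error) => (error ? reject(error) : resolve(null)))
    )

export const disconnectProducer = async (producer: HighLevelProducer) =>
    await new Promise((resolve, reject) =>
        producer.disconnect((error, data) => (error ? reject(error) : resolve(data)))
    )
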
View File

@ -52,7 +52,7 @@ export async function runScheduledTasks(
if (server.USE_KAFKA_FOR_SCHEDULED_TASKS) {
for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) {
status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId })
await server.kafkaProducer.producer.send({
await server.kafkaProducer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS,
messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }],
})

View File

@ -1,6 +1,7 @@
import { StatsD } from 'hot-shots'
import { EachBatchHandler, Kafka, Producer } from 'kafkajs'
import { EachBatchHandler, Kafka } from 'kafkajs'
import { Counter } from 'prom-client'
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'
import { KAFKA_JOBS, KAFKA_JOBS_DLQ } from '../../config/kafka-topics'
import { EnqueuedPluginJob, JobName } from '../../types'
@ -26,7 +27,7 @@ export const startJobsConsumer = async ({
statsd,
}: {
kafka: Kafka
producer: Producer // NOTE: not using KafkaProducerWrapper here to avoid buffering logic
producer: KafkaProducerWrapper
graphileWorker: GraphileWorker
statsd?: StatsD
}) => {
@ -49,7 +50,10 @@ export const startJobsConsumer = async ({
value: message.value,
})
// TODO: handle resolving offsets asynchronously
await producer.send({ topic: KAFKA_JOBS_DLQ, messages: [message] })
await producer.queueMessage({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
})
resolveOffset(message.offset)
continue
}
@ -63,7 +67,10 @@ export const startJobsConsumer = async ({
error,
})
// TODO: handle resolving offsets asynchronously
await producer.send({ topic: KAFKA_JOBS_DLQ, messages: [message] })
await producer.queueMessage({
topic: KAFKA_JOBS_DLQ,
messages: [{ value: message.value, key: message.key }],
})
resolveOffset(message.offset)
continue
}
@ -111,5 +118,10 @@ export const startJobsConsumer = async ({
},
})
return consumer
return {
...consumer,
stop: async () => {
await consumer.stop()
},
}
}

View File

@ -1,5 +1,6 @@
import { StatsD } from 'hot-shots'
import { Batch, EachBatchHandler, Kafka, Producer } from 'kafkajs'
import { Batch, EachBatchHandler, Kafka } from 'kafkajs'
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'
import { KAFKA_SCHEDULED_TASKS, KAFKA_SCHEDULED_TASKS_DLQ } from '../../config/kafka-topics'
import { DependencyUnavailableError } from '../../utils/db/error'
@ -15,14 +16,14 @@ const taskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const
export const startScheduledTasksConsumer = async ({
kafka,
piscina,
producer,
piscina,
partitionConcurrency = 3,
statsd,
}: {
kafka: Kafka
producer: KafkaProducerWrapper
piscina: Piscina
producer: Producer // NOTE: not using KafkaProducerWrapper here to avoid buffering logic
partitionConcurrency: number
statsd?: StatsD
}) => {
@ -134,10 +135,15 @@ export const startScheduledTasksConsumer = async ({
},
})
return consumer
return {
...consumer,
stop: async () => {
await consumer.stop()
},
}
}
const getTasksFromBatch = async (batch: Batch, producer: Producer) => {
const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) => {
// In any one batch, we only want to run one task per plugin config id.
// Hence here we dedupe the tasks by plugin config id and task type.
const tasksbyTypeAndPluginConfigId = {} as Record<
@ -153,7 +159,10 @@ const getTasksFromBatch = async (batch: Batch, producer: Producer) => {
status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, {
value: message.value,
})
await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
})
continue
}
@ -168,13 +177,19 @@ const getTasksFromBatch = async (batch: Batch, producer: Producer) => {
status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, {
error: error.stack ?? error,
})
await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
})
continue
}
if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) {
status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task)
await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] })
await producer.queueMessage({
topic: KAFKA_SCHEDULED_TASKS_DLQ,
messages: [{ value: message.value, key: message.key }],
})
continue
}

View File

@ -164,12 +164,12 @@ const eachMessage =
partition: message.partition,
})
return [
produce(
produce({
producer,
KAFKA_SESSION_RECORDING_EVENTS_DLQ,
message.value,
message.key ? Buffer.from(message.key) : null
),
topic: KAFKA_SESSION_RECORDING_EVENTS_DLQ,
value: message.value,
key: message.key ? Buffer.from(message.key) : null,
}),
]
}
@ -191,12 +191,12 @@ const eachMessage =
partition: message.partition,
})
return [
produce(
produce({
producer,
KAFKA_SESSION_RECORDING_EVENTS_DLQ,
message.value,
message.key ? Buffer.from(message.key) : null
),
topic: KAFKA_SESSION_RECORDING_EVENTS_DLQ,
value: message.value,
key: message.key ? Buffer.from(message.key) : null,
}),
]
}
@ -253,12 +253,12 @@ const eachMessage =
)
return [
produce(
produce({
producer,
KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
Buffer.from(JSON.stringify(clickHouseRecord)),
message.key ? Buffer.from(message.key) : null
),
topic: KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS,
value: Buffer.from(JSON.stringify(clickHouseRecord)),
key: message.key ? Buffer.from(message.key) : null,
}),
]
} else if (event.event === '$performance_event') {
const clickHouseRecord = createPerformanceEvent(
@ -269,12 +269,12 @@ const eachMessage =
)
return [
produce(
produce({
producer,
KAFKA_PERFORMANCE_EVENTS,
Buffer.from(JSON.stringify(clickHouseRecord)),
message.key ? Buffer.from(message.key) : null
),
topic: KAFKA_PERFORMANCE_EVENTS,
value: Buffer.from(JSON.stringify(clickHouseRecord)),
key: message.key ? Buffer.from(message.key) : null,
}),
]
} else {
status.warn('⚠️', 'invalid_message', {

View File

@ -1,6 +1,9 @@
import * as Sentry from '@sentry/node'
import { Server } from 'http'
import { Consumer, KafkaJSProtocolError } from 'kafkajs'
import { CompressionCodecs, CompressionTypes } from 'kafkajs'
// @ts-expect-error no type definitions
import SnappyCodec from 'kafkajs-snappy'
import * as schedule from 'node-schedule'
import { Counter } from 'prom-client'
@ -32,6 +35,8 @@ import { startSessionRecordingEventsConsumer } from './ingestion-queues/session-
import { createHttpServer } from './services/http-server'
import { getObjectStorage } from './services/object_storage'
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
const { version } = require('../../package.json')
// TODO: refactor this into a class, removing the need for many different Servers
@ -242,8 +247,8 @@ export async function startPluginsServer(
if (capabilities.pluginScheduledTasks) {
schedulerTasksConsumer = await startScheduledTasksConsumer({
piscina: piscina,
producer: hub.kafkaProducer,
kafka: hub.kafka,
producer: hub.kafkaProducer.producer,
partitionConcurrency: serverConfig.KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY,
statsd: hub.statsd,
})
@ -252,7 +257,7 @@ export async function startPluginsServer(
if (capabilities.processPluginJobs) {
jobsConsumer = await startJobsConsumer({
kafka: hub.kafka,
producer: hub.kafkaProducer.producer,
producer: hub.kafkaProducer,
graphileWorker: graphileWorker,
statsd: hub.statsd,
})

View File

@ -4,7 +4,7 @@ import * as fs from 'fs'
import { createPool } from 'generic-pool'
import { StatsD } from 'hot-shots'
import Redis from 'ioredis'
import { Kafka, KafkaJSError, Partitioners, SASLOptions } from 'kafkajs'
import { Kafka, SASLOptions } from 'kafkajs'
import { DateTime } from 'luxon'
import { hostname } from 'os'
import * as path from 'path'
@ -15,6 +15,8 @@ import { getPluginServerCapabilities } from '../../capabilities'
import { defaultConfig } from '../../config/config'
import { KAFKAJS_LOG_LEVEL_MAPPING } from '../../config/constants'
import { KAFKA_JOBS } from '../../config/kafka-topics'
import { createRdConnectionConfigFromEnvVars } from '../../kafka/config'
import { createKafkaProducer } from '../../kafka/producer'
import { getObjectStorage } from '../../main/services/object_storage'
import {
EnqueuedPluginJob,
@ -39,7 +41,6 @@ import { PluginsApiKeyManager } from './../../worker/vm/extensions/helpers/api-k
import { RootAccessManager } from './../../worker/vm/extensions/helpers/root-acess-manager'
import { PromiseManager } from './../../worker/vm/promise-manager'
import { DB } from './db'
import { DependencyUnavailableError } from './error'
import { KafkaProducerWrapper } from './kafka-producer-wrapper'
// `node-postgres` would return dates as plain JS Date objects, which would use the local timezone.
@ -130,13 +131,10 @@ export async function createHub(
const kafka = createKafkaClient(serverConfig as KafkaConfig)
const producer = kafka.producer({
retry: { retries: 10, initialRetryTime: 1000, maxRetryTime: 30 },
createPartitioner: Partitioners.LegacyPartitioner,
})
await producer.connect()
const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig as KafkaConfig)
const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 })
const kafkaProducer = new KafkaProducerWrapper(producer, statsd, serverConfig)
const kafkaProducer = new KafkaProducerWrapper(producer)
status.info('👍', `Kafka ready`)
status.info('🤔', `Connecting to Postgresql...`)
@ -194,31 +192,15 @@ export async function createHub(
// an acknowledgement as for instance there are some jobs that are
// chained, and if we do not manage to produce then the chain will be
// broken.
try {
await kafkaProducer.producer.send({
topic: KAFKA_JOBS,
messages: [
{
key: job.pluginConfigTeam.toString(),
value: JSON.stringify(job),
},
],
})
} catch (error) {
if (error instanceof KafkaJSError) {
// If we get a retriable Kafka error (maybe it's down for
// example), rethrow the error as a generic `DependencyUnavailableError`
// passing through retriable such that we can decide if this is
// something we should retry at the consumer level.
if (error.retriable) {
throw new DependencyUnavailableError(error.message, 'Kafka', error)
}
}
// Otherwise, just rethrow the error as is. E.g. if we fail to
// serialize then we don't want to retry.
throw error
}
await kafkaProducer.queueMessage({
topic: KAFKA_JOBS,
messages: [
{
value: Buffer.from(JSON.stringify(job)),
key: Buffer.from(job.pluginConfigTeam.toString()),
},
],
})
}
const hub: Partial<Hub> = {

View File

@ -1,111 +1,55 @@
import * as Sentry from '@sentry/node'
import { StatsD } from 'hot-shots'
import {
CompressionCodecs,
CompressionTypes,
KafkaJSError,
KafkaJSNumberOfRetriesExceeded,
Message,
Producer,
ProducerRecord,
} from 'kafkajs'
// @ts-expect-error no type definitions
import SnappyCodec from 'kafkajs-snappy'
import { Message, ProducerRecord } from 'kafkajs'
import { HighLevelProducer, LibrdKafkaError } from 'node-rdkafka-acosom'
import { runInSpan } from '../../sentry'
import { PluginsServerConfig } from '../../types'
import { instrument } from '../metrics'
import { status } from '../status'
import { disconnectProducer, flushProducer, produce } from '../../kafka/producer'
import { status } from '../../utils/status'
import { DependencyUnavailableError } from './error'
import { timeoutGuard } from './utils'
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
/** This class wraps kafkajs producer, adding batching to optimize performance.
/** This class is a wrapper around the rdkafka producer, and does very little.
* It used to be a wrapper around KafkaJS, but we switched to rdkafka because of
* increased performance.
*
* As messages get queued, we flush the queue in the following cases.
* The big difference between this and the original is that we return a promise from
* queueMessage, which will only resolve once we get an ack that the message has
* been persisted to Kafka. So we should get stronger guarantees on processing.
*
* 1. Message size + current batch exceeds max batch message size
* 2. Too much time passed
* 3. Too many messages queued.
*
* We also flush the queue regularly to avoid dropping any messages as the program quits.
* TODO: refactor Kafka producer usage to use rdkafka directly.
*/
export class KafkaProducerWrapper {
/** StatsD instance used to do instrumentation */
private statsd: StatsD | undefined
/** Kafka producer used for syncing Postgres and ClickHouse person data. */
producer: Producer
public producer: HighLevelProducer
lastFlushTime: number
currentBatch: Array<ProducerRecord>
currentBatchSize: number
flushFrequencyMs: number
maxQueueSize: number
maxBatchSize: number
flushInterval: NodeJS.Timeout | undefined
constructor(producer: Producer, statsd: StatsD | undefined, serverConfig: PluginsServerConfig) {
constructor(producer: HighLevelProducer) {
this.producer = producer
this.statsd = statsd
this.lastFlushTime = Date.now()
this.currentBatch = []
this.currentBatchSize = 0
this.flushFrequencyMs = serverConfig.KAFKA_FLUSH_FREQUENCY_MS
this.maxQueueSize = serverConfig.KAFKA_PRODUCER_MAX_QUEUE_SIZE
this.maxBatchSize = serverConfig.KAFKA_MAX_MESSAGE_BATCH_SIZE
if (this.flushFrequencyMs > 0) {
// If flush frequency is set, we flush the queue at intervals. We
// allow disabling this to avoid out of band flushing occuring for
// which we would not be able to explicitly handle.
this.flushInterval = setInterval(async () => {
// :TRICKY: Swallow uncaught errors from flush as flush is already doing custom error reporting which would get lost.
try {
await this.flush()
} catch (err) {}
}, this.flushFrequencyMs)
}
}
queueMessage(kafkaMessage: ProducerRecord): Promise<void> {
return runInSpan(
{
op: 'kafka.queueMessage',
description: kafkaMessage.topic,
},
async () => {
const messageSize = this.estimateMessageSize(kafkaMessage)
async queueMessage(kafkaMessage: ProducerRecord) {
try {
return await Promise.all(
kafkaMessage.messages.map((message) =>
produce({
producer: this.producer,
topic: kafkaMessage.topic,
key: message.key ? Buffer.from(message.key) : null,
value: message.value ? Buffer.from(message.value) : null,
})
)
)
} catch (error) {
status.error('⚠️', 'kafka_produce_error', { error: error, topic: kafkaMessage.topic })
if (this.currentBatch.length > 0 && this.currentBatchSize + messageSize > this.maxBatchSize) {
// :TRICKY: We want to first flush then immediately add the message to the queue. Awaiting and then pushing would result in a race condition.
await this.flush(kafkaMessage)
} else {
this.currentBatch.push(kafkaMessage)
this.currentBatchSize += messageSize
const timeSinceLastFlush = Date.now() - this.lastFlushTime
if (
this.currentBatchSize > this.maxBatchSize ||
timeSinceLastFlush > this.flushFrequencyMs ||
this.currentBatch.length >= this.maxQueueSize
) {
await this.flush()
}
}
if ((error as LibrdKafkaError).isRetriable) {
// If we get a retriable error, bubble that up so that the
// caller can retry.
throw new DependencyUnavailableError(error.message, 'Kafka', error)
}
)
throw error
}
}
async queueMessages(kafkaMessages: ProducerRecord[]): Promise<void> {
for (const message of kafkaMessages) {
await this.queueMessage(message)
}
await Promise.all(kafkaMessages.map((message) => this.queueMessage(message)))
}
async queueSingleJsonMessage(topic: string, key: Message['key'], object: Record<string, any>): Promise<void> {
@ -115,68 +59,12 @@ export class KafkaProducerWrapper {
})
}
public flush(append?: ProducerRecord): Promise<void> {
if (this.currentBatch.length === 0) {
return Promise.resolve()
}
return instrument(this.statsd, { metricName: 'query.kafka_send' }, async () => {
const messages = this.currentBatch
const batchSize = this.currentBatchSize
this.lastFlushTime = Date.now()
this.currentBatch = append ? [append] : []
this.currentBatchSize = append ? this.estimateMessageSize(append) : 0
this.statsd?.histogram('query.kafka_send.size', batchSize)
const timeout = timeoutGuard('Kafka message sending delayed. Waiting over 30 sec to send messages.')
try {
await this.producer.sendBatch({
topicMessages: messages,
compression: CompressionTypes.Snappy,
})
} catch (err) {
Sentry.captureException(err, {
extra: {
batchCount: messages.length,
topics: messages.map((record) => record.topic),
messageCounts: messages.map((record) => record.messages.length),
estimatedSize: batchSize,
},
})
// :TODO: Implement some retrying, https://github.com/PostHog/plugin-server/issues/511
this.statsd?.increment('query.kafka_send.failure', {
firstTopic: messages[0].topic,
})
status.error('⚠️', 'Failed to flush kafka messages that were produced', {
error: err,
batchCount: messages.length,
topics: messages.map((record) => record.topic),
messageCounts: messages.map((record) => record.messages.length),
estimatedSize: batchSize,
})
if (err instanceof KafkaJSNumberOfRetriesExceeded && err.cause instanceof KafkaJSError) {
if (err.cause.retriable === true) {
throw new DependencyUnavailableError(err.cause.name, 'Kafka', err.cause)
}
}
throw err
} finally {
clearTimeout(timeout)
}
})
public async flush() {
return await flushProducer(this.producer)
}
public async disconnect(): Promise<void> {
clearInterval(this.flushInterval)
await this.flush()
await this.producer.disconnect()
}
private estimateMessageSize(kafkaMessage: ProducerRecord): number {
// :TRICKY: This length respects unicode
return Buffer.from(JSON.stringify(kafkaMessage)).length
await disconnectProducer(this.producer)
}
}
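
Taken together, the wrapper keeps the old queueMessage/queueMessages/flush/disconnect
surface but drops all in-process batching. A hedged usage sketch (topic, payload, and
import paths are made up for illustration):

import { HighLevelProducer } from 'node-rdkafka-acosom'
import { KafkaProducerWrapper } from './kafka-producer-wrapper'

async function example(producer: HighLevelProducer) {
    const wrapper = new KafkaProducerWrapper(producer)

    // Resolves only once librdkafka has acked the message; a retriable
    // failure surfaces as DependencyUnavailableError so the caller can retry.
    await wrapper.queueMessage({
        topic: 'example-topic', // hypothetical topic, for illustration only
        messages: [{ key: 'some-key', value: JSON.stringify({ hello: 'world' }) }],
    })

    // flush() and disconnect() now just delegate to the underlying rdkafka producer.
    await wrapper.flush()
    await wrapper.disconnect()
}

This is why the jobs and scheduled-task consumers above now receive the wrapper itself
rather than the raw kafkajs producer: their DLQ requeues go through queueMessage and so
only resolve once the message has been acked.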

View File

@ -120,8 +120,7 @@ export function processUnhandledException(error: Error, server: Hub, kind: strin
},
})
status.error('🤮', `${kind}!`)
status.error('🤮', error)
status.error('🤮', `${kind}!`, { error, stack: error.stack })
}
const jobDuration = new Histogram({

View File

@ -1,153 +0,0 @@
import IORedis from 'ioredis'
import { ONE_HOUR } from '../src/config/constants'
import { startPluginsServer } from '../src/main/pluginsServer'
import { LogLevel, PluginsServerConfig } from '../src/types'
import { Hub } from '../src/types'
import { UUIDT } from '../src/utils/utils'
import { makePiscina } from '../src/worker/piscina'
import { createPosthog, DummyPostHog } from '../src/worker/vm/extensions/posthog'
import { writeToFile } from '../src/worker/vm/extensions/test-utils'
import { delayUntilEventIngested, resetTestDatabaseClickhouse } from './helpers/clickhouse'
import { resetKafka } from './helpers/kafka'
import { pluginConfig39 } from './helpers/plugins'
import { resetTestDatabase } from './helpers/sql'
const { console: testConsole } = writeToFile
jest.setTimeout(60000) // 60 sec timeout
const indexJs = `
import { console as testConsole } from 'test-utils/write-to-file'
export async function processEvent (event) {
testConsole.log('processEvent')
console.info('amogus')
event.properties.processed = 'hell yes'
event.properties.upperUuid = event.properties.uuid?.toUpperCase()
event.properties['$snapshot_data'] = 'no way'
return event
}
export function onEvent (event, { global }) {
// we use this to mock setupPlugin being
// run after some events were already ingested
global.timestampBoundariesForTeam = {
max: new Date(),
min: new Date(Date.now()-${ONE_HOUR})
}
testConsole.log('onEvent', event.event)
}`
describe('E2E with buffer topic enabled', () => {
let hub: Hub
let stopServer: () => Promise<void>
let posthog: DummyPostHog
let redis: IORedis.Redis
const extraServerConfig: Partial<PluginsServerConfig> = {
WORKER_CONCURRENCY: 1,
LOG_LEVEL: LogLevel.Log,
KAFKA_PRODUCER_MAX_QUEUE_SIZE: 100, // The default in tests is 0 but here we specifically want to test batching
KAFKA_FLUSH_FREQUENCY_MS: 0, // Same as above, but with time
BUFFER_CONVERSION_SECONDS: 3, // We want to test the delay mechanism, but with a much lower delay than in prod
CONVERSION_BUFFER_ENABLED: true,
CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: '2', // For these tests we want to validate that we function correctly with the buffer topic enabled
}
beforeEach(async () => {
testConsole.reset()
await resetTestDatabase(indexJs)
await resetTestDatabaseClickhouse(extraServerConfig)
await resetKafka(extraServerConfig)
const startResponse = await startPluginsServer(extraServerConfig, makePiscina)
hub = startResponse.hub
stopServer = startResponse.stop
redis = await hub.redisPool.acquire()
posthog = createPosthog(hub, pluginConfig39)
})
afterEach(async () => {
await hub.redisPool.release(redis)
await stopServer()
})
describe('ClickHouse ingestion', () => {
test('event captured, processed, ingested', async () => {
expect((await hub.db.fetchEvents()).length).toBe(0)
const uuid = new UUIDT().toString()
await posthog.capture('custom event via buffer', { name: 'hehe', uuid })
await hub.kafkaProducer.flush()
await delayUntilEventIngested(() => hub.db.fetchEvents(), undefined, undefined, 500)
const events = await hub.db.fetchEvents()
expect(events.length).toBe(1)
// processEvent ran and modified
expect(events[0].properties.processed).toEqual('hell yes')
expect(events[0].properties.upperUuid).toEqual(uuid.toUpperCase())
// onEvent ran
expect(testConsole.read()).toEqual([['processEvent'], ['onEvent', 'custom event via buffer']])
})
})
})
describe('E2E with direct to graphile worker', () => {
let hub: Hub
let stopServer: () => Promise<void>
let posthog: DummyPostHog
let redis: IORedis.Redis
const extraServerConfig: Partial<PluginsServerConfig> = {
WORKER_CONCURRENCY: 1,
LOG_LEVEL: LogLevel.Log,
KAFKA_PRODUCER_MAX_QUEUE_SIZE: 100, // The default in tests is 0 but here we specifically want to test batching
KAFKA_FLUSH_FREQUENCY_MS: 0, // Same as above, but with time
BUFFER_CONVERSION_SECONDS: 3, // We want to test the delay mechanism, but with a much lower delay than in prod
CONVERSION_BUFFER_ENABLED: true,
CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: '',
}
beforeEach(async () => {
testConsole.reset()
await resetTestDatabase(indexJs)
await resetTestDatabaseClickhouse(extraServerConfig)
await resetKafka(extraServerConfig)
const startResponse = await startPluginsServer(extraServerConfig, makePiscina)
hub = startResponse.hub
stopServer = startResponse.stop
redis = await hub.redisPool.acquire()
posthog = createPosthog(hub, pluginConfig39)
})
afterEach(async () => {
await hub.redisPool.release(redis)
await stopServer()
})
describe('ClickHouse ingestion', () => {
test('event captured, processed, ingested', async () => {
expect((await hub.db.fetchEvents()).length).toBe(0)
const uuid = new UUIDT().toString()
await posthog.capture('custom event via buffer', { name: 'hehe', uuid })
await hub.kafkaProducer.flush()
await delayUntilEventIngested(() => hub.db.fetchEvents(), undefined, undefined, 500)
const events = await hub.db.fetchEvents()
expect(events.length).toBe(1)
// processEvent ran and modified
expect(events[0].properties.processed).toEqual('hell yes')
expect(events[0].properties.upperUuid).toEqual(uuid.toUpperCase())
// onEvent ran
expect(testConsole.read()).toEqual([['processEvent'], ['onEvent', 'custom event via buffer']])
})
})
})

View File

@ -15,9 +15,8 @@
// 2. the KafkaQueue consumer handler will let the error bubble up to the
// KafkaJS consumer runner, which we assume will handle retries.
import { RetryError } from '@posthog/plugin-scaffold'
import Redis from 'ioredis'
import { KafkaJSError } from 'kafkajs'
import LibrdKafkaError from 'node-rdkafka-acosom/lib/error'
import { defaultConfig } from '../../../src/config/config'
import { KAFKA_EVENTS_JSON } from '../../../src/config/kafka-topics'
@ -28,6 +27,7 @@ import { createHub } from '../../../src/utils/db/hub'
import { UUIDT } from '../../../src/utils/utils'
import Piscina, { makePiscina } from '../../../src/worker/piscina'
import { setupPlugins } from '../../../src/worker/plugins/setup'
import { teardownPlugins } from '../../../src/worker/plugins/teardown'
import { createTaskRunner } from '../../../src/worker/worker'
import {
createOrganization,
@ -37,6 +37,8 @@ import {
POSTGRES_DELETE_TABLES_QUERY,
} from '../../helpers/sql'
jest.setTimeout(10000)
describe('workerTasks.runAsyncHandlersEventPipeline()', () => {
// Tests the failure cases for the workerTasks.runAsyncHandlersEventPipeline
// task. Note that this equally applies to e.g. runEventPipeline task as
@ -51,29 +53,31 @@ describe('workerTasks.runAsyncHandlersEventPipeline()', () => {
let closeHub: () => Promise<void>
let piscinaTaskRunner: ({ task, args }) => Promise<any>
beforeEach(() => {
// Use fake timers to ensure that we don't need to wait on e.g. retry logic.
jest.useFakeTimers({ advanceTimers: true })
})
beforeAll(async () => {
jest.useFakeTimers({ advanceTimers: true })
;[hub, closeHub] = await createHub()
redis = await hub.redisPool.acquire()
piscinaTaskRunner = createTaskRunner(hub)
await hub.postgres.query(POSTGRES_DELETE_TABLES_QUERY) // Need to clear the DB to avoid unique constraint violations on ids
})
afterAll(async () => {
await hub.redisPool.release(redis)
await closeHub()
})
beforeEach(() => {
// Use fake timers to ensure that we don't need to wait on e.g. retry logic.
jest.useFakeTimers({ advanceTimers: 30 })
})
afterEach(() => {
jest.clearAllTimers()
jest.runAllTimers()
jest.useRealTimers()
jest.clearAllMocks()
})
afterAll(async () => {
await hub.redisPool.release(redis)
await teardownPlugins(hub)
await closeHub()
})
test('throws on produce errors', async () => {
// To ensure that producer errors are retried and not swallowed, we need
// to ensure that these are bubbled up to the main consumer loop. Note
@ -101,10 +105,19 @@ describe('workerTasks.runAsyncHandlersEventPipeline()', () => {
await createPluginConfig(hub.postgres, { team_id: teamId, plugin_id: plugin.id })
await setupPlugins(hub)
jest.spyOn(hub.kafkaProducer.producer, 'send').mockImplementationOnce(() => {
return Promise.reject(new KafkaJSError('Failed to produce'))
const error = new LibrdKafkaError({
name: 'Failed to produce',
message: 'Failed to produce',
code: 1,
errno: 1,
origin: 'test',
isRetriable: true,
})
jest.spyOn(hub.kafkaProducer.producer, 'produce').mockImplementation(
(topic, partition, message, key, timestamp, cb) => cb(error)
)
await expect(
piscinaTaskRunner({
task: 'runAsyncHandlersEventPipeline',
@ -119,70 +132,7 @@ describe('workerTasks.runAsyncHandlersEventPipeline()', () => {
},
},
})
).rejects.toEqual(
new DependencyUnavailableError('Failed to produce', 'Kafka', new KafkaJSError('Failed to produce'))
)
})
test('retry on RetryError', async () => {
// If we receive a `RetryError`, we should retry the task within the
// pipeline rather than throwing it to the main consumer loop.
// Note that we assume the retries are happening async as is the
// currently functionality, i.e. outside of the consumer loop, but we
// should arguably move this to a separate retry topic.
const organizationId = await createOrganization(hub.postgres)
const plugin = await createPlugin(hub.postgres, {
organization_id: organizationId,
name: 'runEveryMinute plugin',
plugin_type: 'source',
is_global: false,
source__index_ts: `
export async function onEvent(event, { jobs }) {
await jobs.test().runNow()
}
export const jobs = {
test: async () => {}
}
`,
})
const teamId = await createTeam(hub.postgres, organizationId)
await createPluginConfig(hub.postgres, { team_id: teamId, plugin_id: plugin.id })
await setupPlugins(hub)
// This isn't strictly correct in terms of where this is being raised
// from i.e. `producer.send` doesn't ever raise a `RetryError`, but
// it was just convenient to do so and is hopefully close enough to
// reality.
// NOTE: we only mock once such that the second call will succeed
jest.spyOn(hub.kafkaProducer.producer, 'send').mockImplementationOnce(() => {
return Promise.reject(new RetryError('retry error'))
})
const event = {
distinctId: 'asdf',
ip: '',
teamId: teamId,
event: 'some event',
properties: {},
eventUuid: new UUIDT().toString(),
}
await expect(
piscinaTaskRunner({
task: 'runAsyncHandlersEventPipeline',
args: { event },
})
).resolves.toEqual({
args: [expect.objectContaining(event)],
lastStep: 'runAsyncHandlersStep',
})
// Ensure the retry call is made.
jest.runOnlyPendingTimers()
expect(hub.kafkaProducer.producer.send).toHaveBeenCalledTimes(2)
).rejects.toEqual(new DependencyUnavailableError('Failed to produce', 'Kafka', error))
})
test(`doesn't throw on arbitrary failures`, async () => {
@ -238,19 +188,23 @@ describe('eachBatchAsyncHandlers', () => {
let piscina: Piscina
beforeEach(async () => {
jest.useFakeTimers({ advanceTimers: true })
;[hub, closeHub] = await createHub()
})
afterEach(async () => {
await closeHub?.()
jest.useRealTimers()
})
test('rejections from piscina are bubbled up to the consumer', async () => {
piscina = await makePiscina(defaultConfig, hub)
const ingestionConsumer = buildOnEventIngestionConsumer({ hub, piscina })
const error = new LibrdKafkaError({ message: 'test', code: 1, errno: 1, origin: 'test', isRetriable: true })
jest.spyOn(ingestionConsumer, 'eachBatch').mockRejectedValue(
new DependencyUnavailableError('Failed to produce', 'Kafka', new KafkaJSError('Failed to produce'))
new DependencyUnavailableError('Failed to produce', 'Kafka', error)
)
await expect(
@ -293,8 +247,6 @@ describe('eachBatchAsyncHandlers', () => {
uncommittedOffsets: jest.fn(),
pause: jest.fn(),
})
).rejects.toEqual(
new DependencyUnavailableError('Failed to produce', 'Kafka', new KafkaJSError('Failed to produce'))
)
).rejects.toEqual(new DependencyUnavailableError('Failed to produce', 'Kafka', error))
})
})

View File

@ -1,5 +1,3 @@
import { Producer } from 'kafkajs'
import { KAFKA_SCHEDULED_TASKS } from '../../../src/config/kafka-topics'
import { runScheduledTasks } from '../../../src/main/graphile-worker/schedule'
import { Hub } from '../../../src/types'
@ -31,10 +29,8 @@ describe('Graphile Worker schedule', () => {
runEveryDay: [7, 8, 9],
},
kafkaProducer: {
producer: {
send: jest.fn(),
} as unknown as Producer,
} as KafkaProducerWrapper,
queueMessage: jest.fn(),
} as unknown as KafkaProducerWrapper,
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
}
@ -42,7 +38,7 @@ describe('Graphile Worker schedule', () => {
job: { run_at: new Date() },
} as any)
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(1, {
expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(1, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
@ -54,7 +50,7 @@ describe('Graphile Worker schedule', () => {
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(2, {
expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(2, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
@ -66,7 +62,7 @@ describe('Graphile Worker schedule', () => {
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(3, {
expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(3, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
@ -82,7 +78,7 @@ describe('Graphile Worker schedule', () => {
await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryHour', {
job: { run_at: new Date() },
} as any)
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(4, {
expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(4, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
@ -94,7 +90,7 @@ describe('Graphile Worker schedule', () => {
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(5, {
expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(5, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
@ -106,7 +102,7 @@ describe('Graphile Worker schedule', () => {
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(6, {
expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(6, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
@ -122,7 +118,7 @@ describe('Graphile Worker schedule', () => {
await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryDay', {
job: { run_at: new Date() },
} as any)
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(7, {
expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(7, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
@ -134,7 +130,7 @@ describe('Graphile Worker schedule', () => {
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(8, {
expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(8, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{
@ -146,7 +142,7 @@ describe('Graphile Worker schedule', () => {
},
],
})
expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(9, {
expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(9, {
topic: KAFKA_SCHEDULED_TASKS,
messages: [
{

View File

@ -1,164 +0,0 @@
import { CompressionTypes, Producer } from 'kafkajs'
import { PluginsServerConfig } from '../../src/types'
import { KafkaProducerWrapper } from '../../src/utils/db/kafka-producer-wrapper'
describe('KafkaProducerWrapper', () => {
let producer: KafkaProducerWrapper
let mockKafkaProducer: Producer
let flushSpy: any
beforeEach(() => {
jest.spyOn(global.Date, 'now').mockImplementation(() => new Date('2020-02-27 11:00:05').getTime())
mockKafkaProducer = { sendBatch: jest.fn() } as any
producer = new KafkaProducerWrapper(mockKafkaProducer, undefined, {
KAFKA_FLUSH_FREQUENCY_MS: 20000,
KAFKA_PRODUCER_MAX_QUEUE_SIZE: 4,
KAFKA_MAX_MESSAGE_BATCH_SIZE: 500,
} as PluginsServerConfig)
clearInterval(producer.flushInterval)
flushSpy = jest.spyOn(producer, 'flush')
})
describe('queueMessage()', () => {
it('respects MAX_QUEUE_SIZE', async () => {
await producer.queueMessage({
topic: 'a',
messages: [{ value: '1'.repeat(10) }],
})
await producer.queueMessage({
topic: 'b',
messages: [{ value: '1'.repeat(30) }],
})
await producer.queueMessage({
topic: 'b',
messages: [{ value: '1'.repeat(30) }],
})
expect(flushSpy).not.toHaveBeenCalled()
expect(producer.currentBatch.length).toEqual(3)
await producer.queueMessage({
topic: 'a',
messages: [{ value: '1'.repeat(30) }],
})
expect(flushSpy).toHaveBeenCalled()
expect(producer.currentBatch.length).toEqual(0)
expect(producer.currentBatchSize).toEqual(0)
expect(mockKafkaProducer.sendBatch).toHaveBeenCalledWith({
compression: CompressionTypes.Snappy,
topicMessages: [expect.anything(), expect.anything(), expect.anything(), expect.anything()],
})
})
it('respects KAFKA_MAX_MESSAGE_BATCH_SIZE', async () => {
await producer.queueMessage({
topic: 'a',
messages: [{ value: '1'.repeat(400) }],
})
await producer.queueMessage({
topic: 'a',
messages: [{ value: '1'.repeat(20) }],
})
expect(flushSpy).not.toHaveBeenCalled()
expect(producer.currentBatch.length).toEqual(2)
await producer.queueMessage({
topic: 'a',
messages: [{ value: '1'.repeat(40) }],
})
expect(flushSpy).toHaveBeenCalled()
expect(producer.currentBatch.length).toEqual(1)
expect(producer.currentBatchSize).toBeGreaterThan(40)
expect(producer.currentBatchSize).toBeLessThan(100)
expect(mockKafkaProducer.sendBatch).toHaveBeenCalledWith({
compression: CompressionTypes.Snappy,
topicMessages: [expect.anything(), expect.anything()],
})
})
it('flushes immediately when message exceeds KAFKA_MAX_MESSAGE_BATCH_SIZE', async () => {
await producer.queueMessage({
topic: 'a',
messages: [{ value: '1'.repeat(10000) }],
})
expect(flushSpy).toHaveBeenCalled()
expect(producer.currentBatch.length).toEqual(0)
expect(producer.currentBatchSize).toEqual(0)
expect(mockKafkaProducer.sendBatch).toHaveBeenCalledWith({
compression: CompressionTypes.Snappy,
topicMessages: [expect.anything()],
})
})
it('respects KAFKA_FLUSH_FREQUENCY_MS', async () => {
await producer.queueMessage({
topic: 'a',
messages: [{ value: '1'.repeat(10) }],
})
jest.spyOn(global.Date, 'now').mockImplementation(() => new Date('2020-02-27 11:00:20').getTime())
await producer.queueMessage({
topic: 'a',
messages: [{ value: '1'.repeat(10) }],
})
expect(flushSpy).not.toHaveBeenCalled()
expect(producer.currentBatch.length).toEqual(2)
jest.spyOn(global.Date, 'now').mockImplementation(() => new Date('2020-02-27 11:00:26').getTime())
await producer.queueMessage({
topic: 'a',
messages: [{ value: '1'.repeat(10) }],
})
expect(flushSpy).toHaveBeenCalled()
expect(producer.currentBatch.length).toEqual(0)
expect(producer.lastFlushTime).toEqual(Date.now())
expect(mockKafkaProducer.sendBatch).toHaveBeenCalledWith({
compression: CompressionTypes.Snappy,
topicMessages: [expect.anything(), expect.anything(), expect.anything()],
})
})
})
describe('flush()', () => {
it('flushes messages in memory', async () => {
await producer.queueMessage({
topic: 'a',
messages: [{ value: '1'.repeat(10) }],
})
jest.spyOn(global.Date, 'now').mockImplementation(() => new Date('2020-02-27 11:00:15').getTime())
await producer.flush()
expect(mockKafkaProducer.sendBatch).toHaveBeenCalledWith({
compression: CompressionTypes.Snappy,
topicMessages: [
{
topic: 'a',
messages: [{ value: '1'.repeat(10) }],
},
],
})
expect(producer.currentBatch.length).toEqual(0)
expect(producer.currentBatchSize).toEqual(0)
expect(producer.lastFlushTime).toEqual(Date.now())
})
it('does nothing if nothing queued', async () => {
await producer.flush()
expect(mockKafkaProducer.sendBatch).not.toHaveBeenCalled()
})
})
})