diff --git a/package.json b/package.json index 1a853cf4789..1ba5bd9da68 100644 --- a/package.json +++ b/package.json @@ -275,8 +275,8 @@ "prettier --write" ], "(plugin-server/**).{js,jsx,mjs,ts,tsx}": [ - "eslint -c plugin-server/.eslintrc.js --fix", - "prettier --write" + "pnpm --dir plugin-server exec eslint --fix", + "pnpm --dir plugin-server exec prettier --write" ], "!(posthog/hogql/grammar/*)*.{py,pyi}": [ "black", diff --git a/plugin-server/functional_tests/kafka.ts b/plugin-server/functional_tests/kafka.ts index 2144c428816..4c33975ca69 100644 --- a/plugin-server/functional_tests/kafka.ts +++ b/plugin-server/functional_tests/kafka.ts @@ -3,6 +3,7 @@ import SnappyCodec from 'kafkajs-snappy' import { HighLevelProducer } from 'node-rdkafka-acosom' import { defaultConfig } from '../src/config/config' +import { produce as defaultProduce } from '../src/kafka/producer' CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec @@ -23,6 +24,7 @@ afterAll(async () => { export async function createKafkaProducer() { producer = new HighLevelProducer({ 'metadata.broker.list': defaultConfig.KAFKA_HOSTS, + 'linger.ms': 0, }) await new Promise((resolve, reject) => @@ -36,22 +38,5 @@ export async function createKafkaProducer() { export async function produce({ topic, message, key }: { topic: string; message: Buffer | null; key: string }) { producer = producer ?? (await createKafkaProducer()) - await new Promise((resolve, reject) => - producer.produce(topic, undefined, message, Buffer.from(key), Date.now(), (err, offset) => { - if (err) { - reject(err) - } else { - resolve(offset) - } - }) - ) - await new Promise((resolve, reject) => - producer.flush(10000, (err) => { - if (err) { - reject(err) - } else { - resolve(null) - } - }) - ) + await defaultProduce({ producer, topic, value: message, key: Buffer.from(key) }) } diff --git a/plugin-server/functional_tests/session-recordings.test.ts b/plugin-server/functional_tests/session-recordings.test.ts index 3f131f830b0..538c2e65263 100644 --- a/plugin-server/functional_tests/session-recordings.test.ts +++ b/plugin-server/functional_tests/session-recordings.test.ts @@ -176,7 +176,7 @@ test.concurrent(`recording events not ingested to ClickHouse if team is opted ou }) // NOTE: we're assuming that we have a single partition for the Kafka topic, - // and that the consumer produces messages in the order they are consumed. + // and that the consumer produceAndFlushs messages in the order they are consumed. // TODO: add some side-effect we can assert on rather than relying on the // partitioning / ordering setup e.g. an ingestion warning. const events = await fetchSessionRecordingsEvents(teamOptedOutId, uuidOptedOut) diff --git a/plugin-server/src/kafka/producer.ts b/plugin-server/src/kafka/producer.ts index f2f429244f1..f060f1ed942 100644 --- a/plugin-server/src/kafka/producer.ts +++ b/plugin-server/src/kafka/producer.ts @@ -59,12 +59,17 @@ export const createKafkaProducer = async (config: ProducerGlobalConfig) => { return producer } -export const produce = async ( - producer: RdKafkaProducer, - topic: string, - value: Buffer | null, +export const produce = async ({ + producer, + topic, + value, + key, +}: { + producer: RdKafkaProducer + topic: string + value: Buffer | null key: Buffer | null -): Promise => { +}): Promise => { status.debug('📤', 'Producing message', { topic: topic }) return await new Promise((resolve, reject) => producer.produce(topic, null, value, key, Date.now(), (error: any, offset: NumberNullUndefined) => { diff --git a/plugin-server/src/main/graphile-worker/schedule.ts b/plugin-server/src/main/graphile-worker/schedule.ts index e88ddd2f0e1..949e929899b 100644 --- a/plugin-server/src/main/graphile-worker/schedule.ts +++ b/plugin-server/src/main/graphile-worker/schedule.ts @@ -52,7 +52,7 @@ export async function runScheduledTasks( if (server.USE_KAFKA_FOR_SCHEDULED_TASKS) { for (const pluginConfigId of server.pluginSchedule?.[taskType] || []) { status.info('⏲️', 'queueing_schedule_task', { taskType, pluginConfigId }) - await server.kafkaProducer.producer.send({ + await server.kafkaProducer.queueMessage({ topic: KAFKA_SCHEDULED_TASKS, messages: [{ key: pluginConfigId.toString(), value: JSON.stringify({ taskType, pluginConfigId }) }], }) diff --git a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts index d42e5fc1f0a..e0b0311fcd5 100644 --- a/plugin-server/src/main/ingestion-queues/jobs-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/jobs-consumer.ts @@ -1,6 +1,7 @@ import { StatsD } from 'hot-shots' -import { EachBatchHandler, Kafka, Producer } from 'kafkajs' +import { EachBatchHandler, Kafka } from 'kafkajs' import { Counter } from 'prom-client' +import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' import { KAFKA_JOBS, KAFKA_JOBS_DLQ } from '../../config/kafka-topics' import { EnqueuedPluginJob, JobName } from '../../types' @@ -26,7 +27,7 @@ export const startJobsConsumer = async ({ statsd, }: { kafka: Kafka - producer: Producer // NOTE: not using KafkaProducerWrapper here to avoid buffering logic + producer: KafkaProducerWrapper graphileWorker: GraphileWorker statsd?: StatsD }) => { @@ -49,7 +50,10 @@ export const startJobsConsumer = async ({ value: message.value, }) // TODO: handle resolving offsets asynchronously - await producer.send({ topic: KAFKA_JOBS_DLQ, messages: [message] }) + await producer.queueMessage({ + topic: KAFKA_JOBS_DLQ, + messages: [{ value: message.value, key: message.key }], + }) resolveOffset(message.offset) continue } @@ -63,7 +67,10 @@ export const startJobsConsumer = async ({ error, }) // TODO: handle resolving offsets asynchronously - await producer.send({ topic: KAFKA_JOBS_DLQ, messages: [message] }) + await producer.queueMessage({ + topic: KAFKA_JOBS_DLQ, + messages: [{ value: message.value, key: message.key }], + }) resolveOffset(message.offset) continue } @@ -111,5 +118,10 @@ export const startJobsConsumer = async ({ }, }) - return consumer + return { + ...consumer, + stop: async () => { + await consumer.stop() + }, + } } diff --git a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts index 36d7edc7060..473f68f023d 100644 --- a/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/scheduled-tasks-consumer.ts @@ -1,5 +1,6 @@ import { StatsD } from 'hot-shots' -import { Batch, EachBatchHandler, Kafka, Producer } from 'kafkajs' +import { Batch, EachBatchHandler, Kafka } from 'kafkajs' +import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper' import { KAFKA_SCHEDULED_TASKS, KAFKA_SCHEDULED_TASKS_DLQ } from '../../config/kafka-topics' import { DependencyUnavailableError } from '../../utils/db/error' @@ -15,14 +16,14 @@ const taskTypes = ['runEveryMinute', 'runEveryHour', 'runEveryDay'] as const export const startScheduledTasksConsumer = async ({ kafka, - piscina, producer, + piscina, partitionConcurrency = 3, statsd, }: { kafka: Kafka + producer: KafkaProducerWrapper piscina: Piscina - producer: Producer // NOTE: not using KafkaProducerWrapper here to avoid buffering logic partitionConcurrency: number statsd?: StatsD }) => { @@ -134,10 +135,15 @@ export const startScheduledTasksConsumer = async ({ }, }) - return consumer + return { + ...consumer, + stop: async () => { + await consumer.stop() + }, + } } -const getTasksFromBatch = async (batch: Batch, producer: Producer) => { +const getTasksFromBatch = async (batch: Batch, producer: KafkaProducerWrapper) => { // In any one batch, we only want to run one task per plugin config id. // Hence here we dedupe the tasks by plugin config id and task type. const tasksbyTypeAndPluginConfigId = {} as Record< @@ -153,7 +159,10 @@ const getTasksFromBatch = async (batch: Batch, producer: Producer) => { status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, { value: message.value, }) - await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] }) + await producer.queueMessage({ + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], + }) continue } @@ -168,13 +177,19 @@ const getTasksFromBatch = async (batch: Batch, producer: Producer) => { status.warn('⚠️', `Invalid message for partition ${batch.partition} offset ${message.offset}.`, { error: error.stack ?? error, }) - await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] }) + await producer.queueMessage({ + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], + }) continue } if (!taskTypes.includes(task.taskType) || isNaN(task.pluginConfigId)) { status.warn('⚠️', `Invalid schema for partition ${batch.partition} offset ${message.offset}.`, task) - await producer.send({ topic: KAFKA_SCHEDULED_TASKS_DLQ, messages: [message] }) + await producer.queueMessage({ + topic: KAFKA_SCHEDULED_TASKS_DLQ, + messages: [{ value: message.value, key: message.key }], + }) continue } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index f19ddfcf6fe..c8be1315fd3 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -164,12 +164,12 @@ const eachMessage = partition: message.partition, }) return [ - produce( + produce({ producer, - KAFKA_SESSION_RECORDING_EVENTS_DLQ, - message.value, - message.key ? Buffer.from(message.key) : null - ), + topic: KAFKA_SESSION_RECORDING_EVENTS_DLQ, + value: message.value, + key: message.key ? Buffer.from(message.key) : null, + }), ] } @@ -191,12 +191,12 @@ const eachMessage = partition: message.partition, }) return [ - produce( + produce({ producer, - KAFKA_SESSION_RECORDING_EVENTS_DLQ, - message.value, - message.key ? Buffer.from(message.key) : null - ), + topic: KAFKA_SESSION_RECORDING_EVENTS_DLQ, + value: message.value, + key: message.key ? Buffer.from(message.key) : null, + }), ] } @@ -253,12 +253,12 @@ const eachMessage = ) return [ - produce( + produce({ producer, - KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS, - Buffer.from(JSON.stringify(clickHouseRecord)), - message.key ? Buffer.from(message.key) : null - ), + topic: KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS, + value: Buffer.from(JSON.stringify(clickHouseRecord)), + key: message.key ? Buffer.from(message.key) : null, + }), ] } else if (event.event === '$performance_event') { const clickHouseRecord = createPerformanceEvent( @@ -269,12 +269,12 @@ const eachMessage = ) return [ - produce( + produce({ producer, - KAFKA_PERFORMANCE_EVENTS, - Buffer.from(JSON.stringify(clickHouseRecord)), - message.key ? Buffer.from(message.key) : null - ), + topic: KAFKA_PERFORMANCE_EVENTS, + value: Buffer.from(JSON.stringify(clickHouseRecord)), + key: message.key ? Buffer.from(message.key) : null, + }), ] } else { status.warn('⚠️', 'invalid_message', { diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index b7c3a03eaeb..b9be3d4edc4 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -1,6 +1,9 @@ import * as Sentry from '@sentry/node' import { Server } from 'http' import { Consumer, KafkaJSProtocolError } from 'kafkajs' +import { CompressionCodecs, CompressionTypes } from 'kafkajs' +// @ts-expect-error no type definitions +import SnappyCodec from 'kafkajs-snappy' import * as schedule from 'node-schedule' import { Counter } from 'prom-client' @@ -32,6 +35,8 @@ import { startSessionRecordingEventsConsumer } from './ingestion-queues/session- import { createHttpServer } from './services/http-server' import { getObjectStorage } from './services/object_storage' +CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec + const { version } = require('../../package.json') // TODO: refactor this into a class, removing the need for many different Servers @@ -242,8 +247,8 @@ export async function startPluginsServer( if (capabilities.pluginScheduledTasks) { schedulerTasksConsumer = await startScheduledTasksConsumer({ piscina: piscina, + producer: hub.kafkaProducer, kafka: hub.kafka, - producer: hub.kafkaProducer.producer, partitionConcurrency: serverConfig.KAFKA_PARTITIONS_CONSUMED_CONCURRENTLY, statsd: hub.statsd, }) @@ -252,7 +257,7 @@ export async function startPluginsServer( if (capabilities.processPluginJobs) { jobsConsumer = await startJobsConsumer({ kafka: hub.kafka, - producer: hub.kafkaProducer.producer, + producer: hub.kafkaProducer, graphileWorker: graphileWorker, statsd: hub.statsd, }) diff --git a/plugin-server/src/utils/db/hub.ts b/plugin-server/src/utils/db/hub.ts index 28a1c63fade..a86a502861b 100644 --- a/plugin-server/src/utils/db/hub.ts +++ b/plugin-server/src/utils/db/hub.ts @@ -4,7 +4,7 @@ import * as fs from 'fs' import { createPool } from 'generic-pool' import { StatsD } from 'hot-shots' import Redis from 'ioredis' -import { Kafka, KafkaJSError, Partitioners, SASLOptions } from 'kafkajs' +import { Kafka, SASLOptions } from 'kafkajs' import { DateTime } from 'luxon' import { hostname } from 'os' import * as path from 'path' @@ -15,6 +15,8 @@ import { getPluginServerCapabilities } from '../../capabilities' import { defaultConfig } from '../../config/config' import { KAFKAJS_LOG_LEVEL_MAPPING } from '../../config/constants' import { KAFKA_JOBS } from '../../config/kafka-topics' +import { createRdConnectionConfigFromEnvVars } from '../../kafka/config' +import { createKafkaProducer } from '../../kafka/producer' import { getObjectStorage } from '../../main/services/object_storage' import { EnqueuedPluginJob, @@ -39,7 +41,6 @@ import { PluginsApiKeyManager } from './../../worker/vm/extensions/helpers/api-k import { RootAccessManager } from './../../worker/vm/extensions/helpers/root-acess-manager' import { PromiseManager } from './../../worker/vm/promise-manager' import { DB } from './db' -import { DependencyUnavailableError } from './error' import { KafkaProducerWrapper } from './kafka-producer-wrapper' // `node-postgres` would return dates as plain JS Date objects, which would use the local timezone. @@ -130,13 +131,10 @@ export async function createHub( const kafka = createKafkaClient(serverConfig as KafkaConfig) - const producer = kafka.producer({ - retry: { retries: 10, initialRetryTime: 1000, maxRetryTime: 30 }, - createPartitioner: Partitioners.LegacyPartitioner, - }) - await producer.connect() + const kafkaConnectionConfig = createRdConnectionConfigFromEnvVars(serverConfig as KafkaConfig) + const producer = await createKafkaProducer({ ...kafkaConnectionConfig, 'linger.ms': 0 }) - const kafkaProducer = new KafkaProducerWrapper(producer, statsd, serverConfig) + const kafkaProducer = new KafkaProducerWrapper(producer) status.info('👍', `Kafka ready`) status.info('🤔', `Connecting to Postgresql...`) @@ -194,31 +192,15 @@ export async function createHub( // an acknowledgement as for instance there are some jobs that are // chained, and if we do not manage to produce then the chain will be // broken. - try { - await kafkaProducer.producer.send({ - topic: KAFKA_JOBS, - messages: [ - { - key: job.pluginConfigTeam.toString(), - value: JSON.stringify(job), - }, - ], - }) - } catch (error) { - if (error instanceof KafkaJSError) { - // If we get a retriable Kafka error (maybe it's down for - // example), rethrow the error as a generic `DependencyUnavailableError` - // passing through retriable such that we can decide if this is - // something we should retry at the consumer level. - if (error.retriable) { - throw new DependencyUnavailableError(error.message, 'Kafka', error) - } - } - - // Otherwise, just rethrow the error as is. E.g. if we fail to - // serialize then we don't want to retry. - throw error - } + await kafkaProducer.queueMessage({ + topic: KAFKA_JOBS, + messages: [ + { + value: Buffer.from(JSON.stringify(job)), + key: Buffer.from(job.pluginConfigTeam.toString()), + }, + ], + }) } const hub: Partial = { diff --git a/plugin-server/src/utils/db/kafka-producer-wrapper.ts b/plugin-server/src/utils/db/kafka-producer-wrapper.ts index 740db6e9977..9a6c532e5eb 100644 --- a/plugin-server/src/utils/db/kafka-producer-wrapper.ts +++ b/plugin-server/src/utils/db/kafka-producer-wrapper.ts @@ -1,111 +1,55 @@ -import * as Sentry from '@sentry/node' -import { StatsD } from 'hot-shots' -import { - CompressionCodecs, - CompressionTypes, - KafkaJSError, - KafkaJSNumberOfRetriesExceeded, - Message, - Producer, - ProducerRecord, -} from 'kafkajs' -// @ts-expect-error no type definitions -import SnappyCodec from 'kafkajs-snappy' +import { Message, ProducerRecord } from 'kafkajs' +import { HighLevelProducer, LibrdKafkaError } from 'node-rdkafka-acosom' -import { runInSpan } from '../../sentry' -import { PluginsServerConfig } from '../../types' -import { instrument } from '../metrics' -import { status } from '../status' +import { disconnectProducer, flushProducer, produce } from '../../kafka/producer' +import { status } from '../../utils/status' import { DependencyUnavailableError } from './error' -import { timeoutGuard } from './utils' -CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec - -/** This class wraps kafkajs producer, adding batching to optimize performance. +/** This class is a wrapper around the rdkafka producer, and does very little. + * It used to be a wrapper around KafkaJS, but we switched to rdkafka because of + * increased performance. * - * As messages get queued, we flush the queue in the following cases. + * The big difference between this and the original is that we return a promise from + * queueMessage, which will only resolve once we get an ack that the message has + * been persisted to Kafka. So we should get stronger guarantees on processing. * - * 1. Message size + current batch exceeds max batch message size - * 2. Too much time passed - * 3. Too many messages queued. - * - * We also flush the queue regularly to avoid dropping any messages as the program quits. + * TODO: refactor Kafka producer usage to use rdkafka directly. */ export class KafkaProducerWrapper { - /** StatsD instance used to do instrumentation */ - private statsd: StatsD | undefined - /** Kafka producer used for syncing Postgres and ClickHouse person data. */ - producer: Producer + public producer: HighLevelProducer - lastFlushTime: number - currentBatch: Array - currentBatchSize: number - - flushFrequencyMs: number - maxQueueSize: number - maxBatchSize: number - - flushInterval: NodeJS.Timeout | undefined - - constructor(producer: Producer, statsd: StatsD | undefined, serverConfig: PluginsServerConfig) { + constructor(producer: HighLevelProducer) { this.producer = producer - this.statsd = statsd - - this.lastFlushTime = Date.now() - this.currentBatch = [] - this.currentBatchSize = 0 - - this.flushFrequencyMs = serverConfig.KAFKA_FLUSH_FREQUENCY_MS - this.maxQueueSize = serverConfig.KAFKA_PRODUCER_MAX_QUEUE_SIZE - this.maxBatchSize = serverConfig.KAFKA_MAX_MESSAGE_BATCH_SIZE - - if (this.flushFrequencyMs > 0) { - // If flush frequency is set, we flush the queue at intervals. We - // allow disabling this to avoid out of band flushing occuring for - // which we would not be able to explicitly handle. - this.flushInterval = setInterval(async () => { - // :TRICKY: Swallow uncaught errors from flush as flush is already doing custom error reporting which would get lost. - try { - await this.flush() - } catch (err) {} - }, this.flushFrequencyMs) - } } - queueMessage(kafkaMessage: ProducerRecord): Promise { - return runInSpan( - { - op: 'kafka.queueMessage', - description: kafkaMessage.topic, - }, - async () => { - const messageSize = this.estimateMessageSize(kafkaMessage) + async queueMessage(kafkaMessage: ProducerRecord) { + try { + return await Promise.all( + kafkaMessage.messages.map((message) => + produce({ + producer: this.producer, + topic: kafkaMessage.topic, + key: message.key ? Buffer.from(message.key) : null, + value: message.value ? Buffer.from(message.value) : null, + }) + ) + ) + } catch (error) { + status.error('⚠️', 'kafka_produce_error', { error: error, topic: kafkaMessage.topic }) - if (this.currentBatch.length > 0 && this.currentBatchSize + messageSize > this.maxBatchSize) { - // :TRICKY: We want to first flush then immediately add the message to the queue. Awaiting and then pushing would result in a race condition. - await this.flush(kafkaMessage) - } else { - this.currentBatch.push(kafkaMessage) - this.currentBatchSize += messageSize - - const timeSinceLastFlush = Date.now() - this.lastFlushTime - if ( - this.currentBatchSize > this.maxBatchSize || - timeSinceLastFlush > this.flushFrequencyMs || - this.currentBatch.length >= this.maxQueueSize - ) { - await this.flush() - } - } + if ((error as LibrdKafkaError).isRetriable) { + // If we get a retriable error, bubble that up so that the + // caller can retry. + throw new DependencyUnavailableError(error.message, 'Kafka', error) } - ) + + throw error + } } async queueMessages(kafkaMessages: ProducerRecord[]): Promise { - for (const message of kafkaMessages) { - await this.queueMessage(message) - } + await Promise.all(kafkaMessages.map((message) => this.queueMessage(message))) } async queueSingleJsonMessage(topic: string, key: Message['key'], object: Record): Promise { @@ -115,68 +59,12 @@ export class KafkaProducerWrapper { }) } - public flush(append?: ProducerRecord): Promise { - if (this.currentBatch.length === 0) { - return Promise.resolve() - } - - return instrument(this.statsd, { metricName: 'query.kafka_send' }, async () => { - const messages = this.currentBatch - const batchSize = this.currentBatchSize - this.lastFlushTime = Date.now() - this.currentBatch = append ? [append] : [] - this.currentBatchSize = append ? this.estimateMessageSize(append) : 0 - - this.statsd?.histogram('query.kafka_send.size', batchSize) - const timeout = timeoutGuard('Kafka message sending delayed. Waiting over 30 sec to send messages.') - - try { - await this.producer.sendBatch({ - topicMessages: messages, - compression: CompressionTypes.Snappy, - }) - } catch (err) { - Sentry.captureException(err, { - extra: { - batchCount: messages.length, - topics: messages.map((record) => record.topic), - messageCounts: messages.map((record) => record.messages.length), - estimatedSize: batchSize, - }, - }) - // :TODO: Implement some retrying, https://github.com/PostHog/plugin-server/issues/511 - this.statsd?.increment('query.kafka_send.failure', { - firstTopic: messages[0].topic, - }) - status.error('⚠️', 'Failed to flush kafka messages that were produced', { - error: err, - batchCount: messages.length, - topics: messages.map((record) => record.topic), - messageCounts: messages.map((record) => record.messages.length), - estimatedSize: batchSize, - }) - - if (err instanceof KafkaJSNumberOfRetriesExceeded && err.cause instanceof KafkaJSError) { - if (err.cause.retriable === true) { - throw new DependencyUnavailableError(err.cause.name, 'Kafka', err.cause) - } - } - - throw err - } finally { - clearTimeout(timeout) - } - }) + public async flush() { + return await flushProducer(this.producer) } public async disconnect(): Promise { - clearInterval(this.flushInterval) await this.flush() - await this.producer.disconnect() - } - - private estimateMessageSize(kafkaMessage: ProducerRecord): number { - // :TRICKY: This length respects unicode - return Buffer.from(JSON.stringify(kafkaMessage)).length + await disconnectProducer(this.producer) } } diff --git a/plugin-server/src/worker/worker.ts b/plugin-server/src/worker/worker.ts index 47ce71808d5..e45f9b2c24f 100644 --- a/plugin-server/src/worker/worker.ts +++ b/plugin-server/src/worker/worker.ts @@ -120,8 +120,7 @@ export function processUnhandledException(error: Error, server: Hub, kind: strin }, }) - status.error('🤮', `${kind}!`) - status.error('🤮', error) + status.error('🤮', `${kind}!`, { error, stack: error.stack }) } const jobDuration = new Histogram({ diff --git a/plugin-server/tests/e2e.buffer.test.ts b/plugin-server/tests/e2e.buffer.test.ts deleted file mode 100644 index 7564c379c08..00000000000 --- a/plugin-server/tests/e2e.buffer.test.ts +++ /dev/null @@ -1,153 +0,0 @@ -import IORedis from 'ioredis' - -import { ONE_HOUR } from '../src/config/constants' -import { startPluginsServer } from '../src/main/pluginsServer' -import { LogLevel, PluginsServerConfig } from '../src/types' -import { Hub } from '../src/types' -import { UUIDT } from '../src/utils/utils' -import { makePiscina } from '../src/worker/piscina' -import { createPosthog, DummyPostHog } from '../src/worker/vm/extensions/posthog' -import { writeToFile } from '../src/worker/vm/extensions/test-utils' -import { delayUntilEventIngested, resetTestDatabaseClickhouse } from './helpers/clickhouse' -import { resetKafka } from './helpers/kafka' -import { pluginConfig39 } from './helpers/plugins' -import { resetTestDatabase } from './helpers/sql' -const { console: testConsole } = writeToFile - -jest.setTimeout(60000) // 60 sec timeout - -const indexJs = ` -import { console as testConsole } from 'test-utils/write-to-file' - -export async function processEvent (event) { - testConsole.log('processEvent') - console.info('amogus') - event.properties.processed = 'hell yes' - event.properties.upperUuid = event.properties.uuid?.toUpperCase() - event.properties['$snapshot_data'] = 'no way' - return event -} - -export function onEvent (event, { global }) { - // we use this to mock setupPlugin being - // run after some events were already ingested - global.timestampBoundariesForTeam = { - max: new Date(), - min: new Date(Date.now()-${ONE_HOUR}) - } - testConsole.log('onEvent', event.event) -}` - -describe('E2E with buffer topic enabled', () => { - let hub: Hub - let stopServer: () => Promise - let posthog: DummyPostHog - let redis: IORedis.Redis - - const extraServerConfig: Partial = { - WORKER_CONCURRENCY: 1, - LOG_LEVEL: LogLevel.Log, - KAFKA_PRODUCER_MAX_QUEUE_SIZE: 100, // The default in tests is 0 but here we specifically want to test batching - KAFKA_FLUSH_FREQUENCY_MS: 0, // Same as above, but with time - BUFFER_CONVERSION_SECONDS: 3, // We want to test the delay mechanism, but with a much lower delay than in prod - CONVERSION_BUFFER_ENABLED: true, - CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: '2', // For these tests we want to validate that we function correctly with the buffer topic enabled - } - - beforeEach(async () => { - testConsole.reset() - await resetTestDatabase(indexJs) - await resetTestDatabaseClickhouse(extraServerConfig) - await resetKafka(extraServerConfig) - const startResponse = await startPluginsServer(extraServerConfig, makePiscina) - hub = startResponse.hub - stopServer = startResponse.stop - redis = await hub.redisPool.acquire() - posthog = createPosthog(hub, pluginConfig39) - }) - - afterEach(async () => { - await hub.redisPool.release(redis) - await stopServer() - }) - - describe('ClickHouse ingestion', () => { - test('event captured, processed, ingested', async () => { - expect((await hub.db.fetchEvents()).length).toBe(0) - - const uuid = new UUIDT().toString() - - await posthog.capture('custom event via buffer', { name: 'hehe', uuid }) - await hub.kafkaProducer.flush() - - await delayUntilEventIngested(() => hub.db.fetchEvents(), undefined, undefined, 500) - const events = await hub.db.fetchEvents() - - expect(events.length).toBe(1) - - // processEvent ran and modified - expect(events[0].properties.processed).toEqual('hell yes') - expect(events[0].properties.upperUuid).toEqual(uuid.toUpperCase()) - - // onEvent ran - expect(testConsole.read()).toEqual([['processEvent'], ['onEvent', 'custom event via buffer']]) - }) - }) -}) - -describe('E2E with direct to graphile worker', () => { - let hub: Hub - let stopServer: () => Promise - let posthog: DummyPostHog - let redis: IORedis.Redis - - const extraServerConfig: Partial = { - WORKER_CONCURRENCY: 1, - LOG_LEVEL: LogLevel.Log, - KAFKA_PRODUCER_MAX_QUEUE_SIZE: 100, // The default in tests is 0 but here we specifically want to test batching - KAFKA_FLUSH_FREQUENCY_MS: 0, // Same as above, but with time - BUFFER_CONVERSION_SECONDS: 3, // We want to test the delay mechanism, but with a much lower delay than in prod - CONVERSION_BUFFER_ENABLED: true, - CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: '', - } - - beforeEach(async () => { - testConsole.reset() - await resetTestDatabase(indexJs) - await resetTestDatabaseClickhouse(extraServerConfig) - await resetKafka(extraServerConfig) - const startResponse = await startPluginsServer(extraServerConfig, makePiscina) - hub = startResponse.hub - stopServer = startResponse.stop - redis = await hub.redisPool.acquire() - posthog = createPosthog(hub, pluginConfig39) - }) - - afterEach(async () => { - await hub.redisPool.release(redis) - await stopServer() - }) - - describe('ClickHouse ingestion', () => { - test('event captured, processed, ingested', async () => { - expect((await hub.db.fetchEvents()).length).toBe(0) - - const uuid = new UUIDT().toString() - - await posthog.capture('custom event via buffer', { name: 'hehe', uuid }) - await hub.kafkaProducer.flush() - - await delayUntilEventIngested(() => hub.db.fetchEvents(), undefined, undefined, 500) - const events = await hub.db.fetchEvents() - - expect(events.length).toBe(1) - - // processEvent ran and modified - expect(events[0].properties.processed).toEqual('hell yes') - expect(events[0].properties.upperUuid).toEqual(uuid.toUpperCase()) - - // onEvent ran - expect(testConsole.read()).toEqual([['processEvent'], ['onEvent', 'custom event via buffer']]) - }) - }) -}) diff --git a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts index 39787bcf242..a35ac4089ea 100644 --- a/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts +++ b/plugin-server/tests/main/ingestion-queues/run-async-handlers-event-pipeline.test.ts @@ -15,9 +15,8 @@ // 2. the KafkaQueue consumer handler will let the error bubble up to the // KafkaJS consumer runner, which we assume will handle retries. -import { RetryError } from '@posthog/plugin-scaffold' import Redis from 'ioredis' -import { KafkaJSError } from 'kafkajs' +import LibrdKafkaError from 'node-rdkafka-acosom/lib/error' import { defaultConfig } from '../../../src/config/config' import { KAFKA_EVENTS_JSON } from '../../../src/config/kafka-topics' @@ -28,6 +27,7 @@ import { createHub } from '../../../src/utils/db/hub' import { UUIDT } from '../../../src/utils/utils' import Piscina, { makePiscina } from '../../../src/worker/piscina' import { setupPlugins } from '../../../src/worker/plugins/setup' +import { teardownPlugins } from '../../../src/worker/plugins/teardown' import { createTaskRunner } from '../../../src/worker/worker' import { createOrganization, @@ -37,6 +37,8 @@ import { POSTGRES_DELETE_TABLES_QUERY, } from '../../helpers/sql' +jest.setTimeout(10000) + describe('workerTasks.runAsyncHandlersEventPipeline()', () => { // Tests the failure cases for the workerTasks.runAsyncHandlersEventPipeline // task. Note that this equally applies to e.g. runEventPipeline task as @@ -51,29 +53,31 @@ describe('workerTasks.runAsyncHandlersEventPipeline()', () => { let closeHub: () => Promise let piscinaTaskRunner: ({ task, args }) => Promise + beforeEach(() => { + // Use fake timers to ensure that we don't need to wait on e.g. retry logic. + jest.useFakeTimers({ advanceTimers: true }) + }) + beforeAll(async () => { + jest.useFakeTimers({ advanceTimers: true }) ;[hub, closeHub] = await createHub() redis = await hub.redisPool.acquire() piscinaTaskRunner = createTaskRunner(hub) await hub.postgres.query(POSTGRES_DELETE_TABLES_QUERY) // Need to clear the DB to avoid unique constraint violations on ids }) - afterAll(async () => { - await hub.redisPool.release(redis) - await closeHub() - }) - - beforeEach(() => { - // Use fake timers to ensure that we don't need to wait on e.g. retry logic. - jest.useFakeTimers({ advanceTimers: 30 }) - }) - afterEach(() => { - jest.clearAllTimers() + jest.runAllTimers() jest.useRealTimers() jest.clearAllMocks() }) + afterAll(async () => { + await hub.redisPool.release(redis) + await teardownPlugins(hub) + await closeHub() + }) + test('throws on produce errors', async () => { // To ensure that producer errors are retried and not swallowed, we need // to ensure that these are bubbled up to the main consumer loop. Note @@ -101,10 +105,19 @@ describe('workerTasks.runAsyncHandlersEventPipeline()', () => { await createPluginConfig(hub.postgres, { team_id: teamId, plugin_id: plugin.id }) await setupPlugins(hub) - jest.spyOn(hub.kafkaProducer.producer, 'send').mockImplementationOnce(() => { - return Promise.reject(new KafkaJSError('Failed to produce')) + const error = new LibrdKafkaError({ + name: 'Failed to produce', + message: 'Failed to produce', + code: 1, + errno: 1, + origin: 'test', + isRetriable: true, }) + jest.spyOn(hub.kafkaProducer.producer, 'produce').mockImplementation( + (topic, partition, message, key, timestamp, cb) => cb(error) + ) + await expect( piscinaTaskRunner({ task: 'runAsyncHandlersEventPipeline', @@ -119,70 +132,7 @@ describe('workerTasks.runAsyncHandlersEventPipeline()', () => { }, }, }) - ).rejects.toEqual( - new DependencyUnavailableError('Failed to produce', 'Kafka', new KafkaJSError('Failed to produce')) - ) - }) - - test('retry on RetryError', async () => { - // If we receive a `RetryError`, we should retry the task within the - // pipeline rather than throwing it to the main consumer loop. - // Note that we assume the retries are happening async as is the - // currently functionality, i.e. outside of the consumer loop, but we - // should arguably move this to a separate retry topic. - const organizationId = await createOrganization(hub.postgres) - const plugin = await createPlugin(hub.postgres, { - organization_id: organizationId, - name: 'runEveryMinute plugin', - plugin_type: 'source', - is_global: false, - source__index_ts: ` - export async function onEvent(event, { jobs }) { - await jobs.test().runNow() - } - - export const jobs = { - test: async () => {} - } - `, - }) - - const teamId = await createTeam(hub.postgres, organizationId) - await createPluginConfig(hub.postgres, { team_id: teamId, plugin_id: plugin.id }) - await setupPlugins(hub) - - // This isn't strictly correct in terms of where this is being raised - // from i.e. `producer.send` doesn't ever raise a `RetryError`, but - // it was just convenient to do so and is hopefully close enough to - // reality. - // NOTE: we only mock once such that the second call will succeed - jest.spyOn(hub.kafkaProducer.producer, 'send').mockImplementationOnce(() => { - return Promise.reject(new RetryError('retry error')) - }) - - const event = { - distinctId: 'asdf', - ip: '', - teamId: teamId, - event: 'some event', - properties: {}, - eventUuid: new UUIDT().toString(), - } - - await expect( - piscinaTaskRunner({ - task: 'runAsyncHandlersEventPipeline', - args: { event }, - }) - ).resolves.toEqual({ - args: [expect.objectContaining(event)], - lastStep: 'runAsyncHandlersStep', - }) - - // Ensure the retry call is made. - jest.runOnlyPendingTimers() - - expect(hub.kafkaProducer.producer.send).toHaveBeenCalledTimes(2) + ).rejects.toEqual(new DependencyUnavailableError('Failed to produce', 'Kafka', error)) }) test(`doesn't throw on arbitrary failures`, async () => { @@ -238,19 +188,23 @@ describe('eachBatchAsyncHandlers', () => { let piscina: Piscina beforeEach(async () => { + jest.useFakeTimers({ advanceTimers: true }) ;[hub, closeHub] = await createHub() }) afterEach(async () => { await closeHub?.() + jest.useRealTimers() }) test('rejections from piscina are bubbled up to the consumer', async () => { piscina = await makePiscina(defaultConfig, hub) const ingestionConsumer = buildOnEventIngestionConsumer({ hub, piscina }) + const error = new LibrdKafkaError({ message: 'test', code: 1, errno: 1, origin: 'test', isRetriable: true }) + jest.spyOn(ingestionConsumer, 'eachBatch').mockRejectedValue( - new DependencyUnavailableError('Failed to produce', 'Kafka', new KafkaJSError('Failed to produce')) + new DependencyUnavailableError('Failed to produce', 'Kafka', error) ) await expect( @@ -293,8 +247,6 @@ describe('eachBatchAsyncHandlers', () => { uncommittedOffsets: jest.fn(), pause: jest.fn(), }) - ).rejects.toEqual( - new DependencyUnavailableError('Failed to produce', 'Kafka', new KafkaJSError('Failed to produce')) - ) + ).rejects.toEqual(new DependencyUnavailableError('Failed to produce', 'Kafka', error)) }) }) diff --git a/plugin-server/tests/main/jobs/schedule.test.ts b/plugin-server/tests/main/jobs/schedule.test.ts index 0b4a1cc66d9..6c280d5e96b 100644 --- a/plugin-server/tests/main/jobs/schedule.test.ts +++ b/plugin-server/tests/main/jobs/schedule.test.ts @@ -1,5 +1,3 @@ -import { Producer } from 'kafkajs' - import { KAFKA_SCHEDULED_TASKS } from '../../../src/config/kafka-topics' import { runScheduledTasks } from '../../../src/main/graphile-worker/schedule' import { Hub } from '../../../src/types' @@ -31,10 +29,8 @@ describe('Graphile Worker schedule', () => { runEveryDay: [7, 8, 9], }, kafkaProducer: { - producer: { - send: jest.fn(), - } as unknown as Producer, - } as KafkaProducerWrapper, + queueMessage: jest.fn(), + } as unknown as KafkaProducerWrapper, USE_KAFKA_FOR_SCHEDULED_TASKS: true, } @@ -42,7 +38,7 @@ describe('Graphile Worker schedule', () => { job: { run_at: new Date() }, } as any) - expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(1, { + expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(1, { topic: KAFKA_SCHEDULED_TASKS, messages: [ { @@ -54,7 +50,7 @@ describe('Graphile Worker schedule', () => { }, ], }) - expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(2, { + expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(2, { topic: KAFKA_SCHEDULED_TASKS, messages: [ { @@ -66,7 +62,7 @@ describe('Graphile Worker schedule', () => { }, ], }) - expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(3, { + expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(3, { topic: KAFKA_SCHEDULED_TASKS, messages: [ { @@ -82,7 +78,7 @@ describe('Graphile Worker schedule', () => { await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryHour', { job: { run_at: new Date() }, } as any) - expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(4, { + expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(4, { topic: KAFKA_SCHEDULED_TASKS, messages: [ { @@ -94,7 +90,7 @@ describe('Graphile Worker schedule', () => { }, ], }) - expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(5, { + expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(5, { topic: KAFKA_SCHEDULED_TASKS, messages: [ { @@ -106,7 +102,7 @@ describe('Graphile Worker schedule', () => { }, ], }) - expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(6, { + expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(6, { topic: KAFKA_SCHEDULED_TASKS, messages: [ { @@ -122,7 +118,7 @@ describe('Graphile Worker schedule', () => { await runScheduledTasks(mockHubWithPluginSchedule, mockPiscina as any, 'runEveryDay', { job: { run_at: new Date() }, } as any) - expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(7, { + expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(7, { topic: KAFKA_SCHEDULED_TASKS, messages: [ { @@ -134,7 +130,7 @@ describe('Graphile Worker schedule', () => { }, ], }) - expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(8, { + expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(8, { topic: KAFKA_SCHEDULED_TASKS, messages: [ { @@ -146,7 +142,7 @@ describe('Graphile Worker schedule', () => { }, ], }) - expect(mockHubWithPluginSchedule.kafkaProducer.producer.send).toHaveBeenNthCalledWith(9, { + expect(mockHubWithPluginSchedule.kafkaProducer.queueMessage).toHaveBeenNthCalledWith(9, { topic: KAFKA_SCHEDULED_TASKS, messages: [ { diff --git a/plugin-server/tests/main/kafka-producer-wrapper.test.ts b/plugin-server/tests/main/kafka-producer-wrapper.test.ts deleted file mode 100644 index 9c6d606b864..00000000000 --- a/plugin-server/tests/main/kafka-producer-wrapper.test.ts +++ /dev/null @@ -1,164 +0,0 @@ -import { CompressionTypes, Producer } from 'kafkajs' - -import { PluginsServerConfig } from '../../src/types' -import { KafkaProducerWrapper } from '../../src/utils/db/kafka-producer-wrapper' - -describe('KafkaProducerWrapper', () => { - let producer: KafkaProducerWrapper - let mockKafkaProducer: Producer - let flushSpy: any - - beforeEach(() => { - jest.spyOn(global.Date, 'now').mockImplementation(() => new Date('2020-02-27 11:00:05').getTime()) - - mockKafkaProducer = { sendBatch: jest.fn() } as any - producer = new KafkaProducerWrapper(mockKafkaProducer, undefined, { - KAFKA_FLUSH_FREQUENCY_MS: 20000, - KAFKA_PRODUCER_MAX_QUEUE_SIZE: 4, - KAFKA_MAX_MESSAGE_BATCH_SIZE: 500, - } as PluginsServerConfig) - clearInterval(producer.flushInterval) - - flushSpy = jest.spyOn(producer, 'flush') - }) - - describe('queueMessage()', () => { - it('respects MAX_QUEUE_SIZE', async () => { - await producer.queueMessage({ - topic: 'a', - messages: [{ value: '1'.repeat(10) }], - }) - await producer.queueMessage({ - topic: 'b', - messages: [{ value: '1'.repeat(30) }], - }) - await producer.queueMessage({ - topic: 'b', - messages: [{ value: '1'.repeat(30) }], - }) - - expect(flushSpy).not.toHaveBeenCalled() - expect(producer.currentBatch.length).toEqual(3) - - await producer.queueMessage({ - topic: 'a', - messages: [{ value: '1'.repeat(30) }], - }) - - expect(flushSpy).toHaveBeenCalled() - expect(producer.currentBatch.length).toEqual(0) - expect(producer.currentBatchSize).toEqual(0) - expect(mockKafkaProducer.sendBatch).toHaveBeenCalledWith({ - compression: CompressionTypes.Snappy, - topicMessages: [expect.anything(), expect.anything(), expect.anything(), expect.anything()], - }) - }) - - it('respects KAFKA_MAX_MESSAGE_BATCH_SIZE', async () => { - await producer.queueMessage({ - topic: 'a', - messages: [{ value: '1'.repeat(400) }], - }) - await producer.queueMessage({ - topic: 'a', - messages: [{ value: '1'.repeat(20) }], - }) - expect(flushSpy).not.toHaveBeenCalled() - expect(producer.currentBatch.length).toEqual(2) - - await producer.queueMessage({ - topic: 'a', - messages: [{ value: '1'.repeat(40) }], - }) - - expect(flushSpy).toHaveBeenCalled() - - expect(producer.currentBatch.length).toEqual(1) - expect(producer.currentBatchSize).toBeGreaterThan(40) - expect(producer.currentBatchSize).toBeLessThan(100) - expect(mockKafkaProducer.sendBatch).toHaveBeenCalledWith({ - compression: CompressionTypes.Snappy, - topicMessages: [expect.anything(), expect.anything()], - }) - }) - - it('flushes immediately when message exceeds KAFKA_MAX_MESSAGE_BATCH_SIZE', async () => { - await producer.queueMessage({ - topic: 'a', - messages: [{ value: '1'.repeat(10000) }], - }) - - expect(flushSpy).toHaveBeenCalled() - - expect(producer.currentBatch.length).toEqual(0) - expect(producer.currentBatchSize).toEqual(0) - expect(mockKafkaProducer.sendBatch).toHaveBeenCalledWith({ - compression: CompressionTypes.Snappy, - topicMessages: [expect.anything()], - }) - }) - - it('respects KAFKA_FLUSH_FREQUENCY_MS', async () => { - await producer.queueMessage({ - topic: 'a', - messages: [{ value: '1'.repeat(10) }], - }) - - jest.spyOn(global.Date, 'now').mockImplementation(() => new Date('2020-02-27 11:00:20').getTime()) - await producer.queueMessage({ - topic: 'a', - messages: [{ value: '1'.repeat(10) }], - }) - - expect(flushSpy).not.toHaveBeenCalled() - expect(producer.currentBatch.length).toEqual(2) - - jest.spyOn(global.Date, 'now').mockImplementation(() => new Date('2020-02-27 11:00:26').getTime()) - await producer.queueMessage({ - topic: 'a', - messages: [{ value: '1'.repeat(10) }], - }) - - expect(flushSpy).toHaveBeenCalled() - - expect(producer.currentBatch.length).toEqual(0) - expect(producer.lastFlushTime).toEqual(Date.now()) - expect(mockKafkaProducer.sendBatch).toHaveBeenCalledWith({ - compression: CompressionTypes.Snappy, - topicMessages: [expect.anything(), expect.anything(), expect.anything()], - }) - }) - }) - - describe('flush()', () => { - it('flushes messages in memory', async () => { - await producer.queueMessage({ - topic: 'a', - messages: [{ value: '1'.repeat(10) }], - }) - - jest.spyOn(global.Date, 'now').mockImplementation(() => new Date('2020-02-27 11:00:15').getTime()) - - await producer.flush() - - expect(mockKafkaProducer.sendBatch).toHaveBeenCalledWith({ - compression: CompressionTypes.Snappy, - topicMessages: [ - { - topic: 'a', - messages: [{ value: '1'.repeat(10) }], - }, - ], - }) - expect(producer.currentBatch.length).toEqual(0) - expect(producer.currentBatchSize).toEqual(0) - expect(producer.lastFlushTime).toEqual(Date.now()) - }) - - it('does nothing if nothing queued', async () => { - await producer.flush() - - expect(mockKafkaProducer.sendBatch).not.toHaveBeenCalled() - }) - }) -})