2023-06-18 21:36:33 +02:00
|
|
|
import { Consumer, Kafka, KafkaMessage, logLevel } from 'kafkajs'
|
|
|
|
|
|
|
|
import { defaultConfig } from '../../src/config/config'
|
|
|
|
import { UUIDT } from '../../src/utils/utils'
|
|
|
|
import { capture, createOrganization, createTeam } from '../api'
|
|
|
|
import { waitForExpect } from '../expectations'
|
|
|
|
|
|
|
|
let kafka: Kafka
|
|
|
|
let organizationId: string
|
|
|
|
|
2023-11-03 15:56:46 +01:00
|
|
|
let warningMessages: KafkaMessage[]
|
|
|
|
let warningConsumer: Consumer
|
2023-06-18 21:36:33 +02:00
|
|
|
|
|
|
|
beforeAll(async () => {
|
|
|
|
kafka = new Kafka({ brokers: [defaultConfig.KAFKA_HOSTS], logLevel: logLevel.NOTHING })
|
|
|
|
|
2023-11-03 15:56:46 +01:00
|
|
|
// Make sure the ingest warnings topic exists before starting the consumer
|
2023-07-13 11:49:01 +02:00
|
|
|
const admin = kafka.admin()
|
2023-11-03 15:56:46 +01:00
|
|
|
const topic = 'clickhouse_ingestion_warnings' // note: functional tests don't use _test suffix as in config
|
|
|
|
await admin.createTopics({ topics: [{ topic: topic }] })
|
2023-07-13 11:49:01 +02:00
|
|
|
await admin.disconnect()
|
|
|
|
|
2023-11-03 15:56:46 +01:00
|
|
|
warningMessages = []
|
|
|
|
warningConsumer = kafka.consumer({ groupId: 'events_plugin_ingestion_test' })
|
|
|
|
await warningConsumer.subscribe({ topic: topic, fromBeginning: true })
|
|
|
|
await warningConsumer.run({
|
2023-06-18 21:36:33 +02:00
|
|
|
eachMessage: ({ message }) => {
|
2023-11-03 15:56:46 +01:00
|
|
|
warningMessages.push(message)
|
2023-06-18 21:36:33 +02:00
|
|
|
return Promise.resolve()
|
|
|
|
},
|
|
|
|
})
|
|
|
|
|
|
|
|
organizationId = await createOrganization()
|
|
|
|
})
|
|
|
|
|
2023-07-11 12:40:40 +02:00
|
|
|
afterAll(async () => {
|
2023-11-03 15:56:46 +01:00
|
|
|
await warningConsumer.disconnect()
|
2023-07-11 12:40:40 +02:00
|
|
|
})
|
|
|
|
|
2023-11-03 15:56:46 +01:00
|
|
|
test.concurrent('consumer produces ingest warnings for messages over 1MB', async () => {
|
2023-06-18 21:36:33 +02:00
|
|
|
// For this we basically want the plugin-server to try and produce a new
|
|
|
|
// message larger than 1MB. We do this by creating a person with a lot of
|
|
|
|
// properties. We will end up denormalizing the person properties onto the
|
|
|
|
// event, which already has the properties as $set therefore resulting in a
|
|
|
|
// message that's larger than 1MB. There may also be other attributes that
|
|
|
|
// are added to the event which pushes it over the limit.
|
|
|
|
//
|
2023-11-03 15:56:46 +01:00
|
|
|
// We verify that this is handled by checking that there is a message in the
|
|
|
|
// appropriate topic.
|
2023-06-18 21:36:33 +02:00
|
|
|
const token = new UUIDT().toString()
|
|
|
|
const teamId = await createTeam(organizationId, undefined, token)
|
|
|
|
const distinctId = new UUIDT().toString()
|
|
|
|
|
|
|
|
const personProperties = {
|
|
|
|
distinct_id: distinctId,
|
|
|
|
$set: {},
|
|
|
|
}
|
|
|
|
|
|
|
|
for (let i = 0; i < 10000; i++) {
|
|
|
|
personProperties.$set[new UUIDT().toString()] = new UUIDT().toString()
|
|
|
|
}
|
|
|
|
|
|
|
|
const personEventUuid = new UUIDT().toString()
|
|
|
|
await capture({
|
|
|
|
teamId,
|
|
|
|
distinctId,
|
|
|
|
uuid: personEventUuid,
|
|
|
|
event: '$identify',
|
|
|
|
properties: personProperties,
|
|
|
|
})
|
|
|
|
|
2023-11-03 15:56:46 +01:00
|
|
|
// Verify we have a message corresponding to the input event.
|
|
|
|
await waitForExpect(() => {
|
|
|
|
const [message] = warningMessages.filter((message: KafkaMessage) => {
|
|
|
|
if (message.value) {
|
|
|
|
const payload = JSON.parse(message.value.toString())
|
|
|
|
const details = JSON.parse(payload.details)
|
|
|
|
return details.eventUuid === personEventUuid && details.distinctId === distinctId
|
|
|
|
}
|
|
|
|
})
|
2023-06-18 21:36:33 +02:00
|
|
|
expect(message).toBeDefined()
|
|
|
|
return message
|
|
|
|
})
|
|
|
|
})
|