2022-05-24 09:10:52 +02:00
|
|
|
import Piscina from '@posthog/piscina'
|
|
|
|
|
|
|
|
import { KafkaQueue } from '../../src/main/ingestion-queues/kafka-queue'
|
|
|
|
import { startQueues } from '../../src/main/ingestion-queues/queue'
|
2022-10-11 20:40:34 +02:00
|
|
|
import { startJobsConsumer } from '../../src/main/jobs/job-queue-consumer'
|
2022-05-24 09:10:52 +02:00
|
|
|
import { Hub, LogLevel } from '../../src/types'
|
|
|
|
import { createHub } from '../../src/utils/db/hub'
|
|
|
|
|
|
|
|
jest.mock('../../src/main/ingestion-queues/kafka-queue')
|
|
|
|
|
2022-07-20 14:16:13 +02:00
|
|
|
describe('capabilities', () => {
|
|
|
|
let hub: Hub
|
|
|
|
let piscina: Piscina
|
|
|
|
let closeHub: () => Promise<void>
|
2022-05-24 09:10:52 +02:00
|
|
|
|
2022-07-20 14:16:13 +02:00
|
|
|
beforeEach(async () => {
|
|
|
|
;[hub, closeHub] = await createHub({
|
|
|
|
LOG_LEVEL: LogLevel.Warn,
|
2022-05-24 09:10:52 +02:00
|
|
|
})
|
2022-07-20 14:16:13 +02:00
|
|
|
piscina = { run: jest.fn() } as any
|
|
|
|
})
|
2022-05-24 09:10:52 +02:00
|
|
|
|
2022-07-20 14:16:13 +02:00
|
|
|
afterEach(async () => {
|
|
|
|
await closeHub()
|
|
|
|
})
|
2022-05-24 09:10:52 +02:00
|
|
|
|
2022-07-20 14:16:13 +02:00
|
|
|
describe('queue', () => {
|
2022-05-24 09:10:52 +02:00
|
|
|
it('starts ingestion queue by default', async () => {
|
|
|
|
const queues = await startQueues(hub, piscina)
|
|
|
|
|
|
|
|
expect(queues).toEqual({
|
|
|
|
ingestion: expect.any(KafkaQueue),
|
|
|
|
})
|
|
|
|
})
|
|
|
|
|
|
|
|
it('handles ingestion being turned off', async () => {
|
|
|
|
hub.capabilities.ingestion = false
|
2022-05-25 13:50:36 +02:00
|
|
|
hub.capabilities.processAsyncHandlers = false
|
2022-05-24 09:10:52 +02:00
|
|
|
|
|
|
|
const queues = await startQueues(hub, piscina)
|
|
|
|
|
|
|
|
expect(queues).toEqual({
|
|
|
|
ingestion: null,
|
|
|
|
})
|
|
|
|
})
|
|
|
|
})
|
2022-07-20 14:16:13 +02:00
|
|
|
|
2022-10-11 20:40:34 +02:00
|
|
|
describe('startJobsConsumer()', () => {
|
2022-07-20 14:16:13 +02:00
|
|
|
it('sets up bufferJob handler if ingestion is on', async () => {
|
2022-10-11 20:40:34 +02:00
|
|
|
hub.graphileWorker.startConsumer = jest.fn()
|
2022-07-20 14:16:13 +02:00
|
|
|
hub.capabilities.ingestion = true
|
|
|
|
hub.capabilities.processPluginJobs = false
|
|
|
|
|
2022-10-11 20:40:34 +02:00
|
|
|
await startJobsConsumer(hub, piscina)
|
2022-07-20 14:16:13 +02:00
|
|
|
|
2022-10-11 20:40:34 +02:00
|
|
|
expect(hub.graphileWorker.startConsumer).toHaveBeenCalledWith({
|
2022-07-20 14:16:13 +02:00
|
|
|
bufferJob: expect.anything(),
|
|
|
|
})
|
|
|
|
})
|
|
|
|
|
|
|
|
it('sets up pluginJob handler if processPluginJobs is on', async () => {
|
2022-10-11 20:40:34 +02:00
|
|
|
hub.graphileWorker.startConsumer = jest.fn()
|
2022-07-20 14:16:13 +02:00
|
|
|
hub.capabilities.ingestion = false
|
|
|
|
hub.capabilities.processPluginJobs = true
|
|
|
|
|
2022-10-11 20:40:34 +02:00
|
|
|
await startJobsConsumer(hub, piscina)
|
2022-07-20 14:16:13 +02:00
|
|
|
|
2022-10-11 20:40:34 +02:00
|
|
|
expect(hub.graphileWorker.startConsumer).toHaveBeenCalledWith({
|
2022-07-20 14:16:13 +02:00
|
|
|
pluginJob: expect.anything(),
|
|
|
|
})
|
|
|
|
})
|
|
|
|
|
|
|
|
it('sets up bufferJob and pluginJob handlers if ingestion and processPluginJobs are on', async () => {
|
2022-10-11 20:40:34 +02:00
|
|
|
hub.graphileWorker.startConsumer = jest.fn()
|
2022-07-20 14:16:13 +02:00
|
|
|
hub.capabilities.ingestion = true
|
|
|
|
hub.capabilities.processPluginJobs = true
|
|
|
|
|
2022-10-11 20:40:34 +02:00
|
|
|
await startJobsConsumer(hub, piscina)
|
2022-07-20 14:16:13 +02:00
|
|
|
|
2022-10-11 20:40:34 +02:00
|
|
|
expect(hub.graphileWorker.startConsumer).toHaveBeenCalledWith({
|
2022-07-20 14:16:13 +02:00
|
|
|
bufferJob: expect.anything(),
|
|
|
|
pluginJob: expect.anything(),
|
|
|
|
})
|
|
|
|
})
|
|
|
|
})
|
2022-05-24 09:10:52 +02:00
|
|
|
})
|