0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-25 11:17:50 +01:00
posthog/plugin-server/tests/main/capabilities.test.ts
Yakko Majuri a34228c49f
refactor: yeet job queues scaffolding in favor of only graphile worker (#12178)
* refactor: rename graphile queue to graphile worker

* refactor: rename job-queues/ to jobs/

* refactor: move graphile-worker to top level jobs/ dir

* refactor: remove references to jobQueueManager

* remove promise from startJobQueueConsumer

* remove job-queue-manager.ts

* remove non-test references to JobQueueBase

* make fs-queue independent from JobQueueBase

* rename FsQueue to MockGraphileWorker

* add missing pauseConsumer method to MockGraphileWorker

* rename fs-queue.ts --> mock-graphile-worker.ts

* delete job-queue-base.ts

* get rid of JobQueue type

* rename graphileQueue --> graphileWorker

* rename JobQueueConsumerControl --> JobsConsumerControl

* remove unused jobs test

* rename startJobQueueConsumer --> startJobsConsumer

* fix tests job imports

* rename jobQueueManager --> graphileWorker in tests

* remove JobQueueManager tests

* fix import

* handle metrics and retries on graphileWorker.enqueue

* minor fix

* Delete buffer.ts

* Revert "Delete buffer.ts"

This reverts commit 40f1761d31.

* add initial test scaffolding

* bring back relevant worker control promises

* fix existing tests

* add tests for graphile worker

* fix exportEvents retries test

* update e2e buffer test
2022-10-11 15:40:34 -03:00

87 lines
2.7 KiB
TypeScript

import Piscina from '@posthog/piscina'
import { KafkaQueue } from '../../src/main/ingestion-queues/kafka-queue'
import { startQueues } from '../../src/main/ingestion-queues/queue'
import { startJobsConsumer } from '../../src/main/jobs/job-queue-consumer'
import { Hub, LogLevel } from '../../src/types'
import { createHub } from '../../src/utils/db/hub'
// Swap the kafka-queue module for a Jest mock, so `KafkaQueue` as imported above refers to the
// mocked class rather than the real implementation for every test in this file.
jest.mock('../../src/main/ingestion-queues/kafka-queue')
describe('capabilities', () => {
    let hub: Hub
    let piscina: Piscina
    let closeHub: () => Promise<void>

    // A fresh hub is created for every test, so capability flags and stubs
    // assigned inside one test are not carried over to the next.
    beforeEach(async () => {
        const [freshHub, teardown] = await createHub({ LOG_LEVEL: LogLevel.Warn })
        hub = freshHub
        closeHub = teardown
        // Minimal stand-in for the worker pool: only `run` is stubbed.
        piscina = { run: jest.fn() } as any
    })

    afterEach(async () => {
        await closeHub()
    })

    describe('queue', () => {
        it('starts ingestion queue by default', async () => {
            const startedQueues = await startQueues(hub, piscina)

            expect(startedQueues).toEqual({ ingestion: expect.any(KafkaQueue) })
        })

        it('handles ingestion being turned off', async () => {
            hub.capabilities.ingestion = false
            hub.capabilities.processAsyncHandlers = false

            const startedQueues = await startQueues(hub, piscina)

            expect(startedQueues).toEqual({ ingestion: null })
        })
    })

    describe('startJobsConsumer()', () => {
        /**
         * Replaces `graphileWorker.startConsumer` with a Jest stub, applies the given
         * capability flags to the hub, and then runs `startJobsConsumer`.
         */
        const startWithCapabilities = async (flags: {
            ingestion: boolean
            processPluginJobs: boolean
        }): Promise<void> => {
            hub.graphileWorker.startConsumer = jest.fn()
            hub.capabilities.ingestion = flags.ingestion
            hub.capabilities.processPluginJobs = flags.processPluginJobs
            await startJobsConsumer(hub, piscina)
        }

        it('sets up bufferJob handler if ingestion is on', async () => {
            await startWithCapabilities({ ingestion: true, processPluginJobs: false })

            expect(hub.graphileWorker.startConsumer).toHaveBeenCalledWith({
                bufferJob: expect.anything(),
            })
        })

        it('sets up pluginJob handler if processPluginJobs is on', async () => {
            await startWithCapabilities({ ingestion: false, processPluginJobs: true })

            expect(hub.graphileWorker.startConsumer).toHaveBeenCalledWith({
                pluginJob: expect.anything(),
            })
        })

        it('sets up bufferJob and pluginJob handlers if ingestion and processPluginJobs are on', async () => {
            await startWithCapabilities({ ingestion: true, processPluginJobs: true })

            expect(hub.graphileWorker.startConsumer).toHaveBeenCalledWith({
                bufferJob: expect.anything(),
                pluginJob: expect.anything(),
            })
        })
    })
})