mirror of https://github.com/PostHog/posthog.git synced 2024-11-21 13:39:22 +01:00

feat(cdp): Use cyclotron part 2 (#24746)

Ben White 2024-09-10 09:05:30 +02:00 committed by GitHub
parent e809e5863f
commit 7fd0fbb207
29 changed files with 1062 additions and 522 deletions

View File

@ -57,7 +57,6 @@ jobs:
defaults:
run:
working-directory: 'plugin-server'
steps:
- uses: actions/checkout@v3
@ -82,6 +81,7 @@ jobs:
tests:
name: Plugin Server Tests (${{matrix.shard}})
needs: changes
if: needs.changes.outputs.plugin-server == 'true'
runs-on: ubuntu-latest
strategy:
@ -97,21 +97,17 @@ jobs:
steps:
- name: Code check out
if: needs.changes.outputs.plugin-server == 'true'
uses: actions/checkout@v3
- name: Stop/Start stack with Docker Compose
if: needs.changes.outputs.plugin-server == 'true'
run: |
docker compose -f docker-compose.dev.yml down
docker compose -f docker-compose.dev.yml up -d
- name: Add Kafka to /etc/hosts
if: needs.changes.outputs.plugin-server == 'true'
run: echo "127.0.0.1 kafka" | sudo tee -a /etc/hosts
- name: Set up Python
if: needs.changes.outputs.plugin-server == 'true'
uses: actions/setup-python@v5
with:
python-version: 3.11.9
@ -122,24 +118,35 @@ jobs:
# uv is a fast pip alternative: https://github.com/astral-sh/uv/
- run: pip install uv
- name: Install rust
uses: dtolnay/rust-toolchain@1.77
- uses: actions/cache@v4
with:
path: |
~/.cargo/registry
~/.cargo/git
rust/target
key: ${{ runner.os }}-cargo-release-${{ hashFiles('**/Cargo.lock') }}
- name: Install sqlx-cli
working-directory: rust
run: cargo install sqlx-cli@0.7.3 --no-default-features --features native-tls,postgres
- name: Install SAML (python3-saml) dependencies
if: needs.changes.outputs.plugin-server == 'true'
run: |
sudo apt-get update
sudo apt-get install libxml2-dev libxmlsec1-dev libxmlsec1-openssl
- name: Install python dependencies
if: needs.changes.outputs.plugin-server == 'true'
run: |
uv pip install --system -r requirements-dev.txt
uv pip install --system -r requirements.txt
- name: Install pnpm
if: needs.changes.outputs.plugin-server == 'true'
uses: pnpm/action-setup@v4
- name: Set up Node.js
if: needs.changes.outputs.plugin-server == 'true'
uses: actions/setup-node@v4
with:
node-version: 18.12.1
@ -147,17 +154,14 @@ jobs:
cache-dependency-path: plugin-server/pnpm-lock.yaml
- name: Install package.json dependencies with pnpm
if: needs.changes.outputs.plugin-server == 'true'
run: cd plugin-server && pnpm i
- name: Wait for Clickhouse, Redis & Kafka
if: needs.changes.outputs.plugin-server == 'true'
run: |
docker compose -f docker-compose.dev.yml up kafka redis clickhouse -d --wait
bin/check_kafka_clickhouse_up
- name: Set up databases
if: needs.changes.outputs.plugin-server == 'true'
env:
TEST: 'true'
SECRET_KEY: 'abcdef' # unsafe - for testing only
@ -165,7 +169,6 @@ jobs:
run: cd plugin-server && pnpm setup:test
- name: Test with Jest
if: needs.changes.outputs.plugin-server == 'true'
env:
# Below DB name has `test_` prepended, as that's how Django (run above) creates the test DB
DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_posthog'

.vscode/launch.json vendored
View File

@ -119,7 +119,8 @@
"WORKER_CONCURRENCY": "2",
"OBJECT_STORAGE_ENABLED": "True",
"HOG_HOOK_URL": "http://localhost:3300/hoghook",
"CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": ""
"CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "",
"CDP_CYCLOTRON_ENABLED_TEAMS": "*"
},
"presentation": {
"group": "main"

View File

@ -1,5 +1,11 @@
#!/bin/bash
set -e
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
# NOTE: when running in Docker, the rust directory might not exist, so we need to check for it
if [ -d "$SCRIPT_DIR/../rust" ]; then
bash $SCRIPT_DIR/../rust/bin/migrate-cyclotron
fi
python manage.py migrate
python manage.py migrate_clickhouse

View File

@ -12,7 +12,7 @@ export RUST_LOG=${DEBUG:-debug}
SQLX_QUERY_LEVEL=${SQLX_QUERY_LEVEL:-warn}
export RUST_LOG=$RUST_LOG,sqlx::query=$SQLX_QUERY_LEVEL
export DATABASE_URL=${DATABASE_URL:-postgres://posthog:posthog@localhost:5432/posthog}
export DATABASE_URL=${CYCLOTRON_DATABASE_URL:-postgres://posthog:posthog@localhost:5432/cyclotron}
export ALLOW_INTERNAL_IPS=${ALLOW_INTERNAL_IPS:-true}
./target/debug/cyclotron-fetch &

View File

@ -23,7 +23,8 @@
"prettier:check": "prettier --check .",
"prepublishOnly": "pnpm build",
"setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse",
"setup:test": "cd .. && TEST=1 python manage.py setup_test_environment",
"setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron",
"setup:test:cyclotron": "CYCLOTRON_DATABASE_NAME=test_cyclotron ../rust/bin/migrate-cyclotron",
"services:start": "cd .. && docker compose -f docker-compose.dev.yml up",
"services:stop": "cd .. && docker compose -f docker-compose.dev.yml down",
"services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v",

View File

@ -9,7 +9,7 @@ import { HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogWatcher, HogWatcherState } from './hog-watcher'
import { HogFunctionInvocationResult, HogFunctionType, LogEntry } from './types'
import { createInvocation } from './utils'
import { createInvocation, queueBlobToString } from './utils'
export class CdpApi {
private hogExecutor: HogExecutor
@ -144,11 +144,19 @@ export class CdpApi {
if (invocation.queue === 'fetch') {
if (mock_async_functions) {
// Add the state, simulating what executeAsyncResponse would do
// Re-parse the fetch args for the logging
const fetchArgs = {
...invocation.queueParameters,
body: queueBlobToString(invocation.queueBlob),
}
response = {
invocation: {
...invocation,
queue: 'hog',
queueParameters: { response: { status: 200, body: {} } },
queueParameters: { response: { status: 200 } },
queueBlob: Buffer.from('{}'),
},
finished: false,
logs: [
@ -160,7 +168,7 @@ export class CdpApi {
{
level: 'info',
timestamp: DateTime.now(),
message: `fetch(${JSON.stringify(invocation.queueParameters, null, 2)})`,
message: `fetch(${JSON.stringify(fetchArgs, null, 2)})`,
},
],
}

View File

@ -1,8 +1,9 @@
import cyclotron from '@posthog/cyclotron'
import { CyclotronJob, CyclotronManager, CyclotronWorker } from '@posthog/cyclotron'
import { captureException } from '@sentry/node'
import { Message } from 'node-rdkafka'
import { Counter, Histogram } from 'prom-client'
import { buildIntegerMatcher } from '../config/config'
import {
KAFKA_APP_METRICS_2,
KAFKA_CDP_FUNCTION_CALLBACKS,
@ -14,7 +15,15 @@ import { BatchConsumer, startBatchConsumer } from '../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../kafka/config'
import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics'
import { runInstrumentedFunction } from '../main/utils'
import { AppMetric2Type, Hub, PluginServerService, RawClickHouseEvent, TeamId, TimestampFormat } from '../types'
import {
AppMetric2Type,
Hub,
PluginServerService,
RawClickHouseEvent,
TeamId,
TimestampFormat,
ValueMatcher,
} from '../types'
import { createKafkaProducerWrapper } from '../utils/db/hub'
import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper'
import { captureTeamEvent } from '../utils/posthog'
@ -31,6 +40,7 @@ import { CdpRedis, createCdpRedisPool } from './redis'
import {
HogFunctionInvocation,
HogFunctionInvocationGlobals,
HogFunctionInvocationQueueParameters,
HogFunctionInvocationResult,
HogFunctionInvocationSerialized,
HogFunctionInvocationSerializedCompressed,
@ -44,7 +54,7 @@ import {
createInvocation,
gzipObject,
prepareLogEntriesForClickhouse,
serializeInvocation,
serializeHogFunctionInvocation,
unGzipObject,
} from './utils'
@ -88,8 +98,6 @@ abstract class CdpConsumerBase {
protected kafkaProducer?: KafkaProducerWrapper
protected abstract name: string
protected abstract topic: string
protected abstract consumerGroupId: string
protected heartbeat = () => {}
@ -108,7 +116,7 @@ abstract class CdpConsumerBase {
public get service(): PluginServerService {
return {
id: this.consumerGroupId,
id: this.name,
onShutdown: async () => await this.stop(),
healthcheck: () => this.isHealthy() ?? false,
batchConsumer: this.batchConsumer,
@ -156,8 +164,6 @@ abstract class CdpConsumerBase {
return results
}
protected abstract _handleKafkaBatch(messages: Message[]): Promise<void>
protected async produceQueuedMessages() {
const messages = [...this.messagesToProduce]
this.messagesToProduce = []
@ -205,20 +211,23 @@ abstract class CdpConsumerBase {
})
}
protected async queueInvocations(invocation: HogFunctionInvocation[]) {
// NOTE: These will be removed once we are only on Cyclotron
protected async queueInvocationsToKafka(invocation: HogFunctionInvocation[]) {
await Promise.all(
invocation.map(async (item) => {
await this.queueInvocation(item)
await this.queueInvocationToKafka(item)
})
)
}
protected async queueInvocation(invocation: HogFunctionInvocation) {
// TODO: Add cyclotron check here and enqueue that way
// For now we just enqueue to kafka
// For the Kafka path, enqueueing this way is overkill, but it simplifies migrating to the new system
protected async queueInvocationToKafka(invocation: HogFunctionInvocation) {
// NOTE: We keep the queueParams args as the Kafka path still needs them
const serializedInvocation: HogFunctionInvocationSerialized = {
...invocation,
hogFunctionId: invocation.hogFunction.id,
}
const serializedInvocation = serializeInvocation(invocation)
delete (serializedInvocation as any).hogFunction
const request: HogFunctionInvocationSerializedCompressed = {
state: await gzipObject(serializedInvocation),
@ -234,12 +243,22 @@ abstract class CdpConsumerBase {
}
protected async processInvocationResults(results: HogFunctionInvocationResult[]): Promise<void> {
await runInstrumentedFunction({
return await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.produceResults`,
func: async () => {
await this.hogWatcher.observeResults(results)
await Promise.all(
results.map(async (result) => {
// Tricky: We want to pull all the logs out as we don't want them to be passed around to any subsequent functions
if (result.finished || result.error) {
this.produceAppMetric({
team_id: result.invocation.teamId,
app_source_id: result.invocation.hogFunction.id,
metric_kind: result.error ? 'failure' : 'success',
metric_name: result.error ? 'failed' : 'succeeded',
count: 1,
})
}
this.produceLogs(result)
@ -258,30 +277,20 @@ abstract class CdpConsumerBase {
key: `${team!.api_token}:${event.distinct_id}`,
})
}
if (result.finished || result.error) {
this.produceAppMetric({
team_id: result.invocation.teamId,
app_source_id: result.invocation.hogFunction.id,
metric_kind: result.error ? 'failure' : 'success',
metric_name: result.error ? 'failed' : 'succeeded',
count: 1,
})
} else {
// Means there is follow up so we enqueue it
await this.queueInvocation(result.invocation)
}
})
)
},
})
}
protected async startKafkaConsumer() {
protected async startKafkaConsumer(options: {
topic: string
groupId: string
handleBatch: (messages: Message[]) => Promise<void>
}): Promise<void> {
this.batchConsumer = await startBatchConsumer({
...options,
connectionConfig: createRdConnectionConfigFromEnvVars(this.hub),
groupId: this.consumerGroupId,
topic: this.topic,
autoCommit: true,
sessionTimeout: this.hub.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS,
maxPollIntervalMs: this.hub.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
@ -312,7 +321,7 @@ abstract class CdpConsumerBase {
statsKey: `cdpConsumer.handleEachBatch`,
sendTimeoutGuardToSentry: false,
func: async () => {
await this._handleKafkaBatch(messages)
await options.handleBatch(messages)
},
})
},
@ -322,6 +331,9 @@ abstract class CdpConsumerBase {
addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer)
this.batchConsumer.consumer.on('disconnected', async (err) => {
if (!this.isStopping) {
return
}
// since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect
// we need to listen to disconnect and make sure we're stopped
status.info('🔁', `${this.name} batch consumer disconnected, cleaning up`, { err })
@ -333,15 +345,11 @@ abstract class CdpConsumerBase {
// NOTE: This is only for starting shared services
await Promise.all([
this.hogFunctionManager.start(),
this.hub.CYCLOTRON_DATABASE_URL
? cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] })
: Promise.resolve(),
createKafkaProducerWrapper(this.hub).then((producer) => {
this.kafkaProducer = producer
this.kafkaProducer.producer.connect()
}),
])
this.kafkaProducer = await createKafkaProducerWrapper(this.hub)
this.kafkaProducer.producer.connect()
await this.startKafkaConsumer()
}
public async stop(): Promise<void> {
@ -360,20 +368,27 @@ abstract class CdpConsumerBase {
}
public isHealthy() {
// TODO: Check either kafka consumer or cyclotron worker exists
// and that whatever exists is healthy
return this.batchConsumer?.isHealthy()
}
}
/**
* This consumer handles incoming events from the main clickhouse topic
* Currently it produces to both kafka and Cyclotron based on the team
*/
export class CdpProcessedEventsConsumer extends CdpConsumerBase {
protected name = 'CdpProcessedEventsConsumer'
protected topic = KAFKA_EVENTS_JSON
protected consumerGroupId = 'cdp-processed-events-consumer'
private cyclotronMatcher: ValueMatcher<number>
private cyclotronManager?: CyclotronManager
constructor(hub: Hub) {
super(hub)
this.cyclotronMatcher = buildIntegerMatcher(hub.CDP_CYCLOTRON_ENABLED_TEAMS, true)
}
private cyclotronEnabled(invocation: HogFunctionInvocation): boolean {
return !!(this.cyclotronManager && this.cyclotronMatcher(invocation.globals.project.id))
}
public async processBatch(invocationGlobals: HogFunctionInvocationGlobals[]): Promise<HogFunctionInvocation[]> {
if (!invocationGlobals.length) {
@ -384,23 +399,48 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
this.createHogFunctionInvocations(invocationGlobals)
)
if (this.hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP) {
// NOTE: This is for testing the two ways of enqueueing processing. It will be swapped out for a cyclotron env check
// Kafka based workflow
// Split out the cyclotron invocations
const [cyclotronInvocations, kafkaInvocations] = invocationsToBeQueued.reduce(
(acc, item) => {
if (this.cyclotronEnabled(item)) {
acc[0].push(item)
} else {
acc[1].push(item)
}
return acc
},
[[], []] as [HogFunctionInvocation[], HogFunctionInvocation[]]
)
// For the cyclotron ones we simply create the jobs
await Promise.all(
cyclotronInvocations.map((item) =>
this.cyclotronManager?.createJob({
teamId: item.globals.project.id,
functionId: item.hogFunction.id,
queueName: 'hog',
priority: item.priority,
vmState: serializeHogFunctionInvocation(item),
})
)
)
if (kafkaInvocations.length) {
// As we don't want to over-produce to Kafka, we invoke the hog functions and then queue the results
const invocationResults = await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeInvocations`,
func: async () => {
const hogResults = await this.runManyWithHeartbeat(invocationsToBeQueued, (item) =>
const hogResults = await this.runManyWithHeartbeat(kafkaInvocations, (item) =>
this.hogExecutor.execute(item)
)
return [...hogResults]
},
})
await this.hogWatcher.observeResults(invocationResults)
await this.processInvocationResults(invocationResults)
} else {
await this.queueInvocations(invocationsToBeQueued)
const newInvocations = invocationResults.filter((r) => !r.finished).map((r) => r.invocation)
await this.queueInvocationsToKafka(newInvocations)
}
await this.produceQueuedMessages()
@ -411,7 +451,6 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
/**
* Finds all matching hog functions for the given globals.
* Filters them for their disabled state as well as masking configs
*
*/
protected async createHogFunctionInvocations(
invocationGlobals: HogFunctionInvocationGlobals[]
@ -444,8 +483,10 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
})
const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id))
const validInvocations: HogFunctionInvocation[] = []
const notDisabledInvocations = possibleInvocations.filter((item) => {
// Iterate over them, adding each to the list and updating its priority
possibleInvocations.forEach((item) => {
const state = states[item.hogFunction.id].state
if (state >= HogWatcherState.disabledForPeriod) {
this.produceAppMetric({
@ -458,15 +499,19 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
: 'disabled_permanently',
count: 1,
})
return false
return
}
return true
if (state === HogWatcherState.degraded) {
item.priority = 2
}
validInvocations.push(item)
})
// Now we can filter by masking configs
const { masked, notMasked: notMaskedInvocations } = await this.hogMasker.filterByMasking(
notDisabledInvocations
validInvocations
)
masked.forEach((item) => {
@ -525,15 +570,28 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase {
await this.processBatch(invocationGlobals)
}
public async start(): Promise<void> {
await super.start()
await this.startKafkaConsumer({
topic: KAFKA_EVENTS_JSON,
groupId: 'cdp-processed-events-consumer',
handleBatch: (messages) => this._handleKafkaBatch(messages),
})
this.cyclotronManager = this.hub.CYCLOTRON_DATABASE_URL
? new CyclotronManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] })
: undefined
await this.cyclotronManager?.connect()
}
}
/**
* This consumer handles actually invoking hog in a loop
* This consumer only deals with kafka messages and will eventually be replaced by the Cyclotron worker
*/
export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
protected name = 'CdpFunctionCallbackConsumer'
protected topic = KAFKA_CDP_FUNCTION_CALLBACKS
protected consumerGroupId = 'cdp-function-callback-consumer'
public async processBatch(invocations: HogFunctionInvocation[]): Promise<void> {
if (!invocations.length) {
@ -563,8 +621,9 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
},
})
await this.hogWatcher.observeResults(invocationResults)
await this.processInvocationResults(invocationResults)
const newInvocations = invocationResults.filter((r) => !r.finished).map((r) => r.invocation)
await this.queueInvocationsToKafka(newInvocations)
await this.produceQueuedMessages()
}
@ -640,52 +699,143 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase {
await this.processBatch(events)
}
public async start(): Promise<void> {
await super.start()
await this.startKafkaConsumer({
topic: KAFKA_CDP_FUNCTION_CALLBACKS,
groupId: 'cdp-function-callback-consumer',
handleBatch: (messages) => this._handleKafkaBatch(messages),
})
}
}
// // TODO: Split out non-Kafka specific parts of CdpConsumerBase so that it can be used by the
// // Cyclotron worker below. Or maybe we can just wait, and rip the Kafka bits out once Cyclotron is
// // shipped (and rename it something other than consumer, probably). For now, this is an easy way to
// // use existing code and get an end-to-end demo shipped.
// export class CdpCyclotronWorker extends CdpFunctionCallbackConsumer {
// protected name = 'CdpCyclotronWorker'
// protected topic = 'UNUSED-CdpCyclotronWorker'
// protected consumerGroupId = 'UNUSED-CdpCyclotronWorker'
// private runningWorker: Promise<void> | undefined
// private isUnhealthy = false
/**
* The future of the CDP consumer. This will be the main consumer, handling all hog jobs from Cyclotron
*/
export class CdpCyclotronWorker extends CdpConsumerBase {
protected name = 'CdpCyclotronWorker'
private cyclotronWorker?: CyclotronWorker
private runningWorker: Promise<void> | undefined
protected queue: 'hog' | 'fetch' = 'hog'
// private async innerStart() {
// try {
// const limit = 100 // TODO: Make configurable.
// while (!this.isStopping) {
// const jobs = await cyclotron.dequeueJobsWithVmState('hog', limit)
// // TODO: Decode jobs into the right types
public async processBatch(invocations: HogFunctionInvocation[]): Promise<void> {
if (!invocations.length) {
return
}
// await this.processBatch(jobs)
// }
// } catch (err) {
// this.isUnhealthy = true
// console.error('Error in Cyclotron worker', err)
// throw err
// }
// }
const invocationResults = await runInstrumentedFunction({
statsKey: `cdpConsumer.handleEachBatch.executeInvocations`,
func: async () => {
// NOTE: In the future this service will never do fetching (unless we decide we want to do it in node at some point)
// This is just "for now" to support the transition to cyclotron
const fetchQueue = invocations.filter((item) => item.queue === 'fetch')
const fetchResults = await this.runManyWithHeartbeat(fetchQueue, (item) =>
this.fetchExecutor.execute(item)
)
// public async start() {
// await cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] })
// await cyclotron.initWorker({ dbUrl: this.hub.CYCLOTRON_DATABASE_URL })
const hogQueue = invocations.filter((item) => item.queue === 'hog')
const hogResults = await this.runManyWithHeartbeat(hogQueue, (item) => this.hogExecutor.execute(item))
return [...hogResults, ...(fetchResults.filter(Boolean) as HogFunctionInvocationResult[])]
},
})
// // Consumer `start` expects an async task is started, and not that `start` itself blocks
// // indefinitely.
// this.runningWorker = this.innerStart()
await this.processInvocationResults(invocationResults)
await this.updateJobs(invocationResults)
await this.produceQueuedMessages()
}
// return Promise.resolve()
// }
private async updateJobs(invocations: HogFunctionInvocationResult[]) {
await Promise.all(
invocations.map(async (item) => {
const id = item.invocation.id
if (item.error) {
status.debug('⚡️', 'Updating job to failed', id)
this.cyclotronWorker?.updateJob(id, 'failed')
} else if (item.finished) {
status.debug('⚡️', 'Updating job to completed', id)
this.cyclotronWorker?.updateJob(id, 'completed')
} else {
status.debug('⚡️', 'Updating job to available', id)
this.cyclotronWorker?.updateJob(id, 'available', {
priority: item.invocation.priority,
vmState: serializeHogFunctionInvocation(item.invocation),
queueName: item.invocation.queue,
parameters: item.invocation.queueParameters ?? null,
blob: item.invocation.queueBlob ?? null,
})
}
await this.cyclotronWorker?.flushJob(id)
})
)
}
// public async stop() {
// await super.stop()
// await this.runningWorker
// }
private async handleJobBatch(jobs: CyclotronJob[]) {
const invocations: HogFunctionInvocation[] = []
// public isHealthy() {
// return this.isUnhealthy
// }
// }
for (const job of jobs) {
// NOTE: This is all a bit messy and might be better to refactor into a helper
if (!job.functionId) {
throw new Error('Bad job: ' + JSON.stringify(job))
}
const hogFunction = this.hogFunctionManager.getHogFunction(job.functionId)
if (!hogFunction) {
// Here we need to mark the job as failed
status.error('Error finding hog function', {
id: job.functionId,
})
this.cyclotronWorker?.updateJob(job.id, 'failed')
await this.cyclotronWorker?.flushJob(job.id)
continue
}
const parsedState = job.vmState as HogFunctionInvocationSerialized
invocations.push({
id: job.id,
globals: parsedState.globals,
teamId: hogFunction.team_id,
hogFunction,
priority: job.priority,
queue: (job.queueName as any) ?? 'hog',
queueParameters: job.parameters as HogFunctionInvocationQueueParameters | undefined,
queueBlob: job.blob ?? undefined,
vmState: parsedState.vmState,
timings: parsedState.timings,
})
}
await this.processBatch(invocations)
}
public async start() {
await super.start()
this.cyclotronWorker = new CyclotronWorker({
pool: { dbUrl: this.hub.CYCLOTRON_DATABASE_URL },
queueName: this.queue,
includeVmState: true,
batchMaxSize: this.hub.CDP_CYCLOTRON_BATCH_SIZE,
pollDelayMs: this.hub.CDP_CYCLOTRON_BATCH_DELAY_MS,
})
await this.cyclotronWorker.connect((jobs) => this.handleJobBatch(jobs))
}
public async stop() {
await super.stop()
await this.cyclotronWorker?.disconnect()
await this.runningWorker
}
public isHealthy() {
return this.cyclotronWorker?.isHealthy() ?? false
}
}
// Mostly used for testing
export class CdpCyclotronWorkerFetch extends CdpCyclotronWorker {
protected name = 'CdpCyclotronWorkerFetch'
protected queue = 'fetch' as const
}
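As a brief aside on the routing introduced above: invocations are split between Cyclotron and Kafka using a matcher built from CDP_CYCLOTRON_ENABLED_TEAMS (the real code uses buildIntegerMatcher from the plugin-server config, with '*' meaning all teams). The sketch below only illustrates those semantics with a hypothetical parseTeamMatcher helper; it is not the actual implementation.

// Hypothetical sketch of the team matching used to route invocations to Cyclotron.
type TeamMatcher = (teamId: number) => boolean

function parseTeamMatcher(setting: string): TeamMatcher {
    if (setting === '*') {
        return () => true // every team is routed to Cyclotron
    }
    const teamIds = new Set(
        setting
            .split(',')
            .map((value) => parseInt(value.trim(), 10))
            .filter((value) => !isNaN(value))
    )
    return (teamId) => teamIds.has(teamId)
}

// With CDP_CYCLOTRON_ENABLED_TEAMS="2,42", team 2 gets a Cyclotron job created via
// cyclotronManager.createJob(...), while any other team stays on the Kafka path.
const cyclotronEnabled = parseTeamMatcher('2,42')
console.log(cyclotronEnabled(2)) // true
console.log(cyclotronEnabled(7)) // false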

View File

@ -12,7 +12,7 @@ import {
HogFunctionQueueParametersFetchRequest,
HogFunctionQueueParametersFetchResponse,
} from './types'
import { gzipObject, serializeInvocation } from './utils'
import { gzipObject, queueBlobToString, serializeHogFunctionInvocation } from './utils'
export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 2024, 4096, 10240, Infinity]
@ -40,19 +40,22 @@ export class FetchExecutor {
async execute(invocation: HogFunctionInvocation): Promise<HogFunctionInvocationResult | undefined> {
if (invocation.queue !== 'fetch' || !invocation.queueParameters) {
throw new Error('Bad invocation')
status.error('🦔', `[HogExecutor] Bad invocation`, { invocation })
return
}
const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest
if (params.body) {
histogramFetchPayloadSize.observe(params.body.length / 1024)
const body = queueBlobToString(invocation.queueBlob)
if (body) {
histogramFetchPayloadSize.observe(body.length / 1024)
}
try {
if (this.hogHookEnabledForTeams(invocation.teamId)) {
// This is very temporary until we are committed to Cyclotron
const payload: HogFunctionInvocationAsyncRequest = {
state: await gzipObject(serializeInvocation(invocation)),
state: await gzipObject(serializeHogFunctionInvocation(invocation)),
teamId: invocation.teamId,
hogFunctionId: invocation.hogFunction.id,
asyncFunctionRequest: {
@ -61,6 +64,7 @@ export class FetchExecutor {
params.url,
{
...params,
body,
},
],
},
@ -88,11 +92,12 @@ export class FetchExecutor {
}
const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest
const body = queueBlobToString(invocation.queueBlob) || ''
let responseBody = ''
const resParams: HogFunctionQueueParametersFetchResponse = {
response: {
status: 0,
body: {},
},
error: null,
timings: [],
@ -102,17 +107,12 @@ export class FetchExecutor {
const start = performance.now()
const fetchResponse = await trackedFetch(params.url, {
method: params.method,
body: params.body,
body,
headers: params.headers,
timeout: this.serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS,
})
let responseBody = await fetchResponse.text()
try {
responseBody = JSON.parse(responseBody)
} catch (err) {
// Ignore
}
responseBody = await fetchResponse.text()
const duration = performance.now() - start
@ -123,7 +123,6 @@ export class FetchExecutor {
resParams.response = {
status: fetchResponse.status,
body: responseBody,
}
} catch (err) {
status.error('🦔', `[HogExecutor] Error during fetch`, { error: String(err) })
@ -135,6 +134,7 @@ export class FetchExecutor {
...invocation,
queue: 'hog',
queueParameters: resParams,
queueBlob: Buffer.from(responseBody),
},
finished: false,
logs: [],

View File

@ -14,7 +14,7 @@ import {
HogFunctionQueueParametersFetchResponse,
HogFunctionType,
} from './types'
import { convertToHogFunctionFilterGlobal } from './utils'
import { convertToHogFunctionFilterGlobal, queueBlobToString } from './utils'
const MAX_ASYNC_STEPS = 2
const MAX_HOG_LOGS = 10
@ -153,25 +153,33 @@ export class HogExecutor {
try {
// If the queueParameter is set then we have an expected format that we want to parse and add to the stack
if (invocation.queueParameters) {
// NOTE: This is all based around the only response type being fetch currently
const {
logs = [],
response = null,
error,
timings = [],
} = invocation.queueParameters as HogFunctionQueueParametersFetchResponse
let responseBody: any = undefined
if (response) {
// Convert from buffer to string
responseBody = queueBlobToString(invocation.queueBlob)
}
// Reset the queue parameters to be sure
invocation.queue = 'hog'
invocation.queueParameters = undefined
invocation.queueBlob = undefined
const status = typeof response?.status === 'number' ? response.status : 503
// Special handling for fetch
// TODO: Would be good to have a dedicated value in the fetch response for the status code
if (response?.status && response.status >= 400) {
if (status >= 400) {
// Generic warn log for bad status codes
logs.push({
level: 'warn',
timestamp: DateTime.now(),
message: `Fetch returned bad status: ${response.status}`,
message: `Fetch returned bad status: ${status}`,
})
}
@ -183,16 +191,22 @@ export class HogExecutor {
throw new Error(error)
}
if (typeof response?.body === 'string') {
if (typeof responseBody === 'string') {
try {
response.body = JSON.parse(response.body)
responseBody = JSON.parse(responseBody)
} catch (e) {
// pass - if it isn't json we just pass it on
}
}
// Finally we create the response object as the VM expects
const fetchResponse = {
status,
body: responseBody,
}
// Add the response to the stack to continue execution
invocation.vmState!.stack.push(response)
invocation.vmState!.stack.push(fetchResponse)
invocation.timings.push(...timings)
result.logs = [...logs, ...result.logs]
}
@ -327,18 +341,22 @@ export class HogExecutor {
const headers = fetchOptions?.headers || {
'Content-Type': 'application/json',
}
let body = fetchOptions?.body
// Modify the body to ensure it is a string (we allow Hog to send an object to keep things simple)
body = body ? (typeof body === 'string' ? body : JSON.stringify(body)) : body
const body: string | undefined = fetchOptions?.body
? typeof fetchOptions.body === 'string'
? fetchOptions.body
: JSON.stringify(fetchOptions.body)
: fetchOptions?.body
result.invocation.queue = 'fetch'
result.invocation.queueParameters = {
url,
method,
headers,
body,
return_queue: 'hog',
}
// The payload is always blob encoded
result.invocation.queueBlob = body ? Buffer.from(body) : undefined
break
default:
throw new Error(`Unknown async function '${execRes.asyncFunctionName}'`)
@ -366,6 +384,7 @@ export class HogExecutor {
}
} catch (err) {
result.error = err.message
result.finished = true // Explicitly set to true to prevent infinite loops
status.error(
'🦔',
`[HogExecutor] Error executing function ${invocation.hogFunction.id} - ${invocation.hogFunction.name}`,

View File

@ -95,6 +95,7 @@ export class HogFunctionManager {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
}
return this.cache.functions[id]
}
@ -102,6 +103,7 @@ export class HogFunctionManager {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
}
const fn = this.cache.functions[hogFunctionId]
if (fn?.team_id === teamId) {
return fn

View File

@ -146,8 +146,9 @@ export interface HogFunctionTiming {
export type HogFunctionQueueParametersFetchRequest = {
url: string
method: string
body: string
headers: Record<string, string>
return_queue: string
max_tries?: number
headers?: Record<string, string>
}
export type HogFunctionQueueParametersFetchResponse = {
@ -156,7 +157,6 @@ export type HogFunctionQueueParametersFetchResponse = {
/** The data to be passed to the Hog function from the response */
response?: {
status: number
body: any
} | null
timings?: HogFunctionTiming[]
logs?: LogEntry[]
@ -171,8 +171,10 @@ export type HogFunctionInvocation = {
globals: HogFunctionInvocationGlobals
teamId: Team['id']
hogFunction: HogFunctionType
priority: number
queue: 'hog' | 'fetch'
queueParameters?: HogFunctionInvocationQueueParameters
queueBlob?: Uint8Array
// The current vmstate (set if the invocation is paused)
vmState?: VMState
timings: HogFunctionTiming[]

View File

@ -281,16 +281,25 @@ export function createInvocation(
teamId: hogFunction.team_id,
hogFunction,
queue: 'hog',
priority: 1,
timings: [],
}
}
export function serializeInvocation(invocation: HogFunctionInvocation): HogFunctionInvocationSerialized {
export function serializeHogFunctionInvocation(invocation: HogFunctionInvocation): HogFunctionInvocationSerialized {
const serializedInvocation: HogFunctionInvocationSerialized = {
...invocation,
hogFunctionId: invocation.hogFunction.id,
// We clear the params as they are never used in the serialized form
queueParameters: undefined,
queueBlob: undefined,
}
delete (serializedInvocation as any).hogFunction
return invocation
return serializedInvocation
}
export function queueBlobToString(blob?: HogFunctionInvocation['queueBlob']): string | undefined {
return blob ? Buffer.from(blob).toString('utf-8') : undefined
}
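A minimal usage sketch of the helpers above, assuming the queueBlob convention introduced in this change (the fetch body travels as a binary blob on the invocation rather than inside queueParameters):

// Sketch: queueBlob round-trip using the helper defined above.
const body = JSON.stringify({ hello: 'world' })

// When enqueueing a fetch, the body is stored as a blob:
const queueBlob: Uint8Array = Buffer.from(body)

// When logging or resuming the function, it is read back as a string:
const asString = queueBlobToString(queueBlob) // '{"hello":"world"}'
const parsed = asString ? JSON.parse(asString) : undefined
console.log(parsed) // { hello: 'world' }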

View File

@ -183,14 +183,20 @@ export function getDefaultConfig(): PluginsServerConfig {
CDP_WATCHER_REFILL_RATE: 10,
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: 3,
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '',
CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS: '',
CDP_CYCLOTRON_ENABLED_TEAMS: '',
CDP_REDIS_PASSWORD: '',
CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP: true,
CDP_REDIS_HOST: '',
CDP_REDIS_PORT: 6479,
CDP_CYCLOTRON_BATCH_DELAY_MS: 50,
CDP_CYCLOTRON_BATCH_SIZE: 500,
// Cyclotron
CYCLOTRON_DATABASE_URL: '',
CYCLOTRON_DATABASE_URL: isTestEnv()
? 'postgres://posthog:posthog@localhost:5432/test_cyclotron'
: isDevEnv()
? 'postgres://posthog:posthog@localhost:5432/cyclotron'
: '',
}
}

View File

@ -10,7 +10,12 @@ import v8Profiler from 'v8-profiler-next'
import { getPluginServerCapabilities } from '../capabilities'
import { CdpApi } from '../cdp/cdp-api'
import { CdpFunctionCallbackConsumer, CdpProcessedEventsConsumer } from '../cdp/cdp-consumers'
import {
CdpCyclotronWorker,
CdpCyclotronWorkerFetch,
CdpFunctionCallbackConsumer,
CdpProcessedEventsConsumer,
} from '../cdp/cdp-consumers'
import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
import { Hub, PluginServerCapabilities, PluginServerService, PluginsServerConfig } from '../types'
import { closeHub, createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
@ -458,16 +463,23 @@ export async function startPluginsServer(
}
}
// if (capabilities.cdpCyclotronWorker) {
// ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
// if (hub.CYCLOTRON_DATABASE_URL) {
// const worker = new CdpCyclotronWorker(hub)
// await worker.start()
// } else {
// // This is a temporary solution until we *require* Cyclotron to be configured.
// status.warn('💥', 'CYCLOTRON_DATABASE_URL is not set, not running Cyclotron worker')
// }
// }
if (capabilities.cdpCyclotronWorker) {
const hub = await setupHub()
if (!hub.CYCLOTRON_DATABASE_URL) {
status.error('💥', 'Cyclotron database URL not set.')
} else {
const worker = new CdpCyclotronWorker(hub)
await worker.start()
services.push(worker.service)
if (process.env.EXPERIMENTAL_CDP_FETCH_WORKER) {
const workerFetch = new CdpCyclotronWorkerFetch(hub)
await workerFetch.start()
services.push(workerFetch.service)
}
}
}
if (capabilities.http) {
const app = setupCommonRoutes(services)

View File

@ -113,7 +113,9 @@ export type CdpConfig = {
CDP_WATCHER_DISABLED_TEMPORARY_TTL: number // How long a function should be temporarily disabled for
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: number // How many times a function can be disabled before it is disabled permanently
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string
CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS: string
CDP_CYCLOTRON_ENABLED_TEAMS: string
CDP_CYCLOTRON_BATCH_SIZE: number
CDP_CYCLOTRON_BATCH_DELAY_MS: number
CDP_REDIS_HOST: string
CDP_REDIS_PORT: number
CDP_REDIS_PASSWORD: string

View File

@ -15,7 +15,7 @@ export interface StatusBlueprint {
export class Status implements StatusBlueprint {
mode?: string
logger: pino.Logger
private logger?: pino.Logger
prompt: string
transport: any
@ -59,11 +59,23 @@ export class Status implements StatusBlueprint {
close() {
this.transport?.end()
this.logger = undefined
}
buildMethod(type: keyof StatusBlueprint): StatusMethod {
return (icon: string, message: string, extra: object) => {
const logMessage = `[${this.prompt}] ${icon} ${message}`
if (!this.logger) {
if (isProdEnv()) {
// This can throw in tests if the logger is closed. We don't really want tests to be bothered with this.
throw new Error(`Logger has been closed! Cannot log: ${logMessage}`)
}
console.log(
`Logger has been closed! Cannot log: ${logMessage}. Logging to console instead due to non-prod env.`
)
return
}
if (extra instanceof Object) {
this.logger[type]({ ...extra, msg: logMessage })
} else {

View File

@ -0,0 +1,225 @@
import {
CdpCyclotronWorker,
CdpCyclotronWorkerFetch,
CdpFunctionCallbackConsumer,
CdpProcessedEventsConsumer,
} from '../../src/cdp/cdp-consumers'
import { HogFunctionInvocationGlobals, HogFunctionType } from '../../src/cdp/types'
import { KAFKA_APP_METRICS_2, KAFKA_LOG_ENTRIES } from '../../src/config/kafka-topics'
import { Hub, Team } from '../../src/types'
import { closeHub, createHub } from '../../src/utils/db/hub'
import { waitForExpect } from '../helpers/expectations'
import { getFirstTeam, resetTestDatabase } from '../helpers/sql'
import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples'
import { createHogExecutionGlobals, insertHogFunction as _insertHogFunction } from './fixtures'
import { createKafkaObserver, TestKafkaObserver } from './helpers/kafka-observer'
jest.mock('../../src/utils/fetch', () => {
return {
trackedFetch: jest.fn(() =>
Promise.resolve({
status: 200,
text: () => Promise.resolve(JSON.stringify({ success: true })),
json: () => Promise.resolve({ success: true }),
})
),
}
})
const mockFetch: jest.Mock = require('../../src/utils/fetch').trackedFetch
describe('CDP E2E', () => {
jest.setTimeout(10000)
describe.each(['kafka', 'cyclotron'])('e2e fetch call: %s', (mode) => {
let processedEventsConsumer: CdpProcessedEventsConsumer
let functionProcessor: CdpFunctionCallbackConsumer
let cyclotronWorker: CdpCyclotronWorker | undefined
let cyclotronFetchWorker: CdpCyclotronWorkerFetch | undefined
let hub: Hub
let team: Team
let kafkaObserver: TestKafkaObserver
let fnFetchNoFilters: HogFunctionType
let globals: HogFunctionInvocationGlobals
const insertHogFunction = async (hogFunction: Partial<HogFunctionType>) => {
const item = await _insertHogFunction(hub.postgres, team.id, hogFunction)
return item
}
beforeEach(async () => {
await resetTestDatabase()
hub = await createHub()
team = await getFirstTeam(hub)
fnFetchNoFilters = await insertHogFunction({
...HOG_EXAMPLES.simple_fetch,
...HOG_INPUTS_EXAMPLES.simple_fetch,
...HOG_FILTERS_EXAMPLES.no_filters,
})
if (mode === 'cyclotron') {
hub.CDP_CYCLOTRON_ENABLED_TEAMS = '*'
hub.CYCLOTRON_DATABASE_URL = 'postgres://posthog:posthog@localhost:5432/test_cyclotron'
}
kafkaObserver = await createKafkaObserver(hub, [KAFKA_APP_METRICS_2, KAFKA_LOG_ENTRIES])
processedEventsConsumer = new CdpProcessedEventsConsumer(hub)
await processedEventsConsumer.start()
functionProcessor = new CdpFunctionCallbackConsumer(hub)
await functionProcessor.start()
if (mode === 'cyclotron') {
cyclotronWorker = new CdpCyclotronWorker(hub)
await cyclotronWorker.start()
cyclotronFetchWorker = new CdpCyclotronWorkerFetch(hub)
await cyclotronFetchWorker.start()
}
globals = createHogExecutionGlobals({
project: {
id: team.id,
} as any,
event: {
uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0',
name: '$pageview',
properties: {
$current_url: 'https://posthog.com',
$lib_version: '1.0.0',
},
timestamp: '2024-09-03T09:00:00Z',
} as any,
})
mockFetch.mockClear()
})
afterEach(async () => {
console.log('AfterEach', {
processedEventsConsumer,
functionProcessor,
kafkaObserver,
cyclotronWorker,
cyclotronFetchWorker,
})
const stoppers = [
processedEventsConsumer?.stop().then(() => console.log('Stopped processedEventsConsumer')),
functionProcessor?.stop().then(() => console.log('Stopped functionProcessor')),
kafkaObserver?.stop().then(() => console.log('Stopped kafkaObserver')),
cyclotronWorker?.stop().then(() => console.log('Stopped cyclotronWorker')),
cyclotronFetchWorker?.stop().then(() => console.log('Stopped cyclotronFetchWorker')),
]
await Promise.all(stoppers)
await closeHub(hub)
})
afterAll(() => {
jest.useRealTimers()
})
/**
* Tests here are somewhat expensive so should mostly simulate happy paths and the more e2e scenarios
*/
it('should invoke a function in the worker loop until completed', async () => {
// NOTE: We can skip kafka as the entry point
const invocations = await processedEventsConsumer.processBatch([globals])
expect(invocations).toHaveLength(1)
await waitForExpect(() => {
expect(kafkaObserver.messages).toHaveLength(6)
})
expect(mockFetch).toHaveBeenCalledTimes(1)
expect(mockFetch.mock.calls[0]).toMatchInlineSnapshot(`
Array [
"https://example.com/posthog-webhook",
Object {
"body": "{\\"event\\":{\\"uuid\\":\\"b3a1fe86-b10c-43cc-acaf-d208977608d0\\",\\"name\\":\\"$pageview\\",\\"distinct_id\\":\\"distinct_id\\",\\"url\\":\\"http://localhost:8000/events/1\\",\\"properties\\":{\\"$current_url\\":\\"https://posthog.com\\",\\"$lib_version\\":\\"1.0.0\\"},\\"timestamp\\":\\"2024-09-03T09:00:00Z\\"},\\"groups\\":{},\\"nested\\":{\\"foo\\":\\"http://localhost:8000/events/1\\"},\\"person\\":{\\"uuid\\":\\"uuid\\",\\"name\\":\\"test\\",\\"url\\":\\"http://localhost:8000/persons/1\\",\\"properties\\":{\\"email\\":\\"test@posthog.com\\"}},\\"event_url\\":\\"http://localhost:8000/events/1-test\\"}",
"headers": Object {
"version": "v=1.0.0",
},
"method": "POST",
"timeout": 10000,
},
]
`)
const logMessages = kafkaObserver.messages.filter((m) => m.topic === KAFKA_LOG_ENTRIES)
const metricsMessages = kafkaObserver.messages.filter((m) => m.topic === KAFKA_APP_METRICS_2)
expect(metricsMessages).toMatchObject([
{
topic: 'clickhouse_app_metrics2_test',
value: {
app_source: 'hog_function',
app_source_id: fnFetchNoFilters.id.toString(),
count: 1,
metric_kind: 'success',
metric_name: 'succeeded',
team_id: 2,
},
},
])
expect(logMessages).toMatchObject([
{
topic: 'log_entries_test',
value: {
level: 'debug',
log_source: 'hog_function',
log_source_id: fnFetchNoFilters.id.toString(),
message: 'Executing function',
team_id: 2,
},
},
{
topic: 'log_entries_test',
value: {
level: 'debug',
log_source: 'hog_function',
log_source_id: fnFetchNoFilters.id.toString(),
message: expect.stringContaining(
"Suspending function due to async function call 'fetch'. Payload:"
),
team_id: 2,
},
},
{
topic: 'log_entries_test',
value: {
level: 'debug',
log_source: 'hog_function',
log_source_id: fnFetchNoFilters.id.toString(),
message: 'Resuming function',
team_id: 2,
},
},
{
topic: 'log_entries_test',
value: {
level: 'info',
log_source: 'hog_function',
log_source_id: fnFetchNoFilters.id.toString(),
message: `Fetch response:, {"status":200,"body":{"success":true}}`,
team_id: 2,
},
},
{
topic: 'log_entries_test',
value: {
level: 'debug',
log_source: 'hog_function',
log_source_id: fnFetchNoFilters.id.toString(),
message: expect.stringContaining('Function completed in'),
team_id: 2,
},
},
])
})
})
})

View File

@ -80,10 +80,7 @@ const convertToKafkaMessage = (message: any): any => {
}
}
/**
* NOTE: This isn't fully e2e... We still mock kafka but we trigger one queue from the other in a loop
*/
describe('CDP Consumers E2E', () => {
describe('CDP Function Processor', () => {
let processedEventsConsumer: CdpProcessedEventsConsumer
let functionProcessor: CdpFunctionCallbackConsumer
let hub: Hub
@ -121,7 +118,7 @@ describe('CDP Consumers E2E', () => {
jest.useRealTimers()
})
describe('e2e fetch function', () => {
describe('full fetch function', () => {
/**
* Tests here are somewhat expensive so should mostly simulate happy paths and the more e2e scenarios
*/

View File

@ -5,12 +5,7 @@ import { Hub, Team } from '../../src/types'
import { closeHub, createHub } from '../../src/utils/db/hub'
import { getFirstTeam, resetTestDatabase } from '../helpers/sql'
import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples'
import {
createHogExecutionGlobals,
createIncomingEvent,
createMessage,
insertHogFunction as _insertHogFunction,
} from './fixtures'
import { createHogExecutionGlobals, insertHogFunction as _insertHogFunction } from './fixtures'
const mockConsumer = {
on: jest.fn(),
@ -113,10 +108,6 @@ describe('CDP Processed Events Consumer', () => {
})
describe('general event processing', () => {
beforeEach(() => {
hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP = false
})
describe('common processing', () => {
let fnFetchNoFilters: HogFunctionType
let fnPrinterPageviewFilters: HogFunctionType
@ -170,23 +161,89 @@ describe('CDP Processed Events Consumer', () => {
matchInvocation(fnPrinterPageviewFilters, globals),
])
expect(mockProducer.produce).toHaveBeenCalledTimes(2)
expect(mockProducer.produce).toHaveBeenCalledTimes(11)
expect(decodeAllKafkaMessages()).toMatchObject([
{
key: expect.any(String),
topic: 'cdp_function_callbacks_test',
topic: 'log_entries_test',
value: {
state: expect.any(String),
message: 'Executing function',
log_source_id: fnFetchNoFilters.id,
},
},
{
topic: 'log_entries_test',
value: {
message: "Suspending function due to async function call 'fetch'. Payload: 1902 bytes",
log_source_id: fnFetchNoFilters.id,
},
},
{
topic: 'clickhouse_app_metrics2_test',
value: {
app_source: 'hog_function',
team_id: 2,
app_source_id: fnPrinterPageviewFilters.id,
metric_kind: 'success',
metric_name: 'succeeded',
count: 1,
},
},
{
topic: 'log_entries_test',
value: {
message: 'Executing function',
log_source_id: fnPrinterPageviewFilters.id,
},
},
{
topic: 'log_entries_test',
value: {
message: 'test',
log_source_id: fnPrinterPageviewFilters.id,
},
},
{
topic: 'log_entries_test',
value: {
message: '{"nested":{"foo":"***REDACTED***","bool":false,"null":null}}',
log_source_id: fnPrinterPageviewFilters.id,
},
},
{
topic: 'log_entries_test',
value: {
message: '{"foo":"***REDACTED***","bool":false,"null":null}',
log_source_id: fnPrinterPageviewFilters.id,
},
},
{
topic: 'log_entries_test',
value: {
message: 'substring: ***REDACTED***',
log_source_id: fnPrinterPageviewFilters.id,
},
},
{
topic: 'log_entries_test',
value: {
message:
'{"input_1":"test","secret_input_2":{"foo":"***REDACTED***","bool":false,"null":null},"secret_input_3":"***REDACTED***"}',
log_source_id: fnPrinterPageviewFilters.id,
},
},
{
topic: 'log_entries_test',
value: {
message: expect.stringContaining('Function completed'),
log_source_id: fnPrinterPageviewFilters.id,
},
waitForAck: true,
},
{
key: expect.any(String),
topic: 'cdp_function_callbacks_test',
value: {
state: expect.any(String),
},
key: expect.stringContaining(fnFetchNoFilters.id.toString()),
waitForAck: true,
},
])
@ -199,7 +256,7 @@ describe('CDP Processed Events Consumer', () => {
expect(invocations).toHaveLength(1)
expect(invocations).toMatchObject([matchInvocation(fnFetchNoFilters, globals)])
expect(mockProducer.produce).toHaveBeenCalledTimes(2)
expect(mockProducer.produce).toHaveBeenCalledTimes(4)
expect(decodeAllKafkaMessages()).toMatchObject([
{
@ -215,6 +272,12 @@ describe('CDP Processed Events Consumer', () => {
timestamp: expect.any(String),
},
},
{
topic: 'log_entries_test',
},
{
topic: 'log_entries_test',
},
{
topic: 'cdp_function_callbacks_test',
},
@ -259,97 +322,5 @@ describe('CDP Processed Events Consumer', () => {
])
})
})
describe('kafka parsing', () => {
it('can parse incoming messages correctly', async () => {
await insertHogFunction({
...HOG_EXAMPLES.simple_fetch,
...HOG_INPUTS_EXAMPLES.simple_fetch,
...HOG_FILTERS_EXAMPLES.no_filters,
})
// Create a message that should be processed by this function
// Run the function and check that it was executed
await processor._handleKafkaBatch([
createMessage(
createIncomingEvent(team.id, {
uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0',
event: '$pageview',
properties: JSON.stringify({
$lib_version: '1.0.0',
}),
})
),
])
// General check that the message seemed to get processed
expect(decodeAllKafkaMessages()).toMatchObject([
{
key: expect.any(String),
topic: 'cdp_function_callbacks_test',
value: {
state: expect.any(String),
},
waitForAck: true,
},
])
})
})
describe('no delayed execution', () => {
beforeEach(() => {
hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP = true
})
it('should invoke the initial function before enqueuing', async () => {
await insertHogFunction({
...HOG_EXAMPLES.simple_fetch,
...HOG_INPUTS_EXAMPLES.simple_fetch,
...HOG_FILTERS_EXAMPLES.no_filters,
})
// Create a message that should be processed by this function
// Run the function and check that it was executed
await processor._handleKafkaBatch([
createMessage(
createIncomingEvent(team.id, {
uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0',
event: '$pageview',
properties: JSON.stringify({
$lib_version: '1.0.0',
}),
})
),
])
// General check that the message seemed to get processed
expect(decodeAllKafkaMessages()).toMatchObject([
{
key: expect.any(String),
topic: 'log_entries_test',
value: {
message: 'Executing function',
},
waitForAck: true,
},
{
key: expect.any(String),
topic: 'log_entries_test',
value: {
message: expect.stringContaining(
"Suspending function due to async function call 'fetch'. Payload"
),
},
waitForAck: true,
},
{
key: expect.any(String),
topic: 'cdp_function_callbacks_test',
value: {
state: expect.any(String),
},
waitForAck: true,
},
])
})
})
})
})

View File

@ -0,0 +1,72 @@
import { KafkaConsumer, Message } from 'node-rdkafka'
import { createAdminClient, ensureTopicExists } from '../../../src/kafka/admin'
import { createRdConnectionConfigFromEnvVars } from '../../../src/kafka/config'
import { createKafkaConsumer } from '../../../src/kafka/consumer'
import { Hub } from '../../../src/types'
import { delay, UUIDT } from '../../../src/utils/utils'
export type TestKafkaObserver = {
messages: {
topic: string
value: any
}[]
consumer: KafkaConsumer
stop: () => Promise<void>
expectMessageCount: (count: number) => Promise<void>
}
export const createKafkaObserver = async (hub: Hub, topics: string[]): Promise<TestKafkaObserver> => {
const consumer = await createKafkaConsumer({
...createRdConnectionConfigFromEnvVars(hub),
'group.id': `test-group-${new UUIDT().toString()}`,
})
const adminClient = createAdminClient(createRdConnectionConfigFromEnvVars(hub))
await Promise.all(topics.map((topic) => ensureTopicExists(adminClient, topic, 1000)))
adminClient.disconnect()
await new Promise<void>((res, rej) => consumer.connect({}, (err) => (err ? rej(err) : res())))
consumer.subscribe(topics)
const messages: {
topic: string
value: any
}[] = []
const poll = async () => {
await delay(50)
if (!consumer.isConnected()) {
return
}
const newMessages = await new Promise<Message[]>((res, rej) =>
consumer.consume(10, (err, messages) => (err ? rej(err) : res(messages)))
)
messages.push(
...newMessages.map((message) => ({
topic: message.topic,
value: JSON.parse(message.value?.toString() ?? ''),
}))
)
poll()
}
poll()
return {
messages,
consumer,
stop: () => new Promise((res) => consumer.disconnect(res)),
expectMessageCount: async (count: number): Promise<void> => {
const timeout = 5000
const now = Date.now()
while (messages.length < count && Date.now() - now < timeout) {
await delay(100)
}
if (messages.length < count) {
throw new Error(`Expected ${count} messages, got ${messages.length}`)
}
},
}
}
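A short usage sketch of the observer, assuming the hub and topic constants from the e2e test above:

// Sketch: observe Kafka output produced by the code under test.
const observer = await createKafkaObserver(hub, [KAFKA_APP_METRICS_2, KAFKA_LOG_ENTRIES])

// ... run the consumers / workers that produce to those topics ...

// Wait until the expected number of messages has arrived (throws after the 5s timeout).
await observer.expectMessageCount(6)
const logMessages = observer.messages.filter((m) => m.topic === KAFKA_LOG_ENTRIES)

await observer.stop()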

View File

@ -2,7 +2,7 @@ import { DateTime } from 'luxon'
import { HogExecutor } from '../../src/cdp/hog-executor'
import { HogFunctionManager } from '../../src/cdp/hog-function-manager'
import { HogFunctionAsyncFunctionResponse, HogFunctionType } from '../../src/cdp/types'
import { HogFunctionInvocation, HogFunctionType } from '../../src/cdp/types'
import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples'
import {
createHogExecutionGlobals,
@ -11,8 +11,9 @@ import {
insertHogFunction as _insertHogFunction,
} from './fixtures'
const createAsyncFunctionResponse = (response?: Record<string, any>): HogFunctionAsyncFunctionResponse => {
return {
const setupFetchResponse = (invocation: HogFunctionInvocation, options?: { status?: number; body?: string }): void => {
invocation.queue = 'hog'
invocation.queueParameters = {
timings: [
{
kind: 'async_function',
@ -20,11 +21,10 @@ const createAsyncFunctionResponse = (response?: Record<string, any>): HogFunctio
},
],
response: {
status: 200,
body: 'success',
...response,
status: options?.status ?? 200,
},
}
invocation.queueBlob = Buffer.from(options?.body ?? 'success')
}
describe('Hog Executor', () => {
@ -69,6 +69,7 @@ describe('Hog Executor', () => {
hogFunction: invocation.hogFunction,
queue: 'fetch',
queueParameters: expect.any(Object),
queueBlob: expect.any(Buffer),
timings: [
{
kind: 'hog',
@ -133,7 +134,8 @@ describe('Hog Executor', () => {
},
})
expect(JSON.parse(result.invocation.queueParameters!.body)).toEqual({
const body = JSON.parse(Buffer.from(result.invocation.queueBlob!).toString())
expect(body).toEqual({
event: {
uuid: 'uuid',
name: 'test',
@ -163,8 +165,7 @@ describe('Hog Executor', () => {
expect(result.invocation.vmState).toBeDefined()
// Simulate what the callback does
result.invocation.queue = 'hog'
result.invocation.queueParameters = createAsyncFunctionResponse()
setupFetchResponse(result.invocation)
const secondResult = executor.execute(result.invocation)
logs.push(...secondResult.logs)
@ -185,10 +186,7 @@ describe('Hog Executor', () => {
it('parses the responses body if a string', () => {
const result = executor.execute(createInvocation(hogFunction))
const logs = result.logs.splice(0, 100)
result.invocation.queue = 'hog'
result.invocation.queueParameters = createAsyncFunctionResponse({
body: JSON.stringify({ foo: 'bar' }),
})
setupFetchResponse(result.invocation, { body: JSON.stringify({ foo: 'bar' }) })
const secondResult = executor.execute(result.invocation)
logs.push(...secondResult.logs)
@ -399,18 +397,16 @@ describe('Hog Executor', () => {
// Start the function
const result1 = executor.execute(invocation)
// Run the response one time simulating a successful fetch
result1.invocation.queue = 'hog'
result1.invocation.queueParameters = createAsyncFunctionResponse()
setupFetchResponse(result1.invocation)
const result2 = executor.execute(result1.invocation)
expect(result2.finished).toBe(false)
expect(result2.error).toBe(undefined)
expect(result2.invocation.queue).toBe('fetch')
// This time we should see an error for hitting the loop limit
result2.invocation.queue = 'hog'
result2.invocation.queueParameters = createAsyncFunctionResponse()
setupFetchResponse(result2.invocation)
const result3 = executor.execute(result1.invocation)
expect(result3.finished).toBe(false)
expect(result3.finished).toBe(true)
expect(result3.error).toEqual('Exceeded maximum number of async steps: 2')
expect(result3.logs.map((log) => log.message)).toEqual([
'Resuming function',

View File

@ -81,6 +81,7 @@ describe('HogFunctionManager', () => {
})
afterEach(async () => {
await manager.stop()
await closeHub(hub)
})

View File

@ -0,0 +1,17 @@
export const waitForExpect = async <T>(fn: () => T | Promise<T>, timeout = 10_000, interval = 1_000): Promise<T> => {
// Allows for running expectations that are expected to pass eventually.
// This is useful for, e.g. waiting for events to have been ingested into
// the database.
const start = Date.now()
while (true) {
try {
return await fn()
} catch (error) {
if (Date.now() - start > timeout) {
throw error
}
await new Promise((resolve) => setTimeout(resolve, interval))
}
}
}
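A brief usage sketch, matching how the e2e test above calls it:

// Sketch: retry an assertion until it passes or the timeout elapses.
await waitForExpect(() => {
    expect(kafkaObserver.messages).toHaveLength(6)
})

// Timeout and polling interval can be overridden (values here are illustrative):
await waitForExpect(() => expect(mockFetch).toHaveBeenCalledTimes(1), 5_000, 250)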

rust/bin/migrate-cyclotron Executable file
View File

@ -0,0 +1,10 @@
#!/bin/sh
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
CYCLOTRON_DATABASE_NAME=${CYCLOTRON_DATABASE_NAME:-cyclotron}
CYCLOTRON_DATABASE_URL=${CYCLOTRON_DATABASE_URL:-postgres://posthog:posthog@localhost:5432/$CYCLOTRON_DATABASE_NAME}
echo "Performing cyclotron migrations for $CYCLOTRON_DATABASE_URL (DATABASE_NAME=$CYCLOTRON_DATABASE_NAME)"
sqlx database create -D "$CYCLOTRON_DATABASE_URL"
sqlx migrate run -D "$CYCLOTRON_DATABASE_URL" --source $SCRIPT_DIR/../cyclotron-core/migrations

View File

@ -0,0 +1,30 @@
import { CyclotronInternalPoolConfig, CyclotronPoolConfig } from './types'
export function convertToInternalPoolConfig(poolConfig: CyclotronPoolConfig): CyclotronInternalPoolConfig {
return {
db_url: poolConfig.dbUrl,
max_connections: poolConfig.maxConnections,
min_connections: poolConfig.minConnections,
acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds,
max_lifetime_seconds: poolConfig.maxLifetimeSeconds,
idle_timeout_seconds: poolConfig.idleTimeoutSeconds,
}
}
export function serializeObject(name: string, obj: Record<string, any> | null): string | null {
if (obj === null) {
return null
} else if (typeof obj === 'object' && obj !== null) {
return JSON.stringify(obj)
}
throw new Error(`${name} must be either an object or null`)
}
export function deserializeObject(name: string, str: any): Record<string, any> | null {
if (str === null) {
return null
} else if (typeof str === 'string') {
return JSON.parse(str)
}
throw new Error(`${name} must be either a string or null`)
}
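A small sketch of how these helpers behave (example values are illustrative):

// Sketch: JSON (de)serialization for fields crossing the native Cyclotron boundary.
const params = serializeObject('parameters', { url: 'https://example.com', method: 'POST' })
// params === '{"url":"https://example.com","method":"POST"}'

const roundTripped = deserializeObject('parameters', params)
// roundTripped deep-equals the original object

serializeObject('parameters', null) // null passes through unchanged
// serializeObject('parameters', 42) would throw: "parameters must be either an object or null"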

View File

@ -1,222 +1,3 @@
// eslint-disable-next-line @typescript-eslint/no-var-requires
const cyclotron = require('../index.node')
export interface PoolConfig {
dbUrl: string
maxConnections?: number
minConnections?: number
acquireTimeoutSeconds?: number
maxLifetimeSeconds?: number
idleTimeoutSeconds?: number
}
// Type as expected by Cyclotron.
interface InternalPoolConfig {
db_url: string
max_connections?: number
min_connections?: number
acquire_timeout_seconds?: number
max_lifetime_seconds?: number
idle_timeout_seconds?: number
}
export interface ManagerConfig {
shards: PoolConfig[]
}
// Type as expected by Cyclotron.
interface InternalManagerConfig {
shards: InternalPoolConfig[]
}
export interface JobInit {
teamId: number
functionId: string
queueName: string
priority?: number
scheduled?: Date
vmState?: string
parameters?: string
blob?: Uint8Array
metadata?: string
}
// Type as expected by Cyclotron.
interface InternalJobInit {
team_id: number
function_id: string
queue_name: string
priority?: number
scheduled?: Date
vm_state?: string
parameters?: string
metadata?: string
}
export type JobState = 'available' | 'running' | 'completed' | 'failed' | 'paused'
export interface Job {
id: string
teamId: number
functionId: string | null
created: Date
lockId: string | null
lastHeartbeat: Date | null
janitorTouchCount: number
transitionCount: number
lastTransition: Date
queueName: string
state: JobState
priority: number
scheduled: Date
vmState: string | null
metadata: string | null
parameters: string | null
blob: Uint8Array | null
}
export async function initWorker(poolConfig: PoolConfig): Promise<void> {
const initWorkerInternal: InternalPoolConfig = {
db_url: poolConfig.dbUrl,
max_connections: poolConfig.maxConnections,
min_connections: poolConfig.minConnections,
acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds,
max_lifetime_seconds: poolConfig.maxLifetimeSeconds,
idle_timeout_seconds: poolConfig.idleTimeoutSeconds,
}
return await cyclotron.initWorker(JSON.stringify(initWorkerInternal))
}
export async function initManager(managerConfig: ManagerConfig): Promise<void> {
const managerConfigInternal: InternalManagerConfig = {
shards: managerConfig.shards.map((shard) => ({
db_url: shard.dbUrl,
max_connections: shard.maxConnections,
min_connections: shard.minConnections,
acquire_timeout_seconds: shard.acquireTimeoutSeconds,
max_lifetime_seconds: shard.maxLifetimeSeconds,
idle_timeout_seconds: shard.idleTimeoutSeconds,
})),
}
return await cyclotron.initManager(JSON.stringify(managerConfigInternal))
}
export async function maybeInitWorker(poolConfig: PoolConfig): Promise<void> {
const initWorkerInternal: InternalPoolConfig = {
db_url: poolConfig.dbUrl,
max_connections: poolConfig.maxConnections,
min_connections: poolConfig.minConnections,
acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds,
max_lifetime_seconds: poolConfig.maxLifetimeSeconds,
idle_timeout_seconds: poolConfig.idleTimeoutSeconds,
}
return await cyclotron.maybeInitWorker(JSON.stringify(initWorkerInternal))
}
export async function maybeInitManager(managerConfig: ManagerConfig): Promise<void> {
const managerConfigInternal: InternalManagerConfig = {
shards: managerConfig.shards.map((shard) => ({
db_url: shard.dbUrl,
max_connections: shard.maxConnections,
min_connections: shard.minConnections,
acquire_timeout_seconds: shard.acquireTimeoutSeconds,
max_lifetime_seconds: shard.maxLifetimeSeconds,
idle_timeout_seconds: shard.idleTimeoutSeconds,
})),
}
return await cyclotron.maybeInitManager(JSON.stringify(managerConfigInternal))
}
export async function createJob(job: JobInit): Promise<void> {
job.priority ??= 1
job.scheduled ??= new Date()
const jobInitInternal: InternalJobInit = {
team_id: job.teamId,
function_id: job.functionId,
queue_name: job.queueName,
priority: job.priority,
scheduled: job.scheduled,
vm_state: job.vmState,
parameters: job.parameters,
metadata: job.metadata,
}
const json = JSON.stringify(jobInitInternal)
return await cyclotron.createJob(json, job.blob ? job.blob.buffer : undefined)
}
export async function dequeueJobs(queueName: string, limit: number): Promise<Job[]> {
return await cyclotron.dequeueJobs(queueName, limit)
}
export async function dequeueJobsWithVmState(queueName: string, limit: number): Promise<Job[]> {
return await cyclotron.dequeueJobsWithVmState(queueName, limit)
}
export async function flushJob(jobId: string): Promise<void> {
return await cyclotron.flushJob(jobId)
}
export function setState(jobId: string, jobState: JobState): Promise<void> {
return cyclotron.setState(jobId, jobState)
}
export function setQueue(jobId: string, queueName: string): Promise<void> {
return cyclotron.setQueue(jobId, queueName)
}
export function setPriority(jobId: string, priority: number): Promise<void> {
return cyclotron.setPriority(jobId, priority)
}
export function setScheduledAt(jobId: string, scheduledAt: Date): Promise<void> {
return cyclotron.setScheduledAt(jobId, scheduledAt.toISOString())
}
export function serializeObject(name: string, obj: Record<string, any> | null): string | null {
if (obj === null) {
return null
} else if (typeof obj === 'object' && obj !== null) {
return JSON.stringify(obj)
}
throw new Error(`${name} must be either an object or null`)
}
export function setVmState(jobId: string, vmState: Record<string, any> | null): Promise<void> {
const serialized = serializeObject('vmState', vmState)
return cyclotron.setVmState(jobId, serialized)
}
export function setMetadata(jobId: string, metadata: Record<string, any> | null): Promise<void> {
const serialized = serializeObject('metadata', metadata)
return cyclotron.setMetadata(jobId, serialized)
}
export function setParameters(jobId: string, parameters: Record<string, any> | null): Promise<void> {
const serialized = serializeObject('parameters', parameters)
return cyclotron.setParameters(jobId, serialized)
}
export function setBlob(jobId: string, blob: Uint8Array | null): Promise<void> {
return cyclotron.setBlob(jobId, blob)
}
export default {
initWorker,
initManager,
maybeInitWorker,
maybeInitManager,
createJob,
dequeueJobs,
dequeueJobsWithVmState,
flushJob,
setState,
setQueue,
setPriority,
setScheduledAt,
setVmState,
setMetadata,
setParameters,
setBlob,
}
export * from './manager'
export * from './types'
export * from './worker'

View File

@ -0,0 +1,39 @@
// eslint-disable-next-line @typescript-eslint/no-var-requires
const cyclotron = require('../index.node')
import { convertToInternalPoolConfig, serializeObject } from './helpers'
import { CyclotronJobInit, CyclotronPoolConfig } from './types'
export class CyclotronManager {
constructor(private config: { shards: CyclotronPoolConfig[] }) {
this.config = config
}
async connect(): Promise<void> {
return await cyclotron.maybeInitManager(
JSON.stringify({
shards: this.config.shards.map((shard) => convertToInternalPoolConfig(shard)),
})
)
}
async createJob(job: CyclotronJobInit): Promise<void> {
job.priority ??= 1
job.scheduled ??= new Date()
// TODO: Why is this type of job snake case whereas the dequeue return type is camel case?
const jobInitInternal = {
team_id: job.teamId,
function_id: job.functionId,
queue_name: job.queueName,
priority: job.priority,
scheduled: job.scheduled,
vm_state: job.vmState ? serializeObject('vmState', job.vmState) : null,
parameters: job.parameters ? serializeObject('parameters', job.parameters) : null,
metadata: job.metadata ? serializeObject('metadata', job.metadata) : null,
}
const json = JSON.stringify(jobInitInternal)
return await cyclotron.createJob(json, job.blob ? job.blob.buffer : undefined)
}
}
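
A hedged usage sketch of the manager (the connection string, function id and job fields below are illustrative assumptions, not values from this change): connect once against the configured shards, then enqueue jobs.

import { CyclotronManager } from './manager'

async function enqueueExample(): Promise<void> {
    const manager = new CyclotronManager({
        shards: [{ dbUrl: 'postgres://posthog:posthog@localhost:5432/cyclotron' }],
    })
    await manager.connect()

    await manager.createJob({
        teamId: 1,
        functionId: 'example-hog-function-id', // illustrative id
        queueName: 'hog',
        priority: 1,
        vmState: { bytecode: [], stack: [] }, // any JSON-serializable object
    })
}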

View File

@ -0,0 +1,48 @@
export type CyclotronPoolConfig = {
dbUrl: string
maxConnections?: number
minConnections?: number
acquireTimeoutSeconds?: number
maxLifetimeSeconds?: number
idleTimeoutSeconds?: number
}
// Type as expected by Cyclotron.
export type CyclotronInternalPoolConfig = {
db_url: string
max_connections?: number
min_connections?: number
acquire_timeout_seconds?: number
max_lifetime_seconds?: number
idle_timeout_seconds?: number
}
export type CyclotronJobState = 'available' | 'running' | 'completed' | 'failed' | 'paused'
export type CyclotronJob = {
id: string
teamId: number
functionId: string | null
created: Date
lockId: string | null
lastHeartbeat: Date | null
janitorTouchCount: number
transitionCount: number
lastTransition: Date
queueName: string
state: CyclotronJobState
priority: number
scheduled: Date
vmState: object | null
metadata: object | null
parameters: object | null
blob: Uint8Array | null
}
export type CyclotronJobInit = Pick<CyclotronJob, 'teamId' | 'functionId' | 'queueName' | 'priority'> &
Pick<Partial<CyclotronJob>, 'scheduled' | 'vmState' | 'parameters' | 'metadata' | 'blob'>
export type CyclotronJobUpdate = Pick<
Partial<CyclotronJob>,
'queueName' | 'priority' | 'vmState' | 'parameters' | 'metadata' | 'blob'
>
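
For reference, a literal satisfying CyclotronJobInit could look like the following (values are purely illustrative); only teamId, functionId, queueName and priority are required, the rest is optional.

import { CyclotronJobInit } from './types'

const jobInit: CyclotronJobInit = {
    teamId: 2,
    functionId: 'example-function-id',
    queueName: 'fetch',
    priority: 0,
    scheduled: new Date(),
    parameters: { url: 'https://example.com/webhook', method: 'POST' },
    blob: new TextEncoder().encode('{"body":"example"}'),
}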

View File

@ -0,0 +1,120 @@
// eslint-disable-next-line @typescript-eslint/no-var-requires
const cyclotron = require('../index.node')
import { convertToInternalPoolConfig, deserializeObject, serializeObject } from './helpers'
import { CyclotronJob, CyclotronJobState, CyclotronJobUpdate, CyclotronPoolConfig } from './types'
const parseJob = (job: CyclotronJob): CyclotronJob => {
return {
...job,
vmState: deserializeObject('vmState', job.vmState),
metadata: deserializeObject('metadata', job.metadata),
parameters: deserializeObject('parameters', job.parameters),
}
}
export type CyclotronWorkerConfig = {
pool: CyclotronPoolConfig
/** The queue to be consumed from */
queueName: string
/** Max number of jobs to consume in a batch. Default: 100 */
batchMaxSize?: number
/** Whether dequeued jobs should include their vmState */
includeVmState?: boolean
/** Amount of delay between dequeue polls. Default: 50ms */
pollDelayMs?: number
/** Heartbeat timeout in milliseconds. If the worker loop has not reported a heartbeat within this window, the worker is considered unhealthy. Default: 30000 */
heartbeatTimeoutMs?: number
}
export class CyclotronWorker {
isConsuming: boolean = false
lastHeartbeat: Date = new Date()
private consumerLoopPromise: Promise<void> | null = null
constructor(private config: CyclotronWorkerConfig) {
this.config = config
}
public isHealthy(): boolean {
return (
this.isConsuming &&
new Date().getTime() - this.lastHeartbeat.getTime() < (this.config.heartbeatTimeoutMs ?? 30000)
)
}
async connect(processBatch: (jobs: CyclotronJob[]) => Promise<void>): Promise<void> {
if (this.isConsuming) {
throw new Error('Already consuming')
}
await cyclotron.maybeInitWorker(JSON.stringify(convertToInternalPoolConfig(this.config.pool)))
this.isConsuming = true
this.consumerLoopPromise = this.startConsumerLoop(processBatch).finally(() => {
this.isConsuming = false
this.consumerLoopPromise = null
})
}
private async startConsumerLoop(processBatch: (jobs: CyclotronJob[]) => Promise<void>): Promise<void> {
try {
this.isConsuming = true
const batchMaxSize = this.config.batchMaxSize ?? 100
const pollDelayMs = this.config.pollDelayMs ?? 50
while (this.isConsuming) {
this.lastHeartbeat = new Date()
const jobs = (
this.config.includeVmState
? await cyclotron.dequeueJobsWithVmState(this.config.queueName, batchMaxSize)
: await cyclotron.dequeueJobs(this.config.queueName, batchMaxSize)
).map(parseJob)
if (!jobs.length) {
// Wait a bit before polling again
await new Promise((resolve) => setTimeout(resolve, pollDelayMs))
continue
}
await processBatch(jobs)
}
} catch (e) {
// We only log here so as not to crash the parent process
console.error('[Cyclotron] Error in worker loop', e)
}
}
async disconnect(): Promise<void> {
this.isConsuming = false
await (this.consumerLoopPromise ?? Promise.resolve())
}
async flushJob(jobId: string): Promise<void> {
return await cyclotron.flushJob(jobId)
}
updateJob(id: CyclotronJob['id'], state: CyclotronJobState, updates?: CyclotronJobUpdate): void {
cyclotron.setState(id, state)
if (updates?.queueName) {
cyclotron.setQueue(id, updates.queueName)
}
if (updates?.priority) {
cyclotron.setPriority(id, updates.priority)
}
if (updates?.parameters) {
cyclotron.setParameters(id, serializeObject('parameters', updates.parameters))
}
if (updates?.metadata) {
cyclotron.setMetadata(id, serializeObject('metadata', updates.metadata))
}
if (updates?.vmState) {
cyclotron.setVmState(id, serializeObject('vmState', updates.vmState))
}
if (updates?.blob) {
cyclotron.setBlob(id, updates.blob)
}
}
}
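
Finally, a hedged end-to-end sketch of consuming with the worker (pool URL, queue name and the batch handler body are assumptions for illustration): dequeue batches including vmState, process each job, record its next state, then flush.

import { CyclotronWorker } from './worker'
import { CyclotronJob } from './types'

async function runWorkerExample(): Promise<void> {
    const worker = new CyclotronWorker({
        pool: { dbUrl: 'postgres://posthog:posthog@localhost:5432/cyclotron' },
        queueName: 'hog',
        includeVmState: true,
        batchMaxSize: 100,
    })

    await worker.connect(async (jobs: CyclotronJob[]) => {
        for (const job of jobs) {
            // ... perform the actual work for the job here ...
            worker.updateJob(job.id, 'completed')
            await worker.flushJob(job.id)
        }
    })

    // Later, e.g. on shutdown:
    // await worker.disconnect()
}

Note that connect resolves once the pool is initialised; the consume loop keeps running in the background until disconnect is called.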