mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-24 09:14:46 +01:00
feat(hog): save function telemetry (#25093)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
parent
f42697bdea
commit
44e67095a0
@ -18,7 +18,7 @@ import { userLogic } from 'scenes/userLogic'
|
||||
import { groupsModel } from '~/models/groupsModel'
|
||||
import { performQuery } from '~/queries/query'
|
||||
import { EventsNode, EventsQuery, NodeKind, TrendsQuery } from '~/queries/schema'
|
||||
import { hogql } from '~/queries/utils'
|
||||
import { escapePropertyAsHogQlIdentifier, hogql } from '~/queries/utils'
|
||||
import {
|
||||
AnyPropertyFilter,
|
||||
AvailableFeature,
|
||||
@ -642,7 +642,7 @@ export const hogFunctionConfigurationLogic = kea<hogFunctionConfigurationLogicTy
|
||||
orderBy: ['timestamp DESC'],
|
||||
}
|
||||
groupTypes.forEach((groupType) => {
|
||||
const name = groupType.group_type
|
||||
const name = escapePropertyAsHogQlIdentifier(groupType.group_type)
|
||||
query.select.push(
|
||||
`tuple(${name}.created_at, ${name}.index, ${name}.key, ${name}.properties, ${name}.updated_at)`
|
||||
)
|
||||
|
@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@posthog/hogvm",
|
||||
"version": "1.0.50",
|
||||
"version": "1.0.52",
|
||||
"description": "PostHog Hog Virtual Machine",
|
||||
"types": "dist/index.d.ts",
|
||||
"source": "src/index.ts",
|
||||
|
@ -636,6 +636,7 @@ describe('hogvm execute', () => {
|
||||
ops: 3,
|
||||
stack: [],
|
||||
upvalues: [],
|
||||
telemetry: undefined,
|
||||
throwStack: [],
|
||||
syncDuration: expect.any(Number),
|
||||
},
|
||||
@ -677,10 +678,15 @@ describe('hogvm execute', () => {
|
||||
).toEqual(map({ key: map({ otherKey: 'value' }) }))
|
||||
|
||||
// // return {key: 'value'};
|
||||
expect(
|
||||
() => exec(['_h', op.STRING, 'key', op.GET_GLOBAL, 1, op.STRING, 'value', op.DICT, 1, op.RETURN]).result
|
||||
expect(() =>
|
||||
execSync(['_h', op.STRING, 'key', op.GET_GLOBAL, 1, op.STRING, 'value', op.DICT, 1, op.RETURN])
|
||||
).toThrow('Global variable not found: key')
|
||||
|
||||
// // return {key: 'value'};
|
||||
expect(
|
||||
exec(['_h', op.STRING, 'key', op.GET_GLOBAL, 1, op.STRING, 'value', op.DICT, 1, op.RETURN]).error.message
|
||||
).toEqual('Global variable not found: key')
|
||||
|
||||
// var key := 3; return {key: 'value'};
|
||||
expect(
|
||||
exec(['_h', op.INTEGER, 3, op.GET_LOCAL, 0, op.STRING, 'value', op.DICT, 1, op.RETURN, op.POP]).result
|
||||
@ -2105,7 +2111,8 @@ describe('hogvm execute', () => {
|
||||
finished: true,
|
||||
state: {
|
||||
bytecode: [],
|
||||
stack: [],
|
||||
stack: expect.any(Array),
|
||||
telemetry: undefined,
|
||||
upvalues: [],
|
||||
callStack: [],
|
||||
throwStack: [],
|
||||
@ -2443,7 +2450,7 @@ describe('hogvm execute', () => {
|
||||
finished: true,
|
||||
state: {
|
||||
bytecode: [],
|
||||
stack: [],
|
||||
stack: expect.any(Array),
|
||||
upvalues: [],
|
||||
callStack: [],
|
||||
throwStack: [],
|
||||
@ -2455,4 +2462,59 @@ describe('hogvm execute', () => {
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test('logs telemetry', () => {
|
||||
const bytecode = ['_h', op.INTEGER, 1, op.INTEGER, 2, op.PLUS, op.RETURN]
|
||||
const result = exec(bytecode, { telemetry: true })
|
||||
expect(result).toEqual({
|
||||
result: 3,
|
||||
finished: true,
|
||||
state: {
|
||||
bytecode: [],
|
||||
stack: [],
|
||||
upvalues: [],
|
||||
callStack: [],
|
||||
throwStack: [],
|
||||
declaredFunctions: {},
|
||||
ops: 4,
|
||||
asyncSteps: 0,
|
||||
syncDuration: expect.any(Number),
|
||||
maxMemUsed: 16,
|
||||
telemetry: [
|
||||
[expect.any(Number), 'root', 0, 'START', ''],
|
||||
[expect.any(Number), '', 1, '33/INTEGER', '1'],
|
||||
[expect.any(Number), '', 3, '33/INTEGER', '2'],
|
||||
[expect.any(Number), '', 5, '6/PLUS', ''],
|
||||
[expect.any(Number), '', 6, '38/RETURN', ''],
|
||||
],
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
test('logs telemetry for calls', () => {
|
||||
const bytecode = ['_h', op.FALSE, op.TRUE, op.CALL_GLOBAL, 'concat', 2]
|
||||
const result = exec(bytecode, { telemetry: true })
|
||||
expect(result).toEqual({
|
||||
result: 'truefalse',
|
||||
finished: true,
|
||||
state: {
|
||||
bytecode: [],
|
||||
stack: [],
|
||||
upvalues: [],
|
||||
callStack: [],
|
||||
throwStack: [],
|
||||
declaredFunctions: {},
|
||||
ops: 3,
|
||||
asyncSteps: 0,
|
||||
syncDuration: expect.any(Number),
|
||||
maxMemUsed: 17,
|
||||
telemetry: [
|
||||
[expect.any(Number), 'root', 0, 'START', ''],
|
||||
[expect.any(Number), '', 1, '30/FALSE', ''],
|
||||
[expect.any(Number), '', 2, '29/TRUE', ''],
|
||||
[expect.any(Number), '', 3, '2/CALL_GLOBAL', 'concat'],
|
||||
],
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
|
@ -1,4 +1,4 @@
|
||||
import { calculateCost } from '../utils'
|
||||
import { calculateCost, convertJSToHog, convertHogToJS } from '../utils'
|
||||
|
||||
const PTR_COST = 8
|
||||
|
||||
@ -29,4 +29,26 @@ describe('hogvm utils', () => {
|
||||
obj['key'] = obj
|
||||
expect(calculateCost(obj)).toBe(PTR_COST * 3 + 3)
|
||||
})
|
||||
|
||||
test('convertJSToHog preserves circular references', () => {
|
||||
const obj: any = { a: null, b: true }
|
||||
obj.a = obj
|
||||
const hog = convertJSToHog(obj)
|
||||
expect(hog.get('a') === hog).toBe(true)
|
||||
})
|
||||
|
||||
test('convertHogToJs preserves circular references', () => {
|
||||
const obj: any = { a: null, b: true }
|
||||
obj.a = obj
|
||||
const js = convertHogToJS(obj)
|
||||
expect(js.a === js).toBe(true)
|
||||
|
||||
const map: any = new Map([
|
||||
['a', null],
|
||||
['b', true],
|
||||
])
|
||||
map.set('a', map)
|
||||
const js2 = convertHogToJS(map)
|
||||
expect(js2.a === js2).toBe(true)
|
||||
})
|
||||
})
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -58,3 +58,64 @@ export const enum Operation {
|
||||
SET_UPVALUE = 56,
|
||||
CLOSE_UPVALUE = 57,
|
||||
}
|
||||
|
||||
export const operations = [
|
||||
'',
|
||||
'GET_GLOBAL',
|
||||
'CALL_GLOBAL',
|
||||
'AND',
|
||||
'OR',
|
||||
'NOT',
|
||||
'PLUS',
|
||||
'MINUS',
|
||||
'MULTIPLY',
|
||||
'DIVIDE',
|
||||
'MOD',
|
||||
'EQ',
|
||||
'NOT_EQ',
|
||||
'GT',
|
||||
'GT_EQ',
|
||||
'LT',
|
||||
'LT_EQ',
|
||||
'LIKE',
|
||||
'ILIKE',
|
||||
'NOT_LIKE',
|
||||
'NOT_ILIKE',
|
||||
'IN',
|
||||
'NOT_IN',
|
||||
'REGEX',
|
||||
'NOT_REGEX',
|
||||
'IREGEX',
|
||||
'NOT_IREGEX',
|
||||
'IN_COHORT',
|
||||
'NOT_IN_COHORT',
|
||||
'TRUE',
|
||||
'FALSE',
|
||||
'NULL',
|
||||
'STRING',
|
||||
'INTEGER',
|
||||
'FLOAT',
|
||||
'POP',
|
||||
'GET_LOCAL',
|
||||
'SET_LOCAL',
|
||||
'RETURN',
|
||||
'JUMP',
|
||||
'JUMP_IF_FALSE',
|
||||
'DECLARE_FN',
|
||||
'DICT',
|
||||
'ARRAY',
|
||||
'TUPLE',
|
||||
'GET_PROPERTY',
|
||||
'SET_PROPERTY',
|
||||
'JUMP_IF_STACK_NOT_NULL',
|
||||
'GET_PROPERTY_NULLISH',
|
||||
'THROW',
|
||||
'TRY',
|
||||
'POP_TRY',
|
||||
'CALLABLE',
|
||||
'CLOSURE',
|
||||
'CALL_LOCAL',
|
||||
'GET_UPVALUE',
|
||||
'SET_UPVALUE',
|
||||
'CLOSE_UPVALUE',
|
||||
]
|
||||
|
@ -21,6 +21,8 @@ export interface VMState {
|
||||
syncDuration: number
|
||||
/** Max memory used */
|
||||
maxMemUsed: number
|
||||
/** Telemetry data */
|
||||
telemetry?: Telemetry[]
|
||||
}
|
||||
|
||||
export interface ExecOptions {
|
||||
@ -43,14 +45,31 @@ export interface ExecOptions {
|
||||
/** NodeJS crypto */
|
||||
crypto?: typeof crypto
|
||||
}
|
||||
/** Collecte telemetry data */
|
||||
telemetry?: boolean
|
||||
}
|
||||
|
||||
export type Telemetry = [
|
||||
/** Time from epoch in milliseconds */
|
||||
number,
|
||||
/** Current chunk */
|
||||
string,
|
||||
/** Current position in chunk */
|
||||
number,
|
||||
/** Opcode */
|
||||
string,
|
||||
/** Debug */
|
||||
string
|
||||
]
|
||||
|
||||
export interface ExecResult {
|
||||
result: any
|
||||
finished: boolean
|
||||
error?: any
|
||||
asyncFunctionName?: string
|
||||
asyncFunctionArgs?: any[]
|
||||
state?: VMState
|
||||
telemetry?: Telemetry[]
|
||||
}
|
||||
|
||||
export interface CallFrame {
|
||||
|
@ -89,9 +89,19 @@ export function setNestedValue(obj: any, chain: any[], value: any): void {
|
||||
}
|
||||
|
||||
// Recursively convert objects to maps
|
||||
export function convertJSToHog(x: any): any {
|
||||
export function convertJSToHog(x: any, found?: Map<any, any>): any {
|
||||
if (!found) {
|
||||
found = new Map()
|
||||
}
|
||||
if (found.has(x)) {
|
||||
return found.get(x)
|
||||
}
|
||||
if (Array.isArray(x)) {
|
||||
return x.map(convertJSToHog)
|
||||
const obj: any[] = []
|
||||
found.set(x, obj)
|
||||
x.forEach((v) => obj.push(convertJSToHog(v, found)))
|
||||
found.delete(x)
|
||||
return obj
|
||||
} else if (typeof x === 'object' && x !== null) {
|
||||
if (x.__hogDateTime__) {
|
||||
return toHogDateTime(x.dt, x.zone)
|
||||
@ -101,31 +111,47 @@ export function convertJSToHog(x: any): any {
|
||||
return x
|
||||
}
|
||||
const map = new Map()
|
||||
found.set(x, map)
|
||||
for (const key in x) {
|
||||
map.set(key, convertJSToHog(x[key]))
|
||||
map.set(key, convertJSToHog(x[key], found))
|
||||
}
|
||||
found.delete(x)
|
||||
return map
|
||||
}
|
||||
return x
|
||||
}
|
||||
|
||||
export function convertHogToJS(x: any): any {
|
||||
export function convertHogToJS(x: any, found?: Map<any, any>): any {
|
||||
if (!found) {
|
||||
found = new Map()
|
||||
}
|
||||
if (found.has(x)) {
|
||||
return found.get(x)
|
||||
}
|
||||
if (x instanceof Map) {
|
||||
const obj: Record<string, any> = {}
|
||||
found.set(x, obj)
|
||||
x.forEach((value, key) => {
|
||||
obj[key] = convertHogToJS(value)
|
||||
obj[key] = convertHogToJS(value, found)
|
||||
})
|
||||
found.delete(x)
|
||||
return obj
|
||||
} else if (typeof x === 'object' && Array.isArray(x)) {
|
||||
return x.map(convertHogToJS)
|
||||
const obj: any[] = []
|
||||
found.set(x, obj)
|
||||
x.forEach((v) => obj.push(convertHogToJS(v, found)))
|
||||
found.delete(x)
|
||||
return obj
|
||||
} else if (typeof x === 'object' && x !== null) {
|
||||
if (x.__hogDateTime__ || x.__hogDate__ || x.__hogClosure__ || x.__hogCallable__) {
|
||||
return x
|
||||
}
|
||||
const obj: Record<string, any> = {}
|
||||
found.set(x, obj)
|
||||
for (const key in x) {
|
||||
obj[key] = convertHogToJS(x[key])
|
||||
obj[key] = convertHogToJS(x[key], found)
|
||||
}
|
||||
found.delete(x)
|
||||
return obj
|
||||
}
|
||||
return x
|
||||
|
@ -76,7 +76,7 @@
|
||||
"@medv/finder": "^3.1.0",
|
||||
"@microlink/react-json-view": "^1.21.3",
|
||||
"@monaco-editor/react": "4.6.0",
|
||||
"@posthog/hogvm": "^1.0.50",
|
||||
"@posthog/hogvm": "^1.0.52",
|
||||
"@posthog/icons": "0.8.1",
|
||||
"@posthog/plugin-scaffold": "^1.4.4",
|
||||
"@react-hook/size": "^2.1.2",
|
||||
|
@ -54,7 +54,7 @@
|
||||
"@maxmind/geoip2-node": "^3.4.0",
|
||||
"@posthog/clickhouse": "^1.7.0",
|
||||
"@posthog/cyclotron": "file:../rust/cyclotron-node",
|
||||
"@posthog/hogvm": "^1.0.50",
|
||||
"@posthog/hogvm": "^1.0.52",
|
||||
"@posthog/plugin-scaffold": "1.4.4",
|
||||
"@sentry/node": "^7.49.0",
|
||||
"@sentry/profiling-node": "^0.3.0",
|
||||
|
@ -47,8 +47,8 @@ dependencies:
|
||||
specifier: file:../rust/cyclotron-node
|
||||
version: file:../rust/cyclotron-node
|
||||
'@posthog/hogvm':
|
||||
specifier: ^1.0.50
|
||||
version: 1.0.50(luxon@3.4.4)
|
||||
specifier: ^1.0.52
|
||||
version: 1.0.52(luxon@3.4.4)
|
||||
'@posthog/plugin-scaffold':
|
||||
specifier: 1.4.4
|
||||
version: 1.4.4
|
||||
@ -3119,8 +3119,8 @@ packages:
|
||||
engines: {node: '>=12'}
|
||||
dev: false
|
||||
|
||||
/@posthog/hogvm@1.0.50(luxon@3.4.4):
|
||||
resolution: {integrity: sha512-dkq/46mkVO6xpvUzzxxJ5IrSvDwTTU/pUPJk/0DHnmLBuzAwlpwzMhNLA2XdxJhCC0giXjN7NIg9PERg615Wlw==}
|
||||
/@posthog/hogvm@1.0.52(luxon@3.4.4):
|
||||
resolution: {integrity: sha512-Mn/tLjgZbeKQcp1OTYkCiGD9mNybGeg07baCPJj4EHXFz1EdfSlDzjfYPOcRlpTnAHhF037eYrIocsYbMRdhSg==}
|
||||
peerDependencies:
|
||||
luxon: ^3.4.4
|
||||
dependencies:
|
||||
|
@ -121,7 +121,7 @@ abstract class CdpConsumerBase {
|
||||
void this.captureInternalPostHogEvent(id, 'hog function state changed', { state })
|
||||
})
|
||||
this.hogMasker = new HogMasker(this.redis)
|
||||
this.hogExecutor = new HogExecutor(this.hogFunctionManager)
|
||||
this.hogExecutor = new HogExecutor(this.hub, this.hogFunctionManager)
|
||||
const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.hub)
|
||||
this.fetchExecutor = new FetchExecutor(this.hub, rustyHook)
|
||||
this.groupsManager = new GroupsManager(this.hub)
|
||||
|
@ -4,6 +4,8 @@ import { DateTime } from 'luxon'
|
||||
import { Histogram } from 'prom-client'
|
||||
import RE2 from 're2'
|
||||
|
||||
import { buildIntegerMatcher } from '../config/config'
|
||||
import { Hub, ValueMatcher } from '../types'
|
||||
import { status } from '../utils/status'
|
||||
import { HogFunctionManager } from './hog-function-manager'
|
||||
import {
|
||||
@ -65,6 +67,9 @@ export const formatInput = (bytecode: any, globals: HogFunctionInvocation['globa
|
||||
// NOT ALLOWED
|
||||
throw new Error('Input fields must be simple sync values')
|
||||
}
|
||||
if (res.error) {
|
||||
throw res.error
|
||||
}
|
||||
return convertHogToJS(res.result)
|
||||
}
|
||||
|
||||
@ -93,7 +98,11 @@ const sanitizeLogMessage = (args: any[], sensitiveValues?: string[]): string =>
|
||||
}
|
||||
|
||||
export class HogExecutor {
|
||||
constructor(private hogFunctionManager: HogFunctionManager) {}
|
||||
private telemetryMatcher: ValueMatcher<number>
|
||||
|
||||
constructor(private hub: Hub, private hogFunctionManager: HogFunctionManager) {
|
||||
this.telemetryMatcher = buildIntegerMatcher(this.hub.CDP_HOG_FILTERS_TELEMETRY_TEAMS, true)
|
||||
}
|
||||
|
||||
findMatchingFunctions(event: HogFunctionInvocationGlobals): {
|
||||
matchingFunctions: HogFunctionType[]
|
||||
@ -112,15 +121,30 @@ export class HogExecutor {
|
||||
if (hogFunction.filters?.bytecode) {
|
||||
const start = performance.now()
|
||||
try {
|
||||
const filterResult = execHog(hogFunction.filters.bytecode, { globals: filtersGlobals })
|
||||
const filterResult = execHog(hogFunction.filters.bytecode, {
|
||||
globals: filtersGlobals,
|
||||
telemetry: this.telemetryMatcher(hogFunction.team_id),
|
||||
})
|
||||
if (typeof filterResult.result === 'boolean' && filterResult.result) {
|
||||
matchingFunctions.push(hogFunction)
|
||||
return
|
||||
}
|
||||
if (filterResult.error) {
|
||||
status.error('🦔', `[HogExecutor] Error filtering function`, {
|
||||
hogFunctionId: hogFunction.id,
|
||||
hogFunctionName: hogFunction.name,
|
||||
teamId: hogFunction.team_id,
|
||||
error: filterResult.error.message,
|
||||
result: filterResult,
|
||||
})
|
||||
erroredFunctions.push(hogFunction)
|
||||
return
|
||||
}
|
||||
} catch (error) {
|
||||
status.error('🦔', `[HogExecutor] Error filtering function`, {
|
||||
hogFunctionId: hogFunction.id,
|
||||
hogFunctionName: hogFunction.name,
|
||||
teamId: hogFunction.team_id,
|
||||
error: error.message,
|
||||
})
|
||||
erroredFunctions.push(hogFunction)
|
||||
@ -315,6 +339,9 @@ export class HogExecutor {
|
||||
},
|
||||
},
|
||||
})
|
||||
if (execRes.error) {
|
||||
throw execRes.error
|
||||
}
|
||||
} catch (e) {
|
||||
result.logs.push({
|
||||
level: 'error',
|
||||
|
@ -187,6 +187,7 @@ export function getDefaultConfig(): PluginsServerConfig {
|
||||
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: 3,
|
||||
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '',
|
||||
CDP_CYCLOTRON_ENABLED_TEAMS: '',
|
||||
CDP_HOG_FILTERS_TELEMETRY_TEAMS: '',
|
||||
CDP_REDIS_PASSWORD: '',
|
||||
CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP: true,
|
||||
CDP_REDIS_HOST: '',
|
||||
|
@ -114,6 +114,7 @@ export type CdpConfig = {
|
||||
CDP_WATCHER_DISABLED_TEMPORARY_TTL: number // How long a function should be temporarily disabled for
|
||||
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: number // How many times a function can be disabled before it is disabled permanently
|
||||
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string
|
||||
CDP_HOG_FILTERS_TELEMETRY_TEAMS: string
|
||||
CDP_CYCLOTRON_ENABLED_TEAMS: string
|
||||
CDP_CYCLOTRON_BATCH_SIZE: number
|
||||
CDP_CYCLOTRON_BATCH_DELAY_MS: number
|
||||
|
@ -174,7 +174,7 @@ describe('CDP API', () => {
|
||||
},
|
||||
{
|
||||
level: 'debug',
|
||||
message: "Suspending function due to async function call 'fetch'. Payload: 2039 bytes",
|
||||
message: "Suspending function due to async function call 'fetch'. Payload: 2064 bytes",
|
||||
},
|
||||
{
|
||||
level: 'info',
|
||||
@ -222,7 +222,7 @@ describe('CDP API', () => {
|
||||
},
|
||||
{
|
||||
level: 'debug',
|
||||
message: "Suspending function due to async function call 'fetch'. Payload: 2039 bytes",
|
||||
message: "Suspending function due to async function call 'fetch'. Payload: 2064 bytes",
|
||||
},
|
||||
{
|
||||
level: 'debug',
|
||||
|
@ -209,7 +209,7 @@ describe('CDP Function Processor', () => {
|
||||
])
|
||||
expect(kafkaMessages.logs.map((x) => x.value.message)).toEqual([
|
||||
'Executing function',
|
||||
"Suspending function due to async function call 'fetch'. Payload: 1931 bytes",
|
||||
"Suspending function due to async function call 'fetch'. Payload: 1956 bytes",
|
||||
'Resuming function',
|
||||
'Fetch response:, {"status":200,"body":{"success":true}}',
|
||||
expect.stringContaining('Function completed'),
|
||||
|
@ -173,7 +173,7 @@ describe('CDP Processed Events Consumer', () => {
|
||||
{
|
||||
topic: 'log_entries_test',
|
||||
value: {
|
||||
message: "Suspending function due to async function call 'fetch'. Payload: 1931 bytes",
|
||||
message: "Suspending function due to async function call 'fetch'. Payload: 1956 bytes",
|
||||
log_source_id: fnFetchNoFilters.id,
|
||||
},
|
||||
},
|
||||
|
@ -370,6 +370,7 @@ export const HOG_INPUTS_EXAMPLES: Record<string, Pick<HogFunctionType, 'inputs'
|
||||
|
||||
export const HOG_FILTERS_EXAMPLES: Record<string, Pick<HogFunctionType, 'filters'>> = {
|
||||
no_filters: { filters: { events: [], actions: [], bytecode: ['_h', 29] } },
|
||||
broken_filters: { filters: { events: [], actions: [], bytecode: ['_H', 1, 29, 35, 35, 35] } },
|
||||
pageview_or_autocapture_filter: {
|
||||
filters: {
|
||||
events: [
|
||||
|
@ -3,9 +3,23 @@ import { DateTime } from 'luxon'
|
||||
import { HogExecutor } from '../../src/cdp/hog-executor'
|
||||
import { HogFunctionManager } from '../../src/cdp/hog-function-manager'
|
||||
import { HogFunctionInvocation, HogFunctionType } from '../../src/cdp/types'
|
||||
import { Hub } from '../../src/types'
|
||||
import { createHub } from '../../src/utils/db/hub'
|
||||
import { status } from '../../src/utils/status'
|
||||
import { truth } from '../helpers/truth'
|
||||
import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples'
|
||||
import { createHogExecutionGlobals, createHogFunction, createInvocation } from './fixtures'
|
||||
|
||||
jest.mock('../../src/utils/status', () => ({
|
||||
status: {
|
||||
error: jest.fn(),
|
||||
info: jest.fn(),
|
||||
warn: jest.fn(),
|
||||
debug: jest.fn(),
|
||||
updatePrompt: jest.fn(),
|
||||
},
|
||||
}))
|
||||
|
||||
const setupFetchResponse = (invocation: HogFunctionInvocation, options?: { status?: number; body?: string }): void => {
|
||||
invocation.queue = 'hog'
|
||||
invocation.queueParameters = {
|
||||
@ -25,6 +39,7 @@ const setupFetchResponse = (invocation: HogFunctionInvocation, options?: { statu
|
||||
describe('Hog Executor', () => {
|
||||
jest.setTimeout(1000)
|
||||
let executor: HogExecutor
|
||||
let hub: Hub
|
||||
|
||||
const mockFunctionManager = {
|
||||
reloadAllHogFunctions: jest.fn(),
|
||||
@ -32,10 +47,11 @@ describe('Hog Executor', () => {
|
||||
getTeamHogFunction: jest.fn(),
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
beforeEach(async () => {
|
||||
jest.useFakeTimers()
|
||||
jest.setSystemTime(new Date('2024-06-07T12:00:00.000Z').getTime())
|
||||
executor = new HogExecutor(mockFunctionManager as any as HogFunctionManager)
|
||||
hub = await createHub()
|
||||
executor = new HogExecutor(hub, mockFunctionManager as any as HogFunctionManager)
|
||||
})
|
||||
|
||||
describe('general event processing', () => {
|
||||
@ -90,7 +106,7 @@ describe('Hog Executor', () => {
|
||||
{
|
||||
timestamp: expect.any(DateTime),
|
||||
level: 'debug',
|
||||
message: "Suspending function due to async function call 'fetch'. Payload: 1847 bytes",
|
||||
message: "Suspending function due to async function call 'fetch'. Payload: 1872 bytes",
|
||||
},
|
||||
])
|
||||
})
|
||||
@ -171,7 +187,7 @@ describe('Hog Executor', () => {
|
||||
expect(logs.map((log) => log.message)).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"Executing function",
|
||||
"Suspending function due to async function call 'fetch'. Payload: 1847 bytes",
|
||||
"Suspending function due to async function call 'fetch'. Payload: 1872 bytes",
|
||||
"Resuming function",
|
||||
"Fetch response:, {\\"status\\":200,\\"body\\":\\"success\\"}",
|
||||
"Function completed in 100ms. Sync: 0ms. Mem: 779 bytes. Ops: 22.",
|
||||
@ -190,7 +206,7 @@ describe('Hog Executor', () => {
|
||||
expect(logs.map((log) => log.message)).toMatchInlineSnapshot(`
|
||||
Array [
|
||||
"Executing function",
|
||||
"Suspending function due to async function call 'fetch'. Payload: 1847 bytes",
|
||||
"Suspending function due to async function call 'fetch'. Payload: 1872 bytes",
|
||||
"Resuming function",
|
||||
"Fetch response:, {\\"status\\":200,\\"body\\":{\\"foo\\":\\"bar\\"}}",
|
||||
"Function completed in 100ms. Sync: 0ms. Mem: 779 bytes. Ops: 22.",
|
||||
@ -228,6 +244,40 @@ describe('Hog Executor', () => {
|
||||
expect(resultsShouldMatch.nonMatchingFunctions).toHaveLength(0)
|
||||
})
|
||||
|
||||
it('logs telemetry', async () => {
|
||||
hub = await createHub({ CDP_HOG_FILTERS_TELEMETRY_TEAMS: '*' })
|
||||
executor = new HogExecutor(hub, mockFunctionManager as any as HogFunctionManager)
|
||||
|
||||
const fn = createHogFunction({
|
||||
...HOG_EXAMPLES.simple_fetch,
|
||||
...HOG_INPUTS_EXAMPLES.simple_fetch,
|
||||
...HOG_FILTERS_EXAMPLES.broken_filters,
|
||||
})
|
||||
mockFunctionManager.getTeamHogFunctions.mockReturnValue([fn])
|
||||
const resultsShouldMatch = executor.findMatchingFunctions(
|
||||
createHogExecutionGlobals({
|
||||
groups: {},
|
||||
event: {
|
||||
event: '$pageview',
|
||||
properties: {
|
||||
$current_url: 'https://posthog.com',
|
||||
},
|
||||
} as any,
|
||||
})
|
||||
)
|
||||
expect(resultsShouldMatch.erroredFunctions).toHaveLength(1)
|
||||
expect(status.error).toHaveBeenCalledWith(
|
||||
'🦔',
|
||||
expect.stringContaining('Error filtering function'),
|
||||
truth(
|
||||
(obj) =>
|
||||
'telemetry' in obj.result.state &&
|
||||
Array.isArray(obj.result.state.telemetry) &&
|
||||
obj.result.state.telemetry[0][3] === 'START'
|
||||
)
|
||||
)
|
||||
})
|
||||
|
||||
it('can use elements_chain_texts', () => {
|
||||
const fn = createHogFunction({
|
||||
...HOG_EXAMPLES.simple_fetch,
|
||||
|
44
plugin-server/tests/helpers/truth.ts
Normal file
44
plugin-server/tests/helpers/truth.ts
Normal file
@ -0,0 +1,44 @@
|
||||
export class AsymmetricMatcher<T> {
|
||||
protected sample: T
|
||||
$$typeof: symbol
|
||||
inverse?: boolean
|
||||
|
||||
constructor(sample: T) {
|
||||
this.$$typeof = Symbol.for('jest.asymmetricMatcher')
|
||||
this.sample = sample
|
||||
}
|
||||
}
|
||||
|
||||
class Truth extends AsymmetricMatcher<(value: any) => boolean> {
|
||||
constructor(sample: (value: any) => boolean, inverse: boolean = false) {
|
||||
if (typeof sample !== 'function') {
|
||||
throw new Error('Expected is not a function')
|
||||
}
|
||||
super(sample)
|
||||
this.inverse = inverse
|
||||
}
|
||||
|
||||
asymmetricMatch(other: any): boolean {
|
||||
const result = this.sample(other)
|
||||
|
||||
return this.inverse ? !result : result
|
||||
}
|
||||
|
||||
toString(): string {
|
||||
return `${this.inverse ? 'Not' : ''}Truth`
|
||||
}
|
||||
|
||||
toAsymmetricMatcher(): string {
|
||||
return `Truth<${this.sample}>`
|
||||
}
|
||||
}
|
||||
|
||||
export const truth = (sample: (value: any) => boolean): Truth => new Truth(sample)
|
||||
|
||||
export function partial<T>(objectOrArray: T): T {
|
||||
if (Array.isArray(objectOrArray)) {
|
||||
return expect.arrayContaining(objectOrArray)
|
||||
} else {
|
||||
return expect.objectContaining(objectOrArray)
|
||||
}
|
||||
}
|
@ -50,8 +50,8 @@ dependencies:
|
||||
specifier: 4.6.0
|
||||
version: 4.6.0(monaco-editor@0.49.0)(react-dom@18.2.0)(react@18.2.0)
|
||||
'@posthog/hogvm':
|
||||
specifier: ^1.0.50
|
||||
version: 1.0.50(luxon@3.5.0)
|
||||
specifier: ^1.0.52
|
||||
version: 1.0.52(luxon@3.5.0)
|
||||
'@posthog/icons':
|
||||
specifier: 0.8.1
|
||||
version: 0.8.1(react-dom@18.2.0)(react@18.2.0)
|
||||
@ -5394,8 +5394,8 @@ packages:
|
||||
resolution: {integrity: sha512-50/17A98tWUfQ176raKiOGXuYpLyyVMkxxG6oylzL3BPOlA6ADGdK7EYunSa4I064xerltq9TGXs8HmOk5E+vw==}
|
||||
dev: false
|
||||
|
||||
/@posthog/hogvm@1.0.50(luxon@3.5.0):
|
||||
resolution: {integrity: sha512-dkq/46mkVO6xpvUzzxxJ5IrSvDwTTU/pUPJk/0DHnmLBuzAwlpwzMhNLA2XdxJhCC0giXjN7NIg9PERg615Wlw==}
|
||||
/@posthog/hogvm@1.0.52(luxon@3.5.0):
|
||||
resolution: {integrity: sha512-Mn/tLjgZbeKQcp1OTYkCiGD9mNybGeg07baCPJj4EHXFz1EdfSlDzjfYPOcRlpTnAHhF037eYrIocsYbMRdhSg==}
|
||||
peerDependencies:
|
||||
luxon: ^3.4.4
|
||||
dependencies:
|
||||
@ -18314,7 +18314,7 @@ packages:
|
||||
react: '>=15'
|
||||
dependencies:
|
||||
react: 18.2.0
|
||||
unlayer-types: 1.95.0
|
||||
unlayer-types: 1.103.0
|
||||
dev: false
|
||||
|
||||
/react-error-boundary@3.1.4(react@18.2.0):
|
||||
@ -20861,8 +20861,8 @@ packages:
|
||||
resolution: {integrity: sha512-hAZsKq7Yy11Zu1DE0OzWjw7nnLZmJZYTDZZyEFHZdUhV8FkH5MCfoU1XMaxXovpyW5nq5scPqq0ZDP9Zyl04oQ==}
|
||||
engines: {node: '>= 10.0.0'}
|
||||
|
||||
/unlayer-types@1.95.0:
|
||||
resolution: {integrity: sha512-WsvQp85+Xl8Gggkt+dSSePawoVAnGqjJzJJcDhIQswlPejARBRzAhl5dOF1Q+LjQckhWhKNDaJ5tcOeT+PV4ew==}
|
||||
/unlayer-types@1.103.0:
|
||||
resolution: {integrity: sha512-aVZS7g5F6dWEoxc0dhSDqYYncu+LIMB/SerJi6u5FKVSfTWnzA2MTpjFCbGkOOi8rUiIOabeuEOfyO/WDnarJg==}
|
||||
dev: false
|
||||
|
||||
/unpipe@1.0.0:
|
||||
|
Loading…
Reference in New Issue
Block a user