
feat: DeliveryHog extra logging and filtering (#22870)

Author: Ben White, 2024-06-12 09:21:50 +02:00 (committed by GitHub)
Parent: 0f501710aa
Commit: f324ec6fbc
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
16 changed files with 469 additions and 103 deletions

[Binary image snapshots updated in 4 files (not shown). Sizes: 57 KiB → 55 KiB, 58 KiB → 55 KiB, 71 KiB → 71 KiB, 71 KiB → 71 KiB]

View File

@ -5,6 +5,7 @@ import { ActivityLogItem } from 'lib/components/ActivityLog/humanizeActivity'
import { apiStatusLogic } from 'lib/logic/apiStatusLogic'
import { objectClean, toParams } from 'lib/utils'
import posthog from 'posthog-js'
import { LogEntry } from 'scenes/pipeline/pipelineNodeLogsLogic'
import { SavedSessionRecordingPlaylistsResult } from 'scenes/session-recordings/saved-playlists/savedSessionRecordingPlaylistsLogic'
import { getCurrentExporterData } from '~/exporter/exporterViewLogic'
@ -1659,6 +1660,12 @@ const api = {
async update(id: HogFunctionType['id'], data: Partial<HogFunctionType>): Promise<HogFunctionType> {
return await new ApiRequest().hogFunction(id).update({ data })
},
async searchLogs(
id: HogFunctionType['id'],
params: Record<string, any> = {}
): Promise<PaginatedResponse<LogEntry>> {
return await new ApiRequest().hogFunction(id).withAction('logs').withQueryString(params).get()
},
},
annotations: {
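
As a minimal usage sketch (not part of the diff), the new client method could be called like this from app code; the function id, invocation id, and level strings are placeholders, and the call is assumed to run inside an async function:

// Load the latest logs for one hog function, optionally scoped to a single invocation.
const page = await api.hogFunctions.searchLogs('0190-example-hog-function-id', {
    search: 'fetch',
    levels: ['ERROR', 'WARN'],
    limit: 50,
    instance_id: '0190-example-invocation-id',
})
console.log(`Loaded ${page.results.length} log entries`)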

View File

@ -1,4 +1,5 @@
import { LemonButton, LemonCheckbox, LemonInput, LemonTable } from '@posthog/lemon-ui'
import { IconSearch } from '@posthog/icons'
import { LemonButton, LemonCheckbox, LemonInput, LemonSnack, LemonTable } from '@posthog/lemon-ui'
import { useActions, useValues } from 'kea'
import { LOGS_PORTION_LIMIT } from 'lib/constants'
import { pluralize } from 'lib/utils'
@ -9,8 +10,9 @@ import { PipelineLogLevel, pipelineNodeLogsLogic } from './pipelineNodeLogsLogic
export function PipelineNodeLogs({ id, stage }: PipelineNodeLogicProps): JSX.Element {
const logic = pipelineNodeLogsLogic({ id, stage })
const { logs, logsLoading, backgroundLogs, columns, isThereMoreToLoad, selectedLogLevels } = useValues(logic)
const { revealBackground, loadMoreLogs, setSelectedLogLevels, setSearchTerm } = useActions(logic)
const { logs, logsLoading, backgroundLogs, columns, isThereMoreToLoad, selectedLogLevels, instanceId } =
useValues(logic)
const { revealBackground, loadMoreLogs, setSelectedLogLevels, setSearchTerm, setInstanceId } = useActions(logic)
return (
<div className="ph-no-capture space-y-2 flex-1">
@ -20,6 +22,13 @@ export function PipelineNodeLogs({ id, stage }: PipelineNodeLogicProps): JSX.Ele
fullWidth
onChange={setSearchTerm}
allowClear
prefix={
<>
<IconSearch />
{instanceId && <LemonSnack onClose={() => setInstanceId('')}>{instanceId}</LemonSnack>}
</>
}
/>
<div className="flex items-center gap-4">
<span className="mr-1">Show logs of level:</span>

View File

@ -5,7 +5,7 @@ export const HOG_FUNCTION_TEMPLATES: HogFunctionTemplateType[] = [
id: 'template-webhook',
name: 'HogHook',
description: 'Sends a webhook templated by the incoming event data',
hog: "fetch(inputs.url, {\n 'headers': inputs.headers,\n 'body': inputs.payload,\n 'method': inputs.method,\n 'payload': inputs.payload\n});",
hog: "fetch(inputs.url, {\n 'headers': inputs.headers,\n 'body': inputs.payload,\n 'method': inputs.method\n});",
inputs_schema: [
{
key: 'url',

View File

@ -1,4 +1,5 @@
import { LemonTableColumns } from '@posthog/lemon-ui'
import { TZLabel } from '@posthog/apps-common'
import { LemonTableColumns, Link } from '@posthog/lemon-ui'
import { actions, connect, events, kea, key, listeners, path, props, reducers, selectors } from 'kea'
import { loaders } from 'kea-loaders'
import { LOGS_PORTION_LIMIT } from 'lib/constants'
@ -28,13 +29,14 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
key(({ id }) => id),
path((key) => ['scenes', 'pipeline', 'pipelineNodeLogsLogic', key]),
connect((props: PipelineNodeLogicProps) => ({
values: [teamLogic(), ['currentTeamId'], pipelineNodeLogic(props), ['nodeBackend']],
values: [teamLogic(), ['currentTeamId'], pipelineNodeLogic(props), ['node']],
})),
actions({
setSelectedLogLevels: (levels: PipelineLogLevel[]) => ({
levels,
}),
setSearchTerm: (searchTerm: string) => ({ searchTerm }),
setInstanceId: (instanceId: string) => ({ instanceId }),
clearBackgroundLogs: true,
markLogsEnd: true,
}),
@ -44,15 +46,24 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
{
loadLogs: async () => {
let results: LogEntry[]
if (values.nodeBackend === PipelineBackend.BatchExport) {
if (values.node.backend === PipelineBackend.BatchExport) {
results = await api.batchExportLogs.search(
id as string,
values.node.id,
values.searchTerm,
values.selectedLogLevels
)
} else if (values.node.backend === PipelineBackend.HogFunction) {
const res = await api.hogFunctions.searchLogs(values.node.id, {
search: values.searchTerm,
levels: values.selectedLogLevels,
limit: LOGS_PORTION_LIMIT,
instance_id: values.instanceId,
})
results = res.results
} else {
results = await api.pluginLogs.search(
id as number,
values.node.id,
values.searchTerm,
logLevelsToTypeFilters(values.selectedLogLevels)
)
@ -66,13 +77,23 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
},
loadMoreLogs: async () => {
let results: LogEntry[]
if (values.nodeBackend === PipelineBackend.BatchExport) {
if (values.node.backend === PipelineBackend.BatchExport) {
results = await api.batchExportLogs.search(
id as string,
values.searchTerm,
values.selectedLogLevels,
values.trailingEntry as BatchExportLogEntry | null
)
} else if (values.node.backend === PipelineBackend.HogFunction) {
const res = await api.hogFunctions.searchLogs(values.node.id, {
search: values.searchTerm,
levels: values.selectedLogLevels,
limit: LOGS_PORTION_LIMIT,
before: values.trailingEntry?.timestamp,
instance_id: values.instanceId,
})
results = res.results
} else {
results = await api.pluginLogs.search(
id as number,
@ -105,7 +126,7 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
}
let results: LogEntry[]
if (values.nodeBackend === PipelineBackend.BatchExport) {
if (values.node.backend === PipelineBackend.BatchExport) {
results = await api.batchExportLogs.search(
id as string,
values.searchTerm,
@ -113,6 +134,16 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
null,
values.leadingEntry as BatchExportLogEntry | null
)
} else if (values.node.backend === PipelineBackend.HogFunction) {
const res = await api.hogFunctions.searchLogs(values.node.id, {
search: values.searchTerm,
levels: values.selectedLogLevels,
limit: LOGS_PORTION_LIMIT,
after: values.leadingEntry?.timestamp,
instance_id: values.instanceId,
})
results = res.results
} else {
results = await api.pluginLogs.search(
id as number,
@ -147,6 +178,12 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
setSearchTerm: (_, { searchTerm }) => searchTerm,
},
],
instanceId: [
'',
{
setInstanceId: (_, { instanceId }) => instanceId,
},
],
isThereMoreToLoad: [
true,
{
@ -155,7 +192,7 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
},
],
}),
selectors({
selectors(({ actions }) => ({
leadingEntry: [
(s) => [s.logs, s.backgroundLogs],
(logs: LogEntry[], backgroundLogs: LogEntry[]): LogEntry | null => {
@ -181,26 +218,76 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
},
],
columns: [
(s) => [s.nodeBackend],
(nodeBackend): LemonTableColumns<LogEntry> => {
(s) => [s.node],
(node): LemonTableColumns<LogEntry> => {
return [
{
title: 'Timestamp',
key: 'timestamp',
dataIndex: 'timestamp',
sorter: (a: LogEntry, b: LogEntry) => dayjs(a.timestamp).unix() - dayjs(b.timestamp).unix(),
render: (timestamp: string) => dayjs(timestamp).format('YYYY-MM-DD HH:mm:ss.SSS UTC'),
render: (timestamp: string) => <TZLabel time={timestamp} />,
width: 0,
},
{
title: nodeBackend === PipelineBackend.BatchExport ? 'Run Id' : 'Source',
dataIndex: nodeBackend === PipelineBackend.BatchExport ? 'run_id' : 'source',
key: nodeBackend === PipelineBackend.BatchExport ? 'run_id' : 'source',
width: 0,
title:
node.backend == PipelineBackend.HogFunction
? 'Invocation'
: node.backend == PipelineBackend.BatchExport
? 'Run Id'
: 'Source',
dataIndex:
node.backend == PipelineBackend.HogFunction
? 'instance_id'
: node.backend == PipelineBackend.BatchExport
? 'run_id'
: 'source',
key:
node.backend == PipelineBackend.HogFunction
? 'instance_id'
: node.backend == PipelineBackend.BatchExport
? 'run_id'
: 'source',
render: (instanceId: string) => (
<code className="whitespace-nowrap">
{node.backend === PipelineBackend.HogFunction ? (
<Link
subtle
onClick={() => {
actions.setInstanceId(instanceId)
}}
>
{instanceId}
</Link>
) : (
instanceId
)}
</code>
),
},
{
width: 100,
title: 'Level',
key: nodeBackend === PipelineBackend.BatchExport ? 'level' : 'type',
dataIndex: nodeBackend === PipelineBackend.BatchExport ? 'level' : 'type',
render: nodeBackend === PipelineBackend.BatchExport ? LogLevelDisplay : LogTypeDisplay,
key:
node.backend == PipelineBackend.HogFunction
? 'level'
: node.backend == PipelineBackend.BatchExport
? 'level'
: 'type',
dataIndex:
node.backend == PipelineBackend.HogFunction
? 'level'
: node.backend == PipelineBackend.BatchExport
? 'level'
: 'type',
render:
node.backend == PipelineBackend.HogFunction
? LogLevelDisplay
: node.backend == PipelineBackend.BatchExport
? LogLevelDisplay
: LogTypeDisplay,
},
{
title: 'Message',
@ -211,7 +298,7 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
] as LemonTableColumns<LogEntry>
},
],
}),
})),
listeners(({ actions }) => ({
setSelectedLogLevels: () => {
actions.loadLogs()
@ -222,6 +309,9 @@ export const pipelineNodeLogsLogic = kea<pipelineNodeLogsLogicType>([
}
actions.loadLogs()
},
setInstanceId: async () => {
actions.loadLogs()
},
})),
events(({ actions, cache }) => ({
afterMount: () => {

View File

@ -1,7 +1,7 @@
import { features, librdkafkaVersion, Message } from 'node-rdkafka'
import { Histogram } from 'prom-client'
import { KAFKA_EVENTS_JSON } from '../config/kafka-topics'
import { KAFKA_EVENTS_JSON, KAFKA_LOG_ENTRIES } from '../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../kafka/config'
import { createKafkaProducer } from '../kafka/producer'
@ -18,7 +18,7 @@ import { TeamManager } from '../worker/ingestion/team-manager'
import { RustyHook } from '../worker/rusty-hook'
import { HogExecutor } from './hog-executor'
import { HogFunctionManager } from './hog-function-manager'
import { HogFunctionInvocation, HogFunctionInvocationResult } from './types'
import { HogFunctionInvocationGlobals, HogFunctionInvocationResult, HogFunctionLogEntry } from './types'
import { convertToHogFunctionInvocationGlobals } from './utils'
// Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals
@ -51,7 +51,7 @@ export class CdpProcessedEventsConsumer {
organizationManager: OrganizationManager
groupTypeManager: GroupTypeManager
hogFunctionManager: HogFunctionManager
hogExecutor?: HogExecutor
hogExecutor: HogExecutor
appMetrics?: AppMetrics
topic: string
consumerGroupId: string
@ -71,6 +71,8 @@ export class CdpProcessedEventsConsumer {
this.organizationManager = new OrganizationManager(postgres, this.teamManager)
this.groupTypeManager = new GroupTypeManager(postgres, this.teamManager)
this.hogFunctionManager = new HogFunctionManager(postgres, config)
const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.config)
this.hogExecutor = new HogExecutor(this.config, this.hogFunctionManager, rustyHook)
}
private scheduleWork<T>(promise: Promise<T>): Promise<T> {
@ -79,8 +81,8 @@ export class CdpProcessedEventsConsumer {
return promise
}
public async consume(invocation: HogFunctionInvocation): Promise<HogFunctionInvocationResult[]> {
return await this.hogExecutor!.executeMatchingFunctions(invocation)
public async consume(event: HogFunctionInvocationGlobals): Promise<HogFunctionInvocationResult[]> {
return await this.hogExecutor!.executeMatchingFunctions(event)
}
public async handleEachBatch(messages: Message[], heartbeat: () => void): Promise<void> {
@ -94,7 +96,7 @@ export class CdpProcessedEventsConsumer {
histogramKafkaBatchSize.observe(messages.length)
histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 0) + acc, 0) / 1024)
const invocations: HogFunctionInvocation[] = []
const events: HogFunctionInvocationGlobals[] = []
await runInstrumentedFunction({
statsKey: `cdpFunctionExecutor.handleEachBatch.parseKafkaMessages`,
@ -106,6 +108,11 @@ export class CdpProcessedEventsConsumer {
try {
const clickHouseEvent = JSON.parse(message.value!.toString()) as RawClickHouseEvent
if (!this.hogFunctionManager.teamHasHogFunctions(clickHouseEvent.team_id)) {
// No need to continue if the team doesn't have any functions
return
}
let groupTypes: GroupTypeToColumnIndex | undefined = undefined
if (
@ -120,23 +127,18 @@ export class CdpProcessedEventsConsumer {
)
}
// TODO: Clean up all of this and parallelise
// TODO: We can fetch a lot of teams and things in parallel
const team = await this.teamManager.fetchTeam(clickHouseEvent.team_id)
if (!team) {
return
}
const globals = convertToHogFunctionInvocationGlobals(
clickHouseEvent,
team,
this.config.SITE_URL ?? 'http://localhost:8000',
groupTypes
events.push(
convertToHogFunctionInvocationGlobals(
clickHouseEvent,
team,
this.config.SITE_URL ?? 'http://localhost:8000',
groupTypes
)
)
invocations.push({
globals,
})
} catch (e) {
status.error('Error parsing message', e)
}
@ -148,10 +150,14 @@ export class CdpProcessedEventsConsumer {
const invocationResults: HogFunctionInvocationResult[] = []
if (!events.length) {
return
}
await runInstrumentedFunction({
statsKey: `cdpFunctionExecutor.handleEachBatch.consumeBatch`,
func: async () => {
const results = await Promise.all(invocations.map((invocation) => this.consume(invocation)))
const results = await Promise.all(events.map((e) => this.consume(e)))
invocationResults.push(...results.flat())
},
})
@ -159,12 +165,31 @@ export class CdpProcessedEventsConsumer {
heartbeat()
// TODO: Follow up - process metrics from the invocationResults
// await runInstrumentedFunction({
// statsKey: `cdpFunctionExecutor.handleEachBatch.queueMetrics`,
// func: async () => {
// // TODO:
// },
// })
await runInstrumentedFunction({
statsKey: `cdpFunctionExecutor.handleEachBatch.queueMetrics`,
func: async () => {
const allLogs = invocationResults.reduce((acc, result) => {
return [...acc, ...result.logs]
}, [] as HogFunctionLogEntry[])
await Promise.all(
allLogs.map((x) =>
this.kafkaProducer!.produce({
topic: KAFKA_LOG_ENTRIES,
value: Buffer.from(JSON.stringify(x)),
key: x.instance_id,
waitForAck: true,
})
)
)
if (allLogs.length) {
status.info('🔁', `cdp-function-executor - produced logs`, {
size: allLogs.length,
})
}
},
})
},
})
}
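
For orientation, a hedged sketch of one message the loop above produces to KAFKA_LOG_ENTRIES; the ids and timestamp are placeholders, and the overall shape mirrors the assertions in the consumer test later in this commit:

const exampleLogMessage = {
    topic: KAFKA_LOG_ENTRIES,
    // instance_id is the key, so all log entries for one invocation share a partition
    key: '0190-example-invocation-id',
    value: Buffer.from(
        JSON.stringify({
            team_id: 2,
            log_source: 'hog_function',
            log_source_id: '0190-example-hog-function-id',
            instance_id: '0190-example-invocation-id',
            timestamp: '2024-06-12 09:21:50.123',
            level: 'debug',
            message: 'Executing function',
        })
    ),
    waitForAck: true,
}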
@ -185,7 +210,6 @@ export class CdpProcessedEventsConsumer {
await createKafkaProducer(globalConnectionConfig, globalProducerConfig)
)
const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.config)
this.appMetrics =
this.hub?.appMetrics ??
new AppMetrics(
@ -193,7 +217,6 @@ export class CdpProcessedEventsConsumer {
this.config.APP_METRICS_FLUSH_FREQUENCY_MS,
this.config.APP_METRICS_FLUSH_MAX_QUEUE_SIZE
)
this.hogExecutor = new HogExecutor(this.config, this.hogFunctionManager, rustyHook)
this.kafkaProducer.producer.connect()
this.batchConsumer = await startBatchConsumer({

View File

@ -1,9 +1,11 @@
import { convertHogToJS, convertJSToHog, exec, ExecResult, VMState } from '@posthog/hogvm'
import { Webhook } from '@posthog/plugin-scaffold'
import { PluginsServerConfig } from 'types'
import { DateTime } from 'luxon'
import { PluginsServerConfig, TimestampFormat } from '../types'
import { trackedFetch } from '../utils/fetch'
import { status } from '../utils/status'
import { castTimestampOrNow, UUIDT } from '../utils/utils'
import { RustyHook } from '../worker/rusty-hook'
import { HogFunctionManager } from './hog-function-manager'
import {
@ -11,6 +13,8 @@ import {
HogFunctionInvocationAsyncResponse,
HogFunctionInvocationGlobals,
HogFunctionInvocationResult,
HogFunctionLogEntry,
HogFunctionLogEntryLevel,
HogFunctionType,
} from './types'
import { convertToHogFunctionFilterGlobal } from './utils'
@ -53,14 +57,14 @@ export class HogExecutor {
/**
* Intended to be invoked as a starting point from an event
*/
async executeMatchingFunctions(invocation: HogFunctionInvocation): Promise<HogFunctionInvocationResult[]> {
let functions = this.hogFunctionManager.getTeamHogFunctions(invocation.globals.project.id)
async executeMatchingFunctions(event: HogFunctionInvocationGlobals): Promise<HogFunctionInvocationResult[]> {
const allFunctionsForTeam = this.hogFunctionManager.getTeamHogFunctions(event.project.id)
const filtersGlobals = convertToHogFunctionFilterGlobal(invocation.globals)
const filtersGlobals = convertToHogFunctionFilterGlobal(event)
// Filter all functions based on the invocation
functions = Object.fromEntries(
Object.entries(functions).filter(([_key, value]) => {
const functions = Object.fromEntries(
Object.entries(allFunctionsForTeam).filter(([_key, value]) => {
try {
const filters = value.filters
@ -98,20 +102,27 @@ export class HogExecutor {
return []
}
status.info(
'🦔',
`[HogExecutor] Found ${Object.keys(functions).length} matching functions out of ${
Object.keys(allFunctionsForTeam).length
} for team`
)
const results: HogFunctionInvocationResult[] = []
for (const hogFunction of Object.values(functions)) {
// Add the source of the trigger to the globals
const modifiedGlobals: HogFunctionInvocationGlobals = {
...invocation.globals,
...event,
source: {
name: hogFunction.name ?? `Hog function: ${hogFunction.id}`,
url: `${invocation.globals.project.url}/pipeline/destinations/hog-${hogFunction.id}/configuration/`,
url: `${event.project.url}/pipeline/destinations/hog-${hogFunction.id}/configuration/`,
},
}
const result = await this.execute(hogFunction, {
...invocation,
id: new UUIDT().toString(),
globals: modifiedGlobals,
})
@ -152,6 +163,35 @@ export class HogExecutor {
status.info('🦔', `[HogExecutor] Executing function`, loggingContext)
let error: any = null
const logs: HogFunctionLogEntry[] = []
let lastTimestamp = DateTime.now()
const log = (level: HogFunctionLogEntryLevel, message: string) => {
// TRICKY: The log entries table is de-duped by timestamp, so we need to ensure that the timestamps are unique
// It is unclear how this affects parallel execution environments
let now = DateTime.now()
if (now <= lastTimestamp) {
// Ensure that the timestamps are unique
now = lastTimestamp.plus(1)
}
lastTimestamp = now
logs.push({
team_id: hogFunction.team_id,
log_source: 'hog_function',
log_source_id: hogFunction.id,
instance_id: invocation.id,
timestamp: castTimestampOrNow(now, TimestampFormat.ClickHouse),
level,
message,
})
}
if (!state) {
log('debug', `Executing function`)
} else {
log('debug', `Resuming function`)
}
try {
const globals = this.buildHogFunctionGlobals(hogFunction, invocation)
@ -164,15 +204,18 @@ export class HogExecutor {
// We need to pass these in but they don't actually do anything as it is a sync exec
fetch: async () => Promise.resolve(),
},
})
console.log('🦔', `[HogExecutor] TESTING`, {
asyncFunctionArgs: res.asyncFunctionArgs,
asyncFunctionName: res.asyncFunctionName,
globals: globals,
functions: {
print: (...args) => {
const message = args
.map((arg) => (typeof arg !== 'string' ? JSON.stringify(arg) : arg))
.join(', ')
log('info', message)
},
},
})
if (!res.finished) {
log('debug', `Suspending function due to async function call '${res.asyncFunctionName}'`)
status.info('🦔', `[HogExecutor] Function returned not finished. Executing async function`, {
...loggingContext,
asyncFunctionName: res.asyncFunctionName,
@ -189,28 +232,11 @@ export class HogExecutor {
)
// TODO: Log error somewhere
}
} else {
log('debug', `Function completed (${hogFunction.id}) (${hogFunction.name})!`)
}
// await this.appMetrics.queueMetric({
// teamId: hogFunction.team_id,
// appId: hogFunction.id, // Add this as a generic string ID
// category: 'hogFunction', // TODO: Figure this out
// successes: 1,
// })
} catch (err) {
error = err
// await this.appMetrics.queueError(
// {
// teamId: hogFunction.team_id,
// appId: hogFunction.id, // Add this as a generic string ID
// category: 'hogFunction',
// failures: 1,
// },
// {
// error,
// event,
// }
// )
status.error('🦔', `[HogExecutor] Error executing function ${hogFunction.id} - ${hogFunction.name}`, error)
}
@ -218,7 +244,7 @@ export class HogExecutor {
...invocation,
success: !error,
error,
logs: [], // TODO: Add logs
logs,
}
}
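
A standalone sketch of the timestamp nudge used by the log helper above: the ClickHouse log_entries table is de-duped by timestamp, so when two entries would land in the same millisecond the later one is pushed forward by one millisecond. This is an illustration, not code from the commit:

import { DateTime } from 'luxon'

// Returns a strictly increasing timestamp even when called twice within the same millisecond.
const nextLogTimestamp = (() => {
    let lastTimestamp = DateTime.now()
    return (): DateTime => {
        let now = DateTime.now()
        if (now <= lastTimestamp) {
            now = lastTimestamp.plus(1) // luxon: plus(1) adds one millisecond
        }
        lastTimestamp = now
        return now
    }
})()

const a = nextLogTimestamp()
const b = nextLogTimestamp()
console.assert(b > a) // holds even when both calls fall in the same millisecond

As the in-code comment notes, this only guarantees uniqueness within one executor process; behaviour across parallel executions is left open.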

View File

@ -62,6 +62,10 @@ export class HogFunctionManager {
return this.cache[teamId] || {}
}
public teamHasHogFunctions(teamId: Team['id']): boolean {
return !!Object.keys(this.getTeamHogFunctions(teamId)).length
}
public async reloadAllHogFunctions(): Promise<void> {
this.cache = await fetchAllHogFunctionsGroupedByTeam(this.postgres)
status.info('🍿', 'Fetched all hog functions from DB anew')

View File

@ -97,14 +97,28 @@ export type HogFunctionFilterGlobals = {
}
}
export type HogFunctionLogEntrySource = 'system' | 'hog' | 'console'
export type HogFunctionLogEntryLevel = 'debug' | 'info' | 'warn' | 'error'
export interface HogFunctionLogEntry {
team_id: number
log_source: string // The kind of source (hog_function)
log_source_id: string // The id of the hog function
instance_id: string // The id of the specific invocation
timestamp: string
level: HogFunctionLogEntryLevel
message: string
}
export type HogFunctionInvocation = {
id: string
globals: HogFunctionInvocationGlobals
}
export type HogFunctionInvocationResult = HogFunctionInvocation & {
success: boolean
error?: any
logs: string[]
logs: HogFunctionLogEntry[]
}
export type HogFunctionInvocationAsyncRequest = HogFunctionInvocation & {

View File

@ -1,4 +1,5 @@
import { CdpProcessedEventsConsumer } from '../../src/cdp/cdp-processed-events-consumer'
import { HogFunctionType } from '../../src/cdp/types'
import { defaultConfig } from '../../src/config/config'
import { Hub, PluginsServerConfig, Team } from '../../src/types'
import { createHub } from '../../src/utils/db/hub'
@ -41,35 +42,56 @@ jest.mock('../../src/utils/fetch', () => {
}
})
const mockFetch = require('../../src/utils/fetch').trackedFetch
jest.mock('../../src/utils/db/kafka-producer-wrapper', () => {
const mockKafkaProducer = {
producer: {
connect: jest.fn(),
},
disconnect: jest.fn(),
produce: jest.fn(),
}
return {
KafkaProducerWrapper: jest.fn(() => mockKafkaProducer),
}
})
const mockFetch: jest.Mock = require('../../src/utils/fetch').trackedFetch
const mockProducer = require('../../src/utils/db/kafka-producer-wrapper').KafkaProducerWrapper()
jest.setTimeout(1000)
const noop = () => {}
const decodeKafkaMessage = (message: any): any => {
return {
...message,
value: JSON.parse(message.value.toString()),
}
}
describe('CDP Processed Events Consumer', () => {
let processor: CdpProcessedEventsConsumer
let hub: Hub
let closeHub: () => Promise<void>
let team: Team
const insertHogFunction = async (hogFunction) => {
const insertHogFunction = async (hogFunction: Partial<HogFunctionType>) => {
const item = await _insertHogFunction(hub.postgres, team, hogFunction)
// Trigger the reload that Django would do
await processor.hogFunctionManager.reloadAllHogFunctions()
return item
}
beforeAll(async () => {
await resetTestDatabase()
})
beforeEach(async () => {
await resetTestDatabase()
;[hub, closeHub] = await createHub()
team = await getFirstTeam(hub)
processor = new CdpProcessedEventsConsumer(config, hub.postgres)
processor = new CdpProcessedEventsConsumer(config, hub)
await processor.start()
mockFetch.mockClear()
})
afterEach(async () => {
@ -142,5 +164,57 @@ describe('CDP Processed Events Consumer', () => {
]
`)
})
it('generates logs and produces them to kafka', async () => {
await insertHogFunction({
...HOG_EXAMPLES.simple_fetch,
...HOG_INPUTS_EXAMPLES.simple_fetch,
...HOG_FILTERS_EXAMPLES.no_filters,
})
// Create a message that should be processed by this function
// Run the function and check that it was executed
await processor.handleEachBatch(
[
createMessage(
createIncomingEvent(team.id, {
uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0',
event: '$pageview',
properties: JSON.stringify({
$lib_version: '1.0.0',
}),
})
),
],
noop
)
expect(mockFetch).toHaveBeenCalledTimes(1)
expect(mockProducer.produce).toHaveBeenCalledTimes(2)
expect(decodeKafkaMessage(mockProducer.produce.mock.calls[0][0])).toMatchObject({
key: expect.any(String),
topic: 'log_entries_test',
value: {
instance_id: expect.any(String),
level: 'debug',
log_source: 'hog_function',
log_source_id: expect.any(String),
message: 'Executing function',
team_id: 2,
timestamp: expect.any(String),
},
waitForAck: true,
})
expect(decodeKafkaMessage(mockProducer.produce.mock.calls[1][0])).toMatchObject({
topic: 'log_entries_test',
value: {
log_source: 'hog_function',
message: "Suspending function due to async function call 'fetch'",
team_id: 2,
},
})
})
})
})

View File

@ -58,9 +58,7 @@ describe('Hog Executor', () => {
// Create a message that should be processed by this function
// Run the function and check that it was executed
await executor.executeMatchingFunctions({
globals: createHogExecutionGlobals(),
})
await executor.executeMatchingFunctions(createHogExecutionGlobals())
expect(mockFetch).toHaveBeenCalledTimes(1)
expect(mockFetch.mock.calls[0]).toMatchInlineSnapshot(`
@ -106,21 +104,19 @@ describe('Hog Executor', () => {
[1]: fn,
})
const resultsShouldntMatch = await executor.executeMatchingFunctions({
globals: createHogExecutionGlobals(),
})
const resultsShouldntMatch = await executor.executeMatchingFunctions(createHogExecutionGlobals())
expect(resultsShouldntMatch).toHaveLength(0)
const resultsShouldMatch = await executor.executeMatchingFunctions({
globals: createHogExecutionGlobals({
const resultsShouldMatch = await executor.executeMatchingFunctions(
createHogExecutionGlobals({
event: {
name: '$pageview',
properties: {
$current_url: 'https://posthog.com',
},
} as any,
}),
})
})
)
expect(resultsShouldMatch).toHaveLength(1)
})
})

View File

@ -4,6 +4,7 @@ from rest_framework import serializers, viewsets
from rest_framework.serializers import BaseSerializer
from posthog.api.forbid_destroy_model import ForbidDestroyModel
from posthog.api.log_entries import LogEntryMixin
from posthog.api.routing import TeamAndOrgViewSetMixin
from posthog.api.shared import UserBasicSerializer
from posthog.hogql.bytecode import create_bytecode
@ -167,7 +168,7 @@ class HogFunctionSerializer(HogFunctionMinimalSerializer):
return super().create(validated_data=validated_data)
class HogFunctionViewSet(TeamAndOrgViewSetMixin, ForbidDestroyModel, viewsets.ModelViewSet):
class HogFunctionViewSet(TeamAndOrgViewSetMixin, LogEntryMixin, ForbidDestroyModel, viewsets.ModelViewSet):
scope_object = "INTERNAL" # Keep internal until we are happy to release this GA
queryset = HogFunction.objects.all()
filter_backends = [DjangoFilterBackend]
@ -175,6 +176,7 @@ class HogFunctionViewSet(TeamAndOrgViewSetMixin, ForbidDestroyModel, viewsets.Mo
permission_classes = [PostHogFeatureFlagPermission]
posthog_feature_flag = {"hog-functions": ["create", "partial_update", "update"]}
log_source = "hog_function"
def get_serializer_class(self) -> type[BaseSerializer]:
return HogFunctionMinimalSerializer if self.action == "list" else HogFunctionSerializer

posthog/api/log_entries.py (new file, 121 lines)
View File

@ -0,0 +1,121 @@
import dataclasses
from datetime import datetime
from typing import Any, Optional, cast
from rest_framework import serializers, viewsets
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework_dataclasses.serializers import DataclassSerializer
from posthog.clickhouse.client.execute import sync_execute
@dataclasses.dataclass(frozen=True)
class LogEntry:
log_source_id: str
instance_id: str
timestamp: datetime
level: str
message: str
class LogEntrySerializer(DataclassSerializer):
class Meta:
dataclass = LogEntry
class LogEntryRequestSerializer(serializers.Serializer):
limit = serializers.IntegerField(required=False, default=50, max_value=500, min_value=1)
after = serializers.DateTimeField(required=False)
before = serializers.DateTimeField(required=False)
level = serializers.ListField(child=serializers.CharField(), required=False)
search = serializers.CharField(required=False)
instance_id = serializers.CharField(required=False)
def fetch_log_entries(
team_id: int,
log_source: str,
log_source_id: str,
limit: int,
instance_id: Optional[str] = None,
after: Optional[datetime] = None,
before: Optional[datetime] = None,
search: Optional[str] = None,
level: Optional[list[str]] = None,
) -> list[Any]:
"""Fetch a list of batch export log entries from ClickHouse."""
if level is None:
level = []
clickhouse_where_parts: list[str] = []
clickhouse_kwargs: dict[str, Any] = {}
clickhouse_where_parts.append("log_source = %(log_source)s")
clickhouse_kwargs["log_source"] = log_source
clickhouse_where_parts.append("log_source_id = %(log_source_id)s")
clickhouse_kwargs["log_source_id"] = log_source_id
clickhouse_where_parts.append("team_id = %(team_id)s")
clickhouse_kwargs["team_id"] = team_id
if instance_id:
clickhouse_where_parts.append("instance_id = %(instance_id)s")
clickhouse_kwargs["instance_id"] = instance_id
if after:
clickhouse_where_parts.append("timestamp > toDateTime64(%(after)s, 6)")
clickhouse_kwargs["after"] = after.isoformat().replace("+00:00", "")
if before:
clickhouse_where_parts.append("timestamp < toDateTime64(%(before)s, 6)")
clickhouse_kwargs["before"] = before.isoformat().replace("+00:00", "")
if search:
clickhouse_where_parts.append("message ILIKE %(search)s")
clickhouse_kwargs["search"] = f"%{search}%"
if len(level) > 0:
clickhouse_where_parts.append("upper(level) in %(levels)s")
clickhouse_kwargs["levels"] = level
clickhouse_query = f"""
SELECT log_source_id, instance_id, timestamp, upper(level) as level, message FROM log_entries
WHERE {' AND '.join(clickhouse_where_parts)} ORDER BY timestamp DESC {f'LIMIT {limit}'}
"""
return [LogEntry(*result) for result in cast(list, sync_execute(clickhouse_query, clickhouse_kwargs))]
class LogEntryMixin(viewsets.GenericViewSet):
log_source: str # Should be set by the inheriting class
@action(detail=True, methods=["GET"])
def logs(self, request: Request, *args, **kwargs):
obj = self.get_object()
param_serializer = LogEntryRequestSerializer(data=request.query_params)
if not self.log_source:
raise ValidationError("log_source not set on the viewset")
if not param_serializer.is_valid():
raise ValidationError(param_serializer.errors)
params = param_serializer.validated_data
data = fetch_log_entries(
team_id=self.team_id, # type: ignore
log_source=self.log_source,
log_source_id=str(obj.id),
limit=params["limit"],
# From request params
instance_id=params.get("instance_id"),
after=params.get("after"),
before=params.get("before"),
search=params.get("search"),
level=params.get("level"),
)
page = self.paginate_queryset(data)
if page is not None:
serializer = LogEntrySerializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = LogEntrySerializer(data, many=True)
return Response(serializer.data)
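
To close the loop, an illustrative sketch of the request the frontend's searchLogs helper ends up making against this action and a rough shape of the paginated payload; the path, ids, and timestamp are placeholders, the envelope is the standard paginated response, and level comes back upper-cased by the query above:

// e.g. GET /api/projects/:project_id/hog_functions/:id/logs?limit=50&search=fetch&instance_id=<uuid>
const exampleLogsResponse = {
    count: 1,
    next: null,
    previous: null,
    results: [
        {
            log_source_id: '0190-example-hog-function-id',
            instance_id: '0190-example-invocation-id',
            timestamp: '2024-06-12T07:21:50.123000Z',
            level: 'DEBUG',
            message: 'Executing function',
        },
    ],
}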