Mirror of https://github.com/PostHog/posthog.git

Add tags to dead letter queue table (#8345)

* add tags to dead letter queue table

* Update ee/clickhouse/migrations/0023_dead_letter_queue_tags.py

* fix typo

* fixes

* update snapshots

* fix tests

* fix test

* update snapshot

* send tags

* update

* Update ee/clickhouse/migrations/0023_dead_letter_queue_tags.py

* fix plugin server test?

* Update plugin-server/src/worker/ingestion/utils.ts

* Update plugin-server/src/types.ts

* Update plugin-server/src/worker/ingestion/utils.ts
Yakko Majuri 2022-02-04 15:36:15 -05:00 committed by GitHub
parent 7d22c09f45
commit fca0a46772
7 changed files with 37 additions and 10 deletions

View File: ee/clickhouse/migrations/0023_dead_letter_queue_tags.py

@@ -0,0 +1,14 @@
from infi.clickhouse_orm import migrations

from ee.clickhouse.sql.dead_letter_queue import DEAD_LETTER_QUEUE_TABLE_MV_SQL, KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL
from posthog.settings import CLICKHOUSE_CLUSTER

operations = [
    migrations.RunSQL(f"DROP TABLE events_dead_letter_queue_mv ON CLUSTER {CLICKHOUSE_CLUSTER}"),
    migrations.RunSQL(f"DROP TABLE kafka_events_dead_letter_queue ON CLUSTER {CLICKHOUSE_CLUSTER}"),
    migrations.RunSQL(
        f"ALTER TABLE events_dead_letter_queue ON CLUSTER {CLICKHOUSE_CLUSTER} ADD COLUMN IF NOT EXISTS tags Array(VARCHAR) AFTER error"
    ),
    migrations.RunSQL(KAFKA_DEAD_LETTER_QUEUE_TABLE_SQL()),
    migrations.RunSQL(DEAD_LETTER_QUEUE_TABLE_MV_SQL),
]
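
A reviewer note on the drop-and-recreate pattern: ClickHouse does not support ALTER on Kafka-engine tables, and a materialized view's SELECT list is effectively fixed at creation, so only the underlying events_dead_letter_queue table is ALTERed while the Kafka table and MV are rebuilt from their (now tag-aware) definitions. A minimal verification sketch, assuming PostHog's sync_execute helper (the import path may differ per deployment):

    # Confirm the new column exists after the migration runs.
    from ee.clickhouse.client import sync_execute  # assumed helper

    columns = {name: type_ for name, type_, *_ in sync_execute("DESCRIBE TABLE events_dead_letter_queue")}
    assert columns["tags"] == "Array(String)"  # ClickHouse reports VARCHAR as String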

View File: ee/clickhouse/sql/dead_letter_queue.py

@@ -24,7 +24,8 @@ CREATE TABLE IF NOT EXISTS {table_name} ON CLUSTER {cluster}
raw_payload VARCHAR,
error_timestamp DateTime64(6, 'UTC'),
error_location VARCHAR,
error VARCHAR
error VARCHAR,
tags Array(VARCHAR)
{extra_fields}
) ENGINE = {engine}
"""
@@ -69,6 +70,7 @@ raw_payload,
error_timestamp,
error_location,
error,
tags,
_timestamp,
_offset
FROM {database}.kafka_{table_name}
@@ -95,6 +97,7 @@ SELECT
%(error_timestamp)s,
%(error_location)s,
%(error)s,
['some_tag'],
0,
now()
"""

View File (ClickHouse schema test snapshots)

@@ -44,7 +44,7 @@
'
---
# name: test_create_kafka_table_with_different_kafka_host[\nCREATE TABLE IF NOT EXISTS kafka_events_dead_letter_queue ON CLUSTER posthog\n(\n id UUID,\n event_uuid UUID,\n event VARCHAR,\n properties VARCHAR,\n distinct_id VARCHAR,\n team_id Int64,\n elements_chain VARCHAR,\n created_at DateTime64(6, 'UTC'),\n ip VARCHAR,\n site_url VARCHAR,\n now DateTime64(6, 'UTC'),\n raw_payload VARCHAR,\n error_timestamp DateTime64(6, 'UTC'),\n error_location VARCHAR,\n error VARCHAR\n \n) ENGINE = Kafka('kafka', 'events_dead_letter_queue_test', 'group1', 'JSONEachRow')\n]
# name: test_create_kafka_table_with_different_kafka_host[\nCREATE TABLE IF NOT EXISTS kafka_events_dead_letter_queue ON CLUSTER posthog\n(\n id UUID,\n event_uuid UUID,\n event VARCHAR,\n properties VARCHAR,\n distinct_id VARCHAR,\n team_id Int64,\n elements_chain VARCHAR,\n created_at DateTime64(6, 'UTC'),\n ip VARCHAR,\n site_url VARCHAR,\n now DateTime64(6, 'UTC'),\n raw_payload VARCHAR,\n error_timestamp DateTime64(6, 'UTC'),\n error_location VARCHAR,\n error VARCHAR,\n tags Array(VARCHAR)\n \n) ENGINE = Kafka('kafka', 'events_dead_letter_queue_test', 'group1', 'JSONEachRow')\n]
'
CREATE TABLE IF NOT EXISTS kafka_events_dead_letter_queue ON CLUSTER posthog
@@ -63,7 +63,8 @@
raw_payload VARCHAR,
error_timestamp DateTime64(6, 'UTC'),
error_location VARCHAR,
error VARCHAR
error VARCHAR,
tags Array(VARCHAR)
) ENGINE = Kafka('test.kafka.broker:9092', 'events_dead_letter_queue_test', 'group1', 'JSONEachRow')
@@ -167,7 +168,7 @@
'
---
# name: test_create_table_query[\nCREATE MATERIALIZED VIEW IF NOT EXISTS events_dead_letter_queue_mv ON CLUSTER posthog\nTO posthog_test.events_dead_letter_queue\nAS SELECT\nid,\nevent_uuid,\nevent,\nproperties,\ndistinct_id,\nteam_id,\nelements_chain,\ncreated_at,\nip,\nsite_url,\nnow,\nraw_payload,\nerror_timestamp,\nerror_location,\nerror,\n_timestamp,\n_offset\nFROM posthog_test.kafka_events_dead_letter_queue\n]
# name: test_create_table_query[\nCREATE MATERIALIZED VIEW IF NOT EXISTS events_dead_letter_queue_mv ON CLUSTER posthog\nTO posthog_test.events_dead_letter_queue\nAS SELECT\nid,\nevent_uuid,\nevent,\nproperties,\ndistinct_id,\nteam_id,\nelements_chain,\ncreated_at,\nip,\nsite_url,\nnow,\nraw_payload,\nerror_timestamp,\nerror_location,\nerror,\ntags,\n_timestamp,\n_offset\nFROM posthog_test.kafka_events_dead_letter_queue\n]
'
CREATE MATERIALIZED VIEW IF NOT EXISTS events_dead_letter_queue_mv ON CLUSTER posthog
@@ -188,6 +189,7 @@
error_timestamp,
error_location,
error,
tags,
_timestamp,
_offset
FROM posthog_test.kafka_events_dead_letter_queue
@@ -371,7 +373,7 @@
'
---
# name: test_create_table_query[\nCREATE TABLE IF NOT EXISTS events_dead_letter_queue ON CLUSTER posthog\n(\n id UUID,\n event_uuid UUID,\n event VARCHAR,\n properties VARCHAR,\n distinct_id VARCHAR,\n team_id Int64,\n elements_chain VARCHAR,\n created_at DateTime64(6, 'UTC'),\n ip VARCHAR,\n site_url VARCHAR,\n now DateTime64(6, 'UTC'),\n raw_payload VARCHAR,\n error_timestamp DateTime64(6, 'UTC'),\n error_location VARCHAR,\n error VARCHAR\n \n, _timestamp DateTime\n, _offset UInt64\n\n) ENGINE = ReplacingMergeTree(_timestamp)\nORDER BY (id, event_uuid, distinct_id, team_id)\n\nSETTINGS index_granularity=512\n]
# name: test_create_table_query[\nCREATE TABLE IF NOT EXISTS events_dead_letter_queue ON CLUSTER posthog\n(\n id UUID,\n event_uuid UUID,\n event VARCHAR,\n properties VARCHAR,\n distinct_id VARCHAR,\n team_id Int64,\n elements_chain VARCHAR,\n created_at DateTime64(6, 'UTC'),\n ip VARCHAR,\n site_url VARCHAR,\n now DateTime64(6, 'UTC'),\n raw_payload VARCHAR,\n error_timestamp DateTime64(6, 'UTC'),\n error_location VARCHAR,\n error VARCHAR,\n tags Array(VARCHAR)\n \n, _timestamp DateTime\n, _offset UInt64\n\n) ENGINE = ReplacingMergeTree(_timestamp)\nORDER BY (id, event_uuid, distinct_id, team_id)\n\nSETTINGS index_granularity=512\n]
'
CREATE TABLE IF NOT EXISTS events_dead_letter_queue ON CLUSTER posthog
@@ -390,7 +392,8 @@
raw_payload VARCHAR,
error_timestamp DateTime64(6, 'UTC'),
error_location VARCHAR,
error VARCHAR
error VARCHAR,
tags Array(VARCHAR)
, _timestamp DateTime
, _offset UInt64
@@ -449,7 +452,7 @@
'
---
# name: test_create_table_query[\nCREATE TABLE IF NOT EXISTS kafka_events_dead_letter_queue ON CLUSTER posthog\n(\n id UUID,\n event_uuid UUID,\n event VARCHAR,\n properties VARCHAR,\n distinct_id VARCHAR,\n team_id Int64,\n elements_chain VARCHAR,\n created_at DateTime64(6, 'UTC'),\n ip VARCHAR,\n site_url VARCHAR,\n now DateTime64(6, 'UTC'),\n raw_payload VARCHAR,\n error_timestamp DateTime64(6, 'UTC'),\n error_location VARCHAR,\n error VARCHAR\n \n) ENGINE = Kafka('kafka', 'events_dead_letter_queue_test', 'group1', 'JSONEachRow')\n]
# name: test_create_table_query[\nCREATE TABLE IF NOT EXISTS kafka_events_dead_letter_queue ON CLUSTER posthog\n(\n id UUID,\n event_uuid UUID,\n event VARCHAR,\n properties VARCHAR,\n distinct_id VARCHAR,\n team_id Int64,\n elements_chain VARCHAR,\n created_at DateTime64(6, 'UTC'),\n ip VARCHAR,\n site_url VARCHAR,\n now DateTime64(6, 'UTC'),\n raw_payload VARCHAR,\n error_timestamp DateTime64(6, 'UTC'),\n error_location VARCHAR,\n error VARCHAR,\n tags Array(VARCHAR)\n \n) ENGINE = Kafka('kafka', 'events_dead_letter_queue_test', 'group1', 'JSONEachRow')\n]
'
CREATE TABLE IF NOT EXISTS kafka_events_dead_letter_queue ON CLUSTER posthog
@@ -468,7 +471,8 @@
raw_payload VARCHAR,
error_timestamp DateTime64(6, 'UTC'),
error_location VARCHAR,
error VARCHAR
error VARCHAR,
tags Array(VARCHAR)
) ENGINE = Kafka('kafka', 'events_dead_letter_queue_test', 'group1', 'JSONEachRow')
@@ -754,7 +758,7 @@
'
---
# name: test_create_table_query_replicated_and_storage[\nCREATE TABLE IF NOT EXISTS events_dead_letter_queue ON CLUSTER posthog\n(\n id UUID,\n event_uuid UUID,\n event VARCHAR,\n properties VARCHAR,\n distinct_id VARCHAR,\n team_id Int64,\n elements_chain VARCHAR,\n created_at DateTime64(6, 'UTC'),\n ip VARCHAR,\n site_url VARCHAR,\n now DateTime64(6, 'UTC'),\n raw_payload VARCHAR,\n error_timestamp DateTime64(6, 'UTC'),\n error_location VARCHAR,\n error VARCHAR\n \n, _timestamp DateTime\n, _offset UInt64\n\n) ENGINE = ReplacingMergeTree(_timestamp)\nORDER BY (id, event_uuid, distinct_id, team_id)\n\nSETTINGS index_granularity=512\n]
# name: test_create_table_query_replicated_and_storage[\nCREATE TABLE IF NOT EXISTS events_dead_letter_queue ON CLUSTER posthog\n(\n id UUID,\n event_uuid UUID,\n event VARCHAR,\n properties VARCHAR,\n distinct_id VARCHAR,\n team_id Int64,\n elements_chain VARCHAR,\n created_at DateTime64(6, 'UTC'),\n ip VARCHAR,\n site_url VARCHAR,\n now DateTime64(6, 'UTC'),\n raw_payload VARCHAR,\n error_timestamp DateTime64(6, 'UTC'),\n error_location VARCHAR,\n error VARCHAR,\n tags Array(VARCHAR)\n \n, _timestamp DateTime\n, _offset UInt64\n\n) ENGINE = ReplacingMergeTree(_timestamp)\nORDER BY (id, event_uuid, distinct_id, team_id)\n\nSETTINGS index_granularity=512\n]
'
CREATE TABLE IF NOT EXISTS events_dead_letter_queue ON CLUSTER posthog
@@ -773,7 +777,8 @@
raw_payload VARCHAR,
error_timestamp DateTime64(6, 'UTC'),
error_location VARCHAR,
error VARCHAR
error VARCHAR,
tags Array(VARCHAR)
, _timestamp DateTime
, _offset UInt64

View File: plugin-server/src/types.ts

@@ -499,6 +499,7 @@ export interface DeadLetterQueueEvent {
error_timestamp: string
error_location: string
error: string
tags: string[]
_timestamp: string
_offset: number
}
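
Since tags is string[] here and Array(VARCHAR) in ClickHouse, DLQ rows can now be sliced by origin. A small query sketch (same assumed helper as above; has() is ClickHouse's array-membership function):

    from ee.clickhouse.client import sync_execute  # assumed helper

    rows = sync_execute(
        """
        SELECT event, error, error_location, tags
        FROM events_dead_letter_queue
        WHERE has(tags, %(tag)s)
        ORDER BY error_timestamp DESC
        LIMIT 10
        """,
        {"tag": "plugin_server"},
    )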

View File: plugin-server/src/worker/ingestion/utils.ts

@@ -35,7 +35,9 @@ export function generateEventDeadLetterQueueMessage(
raw_payload: JSON.stringify(event),
error_location: errorLocation,
error: errorMessage,
tags: ['plugin_server', 'ingest_event'],
}
const message = {
topic: KAFKA_EVENTS_DEAD_LETTER_QUEUE,
messages: [
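
The resulting message lands on the dead letter queue topic as one JSON object per row, which the Kafka-engine table decodes via JSONEachRow. A rough Python stand-in for this produce path (the real code is TypeScript; the topic value, broker address, and field values are assumptions):

    import json

    from kafka import KafkaProducer  # kafka-python

    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    producer.send(
        "events_dead_letter_queue",  # assumed value of KAFKA_EVENTS_DEAD_LETTER_QUEUE
        {
            "error_location": "plugin_server_ingest_event",
            "error": "could not parse event",
            "raw_payload": json.dumps({"event": "$pageview"}),
            "tags": ["plugin_server", "ingest_event"],  # matches the diff above
        },
    )
    producer.flush()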

View File (plugin-server ClickHouse test helpers)

@@ -25,6 +25,7 @@ export async function resetTestDatabaseClickhouse(extraServerConfig: Partial<Plu
await clickhouse.querying('TRUNCATE session_recording_events_mv')
await clickhouse.querying('TRUNCATE plugin_log_entries')
await clickhouse.querying('TRUNCATE events_dead_letter_queue')
await clickhouse.querying('TRUNCATE events_dead_letter_queue_mv')
await clickhouse.querying('TRUNCATE groups')
await clickhouse.querying('TRUNCATE groups_mv')
}

View File (Django server dead letter queue helper)

@@ -78,6 +78,7 @@ def log_event_to_dead_letter_queue(
data["event"] = event_name
data["raw_payload"] = json.dumps(raw_payload)
data["now"] = datetime.fromisoformat(data["now"]).replace(tzinfo=None).isoformat() if data["now"] else None
data["tags"] = json.dumps(["django_server"])
data["event_uuid"] = event["uuid"]
del data["uuid"]
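
Two details worth flagging in this hunk: tags is JSON-serialized to a string here (the plugin server sends a plain array), and now is normalized to a naive ISO timestamp before insertion. The normalization shown standalone:

    from datetime import datetime

    raw_now = "2022-02-04T15:36:15-05:00"
    # fromisoformat keeps the offset; replace(tzinfo=None) drops it without converting.
    normalized = datetime.fromisoformat(raw_now).replace(tzinfo=None).isoformat()
    print(normalized)  # -> 2022-02-04T15:36:15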