acknowledgements:
  enabled: true

api:
  enabled: true
  address: 0.0.0.0:8686
  playground: true
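
# File-backed enrichment tables consulted by the routing transforms below:
# tokens of quota-limited teams, and session ids diverted to the overflow
# topic. Each is a single-column CSV with a header row, e.g. (contents
# illustrative only):
#   token
#   phc_abc123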
enrichment_tables:
  quota_limited_teams:
    type: file
    file:
      path: '${QUOTA_LIMITED_TEAMS_PATH:-/etc/vector/data/quota_limited_teams.csv}'
      encoding:
        type: csv
    schema:
      token: string
  overflow_sessions:
    type: file
    file:
      path: '${OVERFLOW_SESSIONS_PATH:-/etc/vector/data/overflow_sessions.csv}'
      encoding:
        type: csv
    schema:
      # the overflow_check transform below matches on this column
      session_id: string
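
# HTTP intake for session-replay payloads. The VRL decoder unpacks the
# (optionally gzipped) JSON batch, normalises distinct_id, pulls the team
# token into the %token metadata field, and rejects payloads missing a
# token, distinct_id or $$session_id. %response becomes the HTTP response
# body.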
sources:
  capture_server:
    type: http_server
    address: 0.0.0.0:8000
    strict_path: false
    query_parameters:
      - _
    host_key: ip
    response_body_key: '%response'
    decoding:
      codec: vrl
      vrl:
        source: |
          ._message, err = decode_gzip(.message)
          if err == null {
            .message = parse_json!(del(._message))
          } else {
            # if we failed to decode gzip then ._message is empty
            .message = parse_json!(.message)
            del(._message)
          }
          .message[0].distinct_id = .message[0]."$$distinct_id" || .message[0].properties.distinct_id || .message[0].distinct_id
          if is_integer(.message[0].distinct_id) {
            .message[0].distinct_id, _ = to_string(.message[0].distinct_id)
          }

          %token = .message[0].properties.token || .message[0].api_key

          if !is_string(%token) || !is_string(.message[0].distinct_id) {
            for_each(array(.message)) -> |_index, value| {
              del(value.properties."$$snapshot_data")
            }
            log(truncate(encode_json(.), 1000), rate_limit_secs: 0)
          }

          assert!(is_string(.message[0].distinct_id), "distinct_id is required")
          assert!(is_string(.message[0].properties."$$session_id"), "$$session_id is required")
          assert!(is_string(%token), "token is required")

          %response = {"status": 1}

transforms:
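  # Route events whose team token appears in quota_limited_teams out of the
  # main pipeline; everything else continues via the built-in _unmatched
  # output.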
  quota_check:
    type: route
    inputs:
      - capture_server
    route:
      quota_limited:
        type: vrl
        source: |
          _, err = get_enrichment_table_record("quota_limited_teams", { "token": %token })
          err == null # err is not null if row not found, we want to drop where the row _is_ found
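
  # Reshape each accepted batch into a single $$snapshot_items event: flatten
  # the per-event $$snapshot_data into one list, derive sent_at from the
  # client's `_` query parameter (ms since epoch, falling back to receive
  # time), and JSON-encode the payload into .data. Failed remaps are
  # rerouted to the .dropped output.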
  events_parsed:
    type: remap
    inputs:
      - quota_check._unmatched
    drop_on_abort: true
    drop_on_error: true
    reroute_dropped: true
    source: |
      event = {
        "ip": .ip,
        "uuid": uuid_v7(),
        "distinct_id": .message[0].distinct_id,
        "session_id": .message[0].properties."$$session_id",
        "now": format_timestamp!(.timestamp, "%+", "UTC"),
        "token": %token,
      }

      event.sent_at, err = from_unix_timestamp(to_int!(."_"), "milliseconds")
      if err != null {
        event.sent_at = event.now
      } else {
        event.sent_at = format_timestamp!(event.sent_at, "%+", "UTC")
      }

      snapshot_items = flatten(map_values(array!(.message)) -> |value| {
        if is_array(value.properties."$$snapshot_data") {
          array!(value.properties."$$snapshot_data")
        } else {
          [value.properties."$$snapshot_data"]
        }
      })

      data = {
        "uuid": event.uuid,
        "event": "$$snapshot_items",
        "properties": {
          "distinct_id": event.distinct_id,
          "$$session_id": .message[0].properties."$$session_id",
          "$$window_id": .message[0].properties."$$window_id",
          "$$snapshot_source": .message[0].properties."$$snapshot_source" || "web",
          "$$snapshot_items": snapshot_items
        }
      }

      event.data = encode_json(data)
      . = event

      %headers = {
        "token": .token
      }
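
  # Sessions listed in the overflow_sessions table are split off to a
  # dedicated topic; everything else leaves via _unmatched.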
  overflow_check:
    type: route
    inputs:
      - events_parsed
    route:
      overflow:
        type: vrl
        source: |
          _, err = get_enrichment_table_record("overflow_sessions", { "session_id": .session_id })
          err == null # err is not null if row not found, we want to drop where the row _is_ found
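
  # Log context for events dropped by events_parsed so failures are visible.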
  log_errors:
    type: remap
    inputs:
      - events_parsed.dropped
    source: |
      log({
        "event": "error",
        "reason": "events_parsed.dropped",
        "token": %token,
        "session_id": .message[0].properties."$$session_id",
        "distinct_id": .message[0].distinct_id
      }, rate_limit_secs: 0)
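
  # Count quota-dropped events, tagged by token, so drops are observable.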
  metric_quota_dropped:
    type: log_to_metric
    inputs:
      - quota_check.quota_limited
    metrics:
      - type: counter
        field: message
        kind: incremental
        name: vector_capture_quota_dropped_count
        tags:
          token: '{{%token}}'

sinks:
  # invalid sink to catch and raise errors
  # without this vector drops them silently
  error:
    type: file
    path: ''
    encoding:
      codec: json
    acknowledgements:
      enabled: true
    inputs:
      - log_errors
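
  # Acknowledged blackhole: terminates the quota-drop metric stream without
  # writing it anywhere.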
  dropped:
    type: blackhole
    acknowledgements:
      enabled: true
    inputs:
      - metric_quota_dropped
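
  # Main Kafka sink, shared via the &kafka anchor. Messages are keyed by
  # session_id (murmur2), which should keep each session's snapshots on a
  # single partition; %headers attaches the team token as a message header.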
  kafka: &kafka
    type: kafka
    acknowledgements:
      enabled: true
    inputs:
      - overflow_check._unmatched
    buffer:
      - type: memory
        max_events: 10000
        when_full: block
    bootstrap_servers: $KAFKA_BOOSTRAP_SERVERS
    topic: $KAFKA_EVENTS_TOPIC
    compression: gzip
    key_field: .session_id
    headers_key: '%headers'
    tls:
      enabled: false
    encoding:
      codec: json
    librdkafka_options:
      client.id: ${CLIENT_ID:-$HOSTNAME}
      linger.ms: '0'
      topic.metadata.refresh.interval.ms: '20000'
      queue.buffering.max.kbytes: '1048576'
      queue.buffering.max.messages: '100'
      message.max.bytes: '64000000'
      batch.size: '1600000'
      batch.num.messages: '100'
      sticky.partitioning.linger.ms: '25'
      enable.idempotence: 'false'
      max.in.flight.requests.per.connection: '1000000'
      partitioner: 'murmur2_random'
    message_timeout_ms: 10000
    socket_timeout_ms: 5000
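
  # Overflow variant via the YAML merge key: identical settings except for
  # its input and destination topic.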
  kafka_overflow:
    <<: *kafka
    inputs:
      - overflow_check.overflow
    topic: $KAFKA_OVERFLOW_TOPIC