acknowledgements:
  enabled: true

api:
  enabled: true
  address: 0.0.0.0:8686
  playground: true

enrichment_tables:
  quota_limited_teams:
    type: file
    file:
      path: '${QUOTA_LIMITED_TEAMS_PATH:-/etc/vector/data/quota_limited_teams.csv}'
      encoding:
        type: csv
    schema:
      token: string
  overflow_sessions:
    type: file
    file:
      path: '${OVERFLOW_SESSIONS_PATH:-/etc/vector/data/overflow_sessions.csv}'
      encoding:
        type: csv
    schema:
      token: string

sources:
  capture_server:
    type: http_server
    address: 0.0.0.0:8000
    strict_path: false
    query_parameters:
      - _
    host_key: ip
    decoding:
      codec: vrl
      vrl:
        source: |
          ._message, err = decode_gzip(.message)
          if err == null {
            .message = parse_json!(del(._message))
          } else {
            # if we failed to decode gzip then ._message is empty
            .message = parse_json!(.message)
            del(._message)
          }

          .message[0].distinct_id = .message[0]."$$distinct_id" || .message[0].properties.distinct_id || .message[0].distinct_id
          if is_integer(.message[0].distinct_id) {
            .message[0].distinct_id, _ = to_string(.message[0].distinct_id)
          }
          %token = .message[0].properties.token || .message[0].api_key

          if !is_string(%token) || !is_string(.message[0].distinct_id) {
            # drop the bulky $$snapshot_data before logging the invalid payload
            for_each(array!(.message)) -> |_index, value| {
              del(value.properties."$$snapshot_data")
            }
            log(truncate(encode_json(.), 1000), rate_limit_secs: 0)
          }

          assert!(is_string(.message[0].distinct_id), "distinct_id is required")
          assert!(is_string(.message[0].properties."$$session_id"), "$$session_id is required")
          assert!(is_string(%token), "token is required")

transforms:
  quota_check:
    type: route
    inputs:
      - capture_server
    route:
      quota_limited:
        type: vrl
        source: |
          _, err = get_enrichment_table_record("quota_limited_teams", { "token": %token })
          err == null # err is not null if row not found, we want to drop where the row _is_ found
  events_parsed:
    type: remap
    inputs:
      - quota_check._unmatched
    drop_on_abort: true
    drop_on_error: true
    reroute_dropped: true
    source: |
      event = {
        "ip": .ip,
        "uuid": uuid_v7(),
        "distinct_id": .message[0].distinct_id,
        "session_id": .message[0].properties."$$session_id",
        "now": format_timestamp!(.timestamp, "%+", "UTC"),
        "token": %token,
      }

      event.sent_at, err = from_unix_timestamp(to_int!(."_"), "milliseconds")
      if err != null {
        event.sent_at = event.now
      } else {
        event.sent_at = format_timestamp!(event.sent_at, "%+", "UTC")
      }

      snapshot_items = flatten(map_values(array!(.message)) -> |value| {
        if is_array(value.properties."$$snapshot_data") {
          array!(value.properties."$$snapshot_data")
        } else {
          [value.properties."$$snapshot_data"]
        }
      })

      data = {
        "uuid": event.uuid,
        "event": "$$snapshot_items",
        "properties": {
          "distinct_id": event.distinct_id,
          "$$session_id": .message[0].properties."$$session_id",
          "$$window_id": .message[0].properties."$$window_id",
          "$$snapshot_source": .message[0].properties."$$snapshot_source" || "web",
          "$$snapshot_items": snapshot_items
        }
      }

      event.data = encode_json(data)
      . = event
      %headers = {
        "token": .token
      }
  overflow_check:
    type: route
    inputs:
      - events_parsed
    route:
      overflow:
        type: vrl
        source: |
          _, err = get_enrichment_table_record("overflow_sessions", { "session_id": .session_id })
          err == null # err is not null if row not found, we want to drop where the row _is_ found
  log_errors:
    type: remap
    inputs:
      - events_parsed.dropped
    source: |
      log({
        "event": "error",
        "reason": "events_parsed.dropped",
        "token": %token,
        "session_id": .message[0].properties."$$session_id",
        "distinct_id": .message[0].distinct_id
      }, rate_limit_secs: 0)
  metric_quota_dropped:
    type: log_to_metric
    inputs:
      - quota_check.quota_limited
    metrics:
      - type: counter
        field: message
        kind: incremental
        name: vector_capture_quota_dropped_count
        tags:
          token: '{{%token}}'

sinks:
  # invalid sink to catch and raise errors
  # without this vector drops them silently
  error:
    type: file
    path: ''
    encoding:
      codec: json
    acknowledgements:
      enabled: true
    inputs:
      - log_errors
  dropped:
    type: blackhole
    acknowledgements:
      enabled: true
    inputs:
      - metric_quota_dropped
  kafka: &kafka
    type: kafka
    acknowledgements:
      enabled: true
    inputs:
      - overflow_check._unmatched
    buffer:
      - type: memory
        max_events: 10000
        when_full: block
    bootstrap_servers: $KAFKA_BOOSTRAP_SERVERS
    topic: $KAFKA_EVENTS_TOPIC
    compression: gzip
    key_field: .session_id
    headers_key: '%headers'
    tls:
      enabled: false
    encoding:
      codec: json
    librdkafka_options:
      client.id: ${CLIENT_ID:-$HOSTNAME}
      linger.ms: '0'
      topic.metadata.refresh.interval.ms: '20000'
      queue.buffering.max.kbytes: '1048576'
      queue.buffering.max.messages: '100'
      message.max.bytes: '64000000'
      batch.size: '1600000'
      batch.num.messages: '100'
      sticky.partitioning.linger.ms: '25'
      enable.idempotence: 'false'
      max.in.flight.requests.per.connection: '1000000'
      partitioner: 'consistent_random'
    message_timeout_ms: 10000
    socket_timeout_ms: 5000
  kafka_overflow:
    <<: *kafka
    inputs:
      - overflow_check.overflow
    topic: $KAFKA_OVERFLOW_TOPIC
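
# Usage sketch (comment only, not part of the pipeline): the capture_server source accepts a
# plain-JSON or gzip-compressed JSON array on any path (strict_path: false), with the client's
# sent-at epoch-milliseconds passed via the `_` query parameter. The `$$` throughout this file is
# Vector's escape for a literal `$` in property names (to survive environment-variable
# interpolation); payloads on the wire use a single `$`, e.g. $$session_id is sent as a
# single-dollar key. A hypothetical request shaped roughly like the following (example values,
# `$$` written with a single `$` when actually sent) should pass the decoder's assertions:
#
#   curl -X POST 'http://localhost:8000/s/?_=1700000000000' \
#     -H 'Content-Type: application/json' \
#     -d '[{"api_key": "phc_example", "distinct_id": "user-1",
#           "properties": {"$$session_id": "0191-example", "$$window_id": "0191-example",
#                          "$$snapshot_source": "web", "$$snapshot_data": [{"type": 3, "data": {}}]}}]'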