
feat: Add replay capture vector config+docker image and deployment (#24264)

Frank Hamand 2024-08-08 16:57:13 +01:00 committed by GitHub
parent 4aedc89447
commit 09700cc16f
9 changed files with 500 additions and 1 deletion

.github/workflows/replay-capture.yml vendored Normal file

@@ -0,0 +1,35 @@
name: Vector Replay Capture Tests
on:
workflow_dispatch:
pull_request:
paths:
- vector/**
- .github/workflows/replay-capture.yml
workflow_call:
jobs:
vector-test:
name: Vector test
runs-on: ubuntu-20.04
env:
QUOTA_LIMITED_TEAMS_PATH: vector/replay-capture/tests/quota_limited_teams.csv
OVERFLOW_SESSIONS_PATH: vector/replay-capture/tests/overflow_sessions.csv
KAFKA_BOOSTRAP_SERVERS: dummy:9092
KAFKA_EVENTS_TOPIC: session_recording_snapshot_item_events
KAFKA_OVERFLOW_TOPIC: session_recording_snapshot_item_overflow
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Install Vector
run: |
wget https://github.com/vectordotdev/vector/releases/download/v0.40.0/vector-0.40.0-x86_64-unknown-linux-gnu.tar.gz
tar xzvf vector-0.40.0-x86_64-unknown-linux-gnu.tar.gz ./vector-x86_64-unknown-linux-gnu/bin/vector
sudo mv ./vector-x86_64-unknown-linux-gnu/bin/vector /usr/bin/vector
- name: Run vector tests
run: |
yq -i e 'explode(.)' vector/replay-capture/vector.yaml
vector test vector/replay-capture/*.yaml
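
The same checks can be reproduced locally. A minimal sketch, assuming Vector 0.40.0 and yq are already on PATH and the repository root is the working directory:

export QUOTA_LIMITED_TEAMS_PATH=vector/replay-capture/tests/quota_limited_teams.csv
export OVERFLOW_SESSIONS_PATH=vector/replay-capture/tests/overflow_sessions.csv
export KAFKA_BOOSTRAP_SERVERS=dummy:9092
export KAFKA_EVENTS_TOPIC=session_recording_snapshot_item_events
export KAFKA_OVERFLOW_TOPIC=session_recording_snapshot_item_overflow
# expand the YAML anchors in place (this rewrites vector.yaml in your working copy, as CI does in its throwaway checkout)
yq -i e 'explode(.)' vector/replay-capture/vector.yaml
# run the Vector unit tests defined in the tests file below
vector test vector/replay-capture/*.yaml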

.github/workflows/vector-docker-build-deploy.yml vendored Normal file

@@ -0,0 +1,105 @@
name: Build and deploy replay capture container images
on:
workflow_dispatch:
push:
paths:
- 'vector/**'
- '.github/workflows/vector-docker-build-deploy.yml'
branches:
- 'master'
jobs:
build:
name: Build and publish container image
runs-on: depot-ubuntu-22.04-4
permissions:
id-token: write # allow issuing OIDC tokens for this workflow run
contents: read # allow reading the repo contents
packages: write # allow push to ghcr.io
defaults:
run:
working-directory: vector/
steps:
- name: Check Out Repo
# Checkout project code
# Use sparse checkout to only select files in vector directory
# Turning off cone mode ensures that files in the project root are not included during checkout
uses: actions/checkout@v4
with:
sparse-checkout: 'vector/'
sparse-checkout-cone-mode: false
- name: Set up Depot CLI
uses: depot/setup-action@v1
- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: posthog
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Login to ghcr.io
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
logout: false
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
with:
images: ghcr.io/posthog/posthog/replay-capture
tags: |
type=ref,event=pr
type=ref,event=branch
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
type=sha
- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v2
- name: Build and push image
id: docker_build
uses: depot/build-push-action@v1
with:
context: ./vector/replay-capture/
file: ./vector/replay-capture/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
platforms: linux/arm64
cache-from: type=gha
cache-to: type=gha,mode=max
build-args: BIN=${{ matrix.image }}
- name: Container image digest
run: echo ${{ steps.docker_build.outputs.digest }}
- name: Trigger replay capture deployment
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ steps.deployer.outputs.token }}
repository: PostHog/charts
event-type: commit_state_update
client-payload: |
{
"values": {
"image": {
"sha": "${{ steps.docker_build.outputs.digest }}"
}
},
"release": "replay-capture-vector",
"commit": ${{ toJson(github.event.head_commit) }},
"repository": ${{ toJson(github.repository) }},
"labels": []
}
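
CI builds and pushes via Depot, but the same image can be built locally with plain Docker for inspection. A minimal sketch, assuming Docker is available; replay-capture:dev is just an illustrative local tag:

# build from the same context and Dockerfile that CI passes to depot/build-push-action
docker build -t replay-capture:dev vector/replay-capture/
# confirm the exploded vector.yaml was baked into the image
docker run --rm --entrypoint cat replay-capture:dev /etc/vector/vector.yaml | head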

docker-compose.base.yml

@@ -105,7 +105,7 @@ services:
restart: on-failure
capture:
- image: ghcr.io/posthog/capture:main
+ image: ghcr.io/posthog/posthog/capture:master
restart: on-failure
environment:
ADDRESS: '0.0.0.0:3000'
@@ -113,6 +113,26 @@
KAFKA_HOSTS: 'kafka:9092'
REDIS_URL: 'redis://redis:6379/'
replay-capture:
image: ghcr.io/posthog/posthog/replay-capture:master
build:
context: vector/replay-capture
restart: on-failure
entrypoint: ['sh', '-c']
command:
- |
set -x
# seed empty required data files
mkdir -p /etc/vector/data
echo "token" > /etc/vector/data/quota_limited_teams.csv
echo "session_id" > /etc/vector/data/overflow_sessions.csv
exec vector -v --watch-config
environment:
KAFKA_EVENTS_TOPIC: session_recording_snapshot_item_events
KAFKA_OVERFLOW_TOPIC: session_recording_snapshot_item_overflow
KAFKA_BOOSTRAP_SERVERS: 'kafka:9092'
REDIS_URL: 'redis://redis:6379/'
plugins:
command: ./bin/plugin-server --no-restart-loop
restart: on-failure
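
The entrypoint above only seeds header-only CSVs so Vector can start with empty enrichment tables. A minimal sketch of poking at a running instance, assuming a compose stack that extends this base file is up and the service is named replay-capture:

# both data files start out with just their header row
docker compose exec replay-capture cat /etc/vector/data/quota_limited_teams.csv
docker compose exec replay-capture cat /etc/vector/data/overflow_sessions.csv
# the Vector API is enabled on port 8686 inside the container (see vector.yaml below)
docker compose exec replay-capture wget -qO- http://localhost:8686/health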


@@ -117,6 +117,17 @@ services:
- redis
- kafka
# Optional capture
replay-capture:
extends:
file: docker-compose.base.yml
service: replay-capture
ports:
- 3001:8000
depends_on:
- redis
- kafka
livestream:
extends:
file: docker-compose.base.yml
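
With the port mapping above, the capture endpoint is reachable on localhost:3001. A minimal smoke-test sketch with dummy values throughout, assuming the stack is running; note that $$ in the YAML files is only escaping for a literal $, so the real property names are $session_id, $window_id and $snapshot_data:

# "_" must be a millisecond timestamp; the payload shape matches what the capture_server source in vector.yaml expects
curl -s -X POST "http://localhost:3001/s/?_=$(( $(date +%s) * 1000 ))" \
  -H 'Content-Type: application/json' \
  --data '[{"api_key": "phc_dummy", "properties": {"token": "phc_dummy", "distinct_id": "dev", "$session_id": "dev-session", "$window_id": "dev-window", "$snapshot_data": [{"offset": 0}]}}]'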

vector/replay-capture/Dockerfile Normal file

@@ -0,0 +1,12 @@
FROM alpine as config-builder
RUN apk add -U yq
WORKDIR /config
COPY vector.yaml .
# evaluate with yq to expand YAML anchors (which vector doesn't support)
RUN yq -i e 'explode(.)' vector.yaml
FROM timberio/vector:0.40.X-alpine
COPY --from=config-builder /config/vector.yaml /etc/vector/vector.yaml
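
A quick illustration of what the explode step does (illustrative snippet, mikefarah yq v4 syntax as installed from the Alpine package above):

# anchors, aliases and merge keys are resolved into plain YAML, which is all Vector's loader understands
printf 'defaults: &d\n  codec: json\nsink:\n  <<: *d\n' | yq e 'explode(.)' -
# defaults:
#   codec: json
# sink:
#   codec: json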


@@ -0,0 +1,76 @@
tests:
- name: Basic Test
inputs:
- insert_at: quota_check
type: vrl
source: |
.message = [{}]
.message[0].properties = {}
.message[0].properties."$$session_id" = "123"
.message[0].properties."$$window_id" = "123"
.message[0].properties."token" = "123"
.message[0].properties."distinct_id" = "123"
.message[0].properties."$$snapshot_data" = [{"offset": 123}]
.ip = "0.0.0.0"
.timestamp = now()
."_" = "123456789"
%token = "123"
outputs:
- conditions:
- source: |
assert!(is_string(.uuid))
assert!(is_string(%headers.token))
assert!(is_string(parse_json!(.data).uuid))
assert!(parse_json!(.data).properties."$$snapshot_items"[0].offset == 123)
type: vrl
extract_from: overflow_check._unmatched
- name: Quota limited
inputs:
- insert_at: quota_check
type: vrl
source: |
.message = [{}]
.message[0].properties = {}
.message[0].properties."$$session_id" = "123"
.message[0].properties."$$window_id" = "123"
.message[0].properties."token" = "limited_token"
.message[0].properties."distinct_id" = "123"
.message[0].properties."$$snapshot_data" = [{"offset": 123}]
.ip = "0.0.0.0"
.timestamp = now()
."_" = "123456789"
%token = "limited_token"
outputs:
- conditions:
- source: |
true
type: vrl
extract_from: metric_quota_dropped
- name: Overflow
inputs:
- insert_at: quota_check
type: vrl
source: |
.message = [{}]
.message[0].properties = {}
.message[0].properties."$$session_id" = "overflow_session"
.message[0].properties."$$window_id" = "123"
.message[0].properties."token" = "123"
.message[0].properties."distinct_id" = "123"
.message[0].properties."$$snapshot_data" = [{"offset": 123}]
.ip = "0.0.0.0"
.timestamp = now()
."_" = "123456789"
%token = "123"
outputs:
- conditions:
- source: |
true
type: vrl
extract_from: overflow_check.overflow

vector/replay-capture/tests/overflow_sessions.csv Normal file

@@ -0,0 +1,2 @@
session_id
overflow_session

vector/replay-capture/tests/quota_limited_teams.csv Normal file

@@ -0,0 +1,2 @@
token
limited_token

vector/replay-capture/vector.yaml Normal file

@@ -0,0 +1,236 @@
acknowledgements:
enabled: true
api:
enabled: true
address: 0.0.0.0:8686
playground: true
enrichment_tables:
quota_limited_teams:
type: file
file:
path: '${QUOTA_LIMITED_TEAMS_PATH:-/etc/vector/data/quota_limited_teams.csv}'
encoding:
type: csv
schema:
token: string
overflow_sessions:
type: file
file:
path: '${OVERFLOW_SESSIONS_PATH:-/etc/vector/data/overflow_sessions.csv}'
encoding:
type: csv
schema:
session_id: string # matches the CSV header and the session_id lookup in overflow_check
sources:
capture_server:
type: http_server
address: 0.0.0.0:8000
strict_path: false
query_parameters:
- _
host_key: ip
decoding:
codec: vrl
vrl:
source: |
._message, err = decode_gzip(.message)
if err == null {
.message = parse_json!(del(._message))
} else {
# if we failed to decode gzip then ._message is empty
.message = parse_json!(.message)
del(._message)
}
.message[0].distinct_id = .message[0]."$$distinct_id" || .message[0].properties.distinct_id || .message[0].distinct_id
if is_integer(.message[0].distinct_id) {
.message[0].distinct_id, _ = to_string(.message[0].distinct_id)
}
%token = .message[0].properties.token || .message[0].api_key
if !is_string(%token) || !is_string(.message[0].distinct_id) {
for_each(array(.message)) -> |_index, value| {
del(value.properties."$$snapshot_data")
}
log(truncate(encode_json(.), 1000), rate_limit_secs: 0)
}
assert!(is_string(.message[0].distinct_id), "distinct_id is required")
assert!(is_string(.message[0].properties."$$session_id"), "$$session_id is required")
assert!(is_string(%token), "token is required")
transforms:
quota_check:
type: route
inputs:
- capture_server
route:
quota_limited:
type: vrl
source: |
_, err = get_enrichment_table_record("quota_limited_teams", { "token": %token })
err == null # err is not null if row not found, we want to drop where the row _is_ found
events_parsed:
type: remap
inputs:
- quota_check._unmatched
drop_on_abort: true
drop_on_error: true
reroute_dropped: true
source: |
event = {
"ip": .ip,
"uuid": uuid_v7(),
"distinct_id": .message[0].distinct_id,
"session_id": .message[0].properties."$$session_id",
"now": format_timestamp!(.timestamp, "%+", "UTC"),
"token": %token,
}
event.sent_at, err = from_unix_timestamp(to_int!(."_"), "milliseconds")
if err != null {
event.sent_at = event.now
} else {
event.sent_at = format_timestamp!(event.sent_at, "%+", "UTC")
}
snapshot_items = flatten(map_values(array!(.message)) -> |value| {
if is_array(value.properties."$$snapshot_data") {
array!(value.properties."$$snapshot_data")
} else {
[value.properties."$$snapshot_data"]
}
})
data = {
"uuid": event.uuid,
"event": "$$snapshot_items",
"properties": {
"distinct_id": event.distinct_id,
"$$session_id": .message[0].properties."$$session_id",
"$$window_id": .message[0].properties."$$window_id",
"$$snapshot_source": .message[0].properties."$$snapshot_source" || "web",
"$$snapshot_items": snapshot_items
}
}
event.data = encode_json(data)
. = event
%headers = {
"token": .token
}
overflow_check:
type: route
inputs:
- events_parsed
route:
overflow:
type: vrl
source: |
_, err = get_enrichment_table_record("overflow_sessions", { "session_id": .session_id })
err == null # err is not null if row not found, we want to drop where the row _is_ found
log_errors:
type: remap
inputs:
- events_parsed.dropped
source: |
log({
"event": "error",
"reason": "events_parsed.dropped",
"token": %token,
"session_id": .message[0].properties."$$session_id",
"distinct_id": .message[0].distinct_id
}, rate_limit_secs: 0)
metric_quota_dropped:
type: log_to_metric
inputs:
- quota_check.quota_limited
metrics:
- type: counter
field: message
kind: incremental
name: vector_capture_quota_dropped_count
tags:
token: '{{%token}}'
sinks:
# invalid sink to catch and raise errors
# without this vector drops them silently
error:
type: file
path: ''
encoding:
codec: json
acknowledgements:
enabled: true
inputs:
- log_errors
dropped:
type: blackhole
acknowledgements:
enabled: true
inputs:
- metric_quota_dropped
kafka:
type: kafka
inputs:
- overflow_check._unmatched
buffer:
- type: memory
max_events: 10000
when_full: block
bootstrap_servers: $KAFKA_BOOSTRAP_SERVERS
topic: $KAFKA_EVENTS_TOPIC
compression: gzip
key_field: .session_id
headers_key: '%headers'
tls:
enabled: false
encoding:
codec: json
librdkafka_options:
client.id: ${CLIENT_ID:-$HOSTNAME}
linger.ms: '0'
topic.metadata.refresh.interval.ms: '20000'
queue.buffering.max.kbytes: '1048576'
queue.buffering.max.messages: '100'
message.max.bytes: '64000000'
batch.size: '1600000'
batch.num.messages: '100'
sticky.partitioning.linger.ms: '25'
enable.idempotence: 'false'
max.in.flight.requests.per.connection: '1000000'
partitioner: 'consistent_random'
message_timeout_ms: 10000
socket_timeout_ms: 5000
kafka_overflow:
type: kafka
buffer:
- type: memory
max_events: 10000
when_full: block
bootstrap_servers: $KAFKA_BOOSTRAP_SERVERS
compression: gzip
key_field: .session_id
headers_key: '%headers'
tls:
enabled: false
encoding:
codec: json
librdkafka_options:
client.id: ${CLIENT_ID:-$HOSTNAME}
linger.ms: '0'
topic.metadata.refresh.interval.ms: '20000'
queue.buffering.max.kbytes: '1048576'
queue.buffering.max.messages: '100'
message.max.bytes: '64000000'
batch.size: '1600000'
batch.num.messages: '100'
sticky.partitioning.linger.ms: '25'
enable.idempotence: 'false'
max.in.flight.requests.per.connection: '1000000'
partitioner: 'consistent_random'
message_timeout_ms: 10000
socket_timeout_ms: 5000
inputs:
- overflow_check.overflow
topic: $KAFKA_OVERFLOW_TOPIC
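
To check end to end that events reach Kafka, a consumer such as kcat can be pointed at the broker. A minimal sketch, assuming kcat is installed and the broker address matches KAFKA_BOOSTRAP_SERVERS (kafka:9092 inside the compose network):

# consume one message from the snapshot topic, printing the key (session_id), headers (token) and payload
kcat -C -b kafka:9092 -t session_recording_snapshot_item_events -c 1 -f 'key=%k headers=%h payload=%s\n'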