0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-24 00:47:50 +01:00
posthog/livestream/filter.go

158 lines
3.6 KiB
Go
Raw Normal View History

chore: move livestream to posthog monorepo (#23044) * 🔥 initial commit * update readme * Update README.md * Update README.md * deploy scripts * very basic consumer setup * add some configs and docker-compose * formatting for testing * add tailscale * flip from dev to prod flag * set default to be not prod * default for group_id * tailscale up * update gitignore * basic geolocation * remove unused localServer * document mmdb * just make configs an example * drop raw print * add a start script (downloads the mmdb) * add readme and update configs.example * ts working * if in start * update start script * fix start * fix start * fix more * add sql endpoints for tokenId and Person lookups * work towards filter * sub channel * fix subChan * hardcode team2 token * add cors * only allow get and head * add atomicbool * add channel to kafka * add logs * verbose logs * make array * drop sub ptrs * more logs * helps to loop * drop some logigng * move sub branch * logging * drop log * hog * Deal with numeric distinct ids later * logs * api_key * send 1/1000 * remove log * remove more logs * change response payload * set timestamp if needed * fill in person_id if team_id is set * require teamid, convert to token * clean up subs on disconnect * log * check for token in another place * clean up subs on disconnect * drop modulo and log * fix no assign * don't reuse db conn for now * drop a log * add back commented out log * Don't block on send to client channel * add geo bool * only geo events * use wrapper ip * don't require team in geo mode * add an endpoint and stats keeper for teams * remove stats keeper * start stats keeper * wire it up * change the shape of the response * omit empty error * omit empty on the stats as well * enable logging on back pressure * add jwt endpoint for testing * support multiple event types * Get Auth Setup * jwt team is float so turn that into int * logs * add auth for stats endpoint * remove tailscale and use autoTLS on public endpoints * default to :443 for auto tls * remove un-needed endpoints and handlers * Use compression because... a lot of data (#9) * add dockerfile and CI/CD (#10) * add dockerfile and CI/CD * Use ubuntu not alpine couldn't build in alpine :'( * Add MMDB download to Dockerfile (#11) * Use clearer name for MMDB * Don't connect to Kafka over SSL in dev * Fix JWT token in example config * Add postgres.url to example config * Add expected scope * Fix const syntax * Put scope validation where claims are known * Fix audience validation * moves * ignore livestream for ci * main -> master * move GA to root * docker lint fix * fix typo * fixes for docker builds * test docker build * livestream build docker * dang * Update .github/workflows/livestream-docker-image.yml Co-authored-by: Neil Kakkar <neilkakkar@gmail.com> * Update .github/workflows/livestream-docker-image.yml Co-authored-by: Neil Kakkar <neilkakkar@gmail.com> * don't build posthog container when PR is pushed for rust or livestream * Update .github/workflows/livestream-docker-image.yml Co-authored-by: Neil Kakkar <neilkakkar@gmail.com> * add a lot of paths-ignore * Update .github/workflows/livestream-docker-image.yml Co-authored-by: Neil Kakkar <neilkakkar@gmail.com> * Dorny filters are handling most of what I was trying to do * remove tailscale to speed up builds * maybe? * push container to github.com/posthog/postog * don't build container on PR * remove more filters because dorny --------- Co-authored-by: Brett Hoerner <brett@bretthoerner.com> Co-authored-by: Zach Waterfield <zlwaterfield@gmail.com> Co-authored-by: Frank Hamand <frankhamand@gmail.com> Co-authored-by: Michael Matloka <michal@matloka.com> Co-authored-by: Neil Kakkar <neilkakkar@gmail.com>
2024-06-18 17:38:53 +02:00
package main
import (
"fmt"
"log"
"sync/atomic"
"github.com/gofrs/uuid/v5"
"golang.org/x/exp/slices"
)
type Subscription struct {
// Client
ClientId string
// Filters
TeamId int
Token string
DistinctId string
EventTypes []string
Geo bool
// Channels
EventChan chan interface{}
ShouldClose *atomic.Bool
}
type ResponsePostHogEvent struct {
Uuid string `json:"uuid"`
Timestamp string `json:"timestamp"`
DistinctId string `json:"distinct_id"`
PersonId string `json:"person_id"`
Event string `json:"event"`
Properties map[string]interface{} `json:"properties"`
}
type ResponseGeoEvent struct {
Lat float64 `json:"lat"`
Lng float64 `json:"lng"`
Count uint `json:"count"`
}
type Filter struct {
inboundChan chan PostHogEvent
subChan chan Subscription
unSubChan chan Subscription
subs []Subscription
}
func NewFilter(subChan chan Subscription, unSubChan chan Subscription, inboundChan chan PostHogEvent) *Filter {
return &Filter{subChan: subChan, unSubChan: unSubChan, inboundChan: inboundChan, subs: make([]Subscription, 0)}
}
func convertToResponseGeoEvent(event PostHogEvent) *ResponseGeoEvent {
return &ResponseGeoEvent{
Lat: event.Lat,
Lng: event.Lng,
Count: 1,
}
}
func convertToResponsePostHogEvent(event PostHogEvent, teamId int) *ResponsePostHogEvent {
return &ResponsePostHogEvent{
Uuid: event.Uuid,
Timestamp: event.Timestamp,
chore: move livestream to posthog monorepo (#23044) * 🔥 initial commit * update readme * Update README.md * Update README.md * deploy scripts * very basic consumer setup * add some configs and docker-compose * formatting for testing * add tailscale * flip from dev to prod flag * set default to be not prod * default for group_id * tailscale up * update gitignore * basic geolocation * remove unused localServer * document mmdb * just make configs an example * drop raw print * add a start script (downloads the mmdb) * add readme and update configs.example * ts working * if in start * update start script * fix start * fix start * fix more * add sql endpoints for tokenId and Person lookups * work towards filter * sub channel * fix subChan * hardcode team2 token * add cors * only allow get and head * add atomicbool * add channel to kafka * add logs * verbose logs * make array * drop sub ptrs * more logs * helps to loop * drop some logigng * move sub branch * logging * drop log * hog * Deal with numeric distinct ids later * logs * api_key * send 1/1000 * remove log * remove more logs * change response payload * set timestamp if needed * fill in person_id if team_id is set * require teamid, convert to token * clean up subs on disconnect * log * check for token in another place * clean up subs on disconnect * drop modulo and log * fix no assign * don't reuse db conn for now * drop a log * add back commented out log * Don't block on send to client channel * add geo bool * only geo events * use wrapper ip * don't require team in geo mode * add an endpoint and stats keeper for teams * remove stats keeper * start stats keeper * wire it up * change the shape of the response * omit empty error * omit empty on the stats as well * enable logging on back pressure * add jwt endpoint for testing * support multiple event types * Get Auth Setup * jwt team is float so turn that into int * logs * add auth for stats endpoint * remove tailscale and use autoTLS on public endpoints * default to :443 for auto tls * remove un-needed endpoints and handlers * Use compression because... a lot of data (#9) * add dockerfile and CI/CD (#10) * add dockerfile and CI/CD * Use ubuntu not alpine couldn't build in alpine :'( * Add MMDB download to Dockerfile (#11) * Use clearer name for MMDB * Don't connect to Kafka over SSL in dev * Fix JWT token in example config * Add postgres.url to example config * Add expected scope * Fix const syntax * Put scope validation where claims are known * Fix audience validation * moves * ignore livestream for ci * main -> master * move GA to root * docker lint fix * fix typo * fixes for docker builds * test docker build * livestream build docker * dang * Update .github/workflows/livestream-docker-image.yml Co-authored-by: Neil Kakkar <neilkakkar@gmail.com> * Update .github/workflows/livestream-docker-image.yml Co-authored-by: Neil Kakkar <neilkakkar@gmail.com> * don't build posthog container when PR is pushed for rust or livestream * Update .github/workflows/livestream-docker-image.yml Co-authored-by: Neil Kakkar <neilkakkar@gmail.com> * add a lot of paths-ignore * Update .github/workflows/livestream-docker-image.yml Co-authored-by: Neil Kakkar <neilkakkar@gmail.com> * Dorny filters are handling most of what I was trying to do * remove tailscale to speed up builds * maybe? * push container to github.com/posthog/postog * don't build container on PR * remove more filters because dorny --------- Co-authored-by: Brett Hoerner <brett@bretthoerner.com> Co-authored-by: Zach Waterfield <zlwaterfield@gmail.com> Co-authored-by: Frank Hamand <frankhamand@gmail.com> Co-authored-by: Michael Matloka <michal@matloka.com> Co-authored-by: Neil Kakkar <neilkakkar@gmail.com>
2024-06-18 17:38:53 +02:00
DistinctId: event.DistinctId,
PersonId: uuidFromDistinctId(teamId, event.DistinctId),
Event: event.Event,
Properties: event.Properties,
}
}
var personUUIDV5Namespace *uuid.UUID
func uuidFromDistinctId(teamId int, distinctId string) string {
if teamId == 0 || distinctId == "" {
return ""
}
if personUUIDV5Namespace == nil {
uuid, _ := uuid.FromString("932979b4-65c3-4424-8467-0b66ec27bc22")
personUUIDV5Namespace = &uuid
}
input := fmt.Sprintf("%d:%s", teamId, distinctId)
return uuid.NewV5(*personUUIDV5Namespace, input).String()
}
func removeSubscription(clientId string, subs []Subscription) []Subscription {
var lighterSubs []Subscription
for i, sub := range subs {
if clientId == sub.ClientId {
lighterSubs = slices.Delete(subs, i, i+1)
}
}
return lighterSubs
}
func (c *Filter) Run() {
for {
select {
case newSub := <-c.subChan:
c.subs = append(c.subs, newSub)
case unSub := <-c.unSubChan:
c.subs = removeSubscription(unSub.ClientId, c.subs)
case event := <-c.inboundChan:
var responseEvent *ResponsePostHogEvent
var responseGeoEvent *ResponseGeoEvent
for _, sub := range c.subs {
if sub.ShouldClose.Load() {
log.Println("User has unsubscribed, but not been removed from the slice of subs")
continue
}
// log.Printf("event.Token: %s, sub.Token: %s", event.Token, sub.Token)
if sub.Token != "" && event.Token != sub.Token {
continue
}
if sub.DistinctId != "" && event.DistinctId != sub.DistinctId {
continue
}
if len(sub.EventTypes) > 0 && !slices.Contains(sub.EventTypes, event.Event) {
continue
}
if sub.Geo {
if event.Lat != 0.0 {
if responseGeoEvent == nil {
responseGeoEvent = convertToResponseGeoEvent(event)
}
select {
case sub.EventChan <- *responseGeoEvent:
default:
// Don't block
}
}
} else {
if responseEvent == nil {
responseEvent = convertToResponsePostHogEvent(event, sub.TeamId)
}
select {
case sub.EventChan <- *responseEvent:
default:
// Don't block
}
}
}
}
}
}