From 37f7c8ec9ad12ac37816e292041a8bfc397f3d6e Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Wed, 19 Jun 2024 22:23:55 +0100 Subject: [PATCH] chore: add sentry to liveevents (#23057) * chore: add sentry to liveevents * Use `isProd` for `Debug` * Add a bunch of `CaptureException` calls * Properly bubble `getPGConn()` error * Fix `ParseIP` error handling * Remove unused `personFromDistinctId()` --------- Co-authored-by: Michael Matloka Co-authored-by: Michael Matloka --- livestream/configs.go | 5 ++- livestream/configs/configs.example.yml | 2 ++ livestream/db.go | 9 +++--- livestream/geoip.go | 13 ++++---- livestream/go.mod | 1 + livestream/go.sum | 10 ++++-- livestream/kafka.go | 10 +++++- livestream/main.go | 45 +++++++++++++++++++------- livestream/posthog.go | 26 ++++----------- 9 files changed, 75 insertions(+), 46 deletions(-) diff --git a/livestream/configs.go b/livestream/configs.go index 8aa6cce0329..6c879663270 100644 --- a/livestream/configs.go +++ b/livestream/configs.go @@ -2,9 +2,11 @@ package main import ( "fmt" + "log" "strings" "github.com/fsnotify/fsnotify" + "github.com/getsentry/sentry-go" "github.com/spf13/viper" ) @@ -17,7 +19,8 @@ func loadConfigs() { err := viper.ReadInConfig() if err != nil { - panic(fmt.Errorf("fatal error config file: %w", err)) + sentry.CaptureException(err) + log.Fatalf("fatal error config file: %w", err) } viper.OnConfigChange(func(e fsnotify.Event) { diff --git a/livestream/configs/configs.example.yml b/livestream/configs/configs.example.yml index fd978465ded..a302e1af574 100644 --- a/livestream/configs/configs.example.yml +++ b/livestream/configs/configs.example.yml @@ -1,4 +1,6 @@ prod: true +sentry: + dsn: 'david://cramer' kafka: brokers: 'localhost:9092' topic: '' diff --git a/livestream/db.go b/livestream/db.go index b23b7bb1b24..f66e5610d87 100644 --- a/livestream/db.go +++ b/livestream/db.go @@ -2,17 +2,18 @@ package main import ( "context" - "log" + "github.com/getsentry/sentry-go" "github.com/jackc/pgx/v5" "github.com/spf13/viper" ) -func getPGConn() *pgx.Conn { +func getPGConn() (*pgx.Conn, error) { url := viper.GetString("postgres.url") conn, err := pgx.Connect(context.Background(), url) if err != nil { - log.Panicf("Unable to connect to database: %v\n", err) + sentry.CaptureException(err) + return nil, err } - return conn + return conn, nil } diff --git a/livestream/geoip.go b/livestream/geoip.go index 88a8e40bfe2..8f026d335ad 100644 --- a/livestream/geoip.go +++ b/livestream/geoip.go @@ -1,7 +1,7 @@ package main import ( - "log" + "errors" "net" "github.com/oschwald/maxminddb-golang" @@ -14,7 +14,7 @@ type GeoLocator struct { func NewGeoLocator(dbPath string) (*GeoLocator, error) { db, err := maxminddb.Open(dbPath) if err != nil { - log.Fatal(err) + return nil, err } return &GeoLocator{ @@ -22,10 +22,10 @@ func NewGeoLocator(dbPath string) (*GeoLocator, error) { }, nil } -func (g *GeoLocator) Lookup(ipString string) (float64, float64) { +func (g *GeoLocator) Lookup(ipString string) (float64, float64, error) { ip := net.ParseIP(ipString) if ip == nil { - return 0, 0 + return 0, 0, errors.New("invalid IP address") } var record struct { @@ -37,8 +37,7 @@ func (g *GeoLocator) Lookup(ipString string) (float64, float64) { err := g.db.Lookup(ip, &record) if err != nil { - log.Panic(err) + return 0, 0, err } - - return record.Location.Latitude, record.Location.Longitude + return record.Location.Latitude, record.Location.Longitude, nil } diff --git a/livestream/go.mod b/livestream/go.mod index 3b482ba7c58..350f86ffd61 100644 --- a/livestream/go.mod +++ b/livestream/go.mod @@ -5,6 +5,7 @@ go 1.22.2 require ( github.com/confluentinc/confluent-kafka-go/v2 v2.4.0 github.com/fsnotify/fsnotify v1.7.0 + github.com/getsentry/sentry-go v0.28.1 github.com/gofrs/uuid/v5 v5.2.0 github.com/golang-jwt/jwt v3.2.2+incompatible github.com/hashicorp/golang-lru/v2 v2.0.7 diff --git a/livestream/go.sum b/livestream/go.sum index 9eec442d19b..445095455f0 100644 --- a/livestream/go.sum +++ b/livestream/go.sum @@ -100,6 +100,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fvbommel/sortorder v1.0.2 h1:mV4o8B2hKboCdkJm+a7uX/SIpZob4JzUpc5GGnM45eo= github.com/fvbommel/sortorder v1.0.2/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0= +github.com/getsentry/sentry-go v0.28.1 h1:zzaSm/vHmGllRM6Tpx1492r0YDzauArdBfkJRtY6P5k= +github.com/getsentry/sentry-go v0.28.1/go.mod h1:1fQZ+7l7eeJ3wYi82q5Hg8GqAPgefRq+FP/QhafYVgg= +github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA= +github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= @@ -178,8 +182,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= -github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= -github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg= +github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -253,6 +257,8 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= +github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= +github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/livestream/kafka.go b/livestream/kafka.go index 4988e78eea2..4aa19f50f76 100644 --- a/livestream/kafka.go +++ b/livestream/kafka.go @@ -6,6 +6,7 @@ import ( "time" "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/getsentry/sentry-go" ) type PostHogEventWrapper struct { @@ -61,12 +62,14 @@ func NewKafkaConsumer(brokers string, securityProtocol string, groupID string, t func (c *KafkaConsumer) Consume() { err := c.consumer.SubscribeTopics([]string{c.topic}, nil) if err != nil { + sentry.CaptureException(err) log.Fatalf("Failed to subscribe to topic: %v", err) } for { msg, err := c.consumer.ReadMessage(-1) if err != nil { + sentry.CaptureException(err) log.Printf("Error consuming message: %v", err) continue } @@ -74,6 +77,7 @@ func (c *KafkaConsumer) Consume() { var wrapperMessage PostHogEventWrapper err = json.Unmarshal(msg.Value, &wrapperMessage) if err != nil { + sentry.CaptureException(err) log.Printf("Error decoding JSON: %v", err) continue } @@ -81,6 +85,7 @@ func (c *KafkaConsumer) Consume() { var phEvent PostHogEvent err = json.Unmarshal([]byte(wrapperMessage.Data), &phEvent) if err != nil { + sentry.CaptureException(err) log.Printf("Error decoding JSON: %v", err) continue } @@ -110,7 +115,10 @@ func (c *KafkaConsumer) Consume() { } if ipStr != "" { - phEvent.Lat, phEvent.Lng = c.geolocator.Lookup(ipStr) + phEvent.Lat, phEvent.Lng, err = c.geolocator.Lookup(ipStr) + if err != nil { + sentry.CaptureException(err) + } } c.outgoingChan <- phEvent diff --git a/livestream/main.go b/livestream/main.go index 08b4cc850db..3c1fd3487d2 100644 --- a/livestream/main.go +++ b/livestream/main.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/getsentry/sentry-go" "github.com/hashicorp/golang-lru/v2/expirable" "github.com/labstack/echo/v4" "github.com/labstack/echo/v4/middleware" @@ -21,28 +22,46 @@ func main() { isProd := viper.GetBool("prod") + err := sentry.Init(sentry.ClientOptions{ + Dsn: viper.GetString("sentry.dsn"), + Debug: isProd, + AttachStacktrace: true, + }) + if err != nil { + sentry.CaptureException(err) + log.Fatalf("sentry.Init: %s", err) + } + // Flush buffered events before the program terminates. + // Set the timeout to the maximum duration the program can afford to wait. + defer sentry.Flush(2 * time.Second) + mmdb := viper.GetString("mmdb.path") if mmdb == "" { + sentry.CaptureException(errors.New("mmdb.path must be set")) log.Fatal("mmdb.path must be set") } + brokers := viper.GetString("kafka.brokers") + if brokers == "" { + sentry.CaptureException(errors.New("kafka.brokers must be set")) + log.Fatal("kafka.brokers must be set") + } + topic := viper.GetString("kafka.topic") + if topic == "" { + sentry.CaptureException(errors.New("kafka.topic must be set")) + log.Fatal("kafka.topic must be set") + } + groupID := viper.GetString("kafka.group_id") + if groupID == "" { + sentry.CaptureException(errors.New("kafka.group_id must be set")) + log.Fatal("kafka.group_id must be set") + } geolocator, err := NewGeoLocator(mmdb) if err != nil { + sentry.CaptureException(err) log.Fatalf("Failed to open MMDB: %v", err) } - brokers := viper.GetString("kafka.brokers") - if brokers == "" { - log.Fatal("kafka.brokers must be set") - } - - topic := viper.GetString("kafka.topic") - if topic == "" { - log.Fatal("kafka.topic must be set") - } - - groupID := viper.GetString("kafka.group_id") - teamStats := &TeamStats{ Store: make(map[string]*expirable.LRU[string, string]), } @@ -60,6 +79,7 @@ func main() { } consumer, err := NewKafkaConsumer(brokers, kafkaSecurityProtocol, groupID, topic, geolocator, phEventChan, statsChan) if err != nil { + sentry.CaptureException(err) log.Fatalf("Failed to create Kafka consumer: %v", err) } defer consumer.Close() @@ -208,6 +228,7 @@ func main() { case payload := <-subscription.EventChan: jsonData, err := json.Marshal(payload) if err != nil { + sentry.CaptureException(err) log.Println("Error marshalling payload", err) continue } diff --git a/livestream/posthog.go b/livestream/posthog.go index 53ce23103df..9ce3f4d6cb9 100644 --- a/livestream/posthog.go +++ b/livestream/posthog.go @@ -5,29 +5,17 @@ import ( ) func tokenFromTeamId(teamId int) (string, error) { - pgConn := getPGConn() + pgConn, pgConnErr := getPGConn() + if pgConnErr != nil { + return "", pgConnErr + } defer pgConn.Close(context.Background()) var token string - err := pgConn.QueryRow(context.Background(), "select api_token from posthog_team where id = $1;", teamId).Scan(&token) - - if err != nil { - return "", err + queryErr := pgConn.QueryRow(context.Background(), "select api_token from posthog_team where id = $1;", teamId).Scan(&token) + if queryErr != nil { + return "", queryErr } return token, nil } - -func personFromDistinctId(distinctId string) (int, error) { - pgConn := getPGConn() - defer pgConn.Close(context.Background()) - - var personId int - err := pgConn.QueryRow(context.Background(), "select person_id from posthog_persondistinctid where distinct_id = $1;", distinctId).Scan(&personId) - - if err != nil { - return 0, err - } - - return personId, nil -}