mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-21 13:39:22 +01:00
chore: add sentry to liveevents (#23057)
* chore: add sentry to liveevents * Use `isProd` for `Debug` * Add a bunch of `CaptureException` calls * Properly bubble `getPGConn()` error * Fix `ParseIP` error handling * Remove unused `personFromDistinctId()` --------- Co-authored-by: Michael Matloka <dev@twixes.com> Co-authored-by: Michael Matloka <michal@matloka.com>
This commit is contained in:
parent
5d761a1f68
commit
37f7c8ec9a
@ -2,9 +2,11 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
@ -17,7 +19,8 @@ func loadConfigs() {
|
||||
|
||||
err := viper.ReadInConfig()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("fatal error config file: %w", err))
|
||||
sentry.CaptureException(err)
|
||||
log.Fatalf("fatal error config file: %w", err)
|
||||
}
|
||||
|
||||
viper.OnConfigChange(func(e fsnotify.Event) {
|
||||
|
@ -1,4 +1,6 @@
|
||||
prod: true
|
||||
sentry:
|
||||
dsn: 'david://cramer'
|
||||
kafka:
|
||||
brokers: 'localhost:9092'
|
||||
topic: ''
|
||||
|
@ -2,17 +2,18 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
func getPGConn() *pgx.Conn {
|
||||
func getPGConn() (*pgx.Conn, error) {
|
||||
url := viper.GetString("postgres.url")
|
||||
conn, err := pgx.Connect(context.Background(), url)
|
||||
if err != nil {
|
||||
log.Panicf("Unable to connect to database: %v\n", err)
|
||||
sentry.CaptureException(err)
|
||||
return nil, err
|
||||
}
|
||||
return conn
|
||||
return conn, nil
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"errors"
|
||||
"net"
|
||||
|
||||
"github.com/oschwald/maxminddb-golang"
|
||||
@ -14,7 +14,7 @@ type GeoLocator struct {
|
||||
func NewGeoLocator(dbPath string) (*GeoLocator, error) {
|
||||
db, err := maxminddb.Open(dbPath)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &GeoLocator{
|
||||
@ -22,10 +22,10 @@ func NewGeoLocator(dbPath string) (*GeoLocator, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (g *GeoLocator) Lookup(ipString string) (float64, float64) {
|
||||
func (g *GeoLocator) Lookup(ipString string) (float64, float64, error) {
|
||||
ip := net.ParseIP(ipString)
|
||||
if ip == nil {
|
||||
return 0, 0
|
||||
return 0, 0, errors.New("invalid IP address")
|
||||
}
|
||||
|
||||
var record struct {
|
||||
@ -37,8 +37,7 @@ func (g *GeoLocator) Lookup(ipString string) (float64, float64) {
|
||||
|
||||
err := g.db.Lookup(ip, &record)
|
||||
if err != nil {
|
||||
log.Panic(err)
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
return record.Location.Latitude, record.Location.Longitude
|
||||
return record.Location.Latitude, record.Location.Longitude, nil
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ go 1.22.2
|
||||
require (
|
||||
github.com/confluentinc/confluent-kafka-go/v2 v2.4.0
|
||||
github.com/fsnotify/fsnotify v1.7.0
|
||||
github.com/getsentry/sentry-go v0.28.1
|
||||
github.com/gofrs/uuid/v5 v5.2.0
|
||||
github.com/golang-jwt/jwt v3.2.2+incompatible
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7
|
||||
|
@ -100,6 +100,10 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos
|
||||
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
|
||||
github.com/fvbommel/sortorder v1.0.2 h1:mV4o8B2hKboCdkJm+a7uX/SIpZob4JzUpc5GGnM45eo=
|
||||
github.com/fvbommel/sortorder v1.0.2/go.mod h1:uk88iVf1ovNn1iLfgUVU2F9o5eO30ui720w+kxuqRs0=
|
||||
github.com/getsentry/sentry-go v0.28.1 h1:zzaSm/vHmGllRM6Tpx1492r0YDzauArdBfkJRtY6P5k=
|
||||
github.com/getsentry/sentry-go v0.28.1/go.mod h1:1fQZ+7l7eeJ3wYi82q5Hg8GqAPgefRq+FP/QhafYVgg=
|
||||
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
|
||||
github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
|
||||
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
|
||||
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
@ -178,8 +182,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
||||
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
|
||||
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
|
||||
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
|
||||
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
@ -253,6 +257,8 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v
|
||||
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
|
||||
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
|
||||
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
|
||||
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
|
||||
github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8=
|
||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
|
||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
|
||||
"github.com/getsentry/sentry-go"
|
||||
)
|
||||
|
||||
type PostHogEventWrapper struct {
|
||||
@ -61,12 +62,14 @@ func NewKafkaConsumer(brokers string, securityProtocol string, groupID string, t
|
||||
func (c *KafkaConsumer) Consume() {
|
||||
err := c.consumer.SubscribeTopics([]string{c.topic}, nil)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.Fatalf("Failed to subscribe to topic: %v", err)
|
||||
}
|
||||
|
||||
for {
|
||||
msg, err := c.consumer.ReadMessage(-1)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.Printf("Error consuming message: %v", err)
|
||||
continue
|
||||
}
|
||||
@ -74,6 +77,7 @@ func (c *KafkaConsumer) Consume() {
|
||||
var wrapperMessage PostHogEventWrapper
|
||||
err = json.Unmarshal(msg.Value, &wrapperMessage)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.Printf("Error decoding JSON: %v", err)
|
||||
continue
|
||||
}
|
||||
@ -81,6 +85,7 @@ func (c *KafkaConsumer) Consume() {
|
||||
var phEvent PostHogEvent
|
||||
err = json.Unmarshal([]byte(wrapperMessage.Data), &phEvent)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.Printf("Error decoding JSON: %v", err)
|
||||
continue
|
||||
}
|
||||
@ -110,7 +115,10 @@ func (c *KafkaConsumer) Consume() {
|
||||
}
|
||||
|
||||
if ipStr != "" {
|
||||
phEvent.Lat, phEvent.Lng = c.geolocator.Lookup(ipStr)
|
||||
phEvent.Lat, phEvent.Lng, err = c.geolocator.Lookup(ipStr)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
}
|
||||
|
||||
c.outgoingChan <- phEvent
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
"github.com/hashicorp/golang-lru/v2/expirable"
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/labstack/echo/v4/middleware"
|
||||
@ -21,28 +22,46 @@ func main() {
|
||||
|
||||
isProd := viper.GetBool("prod")
|
||||
|
||||
err := sentry.Init(sentry.ClientOptions{
|
||||
Dsn: viper.GetString("sentry.dsn"),
|
||||
Debug: isProd,
|
||||
AttachStacktrace: true,
|
||||
})
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.Fatalf("sentry.Init: %s", err)
|
||||
}
|
||||
// Flush buffered events before the program terminates.
|
||||
// Set the timeout to the maximum duration the program can afford to wait.
|
||||
defer sentry.Flush(2 * time.Second)
|
||||
|
||||
mmdb := viper.GetString("mmdb.path")
|
||||
if mmdb == "" {
|
||||
sentry.CaptureException(errors.New("mmdb.path must be set"))
|
||||
log.Fatal("mmdb.path must be set")
|
||||
}
|
||||
brokers := viper.GetString("kafka.brokers")
|
||||
if brokers == "" {
|
||||
sentry.CaptureException(errors.New("kafka.brokers must be set"))
|
||||
log.Fatal("kafka.brokers must be set")
|
||||
}
|
||||
topic := viper.GetString("kafka.topic")
|
||||
if topic == "" {
|
||||
sentry.CaptureException(errors.New("kafka.topic must be set"))
|
||||
log.Fatal("kafka.topic must be set")
|
||||
}
|
||||
groupID := viper.GetString("kafka.group_id")
|
||||
if groupID == "" {
|
||||
sentry.CaptureException(errors.New("kafka.group_id must be set"))
|
||||
log.Fatal("kafka.group_id must be set")
|
||||
}
|
||||
|
||||
geolocator, err := NewGeoLocator(mmdb)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.Fatalf("Failed to open MMDB: %v", err)
|
||||
}
|
||||
|
||||
brokers := viper.GetString("kafka.brokers")
|
||||
if brokers == "" {
|
||||
log.Fatal("kafka.brokers must be set")
|
||||
}
|
||||
|
||||
topic := viper.GetString("kafka.topic")
|
||||
if topic == "" {
|
||||
log.Fatal("kafka.topic must be set")
|
||||
}
|
||||
|
||||
groupID := viper.GetString("kafka.group_id")
|
||||
|
||||
teamStats := &TeamStats{
|
||||
Store: make(map[string]*expirable.LRU[string, string]),
|
||||
}
|
||||
@ -60,6 +79,7 @@ func main() {
|
||||
}
|
||||
consumer, err := NewKafkaConsumer(brokers, kafkaSecurityProtocol, groupID, topic, geolocator, phEventChan, statsChan)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.Fatalf("Failed to create Kafka consumer: %v", err)
|
||||
}
|
||||
defer consumer.Close()
|
||||
@ -208,6 +228,7 @@ func main() {
|
||||
case payload := <-subscription.EventChan:
|
||||
jsonData, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.Println("Error marshalling payload", err)
|
||||
continue
|
||||
}
|
||||
|
@ -5,29 +5,17 @@ import (
|
||||
)
|
||||
|
||||
func tokenFromTeamId(teamId int) (string, error) {
|
||||
pgConn := getPGConn()
|
||||
pgConn, pgConnErr := getPGConn()
|
||||
if pgConnErr != nil {
|
||||
return "", pgConnErr
|
||||
}
|
||||
defer pgConn.Close(context.Background())
|
||||
|
||||
var token string
|
||||
err := pgConn.QueryRow(context.Background(), "select api_token from posthog_team where id = $1;", teamId).Scan(&token)
|
||||
|
||||
if err != nil {
|
||||
return "", err
|
||||
queryErr := pgConn.QueryRow(context.Background(), "select api_token from posthog_team where id = $1;", teamId).Scan(&token)
|
||||
if queryErr != nil {
|
||||
return "", queryErr
|
||||
}
|
||||
|
||||
return token, nil
|
||||
}
|
||||
|
||||
func personFromDistinctId(distinctId string) (int, error) {
|
||||
pgConn := getPGConn()
|
||||
defer pgConn.Close(context.Background())
|
||||
|
||||
var personId int
|
||||
err := pgConn.QueryRow(context.Background(), "select person_id from posthog_persondistinctid where distinct_id = $1;", distinctId).Scan(&personId)
|
||||
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return personId, nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user