mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-27 16:26:50 +01:00
239 lines
5.7 KiB
Go
239 lines
5.7 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/getsentry/sentry-go"
|
|
"github.com/labstack/echo/v4"
|
|
"github.com/labstack/echo/v4/middleware"
|
|
"github.com/spf13/viper"
|
|
)
|
|
|
|
func main() {
|
|
loadConfigs()
|
|
|
|
isProd := viper.GetBool("prod")
|
|
|
|
err := sentry.Init(sentry.ClientOptions{
|
|
Dsn: viper.GetString("sentry.dsn"),
|
|
Debug: isProd,
|
|
AttachStacktrace: true,
|
|
})
|
|
if err != nil {
|
|
sentry.CaptureException(err)
|
|
log.Fatalf("sentry.Init: %s", err)
|
|
}
|
|
// Flush buffered events before the program terminates.
|
|
// Set the timeout to the maximum duration the program can afford to wait.
|
|
defer sentry.Flush(2 * time.Second)
|
|
|
|
mmdb := viper.GetString("mmdb.path")
|
|
if mmdb == "" {
|
|
sentry.CaptureException(errors.New("mmdb.path must be set"))
|
|
log.Fatal("mmdb.path must be set")
|
|
}
|
|
brokers := viper.GetString("kafka.brokers")
|
|
if brokers == "" {
|
|
sentry.CaptureException(errors.New("kafka.brokers must be set"))
|
|
log.Fatal("kafka.brokers must be set")
|
|
}
|
|
topic := viper.GetString("kafka.topic")
|
|
if topic == "" {
|
|
sentry.CaptureException(errors.New("kafka.topic must be set"))
|
|
log.Fatal("kafka.topic must be set")
|
|
}
|
|
groupID := viper.GetString("kafka.group_id")
|
|
if groupID == "" {
|
|
sentry.CaptureException(errors.New("kafka.group_id must be set"))
|
|
log.Fatal("kafka.group_id must be set")
|
|
}
|
|
|
|
geolocator, err := NewMaxMindGeoLocator(mmdb)
|
|
if err != nil {
|
|
sentry.CaptureException(err)
|
|
log.Fatalf("Failed to open MMDB: %v", err)
|
|
}
|
|
|
|
stats := newStatsKeeper()
|
|
|
|
phEventChan := make(chan PostHogEvent)
|
|
statsChan := make(chan PostHogEvent)
|
|
subChan := make(chan Subscription)
|
|
unSubChan := make(chan Subscription)
|
|
|
|
go stats.keepStats(statsChan)
|
|
|
|
kafkaSecurityProtocol := "SSL"
|
|
if !isProd {
|
|
kafkaSecurityProtocol = "PLAINTEXT"
|
|
}
|
|
consumer, err := NewPostHogKafkaConsumer(brokers, kafkaSecurityProtocol, groupID, topic, geolocator, phEventChan, statsChan)
|
|
if err != nil {
|
|
sentry.CaptureException(err)
|
|
log.Fatalf("Failed to create Kafka consumer: %v", err)
|
|
}
|
|
defer consumer.Close()
|
|
go consumer.Consume()
|
|
|
|
filter := NewFilter(subChan, unSubChan, phEventChan)
|
|
go filter.Run()
|
|
|
|
// Echo instance
|
|
e := echo.New()
|
|
|
|
// Middleware
|
|
e.Use(middleware.Logger())
|
|
e.Use(middleware.Recover())
|
|
e.Use(middleware.RequestID())
|
|
e.Use(middleware.GzipWithConfig(middleware.GzipConfig{
|
|
Level: 9, // Set compression level to maximum
|
|
}))
|
|
|
|
e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
|
|
AllowOrigins: []string{"*"},
|
|
AllowMethods: []string{http.MethodGet, http.MethodHead},
|
|
}))
|
|
e.File("/", "./index.html")
|
|
|
|
// Routes
|
|
e.GET("/", index)
|
|
|
|
e.GET("/served", servedHandler(stats))
|
|
|
|
e.GET("/stats", statsHandler(stats))
|
|
|
|
e.GET("/events", func(c echo.Context) error {
|
|
e.Logger.Printf("SSE client connected, ip: %v", c.RealIP())
|
|
|
|
var teamId string
|
|
eventType := c.QueryParam("eventType")
|
|
distinctId := c.QueryParam("distinctId")
|
|
geo := c.QueryParam("geo")
|
|
|
|
teamIdInt := 0
|
|
token := ""
|
|
geoOnly := false
|
|
|
|
if strings.ToLower(geo) == "true" || geo == "1" {
|
|
geoOnly = true
|
|
} else {
|
|
teamId = ""
|
|
|
|
authHeader := c.Request().Header.Get("Authorization")
|
|
if authHeader == "" {
|
|
return errors.New("authorization header is required")
|
|
}
|
|
|
|
claims, err := decodeAuthToken(authHeader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
teamId = strconv.Itoa(int(claims["team_id"].(float64)))
|
|
token = fmt.Sprint(claims["api_token"])
|
|
|
|
if teamId == "" {
|
|
return errors.New("teamId is required unless geo=true")
|
|
}
|
|
}
|
|
|
|
eventTypes := []string{}
|
|
if eventType != "" {
|
|
eventTypes = strings.Split(eventType, ",")
|
|
}
|
|
|
|
subscription := Subscription{
|
|
TeamId: teamIdInt,
|
|
Token: token,
|
|
ClientId: c.Response().Header().Get(echo.HeaderXRequestID),
|
|
DistinctId: distinctId,
|
|
Geo: geoOnly,
|
|
EventTypes: eventTypes,
|
|
EventChan: make(chan interface{}, 100),
|
|
ShouldClose: &atomic.Bool{},
|
|
}
|
|
|
|
subChan <- subscription
|
|
|
|
w := c.Response()
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
|
|
for {
|
|
select {
|
|
case <-c.Request().Context().Done():
|
|
e.Logger.Printf("SSE client disconnected, ip: %v", c.RealIP())
|
|
filter.unSubChan <- subscription
|
|
subscription.ShouldClose.Store(true)
|
|
return nil
|
|
case payload := <-subscription.EventChan:
|
|
jsonData, err := json.Marshal(payload)
|
|
if err != nil {
|
|
sentry.CaptureException(err)
|
|
log.Println("Error marshalling payload", err)
|
|
continue
|
|
}
|
|
|
|
event := Event{
|
|
Data: jsonData,
|
|
}
|
|
if err := event.WriteTo(w); err != nil {
|
|
return err
|
|
}
|
|
w.Flush()
|
|
}
|
|
}
|
|
})
|
|
|
|
e.GET("/jwt", func(c echo.Context) error {
|
|
authHeader := c.Request().Header.Get("Authorization")
|
|
if authHeader == "" {
|
|
return errors.New("authorization header is required")
|
|
}
|
|
|
|
claims, err := decodeAuthToken(authHeader)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return c.JSON(http.StatusOK, claims)
|
|
})
|
|
|
|
e.GET("/sse", func(c echo.Context) error {
|
|
e.Logger.Printf("Map client connected, ip: %v", c.RealIP())
|
|
|
|
w := c.Response()
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
w.Header().Set("Connection", "keep-alive")
|
|
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-c.Request().Context().Done():
|
|
e.Logger.Printf("SSE client disconnected, ip: %v", c.RealIP())
|
|
return nil
|
|
case <-ticker.C:
|
|
event := Event{
|
|
Data: []byte("ping: " + time.Now().Format(time.RFC3339Nano)),
|
|
}
|
|
if err := event.WriteTo(w); err != nil {
|
|
return err
|
|
}
|
|
w.Flush()
|
|
}
|
|
}
|
|
})
|
|
|
|
e.Logger.Fatal(e.Start(":8080"))
|
|
}
|