diff --git a/livestream/live_stats.go b/livestream/live_stats.go index 00d3acfa19d..5bf214f90b4 100644 --- a/livestream/live_stats.go +++ b/livestream/live_stats.go @@ -8,7 +8,7 @@ import ( ) const ( - COUNTER_TTL = 60 + COUNTER_TTL = time.Second * 60 ) type Stats struct { @@ -20,7 +20,7 @@ type Stats struct { func newStatsKeeper() *Stats { return &Stats{ Store: make(map[string]*expirable.LRU[string, string]), - GlobalStore: expirable.NewLRU[string, string](0, nil, time.Second*COUNTER_TTL), + GlobalStore: expirable.NewLRU[string, string](0, nil, COUNTER_TTL), Counter: NewSlidingWindowCounter(COUNTER_TTL), } } @@ -28,16 +28,13 @@ func newStatsKeeper() *Stats { func (ts *Stats) keepStats(statsChan chan PostHogEvent) { log.Println("starting stats keeper...") - for { // ignore the range warning here - it's wrong - select { - case event := <-statsChan: - ts.Counter.Increment() - token := event.Token - if _, ok := ts.Store[token]; !ok { - ts.Store[token] = expirable.NewLRU[string, string](0, nil, time.Second*COUNTER_TTL) - } - ts.Store[token].Add(event.DistinctId, "1") - ts.GlobalStore.Add(event.DistinctId, "1") + for event := range statsChan { + ts.Counter.Increment() + token := event.Token + if _, ok := ts.Store[token]; !ok { + ts.Store[token] = expirable.NewLRU[string, string](0, nil, COUNTER_TTL) } + ts.Store[token].Add(event.DistinctId, "1") + ts.GlobalStore.Add(event.DistinctId, "1") } } diff --git a/livestream/served.go b/livestream/served.go index 1c1cc4c0396..dc9c89b9677 100644 --- a/livestream/served.go +++ b/livestream/served.go @@ -10,8 +10,8 @@ import ( ) type Counter struct { - EventCount uint32 - UserCount uint32 + EventCount int + UserCount int } func servedHandler(stats *Stats) func(c echo.Context) error { @@ -19,8 +19,8 @@ func servedHandler(stats *Stats) func(c echo.Context) error { userCount := stats.GlobalStore.Len() count := stats.Counter.Count() resp := Counter{ - EventCount: uint32(count), - UserCount: uint32(userCount), + EventCount: count, + UserCount: userCount, } return c.JSON(http.StatusOK, resp) } diff --git a/livestream/ttl_counter.go b/livestream/ttl_counter.go index 4be95d9b20a..635ce9b6f28 100644 --- a/livestream/ttl_counter.go +++ b/livestream/ttl_counter.go @@ -12,10 +12,24 @@ type SlidingWindowCounter struct { } func NewSlidingWindowCounter(windowSize time.Duration) *SlidingWindowCounter { - return &SlidingWindowCounter{ + swc := &SlidingWindowCounter{ events: make([]time.Time, 0), windowSize: windowSize, } + + // Start a goroutine to periodically remove old events + go func() { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + for range ticker.C { + swc.mu.Lock() + swc.removeOldEvents(time.Now()) + swc.mu.Unlock() + } + }() + + return swc } func (swc *SlidingWindowCounter) Increment() { @@ -24,7 +38,6 @@ func (swc *SlidingWindowCounter) Increment() { now := time.Now() swc.events = append(swc.events, now) - swc.removeOldEvents(now) } func (swc *SlidingWindowCounter) Count() int {