0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-24 18:07:17 +01:00

fix: Fix TTL to be time.Second based (#24383)

This commit is contained in:
James Greenhill 2024-08-14 15:58:36 -07:00 committed by GitHub
parent 904c75050e
commit 190ecf82c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 28 additions and 18 deletions

View File

@ -8,7 +8,7 @@ import (
) )
const ( const (
COUNTER_TTL = 60 COUNTER_TTL = time.Second * 60
) )
type Stats struct { type Stats struct {
@ -20,7 +20,7 @@ type Stats struct {
func newStatsKeeper() *Stats { func newStatsKeeper() *Stats {
return &Stats{ return &Stats{
Store: make(map[string]*expirable.LRU[string, string]), Store: make(map[string]*expirable.LRU[string, string]),
GlobalStore: expirable.NewLRU[string, string](0, nil, time.Second*COUNTER_TTL), GlobalStore: expirable.NewLRU[string, string](0, nil, COUNTER_TTL),
Counter: NewSlidingWindowCounter(COUNTER_TTL), Counter: NewSlidingWindowCounter(COUNTER_TTL),
} }
} }
@ -28,16 +28,13 @@ func newStatsKeeper() *Stats {
func (ts *Stats) keepStats(statsChan chan PostHogEvent) { func (ts *Stats) keepStats(statsChan chan PostHogEvent) {
log.Println("starting stats keeper...") log.Println("starting stats keeper...")
for { // ignore the range warning here - it's wrong for event := range statsChan {
select {
case event := <-statsChan:
ts.Counter.Increment() ts.Counter.Increment()
token := event.Token token := event.Token
if _, ok := ts.Store[token]; !ok { if _, ok := ts.Store[token]; !ok {
ts.Store[token] = expirable.NewLRU[string, string](0, nil, time.Second*COUNTER_TTL) ts.Store[token] = expirable.NewLRU[string, string](0, nil, COUNTER_TTL)
} }
ts.Store[token].Add(event.DistinctId, "1") ts.Store[token].Add(event.DistinctId, "1")
ts.GlobalStore.Add(event.DistinctId, "1") ts.GlobalStore.Add(event.DistinctId, "1")
} }
} }
}

View File

@ -10,8 +10,8 @@ import (
) )
type Counter struct { type Counter struct {
EventCount uint32 EventCount int
UserCount uint32 UserCount int
} }
func servedHandler(stats *Stats) func(c echo.Context) error { func servedHandler(stats *Stats) func(c echo.Context) error {
@ -19,8 +19,8 @@ func servedHandler(stats *Stats) func(c echo.Context) error {
userCount := stats.GlobalStore.Len() userCount := stats.GlobalStore.Len()
count := stats.Counter.Count() count := stats.Counter.Count()
resp := Counter{ resp := Counter{
EventCount: uint32(count), EventCount: count,
UserCount: uint32(userCount), UserCount: userCount,
} }
return c.JSON(http.StatusOK, resp) return c.JSON(http.StatusOK, resp)
} }

View File

@ -12,10 +12,24 @@ type SlidingWindowCounter struct {
} }
func NewSlidingWindowCounter(windowSize time.Duration) *SlidingWindowCounter { func NewSlidingWindowCounter(windowSize time.Duration) *SlidingWindowCounter {
return &SlidingWindowCounter{ swc := &SlidingWindowCounter{
events: make([]time.Time, 0), events: make([]time.Time, 0),
windowSize: windowSize, windowSize: windowSize,
} }
// Start a goroutine to periodically remove old events
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
swc.mu.Lock()
swc.removeOldEvents(time.Now())
swc.mu.Unlock()
}
}()
return swc
} }
func (swc *SlidingWindowCounter) Increment() { func (swc *SlidingWindowCounter) Increment() {
@ -24,7 +38,6 @@ func (swc *SlidingWindowCounter) Increment() {
now := time.Now() now := time.Now()
swc.events = append(swc.events, now) swc.events = append(swc.events, now)
swc.removeOldEvents(now)
} }
func (swc *SlidingWindowCounter) Count() int { func (swc *SlidingWindowCounter) Count() int {