diff --git a/livestream/kafka.go b/livestream/kafka.go index f8af88ea249..6d0929a2958 100644 --- a/livestream/kafka.go +++ b/livestream/kafka.go @@ -14,6 +14,7 @@ type PostHogEventWrapper struct { DistinctId string `json:"distinct_id"` Ip string `json:"ip"` Data string `json:"data"` + Token string `json:"token"` } type PostHogEvent struct { @@ -109,7 +110,9 @@ func (c *PostHogKafkaConsumer) Consume() { phEvent.Uuid = wrapperMessage.Uuid phEvent.DistinctId = wrapperMessage.DistinctId - if phEvent.Token == "" { + if wrapperMessage.Token != "" { + phEvent.Token = wrapperMessage.Token + } else if phEvent.Token == "" { if tokenValue, ok := phEvent.Properties["token"].(string); ok { phEvent.Token = tokenValue } else {