From 33d28034bb58d8e730ce2308d548f34132b42032 Mon Sep 17 00:00:00 2001 From: James Greenhill Date: Thu, 3 Oct 2024 09:31:50 -0700 Subject: [PATCH] chore: make the timestamp field optional (looks like it's coming in with a variety of types) (#25368) --- livestream/filter.go | 2 +- livestream/filter_test.go | 6 ++++-- livestream/kafka.go | 29 +++++++++++++++++++++++++---- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/livestream/filter.go b/livestream/filter.go index 9d1179ae31b..dbf6f5807b5 100644 --- a/livestream/filter.go +++ b/livestream/filter.go @@ -63,7 +63,7 @@ func convertToResponseGeoEvent(event PostHogEvent) *ResponseGeoEvent { func convertToResponsePostHogEvent(event PostHogEvent, teamId int) *ResponsePostHogEvent { return &ResponsePostHogEvent{ Uuid: event.Uuid, - Timestamp: event.Timestamp, + Timestamp: event.Timestamp.Value, DistinctId: event.DistinctId, PersonId: uuidFromDistinctId(teamId, event.DistinctId), Event: event.Event, diff --git a/livestream/filter_test.go b/livestream/filter_test.go index 4c4189aa10f..e08b8f9a2d8 100644 --- a/livestream/filter_test.go +++ b/livestream/filter_test.go @@ -64,9 +64,10 @@ func TestConvertToResponseGeoEvent(t *testing.T) { } func TestConvertToResponsePostHogEvent(t *testing.T) { + timestamp := "2023-01-01T00:00:00Z" event := PostHogEvent{ Uuid: "123", - Timestamp: "2023-01-01T00:00:00Z", + Timestamp: Timestamp{Value: timestamp}, DistinctId: "user1", Event: "pageview", Properties: map[string]interface{}{"url": "https://example.com"}, @@ -108,9 +109,10 @@ func TestFilterRun(t *testing.T) { time.Sleep(10 * time.Millisecond) // Test event filtering + timestamp := "2023-01-01T00:00:00Z" event := PostHogEvent{ Uuid: "123", - Timestamp: "2023-01-01T00:00:00Z", + Timestamp: Timestamp{Value: timestamp}, DistinctId: "user1", Token: "token1", Event: "pageview", diff --git a/livestream/kafka.go b/livestream/kafka.go index cd4704e8784..c3e1759fea0 100644 --- a/livestream/kafka.go +++ b/livestream/kafka.go @@ -16,11 +16,32 @@ type PostHogEventWrapper struct { Data string `json:"data"` } +type Timestamp struct { + Value string + Raw string +} + +func (t *Timestamp) UnmarshalJSON(data []byte) error { + t.Raw = string(data) + + var s string + if err := json.Unmarshal(data, &s); err == nil { + t.Value = s + return nil + } + + log.Printf("Unable to unmarshal timestamp to string. Raw value: %s", t.Raw) + // Set Default to empty string + t.Value = "" + + return nil +} + type PostHogEvent struct { Token string `json:"api_key,omitempty"` Event string `json:"event"` Properties map[string]interface{} `json:"properties"` - Timestamp string `json:"timestamp,omitempty"` + Timestamp Timestamp `json:"timestamp,omitempty"` Uuid string DistinctId string @@ -96,15 +117,15 @@ func (c *PostHogKafkaConsumer) Consume() { var phEvent PostHogEvent err = json.Unmarshal([]byte(wrapperMessage.Data), &phEvent) if err != nil { - sentry.CaptureException(err) log.Printf("Error decoding JSON: %v", err) + sentry.CaptureException(err) continue } phEvent.Uuid = wrapperMessage.Uuid phEvent.DistinctId = wrapperMessage.DistinctId - if phEvent.Timestamp == "" { - phEvent.Timestamp = time.Now().UTC().Format("2006-01-02T15:04:05.000Z") + if phEvent.Timestamp.Value == "" { + phEvent.Timestamp.Value = time.Now().UTC().Format("2006-01-02T15:04:05.000Z") } if phEvent.Token == "" { if tokenValue, ok := phEvent.Properties["token"].(string); ok {