diff --git a/livestream/filter.go b/livestream/filter.go index dbf6f5807b5..9d1179ae31b 100644 --- a/livestream/filter.go +++ b/livestream/filter.go @@ -63,7 +63,7 @@ func convertToResponseGeoEvent(event PostHogEvent) *ResponseGeoEvent { func convertToResponsePostHogEvent(event PostHogEvent, teamId int) *ResponsePostHogEvent { return &ResponsePostHogEvent{ Uuid: event.Uuid, - Timestamp: event.Timestamp.Value, + Timestamp: event.Timestamp, DistinctId: event.DistinctId, PersonId: uuidFromDistinctId(teamId, event.DistinctId), Event: event.Event, diff --git a/livestream/filter_test.go b/livestream/filter_test.go index e08b8f9a2d8..76fc59bccc3 100644 --- a/livestream/filter_test.go +++ b/livestream/filter_test.go @@ -67,7 +67,7 @@ func TestConvertToResponsePostHogEvent(t *testing.T) { timestamp := "2023-01-01T00:00:00Z" event := PostHogEvent{ Uuid: "123", - Timestamp: Timestamp{Value: timestamp}, + Timestamp: timestamp, DistinctId: "user1", Event: "pageview", Properties: map[string]interface{}{"url": "https://example.com"}, @@ -112,7 +112,7 @@ func TestFilterRun(t *testing.T) { timestamp := "2023-01-01T00:00:00Z" event := PostHogEvent{ Uuid: "123", - Timestamp: Timestamp{Value: timestamp}, + Timestamp: timestamp, DistinctId: "user1", Token: "token1", Event: "pageview", diff --git a/livestream/kafka.go b/livestream/kafka.go index c3e1759fea0..71a00094f77 100644 --- a/livestream/kafka.go +++ b/livestream/kafka.go @@ -16,32 +16,11 @@ type PostHogEventWrapper struct { Data string `json:"data"` } -type Timestamp struct { - Value string - Raw string -} - -func (t *Timestamp) UnmarshalJSON(data []byte) error { - t.Raw = string(data) - - var s string - if err := json.Unmarshal(data, &s); err == nil { - t.Value = s - return nil - } - - log.Printf("Unable to unmarshal timestamp to string. Raw value: %s", t.Raw) - // Set Default to empty string - t.Value = "" - - return nil -} - type PostHogEvent struct { Token string `json:"api_key,omitempty"` Event string `json:"event"` Properties map[string]interface{} `json:"properties"` - Timestamp Timestamp `json:"timestamp,omitempty"` + Timestamp string `json:"timestamp,omitempty"` Uuid string DistinctId string @@ -101,32 +80,35 @@ func (c *PostHogKafkaConsumer) Consume() { for { msg, err := c.consumer.ReadMessage(-1) if err != nil { - sentry.CaptureException(err) log.Printf("Error consuming message: %v", err) - continue + sentry.CaptureException(err) } var wrapperMessage PostHogEventWrapper err = json.Unmarshal(msg.Value, &wrapperMessage) if err != nil { - sentry.CaptureException(err) log.Printf("Error decoding JSON: %v", err) - continue + log.Printf("Data: %s", string(msg.Value)) } - var phEvent PostHogEvent - err = json.Unmarshal([]byte(wrapperMessage.Data), &phEvent) + phEvent := PostHogEvent{ + Timestamp: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"), + Token: "", + Event: "", + Properties: make(map[string]interface{}), + } + + data := []byte(wrapperMessage.Data) + + err = json.Unmarshal(data, &phEvent) if err != nil { log.Printf("Error decoding JSON: %v", err) - sentry.CaptureException(err) - continue + log.Printf("Data: %s", string(data)) } phEvent.Uuid = wrapperMessage.Uuid phEvent.DistinctId = wrapperMessage.DistinctId - if phEvent.Timestamp.Value == "" { - phEvent.Timestamp.Value = time.Now().UTC().Format("2006-01-02T15:04:05.000Z") - } + if phEvent.Token == "" { if tokenValue, ok := phEvent.Properties["token"].(string); ok { phEvent.Token = tokenValue