mirror of
https://github.com/PostHog/posthog.git
synced 2024-11-21 13:39:22 +01:00
chore: set defaults for message payload and remove custom unmarshaller (#25377)
This commit is contained in:
parent
67453258fe
commit
1d85df2b4c
@ -63,7 +63,7 @@ func convertToResponseGeoEvent(event PostHogEvent) *ResponseGeoEvent {
|
||||
func convertToResponsePostHogEvent(event PostHogEvent, teamId int) *ResponsePostHogEvent {
|
||||
return &ResponsePostHogEvent{
|
||||
Uuid: event.Uuid,
|
||||
Timestamp: event.Timestamp.Value,
|
||||
Timestamp: event.Timestamp,
|
||||
DistinctId: event.DistinctId,
|
||||
PersonId: uuidFromDistinctId(teamId, event.DistinctId),
|
||||
Event: event.Event,
|
||||
|
@ -67,7 +67,7 @@ func TestConvertToResponsePostHogEvent(t *testing.T) {
|
||||
timestamp := "2023-01-01T00:00:00Z"
|
||||
event := PostHogEvent{
|
||||
Uuid: "123",
|
||||
Timestamp: Timestamp{Value: timestamp},
|
||||
Timestamp: timestamp,
|
||||
DistinctId: "user1",
|
||||
Event: "pageview",
|
||||
Properties: map[string]interface{}{"url": "https://example.com"},
|
||||
@ -112,7 +112,7 @@ func TestFilterRun(t *testing.T) {
|
||||
timestamp := "2023-01-01T00:00:00Z"
|
||||
event := PostHogEvent{
|
||||
Uuid: "123",
|
||||
Timestamp: Timestamp{Value: timestamp},
|
||||
Timestamp: timestamp,
|
||||
DistinctId: "user1",
|
||||
Token: "token1",
|
||||
Event: "pageview",
|
||||
|
@ -16,32 +16,11 @@ type PostHogEventWrapper struct {
|
||||
Data string `json:"data"`
|
||||
}
|
||||
|
||||
type Timestamp struct {
|
||||
Value string
|
||||
Raw string
|
||||
}
|
||||
|
||||
func (t *Timestamp) UnmarshalJSON(data []byte) error {
|
||||
t.Raw = string(data)
|
||||
|
||||
var s string
|
||||
if err := json.Unmarshal(data, &s); err == nil {
|
||||
t.Value = s
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Printf("Unable to unmarshal timestamp to string. Raw value: %s", t.Raw)
|
||||
// Set Default to empty string
|
||||
t.Value = ""
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type PostHogEvent struct {
|
||||
Token string `json:"api_key,omitempty"`
|
||||
Event string `json:"event"`
|
||||
Properties map[string]interface{} `json:"properties"`
|
||||
Timestamp Timestamp `json:"timestamp,omitempty"`
|
||||
Timestamp string `json:"timestamp,omitempty"`
|
||||
|
||||
Uuid string
|
||||
DistinctId string
|
||||
@ -101,32 +80,35 @@ func (c *PostHogKafkaConsumer) Consume() {
|
||||
for {
|
||||
msg, err := c.consumer.ReadMessage(-1)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.Printf("Error consuming message: %v", err)
|
||||
continue
|
||||
sentry.CaptureException(err)
|
||||
}
|
||||
|
||||
var wrapperMessage PostHogEventWrapper
|
||||
err = json.Unmarshal(msg.Value, &wrapperMessage)
|
||||
if err != nil {
|
||||
sentry.CaptureException(err)
|
||||
log.Printf("Error decoding JSON: %v", err)
|
||||
continue
|
||||
log.Printf("Data: %s", string(msg.Value))
|
||||
}
|
||||
|
||||
var phEvent PostHogEvent
|
||||
err = json.Unmarshal([]byte(wrapperMessage.Data), &phEvent)
|
||||
phEvent := PostHogEvent{
|
||||
Timestamp: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"),
|
||||
Token: "",
|
||||
Event: "",
|
||||
Properties: make(map[string]interface{}),
|
||||
}
|
||||
|
||||
data := []byte(wrapperMessage.Data)
|
||||
|
||||
err = json.Unmarshal(data, &phEvent)
|
||||
if err != nil {
|
||||
log.Printf("Error decoding JSON: %v", err)
|
||||
sentry.CaptureException(err)
|
||||
continue
|
||||
log.Printf("Data: %s", string(data))
|
||||
}
|
||||
|
||||
phEvent.Uuid = wrapperMessage.Uuid
|
||||
phEvent.DistinctId = wrapperMessage.DistinctId
|
||||
if phEvent.Timestamp.Value == "" {
|
||||
phEvent.Timestamp.Value = time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
|
||||
}
|
||||
|
||||
if phEvent.Token == "" {
|
||||
if tokenValue, ok := phEvent.Properties["token"].(string); ok {
|
||||
phEvent.Token = tokenValue
|
||||
|
Loading…
Reference in New Issue
Block a user