0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-22 08:40:03 +01:00
posthog/ee/models/hook.py
2024-08-15 10:46:47 +01:00

48 lines
1.7 KiB
Python

import json
from django.core.exceptions import ValidationError
from django.db import models
from django.db.models.signals import post_delete, post_save
from django.dispatch.dispatcher import receiver
from posthog.models.signals import mutable_receiver
from posthog.models.utils import generate_random_token
from posthog.redis import get_client
HOOK_EVENTS = ["action_performed"]
class Hook(models.Model):
id = models.CharField(primary_key=True, max_length=50, default=generate_random_token)
user = models.ForeignKey("posthog.User", related_name="rest_hooks", on_delete=models.CASCADE)
team = models.ForeignKey("posthog.Team", related_name="rest_hooks", on_delete=models.CASCADE)
event = models.CharField("Event", max_length=64, db_index=True)
resource_id = models.IntegerField(null=True, blank=True)
target = models.URLField("Target URL", max_length=255)
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
def clean(self):
"""Validation for events."""
if self.event not in HOOK_EVENTS:
raise ValidationError("Invalid hook event {evt}.".format(evt=self.event))
@receiver(post_save, sender=Hook)
def hook_saved(sender, instance: Hook, created, **kwargs):
if instance.event == "action_performed":
get_client().publish(
"reload-action",
json.dumps({"teamId": instance.team_id, "actionId": instance.resource_id}),
)
@mutable_receiver(post_delete, sender=Hook)
def hook_deleted(sender, instance: Hook, **kwargs):
if instance.event == "action_performed":
get_client().publish(
"drop-action",
json.dumps({"teamId": instance.team_id, "actionId": instance.resource_id}),
)