diff --git a/.run/Celery.run.xml b/.run/Celery.run.xml
index d56e8fe2bc6..91b0cf2c2ea 100644
--- a/.run/Celery.run.xml
+++ b/.run/Celery.run.xml
@@ -18,7 +18,7 @@
-
+
@@ -26,4 +26,4 @@
-
+
\ No newline at end of file
diff --git a/ee/tasks/subscriptions/subscription_utils.py b/ee/tasks/subscriptions/subscription_utils.py
index 4b750900237..3d83a8b2e62 100644
--- a/ee/tasks/subscriptions/subscription_utils.py
+++ b/ee/tasks/subscriptions/subscription_utils.py
@@ -8,7 +8,7 @@ from posthog.models.dashboard_tile import get_tiles_ordered_by_position
from posthog.models.exported_asset import ExportedAsset
from posthog.models.insight import Insight
from posthog.models.subscription import Subscription
-from posthog.tasks.exports.insight_exporter import export_insight
+from posthog.tasks import exporter
logger = structlog.get_logger(__name__)
@@ -40,7 +40,7 @@ def generate_assets(
ExportedAsset.objects.bulk_create(assets)
# Wait for all assets to be exported
- tasks = [export_insight.s(asset.id) for asset in assets]
+ tasks = [exporter.export_asset.s(asset.id) for asset in assets]
parallel_job = group(tasks).apply_async()
max_wait = 30
diff --git a/ee/tasks/test/subscriptions/test_subscriptions_utils.py b/ee/tasks/test/subscriptions/test_subscriptions_utils.py
index 5427f36d898..afb5838be56 100644
--- a/ee/tasks/test/subscriptions/test_subscriptions_utils.py
+++ b/ee/tasks/test/subscriptions/test_subscriptions_utils.py
@@ -13,7 +13,7 @@ from posthog.test.base import APIBaseTest
@patch("ee.tasks.subscriptions.subscription_utils.group")
-@patch("ee.tasks.subscriptions.subscription_utils.export_insight")
+@patch("ee.tasks.subscriptions.subscription_utils.exporter.export_asset")
class TestSubscriptionsTasksUtils(APIBaseTest):
dashboard: Dashboard
insight: Insight
diff --git a/posthog/api/exports.py b/posthog/api/exports.py
index 9a37b63afed..010dd6a4dec 100644
--- a/posthog/api/exports.py
+++ b/posthog/api/exports.py
@@ -11,7 +11,6 @@ from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
-from statshog.defaults.django import statsd
from posthog.api.documentation import PropertiesSerializer, extend_schema
from posthog.api.routing import StructuredViewSetMixin
@@ -22,7 +21,7 @@ from posthog.models.activity_logging.activity_log import Change, Detail, log_act
from posthog.models.event.query_event_list import parse_order_by
from posthog.models.exported_asset import ExportedAsset, get_content_response
from posthog.permissions import ProjectMembershipNecessaryPermissions, TeamMemberAccessPermission
-from posthog.tasks.exports import csv_exporter, insight_exporter
+from posthog.tasks import exporter
logger = structlog.get_logger(__name__)
@@ -96,19 +95,15 @@ class ExportedAssetSerializer(serializers.ModelSerializer):
}
instance: ExportedAsset = super().create(validated_data)
- if is_csv_export:
- task = csv_exporter.export_csv.delay(instance.id)
- statsd.incr("csv_exporter.queued", tags={"team_id": self.context["team_id"]})
- else:
- task = insight_exporter.export_insight.delay(instance.id)
- statsd.incr("insight_exporter.queued", tags={"team_id": self.context["team_id"]})
+ task = exporter.export_asset.delay(instance.id)
try:
task.get(timeout=10)
instance.refresh_from_db()
except celery.exceptions.TimeoutError:
# If the rendering times out - fine, the frontend will poll instead for the response
pass
- except NotImplementedError:
+ except NotImplementedError as ex:
+ logger.error("exporters.unsupported_export_type", exception=ex)
raise serializers.ValidationError(
{"export_format": ["This type of export is not supported for this resource."]}
)
diff --git a/posthog/api/test/test_exports.py b/posthog/api/test/test_exports.py
index b8f0261e90c..2fb7ce7241d 100644
--- a/posthog/api/test/test_exports.py
+++ b/posthog/api/test/test_exports.py
@@ -19,7 +19,7 @@ from posthog.settings import (
OBJECT_STORAGE_ENDPOINT,
OBJECT_STORAGE_SECRET_ACCESS_KEY,
)
-from posthog.tasks.exports.csv_exporter import export_csv
+from posthog.tasks import exporter
from posthog.test.base import APIBaseTest, _create_event, flush_persons_and_events
TEST_ROOT_BUCKET = "test_exports"
@@ -59,7 +59,7 @@ class TestExports(APIBaseTest):
team=cls.team, dashboard_id=cls.dashboard.id, export_format="image/png"
)
- @patch("posthog.api.exports.insight_exporter")
+ @patch("posthog.api.exports.exporter")
def test_can_create_new_valid_export_dashboard(self, mock_exporter_task) -> None:
response = self.client.post(
f"/api/projects/{self.team.id}/exports", {"export_format": "image/png", "dashboard": self.dashboard.id}
@@ -79,9 +79,9 @@ class TestExports(APIBaseTest):
},
)
- mock_exporter_task.export_insight.delay.assert_called_once_with(data["id"])
+ mock_exporter_task.export_asset.delay.assert_called_once_with(data["id"])
- @patch("posthog.api.exports.insight_exporter")
+ @patch("posthog.api.exports.exporter")
@freeze_time("2021-08-25T22:09:14.252Z")
def test_can_create_new_valid_export_insight(self, mock_exporter_task) -> None:
response = self.client.post(
@@ -129,7 +129,7 @@ class TestExports(APIBaseTest):
],
)
- mock_exporter_task.export_insight.delay.assert_called_once_with(data["id"])
+ mock_exporter_task.export_asset.delay.assert_called_once_with(data["id"])
def test_errors_if_missing_related_instance(self) -> None:
response = self.client.post(f"/api/projects/{self.team.id}/exports", {"export_format": "image/png"})
@@ -157,19 +157,17 @@ class TestExports(APIBaseTest):
},
)
- @patch("posthog.api.exports.insight_exporter")
+ @patch("posthog.api.exports.exporter")
def test_will_respond_even_if_task_timesout(self, mock_exporter_task) -> None:
- mock_exporter_task.export_insight.delay.return_value.get.side_effect = celery.exceptions.TimeoutError(
- "timed out"
- )
+ mock_exporter_task.export_asset.delay.return_value.get.side_effect = celery.exceptions.TimeoutError("timed out")
response = self.client.post(
f"/api/projects/{self.team.id}/exports", {"export_format": "application/pdf", "insight": self.insight.id}
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
- @patch("posthog.api.exports.insight_exporter")
+ @patch("posthog.api.exports.exporter")
def test_will_error_if_export_unsupported(self, mock_exporter_task) -> None:
- mock_exporter_task.export_insight.delay.return_value.get.side_effect = NotImplementedError("not implemented")
+ mock_exporter_task.export_asset.delay.return_value.get.side_effect = NotImplementedError("not implemented")
response = self.client.post(
f"/api/projects/{self.team.id}/exports", {"export_format": "application/pdf", "insight": self.insight.id}
)
@@ -252,7 +250,7 @@ class TestExports(APIBaseTest):
},
)
- @patch("posthog.api.exports.csv_exporter")
+ @patch("posthog.api.exports.exporter")
@freeze_time("2021-08-25T22:09:14.252Z")
def test_can_create_new_valid_export_csv(self, mock_exporter_task) -> None:
response = self.client.post(
@@ -274,7 +272,7 @@ class TestExports(APIBaseTest):
self.assertEqual(exported_instance["export_context"]["file_export_type"], "list_events")
- mock_exporter_task.export_csv.delay.assert_called_once_with(exported_instance["id"])
+ mock_exporter_task.export_asset.delay.assert_called_once_with(exported_instance["id"])
def test_can_download_a_csv(self) -> None:
_create_event(
@@ -299,7 +297,7 @@ class TestExports(APIBaseTest):
},
)
# pass the root in because django/celery refused to override it otherwise
- export_csv(instance.id, TEST_ROOT_BUCKET)
+ exporter.export_asset(instance.id, TEST_ROOT_BUCKET)
response: Optional[HttpResponse] = None
attempt_count = 0
diff --git a/posthog/tasks/__init__.py b/posthog/tasks/__init__.py
index 311cc61c47c..cb17691259b 100644
--- a/posthog/tasks/__init__.py
+++ b/posthog/tasks/__init__.py
@@ -1,5 +1,4 @@
# Make tasks ready for celery autoimport
-from posthog.tasks.exports import csv_exporter, insight_exporter
from . import (
async_migrations,
@@ -8,6 +7,7 @@ from . import (
check_clickhouse_schema_drift,
delete_clickhouse_data,
email,
+ exporter,
split_person,
status_report,
sync_all_organization_available_features,
@@ -22,8 +22,7 @@ __all__ = [
"check_clickhouse_schema_drift",
"delete_clickhouse_data",
"email",
- "csv_exporter",
- "insight_exporter",
+ "exporter",
"split_person",
"status_report",
"sync_all_organization_available_features",
diff --git a/posthog/tasks/exporter.py b/posthog/tasks/exporter.py
new file mode 100644
index 00000000000..80cffa8986c
--- /dev/null
+++ b/posthog/tasks/exporter.py
@@ -0,0 +1,20 @@
+from posthog import settings
+from posthog.celery import app
+from posthog.models import ExportedAsset
+
+
+@app.task()
+def export_asset(exported_asset_id: int, storage_root_bucket: str = settings.OBJECT_STORAGE_EXPORTS_FOLDER) -> None:
+ from statshog.defaults.django import statsd
+
+ from posthog.tasks.exports import csv_exporter, insight_exporter
+
+ exported_asset = ExportedAsset.objects.select_related("insight", "dashboard").get(pk=exported_asset_id)
+
+ is_csv_export = exported_asset.export_format == ExportedAsset.ExportFormat.CSV
+ if is_csv_export:
+ csv_exporter.export_csv(exported_asset, storage_root_bucket)
+ statsd.incr("csv_exporter.queued", tags={"team_id": str(exported_asset.team_id)})
+ else:
+ insight_exporter.export_insight(exported_asset)
+ statsd.incr("insight_exporter.queued", tags={"team_id": str(exported_asset.team_id)})
diff --git a/posthog/tasks/exports/csv_exporter.py b/posthog/tasks/exports/csv_exporter.py
index 71be11fa09e..0f9b4fc08a8 100644
--- a/posthog/tasks/exports/csv_exporter.py
+++ b/posthog/tasks/exports/csv_exporter.py
@@ -2,14 +2,13 @@ import datetime
import gzip
import tempfile
import uuid
-from typing import IO, Optional
+from typing import IO
import structlog
from sentry_sdk import capture_exception
from statshog.defaults.django import statsd
from posthog import settings
-from posthog.celery import app
from posthog.models import Filter
from posthog.models.event.query_event_list import query_events_list
from posthog.models.exported_asset import ExportedAsset
@@ -71,13 +70,10 @@ def _export_to_csv(exported_asset: ExportedAsset, root_bucket: str) -> None:
exported_asset.save(update_fields=["content_location"])
-@app.task()
-def export_csv(exported_asset_id: int, root_bucket: str = settings.OBJECT_STORAGE_EXPORTS_FOLDER) -> None:
+def export_csv(exported_asset: ExportedAsset, root_bucket: str = settings.OBJECT_STORAGE_EXPORTS_FOLDER) -> None:
timer = statsd.timer("csv_exporter").start()
- exported_asset: Optional[ExportedAsset] = None
try:
- exported_asset = ExportedAsset.objects.get(pk=exported_asset_id)
if exported_asset.export_format == "text/csv":
_export_to_csv(exported_asset, root_bucket)
statsd.incr("csv_exporter.succeeded", tags={"team_id": exported_asset.team.id})
diff --git a/posthog/tasks/exports/insight_exporter.py b/posthog/tasks/exports/insight_exporter.py
index 0270801d128..958fcea436e 100644
--- a/posthog/tasks/exports/insight_exporter.py
+++ b/posthog/tasks/exports/insight_exporter.py
@@ -14,7 +14,6 @@ from selenium.webdriver.support.wait import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.utils import ChromeType
-from posthog.celery import app
from posthog.internal_metrics import incr, timing
from posthog.models.exported_asset import ExportedAsset, get_public_access_token
from posthog.tasks.update_cache import update_insight_cache
@@ -24,6 +23,7 @@ logger = structlog.get_logger(__name__)
TMP_DIR = "/tmp" # NOTE: Externalise this to ENV var
+
# NOTE: We purporsefully DONT re-use the driver. It would be slightly faster but would keep an in-memory browser
# window permanently around which is unnecessary
def get_driver() -> webdriver.Chrome:
@@ -122,10 +122,7 @@ def _export_to_png(exported_asset: ExportedAsset) -> None:
driver.close()
-@app.task()
-def export_insight(exported_asset_id: int) -> None:
- exported_asset = ExportedAsset.objects.select_related("insight", "dashboard").get(pk=exported_asset_id)
-
+def export_insight(exported_asset: ExportedAsset) -> None:
if exported_asset.insight:
# NOTE: Dashboards are regularly updated but insights are not
# so, we need to trigger a manual update to ensure the results are good
diff --git a/posthog/tasks/test/test_exporter.py b/posthog/tasks/test/test_exporter.py
index fa2e79f0c2d..f2c2173f016 100644
--- a/posthog/tasks/test/test_exporter.py
+++ b/posthog/tasks/test/test_exporter.py
@@ -3,7 +3,8 @@ from unittest.mock import MagicMock, patch
from posthog.models.dashboard import Dashboard
from posthog.models.exported_asset import ExportedAsset
-from posthog.tasks.exports.insight_exporter import export_insight, get_driver
+from posthog.tasks import exporter
+from posthog.tasks.exports.insight_exporter import get_driver
from posthog.test.base import APIBaseTest
@@ -28,7 +29,7 @@ class TestExporterTask(APIBaseTest):
def test_exporter_runs(self, mock_get_driver: MagicMock, mock_uuid: MagicMock) -> None:
mock_uuid.uuid4.return_value = "posthog_test_exporter"
assert self.exported_asset.content is None
- export_insight(self.exported_asset.id)
+ exporter.export_asset(self.exported_asset.id)
self.exported_asset.refresh_from_db()
assert self.exported_asset.content is not None