0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-12-01 04:12:23 +01:00
posthog/ee/tasks/subscriptions/subscription_utils.py
Paul D'Ambra c8dd7b0244
feat: export lists of events to CSV (backend only) (#10510)
* sketch the interaction

* correct field type

* explode a filter to steps of day length

* write to object storage maybe

* very shonky storing of gzipped files

* doesn't need an export type

* mark export type choices as deprecated

* order methods

* stage to temporary file

* can manually export the uncompressed content

* shonkily export as a csv

* wip

* with test for requesting the export

* with polling test for the API

* put existing broken CSV download back before implementing UI change

* OpenAPI change

* even more waffle

* less passive waffle

* sometimes less specific is more correct

* refactor looping

* okay snapshots

* remove unused exception variable

* fix mocks

* Update snapshots

* Update snapshots

* lift storage location to the exported asset model

* split the export tasks

* improve the temp file usage in csv exporter

* delete the test files we're creating

* add a commit to try and trigger GitHub Actions

Co-authored-by: pauldambra <pauldambra@users.noreply.github.com>
2022-06-28 20:05:37 +02:00

54 lines
1.6 KiB
Python

from time import sleep
from typing import List, Tuple
import structlog
from celery import group
from posthog.models.dashboard_tile import get_tiles_ordered_by_position
from posthog.models.exported_asset import ExportedAsset
from posthog.models.insight import Insight
from posthog.models.subscription import Subscription
from posthog.tasks.exports.insight_exporter import export_insight
# Query-string fragment of UTM tags. It is not referenced in this part of the
# file; presumably other modules append it to links (TODO confirm).
UTM_TAGS_BASE: str = "utm_source=posthog&utm_campaign=subscription_report"

# Default upper bound on how many insights `generate_assets` exports per call.
DEFAULT_MAX_ASSET_COUNT: int = 6

# Module-level structured logger.
logger = structlog.get_logger(__name__)
def generate_assets(
    subscription: Subscription,
    max_asset_count: int = DEFAULT_MAX_ASSET_COUNT,
    max_wait_seconds: int = 30,
) -> Tuple[List[Insight], List[ExportedAsset]]:
    """Create PNG exports for a subscription's insights and wait for them to finish.

    The insights come from the subscription's dashboard (ordered by tile position)
    when it has one, otherwise from its single insight. An ``ExportedAsset`` is
    created for at most ``max_asset_count`` of them. ``export_insight`` is then
    dispatched for each asset as a Celery group, and the group is polled until
    it is ready.

    Args:
        subscription: The subscription whose dashboard or insight is exported.
        max_asset_count: Maximum number of insights to create assets for.
        max_wait_seconds: Approximate number of one-second polls to wait for the
            export group before giving up.

    Returns:
        A tuple ``(insights, assets)``. ``insights`` is the full list of insights;
        it may be longer than ``assets`` when it was truncated to
        ``max_asset_count``.

    Raises:
        Exception: If the subscription has neither a dashboard nor an insight, or
            if the exports are not ready within ``max_wait_seconds``.
    """
    if subscription.dashboard:
        tiles = get_tiles_ordered_by_position(subscription.dashboard)
        # NOTE(review): assumes every tile carries an insight; if tiles without
        # one can exist, None would end up in this list — confirm.
        insights = [tile.insight for tile in tiles]
    elif subscription.insight:
        insights = [subscription.insight]
    else:
        raise Exception("There are no insights to be sent for this Subscription")

    # Create all the assets we need (only the first `max_asset_count` insights).
    assets = [
        ExportedAsset(
            team=subscription.team, export_format="image/png", insight=insight, dashboard=subscription.dashboard
        )
        for insight in insights[:max_asset_count]
    ]
    # The asset ids are read below, so this relies on bulk_create populating
    # primary keys (the case on PostgreSQL).
    ExportedAsset.objects.bulk_create(assets)

    # Nothing to export (e.g. an empty dashboard), so don't dispatch an empty group.
    if not assets:
        return insights, assets

    # Wait for all assets to be exported. This polls ready() rather than blocking
    # on the group result; presumably because this can run inside a Celery task,
    # where synchronous waits on subtasks are disallowed by default — confirm.
    # ready() becomes true once every task has finished, including failed ones.
    tasks = [export_insight.s(asset.id) for asset in assets]
    parallel_job = group(tasks).apply_async()

    remaining_polls = max_wait_seconds
    while not parallel_job.ready():
        # Check readiness (loop condition) before giving up, so a job that
        # completes during the last sleep is not reported as a timeout.
        if remaining_polls <= 0:
            raise Exception("Timed out waiting for exports")
        remaining_polls -= 1
        sleep(1)

    return insights, assets