Mirror of https://github.com/PostHog/posthog.git (synced 2024-11-21 13:39:22 +01:00)
feat(data-warehouse): use materialized view in hogql (#25187)
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
parent ab0865abbe
commit 6e9e9e83de
@@ -5949,6 +5949,9 @@
 "sessionTableVersion": {
   "enum": ["auto", "v1", "v2"],
   "type": "string"
 },
+"useMaterializedViews": {
+  "type": "boolean"
+}
 },
 "type": "object"
@@ -228,6 +228,7 @@ export interface HogQLQueryModifiers {
     bounceRatePageViewMode?: 'count_pageviews' | 'uniq_urls' | 'uniq_page_screen_autocaptures'
     sessionTableVersion?: 'auto' | 'v1' | 'v2'
     propertyGroupsMode?: 'enabled' | 'disabled' | 'optimized'
+    useMaterializedViews?: boolean
 }

 export interface DataWarehouseEventsModifier {
@@ -81,7 +81,7 @@ export const DatabaseTableTreeWithItems = ({ inline }: DatabaseTableTreeProps):
         return <></>
     }

-    if (table.type === 'view' || table.type === 'data_warehouse') {
+    if (table.type === 'view' || table.type === 'data_warehouse' || table.type === 'materialized_view') {
         return (
             <LemonButton
                 data-attr="schema-list-item-delete"
@@ -336,7 +336,7 @@ export const DatabaseTableTreeWithItems = ({ inline }: DatabaseTableTreeProps):
     setIsOpen={setIsDeleteModalOpen}
     onDelete={() => {
         if (selectedRow) {
-            if (selectedRow.type === 'view') {
+            if (selectedRow.type === 'view' || selectedRow.type === 'materialized_view') {
                 deleteDataWarehouseSavedQuery(selectedRow.id)
             } else {
                 deleteDataWarehouseTable(selectedRow.id)
@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
 ee: 0016_rolemembership_organization_member
 otp_static: 0002_throttling
 otp_totp: 0002_auto_20190420_0723
-posthog: 0482_alertconfiguration_calculation_interval_and_more
+posthog: 0483_datawarehousesavedquery_table
 sessions: 0001_initial
 social_django: 0010_uid_db_index
 two_factor: 0007_auto_20201201_1019
@@ -151,6 +151,8 @@ def execute_process_query(
     sentry_sdk.set_tag("team_id", team_id)

     is_staff_user = False
+
+    user = None
     if user_id:
         user = User.objects.only("email", "is_staff").get(pk=user_id)
         is_staff_user = user.is_staff
@@ -184,6 +186,7 @@ def execute_process_query(
         execution_mode=ExecutionMode.CALCULATE_BLOCKING_ALWAYS,
         insight_id=query_status.insight_id,
         dashboard_id=query_status.dashboard_id,
+        user=user,
     )
     if isinstance(results, BaseModel):
         results = results.model_dump(by_alias=True)
@@ -289,11 +289,18 @@ def create_hogql_database(
     warehouse_tables: dict[str, Table] = {}
     views: dict[str, Table] = {}

+    for saved_query in DataWarehouseSavedQuery.objects.filter(team_id=team.pk).exclude(deleted=True):
+        views[saved_query.name] = saved_query.hogql_definition(modifiers)
+
     for table in (
         DataWarehouseTable.objects.filter(team_id=team.pk)
         .exclude(deleted=True)
         .select_related("credential", "external_data_source")
     ):
+        # Skip adding data warehouse tables that are materialized from views (in this case they have the same names)
+        if views.get(table.name, None) is not None:
+            continue
+
         s3_table = table.hogql_definition(modifiers)

         # If the warehouse table has no _properties_ field, then set it as a virtual table
@@ -313,9 +320,6 @@ def create_hogql_database(

         warehouse_tables[table.name] = s3_table

-    for saved_query in DataWarehouseSavedQuery.objects.filter(team_id=team.pk).exclude(deleted=True):
-        views[saved_query.name] = saved_query.hogql_definition()
-
     def define_mappings(warehouse: dict[str, Table], get_table: Callable):
         if "id" not in warehouse[warehouse_modifier.table_name].fields.keys():
             warehouse[warehouse_modifier.table_name].fields["id"] = ExpressionField(
@@ -591,7 +595,10 @@ def serialize_database(
         )
         if len(saved_query) != 0:
             tables[view_name] = DatabaseSchemaViewTable(
-                fields=fields_dict, id=str(saved_query[0].pk), name=view.name, query=HogQLQuery(query=view.query)
+                fields=fields_dict,
+                id=str(saved_query[0].pk),
+                name=view.name,
+                query=HogQLQuery(query=saved_query[0].query["query"]),
             )

     return tables
@@ -15,6 +15,28 @@ from posthog.schema import (

 if TYPE_CHECKING:
     from posthog.models import Team
+    from posthog.models import User


+def create_default_modifiers_for_user(
+    user: "User", team: "Team", modifiers: Optional[HogQLQueryModifiers] = None
+) -> HogQLQueryModifiers:
+    if modifiers is None:
+        modifiers = HogQLQueryModifiers()
+    else:
+        modifiers = modifiers.model_copy()
+
+    modifiers.useMaterializedViews = posthoganalytics.feature_enabled(
+        "data-modeling",
+        str(user.distinct_id),
+        person_properties={
+            "email": user.email,
+        },
+        only_evaluate_locally=True,
+        send_feature_flag_events=False,
+    )
+
+    return create_default_modifiers_for_team(team, modifiers)
+
+
 def create_default_modifiers_for_team(
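For context, a minimal sketch of how the new helper is meant to be called; the request_user and team objects below are placeholders, while create_default_modifiers_for_user and HogQLQueryModifiers are the names introduced in the hunk above:

# Sketch only: layer user-scoped modifiers (feature-flag driven) on top of team defaults.
from posthog.hogql.modifiers import create_default_modifiers_for_user
from posthog.schema import HogQLQueryModifiers

def modifiers_for_request(request_user, team) -> HogQLQueryModifiers:
    base = HogQLQueryModifiers()  # whatever the caller already configured
    # The "data-modeling" feature flag decides useMaterializedViews for this user.
    return create_default_modifiers_for_user(request_user, team, base)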
@@ -56,6 +56,7 @@ from posthog.schema import (
 )
 from posthog.schema_helpers import to_dict, to_json
 from posthog.utils import generate_cache_key, get_from_dict_or_attr
+from posthog.hogql.modifiers import create_default_modifiers_for_user

 logger = structlog.get_logger(__name__)
@@ -622,6 +623,13 @@ class QueryRunner(ABC, Generic[Q, R, CR]):

             last_refresh = datetime.now(UTC)
             target_age = self.cache_target_age(last_refresh=last_refresh)
+
+            # Avoid affecting cache key
+            # Add user based modifiers here, primarily for user specific feature flagging
+            if user:
+                self.modifiers = create_default_modifiers_for_user(user, self.team, self.modifiers)
+                self.modifiers.useMaterializedViews = True
+
             fresh_response_dict = {
                 **self.calculate().model_dump(),
                 "is_cached": False,
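The two comments above carry the key design point: the user-scoped modifiers appear to be merged in only after the cache key has been derived from the team-level modifiers, so per-user feature flags do not fragment the query cache. Roughly, as a sketch; compute_cache_key and run_query are illustrative stand-ins, only create_default_modifiers_for_user comes from this commit:

# Illustrative ordering only, not the actual QueryRunner code path.
cache_key = compute_cache_key(query, team_modifiers)                        # user-independent
modifiers = create_default_modifiers_for_user(user, team, team_modifiers)  # merged after the key
response = run_query(query, modifiers)                                      # may now resolve materialized views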
posthog/migrations/0483_datawarehousesavedquery_table.py (new file, 47 lines)
@@ -0,0 +1,47 @@
# Generated by Django 4.2.15 on 2024-10-01 18:08

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):
    atomic = False
    dependencies = [
        ("posthog", "0482_alertconfiguration_calculation_interval_and_more"),
    ]

    operations = [
        migrations.SeparateDatabaseAndState(
            state_operations=[
                migrations.AddField(
                    model_name="datawarehousesavedquery",
                    name="table",
                    field=models.ForeignKey(
                        blank=True,
                        null=True,
                        on_delete=django.db.models.deletion.SET_NULL,
                        to="posthog.datawarehousetable",
                    ),
                ),
            ],
            database_operations=[
                migrations.RunSQL(
                    """
                    ALTER TABLE "posthog_datawarehousesavedquery" ADD COLUMN "table_id" uuid NULL CONSTRAINT "posthog_datawarehous_table_id_96fdb4f5_fk_posthog_d" REFERENCES "posthog_datawarehousetable"("id") DEFERRABLE INITIALLY DEFERRED; -- existing-table-constraint-ignore
                    SET CONSTRAINTS "posthog_datawarehous_table_id_96fdb4f5_fk_posthog_d" IMMEDIATE; -- existing-table-constraint-ignore
                    """,
                    reverse_sql="""
                    ALTER TABLE "posthog_datawarehousesavedquery" DROP COLUMN IF EXISTS "table_id";
                    """,
                ),
                migrations.RunSQL(
                    """
                    CREATE INDEX CONCURRENTLY "posthog_datawarehousesavedquery_table_id_96fdb4f5" ON "posthog_datawarehousesavedquery" ("table_id");
                    """,
                    reverse_sql="""
                    DROP INDEX IF EXISTS "posthog_datawarehousesavedquery_table_id_96fdb4f5";
                    """,
                ),
            ],
        ),
    ]
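The state_operations half mirrors the table = models.ForeignKey(...) field added to DataWarehouseSavedQuery later in this diff, while the actual DDL goes through RunSQL so the index can be built with CREATE INDEX CONCURRENTLY, which cannot run inside a transaction (hence atomic = False). To preview the SQL this migration would emit, something along these lines should work in a local checkout:

python manage.py sqlmigrate posthog 0483_datawarehousesavedquery_table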
@@ -696,6 +696,7 @@ class HogQLQueryModifiers(BaseModel):
     propertyGroupsMode: Optional[PropertyGroupsMode] = None
     s3TableUseInvalidColumns: Optional[bool] = None
     sessionTableVersion: Optional[SessionTableVersion] = None
+    useMaterializedViews: Optional[bool] = None


 class HogQLVariable(BaseModel):
File diff suppressed because it is too large
@@ -22,7 +22,7 @@ from posthog.warehouse.models.external_data_job import ExternalDataJob, get_external_data_job
 from posthog.warehouse.models.external_data_schema import ExternalDataSchema, aget_schema_by_id
 from posthog.warehouse.models.external_data_source import ExternalDataSource
 from posthog.warehouse.models.table import DataWarehouseTable
-from posthog.warehouse.s3 import get_s3_client
+from posthog.temporal.data_imports.util import prepare_s3_files_for_querying


 @dataclass
@@ -101,21 +101,12 @@ class DataImportPipeline:
         )

     async def _prepare_s3_files_for_querying(self, file_uris: list[str]):
-        s3 = get_s3_client()
         job: ExternalDataJob = await get_external_data_job(job_id=self.inputs.run_id)
         schema: ExternalDataSchema = await aget_schema_by_id(self.inputs.schema_id, self.inputs.team_id)

         normalized_schema_name = NamingConvention().normalize_identifier(schema.name)
-        s3_folder_for_job = f"{settings.BUCKET_URL}/{job.folder_path()}"
-        s3_folder_for_schema = f"{s3_folder_for_job}/{normalized_schema_name}"
-        s3_folder_for_querying = f"{s3_folder_for_job}/{normalized_schema_name}__query"
-
-        if s3.exists(s3_folder_for_querying):
-            s3.delete(s3_folder_for_querying, recursive=True)
-
-        for file in file_uris:
-            file_name = file.replace(f"{s3_folder_for_schema}/", "")
-            s3.copy(file, f"{s3_folder_for_querying}/{file_name}")
+        prepare_s3_files_for_querying(job.folder_path(), normalized_schema_name, file_uris)

     def _run(self) -> dict[str, int]:
         if self.refresh_dlt:
posthog/temporal/data_imports/util.py (new file, 17 lines)
@@ -0,0 +1,17 @@
from posthog.warehouse.s3 import get_s3_client
from django.conf import settings


def prepare_s3_files_for_querying(folder_path: str, table_name: str, file_uris: list[str]):
    s3 = get_s3_client()

    s3_folder_for_job = f"{settings.BUCKET_URL}/{folder_path}"
    s3_folder_for_schema = f"{s3_folder_for_job}/{table_name}"
    s3_folder_for_querying = f"{s3_folder_for_job}/{table_name}__query"

    if s3.exists(s3_folder_for_querying):
        s3.delete(s3_folder_for_querying, recursive=True)

    for file in file_uris:
        file_name = file.replace(f"{s3_folder_for_schema}/", "")
        s3.copy(file, f"{s3_folder_for_querying}/{file_name}")
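A hedged usage sketch of the new helper; the folder path and file URIs below are invented for illustration, whereas in the callers above they come from job.folder_path() or the delta table's file_uris():

from django.conf import settings

from posthog.temporal.data_imports.util import prepare_s3_files_for_querying

# Hypothetical values for illustration only.
folder_path = "team_1_model_0123abcd/modeling"
file_uris = [
    f"{settings.BUCKET_URL}/team_1_model_0123abcd/modeling/my_model/part-0.parquet",
]
# Re-creates "<table_name>__query" next to the data and copies the files into it.
prepare_s3_files_for_querying(folder_path, "my_model", file_uris)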
@@ -1,10 +1,17 @@
 from posthog.temporal.data_modeling.run_workflow import (
     RunWorkflow,
     build_dag_activity,
+    create_table_activity,
     finish_run_activity,
     run_dag_activity,
     start_run_activity,
 )

 WORKFLOWS = [RunWorkflow]
-ACTIVITIES = [finish_run_activity, start_run_activity, build_dag_activity, run_dag_activity]
+ACTIVITIES = [
+    finish_run_activity,
+    start_run_activity,
+    build_dag_activity,
+    run_dag_activity,
+    create_table_activity,
+]
@@ -31,6 +31,8 @@ from posthog.temporal.batch_exports.base import PostHogWorkflow
 from posthog.temporal.common.heartbeat import Heartbeater
 from posthog.warehouse.models import DataWarehouseModelPath, DataWarehouseSavedQuery
 from posthog.warehouse.util import database_sync_to_async
+from posthog.warehouse.data_load.create_table import create_table_from_saved_query
+from posthog.temporal.data_imports.util import prepare_s3_files_for_querying

 logger = structlog.get_logger()
@@ -285,7 +287,9 @@ async def materialize_model(model_label: str, team: Team) -> tuple[str, DeltaTable]:
         model_name = model_label
         filter_params["name"] = model_name

-    saved_query = await database_sync_to_async(DataWarehouseSavedQuery.objects.filter(team=team, **filter_params).get)()
+    saved_query = await database_sync_to_async(
+        DataWarehouseSavedQuery.objects.prefetch_related("team").filter(team=team, **filter_params).get
+    )()

     query_columns = saved_query.columns
     if not query_columns:
@@ -320,6 +324,14 @@ async def materialize_model(model_label: str, team: Team) -> tuple[str, DeltaTable]:
     _ = await asyncio.to_thread(pipeline.run, hogql_table(hogql_query, team, saved_query.name, table_columns))

     tables = get_delta_tables(pipeline)

+    for table in tables.values():
+        table.optimize.compact()
+        table.vacuum(retention_hours=24, enforce_retention_duration=False, dry_run=False)
+
+        file_uris = table.file_uris()
+        prepare_s3_files_for_querying(saved_query.folder_path, saved_query.name, file_uris)
+
     key, delta_table = tables.popitem()
     return (key, delta_table)
@@ -568,6 +580,19 @@ async def finish_run_activity(inputs: FinishRunActivityInputs) -> None:
         raise


+@dataclasses.dataclass
+class CreateTableActivityInputs:
+    models: list[str]
+    team_id: int
+
+
+@temporalio.activity.defn
+async def create_table_activity(inputs: CreateTableActivityInputs) -> None:
+    """Activity that creates tables for a list of saved queries."""
+    for model in inputs.models:
+        await create_table_from_saved_query(model, inputs.team_id)
+
+
 async def update_saved_query_status(
     label: str, status: DataWarehouseSavedQuery.Status, run_at: dt.datetime, team_id: int
 ):
@@ -623,7 +648,7 @@ class RunWorkflow(PostHogWorkflow):
             retry_policy=temporalio.common.RetryPolicy(
                 initial_interval=dt.timedelta(seconds=10),
                 maximum_interval=dt.timedelta(seconds=60),
-                maximum_attempts=0,
+                maximum_attempts=1,
             ),
         )
@@ -637,7 +662,7 @@ class RunWorkflow(PostHogWorkflow):
             retry_policy=temporalio.common.RetryPolicy(
                 initial_interval=dt.timedelta(seconds=10),
                 maximum_interval=dt.timedelta(seconds=60),
-                maximum_attempts=0,
+                maximum_attempts=1,
             ),
         )
@@ -647,11 +672,26 @@ class RunWorkflow(PostHogWorkflow):
             run_model_activity_inputs,
             start_to_close_timeout=dt.timedelta(hours=1),
             retry_policy=temporalio.common.RetryPolicy(
-                maximum_attempts=0,
+                maximum_attempts=1,
             ),
         )
         completed, failed, ancestor_failed = results

+        selected_labels = [selector.label for selector in inputs.select]
+        create_table_activity_inputs = CreateTableActivityInputs(
+            models=[label for label in completed if label in selected_labels], team_id=inputs.team_id
+        )
+        await temporalio.workflow.execute_activity(
+            create_table_activity,
+            create_table_activity_inputs,
+            start_to_close_timeout=dt.timedelta(minutes=5),
+            retry_policy=temporalio.common.RetryPolicy(
+                initial_interval=dt.timedelta(seconds=10),
+                maximum_interval=dt.timedelta(seconds=60),
+                maximum_attempts=1,
+            ),
+        )
+
         finish_run_activity_inputs = FinishRunActivityInputs(
             completed=[label for label in completed if dag[label].selected is True],
             failed=[label for label in itertools.chain(failed, ancestor_failed) if dag[label].selected is True],
@@ -665,7 +705,7 @@ class RunWorkflow(PostHogWorkflow):
             retry_policy=temporalio.common.RetryPolicy(
                 initial_interval=dt.timedelta(seconds=10),
                 maximum_interval=dt.timedelta(seconds=60),
-                maximum_attempts=0,
+                maximum_attempts=1,
             ),
         )
@@ -23,11 +23,13 @@ from posthog.models import Team
 from posthog.temporal.data_modeling.run_workflow import (
     BuildDagActivityInputs,
     ModelNode,
+    CreateTableActivityInputs,
     RunDagActivityInputs,
     RunWorkflow,
     RunWorkflowInputs,
     Selector,
     build_dag_activity,
+    create_table_activity,
     finish_run_activity,
     get_dlt_destination,
     materialize_model,
@@ -36,6 +38,7 @@ from posthog.temporal.data_modeling.run_workflow import (
 )
 from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse
 from posthog.warehouse.models.datawarehouse_saved_query import DataWarehouseSavedQuery
+from posthog.warehouse.models.table import DataWarehouseTable
 from posthog.warehouse.models.modeling import DataWarehouseModelPath
 from posthog.warehouse.util import database_sync_to_async
@@ -94,6 +97,42 @@ async def test_run_dag_activity_activity_materialize_mocked(activity_environment
     assert results.completed == set(dag.keys())


+async def test_create_table_activity(activity_environment, ateam):
+    query = """\
+      select
+        event as event,
+        if(distinct_id != '0', distinct_id, null) as distinct_id,
+        timestamp as timestamp
+      from events
+      where event = '$pageview'
+    """
+    saved_query = await DataWarehouseSavedQuery.objects.acreate(
+        team=ateam,
+        name="my_model",
+        query={"query": query, "kind": "HogQLQuery"},
+    )
+
+    create_table_activity_inputs = CreateTableActivityInputs(team_id=ateam.pk, models=[saved_query.id.hex])
+    with (
+        override_settings(
+            AIRBYTE_BUCKET_KEY=settings.OBJECT_STORAGE_ACCESS_KEY_ID,
+            AIRBYTE_BUCKET_SECRET=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
+        ),
+        unittest.mock.patch(
+            "posthog.warehouse.models.table.DataWarehouseTable.get_columns",
+            return_value={
+                "id": {"clickhouse": "String", "hogql": "StringDatabaseField", "valid": True},
+                "a_column": {"clickhouse": "String", "hogql": "StringDatabaseField", "valid": True},
+            },
+        ),
+    ):
+        async with asyncio.timeout(10):
+            await activity_environment.run(create_table_activity, create_table_activity_inputs)
+
+    table = await DataWarehouseTable.objects.aget(name=saved_query.name)
+    assert table.name == saved_query.name
+
+
 @pytest.mark.parametrize(
     "dag,make_fail",
     [
@@ -296,6 +335,7 @@ async def test_materialize_model(ateam, bucket_name, minio_client, pageview_events):
         key=lambda d: (d["distinct_id"], d["timestamp"]),
     )

+    assert any(f"{saved_query.name}__query" in obj["Key"] for obj in s3_objects["Contents"])
     assert table.num_rows == len(expected_events)
     assert table.num_columns == 3
     assert table.column_names == ["event", "distinct_id", "timestamp"]
@@ -588,6 +628,7 @@ async def test_run_workflow_with_minio_bucket(
             build_dag_activity,
             run_dag_activity,
             finish_run_activity,
+            create_table_activity,
         ],
         workflow_runner=temporalio.worker.UnsandboxedWorkflowRunner(),
     ):
@@ -1,8 +1,8 @@
 from typing import Any
-from django.conf import settings

 import structlog
 from asgiref.sync import async_to_sync
+from django.conf import settings
 from django.db import transaction
 from rest_framework import exceptions, filters, request, response, serializers, status, viewsets
 from rest_framework.decorators import action
@@ -21,6 +21,7 @@ from posthog.temporal.data_modeling.run_workflow import RunWorkflowInputs, Selector
 from posthog.warehouse.models import DataWarehouseJoin, DataWarehouseModelPath, DataWarehouseSavedQuery
+import uuid


 logger = structlog.get_logger(__name__)
@@ -155,6 +156,10 @@ class DataWarehouseSavedQueryViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
         instance: DataWarehouseSavedQuery = self.get_object()
         DataWarehouseJoin.objects.filter(source_table_name=instance.name).delete()
         DataWarehouseJoin.objects.filter(joining_table_name=instance.name).delete()
+
+        if instance.table is not None:
+            instance.table.soft_delete()
+
         self.perform_destroy(instance)

         return response.Response(status=status.HTTP_204_NO_CONTENT)
@@ -168,6 +173,7 @@ class DataWarehouseSavedQueryViewSet(TeamAndOrgViewSetMixin, viewsets.ModelViewSet):
         saved_query = self.get_object()

         temporal = sync_connect()

         inputs = RunWorkflowInputs(
             team_id=saved_query.team_id,
             select=[Selector(label=saved_query.id.hex, ancestors=ancestors, descendants=descendants)],
posthog/warehouse/data_load/create_table.py (new file, 85 lines)
@@ -0,0 +1,85 @@
import uuid

from django.conf import settings

from posthog.warehouse.models import (
    aget_or_create_datawarehouse_credential,
    DataWarehouseTable,
    DataWarehouseCredential,
    asave_datawarehousetable,
    acreate_datawarehousetable,
    aget_table_by_saved_query_id,
    aget_saved_query_by_id,
    asave_saved_query,
)

from asgiref.sync import sync_to_async
from posthog.temporal.common.logger import bind_temporal_worker_logger
from clickhouse_driver.errors import ServerException


async def create_table_from_saved_query(
    saved_query_id: str,
    team_id: int,
) -> None:
    """
    Create a table from a saved query if it doesn't exist.
    """
    logger = await bind_temporal_worker_logger(team_id=team_id)

    credential: DataWarehouseCredential = await aget_or_create_datawarehouse_credential(
        team_id=team_id,
        access_key=settings.AIRBYTE_BUCKET_KEY,
        access_secret=settings.AIRBYTE_BUCKET_SECRET,
    )
    saved_query_id_converted = str(uuid.UUID(saved_query_id))
    saved_query = await aget_saved_query_by_id(saved_query_id=saved_query_id_converted, team_id=team_id)

    try:
        table_name = f"{saved_query.name}"
        url_pattern = saved_query.url_pattern
        table_format = DataWarehouseTable.TableFormat.DeltaS3Wrapper

        table_params = {
            "credential": credential,
            "name": table_name,
            "format": table_format,
            "url_pattern": url_pattern,
            "team_id": team_id,
        }

        # create or update
        table_created: DataWarehouseTable | None = await aget_table_by_saved_query_id(saved_query_id_converted, team_id)
        if table_created:
            table_created.credential = credential
            table_created.format = table_format
            table_created.url_pattern = url_pattern
            await asave_datawarehousetable(table_created)

        if not table_created:
            table_created = await acreate_datawarehousetable(**table_params)

        assert isinstance(table_created, DataWarehouseTable) and table_created is not None

        # TODO: handle dlt columns schemas. Need to refactor dag pipeline to pass through schema or propagate from upstream tables
        table_created.columns = await sync_to_async(table_created.get_columns)()
        await asave_datawarehousetable(table_created)

        saved_query = await aget_saved_query_by_id(saved_query_id=saved_query_id_converted, team_id=team_id)

        if saved_query:
            saved_query.table = table_created
            await asave_saved_query(saved_query)

    except ServerException as err:
        logger.exception(
            f"Data Warehouse: Unknown ServerException {saved_query.pk}",
            exc_info=err,
        )
    except Exception as e:
        # TODO: handle other exceptions here
        logger.exception(
            f"Data Warehouse: Could not validate schema for saved query materialization{saved_query.pk}",
            exc_info=e,
        )
        raise
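A standalone sketch of calling the new helper outside of Temporal; the saved query id below is made up, and AIRBYTE_BUCKET_KEY/AIRBYTE_BUCKET_SECRET must be configured (the test for create_table_activity above does this via override_settings):

import asyncio

from posthog.warehouse.data_load.create_table import create_table_from_saved_query

async def main() -> None:
    # Hypothetical saved query id; in the workflow it comes from CreateTableActivityInputs.models.
    await create_table_from_saved_query("0192b0d0aaaabbbbccccddddeeeeffff", team_id=1)

asyncio.run(main())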
@@ -13,7 +13,7 @@ from posthog.hogql.database.models import (
 )

 from posthog.warehouse.models import (
-    get_or_create_datawarehouse_credential,
+    aget_or_create_datawarehouse_credential,
     DataWarehouseTable,
     DataWarehouseCredential,
     get_external_data_job,
@@ -103,7 +103,7 @@ async def validate_schema_and_update_table(

     job: ExternalDataJob = await get_external_data_job(job_id=run_id)

-    credential: DataWarehouseCredential = await get_or_create_datawarehouse_credential(
+    credential: DataWarehouseCredential = await aget_or_create_datawarehouse_credential(
         team_id=team_id,
         access_key=settings.AIRBYTE_BUCKET_KEY,
         access_secret=settings.AIRBYTE_BUCKET_SECRET,
@@ -15,6 +15,10 @@ class DataWarehouseCredential(CreatedMetaFields, UUIDModel):


+@database_sync_to_async
+def aget_or_create_datawarehouse_credential(team_id, access_key, access_secret) -> DataWarehouseCredential:
+    return get_or_create_datawarehouse_credential(team_id, access_key, access_secret)
+
+
 def get_or_create_datawarehouse_credential(team_id, access_key, access_secret) -> DataWarehouseCredential:
     credential, _ = DataWarehouseCredential.objects.get_or_create(
         team_id=team_id, access_key=access_key, access_secret=access_secret
@@ -1,8 +1,9 @@
 import re
-from typing import Any, Optional
+from typing import Any, Optional, Union

 from django.core.exceptions import ValidationError
 from django.db import models
+from django.conf import settings

 from posthog.hogql import ast
 from posthog.hogql.database.database import Database
@@ -16,6 +17,8 @@ from posthog.warehouse.models.util import (
     clean_type,
     remove_named_tuples,
 )
+from posthog.hogql.database.s3_table import S3Table
+from posthog.warehouse.util import database_sync_to_async


 def validate_saved_query_name(value):
@@ -63,6 +66,7 @@ class DataWarehouseSavedQuery(CreatedMetaFields, UUIDModel, DeletedMetaFields):
         null=True,
         help_text="The timestamp of this SavedQuery's last run (if any).",
     )
+    table = models.ForeignKey("posthog.DataWarehouseTable", on_delete=models.SET_NULL, null=True, blank=True)

     class Meta:
         constraints = [
@@ -127,7 +131,17 @@ class DataWarehouseSavedQuery(CreatedMetaFields, UUIDModel, DeletedMetaFields):

         return list(table_collector.tables)

-    def hogql_definition(self, modifiers: Optional[HogQLQueryModifiers] = None) -> SavedQuery:
+    @property
+    def folder_path(self):
+        return f"team_{self.team.pk}_model_{self.id.hex}/modeling"
+
+    @property
+    def url_pattern(self):
+        return (
+            f"https://{settings.AIRBYTE_BUCKET_DOMAIN}/dlt/team_{self.team.pk}_model_{self.id.hex}/modeling/{self.name}"
+        )
+
+    def hogql_definition(self, modifiers: Optional[HogQLQueryModifiers] = None) -> Union[SavedQuery, S3Table]:
         from posthog.warehouse.models.table import CLICKHOUSE_HOGQL_MAPPING

         columns = self.columns or {}
@@ -165,9 +179,36 @@ class DataWarehouseSavedQuery(CreatedMetaFields, UUIDModel, DeletedMetaFields):

             fields[column] = hogql_type(name=column)

-        return SavedQuery(
-            id=str(self.id),
-            name=self.name,
-            query=self.query["query"],
-            fields=fields,
-        )
+        if (
+            self.table is not None
+            and (self.status == DataWarehouseSavedQuery.Status.COMPLETED or self.last_run_at is not None)
+            and modifiers is not None
+            and modifiers.useMaterializedViews
+        ):
+            return self.table.hogql_definition(modifiers)
+        else:
+            return SavedQuery(
+                id=str(self.id),
+                name=self.name,
+                query=self.query["query"],
+                fields=fields,
+            )
+
+
+@database_sync_to_async
+def aget_saved_query_by_id(saved_query_id: str, team_id: int) -> DataWarehouseSavedQuery | None:
+    return (
+        DataWarehouseSavedQuery.objects.prefetch_related("team")
+        .exclude(deleted=True)
+        .get(id=saved_query_id, team_id=team_id)
+    )
+
+
+@database_sync_to_async
+def asave_saved_query(saved_query: DataWarehouseSavedQuery) -> None:
+    saved_query.save()
+
+
+@database_sync_to_async
+def aget_table_by_saved_query_id(saved_query_id: str, team_id: int):
+    return DataWarehouseSavedQuery.objects.get(id=saved_query_id, team_id=team_id).table
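The net effect: once a saved query has a backing table and a completed run, and the caller's modifiers enable useMaterializedViews, HogQL resolves the view to its S3-backed table instead of inlining the saved query's SQL. A rough sketch of that branch from the caller's side, assuming an existing saved_query instance and using only names from this diff:

from posthog.schema import HogQLQueryModifiers

modifiers = HogQLQueryModifiers(useMaterializedViews=True)
definition = saved_query.hogql_definition(modifiers)
# -> an S3Table built from saved_query.table when materialized,
#    otherwise a SavedQuery that re-runs saved_query.query["query"] inline.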