mirror of
https://github.com/PostHog/posthog.git
synced 2024-12-01 12:21:02 +01:00
feat(exports): support SQL insight exports (#15435)
This commit is contained in:
parent
2e9a91aa90
commit
6af88e6931
@ -4,12 +4,13 @@ import { triggerExport } from 'lib/components/ExportButton/exporter'
|
||||
import { ExporterFormat } from '~/types'
|
||||
import { DataNode, DataTableNode } from '~/queries/schema'
|
||||
import { defaultDataTableColumns } from '~/queries/nodes/DataTable/utils'
|
||||
import { isEventsQuery, isPersonsNode } from '~/queries/utils'
|
||||
import { isEventsQuery, isHogQLQuery, isPersonsNode } from '~/queries/utils'
|
||||
import { getEventsEndpoint, getPersonsEndpoint } from '~/queries/query'
|
||||
import { ExportWithConfirmation } from '~/queries/nodes/DataTable/ExportWithConfirmation'
|
||||
|
||||
const EXPORT_LIMIT_EVENTS = 3500
|
||||
const EXPORT_LIMIT_PERSONS = 10000
|
||||
const EXPORT_LIMIT_HOGQL = 65536
|
||||
|
||||
function startDownload(query: DataTableNode, onlySelectedColumns: boolean): void {
|
||||
const exportContext = isEventsQuery(query.source)
|
||||
@ -19,6 +20,8 @@ function startDownload(query: DataTableNode, onlySelectedColumns: boolean): void
|
||||
}
|
||||
: isPersonsNode(query.source)
|
||||
? { path: getPersonsEndpoint(query.source), max_limit: EXPORT_LIMIT_PERSONS }
|
||||
: isHogQLQuery(query.source)
|
||||
? { source: query.source, max_limit: EXPORT_LIMIT_HOGQL }
|
||||
: undefined
|
||||
if (!exportContext) {
|
||||
throw new Error('Unsupported node type')
|
||||
|
@ -2693,7 +2693,13 @@ export type OnlineExportContext = {
|
||||
max_limit?: number
|
||||
}
|
||||
|
||||
export type ExportContext = OnlineExportContext | LocalExportContext
|
||||
export type QueryExportContext = {
|
||||
source: Record<string, any>
|
||||
filename?: string
|
||||
max_limit?: number
|
||||
}
|
||||
|
||||
export type ExportContext = OnlineExportContext | LocalExportContext | QueryExportContext
|
||||
|
||||
export interface ExportedAssetType {
|
||||
id: number
|
||||
|
@ -8,6 +8,7 @@ from sentry_sdk import capture_exception, push_scope
|
||||
from statshog.defaults.django import statsd
|
||||
|
||||
from posthog.jwt import PosthogJwtAudience, encode_jwt
|
||||
from posthog.api.query import process_query
|
||||
from posthog.logging.timing import timed
|
||||
from posthog.models.exported_asset import ExportedAsset, save_content
|
||||
from posthog.utils import absolute_uri
|
||||
@ -69,7 +70,7 @@ def _convert_response_to_csv_data(data: Any) -> List[Any]:
|
||||
results = data.get("results")
|
||||
|
||||
# query like
|
||||
if isinstance(results[0], list) and "types" in data:
|
||||
if len(results) > 0 and (isinstance(results[0], list) or isinstance(results[0], tuple)) and "types" in data:
|
||||
# e.g. {'columns': ['count()'], 'hasMore': False, 'results': [[1775]], 'types': ['UInt64']}
|
||||
# or {'columns': ['count()', 'event'], 'hasMore': False, 'results': [[551, '$feature_flag_called'], [265, '$autocapture']], 'types': ['UInt64', 'String']}
|
||||
csv_rows: List[Dict[str, Any]] = []
|
||||
@ -167,52 +168,57 @@ class UnexpectedEmptyJsonResponse(Exception):
|
||||
def _export_to_csv(exported_asset: ExportedAsset, limit: int = 1000, max_limit: int = 3_500) -> None:
|
||||
resource = exported_asset.export_context
|
||||
|
||||
path: str = resource["path"]
|
||||
columns: List[str] = resource.get("columns", [])
|
||||
|
||||
method: str = resource.get("method", "GET")
|
||||
body = resource.get("body", None)
|
||||
|
||||
access_token = encode_jwt(
|
||||
{"id": exported_asset.created_by_id}, datetime.timedelta(minutes=15), PosthogJwtAudience.IMPERSONATED_USER
|
||||
)
|
||||
|
||||
next_url = None
|
||||
all_csv_rows: List[Any] = []
|
||||
|
||||
while len(all_csv_rows) < max_limit:
|
||||
response = make_api_call(access_token, body, limit, method, next_url, path)
|
||||
if resource.get("source"):
|
||||
query = resource.get("source")
|
||||
query_response = process_query(team=exported_asset.team, query_json=query, is_hogql_enabled=True)
|
||||
all_csv_rows = _convert_response_to_csv_data(query_response)
|
||||
|
||||
if response.status_code != 200:
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
response_json = response.json()
|
||||
except Exception:
|
||||
response_json = "no response json to parse"
|
||||
raise Exception(f"export API call failed with status_code: {response.status_code}. {response_json}")
|
||||
else:
|
||||
path: str = resource["path"]
|
||||
method: str = resource.get("method", "GET")
|
||||
body = resource.get("body", None)
|
||||
next_url = None
|
||||
access_token = encode_jwt(
|
||||
{"id": exported_asset.created_by_id}, datetime.timedelta(minutes=15), PosthogJwtAudience.IMPERSONATED_USER
|
||||
)
|
||||
|
||||
# Figure out how to handle funnel polling....
|
||||
data = response.json()
|
||||
while len(all_csv_rows) < max_limit:
|
||||
response = make_api_call(access_token, body, limit, method, next_url, path)
|
||||
|
||||
if data is None:
|
||||
unexpected_empty_json_response = UnexpectedEmptyJsonResponse("JSON is None when calling API for data")
|
||||
logger.error(
|
||||
"csv_exporter.json_was_none",
|
||||
exc=unexpected_empty_json_response,
|
||||
exc_info=True,
|
||||
response_text=response.text,
|
||||
)
|
||||
if response.status_code != 200:
|
||||
# noinspection PyBroadException
|
||||
try:
|
||||
response_json = response.json()
|
||||
except Exception:
|
||||
response_json = "no response json to parse"
|
||||
raise Exception(f"export API call failed with status_code: {response.status_code}. {response_json}")
|
||||
|
||||
raise unexpected_empty_json_response
|
||||
# Figure out how to handle funnel polling....
|
||||
data = response.json()
|
||||
|
||||
csv_rows = _convert_response_to_csv_data(data)
|
||||
if data is None:
|
||||
unexpected_empty_json_response = UnexpectedEmptyJsonResponse("JSON is None when calling API for data")
|
||||
logger.error(
|
||||
"csv_exporter.json_was_none",
|
||||
exc=unexpected_empty_json_response,
|
||||
exc_info=True,
|
||||
response_text=response.text,
|
||||
)
|
||||
|
||||
all_csv_rows = all_csv_rows + csv_rows
|
||||
raise unexpected_empty_json_response
|
||||
|
||||
if not data.get("next") or not csv_rows:
|
||||
break
|
||||
csv_rows = _convert_response_to_csv_data(data)
|
||||
|
||||
next_url = data.get("next")
|
||||
all_csv_rows = all_csv_rows + csv_rows
|
||||
|
||||
if not data.get("next") or not csv_rows:
|
||||
break
|
||||
|
||||
next_url = data.get("next")
|
||||
|
||||
renderer = OrderedCsvRenderer()
|
||||
|
||||
|
@ -256,6 +256,29 @@ class TestCSVExporter(APIBaseTest):
|
||||
with pytest.raises(UnexpectedEmptyJsonResponse, match="JSON is None when calling API for data"):
|
||||
csv_exporter.export_csv(self._create_asset())
|
||||
|
||||
@patch("posthog.models.exported_asset.UUIDT")
|
||||
def test_csv_exporter_hogql_query(self, mocked_uuidt) -> None:
|
||||
exported_asset = ExportedAsset(
|
||||
team=self.team,
|
||||
export_format=ExportedAsset.ExportFormat.CSV,
|
||||
export_context={"source": {"kind": "HogQLQuery", "query": "select 10 as number, '20' as string"}},
|
||||
)
|
||||
exported_asset.save()
|
||||
mocked_uuidt.return_value = "a-guid"
|
||||
|
||||
with self.settings(OBJECT_STORAGE_ENABLED=True, OBJECT_STORAGE_EXPORTS_FOLDER="Test-Exports"):
|
||||
csv_exporter.export_csv(exported_asset)
|
||||
|
||||
assert (
|
||||
exported_asset.content_location
|
||||
== f"{TEST_PREFIX}/csv/team-{self.team.id}/task-{exported_asset.id}/a-guid"
|
||||
)
|
||||
|
||||
content = object_storage.read(exported_asset.content_location)
|
||||
assert content == "number,string\r\n10,20\r\n"
|
||||
|
||||
assert exported_asset.content is None
|
||||
|
||||
def _split_to_dict(self, url: str) -> Dict[str, Any]:
|
||||
first_split_parts = url.split("?")
|
||||
assert len(first_split_parts) == 2
|
||||
|
Loading…
Reference in New Issue
Block a user