mirror of https://github.com/PostHog/posthog.git
synced 2024-11-24 00:47:50 +01:00

refactor: Yeet PRIMARY_DB (#9017)

* refactor: Yeet `PRIMARY_DB`
* Remove `db_backend`
* Eliminate "Analytics database in use"
* Satisfy mypy

This commit is contained in:
parent b7d68eb37e
commit 500d4623ba
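
The hunks below all remove the same mechanism: a runtime switch on the analytics backend. A minimal sketch of the idea, with simplified names (the real logic is spread across posthog/settings/ and ee/settings.py; this is illustrative, not the literal source):

import os
from enum import Enum


class AnalyticsDBMS(Enum):
    POSTGRES = "postgres"
    CLICKHOUSE = "clickhouse"


# Before: the backend was chosen via the PRIMARY_DB env var, and ClickHouse-only
# behaviour (Kafka ingestion, Celery imports, service version checks, ...) was
# gated on it.
PRIMARY_DB = AnalyticsDBMS(os.environ.get("PRIMARY_DB", "postgres"))
KAFKA_ENABLED = PRIMARY_DB == AnalyticsDBMS.CLICKHOUSE

# After this commit: ClickHouse is the only analytics database, so the toggle is
# deleted and the previously gated paths run unconditionally.
KAFKA_ENABLED = True  # the real setting becomes `not TEST` in ee/settings.py
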
@@ -6,8 +6,6 @@
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
<env name="DEBUG" value="1" />
<env name="PRIMARY_DB" value="clickhouse" />
<env name="KAFKA_ENABLED" value="true" />
<env name="CLICKHOUSE_SECURE" value="False" />
<env name="KAFKA_URL" value="kafka://localhost" />
<env name="DATABASE_URL" value="postgres://posthog:posthog@localhost:5432/posthog" />
@@ -28,4 +26,4 @@
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
</component>

@@ -9,11 +9,10 @@
<envs>
<env name="WORKER_CONCURRENCY" value="2" />
<env name="DATABASE_URL" value="postgres://posthog:posthog@localhost:5432/posthog" />
<env name="PRIMARY_DB" value="clickhouse" />
<env name="KAFKA_ENABLED" value="true" />
<env name="CLICKHOUSE_SECURE" value="False" />
<env name="KAFKA_HOSTS" value="localhost:9092" />
</envs>
<method v="2" />
</configuration>
</component>
</component>

@@ -7,8 +7,6 @@
<env name="PYTHONUNBUFFERED" value="1" />
<env name="DJANGO_SETTINGS_MODULE" value="posthog.settings" />
<env name="DEBUG" value="1" />
<env name="PRIMARY_DB" value="clickhouse" />
<env name="KAFKA_ENABLED" value="true" />
<env name="CLICKHOUSE_SECURE" value="False" />
<env name="KAFKA_URL" value="kafka://localhost" />
<env name="DATABASE_URL" value="postgres://posthog:posthog@localhost:5432/posthog" />
@@ -31,4 +29,4 @@
<option name="customRunCommand" value="" />
<method v="2" />
</configuration>
</component>
</component>
@@ -7,7 +7,6 @@ export NO_RESTART_LOOP=1
export CYPRESS_BASE_URL=http://localhost:8080
export OPT_OUT_CAPTURE=1
export SECURE_COOKIES=0
export PRIMARY_DB=clickhouse
export SKIP_SERVICE_VERSION_REQUIREMENTS=1
export KAFKA_URL=kafka://kafka:9092
export CLICKHOUSE_DATABASE=posthog_test

@@ -26,8 +26,8 @@ export BASE_DIR=$(dirname $(dirname "$PWD/${0#./}"))
export KAFKA_URL=${KAFKA_URL:-'kafka://kafka:9092'}
# Crudely extract netlocs by removing "kafka://" in a compatibility approach
export KAFKA_HOSTS=${KAFKA_HOSTS:-$(echo $KAFKA_URL | sed -e "s/kafka\(\+ssl\)\{0,1\}:\/\///g")}
# Check PRIMARY_DB in a compatibility approach
export KAFKA_ENABLED=${KAFKA_ENABLED:-$([[ $PRIMARY_DB = "clickhouse" ]] && echo "true" || echo "false")}
# Make sure KAFKA_ENABLED is true - eventually this should be redundant
export KAFKA_ENABLED="true"

# On _Heroku_, the $WEB_CONCURRENCY env contains suggested number of workers per dyno.
# Unfortunately we are running a NodeJS app, yet get the value for the "python" buildpack.
@@ -5,7 +5,6 @@ set -e
trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT

export DEBUG=${DEBUG:-1}
export PRIMARY_DB=clickhouse
export SKIP_SERVICE_VERSION_REQUIREMENTS=1

ARCH=$(uname -m)
@@ -8,7 +8,6 @@
"cloud": false,
"available_social_auth_providers": { "google-oauth2": false, "github": false, "gitlab": false },
"ee_available": true,
"db_backend": "postgres",
"available_timezones": {
"Africa/Abidjan": 0.0,
"Africa/Accra": 0.0,
@@ -1,7 +1,6 @@
{
"results": [
{ "key": "posthog_version", "metric": "PostHog version", "value": "1.23.1" },
{ "key": "analytics_database", "metric": "Analytics database in use", "value": "Postgres" },
{ "key": "ingestion_server", "metric": "Event ingestion via", "value": "Plugin Server" },
{ "key": "plugin_sever_alive", "metric": "Plugin server alive", "value": true },
{ "key": "plugin_sever_version", "metric": "Plugin server version", "value": "0.15.5" },
@@ -67,7 +67,6 @@ services:
CLICKHOUSE_DATABASE: 'posthog'
CLICKHOUSE_SECURE: 'false'
CLICKHOUSE_VERIFY: 'false'
PRIMARY_DB: 'clickhouse'
KAFKA_URL: 'kafka://kafka'
REDIS_URL: 'redis://redis:6379/'
SECRET_KEY: $POSTHOG_SECRET
@@ -18,7 +18,6 @@ from sentry_sdk.api import capture_exception
from ee.clickhouse.errors import wrap_query_error
from ee.clickhouse.timer import get_timer_thread
from posthog import redis
from posthog.constants import AnalyticsDBMS
from posthog.internal_metrics import incr, timing
from posthog.settings import (
CLICKHOUSE_ASYNC,
@@ -31,7 +30,6 @@ from posthog.settings import (
CLICKHOUSE_SECURE,
CLICKHOUSE_USER,
CLICKHOUSE_VERIFY,
PRIMARY_DB,
TEST,
)
from posthog.utils import get_safe_cache
@@ -80,122 +78,106 @@ def make_ch_pool(**overrides) -> ChPool:
return ChPool(**kwargs)


if PRIMARY_DB != AnalyticsDBMS.CLICKHOUSE:
ch_client = None # type: Client
if not TEST and CLICKHOUSE_ASYNC:
ch_client = Client(
host=CLICKHOUSE_HOST,
database=CLICKHOUSE_DATABASE,
secure=CLICKHOUSE_SECURE,
user=CLICKHOUSE_USER,
password=CLICKHOUSE_PASSWORD,
ca_certs=CLICKHOUSE_CA,
verify=CLICKHOUSE_VERIFY,
)

class ClickHouseNotConfigured(NotImplementedError):
def __init__(self, msg='This function only works if PRIMARY_DB is set to indicate ClickHouse!"', *args):
super().__init__(msg, *args)
ch_pool = make_ch_pool()

def async_execute(query, args=None, settings=None, with_column_types=False):
raise ClickHouseNotConfigured()

def sync_execute(query, args=None, settings=None, with_column_types=False):
raise ClickHouseNotConfigured()

def cache_sync_execute(query, args=None, redis_client=None, ttl=None, settings=None, with_column_types=False):
raise ClickHouseNotConfigured()
@async_to_sync
async def async_execute(query, args=None, settings=None, with_column_types=False):
loop = asyncio.get_event_loop()
task = loop.create_task(ch_client.execute(query, args, settings=settings, with_column_types=with_column_types))
return task


else:
if not TEST and CLICKHOUSE_ASYNC:
ch_client = Client(
host=CLICKHOUSE_HOST,
database=CLICKHOUSE_DATABASE,
secure=CLICKHOUSE_SECURE,
user=CLICKHOUSE_USER,
password=CLICKHOUSE_PASSWORD,
ca_certs=CLICKHOUSE_CA,
verify=CLICKHOUSE_VERIFY,
)
# if this is a test use the sync client
ch_client = SyncClient(
host=CLICKHOUSE_HOST,
database=CLICKHOUSE_DATABASE,
secure=CLICKHOUSE_SECURE,
user=CLICKHOUSE_USER,
password=CLICKHOUSE_PASSWORD,
ca_certs=CLICKHOUSE_CA,
verify=CLICKHOUSE_VERIFY,
settings={"mutations_sync": "1"} if TEST else {},
)

ch_pool = make_ch_pool()
ch_pool = make_ch_pool()

@async_to_sync
async def async_execute(query, args=None, settings=None, with_column_types=False):
loop = asyncio.get_event_loop()
task = loop.create_task(
ch_client.execute(query, args, settings=settings, with_column_types=with_column_types)
)
return task
def async_execute(query, args=None, settings=None, with_column_types=False): # type: ignore
return sync_execute(query, args, settings=settings, with_column_types=with_column_types)


def cache_sync_execute(query, args=None, redis_client=None, ttl=CACHE_TTL, settings=None, with_column_types=False):
if not redis_client:
redis_client = redis.get_client()
key = _key_hash(query, args)
if redis_client.exists(key):
result = _deserialize(redis_client.get(key))
return result
else:
# if this is a test use the sync client
ch_client = SyncClient(
host=CLICKHOUSE_HOST,
database=CLICKHOUSE_DATABASE,
secure=CLICKHOUSE_SECURE,
user=CLICKHOUSE_USER,
password=CLICKHOUSE_PASSWORD,
ca_certs=CLICKHOUSE_CA,
verify=CLICKHOUSE_VERIFY,
settings={"mutations_sync": "1"} if TEST else {},
)

ch_pool = make_ch_pool()

def async_execute(query, args=None, settings=None, with_column_types=False):
return sync_execute(query, args, settings=settings, with_column_types=with_column_types)

def cache_sync_execute(query, args=None, redis_client=None, ttl=CACHE_TTL, settings=None, with_column_types=False):
if not redis_client:
redis_client = redis.get_client()
key = _key_hash(query, args)
if redis_client.exists(key):
result = _deserialize(redis_client.get(key))
return result
else:
result = sync_execute(query, args, settings=settings, with_column_types=with_column_types)
redis_client.set(key, _serialize(result), ex=ttl)
return result

def sync_execute(query, args=None, settings=None, with_column_types=False):
with ch_pool.get_client() as client:
start_time = perf_counter()

prepared_sql, prepared_args, tags = _prepare_query(client=client, query=query, args=args)

timeout_task = QUERY_TIMEOUT_THREAD.schedule(_notify_of_slow_query_failure, tags)

try:
result = client.execute(
prepared_sql, params=prepared_args, settings=settings, with_column_types=with_column_types
)
except Exception as err:
err = wrap_query_error(err)
tags["failed"] = True
tags["reason"] = type(err).__name__
incr("clickhouse_sync_execution_failure", tags=tags)

raise err
finally:
execution_time = perf_counter() - start_time

QUERY_TIMEOUT_THREAD.cancel(timeout_task)
timing("clickhouse_sync_execution_time", execution_time * 1000.0, tags=tags)

if app_settings.SHELL_PLUS_PRINT_SQL:
print("Execution time: %.6fs" % (execution_time,))
if _request_information is not None and _request_information.get("save", False):
save_query(prepared_sql, execution_time)
result = sync_execute(query, args, settings=settings, with_column_types=with_column_types)
redis_client.set(key, _serialize(result), ex=ttl)
return result

def substitute_params(query, params):
"""
Helper method to ease rendering of sql clickhouse queries progressively.
For example, there are many places where we construct queries to be used
as subqueries of others. Each time we generate a subquery we also pass
up the "bound" parameters to be used to render the query, which
otherwise only happens at the point of calling clickhouse via the
clickhouse_driver `Client`.

This results in sometimes large lists of parameters with no relevance to
the containing query being passed up. Rather than do this, we can
instead "render" the subqueries prior to using as a subquery, so our
containing code is only responsible for it's parameters, and we can
avoid any potential param collisions.
"""
return cast(SyncClient, ch_client).substitute_params(query, params)
def sync_execute(query, args=None, settings=None, with_column_types=False):
with ch_pool.get_client() as client:
start_time = perf_counter()

prepared_sql, prepared_args, tags = _prepare_query(client=client, query=query, args=args)

timeout_task = QUERY_TIMEOUT_THREAD.schedule(_notify_of_slow_query_failure, tags)

try:
result = client.execute(
prepared_sql, params=prepared_args, settings=settings, with_column_types=with_column_types
)
except Exception as err:
err = wrap_query_error(err)
tags["failed"] = True
tags["reason"] = type(err).__name__
incr("clickhouse_sync_execution_failure", tags=tags)

raise err
finally:
execution_time = perf_counter() - start_time

QUERY_TIMEOUT_THREAD.cancel(timeout_task)
timing("clickhouse_sync_execution_time", execution_time * 1000.0, tags=tags)

if app_settings.SHELL_PLUS_PRINT_SQL:
print("Execution time: %.6fs" % (execution_time,))
if _request_information is not None and _request_information.get("save", False):
save_query(prepared_sql, execution_time)
return result


def substitute_params(query, params):
"""
Helper method to ease rendering of sql clickhouse queries progressively.
For example, there are many places where we construct queries to be used
as subqueries of others. Each time we generate a subquery we also pass
up the "bound" parameters to be used to render the query, which
otherwise only happens at the point of calling clickhouse via the
clickhouse_driver `Client`.

This results in sometimes large lists of parameters with no relevance to
the containing query being passed up. Rather than do this, we can
instead "render" the subqueries prior to using as a subquery, so our
containing code is only responsible for it's parameters, and we can
avoid any potential param collisions.
"""
return cast(SyncClient, ch_client).substitute_params(query, params)


def _prepare_query(client: SyncClient, query: str, args: QueryArgs):
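
Because the removed and added lines are interleaved above, here is a hedged sketch of the module shape that results from this hunk: the ClickHouse client, pool, and execute helpers are always defined, and the stub branch that raised ClickHouseNotConfigured when PRIMARY_DB was not ClickHouse is gone. The constant values below are placeholders for the real posthog.settings values, and the pool/timing plumbing is omitted; this is illustrative, not the literal file.

from clickhouse_driver import Client as SyncClient

# Placeholder settings; the real module imports these from posthog.settings.
CLICKHOUSE_HOST = "localhost"
CLICKHOUSE_DATABASE = "posthog"
TEST = False

# Post-commit the sync client is configured unconditionally (the async variant
# is still created when CLICKHOUSE_ASYNC is set; omitted here for brevity).
ch_client = SyncClient(
    host=CLICKHOUSE_HOST,
    database=CLICKHOUSE_DATABASE,
    settings={"mutations_sync": "1"} if TEST else {},
)


def sync_execute(query, args=None, settings=None, with_column_types=False):
    # Run the query against ClickHouse; the real function additionally goes
    # through ch_pool, timing, and error wrapping as shown in the hunk above.
    return ch_client.execute(query, args, settings=settings, with_column_types=with_column_types)


def async_execute(query, args=None, settings=None, with_column_types=False):  # type: ignore
    # After the refactor, "async" execution simply delegates to sync_execute.
    return sync_execute(query, args, settings=settings, with_column_types=with_column_types)
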
@@ -1,6 +1,5 @@
[pytest]
env =
PRIMARY_DB=clickhouse
DEBUG=1
TEST=1
DJANGO_SETTINGS_MODULE = posthog.settings
@@ -5,8 +5,7 @@ import os
from typing import Dict, List

from ee.kafka_client.topics import KAFKA_EVENTS_PLUGIN_INGESTION as DEFAULT_KAFKA_EVENTS_PLUGIN_INGESTION
from posthog.constants import AnalyticsDBMS
from posthog.settings import AUTHENTICATION_BACKENDS, PRIMARY_DB, SITE_URL, TEST, get_from_env
from posthog.settings import AUTHENTICATION_BACKENDS, SITE_URL, TEST, get_from_env
from posthog.utils import print_warning, str_to_bool

# Zapier REST hooks
@@ -72,9 +71,8 @@ AUTHENTICATION_BACKENDS = AUTHENTICATION_BACKENDS + [

SSO_ENFORCEMENT = get_from_env("SSO_ENFORCEMENT", "saml" if SAML_ENFORCED else None, optional=True)


# ClickHouse and Kafka
KAFKA_ENABLED = PRIMARY_DB == AnalyticsDBMS.CLICKHOUSE and not TEST
KAFKA_ENABLED = not TEST

# Schedule to run column materialization on. Follows crontab syntax.
# Use empty string to prevent from materializing
@@ -1,7 +1,6 @@
{
"results": [
{ "key": "posthog_version", "metric": "PostHog version", "value": "1.23.1" },
{ "key": "analytics_database", "metric": "Analytics database in use", "value": "Postgres" },
{ "key": "ingestion_server", "metric": "Event ingestion via", "value": "Plugin Server" },
{ "key": "plugin_sever_alive", "metric": "Plugin server alive", "value": true },
{ "key": "plugin_sever_version", "metric": "Plugin server version", "value": "0.15.5" },
@@ -1282,7 +1282,6 @@ export interface PreflightStatus {
demo: boolean
celery: boolean
realm: Realm
db_backend?: 'postgres' | 'clickhouse'
available_social_auth_providers: AuthBackends
available_timezones?: Record<string, number>
opt_out_capture?: boolean
@@ -58,8 +58,8 @@
"arm64:services:start": "docker-compose -f docker-compose.arm64.yml up zookeeper kafka clickhouse",
"arm64:services:stop": "docker-compose -f docker-compose.arm64.yml down",
"arm64:services:clean": "docker-compose -f docker-compose.arm64.yml rm -v zookeeper kafka clickhouse",
"dev:migrate:postgres": "export DEBUG=1 PRIMARY_DB=clickhouse && source env/bin/activate && python manage.py migrate",
"dev:migrate:clickhouse": "export DEBUG=1 PRIMARY_DB=clickhouse && source env/bin/activate && python manage.py migrate_clickhouse"
"dev:migrate:postgres": "export DEBUG=1 && source env/bin/activate && python manage.py migrate",
"dev:migrate:clickhouse": "export DEBUG=1 && source env/bin/activate && python manage.py migrate_clickhouse"
},
"dependencies": {
"@babel/core": "^7.10.4",
@@ -31,7 +31,7 @@
"prettier:check": "prettier --check .",
"prepare": "yarn protobuf:compile",
"prepublishOnly": "yarn build",
"setup:dev:clickhouse": "cd .. && export DEBUG=1 PRIMARY_DB=clickhouse && python manage.py migrate_clickhouse",
"setup:dev:clickhouse": "cd .. && export DEBUG=1 && python manage.py migrate_clickhouse",
"setup:test": "cd .. && export TEST=1 && python manage.py setup_test_environment",
"services:start": "cd .. && docker-compose -f docker-compose.dev.yml up zookeeper kafka clickhouse",
"services:stop": "cd .. && docker-compose -f docker-compose.dev.yml down",
@@ -53,10 +53,6 @@ class InstanceStatusViewSet(viewsets.ViewSet):
}
)

metrics.append(
{"key": "analytics_database", "metric": "Analytics database in use", "value": "ClickHouse",}
)

metrics.append(
{"key": "plugin_sever_alive", "metric": "Plugin server alive", "value": is_plugin_server_alive()}
)
@@ -5,7 +5,6 @@ from constance.test import override_config
from django.utils import timezone
from rest_framework import status

from posthog.constants import AnalyticsDBMS
from posthog.models.organization import Organization, OrganizationInvite
from posthog.test.base import APIBaseTest
from posthog.version import VERSION
@@ -24,7 +23,7 @@ class TestPreflight(APIBaseTest):
For security purposes, the information contained in an unauthenticated preflight request is minimal.
"""
self.client.logout()
with self.settings(PRIMARY_DB=AnalyticsDBMS.CLICKHOUSE, MULTI_TENANCY=False):
with self.settings(MULTI_TENANCY=False):
response = self.client.get("/_preflight/")

self.assertEqual(response.status_code, status.HTTP_200_OK)
@@ -53,9 +52,7 @@ class TestPreflight(APIBaseTest):

def test_preflight_request(self):
with self.settings(
PRIMARY_DB=AnalyticsDBMS.CLICKHOUSE,
MULTI_TENANCY=False,
INSTANCE_PREFERENCES=self.instance_preferences(debug_queries=True),
MULTI_TENANCY=False, INSTANCE_PREFERENCES=self.instance_preferences(debug_queries=True),
):
response = self.client.get("/_preflight/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
@@ -74,7 +71,6 @@ class TestPreflight(APIBaseTest):
"cloud": False,
"demo": False,
"realm": "hosted-clickhouse",
"db_backend": "clickhouse",
"available_social_auth_providers": {
"google-oauth2": False,
"github": False,
@@ -100,7 +96,7 @@ class TestPreflight(APIBaseTest):

self.client.logout() # make sure it works anonymously

with self.settings(MULTI_TENANCY=True, PRIMARY_DB=AnalyticsDBMS.CLICKHOUSE):
with self.settings(MULTI_TENANCY=True):
response = self.client.get("/_preflight/")
self.assertEqual(response.status_code, status.HTTP_200_OK)

@@ -129,7 +125,7 @@ class TestPreflight(APIBaseTest):

@pytest.mark.ee
def test_cloud_preflight_request(self):
with self.settings(MULTI_TENANCY=True, PRIMARY_DB=AnalyticsDBMS.CLICKHOUSE, SITE_URL="https://app.posthog.com"):
with self.settings(MULTI_TENANCY=True, SITE_URL="https://app.posthog.com"):
response = self.client.get("/_preflight/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
response = response.json()
@@ -147,7 +143,6 @@ class TestPreflight(APIBaseTest):
"cloud": True,
"demo": False,
"realm": "cloud",
"db_backend": "clickhouse",
"available_social_auth_providers": {
"google-oauth2": False,
"github": False,
@@ -174,7 +169,6 @@ class TestPreflight(APIBaseTest):
SOCIAL_AUTH_GOOGLE_OAUTH2_KEY="test_key",
SOCIAL_AUTH_GOOGLE_OAUTH2_SECRET="test_secret",
MULTI_TENANCY=True,
PRIMARY_DB=AnalyticsDBMS.CLICKHOUSE,
INSTANCE_PREFERENCES=self.instance_preferences(disable_paid_fs=True),
):
response = self.client.get("/_preflight/")
@@ -194,7 +188,6 @@ class TestPreflight(APIBaseTest):
"cloud": True,
"demo": False,
"realm": "cloud",
"db_backend": "clickhouse",
"available_social_auth_providers": {
"google-oauth2": True,
"github": False,
@@ -218,7 +211,7 @@ class TestPreflight(APIBaseTest):
def test_demo(self):
self.client.logout() # make sure it works anonymously

with self.settings(PRIMARY_DB=AnalyticsDBMS.CLICKHOUSE, DEMO=True):
with self.settings(DEMO=True):
response = self.client.get("/_preflight/")

self.assertEqual(response.status_code, status.HTTP_200_OK)
@@ -257,7 +250,7 @@ class TestPreflight(APIBaseTest):

self.client.logout() # make sure it works anonymously

with self.settings(PRIMARY_DB=AnalyticsDBMS.CLICKHOUSE, SAML_CONFIGURED=True):
with self.settings(SAML_CONFIGURED=True):
response = self.client.get("/_preflight/")
self.assertEqual(response.status_code, status.HTTP_200_OK)
@@ -1,10 +1,7 @@
# Generated by Django 3.2.5 on 2021-11-30 08:23
import django.utils.timezone
from django.conf import settings
from django.db import migrations, models

from posthog.constants import AnalyticsDBMS


def set_created_at(apps, schema_editor):

@@ -16,25 +13,15 @@ def set_created_at(apps, schema_editor):
EventDefinition = apps.get_model("posthog", "EventDefinition")
for instance in EventDefinition.objects.filter(created_at=None):
created_at = None
if settings.PRIMARY_DB == AnalyticsDBMS.POSTGRES:
Event = apps.get_model("posthog", "Event")
event_instance = (
Event.objects.select("timestamp")
.filter(team=instance.team, name=instance.name)
.order_by("timestamp")
.first()
result = None
if sync_execute:
result = sync_execute(
"SELECT timestamp FROM events where team_id=%(team_id)s AND event=%(event)s"
" order by timestamp limit 1",
{"team_id": instance.team.pk, "event": instance.name,},
)
if event_instance:
created_at = event_instance.timestamp
else:
if sync_execute:
result = sync_execute(
"SELECT timestamp FROM events where team_id=%(team_id)s AND event=%(event)s"
" order by timestamp limit 1",
{"team_id": instance.team.pk, "event": instance.name,},
)
if result:
created_at = result[0][0]
if result:
created_at = result[0][0]

if created_at:
instance.created_at = created_at
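
Since the removed Postgres branch and the new ClickHouse-only lookup are interleaved above, here is a hedged sketch of set_created_at after this hunk, reconstructed from the lines shown; how sync_execute is imported and guarded higher up in the migration is an assumption, not quoted source.

def set_created_at(apps, schema_editor):
    try:
        from ee.clickhouse.client import sync_execute
    except ImportError:  # assumption: tolerate EE code being unavailable
        sync_execute = None

    EventDefinition = apps.get_model("posthog", "EventDefinition")
    for instance in EventDefinition.objects.filter(created_at=None):
        created_at = None
        result = None
        if sync_execute:
            # Earliest event timestamp for this definition, always from ClickHouse now.
            result = sync_execute(
                "SELECT timestamp FROM events where team_id=%(team_id)s AND event=%(event)s"
                " order by timestamp limit 1",
                {"team_id": instance.team.pk, "event": instance.name},
            )
        if result:
            created_at = result[0][0]

        if created_at:
            instance.created_at = created_at
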
@@ -2,15 +2,14 @@ from datetime import timedelta

from kombu import Exchange, Queue

from posthog.constants import AnalyticsDBMS
from posthog.settings.base_variables import TEST
from posthog.settings.data_stores import PRIMARY_DB, REDIS_URL
from posthog.settings.data_stores import REDIS_URL

# Only listen to the default queue "celery", unless overridden via the cli
# NB! This is set to explicitly exclude the "posthog-plugins" queue, handled by a nodejs process
CELERY_QUEUES = (Queue("celery", Exchange("celery"), "celery"),)
CELERY_DEFAULT_QUEUE = "celery"
CELERY_IMPORTS = []
CELERY_IMPORTS = ["ee.tasks.materialized_columns"]
CELERY_BROKER_URL = REDIS_URL # celery connects to redis
CELERY_BEAT_MAX_LOOP_INTERVAL = 30 # sleep max 30sec before checking for new periodic events
CELERY_RESULT_BACKEND = REDIS_URL # stores results for lookup when processing
@@ -18,9 +17,6 @@ CELERY_IGNORE_RESULT = True # only applies to delay(), must do @shared_task(ign
CELERY_RESULT_EXPIRES = timedelta(days=4) # expire tasks after 4 days instead of the default 1
REDBEAT_LOCK_TIMEOUT = 45 # keep distributed beat lock for 45sec

if PRIMARY_DB == AnalyticsDBMS.CLICKHOUSE:
CELERY_IMPORTS.append("ee.tasks.materialized_columns")

if TEST:
import celery
@@ -4,7 +4,6 @@ from urllib.parse import urlparse
import dj_database_url
from django.core.exceptions import ImproperlyConfigured

from posthog.constants import AnalyticsDBMS
from posthog.settings.base_variables import DEBUG, IS_COLLECT_STATIC, TEST
from posthog.settings.utils import get_from_env, str_to_bool

@@ -116,8 +115,6 @@ KAFKA_HOSTS_FOR_CLICKHOUSE = _parse_kafka_hosts(os.getenv("KAFKA_URL_FOR_CLICKHO

KAFKA_BASE64_KEYS = get_from_env("KAFKA_BASE64_KEYS", False, type_cast=str_to_bool)

PRIMARY_DB = AnalyticsDBMS.CLICKHOUSE

# The last case happens when someone upgrades Heroku but doesn't have Redis installed yet. Collectstatic gets called before we can provision Redis.
if TEST or DEBUG or IS_COLLECT_STATIC:
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost/")
@@ -36,12 +36,3 @@ if runner:
print("Running in test mode. Setting DEBUG and TEST environment variables.")
os.environ["DEBUG"] = "1"
os.environ["TEST"] = "1"

try:
path = sys.argv[2] if cmd == "test" else sys.argv[3]
if path.startswith("ee"):
print("Running EE tests. Setting clickhouse as primary database.")
os.environ["PRIMARY_DB"] = "clickhouse"
except IndexError:
# there was no path, we don't want to set PRIMARY_DB
pass
@@ -1,6 +1,4 @@
from posthog.constants import AnalyticsDBMS
from posthog.settings.base_variables import DEBUG, IS_COLLECT_STATIC, TEST
from posthog.settings.data_stores import PRIMARY_DB
from posthog.settings.utils import get_from_env, print_warning, str_to_bool
from posthog.version_requirement import ServiceVersionRequirement

@@ -14,9 +12,5 @@ if SKIP_SERVICE_VERSION_REQUIREMENTS and not (TEST or DEBUG):
SERVICE_VERSION_REQUIREMENTS = [
ServiceVersionRequirement(service="postgresql", supported_version=">=11.0.0,<=14.1.0",),
ServiceVersionRequirement(service="redis", supported_version=">=5.0.0,<=6.3.0",),
ServiceVersionRequirement(service="clickhouse", supported_version=">=21.6.0,<21.12.0"),
]

if PRIMARY_DB == AnalyticsDBMS.CLICKHOUSE:
SERVICE_VERSION_REQUIREMENTS = SERVICE_VERSION_REQUIREMENTS + [
ServiceVersionRequirement(service="clickhouse", supported_version=">=21.6.0,<21.12.0"),
]
@@ -1,5 +1,3 @@
from posthog.constants import AnalyticsDBMS
from posthog.settings.data_stores import PRIMARY_DB
from posthog.settings.utils import get_from_env, str_to_bool

# shell_plus settings
@@ -11,5 +9,4 @@ SHELL_PLUS_POST_IMPORTS = [
("posthog.models.property", ("Property",)),
]

if PRIMARY_DB == AnalyticsDBMS.CLICKHOUSE:
SHELL_PLUS_POST_IMPORTS.append(("ee.clickhouse.client", ("sync_execute",)))
SHELL_PLUS_POST_IMPORTS.append(("ee.clickhouse.client", ("sync_execute",)))
@@ -9,7 +9,6 @@ from freezegun import freeze_time
from ee.clickhouse.models.event import create_event
from ee.clickhouse.models.group import create_group
from ee.clickhouse.util import ClickhouseTestMixin
from posthog.constants import AnalyticsDBMS
from posthog.models import Organization, Person, Team, User
from posthog.models.group_type_mapping import GroupTypeMapping
from posthog.models.organization import OrganizationMembership
@@ -74,7 +73,7 @@ class TestOrganizationUsageReport(APIBaseTest, ClickhouseTestMixin):
self.assertEqual(org_report["group_types_total"], 0)
self.assertEqual(org_report["event_count_with_groups_month"], 0)

with self.settings(USE_TZ=False, PRIMARY_DB=AnalyticsDBMS.CLICKHOUSE):
with self.settings(USE_TZ=False):
with freeze_time("2020-11-02"):
_create_person("old_user1", team=default_team)
_create_person("old_user2", team=default_team)
@@ -144,55 +144,54 @@ class TestUpdateCache(APIBaseTest):
}
)

with self.settings(PRIMARY_DB="clickhouse"):
filter = base_filter
funnel_mock.return_value.run.return_value = {}
update_cache_item(
generate_cache_key("{}_{}".format(filter.toJSON(), self.team.pk)),
CacheType.FUNNEL,
{"filter": filter.toJSON(), "team_id": self.team.pk,},
)
funnel_mock.assert_called_once()
filter = base_filter
funnel_mock.return_value.run.return_value = {}
update_cache_item(
generate_cache_key("{}_{}".format(filter.toJSON(), self.team.pk)),
CacheType.FUNNEL,
{"filter": filter.toJSON(), "team_id": self.team.pk,},
)
funnel_mock.assert_called_once()

# trends funnel
filter = base_filter.with_data({"funnel_viz_type": "trends"})
funnel_trends_mock.return_value.run.return_value = {}
update_cache_item(
generate_cache_key("{}_{}".format(filter.toJSON(), self.team.pk)),
CacheType.FUNNEL,
{"filter": filter.toJSON(), "team_id": self.team.pk,},
)
funnel_trends_mock.assert_called_once()
# trends funnel
filter = base_filter.with_data({"funnel_viz_type": "trends"})
funnel_trends_mock.return_value.run.return_value = {}
update_cache_item(
generate_cache_key("{}_{}".format(filter.toJSON(), self.team.pk)),
CacheType.FUNNEL,
{"filter": filter.toJSON(), "team_id": self.team.pk,},
)
funnel_trends_mock.assert_called_once()

# time to convert funnel
filter = base_filter.with_data({"funnel_viz_type": "time_to_convert", "funnel_order_type": "strict"})
funnel_time_to_convert_mock.return_value.run.return_value = {}
update_cache_item(
generate_cache_key("{}_{}".format(filter.toJSON(), self.team.pk)),
CacheType.FUNNEL,
{"filter": filter.toJSON(), "team_id": self.team.pk,},
)
funnel_time_to_convert_mock.assert_called_once()
# time to convert funnel
filter = base_filter.with_data({"funnel_viz_type": "time_to_convert", "funnel_order_type": "strict"})
funnel_time_to_convert_mock.return_value.run.return_value = {}
update_cache_item(
generate_cache_key("{}_{}".format(filter.toJSON(), self.team.pk)),
CacheType.FUNNEL,
{"filter": filter.toJSON(), "team_id": self.team.pk,},
)
funnel_time_to_convert_mock.assert_called_once()

# strict funnel
filter = base_filter.with_data({"funnel_order_type": "strict"})
funnel_strict_mock.return_value.run.return_value = {}
update_cache_item(
generate_cache_key("{}_{}".format(filter.toJSON(), self.team.pk)),
CacheType.FUNNEL,
{"filter": filter.toJSON(), "team_id": self.team.pk,},
)
funnel_strict_mock.assert_called_once()
# strict funnel
filter = base_filter.with_data({"funnel_order_type": "strict"})
funnel_strict_mock.return_value.run.return_value = {}
update_cache_item(
generate_cache_key("{}_{}".format(filter.toJSON(), self.team.pk)),
CacheType.FUNNEL,
{"filter": filter.toJSON(), "team_id": self.team.pk,},
)
funnel_strict_mock.assert_called_once()

# unordered funnel
filter = base_filter.with_data({"funnel_order_type": "unordered"})
funnel_unordered_mock.return_value.run.return_value = {}
update_cache_item(
generate_cache_key("{}_{}".format(filter.toJSON(), self.team.pk)),
CacheType.FUNNEL,
{"filter": filter.toJSON(), "team_id": self.team.pk,},
)
funnel_unordered_mock.assert_called_once()
# unordered funnel
filter = base_filter.with_data({"funnel_order_type": "unordered"})
funnel_unordered_mock.return_value.run.return_value = {}
update_cache_item(
generate_cache_key("{}_{}".format(filter.toJSON(), self.team.pk)),
CacheType.FUNNEL,
{"filter": filter.toJSON(), "team_id": self.team.pk,},
)
funnel_unordered_mock.assert_called_once()

def _test_refresh_dashboard_cache_types(
self, filter: FilterType, cache_type: CacheType, patch_update_cache_item: MagicMock,
@@ -111,7 +111,6 @@ def preflight_check(request: HttpRequest) -> JsonResponse:
if request.user.is_authenticated:
response = {
**response,
"db_backend": settings.PRIMARY_DB.value,
"available_timezones": get_available_timezones_with_offsets(),
"opt_out_capture": os.environ.get("OPT_OUT_CAPTURE", False),
"posthog_version": VERSION,
@@ -72,10 +72,6 @@
"name": "KAFKA_BASE64_KEYS",
"value": "True"
},
{
"name": "PRIMARY_DB",
"value": "clickhouse"
},
{
"name": "SITE_URL",
"value": "https://app.posthog.com"

@@ -78,10 +78,6 @@
"name": "KAFKA_BASE64_KEYS",
"value": "True"
},
{
"name": "PRIMARY_DB",
"value": "clickhouse"
},
{
"name": "SITE_URL",
"value": "https://app.posthog.com"

@@ -72,10 +72,6 @@
"name": "KAFKA_BASE64_KEYS",
"value": "True"
},
{
"name": "PRIMARY_DB",
"value": "clickhouse"
},
{
"name": "SITE_URL",
"value": "https://app.posthog.com"