mirror of
https://github.com/PostHog/posthog.git
synced 2024-12-01 04:12:23 +01:00
b56b141126
* add special migration definition and example * types * special migrations runner * fix tests * fix tests 2 * add clickhouse runner * add temp fix for tests * wip * add special migrations api (#7448) * wip new structure * update example sourcing * Update .gitignore * yet another wip structure * code quality * cypress * test docker image build * implement resumable ops * code quality * add comments * add warning * add conditional requirements for migration * add comment on is_required * add dependency map * wip dependencies and run migration on startup * code quality * fix bugs * fix more bugs * format * types * remove api from this branch * types * types * update clickhouse script * add is_migration_in_range util * fix type * fix runner * add AUTO_START_SPECIAL_MIGRATIONS env var * reset migration on start * cleanup * wip per op rollback * prevent accidental status rollback * add utils and definition test * update example with rollback per op * wip test special migration * add first runner tests * add runner tests * add util for code paths * fix test * fix types * fix types again * cleanup * cleanup * add periodic healthcheck task tests * remove unused imports * safer row updates * fix coalescing none checks * code quality * add docstrings * fix * fix deploys issue * update scripts * add delay * address reviews * address review comments * address review comments * address final comments * fix import error * fix tests * remove unused imports * fix tests * fix task test * remove unused return value * remove unused special migrations code from migrate_clickhouse * tweaks to support fresh deployments
89 lines
3.4 KiB
Python
89 lines
3.4 KiB
Python
import datetime
|
|
from textwrap import indent
|
|
|
|
from django.core.management.base import BaseCommand
|
|
from infi.clickhouse_orm import Database
|
|
from infi.clickhouse_orm.migrations import MigrationHistory
|
|
from infi.clickhouse_orm.utils import import_submodules
|
|
|
|
from posthog.settings import (
|
|
CLICKHOUSE_DATABASE,
|
|
CLICKHOUSE_HTTP_URL,
|
|
CLICKHOUSE_PASSWORD,
|
|
CLICKHOUSE_REPLICATION,
|
|
CLICKHOUSE_USER,
|
|
)
|
|
|
|
# Dotted path of the package whose submodules are the ClickHouse migrations;
# also stored as the package name on faked MigrationHistory rows.
MIGRATIONS_PACKAGE_NAME = "ee.clickhouse.migrations"
|
|
|
|
|
|
class Command(BaseCommand):
    """Management command that applies, fakes, or previews ClickHouse migrations."""

    help = "Migrate clickhouse"

    def add_arguments(self, parser):
        """Register the ``--upto``, ``--fake``, ``--plan`` and ``--print-sql`` options."""
        parser.add_argument(
            "--upto", default=99_999, type=int, help="Database state will be brought to the state after that migration."
        )
        parser.add_argument("--fake", action="store_true", help="Mark migrations as run without actually running them.")
        parser.add_argument(
            "--plan", action="store_true", help="Shows a list of the migration actions that will be performed."
        )
        parser.add_argument(
            "--print-sql",
            action="store_true",
            help="Only use with --plan. Also prints SQL for each migration to be applied.",
        )

    def handle(self, *args, **options):
        """Entry point: run :meth:`migrate` against ``CLICKHOUSE_HTTP_URL``."""
        self.migrate(CLICKHOUSE_HTTP_URL, options)

    def migrate(self, host, options):
        """Plan, fake, or apply the pending migrations on the database at *host*.

        Args:
            host: URL passed to ``Database`` as ``db_url``.
            options: Parsed command options; the keys ``plan``, ``fake``,
                ``upto`` and ``print_sql`` are read.

        ``plan`` takes precedence over ``fake``; when neither is set the
        migrations are applied through ``Database.migrate``.
        """
        database = Database(
            CLICKHOUSE_DATABASE,
            db_url=host,
            username=CLICKHOUSE_USER,
            password=CLICKHOUSE_PASSWORD,
            # NOTE(review): certificate verification is switched off here;
            # confirm this is intended for every deployment.
            verify_ssl_cert=False,
        )

        if options["plan"]:
            print("List of clickhouse migrations to be applied:")
            # Materialised because the list is both iterated and tested for emptiness.
            migrations = list(self.get_migrations(database, options["upto"]))
            for migration_name, operations in migrations:
                print(f"Migration would get applied: {migration_name}")
                if not options["print_sql"]:
                    continue
                for op in operations:
                    # Not every operation necessarily carries SQL; default to
                    # None so such operations are skipped instead of raising
                    # AttributeError.
                    sql = getattr(op, "_sql", None)
                    if sql is not None:
                        print(indent("\n\n".join(sql), " "))
            if not migrations:
                print("Clickhouse migrations up to date!")
        elif options["fake"]:
            # Insert a MigrationHistory row for each pending migration instead
            # of running its operations.
            for migration_name, _ in self.get_migrations(database, options["upto"]):
                print(f"Faked migration: {migration_name}")
                database.insert(
                    [
                        MigrationHistory(
                            package_name=MIGRATIONS_PACKAGE_NAME,
                            module_name=migration_name,
                            applied=datetime.date.today(),
                        )
                    ]
                )
            print("Migrations done")
        else:
            database.migrate(MIGRATIONS_PACKAGE_NAME, options["upto"], replicated=CLICKHOUSE_REPLICATION)
            print("✅ Migration successful")

    def get_migrations(self, database, upto):
        """Yield ``(migration_name, operations)`` for each unapplied migration.

        Migrations are visited in sorted name order. Iteration stops after the
        first migration whose leading four characters, parsed as an integer,
        are greater than or equal to *upto*; that migration is itself yielded.

        Args:
            database: Database handle used to look up applied migrations.
            upto: Numeric migration prefix at which to stop.
        """
        modules = import_submodules(MIGRATIONS_PACKAGE_NAME)
        applied_migrations = self.get_applied_migrations(database)
        unapplied_migrations = set(modules.keys()) - applied_migrations

        for migration_name in sorted(unapplied_migrations):
            yield migration_name, modules[migration_name].operations

            # The cutoff is checked after yielding.
            # NOTE(review): presumably this mirrors the cutoff used by
            # Database.migrate so the plan matches what gets applied; confirm.
            if int(migration_name[:4]) >= upto:
                break

    def get_applied_migrations(self, database):
        """Return the migrations of this package already recorded as applied.

        Delegates to the private ``Database._get_applied_migrations`` helper;
        the result is subtracted from a set of module names by the caller.
        """
        return database._get_applied_migrations(MIGRATIONS_PACKAGE_NAME, replicated=CLICKHOUSE_REPLICATION)
|