0
0
mirror of https://github.com/PostHog/posthog.git synced 2024-11-30 19:41:46 +01:00
posthog/ee/kafka_client/client.py
James Greenhill ed6eb5e796
Setup ecs configs for web, worker, migration tasks and services (#2458)
* add worker to the ecs config and deploy

* for testing

* pull from this branch for testing

* chain config renders

* split out events pipe

* Set is_heroku true because of heroku kafka

* update /e/ service to run on port 8001

* add 8001 to the container definition as well

* simplify

* test migrating w/ ecs task using aws cli

* split services

* typo in task def

* remove networkConfiguration from task definition

* duplicate

* task-def-web specific

* update events service name

* Handle base64 encoded kafka certs

* if it's empty then try to set it for env vars

* fix b64 decode call

* cleanups

* enable base64 encoding of keys for kafka

* depend on kafka-helper for deps

* reformat

* sort imports

* type fixes

* it's late, I can't type. typos.

* use get_bool_from_env

* remove debug bits. Trigger on master/main

* prettier my yaml

* add notes about ref in GA

* up cpu and memory
2020-12-03 15:51:37 -08:00

93 lines
2.8 KiB
Python

import io
import json
from typing import Any, Callable, Dict, Optional
import kafka_helper
from google.protobuf.internal.encoder import _VarintBytes # type: ignore
from google.protobuf.json_format import MessageToJson
from kafka import KafkaProducer as KP
from ee.clickhouse.client import async_execute, sync_execute
from ee.kafka_client import helper
from ee.settings import KAFKA_ENABLED
from posthog.settings import IS_HEROKU, KAFKA_BASE64_KEYS, KAFKA_HOSTS, TEST
from posthog.utils import SingletonDecorator
class TestKafkaProducer:
    """No-op stand-in for a Kafka producer, used when running under TEST.

    Implements the subset of the kafka-python producer interface that
    `_KafkaProducer` relies on (``send`` and ``flush``) while doing nothing.
    """

    def __init__(self):
        # Nothing to set up: this producer never contacts a broker.
        pass

    def send(self, topic: str, data: Any):
        # Drop the message on the floor; tests don't need real delivery.
        return

    def flush(self):
        # Nothing is ever buffered, so there is nothing to flush.
        return
class _KafkaProducer:
def __init__(self):
if TEST:
self.producer = TestKafkaProducer()
elif IS_HEROKU:
self.producer = kafka_helper.get_kafka_producer(value_serializer=lambda d: d)
elif KAFKA_BASE64_KEYS:
self.producer = helper.get_kafka_producer(value_serializer=lambda d: d)
else:
self.producer = KP(bootstrap_servers=KAFKA_HOSTS)
@staticmethod
def json_serializer(d):
b = json.dumps(d).encode("utf-8")
return b
def produce(self, topic: str, data: Any, value_serializer: Optional[Callable[[Any], Any]] = None):
if not value_serializer:
value_serializer = self.json_serializer
b = value_serializer(data)
self.producer.send(topic, b)
def close(self):
self.producer.flush()
# Process-wide singleton: every caller of KafkaProducer() shares one underlying producer.
KafkaProducer = SingletonDecorator(_KafkaProducer)
class ClickhouseProducer:
    """Routes event writes either to Kafka (when enabled) or straight to ClickHouse.

    When KAFKA_ENABLED is set, payloads are published to the given Kafka topic
    and the SQL argument is ignored; otherwise the SQL is executed against
    ClickHouse with the payload bound as query parameters.
    """

    def __init__(self):
        if KAFKA_ENABLED:
            self.send_to_kafka = True
            self.producer = KafkaProducer()
        else:
            self.send_to_kafka = False

    @staticmethod
    def _execute(sql: str, params: Any, sync: bool):
        # Shared ClickHouse dispatch (previously duplicated in both produce
        # methods): synchronous, or fire-and-forget via async_execute.
        if sync:
            sync_execute(sql, params)
        else:
            async_execute(sql, params)

    @staticmethod
    def proto_length_serializer(data: Any) -> bytes:
        """Frame a protobuf message as varint(byte length) + serialized bytes."""
        f = io.BytesIO()
        f.write(_VarintBytes(data.ByteSize()))
        f.write(data.SerializeToString())
        return f.getvalue()

    def produce_proto(self, sql: str, topic: str, data: Any, sync: bool = True):
        """Send a protobuf message to Kafka, or insert it into ClickHouse via ``sql``.

        On the ClickHouse path the message is converted to a dict (default
        fields included, original proto field names preserved) and bound as
        query parameters.
        """
        if self.send_to_kafka:
            self.producer.produce(topic=topic, data=data, value_serializer=self.proto_length_serializer)
        else:
            dict_data = json.loads(
                MessageToJson(data, including_default_value_fields=True, preserving_proto_field_name=True)
            )
            self._execute(sql, dict_data, sync)

    def produce(self, sql: str, topic: str, data: Dict[str, Any], sync: bool = True):
        """Send a dict payload to Kafka, or insert it into ClickHouse via ``sql``."""
        if self.send_to_kafka:
            self.producer.produce(topic=topic, data=data)
        else:
            self._execute(sql, data, sync)