mirror of https://github.com/PostHog/posthog.git synced 2024-12-01 04:12:23 +01:00
posthog/ee/kafka_client/client.py
Harry Waye bc43b8685e
feat(healthchecks): add clickhouse and redis health (#8497)
* feat(healthchecks): add clickhouse as a dependency

* feat(healthchecks): add celery broker healthcheck

* feat(healthchecks): add cache healthcheck, plus some misc updates

* add missing typing

* add missing typing

* add role cast

* Add logging for kafka connectivity check

* actually add healthcheck as middleware

* fix postgres failure check

* simplify postgres migration return value

* verboise -> verbose

Co-authored-by: Guido Iaquinti <4038041+guidoiaquinti@users.noreply.github.com>

* put interface validation at top of view

* remove unused imports

* show resp.content on status failures

* test(healthchecks): set log level to debug so pytest displays it

* restore logger level

* Bring up redis for backend ci tests

* up redis in non-cloud tests

Co-authored-by: Guido Iaquinti <4038041+guidoiaquinti@users.noreply.github.com>
2022-02-10 15:20:38 +00:00

161 lines
4.9 KiB
Python

import io
import json
from typing import Any, Callable, Dict, Optional

import kafka.errors
from google.protobuf.internal.encoder import _VarintBytes  # type: ignore
from google.protobuf.json_format import MessageToJson
from kafka import KafkaConsumer as KC
from kafka import KafkaProducer as KP
from structlog import get_logger

from ee.clickhouse.client import async_execute, sync_execute
from ee.kafka_client import helper
from ee.settings import KAFKA_ENABLED
from posthog.settings import KAFKA_BASE64_KEYS, KAFKA_HOSTS, TEST
from posthog.utils import SingletonDecorator

KAFKA_PRODUCER_RETRIES = 5

logger = get_logger(__file__)


class TestKafkaProducer:
    def __init__(self):
        pass

    def send(self, topic: str, value: Any, key: Any = None):
        return

    def flush(self):
        return


class TestKafkaConsumer:
    def __init__(self, topic="test", max=0, **kwargs):
        self.max = max
        self.n = 0
        self.topic = topic

    def __iter__(self):
        return self

    def __next__(self):
        if self.n <= self.max:
            self.n += 1
            return f"message {self.n} from {self.topic} topic"
        else:
            raise StopIteration

    def seek_to_beginning(self):
        return

    def seek_to_end(self):
        return


class _KafkaProducer:
    def __init__(self, test=TEST):
        if test:
            self.producer = TestKafkaProducer()
        elif KAFKA_BASE64_KEYS:
            self.producer = helper.get_kafka_producer(retries=KAFKA_PRODUCER_RETRIES, value_serializer=lambda d: d)
        else:
            self.producer = KP(retries=KAFKA_PRODUCER_RETRIES, bootstrap_servers=KAFKA_HOSTS)

    @staticmethod
    def json_serializer(d):
        b = json.dumps(d).encode("utf-8")
        return b

    def produce(self, topic: str, data: Any, key: Any = None, value_serializer: Optional[Callable[[Any], Any]] = None):
        if not value_serializer:
            value_serializer = self.json_serializer
        b = value_serializer(data)
        if key is not None:
            key = key.encode("utf-8")
        # Pass the encoded key through so messages with the same key land on
        # the same partition.
        self.producer.send(topic, value=b, key=key)

    def close(self):
        self.producer.flush()
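

# Illustrative sketch, not part of the original module: producing a
# JSON-encoded message through the wrapper. With test=True the underlying
# producer is the no-op stub above, so no broker is needed.
def _example_produce_json():
    producer = _KafkaProducer(test=True)
    producer.produce(topic="events", data={"event": "pageview"}, key="team-1")
    producer.close()  # flush() on the underlying producer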


def can_connect():
    """
    This is intended to validate that we can connect to Kafka without actually
    sending any messages. I'm not amazingly pleased with this as a solution. I
    would have liked to validate that the singleton producer is connected. It
    does expose `bootstrap_connected`, but this becomes false if the cluster
    restarts, even though messages can still be sent successfully.

    I'm hoping that the load this generates on the cluster will be
    insignificant, even if it is occurring from, say, 30 separate pods every
    10 seconds.
    """
    try:
        _KafkaProducer(test=TEST)
    except kafka.errors.KafkaError:
        logger.debug("kafka_connection_failure", exc_info=True)
        return False
    return True
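

# Sketch of how this check might feed a health endpoint (the function below
# is hypothetical, not part of this module): each dependency exposes a
# can_connect-style callable and the endpoint aggregates the results.
def _example_healthcheck_payload() -> Dict[str, Any]:
    checks = {"kafka_connected": can_connect()}
    return {"healthy": all(checks.values()), **checks}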


KafkaProducer = SingletonDecorator(_KafkaProducer)


def build_kafka_consumer(
    topic: str, value_deserializer=lambda v: json.loads(v.decode("utf-8")), auto_offset_reset="latest", test=TEST
):
    if test:
        consumer = TestKafkaConsumer(topic=topic, auto_offset_reset=auto_offset_reset, max=10)
    elif KAFKA_BASE64_KEYS:
        consumer = helper.get_kafka_consumer(
            topic=topic, auto_offset_reset=auto_offset_reset, value_deserializer=value_deserializer
        )
    else:
        consumer = KC(
            topic,
            bootstrap_servers=KAFKA_HOSTS,
            auto_offset_reset=auto_offset_reset,
            value_deserializer=value_deserializer,
        )
    return consumer
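

# Usage sketch, not part of the original module: in test mode the builder
# returns the stub above, which yields a fixed number of messages and then
# stops; against a real cluster the KafkaConsumer blocks and yields records
# as they arrive.
def _example_consume(topic: str = "events"):
    consumer = build_kafka_consumer(topic=topic, test=True)
    for message in consumer:
        print(message)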


class ClickhouseProducer:
    def __init__(self, kafka_enabled=KAFKA_ENABLED):
        if kafka_enabled:
            self.send_to_kafka = True
            self.producer = KafkaProducer()
        else:
            self.send_to_kafka = False

    @staticmethod
    def proto_length_serializer(data: Any) -> bytes:
        # Length-delimited protobuf framing: prefix the serialized message
        # with a varint of its byte size.
        f = io.BytesIO()
        f.write(_VarintBytes(data.ByteSize()))
        f.write(data.SerializeToString())
        f.seek(0)
        return f.read()

    def produce_proto(self, sql: str, topic: str, data: Any, sync: bool = True):
        if self.send_to_kafka:
            self.producer.produce(topic=topic, data=data, value_serializer=self.proto_length_serializer)
        else:
            dict_data = json.loads(
                MessageToJson(data, including_default_value_fields=True, preserving_proto_field_name=True)
            )
            if sync:
                sync_execute(sql, dict_data)
            else:
                async_execute(sql, dict_data)

    def produce(self, sql: str, topic: str, data: Dict[str, Any], sync: bool = True):
        if self.send_to_kafka:
            self.producer.produce(topic=topic, data=data)
        else:
            if sync:
                sync_execute(sql, data)
            else:
                async_execute(sql, data)
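

# Usage sketch with a hypothetical statement and topic, not from the original
# module: the same call either enqueues to Kafka or, when Kafka is disabled,
# runs the INSERT directly against ClickHouse (so it needs a live database).
def _example_clickhouse_produce():
    producer = ClickhouseProducer(kafka_enabled=False)
    producer.produce(
        sql="INSERT INTO events (uuid, event) VALUES",  # hypothetical SQL
        topic="clickhouse_events",  # hypothetical topic, unused on this path
        data={"uuid": "example-uuid", "event": "pageview"},
        sync=True,
    )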