mirror of https://github.com/PostHog/posthog.git synced 2024-11-28 18:26:15 +01:00
posthog/ee/kafka_client/helper.py
James Greenhill ed6eb5e796
Setup ecs configs for web, worker, migration tasks and services (#2458)
* add worker to the ecs config and deploy

* for testing

* pull from this branch for testing

* chain config renders

* split out events pipe

* Set is_heroku true because of heroku kafka

* update /e/ service to run on port 8001

* add 8001 to the container definition as well

* simplify

* test migrating w/ ecs task using aws cli

* split services

* typo in task def

* remove networkConfiguration from task definition

* duplicate

* task-def-web specific

* update events service name

* Handle base64 encoded kafka certs

* if it's empty then try to set it for env vars

* fix b64 decode call

* cleanups

* enable base64 encoding of keys for kafka

* depend on kafka-helper for deps

* reformat

* sort imports

* type fixes

* it's late, I can't type. typos.

* use get_bool_from_env

* remove debug bits. Trigger on master/main

* prettier my yaml

* add notes about ref in GA

* up cpu and memory
2020-12-03 15:51:37 -08:00


"""
Helper methods for creating the kafka-python KafkaProducer and KafkaConsumer objects.
https://github.com/heroku/kafka-helper
"""
import base64
import json
import os
import ssl
from base64 import standard_b64encode
from tempfile import NamedTemporaryFile

try:
    from urllib.parse import urlparse
except ImportError:
    from urlparse import urlparse  # type: ignore

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from kafka import KafkaConsumer, KafkaProducer

def get_kafka_ssl_context():
    """
    Returns an SSL context based on the certificate information in the Kafka config vars.
    """
    # NOTE: We assume that the Kafka environment variables are present. If using
    # Apache Kafka on Heroku, they will be available in your app configuration.
    #
    # 1. Write the PEM certificates needed for connecting to the Kafka brokers out to
    #    physical files: the broker connection SSL certs arrive in environment/config
    #    variables, but the python and ssl libraries require them as files on disk.
    #    The public certs are written to short-lived NamedTemporaryFiles; the client
    #    key is encrypted before being written to its short-lived NamedTemporaryFile.
    #
    # 2. Create and return an SSLContext for connecting to the Kafka brokers that
    #    references the PEM certificates written above.
    #
    # Stash the Kafka certs in named temporary files for loading into the SSLContext.
    # Initialize the SSLContext inside the `with` block so the files are removed as
    # soon as it goes out of scope, keeping them on disk for the shortest possible
    # time. As an extra precaution, password-protect/encrypt the client key.
    with NamedTemporaryFile(suffix=".crt") as cert_file, NamedTemporaryFile(
        suffix=".key"
    ) as key_file, NamedTemporaryFile(suffix=".crt") as trust_file:
        cert_file.write(base64.b64decode(os.environ["KAFKA_CLIENT_CERT_B64"].encode("utf-8")))
        cert_file.flush()

        # Use cryptography to password-encrypt/protect the client key so it is never
        # in the clear on the filesystem. The generated password is reused in the
        # load_cert_chain call below.
        passwd = standard_b64encode(os.urandom(33))
        private_key = serialization.load_pem_private_key(
            base64.b64decode(os.environ["KAFKA_CLIENT_CERT_KEY_B64"].encode("utf-8")),
            password=None,
            backend=default_backend(),
        )
        pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(passwd),
        )
        key_file.write(pem)
        key_file.flush()

        trust_file.write(base64.b64decode(os.environ["KAFKA_TRUSTED_CERT_B64"].encode("utf-8")))
        trust_file.flush()

        # Create an SSLContext for the kafka client using create_default_context,
        # which (with purpose=SERVER_AUTH) creates an SSLContext with protocol set to
        # PROTOCOL_SSLv23 plus OP_NO_SSLv2 and OP_NO_SSLv3.
        ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=trust_file.name)
        ssl_context.load_cert_chain(cert_file.name, keyfile=key_file.name, password=passwd)

        # Intentionally disable hostname checking: the Kafka cluster runs in the cloud
        # and Apache Kafka on Heroku doesn't currently provide stable hostnames, so we
        # pin to a specific certificate even though it includes no host information,
        # relying on the CA trust cert instead.
        ssl_context.check_hostname = False

    return ssl_context
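
# NOTE (illustrative sketch, not part of the upstream helper): the *_B64 config
# vars consumed above are assumed to already hold base64-encoded PEM data. If a
# deployment only provides the raw PEM values from Apache Kafka on Heroku
# (KAFKA_CLIENT_CERT, KAFKA_CLIENT_CERT_KEY, KAFKA_TRUSTED_CERT), they could be
# re-encoded once at startup along these lines; the helper name is hypothetical.
def _b64_encode_kafka_certs():
    """Populate the *_B64 config vars from their raw PEM counterparts (sketch)."""
    for raw, encoded in [
        ("KAFKA_CLIENT_CERT", "KAFKA_CLIENT_CERT_B64"),
        ("KAFKA_CLIENT_CERT_KEY", "KAFKA_CLIENT_CERT_KEY_B64"),
        ("KAFKA_TRUSTED_CERT", "KAFKA_TRUSTED_CERT_B64"),
    ]:
        # Only fill the encoded variant when it is empty, mirroring the
        # "if it's empty then try to set it" behaviour noted in the commit log.
        if os.environ.get(raw) and not os.environ.get(encoded):
            os.environ[encoded] = base64.b64encode(os.environ[raw].encode("utf-8")).decode("utf-8")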

def get_kafka_brokers():
    """
    Parses KAFKA_URL and returns a list of hostname:port pairs in the format
    that kafka-python expects.
    """
    # NOTE: The Kafka environment variables need to be present. If using
    # Apache Kafka on Heroku, they will be available in your app configuration.
    if not os.environ.get("KAFKA_URL"):
        raise RuntimeError("The KAFKA_URL config variable is not set.")

    return [
        "{}:{}".format(parsed_url.hostname, parsed_url.port)
        for parsed_url in [urlparse(url) for url in os.environ.get("KAFKA_URL", "").split(",")]
    ]
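
# Example (assumed input shape): Heroku's KAFKA_URL is a comma-separated list
# of broker URLs, e.g.
#   KAFKA_URL="kafka+ssl://host1:9096,kafka+ssl://host2:9096"
# for which get_kafka_brokers() returns ["host1:9096", "host2:9096"].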

def get_kafka_producer(acks="all", value_serializer=lambda v: json.dumps(v).encode("utf-8")):
    """
    Return a KafkaProducer that uses the SSLContext created by get_kafka_ssl_context.
    """
    producer = KafkaProducer(
        bootstrap_servers=get_kafka_brokers(),
        security_protocol="SSL",
        ssl_context=get_kafka_ssl_context(),
        value_serializer=value_serializer,
        acks=acks,
    )
    return producer

def get_kafka_consumer(topic=None, value_deserializer=lambda v: json.loads(v.decode("utf-8"))):
    """
    Return a KafkaConsumer that uses the SSLContext created by get_kafka_ssl_context.
    """
    # Create the KafkaConsumer connected to the specified brokers, using the
    # SSLContext created by get_kafka_ssl_context.
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=get_kafka_brokers(),
        security_protocol="SSL",
        ssl_context=get_kafka_ssl_context(),
        value_deserializer=value_deserializer,
    )
    return consumer
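
# Minimal usage sketch: round-trip a JSON payload through Kafka. Assumes the
# Kafka config vars above are set; the "events" topic name is illustrative.
if __name__ == "__main__":
    producer = get_kafka_producer()
    producer.send("events", {"event": "ping"})
    producer.flush()

    consumer = get_kafka_consumer(topic="events")
    for message in consumer:
        print(message.value)  # the deserialized JSON payload
        break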