Skip to content

Commit

Permalink
Merge pull request #18 from alvindera97/migrate-away-from-mocked-test…
Browse files Browse the repository at this point in the history
…s-to-docker-powered-resources

Migrate away from mocked tests to docker powered resources
  • Loading branch information
alvindera97 authored Oct 13, 2024
2 parents 9ce4c06 + bd03238 commit 454deef
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 1,041 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_docs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
- name: Build the Sphinx docs
env:
GOOGLE_API_KEY: ${{ secrets.GOOGLE_API_KEY }}
TEST_CHAT_UUID: ${{ secrets.TEST_CHAT_UUID }}
TEST_CHAT_URL: ${{ secrets.TEST_CHAT_URL }}
run: |
# Navigate to the Sphinx documentation directory
cd docs || exit
Expand Down
8 changes: 7 additions & 1 deletion .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,16 @@ jobs:
- name: Run Black Formatter
run: |
black . --check
- name: Set Up Docker For Apache Kafka
run:
docker compose -f docker-compose.yaml up -d --wait
- name: Wait for Kafka and Zookeeper to be ready
run: |
while ! nc -z localhost 9092; do sleep 1; done
- name: Test with unittest
env:
GOOGLE_API_KEY: ${{ secrets.GOOGLE_API_KEY }}
TEST_CHAT_UUID: ${{ secrets.TEST_CHAT_UUID }}
TEST_CHAT_URL: ${{ secrets.TEST_CHAT_URL }}
APACHE_KAFKA_ZOOKEEPER_SERVER_START_EXECUTABLE_FULL_PATH: ${{ secrets.APACHE_KAFKA_ZOOKEEPER_SERVER_START_EXECUTABLE_FULL_PATH }}
APACHE_KAFKA_ZOOKEEPER_KAFKA_ZOOKEEPER_PROPERTIES_FULL_PATH: ${{ secrets.APACHE_KAFKA_ZOOKEEPER_KAFKA_ZOOKEEPER_PROPERTIES_FULL_PATH }}
APACHE_KAFKA_SERVER_START_EXECUTABLE_FULL_PATH: ${{ secrets.APACHE_KAFKA_SERVER_START_EXECUTABLE_FULL_PATH }}
Expand Down
22 changes: 8 additions & 14 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,14 @@ If you're going to use a .env file to store all these environment variables, you
`pip install python-dotenv`

You will need to set up the following environment variables:
`GOOGLE_API_KEY` (API Key for Google Gemini AI)
`TEST_CHAT_UUID` (e.g. chat/342498g2-87x3-4a64-9325-rb70471623ax)
`DATABASE_URL` (postgresql database URL)
`APACHE_KAFKA_OPS_MAX_WAIT_TIME_SECS` ((integer) number of seconds to wait for kafka zookeeper & kafka server start e.g. 5)
`APACHE_KAFKA_ZOOKEEPER_SERVER_START_EXECUTABLE_FULL_PATH` (/path/to/zookeeper-server-start.sh)
`APACHE_KAFKA_ZOOKEEPER_KAFKA_ZOOKEEPER_PROPERTIES_FULL_PATH` (/path/to/zookeeper.properties)
`APACHE_KAFKA_ZOOKEEPER_SERVER_STOP_EXECUTABLE_FULL_PATH` (/path/to/zookeeper-server-stop.sh)
`APACHE_KAFKA_SERVER_PROPERTIES_FULL_PATH` (/path/to/server.properties)
`APACHE_KAFKA_SERVER_START_EXECUTABLE_FULL_PATH` (/path/to/kafka-server-start.sh)
`APACHE_KAFKA_SERVER_STOP_EXECUTABLE_FULL_PATH` (/path/to/kafka-server-stop.sh)
`APACHE_KAFKA_CONSUMER_EXECUTABLE_FULL_PATH` (/path/to/kafka-console-consumer.sh)
`APACHE_KAFKA_CONSUMER_PROPERTIES_FULL_PATH` (/path/to/consumer.properties)
`APACHE_KAFKA_BOOTSTRAP_SERVER_HOST` (e.g. "localhost")
`APACHE_KAFKA_BOOTSTRAP_SERVER_PORT` (e.g. "9092")
- `GOOGLE_API_KEY` (API Key for Google Gemini AI)
- `TEST_CHAT_URL` (e.g. chat/342498g2-87x3-4a64-9325-rb70471623ax)
- `DATABASE_URL` (postgresql database URL)
- `APACHE_KAFKA_OPS_MAX_WAIT_TIME_SECS` (integer; number of seconds to wait for the Kafka Zookeeper & Kafka server to start, e.g. 5)
- `APACHE_KAFKA_CONSUMER_EXECUTABLE_FULL_PATH` (/path/to/kafka-console-consumer.sh)
- `APACHE_KAFKA_CONSUMER_PROPERTIES_FULL_PATH` (/path/to/consumer.properties)
- `APACHE_KAFKA_BOOTSTRAP_SERVER_HOST` (e.g. "localhost")
- `APACHE_KAFKA_BOOTSTRAP_SERVER_PORT` (e.g. "9092")

### Code formatting
This project uses Black (https://github.com/psf/black) for all code formatting.
Expand Down
182 changes: 1 addition & 181 deletions api/endpoints/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
"""

import os
import select
import subprocess
import uuid
import warnings
from contextlib import asynccontextmanager

import eventlet
from aiokafka import AIOKafkaProducer
from fastapi import FastAPI, WebSocket, HTTPException
from fastapi.params import Depends
Expand All @@ -21,7 +18,6 @@
from controller import Controller
from database import db
from json_defs.requests import json_request_body_defs as json
from utils import exceptions
from utils.functions import utility_functions


Expand Down Expand Up @@ -77,178 +73,6 @@ async def close_apache_kafka_producer(fastapi_application: FastAPI):
await fastapi_application.state.kafka_producer.stop()


def _await_startup_marker(command, success_marker, failure_markers, display_name):
    """
    Spawn *command* and scan its stdout/stderr for a success or failure marker.

    The scan is bounded by APACHE_KAFKA_OPS_MAX_WAIT_TIME_SECS seconds.

    :param command: argv list handed to ``subprocess.Popen``.
    :param success_marker: substring whose appearance in the output means the
        process started successfully.
    :param failure_markers: iterable of substrings that signal a failed start.
    :param display_name: human-readable name used in console messages.
    :return: the still-running ``subprocess.Popen`` handle on success.
    :raises subprocess.CalledProcessError: on a failure marker, premature
        process exit, or timeout.
    """
    import time  # local import so this fix stays self-contained in the block

    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )

    # BUG FIX: the original looped on ``while eventlet.Timeout(...)``, which
    # builds a fresh (always-truthy) timeout object each iteration and never
    # actually bounds the wait.  Use an explicit monotonic deadline instead.
    deadline = time.monotonic() + int(
        os.getenv("APACHE_KAFKA_OPS_MAX_WAIT_TIME_SECS")
    )

    while time.monotonic() < deadline:
        # NOTE(review): select() on pipe objects is Unix-only, as in the
        # original implementation.
        ready_to_read, _, _ = select.select(
            [process.stdout, process.stderr], [], [], 0.1
        )

        for pipe in ready_to_read:
            output = pipe.readline().strip()
            if not output:
                continue
            print(output)

            if success_marker in output:
                print(f"\nSUCCESSFULLY STARTED {display_name}\n")
                return process

            if any(marker in output for marker in failure_markers):
                # BUG FIX: the original assigned ``return_code`` (a typo for
                # ``returncode``) on the zookeeper process, so its post-loop
                # failure check never fired.  Raise directly instead.
                print(f"\nFAILED TO START {display_name}\n")
                process.kill()
                raise subprocess.CalledProcessError(returncode=-1, cmd=command)

        # We're not expecting the process to stop and return a returncode.
        if process.poll() is not None:
            raise subprocess.CalledProcessError(
                returncode=process.returncode, cmd=command
            )

    # Deadline expired without observing the success marker.
    process.kill()
    raise subprocess.CalledProcessError(returncode=-1, cmd=command)


def startup_apache_kafka(fastapi_application: "FastAPI"):
    """
    Starts Apache Kafka.

    It essentially does 2 things:

    1. Starts zookeeper
    2. Starts the apache kafka server.

    The started ``Popen`` handles are stored on ``fastapi_application.state``
    as ``zookeeper_subprocess`` and ``kafka_server_subprocess`` so that
    ``shutdown_apache_kafka`` can terminate them later.

    :param fastapi_application: Instance of FastAPI application to set properties on.
    :raises subprocess.CalledProcessError: if zookeeper or the kafka server
        fails to start within APACHE_KAFKA_OPS_MAX_WAIT_TIME_SECS seconds.
    """

    # Start Apache Kafka Zookeeper.
    zookeeper_process = _await_startup_marker(
        [
            os.getenv("APACHE_KAFKA_ZOOKEEPER_SERVER_START_EXECUTABLE_FULL_PATH"),
            os.getenv("APACHE_KAFKA_ZOOKEEPER_KAFKA_ZOOKEEPER_PROPERTIES_FULL_PATH"),
        ],
        success_marker="binding to port 0.0.0.0/0.0.0.0:2181",
        failure_markers=("Failed to acquire lock on file .lock", "Exiting Kafka"),
        display_name="APACHE KAFKA ZOOKEEPER",
    )

    # Start Apache Kafka server.
    # BUG FIX: on kafka-server startup failure the original raised
    # CalledProcessError with ``cmd`` set to the *zookeeper* startup command;
    # the helper now always reports the command that actually failed.
    kafka_server_process = _await_startup_marker(
        [
            os.getenv("APACHE_KAFKA_SERVER_START_EXECUTABLE_FULL_PATH"),
            os.getenv("APACHE_KAFKA_SERVER_PROPERTIES_FULL_PATH"),
        ],
        success_marker="started (kafka.server.KafkaServer)",
        failure_markers=(
            "Failed to acquire lock on file .lock",
            "shutting down (kafka.server.KafkaServer)",
        ),
        display_name="APACHE KAFKA SERVER",
    )

    fastapi_application.state.zookeeper_subprocess = zookeeper_process
    fastapi_application.state.kafka_server_subprocess = kafka_server_process


def shutdown_apache_kafka(fastapi_application: "FastAPI"):
    """
    Shutdown Apache Kafka.

    This is done with a trial and error method. If there is an error while
    attempting to close with the official *-stop.sh executables, a terminate()
    call to the subprocesses that hold the already started kafka zookeeper and
    server processes will be made; a warning will also be issued.

    :param fastapi_application: FastAPI application whose ``state`` holds the
        ``zookeeper_subprocess`` and ``kafka_server_subprocess`` handles.
    :raises exceptions.OperationNotAllowedException: if kafka's server or
        zookeeper isn't running for this application instance.
    """

    state = fastapi_application.state
    if not hasattr(state, "zookeeper_subprocess") or not hasattr(
        state, "kafka_server_subprocess"
    ):
        raise exceptions.OperationNotAllowedException(
            "You cannot shutdown apache kafka as there's none running for this instance of the server!"
        )

    apache_kafka_zookeeper_shutdown_command = [
        os.getenv("APACHE_KAFKA_ZOOKEEPER_SERVER_STOP_EXECUTABLE_FULL_PATH"),
    ]
    apache_kafka_server_shutdown_command = [
        os.getenv("APACHE_KAFKA_SERVER_STOP_EXECUTABLE_FULL_PATH"),
    ]

    apache_kafka_shutdown_process = subprocess.Popen(
        apache_kafka_server_shutdown_command,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        text=True,
    )

    # BUG FIX: the original inspected ``returncode`` immediately after Popen,
    # where it is always None, so the terminate() fallback branch was dead
    # code.  Wait (bounded) for the stop script to finish before judging
    # whether the official shutdown succeeded.
    try:
        shutdown_returncode = apache_kafka_shutdown_process.wait(
            timeout=int(os.getenv("APACHE_KAFKA_OPS_MAX_WAIT_TIME_SECS"))
        )
    except subprocess.TimeoutExpired:
        apache_kafka_shutdown_process.kill()
        shutdown_returncode = -1

    if shutdown_returncode != 0:
        state.kafka_server_subprocess.terminate()
        state.zookeeper_subprocess.terminate()
        warnings.warn(
            "Kafka's Zookeeper and Server couldn't be closed via the official Kafka closure executables! A subprocess.Popen.terminate() to their subprocesses was used instead.",
            InelegantKafkaShutdownWarning,
        )

    # Here, we're fairly guaranteed that we can safely close the zookeeper
    # server successfully via the official stop executable.
    else:
        subprocess.Popen(
            apache_kafka_zookeeper_shutdown_command,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
            text=True,
        )

    return


@asynccontextmanager
async def lifespan(fastapi_application: FastAPI):
"""
Expand All @@ -259,16 +83,12 @@ async def lifespan(fastapi_application: FastAPI):
:param fastapi_application: FastAPI app instance.
"""

startup_apache_kafka(fastapi_application)

await start_apache_kafka_producer(fastapi_application)

yield

await close_apache_kafka_producer(fastapi_application)

shutdown_apache_kafka(fastapi_application)


app = FastAPI(lifespan=lifespan)

Expand All @@ -283,7 +103,7 @@ async def handle_chat(websocket: WebSocket, chat_uuid: uuid.UUID):
"""
await websocket.accept()

if chat_uuid.__str__() != os.getenv("TEST_CHAT_UUID"):
if chat_uuid.__str__() != os.getenv("TEST_CHAT_URL"):
raise Exception("Invalid chat URL")

while True:
Expand Down
33 changes: 33 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Docker Compose stack providing the Apache Kafka + Zookeeper resources that
# the test suite runs against (replacing the previously mocked Kafka).
services:
  zookeeper:
    image: zookeeper:3.7.2
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka:2.13-2.8.1
    ports:
      - "9092:9092" # OUTSIDE listener
      - "9093:9093" # INSIDE listener
    environment:
      # OUTSIDE is advertised as localhost:9092 for host-side test clients;
      # INSIDE (kafka:9093) is used for broker-internal traffic.
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper
    command:
      - /bin/sh
      - -c
      - |
        # Wait for Zookeeper to be ready
        while ! nc -z zookeeper 2181; do sleep 1; done;
        # Start Kafka server
        start-kafka.sh &
        # Wait for Kafka to be ready
        sleep 10;
        # Create topics
        # kafka-topics.sh --create --topic 342498f2-87b3-4e64-9325-eb70471623de --partitions 1 --replication-factor 1 --if-not-exists --bootstrap-server kafka:9093;
        # Keep the container running
        tail -f /dev/null
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cachetools==5.4.0
certifi==2024.6.2
charset-normalizer==3.3.2
click==8.1.7
confluent-kafka==2.6.0
dnspython==2.6.1
docutils==0.21.2
email_validator==2.2.0
Expand Down
Loading

0 comments on commit 454deef

Please sign in to comment.