Skip to content

Commit

Permalink
dglogo/code documentation (#4)
Browse files Browse the repository at this point in the history
* readme

* readme update

* code docs

* code documentation
  • Loading branch information
dglogo authored Jan 3, 2025
1 parent 59d1232 commit a9a0766
Show file tree
Hide file tree
Showing 25 changed files with 1,641 additions and 114 deletions.
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ This NVIDIA AI blueprint shows developers how to build a microservice that trans

<img width="1021" alt="Screenshot 2024-12-30 at 8 43 43 PM" src="https://github.com/user-attachments/assets/604d0b4d-664f-4089-a30d-0431ff35aece" />

[mermaid diagram](docs/README.md)

## Quick Start Guide

1. **Set environment variables**

```bash
# Create .env file with required variables
echo "ELEVENLABS_API_KEY=your_key" > .env
Expand Down Expand Up @@ -83,7 +81,7 @@ It is easy to swap out different pieces of the stack to optimize GPU usage for a

4. **Enable Tracing**

We expose a Jaeger instance at `http://localhost:16686/` for tracing. This is useful for debugging and monitoring the system.
We expose a Jaeger instance at `http://localhost:16686/` for tracing. This is useful for debugging and monitoring the system.

## Contributing

Expand Down
182 changes: 173 additions & 9 deletions services/APIService/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
"""
Main FastAPI application module for the AI Research Assistant API Service.
This module provides the core API endpoints for the PDF-to-Podcast service, handling:
- PDF file uploads and processing
- WebSocket status updates
- Job management and status tracking
- Saved podcast retrieval and management
- Vector database querying
- Service health monitoring
The service integrates with:
- PDF Service for document processing
- Agent Service for content generation
- TTS Service for audio synthesis
- Redis for caching and pub/sub
- MinIO for file storage
- OpenTelemetry for observability
"""

from fastapi import (
HTTPException,
FastAPI,
Expand Down Expand Up @@ -100,6 +120,19 @@

@app.websocket("/ws/status/{job_id}")
async def websocket_endpoint(websocket: WebSocket, job_id: str):
"""
WebSocket endpoint for real-time job status updates.
Handles client connections and sends status updates for all services processing a job.
Implements a ready-check protocol and maintains connection with periodic pings.
Args:
websocket (WebSocket): The WebSocket connection instance
job_id (str): Unique identifier for the job to track
Raises:
WebSocketDisconnect: If the client disconnects
"""
try:
# Accept the WebSocket connection
await manager.connect(websocket, job_id)
Expand Down Expand Up @@ -166,6 +199,20 @@ def process_pdf_task(
files_and_types: List[Tuple[bytes, str]],
transcription_params: TranscriptionParams,
):
"""
Process PDF files through the conversion pipeline.
Coordinates the workflow between PDF Service, Agent Service, and TTS Service
to convert PDFs into an audio podcast.
Args:
job_id (str): Unique identifier for the job
files_and_types (List[Tuple[bytes, str]]): List of tuples containing file content and type (target/context)
transcription_params (TranscriptionParams): Parameters controlling the transcription process
Raises:
Exception: If any service in the pipeline fails
"""
with telemetry.tracer.start_as_current_span("api.process_pdf_task") as span:
span.set_attribute("job_id", job_id)
try:
Expand Down Expand Up @@ -305,6 +352,21 @@ async def process_pdf(
context_files: Union[UploadFile, List[UploadFile]] = File([]),
transcription_params: str = Form(...),
):
"""
Process uploaded PDF files and generate a podcast.
Args:
background_tasks (BackgroundTasks): FastAPI background tasks handler
target_files (Union[UploadFile, List[UploadFile]]): Primary PDF file(s) to process
context_files (Union[UploadFile, List[UploadFile]], optional): Supporting PDF files
transcription_params (str): JSON string containing transcription parameters
Returns:
dict: Contains job_id for tracking the processing status
Raises:
HTTPException: If file validation fails or parameters are invalid
"""
with telemetry.tracer.start_as_current_span("api.process_pdf") as span:
# Convert single file to list for consistent handling
target_files_list = (
Expand Down Expand Up @@ -368,7 +430,19 @@ async def process_pdf(
# TODO: wire up userId auth here
@app.get("/status/{job_id}")
async def get_status(job_id: str, userId: str = Query(..., description="KAS User ID")):
"""Get aggregated status from all services"""
"""
Get aggregated status from all services for a specific job.
Args:
job_id (str): Job identifier to check status for
userId (str): User identifier for authorization
Returns:
dict: Status information from all services
Raises:
HTTPException: If job is not found
"""
with telemetry.tracer.start_as_current_span("api.job.status") as span:
span.set_attribute("job_id", job_id)
statuses = {}
Expand All @@ -392,7 +466,19 @@ async def get_status(job_id: str, userId: str = Query(..., description="KAS User

@app.get("/output/{job_id}")
async def get_output(job_id: str, userId: str = Query(..., description="KAS User ID")):
"""Get the final TTS output"""
"""
Get the final TTS output for a completed job.
Args:
job_id (str): Job identifier to get output for
userId (str): User identifier for authorization
Returns:
Response: Audio file response with appropriate headers
Raises:
HTTPException: If result is not found or TTS not completed
"""
with telemetry.tracer.start_as_current_span("api.job.output") as span:
span.set_attribute("job_id", job_id)

Expand Down Expand Up @@ -427,7 +513,14 @@ async def get_output(job_id: str, userId: str = Query(..., description="KAS User

@app.post("/cleanup")
async def cleanup_jobs():
"""Clean up old jobs across all services"""
"""
Clean up old jobs across all services.
Removes job status and result data from Redis for all services.
Returns:
dict: Number of jobs removed
"""
removed = 0
for service in ServiceType:
pattern = f"status:*:{service}"
Expand All @@ -443,7 +536,18 @@ async def cleanup_jobs():
async def get_saved_podcasts(
userId: str = Query(..., description="KAS User ID", min_length=1),
):
"""Get a list of all saved podcasts from storage with their audio data"""
"""
Get a list of all saved podcasts from storage with their audio data.
Args:
userId (str): User identifier to filter podcasts
Returns:
Dict[str, List[SavedPodcast]]: List of saved podcasts metadata
Raises:
HTTPException: If retrieval fails
"""
try:
with telemetry.tracer.start_as_current_span("api.saved_podcasts") as span:
if not userId.strip(): # Check for whitespace-only strings
Expand Down Expand Up @@ -478,7 +582,19 @@ async def get_saved_podcasts(
async def get_saved_podcast_metadata(
job_id: str, userId: str = Query(..., description="KAS User ID")
):
"""Get a specific saved podcast metadata without audio data"""
"""
Get a specific saved podcast metadata without audio data.
Args:
job_id (str): Job identifier for the podcast
userId (str): User identifier for authorization
Returns:
SavedPodcast: Podcast metadata
Raises:
HTTPException: If podcast not found or retrieval fails
"""
try:
with telemetry.tracer.start_as_current_span(
"api.saved_podcast.metadata"
Expand Down Expand Up @@ -511,7 +627,19 @@ async def get_saved_podcast_metadata(
async def get_saved_podcast(
job_id: str, userId: str = Query(..., description="KAS User ID")
):
"""Get a specific saved podcast with its audio data"""
"""
Get a specific saved podcast with its audio data.
Args:
job_id (str): Job identifier for the podcast
userId (str): User identifier for authorization
Returns:
SavedPodcastWithAudio: Podcast metadata and audio content
Raises:
HTTPException: If podcast not found or retrieval fails
"""
try:
with telemetry.tracer.start_as_current_span("api.saved_podcast.audio") as span:
span.set_attribute("job_id", job_id)
Expand Down Expand Up @@ -556,7 +684,19 @@ async def get_saved_podcast(
async def get_saved_podcast_transcript(
job_id: str, userId: str = Query(..., description="KAS User ID")
):
"""Get a specific saved podcast transcript"""
"""
Get a specific saved podcast transcript.
Args:
job_id (str): Job identifier for the podcast
userId (str): User identifier for authorization
Returns:
Conversation: Podcast transcript data
Raises:
HTTPException: If transcript not found or invalid format
"""
with telemetry.tracer.start_as_current_span("api.saved_podcast.transcript") as span:
try:
span.set_attribute("job_id", job_id)
Expand Down Expand Up @@ -590,7 +730,19 @@ async def get_saved_podcast_transcript(
async def get_saved_podcast_agent_workflow(
job_id: str, userId: str = Query(..., description="KAS User ID")
):
"""Get a specific saved podcast agent workflow"""
"""
Get a specific saved podcast agent workflow history.
Args:
job_id (str): Job identifier for the podcast
userId (str): User identifier for authorization
Returns:
PromptTracker: Agent workflow history data
Raises:
HTTPException: If history not found or retrieval fails
"""
with telemetry.tracer.start_as_current_span("api.saved_podcast.history") as span:
try:
span.set_attribute("job_id", job_id)
Expand Down Expand Up @@ -618,7 +770,19 @@ async def get_saved_podcast_agent_workflow(
async def get_saved_podcast_pdf(
job_id: str, userId: str = Query(..., description="KAS User ID")
):
"""Get the original PDF file for a specific podcast"""
"""
Get the original PDF file for a specific podcast.
Args:
job_id (str): Job identifier for the podcast
userId (str): User identifier for authorization
Returns:
Response: PDF file response with appropriate headers
Raises:
HTTPException: If PDF not found or retrieval fails
"""
with telemetry.tracer.start_as_current_span("api.saved_podcast.pdf") as span:
try:
span.set_attribute("job_id", job_id)
Expand Down
Loading

0 comments on commit a9a0766

Please sign in to comment.