Skip to content

Commit

Permalink
feat: add support for streaming responses
Browse files Browse the repository at this point in the history
  • Loading branch information
sansyrox committed Dec 15, 2024
1 parent 02d1ce7 commit 962c683
Show file tree
Hide file tree
Showing 2 changed files with 233 additions and 22 deletions.
129 changes: 129 additions & 0 deletions integration_tests/base_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1082,6 +1082,135 @@ def create_item(request, body: CreateItemBody, query: CreateItemQueryParamsParam
return CreateItemResponse(success=True, items_changed=2)


# --- Streaming responses ---

@app.get("/stream/sync")
def sync_stream():
def number_generator():
for i in range(5):
yield f"Chunk {i}\n".encode()

return Response(
status_code=200,
headers={"Content-Type": "text/plain"},
description=number_generator()
)

@app.get("/stream/async")
async def async_stream():
async def async_generator():
import asyncio
for i in range(5):
await asyncio.sleep(1) # Simulate async work
yield f"Async Chunk {i}\n".encode()

return Response(
status_code=200,
headers={"Content-Type": "text/plain"},
description=async_generator()
)

@app.get("/stream/mixed")
async def mixed_stream():
async def mixed_generator():
import asyncio
# Binary data
yield b"Binary chunk\n"
await asyncio.sleep(0.5)

# String data
yield "String chunk\n".encode()
await asyncio.sleep(0.5)

# Integer data
yield str(42).encode() + b"\n"
await asyncio.sleep(0.5)

# JSON data
import json
data = {"message": "JSON chunk", "number": 123}
yield json.dumps(data).encode() + b"\n"

return Response(
status_code=200,
headers={"Content-Type": "text/plain"},
description=mixed_generator()
)

@app.get("/stream/events")
async def server_sent_events():
async def event_generator():
import asyncio
import json
import time

# Regular event
yield f"event: message\ndata: {json.dumps({'time': time.time(), 'type': 'start'})}\n\n".encode()
await asyncio.sleep(1)

# Event with ID
yield f"id: 1\nevent: update\ndata: {json.dumps({'progress': 50})}\n\n".encode()
await asyncio.sleep(1)

# Multiple data lines
data = json.dumps({'status': 'complete', 'results': [1, 2, 3]}, indent=2)
yield f"event: complete\ndata: {data}\n\n".encode()

return Response(
status_code=200,
headers={
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"Connection": "keep-alive"
},
description=event_generator()
)

@app.get("/stream/large-file")
async def stream_large_file():
async def file_generator():
# Simulate streaming a large file in chunks
chunk_size = 1024 # 1KB chunks
total_size = 10 * chunk_size # 10KB total

for offset in range(0, total_size, chunk_size):
# Simulate reading file chunk
chunk = b"X" * min(chunk_size, total_size - offset)
yield chunk

return Response(
status_code=200,
headers={
"Content-Type": "application/octet-stream",
"Content-Disposition": "attachment; filename=large-file.bin"
},
description=file_generator()
)

@app.get("/stream/csv")
async def stream_csv():
async def csv_generator():
# CSV header
yield "id,name,value\n".encode()

import asyncio
import random

# Generate rows
for i in range(5):
await asyncio.sleep(0.5) # Simulate data processing
row = f"{i},item-{i},{random.randint(1, 100)}\n"
yield row.encode()

return Response(
status_code=200,
headers={
"Content-Type": "text/csv",
"Content-Disposition": "attachment; filename=data.csv"
},
description=csv_generator()
)

def main():
app.set_response_header("server", "robyn")
app.serve_directory(
Expand Down
126 changes: 104 additions & 22 deletions src/types/response.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,92 @@
use actix_http::{body::BoxBody, StatusCode};
use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder, Responder};
use actix_web::{HttpRequest, HttpResponse, HttpResponseBuilder, Responder, Error, web::Bytes};
use pyo3::{
exceptions::PyIOError,
prelude::*,
types::{PyBytes, PyDict},
types::{PyBytes, PyDict, PyList},
};
use futures::stream::Stream;
use futures_util::StreamExt;
use std::pin::Pin;

use crate::io_helpers::{apply_hashmap_headers, read_file};
use crate::types::{check_body_type, check_description_type, get_description_from_pyobject};

use super::headers::Headers;

#[derive(Debug, Clone, FromPyObject)]
#[derive(Debug, Clone)]
pub enum ResponseBody {
Static(Vec<u8>),
Streaming(Vec<Vec<u8>>),
}

#[derive(Debug, Clone)]
pub struct Response {
pub status_code: u16,
pub response_type: String,
pub headers: Headers,
// https://pyo3.rs/v0.19.2/function.html?highlight=from_py_#per-argument-options
#[pyo3(from_py_with = "get_description_from_pyobject")]
pub description: Vec<u8>,
pub body: ResponseBody,
pub file_path: Option<String>,
}

impl<'a> FromPyObject<'a> for Response {
fn extract(ob: &'a PyAny) -> PyResult<Self> {
let status_code = ob.getattr("status_code")?.extract()?;
let response_type = ob.getattr("response_type")?.extract()?;
let headers = ob.getattr("headers")?.extract()?;
let description = ob.getattr("description")?;
let file_path = ob.getattr("file_path")?.extract()?;

let body = if let Ok(iter) = description.iter() {
let mut chunks = Vec::new();
for item in iter {
let item = item?;
let chunk = if item.is_instance_of::<pyo3::types::PyBytes>() {
item.extract::<Vec<u8>>()?
} else if item.is_instance_of::<pyo3::types::PyString>() {
item.extract::<String>()?.into_bytes()
} else if item.is_instance_of::<pyo3::types::PyInt>() {
item.extract::<i64>()?.to_string().into_bytes()
} else {
return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
"Stream items must be bytes, str, or int"
));
};
chunks.push(chunk);
}
ResponseBody::Streaming(chunks)
} else {
ResponseBody::Static(get_description_from_pyobject(description)?)
};

Ok(Response {
status_code,
response_type,
headers,
body,
file_path,
})
}
}

impl Responder for Response {
type Body = BoxBody;

fn respond_to(self, _req: &HttpRequest) -> HttpResponse<Self::Body> {
let mut response_builder =
HttpResponseBuilder::new(StatusCode::from_u16(self.status_code).unwrap());
apply_hashmap_headers(&mut response_builder, &self.headers);
response_builder.body(self.description)

match self.body {
ResponseBody::Static(data) => response_builder.body(data),
ResponseBody::Streaming(chunks) => {
let stream = Box::pin(
futures::stream::iter(chunks.into_iter())
.map(|chunk| Ok::<Bytes, Error>(Bytes::from(chunk)))
) as Pin<Box<dyn Stream<Item = Result<Bytes, Error>>>>;
response_builder.streaming(stream)
}
}
}
}

Expand All @@ -44,7 +101,7 @@ impl Response {
status_code: 404,
response_type: "text".to_string(),
headers,
description: "Not found".to_owned().into_bytes(),
body: ResponseBody::Static("Not found".to_owned().into_bytes()),
file_path: None,
}
}
Expand All @@ -59,7 +116,7 @@ impl Response {
status_code: 500,
response_type: "text".to_string(),
headers,
description: "Internal server error".to_owned().into_bytes(),
body: ResponseBody::Static("Internal server error".to_owned().into_bytes()),
file_path: None,
}
}
Expand All @@ -68,11 +125,21 @@ impl Response {
impl ToPyObject for Response {
fn to_object(&self, py: Python) -> PyObject {
let headers = self.headers.clone().into_py(py).extract(py).unwrap();
// The description should only be either string or binary.
// it should raise an exception otherwise
let description = match String::from_utf8(self.description.to_vec()) {
Ok(description) => description.to_object(py),
Err(_) => PyBytes::new(py, &self.description.to_vec()).into(),

let description = match &self.body {
ResponseBody::Static(data) => {
match String::from_utf8(data.to_vec()) {
Ok(description) => description.to_object(py),
Err(_) => PyBytes::new(py, data).into(),
}
},
ResponseBody::Streaming(chunks) => {
let list = PyList::empty(py);
for chunk in chunks {
list.append(PyBytes::new(py, chunk)).unwrap();
}
list.to_object(py)
}
};

let response = PyResponse {
Expand Down Expand Up @@ -111,15 +178,22 @@ impl PyResponse {
headers: &PyAny,
description: Py<PyAny>,
) -> PyResult<Self> {
check_body_type(py, &description)?;
// Check if description is an iterator/generator
let is_stream = Python::with_gil(|py| {
description.as_ref(py).iter().is_ok()
});

if is_stream {
// For streaming responses, we don't need to check body type
// as we'll validate each chunk when it's yielded
} else {
check_body_type(py, &description)?;
}

let headers_output: Py<Headers> = if let Ok(headers_dict) = headers.downcast::<PyDict>() {
// Here you'd have logic to create a Headers instance from a PyDict
// For simplicity, let's assume you have a method `from_dict` on Headers for this
let headers = Headers::new(Some(headers_dict)); // Hypothetical method
let headers = Headers::new(Some(headers_dict));
Py::new(py, headers)?
} else if let Ok(headers) = headers.extract::<Py<Headers>>() {
// If it's already a Py<Headers>, use it directly
headers
} else {
return Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(
Expand All @@ -129,8 +203,7 @@ impl PyResponse {

Ok(Self {
status_code,
// we should be handling based on headers but works for now
response_type: "text".to_string(),
response_type: if is_stream { "stream".to_string() } else { "text".to_string() },
headers: headers_output,
description,
file_path: None,
Expand All @@ -139,7 +212,16 @@ impl PyResponse {

#[setter]
pub fn set_description(&mut self, py: Python, description: Py<PyAny>) -> PyResult<()> {
check_description_type(py, &description)?;
// Check if description is an iterator/generator
let is_stream = description.as_ref(py).iter().is_ok();

if is_stream {
self.response_type = "stream".to_string();
} else {
check_description_type(py, &description)?;
self.response_type = "text".to_string();
}

self.description = description;
Ok(())
}
Expand Down

0 comments on commit 962c683

Please sign in to comment.