Skip to content

Commit

Permalink
laying basework for yields
Browse files Browse the repository at this point in the history
  • Loading branch information
iiian committed Apr 12, 2024
1 parent 22233be commit 00b7f2b
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 78 deletions.
25 changes: 22 additions & 3 deletions integration_tests/base_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

app = Robyn(__file__)
websocket = WebSocket(app, "/web_socket")


wsy = WebSocket(app, "/wsy")
wsya = WebSocket(app, "/wsya")
# Creating a new WebSocket app to test json handling + to serve an example to future users of this lib
# while the original "raw" web_socket is used with benchmark tests
websocket_json = WebSocket(app, "/web_socket_json")
Expand All @@ -45,6 +45,18 @@
websocket_state = defaultdict(int)


@wsy.on("message")
async def echo():
yield "hello"
yield "world"


@wsya.on("message")
async def echo():
yield "hello"
yield "world"


@websocket_json.on("message")
async def jsonws_message(ws, msg: str) -> str:
websocket_id = ws.id
Expand Down Expand Up @@ -152,21 +164,28 @@ def sync_global_middlewares(request: Request):

# --- Route specific ---

vals = [1,2,3]
vals = [1, 2, 3]


def yeidler():
while True:
for val in vals:
yield val
yield val


yieldster = yeidler()
glob = 0


@app.get("/yield")
def gen_yield():
global yieldster
global glob
glob += 1
return f"{next(yieldster)}+{glob} "


@app.before_request("/sync/middlewares")
def sync_before_request(request: Request):
request.headers.set("before", "sync_before_request")
Expand Down
16 changes: 16 additions & 0 deletions robyn/robyn.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ class HttpMethod(Enum):
TRACE: str
CONNECT: str

class FunctionType(Enum):
"""
The function types supported by Python.
Attributes:
SYNC: str
ASYNC: str
SYNCGENERATOR: str
ASYNCGENERATOR: str
"""

SYNC: str
ASYNC: str
SYNCGENERATOR: str
ASYNCGENERATOR: str

@dataclass
class FunctionInfo:
"""
Expand Down
13 changes: 7 additions & 6 deletions robyn/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from robyn.robyn import (
FunctionInfo,
FunctionType,
Headers,
HttpMethod,
MiddlewareType,
Expand Down Expand Up @@ -163,26 +164,26 @@ def inner_handler(*args, **kwargs):
else:
_logger.debug(f"Dependency {dependency} is not used in the handler {handler.__name__}")

if iscoroutinefunction(handler):
if inspect.isasyncgenfunction(handler):
function = FunctionInfo(
async_inner_handler,
True,
FunctionType.ASYNCGENERATOR,
number_of_params,
params,
new_injected_dependencies,
)
self.routes.append(Route(route_type, endpoint, function, is_const))
return async_inner_handler
else:
if iscoroutinefunction(handler):
function = FunctionInfo(
inner_handler,
False,
async_inner_handler,
FunctionType.ASYNC,
number_of_params,
params,
new_injected_dependencies,
)
self.routes.append(Route(route_type, endpoint, function, is_const))
return inner_handler
return async_inner_handler

def get_routes(self) -> List[Route]:
return self.routes
Expand Down
20 changes: 20 additions & 0 deletions robyn/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import inspect
from robyn.robyn import FunctionType

def get_function_type(handler):
# as this is the most likely case, short circuit for it
is_sync = (
not asyncio.iscoroutinefunction(handler) and
not inspect.isgeneratorfunction(handler) and
not inpsect.isasyncgenfunction(handler)
)
if is_sync:
return FunctionType.SYNC
if inspect.iscoroutinefunction(handler): return FunctionType.ASYNC
if inspect.isasyncgenfunction(handler): return FunctionType.ASYNCGENERATOR
if inspect.isgeneratorfunction(handler): return FunctionType.SYNCGENERATOR


class FunctionInfoFactory:
"""Used for constructing FunctionInfo and wrapper"""
def create_route(handler):
107 changes: 60 additions & 47 deletions src/executors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use pyo3::prelude::*;
use pyo3_asyncio::TaskLocals;

use crate::types::{
function_info::FunctionInfo, request::Request, response::Response, MiddlewareReturn,
function_info::{FunctionType, FunctionInfo}, request::Request, response::Response, MiddlewareReturn,
};

#[inline]
Expand Down Expand Up @@ -68,27 +68,32 @@ pub async fn execute_middleware_function<T>(
where
T: for<'a> FromPyObject<'a> + ToPyObject,
{
if function.is_async {
let output: Py<PyAny> = Python::with_gil(|py| {
pyo3_asyncio::tokio::into_future(get_function_output(function, py, input)?)
})?
.await?;

Python::with_gil(|py| -> Result<MiddlewareReturn> {
let output_response = output.extract::<Response>(py);
match output_response {
Ok(o) => Ok(MiddlewareReturn::Response(o)),
Err(_) => Ok(MiddlewareReturn::Request(output.extract::<Request>(py)?)),
}
})
} else {
Python::with_gil(|py| -> Result<MiddlewareReturn> {
let output = get_function_output(function, py, input)?;
match output.extract::<Response>() {
Ok(o) => Ok(MiddlewareReturn::Response(o)),
Err(_) => Ok(MiddlewareReturn::Request(output.extract::<Request>()?)),
}
})
match function.ftype {
FunctionType::Sync => {
Python::with_gil(|py| -> Result<MiddlewareReturn> {
let output = get_function_output(function, py, input)?;
match output.extract::<Response>() {
Ok(o) => Ok(MiddlewareReturn::Response(o)),
Err(_) => Ok(MiddlewareReturn::Request(output.extract::<Request>()?)),
}
})
},
FunctionType::Async => {
let output: Py<PyAny> = Python::with_gil(|py| {
pyo3_asyncio::tokio::into_future(get_function_output(function, py, input)?)
})?
.await?;

Python::with_gil(|py| -> Result<MiddlewareReturn> {
let output_response = output.extract::<Response>(py);
match output_response {
Ok(o) => Ok(MiddlewareReturn::Response(o)),
Err(_) => Ok(MiddlewareReturn::Request(output.extract::<Request>(py)?)),
}
})
},
FunctionType::SyncGenerator => todo!(),
FunctionType::AsyncGenerator => todo!(),
}
}

Expand All @@ -97,38 +102,46 @@ pub async fn execute_http_function(
request: &Request,
function: &FunctionInfo,
) -> PyResult<Response> {
if function.is_async {
let output = Python::with_gil(|py| {
let function_output = get_function_output(function, py, request)?;
pyo3_asyncio::tokio::into_future(function_output)
})?
.await?;

return Python::with_gil(|py| -> PyResult<Response> { output.extract(py) });
};

Python::with_gil(|py| -> PyResult<Response> {
get_function_output(function, py, request)?.extract()
})
match function.ftype {
FunctionType::Sync => Python::with_gil(|py| -> PyResult<Response> {
get_function_output(function, py, request)?.extract()
}),
FunctionType::Async => {
let output = Python::with_gil(|py| {
let function_output = get_function_output(function, py, request)?;
pyo3_asyncio::tokio::into_future(function_output)
})?
.await?;

return Python::with_gil(|py| -> PyResult<Response> { output.extract(py) });
},
FunctionType::SyncGenerator => todo!(),
FunctionType::AsyncGenerator => todo!(),
}
}

pub async fn execute_event_handler(
event_handler: Option<Arc<FunctionInfo>>,
task_locals: &TaskLocals,
) -> Result<()> {
if let Some(function) = event_handler {
if function.is_async {
debug!("Startup event handler async");
Python::with_gil(|py| {
pyo3_asyncio::into_future_with_locals(
task_locals,
function.handler.as_ref(py).call0()?,
)
})?
.await?;
} else {
debug!("Startup event handler");
Python::with_gil(|py| function.handler.call0(py))?;
match function.ftype {
FunctionType::Sync => {
debug!("Startup event handler");
Python::with_gil(|py| function.handler.call0(py))?;
},
FunctionType::Async => {
debug!("Startup event handler async");
Python::with_gil(|py| {
pyo3_asyncio::into_future_with_locals(
task_locals,
function.handler.as_ref(py).call0()?,
)
})?
.await?;
},
FunctionType::SyncGenerator => todo!(),
FunctionType::AsyncGenerator => todo!(),
}
}
Ok(())
Expand Down
39 changes: 21 additions & 18 deletions src/executors/web_socket_executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,33 @@ pub fn execute_ws_function(
ws: &WebSocketConnector,
// add number of params here
) {
if function.is_async {
let fut = Python::with_gil(|py| {
pyo3_asyncio::into_future_with_locals(
task_locals,
get_function_output(function, text, py, ws).unwrap(),
)
.unwrap()
});
let f = async {
let output = fut.await.unwrap();
Python::with_gil(|py| output.extract::<&str>(py).unwrap().to_string())
}
.into_actor(ws)
.map(|res, _, ctx| ctx.text(res));
ctx.spawn(f);
} else {
Python::with_gil(|py| {
match function.ftype {
crate::types::function_info::FunctionType::Sync => Python::with_gil(|py| {
if let Some(op) = get_function_output(function, text, py, ws)
.unwrap()
.extract::<Option<&str>>()
.unwrap()
{
ctx.text(op);
}
});
}),
crate::types::function_info::FunctionType::Async => {
let fut = Python::with_gil(|py| {
pyo3_asyncio::into_future_with_locals(
task_locals,
get_function_output(function, text, py, ws).unwrap(),
)
.unwrap()
});
let f = async {
let output = fut.await.unwrap();
Python::with_gil(|py| output.extract::<&str>(py).unwrap().to_string())
}
.into_actor(ws)
.map(|res, _, ctx| ctx.text(res));
ctx.spawn(f);
},
crate::types::function_info::FunctionType::SyncGenerator => todo!(),
crate::types::function_info::FunctionType::AsyncGenerator => todo!(),
}
}
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use shared_socket::SocketHeld;
// pyO3 module
use pyo3::prelude::*;
use types::{
function_info::{FunctionInfo, MiddlewareType},
function_info::{FunctionInfo, FunctionType, MiddlewareType},
headers::Headers,
identity::Identity,
multimap::QueryParams,
Expand All @@ -39,6 +39,7 @@ pub fn robyn(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_class::<WebSocketConnector>()?;
m.add_class::<SocketHeld>()?;
m.add_class::<FunctionInfo>()?;
m.add_class::<FunctionType>()?;
m.add_class::<Identity>()?;
m.add_class::<PyRequest>()?;
m.add_class::<PyResponse>()?;
Expand Down
Loading

0 comments on commit 00b7f2b

Please sign in to comment.