Skip to content

Commit

Permalink
Fix the ffi to we don't hang on join server handles (#55)
Browse files Browse the repository at this point in the history
* Fix the ffi to we don't hang on join server handles

* typo
  • Loading branch information
npmenard committed Sep 29, 2022
1 parent 5bf4904 commit 66dc984
Showing 1 changed file with 47 additions and 40 deletions.
87 changes: 47 additions & 40 deletions src/ffi/dial_ffi.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
//! # Viam C API
//!
//! This module exposes a C API allowing a user to communicate with a Robot using any language able to call C functions without having
//! to implement webRTC or authentication. The module creates a UDS socket that a gRPC client can connect to
//!

use http::uri::Uri;
use std::ptr;
use std::{ptr, time::Duration};
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::Level;

use crate::rpc::dial::{
Expand All @@ -23,29 +28,28 @@ use anyhow::Result;

use crate::proxy::grpc_proxy::GRPCProxy;

/// The Ffi interface, returned as a pointer by init_rust_runtime. User should keep this pointer until freeing the runtime.
pub struct Ffi {
runtime: Runtime,
jhs: Option<Vec<JoinHandle<()>>>,
runtime: Option<Runtime>,
sigs: Option<Vec<oneshot::Sender<()>>>,
}

impl Drop for Ffi {
fn drop(&mut self) {
log::debug!("FFI runtime closing");
if let Some(r) = self.runtime.take() {
r.shutdown_timeout(Duration::from_secs(1));
}
}
}

impl Ffi {
fn new() -> Self {
Self {
runtime: Runtime::new().unwrap(),
jhs: None,
runtime: Some(Runtime::new().unwrap()),
sigs: None,
}
}
fn push_handle(&mut self, jh: JoinHandle<()>) {
match self.jhs {
Some(ref mut v) => v.push(jh),
None => {
let v: Vec<JoinHandle<()>> = vec![jh];
self.jhs = Some(v);
}
}
}
fn push_signal(&mut self, sig: oneshot::Sender<()>) {
match self.sigs {
Some(ref mut v) => v.push(sig),
Expand All @@ -56,7 +60,8 @@ impl Ffi {
}
}
}

/// Initialize a tokio runtime to run a gRPC client/sever, user should call this function before trying to dial to a Robot
/// Returns a pointer to a [`Ffi`]
#[no_mangle]
pub extern "C" fn init_rust_runtime() -> Box<Ffi> {
tracing_subscriber::fmt::init();
Expand Down Expand Up @@ -97,9 +102,11 @@ fn dial_with_cred(
/// Returns a path to a UDS proxy to a robot
/// # Safety
///
/// This function should be called from another language. See rpc::dial for dial from rust
/// This function must be called from another language. See [`dial`](mod@crate::rpc::dial) for dial from rust
/// The function returns a path to a UDS as a [`c_char`], the string should be freed with free_string when not needed anymore.
/// When falling to dial it will return a NULL pointer
/// # Arguments
/// * `c_uri` a C-style string representing the robot your are proxiying to
/// * `c_uri` a C-style string representing the address of robot you want to connect to
/// * `c_payload` a C-style string that is the robot's secret, set to NULL if you don't need authentication
/// * `c_allow_insecure` a bool, set to true when allowing insecure connection to your robot
/// * `rt_ptr` a pointer to a rust runtime previously obtained with init_rust_runtime
Expand Down Expand Up @@ -136,10 +143,13 @@ pub unsafe extern "C" fn dial(
return ptr::null_mut();
}
};
let conn = match ctx
.runtime
.block_on(async { proxy::uds::UDSConnector::new_random() })
{
let runtime = match &ctx.runtime {
Some(r) => r,
None => {
return ptr::null_mut();
}
};
let conn = match runtime.block_on(async { proxy::uds::UDSConnector::new_random() }) {
Ok(conn) => conn,
Err(e) => {
println!("Error creating the UDS proxy {e:?}");
Expand All @@ -157,7 +167,7 @@ pub unsafe extern "C" fn dial(
let uri_str = uri.to_string();
// if the uri is local then we can connect directly.
let disable_webrtc = uri_str.contains(".local");
let server = match ctx.runtime.block_on(async move {
let server = match runtime.block_on(async move {
let dial = match payload {
Some(p) => tower::util::Either::A(
dial_with_cred(uri_str, p.to_str()?, allow_insec, disable_webrtc)?
Expand Down Expand Up @@ -196,20 +206,19 @@ pub unsafe extern "C" fn dial(
let server = server.with_graceful_shutdown(async {
rx.await.ok();
});
ctx.push_signal(tx);
let h = ctx.runtime.spawn(async {
let _ = runtime.spawn(async {
let _ = server.await;
});
ctx.push_handle(h);
ctx.push_signal(tx);
path.into_raw()
}

/// This function must be used to free the path returned by the dial_direct function
/// This function must be used to free the path returned by the [`dial`] function
/// # Safety
///
/// This function must be use from outside a rust context
/// The function must not be called more than once with the same pointer
/// # Arguments
/// * `c_char` a pointer to the string returned by dial_direct
/// * `c_char` a pointer to the string returned by [`dial`]
#[no_mangle]
pub unsafe extern "C" fn free_string(s: *mut c_char) {
if s.is_null() {
Expand All @@ -218,30 +227,28 @@ pub unsafe extern "C" fn free_string(s: *mut c_char) {
let _ = CString::from_raw(s);
}

/// This function must be used the free a rust runtime returned by [`init_rust_runtime`] the function will signal any
/// opened server to shutdown. Further transaction on any UDS will not work anymore.
/// # Safety
///
/// The function must not be called more than once with the same pointer
/// # Arguments
/// * `rt_prt` a pointer to the string returned by [`init_rust_runtime`]
#[no_mangle]
pub extern "C" fn free_rust_runtime(rt_ptr: Option<Box<Ffi>>) -> i32 {
let ctx = match rt_ptr {
let mut ctx = match rt_ptr {
Some(ctx) => ctx,
None => {
return -1;
}
};
match ctx.sigs {
match ctx.sigs.take() {
Some(sigs) => {
for sig in sigs {
let _ = sig.send(());
}
}
None => {}
}

match ctx.jhs {
Some(jhs) => {
for js in jhs {
let _ = ctx.runtime.block_on(async move { js.await });
}
}
None => {}
}
0
}

0 comments on commit 66dc984

Please sign in to comment.