Skip to content

Commit

Permalink
feat: start on a built-in API client: xs cat, xs append, xs serve (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Sep 15, 2024
1 parent aafa84c commit 32f120c
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target
store
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
[![Discord](https://img.shields.io/discord/1182364431435436042?logo=discord)](https://discord.com/invite/YNbScHBHrh)

```
Status: WIP [████████████.... 60%] ... (until a v0.1.0 release)
Status: WIP [██████████████.. 70%] ... (until a v0.1.0 release)
```

## Overview / Sketch
Expand Down
21 changes: 13 additions & 8 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use tokio_util::compat::{FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt};

use http_body_util::StreamBody;
use http_body_util::{combinators::BoxBody, BodyExt, Empty, Full};
use hyper::body::Body;
use hyper::body::Bytes;
use hyper::server::conn::http1;
use hyper::service::service_fn;
Expand Down Expand Up @@ -146,17 +145,23 @@ async fn handle_stream_append(
) -> HTTPResult {
let (parts, mut body) = req.into_parts();

let hash = if body.is_end_stream() {
None
} else {
let hash = {
let writer = store.cas_writer().await?;
let mut writer = writer.compat_write();
let mut bytes_written = 0;

while let Some(frame) = body.frame().await {
let data = frame?.into_data().unwrap();
writer.write_all(&data).await?;
if let Ok(data) = frame?.into_data() {
writer.write_all(&data).await?;
bytes_written += data.len();
}
}

if bytes_written > 0 {
Some(writer.into_inner().commit().await?)
} else {
None
}
let writer = writer.into_inner();
Some(writer.commit().await?)
};

let meta = match parts
Expand Down
151 changes: 151 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
use serde_json::Value;

use futures::StreamExt;

use tokio::io::AsyncRead;
use tokio::net::{TcpStream, UnixStream};
use tokio::sync::mpsc::Receiver;

use http_body_util::{combinators::BoxBody, BodyExt, Empty};
use hyper::body::Bytes;
use hyper::client::conn::http1;
use hyper::{Method, Request, StatusCode};
use hyper_util::rt::TokioIo;

use crate::listener::AsyncReadWriteBox;

type BoxError = Box<dyn std::error::Error + Send + Sync>;

async fn connect(addr: &str) -> Result<AsyncReadWriteBox, BoxError> {
if addr.starts_with('/') || addr.starts_with('.') {
let path = std::path::Path::new(addr);
let addr = if path.is_dir() {
path.join("sock")
} else {
path.to_path_buf()
};
let stream = UnixStream::connect(addr).await?;
Ok(Box::new(stream))
} else {
let addr = if addr.starts_with(':') {
format!("127.0.0.1{}", addr)
} else {
addr.to_string()
};
let stream = TcpStream::connect(addr).await?;
Ok(Box::new(stream))
}
}

fn empty() -> BoxBody<Bytes, BoxError> {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}

pub async fn cat(addr: &str, follow: bool) -> Result<Receiver<Bytes>, BoxError> {
let stream = connect(addr).await?;
let io = TokioIo::new(stream);

let (mut sender, conn) = http1::handshake(io).await?;

tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("Connection error: {}", e);
}
});

let uri = if follow {
"http://localhost/?follow=true"
} else {
"http://localhost/"
};

let req = Request::builder()
.method(Method::GET)
.uri(uri)
.body(empty())?;

let res = sender.send_request(req).await?;

if res.status() != StatusCode::OK {
return Err(format!("HTTP error: {}", res.status()).into());
}

let (_parts, mut body) = res.into_parts();

let (tx, rx) = tokio::sync::mpsc::channel(100);

tokio::spawn(async move {
while let Some(frame_result) = body.frame().await {
match frame_result {
Ok(frame) => {
if let Ok(bytes) = frame.into_data() {
if tx.send(bytes).await.is_err() {
break;
}
}
// Ignore non-data frames
}
Err(e) => {
eprintln!("Error reading body: {}", e);
break;
}
}
}
});

Ok(rx)
}

use http_body_util::StreamBody;
use tokio_util::io::ReaderStream;

pub async fn append<R>(
addr: &str,
topic: &str,
data: R,
meta: Option<&Value>,
) -> Result<Bytes, BoxError>
where
R: AsyncRead + Unpin + Send + 'static,
{
let stream = connect(addr).await?;
let io = TokioIo::new(stream);
let (mut sender, conn) = http1::handshake(io).await?;
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("Connection error: {}", e);
}
});

let uri = format!("http://localhost/{}", topic);
let mut req = Request::builder().method(Method::POST).uri(uri);

if let Some(meta_value) = meta {
req = req.header("xs-meta", serde_json::to_string(meta_value)?);
}

// Create a stream from the AsyncRead
let reader_stream = ReaderStream::new(data);

// Map the stream to convert io::Error to BoxError
let mapped_stream = reader_stream.map(|result| {
result
.map(hyper::body::Frame::data)
.map_err(|e| Box::new(e) as BoxError)
});

// Create a StreamBody
let body = StreamBody::new(mapped_stream);

let req = req.body(body)?;
let res = sender.send_request(req).await?;

if res.status() != StatusCode::OK {
return Err(format!("HTTP error: {}", res.status()).into());
}

let body = res.collect().await?.to_bytes();
Ok(body)
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod api;
pub mod client;
pub mod error;
pub mod handlers;
pub mod http;
Expand Down
99 changes: 94 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::path::PathBuf;

use clap::Parser;
use clap::{Parser, Subcommand};

use tokio::io::AsyncWriteExt;

use xs::nu;
use xs::store::Store;
Expand All @@ -9,24 +11,77 @@ use xs::thread_pool::ThreadPool;
#[derive(Parser, Debug)]
#[clap(version)]
struct Args {
#[clap(subcommand)]
command: Command,
}

#[derive(Subcommand, Debug)]
enum Command {
/// Provides an API to interact with a local store
Serve(CommandServe),
/// `cat` the event stream
Cat(CommandCat),
/// Append an event to the stream
Append(CommandAppend),
}

#[derive(Parser, Debug)]
struct CommandServe {
/// Path to the store
#[clap(value_parser)]
path: PathBuf,

/// Overrides the default address the API listens on. Default is a Unix domain socket 'sock' in
/// the store path
/// Overrides the default address the API listens on.
/// Default is a Unix domain socket 'sock' in the store path.
/// Address to listen on [HOST]:PORT or <PATH> for Unix domain socket
#[clap(long, value_parser, value_name = "LISTEN_ADDR")]
api: Option<String>,

/// Enables a HTTP endpoint. Address to listen on [HOST]:PORT or <PATH> for Unix domain socket
/// Enables a HTTP endpoint.
/// Address to listen on [HOST]:PORT or <PATH> for Unix domain socket
#[clap(long, value_parser, value_name = "LISTEN_ADDR")]
http: Option<String>,
}

#[derive(Parser, Debug)]
struct CommandCat {
/// Address to connect to [HOST]:PORT or <PATH> for Unix domain socket
#[clap(value_parser)]
addr: String,

/// Follow the stream for new data
#[clap(long)]
follow: bool,
}

#[derive(Parser, Debug)]
struct CommandAppend {
/// Address to connect to [HOST]:PORT or <PATH> for Unix domain socket
#[clap(value_parser)]
addr: String,

/// Topic to append to
#[clap(value_parser)]
topic: String,

/// JSON metadata to include with the append
#[clap(long, value_parser)]
meta: Option<String>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = Args::parse();
match args.command {
Command::Serve(args) => serve(args).await,
Command::Cat(args) => cat(args).await,
Command::Append(args) => append(args).await,
}
}

async fn serve(args: CommandServe) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
xs::trace::init();

let args = Args::parse();
let store = Store::spawn(args.path).await;
let pool = ThreadPool::new(10);
let engine = nu::Engine::new(store.clone())?;
Expand Down Expand Up @@ -71,3 +126,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {

Ok(())
}

async fn cat(args: CommandCat) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut receiver = xs::client::cat(&args.addr, args.follow).await?;
let mut stdout = tokio::io::stdout();
while let Some(bytes) = receiver.recv().await {
stdout.write_all(&bytes).await?;
stdout.flush().await?;
}
Ok(())
}

use std::io::IsTerminal;
use tokio::io::stdin;
use tokio::io::AsyncRead;

async fn append(args: CommandAppend) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let meta = args
.meta
.map(|meta_str| serde_json::from_str(&meta_str))
.transpose()?;

let input = if !std::io::stdin().is_terminal() {
// Stdin is a pipe, use it as input
Box::new(stdin()) as Box<dyn AsyncRead + Unpin + Send>
} else {
// Stdin is not a pipe, use an empty reader
Box::new(tokio::io::empty()) as Box<dyn AsyncRead + Unpin + Send>
};

let response = xs::client::append(&args.addr, &args.topic, input, meta.as_ref()).await?;

tokio::io::stdout().write_all(&response).await?;
Ok(())
}
1 change: 1 addition & 0 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ async fn test_integration() {
let temp_dir = TempDir::new().expect("Failed to create temp dir");

let mut cli_process = Command::new(cargo_bin("xs"))
.arg("serve")
.arg(temp_dir.path())
.spawn()
.expect("Failed to start CLI binary");
Expand Down

0 comments on commit 32f120c

Please sign in to comment.