Skip to content

Commit

Permalink
feat: add xs cas subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
cablehead committed Sep 19, 2024
1 parent a6e3f60 commit e6b0bea
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 4 deletions.
1 change: 0 additions & 1 deletion .github/workflows/release-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ on:
push:
branches:
- main
- feat-ghcr
workflow_dispatch:

env:
Expand Down
47 changes: 46 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use serde_json::Value;

use futures::StreamExt;

use tokio::io::AsyncRead;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio::net::{TcpStream, UnixStream};
use tokio::sync::mpsc::Receiver;

Expand Down Expand Up @@ -178,3 +178,48 @@ where
let body = res.collect().await?.to_bytes();
Ok(body)
}

pub async fn cas_get<W>(
addr: &str,
integrity: ssri::Integrity,
writer: &mut W,
) -> Result<(), BoxError>
where
W: AsyncWrite + Unpin,
{
let stream = connect(addr).await?;
let io = TokioIo::new(stream);

let (mut sender, conn) = http1::handshake(io).await?;

tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("Connection error: {}", e);
}
});

let uri = format!("http://localhost/cas/{}", integrity);
let req = Request::builder()
.method(Method::GET)
.uri(uri)
.body(empty())?;

let res = sender.send_request(req).await?;

if res.status() != StatusCode::OK {
return Err(format!("HTTP error: {}", res.status()).into());
}

let mut body = res.into_body();

while let Some(frame) = body.frame().await {
let frame = frame?;
if let Ok(chunk) = frame.into_data() {
writer.write_all(&chunk).await?;
}
}

writer.flush().await?;

Ok(())
}
23 changes: 23 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::path::PathBuf;
use std::str::FromStr;

use clap::{Parser, Subcommand};

Expand All @@ -23,6 +24,8 @@ enum Command {
Cat(CommandCat),
/// Append an event to the stream
Append(CommandAppend),
/// Retrieve content from Content-Addressable Storage
Cas(CommandCas),
}

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -85,13 +88,25 @@ struct CommandAppend {
meta: Option<String>,
}

#[derive(Parser, Debug)]
struct CommandCas {
/// Address to connect to [HOST]:PORT or <PATH> for Unix domain socket
#[clap(value_parser)]
addr: String,

/// Hash of the content to retrieve
#[clap(value_parser)]
hash: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let args = Args::parse();
match args.command {
Command::Serve(args) => serve(args).await,
Command::Cat(args) => cat(args).await,
Command::Append(args) => append(args).await,
Command::Cas(args) => cas(args).await,
}
}

Expand Down Expand Up @@ -184,3 +199,11 @@ async fn append(args: CommandAppend) -> Result<(), Box<dyn std::error::Error + S
tokio::io::stdout().write_all(&response).await?;
Ok(())
}

async fn cas(args: CommandCas) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let integrity = ssri::Integrity::from_str(&args.hash)?;
let mut stdout = tokio::io::stdout();
xs::client::cas_get(&args.addr, integrity, &mut stdout).await?;
stdout.flush().await?;
Ok(())
}
3 changes: 1 addition & 2 deletions xs.nu
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ export def .cas [hash?: any] {
let alt = $in
let hash = read_hash (if $hash != null { $hash } else { $alt })
if $hash == null { return }
let uri = $"./store/sock//cas/($hash)"
h. get $uri
xs cas (store-addr) $hash
}

export def .get [id: string] {
Expand Down

0 comments on commit e6b0bea

Please sign in to comment.