Skip to content

Commit

Permalink
Merge pull request #12 from crocs-muni/devel
Browse files Browse the repository at this point in the history
Version 0.1.1 changes
  • Loading branch information
dufkan authored Oct 9, 2022
2 parents 8a8398d + ed0dc47 commit 37c6994
Show file tree
Hide file tree
Showing 16 changed files with 612 additions and 262 deletions.
47 changes: 29 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
[package]
name = "meesign-server"
version = "0.1.0"
version = "0.1.1"
edition = "2018"

[dependencies]
tonic = "0.6"
prost = "0.9"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "time"] }
tokio-stream = "0.1.10"
uuid = { version = "1.0.0-alpha.1", features = ["v4", "fast-rng"] }
log = "0.4.16"
env_logger = "0.9.0"
hex = "0.4.3"
clap = { version = "3.1.8", features = ["derive"] }
rand = "0.8.5"
tempfile = "3.3.0"

[build-dependencies]
tonic-build = "0.6"
22 changes: 18 additions & 4 deletions proto/mpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";
package meesign;

service MPC {
rpc GetServerInfo(ServerInfoRequest) returns (ServerInfo);
rpc Register(RegistrationRequest) returns (Resp);
rpc Sign(SignRequest) returns (Task);
rpc Group(GroupRequest) returns (Task);
Expand All @@ -13,6 +14,13 @@ service MPC {
rpc GetGroups(GroupsRequest) returns (Groups);
rpc GetDevices(DevicesRequest) returns (Devices);
rpc Log(LogRequest) returns (Resp);
rpc SubscribeUpdates(SubscribeRequest) returns (stream Task);
}

message ServerInfoRequest {}

message ServerInfo {
string version = 1;
}

enum ProtocolType {
Expand Down Expand Up @@ -86,16 +94,18 @@ message Task {
}
TaskState state = 3;
uint32 round = 4;
uint32 accept = 5; // Number of task accepts
uint32 reject = 6; // Number of task rejects
optional bytes data = 7; // If present, the task is waiting for recipient's action
optional bytes request = 8; // Serialized SignRequest or TaskRequest; present only when queried directly
uint32 attempt = 5;
uint32 accept = 6; // Number of task accepts
uint32 reject = 7; // Number of task rejects
optional bytes data = 8; // If present, the task is waiting for recipient's action
optional bytes request = 9; // Serialized SignRequest or TaskRequest; present only when queried directly
}

message TaskUpdate {
bytes device_id = 1;
bytes task = 2;
bytes data = 3;
uint32 attempt = 4;
}

message TasksRequest {
Expand Down Expand Up @@ -134,6 +144,10 @@ message LogRequest {
optional bytes device_id = 2;
};

message SubscribeRequest {
bytes device_id = 1;
};

message GG18KeyGenInit {
uint32 index = 1;
uint32 parties = 2;
Expand Down
67 changes: 49 additions & 18 deletions src/communicator.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
use crate::device::Device;
use crate::get_timestamp;
use crate::proto::Gg18Message;
use prost::Message;
use rand::prelude::SliceRandom;
use rand::thread_rng;
use std::collections::HashMap;
use std::ops::Deref;
use tonic::codegen::Arc;

/// Communication state of a Task
pub struct Communicator {
/// The minimal number of parties needed to successfully complete the task
threshold: u32,
/// Ordered list of devices
device_list: Vec<Device>,
device_list: Vec<Arc<Device>>,
/// Ordered list of active devices (participating in the protocol)
active_devices: Option<Vec<Vec<u8>>>,
/// A mapping of device identifiers to their Task decision
Expand All @@ -27,15 +32,15 @@ impl Communicator {
/// # Arguments
/// * `devices` - Sorted list of devices; items of the list need to be unique
/// * `threshold` - The minimal number of devices to successfully complete the task
pub fn new(devices: &[Device], threshold: u32) -> Self {
pub fn new(devices: &[Arc<Device>], threshold: u32) -> Self {
assert!(devices.len() > 1);
assert!(threshold <= devices.len() as u32);
// TODO uncomment once is_sorted is stabilized
// assert!(devices.is_sorted());

let mut communicator = Communicator {
threshold,
device_list: devices.iter().map(Device::clone).collect(),
device_list: devices.iter().map(Arc::clone).collect(),
active_devices: None,
decisions: devices
.iter()
Expand Down Expand Up @@ -90,7 +95,7 @@ impl Communicator {

self.input
.get(device_index)
.and_then(|messages| Some(!messages.iter().any(Option::is_some)))
.map(|messages| !messages.iter().any(Option::is_some))
.unwrap_or(true)
}

Expand Down Expand Up @@ -136,7 +141,7 @@ impl Communicator {
.as_ref()
.unwrap()
.iter()
.zip((&self.input).into_iter())
.zip((&self.input).iter())
.filter(|(_a, b)| b.iter().all(Option::is_none))
.count()
== 0
Expand All @@ -157,14 +162,44 @@ impl Communicator {
/// Set active devices
pub fn set_active_devices(&mut self) -> Vec<Vec<u8>> {
assert!(self.accept_count() >= self.threshold);
let agreeing_devices = self
.device_list
.iter()
.filter(|device| self.decisions.get(device.identifier()) == Some(&Some(true)))
.collect::<Vec<_>>();

let timestamp = get_timestamp();
let connected_devices = agreeing_devices
.iter()
.filter(|device| device.last_active() > timestamp - 5)
.map(Deref::deref)
.collect::<Vec<_>>();

let (devices, indices): (&Vec<&Arc<Device>>, Vec<_>) =
if connected_devices.len() >= self.threshold as usize {
(&connected_devices, (0..connected_devices.len()).collect())
} else {
(&agreeing_devices, (0..agreeing_devices.len()).collect())
};
let mut indices = indices
.choose_multiple(&mut thread_rng(), self.threshold as usize)
.cloned()
.collect::<Vec<_>>();
indices.sort();

self.active_devices = Some(
self.device_list
devices
.iter()
.filter(|device| self.decisions.get(device.identifier()) == Some(&Some(true)))
.take(self.threshold as usize)
.map(|device| device.identifier().to_vec())
.enumerate()
.filter(|(idx, _)| indices.contains(idx))
.map(|(_, device)| device.identifier().to_vec())
.collect(),
);
assert_eq!(
self.active_devices.as_ref().unwrap().len(),
self.threshold as usize
);

self.active_devices.as_ref().unwrap().clone()
}

Expand Down Expand Up @@ -200,11 +235,7 @@ impl Communicator {

/// Check whether a device submitted its decision
pub fn device_decided(&self, device: &[u8]) -> bool {
if let Some(Some(_)) = self.decisions.get(device) {
true
} else {
false
}
matches!(self.decisions.get(device), Some(Some(_)))
}

/// Save acknowledgement by the given device; return true if successful
Expand All @@ -229,11 +260,11 @@ impl Communicator {

let active_devices = self.get_active_devices().unwrap();
let mut indices: Vec<u32> = Vec::new();
for i in 0..active_devices.len() {
for device in &active_devices {
indices.push(
self.device_list
.iter()
.position(|x| x.identifier() == &active_devices[i])
.position(|x| x.identifier() == device)
.unwrap() as u32,
);
}
Expand Down Expand Up @@ -473,10 +504,10 @@ mod tests {
assert_eq!(communicator.acknowledge(devices[0].identifier()), false);
}

fn prepare_devices(n: usize) -> Vec<Device> {
fn prepare_devices(n: usize) -> Vec<Arc<Device>> {
assert!(n < u8::MAX as usize);
(0..n)
.map(|i| Device::new(vec![i as u8], format!("d{}", i)))
.map(|i| Arc::new(Device::new(vec![i as u8], format!("d{}", i))))
.collect()
}
}
Loading

0 comments on commit 37c6994

Please sign in to comment.