Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: move away from ractor #23

Merged
merged 3 commits into from
Apr 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
386 changes: 2 additions & 384 deletions Cargo.lock

Large diffs are not rendered by default.

162 changes: 86 additions & 76 deletions integration-tests/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use bollard::network::CreateNetworkOptions;
use bollard::service::{HostConfig, Ipam, PortBinding};
use bollard::Docker;
use futures::StreamExt;
use hyper::{body::HttpBody, Body, Client, Method, Request};
use hyper::{Body, Client, Method, Request};
use mpc_recovery::msg::LeaderResponse;
use rand::{distributions::Alphanumeric, Rng};
use serde_json::json;
use std::collections::HashMap;
use std::convert::TryInto;
use std::time::Duration;
use threshold_crypto::{serde_impl::SerdeSecret, PublicKeySet, SecretKeyShare, Signature};
use threshold_crypto::{serde_impl::SerdeSecret, PublicKeySet, SecretKeyShare};
use tokio::io::AsyncWriteExt;
use tokio::spawn;

Expand Down Expand Up @@ -45,44 +45,22 @@ async fn continuously_print_docker_output(docker: &Docker, id: &str) -> anyhow::

async fn start_mpc_node(
docker: &Docker,
node_id: u64,
pk_set: &PublicKeySet,
sk_share: &SecretKeyShare,
actor_address: Option<String>,
) -> anyhow::Result<(String, String, String)> {
let actor_port = portpicker::pick_unused_port().expect("no free ports");
let web_port = portpicker::pick_unused_port().expect("no free ports");

let empty = HashMap::<(), ()>::new();
cmd: Vec<String>,
web_port: u16,
expose_web_port: bool,
) -> anyhow::Result<(String, String)> {
let mut exposed_ports = HashMap::new();
exposed_ports.insert(format!("{actor_port}/tcp"), empty.clone());
exposed_ports.insert(format!("{web_port}/tcp"), empty);
let mut port_bindings = HashMap::new();
port_bindings.insert(
format!("{actor_port}/tcp"),
Some(vec![PortBinding {
host_ip: None,
host_port: Some(actor_port.to_string()),
}]),
);
port_bindings.insert(
format!("{web_port}/tcp"),
Some(vec![PortBinding {
host_ip: None,
host_port: Some(web_port.to_string()),
}]),
);

let mut cmd = vec![
"start".to_string(),
node_id.to_string(),
serde_json::to_string(&pk_set)?,
serde_json::to_string(&SerdeSecret(sk_share))?,
actor_port.to_string(),
web_port.to_string(),
];
if let Some(actor_address) = actor_address {
cmd.push(actor_address);
if expose_web_port {
let empty = HashMap::<(), ()>::new();
exposed_ports.insert(format!("{web_port}/tcp"), empty);
port_bindings.insert(
format!("{web_port}/tcp"),
Some(vec![PortBinding {
host_ip: None,
host_port: Some(web_port.to_string()),
}]),
);
}

let mpc_recovery_config = Config {
Expand Down Expand Up @@ -123,11 +101,60 @@ async fn start_mpc_node(
.ip_address
.unwrap();

Ok((
id,
format!("{ip_address}:{actor_port}"),
format!("localhost:{web_port}"),
))
Ok((id, ip_address))
}

async fn start_mpc_leader_node(
docker: &Docker,
node_id: u64,
pk_set: &PublicKeySet,
sk_share: &SecretKeyShare,
sign_nodes: Vec<String>,
) -> anyhow::Result<String> {
let web_port = portpicker::pick_unused_port().expect("no free ports");

let mut cmd = vec![
"start-leader".to_string(),
node_id.to_string(),
"--pk-set".to_string(),
serde_json::to_string(&pk_set)?,
"--sk-share".to_string(),
serde_json::to_string(&SerdeSecret(sk_share))?,
"--web-port".to_string(),
web_port.to_string(),
];
for sign_node in sign_nodes {
cmd.push("--sign-nodes".to_string());
cmd.push(sign_node);
}

start_mpc_node(docker, cmd, web_port, true).await?;
// exposed host address
Ok(format!("http://localhost:{web_port}"))
}

async fn start_mpc_sign_node(
docker: &Docker,
node_id: u64,
pk_set: &PublicKeySet,
sk_share: &SecretKeyShare,
) -> anyhow::Result<String> {
let web_port = portpicker::pick_unused_port().expect("no free ports");

let cmd = vec![
"start-sign".to_string(),
node_id.to_string(),
"--pk-set".to_string(),
serde_json::to_string(&pk_set)?,
"--sk-share".to_string(),
serde_json::to_string(&SerdeSecret(sk_share))?,
"--web-port".to_string(),
web_port.to_string(),
];

let (_, ip_address) = start_mpc_node(docker, cmd, web_port, false).await?;
// internal network address
Ok(format!("http://{ip_address}:{web_port}"))
}

async fn create_network(docker: &Docker) -> anyhow::Result<()> {
Expand Down Expand Up @@ -167,56 +194,39 @@ async fn test_trio() -> anyhow::Result<()> {
// but only instantiates 3 nodes.
let (pk_set, sk_shares) = mpc_recovery::generate(4, 3)?;

let mut actor_addresses = Vec::new();
let mut web_ports = Vec::new();
for (i, sk_share) in sk_shares.into_iter().enumerate().take(3) {
let (_id, actor_address, web_port) = start_mpc_node(
&docker,
(i + 1) as u64,
&pk_set,
&sk_share,
actor_addresses.first().cloned(),
)
.await?;
actor_addresses.push(actor_address);
web_ports.push(web_port);
let mut sign_nodes = Vec::new();
for i in 2..=3 {
let addr = start_mpc_sign_node(&docker, i as u64, &pk_set, &sk_shares[i - 1]).await?;
sign_nodes.push(addr);
}
let leader_node = start_mpc_leader_node(&docker, 1, &pk_set, &sk_shares[0], sign_nodes).await?;

// Wait until all nodes initialize
tokio::time::sleep(Duration::from_millis(2000)).await;

// TODO: only leader node works for now, other nodes struggling to connect to each other
// for some reason.
let web_port = &web_ports[0];
let payload: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(10)
.map(char::from)
.collect();
let req = Request::builder()
.method(Method::POST)
.uri(format!("http://{}/submit", web_port))
.uri(format!("{}/submit", leader_node))
.header("content-type", "application/json")
.body(Body::from(json!({ "payload": payload }).to_string()))?;

let client = Client::new();
let mut resp = client.request(req).await?;
let response = client.request(req).await?;

assert_eq!(resp.status(), 200);
assert_eq!(response.status(), 200);

let data = resp.body_mut().data().await.expect("no response body")?;
let response_body: String = serde_json::from_slice(&data)?;
let signature_bytes = hex::decode(response_body)?;
let signature_array: [u8; 96] = signature_bytes.as_slice().try_into().map_err(|_e| {
anyhow::anyhow!(
"signature has incorrect length: expected 96 bytes, but got {}",
signature_bytes.len()
)
})?;
let signature = Signature::from_bytes(signature_array)
.map_err(|e| anyhow::anyhow!("malformed signature: {}", e))?;

assert!(pk_set.public_key().verify(&signature, payload));
let data = hyper::body::to_bytes(response).await?;
let response: LeaderResponse = serde_json::from_slice(&data)?;
if let LeaderResponse::Ok { signature } = response {
assert!(pk_set.public_key().verify(&signature, payload));
} else {
panic!("response was not successful");
}

Ok(())
}
3 changes: 1 addition & 2 deletions mpc-recovery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ axum = "0.6"
clap = { version = "4.2", features = ["derive"] }
futures = "0.3"
hex = "0.4"
ractor = { version = "0.7", features = ["cluster"] }
ractor_cluster = "0.7"
hyper = { version = "0.14", features = ["full"] }
actix-rt = "2.8"
threshold_crypto = "0.4.0"
rand = "0.7"
Expand Down
Loading