Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Assign specific jobs to dedicated workers #564

Open
wants to merge 16 commits into
base: develop
Choose a base branch
from
Open
19 changes: 3 additions & 16 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ num-bigint = "0.4.5"
num-traits = "0.2.19"
nunny = "0.2.1"
once_cell = "1.19.0"
paladin-core = "0.4.2"
paladin-core = { git = "https://github.com/0xPolygonZero/paladin.git", branch = "arpit/507-2" }
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
parking_lot = "0.12.3"
paste = "1.0.15"
pest = "2.7.10"
Expand Down Expand Up @@ -139,4 +139,3 @@ trybuild = "1.0"

[workspace.lints.clippy]
too_long_first_doc_paragraph = "allow"

2 changes: 1 addition & 1 deletion zero_bin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Paladin options:
-t, --task-bus-routing-key <TASK_BUS_ROUTING_KEY>
Specifies the routing key for publishing task messages. In most cases, the default value should suffice

[default: task]
[default: ""]
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved

-s, --serializer <SERIALIZER>
Determines the serialization format to be used
Expand Down
9 changes: 6 additions & 3 deletions zero_bin/leader/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ pub struct ProofParams {

/// The main function for the client.
pub(crate) async fn client_main(
runtime: Runtime,
block_proof_runtime: Runtime,
segment_runtime: Runtime,
rpc_params: RpcParams,
block_interval: BlockInterval,
mut params: ProofParams,
Expand Down Expand Up @@ -82,13 +83,15 @@ pub(crate) async fn client_main(
// verify the whole sequence.
let proved_blocks = prover::prove(
block_prover_inputs,
&runtime,
&block_proof_runtime,
&segment_runtime,
params.previous_proof.take(),
params.prover_config,
params.proof_output_dir.clone(),
)
.await;
runtime.close().await?;
block_proof_runtime.close().await?;
segment_runtime.close().await?;
let proved_blocks = proved_blocks?;

if params.prover_config.test_only {
Expand Down
27 changes: 20 additions & 7 deletions zero_bin/leader/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,31 @@ use tracing::{debug, error, info};

/// The main function for the HTTP mode.
pub(crate) async fn http_main(
runtime: Runtime,
block_proof_runtime: Runtime,
segment_runtime: Runtime,
port: u16,
output_dir: PathBuf,
prover_config: ProverConfig,
) -> Result<()> {
let addr = SocketAddr::from(([0, 0, 0, 0], port));
debug!("listening on {}", addr);

let runtime = Arc::new(runtime);
let block_proof_runtime = Arc::new(block_proof_runtime);
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
let segment_runtime = Arc::new(segment_runtime);
let app = Router::new().route(
"/prove",
post({
let runtime = runtime.clone();
move |body| prove(body, runtime, output_dir.clone(), prover_config)
let block_proof_runtime = block_proof_runtime.clone();
let segment_runtime = segment_runtime.clone();
move |body| {
prove(
body,
block_proof_runtime,
segment_runtime,
output_dir.clone(),
prover_config,
)
}
}),
);
let listener = tokio::net::TcpListener::bind(&addr).await?;
Expand Down Expand Up @@ -63,7 +74,8 @@ struct HttpProverInput {

async fn prove(
Json(payload): Json<HttpProverInput>,
runtime: Arc<Runtime>,
block_proof_runtime: Arc<Runtime>,
segment_runtime: Arc<Runtime>,
output_dir: PathBuf,
prover_config: ProverConfig,
) -> StatusCode {
Expand All @@ -75,7 +87,7 @@ async fn prove(
payload
.prover_input
.prove_test(
&runtime,
&segment_runtime,
payload.previous.map(futures::future::ok),
prover_config,
)
Expand All @@ -84,7 +96,8 @@ async fn prove(
payload
.prover_input
.prove(
&runtime,
&block_proof_runtime,
&segment_runtime,
payload.previous.map(futures::future::ok),
prover_config,
)
Expand Down
30 changes: 25 additions & 5 deletions zero_bin/leader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,14 @@ async fn main() -> Result<()> {

let args = cli::Cli::parse();

let runtime = Runtime::from_config(&args.paladin, register()).await?;
let mut block_proof_paladin_args = args.paladin.clone();
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved
block_proof_paladin_args.task_bus_routing_key = Some("block_proof".to_string());
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved

let mut segment_paladin_args = args.paladin.clone();
segment_paladin_args.task_bus_routing_key = Some("segment".to_string());
temaniarpit27 marked this conversation as resolved.
Show resolved Hide resolved

let block_proof_runtime = Runtime::from_config(&block_proof_paladin_args, register()).await?;
let segment_runtime = Runtime::from_config(&segment_paladin_args, register()).await?;

let prover_config: ProverConfig = args.prover_config.into();

Expand All @@ -73,7 +80,13 @@ async fn main() -> Result<()> {
Command::Clean => zero_bin_common::prover_state::persistence::delete_all()?,
Command::Stdio { previous_proof } => {
let previous_proof = get_previous_proof(previous_proof)?;
stdio::stdio_main(runtime, previous_proof, prover_config).await?;
stdio::stdio_main(
block_proof_runtime,
segment_runtime,
previous_proof,
prover_config,
)
.await?;
}
Command::Http { port, output_dir } => {
// check if output_dir exists, is a directory, and is writable
Expand All @@ -85,7 +98,14 @@ async fn main() -> Result<()> {
panic!("output-dir is not a writable directory");
}

http::http_main(runtime, port, output_dir, prover_config).await?;
http::http_main(
block_proof_runtime,
segment_runtime,
port,
output_dir,
prover_config,
)
.await?;
}
Command::Rpc {
rpc_url,
Expand All @@ -99,7 +119,6 @@ async fn main() -> Result<()> {
backoff,
max_retries,
} => {
let runtime = Runtime::from_config(&args.paladin, register()).await?;
let previous_proof = get_previous_proof(previous_proof)?;
let mut block_interval = BlockInterval::new(&block_interval)?;

Expand All @@ -113,7 +132,8 @@ async fn main() -> Result<()> {

info!("Proving interval {block_interval}");
client_main(
runtime,
block_proof_runtime,
segment_runtime,
RpcParams {
rpc_url,
rpc_type,
Expand Down
17 changes: 13 additions & 4 deletions zero_bin/leader/src/stdio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use tracing::info;

/// The main function for the stdio mode.
pub(crate) async fn stdio_main(
runtime: Runtime,
block_proof_runtime: Runtime,
segment_runtime: Runtime,
previous: Option<GeneratedBlockProof>,
prover_config: ProverConfig,
) -> Result<()> {
Expand All @@ -21,9 +22,17 @@ pub(crate) async fn stdio_main(
.map(Into::into)
.collect::<Vec<BlockProverInputFuture>>();

let proved_blocks =
prover::prove(block_prover_inputs, &runtime, previous, prover_config, None).await;
runtime.close().await?;
let proved_blocks = prover::prove(
block_prover_inputs,
&block_proof_runtime,
&segment_runtime,
previous,
prover_config,
None,
)
.await;
block_proof_runtime.close().await?;
segment_runtime.close().await?;
let proved_blocks = proved_blocks?;

if prover_config.test_only {
Expand Down
25 changes: 16 additions & 9 deletions zero_bin/prover/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ impl BlockProverInput {

pub async fn prove(
self,
runtime: &Runtime,
block_proof_runtime: &Runtime,
segment_runtime: &Runtime,
previous: Option<impl Future<Output = Result<GeneratedBlockProof>>>,
prover_config: ProverConfig,
) -> Result<GeneratedBlockProof> {
Expand Down Expand Up @@ -99,7 +100,7 @@ impl BlockProverInput {

Directive::map(IndexedStream::from(segment_data_iterator), &seg_prove_ops)
.fold(&seg_agg_ops)
.run(runtime)
.run(segment_runtime)
.map(move |e| {
e.map(|p| (idx, proof_gen::proof_types::BatchAggregatableProof::from(p)))
})
Expand All @@ -109,7 +110,7 @@ impl BlockProverInput {
// Fold the batch aggregated proof stream into a single proof.
let final_batch_proof =
Directive::fold(IndexedStream::new(batch_proof_futs), &batch_agg_ops)
.run(runtime)
.run(block_proof_runtime)
.await?;

if let proof_gen::proof_types::BatchAggregatableProof::Agg(proof) = final_batch_proof {
Expand All @@ -126,7 +127,7 @@ impl BlockProverInput {
prev,
save_inputs_on_error,
})
.run(runtime)
.run(block_proof_runtime)
.await?;

info!("Successfully proved block {block_number}");
Expand All @@ -139,7 +140,7 @@ impl BlockProverInput {

pub async fn prove_test(
self,
runtime: &Runtime,
segment_runtime: &Runtime,
previous: Option<impl Future<Output = Result<GeneratedBlockProof>>>,
prover_config: ProverConfig,
) -> Result<GeneratedBlockProof> {
Expand Down Expand Up @@ -175,7 +176,7 @@ impl BlockProverInput {
);

simulation
.run(runtime)
.run(segment_runtime)
.await?
.try_for_each(|_| future::ok(()))
.await?;
Expand Down Expand Up @@ -204,7 +205,8 @@ impl BlockProverInput {
/// block proofs as well.
pub async fn prove(
block_prover_inputs: Vec<BlockProverInputFuture>,
runtime: &Runtime,
block_proof_runtime: &Runtime,
segment_runtime: &Runtime,
previous_proof: Option<GeneratedBlockProof>,
prover_config: ProverConfig,
proof_output_dir: Option<PathBuf>,
Expand All @@ -226,7 +228,7 @@ pub async fn prove(
// Prove the block
let block_proof = if prover_config.test_only {
block
.prove_test(runtime, previous_block_proof, prover_config)
.prove_test(segment_runtime, previous_block_proof, prover_config)
.then(move |proof| async move {
let proof = proof?;
let block_number = proof.b_height;
Expand All @@ -250,7 +252,12 @@ pub async fn prove(
.await?
} else {
block
.prove(runtime, previous_block_proof, prover_config)
.prove(
block_proof_runtime,
segment_runtime,
previous_block_proof,
prover_config,
)
.then(move |proof| async move {
let proof = proof?;
let block_number = proof.b_height;
Expand Down
Loading