Skip to content

Commit

Permalink
feat: Simulator gRPC Client implementation and initial configuration (#…
Browse files Browse the repository at this point in the history
…182)

Signed-off-by: Alfredo Gutierrez <[email protected]>
  • Loading branch information
AlfredoG87 committed Sep 20, 2024
1 parent f3e1b62 commit f78143f
Show file tree
Hide file tree
Showing 57 changed files with 1,358 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ jvmDependencyConflicts.patch {
"org.codehaus.mojo:animal-sniffer-annotations"
)

module("io.grpc:grpc-netty-shaded") { annotationLibraries.forEach { removeDependency(it) } }

module("io.grpc:grpc-api") { annotationLibraries.forEach { removeDependency(it) } }
module("io.grpc:grpc-core") { annotationLibraries.forEach { removeDependency(it) } }
module("io.grpc:grpc-context") { annotationLibraries.forEach { removeDependency(it) } }
Expand Down Expand Up @@ -87,9 +89,16 @@ extraJavaModuleInfo {
exportAllPackages()
requireAllDefinedDependencies()
requires("java.logging")
uses("io.grpc.ManagedChannelProvider")
uses("io.grpc.NameResolverProvider")
uses("io.grpc.LoadBalancerProvider")
}

module("io.grpc:grpc-core", "io.grpc.internal")
module("io.grpc:grpc-core", "io.grpc.internal") {
exportAllPackages()
requireAllDefinedDependencies()
requires("java.logging")
}
module("io.grpc:grpc-context", "io.grpc.context")
module("io.grpc:grpc-stub", "io.grpc.stub") {
exportAllPackages()
Expand All @@ -101,6 +110,15 @@ extraJavaModuleInfo {
module("io.grpc:grpc-util", "io.grpc.util")
module("io.grpc:grpc-protobuf", "io.grpc.protobuf")
module("io.grpc:grpc-protobuf-lite", "io.grpc.protobuf.lite")

module("io.grpc:grpc-netty-shaded", "io.grpc.netty.shaded") {
exportAllPackages()
requireAllDefinedDependencies()
requires("java.logging")
requires("jdk.unsupported")
ignoreServiceProvider("reactor.blockhound.integration.BlockHoundIntegration")
}

module("com.github.spotbugs:spotbugs-annotations", "com.github.spotbugs.annotations")
module("com.google.code.findbugs:jsr305", "java.annotation") {
exportAllPackages()
Expand Down
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ ignore:
- "server/src/main/java/com/hedera/block/server/Server.java"
- "server/src/main/java/com/hedera/block/server/Translator.java"
- "simulator/src/main/java/com/hedera/block/simulator/BlockStreamSimulator.java"
- "simulator/src/main/java/com/hedera/block/simulator/Translator.java"
2 changes: 2 additions & 0 deletions gradle/modules.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ com.github.spotbugs.annotations=com.github.spotbugs:spotbugs-annotations
com.lmax.disruptor=com.lmax:disruptor
io.helidon.webserver=io.helidon.webserver:helidon-webserver
io.helidon.webserver.grpc=io.helidon.webserver:helidon-webserver-grpc

io.helidon.webserver.testing.junit5=io.helidon.webserver.testing.junit5:helidon-webserver-testing-junit5

io.helidon.logging=io.helidon.logging:helidon-logging-jul
Expand All @@ -21,6 +22,7 @@ google.proto=com.google.protobuf:protoc
io.grpc=io.grpc:grpc-api
io.grpc.protobuf=io.grpc:grpc-protobuf
io.grpc.stub=io.grpc:grpc-stub
io.grpc.netty.shaded=io.grpc:grpc-netty-shaded

com.hedera.pbj.runtime=com.hedera.pbj:pbj-runtime
com.google.protobuf=com.google.protobuf:protobuf-java
Expand Down
10 changes: 7 additions & 3 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ dependencyResolutionManagement {
// Define a constant for the Dagger version.
val daggerVersion = "2.42"

// Define a constant for protobuf version.
val protobufVersion = "4.28.2"

// Compile time dependencies
version("io.helidon.webserver.http2", "4.1.0")
version("io.helidon.webserver.grpc", "4.1.0")
Expand All @@ -63,14 +66,15 @@ dependencyResolutionManagement {
version("io.grpc", "1.65.1")
version("io.grpc.protobuf", "1.65.1")
version("io.grpc.stub", "1.65.1")
version("io.grpc.netty.shaded", "1.65.1")

// Reference from the protobuf plugin
version("google.proto", "4.27.3")
version("google.proto", protobufVersion)
version("grpc.protobuf.grpc", "1.65.1")

// Google protobuf dependencies
version("com.google.protobuf", "4.27.3")
version("com.google.protobuf.util", "4.27.3")
version("com.google.protobuf", protobufVersion)
version("com.google.protobuf.util", protobufVersion)

// PBJ dependencies
plugin("pbj", "com.hedera.pbj.pbj-compiler").version("0.9.2")
Expand Down
3 changes: 3 additions & 0 deletions simulator/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ mainModuleInfo {
annotationProcessor("dagger.compiler")
annotationProcessor("com.google.auto.service.processor")
runtimeOnly("com.swirlds.config.impl")
runtimeOnly("org.apache.logging.log4j.slf4j2.impl")
runtimeOnly("io.grpc.netty.shaded")
}

testModuleInfo {
requires("org.junit.jupiter.api")
requires("org.mockito")
requires("org.mockito.junit.jupiter")
requiresStatic("com.github.spotbugs.annotations")
requires("com.swirlds.common")
}

tasks.register<Copy>("untarTestBlockStream") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ private BlockStreamSimulator() {}
*
* @param args the arguments to be passed to the block stream simulator
* @throws IOException if an I/O error occurs
* @throws InterruptedException if the thread is interrupted
*/
public static void main(String[] args) throws IOException {
public static void main(String[] args) throws IOException, InterruptedException {

LOGGER.log(INFO, "Starting Block Stream Simulator");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package com.hedera.block.simulator;

import com.hedera.block.simulator.config.data.BlockStreamConfig;
import com.hedera.block.simulator.generator.BlockStreamManager;
import com.hedera.block.simulator.grpc.PublishStreamGrpcClient;
import com.hedera.hapi.block.stream.BlockItem;
import com.swirlds.config.api.Configuration;
import edu.umd.cs.findbugs.annotations.NonNull;
import javax.inject.Inject;
Expand All @@ -29,6 +32,10 @@ public class BlockStreamSimulatorApp {

Configuration configuration;
BlockStreamManager blockStreamManager;
PublishStreamGrpcClient publishStreamGrpcClient;
BlockStreamConfig blockStreamConfig;

private final int delayBetweenBlockItems;

boolean isRunning = false;

Expand All @@ -37,33 +44,51 @@ public class BlockStreamSimulatorApp {
*
* @param configuration the configuration to be used by the block stream simulator
* @param blockStreamManager the block stream manager to be used by the block stream simulator
* @param publishStreamGrpcClient the gRPC client to be used by the block stream simulator
*/
@Inject
public BlockStreamSimulatorApp(
@NonNull Configuration configuration, @NonNull BlockStreamManager blockStreamManager) {
@NonNull Configuration configuration,
@NonNull BlockStreamManager blockStreamManager,
@NonNull PublishStreamGrpcClient publishStreamGrpcClient) {
this.configuration = configuration;
this.blockStreamManager = blockStreamManager;
}
this.publishStreamGrpcClient = publishStreamGrpcClient;

/** Starts the block stream simulator. */
public void start() {
blockStreamConfig = configuration.getConfigData(BlockStreamConfig.class);

delayBetweenBlockItems = blockStreamConfig.delayBetweenBlockItems();
}

// use blockStreamManager to get block stream
/**
* Starts the block stream simulator.
*
* @throws InterruptedException if the thread is interrupted
*/
public void start() throws InterruptedException {
int delayMSBetweenBlockItems = delayBetweenBlockItems / 1_000_000;
int delayNSBetweenBlockItems = delayBetweenBlockItems % 1_000_000;

// use PublishStreamGrpcClient to stream it to the block-node.
isRunning = true;
LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has started");

// while
boolean streamBlockItem = true;
int blockItemsStreamed = 0;

// get block item
// send block item
while (streamBlockItem) {
// get block item
BlockItem blockItem = blockStreamManager.getNextBlockItem();
publishStreamGrpcClient.streamBlockItem(blockItem);
blockItemsStreamed++;

// verify if ack is needed
// wait for ack async...
Thread.sleep(delayMSBetweenBlockItems, delayNSBetweenBlockItems);

// verify exit condition
if (blockItemsStreamed >= blockStreamConfig.maxBlockItemsToStream()) {
streamBlockItem = false;
}
}

LOGGER.log(System.Logger.Level.INFO, "Block Stream Simulator has stopped");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.hedera.block.simulator.config.ConfigInjectionModule;
import com.hedera.block.simulator.generator.GeneratorInjectionModule;
import com.hedera.block.simulator.grpc.GrpcInjectionModule;
import com.swirlds.config.api.Configuration;
import dagger.BindsInstance;
import dagger.Component;
Expand All @@ -29,6 +30,7 @@
modules = {
ConfigInjectionModule.class,
GeneratorInjectionModule.class,
GrpcInjectionModule.class,
})
public interface BlockStreamSimulatorInjectionComponent {

Expand Down
Loading

0 comments on commit f78143f

Please sign in to comment.