Skip to content

Commit

Permalink
RpcClient Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
czfdcn committed Sep 14, 2023
1 parent 8c02ab6 commit af5ee68
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 735 deletions.
47 changes: 0 additions & 47 deletions src/main/java/org/eclipse/uprotocol/rpc/Rpc.java

This file was deleted.

17 changes: 8 additions & 9 deletions src/main/java/org/eclipse/uprotocol/rpc/RpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,24 @@

package org.eclipse.uprotocol.rpc;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;

import org.eclipse.uprotocol.uri.datamodel.UUri;
import org.eclipse.uprotocol.utransport.datamodel.UAttributes;
import org.eclipse.uprotocol.utransport.datamodel.UPayload;

/**
* The RpcClient interface defines the RPC client API per the uProtocol specification
* https://github.com/eclipse-uprotocol/uprotocol-spec/blob/main/up-l2/README.adoc
*
* Interface used by code generators found in https://github.com/eclipse-uprotocol/uprotocol-core-api
* to invoke a method to support RPC.
*/
public interface RpcClient {

/**
* Support for RPC method invocation.
* @param topic The topic to invoke the method on.
* @param payload The payload to send.
* @param attributes The attributes to send.
* @return Returns the CompletableFuture with the result or exception.
* @param topic req.v1 CloudEvent.
* @param payload TODO
* @param attributes TODO
* @return Returns the CompletableFuture with the result or exception.
*/
CompletionStage<UPayload> invokeMethod(UUri topic, UPayload payload, UAttributes attributes);
CompletableFuture<UPayload> invokeMethod(UUri topic, UPayload payload, UAttributes attributes);
}
52 changes: 35 additions & 17 deletions src/main/java/org/eclipse/uprotocol/rpc/RpcMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.eclipse.uprotocol.utransport.datamodel.UPayload;

/**
* An interface that maps the returned value from uProtocol Layer 2 to the response in Layer3.
*/
Expand All @@ -42,7 +44,7 @@ public interface RpcMapper {
* @return Returns a CompletableFuture containing the declared expected return type of the RPC method or an exception.
* @param <T> The declared expected return type of the RPC method.
*/
static <T extends Message> CompletableFuture<T> mapResponse(CompletableFuture<Any> responseFuture, Class<T> expectedClazz) {
static <T extends Message> CompletableFuture<T> mapResponse(CompletableFuture<UPayload> responseFuture, Class<T> expectedClazz) {
return responseFuture.handle((payload, exception) -> {
// Unexpected exception
if (exception != null) {
Expand All @@ -51,12 +53,19 @@ static <T extends Message> CompletableFuture<T> mapResponse(CompletableFuture<An
if (payload == null) {
throw new RuntimeException("Server returned a null payload. Expected " + expectedClazz.getName());
}
// Expected type
if (payload.is(expectedClazz)) {
return unpackPayload(payload, expectedClazz);
Any any;
try {
any = Any.parseFrom(payload.data());

// Expected type
if (any.is(expectedClazz)) {
return unpackPayload(any, expectedClazz);
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("%s [%s]", e.getMessage(), Status.class.getName()), e);
}
// Some other type instead of the expected one
throw new RuntimeException(String.format("Unknown payload type [%s]. Expected [%s]", payload.getTypeUrl(), expectedClazz.getName()));
throw new RuntimeException(String.format("Unknown payload type [%s]. Expected [%s]", any.getTypeUrl(), expectedClazz.getName()));
});
}

Expand All @@ -67,30 +76,39 @@ static <T extends Message> CompletableFuture<T> mapResponse(CompletableFuture<An
* @return Returns a CompletableFuture containing an RpcResult containing the declared expected return type T, or a Status containing any errors.
* @param <T> The declared expected return type of the RPC method.
*/
static <T extends Message> CompletableFuture<RpcResult<T>> mapResponseToResult(CompletableFuture<Any> responseFuture, Class<T> expectedClazz) {
static <T extends Message> CompletableFuture<RpcResult<T>> mapResponseToResult(CompletableFuture<UPayload> responseFuture, Class<T> expectedClazz) {
return responseFuture.handle((payload, exception) -> {
// Unexpected exception
if (exception != null) {
throw new RuntimeException(exception.getMessage(), exception);
}

if (payload == null) {
throw new RuntimeException("Server returned a null payload. Expected " + expectedClazz.getName());
}
// Expected type
if (payload.is(expectedClazz)) {
if (Status.class.equals(expectedClazz)) {
return calculateStatusResult(payload);
} else {
return RpcResult.success(unpackPayload(payload, expectedClazz));
Any any;
try {
any = Any.parseFrom(payload.data());

// Expected type
if (any.is(expectedClazz)) {
if (Status.class.equals(expectedClazz)) {
return calculateStatusResult(any);
} else {
return RpcResult.success(unpackPayload(any, expectedClazz));
}
}
// Status instead of the expected one
if (any.is(Status.class)) {
return calculateStatusResult(any);
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("%s [%s]", e.getMessage(), Status.class.getName()), e);
}
// Status instead of the expected one
if (payload.is(Status.class)) {
return calculateStatusResult(payload);
}

// Some other type instead of the expected one
throw new RuntimeException(String.format("Unknown payload type [%s]. Expected [%s]",
payload.getTypeUrl(), expectedClazz.getName()));
any.getTypeUrl(), expectedClazz.getName()));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@
import java.util.Optional;

public enum USerializationHint {
// Serialization hint is unknown
UNKNOWN(0, ""),
PROTOBUF (1, "application/x-protobuf"), // data is a Base64 encoded protobuf string
JSON(2, "application/json"), // data is a UTF-8 string containing a JSON structure
SOMEIP(3, "application/x-someip"), // data is a UTF-8 string containing a JSON structure
RAW(4, "application/octet-stream"); // data is a Base64 encoded protobuf string of an Any object with the payload inside

// serialized com.google.protobuf.Any type
PROTOBUF (1, "application/x-protobuf"),

// data is a UTF-8 string containing a JSON structure
JSON(2, "application/json"),

// data is a UTF-8 string containing a JSON structure
SOMEIP(3, "application/x-someip"),

// Raw binary data that has not been serialized
RAW(4, "application/octet-stream");

private final int hintNumber;
private final String mimeType;
Expand Down
Loading

0 comments on commit af5ee68

Please sign in to comment.