Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pluggable Streamer Prototype in Java #85

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft

Conversation

stevenhartley
Copy link

@stevenhartley stevenhartley commented Feb 14, 2024

The following pull request implements the pluggable streamer written in Java. It is used to demonstrate what the concept is and how one would implement a streamer using only the uTransport interface. The intention of this PR is to provide example code and NOT be used for implementing an actual streamer as that would be written in C++ or rust to be useable everywhere. .

@stevenhartley stevenhartley marked this pull request as ready for review March 19, 2024 22:26
@stevenhartley stevenhartley changed the title Initial idea of the pluggable streamer Pluggable Streamer Prototype in Java Mar 19, 2024
@stevenhartley stevenhartley requested review from neelam-kushwah and tamarafischer and removed request for tamarafischer March 19, 2024 22:34
Copy link

@PLeVasseur PLeVasseur left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really slick! 🙂

I like the concept. I'll think on how to apply a similar clean pattern to up-streamer-rust

*
*/
public class Route {
private UAuthority authority;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can mark these as final

// Unregister the listener with the transport
listeners.stream()
.filter(p -> p.getInputRoute().equals(in) && p.getOutputRoute().equals(out))
.forEach(p -> in.getTransport().unregisterListener(UUri.newBuilder().setAuthority(out.getAuthority()).build(), p));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if unregister fails?
can unregister block the thread?
just a feeling in my tummy that this can end up with race conditions

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking behaviour of the unregister method depends on its implementation
If one thread is unregistering listener while another is modifying the list, it can lead to race conditions.
We can use synchronization and an ExecutorService for async operations for added safety.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now this is only pseudo code and not intended to be used to build an actual streamer.

Copy link
Contributor

@neelam-kushwah neelam-kushwah left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor recommendation to use synchronized block, but I'm approving the PR as this code is only for demonstration purposes

// Unregister the listener with the transport
listeners.stream()
.filter(p -> p.getInputRoute().equals(in) && p.getOutputRoute().equals(out))
.forEach(p -> in.getTransport().unregisterListener(UUri.newBuilder().setAuthority(out.getAuthority()).build(), p));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Blocking behaviour of the unregister method depends on its implementation
If one thread is unregistering listener while another is modifying the list, it can lead to race conditions.
We can use synchronization and an ExecutorService for async operations for added safety.

TransportListener listener = new TransportListener(in, out);
UUri out_uri = UUri.newBuilder().setAuthority(out.getAuthority()).build();

UStatus result = in.getTransport().registerListener(out_uri, listener);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am recalling @sophokles73's feedback from about a week ago as I was implementing up-streamer-rust.

It seems to me this would nab you the Request, Response, and Notification messages headed to out.getAuthority(), but not Publish messages originating from in.getAuthority().

I think we may need to modify:

UStatus result = in.getTransport().registerListener(out_uri, request_response_notification_listener);

and add:

UUri in_uri = UUri.newBuilder().setAuthority(in.getAuthority()).build();
UStatus result = in.getTransport().registerListener(in_uri, publish_listener);

and make some corresponding changes to TransportListener to allow us to configure it for dropping Publish messages or all non-Publish messages.

Still tooling around with this over on up-streamer-rust, but wanted to make this comment before I forget 🙂

czfdcn added 2 commits March 25, 2024 21:07
Static routing rules work for messages that have a sink but messages that do not have sink we have to look up to  uSubscription for only a subset of topics that we (the streamer) need to listen to so that we don't get everything).
* ned to call this API if its database is flushed or corrupted (ex. factory
* reset).
* **__NOTE:__** This is a private API only for uSubscription services,
* uEs can call Unsubscribe() to flush their own subscriptions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is not intended to be called by client developers then it should not be here.
Adding documentation to explain interfaces means that they are not intention revealing and there is nothing stopping anyone from calling this API.
IMHO, it should not be here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll reply in the other comment (they are related).

import org.eclipse.uprotocol.v1.UStatus;
import org.eclipse.uprotocol.core.usubscription.v3.*;

/* The following is the uSubscription API declared as an interface so it could be easily
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this any different from all the other protobuf API's?
not that I am against an interface, I am just curious as to why this is suddenly an interface while the others are protobuf "interface API"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @tamarafischer (and @AnotherDaniel). The reason why I added this here is because of the following reasons/issues:

  1. We do not have code generators from proto yet
  2. code generators create implementations not interfaces, this is problematic if I want to build generic uEs that are not dependent on an implementation (i.e. the pluggable streamer or any uE that will use uSubscription in general and not depend on say a specific up-client implementation).
  3. USubscription has always been special in that it will not always be 1:1 mapping from the generated code, we will always have to tweak it for the specific implementation so it is better to be a language specific interface like UTransport and RpcClient.

Above is a major shift from current PoR but we need the interface urgently to unblock everyone while we work out tooling for all the other uServices.

*/
class TransportListener implements UListener {

private Route in;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have a curcular reference here.
UListener is used by UTransport.
TransportListener has 2 Route objects that also contain a UTransport
How does this work?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to rethink the circular dependency but like I mention this is only demo code and NOT intended to be used to implement any up-streamer library.

* @param request the request containing the topic, subscriber name, and subscription attributes
* @return the response containing the status of the request and any event delivery configuration
*/
SubscriptionResponse subscribe(SubscriptionRequest request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these methods always executed within the application process? Is there no IO involved?
If there is I/O the I believe that we should be returning a CompletionStage
At least this way we can compose the operations

public class UStreamer {

// List of listeners (routes) that the streamer listens to
private List<TransportListener> listeners;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the advantage of the TransportListener?
why can't we just have a list of Route objects?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The routes are not the rules in the forwarding table, the rules are a combination of in and out Route that then build a TransportListener and all we need to keep track of is the TransportListeners.

@PLeVasseur
Copy link

@tamarafischer

While you're offering Steven critique feel free to check out the attempt in Rust 😉

}

// check if the rule already exists in the list
if (listeners.stream().anyMatch(p -> p.getInputRoute().equals(in) && p.getOutputRoute().equals(out))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not very efficient. An O(n) check can be a problem. If the list is small no problem. Maybe a data structure like a Set or a Map can help with managing duplicates?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to do this optimization in the demo code though... Maybe I need to move this code out of up-java and into a separate project so that it is not accidentally used by developers!!

.build();

// Get the list of subscriptions
FetchSubscriptionsResponse response = submgr.fetchSubscriptions(request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if the subscription manager is slow - maybe under load?
what happens if the subscription manager fails? A bug or it is restarting or something?
Is this even a thing we need to handle in the car? Cloud applications must handle these cases and be a little more resilient.
Just saying that this is the happy path, subscription manager is a separate component and anything can happen.
The good thing is that you made it an interface and can now create a USubscription implementation that barfs and see how your application handles these use cases.

Objects.requireNonNull(out, "output cannot be null.");

if (in.equals(out)) {
return UStatus.newBuilder().setCode(UCode.INVALID_ARGUMENT).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who cares? if you can't delete, if there is nothing to delete, it is fine.
delete means I dont want something to be there anymore. If it is not possible then just say ok, deleted.
IMHO, you only give an error when something is there and you could not delete it, meaning I wanted it gone and it is still there. All other cases are, ok - it is not there anymore.
This is from a developer perspective.

if (listeners.removeIf(p -> p.getInputRoute().equals(in) && p.getOutputRoute().equals(out))) {
return UStatus.newBuilder().setCode(UCode.OK).build();
}
return UStatus.newBuilder().setCode(UCode.NOT_FOUND).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like delete, if not found and I want to remove, then I am ok with that. no need to tell me it was not found - meaning someone else already deleted it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there is no value in returning NOT_FOUND you're saying from a developer PoV?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants