Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pluggable Streamer Prototype in Java #85

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.eclipse.uprotocol.core.usubscription.v3;

import org.eclipse.uprotocol.v1.UStatus;
import org.eclipse.uprotocol.core.usubscription.v3.*;

/* The following is the uSubscription API declared as an interface so it could be easily
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this any different from all the other protobuf API's?
not that I am against an interface, I am just curious as to why this is suddenly an interface while the others are protobuf "interface API"

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @tamarafischer (and @AnotherDaniel). The reason why I added this here is because of the following reasons/issues:

  1. We do not have code generators from proto yet
  2. code generators create implementations not interfaces, this is problematic if I want to build generic uEs that are not dependent on an implementation (i.e. the pluggable streamer or any uE that will use uSubscription in general and not depend on say a specific up-client implementation).
  3. USubscription has always been special in that it will not always be 1:1 mapping from the generated code, we will always have to tweak it for the specific implementation so it is better to be a language specific interface like UTransport and RpcClient.

Above is a major shift from current PoR but we need the interface urgently to unblock everyone while we work out tooling for all the other uServices.

* used
*/
public interface USubscription {


/**
* A consumer (application) calls this API to subscribe to a topic.
* What is passed is the SubscriptionRequest message containing the topic, the
* subscriber's name, and any Subscription Attributes. This API returns a
* SubscriptionResponse message containing the status of the request along with
* any event delivery configuration
* required to consume the event. Calling this API also registers the subscriber
* to received subscription change notifications if ever the subscription state
* changes.
* @param request the request containing the topic, subscriber name, and subscription attributes
* @return the response containing the status of the request and any event delivery configuration
*/
SubscriptionResponse subscribe(SubscriptionRequest request);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these methods always executed within the application process? Is there no IO involved?
If there is I/O the I believe that we should be returning a CompletionStage
At least this way we can compose the operations



// The consumer no longer wishes to subscribe to a topic so it issues an
// explicit unsubscribe request.
UStatus unsubscribe(UnsubscribeRequest request);


/**
* Fetch a list of subscribers that are currently subscribed to a given topic.
* @param request the request containing the topic that we want the list of subscribers for
* @return the list of subscribers for the given topic
*/
FetchSubscribersResponse fetchSubscribers(FetchSubscribersRequest request);


/**
* Reset subscriptions to and from the uSubscription Service.
* This API is used between uSubscription services in order to flush and
* reestablish subscriptions between devices. A uSubscription service might
* ned to call this API if its database is flushed or corrupted (ex. factory
* reset).
* **__NOTE:__** This is a private API only for uSubscription services,
* uEs can call Unsubscribe() to flush their own subscriptions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is not intended to be called by client developers then it should not be here.
Adding documentation to explain interfaces means that they are not intention revealing and there is nothing stopping anyone from calling this API.
IMHO, it should not be here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll reply in the other comment (they are related).

*
* @param request the request containing the reset information
* @return the status of the reset request
*/
UStatus reset(ResetRequest request);


/**
* Fetch a list of subscriptions for a given subscriber.
* @param request the request containing the subscriber
* @return the list of subscriptions for the subscriber
*/
FetchSubscriptionsResponse fetchSubscriptions(FetchSubscriptionsRequest request);


/**
* Register for subscription change notifications for a given topic
* @param request the request containing the topic
* @return the status of the request
*/
UStatus registerForNotifications(NotificationsRequest request);

/**
* Unregister for subscription change notifications for a given topic
* @param request the request containing the topic
* @return the status from unregistering for a subscription change notification
*/
UStatus unregisterForNotifications(NotificationsRequest request);
}
8 changes: 8 additions & 0 deletions src/main/java/org/eclipse/uprotocol/streamer/README.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Plugable Streamer

This is a simple example of a plugable streamer that is used to demonstrate how someone would implement the uStreamer using the UTransport interface.

For more information on the design, please refer to the code and uProtocol specifications.

NOTE: The following is reference example code and not intended to be used to build an actual production uStreamer

64 changes: 64 additions & 0 deletions src/main/java/org/eclipse/uprotocol/streamer/Route.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 General Motors GTO LLC
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* SPDX-FileType: SOURCE
* SPDX-FileCopyrightText: 2024 General Motors GTO LLC
* SPDX-License-Identifier: Apache-2.0
*/

package org.eclipse.uprotocol.streamer;

import org.eclipse.uprotocol.transport.UTransport;
import org.eclipse.uprotocol.v1.UAuthority;

/**
* Route is defined as a combination of UAuthority and UTransport as routes are at the UAuthority level.
*
*/
public class Route {
private UAuthority authority;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can mark these as final

private UTransport transport;

/**
* Constructor
* @param authority
* @param transport
*/
Route(UAuthority authority, UTransport transport) {
this.authority = authority;
this.transport = transport;
}

/**
* Fetch the authority
* @return UAuthority
*/
public UAuthority getAuthority() {
return authority;
}

/**
* Fetch the transport
* @return UTransport
*/
public UTransport getTransport() {
return transport;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2024 General Motors GTO LLC
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* SPDX-FileType: SOURCE
* SPDX-FileCopyrightText: 2024 General Motors GTO LLC
* SPDX-License-Identifier: Apache-2.0
*/
package org.eclipse.uprotocol.streamer;

import org.eclipse.uprotocol.transport.UListener;
import org.eclipse.uprotocol.v1.UMessage;


/**
* Transport Listener receives a message that is destine for another transport so
* it simply forwards the message to the output UTransport.
*/
class TransportListener implements UListener {

private Route in;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have a curcular reference here.
UListener is used by UTransport.
TransportListener has 2 Route objects that also contain a UTransport
How does this work?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to rethink the circular dependency but like I mention this is only demo code and NOT intended to be used to implement any up-streamer library.

private Route out;

@Override
public void onReceive(UMessage message) {
out.getTransport().send(message);
}

/**
* Constructor passing the input and output routes
* @param in input Route
* @param out output Route
*/
public TransportListener(Route in, Route out) {
this.in = in;
this.out = out;
}

/**
* Fetch the input route
* @return input Route
*/
public Route getInputRoute() {
return in;
}

/**
* Fetch the output route
* @return output Route
*/
public Route getOutputRoute() {
return out;
}

}
Loading
Loading