Skip to content

tddmonkey/rx-aws

Repository files navigation

rx-aws

This project provides RxJava abstractions over some of the AWS Java SDK

Supported SDKs

SQS

SNS

DynamoDb

Usage

All SDKs follow the same naming conventions, parameters and return types that the current clients do so that they can almost be used as a drop in replacement in your existing code. For example, if you have code that performs a synchronous listQueues request for SQS like this:

ListQueuesResult listQueuesResult = amazonClient.listQueues(new ListQueuesRequest());
listQueuesResult.getQueueUrls().forEach(System.out::println);

This can be turned into an Rx call like this:

AmazonSdkRxSqs rxSqs = new AmazonSdkRxSqs(amazonClient);
rxSqs.listQueues(new ListQueuesRequest())
	.flatMapIterable(listQueuesResult -> listQueuesResult.getQueueUrls())
	.subscribe(System.out::println);

To receive messages, a synchronous call of this:

ReceiveMessageRequest request = new ReceiveMessageRequest()
	.withQueueUrl(queueUrl)
	.withMaxNumberOfMessages(1);
ReceiveMessageResult receiveMessageResult = amazonClient.receiveMessage(request);
receiveMessageResult.getMessages()
	.stream()
	.map(message -> message.getBody())
	.forEach(System.out::println);

Would become this:

AmazonSdkRxSqs rxSqs = new AmazonSdkRxSqs(amazonClient);
ReceiveMessageRequest request = new ReceiveMessageRequest()
	.withQueueUrl(queueUrl)
	.withMaxNumberOfMessages(1);
rxSqs.receiveMessage(request)
	.flatMapIterable(receiveMessageResult -> receiveMessageResult.getMessages())
	.map(message -> message.getBody())
	.subscribe(System.out::println);

The Rx client must be instantiated with an Async version of the AWS SDK being used as the Rx code performs an asynchronous call on your behalf. Under the hood this is just a call to the relevant aws async method with a relevant Rx handler. For example, using the aws async interface for listing queues, the code would be like this:

amazonClient.listQueuesAsync(new ListQueuesRequest(), new AsyncHandler<ListQueuesRequest, ListQueuesResult>() {
	@Override
	public void onError(Exception exception) {
		// handle errors here
	}

	@Override
	public void onSuccess(ListQueuesRequest request, ListQueuesResult listQueuesResult) {
		// handle success here
	}
});

whereas with this library the call is a standard Rx chain:

rxSqs.listQueues(new ListQueuesRequest())
	.doOnError(t -> handleErrorsHere())
	.subscribe(listQueuesResult -> handleSuccessHere());

Building

The project is built using the Gradle wrapper and requires Java 1.8

Where are the tests?!

The code here is all fully generated and has no tests. If you want to know more about the code generation see The Generator Project

About

Rx version of the Amazon SQS SDK

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages