Skip to content

Commit

Permalink
add FlowableStringInputStream from @akarnokd #12
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten committed Mar 23, 2018
1 parent 032e5a4 commit e5c5dd6
Show file tree
Hide file tree
Showing 6 changed files with 464 additions and 3 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ Strings

`splitSimple(String)`

`toInputStream`

`trim`

`strings`
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/github/davidmoten/rx2/Actions.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.github.davidmoten.rx2;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.functions.Action;

Expand Down Expand Up @@ -45,4 +46,15 @@ public void run() throws Exception {
};
}

public static Action increment(final AtomicInteger x) {
//TODO make holder
return new Action() {

@Override
public void run() throws Exception {
x.incrementAndGet();
}
};
}

}
11 changes: 11 additions & 0 deletions src/main/java/com/github/davidmoten/rx2/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,16 @@ public R apply(T t) {
}
};
}

public static <T> Function<T, String> toStringFunction() {
// TODO make holder
return new Function<T,String> () {

@Override
public String apply(T t) throws Exception {
return String.valueOf(t);
}
};
}

}
32 changes: 29 additions & 3 deletions src/main/java/com/github/davidmoten/rx2/Strings.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.github.davidmoten.guavamini.Preconditions;
import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
import com.github.davidmoten.rx2.internal.flowable.FlowableStringInputStream;
import com.github.davidmoten.rx2.internal.flowable.FlowableStringSplitSimple;
import com.github.davidmoten.rx2.internal.flowable.TransformerDecode;
import com.github.davidmoten.rx2.internal.flowable.TransformerStringSplit;
Expand Down Expand Up @@ -334,9 +335,8 @@ public Publisher<String> apply(Flowable<T> source) {
/**
* Splits on a string delimiter, not a pattern. Is slower than RxJavaString
* 1.1.1 implementation on benchmarks below but requests minimally from
* upstream and is potentially much faster when the stream is significantly
* truncated (for example by downstream
* {@code .take(), .takeUntil(), elementAt()}.
* upstream and is potentially much faster when the stream is significantly truncated
* (for example by downstream {@code .take(), .takeUntil(), elementAt()}.
*
* <pre>
* Benchmark Mode Cnt Score Error Units
Expand Down Expand Up @@ -368,4 +368,30 @@ public Publisher<String> apply(Flowable<String> source) {
};
}

/**
* Returns an {@link InputStream} that offers the concatenated String data
* emitted by a subscription to the given publisher using the given character set.
*
* @param publisher the source of the String data
* @param charset the character set of the bytes to be read in the InputStream
* @return offers the concatenated String data emitted by a subscription to
* the given publisher using the given character set
*/
public static InputStream toInputStream(Publisher<String> publisher, Charset charset) {
return FlowableStringInputStream.createInputStream(publisher, charset);
}

/**
* Returns an {@link InputStream} that offers the concatenated String data
* emitted by a subscription to the given publisher using the character set
* UTF-8 for the bytes read through the InputStream.
*
* @param publisher the source of the String data
* @return offers the concatenated String data emitted by a subscription to
* the given publisher using the UTF-8 character set
*/
public static InputStream toInputStream(Publisher<String> f) {
return FlowableStringInputStream.createInputStream(f, Charset.forName("UTF-8"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package com.github.davidmoten.rx2.internal.flowable;

import java.io.*;
import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.*;

import io.reactivex.FlowableSubscriber;
import io.reactivex.internal.subscriptions.SubscriptionHelper;

/**
* @author David Karnok
*
*/
public final class FlowableStringInputStream {

private FlowableStringInputStream() {
throw new IllegalStateException("No instances!");
}

public static InputStream createInputStream(Publisher<String> source, Charset charset) {
StringInputStream parent = new StringInputStream(charset);
source.subscribe(parent);
return parent;
}

static final class StringInputStream extends InputStream
implements FlowableSubscriber<String> {

final AtomicReference<Subscription> upstream;

final Charset charset;

volatile byte[] bytes;

int index;

volatile boolean done;
Throwable error;

StringInputStream(Charset charset) {
this.charset = charset;
upstream = new AtomicReference<Subscription>();
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.setOnce(upstream, s)) {
s.request(1);
}
}

@Override
public void onNext(String t) {
bytes = t.getBytes(charset);
synchronized (this) {
notifyAll();
}
}

@Override
public void onError(Throwable t) {
error = t;
done = true;
synchronized (this) {
notifyAll();
}
}

@Override
public void onComplete() {
done = true;
synchronized (this) {
notifyAll();
}
}

@Override
public int read() throws IOException {
for (;;) {
byte[] a = awaitBufferIfNecessary();
if (a == null) {
Throwable ex = error;
if (ex != null) {
if (ex instanceof IOException) {
throw (IOException)ex;
}
throw new IOException(ex);
}
return -1;
}
int idx = index;
if (idx == a.length) {
index = 0;
bytes = null;
upstream.get().request(1);
} else {
int result = a[idx] & 0xFF;
index = idx + 1;
return result;
}
}
}

byte[] awaitBufferIfNecessary() throws IOException {
byte[] a = bytes;
if (a == null) {
synchronized (this) {
for (;;) {
boolean d = done;
a = bytes;
if (a != null) {
break;
}
if (d || upstream.get() == SubscriptionHelper.CANCELLED) {
break;
}
try {
wait();
} catch (InterruptedException ex) {
if (upstream.get() != SubscriptionHelper.CANCELLED) {
InterruptedIOException exc = new InterruptedIOException();
exc.initCause(ex);
throw exc;
}
break;
}
}
}
}
return a;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
if (off < 0 || len < 0 || off >= b.length || off + len > b.length) {
throw new IndexOutOfBoundsException("b.length=" + b.length + ", off=" + off + ", len=" + len);
}
for (;;) {
byte[] a = awaitBufferIfNecessary();
if (a == null) {
Throwable ex = error;
if (ex != null) {
if (ex instanceof IOException) {
throw (IOException)ex;
}
throw new IOException(ex);
}
return -1;
}
int idx = index;
if (idx == a.length) {
index = 0;
bytes = null;
upstream.get().request(1);
} else {
int r = 0;
while (idx < a.length && len > 0) {
b[off] = a[idx];
idx++;
off++;
r++;
len--;
}
index = idx;
return r;
}
}
}

@Override
public int available() throws IOException {
byte[] a = bytes;
int idx = index;
return a != null ? Math.max(0, a.length - idx) : 0;
}

@Override
public void close() throws IOException {
SubscriptionHelper.cancel(upstream);
synchronized (this) {
notifyAll();
}
}
}
}
Loading

0 comments on commit e5c5dd6

Please sign in to comment.