diff --git a/.bsp/sbt.json b/.bsp/sbt.json index 522f04ad..c3f7bc05 100644 --- a/.bsp/sbt.json +++ b/.bsp/sbt.json @@ -1 +1 @@ -{"name":"sbt","version":"1.5.5","bspVersion":"2.0.0-M5","languages":["scala"],"argv":["/Users/ben/.sdkman/candidates/java/8.0.292.hs-adpt/jre/bin/java","-Xms100m","-Xmx100m","-classpath","/usr/local/Cellar/sbt/1.5.5/libexec/bin/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=/usr/local/Cellar/sbt/1.5.5/libexec/bin/sbt-launch.jar"]} \ No newline at end of file +{"name":"sbt","version":"1.5.5","bspVersion":"2.0.0-M5","languages":["scala"],"argv":["/home/streeter/.sdkman/candidates/java/11.0.11.hs-adpt/bin/java","-Xms100m","-Xmx100m","-classpath","/home/streeter/.sdkman/candidates/sbt/1.5.5/bin/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=/home/streeter/.sdkman/candidates/sbt/1.5.5/bin/sbt-launch.jar"]} \ No newline at end of file diff --git a/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/PubsubGoogleConsumer.scala b/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/PubsubGoogleConsumer.scala index 9e80060e..35e18995 100644 --- a/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/PubsubGoogleConsumer.scala +++ b/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/PubsubGoogleConsumer.scala @@ -41,13 +41,16 @@ object PubsubGoogleConsumer { ): Stream[F, ConsumerRecord[F, A]] = PubsubSubscriber .subscribe(blocker, projectId, subscription, config) - .flatMap { case internal.Model.Record(msg, ack, nack) => + .evalMapChunk[F, Option[ConsumerRecord[F, A]]] { case internal.Model.Record(msg, ack, nack) => MessageDecoder[A].decode(msg.getData.toByteArray) match { - case Left(e) => Stream.eval_(errorHandler(msg, e, ack, nack)) + case Left(e) => errorHandler(msg, e, ack, nack).as(None) case Right(v) => - Stream.emit(ConsumerRecord(v, msg.getAttributesMap.asScala.toMap, ack, nack, _ => Applicative[F].unit)) + Sync[F].pure( + Some(ConsumerRecord(v, msg.getAttributesMap.asScala.toMap, ack, nack, _ => Applicative[F].unit)) + ) } } + .unNone /** * Subscribe with automatic acknowledgement diff --git a/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/internal/PubsubSubscriber.scala b/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/internal/PubsubSubscriber.scala index 47650185..367020bb 100644 --- a/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/internal/PubsubSubscriber.scala +++ b/fs2-google-pubsub-grpc/src/main/scala/com/permutive/pubsub/consumer/grpc/internal/PubsubSubscriber.scala @@ -1,7 +1,7 @@ package com.permutive.pubsub.consumer.grpc.internal -import cats.effect.{Blocker, ContextShift, Resource, Sync} import cats.Applicative +import cats.effect.{Blocker, ContextShift, Resource, Sync} import cats.syntax.all._ import com.google.api.core.ApiService import com.google.api.gax.batching.FlowControlSettings @@ -11,10 +11,12 @@ import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage} import com.permutive.pubsub.consumer.grpc.PubsubGoogleConsumer.InternalPubSubError import com.permutive.pubsub.consumer.grpc.PubsubGoogleConsumerConfig import com.permutive.pubsub.consumer.{Model => PublicModel} -import fs2.Stream +import fs2.{Chunk, Stream} import org.threeten.bp.Duration +import collection.JavaConverters._ import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit} +import java.util.ArrayList private[consumer] object PubsubSubscriber { @@ -70,25 +72,29 @@ private[consumer] object PubsubSubscriber { queue.put(Left(InternalPubSubError(failure))) } - def takeNextElement[F[_]: Sync: ContextShift, A](messages: BlockingQueue[A], blocker: Blocker): F[A] = + def takeNextElements[F[_]: Sync: ContextShift, A](messages: BlockingQueue[A], blocker: Blocker): F[List[A]] = for { nextOpt <- Sync[F].delay(Option(messages.poll())) // `poll` is non-blocking, returning `null` if queue is empty next <- nextOpt.fold(blocker.delay(messages.take()))(Applicative[F].pure) // `take` can wait for an element - } yield next + more <- Sync[F].delay(new ArrayList[A]) + _ <- Sync[F].delay(messages.drainTo(more)) + } yield next :: more.asScala.toList def subscribe[F[_]: Sync: ContextShift]( blocker: Blocker, projectId: PublicModel.ProjectId, subscription: PublicModel.Subscription, config: PubsubGoogleConsumerConfig[F], - ): Stream[F, Model.Record[F]] = - for { - + ): Stream[F, Model.Record[F]] = { + val chunked = for { queue <- Stream.eval( Sync[F].delay(new LinkedBlockingQueue[Either[InternalPubSubError, Model.Record[F]]](config.maxQueueSize)) ) - _ <- Stream.resource(PubsubSubscriber.createSubscriber(projectId, subscription, config, queue, blocker)) - next <- Stream.repeatEval(takeNextElement(queue, blocker)) - msg <- Stream.fromEither[F](next) - } yield msg + _ <- Stream.resource(PubsubSubscriber.createSubscriber(projectId, subscription, config, queue, blocker)) + taken <- Stream.repeatEval(takeNextElements(queue, blocker)) + chunk <- Stream.fromEither[F](taken.sequence) + } yield Chunk.seq(chunk) + + chunked.flatMap(Stream.chunk) + } }