Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
Emit larger chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 19, 2021
1 parent 326fb8e commit 9370e7d
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
2 changes: 1 addition & 1 deletion .bsp/sbt.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"name":"sbt","version":"1.5.5","bspVersion":"2.0.0-M5","languages":["scala"],"argv":["/Users/ben/.sdkman/candidates/java/8.0.292.hs-adpt/jre/bin/java","-Xms100m","-Xmx100m","-classpath","/usr/local/Cellar/sbt/1.5.5/libexec/bin/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=/usr/local/Cellar/sbt/1.5.5/libexec/bin/sbt-launch.jar"]}
{"name":"sbt","version":"1.5.5","bspVersion":"2.0.0-M5","languages":["scala"],"argv":["/home/streeter/.sdkman/candidates/java/11.0.11.hs-adpt/bin/java","-Xms100m","-Xmx100m","-classpath","/home/streeter/.sdkman/candidates/sbt/1.5.5/bin/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=/home/streeter/.sdkman/candidates/sbt/1.5.5/bin/sbt-launch.jar"]}
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ object PubsubGoogleConsumer {
): Stream[F, ConsumerRecord[F, A]] =
PubsubSubscriber
.subscribe(blocker, projectId, subscription, config)
.flatMap { case internal.Model.Record(msg, ack, nack) =>
.evalMapChunk[F, Option[ConsumerRecord[F, A]]] { case internal.Model.Record(msg, ack, nack) =>
MessageDecoder[A].decode(msg.getData.toByteArray) match {
case Left(e) => Stream.eval_(errorHandler(msg, e, ack, nack))
case Left(e) => errorHandler(msg, e, ack, nack).as(None)
case Right(v) =>
Stream.emit(ConsumerRecord(v, msg.getAttributesMap.asScala.toMap, ack, nack, _ => Applicative[F].unit))
Sync[F].pure(
Some(ConsumerRecord(v, msg.getAttributesMap.asScala.toMap, ack, nack, _ => Applicative[F].unit))
)
}
}
.unNone

/**
* Subscribe with automatic acknowledgement
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.permutive.pubsub.consumer.grpc.internal

import cats.effect.{Blocker, ContextShift, Resource, Sync}
import cats.Applicative
import cats.effect.{Blocker, ContextShift, Resource, Sync}
import cats.syntax.all._
import com.google.api.core.ApiService
import com.google.api.gax.batching.FlowControlSettings
Expand All @@ -11,10 +11,12 @@ import com.google.pubsub.v1.{ProjectSubscriptionName, PubsubMessage}
import com.permutive.pubsub.consumer.grpc.PubsubGoogleConsumer.InternalPubSubError
import com.permutive.pubsub.consumer.grpc.PubsubGoogleConsumerConfig
import com.permutive.pubsub.consumer.{Model => PublicModel}
import fs2.Stream
import fs2.{Chunk, Stream}
import org.threeten.bp.Duration
import collection.JavaConverters._

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
import java.util.ArrayList

private[consumer] object PubsubSubscriber {

Expand Down Expand Up @@ -70,25 +72,29 @@ private[consumer] object PubsubSubscriber {
queue.put(Left(InternalPubSubError(failure)))
}

def takeNextElement[F[_]: Sync: ContextShift, A](messages: BlockingQueue[A], blocker: Blocker): F[A] =
def takeNextElements[F[_]: Sync: ContextShift, A](messages: BlockingQueue[A], blocker: Blocker): F[List[A]] =
for {
nextOpt <- Sync[F].delay(Option(messages.poll())) // `poll` is non-blocking, returning `null` if queue is empty
next <- nextOpt.fold(blocker.delay(messages.take()))(Applicative[F].pure) // `take` can wait for an element
} yield next
more <- Sync[F].delay(new ArrayList[A])
_ <- Sync[F].delay(messages.drainTo(more))
} yield next :: more.asScala.toList

def subscribe[F[_]: Sync: ContextShift](
blocker: Blocker,
projectId: PublicModel.ProjectId,
subscription: PublicModel.Subscription,
config: PubsubGoogleConsumerConfig[F],
): Stream[F, Model.Record[F]] =
for {

): Stream[F, Model.Record[F]] = {
val chunked = for {
queue <- Stream.eval(
Sync[F].delay(new LinkedBlockingQueue[Either[InternalPubSubError, Model.Record[F]]](config.maxQueueSize))
)
_ <- Stream.resource(PubsubSubscriber.createSubscriber(projectId, subscription, config, queue, blocker))
next <- Stream.repeatEval(takeNextElement(queue, blocker))
msg <- Stream.fromEither[F](next)
} yield msg
_ <- Stream.resource(PubsubSubscriber.createSubscriber(projectId, subscription, config, queue, blocker))
taken <- Stream.repeatEval(takeNextElements(queue, blocker))
chunk <- Stream.fromEither[F](taken.sequence)
} yield Chunk.seq(chunk)

chunked.flatMap(Stream.chunk)
}
}

0 comments on commit 9370e7d

Please sign in to comment.