From ab232c698c0d4900f327973a181d960711fa7c98 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Thu, 13 Jul 2023 11:19:26 -0500 Subject: [PATCH] Updated BufferManager to warn on retries until a threshold is reached --- .github/workflows/scala-steward.yml | 2 +- build.sbt | 2 +- .../main/scala/spice/util/BufferManager.scala | 26 ++++++++++++------- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/.github/workflows/scala-steward.yml b/.github/workflows/scala-steward.yml index a440a0c..c871a73 100755 --- a/.github/workflows/scala-steward.yml +++ b/.github/workflows/scala-steward.yml @@ -2,7 +2,7 @@ name: Scala Steward on: schedule: - - cron: '0 0 * * 0' + - cron: '0 0 * * *' workflow_dispatch: jobs: scala-steward: diff --git a/build.sbt b/build.sbt index e0ecc1a..abb4453 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ name := "spice" ThisBuild / organization := "com.outr" -ThisBuild / version := "0.1.4" +ThisBuild / version := "0.1.5-SNAPSHOT" val scala213: String = "2.13.11" val scala3: String = "3.3.0" diff --git a/core/shared/src/main/scala/spice/util/BufferManager.scala b/core/shared/src/main/scala/spice/util/BufferManager.scala index 14080e6..943344e 100644 --- a/core/shared/src/main/scala/spice/util/BufferManager.scala +++ b/core/shared/src/main/scala/spice/util/BufferManager.scala @@ -2,22 +2,22 @@ package spice.util import cats.effect.{FiberIO, IO} import cats.syntax.all._ - import scribe.cats.{io => logger} import java.util.concurrent.atomic.AtomicLong import scala.concurrent.duration._ case class BufferManager(checkEvery: FiniteDuration = 10.seconds, - triggerAfter: Int = 100, - maxPerBatch: Int = 5000, - checkFrequency: FiniteDuration = 1.second, - sendEmpty: Boolean = false) { + triggerAfter: Int = 100, + maxPerBatch: Int = 5000, + checkFrequency: FiniteDuration = 1.second, + sendEmpty: Boolean = false, + logErrorAfter: Int = 3) { private var queues = List.empty[BufferQueue[_]] private val lastCheck = new AtomicLong(System.currentTimeMillis()) private var keepAlive = true - def start: IO[FiberIO[Unit]] = recurse().start + def start: IO[FiberIO[Unit]] = recurse(0).start def stop(): IO[Unit] = IO { keepAlive = false @@ -29,7 +29,7 @@ case class BufferManager(checkEvery: FiniteDuration = 10.seconds, q } - private def recurse(): IO[Unit] = IO + private def recurse(failures: Int): IO[Unit] = IO .sleep(checkFrequency) .flatMap { _ => val timeElapsed: Boolean = lastCheck.get() + checkEvery.toMillis < System.currentTimeMillis() @@ -40,13 +40,19 @@ case class BufferManager(checkEvery: FiniteDuration = 10.seconds, .map { _ => if (timeElapsed) lastCheck.set(System.currentTimeMillis()) } - .flatMap(_ => recurse()) + .flatMap(_ => recurse(0)) .whenA(keepAlive) } .handleErrorWith { throwable => - logger.error("An error occurred processing the buffer. Delaying before trying again.", throwable) + val message = s"An error occurred processing the buffer (failure count: $failures). Delaying before trying again." + val log = if (failures < logErrorAfter) { + logger.warn(message, throwable) + } else { + logger.error(message, throwable) + } + log .flatMap { _ => - IO.sleep(checkEvery).flatMap(_ => recurse()).whenA(keepAlive) + IO.sleep(checkEvery).flatMap(_ => recurse(failures + 1)).whenA(keepAlive) } } } \ No newline at end of file