Skip to content

Commit

Permalink
Updated BufferManager to warn on retries until a threshold is reached
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Jul 13, 2023
1 parent 9535f05 commit ab232c6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/scala-steward.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name: Scala Steward
on:
schedule:
- cron: '0 0 * * 0'
- cron: '0 0 * * *'
workflow_dispatch:
jobs:
scala-steward:
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name := "spice"
ThisBuild / organization := "com.outr"
ThisBuild / version := "0.1.4"
ThisBuild / version := "0.1.5-SNAPSHOT"

val scala213: String = "2.13.11"
val scala3: String = "3.3.0"
Expand Down
26 changes: 16 additions & 10 deletions core/shared/src/main/scala/spice/util/BufferManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,22 @@ package spice.util

import cats.effect.{FiberIO, IO}
import cats.syntax.all._

import scribe.cats.{io => logger}

import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration._

case class BufferManager(checkEvery: FiniteDuration = 10.seconds,
triggerAfter: Int = 100,
maxPerBatch: Int = 5000,
checkFrequency: FiniteDuration = 1.second,
sendEmpty: Boolean = false) {
triggerAfter: Int = 100,
maxPerBatch: Int = 5000,
checkFrequency: FiniteDuration = 1.second,
sendEmpty: Boolean = false,
logErrorAfter: Int = 3) {
private var queues = List.empty[BufferQueue[_]]
private val lastCheck = new AtomicLong(System.currentTimeMillis())
private var keepAlive = true

def start: IO[FiberIO[Unit]] = recurse().start
def start: IO[FiberIO[Unit]] = recurse(0).start

def stop(): IO[Unit] = IO {
keepAlive = false
Expand All @@ -29,7 +29,7 @@ case class BufferManager(checkEvery: FiniteDuration = 10.seconds,
q
}

private def recurse(): IO[Unit] = IO
private def recurse(failures: Int): IO[Unit] = IO
.sleep(checkFrequency)
.flatMap { _ =>
val timeElapsed: Boolean = lastCheck.get() + checkEvery.toMillis < System.currentTimeMillis()
Expand All @@ -40,13 +40,19 @@ case class BufferManager(checkEvery: FiniteDuration = 10.seconds,
.map { _ =>
if (timeElapsed) lastCheck.set(System.currentTimeMillis())
}
.flatMap(_ => recurse())
.flatMap(_ => recurse(0))
.whenA(keepAlive)
}
.handleErrorWith { throwable =>
logger.error("An error occurred processing the buffer. Delaying before trying again.", throwable)
val message = s"An error occurred processing the buffer (failure count: $failures). Delaying before trying again."
val log = if (failures < logErrorAfter) {
logger.warn(message, throwable)
} else {
logger.error(message, throwable)
}
log
.flatMap { _ =>
IO.sleep(checkEvery).flatMap(_ => recurse()).whenA(keepAlive)
IO.sleep(checkEvery).flatMap(_ => recurse(failures + 1)).whenA(keepAlive)
}
}
}

0 comments on commit ab232c6

Please sign in to comment.