Skip to content
This repository has been archived by the owner on Mar 27, 2023. It is now read-only.

Commit

Permalink
Make timings configurable
Browse files Browse the repository at this point in the history
- Add configurable measurement Cassandra TTLs
- Add configurable Cassandra Query timeouts
- Add names and debug logging to schedulers
- Make scheduler intervals configurable
- Add startup complete message
- Add graceful shutdown via stdin if enabled
  • Loading branch information
Michel Zimmer committed Jun 24, 2022
1 parent 9e00282 commit a864311
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 55 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ LABEL org.opencontainers.image.vendor="neuland – Büro für Informatik GmbH"
LABEL org.opencontainers.image.licenses="Apache-2.0"
LABEL org.opencontainers.image.title="bandwhichd-server"
LABEL org.opencontainers.image.description="bandwhichd server collecting measurements and calculating statistics"
LABEL org.opencontainers.image.version="0.5.1"
LABEL org.opencontainers.image.version="0.6.0-rc1"
USER guest
ENTRYPOINT ["/opt/java/openjdk/bin/java"]
CMD ["-jar", "/opt/bandwhichd-server.jar"]
EXPOSE 8080
HEALTHCHECK --interval=5s --timeout=1s --start-period=2s --retries=2 \
CMD wget --spider http://localhost:8080/v1/health || exit 1
COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.5.1.jar /opt/bandwhichd-server.jar
COPY --from=build --chown=root:root /tmp/bandwhichd-server/target/scala-3.1.2/bandwhichd-server-assembly-0.6.0-rc1.jar /opt/bandwhichd-server.jar
8 changes: 7 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@ lazy val root = (project in file("."))
.settings(
organization := "de.neuland-bfi",
name := "bandwhichd-server",
version := "0.5.1",
version := "0.6.0-rc1",
scalaVersion := "3.1.2",
Compile / scalaSource := baseDirectory.value / "src" / "main" / "scala",
Test / scalaSource := baseDirectory.value / "src" / "test" / "scala",
Test / fork := true,
run / fork := true,
run / connectInput := true,
javaOptions := Seq(
"-Dorg.slf4j.simpleLogger.log.de.neuland.bandwhichd=debug"
),
ThisBuild / assemblyMergeStrategy := {
case PathList(ps @ _*) if ps.last endsWith "module-info.class" =>
MergeStrategy.discard
Expand All @@ -25,6 +30,7 @@ lazy val root = (project in file("."))
libraryDependencies += "io.circe" %% "circe-core" % "0.14.2",
libraryDependencies += "io.circe" %% "circe-parser" % "0.14.2",
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.3.12",
libraryDependencies += "org.typelevel" %% "log4cats-slf4j" % "2.3.2",
libraryDependencies += "org.http4s" %% "http4s-circe" % "1.0.0-M32",
libraryDependencies += "org.http4s" %% "http4s-core" % "1.0.0-M32",
libraryDependencies += "org.http4s" %% "http4s-dsl" % "1.0.0-M32",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ import de.neuland.bandwhichd.server.application.StatsApplicationService
import de.neuland.bandwhichd.server.lib.scheduling.{Schedule, Scheduler, Work}

import scala.concurrent.duration.{FiniteDuration, SECONDS}
import de.neuland.bandwhichd.server.boot.Configuration
import scala.jdk.DurationConverters.*

class AggregationScheduler[F[_]: Monad](
private val configuration: Configuration,
private val statsApplicationService: StatsApplicationService[F]
) extends Scheduler[F] {
override def schedule: F[Schedule[F]] =
Monad[F].pure(
Schedule.Pausing(
FiniteDuration(10, SECONDS),
Work({ statsApplicationService.recalculate })
getClass.getSimpleName,
configuration.aggregationSchedulerInterval.toScala,
Work(statsApplicationService.recalculate)
)
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ class MeasurementCassandraRepository[F[_]: Async](
override def record(measurement: Measurement[Timing]): F[Unit] =
cassandraContext.executeRawExpectNoRow(
SimpleStatement
.builder("insert into measurements json ?")
.builder("insert into measurements json ? using ttl ?")
.addPositionalValues(
Encoder[Measurement[Timing]]
.apply(measurement)
.noSpaces
.noSpaces,
measurement match
case _: Measurement.NetworkConfiguration =>
configuration.measurementNetworkConfigurationTTL.toSeconds.toInt
case _: Measurement.NetworkUtilization =>
configuration.measurementNetworkUtilizationTTL.toSeconds.toInt
)
.setKeyspace(configuration.measurementsKeyspace)
.setTimeout(configuration.recordMeasurementQueryTimeout)
.build()
)

Expand All @@ -40,6 +46,7 @@ class MeasurementCassandraRepository[F[_]: Async](
SimpleStatement
.builder("select json * from measurements")
.setKeyspace(configuration.measurementsKeyspace)
.setTimeout(configuration.getAllMeasurementsQueryTimeout)
.build()
)
.flatMap(reactiveRow =>
Expand Down
67 changes: 41 additions & 26 deletions src/main/scala/de/neuland/bandwhichd/server/boot/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ import org.http4s.dsl.io.*
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.server.{Router, Server}
import org.http4s.{HttpApp, HttpRoutes}
import org.typelevel.log4cats.slf4j.Slf4jLogger
import org.typelevel.log4cats.{Logger, SelfAwareStructuredLogger}

import java.io.{BufferedReader, InputStreamReader}
import java.util.Scanner
import scala.io.StdIn

class App[F[_]: Async](
Expand Down Expand Up @@ -56,6 +60,19 @@ class App[F[_]: Async](
statsRepository = statsRepository
)

// in scheduling
val aggregationScheduler: Scheduler[F] =
AggregationScheduler[F](
configuration = configuration,
statsApplicationService = statsApplicationService
)

// scheduling
val schedulersOperator: SchedulersOperator[F] =
SchedulersOperator[F](
aggregationScheduler
)

// in http
val healthController: HealthController[F] =
HealthController[F]()
Expand All @@ -68,12 +85,6 @@ class App[F[_]: Async](
statsApplicationService = statsApplicationService
)

// in scheduling
val aggregationScheduler: Scheduler[F] =
AggregationScheduler[F](
statsApplicationService = statsApplicationService
)

// http
val routes: Routes[F] =
Routes[F](
Expand All @@ -84,42 +95,46 @@ class App[F[_]: Async](
// org.http4s.syntax.KleisliResponseOps#orNotFound
val httpApp: HttpApp[F] =
routes.routes.orNotFound

// scheduling
val schedulersOperator: Operator[F] =
SchedulersOperator[F](
aggregationScheduler
)
}

object App extends IOApp {
override def run(args: List[String]): IO[ExitCode] = {
val schedulerOutcomeR = for {
val outcomeR = for {
configuration <- Configuration.resource[IO]
cassandraContext <- CassandraContext.resource[IO](configuration)
_ <- Resource.eval(
CassandraMigration(cassandraContext).migrate(configuration)
)
main = App[IO](cassandraContext, configuration)
schedulerOutcomeF <- main.schedulersOperator.resource
_ <- EmberServerBuilder
server <- EmberServerBuilder
.default[IO]
.withHostOption(None)
.withHttpApp(main.httpApp)
.build
} yield schedulerOutcomeF
logger <- Resource.eval(Slf4jLogger.create[IO])
_ <- Resource.eval(
logger.info(
s"bandwhichd-server startup complete - ${main.schedulersOperator.size} scheduler - listening on ${server.address}"
)
)
lineF <- Resource.eval(IO.delay {
for {
line <- IO.interruptible {
StdIn.readLine()
}
_ <- if (line == null) IO.never else IO.unit
} yield ()
})
} yield schedulerOutcomeF.race(lineF)

for {
schedulerOutcome <- schedulerOutcomeR.use(identity)
} yield {
schedulerOutcome match
case Outcome.Succeeded(_) =>
ExitCode.Success
case Outcome.Errored(throwable) =>
throwable.printStackTrace()
ExitCode.Error
case Outcome.Canceled() =>
ExitCode.Error
outcomeR.use { outcomeF =>
for {
outcome <- outcomeF
} yield outcome match
case Right(_) => ExitCode.Success
case Left(Outcome.Succeeded(_)) => ExitCode.Success
case _ => ExitCode.Error
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,31 @@ import cats.{Defer, Monad, MonadError, Traverse}
import com.comcast.ip4s.{Dns, Host, IpAddress, SocketAddress}
import com.datastax.oss.driver.api.core.CqlIdentifier

import java.time.Duration
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}

case class Configuration(
contactPoints: Seq[SocketAddress[IpAddress]],
localDatacenter: String,
measurementsKeyspace: CqlIdentifier
measurementsKeyspace: CqlIdentifier,
measurementNetworkConfigurationTTL: Duration,
measurementNetworkUtilizationTTL: Duration,
recordMeasurementQueryTimeout: Duration,
getAllMeasurementsQueryTimeout: Duration,
aggregationSchedulerInterval: Duration
)

object Configuration {
def resolve[F[_]: Sync](
contactPoints: String,
localDatacenter: String,
measurementsKeyspace: String
measurementsKeyspace: String,
measurementNetworkConfigurationTTL: String,
measurementNetworkUtilizationTTL: String,
recordMeasurementQueryTimeout: String,
getAllMeasurementsQueryTimeout: String,
aggregationSchedulerInterval: String
): F[Configuration] = {

val maybeHostnameContactPoints = contactPoints
Expand Down Expand Up @@ -48,15 +60,34 @@ object Configuration {
} yield Configuration(
contactPoints = ipAddressContactPoints,
localDatacenter = localDatacenter,
measurementsKeyspace = CqlIdentifier.fromCql(measurementsKeyspace)
measurementsKeyspace = CqlIdentifier.fromCql(measurementsKeyspace),
measurementNetworkConfigurationTTL =
Duration.parse(measurementNetworkConfigurationTTL),
measurementNetworkUtilizationTTL =
Duration.parse(measurementNetworkUtilizationTTL),
recordMeasurementQueryTimeout =
Duration.parse(recordMeasurementQueryTimeout),
getAllMeasurementsQueryTimeout =
Duration.parse(getAllMeasurementsQueryTimeout),
aggregationSchedulerInterval =
Duration.parse(aggregationSchedulerInterval)
)
}

def resolveEnv[F[_]: Sync]: F[Configuration] =
resolve(
scala.util.Properties.envOrElse("CONTACT_POINTS", "localhost:9042"),
scala.util.Properties.envOrElse("LOCAL_DATACENTER", "datacenter1"),
scala.util.Properties.envOrElse("MEASUREMENTS_KEYSPACE", "bandwhichd")
scala.util.Properties.envOrElse("MEASUREMENTS_KEYSPACE", "bandwhichd"),
scala.util.Properties
.envOrElse("MEASUREMENT_NETWORK_CONFIGURATION_TTL", "PT2H"),
scala.util.Properties
.envOrElse("MEASUREMENT_NETWORK_UTILIZATION_TTL", "PT2H"),
scala.util.Properties
.envOrElse("RECORD_MEASUREMENT_QUERY_TIMEOUT", "PT2S"),
scala.util.Properties
.envOrElse("GET_ALL_MEASUREMENTS_QUERY_TIMEOUT", "PT8S"),
scala.util.Properties.envOrElse("AGGREGATION_SCHEDULER_INTERVAL", "PT10S")
)

def resource[F[_]: Sync]: Resource[F, Configuration] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ sealed trait Schedule[F[_]] {

object Schedule {
case class Pausing[F[_]](
name: String,
pauseDuration: FiniteDuration,
work: Work[F]
) extends Schedule[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package de.neuland.bandwhichd.server.lib.scheduling

import cats.effect.*
import cats.implicits.*
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.util.concurrent.atomic.AtomicBoolean
import scala.util.{Failure, Success, Try}
Expand All @@ -11,15 +12,18 @@ class SchedulerOperator[F[_]: Async](
) extends Operator[F] {
override def resource: Resource[F, F[Outcome[F, Throwable, Unit]]] =
for {
schedule <- Resource.make(
Async[F].defer(scheduler.schedule)
)(_ => Async[F].pure(()))
logger <- Resource.eval(Slf4jLogger.create[F])
schedule <- Resource.eval(Async[F].defer(scheduler.schedule))
outcome: F[Outcome[F, Throwable, Unit]] <- Async[F].background {
schedule match
case Schedule.Pausing(pauseDuration, work) =>
case Schedule.Pausing(name, pauseDuration, work) =>
def cycle: F[Unit] =
for {
_ <- work.run
_ <- logger.debug(s"Running $name")
_ <- Async[F].onError(work.run) { case e =>
logger.error(e)(s"Scheduler $name failed")
}
_ <- logger.debug(s"Pausing $name for $pauseDuration")
_ <- Async[F].sleep(pauseDuration)
_ <- cycle
} yield ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import cats.implicits.*

class SchedulersOperator[F[_]: Async](private val schedulers: Scheduler[F]*)
extends Operator[F] {
def size: Int = schedulers.size

override def resource: Resource[F, F[Outcome[F, Throwable, Unit]]] =
schedulers
.map(scheduler => SchedulerOperator(scheduler))
Expand Down Expand Up @@ -37,5 +39,4 @@ class SchedulersOperator[F[_]: Async](private val schedulers: Scheduler[F]*)
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ import de.neuland.bandwhichd.server.adapter.in.v1.message.{
ApiV1MessageV1Fixtures,
MessageController
}
import de.neuland.bandwhichd.server.boot.{App, Configuration, Routes}
import de.neuland.bandwhichd.server.boot.{
App,
Configuration,
ConfigurationFixtures,
Routes
}
import de.neuland.bandwhichd.server.domain.measurement.MeasurementFixtures
import de.neuland.bandwhichd.server.lib.cassandra.CassandraContext
import de.neuland.bandwhichd.server.lib.test.cassandra.CassandraContainer
Expand All @@ -31,11 +36,7 @@ class BandwhichDServerApiV1Spec

override val container: CassandraContainer = CassandraContainer()
private def configuration: Configuration =
Configuration(
contactPoints = Seq(container.container.socket),
localDatacenter = container.datacenter,
measurementsKeyspace = CqlIdentifier.fromCql("bandwhichd")
)
ConfigurationFixtures.testDefaults(container)

"bandwhichd-server v1 API" should {
"have health status" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import com.datastax.oss.driver.api.core.CqlIdentifier
import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, SimpleStatement}
import com.dimafeng.testcontainers.ForAllTestContainer
import de.neuland.bandwhichd.server.adapter.out.CassandraMigration
import de.neuland.bandwhichd.server.boot.Configuration
import de.neuland.bandwhichd.server.boot.{Configuration, ConfigurationFixtures}
import de.neuland.bandwhichd.server.domain.measurement.{
MeasurementFixtures,
MeasurementRepository,
Expand All @@ -30,11 +30,7 @@ class MeasurementCassandraRepositorySpec

override val container: CassandraContainer = CassandraContainer()
private def configuration: Configuration =
Configuration(
contactPoints = Seq(container.container.socket),
localDatacenter = container.datacenter,
measurementsKeyspace = CqlIdentifier.fromCql("bandwhichd")
)
ConfigurationFixtures.testDefaults(container)

"MeasurementCassandraRepository" should {
"record and get measurements" in {
Expand Down
Loading

0 comments on commit a864311

Please sign in to comment.