From b160186ab9fd0b1d8e3c047eb5d6868f3a590167 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Fri, 23 Apr 2021 14:13:57 +0100 Subject: [PATCH] Update http4s to 1.0.0-M21 --- build.sbt | 6 ++-- .../middleware/client/KamonSupport.scala | 11 +++--- .../middleware/server/KamonSupport.scala | 4 +-- src/main/scala/kamon/http4s/package.scala | 20 +++++------ .../http4s/ClientInstrumentationSpec.scala | 6 ++-- .../scala/kamon/http4s/HttpMetricsSpec.scala | 12 +++---- .../http4s/ServerInstrumentationSpec.scala | 35 +++++++++---------- 7 files changed, 46 insertions(+), 48 deletions(-) diff --git a/build.sbt b/build.sbt index 8d464f8..9239d31 100644 --- a/build.sbt +++ b/build.sbt @@ -17,9 +17,9 @@ val kamonCore = "io.kamon" %% "kamon-core" % "2.1 val kamonTestkit = "io.kamon" %% "kamon-testkit" % "2.1.0" val kamonCommon = "io.kamon" %% "kamon-instrumentation-common" % "2.1.0" -val server = "org.http4s" %% "http4s-blaze-server" % "0.21.3" -val client = "org.http4s" %% "http4s-blaze-client" % "0.21.3" -val dsl = "org.http4s" %% "http4s-dsl" % "0.21.3" +val server = "org.http4s" %% "http4s-blaze-server" % "1.0.0-M21" +val client = "org.http4s" %% "http4s-blaze-client" % "1.0.0-M21" +val dsl = "org.http4s" %% "http4s-dsl" % "1.0.0-M21" lazy val root = (project in file(".")) diff --git a/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala b/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala index 3dba4c7..980b253 100644 --- a/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala +++ b/src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala @@ -40,11 +40,10 @@ object KamonSupport { def apply[F[_]](underlying: Client[F])(implicit F:Sync[F]): Client[F] = Client { request => + // this needs to run on the same thread as the caller, so can't be suspended in F + val ctx = Kamon.currentContext() - for { - ctx <- Resource.liftF(F.delay(Kamon.currentContext())) - k <- kamonClient(underlying)(request)(ctx)(_instrumentation) - } yield k + kamonClient(underlying)(request)(ctx)(_instrumentation) } @@ -54,9 +53,9 @@ object KamonSupport { (instrumentation: HttpClientInstrumentation) (implicit F:Sync[F]): Resource[F, Response[F]] = for { - requestHandler <- Resource.liftF(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx))) + requestHandler <- Resource.eval(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx))) response <- underlying.run(requestHandler.request).attempt - trackedResponse <- Resource.liftF(handleResponse(response, requestHandler, instrumentation.settings)) + trackedResponse <- Resource.eval(handleResponse(response, requestHandler, instrumentation.settings)) } yield trackedResponse def handleResponse[F[_]]( diff --git a/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala b/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala index a47f121..5bd044d 100644 --- a/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala +++ b/src/main/scala/kamon/http4s/middleware/server/KamonSupport.scala @@ -32,7 +32,7 @@ object KamonSupport { val httpServerConfig = Kamon.config().getConfig("kamon.instrumentation.http4s.server") val instrumentation = HttpServerInstrumentation.from(httpServerConfig, "http4s.server", interface, port) - Kleisli(kamonService[F](service, instrumentation)(_)) + Kleisli((request: Request[F]) => kamonService[F](service, instrumentation)(request)) } @@ -56,7 +56,7 @@ object KamonSupport { private def getHandler[F[_]](instrumentation: HttpServerInstrumentation)(request: Request[F])(implicit F: Sync[F]): Resource[F, RequestHandler] = for { - handler <- Resource.liftF(F.delay(instrumentation.createHandler(buildRequestMessage(request)))) + handler <- Resource.eval(F.delay(instrumentation.createHandler(buildRequestMessage(request)))) _ <- processRequest(handler) _ <- withContext(handler) } yield handler diff --git a/src/main/scala/kamon/http4s/package.scala b/src/main/scala/kamon/http4s/package.scala index f08d786..a643ccc 100644 --- a/src/main/scala/kamon/http4s/package.scala +++ b/src/main/scala/kamon/http4s/package.scala @@ -3,7 +3,7 @@ package kamon import org.http4s.{Header, Headers, Request, Response, Status} import kamon.instrumentation.http.HttpMessage import kamon.instrumentation.http.HttpMessage.ResponseBuilder -import org.http4s.util.CaseInsensitiveString +import org.typelevel.ci.CIString package object http4s { @@ -11,7 +11,7 @@ package object http4s { def buildRequestMessage[F[_]](inner: Request[F]): HttpMessage.Request = new HttpMessage.Request { override def url: String = inner.uri.toString() - override def path: String = inner.uri.path + override def path: String = inner.uri.path.toString override def method: String = inner.method.name @@ -19,11 +19,11 @@ package object http4s { override def port: Int = inner.uri.authority.flatMap(_.port).getOrElse(0) - override def read(header: String): Option[String] = inner.headers.get(CaseInsensitiveString(header)).map(_.value) + override def read(header: String): Option[String] = inner.headers.get(CIString(header)).map(_.toString) override def readAll(): Map[String, String] = { val builder = Map.newBuilder[String, String] - inner.headers.foreach(h => builder += (h.name.value -> h.value)) + inner.headers.foreach(h => builder += (h.name.toString -> h.toString)) builder.result() } } @@ -39,7 +39,7 @@ package object http4s { private var _headers = Headers.empty override def write(header: String, value: String): Unit = - _headers = _headers.put(Header(header, value)) + _headers = _headers.put(Header.Raw(CIString(header), value)) override def statusCode: Int = 404 override def build(): Response[F] = new Response[F](status = Status.NotFound, headers = _headers) @@ -53,7 +53,7 @@ package object http4s { override def build(): Response[F] = response.withHeaders(_headers) override def write(header: String, value: String): Unit = - _headers = _headers.put(Header(header, value)) + _headers = _headers.put(Header.Raw(CIString(header), value)) } @@ -63,11 +63,11 @@ package object http4s { override def build(): Request[F] = request.withHeaders(_headers) override def write(header: String, value: String): Unit = - _headers = _headers.put(Header(header, value)) + _headers = _headers.put(Header.Raw(CIString(header), value)) override def url: String = request.uri.toString() - override def path: String = request.uri.path + override def path: String = request.uri.path.toString() override def method: String = request.method.name @@ -75,11 +75,11 @@ package object http4s { override def port: Int = request.uri.authority.flatMap(_.port).getOrElse(0) - override def read(header: String): Option[String] = _headers.get(CaseInsensitiveString(header)).map(_.value) + override def read(header: String): Option[String] = _headers.get(CIString(header)).map(_.toString) override def readAll(): Map[String, String] = { val builder = Map.newBuilder[String, String] - request.headers.foreach(h => builder += (h.name.value -> h.value)) + request.headers.foreach(h => builder += (h.name.toString -> h.value)) builder.result() } } diff --git a/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala b/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala index 3bcd66c..e2ed42c 100644 --- a/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala +++ b/src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala @@ -19,6 +19,7 @@ package kamon.http4s import java.net.ConnectException import cats.effect.{IO, Resource} +import cats.effect.unsafe.implicits.global import kamon.Kamon import kamon.http4s.middleware.client.KamonSupport import kamon.testkit.TestSpanReporter @@ -54,7 +55,8 @@ class ClientInstrumentationSpec extends WordSpec val okSpan = Kamon.spanBuilder("ok-operation-span").start() Kamon.runWithSpan(okSpan) { - client.expect[String]("/tracing/ok").unsafeRunSync() shouldBe "ok" + val result = client.expect[String]("/tracing/ok").unsafeRunSync() + result shouldBe "ok" } eventually(timeout(2 seconds)) { @@ -74,7 +76,7 @@ class ClientInstrumentationSpec extends WordSpec "close and finish a span even if an exception is thrown by the client" in { val okSpan = Kamon.spanBuilder("client-exception").start() val client: Client[IO] = KamonSupport[IO]( - Client(_ => Resource.liftF(IO.raiseError[Response[IO]](new ConnectException("Connection Refused.")))) + Client(_ => Resource.eval(IO.raiseError[Response[IO]](new ConnectException("Connection Refused.")))) ) Kamon.runWithSpan(okSpan) { diff --git a/src/test/scala/kamon/http4s/HttpMetricsSpec.scala b/src/test/scala/kamon/http4s/HttpMetricsSpec.scala index 1509476..b79e89b 100644 --- a/src/test/scala/kamon/http4s/HttpMetricsSpec.scala +++ b/src/test/scala/kamon/http4s/HttpMetricsSpec.scala @@ -17,6 +17,7 @@ package kamon.http4s import cats.effect._ +import cats.effect.unsafe.implicits.global import kamon.testkit.InstrumentInspection import org.http4s.HttpRoutes import org.http4s.dsl.io._ @@ -42,11 +43,8 @@ class HttpMetricsSpec extends WordSpec with OptionValues { - implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - val srv = - BlazeServerBuilder[IO] + BlazeServerBuilder[IO](global.compute) .bindLocal(43567) .withHttpApp(KamonSupport(HttpRoutes.of[IO] { case GET -> Root / "tracing" / "ok" => Ok("ok") @@ -59,13 +57,13 @@ class HttpMetricsSpec extends WordSpec BlazeClientBuilder[IO](ExecutionContext.global).withMaxTotalConnections(10).resource val metrics = - Resource.liftF(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567))) + Resource.eval(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567))) - def withServerAndClient[A](f: (Server[IO], Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A = + def withServerAndClient[A](f: (Server, Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A = (srv, client, metrics).tupled.use(f.tupled).unsafeRunSync() - private def get[F[_]: Sync](path: String)(server: Server[F], client: Client[F]): F[String] = { + private def get[F[_]: Concurrent](path: String)(server: Server, client: Client[F]): F[String] = { client.expect[String](s"http://127.0.0.1:${server.address.getPort}$path") } diff --git a/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala b/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala index ef4e568..27d55c6 100644 --- a/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala +++ b/src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala @@ -16,7 +16,8 @@ package kamon.http4s -import cats.effect.{ContextShift, IO, Sync, Timer} +import cats.effect.{IO, Async} +import cats.effect.unsafe.implicits.global import kamon.http4s.middleware.server.KamonSupport import kamon.trace.Span import org.http4s.{Headers, HttpRoutes} @@ -29,12 +30,11 @@ import org.scalatest.concurrent.Eventually import org.scalatest.time.SpanSugar import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} -import scala.concurrent.ExecutionContext import org.http4s.implicits._ import cats.implicits._ import kamon.testkit.TestSpanReporter import kamon.tag.Lookups.{plain, plainLong} -import org.http4s.util.CaseInsensitiveString +import org.typelevel.ci.CIString class ServerInstrumentationSpec extends WordSpec with Matchers @@ -44,13 +44,9 @@ class ServerInstrumentationSpec extends WordSpec with TestSpanReporter with BeforeAndAfterAll { - implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - val srv = - BlazeServerBuilder[IO] + BlazeServerBuilder[IO](global.compute) .bindAny() - .withExecutionContext(ExecutionContext.global) .withHttpApp(KamonSupport(HttpRoutes.of[IO] { case GET -> Root / "tracing" / "ok" => Ok("ok") case GET -> Root / "tracing" / "error" => InternalServerError("error!") @@ -61,21 +57,21 @@ class ServerInstrumentationSpec extends WordSpec .resource val client = - BlazeClientBuilder[IO](ExecutionContext.global).resource + BlazeClientBuilder[IO](global.compute).resource - def withServerAndClient[A](f: (Server[IO], Client[IO]) => IO[A]): A = + def withServerAndClient[A](f: (Server, Client[IO]) => IO[A]): A = (srv, client).tupled.use(f.tupled).unsafeRunSync() - private def getResponse[F[_]: Sync](path: String)(server: Server[F], client: Client[F]): F[(String, Headers)] = { + private def getResponse[F[_]: Async](path: String)(server: Server, client: Client[F]): F[(String, Headers)] = { client.get(s"http://127.0.0.1:${server.address.getPort}$path"){ r => - r.bodyAsText.compile.toList.map(_.mkString).map(_ -> r.headers) + r.as[String].tupleRight(r.headers) } } "The Server instrumentation" should { "propagate the current context and respond to the ok action" in withServerAndClient { (server, client) => val request = getResponse("/tracing/ok")(server, client).map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + headers.headers.exists(_.name == CIString("trace-id")) shouldBe true body should startWith("ok") } @@ -91,12 +87,15 @@ class ServerInstrumentationSpec extends WordSpec } } - request *> test + request.attempt.flatTap{ + case Left(e) => IO(e.printStackTrace) + case _ => IO.unit + } *> test } "propagate the current context and respond to the not-found action" in withServerAndClient { (server, client) => - val request = getResponse("/tracing/not-found")(server, client).map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + val request = getResponse("/tracing/not-found")(server, client).map { case (_, headers) => + headers.headers.exists(_.name == CIString("trace-id")) shouldBe true } val test = IO { @@ -116,7 +115,7 @@ class ServerInstrumentationSpec extends WordSpec "propagate the current context and respond to the error action" in withServerAndClient { (server, client) => val request = getResponse("/tracing/error")(server, client).map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + headers.headers.exists(_.name == CIString("trace-id")) shouldBe true body should startWith("error!") } @@ -139,7 +138,7 @@ class ServerInstrumentationSpec extends WordSpec val request = getResponse("/tracing/errorinternal")(server, client) /* TODO serviceErrorHandler kicks in and rewrites response, loosing trace information .map { case (body, headers) => - headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true + headers.exists(_.name == CIString("trace-id")) shouldBe true } */