Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Build for http4s 1.0.0-M21 / cats-effect 3 #43

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ val kamonCore = "io.kamon" %% "kamon-core" % "2.1
val kamonTestkit = "io.kamon" %% "kamon-testkit" % "2.1.0"
val kamonCommon = "io.kamon" %% "kamon-instrumentation-common" % "2.1.0"

val server = "org.http4s" %% "http4s-blaze-server" % "0.21.3"
val client = "org.http4s" %% "http4s-blaze-client" % "0.21.3"
val dsl = "org.http4s" %% "http4s-dsl" % "0.21.3"
val server = "org.http4s" %% "http4s-blaze-server" % "1.0.0-M21"
val client = "org.http4s" %% "http4s-blaze-client" % "1.0.0-M21"
val dsl = "org.http4s" %% "http4s-dsl" % "1.0.0-M21"


lazy val root = (project in file("."))
Expand Down
11 changes: 5 additions & 6 deletions src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ object KamonSupport {


def apply[F[_]](underlying: Client[F])(implicit F:Sync[F]): Client[F] = Client { request =>
// this needs to run on the same thread as the caller, so can't be suspended in F
val ctx = Kamon.currentContext()

for {
ctx <- Resource.liftF(F.delay(Kamon.currentContext()))
k <- kamonClient(underlying)(request)(ctx)(_instrumentation)
} yield k
kamonClient(underlying)(request)(ctx)(_instrumentation)
}


Expand All @@ -54,9 +53,9 @@ object KamonSupport {
(instrumentation: HttpClientInstrumentation)
(implicit F:Sync[F]): Resource[F, Response[F]] =
for {
requestHandler <- Resource.liftF(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx)))
requestHandler <- Resource.eval(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx)))
response <- underlying.run(requestHandler.request).attempt
trackedResponse <- Resource.liftF(handleResponse(response, requestHandler, instrumentation.settings))
trackedResponse <- Resource.eval(handleResponse(response, requestHandler, instrumentation.settings))
} yield trackedResponse

def handleResponse[F[_]](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ object KamonSupport {
val httpServerConfig = Kamon.config().getConfig("kamon.instrumentation.http4s.server")
val instrumentation = HttpServerInstrumentation.from(httpServerConfig, "http4s.server", interface, port)

Kleisli(kamonService[F](service, instrumentation)(_))
Kleisli((request: Request[F]) => kamonService[F](service, instrumentation)(request))
}


Expand All @@ -56,7 +56,7 @@ object KamonSupport {

private def getHandler[F[_]](instrumentation: HttpServerInstrumentation)(request: Request[F])(implicit F: Sync[F]): Resource[F, RequestHandler] =
for {
handler <- Resource.liftF(F.delay(instrumentation.createHandler(buildRequestMessage(request))))
handler <- Resource.eval(F.delay(instrumentation.createHandler(buildRequestMessage(request))))
_ <- processRequest(handler)
_ <- withContext(handler)
} yield handler
Expand Down
20 changes: 10 additions & 10 deletions src/main/scala/kamon/http4s/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,27 @@ package kamon
import org.http4s.{Header, Headers, Request, Response, Status}
import kamon.instrumentation.http.HttpMessage
import kamon.instrumentation.http.HttpMessage.ResponseBuilder
import org.http4s.util.CaseInsensitiveString
import org.typelevel.ci.CIString

package object http4s {


def buildRequestMessage[F[_]](inner: Request[F]): HttpMessage.Request = new HttpMessage.Request {
override def url: String = inner.uri.toString()

override def path: String = inner.uri.path
override def path: String = inner.uri.path.toString

override def method: String = inner.method.name

override def host: String = inner.uri.authority.map(_.host.value).getOrElse("")

override def port: Int = inner.uri.authority.flatMap(_.port).getOrElse(0)

override def read(header: String): Option[String] = inner.headers.get(CaseInsensitiveString(header)).map(_.value)
override def read(header: String): Option[String] = inner.headers.get(CIString(header)).map(_.toString)

override def readAll(): Map[String, String] = {
val builder = Map.newBuilder[String, String]
inner.headers.foreach(h => builder += (h.name.value -> h.value))
inner.headers.foreach(h => builder += (h.name.toString -> h.toString))
builder.result()
}
}
Expand All @@ -39,7 +39,7 @@ package object http4s {
private var _headers = Headers.empty

override def write(header: String, value: String): Unit =
_headers = _headers.put(Header(header, value))
_headers = _headers.put(Header.Raw(CIString(header), value))

override def statusCode: Int = 404
override def build(): Response[F] = new Response[F](status = Status.NotFound, headers = _headers)
Expand All @@ -53,7 +53,7 @@ package object http4s {
override def build(): Response[F] = response.withHeaders(_headers)

override def write(header: String, value: String): Unit =
_headers = _headers.put(Header(header, value))
_headers = _headers.put(Header.Raw(CIString(header), value))
}


Expand All @@ -63,23 +63,23 @@ package object http4s {
override def build(): Request[F] = request.withHeaders(_headers)

override def write(header: String, value: String): Unit =
_headers = _headers.put(Header(header, value))
_headers = _headers.put(Header.Raw(CIString(header), value))

override def url: String = request.uri.toString()

override def path: String = request.uri.path
override def path: String = request.uri.path.toString()

override def method: String = request.method.name

override def host: String = request.uri.authority.map(_.host.value).getOrElse("")

override def port: Int = request.uri.authority.flatMap(_.port).getOrElse(0)

override def read(header: String): Option[String] = _headers.get(CaseInsensitiveString(header)).map(_.value)
override def read(header: String): Option[String] = _headers.get(CIString(header)).map(_.toString)

override def readAll(): Map[String, String] = {
val builder = Map.newBuilder[String, String]
request.headers.foreach(h => builder += (h.name.value -> h.value))
request.headers.foreach(h => builder += (h.name.toString -> h.value))
builder.result()
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package kamon.http4s
import java.net.ConnectException

import cats.effect.{IO, Resource}
import cats.effect.unsafe.implicits.global
import kamon.Kamon
import kamon.http4s.middleware.client.KamonSupport
import kamon.testkit.TestSpanReporter
Expand Down Expand Up @@ -54,7 +55,8 @@ class ClientInstrumentationSpec extends WordSpec
val okSpan = Kamon.spanBuilder("ok-operation-span").start()

Kamon.runWithSpan(okSpan) {
client.expect[String]("/tracing/ok").unsafeRunSync() shouldBe "ok"
val result = client.expect[String]("/tracing/ok").unsafeRunSync()
result shouldBe "ok"
}

eventually(timeout(2 seconds)) {
Expand All @@ -74,7 +76,7 @@ class ClientInstrumentationSpec extends WordSpec
"close and finish a span even if an exception is thrown by the client" in {
val okSpan = Kamon.spanBuilder("client-exception").start()
val client: Client[IO] = KamonSupport[IO](
Client(_ => Resource.liftF(IO.raiseError[Response[IO]](new ConnectException("Connection Refused."))))
Client(_ => Resource.eval(IO.raiseError[Response[IO]](new ConnectException("Connection Refused."))))
)

Kamon.runWithSpan(okSpan) {
Expand Down
12 changes: 5 additions & 7 deletions src/test/scala/kamon/http4s/HttpMetricsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package kamon.http4s

import cats.effect._
import cats.effect.unsafe.implicits.global
import kamon.testkit.InstrumentInspection
import org.http4s.HttpRoutes
import org.http4s.dsl.io._
Expand All @@ -42,11 +43,8 @@ class HttpMetricsSpec extends WordSpec
with OptionValues
{

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

val srv =
BlazeServerBuilder[IO]
BlazeServerBuilder[IO](global.compute)
.bindLocal(43567)
.withHttpApp(KamonSupport(HttpRoutes.of[IO] {
case GET -> Root / "tracing" / "ok" => Ok("ok")
Expand All @@ -59,13 +57,13 @@ class HttpMetricsSpec extends WordSpec
BlazeClientBuilder[IO](ExecutionContext.global).withMaxTotalConnections(10).resource

val metrics =
Resource.liftF(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567)))
Resource.eval(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567)))


def withServerAndClient[A](f: (Server[IO], Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A =
def withServerAndClient[A](f: (Server, Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A =
(srv, client, metrics).tupled.use(f.tupled).unsafeRunSync()

private def get[F[_]: Sync](path: String)(server: Server[F], client: Client[F]): F[String] = {
private def get[F[_]: Concurrent](path: String)(server: Server, client: Client[F]): F[String] = {
client.expect[String](s"http://127.0.0.1:${server.address.getPort}$path")
}

Expand Down
35 changes: 17 additions & 18 deletions src/test/scala/kamon/http4s/ServerInstrumentationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

package kamon.http4s

import cats.effect.{ContextShift, IO, Sync, Timer}
import cats.effect.{IO, Async}
import cats.effect.unsafe.implicits.global
import kamon.http4s.middleware.server.KamonSupport
import kamon.trace.Span
import org.http4s.{Headers, HttpRoutes}
Expand All @@ -29,12 +30,11 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar
import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec}

import scala.concurrent.ExecutionContext
import org.http4s.implicits._
import cats.implicits._
import kamon.testkit.TestSpanReporter
import kamon.tag.Lookups.{plain, plainLong}
import org.http4s.util.CaseInsensitiveString
import org.typelevel.ci.CIString

class ServerInstrumentationSpec extends WordSpec
with Matchers
Expand All @@ -44,13 +44,9 @@ class ServerInstrumentationSpec extends WordSpec
with TestSpanReporter
with BeforeAndAfterAll {

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

val srv =
BlazeServerBuilder[IO]
BlazeServerBuilder[IO](global.compute)
.bindAny()
.withExecutionContext(ExecutionContext.global)
.withHttpApp(KamonSupport(HttpRoutes.of[IO] {
case GET -> Root / "tracing" / "ok" => Ok("ok")
case GET -> Root / "tracing" / "error" => InternalServerError("error!")
Expand All @@ -61,21 +57,21 @@ class ServerInstrumentationSpec extends WordSpec
.resource

val client =
BlazeClientBuilder[IO](ExecutionContext.global).resource
BlazeClientBuilder[IO](global.compute).resource

def withServerAndClient[A](f: (Server[IO], Client[IO]) => IO[A]): A =
def withServerAndClient[A](f: (Server, Client[IO]) => IO[A]): A =
(srv, client).tupled.use(f.tupled).unsafeRunSync()

private def getResponse[F[_]: Sync](path: String)(server: Server[F], client: Client[F]): F[(String, Headers)] = {
private def getResponse[F[_]: Async](path: String)(server: Server, client: Client[F]): F[(String, Headers)] = {
client.get(s"http://127.0.0.1:${server.address.getPort}$path"){ r =>
r.bodyAsText.compile.toList.map(_.mkString).map(_ -> r.headers)
r.as[String].tupleRight(r.headers)
}
}

"The Server instrumentation" should {
"propagate the current context and respond to the ok action" in withServerAndClient { (server, client) =>
val request = getResponse("/tracing/ok")(server, client).map { case (body, headers) =>
headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true
headers.headers.exists(_.name == CIString("trace-id")) shouldBe true
body should startWith("ok")
}

Expand All @@ -91,12 +87,15 @@ class ServerInstrumentationSpec extends WordSpec
}
}

request *> test
request.attempt.flatTap{
case Left(e) => IO(e.printStackTrace)
case _ => IO.unit
} *> test
}

"propagate the current context and respond to the not-found action" in withServerAndClient { (server, client) =>
val request = getResponse("/tracing/not-found")(server, client).map { case (body, headers) =>
headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true
val request = getResponse("/tracing/not-found")(server, client).map { case (_, headers) =>
headers.headers.exists(_.name == CIString("trace-id")) shouldBe true
}

val test = IO {
Expand All @@ -116,7 +115,7 @@ class ServerInstrumentationSpec extends WordSpec

"propagate the current context and respond to the error action" in withServerAndClient { (server, client) =>
val request = getResponse("/tracing/error")(server, client).map { case (body, headers) =>
headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true
headers.headers.exists(_.name == CIString("trace-id")) shouldBe true
body should startWith("error!")
}

Expand All @@ -139,7 +138,7 @@ class ServerInstrumentationSpec extends WordSpec
val request = getResponse("/tracing/errorinternal")(server, client)
/* TODO serviceErrorHandler kicks in and rewrites response, loosing trace information
.map { case (body, headers) =>
headers.exists(_.name == CaseInsensitiveString("trace-id")) shouldBe true
headers.exists(_.name == CIString("trace-id")) shouldBe true
}
*/

Expand Down