Skip to content

Commit

Permalink
allow configuration of custom headers for the schemaregistry
Browse files Browse the repository at this point in the history
  • Loading branch information
hagl committed Aug 1, 2024
1 parent cd26a58 commit 5c129cf
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions src/Kafka/Avro/SchemaRegistry.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
module Kafka.Avro.SchemaRegistry
( schemaRegistry, loadSchema, sendSchema
, schemaRegistry_
, schemaRegistryWithHeaders
, loadSubjectSchema
, getGlobalConfig, getSubjectConfig
, getVersions, isCompatible
Expand All @@ -20,7 +21,7 @@ module Kafka.Avro.SchemaRegistry
import Control.Arrow (first)
import Control.Exception (throwIO, SomeException (SomeException))
import Control.Exception.Safe (try, MonadCatch)
import Control.Lens (view, (&), (.~), (^.))
import Control.Lens (view, (&), (.~), (^.), (%~))
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Except (ExceptT (ExceptT), except, runExceptT, withExceptT)
Expand All @@ -44,6 +45,7 @@ import Data.Word (Word32)
import GHC.Exception (SomeException, displayException, fromException)
import GHC.Generics (Generic)
import Network.HTTP.Client (HttpException (..), HttpExceptionContent (..), Manager, defaultManagerSettings, newManager, responseStatus)
import Network.HTTP.Types.Header (Header)
import Network.HTTP.Types.Status (notFound404)
import qualified Network.Wreq as Wreq

Expand All @@ -67,6 +69,7 @@ data SchemaRegistry = SchemaRegistry
, srReverseCache :: Cache (Subject, SchemaName) SchemaId
, srBaseUrl :: String
, srAuth :: Maybe Wreq.Auth
, srExtraHeaders :: [Header]
}

data SchemaRegistryError = SchemaRegistryConnectError String
Expand All @@ -81,13 +84,16 @@ schemaRegistry :: MonadIO m => String -> m SchemaRegistry
schemaRegistry = schemaRegistry_ Nothing

schemaRegistry_ :: MonadIO m => Maybe Wreq.Auth -> String -> m SchemaRegistry
schemaRegistry_ auth url = liftIO $
schemaRegistry_ auth = schemaRegistryWithHeaders auth []

schemaRegistryWithHeaders :: MonadIO m => Maybe Wreq.Auth -> [Header] -> String -> m SchemaRegistry
schemaRegistryWithHeaders auth headers url = liftIO $
SchemaRegistry
<$> newCache Nothing
<*> newCache Nothing
<*> pure url
<*> pure auth

<*> pure headers

loadSchema :: MonadIO m => SchemaRegistry -> SchemaId -> m (Either SchemaRegistryError Schema)
loadSchema sr sid = do
Expand Down Expand Up @@ -197,7 +203,8 @@ wreqOpts sr =
accept = ["application/vnd.schemaregistry.v1+json", "application/vnd.schemaregistry+json", "application/json"]
acceptHeader = Wreq.header "Accept" .~ accept
authHeader = Wreq.auth .~ srAuth sr
in Wreq.defaults & acceptHeader & authHeader
extraHeaders = Wreq.headers %~ (++ srExtraHeaders sr)
in Wreq.defaults & acceptHeader & authHeader & extraHeaders

getSchemaById :: SchemaRegistry -> SchemaId -> IO (Either SchemaRegistryError RegisteredSchema)
getSchemaById sr sid@(SchemaId i) = runExceptT $ do
Expand Down Expand Up @@ -249,7 +256,7 @@ wrapErrorWith mkError x exception = case fromException exception of
_ -> wrapError exception

tryWith :: MonadCatch m => (SomeException -> e) -> m a -> ExceptT e m a
tryWith wrapException = withExceptT wrapException . ExceptT . try
tryWith wrapException = withExceptT wrapException . ExceptT . try

---------------------------------------------------------------------
fullTypeName :: Schema -> SchemaName
Expand Down

0 comments on commit 5c129cf

Please sign in to comment.