Refactor remote storage auth to be from a function (#4234)
* Refactor remote storage auth to be from a function

* Have a single instance of RemoteDiskStorageClient

* rename AuthTokenProvider

* consistent parameter order

* add doc

* scalafmt

* fix compilation problems
shinyhappydan committed Sep 1, 2023
1 parent f7b24c6 commit 89b10fb
Showing 30 changed files with 278 additions and 235 deletions.
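In short: instead of deriving the remote-storage auth token from StorageTypeConfig wherever a client happened to be built, the plugin now wires a single RemoteDiskStorageClient with an AuthTokenProvider, and the target base URI travels with each call. Condensed from the hunks below:

    // Before: every consumer built its own client around the configured endpoint,
    // with the token coming from the static StorageTypeConfig.remoteStorageAuthToken
    // helper (deleted further down).
    val remoteStorageClient = cfg.remoteDisk.map { r =>
      new RemoteDiskStorageClient(r.defaultEndpoint)(client, as.classicSystem)
    }

    // After: one shared instance, constructed once with a token-providing function;
    // the endpoint (BaseUri) is passed per call instead of being baked into the client.
    make[RemoteDiskStorageClient].from {
      (client: HttpClient @Id("storage"), as: ActorSystem[Nothing], authTokenProvider: AuthTokenProvider) =>
        new RemoteDiskStorageClient(client, authTokenProvider)(as.classicSystem)
    }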
@@ -2,15 +2,17 @@ package ch.epfl.bluebrain.nexus.delta.plugins.storage

import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.sdk.ServiceDependency
+ import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription
import monix.bio.UIO

/**
* Describes the remote storage [[ServiceDependency]] providing a way to extract the [[ServiceDescription]] from a
* remote storage calling its ''/version'' endpoint
*/
- class RemoteStorageServiceDependency(remoteClient: RemoteDiskStorageClient) extends ServiceDependency {
+ class RemoteStorageServiceDependency(remoteClient: RemoteDiskStorageClient, baseUri: BaseUri)
+     extends ServiceDependency {

override def serviceDescription: UIO[ServiceDescription] =
- remoteClient.serviceDescription
+ remoteClient.serviceDescription(baseUri)
}
@@ -15,6 +15,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.Sto
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.contexts.{storages => storageCtxId, storagesMetadata => storageMetaCtxId}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model._
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageAccess
+ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.AuthTokenProvider
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.routes.StoragesRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.schemas.{storage => storagesSchemaId}
@@ -66,24 +67,23 @@ class StoragePluginModule(priority: Int) extends ModuleDef {
(
fetchContext: FetchContext[ContextRejection],
contextResolution: ResolverContextResolution,
+ remoteDiskStorageClient: RemoteDiskStorageClient,
permissions: Permissions,
xas: Transactors,
cfg: StoragePluginConfig,
serviceAccount: ServiceAccount,
api: JsonLdApi,
client: HttpClient @Id("storage"),
clock: Clock[UIO],
uuidF: UUIDF,
as: ActorSystem[Nothing]
) =>
implicit val classicAs: actor.ActorSystem = as.classicSystem
implicit val storageTypeConfig: StorageTypeConfig = cfg.storages.storageTypeConfig
- implicit val c: HttpClient = client
Storages(
fetchContext.mapRejection(StorageRejection.ProjectContextRejection),
contextResolution,
permissions.fetchPermissionSet,
- StorageAccess.apply(_, _),
+ StorageAccess.apply(_, _, remoteDiskStorageClient, storageTypeConfig),
xas,
cfg.storages,
serviceAccount
@@ -146,12 +146,15 @@ class StoragePluginModule(priority: Int) extends ModuleDef {

many[ResourceShift[_, _, _]].ref[Storage.Shift]

+ make[AuthTokenProvider].from { (cfg: StorageTypeConfig) =>
+ AuthTokenProvider(cfg)
+ }
+
make[Files]
.fromEffect {
(
cfg: StoragePluginConfig,
storageTypeConfig: StorageTypeConfig,
- client: HttpClient @Id("storage"),
aclCheck: AclCheck,
fetchContext: FetchContext[ContextRejection],
storages: Storages,
@@ -161,6 +164,7 @@
clock: Clock[UIO],
uuidF: UUIDF,
as: ActorSystem[Nothing],
+ remoteDiskStorageClient: RemoteDiskStorageClient,
scheduler: Scheduler
) =>
Task
@@ -172,10 +176,10 @@
storagesStatistics,
xas,
storageTypeConfig,
- cfg.files
+ cfg.files,
+ remoteDiskStorageClient
)(
clock,
- client,
uuidF,
scheduler,
as
@@ -218,14 +222,23 @@

many[ResourceShift[_, _, _]].ref[File.Shift]

+ make[RemoteDiskStorageClient].from {
+ (
+ client: HttpClient @Id("storage"),
+ as: ActorSystem[Nothing],
+ authTokenProvider: AuthTokenProvider
+ ) => new RemoteDiskStorageClient(client, authTokenProvider)(as.classicSystem)
+ }
+
many[ServiceDependency].addSet {
- (cfg: StorageTypeConfig, client: HttpClient @Id("storage"), as: ActorSystem[Nothing]) =>
- val remoteStorageClient = cfg.remoteDisk.map { r =>
- new RemoteDiskStorageClient(r.defaultEndpoint)(client, as.classicSystem)
- }
- remoteStorageClient.fold(Set.empty[RemoteStorageServiceDependency])(client =>
- Set(new RemoteStorageServiceDependency(client))
- )
+ (
+ cfg: StorageTypeConfig,
+ remoteStorageClient: RemoteDiskStorageClient
+ ) =>
+ cfg.remoteDisk
+ .map(_.defaultEndpoint)
+ .map(endpoint => Set(new RemoteStorageServiceDependency(remoteStorageClient, endpoint)))
+ .getOrElse(Set.empty[RemoteStorageServiceDependency])
}

make[StorageScopeInitialization].from {
@@ -22,13 +22,13 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.Sto
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageRejection.StorageIsDeprecated
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{DigestAlgorithm, Storage, StorageType}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchFileRejection
+ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.{FetchAttributes, FetchFile, LinkFile, SaveFile}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{Storages, StoragesStatistics}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
import ch.epfl.bluebrain.nexus.delta.sdk.directives.FileResponse
- import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient
import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.Caller
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.ExpandIri
@@ -45,9 +45,9 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.offset.Offset
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.{DroppedElem, SuccessElem}
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStrategy, ProjectionMetadata, Supervisor}
import com.typesafe.scalalogging.Logger
+ import fs2.Stream
import monix.bio.{IO, Task, UIO}
import monix.execution.Scheduler
- import fs2.Stream

import java.util.UUID

@@ -60,8 +60,13 @@ final class Files(
aclCheck: AclCheck,
fetchContext: FetchContext[FileRejection],
storages: Storages,
- storagesStatistics: StoragesStatistics
- )(implicit config: StorageTypeConfig, client: HttpClient, uuidF: UUIDF, system: ClassicActorSystem) {
+ storagesStatistics: StoragesStatistics,
+ remoteDiskStorageClient: RemoteDiskStorageClient,
+ config: StorageTypeConfig
+ )(implicit
+ uuidF: UUIDF,
+ system: ClassicActorSystem
+ ) {

implicit private val kamonComponent: KamonMetricComponent = KamonMetricComponent(entityType.value)

@@ -248,7 +253,9 @@ final class Files(
(storageRef, storage) <- fetchActiveStorage(storageId, projectRef, pc)
resolvedFilename <- IO.fromOption(filename.orElse(path.lastSegment), InvalidFileLink(iri))
description <- FileDescription(resolvedFilename, mediaType)
- attributes <- LinkFile(storage).apply(path, description).mapError(LinkRejection(iri, storage.id, _))
+ attributes <- LinkFile(storage, remoteDiskStorageClient, config)
+ .apply(path, description)
+ .mapError(LinkRejection(iri, storage.id, _))
res <- eval(UpdateFile(iri, projectRef, storageRef, storage.tpe, attributes, rev, caller.subject))
} yield res
}.span("updateLink")
@@ -343,7 +350,7 @@ final class Files(
storage <- storages.fetch(file.value.storage, project)
permission = storage.value.storageValue.readPermission
_ <- aclCheck.authorizeForOr(project, permission)(AuthorizationFailed(project, permission))
- s = FetchFile(storage.value)
+ s = FetchFile(storage.value, remoteDiskStorageClient, config)
.apply(attributes)
.mapError(FetchRejection(file.id, storage.id, _))
.leftWiden[FileRejection]
@@ -388,7 +395,9 @@ final class Files(
(storageRef, storage) <- fetchActiveStorage(storageId, ref, pc)
resolvedFilename <- IO.fromOption(filename.orElse(path.lastSegment), InvalidFileLink(iri))
description <- FileDescription(resolvedFilename, mediaType)
- attributes <- LinkFile(storage).apply(path, description).mapError(LinkRejection(iri, storage.id, _))
+ attributes <- LinkFile(storage, remoteDiskStorageClient, config)
+ .apply(path, description)
+ .mapError(LinkRejection(iri, storage.id, _))
res <- eval(CreateFile(iri, ref, storageRef, storage.tpe, attributes, caller.subject))
} yield res

@@ -428,7 +437,9 @@ final class Files(
)
}
(description, source) <- formDataExtractor(iri, entity, storage.storageValue.maxFileSize, storageAvailableSpace)
- attributes <- SaveFile(storage).apply(description, source).mapError(SaveRejection(iri, storage.id, _))
+ attributes <- SaveFile(storage, remoteDiskStorageClient, config)
+ .apply(description, source)
+ .mapError(SaveRejection(iri, storage.id, _))
} yield attributes

private def expandStorageIri(segment: IdSegment, pc: ProjectContext): IO[WrappedStorageRejection, Iri] =
@@ -518,7 +529,9 @@ final class Files(
for {
_ <- IO.raiseWhen(f.attributes.digest.computed)(DigestAlreadyComputed(f.id))
newAttr <-
- FetchAttributes(storage).apply(attr).mapError(FetchAttributesRejection(f.id, storage.id, _))
+ FetchAttributes(storage, remoteDiskStorageClient)
+ .apply(attr)
+ .mapError(FetchAttributesRejection(f.id, storage.id, _))
_ <- IO.raiseWhen(!newAttr.digest.computed)(DigestNotComputed(f.id))
mediaType = attr.mediaType orElse Some(newAttr.mediaType)
command = UpdateFileAttributes(f.id, f.project, mediaType, newAttr.bytes, newAttr.digest, f.rev, f.updatedBy)
@@ -697,23 +710,24 @@ object Files {
storagesStatistics: StoragesStatistics,
xas: Transactors,
storageTypeConfig: StorageTypeConfig,
- config: FilesConfig
+ config: FilesConfig,
+ remoteDiskStorageClient: RemoteDiskStorageClient
)(implicit
clock: Clock[UIO],
- client: HttpClient,
uuidF: UUIDF,
scheduler: Scheduler,
as: ActorSystem[Nothing]
): Files = {
- implicit val classicAs: ClassicActorSystem = as.classicSystem
- implicit val sTypeConfig: StorageTypeConfig = storageTypeConfig
+ implicit val classicAs: ClassicActorSystem = as.classicSystem
new Files(
FormDataExtractor.apply,
ScopedEventLog(definition, config.eventLog, xas),
aclCheck,
fetchContext,
storages,
- storagesStatistics
+ storagesStatistics,
+ remoteDiskStorageClient,
+ storageTypeConfig
)
}

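Reassembling the interleaved lines above, the tail of the updated Files.apply signature now reads as follows; the leading parameters are unchanged and elided here, and the HttpClient implicit is gone, matching the change to the class constructor at the top of this file:

    def apply(
        /* ...unchanged leading parameters... */
        storagesStatistics: StoragesStatistics,
        xas: Transactors,
        storageTypeConfig: StorageTypeConfig,
        config: FilesConfig,
        remoteDiskStorageClient: RemoteDiskStorageClient
    )(implicit
        clock: Clock[UIO],
        uuidF: UUIDF,
        scheduler: Scheduler,
        as: ActorSystem[Nothing]
    ): Files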
@@ -5,7 +5,6 @@ import cats.implicits.toBifunctorOps
import ch.epfl.bluebrain.nexus.delta.kernel.{RetryStrategyConfig, Secret}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{AbsolutePath, DigestAlgorithm, StorageType}
- import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.AuthToken
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.PaginationConfig
import ch.epfl.bluebrain.nexus.delta.sdk.permissions.model.Permission
@@ -73,15 +72,6 @@

object StorageTypeConfig {

- /**
- * Construct the auth token to query the remote storage
- */
- def remoteStorageAuthToken(config: StorageTypeConfig): Option[AuthToken] = {
- config.remoteDisk
- .flatMap(_.defaultCredentials)
- .map(secret => AuthToken(secret.value))
- }
-
final case class WrongAllowedKeys(defaultVolume: AbsolutePath) extends FailureReason {
val description: String = s"'allowed-volumes' must contain at least '$defaultVolume' (default-volume)"
}
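The AuthTokenProvider introduced by this commit is not among the hunks shown in this view, but the helper deleted just above is presumably what it now wraps: the module earlier binds it with AuthTokenProvider(cfg). A minimal sketch of that shape, assuming the provider simply defers the same credentials lookup; names, signature and return type are assumptions, not the actual file:

    import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
    import ch.epfl.bluebrain.nexus.delta.sdk.identities.model.AuthToken
    import monix.bio.UIO

    // Hypothetical sketch: the token is obtained from a function that can be injected once,
    // rather than from a static helper on StorageTypeConfig.
    trait AuthTokenProvider {
      def apply(): UIO[Option[AuthToken]]
    }

    object AuthTokenProvider {
      def apply(config: StorageTypeConfig): AuthTokenProvider =
        new AuthTokenProvider {
          // Same derivation as the deleted remoteStorageAuthToken helper, wrapped in an effect.
          def apply(): UIO[Option[AuthToken]] =
            UIO.pure(
              config.remoteDisk
                .flatMap(_.defaultCredentials)
                .map(secret => AuthToken(secret.value))
            )
        }
    }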
@@ -5,15 +5,15 @@ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.Sto
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.Storage.Metadata
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.StorageValue.{DiskStorageValue, RemoteDiskStorageValue, S3StorageValue}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.disk.{DiskStorageFetchFile, DiskStorageSaveFile}
- import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.{RemoteDiskStorageFetchFile, RemoteDiskStorageLinkFile, RemoteDiskStorageSaveFile, RemoteStorageFetchAttributes}
+ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote._
+ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.s3.{S3StorageFetchFile, S3StorageLinkFile, S3StorageSaveFile}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.{FetchAttributes, FetchFile, LinkFile, SaveFile}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.{contexts, Storages}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.ContextValue
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
- import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient
import ch.epfl.bluebrain.nexus.delta.sdk.jsonld.JsonLdContent
import ch.epfl.bluebrain.nexus.delta.sdk.model.{BaseUri, IdSegmentRef, Tags}
import ch.epfl.bluebrain.nexus.delta.sdk.{OrderingFields, ResourceShift}
@@ -105,14 +105,14 @@ object Storage {
override val default: Boolean = value.default
override val storageValue: StorageValue = value

- def fetchFile(implicit config: StorageTypeConfig, as: ActorSystem): FetchFile =
- new S3StorageFetchFile(value)
+ def fetchFile(config: StorageTypeConfig)(implicit as: ActorSystem): FetchFile =
+ new S3StorageFetchFile(value, config)

- def saveFile(implicit config: StorageTypeConfig, as: ActorSystem): SaveFile =
- new S3StorageSaveFile(this)
+ def saveFile(config: StorageTypeConfig)(implicit as: ActorSystem): SaveFile =
+ new S3StorageSaveFile(this, config)

- def linkFile(implicit config: StorageTypeConfig, as: ActorSystem): LinkFile =
- new S3StorageLinkFile(this)
+ def linkFile(config: StorageTypeConfig)(implicit as: ActorSystem): LinkFile =
+ new S3StorageLinkFile(this, config)

}

@@ -129,21 +129,17 @@
override val default: Boolean = value.default
override val storageValue: StorageValue = value

- def fetchFile(implicit config: StorageTypeConfig, client: HttpClient, as: ActorSystem): FetchFile =
- new RemoteDiskStorageFetchFile(value)
+ def fetchFile(client: RemoteDiskStorageClient): FetchFile =
+ new RemoteDiskStorageFetchFile(value, client)

- def saveFile(implicit config: StorageTypeConfig, client: HttpClient, as: ActorSystem): SaveFile =
- new RemoteDiskStorageSaveFile(this)
+ def saveFile(client: RemoteDiskStorageClient): SaveFile =
+ new RemoteDiskStorageSaveFile(this, client)

- def linkFile(implicit config: StorageTypeConfig, client: HttpClient, as: ActorSystem): LinkFile =
- new RemoteDiskStorageLinkFile(this)
+ def linkFile(client: RemoteDiskStorageClient): LinkFile =
+ new RemoteDiskStorageLinkFile(this, client)

- def fetchComputedAttributes(implicit
- config: StorageTypeConfig,
- client: HttpClient,
- as: ActorSystem
- ): FetchAttributes =
- new RemoteStorageFetchAttributes(value)
+ def fetchComputedAttributes(client: RemoteDiskStorageClient): FetchAttributes =
+ new RemoteStorageFetchAttributes(value, client)
}

/**
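The companion objects of FetchFile, SaveFile and LinkFile are not part of the hunks shown here. Judging from the updated call sites in Files and from the FetchAttributes companion in the next file, they presumably dispatch along these lines; treat this as a sketch by analogy, not the actual code:

    // Sketch only: a three-argument apply matching the call sites in Files above,
    // dispatching to the per-storage-type operations shown in this file.
    object FetchFile {
      def apply(storage: Storage, client: RemoteDiskStorageClient, config: StorageTypeConfig)(implicit
          as: ActorSystem
      ): FetchFile =
        storage match {
          case storage: Storage.DiskStorage       => storage.fetchFile
          case storage: Storage.S3Storage         => storage.fetchFile(config)
          case storage: Storage.RemoteDiskStorage => storage.fetchFile(client)
        }
    }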
@@ -1,13 +1,11 @@
package ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations

- import akka.actor.ActorSystem
import akka.http.scaladsl.model.Uri
import ch.epfl.bluebrain.nexus.delta.plugins.storage.files.model.{ComputedFileAttributes, FileAttributes}
- import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.StoragesConfig.StorageTypeConfig
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.model.{Storage, StorageType}
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.StorageFileRejection.FetchAttributeRejection
+ import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.RemoteDiskStorageClient
import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.client.model.RemoteDiskStorageFileAttributes
- import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient
import monix.bio.IO

trait FetchAttributes {
@@ -38,12 +36,13 @@ object FetchAttributes {
* Construct a [[FetchAttributes]] from the given ''storage''.
*/
def apply(
- storage: Storage
- )(implicit config: StorageTypeConfig, as: ActorSystem, client: HttpClient): FetchAttributes =
+ storage: Storage,
+ client: RemoteDiskStorageClient
+ ): FetchAttributes =
storage match {
case storage: Storage.DiskStorage => unsupported(storage.tpe)
case storage: Storage.S3Storage => unsupported(storage.tpe)
- case storage: Storage.RemoteDiskStorage => storage.fetchComputedAttributes
+ case storage: Storage.RemoteDiskStorage => storage.fetchComputedAttributes(client)
}

private def unsupported(storageType: StorageType): FetchAttributes =
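The RemoteDiskStorageClient itself, along with the other 24 changed files, is not displayed in this view. From the wiring above — construction from an HttpClient plus an AuthTokenProvider, and per-call BaseUri arguments such as serviceDescription(baseUri) — its refactored shape is presumably along the following lines. This is a sketch: the constructor arguments come from the module wiring, the serviceDescription signature from the first hunk; the implicit placement of the actor system and the method body are assumptions.

    import akka.actor.ActorSystem
    import ch.epfl.bluebrain.nexus.delta.plugins.storage.storages.operations.remote.AuthTokenProvider
    import ch.epfl.bluebrain.nexus.delta.sdk.http.HttpClient
    import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
    import ch.epfl.bluebrain.nexus.delta.sdk.model.ComponentDescription.ServiceDescription
    import monix.bio.UIO

    // Sketch only: the auth token is resolved through the provider each time a request is made,
    // instead of being fixed when the client is created.
    final class RemoteDiskStorageClient(client: HttpClient, authTokenProvider: AuthTokenProvider)(implicit
        as: ActorSystem
    ) {

      def serviceDescription(baseUri: BaseUri): UIO[ServiceDescription] =
        authTokenProvider().flatMap { token =>
          // GET {baseUri}/version with an optional bearer Authorization header built from token;
          // request construction and response decoding are omitted from this sketch.
          ???
        }
    }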