From 2f5a1fe6811b49378c62531c18898c49162d992b Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sun, 22 Sep 2024 17:20:35 +0200 Subject: [PATCH] Add support for Container mirroring (#646) This commit adds the support for container mirror functionality. Container mirror allows creating a copy of a container repository to a target container registry keeping the unchanged the repository name and the container checksums, and therefore creating an identical copy to another registry. Container mirror is an extension of the Wave build-on-demand feature. For example ``` wave -i docker.io/my/foo:bar --mirror-to quay.io ``` it will create a copy of the specified repository using the target image name `quay.io/my/foo:bar` Signed-off-by: Paolo Di Tommaso --- build.gradle | 2 +- .../wave/controller/BuildController.groovy | 31 +- .../controller/ContainerController.groovy | 93 +++++- .../wave/controller/MirrorController.groovy | 53 ++++ .../wave/core/RegistryProxyService.groovy | 8 +- .../wave/service/ContainerRequestData.groovy | 5 + .../service/cache/AbstractCacheStore.groovy | 34 ++- .../wave/service/cache/CacheStore.groovy | 21 +- .../wave/service/cache/StateRecord.groovy | 30 ++ .../cache/impl/LocalCacheProvider.groovy | 15 +- .../cache/impl/RedisCacheProvider.groovy | 33 ++- .../ContainerInspectServiceImpl.groovy | 3 +- .../seqera/wave/service/job/JobFactory.groovy | 16 + .../seqera/wave/service/job/JobHandler.groovy | 36 +++ .../seqera/wave/service/job/JobService.groovy | 3 + .../wave/service/job/JobServiceImpl.groovy | 16 + .../io/seqera/wave/service/job/JobSpec.groovy | 26 +- .../seqera/wave/service/k8s/K8sService.groovy | 4 + .../wave/service/k8s/K8sServiceImpl.groovy | 62 ++++ .../mirror/ContainerMirrorService.groovy | 63 ++++ .../mirror/ContainerMirrorServiceImpl.groovy | 139 +++++++++ .../wave/service/mirror/MirrorConfig.groovy | 59 ++++ .../wave/service/mirror/MirrorRequest.groovy | 99 +++++++ .../wave/service/mirror/MirrorState.groovy | 111 +++++++ .../service/mirror/MirrorStateStore.groovy | 72 +++++ .../strategy/DockerMirrorStrategy.groovy | 106 +++++++ .../mirror/strategy/KubeMirrorStrategy.groovy | 83 ++++++ .../mirror/strategy/MirrorStrategy.groovy | 57 ++++ .../persistence/PersistenceService.groovy | 25 ++ .../impl/LocalPersistenceService.groovy | 14 + .../persistence/impl/SurrealClient.groovy | 3 + .../impl/SurrealPersistenceService.groovy | 56 ++++ .../validation/ValidationService.groovy | 2 + .../validation/ValidationServiceImpl.groovy | 16 + .../seqera/wave/util/ContainerHelper.groovy | 10 +- .../controller/ContainerControllerTest.groovy | 138 ++++++++- .../service/ContainerRequestDataTest.groovy | 54 ++++ .../cache/impl/AbstractCacheStoreTest.groovy | 273 ++++++++++++++++++ .../cache/impl/LocalCacheProviderTest.groovy | 112 +++++-- .../cache/impl/RedisCacheProviderTest.groovy | 117 ++++++-- .../wave/service/job/JobFactoryTest.groovy | 28 ++ .../wave/service/job/JobSpecTest.groovy | 15 + .../service/k8s/K8sServiceImplTest.groovy | 60 ++++ .../mirror/ContainerMirrorServiceTest.groovy | 205 +++++++++++++ .../service/mirror/MirrorRequestTest.groovy | 56 ++++ .../service/mirror/MirrorResultTest.groovy | 121 ++++++++ .../strategy/DockerMirrorStrategyTest.groovy | 53 ++++ .../mirror/strategy/MirrorStrategyTest.groovy | 62 ++++ .../impl/SurrealPersistenceServiceTest.groovy | 61 +++- .../validation/ValidationServiceTest.groovy | 15 + 50 files changed, 2632 insertions(+), 144 deletions(-) create mode 100644 src/main/groovy/io/seqera/wave/controller/MirrorController.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/cache/StateRecord.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorService.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/mirror/MirrorConfig.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/mirror/MirrorRequest.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/mirror/MirrorState.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/mirror/MirrorStateStore.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/mirror/strategy/DockerMirrorStrategy.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/mirror/strategy/KubeMirrorStrategy.groovy create mode 100644 src/main/groovy/io/seqera/wave/service/mirror/strategy/MirrorStrategy.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/cache/impl/AbstractCacheStoreTest.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceTest.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/mirror/MirrorRequestTest.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/mirror/MirrorResultTest.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/mirror/strategy/DockerMirrorStrategyTest.groovy create mode 100644 src/test/groovy/io/seqera/wave/service/mirror/strategy/MirrorStrategyTest.groovy diff --git a/build.gradle b/build.gradle index d1e3293bf..951f39628 100644 --- a/build.gradle +++ b/build.gradle @@ -34,7 +34,7 @@ dependencies { compileOnly("io.micronaut:micronaut-http-validation") implementation("jakarta.persistence:jakarta.persistence-api:3.0.0") api 'io.seqera:lib-mail:1.0.0' - api 'io.seqera:wave-api:0.10.0' + api 'io.seqera:wave-api:0.12.0' api 'io.seqera:wave-utils:0.13.1' implementation("io.micronaut:micronaut-http-client") diff --git a/src/main/groovy/io/seqera/wave/controller/BuildController.groovy b/src/main/groovy/io/seqera/wave/controller/BuildController.groovy index 28891d3d9..e0e0bc786 100644 --- a/src/main/groovy/io/seqera/wave/controller/BuildController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/BuildController.groovy @@ -30,8 +30,11 @@ import io.micronaut.http.server.types.files.StreamedFile import io.micronaut.scheduling.TaskExecutors import io.micronaut.scheduling.annotation.ExecuteOn import io.seqera.wave.api.BuildStatusResponse +import io.seqera.wave.exception.BadRequestException import io.seqera.wave.service.builder.ContainerBuildService import io.seqera.wave.service.logs.BuildLogService +import io.seqera.wave.service.mirror.ContainerMirrorService +import io.seqera.wave.service.mirror.MirrorRequest import io.seqera.wave.service.persistence.WaveBuildRecord import jakarta.inject.Inject /** @@ -48,12 +51,15 @@ class BuildController { @Inject private ContainerBuildService buildService + @Inject + private ContainerMirrorService mirrorService + @Inject @Nullable BuildLogService logService @Get("/v1alpha1/builds/{buildId}") - HttpResponse getBuildRecord(String buildId){ + HttpResponse getBuildRecord(String buildId) { final record = buildService.getBuildRecord(buildId) return record ? HttpResponse.ok(record) @@ -72,11 +78,26 @@ class BuildController { } @Get("/v1alpha1/builds/{buildId}/status") - HttpResponse getBuildStatus(String buildId){ - final build = buildService.getBuildRecord(buildId) - build != null - ? HttpResponse.ok(build.toStatusResponse()) + HttpResponse getBuildStatus(String buildId) { + final resp = buildResponse0(buildId) + resp != null + ? HttpResponse.ok(resp) : HttpResponse.notFound() } + protected BuildStatusResponse buildResponse0(String buildId) { + if( !buildId ) + throw new BadRequestException("Missing 'buildId' parameter") + // build IDs starting with the `mr-` prefix are interpreted as mirror requests + if( buildId.startsWith(MirrorRequest.ID_PREFIX) ) { + return mirrorService + .getMirrorState(buildId) + ?.toStatusResponse() + } + else { + return buildService + .getBuildRecord(buildId) + ?.toStatusResponse() + } + } } diff --git a/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy b/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy index 6fcd84dd8..5478d44dd 100644 --- a/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy +++ b/src/main/groovy/io/seqera/wave/controller/ContainerController.groovy @@ -59,6 +59,8 @@ import io.seqera.wave.service.builder.ContainerBuildService import io.seqera.wave.service.builder.FreezeService import io.seqera.wave.service.inclusion.ContainerInclusionService import io.seqera.wave.service.inspect.ContainerInspectService +import io.seqera.wave.service.mirror.ContainerMirrorService +import io.seqera.wave.service.mirror.MirrorRequest import io.seqera.wave.service.pairing.PairingService import io.seqera.wave.service.pairing.socket.PairingChannel import io.seqera.wave.service.persistence.PersistenceService @@ -125,7 +127,7 @@ class ContainerController { ContainerBuildService buildService @Inject - ContainerInspectService dockerAuthService + ContainerInspectService inspectService @Inject RegistryProxyService registryProxyService @@ -152,6 +154,9 @@ class ContainerController { @Nullable RateLimiterService rateLimiterService + @Inject + private ContainerMirrorService mirrorService + @PostConstruct private void init() { log.info "Wave server url: $serverUrl; allowAnonymous: $allowAnonymous; tower-endpoint-url: $towerEndpointUrl; default-build-repo: $buildConfig.defaultBuildRepository; default-cache-repo: $buildConfig.defaultCacheRepository; default-public-repo: $buildConfig.defaultPublicRepository" @@ -176,6 +181,7 @@ class ContainerController { // validate request validateContainerRequest(req) + validateMirrorRequest(req, v2) // this is needed for backward compatibility with old clients if( !req.towerEndpoint ) { @@ -325,7 +331,7 @@ class ContainerController { ? buildConfig.defaultPublicRepository : buildConfig.defaultBuildRepository), req.nameStrategy) final cacheRepository = req.cacheRepository ?: buildConfig.defaultCacheRepository - final configJson = dockerAuthService.credentialsConfigJson(containerSpec, buildRepository, cacheRepository, identity) + final configJson = inspectService.credentialsConfigJson(containerSpec, buildRepository, cacheRepository, identity) final containerConfig = req.freeze ? req.containerConfig : null final offset = DataTimeUtils.offsetId(req.timestamp) final scanId = scanEnabled && format==DOCKER ? LongRndKey.rndHex() : null @@ -414,6 +420,15 @@ class ContainerController { buildId = track.id buildNew = !track.cached } + else if( req.mirrorRegistry ) { + final mirror = makeMirrorRequest(req, identity) + final track = checkMirror(mirror, identity, req.dryRun) + targetImage = track.targetImage + targetContent = null + condaContent = null + buildId = track.id + buildNew = !track.cached + } else if( req.containerImage ) { // normalize container image final coords = ContainerCoordinates.parse(req.containerImage) @@ -435,7 +450,51 @@ class ContainerController { ContainerPlatform.of(req.containerPlatform), buildId, buildNew, - req.freeze ) + req.freeze, + req.mirrorRegistry!=null + ) + } + + protected MirrorRequest makeMirrorRequest(SubmitContainerTokenRequest request, PlatformId identity) { + final coords = ContainerCoordinates.parse(request.containerImage) + if( coords.registry == request.mirrorRegistry ) + throw new BadRequestException("Source and target mirror registry as the same - offending value '${request.mirrorRegistry}'") + final targetImage = request.mirrorRegistry + '/' + coords.imageAndTag + final configJson = inspectService.credentialsConfigJson(null, request.containerImage, targetImage, identity) + final platform = request.containerPlatform + ? ContainerPlatform.of(request.containerPlatform) + : ContainerPlatform.DEFAULT + final digest = registryProxyService.getImageDigest(request.containerImage, identity) + if( !digest ) + throw new BadRequestException("Container image '$request.containerImage' does not exist") + return MirrorRequest.create( + request.containerImage, + targetImage, + digest, + platform, + Path.of(buildConfig.buildWorkspace).toAbsolutePath(), + configJson ) + } + + protected BuildTrack checkMirror(MirrorRequest request, PlatformId identity, boolean dryRun) { + final targetDigest = registryProxyService.getImageDigest(request.targetImage, identity) + log.debug "== Mirror target digest: $targetDigest" + final cached = request.digest==targetDigest + // check for dry-run execution + if( dryRun ) { + log.debug "== Dry-run request request: $request" + final dryId = request.id + BuildRequest.SEP + '0' + return new BuildTrack(dryId, request.targetImage, cached) + } + // check for existing image + if( request.digest==targetDigest ) { + log.debug "== Found cached request for request: $request" + final cache = persistenceService.loadMirrorState(request.targetImage, targetDigest) + return new BuildTrack(cache?.mirrorId, request.targetImage, true) + } + else { + return mirrorService.mirrorImage(request) + } } protected String targetImage(String token, ContainerCoordinates container) { @@ -461,7 +520,7 @@ class ContainerController { return HttpResponse.ok() } - void validateContainerRequest(SubmitContainerTokenRequest req) throws BadRequestException{ + void validateContainerRequest(SubmitContainerTokenRequest req) throws BadRequestException { String msg // check valid image name msg = validationService.checkContainerName(req.containerImage) @@ -474,6 +533,32 @@ class ContainerController { if( msg ) throw new BadRequestException(msg) } + void validateMirrorRequest(SubmitContainerTokenRequest req, boolean v2) throws BadRequestException { + if( !req.mirrorRegistry ) + return + // container mirror validation + if( !v2 ) + throw new BadRequestException("Container mirroring requires the use of v2 API") + if( !req.containerImage ) + throw new BadRequestException("Attribute `containerImage` is required when specifying `mirrorRegistry`") + if( !req.towerAccessToken ) + throw new BadRequestException("Container mirroring requires an authenticated request - specify the tower token attribute") + if( req.freeze ) + throw new BadRequestException("Attribute `mirrorRegistry` and `freeze` conflict each other") + if( req.containerFile ) + throw new BadRequestException("Attribute `mirrorRegistry` and `containerFile` conflict each other") + if( req.containerIncludes ) + throw new BadRequestException("Attribute `mirrorRegistry` and `containerIncludes` conflict each other") + if( req.containerConfig ) + throw new BadRequestException("Attribute `mirrorRegistry` and `containerConfig` conflict each other") + final coords = ContainerCoordinates.parse(req.containerImage) + if( coords.registry == req.mirrorRegistry ) + throw new BadRequestException("Source and target mirror registry as the same - offending value '${req.mirrorRegistry}'") + def msg = validationService.checkMirrorRegistry(req.mirrorRegistry) + if( msg ) + throw new BadRequestException(msg) + } + @Error(exception = AuthorizationException.class) HttpResponse handleAuthorizationException() { return HttpResponse.unauthorized() diff --git a/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy b/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy new file mode 100644 index 000000000..bb1626505 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/controller/MirrorController.groovy @@ -0,0 +1,53 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.controller + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import io.micronaut.http.HttpResponse +import io.micronaut.http.annotation.Controller +import io.micronaut.http.annotation.Get +import io.micronaut.scheduling.TaskExecutors +import io.micronaut.scheduling.annotation.ExecuteOn +import io.seqera.wave.service.mirror.ContainerMirrorService +import io.seqera.wave.service.mirror.MirrorState +import jakarta.inject.Inject +/** + * Implements a controller for container mirror apis + * + * @author Paolo Di Tommaso + */ +@Slf4j +@CompileStatic +@Controller("/") +@ExecuteOn(TaskExecutors.IO) +class MirrorController { + + @Inject + private ContainerMirrorService mirrorService + + @Get("/v1alpha1/mirrors/{mirrorId}") + HttpResponse getMirrorRecord(String mirrorId) { + final result = mirrorService.getMirrorState(mirrorId) + return result + ? HttpResponse.ok(result) + : HttpResponse.notFound() + } + +} diff --git a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy index ba9bb553f..e8a8fae6a 100644 --- a/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy +++ b/src/main/groovy/io/seqera/wave/core/RegistryProxyService.groovy @@ -188,11 +188,15 @@ class RegistryProxyService { } String getImageDigest(BuildRequest request, boolean retryOnNotFound=false) { + return getImageDigest(request.targetImage, request.identity, retryOnNotFound) + } + + String getImageDigest(String containerImage, PlatformId identity, boolean retryOnNotFound=false) { try { - return getImageDigest0(request.targetImage, request.identity, retryOnNotFound) + return getImageDigest0(containerImage, identity, retryOnNotFound) } catch(Exception e) { - log.warn "Unable to retrieve digest for image '${request.targetImage}' -- cause: ${e.message}" + log.warn "Unable to retrieve digest for image '${containerImage}' -- cause: ${e.message}" return null } } diff --git a/src/main/groovy/io/seqera/wave/service/ContainerRequestData.groovy b/src/main/groovy/io/seqera/wave/service/ContainerRequestData.groovy index 959b0f798..2a6020947 100644 --- a/src/main/groovy/io/seqera/wave/service/ContainerRequestData.groovy +++ b/src/main/groovy/io/seqera/wave/service/ContainerRequestData.groovy @@ -43,6 +43,11 @@ class ContainerRequestData { final String buildId final Boolean buildNew final Boolean freeze + final Boolean mirror + + boolean durable() { + return freeze || mirror + } PlatformId getIdentity() { return identity diff --git a/src/main/groovy/io/seqera/wave/service/cache/AbstractCacheStore.groovy b/src/main/groovy/io/seqera/wave/service/cache/AbstractCacheStore.groovy index e1b142766..91e707701 100644 --- a/src/main/groovy/io/seqera/wave/service/cache/AbstractCacheStore.groovy +++ b/src/main/groovy/io/seqera/wave/service/cache/AbstractCacheStore.groovy @@ -20,6 +20,7 @@ package io.seqera.wave.service.cache import java.time.Duration +import groovy.transform.CompileStatic import io.seqera.wave.encoder.EncodingStrategy import io.seqera.wave.service.cache.impl.CacheProvider @@ -28,6 +29,7 @@ import io.seqera.wave.service.cache.impl.CacheProvider * * @author Paolo Di Tommaso */ +@CompileStatic abstract class AbstractCacheStore implements CacheStore { private EncodingStrategy encodingStrategy @@ -45,6 +47,10 @@ abstract class AbstractCacheStore implements CacheStore { protected String key0(String k) { return getPrefix() + k } + protected String recordId0(String recordId) { + return getPrefix() + 'state-id/' + recordId + } + protected V deserialize(String encoded) { return encodingStrategy.decode(encoded) } @@ -63,32 +69,34 @@ abstract class AbstractCacheStore implements CacheStore { return result ? deserialize(result) : null } + V getByRecordId(String recordId) { + final key = delegate.get(recordId0(recordId)) + return get(key) + } + void put(String key, V value) { - delegate.put(key0(key), serialize(value), getDuration()) + put(key, value, getDuration()) } @Override void put(String key, V value, Duration ttl) { delegate.put(key0(key), serialize(value), ttl) + if( value instanceof StateRecord ) { + delegate.put(recordId0(value.getRecordId()), key, ttl) + } } @Override boolean putIfAbsent(String key, V value, Duration ttl) { - delegate.putIfAbsent(key0(key), serialize(value), ttl) + final result = delegate.putIfAbsent(key0(key), serialize(value), ttl) + if( result && value instanceof StateRecord ) { + delegate.put(recordId0(value.getRecordId()), key, ttl) + } + return result } boolean putIfAbsent(String key, V value) { - delegate.putIfAbsent(key0(key), serialize(value), getDuration()) - } - - @Override - V putIfAbsentAndGetCurrent(String key, V value, Duration ttl) { - final result = delegate.putIfAbsentAndGetCurrent(key0(key), serialize(value), ttl) - return result? deserialize(result) : null - } - - V putIfAbsentAndGetCurrent(String key, V value) { - return putIfAbsentAndGetCurrent(key, value, getDuration()) + return putIfAbsent(key, value, getDuration()) } @Override diff --git a/src/main/groovy/io/seqera/wave/service/cache/CacheStore.groovy b/src/main/groovy/io/seqera/wave/service/cache/CacheStore.groovy index 9bd6f33ae..822260639 100644 --- a/src/main/groovy/io/seqera/wave/service/cache/CacheStore.groovy +++ b/src/main/groovy/io/seqera/wave/service/cache/CacheStore.groovy @@ -37,6 +37,14 @@ interface CacheStore { */ V get(K key) + /** + * Store a the specified key-value pair in the underlying cache + * + * @param key The key to retrieve the associated value + * @param value The value to be store in the cache + */ + void put(K key, V value) + /** * Store a the specified key-value pair in the underlying cache * @@ -50,23 +58,18 @@ interface CacheStore { * Store a value in the cache only if does not exist yet * @param key The unique associated with this object * @param value The object to store - * @param ttl The max time-to-live of the stored entry * @return {@code true} if the value was stored, {@code false} otherwise */ - boolean putIfAbsent(K key, V value, Duration ttl) + boolean putIfAbsent(K key, V value) /** * Store a value in the cache only if does not exist yet - * and returns the value that is in the cache after the - * operation. - * - * @param key The unique key associated with this object + * @param key The unique associated with this object * @param value The object to store * @param ttl The max time-to-live of the stored entry - * @return The current value in the cache, that is the existing one - * if already present or the new value otherwise + * @return {@code true} if the value was stored, {@code false} otherwise */ - V putIfAbsentAndGetCurrent(K key, V value, Duration ttl) + boolean putIfAbsent(K key, V value, Duration ttl) /** * Remove the entry with the specified key from the cache diff --git a/src/main/groovy/io/seqera/wave/service/cache/StateRecord.groovy b/src/main/groovy/io/seqera/wave/service/cache/StateRecord.groovy new file mode 100644 index 000000000..3e3896957 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/cache/StateRecord.groovy @@ -0,0 +1,30 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.cache + +/** + * Marker interface for record object that model long running operation state + * + * @author Paolo Di Tommaso + */ +interface StateRecord { + + String getRecordId() + +} diff --git a/src/main/groovy/io/seqera/wave/service/cache/impl/LocalCacheProvider.groovy b/src/main/groovy/io/seqera/wave/service/cache/impl/LocalCacheProvider.groovy index 0659f7335..696461895 100644 --- a/src/main/groovy/io/seqera/wave/service/cache/impl/LocalCacheProvider.groovy +++ b/src/main/groovy/io/seqera/wave/service/cache/impl/LocalCacheProvider.groovy @@ -67,19 +67,24 @@ class LocalCacheProvider implements CacheProvider { return entry.value } + @Override + void put(String key, String value) { + store.put(key, new Entry<>(value,null)) + } + + @Override void put(String key, String value, Duration ttl) { store.put(key, new Entry<>(value,ttl)) } @Override - boolean putIfAbsent(String key, String value, Duration ttl) { - return putIfAbsent0(key, value, ttl) == null + boolean putIfAbsent(String key, String value) { + return putIfAbsent0(key, value, null) == null } @Override - String putIfAbsentAndGetCurrent(String key, String value, Duration ttl) { - final ret = putIfAbsent0(key, value, ttl) - return ret!=null ? ret : value + boolean putIfAbsent(String key, String value, Duration ttl) { + return putIfAbsent0(key, value, ttl) == null } private String putIfAbsent0(String key, String value, Duration ttl) { diff --git a/src/main/groovy/io/seqera/wave/service/cache/impl/RedisCacheProvider.groovy b/src/main/groovy/io/seqera/wave/service/cache/impl/RedisCacheProvider.groovy index a551f5083..95b006152 100644 --- a/src/main/groovy/io/seqera/wave/service/cache/impl/RedisCacheProvider.groovy +++ b/src/main/groovy/io/seqera/wave/service/cache/impl/RedisCacheProvider.groovy @@ -47,31 +47,34 @@ class RedisCacheProvider implements CacheProvider { } } + @Override + void put(String key, String value) { + put(key, value, null) + } + + @Override void put(String key, String value, Duration ttl) { try( Jedis conn=pool.getResource() ) { - final params = new SetParams().ex(ttl.toSeconds()) + final params = new SetParams() + if( ttl ) + params.px(ttl.toMillis()) conn.set(key, value, params) } } @Override - boolean putIfAbsent(String key, String value, Duration duration) { - try( Jedis conn=pool.getResource() ) { - final params = new SetParams().nx().ex(duration.toSeconds()) - final result = conn.set(key, value, params) - return result == 'OK' - } + boolean putIfAbsent(String key, String value) { + putIfAbsent(key, value, null) } @Override - String putIfAbsentAndGetCurrent(String key, String value, Duration ttl) { - try (Jedis conn = pool.getResource()){ - final params = new SetParams().nx().ex(ttl.toSeconds()) - final tx = conn.multi() - tx.set(key,value,params) - tx.get(key) - final result = tx.exec() - return result[1].toString() + boolean putIfAbsent(String key, String value, Duration ttl) { + try( Jedis conn=pool.getResource() ) { + final params = new SetParams().nx() + if( ttl ) + params.px(ttl.toMillis()) + final result = conn.set(key, value, params) + return result == 'OK' } } diff --git a/src/main/groovy/io/seqera/wave/service/inspect/ContainerInspectServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/inspect/ContainerInspectServiceImpl.groovy index 03b4e5093..90f7355ca 100644 --- a/src/main/groovy/io/seqera/wave/service/inspect/ContainerInspectServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/inspect/ContainerInspectServiceImpl.groovy @@ -87,7 +87,8 @@ class ContainerInspectServiceImpl implements ContainerInspectService { @Override String credentialsConfigJson(String containerFile, String buildRepo, String cacheRepo, PlatformId identity) { final repos = new HashSet(10) - repos.addAll(findRepositories(containerFile)) + if( containerFile ) + repos.addAll(findRepositories(containerFile)) if( buildRepo ) repos.add(buildRepo) if( cacheRepo ) diff --git a/src/main/groovy/io/seqera/wave/service/job/JobFactory.groovy b/src/main/groovy/io/seqera/wave/service/job/JobFactory.groovy index c01158145..132861ebe 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobFactory.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobFactory.groovy @@ -26,6 +26,8 @@ import groovy.transform.CompileStatic import io.seqera.wave.configuration.BlobCacheConfig import io.seqera.wave.configuration.ScanConfig import io.seqera.wave.service.builder.BuildRequest +import io.seqera.wave.service.mirror.MirrorConfig +import io.seqera.wave.service.mirror.MirrorRequest import io.seqera.wave.service.scan.ScanRequest import jakarta.inject.Inject import jakarta.inject.Singleton @@ -46,6 +48,10 @@ class JobFactory { @Nullable private ScanConfig scanConfig + @Inject + @Nullable + private MirrorConfig mirrorConfig + JobSpec transfer(String stateId) { JobSpec.transfer( stateId, @@ -75,6 +81,16 @@ class JobFactory { ) } + JobSpec mirror(MirrorRequest request) { + JobSpec.mirror( + request.targetImage, + "mirror-${request.id.substring(MirrorRequest.ID_PREFIX.length())}", + request.creationTime, + mirrorConfig.maxDuration, + request.workDir + ) + } + static private String generate(String type, String id, Instant creationTime) { final prefix = type.toLowerCase() return prefix + '-' + Hashing diff --git a/src/main/groovy/io/seqera/wave/service/job/JobHandler.groovy b/src/main/groovy/io/seqera/wave/service/job/JobHandler.groovy index 1630c0f26..493df35ec 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobHandler.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobHandler.groovy @@ -24,12 +24,48 @@ package io.seqera.wave.service.job */ interface JobHandler { + /** + * Retrieve the {@link JobRecord} instance associated with the specified {@link JobSpec} object + * + * @param job + * The {@link JobSpec} object for which the corresponding record is needed + * @return + * The associated job record or {@link null} otherwise + */ R getJobRecord(JobSpec job) + /** + * Event invoked when a job complete either successfully or with a failure + * + * @param job + * The {@link JobSpec} object + * @param jobRecord + * The associate state record + * @param state + * The job execution state + */ void onJobCompletion(JobSpec job, R jobRecord, JobState state) + /** + * Event invoked when a job execution reports an exception + * + * @param job + * The {@link JobSpec} object + * @param jobRecord + * The associate state record + * @param error + * The job job exception + */ void onJobException(JobSpec job, R jobRecord, Throwable error) + /** + * Event invoked when a job exceed the expected max execution duration + * + * @param job + * The {@link JobSpec} object + * @param jobRecord + * The associate state record + */ void onJobTimeout(JobSpec job, R jobRecord) } diff --git a/src/main/groovy/io/seqera/wave/service/job/JobService.groovy b/src/main/groovy/io/seqera/wave/service/job/JobService.groovy index 91bd19b77..20439c17f 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobService.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobService.groovy @@ -20,6 +20,7 @@ package io.seqera.wave.service.job import io.seqera.wave.service.blob.BlobCacheInfo import io.seqera.wave.service.builder.BuildRequest +import io.seqera.wave.service.mirror.MirrorRequest import io.seqera.wave.service.scan.ScanRequest /** @@ -35,6 +36,8 @@ interface JobService { JobSpec launchScan(ScanRequest request) + JobSpec launchMirror(MirrorRequest request) + JobState status(JobSpec jobSpec) void cleanup(JobSpec jobSpec, Integer exitStatus) diff --git a/src/main/groovy/io/seqera/wave/service/job/JobServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/job/JobServiceImpl.groovy index 8e093af1d..8d4ee4a4f 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobServiceImpl.groovy @@ -27,6 +27,8 @@ import io.seqera.wave.service.blob.TransferStrategy import io.seqera.wave.service.builder.BuildRequest import io.seqera.wave.service.builder.BuildStrategy import io.seqera.wave.service.cleanup.CleanupService +import io.seqera.wave.service.mirror.MirrorRequest +import io.seqera.wave.service.mirror.strategy.MirrorStrategy import io.seqera.wave.service.scan.ScanRequest import io.seqera.wave.service.scan.ScanStrategy import jakarta.inject.Inject @@ -64,6 +66,9 @@ class JobServiceImpl implements JobService { @Inject private JobFactory jobFactory + @Inject + private MirrorStrategy mirrorStrategy + @Override JobSpec launchTransfer(BlobCacheInfo blob, List command) { if( !transferStrategy ) @@ -102,6 +107,17 @@ class JobServiceImpl implements JobService { return job } + @Override + JobSpec launchMirror(MirrorRequest request) { + // create the unique job id for the build + final job = jobFactory.mirror(request) + // launch the scan job + mirrorStrategy.mirrorJob(job.operationName, request) + // signal the build has been submitted + jobQueue.offer(job) + return job + } + @Override JobState status(JobSpec job) { try { diff --git a/src/main/groovy/io/seqera/wave/service/job/JobSpec.groovy b/src/main/groovy/io/seqera/wave/service/job/JobSpec.groovy index 90bae336c..14e75d1f3 100644 --- a/src/main/groovy/io/seqera/wave/service/job/JobSpec.groovy +++ b/src/main/groovy/io/seqera/wave/service/job/JobSpec.groovy @@ -37,7 +37,7 @@ import io.seqera.wave.util.LongRndKey @CompileStatic class JobSpec { - enum Type { Transfer, Build, Scan } + enum Type { Transfer, Build, Scan, Mirror } /** * The job unique identifier @@ -86,11 +86,11 @@ class JobSpec { this.workDir = dir } - static JobSpec transfer(String stateId, String operationName, Instant creationTime, Duration maxDuration) { + static JobSpec transfer(String recordId, String operationName, Instant creationTime, Duration maxDuration) { new JobSpec( LongRndKey.rndHex(), Type.Transfer, - stateId, + recordId, operationName, creationTime, maxDuration, @@ -98,11 +98,11 @@ class JobSpec { ) } - static JobSpec scan(String stateId, String operationName, Instant creationTime, Duration maxDuration, Path dir) { + static JobSpec scan(String recordId, String operationName, Instant creationTime, Duration maxDuration, Path dir) { new JobSpec( LongRndKey.rndHex(), Type.Scan, - stateId, + recordId, operationName, creationTime, maxDuration, @@ -110,15 +110,27 @@ class JobSpec { ) } - static JobSpec build(String stateId, String operationName, Instant creationTime, Duration maxDuration, Path dir) { + static JobSpec build(String recordId, String operationName, Instant creationTime, Duration maxDuration, Path dir) { new JobSpec( LongRndKey.rndHex(), Type.Build, - stateId, + recordId, operationName, creationTime, maxDuration, dir ) } + + static JobSpec mirror(String recordId, String operationName, Instant creationTime, Duration maxDuration, Path workDir) { + new JobSpec( + LongRndKey.rndHex(), + Type.Mirror, + recordId, + operationName, + creationTime, + maxDuration, + workDir + ) + } } diff --git a/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy b/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy index bb042d86a..6fa0a18b2 100644 --- a/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy +++ b/src/main/groovy/io/seqera/wave/service/k8s/K8sService.groovy @@ -26,6 +26,8 @@ import io.kubernetes.client.openapi.models.V1Pod import io.kubernetes.client.openapi.models.V1PodList import io.seqera.wave.configuration.BlobCacheConfig import io.seqera.wave.configuration.ScanConfig +import io.seqera.wave.service.mirror.MirrorConfig + /** * Defines Kubernetes operations * @@ -68,6 +70,8 @@ interface K8sService { V1Job launchScanJob(String name, String containerImage, List args, Path workDir, Path creds, ScanConfig scanConfig, Map nodeSelector) + V1Job launchMirrorJob(String name, String containerImage, List args, Path workDir, Path creds, MirrorConfig config) + @Deprecated V1PodList waitJob(V1Job job, Long timeout) diff --git a/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy index 448870ac3..1a91e9a39 100644 --- a/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/k8s/K8sServiceImpl.groovy @@ -48,6 +48,7 @@ import io.seqera.wave.configuration.BlobCacheConfig import io.seqera.wave.configuration.BuildConfig import io.seqera.wave.configuration.ScanConfig import io.seqera.wave.core.ContainerPlatform +import io.seqera.wave.service.mirror.MirrorConfig import io.seqera.wave.service.scan.Trivy import jakarta.inject.Inject import jakarta.inject.Singleton @@ -790,6 +791,67 @@ class K8sServiceImpl implements K8sService { return builder.build() } + @Override + V1Job launchMirrorJob(String name, String containerImage, List args, Path workDir, Path creds, MirrorConfig config) { + final spec = mirrorJobSpec(name, containerImage, args, workDir, creds, config) + return k8sClient + .batchV1Api() + .createNamespacedJob(namespace, spec, null, null, null,null) + } + + V1Job mirrorJobSpec(String name, String containerImage, List args, Path workDir, Path credsFile, MirrorConfig config) { + + // required volumes + final mounts = new ArrayList(5) + mounts.add(mountBuildStorage(workDir, storageMountPath, true)) + + final volumes = new ArrayList(5) + volumes.add(volumeBuildStorage(storageMountPath, storageClaimName)) + + if( credsFile ){ + mounts.add(0, mountHostPath(credsFile, storageMountPath, '/tmp/config.json')) + } + + V1JobBuilder builder = new V1JobBuilder() + + //metadata section + builder.withNewMetadata() + .withNamespace(namespace) + .withName(name) + .addToLabels(labels) + .endMetadata() + + //spec section + def spec = builder + .withNewSpec() + .withBackoffLimit(config.retryAttempts) + .withNewTemplate() + .editOrNewSpec() + .withServiceAccount(serviceAccount) + .withRestartPolicy("Never") + .addAllToVolumes(volumes) + + final requests = new V1ResourceRequirements() + if( config.requestsCpu ) + requests.putRequestsItem('cpu', new Quantity(config.requestsCpu)) + if( config.requestsMemory ) + requests.putRequestsItem('memory', new Quantity(config.requestsMemory)) + + // container section + final container = new V1ContainerBuilder() + .withName(name) + .withImage(containerImage) + .withArgs(args) + .withVolumeMounts(mounts) + .withResources(requests) + .withEnv(new V1EnvVar().name("REGISTRY_AUTH_FILE").value("/tmp/config.json")) + + // spec section + spec.withContainers(container.build()).endSpec().endTemplate().endSpec() + + return builder.build() + } + protected List toEnvList(Map env) { final result = new ArrayList(env.size()) for( Map.Entry it : env ) diff --git a/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorService.groovy b/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorService.groovy new file mode 100644 index 000000000..6ef528507 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorService.groovy @@ -0,0 +1,63 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror + +import java.util.concurrent.CompletableFuture + +import io.seqera.wave.service.builder.BuildTrack + +/** + * Define the contract for container images mirroring service + * + * @author Paolo Di Tommaso + */ +interface ContainerMirrorService { + + /** + * Submit a container mirror request + * + * @param request + * The {@link MirrorRequest} modelling the container mirror request + * @return + * A {@link BuildTrack} object representing the state of the request + */ + BuildTrack mirrorImage(MirrorRequest request) + + /** + * Await the completion for the specified target container image + * + * @param targetImage + * The container image of the mirror operation to be awaited + * @return + * A future holding the {@link MirrorState} when the mirror operation complete + */ + CompletableFuture awaitCompletion(String targetImage) + + /** + * Retrieve the current state of the mirror operation + * + * @param id + * The id of the mirror state record + * @return + * The {@link MirrorState} object modelling the current state of the mirror operation, + * or {@link null} otherwise + */ + MirrorState getMirrorState(String id) + +} diff --git a/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy new file mode 100644 index 000000000..5e7adc8a1 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceImpl.groovy @@ -0,0 +1,139 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror + +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ExecutorService + +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import io.micronaut.scheduling.TaskExecutors +import io.seqera.wave.service.builder.BuildTrack +import io.seqera.wave.service.job.JobHandler +import io.seqera.wave.service.job.JobService +import io.seqera.wave.service.job.JobSpec +import io.seqera.wave.service.job.JobState +import io.seqera.wave.service.persistence.PersistenceService +import jakarta.inject.Inject +import jakarta.inject.Named +import jakarta.inject.Singleton + +/** + * Implement a service to mirror a container image to a repository specified by the user + * + * @author Paolo Di Tommaso + */ +@Slf4j +@Singleton +@Named('Mirror') +@CompileStatic +class ContainerMirrorServiceImpl implements ContainerMirrorService, JobHandler { + + @Inject + private MirrorStateStore store + + @Inject + private JobService jobService + + @Inject + @Named(TaskExecutors.IO) + private ExecutorService ioExecutor + + @Inject + private PersistenceService persistence + + /** + * {@inheritDoc} + */ + @Override + BuildTrack mirrorImage(MirrorRequest request) { + if( store.putIfAbsent(request.targetImage, MirrorState.from(request))) { + log.info "== Container mirror submitted - request=$request" + jobService.launchMirror(request) + return new BuildTrack(request.id, request.targetImage, false) + } + final ret = store.get(request.targetImage) + if( ret ) { + log.info "== Container mirror hit cache - request=$request" + // note: mark as cached only if the build result is 'done' + // if the build is still in progress it should be marked as not cached + // so that the client will wait for the container completion + return new BuildTrack(ret.mirrorId, ret.targetImage, ret.done()) + } + // invalid state + throw new IllegalStateException("Unable to determine mirror status for '$request.targetImage'") + } + + /** + * {@inheritDoc} + */ + @Override + CompletableFuture awaitCompletion(String targetImage) { + return CompletableFuture.supplyAsync(()-> store.awaitCompletion(targetImage), ioExecutor) + } + + /** + * {@inheritDoc} + */ + @Override + MirrorState getMirrorState(String mirrorId) { + store.getByRecordId(mirrorId) ?: persistence.loadMirrorState(mirrorId) + } + + /** + * {@inheritDoc} + */ + @Override + MirrorState getJobRecord(JobSpec jobSpec) { + store.get(jobSpec.recordId) + } + + /** + * {@inheritDoc} + */ + @Override + void onJobCompletion(JobSpec jobSpec, MirrorState mirror, JobState jobState) { + final result = mirror.complete(jobState.exitCode, jobState.stdout) + store.put(mirror.targetImage, result) + persistence.saveMirrorState(result) + log.debug "Mirror container completed - job=${jobSpec.operationName}; result=${result}; state=${jobState}" + } + + /** + * {@inheritDoc} + */ + @Override + void onJobTimeout(JobSpec jobSpec, MirrorState mirror) { + final result = mirror.complete(null, "Container mirror timed out") + store.put(mirror.targetImage, result) + persistence.saveMirrorState(result) + log.warn "Mirror container timed out - job=${jobSpec.operationName}; result=${result}" + } + + /** + * {@inheritDoc} + */ + @Override + void onJobException(JobSpec jobSpec, MirrorState mirror, Throwable error) { + final result = mirror.complete(null, error.message) + store.put(mirror.targetImage, result) + persistence.saveMirrorState(result) + log.error("Mirror container errored - job=${jobSpec.operationName}; result=${result}", error) + } +} diff --git a/src/main/groovy/io/seqera/wave/service/mirror/MirrorConfig.groovy b/src/main/groovy/io/seqera/wave/service/mirror/MirrorConfig.groovy new file mode 100644 index 000000000..52be93819 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/mirror/MirrorConfig.groovy @@ -0,0 +1,59 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror + +import java.time.Duration + +import groovy.transform.CompileStatic +import io.micronaut.context.annotation.Value +import io.micronaut.core.annotation.Nullable +import jakarta.inject.Singleton +/** + * Model mirror service config options + * + * @author Paolo Di Tommaso + */ +@Singleton +@CompileStatic +class MirrorConfig { + + @Value('${wave.mirror.max-duration:15m}') + Duration maxDuration + + @Value('${wave.mirror.status.delay:4s}') + Duration statusDelay + + @Value('${wave.mirror.status.duration:1h}') + Duration statusDuration + + @Value('${wave.mirror.skopeoImage:`quay.io/skopeo/stable`}') + String skopeoImage + + @Value('${wave.mirror.retry-attempts:3}') + Integer retryAttempts + + @Nullable + @Value('${wave.mirror.requestsCpu}') + String requestsCpu + + @Nullable + @Value('${wave.mirror.requestsMemory}') + String requestsMemory + +} diff --git a/src/main/groovy/io/seqera/wave/service/mirror/MirrorRequest.groovy b/src/main/groovy/io/seqera/wave/service/mirror/MirrorRequest.groovy new file mode 100644 index 000000000..f1157cc72 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/mirror/MirrorRequest.groovy @@ -0,0 +1,99 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror + +import java.nio.file.Path +import java.time.Instant + +import groovy.transform.Canonical +import groovy.transform.CompileStatic +import groovy.transform.ToString +import io.seqera.wave.core.ContainerPlatform +import io.seqera.wave.util.LongRndKey + +/** + * Model a container mirror request + * + * @author Paolo Di Tommaso + */ +@ToString(includeNames = true, includePackage = false) +@Canonical +@CompileStatic +class MirrorRequest { + + static final String ID_PREFIX = 'mr-' + + /** + * Unique id of the request + */ + final String id + + /** + * The container image to be mirrored + */ + final String sourceImage + + /** + * The target image name where the container should be mirrored + */ + final String targetImage + + /** + * The (SHA256) digest of the container to be mirrored + */ + final String digest + + /** + * The container platform to be copied + */ + final ContainerPlatform platform + + /** + * The work directory used by the mirror operation + */ + final Path workDir + + /** + * Docker config json to authorise the mirror (pull & push) operation + */ + final String authJson + + /** + * The timestamp when the request has been submitted + */ + final Instant creationTime + + static MirrorRequest create(String sourceImage, String targetImage, String digest, ContainerPlatform platform, Path workspace, String authJson, Instant ts=Instant.now()) { + assert sourceImage, "Argument 'sourceImage' cannot be null" + assert targetImage, "Argument 'targetImage' cannot be empty" + assert workspace, "Argument 'workspace' cannot be null" + assert digest, "Argument 'digest' cannot be empty" + + final id = LongRndKey.rndHex() + return new MirrorRequest( + ID_PREFIX + id, + sourceImage, + targetImage, + digest, + platform, + workspace.resolve("mirror-${id}"), + authJson, + ts ) + } +} diff --git a/src/main/groovy/io/seqera/wave/service/mirror/MirrorState.groovy b/src/main/groovy/io/seqera/wave/service/mirror/MirrorState.groovy new file mode 100644 index 000000000..39f313d51 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/mirror/MirrorState.groovy @@ -0,0 +1,111 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror + +import java.time.Duration +import java.time.Instant + +import groovy.transform.Canonical +import groovy.transform.CompileStatic +import groovy.transform.ToString +import io.seqera.wave.api.BuildStatusResponse +import io.seqera.wave.core.ContainerPlatform +import io.seqera.wave.service.cache.StateRecord +import io.seqera.wave.service.job.JobRecord +import jakarta.inject.Singleton + +/** + * Model a container mirror result object + * + * @author Paolo Di Tommaso + */ +@ToString(includeNames = true, includePackage = false) +@Singleton +@CompileStatic +@Canonical +class MirrorState implements JobRecord, StateRecord { + enum Status { PENDING, COMPLETED } + + final String mirrorId + final String digest + final String sourceImage + final String targetImage + final ContainerPlatform platform + final Instant creationTime + final Status status + final Duration duration + final Integer exitCode + final String logs + + @Override + String getRecordId() { + return mirrorId + } + + @Override + boolean done() { + status==Status.COMPLETED + } + + boolean succeeded() { + status==Status.COMPLETED && exitCode==0 + } + + MirrorState complete(Integer exitCode, String logs ) { + new MirrorState( + this.mirrorId, + this.digest, + this.sourceImage, + this.targetImage, + this.platform, + this.creationTime, + Status.COMPLETED, + Duration.between(this.creationTime, Instant.now()), + exitCode, + logs + ) + } + + static MirrorState from(MirrorRequest request) { + new MirrorState( + request.id, + request.digest, + request.sourceImage, + request.targetImage, + request.platform, + request.creationTime, + Status.PENDING + ) + } + + BuildStatusResponse toStatusResponse() { + final status = status == Status.COMPLETED + ? BuildStatusResponse.Status.COMPLETED + : BuildStatusResponse.Status.PENDING + final succeeded = exitCode!=null + ? exitCode==0 + : null + return new BuildStatusResponse( + mirrorId, + status, + creationTime, + duration, + succeeded ) + } +} diff --git a/src/main/groovy/io/seqera/wave/service/mirror/MirrorStateStore.groovy b/src/main/groovy/io/seqera/wave/service/mirror/MirrorStateStore.groovy new file mode 100644 index 000000000..85338ded2 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/mirror/MirrorStateStore.groovy @@ -0,0 +1,72 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror + +import java.time.Duration + +import groovy.transform.CompileStatic +import io.seqera.wave.encoder.MoshiEncodeStrategy +import io.seqera.wave.service.cache.AbstractCacheStore +import io.seqera.wave.service.cache.impl.CacheProvider +import jakarta.inject.Inject +import jakarta.inject.Singleton + +/** + * Implement a {@link io.seqera.wave.service.cache.CacheStore} for {@link MirrorState} objects + * + * @author Paolo Di Tommaso + */ +@Singleton +@CompileStatic +class MirrorStateStore extends AbstractCacheStore { + + @Inject + private MirrorConfig config + + MirrorStateStore(CacheProvider provider) { + super(provider, new MoshiEncodeStrategy() {}) + } + + @Override + protected String getPrefix() { + return 'wave-mirror/v1:' + } + + @Override + protected Duration getDuration() { + return config.statusDuration + } + + MirrorState awaitCompletion(String targetImage) { + final beg = System.currentTimeMillis() + while( true ) { + final result = get(targetImage) + // missing record + if( !result ) + throw new IllegalStateException("Unknown mirror container $targetImage") + // ok done + if( result.done() ) + return result + Thread.sleep(config.statusDelay) + // timeout the request + if( System.currentTimeMillis()-beg > config.statusDuration.toMillis() ) + throw new IllegalStateException("Timeout mirror container $targetImage") + } + } +} diff --git a/src/main/groovy/io/seqera/wave/service/mirror/strategy/DockerMirrorStrategy.groovy b/src/main/groovy/io/seqera/wave/service/mirror/strategy/DockerMirrorStrategy.groovy new file mode 100644 index 000000000..9fa346d66 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/mirror/strategy/DockerMirrorStrategy.groovy @@ -0,0 +1,106 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror.strategy + + +import java.nio.file.Files +import java.nio.file.Path + +import groovy.json.JsonOutput +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import io.micronaut.context.annotation.Value +import io.seqera.wave.service.mirror.MirrorConfig +import io.seqera.wave.service.mirror.MirrorRequest +import jakarta.inject.Inject +import jakarta.inject.Singleton +import static java.nio.file.StandardOpenOption.CREATE +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING +import static java.nio.file.StandardOpenOption.WRITE + +/** + * Implements a container mirror runner based on Docker + * + * @author Paolo Di Tommaso + */ +@Singleton +@CompileStatic +@Slf4j +class DockerMirrorStrategy extends MirrorStrategy { + + @Value('${wave.debug:false}') + private Boolean debug + + @Inject + private MirrorConfig mirrorConfig + + @Override + void mirrorJob(String jobName, MirrorRequest request) { + Path configFile = null + // create the work directory + Files.createDirectories(request.workDir) + // save docker config for creds + if( request.authJson ) { + configFile = request.workDir.resolve('config.json') + Files.write(configFile, JsonOutput.prettyPrint(request.authJson).bytes, CREATE, WRITE, TRUNCATE_EXISTING) + } + + // command the docker build command + final buildCmd = mirrorCmd(jobName, request.workDir, configFile) + buildCmd.addAll( copyCommand(request) ) + log.debug "Container mirror command: ${buildCmd.join(' ')}" + // save docker cli for debugging purpose + if( debug ) { + Files.write(request.workDir.resolve('docker.sh'), + buildCmd.join(' ').bytes, + CREATE, WRITE, TRUNCATE_EXISTING) + } + + final process = new ProcessBuilder() + .command(buildCmd) + .directory(request.workDir.toFile()) + .redirectErrorStream(true) + .start() + if( process.waitFor()!=0 ) { + throw new IllegalStateException("Unable to launch mirror container job - exitCode=${process.exitValue()}; output=${process.text}") + } + } + + protected List mirrorCmd(String name, Path workDir, Path credsFile ) { + //checkout the documentation here to know more about these options https://github.com/moby/buildkit/blob/master/docs/rootless.md#docker + final wrapper = ['docker', + 'run', + '--detach', + '--name', name, + '-v', "$workDir:$workDir".toString() ] + + if( credsFile ) { + wrapper.add('-v') + wrapper.add("$credsFile:/tmp/config.json:ro".toString()) + + wrapper.add("-e") + wrapper.add("REGISTRY_AUTH_FILE=/tmp/config.json") + } + + // the container image to be used to build + wrapper.add( mirrorConfig.skopeoImage ) + // return it + return wrapper + } +} diff --git a/src/main/groovy/io/seqera/wave/service/mirror/strategy/KubeMirrorStrategy.groovy b/src/main/groovy/io/seqera/wave/service/mirror/strategy/KubeMirrorStrategy.groovy new file mode 100644 index 000000000..40fa62ed2 --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/mirror/strategy/KubeMirrorStrategy.groovy @@ -0,0 +1,83 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror.strategy + +import java.nio.file.Files +import java.nio.file.Path + +import groovy.json.JsonOutput +import groovy.transform.CompileStatic +import groovy.util.logging.Slf4j +import io.kubernetes.client.openapi.ApiException +import io.micronaut.context.annotation.Primary +import io.micronaut.context.annotation.Requires +import io.seqera.wave.exception.BadRequestException +import io.seqera.wave.service.k8s.K8sService +import io.seqera.wave.service.mirror.MirrorConfig +import io.seqera.wave.service.mirror.MirrorRequest +import jakarta.inject.Inject +import jakarta.inject.Singleton +import static java.nio.file.StandardOpenOption.CREATE +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING +import static java.nio.file.StandardOpenOption.WRITE + +/** + * Implements a container mirror runner based on Kubernetes + * + * @author Paolo Di Tommaso + */ +@Slf4j +@Primary +@Requires(property = 'wave.build.k8s') +@Singleton +@CompileStatic +class KubeMirrorStrategy extends MirrorStrategy { + + @Inject + private MirrorConfig config + + @Inject + private K8sService k8sService + + @Override + void mirrorJob(String jobName, MirrorRequest request) { + Path configFile = null + // create the work directory + Files.createDirectories(request.workDir) + // save docker config for creds + if( request.authJson ) { + configFile = request.workDir.resolve('config.json') + Files.write(configFile, JsonOutput.prettyPrint(request.authJson).bytes, CREATE, WRITE, TRUNCATE_EXISTING) + } + + try { + k8sService.launchMirrorJob( + jobName, + config.skopeoImage, + copyCommand(request), + request.workDir, + configFile, + config) + } + catch (ApiException e) { + throw new BadRequestException("Unexpected build failure - ${e.responseBody}", e) + } + + } +} diff --git a/src/main/groovy/io/seqera/wave/service/mirror/strategy/MirrorStrategy.groovy b/src/main/groovy/io/seqera/wave/service/mirror/strategy/MirrorStrategy.groovy new file mode 100644 index 000000000..a69acfe6e --- /dev/null +++ b/src/main/groovy/io/seqera/wave/service/mirror/strategy/MirrorStrategy.groovy @@ -0,0 +1,57 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror.strategy + + +import groovy.transform.CompileStatic +import io.seqera.wave.service.mirror.MirrorRequest +/** + * Implement the common strategy to handle container mirror + * via Skopeo + * + * @author Paolo Di Tommaso + */ +@CompileStatic +abstract class MirrorStrategy { + + abstract void mirrorJob(String jobName, MirrorRequest request) + + protected List copyCommand(MirrorRequest request) { + final result = new ArrayList(20) + if( request.platform ) { + result.add("--override-os") + result.add(request.platform.os) + result.add("--override-arch") + result.add(request.platform.arch) + if( request.platform.variant ) { + result.add("--override-variant") + result.add(request.platform.variant) + } + } + + result.add("copy") + result.add("--preserve-digests") + result.add("--multi-arch") + result.add( request.platform ? 'system' : 'all') + result.add("docker://${request.sourceImage}".toString()) + result.add("docker://${request.targetImage}".toString()) + + return result + } +} diff --git a/src/main/groovy/io/seqera/wave/service/persistence/PersistenceService.groovy b/src/main/groovy/io/seqera/wave/service/persistence/PersistenceService.groovy index 0269bd267..fb1a1eb61 100644 --- a/src/main/groovy/io/seqera/wave/service/persistence/PersistenceService.groovy +++ b/src/main/groovy/io/seqera/wave/service/persistence/PersistenceService.groovy @@ -21,6 +21,7 @@ package io.seqera.wave.service.persistence import groovy.transform.CompileStatic import io.seqera.wave.core.ContainerDigestPair import io.seqera.wave.exception.NotFoundException +import io.seqera.wave.service.mirror.MirrorState import io.seqera.wave.service.scan.ScanResult /** * A storage for statistic data @@ -134,4 +135,28 @@ interface PersistenceService { scanRecord.vulnerabilities ) } + /** + * Load a mirror state record + * + * @param mirrorId The ID of the mirror record + * @return The corresponding {@link MirrorState} object or null if it cannot be found + */ + MirrorState loadMirrorState(String mirrorId) + + /** + * Load a mirror state record given the target image name and the image digest + * + * @param targetImage The target mirrored image name + * @param digest The image content SHA256 digest + * @return The corresponding {@link MirrorState} object or null if it cannot be found + */ + MirrorState loadMirrorState(String targetImage, String digest) + + /** + * Persists a {@link MirrorState} state record + * + * @param mirror {@link MirrorState} object + */ + void saveMirrorState(MirrorState mirror) + } diff --git a/src/main/groovy/io/seqera/wave/service/persistence/impl/LocalPersistenceService.groovy b/src/main/groovy/io/seqera/wave/service/persistence/impl/LocalPersistenceService.groovy index ca620ea45..bebed9952 100644 --- a/src/main/groovy/io/seqera/wave/service/persistence/impl/LocalPersistenceService.groovy +++ b/src/main/groovy/io/seqera/wave/service/persistence/impl/LocalPersistenceService.groovy @@ -20,6 +20,7 @@ package io.seqera.wave.service.persistence.impl import groovy.transform.CompileStatic import io.seqera.wave.core.ContainerDigestPair +import io.seqera.wave.service.mirror.MirrorState import io.seqera.wave.service.persistence.PersistenceService import io.seqera.wave.service.persistence.WaveBuildRecord import io.seqera.wave.service.persistence.WaveContainerRecord @@ -38,6 +39,7 @@ class LocalPersistenceService implements PersistenceService { private Map requestStore = new HashMap<>() private Map scanStore = new HashMap<>() + private Map mirrorStore = new HashMap<>() @Override void saveBuild(WaveBuildRecord record) { @@ -96,4 +98,16 @@ class LocalPersistenceService implements PersistenceService { scanStore.get(scanId) } + MirrorState loadMirrorState(String mirrorId) { + mirrorStore.get(mirrorId) + } + + MirrorState loadMirrorState(String targetImage, String digest) { + mirrorStore.values().find( (MirrorState mirror) -> mirror.targetImage==targetImage && mirror.digest==digest ) + } + + void saveMirrorState(MirrorState mirror) { + mirrorStore.put(mirror.mirrorId, mirror) + } + } diff --git a/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealClient.groovy b/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealClient.groovy index fd5f582d6..14fee8927 100644 --- a/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealClient.groovy +++ b/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealClient.groovy @@ -27,6 +27,7 @@ import io.micronaut.http.annotation.Post import io.micronaut.http.annotation.Put import io.micronaut.http.client.annotation.Client import io.micronaut.retry.annotation.Retryable +import io.seqera.wave.service.mirror.MirrorState import io.seqera.wave.service.persistence.WaveScanRecord import io.seqera.wave.service.scan.ScanVulnerability import io.seqera.wave.service.persistence.WaveBuildRecord @@ -85,4 +86,6 @@ interface SurrealClient { @Post('/key/wave_scan_vuln') Map insertScanVulnerability(@Header String authorization, @Body ScanVulnerability scanVulnerability) + @Post('/key/wave_mirror') + Flux> insertMirrorAsync(@Header String authorization, @Body MirrorState body) } diff --git a/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealPersistenceService.groovy b/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealPersistenceService.groovy index ac5aca338..0eeaef4fe 100644 --- a/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealPersistenceService.groovy +++ b/src/main/groovy/io/seqera/wave/service/persistence/impl/SurrealPersistenceService.groovy @@ -30,6 +30,7 @@ import io.micronaut.runtime.event.ApplicationStartupEvent import io.micronaut.runtime.event.annotation.EventListener import io.seqera.wave.core.ContainerDigestPair import io.seqera.wave.service.builder.BuildRequest +import io.seqera.wave.service.mirror.MirrorState import io.seqera.wave.service.persistence.PersistenceService import io.seqera.wave.service.persistence.WaveBuildRecord import io.seqera.wave.service.persistence.WaveContainerRecord @@ -87,6 +88,10 @@ class SurrealPersistenceService implements PersistenceService { final ret4 = surrealDb.sqlAsMap(authorization, "define table wave_scan_vuln SCHEMALESS") if( ret4.status != "OK") throw new IllegalStateException("Unable to define SurrealDB table wave_scan_vuln - cause: $ret4") + // create wave_mirror table + final ret5 = surrealDb.sqlAsMap(authorization, "define table wave_mirror SCHEMALESS") + if( ret5.status != "OK") + throw new IllegalStateException("Unable to define SurrealDB table wave_mirror - cause: $ret5") } protected String getAuthorization() { @@ -249,4 +254,55 @@ class SurrealPersistenceService implements PersistenceService { return result } + // === mirror operations + + /** + * Load a mirror state record + * + * @param mirrorId The ID of the mirror record + * @return The corresponding {@link MirrorState} object or null if it cannot be found + */ + MirrorState loadMirrorState(String mirrorId) { + final query = "select * from wave_mirror where mirrorId = '$mirrorId'" + final json = surrealDb.sqlAsString(getAuthorization(), query) + final type = new TypeReference>>() {} + final data= json ? JacksonHelper.fromJson(json, type) : null + final result = data && data[0].result ? data[0].result[0] : null + return result + } + + /** + * Load a mirror state record given the target image name and the image digest + * + * @param targetImage The target mirrored image name + * @param digest The image content SHA256 digest + * @return The corresponding {@link MirrorState} object or null if it cannot be found + */ + MirrorState loadMirrorState(String targetImage, String digest) { + final query = "select * from wave_mirror where targetImage = '$targetImage' and digest = '$digest'" + final json = surrealDb.sqlAsString(getAuthorization(), query) + final type = new TypeReference>>() {} + final data= json ? JacksonHelper.fromJson(json, type) : null + final result = data && data[0].result ? data[0].result[0] : null + return result + } + + /** + * Persists a {@link MirrorState} object + * + * @param mirror {@link MirrorState} object + */ + @Override + void saveMirrorState(MirrorState mirror) { + surrealDb.insertMirrorAsync(getAuthorization(), mirror).subscribe({ result-> + log.trace "Mirror request with id '$mirror.mirrorId' saved record: ${result}" + }, {error-> + def msg = error.message + if( error instanceof HttpClientResponseException ){ + msg += ":\n $error.response.body" + } + log.error("Error saving Mirror request record ${msg}\n${mirror}", error) + }) + } + } diff --git a/src/main/groovy/io/seqera/wave/service/validation/ValidationService.groovy b/src/main/groovy/io/seqera/wave/service/validation/ValidationService.groovy index 472105112..888e83c94 100644 --- a/src/main/groovy/io/seqera/wave/service/validation/ValidationService.groovy +++ b/src/main/groovy/io/seqera/wave/service/validation/ValidationService.groovy @@ -31,4 +31,6 @@ interface ValidationService { String checkBuildRepository(String repo, boolean cache) + String checkMirrorRegistry(String registry) + } diff --git a/src/main/groovy/io/seqera/wave/service/validation/ValidationServiceImpl.groovy b/src/main/groovy/io/seqera/wave/service/validation/ValidationServiceImpl.groovy index 6e5e220e4..cbecf6f61 100644 --- a/src/main/groovy/io/seqera/wave/service/validation/ValidationServiceImpl.groovy +++ b/src/main/groovy/io/seqera/wave/service/validation/ValidationServiceImpl.groovy @@ -92,4 +92,20 @@ class ValidationServiceImpl implements ValidationService { return null } + @Override + String checkMirrorRegistry(String registry) { + if( !registry ) + return null + final prot = StringUtils.getUrlProtocol(registry) + if( prot ) + return "Mirror registry should not include any protocol prefix - offending value: $registry" + // check no tag is included + final coords = ContainerCoordinates.parse(registry) + if( coords.repository ) + return "Mirror registry syntax is invalid - offending value: ${registry}" + if( coords.registry == 'wave.seqera.io' || coords.registry?.contains('.wave.seqera.io') ) + return "Mirror registry not allowed - offending value: ${registry}" + return null + } + } diff --git a/src/main/groovy/io/seqera/wave/util/ContainerHelper.groovy b/src/main/groovy/io/seqera/wave/util/ContainerHelper.groovy index ae90a0578..7348d27ce 100644 --- a/src/main/groovy/io/seqera/wave/util/ContainerHelper.groovy +++ b/src/main/groovy/io/seqera/wave/util/ContainerHelper.groovy @@ -133,16 +133,16 @@ class ContainerHelper { static SubmitContainerTokenResponse makeResponseV1(ContainerRequestData data, TokenData token, String waveImage) { final target = waveImage final build = data.buildNew ? data.buildId : null - return new SubmitContainerTokenResponse(token.value, target, token.expiration, data.containerImage, build, null, null) + return new SubmitContainerTokenResponse(token.value, target, token.expiration, data.containerImage, build, null, null, null) } static SubmitContainerTokenResponse makeResponseV2(ContainerRequestData data, TokenData token, String waveImage) { - final target = data.freeze ? data.containerImage : waveImage + final target = data.durable() ? data.containerImage : waveImage final build = data.buildId final Boolean cached = !data.buildNew - final expiration = !data.freeze ? token.expiration : null - final tokenId = !data.freeze ? token.value : null - return new SubmitContainerTokenResponse(tokenId, target, expiration, data.containerImage, build, cached, data.freeze) + final expiration = !data.durable() ? token.expiration : null + final tokenId = !data.durable() ? token.value : null + return new SubmitContainerTokenResponse(tokenId, target, expiration, data.containerImage, build, cached, data.freeze, data.mirror) } static String patchPlatformEndpoint(String endpoint) { diff --git a/src/test/groovy/io/seqera/wave/controller/ContainerControllerTest.groovy b/src/test/groovy/io/seqera/wave/controller/ContainerControllerTest.groovy index aaf737ab9..74ff3bf59 100644 --- a/src/test/groovy/io/seqera/wave/controller/ContainerControllerTest.groovy +++ b/src/test/groovy/io/seqera/wave/controller/ContainerControllerTest.groovy @@ -24,12 +24,14 @@ import spock.lang.Unroll import java.time.Instant import java.time.temporal.ChronoUnit +import io.micronaut.context.ApplicationContext import io.micronaut.context.annotation.Property import io.micronaut.http.HttpRequest import io.micronaut.http.HttpStatus import io.micronaut.http.client.HttpClient import io.micronaut.http.client.annotation.Client import io.micronaut.http.server.util.HttpClientAddressResolver +import io.micronaut.test.annotation.MockBean import io.micronaut.test.extensions.spock.annotation.MicronautTest import io.seqera.wave.api.ContainerConfig import io.seqera.wave.api.ImageNameStrategy @@ -49,6 +51,9 @@ import io.seqera.wave.service.builder.FreezeService import io.seqera.wave.service.builder.FreezeServiceImpl import io.seqera.wave.service.inclusion.ContainerInclusionService import io.seqera.wave.service.inspect.ContainerInspectServiceImpl +import io.seqera.wave.service.job.JobService +import io.seqera.wave.service.job.JobServiceImpl +import io.seqera.wave.service.mirror.ContainerMirrorService import io.seqera.wave.service.pairing.PairingService import io.seqera.wave.service.pairing.socket.PairingChannel import io.seqera.wave.service.persistence.PersistenceService @@ -80,6 +85,14 @@ class ContainerControllerTest extends Specification { @Inject JwtAuthStore jwtAuthStore + @Inject + ApplicationContext applicationContext + + @MockBean(JobServiceImpl) + JobService mockJobService() { + Mock(JobService) + } + def setup() { jwtAuthStore.clear() } @@ -156,7 +169,7 @@ class ContainerControllerTest extends Specification { def builder = Mock(ContainerBuildService) def dockerAuth = Mock(ContainerInspectServiceImpl) def proxyRegistry = Mock(RegistryProxyService) - def controller = new ContainerController(buildService: builder, dockerAuthService: dockerAuth, registryProxyService: proxyRegistry, buildConfig: buildConfig, inclusionService: Mock(ContainerInclusionService)) + def controller = new ContainerController(buildService: builder, inspectService: dockerAuth, registryProxyService: proxyRegistry, buildConfig: buildConfig, inclusionService: Mock(ContainerInclusionService)) def DOCKER = 'FROM foo' def user = new PlatformId(new User(id: 100)) def cfg = new ContainerConfig() @@ -184,7 +197,7 @@ class ContainerControllerTest extends Specification { def dockerAuth = Mock(ContainerInspectServiceImpl) def proxyRegistry = Mock(RegistryProxyService) def persistenceService = Mock(PersistenceService) - def controller = new ContainerController(buildService: builder, dockerAuthService: dockerAuth, registryProxyService: proxyRegistry, buildConfig: buildConfig, persistenceService:persistenceService, inclusionService: Mock(ContainerInclusionService)) + def controller = new ContainerController(buildService: builder, inspectService: dockerAuth, registryProxyService: proxyRegistry, buildConfig: buildConfig, persistenceService:persistenceService, inclusionService: Mock(ContainerInclusionService)) def DOCKER = 'FROM foo' def user = new PlatformId(new User(id: 100)) def cfg = new ContainerConfig() @@ -212,7 +225,7 @@ class ContainerControllerTest extends Specification { def builder = Mock(ContainerBuildService) def dockerAuth = Mock(ContainerInspectServiceImpl) def proxyRegistry = Mock(RegistryProxyService) - def controller = new ContainerController(buildService: builder, dockerAuthService: dockerAuth, registryProxyService: proxyRegistry, buildConfig:buildConfig, inclusionService: Mock(ContainerInclusionService)) + def controller = new ContainerController(buildService: builder, inspectService: dockerAuth, registryProxyService: proxyRegistry, buildConfig:buildConfig, inclusionService: Mock(ContainerInclusionService)) def DOCKER = 'FROM foo' def user = new PlatformId(new User(id: 100)) def cfg = new ContainerConfig() @@ -236,10 +249,39 @@ class ContainerControllerTest extends Specification { data.platform.toString() == 'linux/arm64' } + def 'should make a mirror request' () { + given: + def jobService = applicationContext.getBean(JobService) + def mirrorService = applicationContext.getBean(ContainerMirrorService) + def inspectService = Mock(ContainerInspectServiceImpl) + def proxyRegistry = Mock(RegistryProxyService) + def controller = new ContainerController(mirrorService: mirrorService, inspectService: inspectService, registryProxyService: proxyRegistry, buildConfig: buildConfig, inclusionService: Mock(ContainerInclusionService)) + def user = new PlatformId(new User(id: 100)) + def req = new SubmitContainerTokenRequest( + containerImage: 'docker.io/source/image:latest', + containerPlatform: 'arm64', + mirrorRegistry: 'quay.io' + ) + + when: + def data = controller.makeRequestData(req, user, "") + then: + 1 * proxyRegistry.getImageDigest('docker.io/source/image:latest', user) >> 'sha256:12345' + 1 * proxyRegistry.getImageDigest('quay.io/source/image:latest', user) >> null + and: + data.identity.userId == 100 + data.containerImage == 'quay.io/source/image:latest' + data.platform.toString() == 'linux/arm64' + data.buildId =~ /mr-.+/ + data.buildNew + !data.freeze + data.mirror + } + def 'should create build request' () { given: def dockerAuth = Mock(ContainerInspectServiceImpl) - def controller = new ContainerController(dockerAuthService: dockerAuth, buildConfig: buildConfig) + def controller = new ContainerController(inspectService: dockerAuth, buildConfig: buildConfig) when: def submit = new SubmitContainerTokenRequest(containerFile: encode('FROM foo')) @@ -284,7 +326,7 @@ class ContainerControllerTest extends Specification { def 'should return a bad request exception when field is not encoded' () { given: def dockerAuth = Mock(ContainerInspectServiceImpl) - def controller = new ContainerController(dockerAuthService: dockerAuth, buildConfig: buildConfig) + def controller = new ContainerController(inspectService: dockerAuth, buildConfig: buildConfig) // validate containerFile when: @@ -369,7 +411,7 @@ class ContainerControllerTest extends Specification { def pairing = Mock(PairingService) def channel = Mock(PairingChannel) def controller = new ContainerController(validationService: validation, pairingService: pairing, pairingChannel: channel) - def msg + def err when: controller.validateContainerRequest(new SubmitContainerTokenRequest()) @@ -389,15 +431,89 @@ class ContainerControllerTest extends Specification { when: controller.validateContainerRequest(new SubmitContainerTokenRequest(containerImage: 'http://docker.io/foo:latest')) then: - msg = thrown(BadRequestException) - msg.message == 'Invalid container repository name — offending value: http://docker.io/foo:latest' + err = thrown(BadRequestException) + err.message == 'Invalid container repository name — offending value: http://docker.io/foo:latest' when: controller.validateContainerRequest(new SubmitContainerTokenRequest(containerImage: 'http:docker.io/foo:latest')) then: - msg = thrown(BadRequestException) - msg.message == 'Invalid container image name — offending value: http:docker.io/foo:latest' + err = thrown(BadRequestException) + err.message == 'Invalid container image name — offending value: http:docker.io/foo:latest' + + } + + def 'should validate mirror request' () { + given: + def validation = new ValidationServiceImpl() + def pairing = Mock(PairingService) + def channel = Mock(PairingChannel) + def controller = new ContainerController(validationService: validation, pairingService: pairing, pairingChannel: channel) + def err + + when: + controller.validateMirrorRequest(new SubmitContainerTokenRequest(containerImage: 'foo:latest'), false) + then: + noExceptionThrown() + + when: + controller.validateMirrorRequest(new SubmitContainerTokenRequest(mirrorRegistry: 'quay.io'), false) + then: + err = thrown(BadRequestException) + err.message == 'Container mirroring requires the use of v2 API' + + when: + controller.validateMirrorRequest(new SubmitContainerTokenRequest(mirrorRegistry: 'quay.io'), true) + then: + err = thrown(BadRequestException) + err.message == 'Attribute `containerImage` is required when specifying `mirrorRegistry`' + + when: + controller.validateMirrorRequest(new SubmitContainerTokenRequest(mirrorRegistry: 'quay.io', containerImage: 'docker.io/foo'), true) + then: + err = thrown(BadRequestException) + err.message == 'Container mirroring requires an authenticated request - specify the tower token attribute' + + when: + controller.validateMirrorRequest(new SubmitContainerTokenRequest(mirrorRegistry: 'docker.io', containerImage: 'docker.io/foo', towerAccessToken: 'xyz'), true) + then: + err = thrown(BadRequestException) + err.message == "Source and target mirror registry as the same - offending value 'docker.io'" + + when: + controller.validateMirrorRequest(new SubmitContainerTokenRequest(mirrorRegistry: 'docker.io', containerImage: 'foo', towerAccessToken: 'xyz'), true) + then: + err = thrown(BadRequestException) + err.message == "Source and target mirror registry as the same - offending value 'docker.io'" + + when: + controller.validateMirrorRequest(new SubmitContainerTokenRequest(mirrorRegistry: 'quay.io', containerImage: 'docker.io/foo', towerAccessToken: 'xyz', containerFile: 'content'), true) + then: + err = thrown(BadRequestException) + err.message == "Attribute `mirrorRegistry` and `containerFile` conflict each other" + when: + controller.validateMirrorRequest(new SubmitContainerTokenRequest(mirrorRegistry: 'quay.io', containerImage: 'docker.io/foo', towerAccessToken: 'xyz', freeze: true), true) + then: + err = thrown(BadRequestException) + err.message == "Attribute `mirrorRegistry` and `freeze` conflict each other" + + when: + controller.validateMirrorRequest(new SubmitContainerTokenRequest(mirrorRegistry: 'quay.io', containerImage: 'docker.io/foo', towerAccessToken: 'xyz', containerIncludes: ['include']), true) + then: + err = thrown(BadRequestException) + err.message == "Attribute `mirrorRegistry` and `containerIncludes` conflict each other" + + when: + controller.validateMirrorRequest(new SubmitContainerTokenRequest(mirrorRegistry: 'quay.io', containerImage: 'docker.io/foo', towerAccessToken: 'xyz', containerConfig: new ContainerConfig(entrypoint: ['foo'])), true) + then: + err = thrown(BadRequestException) + err.message == "Attribute `mirrorRegistry` and `containerConfig` conflict each other" + + when: + controller.validateMirrorRequest(new SubmitContainerTokenRequest(mirrorRegistry: 'quay.io/bar', containerImage: 'docker.io/foo', towerAccessToken: 'xyz'), true) + then: + err = thrown(BadRequestException) + err.message == "Mirror registry syntax is invalid - offending value: quay.io/bar" } def 'should create response with conda packages' () { @@ -409,7 +525,7 @@ class ContainerControllerTest extends Specification { def addressResolver = Mock(HttpClientAddressResolver) def tokenService = Mock(ContainerTokenService) def persistence = Mock(PersistenceService) - def controller = new ContainerController(freezeService: freeze, buildService: builder, dockerAuthService: dockerAuth, + def controller = new ContainerController(freezeService: freeze, buildService: builder, inspectService: dockerAuth, registryProxyService: proxyRegistry, buildConfig: buildConfig, inclusionService: Mock(ContainerInclusionService), addressResolver: addressResolver, tokenService: tokenService, persistenceService: persistence, serverUrl: 'http://wave.com') diff --git a/src/test/groovy/io/seqera/wave/service/ContainerRequestDataTest.groovy b/src/test/groovy/io/seqera/wave/service/ContainerRequestDataTest.groovy index d807826bb..fd1c647ac 100644 --- a/src/test/groovy/io/seqera/wave/service/ContainerRequestDataTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/ContainerRequestDataTest.groovy @@ -20,6 +20,8 @@ package io.seqera.wave.service import spock.lang.Specification +import io.seqera.wave.api.ContainerConfig +import io.seqera.wave.core.ContainerPlatform import io.seqera.wave.tower.PlatformId import io.seqera.wave.tower.User @@ -38,7 +40,59 @@ class ContainerRequestDataTest extends Specification { then: req.identity req.identity == new PlatformId(new User(id:1)) + } + + def 'should validate constructor' () { + when: + def cfg = Mock(ContainerConfig) + def req = new ContainerRequestData( + new PlatformId(new User(id:1)), + 'foo', + 'from docker', + cfg, + 'conda file', + ContainerPlatform.DEFAULT, + '12345', + true, + true, + true ) + then: + req.identity == new PlatformId(new User(id:1)) + req.containerImage == 'foo' + req.containerFile == 'from docker' + req.containerConfig == cfg + req.condaFile == 'conda file' + req.platform == ContainerPlatform.DEFAULT + req.buildId == '12345' + req.buildNew + req.freeze + req.mirror + + } + + def 'should validate durable flag' () { + given: + def req = new ContainerRequestData( + new PlatformId(new User(id:1)), + null, + null, + null, + null, + null, + null, + null, + FREEZE, + MIRROR ) + + expect: + req.durable() == EXPECTED + where: + FREEZE | MIRROR | EXPECTED + false | false | false + true | false | true + false | true | true + true | true | true } } diff --git a/src/test/groovy/io/seqera/wave/service/cache/impl/AbstractCacheStoreTest.groovy b/src/test/groovy/io/seqera/wave/service/cache/impl/AbstractCacheStoreTest.groovy new file mode 100644 index 000000000..f89db42fd --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/cache/impl/AbstractCacheStoreTest.groovy @@ -0,0 +1,273 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.cache.impl + +import spock.lang.Specification + +import java.time.Duration + +import groovy.transform.Canonical +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import io.seqera.wave.encoder.MoshiEncodeStrategy +import io.seqera.wave.service.cache.AbstractCacheStore +import io.seqera.wave.service.cache.StateRecord +import jakarta.inject.Inject +/** + * + * @author Paolo Di Tommaso + */ +@MicronautTest +class AbstractCacheStoreTest extends Specification { + + @Inject LocalCacheProvider provider + + static public long ttlMillis = 100 + + @Canonical + static class MyObject { + String field1 + String field2 + } + + static class MyState extends MyObject implements StateRecord { + + MyState(String field1, String field2) { + super(field1, field2) + } + + @Override + String getRecordId() { + return field1 + } + } + + static class MyCacheStore extends AbstractCacheStore { + + MyCacheStore(CacheProvider provider) { + super(provider, new MoshiEncodeStrategy() {}) + } + + @Override + protected String getPrefix() { + return 'test/v1:' + } + + @Override + protected Duration getDuration() { + return Duration.ofMillis(ttlMillis) + } + } + + def 'should get key' () { + given: + def store = new MyCacheStore(provider) + + expect: + store.key0('one') == 'test/v1:one' + } + + def 'should get record id' () { + given: + def store = new MyCacheStore(provider) + + expect: + store.recordId0('one') == 'test/v1:state-id/one' + } + + def 'should get and put a value' () { + given: + def store = new MyCacheStore(provider) + def key = UUID.randomUUID().toString() + + expect: + store.get(key) == null + + when: + store.put(key, new MyObject('this','that')) + then: + store.get(key) == new MyObject('this','that') + } + + def 'should get and put a value' () { + given: + def store = new MyCacheStore(provider) + def key = UUID.randomUUID().toString() + + expect: + store.get(key) == null + + when: + store.put(key, new MyObject('this','that')) + then: + store.get(key) == new MyObject('this','that') + + when: + sleep ttlMillis *2 + then: + store.get(key) == null + } + + def 'should get and put a value with custom ttl' () { + given: + def store = new MyCacheStore(provider) + def key = UUID.randomUUID().toString() + + expect: + store.get(key) == null + + when: + store.put(key, new MyObject('this','that'), Duration.ofSeconds(10)) + then: + store.get(key) == new MyObject('this','that') + + when: + sleep ttlMillis *2 + then: + store.get(key) == new MyObject('this','that') + } + + def 'should put and remove and item' () { + given: + def store = new MyCacheStore(provider) + def key = UUID.randomUUID().toString() + + when: + store.put(key, new MyObject('this','that'), Duration.ofSeconds(10)) + then: + store.get(key) == new MyObject('this','that') + + when: + store.remove(key) + then: + store.get(key) == null + } + + def 'should put if absent' () { + given: + def store = new MyCacheStore(provider) + def key = UUID.randomUUID().toString() + + when: + def done = store.putIfAbsent(key, new MyObject('this','that')) + then: + done + and: + store.get(key) == new MyObject('this','that') + + when: + done = store.putIfAbsent(key, new MyObject('xx','yy')) + then: + !done + and: + store.get(key) == new MyObject('this','that') + + when: + sleep ttlMillis*2 + done = store.putIfAbsent(key, new MyObject('xx','yy')) + then: + done + and: + store.get(key) == new MyObject('xx','yy') + + } + + def 'should put if absent with custom ttl' () { + given: + def store = new MyCacheStore(provider) + def key = UUID.randomUUID().toString() + + when: + def done = store.putIfAbsent(key, new MyObject('this','that'), Duration.ofSeconds(10)) + then: + done + and: + store.get(key) == new MyObject('this','that') + + when: + done = store.putIfAbsent(key, new MyObject('xx','yy')) + then: + !done + and: + store.get(key) == new MyObject('this','that') + + when: + sleep ttlMillis*2 + done = store.putIfAbsent(key, new MyObject('xx','yy')) + then: + !done + and: + store.get(key) == new MyObject('this','that') + + } + + + def 'should put and get value by record id' () { + given: + def store = new MyCacheStore(provider) + def recId = UUID.randomUUID().toString() + def key = UUID.randomUUID().toString() + + expect: + store.get(key) == null + store.getByRecordId(recId) == null + + when: + def value = new MyState(recId, 'value') + store.put(key, value) + then: + store.get(key) == value + store.getByRecordId(recId) == value + and: + store.get(recId) == null + store.getByRecordId(key) == null + } + + + def 'should put and get value by record id if absent' () { + given: + def store = new MyCacheStore(provider) + def recId = UUID.randomUUID().toString() + def key = UUID.randomUUID().toString() + + expect: + store.get(key) == null + store.getByRecordId(recId) == null + + when: + def value = new MyState(recId, 'value') + def done = store.putIfAbsent(key, value) + then: + done + and: + store.get(key) == value + store.getByRecordId(recId) == value + and: + store.get(recId) == null + store.getByRecordId(key) == null + + when: + done = store.putIfAbsent(key, new MyState('xx', 'yy')) + then: + !done + and: + store.get(key) == value + store.getByRecordId(recId) == value + } + +} diff --git a/src/test/groovy/io/seqera/wave/service/cache/impl/LocalCacheProviderTest.groovy b/src/test/groovy/io/seqera/wave/service/cache/impl/LocalCacheProviderTest.groovy index d1eaee375..6b05d76f5 100644 --- a/src/test/groovy/io/seqera/wave/service/cache/impl/LocalCacheProviderTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/cache/impl/LocalCacheProviderTest.groovy @@ -32,52 +32,102 @@ class LocalCacheProviderTest extends Specification { LocalCacheProvider localCacheProvider - def 'conditional put with current value when ke is not set'() { - when: 'conditionally set a key that has no current value' - def current = localCacheProvider.putIfAbsentAndGetCurrent('key', 'new-value', Duration.ofMillis(Long.MAX_VALUE)) + def 'should get and put a key-value pair' () { + given: + def k = UUID.randomUUID().toString() - then: 'the provided value is returned' - current == 'new-value' + expect: + localCacheProvider.get(k) == null - and: 'the value is set in the store' - localCacheProvider.get('key') == 'new-value' - when: - def other = localCacheProvider.putIfAbsentAndGetCurrent('key', 'hola', Duration.ofMillis(Long.MAX_VALUE)) + localCacheProvider.put(k, "hello") then: - // should not be set because it already exists - other == 'new-value' + localCacheProvider.get(k) == 'hello' } - def 'conditional put with current value when key is already set'() { - given: 'a store containing a mapping for key that is not expired' - localCacheProvider.put('key','existing', Duration.ofMillis(Long.MAX_VALUE)) + def 'should get and put a key-value pair with ttl' () { + given: + def TTL = 100 + def k = UUID.randomUUID().toString() - when: 'try to conditionally set the key to a new value' - def current = localCacheProvider.putIfAbsentAndGetCurrent('key', 'new-value', Duration.ofMillis(Long.MAX_VALUE)) + expect: + localCacheProvider.get(k) == null - then: 'the existing value is returned' - current == 'existing' + when: + localCacheProvider.put(k, "hello", Duration.ofMillis(TTL)) + then: + localCacheProvider.get(k) == 'hello' + then: + sleep(TTL *2) + and: + localCacheProvider.get(k) == null + } + + def 'should get and put only if absent' () { + given: + def k = UUID.randomUUID().toString() + + expect: + localCacheProvider.get(k) == null - and: 'the value is not updated in the store' - localCacheProvider.get('key') == 'existing' + when: + def done = localCacheProvider.putIfAbsent(k, 'foo') + then: + done + and: + localCacheProvider.get(k) == 'foo' + + when: + done = localCacheProvider.putIfAbsent(k, 'bar') + then: + !done + and: + localCacheProvider.get(k) == 'foo' } + def 'should get and put if absent with ttl' () { + given: + def TTL = 100 + def k = UUID.randomUUID().toString() + + when: + def done = localCacheProvider.putIfAbsent(k, 'foo', Duration.ofMillis(TTL)) + then: + done + and: + localCacheProvider.get(k) == 'foo' + + when: + done = localCacheProvider.putIfAbsent(k, 'bar', Duration.ofMillis(TTL)) + then: + !done + and: + localCacheProvider.get(k) == 'foo' - def 'conditional put with current value when key is set and has expired'() { - given: 'a store containing a mapping for key that will expire' - localCacheProvider.put('key', 'existing', Duration.ofMillis(100)) - // give time for cache store to expire the key - sleep(Duration.ofMillis(200).toMillis()) + when: + sleep(TTL *2) + and: + done = localCacheProvider.putIfAbsent(k, 'bar', Duration.ofMillis(TTL)) + then: + done + and: + localCacheProvider.get(k) == 'bar' + } - when: 'try to conditionally set the key to a new value' - def current = localCacheProvider.putIfAbsentAndGetCurrent('key', 'new-value', Duration.ofMillis(100)) + def 'should put and remove a value' () { + given: + def TTL = 100 + def k = UUID.randomUUID().toString() - then: 'the provided value is returned' - current == 'new-value' + when: + localCacheProvider.put(k, 'foo') + then: + localCacheProvider.get(k) == 'foo' - and: 'the value is updated is set in the store' - localCacheProvider.get('key') == 'new-value' + when: + localCacheProvider.remove(k) + then: + localCacheProvider.get(k) == null } } diff --git a/src/test/groovy/io/seqera/wave/service/cache/impl/RedisCacheProviderTest.groovy b/src/test/groovy/io/seqera/wave/service/cache/impl/RedisCacheProviderTest.groovy index 6f548d7f4..e11c4b7a2 100644 --- a/src/test/groovy/io/seqera/wave/service/cache/impl/RedisCacheProviderTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/cache/impl/RedisCacheProviderTest.groovy @@ -47,46 +47,101 @@ class RedisCacheProviderTest extends Specification implements RedisTestContainer applicationContext.close() } - def 'conditional put with current value when ke is not set'() { - when: 'conditionally set a key that has no current value' - def current = redisCacheProvider.putIfAbsentAndGetCurrent('key', 'new-value', Duration.ofSeconds(100)) + def 'should get and put a key-value pair' () { + given: + def k = UUID.randomUUID().toString() - then: 'the provided value is returned' - current == 'new-value' + expect: + redisCacheProvider.get(k) == null - and: 'the value is set in the store' - redisCacheProvider.get('key') == 'new-value' + when: + redisCacheProvider.put(k, "hello") + then: + redisCacheProvider.get(k) == 'hello' } - def 'conditional put with current value when key is already set'() { - given: 'a store containing a mapping for key that is not expired' - redisCacheProvider.put('key', 'existing', Duration.ofSeconds(100)) - - when: 'try to conditionally set the key to a new value' - def current = redisCacheProvider.putIfAbsentAndGetCurrent('key', 'new-value', Duration.ofSeconds(100)) - - then: 'the existing value is returned' - current == 'existing' - - and: 'the value is not updated in the store' - redisCacheProvider.get('key') == 'existing' + def 'should get and put a key-value pair with ttl' () { + given: + def TTL = 100 + def k = UUID.randomUUID().toString() + + expect: + redisCacheProvider.get(k) == null + + when: + redisCacheProvider.put(k, "hello", Duration.ofMillis(TTL)) + then: + redisCacheProvider.get(k) == 'hello' + then: + sleep(TTL *2) + and: + redisCacheProvider.get(k) == null + } + def 'should get and put only if absent' () { + given: + def k = UUID.randomUUID().toString() + + expect: + redisCacheProvider.get(k) == null + + when: + def done = redisCacheProvider.putIfAbsent(k, 'foo') + then: + done + and: + redisCacheProvider.get(k) == 'foo' + + when: + done = redisCacheProvider.putIfAbsent(k, 'bar') + then: + !done + and: + redisCacheProvider.get(k) == 'foo' } - def 'conditional put with current value when key is set and has expired'() { - given: 'a store containing a mapping for key that will expire' - redisCacheProvider.put('key', 'existing', Duration.ofSeconds(1)) - // give time for redis to expire the key - sleep(Duration.ofSeconds(2).toMillis()) + def 'should get and put if absent with ttl' () { + given: + def TTL = 100 + def k = UUID.randomUUID().toString() + + when: + def done = redisCacheProvider.putIfAbsent(k, 'foo', Duration.ofMillis(TTL)) + then: + done + and: + redisCacheProvider.get(k) == 'foo' + + when: + done = redisCacheProvider.putIfAbsent(k, 'bar', Duration.ofMillis(TTL)) + then: + !done + and: + redisCacheProvider.get(k) == 'foo' + + when: + sleep(TTL *2) + and: + done = redisCacheProvider.putIfAbsent(k, 'bar', Duration.ofMillis(TTL)) + then: + done + and: + redisCacheProvider.get(k) == 'bar' + } - when: 'try to conditionally set the key to a new value' - def current = redisCacheProvider.putIfAbsentAndGetCurrent('key', 'new-value', Duration.ofSeconds(100)) + def 'should put and remove a value' () { + given: + def TTL = 100 + def k = UUID.randomUUID().toString() - then: 'the provided value is returned' - current == 'new-value' + when: + redisCacheProvider.put(k, 'foo') + then: + redisCacheProvider.get(k) == 'foo' - and: 'the value is updated is set in the store' - redisCacheProvider.get('key') == 'new-value' + when: + redisCacheProvider.remove(k) + then: + redisCacheProvider.get(k) == null } - } diff --git a/src/test/groovy/io/seqera/wave/service/job/JobFactoryTest.groovy b/src/test/groovy/io/seqera/wave/service/job/JobFactoryTest.groovy index 2fa094560..633ca6814 100644 --- a/src/test/groovy/io/seqera/wave/service/job/JobFactoryTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/job/JobFactoryTest.groovy @@ -28,6 +28,8 @@ import io.seqera.wave.configuration.BlobCacheConfig import io.seqera.wave.configuration.ScanConfig import io.seqera.wave.core.ContainerPlatform import io.seqera.wave.service.builder.BuildRequest +import io.seqera.wave.service.mirror.MirrorConfig +import io.seqera.wave.service.mirror.MirrorRequest import io.seqera.wave.service.scan.ScanRequest /** @@ -100,4 +102,30 @@ class JobFactoryTest extends Specification { job.creationTime == request.creationTime job.workDir == workdir } + + def 'should create mirror job' () { + given: + def workspace = Path.of('/some/work/dir') + def duration = Duration.ofMinutes(1) + def config = new MirrorConfig(maxDuration: duration) + def factory = new JobFactory(mirrorConfig: config) + and: + def request = MirrorRequest.create( + 'source/foo', + 'target/foo', + 'sha256:12345', + Mock(ContainerPlatform), + workspace, + '{config}' ) + + when: + def job = factory.mirror(request) + then: + job.recordId == "target/foo" + job.operationName == /mirror-${request.id.substring(3)}/ + job.type == JobSpec.Type.Mirror + job.maxDuration == duration + job.workDir == workspace.resolve(/mirror-${request.id.substring(3)}/) + job.creationTime == request.creationTime + } } diff --git a/src/test/groovy/io/seqera/wave/service/job/JobSpecTest.groovy b/src/test/groovy/io/seqera/wave/service/job/JobSpecTest.groovy index 507ca9712..a4e657db8 100644 --- a/src/test/groovy/io/seqera/wave/service/job/JobSpecTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/job/JobSpecTest.groovy @@ -93,4 +93,19 @@ class JobSpecTest extends Specification { job.operationName == 'xyz' job.workDir == Path.of('/some/path') } + + def 'should create mirror job' () { + given: + def now = Instant.now() + def job = JobSpec.mirror('12345','xyz', now, Duration.ofMinutes(1), Path.of('/some/path')) + + expect: + job.id + job.recordId == '12345' + job.type == JobSpec.Type.Mirror + job.creationTime == now + job.maxDuration == Duration.ofMinutes(1) + job.operationName == 'xyz' + job.workDir == Path.of('/some/path') + } } diff --git a/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy b/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy index 6760c919e..5ce208064 100644 --- a/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/k8s/K8sServiceImplTest.groovy @@ -42,6 +42,7 @@ import io.micronaut.context.annotation.Replaces import io.micronaut.test.extensions.spock.annotation.MicronautTest import io.seqera.wave.configuration.BlobCacheConfig import io.seqera.wave.configuration.ScanConfig +import io.seqera.wave.service.mirror.MirrorConfig /** * * @author Paolo Di Tommaso @@ -859,6 +860,65 @@ class K8sServiceImplTest extends Specification { ctx.close() } + def 'should create mirror job spec'() { + given: + def PROPS = [ + 'wave.build.workspace': '/build/work', + 'wave.build.k8s.namespace': 'foo', + 'wave.build.k8s.configPath': '/home/kube.config', + 'wave.build.k8s.storage.claimName': 'bar', + 'wave.build.k8s.storage.mountPath': '/build', + 'wave.build.k8s.service-account': 'theAdminAccount', + 'wave.mirror.retry-attempts': 3 + ] + and: + def ctx = ApplicationContext.run(PROPS) + def k8sService = ctx.getBean(K8sServiceImpl) + def name = 'scan-job' + def containerImage = 'scan-image:latest' + def args = ['arg1', 'arg2'] + def workDir = Path.of('/build/work/dir') + def credsFile = Path.of('/build/work/dir/creds/file') + def mirrorConfig = Mock(MirrorConfig) { + getRequestsCpu() >> null + getRequestsMemory() >> null + getRetryAttempts() >> 3 + } + + when: + def job = k8sService.mirrorJobSpec(name, containerImage, args, workDir, credsFile, mirrorConfig) + + then: + job.metadata.name == name + job.metadata.namespace == 'foo' + job.spec.backoffLimit == 3 + job.spec.template.spec.containers[0].image == containerImage + job.spec.template.spec.containers[0].args == args + job.spec.template.spec.containers[0].resources.requests == null + job.spec.template.spec.containers[0].env == [new V1EnvVar().name('REGISTRY_AUTH_FILE').value('/tmp/config.json')] + and: + job.spec.template.spec.containers[0].volumeMounts.size() == 2 + and: + with(job.spec.template.spec.containers[0].volumeMounts[0]) { + mountPath == '/tmp/config.json' + readOnly == true + subPath == 'work/dir/creds/file' + } + and: + with(job.spec.template.spec.containers[0].volumeMounts[1]) { + mountPath == '/build/work/dir' + readOnly == true + subPath == 'work/dir' + } + and: + job.spec.template.spec.volumes.size() == 1 + job.spec.template.spec.volumes[0].persistentVolumeClaim.claimName == 'bar' + job.spec.template.spec.restartPolicy == 'Never' + + cleanup: + ctx.close() + } + def 'should create scan job spec without resource requests'() { given: def PROPS = [ diff --git a/src/test/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceTest.groovy b/src/test/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceTest.groovy new file mode 100644 index 000000000..4664c05aa --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/mirror/ContainerMirrorServiceTest.groovy @@ -0,0 +1,205 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror + +import spock.lang.Requires +import spock.lang.Specification + +import java.nio.file.Files +import java.nio.file.Path +import java.time.Duration +import java.time.Instant +import java.util.concurrent.TimeUnit + +import groovy.util.logging.Slf4j +import io.micronaut.test.extensions.spock.annotation.MicronautTest +import io.seqera.wave.core.ContainerPlatform +import io.seqera.wave.service.inspect.ContainerInspectService +import io.seqera.wave.service.job.JobSpec +import io.seqera.wave.service.job.JobState +import io.seqera.wave.service.persistence.PersistenceService +import io.seqera.wave.tower.PlatformId +import jakarta.inject.Inject +/** + * + * @author Paolo Di Tommaso + */ +@Slf4j +@MicronautTest +class ContainerMirrorServiceTest extends Specification { + + @Inject + ContainerMirrorServiceImpl mirrorService + + @Inject + MirrorStateStore mirrorStateStore + + @Inject + PersistenceService persistenceService + + @Inject + ContainerInspectService dockerAuthService + + @Requires({System.getenv('DOCKER_USER') && System.getenv('DOCKER_PAT')}) + def 'should mirror a container' () { + given: + def source = 'docker.io/hello-world:latest' + def target = 'docker.io/pditommaso/wave-tests' + def folder = Files.createTempDirectory('test') + println "Temp path: $folder" + when: + def creds = dockerAuthService.credentialsConfigJson(null, source, target, Mock(PlatformId)) + def request = MirrorRequest.create( + source, + target, + 'sha256:12345', + ContainerPlatform.DEFAULT, + folder, + creds ) + and: + mirrorService.mirrorImage(request) + then: + mirrorService.awaitCompletion(target) + .get(90, TimeUnit.SECONDS) + .succeeded() + + cleanup: + folder?.deleteDir() + } + + def 'should get mirror result from state store' () { + given: + def request = MirrorRequest.create( + 'source/foo', + 'target/foo', + 'sha256:12345', + ContainerPlatform.DEFAULT, + Path.of('/some/dir'), + '{config}' ) + and: + def state = MirrorState.from(request) + and: + persistenceService.saveMirrorState(state) + when: + def copy = mirrorService.getMirrorState(request.id) + then: + copy == state + } + + def 'should get mirror result from persistent service' () { + given: + def request = MirrorRequest.create( + 'source/foo', + 'target/foo', + 'sha256:12345', + ContainerPlatform.DEFAULT, + Path.of('/some/dir'), + '{config}' ) + and: + def state = MirrorState.from(request) + and: + mirrorStateStore.put('target/foo', state) + when: + def copy = mirrorService.getMirrorState(request.id) + then: + copy == state + } + + def 'should update mirror state on job completion' () { + given: + def request = MirrorRequest.create( + 'source/foo', + 'target/foo', + 'sha256:12345', + ContainerPlatform.DEFAULT, + Path.of('/some/dir'), + '{config}' ) + and: + def state = MirrorState.from(request) + def job = JobSpec.mirror(request.id, 'mirror-123', Instant.now(), Duration.ofMillis(1), Mock(Path)) + when: + mirrorService.onJobCompletion(job, state, new JobState(JobState.Status.SUCCEEDED, 0, 'OK')) + then: + def s1 = mirrorStateStore.get(request.targetImage) + and: + s1.done() + s1.succeeded() + s1.exitCode == 0 + s1.logs == 'OK' + and: + def s2 = persistenceService.loadMirrorState(request.id) + and: + s2 == s1 + } + + def 'should update mirror state on job exception' () { + given: + def request = MirrorRequest.create( + 'source/foo', + 'target/foo', + 'sha256:12345', + ContainerPlatform.DEFAULT, + Path.of('/some/dir'), + '{config}' ) + and: + def state = MirrorState.from(request) + def job = JobSpec.mirror(request.id, 'mirror-123', Instant.now(), Duration.ofMillis(1), Mock(Path)) + when: + mirrorService.onJobException(job, state, new Exception('Oops something went wrong')) + then: + def s1 = mirrorStateStore.get(request.targetImage) + and: + s1.done() + !s1.succeeded() + s1.exitCode == null + s1.logs == 'Oops something went wrong' + and: + def s2 = persistenceService.loadMirrorState(request.id) + and: + s2 == s1 + } + + def 'should update mirror state on job timeout' () { + given: + def request = MirrorRequest.create( + 'source/foo', + 'target/foo', + 'sha256:12345', + ContainerPlatform.DEFAULT, + Path.of('/some/dir'), + '{config}' ) + and: + def state = MirrorState.from(request) + def job = JobSpec.mirror(request.id, 'mirror-123', Instant.now(), Duration.ofMillis(1), Mock(Path)) + when: + mirrorService.onJobTimeout(job, state) + then: + def s1 = mirrorStateStore.get(request.targetImage) + and: + s1.done() + !s1.succeeded() + s1.exitCode == null + s1.logs == 'Container mirror timed out' + and: + def s2 = persistenceService.loadMirrorState(request.id) + and: + s2 == s1 + } + +} diff --git a/src/test/groovy/io/seqera/wave/service/mirror/MirrorRequestTest.groovy b/src/test/groovy/io/seqera/wave/service/mirror/MirrorRequestTest.groovy new file mode 100644 index 000000000..be2e54727 --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/mirror/MirrorRequestTest.groovy @@ -0,0 +1,56 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror + +import spock.lang.Specification + +import java.nio.file.Path +import java.time.Instant + +import io.seqera.wave.core.ContainerPlatform + +/** + * + * @author Paolo Di Tommaso + */ +class MirrorRequestTest extends Specification { + + def 'should create mirror request' () { + when: + def ts = Instant.now() + def req = MirrorRequest.create( + 'docker.io/foo:latest', + 'quay.io/foo:latest', + 'sha256:12345', + ContainerPlatform.DEFAULT, + Path.of('/workspace'), + '{json config}') + then: + req.id + req.sourceImage == 'docker.io/foo:latest' + req.targetImage == 'quay.io/foo:latest' + req.digest == 'sha256:12345' + req.platform == ContainerPlatform.DEFAULT + req.workDir == Path.of("/workspace/mirror-${req.id.substring(3)}") + req.authJson == '{json config}' + req.creationTime >= ts + req.creationTime <= Instant.now() + } + +} diff --git a/src/test/groovy/io/seqera/wave/service/mirror/MirrorResultTest.groovy b/src/test/groovy/io/seqera/wave/service/mirror/MirrorResultTest.groovy new file mode 100644 index 000000000..33c8eff16 --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/mirror/MirrorResultTest.groovy @@ -0,0 +1,121 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror + +import spock.lang.Specification + +import java.nio.file.Path +import java.time.Instant + +import io.seqera.wave.api.BuildStatusResponse +import io.seqera.wave.core.ContainerPlatform +/** + * + * @author Paolo Di Tommaso + */ +class MirrorResultTest extends Specification { + + def 'should create a result from a request' () { + given: + def request = MirrorRequest.create( + 'source.io/foo', + 'target.io/foo', + 'sha256:12345', + Mock(ContainerPlatform), + Path.of('/workspace'), + '{auth json}' ) + + when: + def result = MirrorState.from(request) + then: + result.mirrorId == request.id + result.digest == request.digest + result.platform == request.platform + result.sourceImage == request.sourceImage + result.targetImage == request.targetImage + result.creationTime == request.creationTime + result.status == MirrorState.Status.PENDING + and: + result.duration == null + result.exitCode == null + result.logs == null + } + + def 'should complete a result result' () { + given: + def request = MirrorRequest.create( + 'source.io/foo', + 'target.io/foo', + 'sha256:12345', + Mock(ContainerPlatform), + Path.of('/workspace'), + '{auth json}' ) + + when: + def m1 = MirrorState.from(request) + then: + m1.status == MirrorState.Status.PENDING + m1.duration == null + m1.exitCode == null + m1.logs == null + + when: + def m2 = m1.complete(0, 'Some logs') + then: + m2.mirrorId == request.id + m2.digest == request.digest + m2.sourceImage == request.sourceImage + m2.targetImage == request.targetImage + m2.creationTime == request.creationTime + m2.platform == request.platform + and: + m2.status == MirrorState.Status.COMPLETED + m2.duration != null + m2.exitCode == 0 + m2.logs == 'Some logs' + } + + def 'should convert to status response' () { + when: + def result1 = new MirrorState('mr-123', 'sha256:12345', 'source/foo', 'target/foo', Mock(ContainerPlatform), Instant.now()) + def resp = result1.toStatusResponse() + then: + resp.id == result1.mirrorId + resp.status == BuildStatusResponse.Status.PENDING + resp.startTime == result1.creationTime + and: + resp.duration == null + resp.succeeded == null + + when: + final result2 = result1.complete(1, 'Some error') + final resp2 = result2.toStatusResponse() + then: + resp2.duration == result2.duration + resp2.succeeded == false + + when: + final result3 = result1.complete(0, 'OK') + final resp3 = result3.toStatusResponse() + then: + resp3.duration == result3.duration + resp3.succeeded == true + + } +} diff --git a/src/test/groovy/io/seqera/wave/service/mirror/strategy/DockerMirrorStrategyTest.groovy b/src/test/groovy/io/seqera/wave/service/mirror/strategy/DockerMirrorStrategyTest.groovy new file mode 100644 index 000000000..1e0bf8d35 --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/mirror/strategy/DockerMirrorStrategyTest.groovy @@ -0,0 +1,53 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror.strategy + +import spock.lang.Specification + +import java.nio.file.Path + +import io.seqera.wave.service.mirror.MirrorConfig + +/** + * + * @author Paolo Di Tommaso + */ +class DockerMirrorStrategyTest extends Specification { + + def 'should build docker command' () { + given: + def config = new MirrorConfig(skopeoImage: 'skopeo:latest') + def strategy = new DockerMirrorStrategy(mirrorConfig: config) + + when: + def result = strategy.mirrorCmd('foo', Path.of('/work/dir'), Path.of('/work/dir/creds.json')) + then: + result == ['docker', + 'run', + '--detach', + '--name', 'foo', + '-v', '/work/dir:/work/dir', + '-v', '/work/dir/creds.json:/tmp/config.json:ro', + '-e', 'REGISTRY_AUTH_FILE=/tmp/config.json', + 'skopeo:latest' + ] + + } + +} diff --git a/src/test/groovy/io/seqera/wave/service/mirror/strategy/MirrorStrategyTest.groovy b/src/test/groovy/io/seqera/wave/service/mirror/strategy/MirrorStrategyTest.groovy new file mode 100644 index 000000000..99d8205c5 --- /dev/null +++ b/src/test/groovy/io/seqera/wave/service/mirror/strategy/MirrorStrategyTest.groovy @@ -0,0 +1,62 @@ +/* + * Wave, containers provisioning service + * Copyright (c) 2023-2024, Seqera Labs + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package io.seqera.wave.service.mirror.strategy + +import io.seqera.wave.service.mirror.MirrorRequest + +import spock.lang.Specification +import spock.lang.Unroll + +import java.nio.file.Path + +import io.seqera.wave.core.ContainerPlatform + +/** + * + * @author Paolo Di Tommaso + */ +class MirrorStrategyTest extends Specification { + + @Unroll + def 'should return copy command' () { + given: + def strategy = Spy(MirrorStrategy) + and: + def request = MirrorRequest.create( + 'source.io/foo', + 'target.io/foo', + 'sha256:12345', + PLATFORM ? ContainerPlatform.of(PLATFORM) : null, + Path.of('/workspace'), + '{auth json}') + when: + def cmd = strategy.copyCommand(request) + then: + cmd == EXPECTED.tokenize(' ') + + where: + PLATFORM | EXPECTED + null | "copy --preserve-digests --multi-arch all docker://source.io/foo docker://target.io/foo" + 'linux/amd64' | "--override-os linux --override-arch amd64 copy --preserve-digests --multi-arch system docker://source.io/foo docker://target.io/foo" + 'linux/arm64' | "--override-os linux --override-arch arm64 copy --preserve-digests --multi-arch system docker://source.io/foo docker://target.io/foo" + 'linux/arm64/7'| "--override-os linux --override-arch arm64 --override-variant 7 copy --preserve-digests --multi-arch system docker://source.io/foo docker://target.io/foo" + + } + +} diff --git a/src/test/groovy/io/seqera/wave/service/persistence/impl/SurrealPersistenceServiceTest.groovy b/src/test/groovy/io/seqera/wave/service/persistence/impl/SurrealPersistenceServiceTest.groovy index 2b277b59f..50e204e67 100644 --- a/src/test/groovy/io/seqera/wave/service/persistence/impl/SurrealPersistenceServiceTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/persistence/impl/SurrealPersistenceServiceTest.groovy @@ -28,19 +28,21 @@ import io.micronaut.context.ApplicationContext import io.micronaut.http.HttpRequest import io.micronaut.http.client.HttpClient import io.seqera.wave.api.ContainerConfig -import io.seqera.wave.api.SubmitContainerTokenRequest -import io.seqera.wave.core.ContainerPlatform import io.seqera.wave.api.ContainerLayer +import io.seqera.wave.api.SubmitContainerTokenRequest import io.seqera.wave.core.ContainerDigestPair -import io.seqera.wave.service.builder.BuildFormat -import io.seqera.wave.service.scan.ScanVulnerability +import io.seqera.wave.core.ContainerPlatform import io.seqera.wave.service.ContainerRequestData import io.seqera.wave.service.builder.BuildEvent +import io.seqera.wave.service.builder.BuildFormat import io.seqera.wave.service.builder.BuildRequest import io.seqera.wave.service.builder.BuildResult +import io.seqera.wave.service.mirror.MirrorRequest +import io.seqera.wave.service.mirror.MirrorState import io.seqera.wave.service.persistence.WaveBuildRecord import io.seqera.wave.service.persistence.WaveContainerRecord import io.seqera.wave.service.persistence.WaveScanRecord +import io.seqera.wave.service.scan.ScanVulnerability import io.seqera.wave.test.SurrealDBTestContainer import io.seqera.wave.tower.PlatformId import io.seqera.wave.tower.User @@ -319,4 +321,55 @@ class SurrealPersistenceServiceTest extends Specification implements SurrealDBTe result2 == scanRecord2 } + //== mirror records tests + + void "should save and load a mirror record by id"() { + given: + def storage = applicationContext.getBean(SurrealPersistenceService) + and: + def request = MirrorRequest.create( + 'source.io/foo', + 'target.io/foo', + 'sha256:12345', + ContainerPlatform.DEFAULT, + Path.of('/workspace'), + '{auth json}' ) + and: + storage.initializeDb() + and: + def result = MirrorState.from(request) + storage.saveMirrorState(result) + sleep 100 + + when: + def stored = storage.loadMirrorState(request.id) + then: + stored == result + } + + void "should save and load a mirror record by target and digest"() { + given: + def storage = applicationContext.getBean(SurrealPersistenceService) + and: + def request = MirrorRequest.create( + 'source.io/foo', + 'target.io/foo', + 'sha256:12345', + ContainerPlatform.DEFAULT, + Path.of('/workspace'), + '{auth json}' ) + and: + storage.initializeDb() + and: + def result = MirrorState.from(request) + storage.saveMirrorState(result) + sleep 100 + + when: + def stored = storage.loadMirrorState(request.targetImage, request.digest) + then: + stored == result + } + + } diff --git a/src/test/groovy/io/seqera/wave/service/validation/ValidationServiceTest.groovy b/src/test/groovy/io/seqera/wave/service/validation/ValidationServiceTest.groovy index 6578aa863..e2e4d7c4d 100644 --- a/src/test/groovy/io/seqera/wave/service/validation/ValidationServiceTest.groovy +++ b/src/test/groovy/io/seqera/wave/service/validation/ValidationServiceTest.groovy @@ -95,4 +95,19 @@ class ValidationServiceTest extends Specification { } + @Unroll + def 'should check registry' () { + expect: + validationService.checkMirrorRegistry(REG)==EXPECTED + + where: + REG | EXPECTED + null | null + 'docker.io' | null + 'docker.io/foo' | 'Mirror registry syntax is invalid - offending value: docker.io/foo' + 'docker://foo.io' | 'Mirror registry should not include any protocol prefix - offending value: docker://foo.io' + 'wave.seqera.io' | 'Mirror registry not allowed - offending value: wave.seqera.io' + 'cr.wave.seqera.io' | 'Mirror registry not allowed - offending value: cr.wave.seqera.io' + } + }