Skip to content

Commit

Permalink
Add support for Container mirroring (#646)
Browse files Browse the repository at this point in the history
This commit adds the support for container mirror functionality. 

Container mirror allows creating a copy of a container repository to a target container registry 
keeping the unchanged the  repository name and the container checksums, and therefore 
creating an identical copy to another registry. 

Container mirror is an extension of the Wave build-on-demand feature.  For example 

```
wave -i docker.io/my/foo:bar --mirror-to quay.io
```

it will create a copy of the specified repository using the target image name `quay.io/my/foo:bar`

Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Sep 22, 2024
1 parent 80a7e81 commit 2f5a1fe
Show file tree
Hide file tree
Showing 50 changed files with 2,632 additions and 144 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dependencies {
compileOnly("io.micronaut:micronaut-http-validation")
implementation("jakarta.persistence:jakarta.persistence-api:3.0.0")
api 'io.seqera:lib-mail:1.0.0'
api 'io.seqera:wave-api:0.10.0'
api 'io.seqera:wave-api:0.12.0'
api 'io.seqera:wave-utils:0.13.1'

implementation("io.micronaut:micronaut-http-client")
Expand Down
31 changes: 26 additions & 5 deletions src/main/groovy/io/seqera/wave/controller/BuildController.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,11 @@ import io.micronaut.http.server.types.files.StreamedFile
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import io.seqera.wave.api.BuildStatusResponse
import io.seqera.wave.exception.BadRequestException
import io.seqera.wave.service.builder.ContainerBuildService
import io.seqera.wave.service.logs.BuildLogService
import io.seqera.wave.service.mirror.ContainerMirrorService
import io.seqera.wave.service.mirror.MirrorRequest
import io.seqera.wave.service.persistence.WaveBuildRecord
import jakarta.inject.Inject
/**
Expand All @@ -48,12 +51,15 @@ class BuildController {
@Inject
private ContainerBuildService buildService

@Inject
private ContainerMirrorService mirrorService

@Inject
@Nullable
BuildLogService logService

@Get("/v1alpha1/builds/{buildId}")
HttpResponse<WaveBuildRecord> getBuildRecord(String buildId){
HttpResponse<WaveBuildRecord> getBuildRecord(String buildId) {
final record = buildService.getBuildRecord(buildId)
return record
? HttpResponse.ok(record)
Expand All @@ -72,11 +78,26 @@ class BuildController {
}

@Get("/v1alpha1/builds/{buildId}/status")
HttpResponse<BuildStatusResponse> getBuildStatus(String buildId){
final build = buildService.getBuildRecord(buildId)
build != null
? HttpResponse.ok(build.toStatusResponse())
HttpResponse<BuildStatusResponse> getBuildStatus(String buildId) {
final resp = buildResponse0(buildId)
resp != null
? HttpResponse.ok(resp)
: HttpResponse.<BuildStatusResponse>notFound()
}

protected BuildStatusResponse buildResponse0(String buildId) {
if( !buildId )
throw new BadRequestException("Missing 'buildId' parameter")
// build IDs starting with the `mr-` prefix are interpreted as mirror requests
if( buildId.startsWith(MirrorRequest.ID_PREFIX) ) {
return mirrorService
.getMirrorState(buildId)
?.toStatusResponse()
}
else {
return buildService
.getBuildRecord(buildId)
?.toStatusResponse()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ import io.seqera.wave.service.builder.ContainerBuildService
import io.seqera.wave.service.builder.FreezeService
import io.seqera.wave.service.inclusion.ContainerInclusionService
import io.seqera.wave.service.inspect.ContainerInspectService
import io.seqera.wave.service.mirror.ContainerMirrorService
import io.seqera.wave.service.mirror.MirrorRequest
import io.seqera.wave.service.pairing.PairingService
import io.seqera.wave.service.pairing.socket.PairingChannel
import io.seqera.wave.service.persistence.PersistenceService
Expand Down Expand Up @@ -125,7 +127,7 @@ class ContainerController {
ContainerBuildService buildService

@Inject
ContainerInspectService dockerAuthService
ContainerInspectService inspectService

@Inject
RegistryProxyService registryProxyService
Expand All @@ -152,6 +154,9 @@ class ContainerController {
@Nullable
RateLimiterService rateLimiterService

@Inject
private ContainerMirrorService mirrorService

@PostConstruct
private void init() {
log.info "Wave server url: $serverUrl; allowAnonymous: $allowAnonymous; tower-endpoint-url: $towerEndpointUrl; default-build-repo: $buildConfig.defaultBuildRepository; default-cache-repo: $buildConfig.defaultCacheRepository; default-public-repo: $buildConfig.defaultPublicRepository"
Expand All @@ -176,6 +181,7 @@ class ContainerController {

// validate request
validateContainerRequest(req)
validateMirrorRequest(req, v2)

// this is needed for backward compatibility with old clients
if( !req.towerEndpoint ) {
Expand Down Expand Up @@ -325,7 +331,7 @@ class ContainerController {
? buildConfig.defaultPublicRepository
: buildConfig.defaultBuildRepository), req.nameStrategy)
final cacheRepository = req.cacheRepository ?: buildConfig.defaultCacheRepository
final configJson = dockerAuthService.credentialsConfigJson(containerSpec, buildRepository, cacheRepository, identity)
final configJson = inspectService.credentialsConfigJson(containerSpec, buildRepository, cacheRepository, identity)
final containerConfig = req.freeze ? req.containerConfig : null
final offset = DataTimeUtils.offsetId(req.timestamp)
final scanId = scanEnabled && format==DOCKER ? LongRndKey.rndHex() : null
Expand Down Expand Up @@ -414,6 +420,15 @@ class ContainerController {
buildId = track.id
buildNew = !track.cached
}
else if( req.mirrorRegistry ) {
final mirror = makeMirrorRequest(req, identity)
final track = checkMirror(mirror, identity, req.dryRun)
targetImage = track.targetImage
targetContent = null
condaContent = null
buildId = track.id
buildNew = !track.cached
}
else if( req.containerImage ) {
// normalize container image
final coords = ContainerCoordinates.parse(req.containerImage)
Expand All @@ -435,7 +450,51 @@ class ContainerController {
ContainerPlatform.of(req.containerPlatform),
buildId,
buildNew,
req.freeze )
req.freeze,
req.mirrorRegistry!=null
)
}

protected MirrorRequest makeMirrorRequest(SubmitContainerTokenRequest request, PlatformId identity) {
final coords = ContainerCoordinates.parse(request.containerImage)
if( coords.registry == request.mirrorRegistry )
throw new BadRequestException("Source and target mirror registry as the same - offending value '${request.mirrorRegistry}'")
final targetImage = request.mirrorRegistry + '/' + coords.imageAndTag
final configJson = inspectService.credentialsConfigJson(null, request.containerImage, targetImage, identity)
final platform = request.containerPlatform
? ContainerPlatform.of(request.containerPlatform)
: ContainerPlatform.DEFAULT
final digest = registryProxyService.getImageDigest(request.containerImage, identity)
if( !digest )
throw new BadRequestException("Container image '$request.containerImage' does not exist")
return MirrorRequest.create(
request.containerImage,
targetImage,
digest,
platform,
Path.of(buildConfig.buildWorkspace).toAbsolutePath(),
configJson )
}

protected BuildTrack checkMirror(MirrorRequest request, PlatformId identity, boolean dryRun) {
final targetDigest = registryProxyService.getImageDigest(request.targetImage, identity)
log.debug "== Mirror target digest: $targetDigest"
final cached = request.digest==targetDigest
// check for dry-run execution
if( dryRun ) {
log.debug "== Dry-run request request: $request"
final dryId = request.id + BuildRequest.SEP + '0'
return new BuildTrack(dryId, request.targetImage, cached)
}
// check for existing image
if( request.digest==targetDigest ) {
log.debug "== Found cached request for request: $request"
final cache = persistenceService.loadMirrorState(request.targetImage, targetDigest)
return new BuildTrack(cache?.mirrorId, request.targetImage, true)
}
else {
return mirrorService.mirrorImage(request)
}
}

protected String targetImage(String token, ContainerCoordinates container) {
Expand All @@ -461,7 +520,7 @@ class ContainerController {
return HttpResponse.ok()
}

void validateContainerRequest(SubmitContainerTokenRequest req) throws BadRequestException{
void validateContainerRequest(SubmitContainerTokenRequest req) throws BadRequestException {
String msg
// check valid image name
msg = validationService.checkContainerName(req.containerImage)
Expand All @@ -474,6 +533,32 @@ class ContainerController {
if( msg ) throw new BadRequestException(msg)
}

void validateMirrorRequest(SubmitContainerTokenRequest req, boolean v2) throws BadRequestException {
if( !req.mirrorRegistry )
return
// container mirror validation
if( !v2 )
throw new BadRequestException("Container mirroring requires the use of v2 API")
if( !req.containerImage )
throw new BadRequestException("Attribute `containerImage` is required when specifying `mirrorRegistry`")
if( !req.towerAccessToken )
throw new BadRequestException("Container mirroring requires an authenticated request - specify the tower token attribute")
if( req.freeze )
throw new BadRequestException("Attribute `mirrorRegistry` and `freeze` conflict each other")
if( req.containerFile )
throw new BadRequestException("Attribute `mirrorRegistry` and `containerFile` conflict each other")
if( req.containerIncludes )
throw new BadRequestException("Attribute `mirrorRegistry` and `containerIncludes` conflict each other")
if( req.containerConfig )
throw new BadRequestException("Attribute `mirrorRegistry` and `containerConfig` conflict each other")
final coords = ContainerCoordinates.parse(req.containerImage)
if( coords.registry == req.mirrorRegistry )
throw new BadRequestException("Source and target mirror registry as the same - offending value '${req.mirrorRegistry}'")
def msg = validationService.checkMirrorRegistry(req.mirrorRegistry)
if( msg )
throw new BadRequestException(msg)
}

@Error(exception = AuthorizationException.class)
HttpResponse<?> handleAuthorizationException() {
return HttpResponse.unauthorized()
Expand Down
53 changes: 53 additions & 0 deletions src/main/groovy/io/seqera/wave/controller/MirrorController.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Wave, containers provisioning service
* Copyright (c) 2023-2024, Seqera Labs
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package io.seqera.wave.controller

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.http.HttpResponse
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.scheduling.TaskExecutors
import io.micronaut.scheduling.annotation.ExecuteOn
import io.seqera.wave.service.mirror.ContainerMirrorService
import io.seqera.wave.service.mirror.MirrorState
import jakarta.inject.Inject
/**
* Implements a controller for container mirror apis
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
@CompileStatic
@Controller("/")
@ExecuteOn(TaskExecutors.IO)
class MirrorController {

@Inject
private ContainerMirrorService mirrorService

@Get("/v1alpha1/mirrors/{mirrorId}")
HttpResponse<MirrorState> getMirrorRecord(String mirrorId) {
final result = mirrorService.getMirrorState(mirrorId)
return result
? HttpResponse.ok(result)
: HttpResponse.<MirrorState>notFound()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,15 @@ class RegistryProxyService {
}

String getImageDigest(BuildRequest request, boolean retryOnNotFound=false) {
return getImageDigest(request.targetImage, request.identity, retryOnNotFound)
}

String getImageDigest(String containerImage, PlatformId identity, boolean retryOnNotFound=false) {
try {
return getImageDigest0(request.targetImage, request.identity, retryOnNotFound)
return getImageDigest0(containerImage, identity, retryOnNotFound)
}
catch(Exception e) {
log.warn "Unable to retrieve digest for image '${request.targetImage}' -- cause: ${e.message}"
log.warn "Unable to retrieve digest for image '${containerImage}' -- cause: ${e.message}"
return null
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class ContainerRequestData {
final String buildId
final Boolean buildNew
final Boolean freeze
final Boolean mirror

boolean durable() {
return freeze || mirror
}

PlatformId getIdentity() {
return identity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package io.seqera.wave.service.cache

import java.time.Duration

import groovy.transform.CompileStatic
import io.seqera.wave.encoder.EncodingStrategy
import io.seqera.wave.service.cache.impl.CacheProvider

Expand All @@ -28,6 +29,7 @@ import io.seqera.wave.service.cache.impl.CacheProvider
*
* @author Paolo Di Tommaso <[email protected]>
*/
@CompileStatic
abstract class AbstractCacheStore<V> implements CacheStore<String,V> {

private EncodingStrategy<V> encodingStrategy
Expand All @@ -45,6 +47,10 @@ abstract class AbstractCacheStore<V> implements CacheStore<String,V> {

protected String key0(String k) { return getPrefix() + k }

protected String recordId0(String recordId) {
return getPrefix() + 'state-id/' + recordId
}

protected V deserialize(String encoded) {
return encodingStrategy.decode(encoded)
}
Expand All @@ -63,32 +69,34 @@ abstract class AbstractCacheStore<V> implements CacheStore<String,V> {
return result ? deserialize(result) : null
}

V getByRecordId(String recordId) {
final key = delegate.get(recordId0(recordId))
return get(key)
}

void put(String key, V value) {
delegate.put(key0(key), serialize(value), getDuration())
put(key, value, getDuration())
}

@Override
void put(String key, V value, Duration ttl) {
delegate.put(key0(key), serialize(value), ttl)
if( value instanceof StateRecord ) {
delegate.put(recordId0(value.getRecordId()), key, ttl)
}
}

@Override
boolean putIfAbsent(String key, V value, Duration ttl) {
delegate.putIfAbsent(key0(key), serialize(value), ttl)
final result = delegate.putIfAbsent(key0(key), serialize(value), ttl)
if( result && value instanceof StateRecord ) {
delegate.put(recordId0(value.getRecordId()), key, ttl)
}
return result
}

boolean putIfAbsent(String key, V value) {
delegate.putIfAbsent(key0(key), serialize(value), getDuration())
}

@Override
V putIfAbsentAndGetCurrent(String key, V value, Duration ttl) {
final result = delegate.putIfAbsentAndGetCurrent(key0(key), serialize(value), ttl)
return result? deserialize(result) : null
}

V putIfAbsentAndGetCurrent(String key, V value) {
return putIfAbsentAndGetCurrent(key, value, getDuration())
return putIfAbsent(key, value, getDuration())
}

@Override
Expand Down
Loading

0 comments on commit 2f5a1fe

Please sign in to comment.