diff --git a/storage/storage_dest.go b/storage/storage_dest.go index 34e17e5808..ef0784160c 100644 --- a/storage/storage_dest.go +++ b/storage/storage_dest.go @@ -63,10 +63,6 @@ type storageImageDestination struct { signatureses map[digest.Digest][]byte // Instance signature contents, temporary metadata storageImageMetadata // Metadata contents being built - // A storage destination may be used concurrently. Accesses are - // serialized via a mutex. Please refer to the individual comments - // below for details. - lock sync.Mutex // Mapping from layer (by index) to the associated ID in the storage. // It's protected *implicitly* since `commitLayer()`, at any given // time, can only be executed by *one* goroutine. Please refer to @@ -74,11 +70,16 @@ type storageImageDestination struct { // guarantee is implemented. indexToStorageID map[int]string - // LOCKING: - // All accesses to below data are, during the concurrent TryReusingBlob/PutBlob/* calls - // (but not necessarily during the final Commit) protected by `lock` which is made - // *explicit* in the code. - // + // A storage destination may be used concurrently, due to HasThreadSafePutBlob. + lock sync.Mutex // Protects lockProtected + lockProtected storageImageDestinationLockProtected +} + +// storageImageDestinationLockProtected contains storageImageDestination data which might be +// accessed concurrently, due to HasThreadSafePutBlob. +// _During the concurrent TryReusingBlob/PutBlob/* calls_ (but not necessarily during the final Commit) +// uses must hold storageImageDestination.lock. +type storageImageDestinationLockProtected struct { // In general, a layer is identified either by (compressed) digest, or by TOC digest. // When creating a layer, the c/storage layer metadata and image IDs must _only_ be based on trusted values // we have computed ourselves. (Layer reuse can then look up against such trusted values, but it might not @@ -128,21 +129,23 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (* HasThreadSafePutBlob: true, }), - imageRef: imageRef, - directory: directory, - signatureses: make(map[digest.Digest][]byte), - blobDiffIDs: make(map[digest.Digest]digest.Digest), - indexToTOCDigest: make(map[int]digest.Digest), - blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), + imageRef: imageRef, + directory: directory, + signatureses: make(map[digest.Digest][]byte), metadata: storageImageMetadata{ SignatureSizes: []int{}, SignaturesSizes: make(map[digest.Digest][]int), }, - indexToStorageID: make(map[int]string), - indexToAddedLayerInfo: make(map[int]addedLayerInfo), - diffOutputs: make(map[int]*graphdriver.DriverWithDifferOutput), + indexToStorageID: make(map[int]string), + lockProtected: storageImageDestinationLockProtected{ + blobDiffIDs: make(map[digest.Digest]digest.Digest), + indexToTOCDigest: make(map[int]digest.Digest), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + indexToAddedLayerInfo: make(map[int]addedLayerInfo), + blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer), + diffOutputs: make(map[int]*graphdriver.DriverWithDifferOutput), + }, } dest.Compat = impl.AddCompat(dest) return dest, nil @@ -156,10 +159,11 @@ func (s *storageImageDestination) Reference() types.ImageReference { // Close cleans up the temporary directory and additional layer store handlers. func (s *storageImageDestination) Close() error { - for _, al := range s.blobAdditionalLayer { + // This is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock. + for _, al := range s.lockProtected.blobAdditionalLayer { al.Release() } - for _, v := range s.diffOutputs { + for _, v := range s.lockProtected.diffOutputs { if v.Target != "" { _ = s.imageRef.transport.store.CleanupStagingDirectory(v.Target) } @@ -241,9 +245,9 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf // Record information about the blob. s.lock.Lock() - s.blobDiffIDs[blobDigest] = diffID.Digest() - s.fileSizes[blobDigest] = counter.Count - s.filenames[blobDigest] = filename + s.lockProtected.blobDiffIDs[blobDigest] = diffID.Digest() + s.lockProtected.fileSizes[blobDigest] = counter.Count + s.lockProtected.filenames[blobDigest] = filename s.lock.Unlock() // This is safe because we have just computed diffID, and blobDigest was either computed // by us, or validated by the caller (usually copy.digestingReader). @@ -307,17 +311,17 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces blobDigest := srcInfo.Digest s.lock.Lock() - s.diffOutputs[options.LayerIndex] = out + s.lockProtected.diffOutputs[options.LayerIndex] = out if out.UncompressedDigest != "" { // The computation of UncompressedDigest means the whole layer has been consumed; while doing that, chunked.GetDiffer is // responsible for ensuring blobDigest has been validated. - s.blobDiffIDs[blobDigest] = out.UncompressedDigest + s.lockProtected.blobDiffIDs[blobDigest] = out.UncompressedDigest } else { // Don’t identify layers by TOC if UncompressedDigest is available. // - Using UncompressedDigest allows image reuse with non-partially-pulled layers // - If UncompressedDigest has been computed, that means the layer was read completely, and the TOC has been created from scratch. // That TOC is quite unlikely to match with any other TOC value. - s.indexToTOCDigest[options.LayerIndex] = out.TOCDigest + s.lockProtected.indexToTOCDigest[options.LayerIndex] = out.TOCDigest } s.lock.Unlock() @@ -360,8 +364,8 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige if err != nil && !errors.Is(err, storage.ErrLayerUnknown) { return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q and labels: %w`, blobDigest, err) } else if err == nil { - s.blobDiffIDs[blobDigest] = aLayer.UncompressedDigest() - s.blobAdditionalLayer[blobDigest] = aLayer + s.lockProtected.blobDiffIDs[blobDigest] = aLayer.UncompressedDigest() + s.lockProtected.blobAdditionalLayer[blobDigest] = aLayer return true, private.ReusedBlob{ Digest: blobDigest, Size: aLayer.CompressedSize(), @@ -384,7 +388,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige // Check if we have a wasn't-compressed layer in storage that's based on that blob. // Check if we've already cached it in a file. - if size, ok := s.fileSizes[blobDigest]; ok { + if size, ok := s.lockProtected.fileSizes[blobDigest]; ok { // FIXME: What ensures layer identification? return true, private.ReusedBlob{ Digest: blobDigest, @@ -397,7 +401,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige return false, private.ReusedBlob{}, fmt.Errorf(`looking for layers with digest %q: %w`, blobDigest, err) } if len(layers) > 0 { - s.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest + s.lockProtected.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: blobDigest, Size: layers[0].UncompressedSize, @@ -410,7 +414,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q: %w`, blobDigest, err) } if len(layers) > 0 { - s.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest + s.lockProtected.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: blobDigest, Size: layers[0].CompressedSize, @@ -428,7 +432,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige } if len(layers) > 0 { if size != -1 { - s.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest + s.lockProtected.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: blobDigest, Size: size, @@ -437,7 +441,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige if !options.CanSubstitute { return false, private.ReusedBlob{}, fmt.Errorf("Internal error: options.CanSubstitute was expected to be true for blob with digest %s", blobDigest) } - s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest + s.lockProtected.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: uncompressedDigest, Size: layers[0].UncompressedSize, @@ -454,7 +458,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige return false, private.ReusedBlob{}, fmt.Errorf(`looking for layers with TOC digest %q: %w`, options.TOCDigest, err) } if len(layers) > 0 { - s.indexToTOCDigest[*options.LayerIndex] = options.TOCDigest + s.lockProtected.indexToTOCDigest[*options.LayerIndex] = options.TOCDigest return true, private.ReusedBlob{ Digest: blobDigest, Size: layers[0].UncompressedSize, @@ -472,6 +476,8 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige // that since we don't have a recommendation, a random ID should be used if one needs // to be allocated. func (s *storageImageDestination) computeID(m manifest.Manifest) string { + // This is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock. + // Build the diffID list. We need the decompressed sums that we've been calculating to // fill in the DiffIDs. It's expected (but not enforced by us) that the number of // diffIDs corresponds to the number of non-EmptyLayer entries in the history. @@ -485,7 +491,7 @@ func (s *storageImageDestination) computeID(m manifest.Manifest) string { continue } blobSum := m.FSLayers[i].BlobSum - diffID, ok := s.blobDiffIDs[blobSum] + diffID, ok := s.lockProtected.blobDiffIDs[blobSum] if !ok { // this can, in principle, legitimately happen when a layer is reused by TOC. logrus.Infof("error looking up diffID for layer %q", blobSum.String()) @@ -521,8 +527,8 @@ func (s *storageImageDestination) computeID(m manifest.Manifest) string { tocIDInput := "" hasLayerPulledByTOC := false for i := range m.LayerInfos() { - layerValue := "" // An empty string is not a valid digest, so this is unambiguous with the TOC case. - tocDigest, ok := s.indexToTOCDigest[i] // "" if not a TOC + layerValue := "" // An empty string is not a valid digest, so this is unambiguous with the TOC case. + tocDigest, ok := s.lockProtected.indexToTOCDigest[i] // "" if not a TOC if ok { hasLayerPulledByTOC = true layerValue = tocDigest.String() @@ -550,7 +556,7 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er return nil, fmt.Errorf("invalid digest supplied when reading blob: %w", err) } // Assume it's a file, since we're only calling this from a place that expects to read files. - if filename, ok := s.filenames[info.Digest]; ok { + if filename, ok := s.lockProtected.filenames[info.Digest]; ok { contents, err2 := os.ReadFile(filename) if err2 != nil { return nil, fmt.Errorf(`reading blob from file %q: %w`, filename, err2) @@ -584,17 +590,17 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo) // caller is the "worker" routine committing layers. All other routines // can continue pulling and queuing in layers. s.lock.Lock() - s.indexToAddedLayerInfo[index] = info + s.lockProtected.indexToAddedLayerInfo[index] = info // We're still waiting for at least one previous/parent layer to be // committed, so there's nothing to do. - if index != s.currentIndex { + if index != s.lockProtected.currentIndex { s.lock.Unlock() return nil } for { - info, ok := s.indexToAddedLayerInfo[index] + info, ok := s.lockProtected.indexToAddedLayerInfo[index] if !ok { break } @@ -609,7 +615,7 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo) // Set the index at the very end to make sure that only one routine // enters stage 2). - s.currentIndex = index + s.lockProtected.currentIndex = index s.lock.Unlock() return nil } @@ -619,11 +625,11 @@ func (s *storageImageDestination) singleLayerIDComponent(layerIndex int, blobSum s.lock.Lock() defer s.lock.Unlock() - if d, found := s.indexToTOCDigest[layerIndex]; found { + if d, found := s.lockProtected.indexToTOCDigest[layerIndex]; found { return d.Hex() + "-toc", found // FIXME: If this is the first layer, this is not a valid ID. } - d, found := s.blobDiffIDs[blobSum] + d, found := s.lockProtected.blobDiffIDs[blobSum] return d.Hex(), found } @@ -721,7 +727,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si // If the layer cannot be committed yet, the function returns (nil, nil). func (s *storageImageDestination) createNewLayer(index int, layerDigest digest.Digest, parentLayer, newLayerID string) (*storage.Layer, error) { s.lock.Lock() - diffOutput, ok := s.diffOutputs[index] + diffOutput, ok := s.lockProtected.diffOutputs[index] s.lock.Unlock() if ok { var untrustedUncompressedDigest digest.Digest @@ -773,7 +779,7 @@ func (s *storageImageDestination) createNewLayer(index int, layerDigest digest.D } s.lock.Lock() - diffID, ok := s.blobDiffIDs[layerDigest] + diffID, ok := s.lockProtected.blobDiffIDs[layerDigest] s.lock.Unlock() if !ok { @@ -781,7 +787,7 @@ func (s *storageImageDestination) createNewLayer(index int, layerDigest digest.D } s.lock.Lock() - al, ok := s.blobAdditionalLayer[layerDigest] + al, ok := s.lockProtected.blobAdditionalLayer[layerDigest] s.lock.Unlock() if ok { layer, err := al.PutAs(newLayerID, parentLayer, nil) @@ -794,7 +800,7 @@ func (s *storageImageDestination) createNewLayer(index int, layerDigest digest.D // Check if we previously cached a file with that blob's contents. If we didn't, // then we need to read the desired contents from a layer. s.lock.Lock() - filename, ok := s.filenames[layerDigest] + filename, ok := s.lockProtected.filenames[layerDigest] s.lock.Unlock() if !ok { // Try to find the layer with contents matching that blobsum. @@ -842,7 +848,7 @@ func (s *storageImageDestination) createNewLayer(index int, layerDigest digest.D // Make sure that we can find this file later, should we need the layer's // contents again. s.lock.Lock() - s.filenames[layerDigest] = filename + s.lockProtected.filenames[layerDigest] = filename s.lock.Unlock() } // Read the cached blob and use it as a diff. @@ -871,6 +877,8 @@ func (s *storageImageDestination) createNewLayer(index int, layerDigest digest.D // - Uploaded data MAY be visible to others before Commit() is called // - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed) func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel types.UnparsedImage) error { + // This function is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock. + if len(s.manifest) == 0 { return errors.New("Internal error: storageImageDestination.Commit() called without PutManifest()") } @@ -936,14 +944,14 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t // Set up to save the non-layer blobs as data items. Since we only share layers, they should all be in files, so // we just need to screen out the ones that are actually layers to get the list of non-layers. dataBlobs := set.New[digest.Digest]() - for blob := range s.filenames { + for blob := range s.lockProtected.filenames { dataBlobs.Add(blob) } for _, layerBlob := range layerBlobs { dataBlobs.Delete(layerBlob.Digest) } for _, blob := range dataBlobs.Values() { - v, err := os.ReadFile(s.filenames[blob]) + v, err := os.ReadFile(s.lockProtected.filenames[blob]) if err != nil { return fmt.Errorf("copying non-layer blob %q to image: %w", blob, err) }