Skip to content

Commit

Permalink
Introduce storageImageDestinationLockProtected
Browse files Browse the repository at this point in the history
We will want to add more comments to groups of the
various fields, which would make it easier to overlook the comment
about locking.

So, use a nested struct to make the locking requirement much more
visible.  Also document the HasThreadSafePutBlob rationale and scope
more explicitly.

Should not change behavior.

Signed-off-by: Miloslav Trmač <[email protected]>
  • Loading branch information
mtrmac committed Feb 9, 2024
1 parent 5482742 commit fa76546
Showing 1 changed file with 61 additions and 53 deletions.
114 changes: 61 additions & 53 deletions storage/storage_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,23 @@ type storageImageDestination struct {
signatureses map[digest.Digest][]byte // Instance signature contents, temporary
metadata storageImageMetadata // Metadata contents being built

// A storage destination may be used concurrently. Accesses are
// serialized via a mutex. Please refer to the individual comments
// below for details.
lock sync.Mutex
// Mapping from layer (by index) to the associated ID in the storage.
// It's protected *implicitly* since `commitLayer()`, at any given
// time, can only be executed by *one* goroutine. Please refer to
// `queueOrCommit()` for further details on how the single-caller
// guarantee is implemented.
indexToStorageID map[int]string

// LOCKING:
// All accesses to below data are, during the concurrent TryReusingBlob/PutBlob/* calls
// (but not necessarily during the final Commit) protected by `lock` which is made
// *explicit* in the code.
//
// A storage destination may be used concurrently, due to HasThreadSafePutBlob.
lock sync.Mutex // Protects lockProtected
lockProtected storageImageDestinationLockProtected
}

// storageImageDestinationLockProtected contains storageImageDestination data which might be
// accessed concurrently, due to HasThreadSafePutBlob.
// _During the concurrent TryReusingBlob/PutBlob/* calls_ (but not necessarily during the final Commit)
// uses must hold storageImageDestination.lock.
type storageImageDestinationLockProtected struct {
// In general, a layer is identified either by (compressed) digest, or by TOC digest.
// When creating a layer, the c/storage layer metadata and image IDs must _only_ be based on trusted values
// we have computed ourselves. (Layer reuse can then look up against such trusted values, but it might not
Expand Down Expand Up @@ -128,21 +129,23 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (*
HasThreadSafePutBlob: true,
}),

imageRef: imageRef,
directory: directory,
signatureses: make(map[digest.Digest][]byte),
blobDiffIDs: make(map[digest.Digest]digest.Digest),
indexToTOCDigest: make(map[int]digest.Digest),
blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
imageRef: imageRef,
directory: directory,
signatureses: make(map[digest.Digest][]byte),
metadata: storageImageMetadata{
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
},
indexToStorageID: make(map[int]string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
diffOutputs: make(map[int]*graphdriver.DriverWithDifferOutput),
indexToStorageID: make(map[int]string),
lockProtected: storageImageDestinationLockProtected{
blobDiffIDs: make(map[digest.Digest]digest.Digest),
indexToTOCDigest: make(map[int]digest.Digest),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer),
diffOutputs: make(map[int]*graphdriver.DriverWithDifferOutput),
},
}
dest.Compat = impl.AddCompat(dest)
return dest, nil
Expand All @@ -156,10 +159,11 @@ func (s *storageImageDestination) Reference() types.ImageReference {

// Close cleans up the temporary directory and additional layer store handlers.
func (s *storageImageDestination) Close() error {
for _, al := range s.blobAdditionalLayer {
// This is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock.
for _, al := range s.lockProtected.blobAdditionalLayer {
al.Release()
}
for _, v := range s.diffOutputs {
for _, v := range s.lockProtected.diffOutputs {
if v.Target != "" {
_ = s.imageRef.transport.store.CleanupStagingDirectory(v.Target)
}
Expand Down Expand Up @@ -241,9 +245,9 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf

// Record information about the blob.
s.lock.Lock()
s.blobDiffIDs[blobDigest] = diffID.Digest()
s.fileSizes[blobDigest] = counter.Count
s.filenames[blobDigest] = filename
s.lockProtected.blobDiffIDs[blobDigest] = diffID.Digest()
s.lockProtected.fileSizes[blobDigest] = counter.Count
s.lockProtected.filenames[blobDigest] = filename
s.lock.Unlock()
// This is safe because we have just computed diffID, and blobDigest was either computed
// by us, or validated by the caller (usually copy.digestingReader).
Expand Down Expand Up @@ -307,17 +311,17 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces
blobDigest := srcInfo.Digest

s.lock.Lock()
s.diffOutputs[options.LayerIndex] = out
s.lockProtected.diffOutputs[options.LayerIndex] = out
if out.UncompressedDigest != "" {
// The computation of UncompressedDigest means the whole layer has been consumed; while doing that, chunked.GetDiffer is
// responsible for ensuring blobDigest has been validated.
s.blobDiffIDs[blobDigest] = out.UncompressedDigest
s.lockProtected.blobDiffIDs[blobDigest] = out.UncompressedDigest
} else {
// Don’t identify layers by TOC if UncompressedDigest is available.
// - Using UncompressedDigest allows image reuse with non-partially-pulled layers
// - If UncompressedDigest has been computed, that means the layer was read completely, and the TOC has been created from scratch.
// That TOC is quite unlikely to match with any other TOC value.
s.indexToTOCDigest[options.LayerIndex] = out.TOCDigest
s.lockProtected.indexToTOCDigest[options.LayerIndex] = out.TOCDigest
}
s.lock.Unlock()

Expand Down Expand Up @@ -360,8 +364,8 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige
if err != nil && !errors.Is(err, storage.ErrLayerUnknown) {
return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q and labels: %w`, blobDigest, err)
} else if err == nil {
s.blobDiffIDs[blobDigest] = aLayer.UncompressedDigest()
s.blobAdditionalLayer[blobDigest] = aLayer
s.lockProtected.blobDiffIDs[blobDigest] = aLayer.UncompressedDigest()
s.lockProtected.blobAdditionalLayer[blobDigest] = aLayer
return true, private.ReusedBlob{
Digest: blobDigest,
Size: aLayer.CompressedSize(),
Expand All @@ -384,7 +388,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige
// Check if we have a wasn't-compressed layer in storage that's based on that blob.

// Check if we've already cached it in a file.
if size, ok := s.fileSizes[blobDigest]; ok {
if size, ok := s.lockProtected.fileSizes[blobDigest]; ok {
// FIXME: What ensures layer identification?
return true, private.ReusedBlob{
Digest: blobDigest,
Expand All @@ -397,7 +401,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige
return false, private.ReusedBlob{}, fmt.Errorf(`looking for layers with digest %q: %w`, blobDigest, err)
}
if len(layers) > 0 {
s.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest
s.lockProtected.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: blobDigest,
Size: layers[0].UncompressedSize,
Expand All @@ -410,7 +414,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige
return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q: %w`, blobDigest, err)
}
if len(layers) > 0 {
s.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest
s.lockProtected.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: blobDigest,
Size: layers[0].CompressedSize,
Expand All @@ -428,7 +432,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige
}
if len(layers) > 0 {
if size != -1 {
s.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest
s.lockProtected.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: blobDigest,
Size: size,
Expand All @@ -437,7 +441,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige
if !options.CanSubstitute {
return false, private.ReusedBlob{}, fmt.Errorf("Internal error: options.CanSubstitute was expected to be true for blob with digest %s", blobDigest)
}
s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest
s.lockProtected.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: uncompressedDigest,
Size: layers[0].UncompressedSize,
Expand All @@ -454,7 +458,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige
return false, private.ReusedBlob{}, fmt.Errorf(`looking for layers with TOC digest %q: %w`, options.TOCDigest, err)
}
if len(layers) > 0 {
s.indexToTOCDigest[*options.LayerIndex] = options.TOCDigest
s.lockProtected.indexToTOCDigest[*options.LayerIndex] = options.TOCDigest
return true, private.ReusedBlob{
Digest: blobDigest,
Size: layers[0].UncompressedSize,
Expand All @@ -472,6 +476,8 @@ func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Dige
// that since we don't have a recommendation, a random ID should be used if one needs
// to be allocated.
func (s *storageImageDestination) computeID(m manifest.Manifest) string {
// This is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock.

// Build the diffID list. We need the decompressed sums that we've been calculating to
// fill in the DiffIDs. It's expected (but not enforced by us) that the number of
// diffIDs corresponds to the number of non-EmptyLayer entries in the history.
Expand All @@ -485,7 +491,7 @@ func (s *storageImageDestination) computeID(m manifest.Manifest) string {
continue
}
blobSum := m.FSLayers[i].BlobSum
diffID, ok := s.blobDiffIDs[blobSum]
diffID, ok := s.lockProtected.blobDiffIDs[blobSum]
if !ok {
// this can, in principle, legitimately happen when a layer is reused by TOC.
logrus.Infof("error looking up diffID for layer %q", blobSum.String())
Expand Down Expand Up @@ -521,8 +527,8 @@ func (s *storageImageDestination) computeID(m manifest.Manifest) string {
tocIDInput := ""
hasLayerPulledByTOC := false
for i := range m.LayerInfos() {
layerValue := "" // An empty string is not a valid digest, so this is unambiguous with the TOC case.
tocDigest, ok := s.indexToTOCDigest[i] // "" if not a TOC
layerValue := "" // An empty string is not a valid digest, so this is unambiguous with the TOC case.
tocDigest, ok := s.lockProtected.indexToTOCDigest[i] // "" if not a TOC
if ok {
hasLayerPulledByTOC = true
layerValue = tocDigest.String()
Expand Down Expand Up @@ -550,7 +556,7 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er
return nil, fmt.Errorf("invalid digest supplied when reading blob: %w", err)
}
// Assume it's a file, since we're only calling this from a place that expects to read files.
if filename, ok := s.filenames[info.Digest]; ok {
if filename, ok := s.lockProtected.filenames[info.Digest]; ok {
contents, err2 := os.ReadFile(filename)
if err2 != nil {
return nil, fmt.Errorf(`reading blob from file %q: %w`, filename, err2)
Expand Down Expand Up @@ -584,17 +590,17 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo)
// caller is the "worker" routine committing layers. All other routines
// can continue pulling and queuing in layers.
s.lock.Lock()
s.indexToAddedLayerInfo[index] = info
s.lockProtected.indexToAddedLayerInfo[index] = info

// We're still waiting for at least one previous/parent layer to be
// committed, so there's nothing to do.
if index != s.currentIndex {
if index != s.lockProtected.currentIndex {
s.lock.Unlock()
return nil
}

for {
info, ok := s.indexToAddedLayerInfo[index]
info, ok := s.lockProtected.indexToAddedLayerInfo[index]
if !ok {
break
}
Expand All @@ -609,7 +615,7 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo)

// Set the index at the very end to make sure that only one routine
// enters stage 2).
s.currentIndex = index
s.lockProtected.currentIndex = index
s.lock.Unlock()
return nil
}
Expand All @@ -619,11 +625,11 @@ func (s *storageImageDestination) singleLayerIDComponent(layerIndex int, blobSum
s.lock.Lock()
defer s.lock.Unlock()

if d, found := s.indexToTOCDigest[layerIndex]; found {
if d, found := s.lockProtected.indexToTOCDigest[layerIndex]; found {
return d.Hex() + "-toc", found // FIXME: If this is the first layer, this is not a valid ID.
}

d, found := s.blobDiffIDs[blobSum]
d, found := s.lockProtected.blobDiffIDs[blobSum]
return d.Hex(), found
}

Expand Down Expand Up @@ -721,7 +727,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// If the layer cannot be committed yet, the function returns (nil, nil).
func (s *storageImageDestination) createNewLayer(index int, layerDigest digest.Digest, parentLayer, newLayerID string) (*storage.Layer, error) {
s.lock.Lock()
diffOutput, ok := s.diffOutputs[index]
diffOutput, ok := s.lockProtected.diffOutputs[index]
s.lock.Unlock()
if ok {
var untrustedUncompressedDigest digest.Digest
Expand Down Expand Up @@ -773,15 +779,15 @@ func (s *storageImageDestination) createNewLayer(index int, layerDigest digest.D
}

s.lock.Lock()
diffID, ok := s.blobDiffIDs[layerDigest]
diffID, ok := s.lockProtected.blobDiffIDs[layerDigest]
s.lock.Unlock()

if !ok {
return nil, fmt.Errorf("failed to find diffID for layer: %q", layerDigest)
}

s.lock.Lock()
al, ok := s.blobAdditionalLayer[layerDigest]
al, ok := s.lockProtected.blobAdditionalLayer[layerDigest]
s.lock.Unlock()
if ok {
layer, err := al.PutAs(newLayerID, parentLayer, nil)
Expand All @@ -794,7 +800,7 @@ func (s *storageImageDestination) createNewLayer(index int, layerDigest digest.D
// Check if we previously cached a file with that blob's contents. If we didn't,
// then we need to read the desired contents from a layer.
s.lock.Lock()
filename, ok := s.filenames[layerDigest]
filename, ok := s.lockProtected.filenames[layerDigest]
s.lock.Unlock()
if !ok {
// Try to find the layer with contents matching that blobsum.
Expand Down Expand Up @@ -842,7 +848,7 @@ func (s *storageImageDestination) createNewLayer(index int, layerDigest digest.D
// Make sure that we can find this file later, should we need the layer's
// contents again.
s.lock.Lock()
s.filenames[layerDigest] = filename
s.lockProtected.filenames[layerDigest] = filename
s.lock.Unlock()
}
// Read the cached blob and use it as a diff.
Expand Down Expand Up @@ -871,6 +877,8 @@ func (s *storageImageDestination) createNewLayer(index int, layerDigest digest.D
// - Uploaded data MAY be visible to others before Commit() is called
// - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed)
func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel types.UnparsedImage) error {
// This function is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock.

if len(s.manifest) == 0 {
return errors.New("Internal error: storageImageDestination.Commit() called without PutManifest()")
}
Expand Down Expand Up @@ -936,14 +944,14 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
// Set up to save the non-layer blobs as data items. Since we only share layers, they should all be in files, so
// we just need to screen out the ones that are actually layers to get the list of non-layers.
dataBlobs := set.New[digest.Digest]()
for blob := range s.filenames {
for blob := range s.lockProtected.filenames {
dataBlobs.Add(blob)
}
for _, layerBlob := range layerBlobs {
dataBlobs.Delete(layerBlob.Digest)
}
for _, blob := range dataBlobs.Values() {
v, err := os.ReadFile(s.filenames[blob])
v, err := os.ReadFile(s.lockProtected.filenames[blob])
if err != nil {
return fmt.Errorf("copying non-layer blob %q to image: %w", blob, err)
}
Expand Down

0 comments on commit fa76546

Please sign in to comment.