Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix container parallel upload bugs #32022

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions routers/api/packages/container/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,29 @@ import (
"fmt"
"os"
"strings"
"sync"

"code.gitea.io/gitea/models/db"
packages_model "code.gitea.io/gitea/models/packages"
container_model "code.gitea.io/gitea/models/packages/container"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/log"
packages_module "code.gitea.io/gitea/modules/packages"
container_module "code.gitea.io/gitea/modules/packages/container"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/util"
packages_service "code.gitea.io/gitea/services/packages"
)

var uploadVersionMutex sync.Mutex
// TODO: use clustered lock
var uploadVersionMutex = sync.NewExclusivePool()

// saveAsPackageBlob creates a package blob from an upload
// The uploaded blob gets stored in a special upload version to link them to the package/image
func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader, pci *packages_service.PackageCreationInfo) (*packages_model.PackageBlob, error) {
pkgPath := pci.PackageInfo.Owner.LowerName + "/" + pci.PackageInfo.Name
uploadVersionMutex.CheckIn(pkgPath)
defer uploadVersionMutex.CheckOut(pkgPath)

pb := packages_service.NewPackageBlob(hsr)

exists := false
Expand Down Expand Up @@ -80,6 +86,10 @@ func saveAsPackageBlob(ctx context.Context, hsr packages_module.HashedSizeReader

// mountBlob mounts the specific blob to a different package
func mountBlob(ctx context.Context, pi *packages_service.PackageInfo, pb *packages_model.PackageBlob) error {
pkgPath := pi.Owner.LowerName + "/" + pi.Name
uploadVersionMutex.CheckIn(pkgPath)
defer uploadVersionMutex.CheckOut(pkgPath)

uploadVersion, err := getOrCreateUploadVersion(ctx, pi)
if err != nil {
return err
Expand All @@ -93,9 +103,6 @@ func mountBlob(ctx context.Context, pi *packages_service.PackageInfo, pb *packag
func getOrCreateUploadVersion(ctx context.Context, pi *packages_service.PackageInfo) (*packages_model.PackageVersion, error) {
var uploadVersion *packages_model.PackageVersion

// FIXME: Replace usage of mutex with database transaction
// https://github.com/go-gitea/gitea/pull/21862
uploadVersionMutex.Lock()
err := db.WithTx(ctx, func(ctx context.Context) error {
created := true
p := &packages_model.Package{
Expand Down Expand Up @@ -140,7 +147,6 @@ func getOrCreateUploadVersion(ctx context.Context, pi *packages_service.PackageI

return nil
})
uploadVersionMutex.Unlock()

return uploadVersion, err
}
Expand Down Expand Up @@ -172,10 +178,14 @@ func createFileForBlob(ctx context.Context, pv *packages_model.PackageVersion, p
return nil
}

func deleteBlob(ctx context.Context, ownerID int64, image, digest string) error {
func deleteBlob(ctx context.Context, owner *user_model.User, image, digest string) error {
pkgPath := owner.LowerName + "/" + image
uploadVersionMutex.CheckIn(pkgPath)
defer uploadVersionMutex.CheckOut(pkgPath)

return db.WithTx(ctx, func(ctx context.Context) error {
pfds, err := container_model.GetContainerBlobs(ctx, &container_model.BlobSearchOptions{
OwnerID: ownerID,
OwnerID: owner.ID,
Image: image,
Digest: digest,
})
Expand Down
14 changes: 13 additions & 1 deletion routers/api/packages/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
packages_module "code.gitea.io/gitea/modules/packages"
container_module "code.gitea.io/gitea/modules/packages/container"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/sync"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/routers/api/packages/helper"
auth_service "code.gitea.io/gitea/services/auth"
Expand Down Expand Up @@ -540,7 +541,7 @@ func DeleteBlob(ctx *context.Context) {
return
}

if err := deleteBlob(ctx, ctx.Package.Owner.ID, ctx.Params("image"), d); err != nil {
if err := deleteBlob(ctx, ctx.Package.Owner, ctx.Params("image"), d); err != nil {
apiError(ctx, http.StatusInternalServerError, err)
return
}
Expand All @@ -550,6 +551,9 @@ func DeleteBlob(ctx *context.Context) {
})
}

// TODO: use clustered lock
var lockManifest = sync.NewExclusivePool()

// https://github.com/opencontainers/distribution-spec/blob/main/spec.md#pushing-manifests
func UploadManifest(ctx *context.Context) {
reference := ctx.Params("reference")
Expand Down Expand Up @@ -581,6 +585,10 @@ func UploadManifest(ctx *context.Context) {
return
}

imagePath := ctx.Package.Owner.Name + "/" + ctx.Params("image")
lockManifest.CheckIn(imagePath)
defer lockManifest.CheckOut(imagePath)

digest, err := processManifest(ctx, mci, buf)
if err != nil {
var namedError *namedError
Expand Down Expand Up @@ -679,6 +687,10 @@ func DeleteManifest(ctx *context.Context) {
return
}

imagePath := ctx.Package.Owner.Name + "/" + ctx.Params("image")
lockManifest.CheckIn(imagePath)
defer lockManifest.CheckOut(imagePath)

pvs, err := container_model.GetManifestVersions(ctx, opts)
if err != nil {
apiError(ctx, http.StatusInternalServerError, err)
Expand Down