From d70a8644cfbe53706cba7a87866371992eecfb3a Mon Sep 17 00:00:00 2001 From: James McDermott Date: Thu, 26 Sep 2024 11:33:44 +0100 Subject: [PATCH] Issue #657 - session conflict handling - added handling to package revision Create operation - if another request is already creating a package revision with the same details, fails withHTTP 409 https://github.com/nephio-project/nephio/issues/657 --- .vscode/launch.json | 22 ++++++++++- go.mod | 2 +- pkg/cache/repository.go | 4 +- pkg/registry/porch/packagerevision.go | 48 ++++++++++++++++++++---- test/e2e/e2e_test.go | 53 ++++++++++++++++++++++++++- test/e2e/suite.go | 21 +++++++++++ test/e2e/suite_utils.go | 10 ++--- 7 files changed, 143 insertions(+), 17 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index a72ff53f..9979c12c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -37,7 +37,18 @@ } }, { - "name": "Launch test function", + "name": "Run Porchctl command", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}/cmd/porchctl/main.go", + "args": [ + "-n", "porch", "rpkg", "approve", "blueprints-835222fccff6e9e042e3b01b8c554f394d6e55d1" + ], + "cwd": "${workspaceFolder}" + }, + { + "name": "Launch E2E tests", "type": "go", "request": "launch", "mode": "test", @@ -45,7 +56,7 @@ "args": [ "-test.v", "-test.run", - "TestE2E/PorchSuite/TestGitRepositoryWithReleaseTagsAndDirectory" + "TestE2E/PorchSuite/TestConcurrentInits" ], "env": { "E2E": "1"} }, @@ -63,5 +74,12 @@ "namespace=foo" ] } + ], + "compounds": [ + { + "name": "Launch Server and Controllers", + "configurations": ["Launch Server", "Launch Controllers"], + "stopAll": true + } ] } \ No newline at end of file diff --git a/go.mod b/go.mod index 1438ba54..7ad46dab 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,6 @@ require ( github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.9.0 - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 go.opentelemetry.io/otel v1.28.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 @@ -167,6 +166,7 @@ require ( go.etcd.io/etcd/client/v3 v3.5.10 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect go.opentelemetry.io/otel/metric v1.28.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect diff --git a/pkg/cache/repository.go b/pkg/cache/repository.go index 5dad725d..55f39b2c 100644 --- a/pkg/cache/repository.go +++ b/pkg/cache/repository.go @@ -319,6 +319,7 @@ func (r *cachedRepository) DeletePackage(ctx context.Context, old repository.Pac } func (r *cachedRepository) Close() error { + r.pollOnce(context.TODO()) r.cancel() // Make sure that watch events are sent for packagerevisions that are @@ -338,12 +339,13 @@ func (r *cachedRepository) Close() error { // There isn't much use in returning an error here, so we just log it // and create a PackageRevisionMeta with just name and namespace. This // makes sure that the Delete event is sent. - klog.Warningf("Error looking up PackageRev CR for %s: %v", nn.Name, err) + klog.Warningf("Error deleting PackageRev CR for %s: %v", nn.Name, err) pkgRevMeta = meta.PackageRevisionMeta{ Name: nn.Name, Namespace: nn.Namespace, } } + klog.Infof("repo %s: successfully deleted packagerev %s/%s", r.id, nn.Namespace, nn.Name) sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta) } klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions)) diff --git a/pkg/registry/porch/packagerevision.go b/pkg/registry/porch/packagerevision.go index 51dd98cd..1226875b 100644 --- a/pkg/registry/porch/packagerevision.go +++ b/pkg/registry/porch/packagerevision.go @@ -17,6 +17,7 @@ package porch import ( "context" "fmt" + "sync" api "github.com/nephio-project/porch/api/porch/v1alpha1" "github.com/nephio-project/porch/pkg/engine" @@ -34,6 +35,13 @@ import ( var tracer = otel.Tracer("apiserver") +var mutexMapMutex sync.Mutex +var packageRevisionCreationMutexes = map[string]*sync.Mutex{} + +const ( + ConflictErrorMsg = "another request is already in progress to create %s with details %s" +) + type packageRevisions struct { packageCommon rest.TableConvertor @@ -49,13 +57,11 @@ var _ rest.GracefulDeleter = &packageRevisions{} var _ rest.Watcher = &packageRevisions{} var _ rest.SingularNameProvider = &packageRevisions{} - // GetSingularName implements the SingularNameProvider interface -func (r *packageRevisions) GetSingularName() (string) { +func (r *packageRevisions) GetSingularName() string { return "packagerevision" } - func (r *packageRevisions) New() runtime.Object { return &api.PackageRevision{} } @@ -120,7 +126,7 @@ func (r *packageRevisions) Get(ctx context.Context, name string, options *metav1 } // Create implements the Creater interface. -func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Object, createValidation rest.ValidateObjectFunc, +func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { ctx, span := tracer.Start(ctx, "packageRevisions::Create", trace.WithAttributes()) defer span.End() @@ -166,6 +172,35 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj parentPackage = p } + uncreatedPackageKey := fmt.Sprintf("%s-%s-%s-%s", + newApiPkgRev.Namespace, + newApiPkgRev.Spec.RepositoryName, + newApiPkgRev.Spec.PackageName, + newApiPkgRev.Spec.WorkspaceName) + + mutexMapMutex.Lock() + packageMutex, alreadyPresent := packageRevisionCreationMutexes[uncreatedPackageKey] + if !alreadyPresent { + packageMutex = &sync.Mutex{} + packageRevisionCreationMutexes[uncreatedPackageKey] = packageMutex + } + mutexMapMutex.Unlock() + + lockAcquired := packageMutex.TryLock() + if !lockAcquired { + return nil, + apierrors.NewConflict( + api.Resource("packagerevisions"), + fmt.Sprintf("(new creation)"), + fmt.Errorf(ConflictErrorMsg, "package revision", fmt.Sprintf( + "namespace=%s, repository=%s, package=%s,workspace=%s", + newApiPkgRev.Namespace, + newApiPkgRev.Spec.RepositoryName, + newApiPkgRev.Spec.PackageName, + newApiPkgRev.Spec.WorkspaceName))) + } + defer packageMutex.Unlock() + createdRepoPkgRev, err := r.cad.CreatePackageRevision(ctx, repositoryObj, newApiPkgRev, parentPackage) if err != nil { return nil, apierrors.NewInternalError(err) @@ -184,13 +219,12 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj // Update finds a resource in the storage and updates it. Some implementations // may allow updates creates the object - they should set the created boolean // to true. -func (r *packageRevisions) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, +func (r *packageRevisions) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { ctx, span := tracer.Start(ctx, "packageRevisions::Update", trace.WithAttributes()) defer span.End() - return r.packageCommon.updatePackageRevision(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, - ) + return r.packageCommon.updatePackageRevision(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate) } // Delete implements the GracefulDeleter interface. diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 5233073d..b37cde48 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -21,6 +21,7 @@ import ( "os" "path/filepath" "reflect" + "slices" "strings" "testing" "time" @@ -260,7 +261,7 @@ func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) { Namespace: t.Namespace, }, Spec: porchapi.PackageRevisionSpec{ - PackageName: "empty-package", + PackageName: packageName, WorkspaceName: workspace, RepositoryName: repository, }, @@ -285,6 +286,56 @@ func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) { } } +func (t *PorchSuite) TestConcurrentInits(ctx context.Context) { + // Create a new package via init, no task specified + const ( + repository = "git" + packageName = "empty-package-concurrent" + revision = "v1" + workspace = "test-workspace" + description = "empty-package description" + ) + + // Register the repository + t.RegisterMainGitRepositoryF(ctx, repository) + + // Create a new package (via init) + pr := &porchapi.PackageRevision{ + TypeMeta: metav1.TypeMeta{ + Kind: "PackageRevision", + APIVersion: porchapi.SchemeGroupVersion.String(), + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: t.Namespace, + }, + Spec: porchapi.PackageRevisionSpec{ + PackageName: packageName, + WorkspaceName: workspace, + RepositoryName: repository, + }, + } + // Two clients try to create it at the same time + creationFunction := func() any { + return t.Client.Create(ctx, pr) + } + results := RunInParallel(creationFunction, creationFunction) + + expectedResultCount := 2 + if actualResultCount := len(results); actualResultCount != expectedResultCount { + t.Fatalf("expected %d results but was %d", expectedResultCount, actualResultCount) + } + if succesfulCreation := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult == nil + }); !succesfulCreation { + t.Fatalf("expected one request to succeed, but did not happen - results: %v", results) + } + if conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool { + return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress") + }); !conflictFailurePresent { + t.Fatal("expected one request to fail with a conflict, but did not happen") + } +} + func (t *PorchSuite) TestInitTaskPackage(ctx context.Context) { const ( repository = "git" diff --git a/test/e2e/suite.go b/test/e2e/suite.go index 2fcadc9f..db6c8ffb 100644 --- a/test/e2e/suite.go +++ b/test/e2e/suite.go @@ -127,6 +127,27 @@ func RunSuite(suite interface{}, t *testing.T) { }) } +func RunInParallel(functions ...func() any) []any { + var group sync.WaitGroup + var results []any + for _, eachFunction := range functions { + group.Add(1) + go func() { + defer group.Done() + if reflect.TypeOf(eachFunction).NumOut() == 0 { + results = append(results, nil) + eachFunction() + } else { + eachResult := eachFunction() + + results = append(results, eachResult) + } + }() + } + group.Wait() + return results +} + func (t *TestSuite) SetT(tt *testing.T) { t.T = tt } diff --git a/test/e2e/suite_utils.go b/test/e2e/suite_utils.go index 801325a0..b89b085f 100644 --- a/test/e2e/suite_utils.go +++ b/test/e2e/suite_utils.go @@ -213,7 +213,7 @@ func (t *TestSuite) registerGitRepositoryFromConfigF(ctx context.Context, name s t.Cleanup(func() { t.DeleteE(ctx, repository) t.WaitUntilRepositoryDeleted(ctx, name, t.Namespace) - t.WaitUntilAllPackagesDeleted(ctx, name) + t.WaitUntilAllPackagesDeleted(ctx, name, t.Namespace) }) // Make sure the repository is ready before we test to (hopefully) @@ -365,7 +365,7 @@ func (t *TestSuite) WaitUntilRepositoryDeleted(ctx context.Context, name, namesp } } -func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string) { +func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string, namespace string) { t.Helper() err := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(ctx context.Context) (done bool, err error) { t.Helper() @@ -375,7 +375,7 @@ func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName st return false, nil } for _, pkgRev := range pkgRevList.Items { - if strings.HasPrefix(fmt.Sprintf("%s-", pkgRev.Name), repoName) { + if pkgRev.Namespace == namespace && strings.HasPrefix(fmt.Sprintf("%s-", pkgRev.Name), repoName) { t.Logf("Found package %s from repo %s", pkgRev.Name, repoName) return false, nil } @@ -387,8 +387,8 @@ func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName st return false, nil } for _, internalPkgRev := range internalPkgRevList.Items { - if strings.HasPrefix(fmt.Sprintf("%s-", internalPkgRev.Name), repoName) { - t.Logf("Found internalPkg %s from repo %s", internalPkgRev.Name, repoName) + if internalPkgRev.Namespace == namespace && strings.HasPrefix(fmt.Sprintf("%s-", internalPkgRev.Name), repoName) { + t.Logf("Found internalPkg %s/%s from repo %s", internalPkgRev.Namespace, internalPkgRev.Name, repoName) return false, nil } }