Skip to content

Commit

Permalink
Issue #657 - session conflict handling
Browse files Browse the repository at this point in the history
- added handling to package revision Create operation
  - if another request is already creating a package
    revision with the same details, fails withHTTP
    409

nephio-project/nephio#657
  • Loading branch information
JamesMcDermott committed Sep 26, 2024
1 parent ac4efaa commit d70a864
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 17 deletions.
22 changes: 20 additions & 2 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,26 @@
}
},
{
"name": "Launch test function",
"name": "Run Porchctl command",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/porchctl/main.go",
"args": [
"-n", "porch", "rpkg", "approve", "blueprints-835222fccff6e9e042e3b01b8c554f394d6e55d1"
],
"cwd": "${workspaceFolder}"
},
{
"name": "Launch E2E tests",
"type": "go",
"request": "launch",
"mode": "test",
"program": "${workspaceFolder}/test/e2e",
"args": [
"-test.v",
"-test.run",
"TestE2E/PorchSuite/TestGitRepositoryWithReleaseTagsAndDirectory"
"TestE2E/PorchSuite/TestConcurrentInits"
],
"env": { "E2E": "1"}
},
Expand All @@ -63,5 +74,12 @@
"namespace=foo"
]
}
],
"compounds": [
{
"name": "Launch Server and Controllers",
"configurations": ["Launch Server", "Launch Controllers"],
"stopAll": true
}
]
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0
Expand Down Expand Up @@ -167,6 +166,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.10 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
Expand Down
4 changes: 3 additions & 1 deletion pkg/cache/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ func (r *cachedRepository) DeletePackage(ctx context.Context, old repository.Pac
}

func (r *cachedRepository) Close() error {
r.pollOnce(context.TODO())
r.cancel()

// Make sure that watch events are sent for packagerevisions that are
Expand All @@ -338,12 +339,13 @@ func (r *cachedRepository) Close() error {
// There isn't much use in returning an error here, so we just log it
// and create a PackageRevisionMeta with just name and namespace. This
// makes sure that the Delete event is sent.
klog.Warningf("Error looking up PackageRev CR for %s: %v", nn.Name, err)
klog.Warningf("Error deleting PackageRev CR for %s: %v", nn.Name, err)
pkgRevMeta = meta.PackageRevisionMeta{
Name: nn.Name,
Namespace: nn.Namespace,
}
}
klog.Infof("repo %s: successfully deleted packagerev %s/%s", r.id, nn.Namespace, nn.Name)
sent += r.objectNotifier.NotifyPackageRevisionChange(watch.Deleted, pr, pkgRevMeta)
}
klog.Infof("repo %s: sent %d notifications for %d package revisions during close", r.id, sent, len(r.cachedPackageRevisions))
Expand Down
48 changes: 41 additions & 7 deletions pkg/registry/porch/packagerevision.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package porch
import (
"context"
"fmt"
"sync"

api "github.com/nephio-project/porch/api/porch/v1alpha1"
"github.com/nephio-project/porch/pkg/engine"
Expand All @@ -34,6 +35,13 @@ import (

var tracer = otel.Tracer("apiserver")

var mutexMapMutex sync.Mutex
var packageRevisionCreationMutexes = map[string]*sync.Mutex{}

const (
ConflictErrorMsg = "another request is already in progress to create %s with details %s"
)

type packageRevisions struct {
packageCommon
rest.TableConvertor
Expand All @@ -49,13 +57,11 @@ var _ rest.GracefulDeleter = &packageRevisions{}
var _ rest.Watcher = &packageRevisions{}
var _ rest.SingularNameProvider = &packageRevisions{}


// GetSingularName implements the SingularNameProvider interface
func (r *packageRevisions) GetSingularName() (string) {
func (r *packageRevisions) GetSingularName() string {
return "packagerevision"
}


func (r *packageRevisions) New() runtime.Object {
return &api.PackageRevision{}
}
Expand Down Expand Up @@ -120,7 +126,7 @@ func (r *packageRevisions) Get(ctx context.Context, name string, options *metav1
}

// Create implements the Creater interface.
func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Object, createValidation rest.ValidateObjectFunc,
func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Object, createValidation rest.ValidateObjectFunc,
options *metav1.CreateOptions) (runtime.Object, error) {
ctx, span := tracer.Start(ctx, "packageRevisions::Create", trace.WithAttributes())
defer span.End()
Expand Down Expand Up @@ -166,6 +172,35 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj
parentPackage = p
}

uncreatedPackageKey := fmt.Sprintf("%s-%s-%s-%s",
newApiPkgRev.Namespace,
newApiPkgRev.Spec.RepositoryName,
newApiPkgRev.Spec.PackageName,
newApiPkgRev.Spec.WorkspaceName)

mutexMapMutex.Lock()
packageMutex, alreadyPresent := packageRevisionCreationMutexes[uncreatedPackageKey]
if !alreadyPresent {
packageMutex = &sync.Mutex{}
packageRevisionCreationMutexes[uncreatedPackageKey] = packageMutex
}
mutexMapMutex.Unlock()

lockAcquired := packageMutex.TryLock()
if !lockAcquired {
return nil,
apierrors.NewConflict(
api.Resource("packagerevisions"),
fmt.Sprintf("(new creation)"),
fmt.Errorf(ConflictErrorMsg, "package revision", fmt.Sprintf(
"namespace=%s, repository=%s, package=%s,workspace=%s",
newApiPkgRev.Namespace,
newApiPkgRev.Spec.RepositoryName,
newApiPkgRev.Spec.PackageName,
newApiPkgRev.Spec.WorkspaceName)))
}
defer packageMutex.Unlock()

createdRepoPkgRev, err := r.cad.CreatePackageRevision(ctx, repositoryObj, newApiPkgRev, parentPackage)
if err != nil {
return nil, apierrors.NewInternalError(err)
Expand All @@ -184,13 +219,12 @@ func (r *packageRevisions) Create(ctx context.Context, runtimeObject runtime.Obj
// Update finds a resource in the storage and updates it. Some implementations
// may allow updates creates the object - they should set the created boolean
// to true.
func (r *packageRevisions) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc,
func (r *packageRevisions) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc,
updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
ctx, span := tracer.Start(ctx, "packageRevisions::Update", trace.WithAttributes())
defer span.End()

return r.packageCommon.updatePackageRevision(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate,
)
return r.packageCommon.updatePackageRevision(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate)
}

// Delete implements the GracefulDeleter interface.
Expand Down
53 changes: 52 additions & 1 deletion test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"path/filepath"
"reflect"
"slices"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -260,7 +261,7 @@ func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) {
Namespace: t.Namespace,
},
Spec: porchapi.PackageRevisionSpec{
PackageName: "empty-package",
PackageName: packageName,
WorkspaceName: workspace,
RepositoryName: repository,
},
Expand All @@ -285,6 +286,56 @@ func (t *PorchSuite) TestInitEmptyPackage(ctx context.Context) {
}
}

func (t *PorchSuite) TestConcurrentInits(ctx context.Context) {
// Create a new package via init, no task specified
const (
repository = "git"
packageName = "empty-package-concurrent"
revision = "v1"
workspace = "test-workspace"
description = "empty-package description"
)

// Register the repository
t.RegisterMainGitRepositoryF(ctx, repository)

// Create a new package (via init)
pr := &porchapi.PackageRevision{
TypeMeta: metav1.TypeMeta{
Kind: "PackageRevision",
APIVersion: porchapi.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Namespace: t.Namespace,
},
Spec: porchapi.PackageRevisionSpec{
PackageName: packageName,
WorkspaceName: workspace,
RepositoryName: repository,
},
}
// Two clients try to create it at the same time
creationFunction := func() any {
return t.Client.Create(ctx, pr)
}
results := RunInParallel(creationFunction, creationFunction)

expectedResultCount := 2
if actualResultCount := len(results); actualResultCount != expectedResultCount {
t.Fatalf("expected %d results but was %d", expectedResultCount, actualResultCount)
}
if succesfulCreation := slices.ContainsFunc(results, func(eachResult any) bool {
return eachResult == nil
}); !succesfulCreation {
t.Fatalf("expected one request to succeed, but did not happen - results: %v", results)
}
if conflictFailurePresent := slices.ContainsFunc(results, func(eachResult any) bool {
return eachResult != nil && strings.Contains(eachResult.(error).Error(), "another request is already in progress")
}); !conflictFailurePresent {
t.Fatal("expected one request to fail with a conflict, but did not happen")
}
}

func (t *PorchSuite) TestInitTaskPackage(ctx context.Context) {
const (
repository = "git"
Expand Down
21 changes: 21 additions & 0 deletions test/e2e/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,27 @@ func RunSuite(suite interface{}, t *testing.T) {
})
}

func RunInParallel(functions ...func() any) []any {
var group sync.WaitGroup
var results []any
for _, eachFunction := range functions {
group.Add(1)
go func() {
defer group.Done()
if reflect.TypeOf(eachFunction).NumOut() == 0 {
results = append(results, nil)
eachFunction()
} else {
eachResult := eachFunction()

results = append(results, eachResult)
}
}()
}
group.Wait()
return results
}

func (t *TestSuite) SetT(tt *testing.T) {
t.T = tt
}
Expand Down
10 changes: 5 additions & 5 deletions test/e2e/suite_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (t *TestSuite) registerGitRepositoryFromConfigF(ctx context.Context, name s
t.Cleanup(func() {
t.DeleteE(ctx, repository)
t.WaitUntilRepositoryDeleted(ctx, name, t.Namespace)
t.WaitUntilAllPackagesDeleted(ctx, name)
t.WaitUntilAllPackagesDeleted(ctx, name, t.Namespace)
})

// Make sure the repository is ready before we test to (hopefully)
Expand Down Expand Up @@ -365,7 +365,7 @@ func (t *TestSuite) WaitUntilRepositoryDeleted(ctx context.Context, name, namesp
}
}

func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string) {
func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName string, namespace string) {
t.Helper()
err := wait.PollUntilContextTimeout(ctx, time.Second, 60*time.Second, true, func(ctx context.Context) (done bool, err error) {
t.Helper()
Expand All @@ -375,7 +375,7 @@ func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName st
return false, nil
}
for _, pkgRev := range pkgRevList.Items {
if strings.HasPrefix(fmt.Sprintf("%s-", pkgRev.Name), repoName) {
if pkgRev.Namespace == namespace && strings.HasPrefix(fmt.Sprintf("%s-", pkgRev.Name), repoName) {
t.Logf("Found package %s from repo %s", pkgRev.Name, repoName)
return false, nil
}
Expand All @@ -387,8 +387,8 @@ func (t *TestSuite) WaitUntilAllPackagesDeleted(ctx context.Context, repoName st
return false, nil
}
for _, internalPkgRev := range internalPkgRevList.Items {
if strings.HasPrefix(fmt.Sprintf("%s-", internalPkgRev.Name), repoName) {
t.Logf("Found internalPkg %s from repo %s", internalPkgRev.Name, repoName)
if internalPkgRev.Namespace == namespace && strings.HasPrefix(fmt.Sprintf("%s-", internalPkgRev.Name), repoName) {
t.Logf("Found internalPkg %s/%s from repo %s", internalPkgRev.Namespace, internalPkgRev.Name, repoName)
return false, nil
}
}
Expand Down

0 comments on commit d70a864

Please sign in to comment.