Skip to content

Commit

Permalink
Use per-process file locks to enable concurrent use of multiple Gobbl…
Browse files Browse the repository at this point in the history
…ers.

Particularly useful on complex HPC setups where the same disk is mounted
in different compute environments; now we just need to set up a
Gobbler in each environment, and they'll all contribute to the same
registry without clobbering each other.
  • Loading branch information
LTLA committed Apr 10, 2024
1 parent 976446a commit b80a21d
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 33 deletions.
11 changes: 8 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ cd gobbler && go build
```

Then, set up a staging directory with global read/write permissions.
ll parent directories of the staging directory should be at least globally executable.
All parent directories of the staging directory should be at least globally executable.

```sh
mkdir STAGING
Expand All @@ -333,5 +333,10 @@ Finally, start the Gobbler by running the binary with a few arguments, including
-port PORT
```

For requests, clients should write to `STAGING` and hit the API at `PORT` (or any equivalent alias).
All registered files can be read from `REGISTRY`.
Multiple Gobbler instances can target the same `REGISTRY`, each with its own `STAGING` directory.
This is useful for complex configurations where the same filesystem is mounted in multiple compute environments:
a separate Gobbler instance can be set up in each environment to enable uploads from that environment.

Clients need to know `STAGING`, `REGISTRY` and the URL of the REST API.
The location of the staging directory and the URL will be used to make requests as described [above](#general-instructions).
The contents of the registry can be directly read from the filesystem.
4 changes: 2 additions & 2 deletions create.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ func createProject(project string, inperms *unsafePermissionsMetadata, req_user
// No need to lock before MkdirAll, it just no-ops if the directory already exists.
err = os.MkdirAll(project_dir, 0755)

globals.Locks.LockPath(project_dir, 1000 * time.Second)
globals.Locks.LockDirectory(project_dir, 10 * time.Second)
if err != nil {
return fmt.Errorf("failed to acquire the lock on %q; %w", project_dir, err)
}
defer globals.Locks.UnlockPath(project_dir)
defer globals.Locks.Unlock(project_dir)

perms := permissionsMetadata{}
if inperms != nil && inperms.Owners != nil {
Expand Down
8 changes: 4 additions & 4 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@ func deleteAssetHandler(reqpath string, globals *globalConfiguration) error {
if _, err := os.Stat(project_dir); errors.Is(err, os.ErrNotExist) {
return nil
}
err = globals.Locks.LockPath(project_dir, 1000 * time.Second)
err = globals.Locks.LockDirectory(project_dir, 10 * time.Second)
if err != nil {
return fmt.Errorf("failed to lock project directory %q; %w", project_dir, err)
}
defer globals.Locks.UnlockPath(project_dir)
defer globals.Locks.Unlock(project_dir)

asset_dir := filepath.Join(project_dir, *(incoming.Asset))
if _, err := os.Stat(asset_dir); errors.Is(err, os.ErrNotExist) {
Expand Down Expand Up @@ -191,11 +191,11 @@ func deleteVersionHandler(reqpath string, globals *globalConfiguration) error {
if _, err := os.Stat(project_dir); errors.Is(err, os.ErrNotExist) {
return nil
}
err = globals.Locks.LockPath(project_dir, 1000 * time.Second)
err = globals.Locks.LockDirectory(project_dir, 10 * time.Second)
if err != nil {
return fmt.Errorf("failed to lock project directory %q; %w", project_dir, err)
}
defer globals.Locks.UnlockPath(project_dir)
defer globals.Locks.Unlock(project_dir)

asset_dir := filepath.Join(project_dir, *(incoming.Asset))
if _, err := os.Stat(asset_dir); errors.Is(err, os.ErrNotExist) {
Expand Down
4 changes: 2 additions & 2 deletions latest.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ func refreshLatestHandler(reqpath string, globals *globalConfiguration) (*latest
// Technically we only need a lock on the asset directory, but all
// mutating operations will lock the project directory, so we respect that.
project_dir := filepath.Join(globals.Registry, *(incoming.Project))
err = globals.Locks.LockPath(project_dir, 1000 * time.Second)
err = globals.Locks.LockDirectory(project_dir, 10 * time.Second)
if err != nil {
return nil, fmt.Errorf("failed to acquire the lock on the project directory %q; %w", project_dir, err)
}
defer globals.Locks.UnlockPath(project_dir)
defer globals.Locks.Unlock(project_dir)

asset_dir := filepath.Join(project_dir, *(incoming.Asset))
output, err := refreshLatest(asset_dir)
Expand Down
41 changes: 33 additions & 8 deletions lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,24 @@ import (
"time"
"fmt"
"sync"
"os"
"syscall"
"path/filepath"
)

type pathLocks struct {
Lock sync.Mutex
InUse map[string]bool
InUse map[string]*os.File
}

func newPathLocks() pathLocks {
return pathLocks{ InUse: map[string]bool{} }
return pathLocks{ InUse: map[string]*os.File{} }
}

func (pl *pathLocks) LockPath(path string, timeout time.Duration) error {
func (pl *pathLocks) obtainLock(path string, lockfile string, timeout time.Duration) error {
var t time.Time
init := true

for {
if init {
t = time.Now()
Expand All @@ -31,12 +35,24 @@ func (pl *pathLocks) LockPath(path string, timeout time.Duration) error {
defer pl.Lock.Unlock()

_, ok := pl.InUse[path]
if !ok {
pl.InUse[path] = true
return false
} else {
if ok {
return true
}

// Place an advisory lock across multiple gobbler processes.
file, err := os.OpenFile(lockfile, os.O_RDWR|os.O_CREATE, 0666)
if err != nil { // Maybe we failed to write it because the handle was opened by some other process.
return true
}

err = syscall.Flock(int(file.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil { // The lock failed because of contention, or permissions, or who knows.
file.Close()
return true
}

pl.InUse[path] = file
return false
}()

if !already_locked {
Expand All @@ -47,8 +63,17 @@ func (pl *pathLocks) LockPath(path string, timeout time.Duration) error {
}
}

func (pl* pathLocks) UnlockPath(path string) {
// LockDirectory acquires the lock associated with the directory at path.
//
// The lock file is the "..LOCK" entry inside that directory. The path, the
// lock file location and the timeout are passed to obtainLock, and its error
// is returned unchanged.
func (pl *pathLocks) LockDirectory(path string, timeout time.Duration) error {
	lockfile := filepath.Join(path, "..LOCK")
	return pl.obtainLock(path, lockfile, timeout)
}

// Unlock releases the lock that was registered for path.
//
// While holding the internal mutex, the entry for path is removed from the
// in-use table, the flock on its lock file handle is dropped and the handle
// is closed. Calling Unlock for a path that has no registered lock is a
// no-op.
func (pl *pathLocks) Unlock(path string) {
	pl.Lock.Lock()
	defer pl.Lock.Unlock()

	file, ok := pl.InUse[path]
	if !ok {
		// Nothing is registered for this path; do not issue flock/close
		// calls against a nil file handle.
		return
	}
	delete(pl.InUse, path)

	// Best effort: there is no error return for callers to act on, and
	// closing the descriptor releases the advisory lock even if the explicit
	// unlock fails.
	_ = syscall.Flock(int(file.Fd()), syscall.LOCK_UN)
	_ = file.Close()
}
16 changes: 10 additions & 6 deletions lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,28 @@ import (
"testing"
"time"
"strings"
"os"
)

func TestLock(t *testing.T) {
pl := newPathLocks()
path, err := os.MkdirTemp("", "")
if err != nil {
t.Fatalf("failed to create a mock directory; %v", err)
}

path := "FOO"
err := pl.LockPath(path, 10 * time.Second)
pl := newPathLocks()
err = pl.LockDirectory(path, 10 * time.Second)
if err != nil {
t.Fatalf("failed to acquire the lock; %v", err)
}

err = pl.LockPath(path, 0 * time.Second)
err = pl.LockDirectory(path, 0 * time.Second)
if err == nil || !strings.Contains(err.Error(), "timed out") {
t.Fatal("should have failed to acquire the lock")
}

pl.UnlockPath(path)
err = pl.LockPath(path, 0 * time.Second)
pl.Unlock(path)
err = pl.LockDirectory(path, 0 * time.Second)
if err != nil {
t.Fatalf("failed to acquire the lock with a zero timeout; %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,11 @@ func setPermissionsHandler(reqpath string, globals *globalConfiguration) error {

project := *(incoming.Project)
project_dir := filepath.Join(globals.Registry, project)
err = globals.Locks.LockPath(project_dir, 1000 * time.Second)
err = globals.Locks.LockDirectory(project_dir, 10 * time.Second)
if err != nil {
return fmt.Errorf("failed to lock project directory %q; %w", project_dir, err)
}
defer globals.Locks.UnlockPath(project_dir)
defer globals.Locks.Unlock(project_dir)

existing, err := readPermissions(project_dir)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions probation.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ func baseProbationHandler(reqpath string, globals *globalConfiguration, approve

project := *(incoming.Project)
project_dir := filepath.Join(globals.Registry, project)
err = globals.Locks.LockPath(project_dir, 1000 * time.Second)
err = globals.Locks.LockDirectory(project_dir, 10 * time.Second)
if err != nil {
return fmt.Errorf("failed to lock project directory %q; %w", project_dir, err)
}
defer globals.Locks.UnlockPath(project_dir)
defer globals.Locks.Unlock(project_dir)

existing, err := readPermissions(project_dir)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ func uploadHandler(reqpath string, globals *globalConfiguration) error {
project := *(request.Project)

project_dir := filepath.Join(globals.Registry, project)
err = globals.Locks.LockPath(project_dir, 1000 * time.Second)
err = globals.Locks.LockDirectory(project_dir, 10 * time.Second)
if err != nil {
return fmt.Errorf("failed to acquire the lock on %q; %w", project_dir, err)
}
defer globals.Locks.UnlockPath(project_dir)
defer globals.Locks.Unlock(project_dir)

perms, err := readPermissions(project_dir)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ func refreshUsageHandler(reqpath string, globals *globalConfiguration) (*usageMe
}

project_dir := filepath.Join(globals.Registry, *(incoming.Project))
err = globals.Locks.LockPath(project_dir, 1000 * time.Second)
err = globals.Locks.LockDirectory(project_dir, 10 * time.Second)
if err != nil {
return nil, fmt.Errorf("failed to lock the project directory %q; %w", project_dir, err)
}
defer globals.Locks.UnlockPath(project_dir)
defer globals.Locks.Unlock(project_dir)

new_usage, err := computeUsage(project_dir, true)
if err != nil {
Expand Down

0 comments on commit b80a21d

Please sign in to comment.