Skip to content

Commit

Permalink
Merge pull request #1586 from bbockelm/sync_upload
Browse files Browse the repository at this point in the history
Add "sync" functionality to the client
  • Loading branch information
turetske committed Sep 19, 2024
2 parents e4a090b + c94fc33 commit 72438ca
Show file tree
Hide file tree
Showing 9 changed files with 687 additions and 80 deletions.
10 changes: 7 additions & 3 deletions client/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (

// Make a request to the director for a given verb/resource; return the
// HTTP response object only if a 307 is returned.
func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (resp *http.Response, err error) {
func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string, token string) (resp *http.Response, err error) {
resourceUrl := directorUrl + sourcePath
// Here we use http.Transport to prevent the client from following the director's
// redirect. We use the Location url elsewhere (plus we still need to do the token
Expand All @@ -63,6 +63,10 @@ func queryDirector(ctx context.Context, verb, sourcePath, directorUrl string) (r
// cannot.
req.Header.Set("User-Agent", getUserAgent(""))

if token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}

// Perform the HTTP request
resp, err = client.Do(req)

Expand Down Expand Up @@ -161,7 +165,7 @@ func parseServersFromDirectorResponse(resp *http.Response) (servers []*url.URL,
}

// Retrieve federation namespace information for a given URL.
func GetDirectorInfoForPath(ctx context.Context, resourcePath, directorUrl string, isPut bool, query string) (parsedResponse server_structs.DirectorResponse, err error) {
func GetDirectorInfoForPath(ctx context.Context, resourcePath, directorUrl string, isPut bool, query string, token string) (parsedResponse server_structs.DirectorResponse, err error) {
if directorUrl == "" {
return server_structs.DirectorResponse{},
errors.Errorf("unable to retrieve information from a Director for object %s because no director URL was provided", resourcePath)
Expand All @@ -176,7 +180,7 @@ func GetDirectorInfoForPath(ctx context.Context, resourcePath, directorUrl strin
resourcePath += "?" + query
}
var dirResp *http.Response
dirResp, err = queryDirector(ctx, verb, resourcePath, directorUrl)
dirResp, err = queryDirector(ctx, verb, resourcePath, directorUrl, token)
if err != nil {
if isPut && dirResp != nil && dirResp.StatusCode == 405 {
err = errors.New("error 405: No writeable origins were found")
Expand Down
4 changes: 2 additions & 2 deletions client/director_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestQueryDirector(t *testing.T) {
defer server.Close()

// Call QueryDirector with the test server URL and a source path
actualResp, err := queryDirector(context.Background(), "GET", "/foo/bar", server.URL)
actualResp, err := queryDirector(context.Background(), "GET", "/foo/bar", server.URL, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestGetDirectorInfoForPath(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.Background()
_, err := GetDirectorInfoForPath(ctx, tt.resourcePath, tt.directorUrl, tt.isPut, tt.query)
_, err := GetDirectorInfoForPath(ctx, tt.resourcePath, tt.directorUrl, tt.isPut, tt.query, "")
if tt.expectedError != "" {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.expectedError)
Expand Down
275 changes: 275 additions & 0 deletions client/fed_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,3 +457,278 @@ func TestFailureOnOriginDisablingListings(t *testing.T) {
assert.Error(t, err)
assert.Contains(t, err.Error(), "no collections URL found in director response")
}

func TestSyncUpload(t *testing.T) {
// Create instance of test federation
viper.Reset()
server_utils.ResetOriginExports()

fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg)

// Create a token file
issuer, err := config.GetServerIssuerURL()
require.NoError(t, err)

tokenConfig := token.NewWLCGToken()
tokenConfig.Lifetime = time.Minute
tokenConfig.Issuer = issuer
tokenConfig.Subject = "origin"
tokenConfig.AddAudienceAny()
tokenConfig.AddResourceScopes(token_scopes.NewResourceScope(token_scopes.Storage_Read, "/"),
token_scopes.NewResourceScope(token_scopes.Storage_Modify, "/"))
token, err := tokenConfig.CreateToken()
assert.NoError(t, err)
tempToken, err := os.CreateTemp(t.TempDir(), "token")
assert.NoError(t, err, "Error creating temp token file")
defer os.Remove(tempToken.Name())
_, err = tempToken.WriteString(token)
assert.NoError(t, err, "Error writing to temp token file")
tempToken.Close()

// Disable progress bars to not reuse the same mpb instance
viper.Set("Logging.DisableProgressBars", true)

// Make our test directories and files
tempDir := t.TempDir()
innerTempDir, err := os.MkdirTemp(tempDir, "InnerUploadDir")
assert.NoError(t, err)
permissions := os.FileMode(0755)
err = os.Chmod(tempDir, permissions)
require.NoError(t, err)
err = os.Chmod(innerTempDir, permissions)
require.NoError(t, err)

testFileContent1 := "test file content"
testFileContent2 := "more test file content!"
innerTestFileContent := "this content is within another dir!"
tempFile1, err := os.CreateTemp(tempDir, "test1")
require.NoError(t, err, "Error creating temp1 file")
tempFile2, err := os.CreateTemp(tempDir, "test1")
require.NoError(t, err, "Error creating temp2 file")
innerTempFile, err := os.CreateTemp(innerTempDir, "testInner")
require.NoError(t, err, "Error creating inner test file")

_, err = tempFile1.WriteString(testFileContent1)
require.NoError(t, err, "Error writing to temp1 file")
tempFile1.Close()
_, err = tempFile2.WriteString(testFileContent2)
require.NoError(t, err, "Error writing to temp2 file")
tempFile2.Close()
_, err = innerTempFile.WriteString(innerTestFileContent)
require.NoError(t, err, "Error writing to inner test file")
innerTempFile.Close()

t.Run("testSyncUploadFull", func(t *testing.T) {
// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Download the files we just uploaded
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)
})

t.Run("testSyncUploadNone", func(t *testing.T) {
// Set path for object to upload/download
dirName := filepath.Base(tempDir)
uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload_none/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Synchronize the uploaded files again.
transferDetailsUpload, err = client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)

// Should have already been uploaded once
assert.Len(t, transferDetailsUpload, 0)
})

t.Run("testSyncUploadPartial", func(t *testing.T) {
// Set path for object to upload/download
dirName := filepath.Base(tempDir)
uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload_partial/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName)
uploadInnerURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_upload_partial/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName, filepath.Base(innerTempDir))

// Upload some files with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, innerTempDir, uploadInnerURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
require.Len(t, transferDetailsUpload, 1)

// Change the contents of the already-uploaded file; changes shouldn't be detected as the size stays the same
newTestFileContent := "XXXX content is within another XXXX"
err = os.WriteFile(innerTempFile.Name(), []byte(newTestFileContent), os.FileMode(0755))
require.NoError(t, err)

// Upload again; this time there should be fewer uploads as the subdir was already moved.
transferDetailsUpload, err = client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
require.Len(t, transferDetailsUpload, 2)

// Download all the objects
downloadDir := t.TempDir()
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, downloadDir, true, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)

// Verify we received the original contents, not any modified contents
contentBytes, err := os.ReadFile(filepath.Join(downloadDir, dirName, filepath.Base(innerTempDir), filepath.Base(innerTempFile.Name())))
require.NoError(t, err)
assert.Equal(t, innerTestFileContent, string(contentBytes))
})
}

func TestSyncDownload(t *testing.T) {
// Create instance of test federation
viper.Reset()
server_utils.ResetOriginExports()

fed := fed_test_utils.NewFedTest(t, bothAuthOriginCfg)

// Create a token file
issuer, err := config.GetServerIssuerURL()
require.NoError(t, err)

tokenConfig := token.NewWLCGToken()
tokenConfig.Lifetime = time.Minute
tokenConfig.Issuer = issuer
tokenConfig.Subject = "origin"
tokenConfig.AddAudienceAny()
tokenConfig.AddResourceScopes(token_scopes.NewResourceScope(token_scopes.Storage_Read, "/"),
token_scopes.NewResourceScope(token_scopes.Storage_Modify, "/"))
token, err := tokenConfig.CreateToken()
assert.NoError(t, err)
tempToken, err := os.CreateTemp(t.TempDir(), "token")
assert.NoError(t, err, "Error creating temp token file")
defer os.Remove(tempToken.Name())
_, err = tempToken.WriteString(token)
assert.NoError(t, err, "Error writing to temp token file")
tempToken.Close()

// Disable progress bars to not reuse the same mpb instance
viper.Set("Logging.DisableProgressBars", true)

// Make our test directories and files
tempDir := t.TempDir()
innerTempDir, err := os.MkdirTemp(tempDir, "InnerUploadDir")
assert.NoError(t, err)
permissions := os.FileMode(0755)
err = os.Chmod(tempDir, permissions)
require.NoError(t, err)
err = os.Chmod(innerTempDir, permissions)
require.NoError(t, err)

testFileContent1 := "test file content"
testFileContent2 := "more test file content!"
innerTestFileContent := "this content is within another dir!"
tempFile1, err := os.CreateTemp(tempDir, "test1")
require.NoError(t, err, "Error creating temp1 file")
tempFile2, err := os.CreateTemp(tempDir, "test1")
require.NoError(t, err, "Error creating temp2 file")
innerTempFile, err := os.CreateTemp(innerTempDir, "testInner")
require.NoError(t, err, "Error creating inner test file")

_, err = tempFile1.WriteString(testFileContent1)
require.NoError(t, err, "Error writing to temp1 file")
tempFile1.Close()
_, err = tempFile2.WriteString(testFileContent2)
require.NoError(t, err, "Error writing to temp2 file")
tempFile2.Close()
_, err = innerTempFile.WriteString(innerTestFileContent)
require.NoError(t, err, "Error writing to inner test file")
innerTempFile.Close()

// Set path for object to upload/download
tempPath := tempDir
dirName := filepath.Base(tempPath)
uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_download/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName)

// Upload the file with PUT
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

t.Run("testSyncDownloadFull", func(t *testing.T) {
// Download the files we just uploaded
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, t.TempDir(), true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)
})

t.Run("testSyncDownloadNone", func(t *testing.T) {
// Set path for object to upload/download
dirName := t.TempDir()

// Synchronize the uploaded files to a local directory
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadURL, dirName, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsDownload)

// Synchronize the files again; should result in no transfers
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, dirName, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
assert.NoError(t, err)
assert.Len(t, transferDetailsDownload, 0)
})

t.Run("testSyncDownloadPartial", func(t *testing.T) {
// Set path for object to upload/download
downloadDir := t.TempDir()
dirName := filepath.Base(tempDir)
uploadURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_download_partial/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName)
uploadInnerURL := fmt.Sprintf("pelican://%s:%s/first/namespace/sync_download_partial/%s/%s", param.Server_Hostname.GetString(), strconv.Itoa(param.Server_WebPort.GetInt()), dirName, filepath.Base(innerTempDir))

// Upload the initial files
transferDetailsUpload, err := client.DoPut(fed.Ctx, tempDir, uploadURL, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
verifySuccessfulTransfer(t, transferDetailsUpload)

// Download the inner directory
innerDownloadDir := filepath.Join(downloadDir, dirName, filepath.Base(innerTempDir))
transferDetailsDownload, err := client.DoGet(fed.Ctx, uploadInnerURL, innerDownloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
require.Len(t, transferDetailsDownload, 1)

// Change the contents of one already-uploaded file and re-upload it.
// Filesize is the same so a re-download should be skipped.
newTestFileContent := "XXXX content is within another XXXX"
err = os.WriteFile(innerTempFile.Name(), []byte(newTestFileContent), os.FileMode(0755))
require.NoError(t, err)
transferDetailsUpload, err = client.DoPut(fed.Ctx, innerTempDir, uploadInnerURL, true, client.WithTokenLocation(tempToken.Name()))
require.NoError(t, err)
require.Len(t, transferDetailsUpload, 1)

// Download all the objects
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL, downloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
assert.Len(t, transferDetailsDownload, 2)

// Verify we received the original contents, not any modified contents
contentBytes, err := os.ReadFile(filepath.Join(innerDownloadDir, filepath.Base(innerTempFile.Name())))
require.NoError(t, err)
assert.Equal(t, innerTestFileContent, string(contentBytes))

// Change the local size, then re-sync
innerDownloadFile := filepath.Join(innerDownloadDir, filepath.Base(innerTempFile.Name()))
log.Debugln("Overwriting old version of file", innerDownloadFile)
err = os.Remove(innerDownloadFile)
require.NoError(t, err)
err = os.WriteFile(innerDownloadFile, []byte("XXXX"), os.FileMode(0755))
require.NoError(t, err)
log.Debugln("Re-downloading file direct from origin")
transferDetailsDownload, err = client.DoGet(fed.Ctx, uploadURL+"?directread", downloadDir, true, client.WithTokenLocation(tempToken.Name()), client.WithSynchronize(client.SyncSize))
require.NoError(t, err)
assert.Len(t, transferDetailsDownload, 1)
contentBytes, err = os.ReadFile(filepath.Join(innerDownloadDir, filepath.Base(innerTempFile.Name())))
require.NoError(t, err)
assert.Equal(t, newTestFileContent, string(contentBytes))
})
}
Loading

0 comments on commit 72438ca

Please sign in to comment.