From fcbe7c46d22bb2e1dddf4fc7c8c0e5e7c1940fd1 Mon Sep 17 00:00:00 2001 From: Annie <108039750+annielz@users.noreply.github.com> Date: Thu, 18 Jan 2024 11:41:34 +0800 Subject: [PATCH] feat: update get object for speed (#219) * feat: hard code endpoint * fix: un-init endpoint * feat: test ap speed * feat: try not refresh storage providers * feat: use opt for get object endpoint * feat: pass in sp endpoint in new api client option * feat: cleanup feat: cleanup feat: cleanup * feat: modify param name and set directly in client * feat: add e2e test for get object with ForceToUseSpecifiedSpEndpointForDownloadOnly * feat: modify e2e test * feat: cleanup * fix: comment for ForceToUseSpecifiedSpEndpointForDownloadOnly in client and option --- client/api_client.go | 32 ++++++++++++++++--- client/api_object.go | 20 ++++++++---- e2e/e2e_storage_test.go | 69 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 10 deletions(-) diff --git a/client/api_client.go b/client/api_client.go index 692f3e2b..ffe9f408 100644 --- a/client/api_client.go +++ b/client/api_client.go @@ -73,6 +73,9 @@ type Client struct { offChainAuthOption *OffChainAuthOption useWebsocketConn bool expireSeconds uint64 + // forceToUseSpecifiedSpEndpointForDownloadOnly indicates a fixed SP endpoint to which to send the download request + // If this option is set, the client can only make download requests, and can only download from the fixed endpoint + forceToUseSpecifiedSpEndpointForDownloadOnly *url.URL } // Option - Configurations for providing optional parameters for the Greenfield SDK Client. @@ -96,6 +99,9 @@ type Option struct { UseWebSocketConn bool // ExpireSeconds indicates the number of seconds after which the authentication of the request sent to the SP will become invalid,the default value is 1000. ExpireSeconds uint64 + // ForceToUseSpecifiedSpEndpointForDownloadOnly indicates a fixed SP endpoint to which to send the download request + // If this option is set, the client can only make download requests, and can only download from the fixed endpoint + ForceToUseSpecifiedSpEndpointForDownloadOnly string } // OffChainAuthOption - The optional configurations for off-chain-auth. @@ -160,14 +166,32 @@ func New(chainID string, endpoint string, option Option) (IClient, error) { expireSeconds: option.ExpireSeconds, } - // fetch sp endpoints info from chain - err = c.refreshStorageProviders(context.Background()) + if option.ForceToUseSpecifiedSpEndpointForDownloadOnly != "" { + var useHttps bool + if strings.Contains(option.ForceToUseSpecifiedSpEndpointForDownloadOnly, "https") { + useHttps = true + } else { + useHttps = c.secure + } - if err != nil { - return nil, err + c.forceToUseSpecifiedSpEndpointForDownloadOnly, err = utils.GetEndpointURL(option.ForceToUseSpecifiedSpEndpointForDownloadOnly, useHttps) + if err != nil { + log.Error().Msg(fmt.Sprintf("fetch endpoint from option %s fail:%v", option.ForceToUseSpecifiedSpEndpointForDownloadOnly, err)) + return nil, err + } + } else { + // fetch sp endpoints info from chain + err = c.refreshStorageProviders(context.Background()) + + if err != nil { + return nil, err + } } // register off-chain-auth pubkey to all sps if option.OffChainAuthOption != nil { + if option.ForceToUseSpecifiedSpEndpointForDownloadOnly != "" { + return nil, errors.New("forceToUseSpecifiedSpEndpointForDownloadOnly option does not support OffChainAuthOption, please adjust option inputs and try again") + } if option.OffChainAuthOption.Seed == "" || option.OffChainAuthOption.Domain == "" { return nil, errors.New("seed and domain can't be empty in OffChainAuthOption") } diff --git a/client/api_object.go b/client/api_object.go index 259f5458..022cdcad 100644 --- a/client/api_object.go +++ b/client/api_object.go @@ -484,11 +484,12 @@ func (c *Client) FPutObject(ctx context.Context, bucketName, objectName, filePat func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, opts types.GetObjectOptions, ) (io.ReadCloser, types.ObjectStat, error) { - if err := s3util.CheckValidBucketName(bucketName); err != nil { + var err error + if err = s3util.CheckValidBucketName(bucketName); err != nil { return nil, types.ObjectStat{}, err } - if err := s3util.CheckValidObjectName(objectName); err != nil { + if err = s3util.CheckValidObjectName(objectName); err != nil { return nil, types.ObjectStat{}, err } @@ -507,10 +508,17 @@ func (c *Client) GetObject(ctx context.Context, bucketName, objectName string, disableCloseBody: true, } - endpoint, err := c.getSPUrlByBucket(bucketName) - if err != nil { - log.Error().Msg(fmt.Sprintf("route endpoint by bucket: %s failed, err: %s", bucketName, err.Error())) - return nil, types.ObjectStat{}, err + var endpoint *url.URL + + if c.forceToUseSpecifiedSpEndpointForDownloadOnly != nil { + endpoint = c.forceToUseSpecifiedSpEndpointForDownloadOnly + } else { + endpoint, err = c.getSPUrlByBucket(bucketName) + + if err != nil { + log.Error().Msg(fmt.Sprintf("route endpoint by bucket: %s failed, err: %s", bucketName, err.Error())) + return nil, types.ObjectStat{}, err + } } resp, err := c.sendReq(ctx, reqMeta, &sendOpt, endpoint) diff --git a/e2e/e2e_storage_test.go b/e2e/e2e_storage_test.go index 4fb0672e..c4ba6e8f 100644 --- a/e2e/e2e_storage_test.go +++ b/e2e/e2e_storage_test.go @@ -844,3 +844,72 @@ func (s *StorageTestSuite) Test_Object_And_Set_Tag() { s.Require().Equal(tags, *objectDetail.ObjectInfo.Tags) } } + +func (s *StorageTestSuite) Test_Get_Object_With_ForcedSpEndpoint() { + bucketName := storageTestUtil.GenRandomBucketName() + objectName := storageTestUtil.GenRandomObjectName() + + s.T().Logf("BucketName:%s, objectName: %s", bucketName, objectName) + + bucketTx, err := s.Client.CreateBucket(s.ClientContext, bucketName, s.PrimarySP.OperatorAddress, types.CreateBucketOptions{Visibility: storageTypes.VISIBILITY_TYPE_PUBLIC_READ}) + s.Require().NoError(err) + + _, err = s.Client.WaitForTx(s.ClientContext, bucketTx) + s.Require().NoError(err) + + bucketInfo, err := s.Client.HeadBucket(s.ClientContext, bucketName) + s.Require().NoError(err) + if err == nil { + s.Require().Equal(bucketInfo.Visibility, storageTypes.VISIBILITY_TYPE_PUBLIC_READ) + } + + var buffer bytes.Buffer + line := `1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,1234567890,123456789012` + // Create 1MiB content where each line contains 1024 characters. + for i := 0; i < 1024*300; i++ { + buffer.WriteString(fmt.Sprintf("[%05d] %s\n", i, line)) + } + + s.T().Log("---> CreateObject and HeadObject <---") + objectTx, err := s.Client.CreateObject(s.ClientContext, bucketName, objectName, bytes.NewReader(buffer.Bytes()), types.CreateObjectOptions{Visibility: storageTypes.VISIBILITY_TYPE_PUBLIC_READ}) + s.Require().NoError(err) + _, err = s.Client.WaitForTx(s.ClientContext, objectTx) + s.Require().NoError(err) + + time.Sleep(5 * time.Second) + objectDetail, err := s.Client.HeadObject(s.ClientContext, bucketName, objectName) + s.Require().NoError(err) + s.Require().Equal(objectDetail.ObjectInfo.ObjectName, objectName) + s.Require().Equal(objectDetail.ObjectInfo.GetObjectStatus().String(), "OBJECT_STATUS_CREATED") + + objectSize := int64(buffer.Len()) + s.T().Logf("---> PutObject and GetObject, objectName:%s objectSize:%d <---", objectName, objectSize) + err = s.Client.PutObject(s.ClientContext, bucketName, objectName, objectSize, + bytes.NewReader(buffer.Bytes()), types.PutObjectOptions{}) + s.Require().NoError(err) + + s.WaitSealObject(bucketName, objectName) + + s.T().Log("---> client.New with ForceToUseSpecifiedSpEndpointForDownloadOnly option param filled <---") + origClient := s.Client + s.Client, err = client.New(basesuite.ChainID, basesuite.Endpoint, client.Option{ + DefaultAccount: s.DefaultAccount, + ForceToUseSpecifiedSpEndpointForDownloadOnly: s.PrimarySP.Endpoint, + }) + s.Require().NoError(err) + + s.T().Log("---> get object with ForceToUseSpecifiedSpEndpointForDownloadOnly <---") + objectContent, _, err := s.Client.GetObject(s.ClientContext, bucketName, objectName, types.GetObjectOptions{}) + if err != nil { + fmt.Printf("error: %v", err) + quota2, _ := s.Client.GetBucketReadQuota(s.ClientContext, bucketName) + fmt.Printf("quota: %v", quota2) + } + objectBytes, err := io.ReadAll(objectContent) + s.Require().NoError(err) + s.Require().Equal(objectBytes, buffer.Bytes()) + s.Require().NoError(err) + + s.T().Log("---> restore client without ForceToUseSpecifiedSpEndpointForDownloadOnly option param <---") + s.Client = origClient +}