Skip to content

Commit

Permalink
feat: support google cloud storage as das
Browse files Browse the repository at this point in the history
  • Loading branch information
renlulu committed Sep 6, 2024
1 parent bff2ddd commit 283d2c9
Show file tree
Hide file tree
Showing 5 changed files with 272 additions and 49 deletions.
8 changes: 5 additions & 3 deletions das/das.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ type DataAvailabilityConfig struct {
LocalCache CacheConfig `koanf:"local-cache"`
RedisCache RedisConfig `koanf:"redis-cache"`

LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"`
LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`
LocalDBStorage LocalDBStorageConfig `koanf:"local-db-storage"`
LocalFileStorage LocalFileStorageConfig `koanf:"local-file-storage"`
S3Storage S3StorageServiceConfig `koanf:"s3-storage"`
GoogleCloudStorage GoogleCloudStorageServiceConfig `koanf:"google-cloud-storage"`

MigrateLocalDBToFileStorage bool `koanf:"migrate-local-db-to-file-storage"`

Expand Down Expand Up @@ -114,6 +115,7 @@ func dataAvailabilityConfigAddOptions(prefix string, f *flag.FlagSet, r role) {
LocalDBStorageConfigAddOptions(prefix+".local-db-storage", f)
LocalFileStorageConfigAddOptions(prefix+".local-file-storage", f)
S3ConfigAddOptions(prefix+".s3-storage", f)
GoogleCloudConfigAddOptions(prefix+".google-cloud-storage", f)
f.Bool(prefix+".migrate-local-db-to-file-storage", DefaultDataAvailabilityConfig.MigrateLocalDBToFileStorage, "daserver will migrate all data on startup from local-db-storage to local-file-storage, then mark local-db-storage as unusable")

// Key config for storage
Expand Down
9 changes: 9 additions & 0 deletions das/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ func CreatePersistentStorageService(
storageServices = append(storageServices, s)
}

if config.GoogleCloudStorage.Enable {
s, err := NewGoogleCloudStorageService(config.GoogleCloudStorage)
if err != nil {
return nil, nil, err
}
lifecycleManager.Register(s)
storageServices = append(storageServices, s)
}

if len(storageServices) > 1 {
s, err := NewRedundantStorageService(ctx, storageServices)
if err != nil {
Expand Down
126 changes: 126 additions & 0 deletions das/google_cloud_storage_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package das

import (
googlestorage "cloud.google.com/go/storage"
"context"
"encoding/hex"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/google/go-cmp/cmp"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/das/dastree"
"github.com/offchainlabs/nitro/util/pretty"
flag "github.com/spf13/pflag"
"google.golang.org/api/option"
"io"
"sort"
)

type GoogleCloudStorageServiceConfig struct {
Enable bool `koanf:"enable"`
AccessToken string `koanf:"access-token"`
Bucket string `koanf:"bucket"`
ObjectPrefix string `koanf:"object-prefix"`
DiscardAfterTimeout bool `koanf:"discard-after-timeout"`
}

var DefaultGoogleCloudStorageServiceConfig = GoogleCloudStorageServiceConfig{}

func GoogleCloudConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Bool(prefix+".enable", DefaultGoogleCloudStorageServiceConfig.Enable, "enable storage/retrieval of sequencer batch data from an Google Cloud Storage bucket")
f.String(prefix+".access-token", DefaultGoogleCloudStorageServiceConfig.AccessToken, "Google Cloud Storage access token")
f.String(prefix+".bucket", DefaultGoogleCloudStorageServiceConfig.Bucket, "Google Cloud Storage bucket")
f.String(prefix+".object-prefix", DefaultGoogleCloudStorageServiceConfig.ObjectPrefix, "prefix to add to Google Cloud Storage objects")
f.Bool(prefix+".discard-after-timeout", DefaultGoogleCloudStorageServiceConfig.DiscardAfterTimeout, "discard data after its expiry timeout")

}

type GoogleCloudStorageService struct {
client *googlestorage.Client
bucket string
objectPrefix string
discardAfterTimeout bool
}

func NewGoogleCloudStorageService(config GoogleCloudStorageServiceConfig) (StorageService, error) {
client, err := googlestorage.NewClient(context.Background(), option.WithCredentialsJSON([]byte(config.AccessToken)))
if err != nil {
return nil, err
}
return &GoogleCloudStorageService{
client: client,
bucket: config.Bucket,
objectPrefix: config.ObjectPrefix,
discardAfterTimeout: config.DiscardAfterTimeout,
}, nil
}

func (gcs *GoogleCloudStorageService) Put(ctx context.Context, value []byte, timeout uint64) error {
logPut("das.GoogleCloudStorageService.Store", value, timeout, gcs)
bucket := gcs.client.Bucket(gcs.bucket).Object(gcs.objectPrefix + EncodeStorageServiceKey(dastree.Hash(value)))
w := bucket.NewWriter(ctx)
if _, err := fmt.Fprintln(w, hex.EncodeToString(value)); err != nil {
log.Error("das.GoogleCloudStorageService.Store", "err", err)
return err
}
return w.Close()
}

func (gcs *GoogleCloudStorageService) GetByHash(ctx context.Context, key common.Hash) ([]byte, error) {
log.Trace("das.GoogleCloudStorageService.GetByHash", "key", pretty.PrettyHash(key), "this", gcs)
bucket := gcs.client.Bucket(gcs.bucket).Object(gcs.objectPrefix + EncodeStorageServiceKey(key))
reader, err := bucket.NewReader(ctx)
if err != nil {
return nil, err
}
buf, err := io.ReadAll(reader)
if err != nil {
return nil, err
}
return hex.DecodeString(string(buf))
}

func (gcs *GoogleCloudStorageService) ExpirationPolicy(ctx context.Context) (daprovider.ExpirationPolicy, error) {
if gcs.discardAfterTimeout {
return daprovider.DiscardAfterDataTimeout, nil
}
return daprovider.KeepForever, nil
}

func (gcs *GoogleCloudStorageService) Sync(ctx context.Context) error {
return nil
}

func (gcs *GoogleCloudStorageService) Close(ctx context.Context) error {
return gcs.client.Close()
}

func (gcs *GoogleCloudStorageService) String() string {
return fmt.Sprintf("GoogleCloudStorageService(:%s)", gcs.bucket)
}

func (gcs *GoogleCloudStorageService) HealthCheck(ctx context.Context) error {
bucket := gcs.client.Bucket(gcs.bucket)
// check if we have bucket permissions
permissions := []string{
"storage.buckets.get",
"storage.buckets.list",
"storage.objects.create",
"storage.objects.delete",
"storage.objects.list",
"storage.objects.get",
}
perms, err := bucket.IAM().TestPermissions(ctx, permissions)
if err != nil {
return err
}
sort.Strings(permissions)
sort.Strings(perms)
if !cmp.Equal(perms, permissions) {
return fmt.Errorf("permissions mismatch (-want +got):\n%s", cmp.Diff(permissions, perms))
}
// check if bucket exists (and others)
_, err = bucket.Attrs(ctx)
return err
}
56 changes: 40 additions & 16 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ replace github.com/VictoriaMetrics/fastcache => ./fastcache
replace github.com/ethereum/go-ethereum => ./go-ethereum

require (
cloud.google.com/go/storage v1.43.0
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible
github.com/Shopify/toxiproxy v2.1.4+incompatible
github.com/alicebob/miniredis/v2 v2.32.1
Expand All @@ -30,7 +31,7 @@ require (
github.com/gobwas/ws-examples v0.0.0-20190625122829-a9e8908d9484
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.3.0
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/holiman/uint256 v1.2.4
github.com/knadh/koanf v1.4.0
Expand All @@ -42,15 +43,38 @@ require (
github.com/spf13/pflag v1.0.5
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7
github.com/wealdtech/go-merkletree v1.0.0
golang.org/x/crypto v0.21.0
golang.org/x/crypto v0.24.0
golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa
golang.org/x/sys v0.18.0
golang.org/x/term v0.18.0
golang.org/x/tools v0.16.0
golang.org/x/sys v0.21.0
golang.org/x/term v0.21.0
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

require github.com/google/go-querystring v1.1.0 // indirect
require (
cloud.google.com/go v0.115.0 // indirect
cloud.google.com/go/auth v0.6.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.8 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
github.com/googleapis/gax-go/v2 v2.12.5 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
google.golang.org/api v0.187.0 // indirect
google.golang.org/genproto v0.0.0-20240624140628-dc46fd24d27d // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240617180043-68d350f18fd4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect
google.golang.org/grpc v1.64.0 // indirect
)

require (
github.com/DataDog/zstd v1.4.5 // indirect
Expand Down Expand Up @@ -107,9 +131,9 @@ require (
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/glog v1.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/google/go-github/v62 v62.0.0
Expand Down Expand Up @@ -159,13 +183,13 @@ require (
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opencensus.io v0.22.5 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.23.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.22.0
golang.org/x/sync v0.5.0
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
golang.org/x/sync v0.7.0
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
rsc.io/tmplfunc v0.0.3 // indirect
)
Loading

0 comments on commit 283d2c9

Please sign in to comment.