From f6eff433c5c61ef75486e3b3dff14a757dec3f70 Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Tue, 4 Jul 2023 17:04:50 +0200 Subject: [PATCH 1/4] fix: pubsub discovery Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- go.mod | 1 + go.sum | 2 ++ service.go | 41 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 810654f1..9a030624 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/aead/ecdh v0.2.0 github.com/berty/emitter-go v0.0.0-20221031144724-5dae963c3622 github.com/berty/go-libp2p-mock v1.0.1 + github.com/berty/go-libp2p-pubsub v0.9.4-0.20230706070911-6e35c0f470b8 github.com/berty/go-libp2p-rendezvous v0.5.0 github.com/buicongtan1997/protoc-gen-swagger-config v0.0.0-20200705084907-1342b78c1a7e github.com/daixiang0/gci v0.8.2 diff --git a/go.sum b/go.sum index cd3c30e0..e89c3e16 100644 --- a/go.sum +++ b/go.sum @@ -120,6 +120,8 @@ github.com/berty/emitter-go v0.0.0-20221031144724-5dae963c3622 h1:kJqfCXKR5EJdh9 github.com/berty/emitter-go v0.0.0-20221031144724-5dae963c3622/go.mod h1:G66sIy+q6BKIoKoKNqFU7sxSnrS5d8Z8meQ3Iu0ZJ4o= github.com/berty/go-libp2p-mock v1.0.1 h1:2lsXlZOQvELcvrkHTlK2t2965sg7pkr1sWrucH6S2zg= github.com/berty/go-libp2p-mock v1.0.1/go.mod h1:PsUBOq6zSAjXKLlSKRzhkIpjm7ZxGgjLn/FZY+3CoKs= +github.com/berty/go-libp2p-pubsub v0.9.4-0.20230706070911-6e35c0f470b8 h1:04xF+6wtZhEIHv+1ghqd7hhCePdKmPykJWSjEyy0uT4= +github.com/berty/go-libp2p-pubsub v0.9.4-0.20230706070911-6e35c0f470b8/go.mod h1:d/76O2W9ZLphGN2Q7Q1QSolJGV3zaO8D+da3Bdpk5d0= github.com/berty/go-libp2p-rendezvous v0.5.0 h1:eT8fBh+OewfTB6uAA47rzdgAVo/aevW4TA7PvZq75T4= github.com/berty/go-libp2p-rendezvous v0.5.0/go.mod h1:gkDEobp0lV+DHfNzO7+kCJxApO0vmpOEV+Z5uttZYGM= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= diff --git a/service.go b/service.go index 07de57db..aaea73ba 100644 --- a/service.go +++ b/service.go @@ -4,12 +4,15 @@ import ( "context" "encoding/hex" "fmt" + "math/rand" mrand "math/rand" "path/filepath" "sync" "sync/atomic" "time" + "unsafe" + pubsub_fix "github.com/berty/go-libp2p-pubsub" ds "github.com/ipfs/go-datastore" ds_sync "github.com/ipfs/go-datastore/sync" ipfs_interface "github.com/ipfs/interface-go-ipfs-core" @@ -18,6 +21,7 @@ import ( "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" + backoff "github.com/libp2p/go-libp2p/p2p/discovery/backoff" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -29,6 +33,7 @@ import ( "berty.tech/go-orbit-db/baseorbitdb" "berty.tech/go-orbit-db/iface" "berty.tech/go-orbit-db/pubsub/directchannel" + "berty.tech/go-orbit-db/pubsub/pubsubraw" "berty.tech/weshnet/internal/bertyversion" "berty.tech/weshnet/internal/datastoreutil" "berty.tech/weshnet/pkg/bertyvcissuer" @@ -168,7 +173,7 @@ func (opts *Opts) applyDefaults(ctx context.Context) error { mrepo := ipfs_mobile.NewRepoMobile("", repo) mnode, err = ipfsutil.NewIPFSMobile(ctx, mrepo, &ipfsutil.MobileOptions{ ExtraOpts: map[string]bool{ - "pubsub": true, + "pubsub": false, }, }) if err != nil { @@ -218,15 +223,49 @@ func (opts *Opts) applyDefaults(ctx context.Context) error { } } + if opts.PubSub == nil { + var err error + + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + popts := []pubsub_fix.Option{ + pubsub_fix.WithMessageSigning(true), + pubsub_fix.WithPeerExchange(true), + } + + backoffstrat := backoff.NewExponentialBackoff( + time.Second*10, time.Hour, + backoff.FullJitter, + time.Second, 10.0, 0, rng) + + cacheSize := 100 + dialTimeout := time.Second * 20 + backoffconnector := func(host host.Host) (*backoff.BackoffConnector, error) { + return backoff.NewBackoffConnector(host, cacheSize, dialTimeout, backoffstrat) + } + + adaptater := tinder.NewDiscoveryAdaptater(opts.Logger.Named("disc"), opts.TinderService) + popts = append(popts, pubsub_fix.WithDiscovery(adaptater, pubsub_fix.WithDiscoverConnector(backoffconnector))) + + // pubsub.DiscoveryPollInterval = m.Node.Protocol.PollInterval + ps, err := pubsub_fix.NewGossipSub(ctx, opts.Host, popts...) + if err != nil { + return fmt.Errorf("unable to init gossipsub: %w", err) + } + + opts.PubSub = (*pubsub.PubSub)(unsafe.Pointer(ps)) + } + if opts.OrbitDB == nil { orbitDirectory := InMemoryDirectory if opts.DatastoreDir != InMemoryDirectory { orbitDirectory = filepath.Join(opts.DatastoreDir, NamespaceOrbitDBDirectory) } + pubsub := pubsubraw.NewPubSub(opts.PubSub, opts.Host.ID(), opts.Logger, nil) odbOpts := &NewOrbitDBOptions{ NewOrbitDBOptions: baseorbitdb.NewOrbitDBOptions{ Directory: &orbitDirectory, + PubSub: pubsub, Logger: opts.Logger, }, PrometheusRegister: opts.PrometheusRegister, From 31813a91d27f62cabe06e8796184953e1c9e920b Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Wed, 5 Jul 2023 10:27:10 +0200 Subject: [PATCH 2/4] fix: disable pubsub by default on ipfs node Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- pkg/ipfsutil/mobile.go | 10 ++++++++++ service.go | 6 +----- service_client.go | 6 +----- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pkg/ipfsutil/mobile.go b/pkg/ipfsutil/mobile.go index f2af4e9a..6f88c4a7 100644 --- a/pkg/ipfsutil/mobile.go +++ b/pkg/ipfsutil/mobile.go @@ -54,6 +54,16 @@ func (o *MobileOptions) fillDefault() { if o.IpfsConfigPatch == nil { o.IpfsConfigPatch = defaultIpfsConfigPatch } + + // apply default extras + if o.ExtraOpts == nil { + o.ExtraOpts = make(map[string]bool) + } + + // if not set, disable pubsub by default to avoid collision + if _, ok := o.ExtraOpts["pubsub"]; !ok { + o.ExtraOpts["pubsub"] = false + } } func NewIPFSMobile(ctx context.Context, repo *ipfs_mobile.RepoMobile, opts *MobileOptions) (*ipfs_mobile.IpfsMobile, error) { diff --git a/service.go b/service.go index aaea73ba..57942a65 100644 --- a/service.go +++ b/service.go @@ -171,11 +171,7 @@ func (opts *Opts) applyDefaults(ctx context.Context) error { } mrepo := ipfs_mobile.NewRepoMobile("", repo) - mnode, err = ipfsutil.NewIPFSMobile(ctx, mrepo, &ipfsutil.MobileOptions{ - ExtraOpts: map[string]bool{ - "pubsub": false, - }, - }) + mnode, err = ipfsutil.NewIPFSMobile(ctx, mrepo, &ipfsutil.MobileOptions{}) if err != nil { return err } diff --git a/service_client.go b/service_client.go index d02aa1ad..8b7e1301 100644 --- a/service_client.go +++ b/service_client.go @@ -99,11 +99,7 @@ func NewPersistentServiceClient(path string) (ServiceClient, error) { } mrepo := ipfs_mobile.NewRepoMobile(path, repo) - mnode, err := ipfsutil.NewIPFSMobile(context.TODO(), mrepo, &ipfsutil.MobileOptions{ - ExtraOpts: map[string]bool{ - "pubsub": true, - }, - }) + mnode, err := ipfsutil.NewIPFSMobile(context.TODO(), mrepo, &ipfsutil.MobileOptions{}) if err != nil { return nil, err } From 2d23bdd454fc3dcbd181fd27b1254982c8408f5f Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Thu, 6 Jul 2023 09:23:14 +0200 Subject: [PATCH 3/4] chore: add note on pubsub fix Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- service.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/service.go b/service.go index 57942a65..af1c604a 100644 --- a/service.go +++ b/service.go @@ -248,6 +248,9 @@ func (opts *Opts) applyDefaults(ctx context.Context) error { return fmt.Errorf("unable to init gossipsub: %w", err) } + // @NOTE(gfanton): we need to force cast here until our fix is push + // upstream on the original go-libp2p-pubsub + // see: https://github.com/gfanton/go-libp2p-pubsub/commit/8f4fd394f8dfcb3a5eb724a03f9e4e1e33194cbd opts.PubSub = (*pubsub.PubSub)(unsafe.Pointer(ps)) } From cff7aaf5caa1673dd9e37ce5aa45f2e033235f23 Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Thu, 6 Jul 2023 09:33:01 +0200 Subject: [PATCH 4/4] fix: unsecure rand Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- service.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/service.go b/service.go index af1c604a..c7169f6e 100644 --- a/service.go +++ b/service.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "fmt" - "math/rand" mrand "math/rand" "path/filepath" "sync" @@ -140,6 +139,8 @@ func (opts *Opts) applyDefaults(ctx context.Context) error { opts.Logger = zap.NewNop() } + rng := mrand.New(mrand.NewSource(srand.MustSecure())) // nolint:gosec // we need to use math/rand here, but it is seeded from crypto/rand + opts.applyDefaultsGetDatastore() if err := opts.applyPushDefaults(); err != nil { @@ -198,7 +199,6 @@ func (opts *Opts) applyDefaults(ctx context.Context) error { // setup default tinder service if opts.TinderService == nil { - rng := mrand.New(mrand.NewSource(srand.MustSecure())) // nolint:gosec // we need to use math/rand here, but it is seeded from crypto/rand drivers := []tinder.IDriver{} // setup loac disc @@ -222,7 +222,6 @@ func (opts *Opts) applyDefaults(ctx context.Context) error { if opts.PubSub == nil { var err error - rng := rand.New(rand.NewSource(time.Now().UnixNano())) popts := []pubsub_fix.Option{ pubsub_fix.WithMessageSigning(true), pubsub_fix.WithPeerExchange(true),