Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: when entering idle mode, the old resolve-related resources have … #3402

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions contrib/registry/consul/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er
services: &atomic.Value{},
serviceName: name,
}
set.ctx, set.cancel = context.WithCancel(context.Background())
r.registry[name] = set
}

Expand All @@ -209,10 +208,8 @@ func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, er
// otherwise the initial data may be blocked forever during the watch.
w.event <- struct{}{}
}
if !ok {
if err := r.resolve(set.ctx, set); err != nil {
return nil, err
}
if err := r.resolve(ctx, set); err != nil {
return nil, err
}
return w, nil
}
Expand Down Expand Up @@ -248,9 +245,6 @@ func (r *Registry) resolve(ctx context.Context, ss *serviceSet) error {
}
idx = tmpIdx
case <-ctx.Done():
r.lock.Lock()
delete(r.registry, ss.serviceName)
r.lock.Unlock()
return
}
}
Expand Down
156 changes: 144 additions & 12 deletions contrib/registry/consul/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,17 @@ func TestRegistry_IdleAndWatch(t *testing.T) {
Version: "v0.0.1",
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
}
instance2 := &registry.ServiceInstance{
ID: "1",
Name: "server-1",
Version: "v0.0.2",
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
}

type args struct {
ctx context.Context
instance *registry.ServiceInstance
ctx context.Context
instance *registry.ServiceInstance
changeInstance *registry.ServiceInstance
}

tests := []struct {
Expand All @@ -447,8 +454,9 @@ func TestRegistry_IdleAndWatch(t *testing.T) {
{
name: "many client, one idle",
args: args{
ctx: context.Background(),
instance: instance1,
ctx: context.Background(),
instance: instance1,
changeInstance: instance2,
},
want: []*registry.ServiceInstance{instance1},
wantErr: false,
Expand Down Expand Up @@ -506,9 +514,7 @@ func TestRegistry_IdleAndWatch(t *testing.T) {
}
}
time.Sleep(2 * time.Second)
change := tt.args.instance
change.Version = "v0.0.2"
err = r.Register(tt.args.ctx, change)
err = r.Register(tt.args.ctx, tt.args.changeInstance)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -576,7 +582,7 @@ func TestRegistry_IdleAndWatch2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 10; i++ {
stopCtx, stopCancel := context.WithCancel(ctx)
watch, err1 := r.Watch(context.Background(), tt.args.instance.Name)
watch, err1 := r.Watch(stopCtx, tt.args.instance.Name)
if err1 != nil {
t.Error(err1)
}
Expand All @@ -588,10 +594,6 @@ func TestRegistry_IdleAndWatch2(t *testing.T) {
t.Errorf("GetService() got = %v", service)
return
}
_, err2 = watch.Next()
if err2 == nil {
t.Errorf("watch exit exception:%d ", i)
}
}(i)
go func() {
select {
Expand Down Expand Up @@ -633,6 +635,136 @@ func TestRegistry_IdleAndWatch2(t *testing.T) {
}
}

func TestRegistry_ExitOldResolverAndReWatch(t *testing.T) {
addr := fmt.Sprintf("%s:9091", getIntranetIP())

time.Sleep(time.Millisecond * 100)
cli, err := api.NewClient(&api.Config{Address: "127.0.0.1:8500", WaitTime: 2 * time.Second})
if err != nil {
t.Fatalf("create consul client failed: %v", err)
}

instance1 := &registry.ServiceInstance{
ID: "1",
Name: "server-1",
Version: "v0.0.1",
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
}
instance2 := &registry.ServiceInstance{
ID: "2",
Name: "server-1",
Version: "v0.0.2",
Endpoints: []string{fmt.Sprintf("tcp://%s?isSecure=false", addr)},
}
type args struct {
ctx context.Context
opts []Option
instance *registry.ServiceInstance
initialInstance *registry.ServiceInstance
}

tests := []struct {
name string
args args
want []*registry.ServiceInstance
wantErr bool
}{
{
name: "When it has entered idle mode, but the old resolver has not completely exited, the watch will be re-established due to new requests coming in.",
args: args{
ctx: context.Background(),
initialInstance: instance1,
instance: instance2,
opts: []Option{
WithHealthCheck(false),
WithTimeout(time.Second * 2),
},
},
want: []*registry.ServiceInstance{instance2},
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := New(cli, tt.args.opts...)

err = r.Register(tt.args.ctx, tt.args.initialInstance)
if err != nil {
t.Error(err)
}
// first watch
ctx, cancel := context.WithCancel(context.Background())
watch, err := r.Watch(ctx, tt.args.instance.Name)
if err != nil {
t.Error(err)
}
service, err := watch.Next()
if (err != nil) != tt.wantErr {
t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetService() got = %v", service)
}

time.Sleep(time.Second * 3)
// The simulation entered idle mode first, but the old resolver was not closed yet, and new requests triggered a new Watch.
watchCtx := context.Background()
// old resolver cancel
err = watch.Stop()
if err != nil {
t.Errorf("watch stop err:%v", err)
}
cancel()
// If it sleeps for a period of time, the old resolve goroutine will exit before the new Watch is processed, and there will be no problems at this time.
// time.Sleep(time.Second * 8)
newWatch, err := r.Watch(watchCtx, tt.args.instance.Name)
if err != nil {
t.Error(err)
}
service, err = newWatch.Next()
if (err != nil) != tt.wantErr {
t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetService() got = %v", service)
}
// change register info
time.Sleep(time.Second * 1)
err = r.Deregister(tt.args.ctx, tt.args.initialInstance)
if err != nil {
t.Error(err)
}
time.Sleep(time.Second * 5)
err = r.Register(tt.args.ctx, tt.args.instance)
if err != nil {
t.Error(err)
}

time.Sleep(time.Second * 2)

newWatchCtx, newWatchCancel := context.WithCancel(context.Background())
c := make(chan struct{}, 1)

go func() {
service, err = newWatch.Next()
if (err != nil) != tt.wantErr {
t.Errorf("GetService() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("GetService() got = %v", service)
return
}
if !reflect.DeepEqual(service, tt.want) {
t.Errorf("GetService() got = %v, want %v", service, tt.want)
}
c <- struct{}{}
}()
time.AfterFunc(time.Second*10, newWatchCancel)
select {
case <-newWatchCtx.Done():
t.Errorf("Timeout getservice. May be no new resolve goroutine to obtain the latest service information")
case <-c:
return
}
})
}
}

func getIntranetIP() string {
addrs, err := net.InterfaceAddrs()
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions contrib/registry/consul/service.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package consul

import (
"context"
"sync"
"sync/atomic"

Expand All @@ -13,9 +12,6 @@ type serviceSet struct {
watcher map[*watcher]struct{}
services *atomic.Value
lock sync.RWMutex

ctx context.Context
cancel context.CancelFunc
}

func (s *serviceSet) broadcast(ss []*registry.ServiceInstance) {
Expand Down
4 changes: 0 additions & 4 deletions contrib/registry/consul/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,5 @@ func (w *watcher) Stop() error {
w.set.lock.Lock()
defer w.set.lock.Unlock()
delete(w.set.watcher, w)
// close resolve
if len(w.set.watcher) == 0 {
w.set.cancel()
}
return nil
}
Loading