diff --git a/api/manager/docs.go b/api/manager/docs.go
index e7744b9ac9c..cd051838885 100644
--- a/api/manager/docs.go
+++ b/api/manager/docs.go
@@ -3945,6 +3945,9 @@ var doc = `{
"config": {
"$ref": "#/definitions/types.CDNClusterConfig"
},
+ "is_default": {
+ "type": "boolean"
+ },
"name": {
"type": "string"
}
@@ -4348,6 +4351,9 @@ var doc = `{
"config": {
"$ref": "#/definitions/types.CDNClusterConfig"
},
+ "is_default": {
+ "type": "boolean"
+ },
"name": {
"type": "string"
}
diff --git a/api/manager/swagger.json b/api/manager/swagger.json
index e5ece420e72..829403a0c6a 100644
--- a/api/manager/swagger.json
+++ b/api/manager/swagger.json
@@ -3931,6 +3931,9 @@
"config": {
"$ref": "#/definitions/types.CDNClusterConfig"
},
+ "is_default": {
+ "type": "boolean"
+ },
"name": {
"type": "string"
}
@@ -4334,6 +4337,9 @@
"config": {
"$ref": "#/definitions/types.CDNClusterConfig"
},
+ "is_default": {
+ "type": "boolean"
+ },
"name": {
"type": "string"
}
diff --git a/api/manager/swagger.yaml b/api/manager/swagger.yaml
index 52a3893d74b..9f0434e34b0 100644
--- a/api/manager/swagger.yaml
+++ b/api/manager/swagger.yaml
@@ -317,6 +317,8 @@ definitions:
type: string
config:
$ref: '#/definitions/types.CDNClusterConfig'
+ is_default:
+ type: boolean
name:
type: string
required:
@@ -591,6 +593,8 @@ definitions:
type: string
config:
$ref: '#/definitions/types.CDNClusterConfig'
+ is_default:
+ type: boolean
name:
type: string
type: object
diff --git a/client/config/dynconfig.go b/client/config/dynconfig.go
new file mode 100644
index 00000000000..5f458681499
--- /dev/null
+++ b/client/config/dynconfig.go
@@ -0,0 +1,199 @@
+/*
+ * Copyright 2020 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "os"
+ "path/filepath"
+ "time"
+
+ logger "d7y.io/dragonfly/v2/internal/dflog"
+ "d7y.io/dragonfly/v2/internal/dfpath"
+ internaldynconfig "d7y.io/dragonfly/v2/internal/dynconfig"
+ "d7y.io/dragonfly/v2/manager/searcher"
+ "d7y.io/dragonfly/v2/pkg/rpc/manager"
+ managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"
+)
+
+var (
+ // cachePath is the file path of the dynconfig local cache
+ cachePath = filepath.Join(dfpath.DefaultCacheDir, "daemon_dynconfig")
+
+ // watchInterval is the interval for watching dynconfig changes
+ watchInterval = 5 * time.Second
+)
+
+type DynconfigData struct {
+ Schedulers []*manager.Scheduler
+}
+
+type Dynconfig interface {
+ // GetSchedulers returns the schedulers from the dynamic config.
+ GetSchedulers() ([]*manager.Scheduler, error)
+
+ // Get returns the dynamic config from manager.
+ Get() (*DynconfigData, error)
+
+ // Register allows an instance to register itself to listen/observe events.
+ Register(Observer)
+
+ // Deregister allows an instance to remove itself from the collection of observers/listeners.
+ Deregister(Observer)
+
+ // Notify publishes new events to listeners.
+ Notify() error
+
+ // Serve the dynconfig listening service.
+ Serve() error
+
+ // Stop the dynconfig listening service.
+ Stop() error
+}
+
+type Observer interface {
+ // OnNotify allows an event to be "published" to interface implementations.
+ OnNotify(*DynconfigData)
+}
+
+type dynconfig struct {
+ *internaldynconfig.Dynconfig
+ observers map[Observer]struct{}
+ done chan bool
+}
+
+func NewDynconfig(managerClient internaldynconfig.ManagerClient, expire time.Duration) (Dynconfig, error) {
+ client, err := internaldynconfig.New(
+ internaldynconfig.ManagerSourceType,
+ internaldynconfig.WithManagerClient(managerClient),
+ internaldynconfig.WithExpireTime(expire),
+ internaldynconfig.WithCachePath(cachePath),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ return &dynconfig{
+ observers: map[Observer]struct{}{},
+ done: make(chan bool),
+ Dynconfig: client,
+ }, nil
+}
+
+func (d *dynconfig) GetSchedulers() ([]*manager.Scheduler, error) {
+ data, err := d.Get()
+ if err != nil {
+ return nil, err
+ }
+
+ return data.Schedulers, nil
+}
+
+func (d *dynconfig) Get() (*DynconfigData, error) {
+ var data DynconfigData
+ if err := d.Unmarshal(&data); err != nil {
+ return nil, err
+ }
+
+ return &data, nil
+}
+
+func (d *dynconfig) Register(l Observer) {
+ d.observers[l] = struct{}{}
+}
+
+func (d *dynconfig) Deregister(l Observer) {
+ delete(d.observers, l)
+}
+
+func (d *dynconfig) Notify() error {
+ data, err := d.Get()
+ if err != nil {
+ return err
+ }
+
+ for o := range d.observers {
+ o.OnNotify(data)
+ }
+
+ return nil
+}
+
+func (d *dynconfig) Serve() error {
+ if err := d.Notify(); err != nil {
+ return err
+ }
+
+ go d.watch()
+
+ return nil
+}
+
+func (d *dynconfig) watch() {
+ tick := time.NewTicker(watchInterval)
+
+ for {
+ select {
+ case <-tick.C:
+ if err := d.Notify(); err != nil {
+ logger.Error("dynconfig notify failed", err)
+ }
+ case <-d.done:
+ return
+ }
+ }
+}
+
+func (d *dynconfig) Stop() error {
+ close(d.done)
+ if err := os.Remove(cachePath); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+type managerClient struct {
+ managerclient.Client
+ hostOption HostOption
+}
+
+ // NewManagerClient returns a new manager client used by dynconfig.
+func NewManagerClient(client managerclient.Client, hostOption HostOption) internaldynconfig.ManagerClient {
+ return &managerClient{
+ Client: client,
+ hostOption: hostOption,
+ }
+}
+
+func (mc *managerClient) Get() (interface{}, error) {
+ schedulers, err := mc.ListSchedulers(&manager.ListSchedulersRequest{
+ SourceType: manager.SourceType_CLIENT_SOURCE,
+ HostName: mc.hostOption.Hostname,
+ Ip: mc.hostOption.ListenIP,
+ HostInfo: map[string]string{
+ searcher.ConditionSecurityDomain: mc.hostOption.SecurityDomain,
+ searcher.ConditionIDC: mc.hostOption.IDC,
+ searcher.ConditionNetTopology: mc.hostOption.NetTopology,
+ searcher.ConditionLocation: mc.hostOption.Location,
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ return schedulers, nil
+}
diff --git a/client/config/dynconfig_test.go b/client/config/dynconfig_test.go
new file mode 100644
index 00000000000..af73ae94648
--- /dev/null
+++ b/client/config/dynconfig_test.go
@@ -0,0 +1,475 @@
+/*
+ * Copyright 2020 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+import (
+ "os"
+ "testing"
+ "time"
+
+ "github.com/golang/mock/gomock"
+ "github.com/pkg/errors"
+ "github.com/stretchr/testify/assert"
+
+ "d7y.io/dragonfly/v2/client/config/mocks"
+ "d7y.io/dragonfly/v2/pkg/rpc/manager"
+)
+
+func TestDynconfigNewDynconfig(t *testing.T) {
+ tests := []struct {
+ name string
+ expire time.Duration
+ hostOption HostOption
+ cleanFileCache func(t *testing.T)
+ mock func(m *mocks.MockClientMockRecorder)
+ expect func(t *testing.T, err error)
+ }{
+ {
+ name: "new dynconfig succeeded",
+ expire: 10 * time.Second,
+ hostOption: HostOption{
+ Hostname: "foo",
+ },
+ cleanFileCache: func(t *testing.T) {
+ if err := os.Remove(cachePath); err != nil {
+ t.Fatal(err)
+ }
+ },
+ mock: func(m *mocks.MockClientMockRecorder) {
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1)
+ },
+ expect: func(t *testing.T, err error) {
+ assert := assert.New(t)
+ assert.NoError(err)
+ },
+ },
+ {
+ name: "new dynconfig without empty host option",
+ expire: 10 * time.Millisecond,
+ hostOption: HostOption{},
+ cleanFileCache: func(t *testing.T) {
+ if err := os.Remove(cachePath); err != nil {
+ t.Fatal(err)
+ }
+ },
+ mock: func(m *mocks.MockClientMockRecorder) {
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1)
+ },
+ expect: func(t *testing.T, err error) {
+ assert := assert.New(t)
+ assert.NoError(err)
+ },
+ },
+ {
+ name: "new dynconfig with list scheduler error",
+ expire: 10 * time.Millisecond,
+ hostOption: HostOption{},
+ cleanFileCache: func(t *testing.T) {},
+ mock: func(m *mocks.MockClientMockRecorder) {
+ m.ListSchedulers(gomock.Any()).Return(nil, errors.New("foo")).Times(1)
+ },
+ expect: func(t *testing.T, err error) {
+ assert := assert.New(t)
+ assert.Errorf(err, "foo")
+ },
+ },
+ {
+ name: "new dynconfig without expire time",
+ expire: 0,
+ hostOption: HostOption{
+ Hostname: "foo",
+ },
+ cleanFileCache: func(t *testing.T) {},
+ mock: func(m *mocks.MockClientMockRecorder) {},
+ expect: func(t *testing.T, err error) {
+ assert := assert.New(t)
+ assert.Errorf(err, "missing parameter Expire, use method WithExpireTime to assign")
+ },
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ ctl := gomock.NewController(t)
+ defer ctl.Finish()
+
+ mockManagerClient := mocks.NewMockClient(ctl)
+ tc.mock(mockManagerClient.EXPECT())
+ _, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire)
+ tc.expect(t, err)
+ tc.cleanFileCache(t)
+ })
+ }
+}
+
+func TestDynconfigGet(t *testing.T) {
+ tests := []struct {
+ name string
+ expire time.Duration
+ hostOption HostOption
+ data *DynconfigData
+ sleep func()
+ cleanFileCache func(t *testing.T)
+ mock func(m *mocks.MockClientMockRecorder, data *DynconfigData)
+ expect func(t *testing.T, dynconfig Dynconfig, data *DynconfigData)
+ }{
+ {
+ name: "get dynconfig cache data succeeded",
+ expire: 10 * time.Second,
+ hostOption: HostOption{
+ Hostname: "foo",
+ },
+ data: &DynconfigData{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: "foo",
+ },
+ },
+ },
+ sleep: func() {},
+ cleanFileCache: func(t *testing.T) {
+ if err := os.Remove(cachePath); err != nil {
+ t.Fatal(err)
+ }
+ },
+ mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) {
+ gomock.InOrder(
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: data.Schedulers[0].HostName,
+ },
+ },
+ }, nil).Times(1),
+ )
+ },
+ expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) {
+ assert := assert.New(t)
+ result, err := dynconfig.Get()
+ assert.NoError(err)
+ assert.EqualValues(result, data)
+ },
+ },
+ {
+ name: "get dynconfig data succeeded",
+ expire: 10 * time.Millisecond,
+ hostOption: HostOption{
+ Hostname: "foo",
+ },
+ data: &DynconfigData{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: "foo",
+ },
+ },
+ },
+ sleep: func() {
+ time.Sleep(100 * time.Millisecond)
+ },
+ cleanFileCache: func(t *testing.T) {
+ if err := os.Remove(cachePath); err != nil {
+ t.Fatal(err)
+ }
+ },
+ mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) {
+ gomock.InOrder(
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1),
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: data.Schedulers[0].HostName,
+ },
+ },
+ }, nil).Times(1),
+ )
+ },
+ expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) {
+ assert := assert.New(t)
+ result, err := dynconfig.Get()
+ assert.NoError(err)
+ assert.EqualValues(result, data)
+ },
+ },
+ {
+ name: "list schedulers error",
+ expire: 10 * time.Millisecond,
+ hostOption: HostOption{
+ Hostname: "foo",
+ },
+ data: &DynconfigData{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: "foo",
+ },
+ },
+ },
+ sleep: func() {
+ time.Sleep(100 * time.Millisecond)
+ },
+ cleanFileCache: func(t *testing.T) {
+ if err := os.Remove(cachePath); err != nil {
+ t.Fatal(err)
+ }
+ },
+ mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) {
+ gomock.InOrder(
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: data.Schedulers[0].HostName,
+ },
+ },
+ }, nil).Times(1),
+ m.ListSchedulers(gomock.Any()).Return(nil, errors.New("foo")).Times(1),
+ )
+ },
+ expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) {
+ assert := assert.New(t)
+ result, err := dynconfig.Get()
+ assert.NoError(err)
+ assert.EqualValues(result, data)
+ },
+ },
+ {
+ name: "list schedulers empty",
+ expire: 10 * time.Millisecond,
+ hostOption: HostOption{
+ Hostname: "foo",
+ },
+ data: &DynconfigData{
+ Schedulers: []*manager.Scheduler(nil),
+ },
+ sleep: func() {
+ time.Sleep(100 * time.Millisecond)
+ },
+ cleanFileCache: func(t *testing.T) {
+ if err := os.Remove(cachePath); err != nil {
+ t.Fatal(err)
+ }
+ },
+ mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) {
+ gomock.InOrder(
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1),
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1),
+ )
+ },
+ expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) {
+ assert := assert.New(t)
+ result, err := dynconfig.Get()
+ assert.NoError(err)
+ assert.EqualValues(result, data)
+ },
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ ctl := gomock.NewController(t)
+ defer ctl.Finish()
+
+ mockManagerClient := mocks.NewMockClient(ctl)
+ tc.mock(mockManagerClient.EXPECT(), tc.data)
+ dynconfig, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ tc.sleep()
+ tc.expect(t, dynconfig, tc.data)
+ tc.cleanFileCache(t)
+ })
+ }
+}
+
+func TestDynconfigGetSchedulers(t *testing.T) {
+ tests := []struct {
+ name string
+ expire time.Duration
+ hostOption HostOption
+ data *DynconfigData
+ sleep func()
+ cleanFileCache func(t *testing.T)
+ mock func(m *mocks.MockClientMockRecorder, data *DynconfigData)
+ expect func(t *testing.T, dynconfig Dynconfig, data *DynconfigData)
+ }{
+ {
+ name: "get cache schedulers succeeded",
+ expire: 10 * time.Second,
+ hostOption: HostOption{
+ Hostname: "foo",
+ },
+ data: &DynconfigData{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: "foo",
+ },
+ },
+ },
+ sleep: func() {},
+ cleanFileCache: func(t *testing.T) {
+ if err := os.Remove(cachePath); err != nil {
+ t.Fatal(err)
+ }
+ },
+ mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) {
+ gomock.InOrder(
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: data.Schedulers[0].HostName,
+ },
+ },
+ }, nil).Times(1),
+ )
+ },
+ expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) {
+ assert := assert.New(t)
+ result, err := dynconfig.GetSchedulers()
+ assert.NoError(err)
+ assert.EqualValues(result, data.Schedulers)
+ },
+ },
+ {
+ name: "get schedulers succeeded",
+ expire: 10 * time.Millisecond,
+ hostOption: HostOption{
+ Hostname: "foo",
+ },
+ data: &DynconfigData{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: "foo",
+ },
+ },
+ },
+ sleep: func() {
+ time.Sleep(100 * time.Millisecond)
+ },
+ cleanFileCache: func(t *testing.T) {
+ if err := os.Remove(cachePath); err != nil {
+ t.Fatal(err)
+ }
+ },
+ mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) {
+ gomock.InOrder(
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1),
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: data.Schedulers[0].HostName,
+ },
+ },
+ }, nil).Times(1),
+ )
+ },
+ expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) {
+ assert := assert.New(t)
+ result, err := dynconfig.GetSchedulers()
+ assert.NoError(err)
+ assert.EqualValues(result, data.Schedulers)
+ },
+ },
+ {
+ name: "list schedulers error",
+ expire: 10 * time.Millisecond,
+ hostOption: HostOption{
+ Hostname: "foo",
+ },
+ data: &DynconfigData{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: "foo",
+ },
+ },
+ },
+ sleep: func() {
+ time.Sleep(100 * time.Millisecond)
+ },
+ cleanFileCache: func(t *testing.T) {
+ if err := os.Remove(cachePath); err != nil {
+ t.Fatal(err)
+ }
+ },
+ mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) {
+ gomock.InOrder(
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{
+ Schedulers: []*manager.Scheduler{
+ {
+ HostName: data.Schedulers[0].HostName,
+ },
+ },
+ }, nil).Times(1),
+ m.ListSchedulers(gomock.Any()).Return(nil, errors.New("foo")).Times(1),
+ )
+ },
+ expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) {
+ assert := assert.New(t)
+ result, err := dynconfig.GetSchedulers()
+ assert.NoError(err)
+ assert.EqualValues(result, data.Schedulers)
+ },
+ },
+ {
+ name: "list schedulers empty",
+ expire: 10 * time.Millisecond,
+ hostOption: HostOption{
+ Hostname: "foo",
+ },
+ data: &DynconfigData{
+ Schedulers: []*manager.Scheduler(nil),
+ },
+ sleep: func() {
+ time.Sleep(100 * time.Millisecond)
+ },
+ cleanFileCache: func(t *testing.T) {
+ if err := os.Remove(cachePath); err != nil {
+ t.Fatal(err)
+ }
+ },
+ mock: func(m *mocks.MockClientMockRecorder, data *DynconfigData) {
+ gomock.InOrder(
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1),
+ m.ListSchedulers(gomock.Any()).Return(&manager.ListSchedulersResponse{}, nil).Times(1),
+ )
+ },
+ expect: func(t *testing.T, dynconfig Dynconfig, data *DynconfigData) {
+ assert := assert.New(t)
+ result, err := dynconfig.GetSchedulers()
+ assert.NoError(err)
+ assert.EqualValues(result, data.Schedulers)
+ },
+ },
+ }
+
+ for _, tc := range tests {
+ t.Run(tc.name, func(t *testing.T) {
+ ctl := gomock.NewController(t)
+ defer ctl.Finish()
+
+ mockManagerClient := mocks.NewMockClient(ctl)
+ tc.mock(mockManagerClient.EXPECT(), tc.data)
+ dynconfig, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ tc.sleep()
+ tc.expect(t, dynconfig, tc.data)
+ tc.cleanFileCache(t)
+ })
+ }
+}
diff --git a/client/config/mocks/manager_client_mock.go b/client/config/mocks/manager_client_mock.go
new file mode 100644
index 00000000000..45f5d10ec6e
--- /dev/null
+++ b/client/config/mocks/manager_client_mock.go
@@ -0,0 +1,122 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: d7y.io/dragonfly/v2/pkg/rpc/manager/client (interfaces: Client)
+
+// Package mocks is a generated GoMock package.
+package mocks
+
+import (
+ reflect "reflect"
+ time "time"
+
+ manager "d7y.io/dragonfly/v2/pkg/rpc/manager"
+ gomock "github.com/golang/mock/gomock"
+)
+
+// MockClient is a mock of Client interface.
+type MockClient struct {
+ ctrl *gomock.Controller
+ recorder *MockClientMockRecorder
+}
+
+// MockClientMockRecorder is the mock recorder for MockClient.
+type MockClientMockRecorder struct {
+ mock *MockClient
+}
+
+// NewMockClient creates a new mock instance.
+func NewMockClient(ctrl *gomock.Controller) *MockClient {
+ mock := &MockClient{ctrl: ctrl}
+ mock.recorder = &MockClientMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockClient) EXPECT() *MockClientMockRecorder {
+ return m.recorder
+}
+
+// Close mocks base method.
+func (m *MockClient) Close() error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Close")
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// Close indicates an expected call of Close.
+func (mr *MockClientMockRecorder) Close() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockClient)(nil).Close))
+}
+
+// GetScheduler mocks base method.
+func (m *MockClient) GetScheduler(arg0 *manager.GetSchedulerRequest) (*manager.Scheduler, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "GetScheduler", arg0)
+ ret0, _ := ret[0].(*manager.Scheduler)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// GetScheduler indicates an expected call of GetScheduler.
+func (mr *MockClientMockRecorder) GetScheduler(arg0 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetScheduler", reflect.TypeOf((*MockClient)(nil).GetScheduler), arg0)
+}
+
+// KeepAlive mocks base method.
+func (m *MockClient) KeepAlive(arg0 time.Duration, arg1 *manager.KeepAliveRequest) {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "KeepAlive", arg0, arg1)
+}
+
+// KeepAlive indicates an expected call of KeepAlive.
+func (mr *MockClientMockRecorder) KeepAlive(arg0, arg1 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "KeepAlive", reflect.TypeOf((*MockClient)(nil).KeepAlive), arg0, arg1)
+}
+
+// ListSchedulers mocks base method.
+func (m *MockClient) ListSchedulers(arg0 *manager.ListSchedulersRequest) (*manager.ListSchedulersResponse, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "ListSchedulers", arg0)
+ ret0, _ := ret[0].(*manager.ListSchedulersResponse)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// ListSchedulers indicates an expected call of ListSchedulers.
+func (mr *MockClientMockRecorder) ListSchedulers(arg0 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListSchedulers", reflect.TypeOf((*MockClient)(nil).ListSchedulers), arg0)
+}
+
+// UpdateCDN mocks base method.
+func (m *MockClient) UpdateCDN(arg0 *manager.UpdateCDNRequest) (*manager.CDN, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "UpdateCDN", arg0)
+ ret0, _ := ret[0].(*manager.CDN)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// UpdateCDN indicates an expected call of UpdateCDN.
+func (mr *MockClientMockRecorder) UpdateCDN(arg0 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateCDN", reflect.TypeOf((*MockClient)(nil).UpdateCDN), arg0)
+}
+
+// UpdateScheduler mocks base method.
+func (m *MockClient) UpdateScheduler(arg0 *manager.UpdateSchedulerRequest) (*manager.Scheduler, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "UpdateScheduler", arg0)
+ ret0, _ := ret[0].(*manager.Scheduler)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// UpdateScheduler indicates an expected call of UpdateScheduler.
+func (mr *MockClientMockRecorder) UpdateScheduler(arg0 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateScheduler", reflect.TypeOf((*MockClient)(nil).UpdateScheduler), arg0)
+}
diff --git a/client/config/peerhost.go b/client/config/peerhost.go
index 39a906b3f5a..8d3c599d02b 100644
--- a/client/config/peerhost.go
+++ b/client/config/peerhost.go
@@ -100,6 +100,11 @@ func (p *DaemonOption) Convert() error {
p.Host.AdvertiseIP = ip.String()
}
+ // ScheduleTimeout should not be greater than AliveTime
+ if p.AliveTime.Duration > 0 && p.Scheduler.ScheduleTimeout.Duration > p.AliveTime.Duration {
+ p.Scheduler.ScheduleTimeout.Duration = p.AliveTime.Duration - time.Second
+ }
+
return nil
}
@@ -107,24 +112,40 @@ func (p *DaemonOption) Validate() error {
if len(p.Scheduler.NetAddrs) == 0 && stringutils.IsBlank(p.ConfigServer) {
return errors.New("empty schedulers and config server is not specified")
}
- // ScheduleTimeout should not great then AliveTime
- if p.AliveTime.Duration > 0 && p.Scheduler.ScheduleTimeout.Duration > p.AliveTime.Duration {
- p.Scheduler.ScheduleTimeout.Duration = p.AliveTime.Duration - time.Second
+
+ if p.Scheduler.Manager.Enable {
+ if p.Scheduler.Manager.Addr == "" {
+ return errors.New("manager addr is not specified")
+ }
+
+ if p.Scheduler.Manager.RefreshInterval == 0 {
+ return errors.New("manager refreshInterval is not specified")
+ }
}
+
return nil
}
type SchedulerOption struct {
+ // Manager is used to get the scheduler configuration remotely
+ Manager ManagerOption `mapstructure:"manager" yaml:"manager"`
// NetAddrs is scheduler addresses.
NetAddrs []dfnet.NetAddr `mapstructure:"netAddrs" yaml:"netAddrs"`
-
// ScheduleTimeout is request timeout.
ScheduleTimeout clientutil.Duration `mapstructure:"scheduleTimeout" yaml:"scheduleTimeout"`
-
// DisableAutoBackSource indicates not back source normally, only scheduler says back source
DisableAutoBackSource bool `mapstructure:"disableAutoBackSource" yaml:"disableAutoBackSource"`
}
+type ManagerOption struct {
+ // Enable indicates whether to get configuration from manager
+ Enable bool `mapstructure:"enable" yaml:"enable"`
+ // Addr is the manager address
+ Addr string `mapstructure:"addr" yaml:"addr"`
+ // RefreshInterval is the refresh interval
+ RefreshInterval time.Duration `mapstructure:"refreshInterval" yaml:"refreshInterval"`
+}
+
type HostOption struct {
// SecurityDomain is the security domain
SecurityDomain string `mapstructure:"securityDomain" yaml:"securityDomain"`
@@ -134,6 +155,8 @@ type HostOption struct {
IDC string `mapstructure:"idc" yaml:"idc"`
// Peerhost net topology for scheduler
NetTopology string `mapstructure:"netTopology" yaml:"netTopology"`
+ // Hostname is daemon host name
+ Hostname string `mapstructure:"hostname" yaml:"hostname"`
// The listen ip for all tcp services of daemon
ListenIP string `mapstructure:"listenIP" yaml:"listenIP"`
// The ip report to scheduler, normal same with listen ip
diff --git a/client/config/peerhost_darwin.go b/client/config/peerhost_darwin.go
index e0d7713993f..58029cb20cc 100644
--- a/client/config/peerhost_darwin.go
+++ b/client/config/peerhost_darwin.go
@@ -44,6 +44,10 @@ var peerHostConfig = DaemonOption{
GCInterval: clientutil.Duration{Duration: DefaultGCInterval},
KeepStorage: false,
Scheduler: SchedulerOption{
+ Manager: ManagerOption{
+ Enable: false,
+ RefreshInterval: 5 * time.Minute,
+ },
NetAddrs: []dfnet.NetAddr{
{
Type: dfnet.TCP,
@@ -53,6 +57,7 @@ var peerHostConfig = DaemonOption{
ScheduleTimeout: clientutil.Duration{Duration: DefaultScheduleTimeout},
},
Host: HostOption{
+ Hostname: iputils.HostName,
ListenIP: net.IPv4zero.String(),
AdvertiseIP: iputils.HostIP,
SecurityDomain: "",
diff --git a/client/config/peerhost_linux.go b/client/config/peerhost_linux.go
index 6109f63d15e..be861e218fe 100644
--- a/client/config/peerhost_linux.go
+++ b/client/config/peerhost_linux.go
@@ -44,6 +44,10 @@ var peerHostConfig = DaemonOption{
GCInterval: clientutil.Duration{Duration: DefaultGCInterval},
KeepStorage: false,
Scheduler: SchedulerOption{
+ Manager: ManagerOption{
+ Enable: false,
+ RefreshInterval: 5 * time.Minute,
+ },
NetAddrs: []dfnet.NetAddr{
{
Type: dfnet.TCP,
@@ -53,6 +57,7 @@ var peerHostConfig = DaemonOption{
ScheduleTimeout: clientutil.Duration{Duration: DefaultScheduleTimeout},
},
Host: HostOption{
+ Hostname: iputils.HostName,
ListenIP: "0.0.0.0",
AdvertiseIP: iputils.HostIP,
SecurityDomain: "",
diff --git a/client/config/peerhost_test.go b/client/config/peerhost_test.go
index 4f36df613b5..a452befd7b6 100644
--- a/client/config/peerhost_test.go
+++ b/client/config/peerhost_test.go
@@ -228,6 +228,11 @@ func TestPeerHostOption_Load(t *testing.T) {
WorkHome: "/tmp/dragonfly/dfdaemon/",
KeepStorage: false,
Scheduler: SchedulerOption{
+ Manager: ManagerOption{
+ Enable: false,
+ Addr: "127.0.0.1:65003",
+ RefreshInterval: 5 * time.Minute,
+ },
NetAddrs: []dfnet.NetAddr{
{
Type: dfnet.TCP,
@@ -239,6 +244,7 @@ func TestPeerHostOption_Load(t *testing.T) {
},
},
Host: HostOption{
+ Hostname: "d7y.io",
SecurityDomain: "d7y.io",
Location: "0.0.0.0",
IDC: "d7y",
diff --git a/client/config/testdata/config/daemon.yaml b/client/config/testdata/config/daemon.yaml
index 49a2a20ed1a..0bf23bf494a 100644
--- a/client/config/testdata/config/daemon.yaml
+++ b/client/config/testdata/config/daemon.yaml
@@ -4,12 +4,17 @@ dataDir: /tmp/dragonfly/dfdaemon/
workHome: /tmp/dragonfly/dfdaemon/
keepStorage: false
scheduler:
+ manager:
+ enable: false
+ addr: "127.0.0.1:65003"
+ refreshInterval: 5m
netAddrs:
- type: tcp
addr: 127.0.0.1:8002
scheduleTimeout: 0
host:
+ hostname: d7y.io
listenIP: 0.0.0.0
advertiseIP: 0.0.0.0
location: 0.0.0.0
diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go
index 7bd4327fa31..104a9a44db9 100644
--- a/client/daemon/daemon.go
+++ b/client/daemon/daemon.go
@@ -25,6 +25,7 @@ import (
"net"
"net/http"
"os"
+ "reflect"
"runtime"
"sync"
"time"
@@ -49,9 +50,10 @@ import (
"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc"
+ "d7y.io/dragonfly/v2/pkg/rpc/manager"
+ managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
- "d7y.io/dragonfly/v2/pkg/util/net/iputils"
)
type Daemon interface {
@@ -80,9 +82,11 @@ type clientDaemon struct {
PeerTaskManager peer.TaskManager
PieceManager peer.PieceManager
-}
-var _ Daemon = (*clientDaemon)(nil)
+ dynconfig config.Dynconfig
+ schedulerAddrs []dfnet.NetAddr
+ schedulerClient schedulerclient.SchedulerClient
+}
func New(opt *config.DaemonOption) (Daemon, error) {
host := &scheduler.PeerHost{
@@ -90,18 +94,46 @@ func New(opt *config.DaemonOption) (Daemon, error) {
Ip: opt.Host.AdvertiseIP,
RpcPort: int32(opt.Download.PeerGRPC.TCPListen.PortRange.Start),
DownPort: 0,
- HostName: iputils.HostName,
+ HostName: opt.Host.Hostname,
SecurityDomain: opt.Host.SecurityDomain,
Location: opt.Host.Location,
Idc: opt.Host.IDC,
NetTopology: opt.Host.NetTopology,
}
+ var addrs []dfnet.NetAddr
+ var dynconfig config.Dynconfig
+ if opt.Scheduler.Manager.Enable == true {
+ // New manager client
+ managerClient, err := managerclient.New(opt.Scheduler.Manager.Addr)
+ if err != nil {
+ return nil, err
+ }
+
+ // New dynconfig client
+ if dynconfig, err = config.NewDynconfig(
+ config.NewManagerClient(managerClient, opt.Host),
+ opt.Scheduler.Manager.RefreshInterval,
+ ); err != nil {
+ return nil, err
+ }
+
+ // Get schedulers from manager
+ schedulers, err := dynconfig.GetSchedulers()
+ if err != nil {
+ return nil, err
+ }
+
+ addrs = schedulersToNetAddrs(schedulers)
+ } else {
+ addrs = opt.Scheduler.NetAddrs
+ }
+
var opts []grpc.DialOption
if opt.Options.Telemetry.Jaeger != "" {
opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()))
}
- sched, err := schedulerclient.GetClientByAddr(opt.Scheduler.NetAddrs, opts...)
+ sched, err := schedulerclient.GetClientByAddr(addrs, opts...)
if err != nil {
return nil, errors.Wrap(err, "failed to get schedulers")
}
@@ -186,6 +218,9 @@ func New(opt *config.DaemonOption) (Daemon, error) {
UploadManager: uploadManager,
StorageManager: storageManager,
GCManager: gc.NewManager(opt.GCInterval.Duration),
+ dynconfig: dynconfig,
+ schedulerAddrs: addrs,
+ schedulerClient: sched,
}, nil
}
@@ -431,6 +466,22 @@ func (cd *clientDaemon) Serve() error {
})
}
+ // serve dynconfig service
+ if cd.dynconfig != nil {
+ // register the client daemon as a dynconfig observer
+ cd.dynconfig.Register(cd)
+
+ // serve dynconfig
+ g.Go(func() error {
+ if err := cd.dynconfig.Serve(); err != nil {
+ logger.Errorf("dynconfig start failed %v", err)
+ return err
+ }
+ logger.Info("dynconfig start successfully")
+ return nil
+ })
+ }
+
werr := g.Wait()
cd.Stop()
return werr
@@ -455,9 +506,40 @@ func (cd *clientDaemon) Stop() {
logger.Infof("keep storage disabled")
cd.StorageManager.CleanUp()
}
+
+ if cd.dynconfig != nil {
+ if err := cd.dynconfig.Stop(); err != nil {
+ logger.Errorf("dynconfig client closed failed %s", err)
+ }
+ logger.Info("dynconfig client closed")
+ }
})
}
+func (cd *clientDaemon) OnNotify(data *config.DynconfigData) {
+ addrs := schedulersToNetAddrs(data.Schedulers)
+ if reflect.DeepEqual(cd.schedulerAddrs, addrs) {
+ return
+ }
+
+ // Update scheduler client addresses
+ cd.schedulerClient.UpdateState(addrs)
+ cd.schedulerAddrs = addrs
+}
+
+ // schedulersToNetAddrs converts []*manager.Scheduler to []dfnet.NetAddr.
+func schedulersToNetAddrs(schedulers []*manager.Scheduler) []dfnet.NetAddr {
+ netAddrs := make([]dfnet.NetAddr, 0, len(schedulers))
+ for _, scheduler := range schedulers {
+ netAddrs = append(netAddrs, dfnet.NetAddr{
+ Type: dfnet.TCP,
+ Addr: fmt.Sprintf("%s:%d", scheduler.HostName, scheduler.Port),
+ })
+ }
+
+ return netAddrs
+}
+
func (cd *clientDaemon) ExportTaskManager() peer.TaskManager {
return cd.PeerTaskManager
}
diff --git a/client/daemon/peer/peertask_dummy.go b/client/daemon/peer/peertask_dummy.go
index 9504fa2f8dc..ef19144a74c 100644
--- a/client/daemon/peer/peertask_dummy.go
+++ b/client/daemon/peer/peertask_dummy.go
@@ -23,6 +23,7 @@ import (
"d7y.io/dragonfly/v2/internal/dfcodes"
"d7y.io/dragonfly/v2/internal/dferrors"
+ "d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
)
@@ -50,6 +51,9 @@ func (d *dummySchedulerClient) Close() error {
return nil
}
+func (d *dummySchedulerClient) UpdateState(addrs []dfnet.NetAddr) {
+}
+
type dummyPeerPacketStream struct {
}
diff --git a/client/daemon/test/mock/scheduler/scheduler_client.go b/client/daemon/test/mock/scheduler/scheduler_client.go
index 703d50aecd1..87fb5403482 100644
--- a/client/daemon/test/mock/scheduler/scheduler_client.go
+++ b/client/daemon/test/mock/scheduler/scheduler_client.go
@@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
-// Source: ../../../../../pkg/rpc/scheduler/client/client.go
+// Source: d7y.io/dragonfly/v2/pkg/rpc/scheduler/client (interfaces: SchedulerClient)
// Package mock_client is a generated GoMock package.
package mock_client
@@ -8,6 +8,7 @@ import (
context "context"
reflect "reflect"
+ dfnet "d7y.io/dragonfly/v2/pkg/basic/dfnet"
scheduler "d7y.io/dragonfly/v2/pkg/rpc/scheduler"
client "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
gomock "github.com/golang/mock/gomock"
@@ -52,10 +53,10 @@ func (mr *MockSchedulerClientMockRecorder) Close() *gomock.Call {
}
// LeaveTask mocks base method.
-func (m *MockSchedulerClient) LeaveTask(ctx context.Context, pt *scheduler.PeerTarget, opts ...grpc.CallOption) error {
+func (m *MockSchedulerClient) LeaveTask(arg0 context.Context, arg1 *scheduler.PeerTarget, arg2 ...grpc.CallOption) error {
m.ctrl.T.Helper()
- varargs := []interface{}{ctx, pt}
- for _, a := range opts {
+ varargs := []interface{}{arg0, arg1}
+ for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "LeaveTask", varargs...)
@@ -64,17 +65,17 @@ func (m *MockSchedulerClient) LeaveTask(ctx context.Context, pt *scheduler.PeerT
}
// LeaveTask indicates an expected call of LeaveTask.
-func (mr *MockSchedulerClientMockRecorder) LeaveTask(ctx, pt interface{}, opts ...interface{}) *gomock.Call {
+func (mr *MockSchedulerClientMockRecorder) LeaveTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- varargs := append([]interface{}{ctx, pt}, opts...)
+ varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LeaveTask", reflect.TypeOf((*MockSchedulerClient)(nil).LeaveTask), varargs...)
}
// RegisterPeerTask mocks base method.
-func (m *MockSchedulerClient) RegisterPeerTask(ctx context.Context, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (*scheduler.RegisterResult, error) {
+func (m *MockSchedulerClient) RegisterPeerTask(arg0 context.Context, arg1 *scheduler.PeerTaskRequest, arg2 ...grpc.CallOption) (*scheduler.RegisterResult, error) {
m.ctrl.T.Helper()
- varargs := []interface{}{ctx, ptr}
- for _, a := range opts {
+ varargs := []interface{}{arg0, arg1}
+ for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "RegisterPeerTask", varargs...)
@@ -84,17 +85,17 @@ func (m *MockSchedulerClient) RegisterPeerTask(ctx context.Context, ptr *schedul
}
// RegisterPeerTask indicates an expected call of RegisterPeerTask.
-func (mr *MockSchedulerClientMockRecorder) RegisterPeerTask(ctx, ptr interface{}, opts ...interface{}) *gomock.Call {
+func (mr *MockSchedulerClientMockRecorder) RegisterPeerTask(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- varargs := append([]interface{}{ctx, ptr}, opts...)
+ varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterPeerTask", reflect.TypeOf((*MockSchedulerClient)(nil).RegisterPeerTask), varargs...)
}
// ReportPeerResult mocks base method.
-func (m *MockSchedulerClient) ReportPeerResult(ctx context.Context, pr *scheduler.PeerResult, opts ...grpc.CallOption) error {
+func (m *MockSchedulerClient) ReportPeerResult(arg0 context.Context, arg1 *scheduler.PeerResult, arg2 ...grpc.CallOption) error {
m.ctrl.T.Helper()
- varargs := []interface{}{ctx, pr}
- for _, a := range opts {
+ varargs := []interface{}{arg0, arg1}
+ for _, a := range arg2 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "ReportPeerResult", varargs...)
@@ -103,17 +104,17 @@ func (m *MockSchedulerClient) ReportPeerResult(ctx context.Context, pr *schedule
}
// ReportPeerResult indicates an expected call of ReportPeerResult.
-func (mr *MockSchedulerClientMockRecorder) ReportPeerResult(ctx, pr interface{}, opts ...interface{}) *gomock.Call {
+func (mr *MockSchedulerClientMockRecorder) ReportPeerResult(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- varargs := append([]interface{}{ctx, pr}, opts...)
+ varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportPeerResult", reflect.TypeOf((*MockSchedulerClient)(nil).ReportPeerResult), varargs...)
}
// ReportPieceResult mocks base method.
-func (m *MockSchedulerClient) ReportPieceResult(ctx context.Context, taskId string, ptr *scheduler.PeerTaskRequest, opts ...grpc.CallOption) (client.PeerPacketStream, error) {
+func (m *MockSchedulerClient) ReportPieceResult(arg0 context.Context, arg1 string, arg2 *scheduler.PeerTaskRequest, arg3 ...grpc.CallOption) (client.PeerPacketStream, error) {
m.ctrl.T.Helper()
- varargs := []interface{}{ctx, taskId, ptr}
- for _, a := range opts {
+ varargs := []interface{}{arg0, arg1, arg2}
+ for _, a := range arg3 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "ReportPieceResult", varargs...)
@@ -123,8 +124,20 @@ func (m *MockSchedulerClient) ReportPieceResult(ctx context.Context, taskId stri
}
// ReportPieceResult indicates an expected call of ReportPieceResult.
-func (mr *MockSchedulerClientMockRecorder) ReportPieceResult(ctx, taskId, ptr interface{}, opts ...interface{}) *gomock.Call {
+func (mr *MockSchedulerClientMockRecorder) ReportPieceResult(arg0, arg1, arg2 interface{}, arg3 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
- varargs := append([]interface{}{ctx, taskId, ptr}, opts...)
+ varargs := append([]interface{}{arg0, arg1, arg2}, arg3...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportPieceResult", reflect.TypeOf((*MockSchedulerClient)(nil).ReportPieceResult), varargs...)
}
+
+// UpdateState mocks base method.
+func (m *MockSchedulerClient) UpdateState(arg0 []dfnet.NetAddr) {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "UpdateState", arg0)
+}
+
+// UpdateState indicates an expected call of UpdateState.
+func (mr *MockSchedulerClientMockRecorder) UpdateState(arg0 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateState", reflect.TypeOf((*MockSchedulerClient)(nil).UpdateState), arg0)
+}
diff --git a/deploy/helm-charts b/deploy/helm-charts
index e3d9e72f64b..30fa43099a8 160000
--- a/deploy/helm-charts
+++ b/deploy/helm-charts
@@ -1 +1 @@
-Subproject commit e3d9e72f64bb2328edeb574872d873d1336349b1
+Subproject commit 30fa43099a8568aa4d4ed2398c3175ed8f014985
diff --git a/docs/en/api-reference/api-reference.md b/docs/en/api-reference/api-reference.md
index 6fd4145c2ef..d13240f52e6 100644
--- a/docs/en/api-reference/api-reference.md
+++ b/docs/en/api-reference/api-reference.md
@@ -3690,6 +3690,7 @@ delete role by uri config
|---|---|
|**bio**
*optional*|string|
|**config**
*required*|[types.CDNClusterConfig](#types-cdnclusterconfig)|
+|**is_default**
*optional*|boolean|
|**name**
*required*|string|
@@ -3902,6 +3903,7 @@ delete role by uri config
|---|---|
|**bio**
*optional*|string|
|**config**
*optional*|[types.CDNClusterConfig](#types-cdnclusterconfig)|
+|**is_default**
*optional*|boolean|
|**name**
*optional*|string|
diff --git a/docs/en/deployment/configuration/dfget.yaml b/docs/en/deployment/configuration/dfget.yaml
index ce557b01259..e2a4b193f72 100644
--- a/docs/en/deployment/configuration/dfget.yaml
+++ b/docs/en/deployment/configuration/dfget.yaml
@@ -25,6 +25,13 @@ keepStorage: true
# daemon will send tasks to a fixed scheduler by hashing the task url and meta data
# caution: only tcp is supported
scheduler:
+ manager:
+ # get scheduler list dynamically from manager
+ enable: false
+ # manager service address
+ addr: 127.0.0.1:65003
+ # scheduler list refresh interval
+ refreshInterval: 5m
# schedule timeout
scheduleTimeout: 30s
# when true, only scheduler says back source, daemon can back source
@@ -49,11 +56,16 @@ host:
# access ip for other peers
# when local ip is different with access ip, advertiseIP should be set
advertiseIP: 0.0.0.0
- # geographical location and network topology
+ # geographical location, separated by "|" characters
location: ""
+ # idc deployed by daemon
idc: ""
+ # security domain deployed by daemon, network isolation between different security domains
securityDomain: ""
+ # network topology, separated by "|" characters
netTopology: ""
+ # daemon hostname
+ # hostname: ""
# download service option
download:
diff --git a/docs/en/deployment/configuration/manager.yaml b/docs/en/deployment/configuration/manager.yaml
index 1235fdcbed5..136fac28e70 100644
--- a/docs/en/deployment/configuration/manager.yaml
+++ b/docs/en/deployment/configuration/manager.yaml
@@ -34,16 +34,15 @@ database:
host: dragonfly
port: 6379
db: 0
-
# manager server cache
-cache:
- # redis cache configure
- redis:
- # cache ttl configure unit[nanosecond]
- ttl: 30000000000
- # local cache configure
- local:
- # lfu cache size
- size: 10000
- # cache ttl configure unit[nanosecond]
- ttl: 30000000000
+# cache:
+# # redis cache configure
+# redis:
+# # cache ttl configure
+# ttl: 30s
+# # local cache configure
+# local:
+# # lfu cache size
+# size: 10000
+# # cache ttl configure
+# ttl: 30s
diff --git a/docs/zh-CN/api-reference/api-reference.md b/docs/zh-CN/api-reference/api-reference.md
index e4f9b4aed14..1a9fa135d69 100644
--- a/docs/zh-CN/api-reference/api-reference.md
+++ b/docs/zh-CN/api-reference/api-reference.md
@@ -3690,6 +3690,7 @@ delete role by uri config
|---|---|
|**bio**
*可选*|string|
|**config**
*必填*|[types.CDNClusterConfig](#types-cdnclusterconfig)|
+|**is_default**
*可选*|boolean|
|**name**
*必填*|string|
@@ -3902,6 +3903,7 @@ delete role by uri config
|---|---|
|**bio**
*可选*|string|
|**config**
*可选*|[types.CDNClusterConfig](#types-cdnclusterconfig)|
+|**is_default**
*可选*|boolean|
|**name**
*可选*|string|
diff --git a/docs/zh-CN/deployment/configuration/dfget.yaml b/docs/zh-CN/deployment/configuration/dfget.yaml
index 203e40195fb..da2c115a9e2 100644
--- a/docs/zh-CN/deployment/configuration/dfget.yaml
+++ b/docs/zh-CN/deployment/configuration/dfget.yaml
@@ -24,6 +24,13 @@ keepStorage: true
# 尽量使用同一个地区的调度器.
# daemon 将会根据 task id 来进行一致性 hash 来选择所有配置的调度器
scheduler:
+ manager:
+ # 通过 manager 接口动态获取 scheduler 列表
+ enable: false
+ # manager 服务地址
+ addr: 127.0.0.1:65003
+ # scheduler 列表刷新时间
+ refreshInterval: 5m
# 调度超时
scheduleTimeout: 30s
# 是否禁用回源,禁用回源后,在调度失败时不在 daemon 回源,直接返错
@@ -47,11 +54,16 @@ host:
# 访问 IP 地址
# 其他 daemon 可以通过这个 IP 地址连接过来
advertiseIP: 0.0.0.0
- # 地理信息和网络地址
+ # 地理信息, 通过 "|" 符号分隔
location: ""
+ # 机房信息
idc: ""
+ # 安全域信息,不同安全域之间网络隔离
securityDomain: ""
+ # 网络拓扑结构,通过 "|" 符号分隔
netTopology: ""
+ # 主机名称
+ # hostname: ""
# 下载服务选项
download:
@@ -204,4 +216,4 @@ proxy:
# 端口白名单
ports:
# - 80
- # - 443
\ No newline at end of file
+ # - 443
diff --git a/docs/zh-CN/deployment/configuration/manager.yaml b/docs/zh-CN/deployment/configuration/manager.yaml
index 001b8fe75d8..96afa9ca096 100644
--- a/docs/zh-CN/deployment/configuration/manager.yaml
+++ b/docs/zh-CN/deployment/configuration/manager.yaml
@@ -1,47 +1,46 @@
# 此文件是 manager 的配置文件模板,你可以通过根据需要改变对应的值来配置 manager 服务。
---
-# 当前的服务配置
+# 当前的服务配置
server:
# grpc 服务配置
grpc:
# 监听的 ip 地址
listen: 127.0.0.1
- # 监听的端口, manager 会从 start 到 end 之间的按顺序中选择一个可用端口
+ # 监听的端口, manager 会从 start 到 end 之间的按顺序中选择一个可用端口
port:
start: 65003
end: 65003
- # rest 服务配置
+ # rest 服务配置
rest:
- # 标准的 rest 服务地址: ip:port, ip 不配置则默认为0.0.0.0
+ # 标准的 rest 服务地址: ip:port, ip 不配置则默认为0.0.0.0
addr: :8080
# 前端控制台资源路径
# publicPath: /dist
-# 数据库配置, 当前只支持 mysql 以及 redis
+# 数据库配置, 当前只支持 mysql 以及 redis
database:
- # mysql 配置
+ # mysql 配置
mysql:
user: dragonfly
password: dragonfly
host: dragonfly
port: 3306
dbname: manager
- # redis 配置
+ # redis 配置
redis:
password: dragonfly
host: dragonfly
port: 6379
db: 0
-
-# 缓存配置
-cache:
- # redis 缓存配置
- redis:
- # ttl 配置,单位[纳秒]
- ttl: 30000000000
- # 本地缓存配置
- local:
- # LFU 缓存大小
- size: 10000
- # ttl 配置,单位[纳秒]
- ttl: 3000000000
+# 缓存配置
+# cache:
+# # redis 缓存配置
+# redis:
+# # ttl 配置
+# ttl: 30s
+# # 本地缓存配置
+# local:
+# # LFU 缓存大小
+# size: 10000
+# # ttl 配置
+# ttl: 30s
diff --git a/internal/dynconfig/dynconfig_manager.go b/internal/dynconfig/dynconfig_manager.go
index 00b611bc55b..f682a620991 100644
--- a/internal/dynconfig/dynconfig_manager.go
+++ b/internal/dynconfig/dynconfig_manager.go
@@ -60,7 +60,7 @@ func (d *dynconfigManager) get() (interface{}, error) {
// Cache has expired
// Reload and ignore client request error
if err := d.load(); err != nil {
- logger.Warn("reload failed", err)
+ logger.Warn("reload failed ", err)
}
dynconfig, ok := d.cache.Get(defaultCacheKey)
diff --git a/manager/cache/cache.go b/manager/cache/cache.go
index 3f1023bb2c4..8e8a8a8264d 100644
--- a/manager/cache/cache.go
+++ b/manager/cache/cache.go
@@ -72,6 +72,6 @@ func MakeSchedulerCacheKey(hostname string, clusterID uint) string {
return MakeCacheKey(SchedulerNamespace, fmt.Sprintf("%s-%d", hostname, clusterID))
}
-func MakeSchedulersCacheKey(hostname string) string {
- return MakeCacheKey(SchedulersNamespace, hostname)
+func MakeSchedulersCacheKey(hostname, ip string) string {
+ return MakeCacheKey(SchedulersNamespace, fmt.Sprintf("%s-%s", hostname, ip))
}
diff --git a/manager/job/preheat.go b/manager/job/preheat.go
index 864019466a3..cee9fb3a048 100644
--- a/manager/job/preheat.go
+++ b/manager/job/preheat.go
@@ -160,7 +160,7 @@ func (p *preheat) createGroupJob(ctx context.Context, files []*internaljob.Prehe
return nil, err
}
- logger.Infof("create preheat group job successed, group uuid: %s, urls:%s", group.GroupUUID, urls)
+ logger.Infof("create preheat group job succeeded, group uuid: %s, urls: %s", group.GroupUUID, urls)
return &internaljob.GroupJobState{
GroupUUID: group.GroupUUID,
State: machineryv1tasks.StatePending,
diff --git a/manager/searcher/searcher.go b/manager/searcher/searcher.go
index 4c5b5a1422a..5d7b4e9abf1 100644
--- a/manager/searcher/searcher.go
+++ b/manager/searcher/searcher.go
@@ -28,17 +28,17 @@ import (
)
const (
- // Condition IDC key
- conditionIDC = "idc"
+ // Condition security domain key
+ ConditionSecurityDomain = "security_domain"
- // Condition location key
- conditionLocation = "location"
+ // Condition IDC key
+ ConditionIDC = "idc"
// Condition netTopology key
- conditionNetTopology = "net_topology"
+ ConditionNetTopology = "net_topology"
- // Condition security domain key
- conditionSecurityDomain = "security_domain"
+ // Condition location key
+ ConditionLocation = "location"
)
const (
@@ -96,7 +96,7 @@ func (s *searcher) FindSchedulerCluster(schedulerClusters []model.SchedulerClust
// If the security domain condition does not exist, it will match all scheduler security domains.
// Then use clusters sets to score according to scopes.
var clusters []model.SchedulerCluster
- securityDomain := conditions[conditionSecurityDomain]
+ securityDomain := conditions[ConditionSecurityDomain]
if securityDomain == "" {
logger.Infof("client %s %s have empty security domain", client.HostName, client.Ip)
}
@@ -146,9 +146,9 @@ func (s *searcher) FindSchedulerCluster(schedulerClusters []model.SchedulerClust
// Evaluate the degree of matching between scheduler cluster and dfdaemon
func evaluate(conditions map[string]string, scopes Scopes) float64 {
- return idcAffinityWeight*calculateIDCAffinityScore(conditions[conditionIDC], scopes.IDC) +
- locationAffinityWeight*calculateMultiElementAffinityScore(conditions[conditionLocation], scopes.Location) +
- netTopologyAffinityWeight*calculateMultiElementAffinityScore(conditions[conditionNetTopology], scopes.NetTopology)
+ return idcAffinityWeight*calculateIDCAffinityScore(conditions[ConditionIDC], scopes.IDC) +
+ locationAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionLocation], scopes.Location) +
+ netTopologyAffinityWeight*calculateMultiElementAffinityScore(conditions[ConditionNetTopology], scopes.NetTopology)
}
// calculateIDCAffinityScore 0.0~1.0 larger and better
diff --git a/manager/service/cdn_cluster.go b/manager/service/cdn_cluster.go
index 596475e001b..00e3438947f 100644
--- a/manager/service/cdn_cluster.go
+++ b/manager/service/cdn_cluster.go
@@ -31,9 +31,10 @@ func (s *rest) CreateCDNCluster(ctx context.Context, json types.CreateCDNCluster
}
cdnCluster := model.CDNCluster{
- Name: json.Name,
- BIO: json.BIO,
- Config: config,
+ Name: json.Name,
+ BIO: json.BIO,
+ Config: config,
+ IsDefault: json.IsDefault,
}
if err := s.db.WithContext(ctx).Create(&cdnCluster).Error; err != nil {
@@ -64,9 +65,10 @@ func (s *rest) UpdateCDNCluster(ctx context.Context, id uint, json types.UpdateC
cdnCluster := model.CDNCluster{}
if err := s.db.WithContext(ctx).First(&cdnCluster, id).Updates(model.CDNCluster{
- Name: json.Name,
- BIO: json.BIO,
- Config: config,
+ Name: json.Name,
+ BIO: json.BIO,
+ Config: config,
+ IsDefault: json.IsDefault,
}).Error; err != nil {
return nil, err
}
diff --git a/manager/service/service_grpc.go b/manager/service/service_grpc.go
index b295f0f4b16..50b6ae20e26 100644
--- a/manager/service/service_grpc.go
+++ b/manager/service/service_grpc.go
@@ -366,7 +366,7 @@ func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateScheduler
func (s *GRPC) ListSchedulers(ctx context.Context, req *manager.ListSchedulersRequest) (*manager.ListSchedulersResponse, error) {
var pbListSchedulersResponse manager.ListSchedulersResponse
- cacheKey := cache.MakeSchedulersCacheKey(req.HostName)
+ cacheKey := cache.MakeSchedulersCacheKey(req.HostName, req.Ip)
// Cache Hit
if err := s.cache.Get(ctx, cacheKey, &pbListSchedulersResponse); err == nil {
@@ -377,18 +377,14 @@ func (s *GRPC) ListSchedulers(ctx context.Context, req *manager.ListSchedulersRe
// Cache Miss
logger.Infof("%s cache miss", cacheKey)
var schedulerClusters []model.SchedulerCluster
- if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("Schedulers", "status = ?", "active").Find(&schedulerClusters).Error; err != nil {
+ if err := s.db.WithContext(ctx).Preload("SecurityGroup.SecurityRules").Preload("Schedulers", "state = ?", "active").Find(&schedulerClusters).Error; err != nil {
return nil, status.Error(codes.Unknown, err.Error())
}
// Search optimal scheduler cluster
schedulerCluster, ok := s.searcher.FindSchedulerCluster(schedulerClusters, req)
if !ok {
- if err := s.db.WithContext(ctx).Find(&schedulerCluster, &model.SchedulerCluster{
- IsDefault: true,
- }).Error; err != nil {
- return nil, status.Error(codes.Unknown, err.Error())
- }
+ return nil, status.Error(codes.NotFound, "scheduler cluster not found")
}
schedulers := []model.Scheduler{}
diff --git a/manager/types/cdn_cluster.go b/manager/types/cdn_cluster.go
index 07ce8e6ecc5..c5959f84f22 100644
--- a/manager/types/cdn_cluster.go
+++ b/manager/types/cdn_cluster.go
@@ -31,15 +31,17 @@ type AddSchedulerClusterToCDNClusterParams struct {
}
type CreateCDNClusterRequest struct {
- Name string `json:"name" binding:"required"`
- BIO string `json:"bio" binding:"omitempty"`
- Config *CDNClusterConfig `json:"config" binding:"required"`
+ Name string `json:"name" binding:"required"`
+ BIO string `json:"bio" binding:"omitempty"`
+ Config *CDNClusterConfig `json:"config" binding:"required"`
+ IsDefault bool `json:"is_default" binding:"omitempty"`
}
type UpdateCDNClusterRequest struct {
- Name string `json:"name" binding:"omitempty"`
- BIO string `json:"bio" binding:"omitempty"`
- Config *CDNClusterConfig `json:"config" binding:"omitempty"`
+ Name string `json:"name" binding:"omitempty"`
+ BIO string `json:"bio" binding:"omitempty"`
+ Config *CDNClusterConfig `json:"config" binding:"omitempty"`
+ IsDefault bool `json:"is_default" binding:"omitempty"`
}
type GetCDNClustersQuery struct {
diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go
index 9043a9dc8be..37c041a649b 100644
--- a/pkg/rpc/client.go
+++ b/pkg/rpc/client.go
@@ -436,4 +436,6 @@ func (conn *Connection) UpdateState(addrs []dfnet.NetAddr) {
defer conn.rwMutex.Unlock()
conn.serverNodes = addrs
conn.hashRing = hashring.New(addresses)
+
+ logger.GrpcLogger.Infof("update grpc client addresses %v", addresses)
}
diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go
index f08913262ab..c2d9d7b03f3 100644
--- a/pkg/rpc/scheduler/client/client.go
+++ b/pkg/rpc/scheduler/client/client.go
@@ -59,6 +59,8 @@ type SchedulerClient interface {
LeaveTask(context.Context, *scheduler.PeerTarget, ...grpc.CallOption) error
+ UpdateState(addrs []dfnet.NetAddr)
+
Close() error
}
diff --git a/scheduler/config/dynconfig.go b/scheduler/config/dynconfig.go
index ea934267bce..3ad3af818f6 100644
--- a/scheduler/config/dynconfig.go
+++ b/scheduler/config/dynconfig.go
@@ -33,7 +33,7 @@ import (
)
var (
- DefaultDynconfigCachePath = filepath.Join(dfpath.DefaultCacheDir, "scheduler_dynconfig")
+ cachePath = filepath.Join(dfpath.DefaultCacheDir, "scheduler_dynconfig")
)
var (
@@ -105,7 +105,7 @@ type DynconfigInterface interface {
Serve() error
// Stop the dynconfig listening service.
- Stop()
+ Stop() error
}
type Observer interface {
@@ -130,6 +130,7 @@ func NewDynconfig(sourceType dc.SourceType, cdnDirPath string, options ...dc.Opt
sourceType: sourceType,
}
+ options = append(options, dc.WithCachePath(cachePath))
client, err := dc.New(sourceType, options...)
if err != nil {
return nil, err
@@ -310,8 +311,13 @@ func (d *dynconfig) watch() {
}
}
-func (d *dynconfig) Stop() {
+func (d *dynconfig) Stop() error {
close(d.done)
+ if err := os.Remove(cachePath); err != nil {
+ return err
+ }
+
+ return nil
}
type managerClient struct {
diff --git a/scheduler/config/dynconfig_test.go b/scheduler/config/dynconfig_test.go
index a72dc2e1837..0718a37933c 100644
--- a/scheduler/config/dynconfig_test.go
+++ b/scheduler/config/dynconfig_test.go
@@ -44,7 +44,7 @@ func TestDynconfigGet_ManagerSourceType(t *testing.T) {
name: "get dynconfig success",
expire: 10 * time.Second,
cleanFileCache: func(t *testing.T) {
- if err := os.Remove(DefaultDynconfigCachePath); err != nil {
+ if err := os.Remove(cachePath); err != nil {
t.Fatal(err)
}
},
@@ -73,7 +73,7 @@ func TestDynconfigGet_ManagerSourceType(t *testing.T) {
name: "client failed to return for the second time",
expire: 10 * time.Millisecond,
cleanFileCache: func(t *testing.T) {
- if err := os.Remove(DefaultDynconfigCachePath); err != nil {
+ if err := os.Remove(cachePath); err != nil {
t.Fatal(err)
}
},
@@ -114,7 +114,7 @@ func TestDynconfigGet_ManagerSourceType(t *testing.T) {
d, err := NewDynconfig(dc.ManagerSourceType, "", []dc.Option{
dc.WithManagerClient(NewManagerClient(mockManagerClient, uint(1))),
- dc.WithCachePath(DefaultDynconfigCachePath),
+ dc.WithCachePath(cachePath),
dc.WithExpireTime(tc.expire),
}...)
if err != nil {
diff --git a/scheduler/job/job.go b/scheduler/job/job.go
index 69d98235491..a977094919d 100644
--- a/scheduler/job/job.go
+++ b/scheduler/job/job.go
@@ -186,7 +186,7 @@ func (t *job) preheat(ctx context.Context, req string) error {
}
if piece.Done == true {
- plogger.Info("preheat successed")
+ plogger.Info("preheat succeeded")
return nil
}
}
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 153f7ec1ce8..bdaf22b79dc 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -60,7 +60,7 @@ type Server struct {
managerClient managerclient.Client
// Dynamic config
- dynConfig config.DynconfigInterface
+ dynconfig config.DynconfigInterface
// Async job
job job.Job
@@ -99,7 +99,6 @@ func New(cfg *config.Config) (*Server, error) {
if s.managerClient != nil && cfg.DynConfig.Type == dynconfig.ManagerSourceType {
options = append(options,
dynconfig.WithManagerClient(config.NewManagerClient(s.managerClient, cfg.Manager.SchedulerClusterID)),
- dynconfig.WithCachePath(config.DefaultDynconfigCachePath),
dynconfig.WithExpireTime(cfg.DynConfig.ExpireTime),
)
}
@@ -107,7 +106,7 @@ func New(cfg *config.Config) (*Server, error) {
if err != nil {
return nil, err
}
- s.dynConfig = dynConfig
+ s.dynconfig = dynConfig
// Initialize GC
s.gc = gc.New(gc.WithLogger(logger.GcLogger))
@@ -153,7 +152,7 @@ func New(cfg *config.Config) (*Server, error) {
func (s *Server) Serve() error {
// Serve dynConfig
go func() {
- if err := s.dynConfig.Serve(); err != nil {
+ if err := s.dynconfig.Serve(); err != nil {
logger.Fatalf("dynconfig start failed %v", err)
}
logger.Info("dynconfig start successfully")
@@ -222,8 +221,10 @@ func (s *Server) Serve() error {
}
func (s *Server) Stop() {
- // Stop dynamic server
- s.dynConfig.Stop()
+ // Stop dynconfig server
+ if err := s.dynconfig.Stop(); err != nil {
+ logger.Errorf("dynconfig client closed failed %s", err)
+ }
logger.Info("dynconfig client closed")
// Stop manager client