Skip to content

Commit

Permalink
feat: dfdaemon get scheduler list dynamically from manager (#812)
Browse files Browse the repository at this point in the history
* feat: dfdaemon get scheduler list dynamically from manager

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Nov 22, 2021
1 parent ccde8e8 commit 543a4a2
Show file tree
Hide file tree
Showing 34 changed files with 1,106 additions and 114 deletions.
6 changes: 6 additions & 0 deletions api/manager/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3945,6 +3945,9 @@ var doc = `{
"config": {
"$ref": "#/definitions/types.CDNClusterConfig"
},
"is_default": {
"type": "boolean"
},
"name": {
"type": "string"
}
Expand Down Expand Up @@ -4348,6 +4351,9 @@ var doc = `{
"config": {
"$ref": "#/definitions/types.CDNClusterConfig"
},
"is_default": {
"type": "boolean"
},
"name": {
"type": "string"
}
Expand Down
6 changes: 6 additions & 0 deletions api/manager/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -3931,6 +3931,9 @@
"config": {
"$ref": "#/definitions/types.CDNClusterConfig"
},
"is_default": {
"type": "boolean"
},
"name": {
"type": "string"
}
Expand Down Expand Up @@ -4334,6 +4337,9 @@
"config": {
"$ref": "#/definitions/types.CDNClusterConfig"
},
"is_default": {
"type": "boolean"
},
"name": {
"type": "string"
}
Expand Down
4 changes: 4 additions & 0 deletions api/manager/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ definitions:
type: string
config:
$ref: '#/definitions/types.CDNClusterConfig'
is_default:
type: boolean
name:
type: string
required:
Expand Down Expand Up @@ -591,6 +593,8 @@ definitions:
type: string
config:
$ref: '#/definitions/types.CDNClusterConfig'
is_default:
type: boolean
name:
type: string
type: object
Expand Down
199 changes: 199 additions & 0 deletions client/config/dynconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

import (
"os"
"path/filepath"
"time"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/dfpath"
internaldynconfig "d7y.io/dragonfly/v2/internal/dynconfig"
"d7y.io/dragonfly/v2/manager/searcher"
"d7y.io/dragonfly/v2/pkg/rpc/manager"
managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"
)

var (
// Dynconfig configure the cache path
cachePath = filepath.Join(dfpath.DefaultCacheDir, "daemon_dynconfig")

// Watch dynconfig interval
watchInterval = 5 * time.Second
)

type DynconfigData struct {
Schedulers []*manager.Scheduler
}

type Dynconfig interface {
// Get the dynamic config from manager.
GetSchedulers() ([]*manager.Scheduler, error)

// Get the dynamic config from manager.
Get() (*DynconfigData, error)

// Register allows an instance to register itself to listen/observe events.
Register(Observer)

// Deregister allows an instance to remove itself from the collection of observers/listeners.
Deregister(Observer)

// Notify publishes new events to listeners.
Notify() error

// Serve the dynconfig listening service.
Serve() error

// Stop the dynconfig listening service.
Stop() error
}

type Observer interface {
// OnNotify allows an event to be "published" to interface implementations.
OnNotify(*DynconfigData)
}

type dynconfig struct {
*internaldynconfig.Dynconfig
observers map[Observer]struct{}
done chan bool
}

func NewDynconfig(managerClient internaldynconfig.ManagerClient, expire time.Duration) (Dynconfig, error) {
client, err := internaldynconfig.New(
internaldynconfig.ManagerSourceType,
internaldynconfig.WithManagerClient(managerClient),
internaldynconfig.WithExpireTime(expire),
internaldynconfig.WithCachePath(cachePath),
)
if err != nil {
return nil, err
}

return &dynconfig{
observers: map[Observer]struct{}{},
done: make(chan bool),
Dynconfig: client,
}, nil
}

func (d *dynconfig) GetSchedulers() ([]*manager.Scheduler, error) {
data, err := d.Get()
if err != nil {
return nil, err
}

return data.Schedulers, nil
}

func (d *dynconfig) Get() (*DynconfigData, error) {
var data DynconfigData
if err := d.Unmarshal(&data); err != nil {
return nil, err
}

return &data, nil
}

func (d *dynconfig) Register(l Observer) {
d.observers[l] = struct{}{}
}

func (d *dynconfig) Deregister(l Observer) {
delete(d.observers, l)
}

func (d *dynconfig) Notify() error {
data, err := d.Get()
if err != nil {
return err
}

for o := range d.observers {
o.OnNotify(data)
}

return nil
}

func (d *dynconfig) Serve() error {
if err := d.Notify(); err != nil {
return err
}

go d.watch()

return nil
}

func (d *dynconfig) watch() {
tick := time.NewTicker(watchInterval)

for {
select {
case <-tick.C:
if err := d.Notify(); err != nil {
logger.Error("dynconfig notify failed", err)
}
case <-d.done:
return
}
}
}

func (d *dynconfig) Stop() error {
close(d.done)
if err := os.Remove(cachePath); err != nil {
return err
}

return nil
}

type managerClient struct {
managerclient.Client
hostOption HostOption
}

// New the manager client used by dynconfig
func NewManagerClient(client managerclient.Client, hostOption HostOption) internaldynconfig.ManagerClient {
return &managerClient{
Client: client,
hostOption: hostOption,
}
}

func (mc *managerClient) Get() (interface{}, error) {
schedulers, err := mc.ListSchedulers(&manager.ListSchedulersRequest{
SourceType: manager.SourceType_CLIENT_SOURCE,
HostName: mc.hostOption.Hostname,
Ip: mc.hostOption.ListenIP,
HostInfo: map[string]string{
searcher.ConditionSecurityDomain: mc.hostOption.SecurityDomain,
searcher.ConditionIDC: mc.hostOption.IDC,
searcher.ConditionNetTopology: mc.hostOption.NetTopology,
searcher.ConditionLocation: mc.hostOption.Location,
},
})
if err != nil {
return nil, err
}

return schedulers, nil
}
Loading

0 comments on commit 543a4a2

Please sign in to comment.