Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: PasswordResetTokens #179

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
)

type DelayingWorkqueueController struct {
DistributedController
ShardedController
}

func NewDelayingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, kubeClient *kubernetes.Clientset, name string, resyncPeriod time.Duration) *DelayingWorkqueueController {
dwqc := &DelayingWorkqueueController{
*NewDistributedController(ctx, informer, kubeClient, name, resyncPeriod),
*NewShardedController(ctx, informer, kubeClient, name, resyncPeriod),
}

dwqc.SetWorkqueue(workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{Name: name}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
)

type RateLimitingWorkqueueController struct {
DistributedController
ShardedController
}

func NewRateLimitingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, kubeClient *kubernetes.Clientset, reconcileFunc func(objName string) error, name string, resyncPeriod time.Duration, rateLimiter workqueue.RateLimiter) *RateLimitingWorkqueueController {
rlwq := &RateLimitingWorkqueueController{
*NewDistributedController(ctx, informer, kubeClient, name, resyncPeriod),
*NewShardedController(ctx, informer, kubeClient, name, resyncPeriod),
}

rlwq.SetWorkqueue(workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{Name: name}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/binary"
"fmt"
"io"
"os"
"strconv"
"strings"
"time"
Expand All @@ -19,7 +18,7 @@ import (
"k8s.io/client-go/tools/cache"
)

type DistributedController struct {
type ShardedController struct {
BaseController
LoadScheduler
kubeClient *kubernetes.Clientset
Expand All @@ -28,37 +27,39 @@ type DistributedController struct {
replica_identity int
}

func NewDistributedController(ctx context.Context, informer cache.SharedIndexInformer, kubeclient *kubernetes.Clientset, name string, resyncPeriod time.Duration) *DistributedController {
dc := &DistributedController{
// NewShardedController builds a ShardedController whose embedded
// BaseController is initialized from the given informer and resync period.
func NewShardedController(ctx context.Context, informer cache.SharedIndexInformer, kubeclient *kubernetes.Clientset, name string, resyncPeriod time.Duration) *ShardedController {
	base := newBaseController(name, ctx, informer, resyncPeriod)
	return &ShardedController{
		BaseController: *base,
		kubeClient:     kubeclient,
	}
}

func (c *DistributedController) enqueue(obj interface{}) {
// This method calculates if the object will be reconciled on this shard and adds it to the local queue
// otherwise it will be ignored
func (c *ShardedController) enqueue(obj interface{}) {
if c.replica_identity > c.replica_count || c.replica_count == 0 {
// we have likely scaled down. No longer enqueue in this replica.
return
}

// calculate the placement of the object
placement, err := c.getReplicaPlacement(obj)
shardPlacement, err := c.getShardPlacement(obj)

if err != nil {
glog.Errorf("Could not enqueue object due to error in placement calculation: %v", err)
return
}

// is this object placed on this replica then enqueue it
if placement == c.replica_identity {
// if this object is placed on this replica then enqueue it
if shardPlacement == c.replica_identity {
c.BaseController.enqueue(obj)
}
}

// This method calculates on which replica an object needs to be reconciled.
// It uses a hash of the object's name to guarantee an almost equal distribution between replicas.
func (c *DistributedController) getReplicaPlacement(obj interface{}) (int, error) {
func (c *ShardedController) getShardPlacement(obj interface{}) (int, error) {
hasher := md5.New()
var key string
var err error
Expand Down Expand Up @@ -90,19 +91,43 @@ func (c *DistributedController) getReplicaPlacement(obj interface{}) (int, error
}

// RunDistributed will start a distributed controller concept
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// RunDistributed will start a distributed controller concept
// RunSharded will start a sharded controller concept

func (c *DistributedController) RunDistributed(stopCh <-chan struct{}) error {
c.statefulset_name = os.Getenv("STATEFULSET_NAME")
podIdentityName := os.Getenv("POD_IDENTITY")
// RunSharded starts the sharded controller: it derives this replica's shard
// identity from the ordinal suffix of the pod name, starts watching the
// parent StatefulSet for replica-count changes, and then runs the
// controller loop until stopCh is closed.
func (c *ShardedController) RunSharded(stopCh <-chan struct{}, statefulSetName string, shardIdentity string) error {
	c.statefulset_name = statefulSetName

	// StatefulSet pod names end in "-<ordinal>"; that ordinal is this
	// replica's shard identity.
	parts := strings.Split(shardIdentity, "-")
	ordinalIndex, err := strconv.Atoi(parts[len(parts)-1])
	if err != nil {
		return fmt.Errorf("getting ordinal pod identity from string %q: %w", shardIdentity, err)
	}

	c.replica_identity = ordinalIndex

	if err := c.watchStatefulSetUpdates(stopCh); err != nil {
		return err
	}

	return c.run(stopCh)
}

// handle updates to the statefulset. set the replica count to the current total replica count
// handleStatefulsetUpdate reacts to updates of the parent StatefulSet and
// records its current total replica count for shard placement calculation.
func (c *ShardedController) handleStatefulsetUpdate(obj interface{}) {
	statefulset, ok := obj.(*v1.StatefulSet)
	if !ok {
		glog.V(4).Infof("Not a StatefulSet: %v", obj)
		return
	}

	// Spec.Replicas is a *int32 and may be nil; Kubernetes treats an unset
	// value as 1 replica, so mirror that instead of dereferencing nil.
	replicaCount := 1
	if statefulset.Spec.Replicas != nil {
		replicaCount = int(*statefulset.Spec.Replicas)
	}

	if replicaCount != c.replica_count {
		glog.V(8).Infof("Statefulset %s updated replica count from %d to %d replicas", statefulset.Name, c.replica_count, replicaCount)
		c.replica_count = replicaCount
	}
}

// handle updates to the parent StatefulSet of this replica. Set the replica count to the current total replica count
func (c *ShardedController) watchStatefulSetUpdates(stopCh <-chan struct{}) error {
// client to watch for updates of the parent statefulset object
watchlist := cache.NewListWatchFromClient(
c.kubeClient.AppsV1().RESTClient(),
Expand Down Expand Up @@ -131,25 +156,10 @@ func (c *DistributedController) RunDistributed(stopCh <-chan struct{}) error {

go controller.Run(stopCh)

glog.V(4).Info("Waiting for informer caches to sync")
glog.V(4).Info("Waiting for StatefulSet Informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, controller.HasSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

return c.run(stopCh)
}

// handle updates to the statefulset. set the replica count to the current total replica count
func (c *DistributedController) handleStatefulsetUpdate(obj interface{}) {
statefulset, ok := obj.(*v1.StatefulSet)
if !ok {
glog.V(4).Infof("Not a StatefulSet: %v", obj)
return
return fmt.Errorf("failed to wait for StatefulSet Informer caches to sync")
}

replicaCount := int(*statefulset.Spec.Replicas)
if replicaCount != c.replica_count {
glog.V(8).Infof("Statefulset %s updated replica count from %d to %d replicas", statefulset.Name, c.replica_count, replicaCount)
c.replica_count = replicaCount
}
return nil
}
47 changes: 15 additions & 32 deletions v3/pkg/microservices/microservices.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net"
"net/http"
"os"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -63,6 +64,7 @@ const (
defaultGrpcPort string = "8080"
defaultApiPort string = "80"
InitialConnectionTimeout time.Duration = 30 * time.Second
defaultThreadCount int = 1
)

var CORS_ALLOWED_METHODS_ALL = [...]string{"GET", "POST", "PUT", "HEAD", "OPTIONS", "DELETE"}
Expand Down Expand Up @@ -350,39 +352,20 @@ func BuildServiceConfig() *ServiceConfig {
return cfg
}

/*

type onStartedLeading func(context.Context)
// This method tries to retreive the controller thread count from an environment variable CONTROLLER_THREAD_COUNT
// otherwise it sets the thread count to the default value
func GetWorkerThreadCount() int {
workerThreadCountString := os.Getenv("CONTROLLER_THREAD_COUNT")
workerThreads := defaultThreadCount
if workerThreadCountString != "" {
i, err := strconv.Atoi(workerThreadCountString)
if err != nil {
glog.Infof("Error parsing env var CONTROLLER_THREAD_COUNT, using default thread count %d", workerThreads)
} else {

func ElectLeaderOrDie(svc MicroService, cfg *rest.Config, ctx context.Context, stopControllersCh chan<- struct{}, onStartedLeadingFunc onStartedLeading) {
lock, err := util.GetLock(string(svc), cfg)
if err != nil {
glog.Fatal(err)
}
workerThreads = i
}
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 10 * time.Second,
RenewDeadline: 5 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: onStartedLeadingFunc,
OnStoppedLeading: func() {
// Need to start informer factory since even when not leader to ensure api layer
// keeps working.
glog.Info("Stopped being the leader. Shutting down controllers")
stopControllersCh <- struct{}{} // Send the stopControllers Signal
},
OnNewLeader: func(current_id string) {
if current_id == lock.Identity() {
glog.Info("I am currently the leader")
return
}
glog.Infof("Leader changed to %s", current_id)

},
},
})
return workerThreads
}

*/
52 changes: 15 additions & 37 deletions v3/services/usersvc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@ import (
"github.com/hobbyfarm/gargantua/v3/pkg/microservices"
"github.com/hobbyfarm/gargantua/v3/pkg/signals"
"github.com/hobbyfarm/gargantua/v3/pkg/util"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/kubernetes"

"github.com/golang/glog"
userservice "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal"
userservicecontroller "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal/controllers"
"github.com/hobbyfarm/gargantua/v3/pkg/client/clientset/versioned"
hfInformers "github.com/hobbyfarm/gargantua/v3/pkg/client/informers/externalversions"

"github.com/hobbyfarm/gargantua/v3/protos/authn"
Expand Down Expand Up @@ -81,6 +78,12 @@ func main() {

user.RegisterUserSvcServer(gs, us)

passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(hfInformerFactory, kubeClient, ctx)
if err != nil {
glog.Fatalf("creating passwordResetTokenController failed: %v", err)
}
passwordResetTokenController.SetWorkerThreadCount(microservices.GetWorkerThreadCount())

var wg sync.WaitGroup
wg.Add(1)

Expand All @@ -99,43 +102,18 @@ func main() {
microservices.StartAPIServer(userServer)
}()

stopControllersCh := make(chan struct{})
go func() {
defer wg.Done()
glog.Info("Starting controllers")
stopControllersCh := make(chan struct{})
err := passwordResetTokenController.RunSharded(stopControllersCh, os.Getenv("STATEFULSET_NAME"), os.Getenv("POD_IDENTITY"))
if err != nil {
glog.Errorf("Error starting up the controllers: %v", err)
}
}()

stopInformerFactoryCh := signals.SetupSignalHandler()
hfInformerFactory.Start(stopInformerFactoryCh)

_err := startControllers(ctx, hfClient, kubeClient, stopControllersCh)
if _err != nil {
glog.Fatal(_err)
}

wg.Wait()
}

func startControllers(ctx context.Context, hfClient *versioned.Clientset, kubeClient *kubernetes.Clientset, stopControllersCh <-chan struct{}) error {
glog.Info("Starting controllers")
hfInformerFactory := hfInformers.NewSharedInformerFactoryWithOptions(hfClient, time.Second*30, hfInformers.WithNamespace(util.GetReleaseNamespace()))
g, gctx := errgroup.WithContext(ctx)

passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(hfInformerFactory, kubeClient, gctx)
if err != nil {
glog.Fatalf("starting passwordResetTokenController failed: %v", err)
}

passwordResetTokenController.SetWorkerThreadCount(2)

g.Go(func() error {
// TODO replica name
return passwordResetTokenController.RunDistributed(stopControllersCh)
})

// TODO start informer for ReplicaSets

hfInformerFactory.Start(stopControllersCh)

if err = g.Wait(); err != nil {
glog.Errorf("Error starting up the controllers: %v", err)
return err
}
return nil
}
Loading