From fceb124b30924355a4ad308079a5ec4e34809240 Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Fri, 26 Jan 2024 15:59:17 +0100 Subject: [PATCH 01/12] First proposal for leader election in microservices --- v3/pkg/apis/hobbyfarm.io/v1/types.go | 24 +++ .../hobbyfarm.io/v1/zz_generated.deepcopy.go | 76 ++++++++ .../v1/fake/fake_hobbyfarm.io_client.go | 4 + .../v1/fake/fake_passwordresettoken.go | 129 +++++++++++++ .../hobbyfarm.io/v1/generated_expansion.go | 2 + .../hobbyfarm.io/v1/hobbyfarm.io_client.go | 5 + .../hobbyfarm.io/v1/passwordresettoken.go | 178 ++++++++++++++++++ .../informers/externalversions/generic.go | 2 + .../hobbyfarm.io/v1/interface.go | 7 + .../hobbyfarm.io/v1/passwordresettoken.go | 90 +++++++++ .../hobbyfarm.io/v1/expansion_generated.go | 8 + .../hobbyfarm.io/v1/passwordresettoken.go | 99 ++++++++++ v3/pkg/microservices/microservices.go | 41 +++- .../passwordResetTokenController.go | 159 ++++++++++++++++ v3/services/usersvc/internal/crd.go | 10 + v3/services/usersvc/main.go | 19 ++ 16 files changed, 850 insertions(+), 3 deletions(-) create mode 100644 v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/fake/fake_passwordresettoken.go create mode 100644 v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/passwordresettoken.go create mode 100644 v3/pkg/client/informers/externalversions/hobbyfarm.io/v1/passwordresettoken.go create mode 100644 v3/pkg/client/listers/hobbyfarm.io/v1/passwordresettoken.go create mode 100644 v3/services/usersvc/internal/controllers/passwordResetTokenController.go diff --git a/v3/pkg/apis/hobbyfarm.io/v1/types.go b/v3/pkg/apis/hobbyfarm.io/v1/types.go index 53a77ea9..5aacd9c2 100644 --- a/v3/pkg/apis/hobbyfarm.io/v1/types.go +++ b/v3/pkg/apis/hobbyfarm.io/v1/types.go @@ -555,3 +555,27 @@ type ScopeList struct { Items []Scope `json:"items"` } + +// +genclient +// +genclient:noStatus +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type PasswordResetToken struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + Spec PasswordResetTokenSpec `json:"spec"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type PasswordResetTokenList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + Items []PasswordResetToken `json:"items"` +} + +type PasswordResetTokenSpec struct { + UserId string `json:"user"` + Timestamp string `json:"timestamp"` + Duration string `json:"duration"` +} diff --git a/v3/pkg/apis/hobbyfarm.io/v1/zz_generated.deepcopy.go b/v3/pkg/apis/hobbyfarm.io/v1/zz_generated.deepcopy.go index fc9b4937..4b78a20d 100644 --- a/v3/pkg/apis/hobbyfarm.io/v1/zz_generated.deepcopy.go +++ b/v3/pkg/apis/hobbyfarm.io/v1/zz_generated.deepcopy.go @@ -483,6 +483,82 @@ func (in *OneTimeAccessCodeSpec) DeepCopy() *OneTimeAccessCodeSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PasswordResetToken) DeepCopyInto(out *PasswordResetToken) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + out.Spec = in.Spec + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PasswordResetToken. +func (in *PasswordResetToken) DeepCopy() *PasswordResetToken { + if in == nil { + return nil + } + out := new(PasswordResetToken) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PasswordResetToken) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PasswordResetTokenList) DeepCopyInto(out *PasswordResetTokenList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]PasswordResetToken, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PasswordResetTokenList. +func (in *PasswordResetTokenList) DeepCopy() *PasswordResetTokenList { + if in == nil { + return nil + } + out := new(PasswordResetTokenList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PasswordResetTokenList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PasswordResetTokenSpec) DeepCopyInto(out *PasswordResetTokenSpec) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PasswordResetTokenSpec. +func (in *PasswordResetTokenSpec) DeepCopy() *PasswordResetTokenSpec { + if in == nil { + return nil + } + out := new(PasswordResetTokenSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PredefinedService) DeepCopyInto(out *PredefinedService) { *out = *in diff --git a/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/fake/fake_hobbyfarm.io_client.go b/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/fake/fake_hobbyfarm.io_client.go index 2f1cc637..89e068d3 100644 --- a/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/fake/fake_hobbyfarm.io_client.go +++ b/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/fake/fake_hobbyfarm.io_client.go @@ -48,6 +48,10 @@ func (c *FakeHobbyfarmV1) OneTimeAccessCodes(namespace string) v1.OneTimeAccessC return &FakeOneTimeAccessCodes{c, namespace} } +func (c *FakeHobbyfarmV1) PasswordResetTokens(namespace string) v1.PasswordResetTokenInterface { + return &FakePasswordResetTokens{c, namespace} +} + func (c *FakeHobbyfarmV1) PredefinedServices(namespace string) v1.PredefinedServiceInterface { return &FakePredefinedServices{c, namespace} } diff --git a/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/fake/fake_passwordresettoken.go b/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/fake/fake_passwordresettoken.go new file mode 100644 index 00000000..6c88b77b --- /dev/null +++ b/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/fake/fake_passwordresettoken.go @@ -0,0 +1,129 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1 "github.com/hobbyfarm/gargantua/v3/pkg/apis/hobbyfarm.io/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakePasswordResetTokens implements PasswordResetTokenInterface +type FakePasswordResetTokens struct { + Fake *FakeHobbyfarmV1 + ns string +} + +var passwordresettokensResource = v1.SchemeGroupVersion.WithResource("passwordresettokens") + +var passwordresettokensKind = v1.SchemeGroupVersion.WithKind("PasswordResetToken") + +// Get takes name of the passwordResetToken, and returns the corresponding passwordResetToken object, and an error if there is any. +func (c *FakePasswordResetTokens) Get(ctx context.Context, name string, options metav1.GetOptions) (result *v1.PasswordResetToken, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(passwordresettokensResource, c.ns, name), &v1.PasswordResetToken{}) + + if obj == nil { + return nil, err + } + return obj.(*v1.PasswordResetToken), err +} + +// List takes label and field selectors, and returns the list of PasswordResetTokens that match those selectors. +func (c *FakePasswordResetTokens) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PasswordResetTokenList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(passwordresettokensResource, passwordresettokensKind, c.ns, opts), &v1.PasswordResetTokenList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1.PasswordResetTokenList{ListMeta: obj.(*v1.PasswordResetTokenList).ListMeta} + for _, item := range obj.(*v1.PasswordResetTokenList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested passwordResetTokens. +func (c *FakePasswordResetTokens) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(passwordresettokensResource, c.ns, opts)) + +} + +// Create takes the representation of a passwordResetToken and creates it. Returns the server's representation of the passwordResetToken, and an error, if there is any. +func (c *FakePasswordResetTokens) Create(ctx context.Context, passwordResetToken *v1.PasswordResetToken, opts metav1.CreateOptions) (result *v1.PasswordResetToken, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(passwordresettokensResource, c.ns, passwordResetToken), &v1.PasswordResetToken{}) + + if obj == nil { + return nil, err + } + return obj.(*v1.PasswordResetToken), err +} + +// Update takes the representation of a passwordResetToken and updates it. Returns the server's representation of the passwordResetToken, and an error, if there is any. +func (c *FakePasswordResetTokens) Update(ctx context.Context, passwordResetToken *v1.PasswordResetToken, opts metav1.UpdateOptions) (result *v1.PasswordResetToken, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(passwordresettokensResource, c.ns, passwordResetToken), &v1.PasswordResetToken{}) + + if obj == nil { + return nil, err + } + return obj.(*v1.PasswordResetToken), err +} + +// Delete takes name of the passwordResetToken and deletes it. Returns an error if one occurs. +func (c *FakePasswordResetTokens) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteActionWithOptions(passwordresettokensResource, c.ns, name, opts), &v1.PasswordResetToken{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakePasswordResetTokens) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error { + action := testing.NewDeleteCollectionAction(passwordresettokensResource, c.ns, listOpts) + + _, err := c.Fake.Invokes(action, &v1.PasswordResetTokenList{}) + return err +} + +// Patch applies the patch and returns the patched passwordResetToken. +func (c *FakePasswordResetTokens) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.PasswordResetToken, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(passwordresettokensResource, c.ns, name, pt, data, subresources...), &v1.PasswordResetToken{}) + + if obj == nil { + return nil, err + } + return obj.(*v1.PasswordResetToken), err +} diff --git a/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/generated_expansion.go b/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/generated_expansion.go index b9b8a66d..9ea7ad00 100644 --- a/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/generated_expansion.go +++ b/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/generated_expansion.go @@ -28,6 +28,8 @@ type EnvironmentExpansion interface{} type OneTimeAccessCodeExpansion interface{} +type PasswordResetTokenExpansion interface{} + type PredefinedServiceExpansion interface{} type ProgressExpansion interface{} diff --git a/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/hobbyfarm.io_client.go b/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/hobbyfarm.io_client.go index f100ee4a..39bae739 100644 --- a/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/hobbyfarm.io_client.go +++ b/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/hobbyfarm.io_client.go @@ -33,6 +33,7 @@ type HobbyfarmV1Interface interface { DynamicBindConfigurationsGetter EnvironmentsGetter OneTimeAccessCodesGetter + PasswordResetTokensGetter PredefinedServicesGetter ProgressesGetter ScenariosGetter @@ -72,6 +73,10 @@ func (c *HobbyfarmV1Client) OneTimeAccessCodes(namespace string) OneTimeAccessCo return newOneTimeAccessCodes(c, namespace) } +func (c *HobbyfarmV1Client) PasswordResetTokens(namespace string) PasswordResetTokenInterface { + return newPasswordResetTokens(c, namespace) +} + func (c *HobbyfarmV1Client) PredefinedServices(namespace string) PredefinedServiceInterface { return newPredefinedServices(c, namespace) } diff --git a/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/passwordresettoken.go b/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/passwordresettoken.go new file mode 100644 index 00000000..93429cd1 --- /dev/null +++ b/v3/pkg/client/clientset/versioned/typed/hobbyfarm.io/v1/passwordresettoken.go @@ -0,0 +1,178 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1 + +import ( + "context" + "time" + + v1 "github.com/hobbyfarm/gargantua/v3/pkg/apis/hobbyfarm.io/v1" + scheme "github.com/hobbyfarm/gargantua/v3/pkg/client/clientset/versioned/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// PasswordResetTokensGetter has a method to return a PasswordResetTokenInterface. +// A group's client should implement this interface. +type PasswordResetTokensGetter interface { + PasswordResetTokens(namespace string) PasswordResetTokenInterface +} + +// PasswordResetTokenInterface has methods to work with PasswordResetToken resources. +type PasswordResetTokenInterface interface { + Create(ctx context.Context, passwordResetToken *v1.PasswordResetToken, opts metav1.CreateOptions) (*v1.PasswordResetToken, error) + Update(ctx context.Context, passwordResetToken *v1.PasswordResetToken, opts metav1.UpdateOptions) (*v1.PasswordResetToken, error) + Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error + Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.PasswordResetToken, error) + List(ctx context.Context, opts metav1.ListOptions) (*v1.PasswordResetTokenList, error) + Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.PasswordResetToken, err error) + PasswordResetTokenExpansion +} + +// passwordResetTokens implements PasswordResetTokenInterface +type passwordResetTokens struct { + client rest.Interface + ns string +} + +// newPasswordResetTokens returns a PasswordResetTokens +func newPasswordResetTokens(c *HobbyfarmV1Client, namespace string) *passwordResetTokens { + return &passwordResetTokens{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the passwordResetToken, and returns the corresponding passwordResetToken object, and an error if there is any. +func (c *passwordResetTokens) Get(ctx context.Context, name string, options metav1.GetOptions) (result *v1.PasswordResetToken, err error) { + result = &v1.PasswordResetToken{} + err = c.client.Get(). + Namespace(c.ns). + Resource("passwordresettokens"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of PasswordResetTokens that match those selectors. +func (c *passwordResetTokens) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PasswordResetTokenList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1.PasswordResetTokenList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("passwordresettokens"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested passwordResetTokens. +func (c *passwordResetTokens) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("passwordresettokens"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a passwordResetToken and creates it. Returns the server's representation of the passwordResetToken, and an error, if there is any. +func (c *passwordResetTokens) Create(ctx context.Context, passwordResetToken *v1.PasswordResetToken, opts metav1.CreateOptions) (result *v1.PasswordResetToken, err error) { + result = &v1.PasswordResetToken{} + err = c.client.Post(). + Namespace(c.ns). + Resource("passwordresettokens"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(passwordResetToken). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a passwordResetToken and updates it. Returns the server's representation of the passwordResetToken, and an error, if there is any. +func (c *passwordResetTokens) Update(ctx context.Context, passwordResetToken *v1.PasswordResetToken, opts metav1.UpdateOptions) (result *v1.PasswordResetToken, err error) { + result = &v1.PasswordResetToken{} + err = c.client.Put(). + Namespace(c.ns). + Resource("passwordresettokens"). + Name(passwordResetToken.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(passwordResetToken). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the passwordResetToken and deletes it. Returns an error if one occurs. +func (c *passwordResetTokens) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("passwordresettokens"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *passwordResetTokens) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("passwordresettokens"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched passwordResetToken. +func (c *passwordResetTokens) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.PasswordResetToken, err error) { + result = &v1.PasswordResetToken{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("passwordresettokens"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/v3/pkg/client/informers/externalversions/generic.go b/v3/pkg/client/informers/externalversions/generic.go index 3ac0023d..1c033df9 100644 --- a/v3/pkg/client/informers/externalversions/generic.go +++ b/v3/pkg/client/informers/externalversions/generic.go @@ -65,6 +65,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Hobbyfarm().V1().Environments().Informer()}, nil case v1.SchemeGroupVersion.WithResource("onetimeaccesscodes"): return &genericInformer{resource: resource.GroupResource(), informer: f.Hobbyfarm().V1().OneTimeAccessCodes().Informer()}, nil + case v1.SchemeGroupVersion.WithResource("passwordresettokens"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Hobbyfarm().V1().PasswordResetTokens().Informer()}, nil case v1.SchemeGroupVersion.WithResource("predefinedservices"): return &genericInformer{resource: resource.GroupResource(), informer: f.Hobbyfarm().V1().PredefinedServices().Informer()}, nil case v1.SchemeGroupVersion.WithResource("progresses"): diff --git a/v3/pkg/client/informers/externalversions/hobbyfarm.io/v1/interface.go b/v3/pkg/client/informers/externalversions/hobbyfarm.io/v1/interface.go index b473f9bb..f2b56dae 100644 --- a/v3/pkg/client/informers/externalversions/hobbyfarm.io/v1/interface.go +++ b/v3/pkg/client/informers/externalversions/hobbyfarm.io/v1/interface.go @@ -34,6 +34,8 @@ type Interface interface { Environments() EnvironmentInformer // OneTimeAccessCodes returns a OneTimeAccessCodeInformer. OneTimeAccessCodes() OneTimeAccessCodeInformer + // PasswordResetTokens returns a PasswordResetTokenInformer. + PasswordResetTokens() PasswordResetTokenInformer // PredefinedServices returns a PredefinedServiceInformer. PredefinedServices() PredefinedServiceInformer // Progresses returns a ProgressInformer. @@ -96,6 +98,11 @@ func (v *version) OneTimeAccessCodes() OneTimeAccessCodeInformer { return &oneTimeAccessCodeInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// PasswordResetTokens returns a PasswordResetTokenInformer. +func (v *version) PasswordResetTokens() PasswordResetTokenInformer { + return &passwordResetTokenInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // PredefinedServices returns a PredefinedServiceInformer. func (v *version) PredefinedServices() PredefinedServiceInformer { return &predefinedServiceInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/v3/pkg/client/informers/externalversions/hobbyfarm.io/v1/passwordresettoken.go b/v3/pkg/client/informers/externalversions/hobbyfarm.io/v1/passwordresettoken.go new file mode 100644 index 00000000..c4aff4d1 --- /dev/null +++ b/v3/pkg/client/informers/externalversions/hobbyfarm.io/v1/passwordresettoken.go @@ -0,0 +1,90 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1 + +import ( + "context" + time "time" + + hobbyfarmiov1 "github.com/hobbyfarm/gargantua/v3/pkg/apis/hobbyfarm.io/v1" + versioned "github.com/hobbyfarm/gargantua/v3/pkg/client/clientset/versioned" + internalinterfaces "github.com/hobbyfarm/gargantua/v3/pkg/client/informers/externalversions/internalinterfaces" + v1 "github.com/hobbyfarm/gargantua/v3/pkg/client/listers/hobbyfarm.io/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// PasswordResetTokenInformer provides access to a shared informer and lister for +// PasswordResetTokens. +type PasswordResetTokenInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1.PasswordResetTokenLister +} + +type passwordResetTokenInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewPasswordResetTokenInformer constructs a new informer for PasswordResetToken type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewPasswordResetTokenInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredPasswordResetTokenInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredPasswordResetTokenInformer constructs a new informer for PasswordResetToken type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredPasswordResetTokenInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.HobbyfarmV1().PasswordResetTokens(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.HobbyfarmV1().PasswordResetTokens(namespace).Watch(context.TODO(), options) + }, + }, + &hobbyfarmiov1.PasswordResetToken{}, + resyncPeriod, + indexers, + ) +} + +func (f *passwordResetTokenInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredPasswordResetTokenInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *passwordResetTokenInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&hobbyfarmiov1.PasswordResetToken{}, f.defaultInformer) +} + +func (f *passwordResetTokenInformer) Lister() v1.PasswordResetTokenLister { + return v1.NewPasswordResetTokenLister(f.Informer().GetIndexer()) +} diff --git a/v3/pkg/client/listers/hobbyfarm.io/v1/expansion_generated.go b/v3/pkg/client/listers/hobbyfarm.io/v1/expansion_generated.go index 38197280..73804a89 100644 --- a/v3/pkg/client/listers/hobbyfarm.io/v1/expansion_generated.go +++ b/v3/pkg/client/listers/hobbyfarm.io/v1/expansion_generated.go @@ -58,6 +58,14 @@ type OneTimeAccessCodeListerExpansion interface{} // OneTimeAccessCodeNamespaceLister. type OneTimeAccessCodeNamespaceListerExpansion interface{} +// PasswordResetTokenListerExpansion allows custom methods to be added to +// PasswordResetTokenLister. +type PasswordResetTokenListerExpansion interface{} + +// PasswordResetTokenNamespaceListerExpansion allows custom methods to be added to +// PasswordResetTokenNamespaceLister. +type PasswordResetTokenNamespaceListerExpansion interface{} + // PredefinedServiceListerExpansion allows custom methods to be added to // PredefinedServiceLister. type PredefinedServiceListerExpansion interface{} diff --git a/v3/pkg/client/listers/hobbyfarm.io/v1/passwordresettoken.go b/v3/pkg/client/listers/hobbyfarm.io/v1/passwordresettoken.go new file mode 100644 index 00000000..086a9735 --- /dev/null +++ b/v3/pkg/client/listers/hobbyfarm.io/v1/passwordresettoken.go @@ -0,0 +1,99 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1 + +import ( + v1 "github.com/hobbyfarm/gargantua/v3/pkg/apis/hobbyfarm.io/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// PasswordResetTokenLister helps list PasswordResetTokens. +// All objects returned here must be treated as read-only. +type PasswordResetTokenLister interface { + // List lists all PasswordResetTokens in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1.PasswordResetToken, err error) + // PasswordResetTokens returns an object that can list and get PasswordResetTokens. + PasswordResetTokens(namespace string) PasswordResetTokenNamespaceLister + PasswordResetTokenListerExpansion +} + +// passwordResetTokenLister implements the PasswordResetTokenLister interface. +type passwordResetTokenLister struct { + indexer cache.Indexer +} + +// NewPasswordResetTokenLister returns a new PasswordResetTokenLister. +func NewPasswordResetTokenLister(indexer cache.Indexer) PasswordResetTokenLister { + return &passwordResetTokenLister{indexer: indexer} +} + +// List lists all PasswordResetTokens in the indexer. +func (s *passwordResetTokenLister) List(selector labels.Selector) (ret []*v1.PasswordResetToken, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1.PasswordResetToken)) + }) + return ret, err +} + +// PasswordResetTokens returns an object that can list and get PasswordResetTokens. +func (s *passwordResetTokenLister) PasswordResetTokens(namespace string) PasswordResetTokenNamespaceLister { + return passwordResetTokenNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// PasswordResetTokenNamespaceLister helps list and get PasswordResetTokens. +// All objects returned here must be treated as read-only. +type PasswordResetTokenNamespaceLister interface { + // List lists all PasswordResetTokens in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1.PasswordResetToken, err error) + // Get retrieves the PasswordResetToken from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1.PasswordResetToken, error) + PasswordResetTokenNamespaceListerExpansion +} + +// passwordResetTokenNamespaceLister implements the PasswordResetTokenNamespaceLister +// interface. +type passwordResetTokenNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all PasswordResetTokens in the indexer for a given namespace. +func (s passwordResetTokenNamespaceLister) List(selector labels.Selector) (ret []*v1.PasswordResetToken, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1.PasswordResetToken)) + }) + return ret, err +} + +// Get retrieves the PasswordResetToken from the indexer for a given namespace and name. +func (s passwordResetTokenNamespaceLister) Get(name string) (*v1.PasswordResetToken, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1.Resource("passwordresettoken"), name) + } + return obj.(*v1.PasswordResetToken), nil +} diff --git a/v3/pkg/microservices/microservices.go b/v3/pkg/microservices/microservices.go index fd94a7dd..e2ada05c 100644 --- a/v3/pkg/microservices/microservices.go +++ b/v3/pkg/microservices/microservices.go @@ -6,15 +6,16 @@ import ( "crypto/x509" "flag" "fmt" - hfClientset "github.com/hobbyfarm/gargantua/v3/pkg/client/clientset/versioned" - tls2 "github.com/hobbyfarm/gargantua/v3/pkg/tls" - "github.com/hobbyfarm/gargantua/v3/pkg/util" "net" "net/http" "os" "sync" "time" + hfClientset "github.com/hobbyfarm/gargantua/v3/pkg/client/clientset/versioned" + tls2 "github.com/hobbyfarm/gargantua/v3/pkg/tls" + "github.com/hobbyfarm/gargantua/v3/pkg/util" + "github.com/golang/glog" "github.com/gorilla/handlers" "github.com/gorilla/mux" @@ -25,6 +26,7 @@ import ( "google.golang.org/grpc/reflection" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/leaderelection" ) // Add type MicroService based on string that is used to define constants for every service @@ -342,3 +344,36 @@ func BuildServiceConfig() *ServiceConfig { return cfg } + +type onStartedLeading func(context.Context) + +func ElectLeaderOrDie(svc MicroService, cfg *rest.Config, ctx context.Context, stopControllersCh <-chan struct{}, onStartedLeadingFunc onStartedLeading) { + lock, err := util.GetLock(string(svc), cfg) + if err != nil { + glog.Fatal(err) + } + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: lock, + ReleaseOnCancel: true, + LeaseDuration: 10 * time.Second, + RenewDeadline: 5 * time.Second, + RetryPeriod: 2 * time.Second, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: onStartedLeadingFunc, + OnStoppedLeading: func() { + // Need to start informer factory since even when not leader to ensure api layer + // keeps working. + glog.Info("Stopped being the leader. Shutting down controllers") + <-stopControllersCh // Send the stopControllers Signal + }, + OnNewLeader: func(current_id string) { + if current_id == lock.Identity() { + glog.Info("I am currently the leader") + return + } + glog.Infof("Leader changed to %s", current_id) + + }, + }, + }) +} diff --git a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go new file mode 100644 index 00000000..7e2aa5c4 --- /dev/null +++ b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go @@ -0,0 +1,159 @@ +package userservicecontroller + +import ( + "context" + "time" + + userservice "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal" + hfInformers "github.com/hobbyfarm/gargantua/v3/pkg/client/informers/externalversions" + v1 "github.com/hobbyfarm/gargantua/v3/pkg/client/listers/hobbyfarm.io/v1" + util2 "github.com/hobbyfarm/gargantua/v3/pkg/util" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +const ( + TOKEN_MAX_VALID_DURATION = time.Hour * 1 +) + +type PasswordResetTokenController struct { + internalUserServer *userservice.GrpcUserServer + + workqueue workqueue.DelayingInterface + + prtLister v1.PasswordResetTokenLister + prtInformer cache.SharedIndexInformer + prtSynced cache.InformerSynced + + started bool + + ctx context.Context +} + +func NewPasswordResetTokenController(internalUserServer *userservice.GrpcUserServer, hfInformerFactory hfInformers.SharedInformerFactory, ctx context.Context) (*PasswordResetTokenController, error) { + prtController := PasswordResetTokenController{} + prtController.internalUserServer = internalUserServer + prtController.ctx = ctx + prtController.prtSynced = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Informer().HasSynced + + prtController.workqueue = workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{ + Name: "prt-queue", + }) + prtController.prtLister = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Lister() + + prtInformer := hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Informer() + + prtInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: prtController.enqueue, + UpdateFunc: func(old, new interface{}) { + prtController.enqueue(new) + }, + }, time.Minute*30) + + return &prtController, nil +} + +func (s *PasswordResetTokenController) enqueue(obj interface{}) { + if s.workqueue.ShuttingDown() || !s.started { + return + } + + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + //utilruntime.HandleError(err) + return + } + glog.V(8).Infof("Enqueueing PRT %s", key) + //s.ssWorkqueue.AddRateLimited(key) + s.workqueue.Add(key) +} + +func (s *PasswordResetTokenController) Run(stopCh <-chan struct{}) error { + defer s.workqueue.ShutDown() + s.started = true + + glog.V(4).Infof("Starting Token controller") + go wait.Until(s.runWorker, time.Second, stopCh) + glog.Info("Started Token controller workers") + + <-stopCh + s.started = false + glog.V(4).Infof("Stopping Token controller due to stop signal") + return nil +} + +func (s *PasswordResetTokenController) runWorker() { + glog.V(6).Infof("Starting Token worker") + for s.processNextSession() { + + } +} + +func (s *PasswordResetTokenController) processNextSession() bool { + obj, shutdown := s.workqueue.Get() + + if shutdown { + return false + } + + err := func() error { + defer s.workqueue.Done(obj) + glog.V(8).Infof("processing token in token controller: %v", obj) + _, objName, err := cache.SplitMetaNamespaceKey(obj.(string)) + if err != nil { + glog.Errorf("error while splitting meta namespace key %v", err) + return nil + } + + err = s.reconcile(objName) + + if err != nil { + glog.Error(err) + } + + glog.V(8).Infof("PRT processed by PasswordResetToken controller: %v", objName) + return nil + + }() + + if err != nil { + return true + } + + return true +} + +func (s *PasswordResetTokenController) reconcile(token string) error { + glog.V(4).Infof("reconciling PRT %s", token) + + passwordResetToken, err := s.prtLister.PasswordResetTokens(util2.GetReleaseNamespace()).Get(token) + + if err != nil { + return err + } + + now := time.Now() + + expires, err := time.Parse(time.UnixDate, passwordResetToken.Spec.Timestamp) + expires = expires.Add(TOKEN_MAX_VALID_DURATION) + + if err != nil { + return err + } + + timeUntilExpires := expires.Sub(now) + + if timeUntilExpires < 0 { + glog.V(4).Infof("PRT %s seems to old, can be deleted", passwordResetToken.Name) + } else { + // requeue the session at the correct expiration time + glog.V(4).Infof("Requeueing PRT %s", passwordResetToken.Name) + s.workqueue.AddAfter(passwordResetToken, timeUntilExpires) + } + + return nil +} diff --git a/v3/services/usersvc/internal/crd.go b/v3/services/usersvc/internal/crd.go index 7d50fbaa..eed7d598 100644 --- a/v3/services/usersvc/internal/crd.go +++ b/v3/services/usersvc/internal/crd.go @@ -33,6 +33,16 @@ func GenerateUserCRD(caBundle string, reference crd.ServiceReference) []crder.CR WithVersions("v2", "v1") }) }), + hobbyfarmCRD(&v1.PasswordResetToken{}, func(c *crder.CRD) { + c.IsNamespaced(true).AddVersion("v1", &v1.PasswordResetToken{}, func(cv *crder.Version) { + cv. + WithColumn("User", ".spec.user"). + WithColumn("Timestamp", ".spec.timestamp"). + WithColumn("Duration", ".spec.duration"). + IsServed(true). + IsStored(true) + }) + }), } } diff --git a/v3/services/usersvc/main.go b/v3/services/usersvc/main.go index 9fd81fd6..1f1f3a0b 100644 --- a/v3/services/usersvc/main.go +++ b/v3/services/usersvc/main.go @@ -11,9 +11,11 @@ import ( "github.com/hobbyfarm/gargantua/v3/pkg/microservices" "github.com/hobbyfarm/gargantua/v3/pkg/signals" "github.com/hobbyfarm/gargantua/v3/pkg/util" + "golang.org/x/sync/errgroup" "github.com/golang/glog" userservice "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal" + userservicecontroller "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal/controllers" hfInformers "github.com/hobbyfarm/gargantua/v3/pkg/client/informers/externalversions" "github.com/hobbyfarm/gargantua/v3/protos/authn" @@ -95,6 +97,23 @@ func main() { microservices.StartAPIServer(userServer) }() + stopControllersCh := signals.SetupSignalHandler() + g, gctx := errgroup.WithContext(ctx) + passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(us, hfInformerFactory, gctx) + if err != nil { + glog.Fatalf("starting grpc user server failed: %v", err) + } + + microservices.ElectLeaderOrDie(microservices.User, cfg, gctx, stopControllersCh, func(c context.Context) { + glog.Info("Started being the leader. Starting controllers") + g.Go(func() error { + return passwordResetTokenController.Run(stopControllersCh) + }) + if err != nil { + glog.Fatal(err) + } + }) + stopCh := signals.SetupSignalHandler() hfInformerFactory.Start(stopCh) wg.Wait() From dac15e95982ae6a04acf4e4783b87543e42ab1de Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Fri, 26 Jan 2024 17:10:07 +0100 Subject: [PATCH 02/12] Change to channel --- .../internal/controllers/passwordResetTokenController.go | 6 ++++-- v3/services/usersvc/main.go | 6 +++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go index 7e2aa5c4..61b04065 100644 --- a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go +++ b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go @@ -58,6 +58,7 @@ func NewPasswordResetTokenController(internalUserServer *userservice.GrpcUserSer func (s *PasswordResetTokenController) enqueue(obj interface{}) { if s.workqueue.ShuttingDown() || !s.started { + glog.V(8).Infof("token is not being added to the workqueue while controller is not started") return } @@ -65,6 +66,7 @@ func (s *PasswordResetTokenController) enqueue(obj interface{}) { var err error if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { //utilruntime.HandleError(err) + glog.V(8).Infof("Error enquing token %s: %v", key, err) return } glog.V(8).Infof("Enqueueing PRT %s", key) @@ -115,7 +117,7 @@ func (s *PasswordResetTokenController) processNextSession() bool { glog.Error(err) } - glog.V(8).Infof("PRT processed by PasswordResetToken controller: %v", objName) + glog.V(8).Infof("token processed by Token controller: %v", objName) return nil }() @@ -128,7 +130,7 @@ func (s *PasswordResetTokenController) processNextSession() bool { } func (s *PasswordResetTokenController) reconcile(token string) error { - glog.V(4).Infof("reconciling PRT %s", token) + glog.V(4).Infof("reconciling token %s", token) passwordResetToken, err := s.prtLister.PasswordResetTokens(util2.GetReleaseNamespace()).Get(token) diff --git a/v3/services/usersvc/main.go b/v3/services/usersvc/main.go index 1f1f3a0b..2e36327f 100644 --- a/v3/services/usersvc/main.go +++ b/v3/services/usersvc/main.go @@ -97,7 +97,7 @@ func main() { microservices.StartAPIServer(userServer) }() - stopControllersCh := signals.SetupSignalHandler() + stopControllersCh := make(chan struct{}, 1) g, gctx := errgroup.WithContext(ctx) passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(us, hfInformerFactory, gctx) if err != nil { @@ -114,7 +114,7 @@ func main() { } }) - stopCh := signals.SetupSignalHandler() - hfInformerFactory.Start(stopCh) + stopInformerFactoryCh := signals.SetupSignalHandler() + hfInformerFactory.Start(stopInformerFactoryCh) wg.Wait() } From 2c07a149d366dc47541b68b401131f4b8c5f44fa Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Mon, 29 Jan 2024 15:40:50 +0100 Subject: [PATCH 03/12] Improve controller --- v3/pkg/apis/hobbyfarm.io/v1/register.go | 2 + v3/pkg/microservices/microservices.go | 4 +- .../passwordResetTokenController.go | 67 +++++++++++-------- v3/services/usersvc/main.go | 7 +- 4 files changed, 47 insertions(+), 33 deletions(-) diff --git a/v3/pkg/apis/hobbyfarm.io/v1/register.go b/v3/pkg/apis/hobbyfarm.io/v1/register.go index e1c15518..54f9196f 100644 --- a/v3/pkg/apis/hobbyfarm.io/v1/register.go +++ b/v3/pkg/apis/hobbyfarm.io/v1/register.go @@ -51,6 +51,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ScheduledEventList{}, &DynamicBindConfiguration{}, &DynamicBindConfigurationList{}, + &PasswordResetToken{}, + &PasswordResetTokenList{}, &Progress{}, &ProgressList{}, &Setting{}, diff --git a/v3/pkg/microservices/microservices.go b/v3/pkg/microservices/microservices.go index e2ada05c..1f2fd3fa 100644 --- a/v3/pkg/microservices/microservices.go +++ b/v3/pkg/microservices/microservices.go @@ -347,7 +347,7 @@ func BuildServiceConfig() *ServiceConfig { type onStartedLeading func(context.Context) -func ElectLeaderOrDie(svc MicroService, cfg *rest.Config, ctx context.Context, stopControllersCh <-chan struct{}, onStartedLeadingFunc onStartedLeading) { +func ElectLeaderOrDie(svc MicroService, cfg *rest.Config, ctx context.Context, stopControllersCh chan<- struct{}, onStartedLeadingFunc onStartedLeading) { lock, err := util.GetLock(string(svc), cfg) if err != nil { glog.Fatal(err) @@ -364,7 +364,7 @@ func ElectLeaderOrDie(svc MicroService, cfg *rest.Config, ctx context.Context, s // Need to start informer factory since even when not leader to ensure api layer // keeps working. glog.Info("Stopped being the leader. Shutting down controllers") - <-stopControllersCh // Send the stopControllers Signal + stopControllersCh <- struct{}{} // Send the stopControllers Signal }, OnNewLeader: func(current_id string) { if current_id == lock.Identity() { diff --git a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go index 61b04065..64cb61b6 100644 --- a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go +++ b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go @@ -2,6 +2,7 @@ package userservicecontroller import ( "context" + "fmt" "time" userservice "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal" @@ -43,20 +44,13 @@ func NewPasswordResetTokenController(internalUserServer *userservice.GrpcUserSer Name: "prt-queue", }) prtController.prtLister = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Lister() - - prtInformer := hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Informer() - - prtInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ - AddFunc: prtController.enqueue, - UpdateFunc: func(old, new interface{}) { - prtController.enqueue(new) - }, - }, time.Minute*30) - + prtController.prtInformer = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Informer() return &prtController, nil } func (s *PasswordResetTokenController) enqueue(obj interface{}) { + glog.V(8).Infof("Trying to enqueue PRT %v", obj) + if s.workqueue.ShuttingDown() || !s.started { glog.V(8).Infof("token is not being added to the workqueue while controller is not started") return @@ -65,45 +59,62 @@ func (s *PasswordResetTokenController) enqueue(obj interface{}) { var key string var err error if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - //utilruntime.HandleError(err) glog.V(8).Infof("Error enquing token %s: %v", key, err) return } glog.V(8).Infof("Enqueueing PRT %s", key) - //s.ssWorkqueue.AddRateLimited(key) s.workqueue.Add(key) } -func (s *PasswordResetTokenController) Run(stopCh <-chan struct{}) error { - defer s.workqueue.ShutDown() - s.started = true - +func (c *PasswordResetTokenController) Run(stopCh <-chan struct{}) error { + defer c.workqueue.ShutDown() glog.V(4).Infof("Starting Token controller") - go wait.Until(s.runWorker, time.Second, stopCh) + reg, err := c.prtInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueue, + UpdateFunc: func(old, new interface{}) { + c.enqueue(new) + }, + }, time.Minute*30) + + if err != nil { + glog.V(4).Infof("Event Handler could not be started. Aborting controller start") + return err + } + + glog.Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, c.prtSynced); !ok { + glog.V(4).Infof("Error: failed to wait for informer caches to sync") + return fmt.Errorf("failed to wait for informer caches to sync") + } + + go wait.Until(c.runWorker, time.Second, stopCh) glog.Info("Started Token controller workers") + c.started = true <-stopCh - s.started = false glog.V(4).Infof("Stopping Token controller due to stop signal") + c.prtInformer.RemoveEventHandler(reg) + c.started = false + glog.V(4).Infof("Token controller was stopped.") return nil } -func (s *PasswordResetTokenController) runWorker() { +func (c *PasswordResetTokenController) runWorker() { glog.V(6).Infof("Starting Token worker") - for s.processNextSession() { + for c.processNextToken() { } } -func (s *PasswordResetTokenController) processNextSession() bool { - obj, shutdown := s.workqueue.Get() +func (c *PasswordResetTokenController) processNextToken() bool { + obj, shutdown := c.workqueue.Get() if shutdown { return false } err := func() error { - defer s.workqueue.Done(obj) + defer c.workqueue.Done(obj) glog.V(8).Infof("processing token in token controller: %v", obj) _, objName, err := cache.SplitMetaNamespaceKey(obj.(string)) if err != nil { @@ -111,7 +122,7 @@ func (s *PasswordResetTokenController) processNextSession() bool { return nil } - err = s.reconcile(objName) + err = c.reconcile(objName) if err != nil { glog.Error(err) @@ -129,10 +140,10 @@ func (s *PasswordResetTokenController) processNextSession() bool { return true } -func (s *PasswordResetTokenController) reconcile(token string) error { +func (c *PasswordResetTokenController) reconcile(token string) error { glog.V(4).Infof("reconciling token %s", token) - passwordResetToken, err := s.prtLister.PasswordResetTokens(util2.GetReleaseNamespace()).Get(token) + passwordResetToken, err := c.prtLister.PasswordResetTokens(util2.GetReleaseNamespace()).Get(token) if err != nil { return err @@ -152,9 +163,9 @@ func (s *PasswordResetTokenController) reconcile(token string) error { if timeUntilExpires < 0 { glog.V(4).Infof("PRT %s seems to old, can be deleted", passwordResetToken.Name) } else { - // requeue the session at the correct expiration time + // requeue the token at the correct expiration time glog.V(4).Infof("Requeueing PRT %s", passwordResetToken.Name) - s.workqueue.AddAfter(passwordResetToken, timeUntilExpires) + c.workqueue.AddAfter(passwordResetToken, timeUntilExpires) } return nil diff --git a/v3/services/usersvc/main.go b/v3/services/usersvc/main.go index 2e36327f..e113f08e 100644 --- a/v3/services/usersvc/main.go +++ b/v3/services/usersvc/main.go @@ -97,13 +97,16 @@ func main() { microservices.StartAPIServer(userServer) }() - stopControllersCh := make(chan struct{}, 1) + stopControllersCh := make(chan struct{}) g, gctx := errgroup.WithContext(ctx) passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(us, hfInformerFactory, gctx) if err != nil { glog.Fatalf("starting grpc user server failed: %v", err) } + stopInformerFactoryCh := signals.SetupSignalHandler() + hfInformerFactory.Start(stopInformerFactoryCh) + microservices.ElectLeaderOrDie(microservices.User, cfg, gctx, stopControllersCh, func(c context.Context) { glog.Info("Started being the leader. Starting controllers") g.Go(func() error { @@ -114,7 +117,5 @@ func main() { } }) - stopInformerFactoryCh := signals.SetupSignalHandler() - hfInformerFactory.Start(stopInformerFactoryCh) wg.Wait() } From e2a4b7be10145cc88c3bf1440d8143fb8b78a18e Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Tue, 30 Jan 2024 15:00:40 +0100 Subject: [PATCH 04/12] Improve controller structure further --- .../passwordResetTokenController.go | 55 +++++++++++-------- v3/services/usersvc/main.go | 34 +++++++++--- 2 files changed, 56 insertions(+), 33 deletions(-) diff --git a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go index 64cb61b6..56ff40cd 100644 --- a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go +++ b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go @@ -18,6 +18,7 @@ import ( const ( TOKEN_MAX_VALID_DURATION = time.Hour * 1 + WORKQUEUE_NAME = "workqueue-token" ) type PasswordResetTokenController struct { @@ -25,55 +26,58 @@ type PasswordResetTokenController struct { workqueue workqueue.DelayingInterface - prtLister v1.PasswordResetTokenLister - prtInformer cache.SharedIndexInformer - prtSynced cache.InformerSynced + prtLister v1.PasswordResetTokenLister + prtInformer cache.SharedIndexInformer + prtHandlerRegistration cache.ResourceEventHandlerRegistration started bool ctx context.Context } -func NewPasswordResetTokenController(internalUserServer *userservice.GrpcUserServer, hfInformerFactory hfInformers.SharedInformerFactory, ctx context.Context) (*PasswordResetTokenController, error) { +func NewPasswordResetTokenController(hfInformerFactory hfInformers.SharedInformerFactory, ctx context.Context) (*PasswordResetTokenController, error) { prtController := PasswordResetTokenController{} - prtController.internalUserServer = internalUserServer prtController.ctx = ctx - prtController.prtSynced = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Informer().HasSynced - - prtController.workqueue = workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{ - Name: "prt-queue", - }) prtController.prtLister = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Lister() prtController.prtInformer = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Informer() + return &prtController, nil } -func (s *PasswordResetTokenController) enqueue(obj interface{}) { - glog.V(8).Infof("Trying to enqueue PRT %v", obj) +func (c *PasswordResetTokenController) enqueue(obj interface{}) { + glog.V(4).Infof("Trying to enqueue PRT %v", obj) // TODO remove - if s.workqueue.ShuttingDown() || !s.started { - glog.V(8).Infof("token is not being added to the workqueue while controller is not started") + if !c.started || c.workqueue == nil || c.workqueue.ShuttingDown() { + glog.V(4).Infof("token is not being added to the workqueue while controller is not started") return } var key string var err error if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - glog.V(8).Infof("Error enquing token %s: %v", key, err) + glog.V(4).Infof("Error enquing token %s: %v", key, err) return } - glog.V(8).Infof("Enqueueing PRT %s", key) - s.workqueue.Add(key) + glog.V(4).Infof("Enqueueing PRT %s", key) + c.workqueue.Add(key) } func (c *PasswordResetTokenController) Run(stopCh <-chan struct{}) error { - defer c.workqueue.ShutDown() + defer c.stopWorker() + glog.V(4).Infof("Starting Token controller") + + c.workqueue = workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{ + Name: WORKQUEUE_NAME, + }) + defer c.workqueue.ShutDown() + reg, err := c.prtInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueue, UpdateFunc: func(old, new interface{}) { c.enqueue(new) }, + DeleteFunc: c.enqueue, }, time.Minute*30) if err != nil { @@ -81,21 +85,20 @@ func (c *PasswordResetTokenController) Run(stopCh <-chan struct{}) error { return err } + c.prtHandlerRegistration = reg + defer c.prtInformer.RemoveEventHandler(c.prtHandlerRegistration) + glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.prtSynced); !ok { + if ok := cache.WaitForCacheSync(stopCh, c.prtInformer.HasSynced); !ok { glog.V(4).Infof("Error: failed to wait for informer caches to sync") return fmt.Errorf("failed to wait for informer caches to sync") } go wait.Until(c.runWorker, time.Second, stopCh) - glog.Info("Started Token controller workers") c.started = true - + glog.Info("Started Token controller worker") <-stopCh glog.V(4).Infof("Stopping Token controller due to stop signal") - c.prtInformer.RemoveEventHandler(reg) - c.started = false - glog.V(4).Infof("Token controller was stopped.") return nil } @@ -170,3 +173,7 @@ func (c *PasswordResetTokenController) reconcile(token string) error { return nil } + +func (c *PasswordResetTokenController) stopWorker() { + c.started = false +} diff --git a/v3/services/usersvc/main.go b/v3/services/usersvc/main.go index e113f08e..1f51113c 100644 --- a/v3/services/usersvc/main.go +++ b/v3/services/usersvc/main.go @@ -16,6 +16,7 @@ import ( "github.com/golang/glog" userservice "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal" userservicecontroller "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal/controllers" + "github.com/hobbyfarm/gargantua/v3/pkg/client/clientset/versioned" hfInformers "github.com/hobbyfarm/gargantua/v3/pkg/client/informers/externalversions" "github.com/hobbyfarm/gargantua/v3/protos/authn" @@ -98,20 +99,13 @@ func main() { }() stopControllersCh := make(chan struct{}) - g, gctx := errgroup.WithContext(ctx) - passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(us, hfInformerFactory, gctx) - if err != nil { - glog.Fatalf("starting grpc user server failed: %v", err) - } stopInformerFactoryCh := signals.SetupSignalHandler() hfInformerFactory.Start(stopInformerFactoryCh) - microservices.ElectLeaderOrDie(microservices.User, cfg, gctx, stopControllersCh, func(c context.Context) { + microservices.ElectLeaderOrDie(microservices.User, cfg, ctx, stopControllersCh, func(c context.Context) { glog.Info("Started being the leader. Starting controllers") - g.Go(func() error { - return passwordResetTokenController.Run(stopControllersCh) - }) + startControllers(ctx, hfClient, stopControllersCh) if err != nil { glog.Fatal(err) } @@ -119,3 +113,25 @@ func main() { wg.Wait() } + +func startControllers(ctx context.Context, hfClient *versioned.Clientset, stopControllersCh <-chan struct{}) error { + hfInformerFactory := hfInformers.NewSharedInformerFactoryWithOptions(hfClient, time.Second*30, hfInformers.WithNamespace(util.GetReleaseNamespace())) + g, gctx := errgroup.WithContext(ctx) + + passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(hfInformerFactory, gctx) + if err != nil { + glog.Fatalf("starting passwordResetTokenController failed: %v", err) + } + + g.Go(func() error { + return passwordResetTokenController.Run(stopControllersCh) + }) + + hfInformerFactory.Start(stopControllersCh) + + if err = g.Wait(); err != nil { + glog.Errorf("Error starting up the controllers: %v", err) + return err + } + return nil +} From 6f6f5ee9095848c729430e1b3ae00ee39629ecd7 Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Wed, 31 Jan 2024 14:58:25 +0100 Subject: [PATCH 05/12] Add final proposal for controllers in microservices --- v3/pkg/microservices/controller.go | 145 ++++++++++++++ .../delayingWorkqueueController.go | 32 ++++ .../passwordResetTokenController.go | 181 ++++-------------- v3/services/usersvc/internal/crd.go | 18 +- 4 files changed, 222 insertions(+), 154 deletions(-) create mode 100644 v3/pkg/microservices/controller.go create mode 100644 v3/pkg/microservices/delayingWorkqueueController.go diff --git a/v3/pkg/microservices/controller.go b/v3/pkg/microservices/controller.go new file mode 100644 index 00000000..9047dacd --- /dev/null +++ b/v3/pkg/microservices/controller.go @@ -0,0 +1,145 @@ +package microservices + +import ( + "context" + "fmt" + "time" + + "github.com/golang/glog" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type BaseController struct { + Workqueue workqueue.Interface + Started bool + Context context.Context + Informer cache.SharedIndexInformer + InformerHandlerRegistration cache.ResourceEventHandlerRegistration + ResyncPeriod time.Duration + ReconcileFunc func(objName string) error +} + +// Should not be instiantiated in its own, use specific implementation of delayingWorkqueueController or RateLimitingWorkqueueController +func NewBaseController(ctx context.Context, informer cache.SharedIndexInformer, reconcileFunc func(objName string) error, resyncPeriod time.Duration) *BaseController { + return &BaseController{ + Workqueue: workqueue.New(), + Context: ctx, + Informer: informer, + ResyncPeriod: resyncPeriod, + ReconcileFunc: reconcileFunc, + } +} + +func (c *BaseController) AddEventHandlerWithResyncPeriod() error { + glog.Info("Add EventHandlerWithResyncPeriod") + reg, err := c.Informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: c.enqueue, + UpdateFunc: func(old, new interface{}) { + c.enqueue(new) + }, + DeleteFunc: c.enqueue, + }, c.ResyncPeriod) + + if err != nil { + glog.V(4).Infof("Event Handler could not be started. Aborting controller start") + return err + } + + c.InformerHandlerRegistration = reg + return nil +} + +// Override this method if you need to wait for other informers aswell +func (c *BaseController) WaitForCacheSync(stopCh <-chan struct{}) error { + // Wait for the caches to be synced before starting workers + glog.Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, c.Informer.HasSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + return nil +} + +func (c *BaseController) Run(stopCh <-chan struct{}) error { + defer c.Workqueue.ShutDown() + defer c.stopWorker() + + glog.Info("Starting controller") + c.Started = true + + err := c.AddEventHandlerWithResyncPeriod() + if err != nil { + return err + } + defer c.Informer.RemoveEventHandler(c.InformerHandlerRegistration) + + err = c.WaitForCacheSync(stopCh) + if err != nil { + return err + } + + go wait.Until(c.runWorker, time.Second, stopCh) + + <-stopCh + glog.Info("Stopping base controller") + return nil +} + +func (c *BaseController) enqueue(obj interface{}) { + if !c.Started || c.Workqueue == nil || c.Workqueue.ShuttingDown() { + glog.V(4).Infof("Object is not being added to the workqueue while controller is not started") + return + } + + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + glog.V(4).Infof("Error enquing %s: %v", key, err) + return + } + glog.V(4).Infof("Enqueueing: %s", key) + c.Workqueue.Add(key) +} + +func (c *BaseController) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *BaseController) processNextWorkItem() bool { + obj, shutdown := c.Workqueue.Get() + + if shutdown { + return false + } + + err := func() error { + defer c.Workqueue.Done(obj) + glog.V(8).Infof("processing next token in queue: %v", obj) + _, objName, err := cache.SplitMetaNamespaceKey(obj.(string)) + if err != nil { + glog.Errorf("error while splitting meta namespace key %v", err) + return nil + } + + err = c.ReconcileFunc(objName) + + if err != nil { + glog.Error(err) + } + + glog.Infof("Successfully processed: %s", objName) + return nil + }() + + if err != nil { + return true + } + + return true +} + +func (c *BaseController) stopWorker() { + c.Started = false +} diff --git a/v3/pkg/microservices/delayingWorkqueueController.go b/v3/pkg/microservices/delayingWorkqueueController.go new file mode 100644 index 00000000..6dde6812 --- /dev/null +++ b/v3/pkg/microservices/delayingWorkqueueController.go @@ -0,0 +1,32 @@ +package microservices + +import ( + "context" + "fmt" + "time" + + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type DelayingWorkqueueController struct { + BaseController +} + +func NewDelayingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, reconcileFunc func(objName string) error, queueName string, resyncPeriod time.Duration) *DelayingWorkqueueController { + dwqc := DelayingWorkqueueController{ + *NewBaseController(ctx, informer, reconcileFunc, resyncPeriod), + } + + dwqc.Workqueue = workqueue.NewNamedDelayingQueue(queueName) + + return &dwqc +} + +func (dwq DelayingWorkqueueController) GetDelayingWorkqueue() (workqueue.DelayingInterface, error) { + delayingQueue, ok := dwq.Workqueue.(workqueue.DelayingInterface) + if !ok { + return nil, fmt.Errorf("workqueue is not a DelayingQueue") + } + return delayingQueue, nil +} diff --git a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go index 56ff40cd..d547a585 100644 --- a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go +++ b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go @@ -2,178 +2,67 @@ package userservicecontroller import ( "context" - "fmt" "time" - userservice "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal" hfInformers "github.com/hobbyfarm/gargantua/v3/pkg/client/informers/externalversions" - v1 "github.com/hobbyfarm/gargantua/v3/pkg/client/listers/hobbyfarm.io/v1" - util2 "github.com/hobbyfarm/gargantua/v3/pkg/util" + informerV1 "github.com/hobbyfarm/gargantua/v3/pkg/client/listers/hobbyfarm.io/v1" + "github.com/hobbyfarm/gargantua/v3/pkg/microservices" "github.com/golang/glog" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" ) const ( - TOKEN_MAX_VALID_DURATION = time.Hour * 1 - WORKQUEUE_NAME = "workqueue-token" + WORKQUEUE_NAME = "workqueue-token" ) -type PasswordResetTokenController struct { - internalUserServer *userservice.GrpcUserServer - - workqueue workqueue.DelayingInterface - - prtLister v1.PasswordResetTokenLister - prtInformer cache.SharedIndexInformer - prtHandlerRegistration cache.ResourceEventHandlerRegistration - - started bool - - ctx context.Context -} - -func NewPasswordResetTokenController(hfInformerFactory hfInformers.SharedInformerFactory, ctx context.Context) (*PasswordResetTokenController, error) { - prtController := PasswordResetTokenController{} - prtController.ctx = ctx - prtController.prtLister = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Lister() - prtController.prtInformer = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Informer() - - return &prtController, nil +type TokenController struct { + microservices.DelayingWorkqueueController + prtLister informerV1.PasswordResetTokenLister } -func (c *PasswordResetTokenController) enqueue(obj interface{}) { - glog.V(4).Infof("Trying to enqueue PRT %v", obj) // TODO remove - - if !c.started || c.workqueue == nil || c.workqueue.ShuttingDown() { - glog.V(4).Infof("token is not being added to the workqueue while controller is not started") - return +func NewPasswordResetTokenController(hfInformerFactory hfInformers.SharedInformerFactory, ctx context.Context) (*TokenController, error) { + tokenController := TokenController{ + DelayingWorkqueueController: *microservices.NewDelayingWorkqueueController( + ctx, + hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Informer(), + ReconcileTokenFunc, + WORKQUEUE_NAME, + 30*time.Minute), } + tokenController.prtLister = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Lister() - var key string - var err error - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - glog.V(4).Infof("Error enquing token %s: %v", key, err) - return - } - glog.V(4).Infof("Enqueueing PRT %s", key) - c.workqueue.Add(key) + return &tokenController, nil } -func (c *PasswordResetTokenController) Run(stopCh <-chan struct{}) error { - defer c.stopWorker() - - glog.V(4).Infof("Starting Token controller") - - c.workqueue = workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{ - Name: WORKQUEUE_NAME, - }) - defer c.workqueue.ShutDown() - - reg, err := c.prtInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueue, - UpdateFunc: func(old, new interface{}) { - c.enqueue(new) - }, - DeleteFunc: c.enqueue, - }, time.Minute*30) +func ReconcileTokenFunc(objName string) error { + glog.V(4).Infof("reconciling token inside specific token controller %s", objName) - if err != nil { - glog.V(4).Infof("Event Handler could not be started. Aborting controller start") - return err - } - - c.prtHandlerRegistration = reg - defer c.prtInformer.RemoveEventHandler(c.prtHandlerRegistration) - - glog.Info("Waiting for informer caches to sync") - if ok := cache.WaitForCacheSync(stopCh, c.prtInformer.HasSynced); !ok { - glog.V(4).Infof("Error: failed to wait for informer caches to sync") - return fmt.Errorf("failed to wait for informer caches to sync") - } - - go wait.Until(c.runWorker, time.Second, stopCh) - c.started = true - glog.Info("Started Token controller worker") - <-stopCh - glog.V(4).Infof("Stopping Token controller due to stop signal") - return nil -} - -func (c *PasswordResetTokenController) runWorker() { - glog.V(6).Infof("Starting Token worker") - for c.processNextToken() { - - } -} + /* + passwordResetToken, err := c.prtLister.PasswordResetTokens(util2.GetReleaseNamespace()).Get(token) -func (c *PasswordResetTokenController) processNextToken() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { - return false - } - - err := func() error { - defer c.workqueue.Done(obj) - glog.V(8).Infof("processing token in token controller: %v", obj) - _, objName, err := cache.SplitMetaNamespaceKey(obj.(string)) if err != nil { - glog.Errorf("error while splitting meta namespace key %v", err) - return nil + return err } - err = c.reconcile(objName) + now := time.Now() + + expires, err := time.Parse(time.UnixDate, passwordResetToken.Spec.Timestamp) + expires = expires.Add(TOKEN_MAX_VALID_DURATION) if err != nil { - glog.Error(err) + return err } - glog.V(8).Infof("token processed by Token controller: %v", objName) - return nil - - }() - - if err != nil { - return true - } - - return true -} - -func (c *PasswordResetTokenController) reconcile(token string) error { - glog.V(4).Infof("reconciling token %s", token) - - passwordResetToken, err := c.prtLister.PasswordResetTokens(util2.GetReleaseNamespace()).Get(token) + timeUntilExpires := expires.Sub(now) - if err != nil { - return err - } - - now := time.Now() - - expires, err := time.Parse(time.UnixDate, passwordResetToken.Spec.Timestamp) - expires = expires.Add(TOKEN_MAX_VALID_DURATION) - - if err != nil { - return err - } - - timeUntilExpires := expires.Sub(now) - - if timeUntilExpires < 0 { - glog.V(4).Infof("PRT %s seems to old, can be deleted", passwordResetToken.Name) - } else { - // requeue the token at the correct expiration time - glog.V(4).Infof("Requeueing PRT %s", passwordResetToken.Name) - c.workqueue.AddAfter(passwordResetToken, timeUntilExpires) - } + if timeUntilExpires < 0 { + glog.V(4).Infof("PRT %s seems to old, can be deleted", passwordResetToken.Name) + } else { + // requeue the token at the correct expiration time + glog.V(4).Infof("Requeueing PRT %s", passwordResetToken.Name) + c.workqueue.AddAfter(passwordResetToken, timeUntilExpires) + } + */ return nil } - -func (c *PasswordResetTokenController) stopWorker() { - c.started = false -} diff --git a/v3/services/usersvc/internal/crd.go b/v3/services/usersvc/internal/crd.go index eed7d598..3398f073 100644 --- a/v3/services/usersvc/internal/crd.go +++ b/v3/services/usersvc/internal/crd.go @@ -34,14 +34,16 @@ func GenerateUserCRD(caBundle string, reference crd.ServiceReference) []crder.CR }) }), hobbyfarmCRD(&v1.PasswordResetToken{}, func(c *crder.CRD) { - c.IsNamespaced(true).AddVersion("v1", &v1.PasswordResetToken{}, func(cv *crder.Version) { - cv. - WithColumn("User", ".spec.user"). - WithColumn("Timestamp", ".spec.timestamp"). - WithColumn("Duration", ".spec.duration"). - IsServed(true). - IsStored(true) - }) + c. + IsNamespaced(true). + AddVersion("v1", &v1.PasswordResetToken{}, func(cv *crder.Version) { + cv. + WithColumn("User", ".spec.user"). + WithColumn("Timestamp", ".spec.timestamp"). + WithColumn("Duration", ".spec.duration"). + IsServed(true). + IsStored(true) + }) }), } } From 5b99f95d6503e93106cab2ba16f042e8cb78fa55 Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Thu, 1 Feb 2024 13:27:58 +0100 Subject: [PATCH 06/12] Further improvements --- .../{ => controller}/controller.go | 71 ++++++++++++------ .../controller/delayingWorkqueueController.go | 32 ++++++++ .../rateLimitingWorkqueueController.go | 32 ++++++++ .../delayingWorkqueueController.go | 32 -------- .../passwordResetTokenController.go | 74 +++++++++++-------- 5 files changed, 156 insertions(+), 85 deletions(-) rename v3/pkg/microservices/{ => controller}/controller.go (60%) create mode 100644 v3/pkg/microservices/controller/delayingWorkqueueController.go create mode 100644 v3/pkg/microservices/controller/rateLimitingWorkqueueController.go delete mode 100644 v3/pkg/microservices/delayingWorkqueueController.go diff --git a/v3/pkg/microservices/controller.go b/v3/pkg/microservices/controller/controller.go similarity index 60% rename from v3/pkg/microservices/controller.go rename to v3/pkg/microservices/controller/controller.go index 9047dacd..f4cbe7e1 100644 --- a/v3/pkg/microservices/controller.go +++ b/v3/pkg/microservices/controller/controller.go @@ -11,29 +11,35 @@ import ( "k8s.io/client-go/util/workqueue" ) +// Reconciler required Reconcile method. +type Reconciler interface { + Reconcile(objName string) error +} + type BaseController struct { - Workqueue workqueue.Interface + name string + workqueue workqueue.Interface Started bool Context context.Context - Informer cache.SharedIndexInformer - InformerHandlerRegistration cache.ResourceEventHandlerRegistration + Informer cache.SharedIndexInformer // The informer to attach to + InformerHandlerRegistration cache.ResourceEventHandlerRegistration // We save the Registration here to unregister when shutting down ResyncPeriod time.Duration - ReconcileFunc func(objName string) error + reconciler Reconciler } -// Should not be instiantiated in its own, use specific implementation of delayingWorkqueueController or RateLimitingWorkqueueController -func NewBaseController(ctx context.Context, informer cache.SharedIndexInformer, reconcileFunc func(objName string) error, resyncPeriod time.Duration) *BaseController { - return &BaseController{ - Workqueue: workqueue.New(), - Context: ctx, - Informer: informer, - ResyncPeriod: resyncPeriod, - ReconcileFunc: reconcileFunc, +// Should not be instiantiated on its own, use specific implementation of delayingWorkqueueController or RateLimitingWorkqueueController +func newBaseController(name string, ctx context.Context, informer cache.SharedIndexInformer, resyncPeriod time.Duration) *BaseController { + baseController := BaseController{ + name: name, + Context: ctx, + Informer: informer, + ResyncPeriod: resyncPeriod, } + return &baseController } func (c *BaseController) AddEventHandlerWithResyncPeriod() error { - glog.Info("Add EventHandlerWithResyncPeriod") + glog.V(4).Infof("Add EventHandlerWithResyncPeriod") reg, err := c.Informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ AddFunc: c.enqueue, UpdateFunc: func(old, new interface{}) { @@ -43,7 +49,7 @@ func (c *BaseController) AddEventHandlerWithResyncPeriod() error { }, c.ResyncPeriod) if err != nil { - glog.V(4).Infof("Event Handler could not be started. Aborting controller start") + glog.Errorf("Event Handler could not be started. Aborting controller start") return err } @@ -54,7 +60,7 @@ func (c *BaseController) AddEventHandlerWithResyncPeriod() error { // Override this method if you need to wait for other informers aswell func (c *BaseController) WaitForCacheSync(stopCh <-chan struct{}) error { // Wait for the caches to be synced before starting workers - glog.Info("Waiting for informer caches to sync") + glog.V(4).Info("Waiting for informer caches to sync") if ok := cache.WaitForCacheSync(stopCh, c.Informer.HasSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } @@ -62,10 +68,17 @@ func (c *BaseController) WaitForCacheSync(stopCh <-chan struct{}) error { } func (c *BaseController) Run(stopCh <-chan struct{}) error { - defer c.Workqueue.ShutDown() + if c.workqueue == nil { + return fmt.Errorf("Workqueue not instantiated.") + } + if c.reconciler == nil { + return fmt.Errorf("Reconciler not instantiated. Call SetReconciler(r Reconciler) to set it") + } + + defer c.workqueue.ShutDown() defer c.stopWorker() - glog.Info("Starting controller") + glog.Info("Starting controller: %s", c.name) c.Started = true err := c.AddEventHandlerWithResyncPeriod() @@ -87,7 +100,7 @@ func (c *BaseController) Run(stopCh <-chan struct{}) error { } func (c *BaseController) enqueue(obj interface{}) { - if !c.Started || c.Workqueue == nil || c.Workqueue.ShuttingDown() { + if !c.Started || c.workqueue == nil || c.workqueue.ShuttingDown() { glog.V(4).Infof("Object is not being added to the workqueue while controller is not started") return } @@ -99,7 +112,7 @@ func (c *BaseController) enqueue(obj interface{}) { return } glog.V(4).Infof("Enqueueing: %s", key) - c.Workqueue.Add(key) + c.workqueue.Add(key) } func (c *BaseController) runWorker() { @@ -108,14 +121,14 @@ func (c *BaseController) runWorker() { } func (c *BaseController) processNextWorkItem() bool { - obj, shutdown := c.Workqueue.Get() + obj, shutdown := c.workqueue.Get() if shutdown { return false } err := func() error { - defer c.Workqueue.Done(obj) + defer c.workqueue.Done(obj) glog.V(8).Infof("processing next token in queue: %v", obj) _, objName, err := cache.SplitMetaNamespaceKey(obj.(string)) if err != nil { @@ -123,13 +136,13 @@ func (c *BaseController) processNextWorkItem() bool { return nil } - err = c.ReconcileFunc(objName) + err = c.reconciler.Reconcile(objName) if err != nil { glog.Error(err) } - glog.Infof("Successfully processed: %s", objName) + glog.V(8).Infof("Successfully processed: %s", objName) return nil }() @@ -143,3 +156,15 @@ func (c *BaseController) processNextWorkItem() bool { func (c *BaseController) stopWorker() { c.Started = false } + +func (c *BaseController) SetReconciler(r Reconciler) { + c.reconciler = r +} + +func (c *BaseController) SetWorkqueue(w workqueue.Interface) { + c.workqueue = w +} + +func (c *BaseController) GetWorkqueue() workqueue.Interface { + return c.workqueue +} diff --git a/v3/pkg/microservices/controller/delayingWorkqueueController.go b/v3/pkg/microservices/controller/delayingWorkqueueController.go new file mode 100644 index 00000000..535b14cd --- /dev/null +++ b/v3/pkg/microservices/controller/delayingWorkqueueController.go @@ -0,0 +1,32 @@ +package microservices + +import ( + "context" + "fmt" + "time" + + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type DelayingWorkqueueController struct { + BaseController +} + +func NewDelayingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, name string, resyncPeriod time.Duration) *DelayingWorkqueueController { + dwqc := &DelayingWorkqueueController{ + *newBaseController(name, ctx, informer, resyncPeriod), + } + + dwqc.SetWorkqueue(workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{Name: name})) + + return dwqc +} + +func (dwq *DelayingWorkqueueController) GetDelayingWorkqueue() (workqueue.DelayingInterface, error) { + delayingQueue, ok := dwq.GetWorkqueue().(workqueue.DelayingInterface) + if !ok { + return nil, fmt.Errorf("Workqueue is not a DelayingQueue") + } + return delayingQueue, nil +} diff --git a/v3/pkg/microservices/controller/rateLimitingWorkqueueController.go b/v3/pkg/microservices/controller/rateLimitingWorkqueueController.go new file mode 100644 index 00000000..3f34ac7d --- /dev/null +++ b/v3/pkg/microservices/controller/rateLimitingWorkqueueController.go @@ -0,0 +1,32 @@ +package microservices + +import ( + "context" + "fmt" + "time" + + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" +) + +type RateLimitingWorkqueueController struct { + BaseController +} + +func NewRateLimitingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, reconcileFunc func(objName string) error, name string, resyncPeriod time.Duration, rateLimiter workqueue.RateLimiter) *RateLimitingWorkqueueController { + rlwq := &RateLimitingWorkqueueController{ + *newBaseController(name, ctx, informer, resyncPeriod), + } + + rlwq.SetWorkqueue(workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{Name: name})) + + return rlwq +} + +func (rlwq *RateLimitingWorkqueueController) GetRateLimitingWorkqueue() (workqueue.RateLimitingInterface, error) { + rateLimitingQueue, ok := rlwq.GetWorkqueue().(workqueue.RateLimitingInterface) + if !ok { + return nil, fmt.Errorf("Workqueue is not a DelayingQueue") + } + return rateLimitingQueue, nil +} diff --git a/v3/pkg/microservices/delayingWorkqueueController.go b/v3/pkg/microservices/delayingWorkqueueController.go deleted file mode 100644 index 6dde6812..00000000 --- a/v3/pkg/microservices/delayingWorkqueueController.go +++ /dev/null @@ -1,32 +0,0 @@ -package microservices - -import ( - "context" - "fmt" - "time" - - "k8s.io/client-go/tools/cache" - "k8s.io/client-go/util/workqueue" -) - -type DelayingWorkqueueController struct { - BaseController -} - -func NewDelayingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, reconcileFunc func(objName string) error, queueName string, resyncPeriod time.Duration) *DelayingWorkqueueController { - dwqc := DelayingWorkqueueController{ - *NewBaseController(ctx, informer, reconcileFunc, resyncPeriod), - } - - dwqc.Workqueue = workqueue.NewNamedDelayingQueue(queueName) - - return &dwqc -} - -func (dwq DelayingWorkqueueController) GetDelayingWorkqueue() (workqueue.DelayingInterface, error) { - delayingQueue, ok := dwq.Workqueue.(workqueue.DelayingInterface) - if !ok { - return nil, fmt.Errorf("workqueue is not a DelayingQueue") - } - return delayingQueue, nil -} diff --git a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go index d547a585..a1d03966 100644 --- a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go +++ b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go @@ -6,63 +6,77 @@ import ( hfInformers "github.com/hobbyfarm/gargantua/v3/pkg/client/informers/externalversions" informerV1 "github.com/hobbyfarm/gargantua/v3/pkg/client/listers/hobbyfarm.io/v1" - "github.com/hobbyfarm/gargantua/v3/pkg/microservices" + controllers "github.com/hobbyfarm/gargantua/v3/pkg/microservices/controller" + "github.com/hobbyfarm/gargantua/v3/pkg/util" "github.com/golang/glog" ) const ( - WORKQUEUE_NAME = "workqueue-token" + NAME = "token-controller" ) type TokenController struct { - microservices.DelayingWorkqueueController - prtLister informerV1.PasswordResetTokenLister + controllers.DelayingWorkqueueController + controllers.Reconciler + tokenLister informerV1.PasswordResetTokenLister } func NewPasswordResetTokenController(hfInformerFactory hfInformers.SharedInformerFactory, ctx context.Context) (*TokenController, error) { - tokenController := TokenController{ - DelayingWorkqueueController: *microservices.NewDelayingWorkqueueController( + tokenController := &TokenController{ + DelayingWorkqueueController: *controllers.NewDelayingWorkqueueController( ctx, hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Informer(), - ReconcileTokenFunc, - WORKQUEUE_NAME, + NAME, 30*time.Minute), } - tokenController.prtLister = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Lister() - return &tokenController, nil + tokenController.tokenLister = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Lister() + tokenController.SetReconciler(tokenController) + + return tokenController, nil } -func ReconcileTokenFunc(objName string) error { +func (dwq *TokenController) Reconcile(objName string) error { glog.V(4).Infof("reconciling token inside specific token controller %s", objName) + token, err := dwq.tokenLister.PasswordResetTokens(util.GetReleaseNamespace()).Get(objName) - /* - passwordResetToken, err := c.prtLister.PasswordResetTokens(util2.GetReleaseNamespace()).Get(token) + if err != nil { + return err + } - if err != nil { - return err - } + tokenDuration, err := util.GetDurationWithDays(token.Spec.Duration) + if err != nil { + return err + } - now := time.Now() + tokenDurationParsed, err := time.ParseDuration(tokenDuration) + if err != nil { + return err + } - expires, err := time.Parse(time.UnixDate, passwordResetToken.Spec.Timestamp) - expires = expires.Add(TOKEN_MAX_VALID_DURATION) + now := time.Now() - if err != nil { - return err - } + expires, err := time.Parse(time.UnixDate, token.Spec.Timestamp) + expires = expires.Add(tokenDurationParsed) + + if err != nil { + return err + } - timeUntilExpires := expires.Sub(now) + timeUntilExpires := expires.Sub(now) - if timeUntilExpires < 0 { - glog.V(4).Infof("PRT %s seems to old, can be deleted", passwordResetToken.Name) - } else { - // requeue the token at the correct expiration time - glog.V(4).Infof("Requeueing PRT %s", passwordResetToken.Name) - c.workqueue.AddAfter(passwordResetToken, timeUntilExpires) + if timeUntilExpires < 0 { + glog.V(4).Infof("Token %s seems to old, can be deleted", token.Name) + } else { + // requeue the token at the correct expiration time + glog.V(4).Infof("Requeueing token %s as the duration is not reached", token.Name) + delayingWorkqueue, err := dwq.GetDelayingWorkqueue() + if err != nil { + return err } - */ + delayingWorkqueue.AddAfter(token, timeUntilExpires) + } return nil } From acb64568017bcf40dc42804d9ae8bcd5f3b53c6d Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Thu, 1 Feb 2024 16:49:51 +0100 Subject: [PATCH 07/12] Idea for threads --- v3/pkg/microservices/controller/controller.go | 14 ++++++++++++-- .../controllers/passwordResetTokenController.go | 2 +- v3/services/usersvc/main.go | 10 ++++++---- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/v3/pkg/microservices/controller/controller.go b/v3/pkg/microservices/controller/controller.go index f4cbe7e1..a0fb5894 100644 --- a/v3/pkg/microservices/controller/controller.go +++ b/v3/pkg/microservices/controller/controller.go @@ -25,6 +25,7 @@ type BaseController struct { InformerHandlerRegistration cache.ResourceEventHandlerRegistration // We save the Registration here to unregister when shutting down ResyncPeriod time.Duration reconciler Reconciler + threads int } // Should not be instiantiated on its own, use specific implementation of delayingWorkqueueController or RateLimitingWorkqueueController @@ -34,6 +35,7 @@ func newBaseController(name string, ctx context.Context, informer cache.SharedIn Context: ctx, Informer: informer, ResyncPeriod: resyncPeriod, + threads: 1, } return &baseController } @@ -78,7 +80,7 @@ func (c *BaseController) Run(stopCh <-chan struct{}) error { defer c.workqueue.ShutDown() defer c.stopWorker() - glog.Info("Starting controller: %s", c.name) + glog.Infof("Starting controller: %s", c.name) c.Started = true err := c.AddEventHandlerWithResyncPeriod() @@ -92,7 +94,10 @@ func (c *BaseController) Run(stopCh <-chan struct{}) error { return err } - go wait.Until(c.runWorker, time.Second, stopCh) + glog.Infof("Starting %d worker threads for %s", c.threads, c.name) + for i := 0; i < c.threads; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } <-stopCh glog.Info("Stopping base controller") @@ -116,6 +121,7 @@ func (c *BaseController) enqueue(obj interface{}) { } func (c *BaseController) runWorker() { + glog.V(4).Infof("Starting worker thread for %s", c.name) for c.processNextWorkItem() { } } @@ -168,3 +174,7 @@ func (c *BaseController) SetWorkqueue(w workqueue.Interface) { func (c *BaseController) GetWorkqueue() workqueue.Interface { return c.workqueue } + +func (c *BaseController) SetWorkerThreadCount(threads int) { + c.threads = threads +} diff --git a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go index a1d03966..5d9ff886 100644 --- a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go +++ b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go @@ -38,7 +38,7 @@ func NewPasswordResetTokenController(hfInformerFactory hfInformers.SharedInforme } func (dwq *TokenController) Reconcile(objName string) error { - glog.V(4).Infof("reconciling token inside specific token controller %s", objName) + glog.V(8).Infof("reconciling token inside specific token controller %s", objName) token, err := dwq.tokenLister.PasswordResetTokens(util.GetReleaseNamespace()).Get(objName) if err != nil { diff --git a/v3/services/usersvc/main.go b/v3/services/usersvc/main.go index 1f51113c..3ce606a3 100644 --- a/v3/services/usersvc/main.go +++ b/v3/services/usersvc/main.go @@ -104,10 +104,9 @@ func main() { hfInformerFactory.Start(stopInformerFactoryCh) microservices.ElectLeaderOrDie(microservices.User, cfg, ctx, stopControllersCh, func(c context.Context) { - glog.Info("Started being the leader. Starting controllers") - startControllers(ctx, hfClient, stopControllersCh) - if err != nil { - glog.Fatal(err) + _err := startControllers(ctx, hfClient, stopControllersCh) + if _err != nil { + glog.Fatal(_err) } }) @@ -115,6 +114,7 @@ func main() { } func startControllers(ctx context.Context, hfClient *versioned.Clientset, stopControllersCh <-chan struct{}) error { + glog.Info("Starting controllers") hfInformerFactory := hfInformers.NewSharedInformerFactoryWithOptions(hfClient, time.Second*30, hfInformers.WithNamespace(util.GetReleaseNamespace())) g, gctx := errgroup.WithContext(ctx) @@ -123,6 +123,8 @@ func startControllers(ctx context.Context, hfClient *versioned.Clientset, stopCo glog.Fatalf("starting passwordResetTokenController failed: %v", err) } + passwordResetTokenController.SetWorkerThreadCount(2) + g.Go(func() error { return passwordResetTokenController.Run(stopControllersCh) }) From 45ea8cfa593c9b5496c544fd9006cd1ee1bfd5b0 Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Fri, 2 Feb 2024 22:19:18 +0100 Subject: [PATCH 08/12] Add distributed computing --- v3/pkg/microservices/controller/controller.go | 33 +++- .../controller/delayingWorkqueueController.go | 7 +- .../controller/distributedController.go | 155 ++++++++++++++++++ .../rateLimitingWorkqueueController.go | 7 +- v3/pkg/microservices/microservices.go | 15 +- v3/services/accesscodesvc/main.go | 2 +- v3/services/conversionsvc/main.go | 2 +- v3/services/rbacsvc/main.go | 2 +- v3/services/settingsvc/main.go | 2 +- .../passwordResetTokenController.go | 5 +- v3/services/usersvc/main.go | 22 +-- 11 files changed, 222 insertions(+), 30 deletions(-) create mode 100644 v3/pkg/microservices/controller/distributedController.go diff --git a/v3/pkg/microservices/controller/controller.go b/v3/pkg/microservices/controller/controller.go index a0fb5894..55d09143 100644 --- a/v3/pkg/microservices/controller/controller.go +++ b/v3/pkg/microservices/controller/controller.go @@ -16,6 +16,11 @@ type Reconciler interface { Reconcile(objName string) error } +// LoadScheduler probvides enqueue method to only enqueue objects it is suited for. +type LoadScheduler interface { + enqueue(obj interface{}) +} + type BaseController struct { name string workqueue workqueue.Interface @@ -25,29 +30,32 @@ type BaseController struct { InformerHandlerRegistration cache.ResourceEventHandlerRegistration // We save the Registration here to unregister when shutting down ResyncPeriod time.Duration reconciler Reconciler + loadScheduler LoadScheduler threads int } // Should not be instiantiated on its own, use specific implementation of delayingWorkqueueController or RateLimitingWorkqueueController func newBaseController(name string, ctx context.Context, informer cache.SharedIndexInformer, resyncPeriod time.Duration) *BaseController { - baseController := BaseController{ + baseController := &BaseController{ name: name, Context: ctx, Informer: informer, ResyncPeriod: resyncPeriod, threads: 1, } - return &baseController + + baseController.loadScheduler = baseController // Default is to schedule for itself + return baseController } func (c *BaseController) AddEventHandlerWithResyncPeriod() error { glog.V(4).Infof("Add EventHandlerWithResyncPeriod") reg, err := c.Informer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ - AddFunc: c.enqueue, + AddFunc: c.loadScheduler.enqueue, UpdateFunc: func(old, new interface{}) { - c.enqueue(new) + c.loadScheduler.enqueue(new) }, - DeleteFunc: c.enqueue, + DeleteFunc: c.loadScheduler.enqueue, }, c.ResyncPeriod) if err != nil { @@ -69,7 +77,11 @@ func (c *BaseController) WaitForCacheSync(stopCh <-chan struct{}) error { return nil } -func (c *BaseController) Run(stopCh <-chan struct{}) error { +func (c *BaseController) run(stopCh <-chan struct{}) error { + if c.Started { + glog.Infof("Controller %s was already started. Not starting again", c.name) + return nil + } if c.workqueue == nil { return fmt.Errorf("Workqueue not instantiated.") } @@ -104,6 +116,7 @@ func (c *BaseController) Run(stopCh <-chan struct{}) error { return nil } +// enqueue will add an object to the local workqueue func (c *BaseController) enqueue(obj interface{}) { if !c.Started || c.workqueue == nil || c.workqueue.ShuttingDown() { glog.V(4).Infof("Object is not being added to the workqueue while controller is not started") @@ -120,12 +133,14 @@ func (c *BaseController) enqueue(obj interface{}) { c.workqueue.Add(key) } +// runWorker starts a simple worker that processes the workqueue inside a loop func (c *BaseController) runWorker() { glog.V(4).Infof("Starting worker thread for %s", c.name) for c.processNextWorkItem() { } } +// this method processes the next workqueue item. It calls the reconcile method on it func (c *BaseController) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() @@ -163,6 +178,7 @@ func (c *BaseController) stopWorker() { c.Started = false } +// Set the reconciler interface with the controller specific reconcile logic func (c *BaseController) SetReconciler(r Reconciler) { c.reconciler = r } @@ -171,10 +187,15 @@ func (c *BaseController) SetWorkqueue(w workqueue.Interface) { c.workqueue = w } +func (c *BaseController) SetWorkScheduler(s LoadScheduler) { + c.loadScheduler = s +} + func (c *BaseController) GetWorkqueue() workqueue.Interface { return c.workqueue } +// Set the thread count of workers processing the local queue. This has to be defined before starting the controller func (c *BaseController) SetWorkerThreadCount(threads int) { c.threads = threads } diff --git a/v3/pkg/microservices/controller/delayingWorkqueueController.go b/v3/pkg/microservices/controller/delayingWorkqueueController.go index 535b14cd..c2bd74e7 100644 --- a/v3/pkg/microservices/controller/delayingWorkqueueController.go +++ b/v3/pkg/microservices/controller/delayingWorkqueueController.go @@ -5,17 +5,18 @@ import ( "fmt" "time" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) type DelayingWorkqueueController struct { - BaseController + DistributedController } -func NewDelayingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, name string, resyncPeriod time.Duration) *DelayingWorkqueueController { +func NewDelayingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, kubeClient *kubernetes.Clientset, name string, resyncPeriod time.Duration) *DelayingWorkqueueController { dwqc := &DelayingWorkqueueController{ - *newBaseController(name, ctx, informer, resyncPeriod), + *NewDistributedController(ctx, informer, kubeClient, name, resyncPeriod), } dwqc.SetWorkqueue(workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{Name: name})) diff --git a/v3/pkg/microservices/controller/distributedController.go b/v3/pkg/microservices/controller/distributedController.go new file mode 100644 index 00000000..66d1de2a --- /dev/null +++ b/v3/pkg/microservices/controller/distributedController.go @@ -0,0 +1,155 @@ +package microservices + +import ( + "context" + "crypto/md5" + "encoding/binary" + "fmt" + "io" + "os" + "strconv" + "strings" + "time" + + "github.com/golang/glog" + "github.com/hobbyfarm/gargantua/v3/pkg/util" + v1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +type DistributedController struct { + BaseController + LoadScheduler + kubeClient *kubernetes.Clientset + statefulset_name string + replica_count int + replica_identity int +} + +func NewDistributedController(ctx context.Context, informer cache.SharedIndexInformer, kubeclient *kubernetes.Clientset, name string, resyncPeriod time.Duration) *DistributedController { + dc := &DistributedController{ + BaseController: *newBaseController(name, ctx, informer, resyncPeriod), + kubeClient: kubeclient, + } + return dc +} + +func (c *DistributedController) enqueue(obj interface{}) { + if c.replica_identity > c.replica_count || c.replica_count == 0 { + // we have likely scaled down. No longer enqueue in this replica. + return + } + + // calculate the placement of the object + placement, err := c.getReplicaPlacement(obj) + + if err != nil { + glog.Errorf("Could not enqueue object due to error in placement calculation: %v", err) + return + } + + // is this object placed on this replica then enqueue it + if placement == c.replica_identity { + c.BaseController.enqueue(obj) + } +} + +// This method calculates on which replica an object needs to be reoconciled. +// It uses a hash of the objects name to guarantee an almost equally distribution between replicas. +func (c *DistributedController) getReplicaPlacement(obj interface{}) (int, error) { + hasher := md5.New() + var key string + var err error + // Get the objects cache name + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + glog.V(4).Infof("Error enquing %s: %v", key, err) + return -1, err + } + + // calc md5 hash of the key + _, err = io.WriteString(hasher, key) + if err != nil { + panic(err) + } + + //store the has as bytearray + hash := hasher.Sum(nil) + + // convert the hash into an integer by truncating it + truncatedHash := int(binary.BigEndian.Uint32(hash[:4])) + + if truncatedHash < 0 { + //Ensure only positive values are taken + truncatedHash = -truncatedHash + } + + // return the hash modulo the total replica count, this creates an almost equally distributed placement + return truncatedHash % c.replica_count, nil +} + +// RunDistributed will start a distributed controller concept +func (c *DistributedController) RunDistributed(stopCh <-chan struct{}) error { + c.statefulset_name = os.Getenv("STATEFULSET_NAME") + podIdentityName := os.Getenv("POD_IDENTITY") + + parts := strings.Split(podIdentityName, "-") + ordinalIndex, err := strconv.Atoi(parts[len(parts)-1]) + + if err != nil { + return fmt.Errorf("Error in getting a ordinal pod identity from string: %s", podIdentityName) + } + + c.replica_identity = ordinalIndex + + // client to watch for updates of the parent statefulset object + watchlist := cache.NewListWatchFromClient( + c.kubeClient.AppsV1().RESTClient(), + "statefulsets", + util.GetReleaseNamespace(), + fields.OneTermEqualSelector("metadata.name", c.statefulset_name), + ) + + // build an informer to watch updates on the parent statefulset and update the total number of replicas accordingly + _, controller := cache.NewInformer( + watchlist, + &v1.StatefulSet{}, + c.ResyncPeriod, + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.handleStatefulsetUpdate(obj) + }, + DeleteFunc: func(obj interface{}) { + c.handleStatefulsetUpdate(obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + c.handleStatefulsetUpdate(newObj) + }, + }, + ) + + go controller.Run(stopCh) + + glog.V(4).Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, controller.HasSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + return c.run(stopCh) +} + +// handle updates to the statefulset. set the replica count to the current total replica count +func (c *DistributedController) handleStatefulsetUpdate(obj interface{}) { + statefulset, ok := obj.(*v1.StatefulSet) + if !ok { + glog.V(4).Infof("Not a StatefulSet: %v", obj) + return + } + + replicaCount := int(*statefulset.Spec.Replicas) + if replicaCount != c.replica_count { + glog.V(8).Infof("Statefulset %s updated replica count from %d to %d replicas", statefulset.Name, c.replica_count, replicaCount) + c.replica_count = replicaCount + } +} diff --git a/v3/pkg/microservices/controller/rateLimitingWorkqueueController.go b/v3/pkg/microservices/controller/rateLimitingWorkqueueController.go index 3f34ac7d..e807340e 100644 --- a/v3/pkg/microservices/controller/rateLimitingWorkqueueController.go +++ b/v3/pkg/microservices/controller/rateLimitingWorkqueueController.go @@ -5,17 +5,18 @@ import ( "fmt" "time" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" ) type RateLimitingWorkqueueController struct { - BaseController + DistributedController } -func NewRateLimitingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, reconcileFunc func(objName string) error, name string, resyncPeriod time.Duration, rateLimiter workqueue.RateLimiter) *RateLimitingWorkqueueController { +func NewRateLimitingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, kubeClient *kubernetes.Clientset, reconcileFunc func(objName string) error, name string, resyncPeriod time.Duration, rateLimiter workqueue.RateLimiter) *RateLimitingWorkqueueController { rlwq := &RateLimitingWorkqueueController{ - *newBaseController(name, ctx, informer, resyncPeriod), + *NewDistributedController(ctx, informer, kubeClient, name, resyncPeriod), } rlwq.SetWorkqueue(workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{Name: name})) diff --git a/v3/pkg/microservices/microservices.go b/v3/pkg/microservices/microservices.go index 1f2fd3fa..941c86a2 100644 --- a/v3/pkg/microservices/microservices.go +++ b/v3/pkg/microservices/microservices.go @@ -24,9 +24,9 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/reflection" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/tools/leaderelection" ) // Add type MicroService based on string that is used to define constants for every service @@ -292,7 +292,7 @@ func buildTLSServerCredentials(certPath string, keyPath string) (credentials.Tra }), nil } -func BuildClusterConfig(serviceConfig *ServiceConfig) (*rest.Config, *hfClientset.Clientset) { +func BuildClusterConfig(serviceConfig *ServiceConfig) (*rest.Config, *hfClientset.Clientset, *kubernetes.Clientset) { const ( ClientGoQPS = 100 ClientGoBurst = 100 @@ -312,7 +312,12 @@ func BuildClusterConfig(serviceConfig *ServiceConfig) (*rest.Config, *hfClientse glog.Fatal(err) } - return cfg, hfClient + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + glog.Fatalf("Error building kubernetes clientset: %s", err.Error()) + } + + return cfg, hfClient, kubeClient } // ParseFlags declares the flags and parses them, then returns a ServiceConfig struct. @@ -345,6 +350,8 @@ func BuildServiceConfig() *ServiceConfig { return cfg } +/* + type onStartedLeading func(context.Context) func ElectLeaderOrDie(svc MicroService, cfg *rest.Config, ctx context.Context, stopControllersCh chan<- struct{}, onStartedLeadingFunc onStartedLeading) { @@ -377,3 +384,5 @@ func ElectLeaderOrDie(svc MicroService, cfg *rest.Config, ctx context.Context, s }, }) } + +*/ diff --git a/v3/services/accesscodesvc/main.go b/v3/services/accesscodesvc/main.go index 05bab209..6f8db66e 100644 --- a/v3/services/accesscodesvc/main.go +++ b/v3/services/accesscodesvc/main.go @@ -21,7 +21,7 @@ func init() { } func main() { - cfg, hfClient := microservices.BuildClusterConfig(serviceConfig) + cfg, hfClient, _ := microservices.BuildClusterConfig(serviceConfig) crds := accesscodeservice.GenerateAccessCodeCRD() glog.Info("installing/updating access code CRDs") diff --git a/v3/services/conversionsvc/main.go b/v3/services/conversionsvc/main.go index 82e0ec28..2d40d5c6 100644 --- a/v3/services/conversionsvc/main.go +++ b/v3/services/conversionsvc/main.go @@ -25,7 +25,7 @@ func init() { } func main() { - cfg, hfClient := microservices.BuildClusterConfig(serviceConfig) + cfg, hfClient, _ := microservices.BuildClusterConfig(serviceConfig) ca, err := os.ReadFile(serviceConfig.TLSCA) if err != nil { diff --git a/v3/services/rbacsvc/main.go b/v3/services/rbacsvc/main.go index 6914e106..7f482f97 100644 --- a/v3/services/rbacsvc/main.go +++ b/v3/services/rbacsvc/main.go @@ -30,7 +30,7 @@ func init() { } func main() { - cfg, _ := microservices.BuildClusterConfig(serviceConfig) + cfg, _, _ := microservices.BuildClusterConfig(serviceConfig) // self manage default rbac roles if installRBACRoles { diff --git a/v3/services/settingsvc/main.go b/v3/services/settingsvc/main.go index c2225cf9..8228b418 100644 --- a/v3/services/settingsvc/main.go +++ b/v3/services/settingsvc/main.go @@ -30,7 +30,7 @@ func init() { } func main() { - cfg, hfClient := microservices.BuildClusterConfig(serviceConfig) + cfg, hfClient, _ := microservices.BuildClusterConfig(serviceConfig) namespace := util.GetReleaseNamespace() diff --git a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go index 5d9ff886..b47b6ae6 100644 --- a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go +++ b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go @@ -8,6 +8,7 @@ import ( informerV1 "github.com/hobbyfarm/gargantua/v3/pkg/client/listers/hobbyfarm.io/v1" controllers "github.com/hobbyfarm/gargantua/v3/pkg/microservices/controller" "github.com/hobbyfarm/gargantua/v3/pkg/util" + "k8s.io/client-go/kubernetes" "github.com/golang/glog" ) @@ -22,17 +23,19 @@ type TokenController struct { tokenLister informerV1.PasswordResetTokenLister } -func NewPasswordResetTokenController(hfInformerFactory hfInformers.SharedInformerFactory, ctx context.Context) (*TokenController, error) { +func NewPasswordResetTokenController(hfInformerFactory hfInformers.SharedInformerFactory, kubeClient *kubernetes.Clientset, ctx context.Context) (*TokenController, error) { tokenController := &TokenController{ DelayingWorkqueueController: *controllers.NewDelayingWorkqueueController( ctx, hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Informer(), + kubeClient, NAME, 30*time.Minute), } tokenController.tokenLister = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Lister() tokenController.SetReconciler(tokenController) + tokenController.SetWorkScheduler(tokenController) return tokenController, nil } diff --git a/v3/services/usersvc/main.go b/v3/services/usersvc/main.go index 3ce606a3..2e1a428b 100644 --- a/v3/services/usersvc/main.go +++ b/v3/services/usersvc/main.go @@ -12,6 +12,7 @@ import ( "github.com/hobbyfarm/gargantua/v3/pkg/signals" "github.com/hobbyfarm/gargantua/v3/pkg/util" "golang.org/x/sync/errgroup" + "k8s.io/client-go/kubernetes" "github.com/golang/glog" userservice "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal" @@ -34,7 +35,7 @@ func init() { } func main() { - cfg, hfClient := microservices.BuildClusterConfig(serviceConfig) + cfg, hfClient, kubeClient := microservices.BuildClusterConfig(serviceConfig) namespace := util.GetReleaseNamespace() hfInformerFactory := hfInformers.NewSharedInformerFactoryWithOptions(hfClient, time.Second*30, hfInformers.WithNamespace(namespace)) @@ -103,22 +104,20 @@ func main() { stopInformerFactoryCh := signals.SetupSignalHandler() hfInformerFactory.Start(stopInformerFactoryCh) - microservices.ElectLeaderOrDie(microservices.User, cfg, ctx, stopControllersCh, func(c context.Context) { - _err := startControllers(ctx, hfClient, stopControllersCh) - if _err != nil { - glog.Fatal(_err) - } - }) + _err := startControllers(ctx, hfClient, kubeClient, stopControllersCh) + if _err != nil { + glog.Fatal(_err) + } wg.Wait() } -func startControllers(ctx context.Context, hfClient *versioned.Clientset, stopControllersCh <-chan struct{}) error { +func startControllers(ctx context.Context, hfClient *versioned.Clientset, kubeClient *kubernetes.Clientset, stopControllersCh <-chan struct{}) error { glog.Info("Starting controllers") hfInformerFactory := hfInformers.NewSharedInformerFactoryWithOptions(hfClient, time.Second*30, hfInformers.WithNamespace(util.GetReleaseNamespace())) g, gctx := errgroup.WithContext(ctx) - passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(hfInformerFactory, gctx) + passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(hfInformerFactory, kubeClient, gctx) if err != nil { glog.Fatalf("starting passwordResetTokenController failed: %v", err) } @@ -126,9 +125,12 @@ func startControllers(ctx context.Context, hfClient *versioned.Clientset, stopCo passwordResetTokenController.SetWorkerThreadCount(2) g.Go(func() error { - return passwordResetTokenController.Run(stopControllersCh) + // TODO replica name + return passwordResetTokenController.RunDistributed(stopControllersCh) }) + // TODO start informer for ReplicaSets + hfInformerFactory.Start(stopControllersCh) if err = g.Wait(); err != nil { From 973ad7548ecc13a3a5bcdec9be5a43b12e5fa915 Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Tue, 6 Feb 2024 08:39:12 +0100 Subject: [PATCH 09/12] Add shardedController --- .../controller/delayingWorkqueueController.go | 4 +- .../rateLimitingWorkqueueController.go | 4 +- ...utedController.go => shardedController.go} | 72 +++++++++++-------- v3/pkg/microservices/microservices.go | 47 ++++-------- v3/services/usersvc/main.go | 52 ++++---------- 5 files changed, 75 insertions(+), 104 deletions(-) rename v3/pkg/microservices/controller/{distributedController.go => shardedController.go} (72%) diff --git a/v3/pkg/microservices/controller/delayingWorkqueueController.go b/v3/pkg/microservices/controller/delayingWorkqueueController.go index c2bd74e7..39e5e367 100644 --- a/v3/pkg/microservices/controller/delayingWorkqueueController.go +++ b/v3/pkg/microservices/controller/delayingWorkqueueController.go @@ -11,12 +11,12 @@ import ( ) type DelayingWorkqueueController struct { - DistributedController + ShardedController } func NewDelayingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, kubeClient *kubernetes.Clientset, name string, resyncPeriod time.Duration) *DelayingWorkqueueController { dwqc := &DelayingWorkqueueController{ - *NewDistributedController(ctx, informer, kubeClient, name, resyncPeriod), + *NewShardedController(ctx, informer, kubeClient, name, resyncPeriod), } dwqc.SetWorkqueue(workqueue.NewDelayingQueueWithConfig(workqueue.DelayingQueueConfig{Name: name})) diff --git a/v3/pkg/microservices/controller/rateLimitingWorkqueueController.go b/v3/pkg/microservices/controller/rateLimitingWorkqueueController.go index e807340e..078fa630 100644 --- a/v3/pkg/microservices/controller/rateLimitingWorkqueueController.go +++ b/v3/pkg/microservices/controller/rateLimitingWorkqueueController.go @@ -11,12 +11,12 @@ import ( ) type RateLimitingWorkqueueController struct { - DistributedController + ShardedController } func NewRateLimitingWorkqueueController(ctx context.Context, informer cache.SharedIndexInformer, kubeClient *kubernetes.Clientset, reconcileFunc func(objName string) error, name string, resyncPeriod time.Duration, rateLimiter workqueue.RateLimiter) *RateLimitingWorkqueueController { rlwq := &RateLimitingWorkqueueController{ - *NewDistributedController(ctx, informer, kubeClient, name, resyncPeriod), + *NewShardedController(ctx, informer, kubeClient, name, resyncPeriod), } rlwq.SetWorkqueue(workqueue.NewRateLimitingQueueWithConfig(rateLimiter, workqueue.RateLimitingQueueConfig{Name: name})) diff --git a/v3/pkg/microservices/controller/distributedController.go b/v3/pkg/microservices/controller/shardedController.go similarity index 72% rename from v3/pkg/microservices/controller/distributedController.go rename to v3/pkg/microservices/controller/shardedController.go index 66d1de2a..6de52e7f 100644 --- a/v3/pkg/microservices/controller/distributedController.go +++ b/v3/pkg/microservices/controller/shardedController.go @@ -6,7 +6,6 @@ import ( "encoding/binary" "fmt" "io" - "os" "strconv" "strings" "time" @@ -19,7 +18,7 @@ import ( "k8s.io/client-go/tools/cache" ) -type DistributedController struct { +type ShardedController struct { BaseController LoadScheduler kubeClient *kubernetes.Clientset @@ -28,37 +27,39 @@ type DistributedController struct { replica_identity int } -func NewDistributedController(ctx context.Context, informer cache.SharedIndexInformer, kubeclient *kubernetes.Clientset, name string, resyncPeriod time.Duration) *DistributedController { - dc := &DistributedController{ +func NewShardedController(ctx context.Context, informer cache.SharedIndexInformer, kubeclient *kubernetes.Clientset, name string, resyncPeriod time.Duration) *ShardedController { + dc := &ShardedController{ BaseController: *newBaseController(name, ctx, informer, resyncPeriod), kubeClient: kubeclient, } return dc } -func (c *DistributedController) enqueue(obj interface{}) { +// This method calculates if the object will be reconciled on this shard and adds it to the local queue +// otherwise it will be ignored +func (c *ShardedController) enqueue(obj interface{}) { if c.replica_identity > c.replica_count || c.replica_count == 0 { // we have likely scaled down. No longer enqueue in this replica. return } // calculate the placement of the object - placement, err := c.getReplicaPlacement(obj) + shardPlacement, err := c.getShardPlacement(obj) if err != nil { glog.Errorf("Could not enqueue object due to error in placement calculation: %v", err) return } - // is this object placed on this replica then enqueue it - if placement == c.replica_identity { + // if this object is placed on this replica then enqueue it + if shardPlacement == c.replica_identity { c.BaseController.enqueue(obj) } } // This method calculates on which replica an object needs to be reoconciled. // It uses a hash of the objects name to guarantee an almost equally distribution between replicas. -func (c *DistributedController) getReplicaPlacement(obj interface{}) (int, error) { +func (c *ShardedController) getShardPlacement(obj interface{}) (int, error) { hasher := md5.New() var key string var err error @@ -90,9 +91,9 @@ func (c *DistributedController) getReplicaPlacement(obj interface{}) (int, error } // RunDistributed will start a distributed controller concept -func (c *DistributedController) RunDistributed(stopCh <-chan struct{}) error { - c.statefulset_name = os.Getenv("STATEFULSET_NAME") - podIdentityName := os.Getenv("POD_IDENTITY") +func (c *ShardedController) RunSharded(stopCh <-chan struct{}, statefulSetName string, shardIdentity string) error { + c.statefulset_name = statefulSetName + podIdentityName := shardIdentity parts := strings.Split(podIdentityName, "-") ordinalIndex, err := strconv.Atoi(parts[len(parts)-1]) @@ -100,9 +101,33 @@ func (c *DistributedController) RunDistributed(stopCh <-chan struct{}) error { if err != nil { return fmt.Errorf("Error in getting a ordinal pod identity from string: %s", podIdentityName) } - c.replica_identity = ordinalIndex + err = c.watchStatefulSetUpdates(stopCh) + if err != nil { + return err + } + + return c.run(stopCh) +} + +// handle updates to the statefulset. set the replica count to the current total replica count +func (c *ShardedController) handleStatefulsetUpdate(obj interface{}) { + statefulset, ok := obj.(*v1.StatefulSet) + if !ok { + glog.V(4).Infof("Not a StatefulSet: %v", obj) + return + } + + replicaCount := int(*statefulset.Spec.Replicas) + if replicaCount != c.replica_count { + glog.V(8).Infof("Statefulset %s updated replica count from %d to %d replicas", statefulset.Name, c.replica_count, replicaCount) + c.replica_count = replicaCount + } +} + +// handle updates to the parent StatefulSet of this replica. Set the replica count to the current total replica count +func (c *ShardedController) watchStatefulSetUpdates(stopCh <-chan struct{}) error { // client to watch for updates of the parent statefulset object watchlist := cache.NewListWatchFromClient( c.kubeClient.AppsV1().RESTClient(), @@ -131,25 +156,10 @@ func (c *DistributedController) RunDistributed(stopCh <-chan struct{}) error { go controller.Run(stopCh) - glog.V(4).Info("Waiting for informer caches to sync") + glog.V(4).Info("Waiting for StatefulSet Informer caches to sync") if ok := cache.WaitForCacheSync(stopCh, controller.HasSynced); !ok { - return fmt.Errorf("failed to wait for caches to sync") - } - - return c.run(stopCh) -} - -// handle updates to the statefulset. set the replica count to the current total replica count -func (c *DistributedController) handleStatefulsetUpdate(obj interface{}) { - statefulset, ok := obj.(*v1.StatefulSet) - if !ok { - glog.V(4).Infof("Not a StatefulSet: %v", obj) - return + return fmt.Errorf("failed to wait for StatefulSet Informer caches to sync") } - replicaCount := int(*statefulset.Spec.Replicas) - if replicaCount != c.replica_count { - glog.V(8).Infof("Statefulset %s updated replica count from %d to %d replicas", statefulset.Name, c.replica_count, replicaCount) - c.replica_count = replicaCount - } + return nil } diff --git a/v3/pkg/microservices/microservices.go b/v3/pkg/microservices/microservices.go index 941c86a2..aba028fb 100644 --- a/v3/pkg/microservices/microservices.go +++ b/v3/pkg/microservices/microservices.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "os" + "strconv" "sync" "time" @@ -63,6 +64,7 @@ const ( defaultGrpcPort string = "8080" defaultApiPort string = "80" InitialConnectionTimeout time.Duration = 30 * time.Second + defaultThreadCount int = 1 ) var CORS_ALLOWED_METHODS_ALL = [...]string{"GET", "POST", "PUT", "HEAD", "OPTIONS", "DELETE"} @@ -350,39 +352,20 @@ func BuildServiceConfig() *ServiceConfig { return cfg } -/* - -type onStartedLeading func(context.Context) +// This method tries to retreive the controller thread count from an environment variable CONTROLLER_THREAD_COUNT +// otherwise it sets the thread count to the default value +func GetWorkerThreadCount() int { + workerThreadCountString := os.Getenv("CONTROLLER_THREAD_COUNT") + workerThreads := defaultThreadCount + if workerThreadCountString != "" { + i, err := strconv.Atoi(workerThreadCountString) + if err != nil { + glog.Infof("Error parsing env var CONTROLLER_THREAD_COUNT, using default thread count %d", workerThreads) + } else { -func ElectLeaderOrDie(svc MicroService, cfg *rest.Config, ctx context.Context, stopControllersCh chan<- struct{}, onStartedLeadingFunc onStartedLeading) { - lock, err := util.GetLock(string(svc), cfg) - if err != nil { - glog.Fatal(err) + } + workerThreads = i } - leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ - Lock: lock, - ReleaseOnCancel: true, - LeaseDuration: 10 * time.Second, - RenewDeadline: 5 * time.Second, - RetryPeriod: 2 * time.Second, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: onStartedLeadingFunc, - OnStoppedLeading: func() { - // Need to start informer factory since even when not leader to ensure api layer - // keeps working. - glog.Info("Stopped being the leader. Shutting down controllers") - stopControllersCh <- struct{}{} // Send the stopControllers Signal - }, - OnNewLeader: func(current_id string) { - if current_id == lock.Identity() { - glog.Info("I am currently the leader") - return - } - glog.Infof("Leader changed to %s", current_id) - }, - }, - }) + return workerThreads } - -*/ diff --git a/v3/services/usersvc/main.go b/v3/services/usersvc/main.go index 2e1a428b..3bd56b72 100644 --- a/v3/services/usersvc/main.go +++ b/v3/services/usersvc/main.go @@ -11,13 +11,10 @@ import ( "github.com/hobbyfarm/gargantua/v3/pkg/microservices" "github.com/hobbyfarm/gargantua/v3/pkg/signals" "github.com/hobbyfarm/gargantua/v3/pkg/util" - "golang.org/x/sync/errgroup" - "k8s.io/client-go/kubernetes" "github.com/golang/glog" userservice "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal" userservicecontroller "github.com/hobbyfarm/gargantua/services/usersvc/v3/internal/controllers" - "github.com/hobbyfarm/gargantua/v3/pkg/client/clientset/versioned" hfInformers "github.com/hobbyfarm/gargantua/v3/pkg/client/informers/externalversions" "github.com/hobbyfarm/gargantua/v3/protos/authn" @@ -81,6 +78,12 @@ func main() { user.RegisterUserSvcServer(gs, us) + passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(hfInformerFactory, kubeClient, ctx) + if err != nil { + glog.Fatalf("creating passwordResetTokenController failed: %v", err) + } + passwordResetTokenController.SetWorkerThreadCount(microservices.GetWorkerThreadCount()) + var wg sync.WaitGroup wg.Add(1) @@ -99,43 +102,18 @@ func main() { microservices.StartAPIServer(userServer) }() - stopControllersCh := make(chan struct{}) + go func() { + defer wg.Done() + glog.Info("Starting controllers") + stopControllersCh := make(chan struct{}) + err := passwordResetTokenController.RunSharded(stopControllersCh, os.Getenv("STATEFULSET_NAME"), os.Getenv("POD_IDENTITY")) + if err != nil { + glog.Errorf("Error starting up the controllers: %v", err) + } + }() stopInformerFactoryCh := signals.SetupSignalHandler() hfInformerFactory.Start(stopInformerFactoryCh) - _err := startControllers(ctx, hfClient, kubeClient, stopControllersCh) - if _err != nil { - glog.Fatal(_err) - } - wg.Wait() } - -func startControllers(ctx context.Context, hfClient *versioned.Clientset, kubeClient *kubernetes.Clientset, stopControllersCh <-chan struct{}) error { - glog.Info("Starting controllers") - hfInformerFactory := hfInformers.NewSharedInformerFactoryWithOptions(hfClient, time.Second*30, hfInformers.WithNamespace(util.GetReleaseNamespace())) - g, gctx := errgroup.WithContext(ctx) - - passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(hfInformerFactory, kubeClient, gctx) - if err != nil { - glog.Fatalf("starting passwordResetTokenController failed: %v", err) - } - - passwordResetTokenController.SetWorkerThreadCount(2) - - g.Go(func() error { - // TODO replica name - return passwordResetTokenController.RunDistributed(stopControllersCh) - }) - - // TODO start informer for ReplicaSets - - hfInformerFactory.Start(stopControllersCh) - - if err = g.Wait(); err != nil { - glog.Errorf("Error starting up the controllers: %v", err) - return err - } - return nil -} From 1a8b7d095a0e6a1fecc1a2e202c93d06a5cf1a11 Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Tue, 6 Feb 2024 09:58:15 +0100 Subject: [PATCH 10/12] Use lister instead of hfclient --- .../controller/shardedController.go | 2 +- v3/services/usersvc/internal/grpc.go | 17 +++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/v3/pkg/microservices/controller/shardedController.go b/v3/pkg/microservices/controller/shardedController.go index 6de52e7f..2d966a3d 100644 --- a/v3/pkg/microservices/controller/shardedController.go +++ b/v3/pkg/microservices/controller/shardedController.go @@ -90,7 +90,7 @@ func (c *ShardedController) getShardPlacement(obj interface{}) (int, error) { return truncatedHash % c.replica_count, nil } -// RunDistributed will start a distributed controller concept +// RunSharded will start a sharded controller that watches the parent StatefulSet and applies sharding based on the total replica count func (c *ShardedController) RunSharded(stopCh <-chan struct{}, statefulSetName string, shardIdentity string) error { c.statefulset_name = statefulSetName podIdentityName := shardIdentity diff --git a/v3/services/usersvc/internal/grpc.go b/v3/services/usersvc/internal/grpc.go index 7ccd94ff..2ac01ea5 100644 --- a/v3/services/usersvc/internal/grpc.go +++ b/v3/services/usersvc/internal/grpc.go @@ -12,6 +12,7 @@ import ( hfv2 "github.com/hobbyfarm/gargantua/v3/pkg/apis/hobbyfarm.io/v2" hfClientset "github.com/hobbyfarm/gargantua/v3/pkg/client/clientset/versioned" hfInformers "github.com/hobbyfarm/gargantua/v3/pkg/client/informers/externalversions" + listerv2 "github.com/hobbyfarm/gargantua/v3/pkg/client/listers/hobbyfarm.io/v2" "github.com/hobbyfarm/gargantua/v3/pkg/util" userProto "github.com/hobbyfarm/gargantua/v3/protos/user" "golang.org/x/crypto/bcrypt" @@ -19,6 +20,7 @@ import ( "google.golang.org/grpc/status" empty "google.golang.org/protobuf/types/known/emptypb" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/retry" ) @@ -31,6 +33,7 @@ type GrpcUserServer struct { userProto.UnimplementedUserSvcServer hfClientSet hfClientset.Interface userIndexer cache.Indexer + userLister listerv2.UserLister ctx context.Context } @@ -45,6 +48,7 @@ func NewGrpcUserServer(hfClientSet hfClientset.Interface, hfInformerFactory hfIn return &GrpcUserServer{ hfClientSet: hfClientSet, userIndexer: inf.GetIndexer(), + userLister: hfInformerFactory.Hobbyfarm().V2().Users().Lister(), ctx: ctx, }, nil } @@ -137,7 +141,7 @@ func (u *GrpcUserServer) getUser(id string) (*userProto.User, error) { if len(id) == 0 { return &userProto.User{}, fmt.Errorf("user id passed in was empty") } - obj, err := u.hfClientSet.HobbyfarmV2().Users(util.GetReleaseNamespace()).Get(u.ctx, id, metav1.GetOptions{}) + obj, err := u.userLister.Users(util.GetReleaseNamespace()).Get(id) if err != nil { return &userProto.User{}, fmt.Errorf("error while retrieving User by id: %s with error: %v", id, err) } @@ -184,7 +188,8 @@ func (u *GrpcUserServer) GetUserById(ctx context.Context, gur *userProto.UserId) } func (u *GrpcUserServer) ListUser(ctx context.Context, empty *empty.Empty) (*userProto.ListUsersResponse, error) { - users, err := u.hfClientSet.HobbyfarmV2().Users(util.GetReleaseNamespace()).List(u.ctx, metav1.ListOptions{}) + //users, err := u.hfClientSet.HobbyfarmV2().Users(util.GetReleaseNamespace()).List(u.ctx, metav1.ListOptions{}) + users, err := u.userLister.Users(util.GetReleaseNamespace()).List(labels.Everything()) if err != nil { glog.Errorf("error while retrieving users %v", err) @@ -196,7 +201,7 @@ func (u *GrpcUserServer) ListUser(ctx context.Context, empty *empty.Empty) (*use } preparedUsers := []*userProto.User{} // must be declared this way so as to JSON marshal into [] instead of null - for _, s := range users.Items { + for _, s := range users { preparedUsers = append(preparedUsers, &userProto.User{ Id: s.Name, Email: s.Spec.Email, @@ -226,7 +231,7 @@ func (u *GrpcUserServer) UpdateUser(ctx context.Context, userRequest *userProto. } retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - user, err := u.hfClientSet.HobbyfarmV2().Users(util.GetReleaseNamespace()).Get(u.ctx, id, metav1.GetOptions{}) + user, err := u.userLister.Users(util.GetReleaseNamespace()).Get(id) if err != nil { newErr := status.Newf( codes.Internal, @@ -294,7 +299,7 @@ func (u *GrpcUserServer) UpdateAccessCodes(ctx context.Context, updateAccessCode } retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error { - user, err := u.hfClientSet.HobbyfarmV2().Users(util.GetReleaseNamespace()).Get(u.ctx, id, metav1.GetOptions{}) + user, err := u.userLister.Users(util.GetReleaseNamespace()).Get(id) if err != nil { newErr := status.Newf( codes.Internal, @@ -414,7 +419,7 @@ func (u *GrpcUserServer) DeleteUser(c context.Context, userId *userProto.UserId) return &empty.Empty{}, newErr.Err() } - user, err := u.hfClientSet.HobbyfarmV2().Users(util.GetReleaseNamespace()).Get(u.ctx, id, metav1.GetOptions{}) + user, err := u.userLister.Users(util.GetReleaseNamespace()).Get(id) if err != nil { newErr := status.Newf( codes.Internal, From 047d906e2400368c612265a1cfd95b200c721165 Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Wed, 7 Feb 2024 14:20:52 +0100 Subject: [PATCH 11/12] Change to FNV --- v3/pkg/microservices/controller/shardedController.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v3/pkg/microservices/controller/shardedController.go b/v3/pkg/microservices/controller/shardedController.go index 2d966a3d..a1c062d2 100644 --- a/v3/pkg/microservices/controller/shardedController.go +++ b/v3/pkg/microservices/controller/shardedController.go @@ -2,9 +2,9 @@ package microservices import ( "context" - "crypto/md5" "encoding/binary" "fmt" + "hash/fnv" "io" "strconv" "strings" @@ -60,7 +60,7 @@ func (c *ShardedController) enqueue(obj interface{}) { // This method calculates on which replica an object needs to be reoconciled. // It uses a hash of the objects name to guarantee an almost equally distribution between replicas. func (c *ShardedController) getShardPlacement(obj interface{}) (int, error) { - hasher := md5.New() + hasher := fnv.New32a() var key string var err error // Get the objects cache name From 7648d25b3340f737a3756b9a7c73e28477fe1a04 Mon Sep 17 00:00:00 2001 From: Jan-Gerrit Goebel Date: Thu, 8 Feb 2024 16:53:38 +0100 Subject: [PATCH 12/12] AddAfter should be string --- v3/pkg/microservices/controller/shardedController.go | 8 ++++++-- .../controllers/passwordResetTokenController.go | 11 ++++++++--- v3/services/usersvc/main.go | 4 ++-- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/v3/pkg/microservices/controller/shardedController.go b/v3/pkg/microservices/controller/shardedController.go index a1c062d2..ed576185 100644 --- a/v3/pkg/microservices/controller/shardedController.go +++ b/v3/pkg/microservices/controller/shardedController.go @@ -6,6 +6,7 @@ import ( "fmt" "hash/fnv" "io" + "os" "strconv" "strings" "time" @@ -91,9 +92,12 @@ func (c *ShardedController) getShardPlacement(obj interface{}) (int, error) { } // RunSharded will start a sharded controller that watches the parent StatefulSet and applies sharding based on the total replica count -func (c *ShardedController) RunSharded(stopCh <-chan struct{}, statefulSetName string, shardIdentity string) error { +func (c *ShardedController) RunSharded(stopCh <-chan struct{}, statefulSetName string) error { c.statefulset_name = statefulSetName - podIdentityName := shardIdentity + podIdentityName, err := os.Hostname() + if err != nil { + return fmt.Errorf("Error in getting Hostname") + } parts := strings.Split(podIdentityName, "-") ordinalIndex, err := strconv.Atoi(parts[len(parts)-1]) diff --git a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go index b47b6ae6..71a40b02 100644 --- a/v3/services/usersvc/internal/controllers/passwordResetTokenController.go +++ b/v3/services/usersvc/internal/controllers/passwordResetTokenController.go @@ -4,10 +4,12 @@ import ( "context" "time" + "github.com/hobbyfarm/gargantua/v3/pkg/client/clientset/versioned" hfInformers "github.com/hobbyfarm/gargantua/v3/pkg/client/informers/externalversions" informerV1 "github.com/hobbyfarm/gargantua/v3/pkg/client/listers/hobbyfarm.io/v1" controllers "github.com/hobbyfarm/gargantua/v3/pkg/microservices/controller" "github.com/hobbyfarm/gargantua/v3/pkg/util" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "github.com/golang/glog" @@ -21,9 +23,10 @@ type TokenController struct { controllers.DelayingWorkqueueController controllers.Reconciler tokenLister informerV1.PasswordResetTokenLister + hfClient *versioned.Clientset } -func NewPasswordResetTokenController(hfInformerFactory hfInformers.SharedInformerFactory, kubeClient *kubernetes.Clientset, ctx context.Context) (*TokenController, error) { +func NewPasswordResetTokenController(hfInformerFactory hfInformers.SharedInformerFactory, kubeClient *kubernetes.Clientset, hfClient *versioned.Clientset, ctx context.Context) (*TokenController, error) { tokenController := &TokenController{ DelayingWorkqueueController: *controllers.NewDelayingWorkqueueController( ctx, @@ -31,9 +34,10 @@ func NewPasswordResetTokenController(hfInformerFactory hfInformers.SharedInforme kubeClient, NAME, 30*time.Minute), + hfClient: hfClient, + tokenLister: hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Lister(), } - tokenController.tokenLister = hfInformerFactory.Hobbyfarm().V1().PasswordResetTokens().Lister() tokenController.SetReconciler(tokenController) tokenController.SetWorkScheduler(tokenController) @@ -71,6 +75,7 @@ func (dwq *TokenController) Reconcile(objName string) error { if timeUntilExpires < 0 { glog.V(4).Infof("Token %s seems to old, can be deleted", token.Name) + dwq.hfClient.HobbyfarmV1().PasswordResetTokens(util.GetReleaseNamespace()).Delete(dwq.Context, token.Name, metav1.DeleteOptions{}) } else { // requeue the token at the correct expiration time glog.V(4).Infof("Requeueing token %s as the duration is not reached", token.Name) @@ -78,7 +83,7 @@ func (dwq *TokenController) Reconcile(objName string) error { if err != nil { return err } - delayingWorkqueue.AddAfter(token, timeUntilExpires) + delayingWorkqueue.AddAfter(token.Name, timeUntilExpires) } return nil diff --git a/v3/services/usersvc/main.go b/v3/services/usersvc/main.go index 3bd56b72..147298d9 100644 --- a/v3/services/usersvc/main.go +++ b/v3/services/usersvc/main.go @@ -78,7 +78,7 @@ func main() { user.RegisterUserSvcServer(gs, us) - passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(hfInformerFactory, kubeClient, ctx) + passwordResetTokenController, err := userservicecontroller.NewPasswordResetTokenController(hfInformerFactory, kubeClient, hfClient, ctx) if err != nil { glog.Fatalf("creating passwordResetTokenController failed: %v", err) } @@ -106,7 +106,7 @@ func main() { defer wg.Done() glog.Info("Starting controllers") stopControllersCh := make(chan struct{}) - err := passwordResetTokenController.RunSharded(stopControllersCh, os.Getenv("STATEFULSET_NAME"), os.Getenv("POD_IDENTITY")) + err := passwordResetTokenController.RunSharded(stopControllersCh, os.Getenv("STATEFULSET_NAME")) if err != nil { glog.Errorf("Error starting up the controllers: %v", err) }