Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Change resource key logic for k8s" #4997

Merged
merged 2 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/app/piped/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ func (p *planner) Run(ctx context.Context) error {
in := pln.Input{
ApplicationID: p.deployment.ApplicationId,
ApplicationName: p.deployment.ApplicationName,
PlatformProviderName: p.deployment.PlatformProvider,
GitPath: *p.deployment.GitPath,
Trigger: *p.deployment.Trigger,
MostRecentSuccessfulCommitHash: p.lastSuccessfulCommitHash,
Expand Down
17 changes: 5 additions & 12 deletions pkg/app/piped/driftdetector/kubernetes/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/pipe-cd/pipecd/pkg/app/piped/livestatestore/kubernetes"
provider "github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider/kubernetes"
Expand Down Expand Up @@ -122,12 +121,6 @@ func (d *detector) Run(ctx context.Context) error {
}

func (d *detector) check(ctx context.Context) {
isNamespacedResources, err := provider.GetIsNamespacedResources(d.provider.KubernetesConfig)
if err != nil {
d.logger.Error("failed to get isNamespacedResources", zap.Error(err))
return
}

appsByRepo := d.listGroupedApplication()

for repoID, apps := range appsByRepo {
Expand Down Expand Up @@ -173,16 +166,16 @@ func (d *detector) check(ctx context.Context) {

// Start checking all applications in this repository.
for _, app := range apps {
if err := d.checkApplication(ctx, app, gitRepo, headCommit, isNamespacedResources); err != nil {
if err := d.checkApplication(ctx, app, gitRepo, headCommit); err != nil {
d.logger.Error(fmt.Sprintf("failed to check application: %s", app.Id), zap.Error(err))
}
}
}
}

func (d *detector) checkApplication(ctx context.Context, app *model.Application, repo git.Repo, headCommit git.Commit, isNamespacedResources map[schema.GroupVersionKind]bool) error {
func (d *detector) checkApplication(ctx context.Context, app *model.Application, repo git.Repo, headCommit git.Commit) error {
watchingResourceKinds := d.stateGetter.GetWatchingResourceKinds()
headManifests, err := d.loadHeadManifests(ctx, app, repo, headCommit, watchingResourceKinds, isNamespacedResources)
headManifests, err := d.loadHeadManifests(ctx, app, repo, headCommit, watchingResourceKinds)
if err != nil {
return err
}
Expand Down Expand Up @@ -226,7 +219,7 @@ func (d *detector) checkApplication(ctx context.Context, app *model.Application,
return d.reporter.ReportApplicationSyncState(ctx, app.Id, state)
}

func (d *detector) loadHeadManifests(ctx context.Context, app *model.Application, repo git.Repo, headCommit git.Commit, watchingResourceKinds []provider.APIVersionKind, isNamespacedResources map[schema.GroupVersionKind]bool) ([]provider.Manifest, error) {
func (d *detector) loadHeadManifests(ctx context.Context, app *model.Application, repo git.Repo, headCommit git.Commit, watchingResourceKinds []provider.APIVersionKind) ([]provider.Manifest, error) {
var (
manifestCache = provider.AppManifestsCache{
AppID: app.Id,
Expand Down Expand Up @@ -288,7 +281,7 @@ func (d *detector) loadHeadManifests(ctx context.Context, app *model.Application
}
}

loader := provider.NewLoader(app.Name, appDir, repoDir, app.GitPath.ConfigFilename, cfg.KubernetesApplicationSpec.Input, isNamespacedResources, d.gitClient, d.logger)
loader := provider.NewLoader(app.Name, appDir, repoDir, app.GitPath.ConfigFilename, cfg.KubernetesApplicationSpec.Input, d.gitClient, d.logger)
manifests, err = loader.LoadManifests(ctx)
if err != nil {
err = fmt.Errorf("failed to load new manifests: %w", err)
Expand Down
19 changes: 1 addition & 18 deletions pkg/app/piped/executor/kubernetes/baseline.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,7 @@ func (e *deployExecutor) ensureBaselineRollout(ctx context.Context) model.StageS

// Load running manifests at the most successful deployed commit.
e.LogPersister.Infof("Loading running manifests at commit %s for handling", runningCommit)
ds, err := e.RunningDSP.Get(ctx, e.LogPersister)
if err != nil {
e.LogPersister.Errorf("Failed to prepare running deploy source (%v)", err)
return model.StageStatus_STAGE_FAILURE
}

loader := provider.NewLoader(
e.Deployment.ApplicationName,
ds.AppDir,
ds.RepoDir,
e.Deployment.GitPath.ConfigFilename,
e.appCfg.Input,
e.isNamespacedResources,
e.GitClient,
e.Logger,
)

manifests, err := loadManifests(ctx, e.Deployment.ApplicationId, runningCommit, e.AppManifestsCache, loader, e.Logger)
manifests, err := e.loadRunningManifests(ctx)
if err != nil {
e.LogPersister.Errorf("Failed while loading running manifests (%v)", err)
return model.StageStatus_STAGE_FAILURE
Expand Down
58 changes: 40 additions & 18 deletions pkg/app/piped/executor/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/pipe-cd/pipecd/pkg/app/piped/executor"
provider "github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider/kubernetes"
Expand All @@ -37,9 +36,8 @@ import (
type deployExecutor struct {
executor.Input

commit string
appCfg *config.KubernetesApplicationSpec
isNamespacedResources map[schema.GroupVersionKind]bool
commit string
appCfg *config.KubernetesApplicationSpec

loader provider.Loader
applierGetter applierGetter
Expand Down Expand Up @@ -77,18 +75,6 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
ctx := sig.Context()
e.commit = e.Deployment.Trigger.Commit.Hash

cp, ok := e.PipedConfig.FindPlatformProvider(e.Deployment.PlatformProvider, model.ApplicationKind_KUBERNETES)
if !ok {
e.LogPersister.Errorf("provider %s was not found", e.Deployment.PlatformProvider)
return model.StageStatus_STAGE_FAILURE
}
isNamespacedResources, err := provider.GetIsNamespacedResources(cp.KubernetesConfig)
if err != nil {
e.LogPersister.Errorf("failed to get isNamespacedResources %v", zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}
e.isNamespacedResources = isNamespacedResources

ds, err := e.TargetDSP.Get(ctx, e.LogPersister)
if err != nil {
e.LogPersister.Errorf("Failed to prepare target deploy source data (%v)", err)
Expand Down Expand Up @@ -124,7 +110,6 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
ds.RepoDir,
e.Deployment.GitPath.ConfigFilename,
e.appCfg.Input,
e.isNamespacedResources,
e.GitClient,
e.Logger,
)
Expand Down Expand Up @@ -169,7 +154,44 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
return executor.DetermineStageStatus(sig.Signal(), originalStatus, status)
}

// loadManifests loads the manifest using the given loader. It caches the loaded manifests for the given commit.
func (e *deployExecutor) loadRunningManifests(ctx context.Context) (manifests []provider.Manifest, err error) {
commit := e.Deployment.RunningCommitHash
if commit == "" {
return nil, fmt.Errorf("unable to determine running commit")
}

loader := &manifestsLoadFunc{
loadFunc: func(ctx context.Context) ([]provider.Manifest, error) {
ds, err := e.RunningDSP.Get(ctx, e.LogPersister)
if err != nil {
e.LogPersister.Errorf("Failed to prepare running deploy source (%v)", err)
return nil, err
}

loader := provider.NewLoader(
e.Deployment.ApplicationName,
ds.AppDir,
ds.RepoDir,
e.Deployment.GitPath.ConfigFilename,
e.appCfg.Input,
e.GitClient,
e.Logger,
)
return loader.LoadManifests(ctx)
},
}

return loadManifests(ctx, e.Deployment.ApplicationId, commit, e.AppManifestsCache, loader, e.Logger)
}

type manifestsLoadFunc struct {
loadFunc func(context.Context) ([]provider.Manifest, error)
}

func (l *manifestsLoadFunc) LoadManifests(ctx context.Context) ([]provider.Manifest, error) {
return l.loadFunc(ctx)
}

func loadManifests(ctx context.Context, appID, commit string, manifestsCache cache.Cache, loader provider.Loader, logger *zap.Logger) (manifests []provider.Manifest, err error) {
cache := provider.AppManifestsCache{
AppID: appID,
Expand Down
21 changes: 1 addition & 20 deletions pkg/app/piped/executor/kubernetes/primary.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,26 +156,7 @@ func (e *deployExecutor) ensurePrimaryRollout(ctx context.Context) model.StageSt

// Find the running resources that are not defined in Git.
e.LogPersister.Info("Start finding all running PRIMARY resources but no longer defined in Git")
// Load running manifests at the most successful deployed commit.
e.LogPersister.Infof("Loading running manifests at commit %s for handling", e.Deployment.RunningCommitHash)
ds, err := e.RunningDSP.Get(ctx, e.LogPersister)
if err != nil {
e.LogPersister.Errorf("Failed to prepare running deploy source (%v)", err)
return model.StageStatus_STAGE_FAILURE
}

loader := provider.NewLoader(
e.Deployment.ApplicationName,
ds.AppDir,
ds.RepoDir,
e.Deployment.GitPath.ConfigFilename,
e.appCfg.Input,
e.isNamespacedResources,
e.GitClient,
e.Logger,
)

runningManifests, err := loadManifests(ctx, e.Deployment.ApplicationId, e.Deployment.RunningCommitHash, e.AppManifestsCache, loader, e.Logger)
runningManifests, err := e.loadRunningManifests(ctx)
if err != nil {
e.LogPersister.Errorf("Failed while loading running manifests (%v)", err)
return model.StageStatus_STAGE_FAILURE
Expand Down
41 changes: 2 additions & 39 deletions pkg/app/piped/executor/kubernetes/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ import (
"strings"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"

"github.com/pipe-cd/pipecd/pkg/app/piped/executor"
provider "github.com/pipe-cd/pipecd/pkg/app/piped/platformprovider/kubernetes"
Expand All @@ -34,8 +31,7 @@ import (
type rollbackExecutor struct {
executor.Input

appDir string
isNamespacedResources map[schema.GroupVersionKind]bool
appDir string
}

func (e *rollbackExecutor) Execute(sig executor.StopSignal) model.StageStatus {
Expand All @@ -45,39 +41,6 @@ func (e *rollbackExecutor) Execute(sig executor.StopSignal) model.StageStatus {
status model.StageStatus
)

// Use discovery to discover APIs supported by the Kubernetes API server.
// This should be run periodically with a low rate because the APIs are not added frequently.
// https://godoc.org/k8s.io/client-go/discovery
cp, ok := e.PipedConfig.FindPlatformProvider(e.Deployment.PlatformProvider, model.ApplicationKind_KUBERNETES)
if !ok {
e.LogPersister.Errorf("provider %s was not found", e.Deployment.PlatformProvider)
return model.StageStatus_STAGE_FAILURE
}
kubeConfig, err := clientcmd.BuildConfigFromFlags(cp.KubernetesConfig.MasterURL, cp.KubernetesConfig.KubeConfigPath)
if err != nil {
e.LogPersister.Errorf("failed to build kube config", zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}
discoveryClient, err := discovery.NewDiscoveryClientForConfig(kubeConfig)
if err != nil {
e.LogPersister.Errorf("failed to create discovery client: %v", zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}
groupResources, err := discoveryClient.ServerPreferredResources()
if err != nil {
e.LogPersister.Errorf("failed to fetch preferred resources: %v", zap.Error(err))
return model.StageStatus_STAGE_FAILURE
}
e.LogPersister.Infof("successfully preferred resources that contains for %d groups", len(groupResources))

e.isNamespacedResources = make(map[schema.GroupVersionKind]bool)
for _, gr := range groupResources {
for _, resource := range gr.APIResources {
gvk := schema.FromAPIVersionAndKind(gr.GroupVersion, resource.Kind)
e.isNamespacedResources[gvk] = resource.Namespaced
}
}

switch model.Stage(e.Stage.Name) {
case model.StageRollback:
status = e.ensureRollback(ctx)
Expand Down Expand Up @@ -119,7 +82,7 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus

e.appDir = ds.AppDir

loader := provider.NewLoader(e.Deployment.ApplicationName, ds.AppDir, ds.RepoDir, e.Deployment.GitPath.ConfigFilename, appCfg.Input, e.isNamespacedResources, e.GitClient, e.Logger)
loader := provider.NewLoader(e.Deployment.ApplicationName, ds.AppDir, ds.RepoDir, e.Deployment.GitPath.ConfigFilename, appCfg.Input, e.GitClient, e.Logger)
e.Logger.Info("start executing kubernetes stage",
zap.String("stage-name", e.Stage.Name),
zap.String("app-dir", ds.AppDir),
Expand Down
22 changes: 3 additions & 19 deletions pkg/app/piped/planner/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"time"

"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/pipe-cd/pipecd/pkg/app/piped/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/piped/planner"
Expand All @@ -41,7 +40,6 @@ const (

// Planner plans the deployment pipeline for kubernetes application.
type Planner struct {
isNamespacedResources map[schema.GroupVersionKind]bool
}

type registerer interface {
Expand All @@ -50,25 +48,11 @@ type registerer interface {

// Register registers this planner into the given registerer.
func Register(r registerer) {
r.Register(model.ApplicationKind_KUBERNETES, &Planner{
isNamespacedResources: make(map[schema.GroupVersionKind]bool),
})
r.Register(model.ApplicationKind_KUBERNETES, &Planner{})
}

// Plan decides which pipeline should be used for the given input.
func (p *Planner) Plan(ctx context.Context, in planner.Input) (out planner.Output, err error) {
cp, ok := in.PipedConfig.FindPlatformProvider(in.PlatformProviderName, model.ApplicationKind_KUBERNETES)
if !ok {
err = fmt.Errorf("provider %s was not found", in.PlatformProviderName)
return
}
isNamespacedResources, err := provider.GetIsNamespacedResources(cp.KubernetesConfig)
if err != nil {
err = fmt.Errorf("failed to get isNamespacedResources: %v", err)
return
}
p.isNamespacedResources = isNamespacedResources

ds, err := in.TargetDSP.Get(ctx, io.Discard)
if err != nil {
err = fmt.Errorf("error while preparing deploy source data (%v)", err)
Expand Down Expand Up @@ -97,7 +81,7 @@ func (p *Planner) Plan(ctx context.Context, in planner.Input) (out planner.Outpu
newManifests, ok := manifestCache.Get(in.Trigger.Commit.Hash)
if !ok {
// When the manifests were not in the cache we have to load them.
loader := provider.NewLoader(in.ApplicationName, ds.AppDir, ds.RepoDir, in.GitPath.ConfigFilename, cfg.Input, p.isNamespacedResources, in.GitClient, in.Logger)
loader := provider.NewLoader(in.ApplicationName, ds.AppDir, ds.RepoDir, in.GitPath.ConfigFilename, cfg.Input, in.GitClient, in.Logger)
newManifests, err = loader.LoadManifests(ctx)
if err != nil {
return
Expand Down Expand Up @@ -221,7 +205,7 @@ func (p *Planner) Plan(ctx context.Context, in planner.Input) (out planner.Outpu
err = fmt.Errorf("unable to find the running configuration (%v)", err)
return
}
loader := provider.NewLoader(in.ApplicationName, runningDs.AppDir, runningDs.RepoDir, in.GitPath.ConfigFilename, runningCfg.Input, p.isNamespacedResources, in.GitClient, in.Logger)
loader := provider.NewLoader(in.ApplicationName, runningDs.AppDir, runningDs.RepoDir, in.GitPath.ConfigFilename, runningCfg.Input, in.GitClient, in.Logger)
oldManifests, err = loader.LoadManifests(ctx)
if err != nil {
err = fmt.Errorf("failed to load previously deployed manifests: %w", err)
Expand Down
1 change: 0 additions & 1 deletion pkg/app/piped/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ type gitClient interface {
type Input struct {
ApplicationID string
ApplicationName string
PlatformProviderName string
GitPath model.ApplicationGitPath
Trigger model.DeploymentTrigger
MostRecentSuccessfulCommitHash string
Expand Down
7 changes: 3 additions & 4 deletions pkg/app/piped/planpreview/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,10 +335,9 @@ func (b *builder) plan(ctx context.Context, app *model.Application, targetDSP de
}

in := planner.Input{
ApplicationID: app.Id,
ApplicationName: app.Name,
PlatformProviderName: app.PlatformProvider,
GitPath: *app.GitPath,
ApplicationID: app.Id,
ApplicationName: app.Name,
GitPath: *app.GitPath,
Trigger: model.DeploymentTrigger{
Commit: &model.Commit{
Branch: b.repoCfg.Branch,
Expand Down
Loading
Loading