Skip to content

Commit

Permalink
use Tekton Resolvers for a standard on cluster build
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <[email protected]>
  • Loading branch information
zroubalik committed Jul 4, 2023
1 parent b38d19b commit 94757bb
Show file tree
Hide file tree
Showing 43 changed files with 4,001 additions and 489 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ require (
knative.dev/serving v0.37.1-0.20230603021539-349b2d61b0e8
)

require sigs.k8s.io/controller-runtime v0.7.2 // indirect

require (
cloud.google.com/go/compute v1.19.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
Expand Down Expand Up @@ -166,6 +168,8 @@ require (
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/manifestival/client-go-client v0.5.0
github.com/manifestival/manifestival v0.7.2
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
github.com/mattn/go-runewidth v0.0.14 // indirect
Expand Down
93 changes: 93 additions & 0 deletions go.sum

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions pkg/builders/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ func (e ErrUnknownBuilder) Error() string {
return fmt.Sprintf("\"%v\" is not a known builder. Available builders are %s", e.Name, e.Known)
}

// ErrBuilderNotSupported
type ErrBuilderNotSupported struct {
Builder string
}

func (e ErrBuilderNotSupported) Error() string {
return fmt.Sprintf("builder %q is not supported", e.Builder)
}

// ErrRuntimeRequired
type ErrRuntimeRequired struct {
Builder string
Expand Down
14 changes: 14 additions & 0 deletions pkg/k8s/manifestival.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package k8s

import (
mfc "github.com/manifestival/client-go-client"
"github.com/manifestival/manifestival"
)

func GetManifestivalClient() (manifestival.Client, error) {
config, err := GetClientConfig().ClientConfig()
if err != nil {
return nil, err
}
return mfc.NewClient(config)
}
31 changes: 20 additions & 11 deletions pkg/pipelines/tekton/pipelines_pac_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (pp *PipelinesProvider) ConfigurePAC(ctx context.Context, f fn.Function, me
}

if data.ConfigureLocalResources {
if err := pp.createLocalResources(ctx, f); err != nil {
if err := pp.createLocalPACResources(ctx, f); err != nil {
return err
}
}
Expand All @@ -60,13 +60,13 @@ func (pp *PipelinesProvider) ConfigurePAC(ctx context.Context, f fn.Function, me
}

if data.ConfigureClusterResources {
if err := pp.createClusterResources(ctx, f, data); err != nil {
if err := pp.createClusterPACResources(ctx, f, data); err != nil {
return err
}
}

if data.ConfigureRemoteResources {
if err := pp.createRemoteResources(ctx, f, data); err != nil {
if err := pp.createRemotePACResources(ctx, f, data); err != nil {
return err
}
}
Expand Down Expand Up @@ -102,26 +102,35 @@ func (pp *PipelinesProvider) RemovePAC(ctx context.Context, f fn.Function, metad
return nil
}

// createLocalResources creates necessary local resources in .tekton directory:
// createLocalPACResources creates necessary local resources in .tekton directory:
// Pipeline and PipelineRun templates
func (pp *PipelinesProvider) createLocalResources(ctx context.Context, f fn.Function) error {
err := createPipelineTemplate(f)
func (pp *PipelinesProvider) createLocalPACResources(ctx context.Context, f fn.Function) error {
// let's specify labels that will be applied to every resource that is created for a Pipeline
labels, err := f.LabelsMap()
if err != nil {
return err
}
if pp.decorator != nil {
labels = pp.decorator.UpdateLabels(f, labels)
}

err = createPipelineTemplatePAC(f, labels)
if err != nil {
return err
}

err = createPipelineRunTemplate(f)
err = createPipelineRunTemplatePAC(f, labels)
if err != nil {
return err
}

return nil
}

// createClusterResources create resources on cluster, it tries to detect PAC installation,
// createClusterPACResources create resources on cluster, it tries to detect PAC installation,
// creates necessary secret with image registry credentials and git credentials (access tokens, webhook secrets),
// also creates PVC for the function source code
func (pp *PipelinesProvider) createClusterResources(ctx context.Context, f fn.Function, metadata pipelines.PacMetadata) error {
func (pp *PipelinesProvider) createClusterPACResources(ctx context.Context, f fn.Function, metadata pipelines.PacMetadata) error {
// figure out pac installation namespace
installed, _, err := pac.DetectPACInstallation(ctx, "")
if !installed {
Expand Down Expand Up @@ -183,10 +192,10 @@ func (pp *PipelinesProvider) createClusterResources(ctx context.Context, f fn.Fu
return nil
}

// createRemoteResources creates resources on the remote git repository
// createRemotePACResources creates resources on the remote git repository
// set up a webhook with secrets, access tokens and it tries to detec PAC installation
// together with PAC controller route url - needed for webhook payload trigger
func (pp *PipelinesProvider) createRemoteResources(ctx context.Context, f fn.Function, metadata pipelines.PacMetadata) error {
func (pp *PipelinesProvider) createRemotePACResources(ctx context.Context, f fn.Function, metadata pipelines.PacMetadata) error {

// figure out pac installation namespace
installed, installationNS, err := pac.DetectPACInstallation(ctx, "")
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipelines/tekton/pipelines_pac_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Test_createLocalResources(t *testing.T) {
f.Registry = TestRegistry

pp := NewPipelinesProvider()
err = pp.createLocalResources(context.Background(), f)
err = pp.createLocalPACResources(context.Background(), f)
if (err != nil) != tt.wantErr {
t.Errorf("pp.createLocalResources() error = %v, wantErr %v", err, tt.wantErr)
return
Expand All @@ -76,7 +76,7 @@ func Test_deleteAllPipelineTemplates(t *testing.T) {
f.Registry = TestRegistry

pp := NewPipelinesProvider()
err = pp.createLocalResources(context.Background(), f)
err = pp.createLocalPACResources(context.Background(), f)
if err != nil {
t.Errorf("unexpected error while running pp.createLocalResources() error = %v", err)
}
Expand Down
80 changes: 63 additions & 17 deletions pkg/pipelines/tekton/pipelines_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package tekton
import (
"archive/tar"
"context"
goErrors "errors"
"errors"
"fmt"
"io"
"io/fs"
Expand All @@ -12,6 +12,7 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/resource"

Expand All @@ -24,7 +25,7 @@ import (
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
pipelineClient "github.com/tektoncd/pipeline/pkg/client/clientset/versioned/typed/pipeline/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8slabels "k8s.io/apimachinery/pkg/labels"

Expand Down Expand Up @@ -152,10 +153,11 @@ func (pp *PipelinesProvider) Run(ctx context.Context, f fn.Function) error {
}
}

_, err = client.Pipelines(pp.namespace).Create(ctx, generatePipeline(f, labels), metav1.CreateOptions{})
// _, err = client.Pipelines(pp.namespace).Create(ctx, generatePipeline(f, labels), metav1.CreateOptions{})
err = createAndApplyPipelineTemplate(f, pp.namespace, labels)
if err != nil {
if !errors.IsAlreadyExists(err) {
if errors.IsNotFound(err) {
if !k8serrors.IsAlreadyExists(err) {
if k8serrors.IsNotFound(err) {
return fmt.Errorf("problem creating pipeline, missing tekton?: %v", err)
}
return fmt.Errorf("problem creating pipeline: %v", err)
Expand Down Expand Up @@ -184,28 +186,37 @@ func (pp *PipelinesProvider) Run(ctx context.Context, f fn.Function) error {
if f.Registry == "" {
f.Registry = registry
}
pr, err := client.PipelineRuns(pp.namespace).Create(ctx, generatePipelineRun(f, labels), metav1.CreateOptions{})

err = createAndApplyPipelineRunTemplate(f, pp.namespace, labels)
if err != nil {
return fmt.Errorf("problem in creating pipeline run: %v", err)
}

err = pp.watchPipelineRunProgress(ctx, pr)
// we need to give k8s time to actually create the Pipeline Run
time.Sleep(1 * time.Second)

newestPipelineRun, err := findNewestPipelineRunWithRetry(ctx, f, pp.namespace, client)
if err != nil {
return fmt.Errorf("problem in listing pipeline runs: %v", err)
}

err = pp.watchPipelineRunProgress(ctx, newestPipelineRun)
if err != nil {
if !goErrors.Is(err, context.Canceled) {
if !errors.Is(err, context.Canceled) {
return fmt.Errorf("problem in watching started pipeline run: %v", err)
}
// TODO replace deletion with pipeline-run cancellation
_ = client.PipelineRuns(pp.namespace).Delete(context.TODO(), pr.Name, metav1.DeleteOptions{})
_ = client.PipelineRuns(pp.namespace).Delete(context.TODO(), newestPipelineRun.Name, metav1.DeleteOptions{})
return fmt.Errorf("pipeline run cancelled: %w", context.Canceled)
}

pr, err = client.PipelineRuns(pp.namespace).Get(ctx, pr.Name, metav1.GetOptions{})
newestPipelineRun, err = client.PipelineRuns(pp.namespace).Get(ctx, newestPipelineRun.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("problem in retriving pipeline run status: %v", err)
}

if pr.Status.GetCondition(apis.ConditionSucceeded).Status == corev1.ConditionFalse {
message := getFailedPipelineRunLog(ctx, client, pr, pp.namespace)
if newestPipelineRun.Status.GetCondition(apis.ConditionSucceeded).Status == corev1.ConditionFalse {
message := getFailedPipelineRunLog(ctx, client, newestPipelineRun, pp.namespace)
return fmt.Errorf("function pipeline run has failed with message: \n\n%s", message)
}

Expand Down Expand Up @@ -363,7 +374,7 @@ func (pp *PipelinesProvider) removeClusterResources(ctx context.Context, f fn.Fu
go func() {
defer wg.Done()
err := df(ctx, pp.namespace, listOptions)
if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {
if err != nil && !k8serrors.IsNotFound(err) && !k8serrors.IsForbidden(err) {
errChan <- err
}
}()
Expand All @@ -389,9 +400,9 @@ func (pp *PipelinesProvider) removeClusterResources(ctx context.Context, f fn.Fu
// and prints detailed description of the currently executed Tekton Task.
func (pp *PipelinesProvider) watchPipelineRunProgress(ctx context.Context, pr *v1beta1.PipelineRun) error {
taskProgressMsg := map[string]string{
taskNameFetchSources: "Fetching git repository with the function source code",
taskNameBuild: "Building function image on the cluster",
taskNameDeploy: "Deploying function to the cluster",
"fetch-sources": "Fetching git repository with the function source code",
"build": "Building function image on the cluster",
"deploy": "Deploying function to the cluster",
}

clients, err := NewTektonClients()
Expand Down Expand Up @@ -471,6 +482,41 @@ func getFailedPipelineRunLog(ctx context.Context, client *pipelineClient.TektonV
return message
}

// findNewestPipelineRunWithRetry tries to find newest Pipeline Run for the input function
func findNewestPipelineRunWithRetry(ctx context.Context, f fn.Function, namespace string, client *pipelineClient.TektonV1beta1Client) (*v1beta1.PipelineRun, error) {
l := k8slabels.SelectorFromSet(k8slabels.Set(map[string]string{fnlabels.FunctionNameKey: f.Name}))
listOptions := metav1.ListOptions{
LabelSelector: l.String(),
}

var newestPipelineRun *v1beta1.PipelineRun
for attempt := 1; attempt <= 3; attempt++ {
prs, err := client.PipelineRuns(namespace).List(ctx, listOptions)
if err != nil {
return nil, fmt.Errorf("problem in listing pipeline runs: %v", err)
}

for _, pr := range prs.Items {
currentPipelineRun := pr
if len(prs.Items) < 1 || currentPipelineRun.Status.StartTime == nil {
// Restart if StartTime is nil
break
}

if newestPipelineRun == nil || currentPipelineRun.Status.StartTime.After(newestPipelineRun.Status.StartTime.Time) {
newestPipelineRun = &currentPipelineRun
}
}

// If a non-nil newestPipelineRun is found, break the retry loop
if newestPipelineRun != nil {
return newestPipelineRun, nil
}
}

return nil, fmt.Errorf("problem in listing pipeline runs: haven't found any")
}

// allows simple mocking in unit tests, use with caution regarding concurrency
var createPersistentVolumeClaim = k8s.CreatePersistentVolumeClaim

Expand All @@ -483,7 +529,7 @@ func createPipelinePersistentVolumeClaim(ctx context.Context, f fn.Function, nam
}
}
err = createPersistentVolumeClaim(ctx, getPipelinePvcName(f), namespace, labels, f.Deploy.Annotations, corev1.ReadWriteOnce, pvcs)
if err != nil && !errors.IsAlreadyExists(err) {
if err != nil && !k8serrors.IsAlreadyExists(err) {
return fmt.Errorf("problem creating persistent volume claim: %v", err)
}
return nil
Expand Down
Loading

0 comments on commit 94757bb

Please sign in to comment.