Skip to content

Commit

Permalink
update tekton namespace handling
Browse files Browse the repository at this point in the history
  • Loading branch information
lkingland committed May 8, 2024
1 parent e677216 commit 1177fd5
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 97 deletions.
4 changes: 2 additions & 2 deletions cmd/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func TestDeploy_GitArgsUsed(t *testing.T) {

// A Pipelines Provider which will validate the expected values were received
pipeliner := mock.NewPipelinesProvider()
pipeliner.RunFn = func(f fn.Function) (string, string, error) {
pipeliner.RunFn = func(f fn.Function) (string, fn.Function, error) {
if f.Build.Git.URL != url {
t.Errorf("Pipeline Provider expected git URL '%v' got '%v'", url, f.Build.Git.URL)
}
Expand All @@ -581,7 +581,7 @@ func TestDeploy_GitArgsUsed(t *testing.T) {
if f.Build.Git.ContextDir != dir {
t.Errorf("Pipeline Provider expected git dir '%v' got '%v'", url, f.Build.Git.ContextDir)
}
return url, "", nil
return url, f, nil
}

// Deploy the Function specifying all of the git-related flags and --remote
Expand Down
26 changes: 4 additions & 22 deletions pkg/functions/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ type DNSProvider interface {

// PipelinesProvider manages lifecyle of CI/CD pipelines used by a function
type PipelinesProvider interface {
Run(context.Context, Function) (string, string, error)
Run(context.Context, Function) (string, Function, error)
Remove(context.Context, Function) error
ConfigurePAC(context.Context, Function, any) error
RemovePAC(context.Context, Function, any) error
Expand Down Expand Up @@ -814,31 +814,13 @@ func (c *Client) Deploy(ctx context.Context, f Function, oo ...DeployOption) (Fu
// Returned function contains applicable registry and deployed image name.
// String is the default route.
func (c *Client) RunPipeline(ctx context.Context, f Function) (string, Function, error) {
var err error
var url string
// Default function registry to the client's global registry
if f.Registry == "" {
f.Registry = c.registry
}

// If no image name has been specified by user (--image), calculate.
// Image name is stored on the function for later use by deploy.
if f.Image != "" {
// if user specified an image, use it
f.Deploy.Image = f.Image
} else if f.Deploy.Image == "" {
f.Deploy.Image, err = f.ImageName()
if err != nil {
return "", f, err
}
}

// Build and deploy function using Pipeline
url, f.Deploy.Namespace, err = c.pipelinesProvider.Run(ctx, f)
if err != nil {
return url, f, fmt.Errorf("failed to run pipeline: %w", err)
}
return url, f, nil
return c.pipelinesProvider.Run(ctx, f)
}

// ConfigurePAC generates Pipeline resources on the local filesystem,
Expand Down Expand Up @@ -1369,8 +1351,8 @@ func (n *noopDescriber) Describe(context.Context, string, string) (Instance, err
// PipelinesProvider
type noopPipelinesProvider struct{}

func (n *noopPipelinesProvider) Run(ctx context.Context, _ Function) (string, string, error) {
return "", "", nil
func (n *noopPipelinesProvider) Run(ctx context.Context, f Function) (string, Function, error) {
return "", f, nil
}
func (n *noopPipelinesProvider) Remove(ctx context.Context, _ Function) error { return nil }
func (n *noopPipelinesProvider) ConfigurePAC(ctx context.Context, _ Function, _ any) error {
Expand Down
7 changes: 4 additions & 3 deletions pkg/functions/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1437,9 +1437,10 @@ func TestClient_Pipelines_Deploy_Namespace(t *testing.T) {
defer rm()

pprovider := mock.NewPipelinesProvider()
pprovider.RunFn = func(f fn.Function) (string, string, error) {
// simulate function getting deployed here and return namespace
return "", f.Namespace, nil
pprovider.RunFn = func(f fn.Function) (string, fn.Function, error) {
// simulate function being deployed
f.Deploy.Namespace = f.Namespace
return "", f, nil
}

client := fn.New(
Expand Down
15 changes: 15 additions & 0 deletions pkg/knative/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,22 @@ func (d *Deployer) isImageInPrivateRegistry(ctx context.Context, client clientse
return false
}

func onClusterFix(f fn.Function) fn.Function {
// This only exists because of a bootstapping problem with On-Cluster
// builds: It appears that, when sending a function to be built on-cluster
// the target namespace is not being transmitted in the pipeline
// configuration. We should figure out how to transmit this information
// to the pipeline run for initial builds. This is a new problem because
// earlier versions of this logic relied entirely on the current
// kubernetes context.
if f.Namespace == "" && f.Deploy.Namespace == "" {
f.Namespace, _ = k8s.GetDefaultNamespace()
}
return f
}

func (d *Deployer) Deploy(ctx context.Context, f fn.Function) (fn.DeploymentResult, error) {
f = onClusterFix(f)
// Choosing f.Namespace vs f.Deploy.Namespace:
// This is minimal logic currently required of all deployer impls.
// If f.Namespace is defined, this is the (possibly new) target
Expand Down
34 changes: 25 additions & 9 deletions pkg/mock/pipelines_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

type PipelinesProvider struct {
RunInvoked bool
RunFn func(fn.Function) (string, string, error)
RunFn func(fn.Function) (string, fn.Function, error)
RemoveInvoked bool
RemoveFn func(fn.Function) error
ConfigurePACInvoked bool
Expand All @@ -20,25 +20,41 @@ type PipelinesProvider struct {

func NewPipelinesProvider() *PipelinesProvider {
return &PipelinesProvider{
RunFn: func(f fn.Function) (x string, namespace string, err error) {
RunFn: func(f fn.Function) (string, fn.Function, error) {
// the minimum necessary logic for a deployer, which should be
// confirmed by tests in the respective implementations.
// confirmed by tests in the respective implementations, is to
// return the function with f.Deploy.* values set reflecting the
// now deployed state of the function.
if f.Namespace == "" && f.Deploy.Namespace == "" {
return "", f, errors.New("namespace required for initial deployment")
}

// fabricate that we deployed it to the newly requested namespace
if f.Namespace != "" {
namespace = f.Namespace
} else if f.Deploy.Namespace != "" {
namespace = f.Deploy.Namespace
f.Deploy.Namespace = f.Namespace
}

// fabricate that we deployed the requested image or generated
// it as needed
var err error
if f.Image != "" {
f.Deploy.Image = f.Image
} else {
err = errors.New("namespace required for initial deployment")
if f.Deploy.Image, err = f.ImageName(); err != nil {
return "", f, err
}
}
return

return "", f, nil

},
RemoveFn: func(fn.Function) error { return nil },
ConfigurePACFn: func(fn.Function) error { return nil },
RemovePACFn: func(fn.Function) error { return nil },
}
}

func (p *PipelinesProvider) Run(ctx context.Context, f fn.Function) (string, string, error) {
func (p *PipelinesProvider) Run(ctx context.Context, f fn.Function) (string, fn.Function, error) {
p.RunInvoked = true
return p.RunFn(f)
}
Expand Down
18 changes: 5 additions & 13 deletions pkg/pipelines/tekton/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,19 @@ const (
DefaultWaitingTimeout = 120 * time.Second
)

// NewTektonClientAndResolvedNamespace returns TektonV1beta1Client,namespace,error
func NewTektonClientAndResolvedNamespace(namespace string) (*v1beta1.TektonV1beta1Client, string, error) {
var err error
if namespace == "" {
namespace, err = k8s.GetDefaultNamespace()
if err != nil {
return nil, "", err
}
}

// NewTektonClient returns TektonV1beta1Client for namespace
func NewTektonClient(namespace string) (*v1beta1.TektonV1beta1Client, error) {
restConfig, err := k8s.GetClientConfig().ClientConfig()
if err != nil {
return nil, "", fmt.Errorf("failed to create new tekton client: %w", err)
return nil, fmt.Errorf("failed to create new tekton client: %w", err)
}

client, err := v1beta1.NewForConfig(restConfig)
if err != nil {
return nil, "", fmt.Errorf("failed to create new tekton client: %v", err)
return nil, fmt.Errorf("failed to create new tekton client: %v", err)
}

return client, namespace, nil
return client, nil
}

func NewTektonClients() (*cli.Clients, error) {
Expand Down
94 changes: 59 additions & 35 deletions pkg/pipelines/tekton/pipelines_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,41 +100,61 @@ func NewPipelinesProvider(opts ...Opt) *PipelinesProvider {
return pp
}

// Run creates a Tekton Pipeline and all necessary resources (PVCs, Secrets, SAs,...) for the input Function.
// It ensures that all needed resources are present on the cluster so the PipelineRun can be initialized.
// After the PipelineRun is being initialized, the progress of the PipelineRun is being watched and printed to the output.
func (pp *PipelinesProvider) Run(ctx context.Context, f fn.Function) (string, string, error) {
fmt.Fprintf(os.Stderr, "Creating Pipeline resources\n")
// Run a remote build by creating all necessary resources (PVCs, secrets,
// SAs, etc) specified by the given Function before generating a pipeline
// definition, sending it to the cluster to be run via Tekton.
// Progress is by default piped to stdtout.
// Returned is the final url, and the input Function with the final results of the run populated
// (f.Deploy.Image and f.Deploy.Namespace) or an error.
func (pp *PipelinesProvider) Run(ctx context.Context, f fn.Function) (string, fn.Function, error) {
var err error

// Checks builder and registry:
if err = validatePipeline(f); err != nil {
return "", "", err
return "", f, err
}

// Namespace is either a new namespace, specified as f.Namespace, or
// the currently deployed namespace, recorded on f.Deploy.Namespace.
// If neither exist, an error is returned (namespace is required)
namespace := f.Namespace
if namespace == "" {
namespace = f.Deploy.Namespace
}
if namespace == "" {
return "", f, fn.ErrNamespaceRequired
}
f.Deploy.Namespace = namespace

client, ns2, err := NewTektonClientAndResolvedNamespace(namespace)
if err != nil {
return "", "", err
// Image is either an explicit image indicated with f.Image, or
// generated from a name+registry
image := f.Image
if image == "" {
image, err = f.ImageName()
if err != nil {
return "", f, err
}
}
if ns2 != namespace {
panic("fixme")
f.Deploy.Image = image

// Client for the given namespace
client, err := NewTektonClient(namespace)
if err != nil {
return "", f, err
}

// let's specify labels that will be applied to every resource that is created for a Pipeline
labels, err := f.LabelsMap()
if err != nil {
return "", "", err
return "", f, err
}
if pp.decorator != nil {
labels = pp.decorator.UpdateLabels(f, labels)
}

err = createPipelinePersistentVolumeClaim(ctx, f, namespace, labels)
if err != nil {
return "", "", err
return "", f, err
}

if f.Build.Git.URL == "" {
Expand All @@ -143,84 +163,88 @@ func (pp *PipelinesProvider) Run(ctx context.Context, f fn.Function) (string, st
defer content.Close()
err = k8s.UploadToVolume(ctx, content, getPipelinePvcName(f), namespace)
if err != nil {
return "", "", fmt.Errorf("cannot upload sources to the PVC: %w", err)
return "", f, fmt.Errorf("cannot upload sources to the PVC: %w", err)
}
}

err = createAndApplyPipelineTemplate(f, namespace, labels)
if err != nil {
if !k8serrors.IsAlreadyExists(err) {
if k8serrors.IsNotFound(err) {
return "", "", fmt.Errorf("problem creating pipeline, missing tekton?: %v", err)
return "", f, fmt.Errorf("problem creating pipeline, missing tekton?: %v", err)
}
return "", "", fmt.Errorf("problem creating pipeline: %v", err)
return "", f, fmt.Errorf("problem creating pipeline: %v", err)
}
}

registry, err := docker.GetRegistry(f.Deploy.Image)
registry, err := docker.GetRegistry(image)
if err != nil {
return "", "", fmt.Errorf("problem in resolving image registry name: %v", err)
return "", f, fmt.Errorf("problem in resolving image registry name: %v", err)
}

creds, err := pp.credentialsProvider(ctx, f.Deploy.Image)
creds, err := pp.credentialsProvider(ctx, image)
if err != nil {
return "", "", err
return "", f, err
}

// TODO(lkingland): This registry defaulting logic
// is either incorrect or in the wrong place. At this stage of the
// process registry should already be defined/defaulted, and this
// function should be creating resources and deploying. Missing
// data (like registry) should have failed early in the process
if registry == name.DefaultRegistry {
registry = authn.DefaultAuthKey
}
if f.Registry == "" {
f.Registry = registry
}

err = k8s.EnsureDockerRegistrySecretExist(ctx, getPipelineSecretName(f), namespace, labels, f.Deploy.Annotations, creds.Username, creds.Password, registry)
if err != nil {
return "", "", fmt.Errorf("problem in creating secret: %v", err)
}

if f.Registry == "" {
f.Registry = registry
return "", f, fmt.Errorf("problem in creating secret: %v", err)
}

err = createAndApplyPipelineRunTemplate(f, namespace, labels)
if err != nil {
return "", "", fmt.Errorf("problem in creating pipeline run: %v", err)
return "", f, fmt.Errorf("problem in creating pipeline run: %v", err)
}

// we need to give k8s time to actually create the Pipeline Run
time.Sleep(1 * time.Second)

newestPipelineRun, err := findNewestPipelineRunWithRetry(ctx, f, namespace, client)
if err != nil {
return "", "", fmt.Errorf("problem in listing pipeline runs: %v", err)
return "", f, fmt.Errorf("problem in listing pipeline runs: %v", err)
}

err = pp.watchPipelineRunProgress(ctx, newestPipelineRun, namespace)
if err != nil {
if !errors.Is(err, context.Canceled) {
return "", "", fmt.Errorf("problem in watching started pipeline run: %v", err)
return "", f, fmt.Errorf("problem in watching started pipeline run: %v", err)
}
// TODO replace deletion with pipeline-run cancellation
_ = client.PipelineRuns(namespace).Delete(context.TODO(), newestPipelineRun.Name, metav1.DeleteOptions{})
return "", "", fmt.Errorf("pipeline run cancelled: %w", context.Canceled)
return "", f, fmt.Errorf("pipeline run cancelled: %w", context.Canceled)
}

newestPipelineRun, err = client.PipelineRuns(namespace).Get(ctx, newestPipelineRun.Name, metav1.GetOptions{})
if err != nil {
return "", "", fmt.Errorf("problem in retriving pipeline run status: %v", err)
return "", f, fmt.Errorf("problem in retriving pipeline run status: %v", err)
}

if newestPipelineRun.Status.GetCondition(apis.ConditionSucceeded).Status == corev1.ConditionFalse {
message := getFailedPipelineRunLog(ctx, client, newestPipelineRun, namespace)
return "", "", fmt.Errorf("function pipeline run has failed with message: \n\n%s", message)
return "", f, fmt.Errorf("function pipeline run has failed with message: \n\n%s", message)
}

kClient, err := knative.NewServingClient(namespace)
if err != nil {
return "", "", fmt.Errorf("problem in retrieving status of deployed function: %v", err)
return "", f, fmt.Errorf("problem in retrieving status of deployed function: %v", err)
}

ksvc, err := kClient.GetService(ctx, f.Name)
if err != nil {
return "", "", fmt.Errorf("problem in retrieving status of deployed function: %v", err)
return "", f, fmt.Errorf("problem in retrieving status of deployed function: %v", err)
}

if ksvc.Generation == 1 {
Expand All @@ -230,10 +254,10 @@ func (pp *PipelinesProvider) Run(ctx context.Context, f fn.Function) (string, st
}

if ksvc.Namespace != namespace {
panic("fixme 2")
fmt.Fprintf(os.Stderr, "Warning: Final ksvc namespace %q does not match expected %q", ksvc.Namespace, namespace)
}

return ksvc.Status.URL.String(), ksvc.Namespace, nil
return ksvc.Status.URL.String(), f, nil
}

// Creates tar stream with the function sources as they were in "./source" directory.
Expand Down
Loading

0 comments on commit 1177fd5

Please sign in to comment.