Skip to content

Commit

Permalink
Add call to vpc-cni-k8s-connector for EKS windows (#94)
Browse files Browse the repository at this point in the history
1. CNI plugin depends on vpc-cni-k8s-connector executed in EKS Windows nodes.
2. Call is made to vpc-cni-k8s-connector with podname, pod namespace and path to pipe as input args.
3. CNI receives output over named pipe and vpc-cni-k8s-connector logs over stdout.
4. CNI plugin creates a named pipe, start listener and read pipe connections to receive output from vpc-cni-k8s-connector
  • Loading branch information
KlwntSingh committed Jul 13, 2023
1 parent 030ff29 commit c20156f
Show file tree
Hide file tree
Showing 5 changed files with 293 additions and 105 deletions.
6 changes: 6 additions & 0 deletions plugins/vpc-bridge/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
## Amazon vpc-bridge CNI plugin
vpc-bridge CNI plugin for EKS Windows.

## Pre-requisites for running vpc-bridge CNI plugin for EKS Windows
1. Env variable `AWS_VPC_CNI_K8S_CONNECTOR_BINARY_PATH` is required to be set. This will be already set in EKS Windows Optimized AMIs.
Set env variable `AWS_VPC_CNI_K8S_CONNECTOR_BINARY_PATH` to `C:\Program Files\Amazon\EKS\bin\aws-vpc-cni-k8s-connector.exe`.
121 changes: 121 additions & 0 deletions plugins/vpc-bridge/config/k8s/connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

//go:build enablek8sconnector && windows

package k8s

import (
"bytes"
"context"
"fmt"
"os"
"os/exec"

log "github.com/cihub/seelog"
)

const (
// Represents Env variable for k8s connector binary path.
envK8sConnectorBinaryPath = "AWS_VPC_CNI_K8S_CONNECTOR_BINARY_PATH"
// Represents default path for k8s connector binary in EKS Windows AMIs.
defaultK8sConnectorBinaryPath = `C:\Program Files\Amazon\EKS\bin\aws-vpc-cni-k8s-connector.exe`

// Represents Env for log level for k8s connector binary.
envLogLevel = "VPC_CNI_LOG_LEVEL"
// Represents default log level for k8s connector binary.
defaultLogLevel = "info"
)

// GetPodIP retrieves pod IP address using k8s connector binary.
// Output from binary is received over named pipe.
// Create and read operations on named pipe are handled in separate go routine.
// Named pipe path will be passed as argument to k8s connector binary execution.
func GetPodIP(ctx context.Context, podNamespace, podName string) (string, error) {
// Get new named pipe path.
pipeName, err := newPipe()
if err != nil {
return "", fmt.Errorf("error creating new pipe: %w", err)
}

resultChan := make(chan pipeReadResult)

// Read output from named pipe in separate go routine as accepting and reading connection is blocking operation.
go readResultFromPipe(ctx, pipeName, resultChan)

// Executing k8s connector binary in main routine. binary will write output to named pipe.
err = executeK8sConnector(ctx, podNamespace, podName, pipeName)
if err != nil {
return "", fmt.Errorf("error executing k8s connector: %w", err)
}

// Get output from named pipe using result chan.
var result pipeReadResult
select {
// Check if context timed out.
case <-ctx.Done():
return "", fmt.Errorf("error getting output from pipe: %w", ctx.Err())
case result = <-resultChan:
}

if result.error != nil {
return "", fmt.Errorf("error reading output from pipe: %w", result.error)
}

log.Debugf("Got pod IP address %s for pod %s in namespace %s", result.output, podName, podNamespace)
return result.output, nil
}

// executeK8sConnector executes aws-vpc-cni-k8s-connector binary to get pod IP address.
// Output from binary is received over named pipe. Execution logs from binary are returned over stdout.
func executeK8sConnector(ctx context.Context, podNamespace string, podName string, pipe string) error {
// Prepare command to execute binary with required args.
cmd := exec.CommandContext(ctx, getK8sConnectorBinaryPath(),
"-pod-name", podName, "-pod-namespace", podNamespace,
"-pipe", pipe, "-log-level", getK8sConnectorLogLevel())
log.Debugf("Executing cmd %s to get pod IP address", cmd.String())

// Setting Stderr for command to receive complete error.
var errBytes bytes.Buffer
cmd.Stderr = &errBytes

output, err := cmd.Output()

log.Infof("Logs from k8s connector binary...\n")
log.Infof("%s\n", string(output))
log.Infof("End of k8s connector binary logs\n")

if err != nil {
return fmt.Errorf("error executing connector binary: %w with execution error: %s", err, errBytes.String())
}

return nil
}

// getK8sConnectorBinaryPath returns path to k8s connector binary.
func getK8sConnectorBinaryPath() string {
connectorBinaryPath := os.Getenv(envK8sConnectorBinaryPath)
if connectorBinaryPath == "" {
connectorBinaryPath = defaultK8sConnectorBinaryPath
}
return connectorBinaryPath
}

// getK8sConnectorLogLevel returns the log level for k8s connector binary.
func getK8sConnectorLogLevel() string {
logLevel := os.Getenv(envLogLevel)
if logLevel == "" {
logLevel = defaultLogLevel
}
return logLevel
}
111 changes: 111 additions & 0 deletions plugins/vpc-bridge/config/k8s/pipe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

//go:build enablek8sconnector && windows

package k8s

import (
"bytes"
"context"
"fmt"
"io"
"time"

winio "github.com/Microsoft/go-winio"
log "github.com/cihub/seelog"
"golang.org/x/sys/windows"
)

const (
// PipeConnectionTimeout Represents default timeout for reading from connection object.
PipeConnectionTimeout = 1 * time.Minute

// NamedPipePathPrefix Represents prefix for named pipe path.
NamedPipePathPrefix = `\\.\pipe\ProtectedPrefix\Administrators\aws-vpc-k8s-connector`
)

// pipeReadResult Represents message for communication between main and go routine which reads output from named pipe.
type pipeReadResult struct {
output string // Represents output from named pipe.
error error // Represents error while reading output.
}

// newPipe Create new random named pipe path.
func newPipe() (string, error) {
// Generate GUID random string.
g, err := windows.GenerateGUID()
if err != nil {
return "", fmt.Errorf("error creating unique name for named pipe: %w", err)
}
// Create pipe path with prefix.
pipeName := fmt.Sprintf("%s-%s", NamedPipePathPrefix, g.String())
return pipeName, nil
}

// readResultFromPipe func executes in separate go routine as it has blocking operations.
// It creates listener and start accepting connections on named pipe. Read output from conn object.
// Pass output from named pipe to main routine via chan passed as argument. Write any error to same result chan.
// Func closes result channel, pipe listener, conn object.
func readResultFromPipe(ctx context.Context, pipeName string, result chan pipeReadResult) {
// Defer closing channel from sender go routine.
// We can't close in receiver as sender go routine will panic trying to send on closed channel.
defer close(result)

// Create pipe and get listener.
pipeListener, err := winio.ListenPipe(pipeName, nil)
if err != nil {
log.Errorf("Error creating named pipe %s: %w", pipeName, err)
result <- pipeReadResult{error: fmt.Errorf("error creating named pipe %s: %w", pipeName, err)}
return
}
defer pipeListener.Close()
log.Debugf("Named piped %s created", pipeName)

// Accept connection on named pipe. This operation is blocked until client(k8s connector binary) dials into pipe.
conn, err := pipeListener.Accept()
if err != nil {
log.Errorf("Error accepting connection on pipe listener %v: %w", pipeName, err)
result <- pipeReadResult{error: fmt.Errorf("error accepting connection on pipe listener %v: %w", pipeName, err)}
return
}
defer conn.Close()

// Get deadline from context, use it to set deadline for conn.
contextDeadline, ok := ctx.Deadline()
if !ok {
log.Debugf("Using default timeout %s for pipe connection", PipeConnectionTimeout)
contextDeadline = time.Now().Add(PipeConnectionTimeout)
}
// Setup read timeout on conn object.
err = conn.SetReadDeadline(contextDeadline)
if err != nil {
log.Errorf("Error setting timeout for pipe connection %v: %w", pipeName, err)
result <- pipeReadResult{error: fmt.Errorf("error setting timeout for pipe connection %v: %w", pipeName, err)}
return
}

// Get output from named pipe. This operation is blocking until client(k8s connector binary) has written output.
// Returns error if output is not available within above given timeout.
var output bytes.Buffer
_, err = io.Copy(&output, conn) // Copy output from pipe connection to buffer.
if err != nil {
log.Errorf("Error reading from named piped %v: %w", pipeName, err)
result <- pipeReadResult{error: fmt.Errorf("error reading from named piped %v: %w", pipeName, err)}
return
}

// Send output from named pipe on channel.
result <- pipeReadResult{output: output.String()}
return
}
55 changes: 55 additions & 0 deletions plugins/vpc-bridge/config/k8s_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may
// not use this file except in compliance with the License. A copy of the
// License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed
// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language governing
// permissions and limitations under the License.

//go:build enablek8sconnector && windows

package config

import (
"context"
"fmt"
"time"

"github.com/aws/amazon-vpc-cni-plugins/network/vpc"
"github.com/aws/amazon-vpc-cni-plugins/plugins/vpc-bridge/config/k8s"
)

func init() {
// Define retrievePodConfigHandler when enablek8sconnector tag is set.
retrievePodConfigHandler = retrievePodConfig
}

// retrievePodConfig retrieves a pod's configuration from an external source.
func retrievePodConfig(netConfig *NetConfig) error {
kc := netConfig.Kubernetes

// Creating context with timeout.
ctx := context.Background()
ctx, ctxCancel := context.WithTimeout(ctx, time.Minute)
defer ctxCancel()

// Retrieve pod IP address CIDR string using k8s connector.
ipAddress, err := k8s.GetPodIP(ctx, kc.Namespace, kc.PodName)
if err != nil {
return fmt.Errorf("failed to get pod IP address %s: %w", kc.PodName, err)
}

// Parse IP address returned by k8s binary.
ipAddr, err := vpc.GetIPAddressFromString(ipAddress)
if err != nil {
return fmt.Errorf("invalid IPAddress %s from pod label", ipAddress)
}
netConfig.IPAddresses = append(netConfig.IPAddresses, *ipAddr)

return nil
}
Loading

0 comments on commit c20156f

Please sign in to comment.