diff --git a/build/charts/flow-aggregator/README.md b/build/charts/flow-aggregator/README.md index d954012977e..e3184f00f2c 100644 --- a/build/charts/flow-aggregator/README.md +++ b/build/charts/flow-aggregator/README.md @@ -26,9 +26,11 @@ Kubernetes: `>= 1.16.0-0` | clickHouse.commitInterval | string | `"8s"` | CommitInterval is the periodical interval between batch commit of flow records to DB. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". | | clickHouse.compress | bool | `true` | Compress enables lz4 compression when committing flow records. | | clickHouse.connectionSecret | object | `{"password":"clickhouse_operator_password","username":"clickhouse_operator"}` | Credentials to connect to ClickHouse. They will be stored in a Secret. | -| clickHouse.databaseURL | string | `"tcp://clickhouse-clickhouse.flow-visibility.svc:9000"` | DatabaseURL is the url to the database. TCP protocol is required. | +| clickHouse.databaseURL | string | `"tcp://clickhouse-clickhouse.flow-visibility.svc:9000"` | DatabaseURL is the url to the database. Provide the database URL as a string with format ://:. The protocol has to be one of the following: "tcp", "tls", "http", "https". When "tls" or "https" is used, tls will be enabled. | | clickHouse.debug | bool | `false` | Debug enables debug logs from ClickHouse sql driver. | | clickHouse.enable | bool | `false` | Determine whether to enable exporting flow records to ClickHouse. | +| clickHouse.tls.caCert | bool | `false` | Indicates whether to use custom CA certificate. Default root CAs will be used if this field is false. If true, a Secret named "clickhouse-ca" must be provided with the following keys: ca.crt: | +| clickHouse.tls.insecureSkipVerify | bool | `false` | Determine whether to skip the verification of the server's certificate chain and host name. Default is false. | | flowAggregatorAddress | string | `""` | Provide an extra DNS name or IP address of flow aggregator for generating TLS certificate. | | flowCollector.address | string | `""` | Provide the flow collector address as string with format :[:], where proto is tcp or udp. If no L4 transport proto is given, we consider tcp as default. | | flowCollector.enable | bool | `false` | Determine whether to enable exporting flow records to external flow collector. | diff --git a/build/charts/flow-aggregator/conf/flow-aggregator.conf b/build/charts/flow-aggregator/conf/flow-aggregator.conf index c61c3856866..63988e24ccc 100644 --- a/build/charts/flow-aggregator/conf/flow-aggregator.conf +++ b/build/charts/flow-aggregator/conf/flow-aggregator.conf @@ -72,9 +72,23 @@ clickHouse: # Database is the name of database where Antrea "flows" table is created. database: "default" - # DatabaseURL is the url to the database. TCP protocol is required. + # DatabaseURL is the url to the database. Provide the database URL as a string with format + # ://:. The protocol has to be + # one of the following: "tcp", "tls", "http", "https". When "tls" or "https" is used, tls + # will be enabled. databaseURL: {{ .Values.clickHouse.databaseURL | quote }} + # TLS configuration options, when using TLS to connect to the ClickHouse service. + tls: + # InsecureSkipVerify determines whether to skip the verification of the server's certificate chain and host name. + # Default is false. + insecureSkipVerify: {{ .Values.clickHouse.tls.insecureSkipVerify }} + + # CACert indicates whether to use custom CA certificate. Default root CAs will be used if this field is false. + # If true, a Secret named "clickhouse-ca" must be provided with the following keys: + # ca.crt: + caCert: {{ .Values.clickHouse.tls.caCert }} + # Debug enables debug logs from ClickHouse sql driver. debug: {{ .Values.clickHouse.debug }} diff --git a/build/charts/flow-aggregator/templates/deployment.yaml b/build/charts/flow-aggregator/templates/deployment.yaml index fd4f438de5a..7422235c201 100644 --- a/build/charts/flow-aggregator/templates/deployment.yaml +++ b/build/charts/flow-aggregator/templates/deployment.yaml @@ -79,6 +79,8 @@ spec: readOnly: true - mountPath: /var/log/antrea/flow-aggregator name: host-var-log-antrea-flow-aggregator + - name: clickhouse-ca + mountPath: /etc/flow-aggregator/certs nodeSelector: kubernetes.io/os: linux kubernetes.io/arch: amd64 @@ -91,3 +93,9 @@ spec: hostPath: path: /var/log/antrea/flow-aggregator type: DirectoryOrCreate + # Make it optional as we only read it when caCert=true. + - name: clickhouse-ca + secret: + secretName: clickhouse-ca + defaultMode: 0400 + optional: true diff --git a/build/charts/flow-aggregator/values.yaml b/build/charts/flow-aggregator/values.yaml index e88b9ea321a..6c34e1516fb 100644 --- a/build/charts/flow-aggregator/values.yaml +++ b/build/charts/flow-aggregator/values.yaml @@ -45,8 +45,18 @@ flowCollector: clickHouse: # -- Determine whether to enable exporting flow records to ClickHouse. enable: false - # -- DatabaseURL is the url to the database. TCP protocol is required. + # -- DatabaseURL is the url to the database. Provide the database URL as a string with format + # ://:. The protocol has to be one of + # the following: "tcp", "tls", "http", "https". When "tls" or "https" is used, tls will be enabled. databaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000" + # TLS configuration options, when using TLS to connect to the ClickHouse service. + tls: + # -- Determine whether to skip the verification of the server's certificate chain and host name. Default is false. + insecureSkipVerify: false + # -- Indicates whether to use custom CA certificate. Default root CAs will be used if this field is false. + # If true, a Secret named "clickhouse-ca" must be provided with the following keys: + # ca.crt: + caCert: false # -- Debug enables debug logs from ClickHouse sql driver. debug: false # -- Compress enables lz4 compression when committing flow records. diff --git a/build/yamls/flow-aggregator.yml b/build/yamls/flow-aggregator.yml index 09d1e690ed5..0aaeac6dfe0 100644 --- a/build/yamls/flow-aggregator.yml +++ b/build/yamls/flow-aggregator.yml @@ -224,9 +224,23 @@ data: # Database is the name of database where Antrea "flows" table is created. database: "default" - # DatabaseURL is the url to the database. TCP protocol is required. + # DatabaseURL is the url to the database. Provide the database URL as a string with format + # ://:. The protocol has to be + # one of the following: "tcp", "tls", "http", "https". When "tls" or "https" is used, tls + # will be enabled. databaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000" + # TLS configuration options, when using TLS to connect to the ClickHouse service. + tls: + # InsecureSkipVerify determines whether to skip the verification of the server's certificate chain and host name. + # Default is false. + insecureSkipVerify: false + + # CACert indicates whether to use custom CA certificate. Default root CAs will be used if this field is false. + # If true, a Secret named "clickhouse-ca" must be provided with the following keys: + # ca.crt: + caCert: false + # Debug enables debug logs from ClickHouse sql driver. debug: false @@ -434,6 +448,8 @@ spec: readOnly: true - mountPath: /var/log/antrea/flow-aggregator name: host-var-log-antrea-flow-aggregator + - mountPath: /etc/flow-aggregator/certs + name: clickhouse-ca nodeSelector: kubernetes.io/arch: amd64 kubernetes.io/os: linux @@ -446,3 +462,8 @@ spec: path: /var/log/antrea/flow-aggregator type: DirectoryOrCreate name: host-var-log-antrea-flow-aggregator + - name: clickhouse-ca + secret: + defaultMode: 256 + optional: true + secretName: clickhouse-ca diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 01fa8a58752..058f2053f51 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -45,12 +45,13 @@ function print_usage { } -TESTBED_CMD=$(dirname $0)"/kind-setup.sh" -YML_CMD=$(dirname $0)"/../../hack/generate-manifest.sh" -FLOWAGGREGATOR_YML_CMD=$(dirname $0)"/../../hack/generate-manifest-flow-aggregator.sh" -FLOW_VISIBILITY_HELM_VALUES=$(dirname $0)"/values-flow-exporter.yml" -CH_OPERATOR_YML=$(dirname $0)"/../../build/yamls/clickhouse-operator-install-bundle.yml" -FLOW_VISIBILITY_YML=$(dirname $0)"/../../build/yamls/flow-visibility-e2e.yml" +THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" +TESTBED_CMD="$THIS_DIR/kind-setup.sh" +YML_CMD="$THIS_DIR/../../hack/generate-manifest.sh" +FLOWAGGREGATOR_YML_CMD="$THIS_DIR/../../hack/generate-manifest-flow-aggregator.sh" +FLOW_VISIBILITY_HELM_VALUES="$THIS_DIR/values-flow-exporter.yml" +CH_OPERATOR_YML="$THIS_DIR/../../build/yamls/clickhouse-operator-install-bundle.yml" +FLOW_VISIBILITY_CHART="$THIS_DIR/../../test/e2e/charts/flow-visibility" function quit { result=$? @@ -147,6 +148,16 @@ case $key in esac done +source $THIS_DIR/../../hack/verify-helm.sh + +if [ -z "$HELM" ]; then + HELM="$(verify_helm)" +elif ! $HELM version > /dev/null 2>&1; then + echoerr "$HELM does not appear to be a valid helm binary" + print_help + exit 1 +fi + if [[ $cleanup_only == "true" ]];then $TESTBED_CMD destroy kind exit 0 @@ -246,14 +257,16 @@ function run_test { fi if $flow_visibility; then - timeout="10m" + timeout="15m" flow_visibility_args="-run=TestFlowAggregator --flow-visibility" if $coverage; then $FLOWAGGREGATOR_YML_CMD --coverage | docker exec -i kind-control-plane dd of=/root/flow-aggregator-coverage.yml else $FLOWAGGREGATOR_YML_CMD | docker exec -i kind-control-plane dd of=/root/flow-aggregator.yml fi - cat $FLOW_VISIBILITY_YML | docker exec -i kind-control-plane dd of=/root/flow-visibility.yml + $HELM template "$FLOW_VISIBILITY_CHART" | docker exec -i kind-control-plane dd of=/root/flow-visibility.yml + $HELM template "$FLOW_VISIBILITY_CHART" --set "secureConnection.enable=true" | docker exec -i kind-control-plane dd of=/root/flow-visibility-tls.yml + curl -o $CH_OPERATOR_YML https://raw.githubusercontent.com/Altinity/clickhouse-operator/release-0.21.0/deploy/operator/clickhouse-operator-install-bundle.yaml sed -i -e "s|\"image\": \"clickhouse/clickhouse-server:22.3\"|\"image\": \"projects.registry.vmware.com/antrea/clickhouse-server:23.4\"|g" $CH_OPERATOR_YML sed -i -e "s|image: altinity/clickhouse-operator:0.21.0|image: projects.registry.vmware.com/antrea/clickhouse-operator:0.21.0|g" $CH_OPERATOR_YML diff --git a/docs/network-flow-visibility.md b/docs/network-flow-visibility.md index 8ca6be22f33..911e021d05c 100644 --- a/docs/network-flow-visibility.md +++ b/docs/network-flow-visibility.md @@ -17,6 +17,8 @@ - [Flow Aggregator](#flow-aggregator) - [Deployment](#deployment) - [Configuration](#configuration-1) + - [Configuring secure connections to the ClickHouse database](#configuring-secure-connections-to-the-clickhouse-database) + - [Example of flow-aggregator.conf](#example-of-flow-aggregatorconf) - [IPFIX Information Elements (IEs) in an Aggregated Flow Record](#ipfix-information-elements-ies-in-an-aggregated-flow-record) - [IEs from Antrea IE Registry](#ies-from-antrea-ie-registry-1) - [Supported Capabilities](#supported-capabilities-1) @@ -283,7 +285,43 @@ it is deployed following the [deployment steps](#deployment-steps-1), the ClickHouse server is already exposed via a K8s Service, and no further configuration is required. If a different FQDN or IP is desired, please use the URL for `clickHouse.databaseURL` in the following format: -`tcp://:`. +`://:`. + +#### Configuring secure connections to the ClickHouse database + +Starting with Antrea v1.13, you can enable TLS when connecting to the ClickHouse +Server by setting `clickHouse.databaseURL` with protocol `tls` or `https`. +You can also change the value of `clickHouse.tls.insecureSkipVerify` to +determine whether to skip the verification of the server's certificate. +If you want to provide a custom CA certificate, you can set +`clickHouse.tls.caCert` to `true` and the flow Aggregator will read the +certificate key pair from the`clickhouse-ca` Secret. + +Make sure to follow the following form when creating the `clickhouse-ca` Secret +with the custom CA certificate: + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: clickhouse-ca + namespace: flow-aggregator +data: + ca.crt: +``` + +You can use `kubectl apply -f ` to create the above secret +, or use `kubectl create secret`: + +```bash +kubectl create secret generic clickhouse-ca -n flow-aggregator --from-file=ca.crt= +``` + +Prior to Antrea v1.13, secure connections to ClickHouse are not supported, +and TCP is the only supported protocol when connecting to the ClickHouse +server from the Flow Aggregator. + +#### Example of flow-aggregator.conf ```yaml flow-aggregator.conf: | @@ -357,9 +395,23 @@ flow-aggregator.conf: | # Database is the name of database where Antrea "flows" table is created. database: "default" - # DatabaseURL is the url to the database. TCP protocol is required. + # DatabaseURL is the url to the database. Provide the database URL as a string with format + # ://:. The protocol has to be + # one of the following: "tcp", "tls", "http", "https". When "tls" or "https" is used, tls + # will be enabled. databaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000" + # TLS configuration options, when using TLS to connect to the ClickHouse service. + tls: + # InsecureSkipVerify determines whether to skip the verification of the server's certificate chain and host name. + # Default is false. + insecureSkipVerify: false + + # CACert indicates whether to use custom CA certificate. Default root CAs will be used if this field is false. + # If true, a Secret named "clickhouse-ca" must be provided with the following keys: + # ca.crt: + caCert: false + # Debug enables debug logs from ClickHouse sql driver. debug: false diff --git a/pkg/config/flowaggregator/config.go b/pkg/config/flowaggregator/config.go index 4aacc7cbd71..62bb58ecb09 100644 --- a/pkg/config/flowaggregator/config.go +++ b/pkg/config/flowaggregator/config.go @@ -95,7 +95,9 @@ type ClickHouseConfig struct { Enable bool `yaml:"enable,omitempty"` // Database is the name of database where Antrea "flows" table is created. Database string `yaml:"database,omitempty"` - // DatabaseURL is the url to the database. TCP protocol is required. + // DatabaseURL is the url to the database. Provide the database URL as a string with format + // ://:. The protocol has to be one + // from below: "tcp", "tls", "http", "https". When "tls" or "https" is used, tls will be enabled. // Defaults to "tcp://clickhouse-clickhouse.flow-visibility.svc:9000" DatabaseURL string `yaml:"databaseURL,omitempty"` // Debug enables debug logs from ClickHouse sql driver. Defaults to false. @@ -106,6 +108,18 @@ type ClickHouseConfig struct { // Defaults to "8s". Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". // Min value allowed is "1s". CommitInterval string `yaml:"commitInterval,omitempty"` + // TLS configuration options, when using TLS to connect to the ClickHouse service. + TLS TLSConfig `yaml:"tls,omitempty"` +} + +type TLSConfig struct { + // InsecureSkipVerify determines whether to skip the verification of the server's certificate chain and host name. + // Default is false. + InsecureSkipVerify bool `yaml:"insecureSkipVerify,omitempty"` + // CACert determines whether to use custom CA certificate. Default root CAs will be used if false. + // If true, a Secret named "flow-aggregator-ca" must be provided with the following keys: + // ca.crt: + CACert bool `yaml:"caCert,omitempty"` } type S3UploaderConfig struct { diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go index edd57a27fbd..0cfea52cbf5 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go @@ -16,6 +16,8 @@ package clickhouseclient import ( "context" + "crypto/tls" + "crypto/x509" "database/sql" "fmt" "net/url" @@ -32,6 +34,7 @@ import ( ) const ( + ProtocolUnknown = -1 maxQueueSize = 1 << 19 // 524288. ~500MB assuming 1KB per record queueFlushTimeout = 10 * time.Second insertQuery = `INSERT INTO flows ( @@ -92,13 +95,6 @@ const ( // PrepareClickHouseConnection is used for unit testing var PrepareClickHouseConnection = prepareConnection -type protocol string - -const ( - protocolTCP = "tcp" - protocolUnknown = "" -) - type stopPayload struct { flushQueue bool } @@ -126,13 +122,16 @@ type ClickHouseExportProcess struct { } type ClickHouseConfig struct { - Username string - Password string - Database string - DatabaseURL string - Debug bool - Compress *bool - CommitInterval time.Duration + Username string + Password string + Database string + DatabaseURL string + Debug bool + Compress *bool + CommitInterval time.Duration + CACert bool + InsecureSkipVerify bool + Certificate []byte } func NewClickHouseClient(config ClickHouseConfig, clusterUUID string) (*ClickHouseExportProcess, error) { @@ -366,7 +365,7 @@ func (ch *ClickHouseExportProcess) pushRecordsToFrontOfQueue(records []*flowreco func prepareConnection(config ClickHouseConfig) (*sql.DB, error) { connect, err := ConnectClickHouse(&config) if err != nil { - return nil, err + return nil, fmt.Errorf("error when connecting to ClickHouse, %w", err) } // Test open Transaction tx, err := connect.Begin() @@ -411,9 +410,9 @@ func (ch *ClickHouseExportProcess) GetClickHouseConfig() ClickHouseConfig { } func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) { - _, addr, err := parseDatabaseURL(config.DatabaseURL) + proto, addr, enableTLS, err := parseDatabaseURL(config.DatabaseURL) if err != nil { - return nil, err + return nil, fmt.Errorf("error when parsing database url: %w", err) } var connect *sql.DB var connErr error @@ -430,6 +429,7 @@ func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) { Password: config.Password, Database: config.Database, }, + Protocol: proto, } var compression clickhouse.CompressionMethod if *config.Compress { @@ -438,7 +438,20 @@ func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) { opt.Compression = &clickhouse.Compression{ Method: compression, } - + if enableTLS { // #nosec G402: ignore insecure options + opt.TLS = &tls.Config{ + InsecureSkipVerify: config.InsecureSkipVerify, + } + if config.CACert { + caCertPool := x509.NewCertPool() + ok := caCertPool.AppendCertsFromPEM(config.Certificate) + if !ok { + connErr = fmt.Errorf("failed to add the custom CA certificate") + return false, nil + } + opt.TLS.RootCAs = caCertPool + } + } connect = clickhouse.OpenDB(&opt) if err := connect.Ping(); err != nil { if exception, ok := err.(*clickhouse.Exception); ok { @@ -456,19 +469,24 @@ func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) { return connect, nil } -func parseDatabaseURL(dbUrl string) (protocol, string, error) { +func parseDatabaseURL(dbUrl string) (clickhouse.Protocol, string, bool, error) { u, err := url.Parse(dbUrl) if err != nil { - return protocolUnknown, "", fmt.Errorf("failed to parse ClickHouse database URL: %w", err) + return ProtocolUnknown, "", false, fmt.Errorf("failed to parse ClickHouse database URL: %w", err) } if u.Path != "" || u.RawQuery != "" || u.User != nil { - return protocolUnknown, "", fmt.Errorf("invalid ClickHouse database URL '%s': path, query string or user info should not be set", dbUrl) + return ProtocolUnknown, "", false, fmt.Errorf("invalid ClickHouse database URL '%s': path, query string or user info should not be set", dbUrl) } - proto := u.Scheme - switch proto { - case protocolTCP: - return protocolTCP, u.Host, nil + switch u.Scheme { + case "http": + return clickhouse.HTTP, u.Host, false, nil + case "https": + return clickhouse.HTTP, u.Host, true, nil + case "tcp": + return clickhouse.Native, u.Host, false, nil + case "tls": + return clickhouse.Native, u.Host, true, nil default: - return protocolUnknown, "", fmt.Errorf("connection over %s transport protocol is not supported", proto) + return ProtocolUnknown, "", false, fmt.Errorf("connection over %s transport protocol is not supported", u.Scheme) } } diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go index cee7baf37c5..9253874383b 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/DATA-DOG/go-sqlmock" "github.com/gammazero/deque" "github.com/golang/mock/gomock" @@ -336,17 +337,12 @@ func TestUpdateCH(t *testing.T) { func TestParseDatabaseURL(t *testing.T) { testcases := []struct { - url string - expectedProto protocol - expectedAddr string - expectedErrMsg string + url string + expectedProto clickhouse.Protocol + expectedAddr string + expectedTLSEnable bool + expectedErrMsg string }{ - { - url: "tcp://127.0.0.1:9000", - expectedProto: protocolTCP, - expectedAddr: "127.0.0.1:9000", - expectedErrMsg: "", - }, { url: "abc://127.0.0.1:9000", expectedErrMsg: "connection over abc transport protocol is not supported", @@ -367,14 +363,37 @@ func TestParseDatabaseURL(t *testing.T) { url: "tcp://127.0.0.1:9000?key=value", expectedErrMsg: "invalid ClickHouse database URL", }, + { + url: "tcp://127.0.0.1:9000", + expectedProto: clickhouse.Native, + expectedAddr: "127.0.0.1:9000", + }, + { + url: "tls://127.0.0.1:9000", + expectedProto: clickhouse.Native, + expectedAddr: "127.0.0.1:9000", + expectedTLSEnable: true, + }, + { + url: "http://127.0.0.1:9000", + expectedProto: clickhouse.HTTP, + expectedAddr: "127.0.0.1:9000", + }, + { + url: "https://127.0.0.1:9000", + expectedProto: clickhouse.HTTP, + expectedAddr: "127.0.0.1:9000", + expectedTLSEnable: true, + }, } for _, tc := range testcases { - proto, addr, err := parseDatabaseURL(tc.url) + proto, addr, tlsEnable, err := parseDatabaseURL(tc.url) if tc.expectedErrMsg != "" { assert.Contains(t, err.Error(), tc.expectedErrMsg) } else { require.NoError(t, err) assert.Equal(t, tc.expectedProto, proto) + assert.Equal(t, tc.expectedTLSEnable, tlsEnable) assert.Equal(t, tc.expectedAddr, addr) } } diff --git a/pkg/flowaggregator/exporter/clickhouse.go b/pkg/flowaggregator/exporter/clickhouse.go index 3d42da9aa04..7276b20f3b4 100644 --- a/pkg/flowaggregator/exporter/clickhouse.go +++ b/pkg/flowaggregator/exporter/clickhouse.go @@ -15,10 +15,14 @@ package exporter import ( + "fmt" "os" + "path" "reflect" + "time" ipfixentities "github.com/vmware/go-ipfix/pkg/entities" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" @@ -31,21 +35,47 @@ type ClickHouseExporter struct { chExportProcess *clickhouseclient.ClickHouseExportProcess } +const ( + CACertFile = "ca.crt" + CertDir = "/etc/flow-aggregator/certs" + DefaultInterval = 1 * time.Second + Timeout = 1 * time.Minute +) + func buildClickHouseConfig(opt *options.Options) clickhouseclient.ClickHouseConfig { return clickhouseclient.ClickHouseConfig{ - Username: os.Getenv("CH_USERNAME"), - Password: os.Getenv("CH_PASSWORD"), - Database: opt.Config.ClickHouse.Database, - DatabaseURL: opt.Config.ClickHouse.DatabaseURL, - Debug: opt.Config.ClickHouse.Debug, - Compress: opt.Config.ClickHouse.Compress, - CommitInterval: opt.ClickHouseCommitInterval, + Username: os.Getenv("CH_USERNAME"), + Password: os.Getenv("CH_PASSWORD"), + Database: opt.Config.ClickHouse.Database, + DatabaseURL: opt.Config.ClickHouse.DatabaseURL, + Debug: opt.Config.ClickHouse.Debug, + Compress: opt.Config.ClickHouse.Compress, + CommitInterval: opt.ClickHouseCommitInterval, + CACert: opt.Config.ClickHouse.TLS.CACert, + InsecureSkipVerify: opt.Config.ClickHouse.TLS.InsecureSkipVerify, } } func NewClickHouseExporter(k8sClient kubernetes.Interface, opt *options.Options) (*ClickHouseExporter, error) { chConfig := buildClickHouseConfig(opt) - klog.InfoS("ClickHouse configuration", "database", chConfig.Database, "databaseURL", chConfig.DatabaseURL, "debug", chConfig.Debug, "compress", *chConfig.Compress, "commitInterval", chConfig.CommitInterval) + klog.InfoS("ClickHouse configuration", "database", chConfig.Database, "databaseURL", chConfig.DatabaseURL, "debug", chConfig.Debug, + "compress", *chConfig.Compress, "commitInterval", chConfig.CommitInterval, "insecureSkipVerify", chConfig.InsecureSkipVerify, "caCert", chConfig.CACert) + var errMessage error + if chConfig.CACert { + err := wait.Poll(DefaultInterval, Timeout, func() (bool, error) { + caCertPath := path.Join(CertDir, CACertFile) + certificate, err := os.ReadFile(caCertPath) + if err != nil { + errMessage = err + return false, nil + } + chConfig.Certificate = certificate + return true, nil + }) + if err != nil { + return nil, fmt.Errorf("error when reading custom CA certificate: %v", errMessage) + } + } clusterUUID, err := getClusterUUID(k8sClient) if err != nil { return nil, err diff --git a/test/e2e/charts/flow-visibility/.helmignore b/test/e2e/charts/flow-visibility/.helmignore new file mode 100644 index 00000000000..0e8a0eb36f4 --- /dev/null +++ b/test/e2e/charts/flow-visibility/.helmignore @@ -0,0 +1,23 @@ +# Patterns to ignore when building packages. +# This supports shell glob matching, relative path matching, and +# negation (prefixed with !). Only one pattern per line. +.DS_Store +# Common VCS dirs +.git/ +.gitignore +.bzr/ +.bzrignore +.hg/ +.hgignore +.svn/ +# Common backup files +*.swp +*.bak +*.tmp +*.orig +*~ +# Various IDEs +.project +.idea/ +*.tmproj +.vscode/ diff --git a/test/e2e/charts/flow-visibility/Chart.yaml b/test/e2e/charts/flow-visibility/Chart.yaml new file mode 100644 index 00000000000..95671ae8638 --- /dev/null +++ b/test/e2e/charts/flow-visibility/Chart.yaml @@ -0,0 +1,6 @@ +apiVersion: v2 +name: flow-visibility +description: This Helm chart is only for e2e test. +type: application +version: 0.1.0 +appVersion: "1.16.0" diff --git a/test/e2e/charts/flow-visibility/provisioning/tls/settings.xml b/test/e2e/charts/flow-visibility/provisioning/tls/settings.xml new file mode 100644 index 00000000000..e69a3accbad --- /dev/null +++ b/test/e2e/charts/flow-visibility/provisioning/tls/settings.xml @@ -0,0 +1,15 @@ + + 8443 + 9440 + + + /opt/certs/tls.crt + /opt/certs/tls.key + none + true + true + sslv2,sslv3 + true + + + diff --git a/test/e2e/charts/flow-visibility/templates/clickhouseinstallation.yaml b/test/e2e/charts/flow-visibility/templates/clickhouseinstallation.yaml new file mode 100644 index 00000000000..34f3699e514 --- /dev/null +++ b/test/e2e/charts/flow-visibility/templates/clickhouseinstallation.yaml @@ -0,0 +1,96 @@ +{{- define "clickhouse.tlsConfig" -}} +{{- $Files := .Files }} +{{- $Global := .Global }} +{{- range $path, $_ := .Files.Glob "provisioning/tls/*" }} +{{ regexReplaceAll "(.*)/" $path "" }}: | +{{ tpl ($.Files.Get $path) $Global | indent 2 }} +{{- end }} +{{- end -}} + +apiVersion: "clickhouse.altinity.com/v1" +kind: "ClickHouseInstallation" +metadata: + name: clickhouse + labels: + app: clickhouse + namespace: flow-visibility +spec: + configuration: + users: + clickhouse_operator/k8s_secret_password: flow-visibility/clickhouse-secret/password + clickhouse_operator/networks/ip: "::/0" + profiles: + readonly/readonly: 1 + {{- if .Values.secureConnection.enable }} + files: + {{- include "clickhouse.tlsConfig" (dict "Files" .Files "Global" .) | indent 6 }} + {{- end }} + clusters: + - name: "clickhouse" + {{- if .Values.secureConnection.enable }} + secure: "yes" + {{- end }} + settings: + tcp_port: 9000 + http_port: 8123 + {{- if .Values.secureConnection.enable }} + tcp_port_secure: 9440 + https_port: 8443 + {{- end }} + layout: + shardsCount: 1 + replicasCount: 1 + defaults: + templates: + podTemplate: pod-template + serviceTemplate: service-template + templates: + serviceTemplates: + - name: service-template + spec: + type: ClusterIP + ports: + - name: http + port: 8123 + - name: tcp + port: 9000 + {{- if .Values.secureConnection.enable }} + - name: https + port: 8443 + - name: secureclient + port: 9440 + {{- end }} + podTemplates: + - name: pod-template + spec: + containers: + - name: clickhouse + image: projects.registry.vmware.com/antrea/clickhouse-server:23.4 + imagePullPolicy: IfNotPresent + volumeMounts: + - name: clickhouse-configmap-volume + mountPath: /docker-entrypoint-initdb.d + - name: clickhouse-storage-volume + mountPath: /var/lib/clickhouse + {{- if .Values.secureConnection.enable }} + - name: clickhouse-tls + mountPath: /opt/certs/tls.crt + subPath: tls.crt + - name: clickhouse-tls + mountPath: /opt/certs/tls.key + subPath: tls.key + {{- end }} + volumes: + - name: clickhouse-configmap-volume + configMap: + name: clickhouse-mounted-configmap + - name: clickhouse-storage-volume + emptyDir: + medium: Memory + sizeLimit: 8Gi + {{- if .Values.secureConnection.enable }} + - name: clickhouse-tls + secret: + secretName: clickhouse-tls + optional: true + {{- end }} diff --git a/build/yamls/flow-visibility-e2e.yml b/test/e2e/charts/flow-visibility/templates/configmap.yaml similarity index 81% rename from build/yamls/flow-visibility-e2e.yml rename to test/e2e/charts/flow-visibility/templates/configmap.yaml index c95ba8e9cc1..611a2488a2a 100644 --- a/build/yamls/flow-visibility-e2e.yml +++ b/test/e2e/charts/flow-visibility/templates/configmap.yaml @@ -1,20 +1,3 @@ -# This manifest is used in the E2E test to check Flow Aggregator exporting flow records -# to ClickHouse database. It is autogenerated by scripts in Theia, please refer to -# https://github.com/antrea-io/theia/blob/main/hack/generate-manifest.sh with option -# '--mode antrea-e2e' to generate a new one. ---- -# Source: theia/templates/clickhouse/secret.yaml -apiVersion: v1 -kind: Secret -metadata: - name: clickhouse-secret - namespace: flow-visibility -type: Opaque -stringData: - username: clickhouse_operator - password: clickhouse_operator_password ---- -# Source: theia/templates/clickhouse/configmap.yaml apiVersion: v1 kind: ConfigMap metadata: @@ -23,7 +6,7 @@ metadata: data: create_table.sh: |- #!/usr/bin/env bash - + # Copyright 2022 Antrea Authors. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -37,10 +20,10 @@ data: # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - + set -e clickhouse client -n -h 127.0.0.1 <<-EOSQL - + CREATE TABLE IF NOT EXISTS flows ( timeInserted DateTime DEFAULT now(), flowStartSeconds DateTime, @@ -98,7 +81,7 @@ data: ORDER BY (timeInserted, flowEndSeconds) TTL timeInserted + INTERVAL 1 HOUR SETTINGS merge_with_ttl_timeout = 3600; - + CREATE MATERIALIZED VIEW IF NOT EXISTS flows_pod_view ENGINE = SummingMergeTree ORDER BY ( @@ -156,7 +139,7 @@ data: destinationPodNamespace, sourceTransportPort, destinationTransportPort; - + CREATE MATERIALIZED VIEW IF NOT EXISTS flows_node_view ENGINE = SummingMergeTree ORDER BY ( @@ -198,7 +181,7 @@ data: destinationNodeName, sourcePodNamespace, destinationPodNamespace; - + CREATE MATERIALIZED VIEW IF NOT EXISTS flows_policy_view ENGINE = SummingMergeTree ORDER BY ( @@ -273,7 +256,7 @@ data: destinationServicePort, destinationServicePortName, destinationIP; - + CREATE TABLE IF NOT EXISTS recommendations ( id String, type String, @@ -281,58 +264,5 @@ data: yamls String ) engine=MergeTree ORDER BY (timeCreated); - + EOSQL ---- -# Source: theia/templates/clickhouse/clickhouseinstallation.yaml -apiVersion: "clickhouse.altinity.com/v1" -kind: "ClickHouseInstallation" -metadata: - name: clickhouse - labels: - app: clickhouse - namespace: flow-visibility -spec: - configuration: - users: - clickhouse_operator/k8s_secret_password: flow-visibility/clickhouse-secret/password - clickhouse_operator/networks/ip: "::/0" - clusters: - - name: "clickhouse" - layout: - shardsCount: 1 - replicasCount: 1 - defaults: - templates: - podTemplate: pod-template - serviceTemplate: service-template - templates: - serviceTemplates: - - name: service-template - spec: - type: ClusterIP - ports: - - name: http - port: 8123 - - name: tcp - port: 9000 - podTemplates: - - name: pod-template - spec: - containers: - - name: clickhouse - image: projects.registry.vmware.com/antrea/clickhouse-server:23.4 - imagePullPolicy: IfNotPresent - volumeMounts: - - name: clickhouse-configmap-volume - mountPath: /docker-entrypoint-initdb.d - - name: clickhouse-storage-volume - mountPath: /var/lib/clickhouse - volumes: - - name: clickhouse-configmap-volume - configMap: - name: clickhouse-mounted-configmap - - name: clickhouse-storage-volume - emptyDir: - medium: Memory - sizeLimit: 8Gi diff --git a/test/e2e/charts/flow-visibility/templates/secret.yaml b/test/e2e/charts/flow-visibility/templates/secret.yaml new file mode 100644 index 00000000000..783d31794a6 --- /dev/null +++ b/test/e2e/charts/flow-visibility/templates/secret.yaml @@ -0,0 +1,33 @@ +{{- if .Values.secureConnection.enable }} +{{- $cert := genSelfSignedCert .Values.secureConnection.commonName .Values.secureConnection.ipAddresses (uniq (append .Values.secureConnection.dnsNames .Values.secureConnection.commonName)) (.Values.secureConnection.daysValid | int) }} +{{- $certPEM := $cert.Cert | b64enc }} +{{- $keyPEM := $cert.Key | b64enc }} +--- +apiVersion: v1 +kind: Secret +type: kubernetes.io/tls +metadata: + name: clickhouse-tls + namespace: flow-visibility +data: + tls.crt: {{ $certPEM | quote }} + tls.key: {{ $keyPEM | quote }} +--- +apiVersion: v1 +kind: Secret +metadata: + name: clickhouse-ca + namespace: flow-visibility +data: + ca.crt: {{ $certPEM | quote }} +{{- end }} +--- +apiVersion: v1 +kind: Secret +metadata: + name: clickhouse-secret + namespace: flow-visibility +type: Opaque +stringData: + username: clickhouse_operator + password: clickhouse_operator_password diff --git a/test/e2e/charts/flow-visibility/values.yaml b/test/e2e/charts/flow-visibility/values.yaml new file mode 100644 index 00000000000..0e4e7b5d246 --- /dev/null +++ b/test/e2e/charts/flow-visibility/values.yaml @@ -0,0 +1,11 @@ +# Settings for ClickHouse +secureConnection: + enable: false + # -- Common name to use in the certificate. + commonName: "clickhouse-clickhouse.flow-visibility.svc" + # -- IP addresses to use in the certificate. + ipAddresses: [] + # -- DNS names to use in the certificate. + dnsNames: [] + # -- Number of days for which the certificate will be valid. + daysValid: 365 diff --git a/test/e2e/fixtures.go b/test/e2e/fixtures.go index f78af3b6fc5..9cfbace71b5 100644 --- a/test/e2e/fixtures.go +++ b/test/e2e/fixtures.go @@ -249,7 +249,7 @@ func setupTest(tb testing.TB) (*TestData, error) { return testData, nil } -func setupTestForFlowAggregator(tb testing.TB) (*TestData, bool, bool, error) { +func setupTestForFlowAggregator(tb testing.TB, o flowVisibilityTestOptions) (*TestData, bool, bool, error) { v4Enabled := clusterInfo.podV4NetworkCIDR != "" v6Enabled := clusterInfo.podV6NetworkCIDR != "" testData, err := setupTest(tb) @@ -274,16 +274,18 @@ func setupTestForFlowAggregator(tb testing.TB) (*TestData, bool, bool, error) { ipfixCollectorAddr := fmt.Sprintf("%s:tcp", net.JoinHostPort(ipStr, ipfixCollectorPort)) tb.Logf("Deploying ClickHouse") - chSvcIP, err := testData.deployFlowVisibilityClickHouse() + chSvcIP, err := testData.deployFlowVisibilityClickHouse(o) if err != nil { return testData, v4Enabled, v6Enabled, err } tb.Logf("ClickHouse Service created with ClusterIP: %v", chSvcIP) tb.Logf("Applying flow aggregator YAML with ipfix collector: %s and clickHouse enabled", ipfixCollectorAddr) - if err := testData.deployFlowAggregator(ipfixCollectorAddr); err != nil { + + if err := testData.deployFlowAggregator(ipfixCollectorAddr, o); err != nil { return testData, v4Enabled, v6Enabled, err } + return testData, v4Enabled, v6Enabled, nil } diff --git a/test/e2e/flowaggregator_test.go b/test/e2e/flowaggregator_test.go index 47dac3e106c..e3c7c66ad05 100644 --- a/test/e2e/flowaggregator_test.go +++ b/test/e2e/flowaggregator_test.go @@ -162,11 +162,75 @@ type testFlow struct { dstPodName string } +func TestFlowAggregatorSecureConnection(t *testing.T) { + skipIfNotFlowVisibilityTest(t) + skipIfHasWindowsNodes(t) + testCases := []struct { + flowVisibilityTestOptions + name string + }{ + { + flowVisibilityTestOptions: flowVisibilityTestOptions{ + databaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000", + secureConnection: false, + }, + name: "tcp", + }, + { + flowVisibilityTestOptions: flowVisibilityTestOptions{ + databaseURL: "http://clickhouse-clickhouse.flow-visibility.svc:8123", + secureConnection: false, + }, + name: "http", + }, + { + flowVisibilityTestOptions: flowVisibilityTestOptions{ + databaseURL: "tls://clickhouse-clickhouse.flow-visibility.svc:9440", + secureConnection: true, + }, + name: "tls", + }, + { + flowVisibilityTestOptions: flowVisibilityTestOptions{ + databaseURL: "https://clickhouse-clickhouse.flow-visibility.svc:8443", + secureConnection: true, + }, + name: "https", + }, + } + for _, o := range testCases { + data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t, o.flowVisibilityTestOptions) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + t.Run(o.name, func(t *testing.T) { + defer func() { + teardownTest(t, data) + // Execute teardownFlowAggregator later than teardownTest to ensure that the log + // of Flow Aggregator has been exported. + teardownFlowAggregator(t, data) + }() + podAIPs, podBIPs, _, _, _, err := createPerftestPods(data) + if err != nil { + t.Fatalf("Error when creating perftest Pods: %v", err) + } + if v4Enabled { + checkIntraNodeFlows(t, data, podAIPs, podBIPs, false) + } + if v6Enabled { + checkIntraNodeFlows(t, data, podAIPs, podBIPs, false) + } + }) + } +} + func TestFlowAggregator(t *testing.T) { skipIfNotFlowVisibilityTest(t) skipIfHasWindowsNodes(t) - data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t) + data, v4Enabled, v6Enabled, err := setupTestForFlowAggregator(t, flowVisibilityTestOptions{ + databaseURL: defaultCHDatabaseURL, + }) if err != nil { t.Fatalf("Error when setting up test: %v", err) } @@ -197,6 +261,27 @@ func TestFlowAggregator(t *testing.T) { } +func checkIntraNodeFlows(t *testing.T, data *TestData, podAIPs, podBIPs *PodIPs, isIPv6 bool) { + np1, np2 := deployK8sNetworkPolicies(t, data, "perftest-a", "perftest-b") + defer func() { + if np1 != nil { + if err := data.deleteNetworkpolicy(np1); err != nil { + t.Errorf("Error when deleting network policy: %v", err) + } + } + if np2 != nil { + if err := data.deleteNetworkpolicy(np2); err != nil { + t.Errorf("Error when deleting network policy: %v", err) + } + } + }() + if !isIPv6 { + checkRecordsForFlows(t, data, podAIPs.ipv4.String(), podBIPs.ipv4.String(), isIPv6, true, false, true, false) + } else { + checkRecordsForFlows(t, data, podAIPs.ipv6.String(), podBIPs.ipv6.String(), isIPv6, true, false, true, false) + } +} + func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs, podEIPs *PodIPs, isIPv6 bool) { svcB, svcC, err := createPerftestServices(data, isIPv6) if err != nil { @@ -210,24 +295,7 @@ func testHelper(t *testing.T, data *TestData, podAIPs, podBIPs, podCIPs, podDIPs // and their flow information is exported as IPFIX flow records. // K8s network policies are being tested here. t.Run("IntraNodeFlows", func(t *testing.T) { - np1, np2 := deployK8sNetworkPolicies(t, data, "perftest-a", "perftest-b") - defer func() { - if np1 != nil { - if err = data.deleteNetworkpolicy(np1); err != nil { - t.Errorf("Error when deleting network policy: %v", err) - } - } - if np2 != nil { - if err = data.deleteNetworkpolicy(np2); err != nil { - t.Errorf("Error when deleting network policy: %v", err) - } - } - }() - if !isIPv6 { - checkRecordsForFlows(t, data, podAIPs.ipv4.String(), podBIPs.ipv4.String(), isIPv6, true, false, true, false) - } else { - checkRecordsForFlows(t, data, podAIPs.ipv6.String(), podBIPs.ipv6.String(), isIPv6, true, false, true, false) - } + checkIntraNodeFlows(t, data, podAIPs, podBIPs, isIPv6) }) // IntraNodeDenyConnIngressANP tests the case, where Pods are deployed on same Node with an Antrea ingress deny policy rule diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 8ba7b11c8df..a844dd3db79 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -84,6 +84,7 @@ const ( antreaWindowsDaemonSet string = "antrea-agent-windows" antreaDeployment string = "antrea-controller" flowAggregatorDeployment string = "flow-aggregator" + flowAggregatorCHSecret string = "clickhouse-ca" antreaDefaultGW string = "antrea-gw0" testAntreaIPAMNamespace string = "antrea-ipam-test" testAntreaIPAMNamespace11 string = "antrea-ipam-test-11" @@ -102,6 +103,7 @@ const ( flowAggregatorYML string = "flow-aggregator.yml" flowAggregatorCovYML string = "flow-aggregator-coverage.yml" flowVisibilityYML string = "flow-visibility.yml" + flowVisibilityTLSYML string = "flow-visibility-tls.yml" chOperatorYML string = "clickhouse-operator-install-bundle.yml" flowVisibilityCHPodName string = "chi-clickhouse-clickhouse-0-0-0" flowVisibilityNamespace string = "flow-visibility" @@ -142,6 +144,8 @@ const ( aggregatorClickHouseCommitInterval = 1 * time.Second statefulSetRestartAnnotationKey = "antrea-e2e/restartedAt" + + defaultCHDatabaseURL = "tcp://clickhouse-clickhouse.flow-visibility.svc:9000" ) type ClusterNode struct { @@ -200,6 +204,11 @@ type TestOptions struct { deployAntrea bool } +type flowVisibilityTestOptions struct { + databaseURL string + secureConnection bool +} + var testOptions TestOptions // podInfo combines OS info with a Pod name. It is useful when choosing commands and options on Pods of different OS (Windows, Linux). @@ -785,18 +794,23 @@ func (data *TestData) deployAntrea(option deployAntreaOptions) error { } // deployFlowVisibilityClickHouse deploys ClickHouse operator and DB. -func (data *TestData) deployFlowVisibilityClickHouse() (string, error) { +func (data *TestData) deployFlowVisibilityClickHouse(o flowVisibilityTestOptions) (string, error) { err := data.CreateNamespace(flowVisibilityNamespace, nil) if err != nil { return "", err } + visibilityYML := flowVisibilityYML + if o.secureConnection { + visibilityYML = flowVisibilityTLSYML + } + rc, _, _, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl apply -f %s", chOperatorYML)) if err != nil || rc != 0 { return "", fmt.Errorf("error when deploying the ClickHouse Operator YML; %s not available on the control-plane Node", chOperatorYML) } if err := wait.Poll(2*time.Second, 10*time.Second, func() (bool, error) { - rc, stdout, stderr, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl apply -f %s", flowVisibilityYML)) + rc, stdout, stderr, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl apply -f %s", visibilityYML)) if err != nil || rc != 0 { // ClickHouseInstallation CRD from ClickHouse Operator install bundle applied soon before // applying CR. Sometimes apiserver validation fails to recognize resource of @@ -804,7 +818,7 @@ func (data *TestData) deployFlowVisibilityClickHouse() (string, error) { if strings.Contains(stderr, "ClickHouseInstallation") || strings.Contains(stdout, "ClickHouseInstallation") { return false, nil } - return false, fmt.Errorf("error when deploying the flow visibility YML %s: %s, %s, %v", flowVisibilityYML, stdout, stderr, err) + return false, fmt.Errorf("error when deploying the flow visibility YML %s: %s, %s, %v", visibilityYML, stdout, stderr, err) } return true, nil }); err != nil { @@ -871,7 +885,8 @@ func (data *TestData) deleteClickHouseOperator() error { } // deployFlowAggregator deploys the Flow Aggregator with ipfix collector and clickHouse address. -func (data *TestData) deployFlowAggregator(ipfixCollector string) error { +func (data *TestData) deployFlowAggregator(ipfixCollector string, o flowVisibilityTestOptions) error { + flowAggYaml := flowAggregatorYML if testOptions.enableCoverage { flowAggYaml = flowAggregatorCovYML @@ -880,7 +895,27 @@ func (data *TestData) deployFlowAggregator(ipfixCollector string) error { if err != nil || rc != 0 { return fmt.Errorf("error when deploying the Flow Aggregator; %s not available on the control-plane Node", flowAggYaml) } - if err = data.mutateFlowAggregatorConfigMap(ipfixCollector); err != nil { + // clickhouse-ca Secret is created in the flow-visibility Namespace. In order to make it accessible to the Flow Aggregator, + // we copy it from Namespace flow-visibility to Namespace flow-aggregator when secureConnection is true. + if o.secureConnection { + secret, err := data.clientset.CoreV1().Secrets(flowVisibilityNamespace).Get(context.TODO(), flowAggregatorCHSecret, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("unable to get Secret with name %s in Namespace %s: %v", flowAggregatorCHSecret, flowVisibilityNamespace, err) + } + newSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: flowAggregatorNamespace, + Name: flowAggregatorCHSecret, + }, + Data: secret.Data, + } + _, err = data.clientset.CoreV1().Secrets(flowAggregatorNamespace).Create(context.TODO(), newSecret, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("unable to create Secret with name %s in Namespace %s: %v", flowAggregatorCHSecret, flowAggregatorNamespace, err) + } + } + + if err = data.mutateFlowAggregatorConfigMap(ipfixCollector, o); err != nil { return err } if rc, _, _, err = data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s rollout status deployment/%s --timeout=%v", flowAggregatorNamespace, flowAggregatorDeployment, 2*defaultTimeout)); err != nil || rc != 0 { @@ -899,7 +934,7 @@ func (data *TestData) deployFlowAggregator(ipfixCollector string) error { return nil } -func (data *TestData) mutateFlowAggregatorConfigMap(ipfixCollectorAddr string) error { +func (data *TestData) mutateFlowAggregatorConfigMap(ipfixCollectorAddr string, o flowVisibilityTestOptions) error { configMap, err := data.GetFlowAggregatorConfigMap() if err != nil { return err @@ -921,6 +956,10 @@ func (data *TestData) mutateFlowAggregatorConfigMap(ipfixCollectorAddr string) e flowAggregatorConf.ActiveFlowRecordTimeout = aggregatorActiveFlowRecordTimeout.String() flowAggregatorConf.InactiveFlowRecordTimeout = aggregatorInactiveFlowRecordTimeout.String() flowAggregatorConf.RecordContents.PodLabels = true + flowAggregatorConf.ClickHouse.DatabaseURL = o.databaseURL + if o.secureConnection { + flowAggregatorConf.ClickHouse.TLS.CACert = true + } b, err := yaml.Marshal(&flowAggregatorConf) if err != nil {