diff --git a/build/charts/flow-aggregator/conf/flow-aggregator.conf b/build/charts/flow-aggregator/conf/flow-aggregator.conf index c61c3856866..ada47f68dd7 100644 --- a/build/charts/flow-aggregator/conf/flow-aggregator.conf +++ b/build/charts/flow-aggregator/conf/flow-aggregator.conf @@ -72,9 +72,23 @@ clickHouse: # Database is the name of database where Antrea "flows" table is created. database: "default" - # DatabaseURL is the url to the database. TCP protocol is required. + # DatabaseURL is the url to the database. TCP/HTTP/HTTPS protocols are supported. databaseURL: {{ .Values.clickHouse.databaseURL | quote }} + # tls contains tls related configuration options, which is used to connect to clickhouse service. + tls: + # Enable is the switch to enable TLS when connecting to ClickHouse server. + enable: {{ .Values.clickHouse.tls.enable }} + + # InsecureSkipVerify determine whether a client verifies the server's certificate chain and host name. + # Default is false. + insecureSkipVerify: {{ .Values.clickHouse.tls.insecureSkipVerify }} + + # Indicates whether to use custom CA certificate. Default root CAs will be used if this field is false. + # If true, a Secret named "clickhouse-ca" must be provided with the following keys: + # ca.crt: + customCACert: {{ .Values.clickHouse.tls.customCACert }} + # Debug enables debug logs from ClickHouse sql driver. 
debug: {{ .Values.clickHouse.debug }} diff --git a/build/charts/flow-aggregator/templates/deployment.yaml b/build/charts/flow-aggregator/templates/deployment.yaml index fd4f438de5a..db755d9bccb 100644 --- a/build/charts/flow-aggregator/templates/deployment.yaml +++ b/build/charts/flow-aggregator/templates/deployment.yaml @@ -79,6 +79,8 @@ spec: readOnly: true - mountPath: /var/log/antrea/flow-aggregator name: host-var-log-antrea-flow-aggregator + - name: clickhouse-ca + mountPath: /etc/ssl/certs nodeSelector: kubernetes.io/os: linux kubernetes.io/arch: amd64 @@ -91,3 +93,9 @@ spec: hostPath: path: /var/log/antrea/flow-aggregator type: DirectoryOrCreate + # Make it optional as we only read it when customCACert=true. + - name: clickhouse-ca + secret: + secretName: clickhouse-ca + defaultMode: 0400 + optional: true diff --git a/build/charts/flow-aggregator/values.yaml b/build/charts/flow-aggregator/values.yaml index e88b9ea321a..0fa5b9c069a 100644 --- a/build/charts/flow-aggregator/values.yaml +++ b/build/charts/flow-aggregator/values.yaml @@ -45,8 +45,18 @@ flowCollector: clickHouse: # -- Determine whether to enable exporting flow records to ClickHouse. enable: false - # -- DatabaseURL is the url to the database. TCP protocol is required. + # -- DatabaseURL is the url to the database. TCP/HTTP/HTTPS protocols are supported. databaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000" + # -- tls contains tls related configuration options, which is used to connect to clickhouse service. + tls: + # -- Determine whether to enable tls. + enable: false + # -- Determine whether a client verifies the server's certificate chain and host name. Default is false. + insecureSkipVerify: false + # -- Indicates whether to use custom CA certificate. Default root CAs will be used if this field is false. + # If true, a Secret named "clickhouse-ca" must be provided with the following keys: + # ca.crt: + customCACert: false # -- Debug enables debug logs from ClickHouse sql driver. 
debug: false # -- Compress enables lz4 compression when committing flow records. diff --git a/build/yamls/flow-aggregator.yml b/build/yamls/flow-aggregator.yml index 09d1e690ed5..6dd6871f1c9 100644 --- a/build/yamls/flow-aggregator.yml +++ b/build/yamls/flow-aggregator.yml @@ -224,9 +224,23 @@ data: # Database is the name of database where Antrea "flows" table is created. database: "default" - # DatabaseURL is the url to the database. TCP protocol is required. + # DatabaseURL is the url to the database. TCP/HTTP/HTTPS protocols are supported. databaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000" + # tls contains tls related configuration options, which is used to connect to clickhouse service. + tls: + # Enable is the switch to enable TLS when connecting to ClickHouse server. + enable: false + + # InsecureSkipVerify determine whether a client verifies the server's certificate chain and host name. + # Default is false. + insecureSkipVerify: false + + # Indicates whether to use custom CA certificate. Default root CAs will be used if this field is false. + # If true, a Secret named "clickhouse-ca" must be provided with the following keys: + # ca.crt: + customCACert: false + # Debug enables debug logs from ClickHouse sql driver. 
debug: false @@ -434,6 +448,8 @@ spec: readOnly: true - mountPath: /var/log/antrea/flow-aggregator name: host-var-log-antrea-flow-aggregator + - mountPath: /etc/ssl/certs + name: clickhouse-ca nodeSelector: kubernetes.io/arch: amd64 kubernetes.io/os: linux @@ -446,3 +462,8 @@ spec: path: /var/log/antrea/flow-aggregator type: DirectoryOrCreate name: host-var-log-antrea-flow-aggregator + - name: clickhouse-ca + secret: + defaultMode: 256 + optional: true + secretName: clickhouse-ca diff --git a/docs/network-flow-visibility.md b/docs/network-flow-visibility.md index 8ca6be22f33..dd8db7ecd7f 100644 --- a/docs/network-flow-visibility.md +++ b/docs/network-flow-visibility.md @@ -17,6 +17,10 @@ - [Flow Aggregator](#flow-aggregator) - [Deployment](#deployment) - [Configuration](#configuration-1) + - [Support secure connections to ClickHouse database](#support-secure-connections-to-clickhouse-database) + - [Using kubectl](#using-kubectl) + - [Configuration for ClickHouse pre Antrea v1.13](#configuration-for-clickhouse-pre-antrea-v113) + - [Example of flow-aggregator.conf:](#example-of-flow-aggregatorconf) - [IPFIX Information Elements (IEs) in an Aggregated Flow Record](#ipfix-information-elements-ies-in-an-aggregated-flow-record) - [IEs from Antrea IE Registry](#ies-from-antrea-ie-registry-1) - [Supported Capabilities](#supported-capabilities-1) @@ -283,7 +287,42 @@ it is deployed following the [deployment steps](#deployment-steps-1), the ClickHouse server is already exposed via a K8s Service, and no further configuration is required. If a different FQDN or IP is desired, please use the URL for `clickHouse.databaseURL` in the following format: -`tcp://:`. +`://:`. + +#### Support secure connections to ClickHouse database + +Since Antrea v1.13, you can enable TLS when connecting to ClickHouse Server by setting `clickHouse.tls.enable` to `true`. 
+You can also set `clickHouse.tls.insecureSkipVerify` to `true` to make the client skip verification of the +server's certificate chain and host name (this should only be used for testing). +If you want to provide a custom CA certificate, you can set `clickHouse.tls.customCACert` to `true` and +the flow-aggregator will read the CA certificate from the `ca.crt` key of the `clickhouse-ca` Secret. + +Use the following format when creating the `clickhouse-ca` Secret with the custom CA certificate: + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: clickhouse-ca + namespace: flow-aggregator +data: + ca.crt: <BASE64 ENCODED CA CERTIFICATE> +``` + +#### Using kubectl + +You can use `kubectl apply -f <PATH TO SECRET YAML>` to create the above secret, or use `kubectl create secret`: + +```bash +kubectl create secret generic clickhouse-ca -n flow-aggregator --from-file=ca.crt=<PATH TO CA CERTIFICATE> +``` + +#### Configuration for ClickHouse pre Antrea v1.13 + +We don't support secure connection to ClickHouse database prior to Antrea v1.13. We only support TCP when connecting to +ClickHouse server from Flow-Aggregator. + +#### Example of flow-aggregator.conf ```yaml flow-aggregator.conf: | @@ -357,9 +396,23 @@ flow-aggregator.conf: | # Database is the name of database where Antrea "flows" table is created. database: "default" - # DatabaseURL is the url to the database. TCP protocol is required. + # DatabaseURL is the url to the database. TCP/HTTP/HTTPS protocols are supported. databaseURL: "tcp://clickhouse-clickhouse.flow-visibility.svc:9000" + # tls contains tls related configuration options, which is used to connect to clickhouse service. + tls: + # Enable is the switch to enable TLS when connecting to ClickHouse server. + enable: false + + # InsecureSkipVerify determine whether a client verifies the server's certificate chain and host name. + # Default is false. + insecureSkipVerify: false + + # Indicates whether to use custom CA certificate. Default root CAs will be used if this field is false. 
+ # If true, a Secret named "clickhouse-ca" must be provided with the following keys: + # ca.crt: + customCACert: false + # Debug enables debug logs from ClickHouse sql driver. debug: false diff --git a/pkg/config/flowaggregator/config.go b/pkg/config/flowaggregator/config.go index 4aacc7cbd71..ecffd487453 100644 --- a/pkg/config/flowaggregator/config.go +++ b/pkg/config/flowaggregator/config.go @@ -95,7 +95,7 @@ type ClickHouseConfig struct { Enable bool `yaml:"enable,omitempty"` // Database is the name of database where Antrea "flows" table is created. Database string `yaml:"database,omitempty"` - // DatabaseURL is the url to the database. TCP protocol is required. + // DatabaseURL is the url to the database. TCP/HTTP/HTTPS protocols are supported. // Defaults to "tcp://clickhouse-clickhouse.flow-visibility.svc:9000" DatabaseURL string `yaml:"databaseURL,omitempty"` // Debug enables debug logs from ClickHouse sql driver. Defaults to false. @@ -106,6 +106,20 @@ type ClickHouseConfig struct { // Defaults to "8s". Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". // Min value allowed is "1s". CommitInterval string `yaml:"commitInterval,omitempty"` + // tls contains tls related configuration options, which is used to connect to clickhouse service. + TLS TLSConfig `yaml:"tls,omitempty"` +} + +type TLSConfig struct { + // Enable is the switch to enable TLS when connecting to ClickHouse server. + Enable bool `yaml:"enable,omitempty"` + // InsecureSkipVerify determine whether a client verifies the server's certificate chain and host name. + // Default is false. + InsecureSkipVerify bool `yaml:"insecureSkipVerify,omitempty"` + // CustomCACert determine whether to use custom CA certificate. Default root CAs will be used if false. 
+ // If true, a Secret named "clickhouse-ca" must be provided with the following keys: + // ca.crt: + CustomCACert bool `yaml:"customCACert,omitempty"` } type S3UploaderConfig struct { diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go index edd57a27fbd..043a521d201 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient.go @@ -16,6 +16,8 @@ package clickhouseclient import ( "context" + "crypto/tls" + "crypto/x509" "database/sql" "fmt" "net/url" @@ -92,13 +94,6 @@ const ( // PrepareClickHouseConnection is used for unit testing var PrepareClickHouseConnection = prepareConnection -type protocol string - -const ( - protocolTCP = "tcp" - protocolUnknown = "" -) - type stopPayload struct { flushQueue bool } @@ -126,13 +121,17 @@ type ClickHouseExportProcess struct { } type ClickHouseConfig struct { - Username string - Password string - Database string - DatabaseURL string - Debug bool - Compress *bool - CommitInterval time.Duration + Username string + Password string + Database string + DatabaseURL string + Debug bool + Compress *bool + CommitInterval time.Duration + TLSEnable bool + CustomCACert bool + InsecureSkipVerify bool + CACert []byte } func NewClickHouseClient(config ClickHouseConfig, clusterUUID string) (*ClickHouseExportProcess, error) { @@ -366,7 +365,7 @@ func (ch *ClickHouseExportProcess) pushRecordsToFrontOfQueue(records []*flowreco func prepareConnection(config ClickHouseConfig) (*sql.DB, error) { connect, err := ConnectClickHouse(&config) if err != nil { - return nil, err + return nil, fmt.Errorf("error when connecting to ClickHouse, %w", err) } // Test open Transaction tx, err := connect.Begin() @@ -411,9 +410,9 @@ func (ch *ClickHouseExportProcess) GetClickHouseConfig() ClickHouseConfig { } func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) { - _, addr, err := 
parseDatabaseURL(config.DatabaseURL) + proc, addr, err := parseDatabaseURL(config.DatabaseURL, config.TLSEnable) if err != nil { - return nil, err + return nil, fmt.Errorf("error when parsing database url: %w", err) } var connect *sql.DB var connErr error @@ -430,6 +429,7 @@ func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) { Password: config.Password, Database: config.Database, }, + Protocol: proc, } var compression clickhouse.CompressionMethod if *config.Compress { @@ -438,7 +438,20 @@ func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) { opt.Compression = &clickhouse.Compression{ Method: compression, } - + if config.TLSEnable { // #nosec G402: ignore insecure options + opt.TLS = &tls.Config{ + InsecureSkipVerify: config.InsecureSkipVerify, + } + if config.CustomCACert { + caCertPool := x509.NewCertPool() + successful := caCertPool.AppendCertsFromPEM(config.CACert) + if !successful { + connErr = fmt.Errorf("failed to add custom CA certification") + return false, nil + } + opt.TLS.RootCAs = caCertPool + } + } connect = clickhouse.OpenDB(&opt) if err := connect.Ping(); err != nil { if exception, ok := err.(*clickhouse.Exception); ok { @@ -456,19 +469,28 @@ func ConnectClickHouse(config *ClickHouseConfig) (*sql.DB, error) { return connect, nil } -func parseDatabaseURL(dbUrl string) (protocol, string, error) { +func parseDatabaseURL(dbUrl string, secure bool) (clickhouse.Protocol, string, error) { u, err := url.Parse(dbUrl) if err != nil { - return protocolUnknown, "", fmt.Errorf("failed to parse ClickHouse database URL: %w", err) + return -1, "", fmt.Errorf("failed to parse ClickHouse database URL: %w", err) } if u.Path != "" || u.RawQuery != "" || u.User != nil { - return protocolUnknown, "", fmt.Errorf("invalid ClickHouse database URL '%s': path, query string or user info should not be set", dbUrl) + return -1, "", fmt.Errorf("invalid ClickHouse database URL '%s': path, query string or user info should not be set", dbUrl) } - proto := 
u.Scheme - switch proto { - case protocolTCP: - return protocolTCP, u.Host, nil + switch u.Scheme { + case "http": + if secure { + return -1, u.Host, fmt.Errorf("invalid ClickHouse setting: http with TLS enabled") + } + return clickhouse.HTTP, u.Host, nil + case "https": + if !secure { + return -1, u.Host, fmt.Errorf("invalid ClickHouse setting: https without TLS enabled") + } + return clickhouse.HTTP, u.Host, nil + case "tcp": + return clickhouse.Native, u.Host, nil default: - return protocolUnknown, "", fmt.Errorf("connection over %s transport protocol is not supported", proto) + return -1, "", fmt.Errorf("connection over %s transport protocol is not supported", u.Scheme) } } diff --git a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go index 8ee3fdb1e69..55e7a84e97c 100644 --- a/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go +++ b/pkg/flowaggregator/clickhouseclient/clickhouseclient_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/ClickHouse/clickhouse-go/v2" "github.com/DATA-DOG/go-sqlmock" "github.com/gammazero/deque" "github.com/golang/mock/gomock" @@ -337,13 +338,14 @@ func TestUpdateCH(t *testing.T) { func TestParseDatabaseURL(t *testing.T) { testcases := []struct { url string - expectedProto protocol + expectedProto clickhouse.Protocol expectedAddr string expectedErrMsg string + secure bool }{ { url: "tcp://127.0.0.1:9000", - expectedProto: protocolTCP, + expectedProto: clickhouse.Native, expectedAddr: "127.0.0.1:9000", expectedErrMsg: "", }, @@ -367,9 +369,31 @@ func TestParseDatabaseURL(t *testing.T) { url: "tcp://127.0.0.1:9000?key=value", expectedErrMsg: "invalid ClickHouse database URL", }, + { + url: "http://127.0.0.1:9000", + expectedProto: clickhouse.HTTP, + expectedAddr: "127.0.0.1:9000", + expectedErrMsg: "", + }, + { + url: "http://127.0.0.1:9000", + expectedErrMsg: "invalid ClickHouse setting: http with TLS enabled", + secure: true, + }, + 
{ + url: "https://127.0.0.1:9000", + expectedProto: clickhouse.HTTP, + expectedAddr: "127.0.0.1:9000", + expectedErrMsg: "", + secure: true, + }, + { + url: "https://127.0.0.1:9000", + expectedErrMsg: "invalid ClickHouse setting: https without TLS enabled", + }, } for _, tc := range testcases { - proto, addr, err := parseDatabaseURL(tc.url) + proto, addr, err := parseDatabaseURL(tc.url, tc.secure) if tc.expectedErrMsg != "" { assert.Contains(t, err.Error(), tc.expectedErrMsg) } else { diff --git a/pkg/flowaggregator/exporter/clickhouse.go b/pkg/flowaggregator/exporter/clickhouse.go index 3d42da9aa04..35c5cb22eee 100644 --- a/pkg/flowaggregator/exporter/clickhouse.go +++ b/pkg/flowaggregator/exporter/clickhouse.go @@ -15,7 +15,9 @@ package exporter import ( + "fmt" "os" + "path" "reflect" ipfixentities "github.com/vmware/go-ipfix/pkg/entities" @@ -31,21 +33,37 @@ type ClickHouseExporter struct { chExportProcess *clickhouseclient.ClickHouseExportProcess } +const ( + CACertFile = "ca.crt" + CertDir = "/etc/ssl/certs" +) + func buildClickHouseConfig(opt *options.Options) clickhouseclient.ClickHouseConfig { return clickhouseclient.ClickHouseConfig{ - Username: os.Getenv("CH_USERNAME"), - Password: os.Getenv("CH_PASSWORD"), - Database: opt.Config.ClickHouse.Database, - DatabaseURL: opt.Config.ClickHouse.DatabaseURL, - Debug: opt.Config.ClickHouse.Debug, - Compress: opt.Config.ClickHouse.Compress, - CommitInterval: opt.ClickHouseCommitInterval, + Username: os.Getenv("CH_USERNAME"), + Password: os.Getenv("CH_PASSWORD"), + Database: opt.Config.ClickHouse.Database, + DatabaseURL: opt.Config.ClickHouse.DatabaseURL, + Debug: opt.Config.ClickHouse.Debug, + Compress: opt.Config.ClickHouse.Compress, + CommitInterval: opt.ClickHouseCommitInterval, + TLSEnable: opt.Config.ClickHouse.TLS.Enable, + CustomCACert: opt.Config.ClickHouse.TLS.CustomCACert, + InsecureSkipVerify: opt.Config.ClickHouse.TLS.InsecureSkipVerify, } } func NewClickHouseExporter(k8sClient kubernetes.Interface, opt 
*options.Options) (*ClickHouseExporter, error) { chConfig := buildClickHouseConfig(opt) - klog.InfoS("ClickHouse configuration", "database", chConfig.Database, "databaseURL", chConfig.DatabaseURL, "debug", chConfig.Debug, "compress", *chConfig.Compress, "commitInterval", chConfig.CommitInterval) + klog.InfoS("ClickHouse configuration", "database", chConfig.Database, "databaseURL", chConfig.DatabaseURL, "debug", chConfig.Debug, "compress", *chConfig.Compress, "commitInterval", chConfig.CommitInterval, "TLSEnable", chConfig.TLSEnable, "customCACert", chConfig.CustomCACert) + if chConfig.TLSEnable && chConfig.CustomCACert { + caCertPath := path.Join(CertDir, CACertFile) + caCert, err := os.ReadFile(caCertPath) + if err != nil { + return nil, fmt.Errorf("error when getting CA certificate") + } + chConfig.CACert = caCert + } clusterUUID, err := getClusterUUID(k8sClient) if err != nil { return nil, err