From 7bb1b1140cd0a2fb319bf95b84226dc4673f8273 Mon Sep 17 00:00:00 2001 From: Tushar Tathgur Date: Mon, 17 Jul 2023 17:43:06 -0700 Subject: [PATCH] Addressed comments Signed-off-by: Tushar Tathgur --- docs/throughput-anomaly-detection.md | 24 +-- .../commands/anomaly_detection_retrieve.go | 2 +- .../anomaly_detection_retrieve_test.go | 6 +- .../commands/anomaly_detection_run_test.go | 129 ++++++++++++- .../anomaly-detection/anomaly_detection.py | 4 +- test/e2e/framework.go | 4 +- test/e2e/throughputanomalydetection_test.go | 171 +++++++++--------- test/e2e/upgrade_test.go | 11 ++ 8 files changed, 248 insertions(+), 103 deletions(-) diff --git a/docs/throughput-anomaly-detection.md b/docs/throughput-anomaly-detection.md index faf1102fe..1402d9b6d 100644 --- a/docs/throughput-anomaly-detection.md +++ b/docs/throughput-anomaly-detection.md @@ -69,28 +69,28 @@ $ theia throughput-anomaly-detection run --algo "ARIMA" Successfully started Throughput Anomaly Detection job with name tad-1234abcd-1234-abcd-12ab-12345678abcd ``` -Throughput Anomaly Detection also provide support for aggregated throughput +Throughput Anomaly Detection also provides support for aggregated throughput anomaly detection. There are three different types of aggregations that are included. -- `external` : Aggregated flow for inbound traffic to external IP, +- `external` : Aggregated flows for inbound traffic to external IP, user could provide external-IP using `external-ip` argument for further filtering. -- `pod`: Aggregated flow for inbound/outbound pod traffic. -- `svc`: Aggregated flow for traffic to service port, user could +- `pod`: Aggregated flows for inbound/outbound pod traffic. +- `svc`: Aggregated flows for traffic to service port, user could provide a destination port name using `svc-name-port` argument for further filtering. For aggregated flow `pod`, user can provide the following filter arguments. 
-- `pod-label`: The argument aggregates inbound/outbound traffic using pod +- `pod-label`: The argument aggregates inbound/outbound traffic using Pod labels. -- `pod-name`: The argument aggregates inbound/outbound traffic using pod name. +- `pod-name`: The argument aggregates inbound/outbound traffic using Pod name. - `pod-namespace`: The argument aggregates inbound/outbound traffic using - podnamespace. However, this argument only works as a combination to any of + Pod namespace. However, this argument only works in combination with any of the above two arguments and can not be used alone. -All the above aggregated flow are executed using the following command: +To start an aggregated throughput anomaly detection job, run the following command: ```bash $ theia throughput-anomaly-detection run --algo "ARIMA" --agg-flow pod --pod-label \"test_key\":\"test_value\" @@ -139,14 +139,14 @@ in table format, run: ```bash $ theia throughput-anomaly-detection retrieve tad-1234abcd-1234-abcd-12ab-12345678abcd id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly -1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:06:54Z 4.005703059e+09 e2e ARIMA 1.0001208441920074e+10 true -1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:24:54Z 1.0004969097e+10 e2e ARIMA 4.006432886406564e+09 true -1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:34:54Z 5.0007861276e+10 e2e ARIMA 3.9735067954945493e+09 true +1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:06:54Z 4.005703059e+09 None ARIMA 1.0001208441920074e+10 true +1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:24:54Z 1.0004969097e+10 None ARIMA 4.006432886406564e+09 
true +1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:34:54Z 5.0007861276e+10 None ARIMA 3.9735067954945493e+09 true ``` Aggregated Throughput Anomaly Detection has different columns based of the aggregation type. -e.g. agg-type pod output +e.g. when aggregation type is svc, the output is the following ```bash $ theia throughput-anomaly-detection retrieve tad-5ca4413d-6730-463e-8f95-86032ba28a4f diff --git a/pkg/theia/commands/anomaly_detection_retrieve.go b/pkg/theia/commands/anomaly_detection_retrieve.go index 34eb97818..29076f966 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve.go +++ b/pkg/theia/commands/anomaly_detection_retrieve.go @@ -106,7 +106,7 @@ func throughputAnomalyDetectionRetrieve(cmd *cobra.Command, args []string) error } else { var result [][]string switch tad.Stats[0].AggType { - case "e2e": + case "None": result = append(result, []string{"id", "sourceIP", "sourceTransportPort", "destinationIP", "destinationTransportPort", "flowStartSeconds", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"}) for _, p := range tad.Stats { result = append(result, []string{p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly}) diff --git a/pkg/theia/commands/anomaly_detection_retrieve_test.go b/pkg/theia/commands/anomaly_detection_retrieve_test.go index 55c776daf..0bf3cf78f 100644 --- a/pkg/theia/commands/anomaly_detection_retrieve_test.go +++ b/pkg/theia/commands/anomaly_detection_retrieve_test.go @@ -45,7 +45,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { filePath string }{ { - name: "Valid case e2e", + name: "Valid case No agg_type", testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch strings.TrimSpace(r.URL.Path) { case 
fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName): @@ -57,7 +57,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { Id: tadName, Anomaly: "true", AlgoCalc: "1234567", - AggType: "e2e", + AggType: "None", }}, } w.Header().Set("Content-Type", "application/json") @@ -66,7 +66,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) { } })), tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd", - expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "e2e 1234567 true"}, + expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "None 1234567 true"}, expectedErrorMsg: "", }, { diff --git a/pkg/theia/commands/anomaly_detection_run_test.go b/pkg/theia/commands/anomaly_detection_run_test.go index 814b7ff42..7c7373438 100644 --- a/pkg/theia/commands/anomaly_detection_run_test.go +++ b/pkg/theia/commands/anomaly_detection_run_test.go @@ -129,7 +129,7 @@ func TestAnomalyDetectionRun(t *testing.T) { cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") - cmd.Flags().String("agg-flow", "pod", "") + cmd.Flags().String("agg-flow", "external", "") cmd.Flags().String("pod-label", "app:label1", "") cmd.Flags().String("pod-name", "testpodname", "") cmd.Flags().String("pod-namespace", "testpodnamespace", "") @@ -236,6 +236,38 @@ func TestThroughputAnomalyDetectionAlgo(t *testing.T) { name: "Invalid executor-memory", expectedErrorMsg: "executor-memory should conform to the Kubernetes resource quantity convention", }, + { + name: "Unspecified agg-flow", + expectedErrorMsg: ErrorMsgUnspecifiedCase, + }, + { + name: "Unspecified pod-label", + 
expectedErrorMsg: ErrorMsgUnspecifiedCase, + }, + { + name: "Unspecified pod-name", + expectedErrorMsg: ErrorMsgUnspecifiedCase, + }, + { + name: "Unspecified pod-namespace", + expectedErrorMsg: ErrorMsgUnspecifiedCase, + }, + { + name: "Invalid pod-namespace", + expectedErrorMsg: "argument can not be used alone", + }, + { + name: "Unspecified external-ip", + expectedErrorMsg: ErrorMsgUnspecifiedCase, + }, + { + name: "Unspecified svc-port-name", + expectedErrorMsg: ErrorMsgUnspecifiedCase, + }, + { + name: "Invalid agg-flow", + expectedErrorMsg: "aggregated flow type should be 'pod' or 'external' or 'svc'", + }, { name: "Unspecified use-cluster-ip", expectedErrorMsg: ErrorMsgUnspecifiedCase, @@ -346,6 +378,99 @@ func TestThroughputAnomalyDetectionAlgo(t *testing.T) { cmd.Flags().String("driver-memory", "1m", "") cmd.Flags().String("executor-core-request", "1", "") cmd.Flags().String("executor-memory", "mock_executor-memory", "") + case "Unspecified agg-flow": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + case "Unspecified pod-label": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", 
"") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "pod", "") + case "Unspecified pod-name": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "pod", "") + cmd.Flags().String("pod-label", "mock_pod-label", "") + case "Unspecified pod-namespace": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "pod", "") + cmd.Flags().String("pod-label", "mock_pod_label", "") + cmd.Flags().String("pod-name", "mock_pod-name", "") + case "Invalid pod-namespace": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + 
cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "pod", "") + cmd.Flags().String("pod-label", "", "") + cmd.Flags().String("pod-name", "", "") + cmd.Flags().String("pod-namespace", "mock_pod-namespace", "") + case "Unspecified external-ip": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "external", "") + case "Unspecified svc-port-name": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "svc", "") + case "Invalid agg-flow": + cmd.Flags().String("algo", "ARIMA", "") + cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") + cmd.Flags().String("end-time", "2006-01-03 16:04:05", "") + cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "") + cmd.Flags().Int32("executor-instances", 1, "") + cmd.Flags().String("driver-core-request", "1", "") + cmd.Flags().String("driver-memory", "1m", "") + cmd.Flags().String("executor-core-request", "1", "") + cmd.Flags().String("executor-memory", "1m", "") + 
cmd.Flags().String("agg-flow", "mock_agg-flow", "") case "Unspecified use-cluster-ip": cmd.Flags().String("algo", "ARIMA", "") cmd.Flags().String("start-time", "2006-01-02 15:04:05", "") @@ -356,6 +481,8 @@ func TestThroughputAnomalyDetectionAlgo(t *testing.T) { cmd.Flags().String("driver-memory", "1m", "") cmd.Flags().String("executor-core-request", "1", "") cmd.Flags().String("executor-memory", "1m", "") + cmd.Flags().String("agg-flow", "svc", "") + cmd.Flags().String("svc-port-name", "mock_svc_name", "") } err := throughputAnomalyDetectionAlgo(cmd, []string{}) if tt.expectedErrorMsg == "" { diff --git a/plugins/anomaly-detection/anomaly_detection.py b/plugins/anomaly-detection/anomaly_detection.py index dc5756ea4..c0c425895 100644 --- a/plugins/anomaly-detection/anomaly_detection.py +++ b/plugins/anomaly-detection/anomaly_detection.py @@ -395,7 +395,7 @@ def filter_df_with_true_anomalies( if ret_plot.count() == 0: ret_plot = ret_plot.collect() if agg_flow == "": - agg_type = "e2e" + agg_type = "None" else: agg_type = agg_flow ret_plot.append({ @@ -624,7 +624,7 @@ def assign_flow_type(prepared_DF, agg_flow=None, direction=None): elif agg_flow == "pod": prepared_DF = prepared_DF.withColumn('aggType', f.lit("pod")) else: - prepared_DF = prepared_DF.withColumn('aggType', f.lit("e2e")) + prepared_DF = prepared_DF.withColumn('aggType', f.lit("None")) return prepared_DF diff --git a/test/e2e/framework.go b/test/e2e/framework.go index c94c63fd9..821bdb604 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -26,7 +26,6 @@ import ( "regexp" "strconv" "strings" - "sync" "testing" "time" @@ -164,7 +163,6 @@ const ( var ( errConnectionLost = fmt.Errorf("http2: client connection lost") clickHousePodName = fmt.Sprintf("%s-0-0-0", clickHousePodNamePrefix) - mutex sync.Mutex ) type FlowVisibiltiySetUpConfig struct { @@ -250,7 +248,7 @@ func (data *TestData) deployAntreaCommon(yamlFile string, extraOptions string, w // See 
https://kubernetes.io/docs/reference/using-api/api-concepts/#server-side-apply rc, _, _, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl apply %s -f %s", extraOptions, yamlFile)) if err != nil || rc != 0 { - return fmt.Errorf("error when deploying Antrea; is %s available on the control-plane Node?", yamlFile) + return fmt.Errorf("error when deploying Antrea; is %s available on the control-plane Node? err: %v", yamlFile, err) } rc, stdout, stderr, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), fmt.Sprintf("kubectl -n %s rollout status deploy/%s --timeout=%v", antreaNamespace, antreaDeployment, defaultTimeout)) if err != nil || rc != 0 { diff --git a/test/e2e/throughputanomalydetection_test.go b/test/e2e/throughputanomalydetection_test.go index dc60eb042..9cbac0833 100644 --- a/test/e2e/throughputanomalydetection_test.go +++ b/test/e2e/throughputanomalydetection_test.go @@ -38,6 +38,8 @@ const ( tadretrieveCmd = "./theia throughput-anomaly-detection retrieve" ) +var e2emutex sync.Mutex + func TestAnomalyDetection(t *testing.T) { config := FlowVisibiltiySetUpConfig{ withSparkOperator: true, @@ -101,7 +103,7 @@ func prepareFlowTable(t *testing.T, connect *sql.DB) { // Example output: Successfully created Throughput Anomaly Detection job with name tad-eec9d1be-7204-4d50-8f57-d9c8757a2668 func testThroughputAnomalyDetectionAlgo(t *testing.T, data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - stdout, jobName, err := tadrunJob(t, data, "ARIMA", "e2e") + stdout, jobName, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) assert := assert.New(t) assert.Containsf(stdout, fmt.Sprintf("Successfully started Throughput Anomaly Detection job with name: %s", jobName), "stdout: %s", stdout) @@ -114,7 +116,7 @@ func testThroughputAnomalyDetectionAlgo(t *testing.T, data *TestData, connect *s // Example output: Status of this anomaly detection job is COMPLETED func testAnomalyDetectionStatus(t *testing.T, 
data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - _, jobName, err := tadrunJob(t, data, "ARIMA", "e2e") + _, jobName, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) stdout, err := tadgetJobStatus(t, data, jobName) require.NoError(t, err) @@ -133,7 +135,7 @@ func testAnomalyDetectionStatus(t *testing.T, data *TestData, connect *sql.DB) { // 2022-06-17 15:03:39 N/A tad-c7a9e768-559a-4bfb-b0c8-a0291b4c208c SUBMITTED func testAnomalyDetectionList(t *testing.T, data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - _, jobName, err := tadrunJob(t, data, "ARIMA", "e2e") + _, jobName, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) stdout, err := tadlistJobs(t, data) require.NoError(t, err) @@ -152,7 +154,7 @@ func testAnomalyDetectionList(t *testing.T, data *TestData, connect *sql.DB) { // Example output: Successfully deleted anomaly detection job with name tad-eec9d1be-7204-4d50-8f57-d9c8757a2668 func testAnomalyDetectionDelete(t *testing.T, data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - _, jobName, err := tadrunJob(t, data, "ARIMA", "e2e") + _, jobName, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) err = data.podWaitForReady(defaultTimeout, jobName+"-driver", flowVisibilityNamespace) require.NoError(t, err) @@ -173,17 +175,17 @@ func testAnomalyDetectionRetrieve(t *testing.T, data *TestData, connect *sql.DB) algoNames := []string{"ARIMA", "EWMA", "DBSCAN"} // agg_type 'podLabel' stands for agg_type 'pod' with pod-label argument // agg_type 'podName' stands for agg_type 'pod' with pod-name argument - agg_types := []string{"e2e", "podName", "podLabel", "svc", "external"} + agg_types := []string{"None", "podName", "podLabel", "svc", "external"} // Select random algo for the agg_types - podname_algo := algoNames[randInt(t, int64(len(algoNames)))] - podlabel_algo := algoNames[randInt(t, int64(len(algoNames)))] - external_algo := algoNames[randInt(t, 
int64(len(algoNames)))] - svc_algo := algoNames[randInt(t, int64(len(algoNames)))] + aggTypeToAlgoNameMap := make(map[string]string) + for _, agg_type := range agg_types { + aggTypeToAlgoNameMap[agg_type] = algoNames[randInt(t, int64(len(algoNames)))] + } // Create a worker pool with maximum size of 3 pool := make(chan int, len(algoNames)) var ( - stdout string - wg sync.WaitGroup + wg sync.WaitGroup + poolIdx int ) prepareFlowTable(t, connect) result_map := map[string]map[string]string{ @@ -218,7 +220,7 @@ func testAnomalyDetectionRetrieve(t *testing.T, data *TestData, connect *sql.DB) "1.630": "true"}, } assert_variable_map := map[string]map[string]int{ - "e2e": { + "None": { "tadoutputArray_len": 12, "anomaly_output_idx": 11, "throughput_idx": 7}, @@ -239,64 +241,73 @@ func testAnomalyDetectionRetrieve(t *testing.T, data *TestData, connect *sql.DB) "anomaly_output_idx": 7, "throughput_idx": 3}, } - for _, agg_type := range agg_types { - for idx, algo := range algoNames { - if (agg_type == "e2e") || (agg_type == "podName" && algo == podname_algo) || (agg_type == "podLabel" && algo == podlabel_algo) || (agg_type == "external" && algo == external_algo) || (agg_type == "svc" && algo == svc_algo) { + poolIdx = 0 + for agg_type, algoName := range aggTypeToAlgoNameMap { + poolIdx += 1 + if agg_type == "None" { + for _, algo := range algoNames { + algoName = algo wg.Add(1) - pool <- idx - go func(t *testing.T, data *TestData, algo, agg_type string) { - defer func() { - <-pool - wg.Done() - }() - _, jobName, err := tadrunJob(t, data, algo, agg_type) - require.NoError(t, err) - err = data.podWaitForReady(defaultTimeout, jobName+"-driver", flowVisibilityNamespace) - require.NoError(t, err) - err = waitTADJobComplete(t, data, jobName, tadjobCompleteTimeout) - require.NoError(t, err) - stdout, err = tadretrieveJobResult(t, data, jobName) - require.NoError(t, err) - resultArray := strings.Split(stdout, "\n") - assert := assert.New(t) - length := len(resultArray) - 
assert.GreaterOrEqualf(length, 3, "stdout: %s", stdout) - assert.Containsf(stdout, "throughput", "stdout: %s", stdout) - assert.Containsf(stdout, "algoCalc", "stdout: %s", stdout) - assert.Containsf(stdout, "anomaly", "stdout: %s", stdout) - - for i := 1; i < length; i++ { - // check metrics' value - resultArray[i] = strings.TrimSpace(resultArray[i]) - if resultArray[i] != "" { - resultArray[i] = strings.ReplaceAll(resultArray[i], "\t", " ") - tadoutputArray := strings.Fields(resultArray[i]) - anomaly_output := tadoutputArray[assert_variable_map[agg_type]["anomaly_output_idx"]] - throughput := tadoutputArray[assert_variable_map[agg_type]["throughput_idx"]][:5] - assert.Equal(assert_variable_map[agg_type]["tadoutputArray_len"], len(tadoutputArray), "tadoutputArray: %s", tadoutputArray) - switch algo { - case "ARIMA": - assert.Equal(result_map["ARIMA"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) - case "EWMA": - assert.Equal(result_map["EWMA"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) - case "DBSCAN": - assert.Equal(result_map["DBSCAN"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) - } - } - } - _, err = taddeleteJob(t, data, jobName) - require.NoError(t, err) - }(t, data, algo, agg_type) + pool <- poolIdx + go executeRetrieveTest(t, data, algoName, agg_type, result_map, assert_variable_map, pool, &wg) } + } else { + wg.Add(1) + pool <- poolIdx + go executeRetrieveTest(t, data, algoName, agg_type, result_map, assert_variable_map, pool, &wg) } } wg.Wait() } +func executeRetrieveTest(t *testing.T, data *TestData, algo, agg_type string, result_map map[string]map[string]string, assert_variable_map map[string]map[string]int, pool chan int, wg *sync.WaitGroup) { + var stdout string + defer func() { + <-pool + wg.Done() + }() + _, jobName, err := tadrunJob(t, data, algo, agg_type) + require.NoError(t, err) + err = 
data.podWaitForReady(defaultTimeout, jobName+"-driver", flowVisibilityNamespace) + require.NoError(t, err) + err = waitTADJobComplete(t, data, jobName, tadjobCompleteTimeout) + require.NoError(t, err) + stdout, err = tadretrieveJobResult(t, data, jobName) + require.NoError(t, err) + resultArray := strings.Split(stdout, "\n") + assert := assert.New(t) + length := len(resultArray) + assert.GreaterOrEqualf(length, 3, "stdout: %s", stdout) + assert.Containsf(stdout, "throughput", "stdout: %s", stdout) + assert.Containsf(stdout, "algoCalc", "stdout: %s", stdout) + assert.Containsf(stdout, "anomaly", "stdout: %s", stdout) + for i := 1; i < length; i++ { + // check metrics' value + resultArray[i] = strings.TrimSpace(resultArray[i]) + if resultArray[i] != "" { + resultArray[i] = strings.ReplaceAll(resultArray[i], "\t", " ") + tadoutputArray := strings.Fields(resultArray[i]) + anomaly_output := tadoutputArray[assert_variable_map[agg_type]["anomaly_output_idx"]] + throughput := tadoutputArray[assert_variable_map[agg_type]["throughput_idx"]][:5] + assert.Equal(assert_variable_map[agg_type]["tadoutputArray_len"], len(tadoutputArray), "tadoutputArray: %s", tadoutputArray) + switch algo { + case "ARIMA": + assert.Equal(result_map["ARIMA"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) + case "EWMA": + assert.Equal(result_map["EWMA"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) + case "DBSCAN": + assert.Equal(result_map["DBSCAN"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) + } + } + } + _, err = taddeleteJob(t, data, jobName) + require.NoError(t, err) +} + // waitJobComplete waits for the anomaly detection Spark job completes func waitTADJobComplete(t *testing.T, data *TestData, jobName string, timeout time.Duration) error { - mutex.Lock() - defer mutex.Unlock() + e2emutex.Lock() + defer e2emutex.Unlock() stdout := "" err := 
wait.PollImmediate(defaultInterval, timeout, func() (bool, error) { stdout, err := tadgetJobStatus(t, data, jobName) @@ -322,27 +333,25 @@ func waitTADJobComplete(t *testing.T, data *TestData, jobName string, timeout ti } func tadrunJob(t *testing.T, data *TestData, algotype, agg_type string) (stdout string, jobName string, err error) { - mutex.Lock() - defer mutex.Unlock() + e2emutex.Lock() + defer e2emutex.Unlock() + var agg_flow_ext, ext string newjobcmd := tadstartCmd + " --algo " + algotype + " --driver-memory 1G --start-time 2022-08-11T06:26:50 --end-time 2022-08-12T08:26:54" switch agg_type { case "podName": - agg_flow_ext := " --agg-flow pod" - ext := " --pod-name test_podName" - newjobcmd = newjobcmd + agg_flow_ext + ext + agg_flow_ext = " --agg-flow pod" + ext = " --pod-name test_podName" case "podLabel": - agg_flow_ext := " --agg-flow pod" - ext := " --pod-label \"test_key\":\"test_value\"" - newjobcmd = newjobcmd + agg_flow_ext + ext + agg_flow_ext = " --agg-flow pod" + ext = " --pod-label \"test_key\":\"test_value\"" case "external": - agg_flow_ext := fmt.Sprintf(" --agg-flow %s", agg_type) - ext := " --external-ip 10.10.1.33" - newjobcmd = newjobcmd + agg_flow_ext + ext + agg_flow_ext = fmt.Sprintf(" --agg-flow %s", agg_type) + ext = " --external-ip 10.10.1.33" case "svc": - agg_flow_ext := fmt.Sprintf(" --agg-flow %s", agg_type) - ext := " --svc-port-name test_serviceportname" - newjobcmd = newjobcmd + agg_flow_ext + ext + agg_flow_ext = fmt.Sprintf(" --agg-flow %s", agg_type) + ext = " --svc-port-name test_serviceportname" } + newjobcmd = newjobcmd + agg_flow_ext + ext stdout, jobName, err = RunJob(t, data, newjobcmd) if err != nil { return "", "", err @@ -368,8 +377,8 @@ func tadlistJobs(t *testing.T, data *TestData) (stdout string, err error) { } func taddeleteJob(t *testing.T, data *TestData, jobName string) (stdout string, err error) { - mutex.Lock() - defer mutex.Unlock() + e2emutex.Lock() + defer e2emutex.Unlock() cmd := fmt.Sprintf("%s %s", 
taddeleteCmd, jobName) stdout, err = DeleteJob(t, data, cmd) if err != nil { @@ -379,8 +388,8 @@ func taddeleteJob(t *testing.T, data *TestData, jobName string) (stdout string, } func tadretrieveJobResult(t *testing.T, data *TestData, jobName string) (stdout string, err error) { - mutex.Lock() - defer mutex.Unlock() + e2emutex.Lock() + defer e2emutex.Unlock() cmd := fmt.Sprintf("%s %s", tadretrieveCmd, jobName) stdout, err = RetrieveJobResult(t, data, cmd) if err != nil { @@ -520,9 +529,9 @@ func populateFlowTable(t *testing.T, connect *sql.DB) { } func testTADCleanAfterTheiaMgrResync(t *testing.T, data *TestData) { - _, jobName1, err := tadrunJob(t, data, "ARIMA", "e2e") + _, jobName1, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) - _, jobName2, err := tadrunJob(t, data, "ARIMA", "e2e") + _, jobName2, err := tadrunJob(t, data, "ARIMA", "None") require.NoError(t, err) err = TheiaManagerRestart(t, data, jobName1, "tad") diff --git a/test/e2e/upgrade_test.go b/test/e2e/upgrade_test.go index 6295ad6d4..9b7d8d303 100644 --- a/test/e2e/upgrade_test.go +++ b/test/e2e/upgrade_test.go @@ -100,6 +100,8 @@ func TestUpgrade(t *testing.T) { func applyNewVersion(t *testing.T, data *TestData, antreaYML, chOperatorYML, flowVisibilityYML string) { t.Logf("Changing Antrea YAML to %s", antreaYML) // Do not wait for agent rollout as its updateStrategy is set to OnDelete for upgrade test. 
+ rc, stdout, stderr, err := data.provider.RunCommandOnNode(controlPlaneNodeName(), "ls /root/") + t.Logf("error1 finding the directory and files %v, stdout %v, stderr %v, err %v", rc, stdout, stderr, err) if err := data.deployAntreaCommon(antreaYML, "", false); err != nil { t.Fatalf("Error upgrading Antrea: %v", err) } @@ -108,14 +110,23 @@ func applyNewVersion(t *testing.T, data *TestData, antreaYML, chOperatorYML, flo t.Fatalf("Error when restarting Antrea: %v", err) } + rc, stdout, stderr, err = data.provider.RunCommandOnNode(controlPlaneNodeName(), "ls /root/") + t.Logf("error2 finding the directory and files %v, stdout %v, stderr %v, err %v", rc, stdout, stderr, err) + t.Logf("Changing ClickHouse Operator YAML to %s,\nFlow Visibility YAML to %s", chOperatorYML, flowVisibilityYML) if err := data.deployFlowVisibilityCommon(chOperatorYML, flowVisibilityYML); err != nil { t.Fatalf("Error upgrading Flow Visibility: %v", err) } + + rc, stdout, stderr, err = data.provider.RunCommandOnNode(controlPlaneNodeName(), "ls /root/") + t.Logf("error3 finding the directory and files %v, stdout %v, stderr %v, err %v", rc, stdout, stderr, err) + t.Logf("Waiting for the ClickHouse Pod restarting") if err := data.waitForClickHousePod(); err != nil { t.Fatalf("Error when waiting for the ClickHouse Pod restarting: %v", err) } + rc, stdout, stderr, err = data.provider.RunCommandOnNode(controlPlaneNodeName(), "ls /root/") + t.Logf("error4 finding the directory and files %v, stdout %v, stderr %v, err %v", rc, stdout, stderr, err) } func checkClickHouseVersionTable(t *testing.T, data *TestData, version string) {