diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index b7237550a..659cb1b80 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -173,7 +173,7 @@ function run_test { rm -rf $TMP_DIR sleep 1 - go test -v -timeout=30m antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --skip=$skiplist + go test -v -timeout=45m antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --skip=$skiplist } echo "======== Test encap mode ==========" diff --git a/docs/throughput-anomaly-detection.md b/docs/throughput-anomaly-detection.md index 6ea4ce0f7..faf1102fe 100644 --- a/docs/throughput-anomaly-detection.md +++ b/docs/throughput-anomaly-detection.md @@ -69,6 +69,34 @@ $ theia throughput-anomaly-detection run --algo "ARIMA" Successfully started Throughput Anomaly Detection job with name tad-1234abcd-1234-abcd-12ab-12345678abcd ``` +Throughput Anomaly Detection also provides support for aggregated throughput +anomaly detection. +There are three different types of aggregations that are included. + +- `external` : Aggregated flow for inbound traffic to external IP, + the user can provide an external IP using the `external-ip` argument for further + filtering. +- `pod`: Aggregated flow for inbound/outbound pod traffic. +- `svc`: Aggregated flow for traffic to service port, the user can + provide a destination port name using the `svc-port-name` argument for + further filtering. + +For aggregated flow `pod`, the user can provide the following filter arguments. + +- `pod-label`: The argument aggregates inbound/outbound traffic using pod + labels. +- `pod-name`: The argument aggregates inbound/outbound traffic using pod name. +- `pod-namespace`: The argument aggregates inbound/outbound traffic using + pod namespace. However, this argument only works in combination with one of + the above two arguments and cannot be used alone. 
+ +All the above aggregated flows are executed using the following command: + +```bash +$ theia throughput-anomaly-detection run --algo "ARIMA" --agg-flow pod --pod-label \"test_key\":\"test_value\" +Successfully started Throughput Anomaly Detection job with name tad-1234abcd-1234-abcd-12ab-12345678abcd +``` + The name of the Throughput Anomaly Detection job contains a universally unique identifier ([UUID]( https://en.wikipedia.org/wiki/Universally_unique_identifier)) that is @@ -110,13 +138,24 @@ in table format, run: ```bash $ theia throughput-anomaly-detection retrieve tad-1234abcd-1234-abcd-12ab-12345678abcd -id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput algoCalc anomaly -1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11 08:24:54 10004969097.000000000000000000 4.0063773860532994E9 true -1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11 08:06:54 4005703059.000000000000000000 1.0001208294655691E10 true -1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11 08:34:54 50007861276.000000000000000000 3.9735065921281104E9 true +id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly +1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:06:54Z 4.005703059e+09 e2e ARIMA 1.0001208441920074e+10 true +1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:24:54Z 1.0004969097e+10 e2e ARIMA 4.006432886406564e+09 true +1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:34:54Z 5.0007861276e+10 e2e ARIMA 3.9735067954945493e+09 true +``` + +Aggregated Throughput Anomaly Detection has different columns based on the 
+aggregation type. +e.g. agg-type svc output + +```bash +$ theia throughput-anomaly-detection retrieve tad-5ca4413d-6730-463e-8f95-86032ba28a4f +id destinationServicePortName flowEndSeconds throughput aggType algoType algoCalc anomaly +5ca4413d-6730-463e-8f95-86032ba28a4f test_serviceportname 2022-08-11T08:24:54Z 5.0024845485e+10 svc ARIMA 2.0863933021708477e+10 true +5ca4413d-6730-463e-8f95-86032ba28a4f test_serviceportname 2022-08-11T08:34:54Z 2.5003930638e+11 svc ARIMA 1.9138281301304165e+10 true ``` -User may also save the result in an output file in json format +User may also save the result in an output file in json format. ### List all throughput anomaly detection jobs diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 559eb3b9f..c94c63fd9 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -26,6 +26,7 @@ import ( "regexp" "strconv" "strings" + "sync" "testing" "time" @@ -163,6 +164,7 @@ const ( var ( errConnectionLost = fmt.Errorf("http2: client connection lost") clickHousePodName = fmt.Sprintf("%s-0-0-0", clickHousePodNamePrefix) + mutex sync.Mutex ) type FlowVisibiltiySetUpConfig struct { diff --git a/test/e2e/throughputanomalydetection_test.go b/test/e2e/throughputanomalydetection_test.go index dbbd766ee..dc60eb042 100644 --- a/test/e2e/throughputanomalydetection_test.go +++ b/test/e2e/throughputanomalydetection_test.go @@ -101,7 +101,7 @@ func prepareFlowTable(t *testing.T, connect *sql.DB) { // Example output: Successfully created Throughput Anomaly Detection job with name tad-eec9d1be-7204-4d50-8f57-d9c8757a2668 func testThroughputAnomalyDetectionAlgo(t *testing.T, data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - stdout, jobName, err := tadrunJob(t, data, "ARIMA") + stdout, jobName, err := tadrunJob(t, data, "ARIMA", "e2e") require.NoError(t, err) assert := assert.New(t) assert.Containsf(stdout, fmt.Sprintf("Successfully started Throughput Anomaly Detection job with name: %s", jobName), "stdout: %s", 
stdout) @@ -114,7 +114,7 @@ func testThroughputAnomalyDetectionAlgo(t *testing.T, data *TestData, connect *s // Example output: Status of this anomaly detection job is COMPLETED func testAnomalyDetectionStatus(t *testing.T, data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - _, jobName, err := tadrunJob(t, data, "ARIMA") + _, jobName, err := tadrunJob(t, data, "ARIMA", "e2e") require.NoError(t, err) stdout, err := tadgetJobStatus(t, data, jobName) require.NoError(t, err) @@ -133,7 +133,7 @@ func testAnomalyDetectionStatus(t *testing.T, data *TestData, connect *sql.DB) { // 2022-06-17 15:03:39 N/A tad-c7a9e768-559a-4bfb-b0c8-a0291b4c208c SUBMITTED func testAnomalyDetectionList(t *testing.T, data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - _, jobName, err := tadrunJob(t, data, "ARIMA") + _, jobName, err := tadrunJob(t, data, "ARIMA", "e2e") require.NoError(t, err) stdout, err := tadlistJobs(t, data) require.NoError(t, err) @@ -152,7 +152,7 @@ func testAnomalyDetectionList(t *testing.T, data *TestData, connect *sql.DB) { // Example output: Successfully deleted anomaly detection job with name tad-eec9d1be-7204-4d50-8f57-d9c8757a2668 func testAnomalyDetectionDelete(t *testing.T, data *TestData, connect *sql.DB) { prepareFlowTable(t, connect) - _, jobName, err := tadrunJob(t, data, "ARIMA") + _, jobName, err := tadrunJob(t, data, "ARIMA", "e2e") require.NoError(t, err) err = data.podWaitForReady(defaultTimeout, jobName+"-driver", flowVisibilityNamespace) require.NoError(t, err) @@ -166,78 +166,147 @@ func testAnomalyDetectionDelete(t *testing.T, data *TestData, connect *sql.DB) { } // Example Output -// id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput algoCalc anomaly -// 4196479b-6e90-462c-b44f-e326baa52686 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:24:54Z 1.0004969097e+10 4.006432886406564e+09 true -// 4196479b-6e90-462c-b44f-e326baa52686 10.10.1.25 
58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:06:54Z 4.005703059e+09 1.0001208441920074e+10 true -// 4196479b-6e90-462c-b44f-e326baa52686 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:34:54Z 5.0007861276e+10 3.9735067954945493e+09 true +// id destinationServicePortName flowEndSeconds throughput aggType algoType algoCalc anomaly +// 5ca4413d-6730-463e-8f95-86032ba28a4f test_serviceportname 2022-08-11T08:24:54Z 5.0024845485e+10 svc ARIMA 2.0863933021708477e+10 true +// 5ca4413d-6730-463e-8f95-86032ba28a4f test_serviceportname 2022-08-11T08:34:54Z 2.5003930638e+11 svc ARIMA 1.9138281301304165e+10 true func testAnomalyDetectionRetrieve(t *testing.T, data *TestData, connect *sql.DB) { - prepareFlowTable(t, connect) algoNames := []string{"ARIMA", "EWMA", "DBSCAN"} + // agg_type 'podLabel' stands for agg_type 'pod' with pod-label argument + // agg_type 'podName' stands for agg_type 'pod' with pod-name argument + agg_types := []string{"e2e", "podName", "podLabel", "svc", "external"} + // Select random algo for the agg_types + podname_algo := algoNames[randInt(t, int64(len(algoNames)))] + podlabel_algo := algoNames[randInt(t, int64(len(algoNames)))] + external_algo := algoNames[randInt(t, int64(len(algoNames)))] + svc_algo := algoNames[randInt(t, int64(len(algoNames)))] + // Create a worker pool with maximum size of 3 + pool := make(chan int, len(algoNames)) + var ( + stdout string + wg sync.WaitGroup + ) + prepareFlowTable(t, connect) result_map := map[string]map[string]string{ "ARIMA": { - "4.006": "true", + "4.005": "true", "1.000": "true", - "3.973": "true"}, + "5.000": "true", + "2.500": "true", + "5.002": "true", + "2.003": "true", + "2.002": "true", + }, "EWMA": { - "2.700": "true", - "1.550": "true", - "9.755": "true"}, + "4.004": "true", + "4.005": "true", + "4.006": "true", + "5.000": "true", + "2.002": "true", + "2.003": "true", + "2.500": "true", + }, "DBSCAN": { "1.000": "true", "1.005": "true", "5.000": "true", "3.260": "true", 
- "2.058": "true"}, + "2.058": "true", + "5.002": "true", + "5.027": "true", + "2.500": "true", + "1.029": "true", + "1.630": "true"}, } - for _, algo := range algoNames { - _, jobName, err := tadrunJob(t, data, algo) - require.NoError(t, err) - err = data.podWaitForReady(defaultTimeout, jobName+"-driver", flowVisibilityNamespace) - require.NoError(t, err) - err = waitTADJobComplete(t, data, jobName, tadjobCompleteTimeout) - require.NoErrorf(t, err, "Throughput Anomaly Detection Spark job failed to complete") - stdout, err := tadretrieveJobResult(t, data, jobName) - resultArray := strings.Split(stdout, "\n") - assert := assert.New(t) - length := len(resultArray) - assert.GreaterOrEqualf(length, 3, "stdout: %s", stdout) - assert.Containsf(stdout, "throughput", "stdout: %s", stdout) - assert.Containsf(stdout, "algoCalc", "stdout: %s", stdout) - assert.Containsf(stdout, "anomaly", "stdout: %s", stdout) - - for i := 1; i < length; i++ { - // check metrics' value - resultArray[i] = strings.TrimSpace(resultArray[i]) - if resultArray[i] != "" { - resultArray[i] = strings.ReplaceAll(resultArray[i], "\t", " ") - tadoutputArray := strings.Fields(resultArray[i]) - anomaly_output := tadoutputArray[11] - assert.Equal(12, len(tadoutputArray), "tadoutputArray: %s", tadoutputArray) - switch algo { - case "ARIMA": - algo_throughput := tadoutputArray[10][:5] - assert.Equal(result_map["ARIMA"][algo_throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) - case "EWMA": - algo_throughput := tadoutputArray[10][:5] - assert.Equal(result_map["EWMA"][algo_throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) - case "DBSCAN": - throughput := tadoutputArray[7][:5] - assert.Equal(result_map["DBSCAN"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) - } + assert_variable_map := map[string]map[string]int{ + "e2e": { + "tadoutputArray_len": 12, + 
"anomaly_output_idx": 11, + "throughput_idx": 7}, + "podName": { + "tadoutputArray_len": 10, + "anomaly_output_idx": 9, + "throughput_idx": 5}, + "podLabel": { + "tadoutputArray_len": 11, + "anomaly_output_idx": 10, + "throughput_idx": 6}, + "external": { + "tadoutputArray_len": 8, + "anomaly_output_idx": 7, + "throughput_idx": 3}, + "svc": { + "tadoutputArray_len": 8, + "anomaly_output_idx": 7, + "throughput_idx": 3}, + } + for _, agg_type := range agg_types { + for idx, algo := range algoNames { + if (agg_type == "e2e") || (agg_type == "podName" && algo == podname_algo) || (agg_type == "podLabel" && algo == podlabel_algo) || (agg_type == "external" && algo == external_algo) || (agg_type == "svc" && algo == svc_algo) { + wg.Add(1) + pool <- idx + go func(t *testing.T, data *TestData, algo, agg_type string) { + defer func() { + <-pool + wg.Done() + }() + _, jobName, err := tadrunJob(t, data, algo, agg_type) + require.NoError(t, err) + err = data.podWaitForReady(defaultTimeout, jobName+"-driver", flowVisibilityNamespace) + require.NoError(t, err) + err = waitTADJobComplete(t, data, jobName, tadjobCompleteTimeout) + require.NoError(t, err) + stdout, err = tadretrieveJobResult(t, data, jobName) + require.NoError(t, err) + resultArray := strings.Split(stdout, "\n") + assert := assert.New(t) + length := len(resultArray) + assert.GreaterOrEqualf(length, 3, "stdout: %s", stdout) + assert.Containsf(stdout, "throughput", "stdout: %s", stdout) + assert.Containsf(stdout, "algoCalc", "stdout: %s", stdout) + assert.Containsf(stdout, "anomaly", "stdout: %s", stdout) + + for i := 1; i < length; i++ { + // check metrics' value + resultArray[i] = strings.TrimSpace(resultArray[i]) + if resultArray[i] != "" { + resultArray[i] = strings.ReplaceAll(resultArray[i], "\t", " ") + tadoutputArray := strings.Fields(resultArray[i]) + anomaly_output := tadoutputArray[assert_variable_map[agg_type]["anomaly_output_idx"]] + throughput := 
tadoutputArray[assert_variable_map[agg_type]["throughput_idx"]][:5] + assert.Equal(assert_variable_map[agg_type]["tadoutputArray_len"], len(tadoutputArray), "tadoutputArray: %s", tadoutputArray) + switch algo { + case "ARIMA": + assert.Equal(result_map["ARIMA"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) + case "EWMA": + assert.Equal(result_map["EWMA"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) + case "DBSCAN": + assert.Equal(result_map["DBSCAN"][throughput], anomaly_output, "Anomaly outputs dont match in tadoutputArray: %s", tadoutputArray) + } + } + } + _, err = taddeleteJob(t, data, jobName) + require.NoError(t, err) + }(t, data, algo, agg_type) } } - require.NoError(t, err) - _, err = taddeleteJob(t, data, jobName) - require.NoError(t, err) } + wg.Wait() } // waitJobComplete waits for the anomaly detection Spark job completes func waitTADJobComplete(t *testing.T, data *TestData, jobName string, timeout time.Duration) error { + mutex.Lock() + defer mutex.Unlock() stdout := "" err := wait.PollImmediate(defaultInterval, timeout, func() (bool, error) { stdout, err := tadgetJobStatus(t, data, jobName) - require.NoError(t, err) + if err != nil { + if strings.Contains(err.Error(), "TLS handshake timeout") { + return false, nil + } + } else { + require.NoError(t, err) + } if strings.Contains(stdout, "Status of this anomaly detection job is COMPLETED") { return true, nil } @@ -252,8 +321,28 @@ func waitTADJobComplete(t *testing.T, data *TestData, jobName string, timeout ti return nil } -func tadrunJob(t *testing.T, data *TestData, algotype string) (stdout string, jobName string, err error) { +func tadrunJob(t *testing.T, data *TestData, algotype, agg_type string) (stdout string, jobName string, err error) { + mutex.Lock() + defer mutex.Unlock() newjobcmd := tadstartCmd + " --algo " + algotype + " --driver-memory 1G --start-time 2022-08-11T06:26:50 --end-time 
2022-08-12T08:26:54" + switch agg_type { + case "podName": + agg_flow_ext := " --agg-flow pod" + ext := " --pod-name test_podName" + newjobcmd = newjobcmd + agg_flow_ext + ext + case "podLabel": + agg_flow_ext := " --agg-flow pod" + ext := " --pod-label \"test_key\":\"test_value\"" + newjobcmd = newjobcmd + agg_flow_ext + ext + case "external": + agg_flow_ext := fmt.Sprintf(" --agg-flow %s", agg_type) + ext := " --external-ip 10.10.1.33" + newjobcmd = newjobcmd + agg_flow_ext + ext + case "svc": + agg_flow_ext := fmt.Sprintf(" --agg-flow %s", agg_type) + ext := " --svc-port-name test_serviceportname" + newjobcmd = newjobcmd + agg_flow_ext + ext + } stdout, jobName, err = RunJob(t, data, newjobcmd) if err != nil { return "", "", err @@ -279,6 +368,8 @@ func tadlistJobs(t *testing.T, data *TestData) (stdout string, err error) { } func taddeleteJob(t *testing.T, data *TestData, jobName string) (stdout string, err error) { + mutex.Lock() + defer mutex.Unlock() cmd := fmt.Sprintf("%s %s", taddeleteCmd, jobName) stdout, err = DeleteJob(t, data, cmd) if err != nil { @@ -288,6 +379,8 @@ func taddeleteJob(t *testing.T, data *TestData, jobName string) (stdout string, } func tadretrieveJobResult(t *testing.T, data *TestData, jobName string) (stdout string, err error) { + mutex.Lock() + defer mutex.Unlock() cmd := fmt.Sprintf("%s %s", tadretrieveCmd, jobName) stdout, err = RetrieveJobResult(t, data, cmd) if err != nil { @@ -304,6 +397,14 @@ func addFakeRecordforTAD(t *testing.T, stmt *sql.Stmt) { destinationIP := "10.10.1.33" destinationTransportPort := 5201 protocolIndentifier := 6 + sourcePodNamespace := "test_namespace" + sourcePodName := "test_podName" + destinationPodName := "test_podName" + destinationPodNamespace := "test_namespace" + sourcePodLabels := "{\"test_key\":\"test_value\"}" + destinationPodLabels := "{\"test_key\":\"test_value\"}" + destinationServicePortName := "test_serviceportname" + flowtype := 3 throughputs := []int64{ 4007380032, 4006917952, 4004471308, 
4005277827, 4005486294, @@ -344,15 +445,15 @@ func addFakeRecordforTAD(t *testing.T, stmt *sql.Stmt) { uint64(randInt(t, MaxInt32)), uint64(randInt(t, MaxInt32)), uint64(randInt(t, MaxInt32)), - fmt.Sprintf("PodName-%d", randInt(t, MaxInt32)), - fmt.Sprintf("PodNameSpace-%d", randInt(t, MaxInt32)), + sourcePodName, + sourcePodNamespace, fmt.Sprintf("NodeName-%d", randInt(t, MaxInt32)), - fmt.Sprintf("PodName-%d", randInt(t, MaxInt32)), - fmt.Sprintf("PodNameSpace-%d", randInt(t, MaxInt32)), + destinationPodName, + destinationPodNamespace, fmt.Sprintf("NodeName-%d", randInt(t, MaxInt32)), getRandIP(t), uint16(randInt(t, 65535)), - fmt.Sprintf("ServicePortName-%d", randInt(t, MaxInt32)), + destinationServicePortName, fmt.Sprintf("PolicyName-%d", randInt(t, MaxInt32)), fmt.Sprintf("PolicyNameSpace-%d", randInt(t, MaxInt32)), fmt.Sprintf("PolicyRuleName-%d", randInt(t, MaxInt32)), @@ -364,9 +465,9 @@ func addFakeRecordforTAD(t *testing.T, stmt *sql.Stmt) { 1, 1, "tcpState", - 0, - fmt.Sprintf("PodLabels-%d", randInt(t, MaxInt32)), - fmt.Sprintf("PodLabels-%d", randInt(t, MaxInt32)), + flowtype, + sourcePodLabels, + destinationPodLabels, uint64(throughput), uint64(randInt(t, MaxInt32)), uint64(randInt(t, MaxInt32)), @@ -419,9 +520,9 @@ func populateFlowTable(t *testing.T, connect *sql.DB) { } func testTADCleanAfterTheiaMgrResync(t *testing.T, data *TestData) { - _, jobName1, err := tadrunJob(t, data, "ARIMA") + _, jobName1, err := tadrunJob(t, data, "ARIMA", "e2e") require.NoError(t, err) - _, jobName2, err := tadrunJob(t, data, "ARIMA") + _, jobName2, err := tadrunJob(t, data, "ARIMA", "e2e") require.NoError(t, err) err = TheiaManagerRestart(t, data, jobName1, "tad")