Skip to content

Commit

Permalink
e2e test cases and documentation for Aggregated TAD (#192)
Browse files Browse the repository at this point in the history
* E2E tests and Documentation for Aggregated TAD

This PR introduces:
- e2e tests for Aggregated TAD
- Documentation for Aggregated TAD

partially-solves: 168

Signed-off-by: Tushar Tathgur <[email protected]>

* Addressed comments

Signed-off-by: Tushar Tathgur <[email protected]>

---------

Signed-off-by: Tushar Tathgur <[email protected]>
Co-authored-by: Tushar Tathgur <[email protected]>
  • Loading branch information
tushartathgur and Tushar Tathgur committed Jul 21, 2023
1 parent 696921c commit aaeab26
Show file tree
Hide file tree
Showing 7 changed files with 354 additions and 78 deletions.
2 changes: 1 addition & 1 deletion ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ function run_test {
rm -rf $TMP_DIR
sleep 1

go test -v -timeout=30m antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --skip=$skiplist
go test -v -timeout=45m antrea.io/theia/test/e2e -provider=kind --logs-export-dir=$ANTREA_LOG_DIR --skip=$skiplist
}

echo "======== Test encap mode =========="
Expand Down
49 changes: 44 additions & 5 deletions docs/throughput-anomaly-detection.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,34 @@ $ theia throughput-anomaly-detection run --algo "ARIMA"
Successfully started Throughput Anomaly Detection job with name tad-1234abcd-1234-abcd-12ab-12345678abcd
```

Throughput Anomaly Detection also provides support for aggregated throughput
anomaly detection.
There are three different types of aggregations that are included.

- `external` : Aggregated flows for inbound traffic to external IP,
user could provide external-IP using `external-ip` argument for further
filtering.
- `pod`: Aggregated flows for inbound/outbound pod traffic.
- `svc`: Aggregated flows for traffic to service port, user could
provide a destination port name using `svc-name-port` argument for
further filtering.

For aggregated flows `pod`, user can provide the following filter arguments.

- `pod-label`: The argument aggregates inbound/outbound traffic using Pod
labels.
- `pod-name`: The argument aggregates inbound/outbound traffic using Pod name.
- `pod-namespace`: The argument aggregates inbound/outbound traffic using
Pod namespace. However, this argument only works as a combination to any of
the above two arguments and can not be used alone.

To start an aggregated throughput anomaly detection, please run the following command:

```bash
$ theia throughput-anomaly-detection run --algo "ARIMA" --agg-flow pod --pod-label \"test_key\":\"test_value\"
Successfully started Throughput Anomaly Detection job with name tad-1234abcd-1234-abcd-12ab-12345678abcd
```

The name of the Throughput Anomaly Detection job contains a universally
unique identifier ([UUID](
https://en.wikipedia.org/wiki/Universally_unique_identifier)) that is
Expand Down Expand Up @@ -110,13 +138,24 @@ in table format, run:

```bash
$ theia throughput-anomaly-detection retrieve tad-1234abcd-1234-abcd-12ab-12345678abcd
id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput algoCalc anomaly
1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11 08:24:54 10004969097.000000000000000000 4.0063773860532994E9 true
1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11 08:06:54 4005703059.000000000000000000 1.0001208294655691E10 true
1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11 08:34:54 50007861276.000000000000000000 3.9735065921281104E9 true
id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly
1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:06:54Z 4.005703059e+09 None ARIMA 1.0001208441920074e+10 true
1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:24:54Z 1.0004969097e+10 None ARIMA 4.006432886406564e+09 true
1234abcd-1234-abcd-12ab-12345678abcd 10.10.1.25 58076 10.10.1.33 5201 2022-08-11T06:26:54Z 2022-08-11T08:34:54Z 5.0007861276e+10 None ARIMA 3.9735067954945493e+09 true
```

Aggregated Throughput Anomaly Detection has different columns based on the
aggregation type.
e.g. when aggregation type is `svc`, the output is the following

```bash
$ theia throughput-anomaly-detection retrieve tad-5ca4413d-6730-463e-8f95-86032ba28a4f
id destinationServicePortName flowEndSeconds throughput aggType algoType algoCalc anomaly
5ca4413d-6730-463e-8f95-86032ba28a4f test_serviceportname 2022-08-11T08:24:54Z 5.0024845485e+10 svc ARIMA 2.0863933021708477e+10 true
5ca4413d-6730-463e-8f95-86032ba28a4f test_serviceportname 2022-08-11T08:34:54Z 2.5003930638e+11 svc ARIMA 1.9138281301304165e+10 true
```

User may also save the result in an output file in json format
User may also save the result in an output file in json format.

### List all throughput anomaly detection jobs

Expand Down
2 changes: 1 addition & 1 deletion pkg/theia/commands/anomaly_detection_retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func throughputAnomalyDetectionRetrieve(cmd *cobra.Command, args []string) error
} else {
var result [][]string
switch tad.Stats[0].AggType {
case "e2e":
case "None":
result = append(result, []string{"id", "sourceIP", "sourceTransportPort", "destinationIP", "destinationTransportPort", "flowStartSeconds", "flowEndSeconds", "throughput", "aggType", "algoType", "algoCalc", "anomaly"})
for _, p := range tad.Stats {
result = append(result, []string{p.Id, p.SourceIP, p.SourceTransportPort, p.DestinationIP, p.DestinationTransportPort, p.FlowStartSeconds, p.FlowEndSeconds, p.Throughput, p.AggType, p.AlgoType, p.AlgoCalc, p.Anomaly})
Expand Down
6 changes: 3 additions & 3 deletions pkg/theia/commands/anomaly_detection_retrieve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) {
filePath string
}{
{
name: "Valid case e2e",
name: "Valid case No agg_type",
testServer: httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch strings.TrimSpace(r.URL.Path) {
case fmt.Sprintf("/apis/intelligence.theia.antrea.io/v1alpha1/throughputanomalydetectors/%s", tadName):
Expand All @@ -57,7 +57,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) {
Id: tadName,
Anomaly: "true",
AlgoCalc: "1234567",
AggType: "e2e",
AggType: "None",
}},
}
w.Header().Set("Content-Type", "application/json")
Expand All @@ -66,7 +66,7 @@ func TestAnomalyDetectorRetrieve(t *testing.T) {
}
})),
tadName: "tad-1234abcd-1234-abcd-12ab-12345678abcd",
expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "e2e 1234567 true"},
expectedMsg: []string{"id sourceIP sourceTransportPort destinationIP destinationTransportPort flowStartSeconds flowEndSeconds throughput aggType algoType algoCalc anomaly", "None 1234567 true"},
expectedErrorMsg: "",
},
{
Expand Down
129 changes: 128 additions & 1 deletion pkg/theia/commands/anomaly_detection_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func TestAnomalyDetectionRun(t *testing.T) {
cmd.Flags().String("start-time", "2006-01-02 15:04:05", "")
cmd.Flags().String("end-time", "2006-01-03 16:04:05", "")
cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "")
cmd.Flags().String("agg-flow", "pod", "")
cmd.Flags().String("agg-flow", "external", "")
cmd.Flags().String("pod-label", "app:label1", "")
cmd.Flags().String("pod-name", "testpodname", "")
cmd.Flags().String("pod-namespace", "testpodnamespace", "")
Expand Down Expand Up @@ -236,6 +236,38 @@ func TestThroughputAnomalyDetectionAlgo(t *testing.T) {
name: "Invalid executor-memory",
expectedErrorMsg: "executor-memory should conform to the Kubernetes resource quantity convention",
},
{
name: "Unspecified agg-flow",
expectedErrorMsg: ErrorMsgUnspecifiedCase,
},
{
name: "Unspecified pod-label",
expectedErrorMsg: ErrorMsgUnspecifiedCase,
},
{
name: "Unspecified pod-name",
expectedErrorMsg: ErrorMsgUnspecifiedCase,
},
{
name: "Unspecified pod-namespace",
expectedErrorMsg: ErrorMsgUnspecifiedCase,
},
{
name: "Invalid pod-namespace",
expectedErrorMsg: "argument can not be used alone",
},
{
name: "Unspecified external-ip",
expectedErrorMsg: ErrorMsgUnspecifiedCase,
},
{
name: "Unspecified svc-port-name",
expectedErrorMsg: ErrorMsgUnspecifiedCase,
},
{
name: "Invalid agg-flow",
expectedErrorMsg: "aggregated flow type should be 'pod' or 'external' or 'svc'",
},
{
name: "Unspecified use-cluster-ip",
expectedErrorMsg: ErrorMsgUnspecifiedCase,
Expand Down Expand Up @@ -346,6 +378,99 @@ func TestThroughputAnomalyDetectionAlgo(t *testing.T) {
cmd.Flags().String("driver-memory", "1m", "")
cmd.Flags().String("executor-core-request", "1", "")
cmd.Flags().String("executor-memory", "mock_executor-memory", "")
case "Unspecified agg-flow":
cmd.Flags().String("algo", "ARIMA", "")
cmd.Flags().String("start-time", "2006-01-02 15:04:05", "")
cmd.Flags().String("end-time", "2006-01-03 16:04:05", "")
cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "")
cmd.Flags().Int32("executor-instances", 1, "")
cmd.Flags().String("driver-core-request", "1", "")
cmd.Flags().String("driver-memory", "1m", "")
cmd.Flags().String("executor-core-request", "1", "")
cmd.Flags().String("executor-memory", "1m", "")
case "Unspecified pod-label":
cmd.Flags().String("algo", "ARIMA", "")
cmd.Flags().String("start-time", "2006-01-02 15:04:05", "")
cmd.Flags().String("end-time", "2006-01-03 16:04:05", "")
cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "")
cmd.Flags().Int32("executor-instances", 1, "")
cmd.Flags().String("driver-core-request", "1", "")
cmd.Flags().String("driver-memory", "1m", "")
cmd.Flags().String("executor-core-request", "1", "")
cmd.Flags().String("executor-memory", "1m", "")
cmd.Flags().String("agg-flow", "pod", "")
case "Unspecified pod-name":
cmd.Flags().String("algo", "ARIMA", "")
cmd.Flags().String("start-time", "2006-01-02 15:04:05", "")
cmd.Flags().String("end-time", "2006-01-03 16:04:05", "")
cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "")
cmd.Flags().Int32("executor-instances", 1, "")
cmd.Flags().String("driver-core-request", "1", "")
cmd.Flags().String("driver-memory", "1m", "")
cmd.Flags().String("executor-core-request", "1", "")
cmd.Flags().String("executor-memory", "1m", "")
cmd.Flags().String("agg-flow", "pod", "")
cmd.Flags().String("pod-label", "mock_pod-label", "")
case "Unspecified pod-namespace":
cmd.Flags().String("algo", "ARIMA", "")
cmd.Flags().String("start-time", "2006-01-02 15:04:05", "")
cmd.Flags().String("end-time", "2006-01-03 16:04:05", "")
cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "")
cmd.Flags().Int32("executor-instances", 1, "")
cmd.Flags().String("driver-core-request", "1", "")
cmd.Flags().String("driver-memory", "1m", "")
cmd.Flags().String("executor-core-request", "1", "")
cmd.Flags().String("executor-memory", "1m", "")
cmd.Flags().String("agg-flow", "pod", "")
cmd.Flags().String("pod-label", "mock_pod_label", "")
cmd.Flags().String("pod-name", "mock_pod-name", "")
case "Invalid pod-namespace":
cmd.Flags().String("algo", "ARIMA", "")
cmd.Flags().String("start-time", "2006-01-02 15:04:05", "")
cmd.Flags().String("end-time", "2006-01-03 16:04:05", "")
cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "")
cmd.Flags().Int32("executor-instances", 1, "")
cmd.Flags().String("driver-core-request", "1", "")
cmd.Flags().String("driver-memory", "1m", "")
cmd.Flags().String("executor-core-request", "1", "")
cmd.Flags().String("executor-memory", "1m", "")
cmd.Flags().String("agg-flow", "pod", "")
cmd.Flags().String("pod-label", "", "")
cmd.Flags().String("pod-name", "", "")
cmd.Flags().String("pod-namespace", "mock_pod-namespace", "")
case "Unspecified external-ip":
cmd.Flags().String("algo", "ARIMA", "")
cmd.Flags().String("start-time", "2006-01-02 15:04:05", "")
cmd.Flags().String("end-time", "2006-01-03 16:04:05", "")
cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "")
cmd.Flags().Int32("executor-instances", 1, "")
cmd.Flags().String("driver-core-request", "1", "")
cmd.Flags().String("driver-memory", "1m", "")
cmd.Flags().String("executor-core-request", "1", "")
cmd.Flags().String("executor-memory", "1m", "")
cmd.Flags().String("agg-flow", "external", "")
case "Unspecified svc-port-name":
cmd.Flags().String("algo", "ARIMA", "")
cmd.Flags().String("start-time", "2006-01-02 15:04:05", "")
cmd.Flags().String("end-time", "2006-01-03 16:04:05", "")
cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "")
cmd.Flags().Int32("executor-instances", 1, "")
cmd.Flags().String("driver-core-request", "1", "")
cmd.Flags().String("driver-memory", "1m", "")
cmd.Flags().String("executor-core-request", "1", "")
cmd.Flags().String("executor-memory", "1m", "")
cmd.Flags().String("agg-flow", "svc", "")
case "Invalid agg-flow":
cmd.Flags().String("algo", "ARIMA", "")
cmd.Flags().String("start-time", "2006-01-02 15:04:05", "")
cmd.Flags().String("end-time", "2006-01-03 16:04:05", "")
cmd.Flags().String("ns-ignore-list", "[\"kube-system\",\"flow-aggregator\",\"flow-visibility\"]", "")
cmd.Flags().Int32("executor-instances", 1, "")
cmd.Flags().String("driver-core-request", "1", "")
cmd.Flags().String("driver-memory", "1m", "")
cmd.Flags().String("executor-core-request", "1", "")
cmd.Flags().String("executor-memory", "1m", "")
cmd.Flags().String("agg-flow", "mock_agg-flow", "")
case "Unspecified use-cluster-ip":
cmd.Flags().String("algo", "ARIMA", "")
cmd.Flags().String("start-time", "2006-01-02 15:04:05", "")
Expand All @@ -356,6 +481,8 @@ func TestThroughputAnomalyDetectionAlgo(t *testing.T) {
cmd.Flags().String("driver-memory", "1m", "")
cmd.Flags().String("executor-core-request", "1", "")
cmd.Flags().String("executor-memory", "1m", "")
cmd.Flags().String("agg-flow", "svc", "")
cmd.Flags().String("svc-port-name", "mock_svc_name", "")
}
err := throughputAnomalyDetectionAlgo(cmd, []string{})
if tt.expectedErrorMsg == "" {
Expand Down
4 changes: 2 additions & 2 deletions plugins/anomaly-detection/anomaly_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ def filter_df_with_true_anomalies(
if ret_plot.count() == 0:
ret_plot = ret_plot.collect()
if agg_flow == "":
agg_type = "e2e"
agg_type = "None"
else:
agg_type = agg_flow
ret_plot.append({
Expand Down Expand Up @@ -624,7 +624,7 @@ def assign_flow_type(prepared_DF, agg_flow=None, direction=None):
elif agg_flow == "pod":
prepared_DF = prepared_DF.withColumn('aggType', f.lit("pod"))
else:
prepared_DF = prepared_DF.withColumn('aggType', f.lit("e2e"))
prepared_DF = prepared_DF.withColumn('aggType', f.lit("None"))
return prepared_DF


Expand Down
Loading

0 comments on commit aaeab26

Please sign in to comment.