Skip to content

Commit

Permalink
[test_bgp_stress_link_flap] case hung sometimes due to memory exhaust (
Browse files Browse the repository at this point in the history
…sonic-net#14163)

What is the motivation for this PR?
The case is flaky
Sometimes the case runs a long time and no response. Especially on kvm device.
Based on current log, it should be related to the memory resource limitation.
The case would create so many threads to flap the neighbor, it would cause kvm device memory exhaust, and same for low memory physical device.
Based on available logs, no obvious memory leak issue.

How did you do it?
The case is for stress link flap, it creates thread per interface to flap.
1: enlarge the delay time for kvm
2: only test one interface for kvm
3: Use one thread to flap all the interfaces for fanout.
4: correct neighbor host
6: add event stop and timeout for thread function to ensure thread exit

How did you verify/test it?
run the case locally and verified using elastictest
https://elastictest.org/scheduler/testplan/66c69c4008761ba27f76ed5d
https://elastictest.org/scheduler/testplan/66c69c2708761ba27f76ed5b
https://elastictest.org/scheduler/testplan/66cdafd1bd14ce56b2e820f7

Signed-off-by: xuliping <[email protected]>
  • Loading branch information
lipxu authored and mssonicbld committed Sep 2, 2024
1 parent d58b749 commit 41cb2f7
Showing 1 changed file with 205 additions and 74 deletions.
279 changes: 205 additions & 74 deletions tests/bgp/test_bgp_stress_link_flap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pytest
import time
import traceback
import threading
from tests.common.platform.device_utils import fanout_switch_port_lookup
from tests.common.helpers.assertions import pytest_assert
from tests.common.utilities import wait_until
Expand All @@ -15,6 +16,12 @@
]

stop_threads = False
SLEEP_DURATION = 0.005
TEST_RUN_DURATION = 600
MEMORY_EXHAUST_THRESHOLD = 300
dut_flap_count = 0
fanout_flap_count = 0
neighbor_flap_count = 0


@pytest.fixture(scope='module')
Expand Down Expand Up @@ -90,156 +97,280 @@ def setup(duthosts, rand_one_dut_hostname, nbrhosts, fanouthosts):

neighbor = dev_nbrs[port]["name"]
neighbor_port = dev_nbrs[port]["port"]

logger.info("no shutdown neighbor interface, neighbor {} port {}".format(neighbor, neighbor_port))
nbrhosts[neighbor]['host'].no_shutdown(neighbor_port)
neighbor_host = nbrhosts.get(neighbor, {}).get('host', None)
if neighbor_host:
neighbor_host.no_shutdown(neighbor_port)
logger.info("no shutdown neighbor interface, neighbor {} port {}".format(neighbor, neighbor_port))
else:
logger.debug("neighbor host not found for {} port {}".format(neighbor, neighbor_port))

time.sleep(1)

pytest_assert(wait_until(600, 10, 0, duthost.check_bgp_session_state, list(bgp_neighbors.keys())),
"Not all BGP sessions are established on DUT")


def flap_dut_interface(duthost, port):
logger.info("flap dut {} interface {}".format(duthost, port))
def flap_dut_interface(duthost, port, stop_event, sleep_duration, test_run_duration):
logger.info("flap dut {} interface {} delay time {} timeout {}".format(
duthost, port, sleep_duration, test_run_duration))
global dut_flap_count
dut_flap_count = 0
while (True):

start_time = time.time() # Record the start time
while not stop_event.is_set() and time.time() - start_time < test_run_duration:
duthost.shutdown(port)
time.sleep(0.1)
time.sleep(sleep_duration)
duthost.no_shutdown(port)
time.sleep(0.1)
time.sleep(sleep_duration)
if stop_threads:
logger.info("stop_threads now true, breaking flap dut {} interface {} flap count {}".format(
logger.info("Stop flap thread, breaking dut flap dut {} interface {} flap count {}".format(
duthost, port, dut_flap_count))
break
dut_flap_count += 1


def flap_fanout_interface(fanouthosts, duthost, port):
fanout, fanout_port = fanout_switch_port_lookup(fanouthosts, duthost.hostname, port)
def flap_fanout_interface_all(interface_list, fanouthosts, duthost, stop_event, sleep_duration, test_run_duration):
global fanout_flap_count
fanout_flap_count = 0
fanout_interfaces = {}

for port in interface_list:
fanout, fanout_port = fanout_switch_port_lookup(fanouthosts, duthost.hostname, port)
if fanout and fanout_port:
if fanout not in fanout_interfaces:
fanout_interfaces[fanout] = []
fanout_interfaces[fanout].append(fanout_port)

logger.info("flap interface fanout port {}".format(fanout_interfaces))

start_time = time.time() # Record the start time
while not stop_event.is_set() and time.time() - start_time < test_run_duration:
for fanout_host, fanout_ports in fanout_interfaces.items():
logger.info("flap interface fanout {} port {}".format(fanout_host, fanout_port))

fanout_host.shutdown_multiple(fanout_ports)
time.sleep(sleep_duration)
fanout_host.no_shutdown_multiple(fanout_ports)
time.sleep(sleep_duration)

fanout_flap_count += 1
if stop_threads:
logger.info("Stop flap thread, breaking flap fanout {} dut {} flap count {}".format(
fanouthosts, duthost, fanout_flap_count))
break


def flap_fanout_interface(interface_list, fanouthosts, duthost, stop_event, sleep_duration, test_run_duration):
global fanout_flap_count
fanout_flap_count = 0
if fanout and fanout_port:
logger.info("flap interface fanout {} port {}".format(fanout, fanout_port))
while (True):
fanout.shutdown(fanout_port)
time.sleep(0.1)
fanout.no_shutdown(fanout_port)
time.sleep(0.1)

start_time = time.time() # Record the start time
while not stop_event.is_set() and time.time() - start_time < test_run_duration:
for port in interface_list:
if stop_threads:
logger.info("stop_threads now true, breaking flap fanout {} interface {} flap count {}".format(
fanout, fanout_port, fanout_flap_count))
break
fanout_flap_count += 1
else:
logger.warning("fanout not found for {} port {}".format(duthost.hostname, port))

fanout, fanout_port = fanout_switch_port_lookup(fanouthosts, duthost.hostname, port)
if fanout and fanout_port:
logger.info("flap interface fanout {} port {}".format(fanout, fanout_port))
fanout.shutdown(fanout_port)
time.sleep(sleep_duration)
fanout.no_shutdown(fanout_port)
time.sleep(sleep_duration)
else:
logger.warning("fanout not found for {} port {}".format(duthost.hostname, port))

fanout_flap_count += 1
if stop_threads:
logger.info("Stop flap thread, breaking flap fanout {} dut {} interface {} flap count {}".format(
fanouthosts, duthost, port, fanout_flap_count))
break

def flap_neighbor_interface(neighbor, neighbor_port):

def flap_neighbor_interface(neighbor, neighbor_port, stop_event, sleep_duration, test_run_duration):
logger.info("flap neighbor {} interface {}".format(neighbor, neighbor_port))
global neighbor_flap_count
neighbor_flap_count = 0
while (True):

start_time = time.time() # Record the start time
while not stop_event.is_set() and time.time() - start_time < test_run_duration:
neighbor.shutdown(neighbor_port)
time.sleep(0.1)
time.sleep(sleep_duration)
neighbor.no_shutdown(neighbor_port)
time.sleep(0.1)
time.sleep(sleep_duration)
if stop_threads:
logger.info("stop_threads now true, breaking flap neighbor {} interface {} flap count {}".format(
logger.info("Stop flap thread, breaking flap neighbor {} interface {} flap count {}".format(
neighbor, neighbor_port, neighbor_flap_count))
break
neighbor_flap_count += 1


@pytest.mark.parametrize("interface", ["dut", "fanout", "neighbor", "all"])
def test_bgp_stress_link_flap(duthosts, rand_one_dut_hostname, setup, fanouthosts, interface):
@pytest.mark.parametrize("test_type", ["dut", "fanout", "neighbor", "all"])
def test_bgp_stress_link_flap(duthosts, rand_one_dut_hostname, setup, nbrhosts, fanouthosts, test_type):
global stop_threads
global dut_flap_count
global fanout_flap_count
global neighbor_flap_count

duthost = duthosts[rand_one_dut_hostname]

# Skip the test on Virtual Switch due to fanout switch dependency and warm reboot
asic_type = duthost.facts['asic_type']
if asic_type == "vs" and (interface == "fanout" or interface == "all"):
if asic_type == "vs" and (test_type == "fanout" or test_type == "all"):
pytest.skip("Stress link flap test is not supported on Virtual Switch")

if asic_type != "vs":
delay_time = SLEEP_DURATION
else:
delay_time = SLEEP_DURATION * 100

eth_nbrs = setup.get('eth_nbrs', {})
interface_list = eth_nbrs.keys()
logger.debug('interface_list: {}'.format(interface_list))

stop_threads = False
dut_flap_count = 0
fanout_flap_count = 0
neighbor_flap_count = 0
# Create a stop event
stop_event = threading.Event()

flap_threads = []

if interface == "dut":
if test_type == "dut":
for interface in interface_list:
thread = InterruptableThread(
target=flap_dut_interface,
args=(duthost, interface)
)
thread.daemon = True
thread.start()
flap_threads.append(thread)
elif interface == "fanout":
for interface in interface_list:
thread = InterruptableThread(
target=flap_fanout_interface,
args=(fanouthosts, duthost, interface)
args=(duthost, interface, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread.daemon = True
thread.start()
logger.info("Start flap thread {} dut {} interface {}".format(thread, duthost, interface))
flap_threads.append(thread)
elif interface == "neighbor":
# create only one thread for vs due to memory resource limitation
if asic_type == "vs":
break
elif test_type == "fanout":
thread = InterruptableThread(
target=flap_fanout_interface,
args=(interface_list, fanouthosts, duthost, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread.daemon = True
thread.start()
logger.info("Start flap thread {} fanout {} dut {}".format(thread, fanouthosts, duthost))
flap_threads.append(thread)
elif test_type == "neighbor":
for interface in interface_list:
neighbor = eth_nbrs[interface]["name"]
neighbor_name = eth_nbrs[interface]["name"]
neighbor_port = eth_nbrs[interface]["port"]
logger.info("shutdown interface neighbor {} port {}".format(neighbor, neighbor_port))
thread = InterruptableThread(
target=flap_neighbor_interface,
args=(neighbor, neighbor_port)
)
thread.daemon = True
thread.start()
flap_threads.append(thread)
elif interface == "all":
neighbor_host = nbrhosts.get(neighbor_name, {}).get('host', None)
if neighbor_host:
thread = InterruptableThread(
target=flap_neighbor_interface,
args=(neighbor_host, neighbor_port, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread.daemon = True
thread.start()
logger.info("Start flap thread {} neighbor {} port {}".format(thread, neighbor_host, neighbor_port))
flap_threads.append(thread)
else:
logger.debug("neighbor host not found for {} port {}".format(neighbor_name, neighbor_port))
# create only one thread for vs due to memory resource limitation
if asic_type == "vs":
break
elif test_type == "all":
for interface in interface_list:
logger.info("shutdown all interface {} ".format(interface))
thread_dut = InterruptableThread(
target=flap_dut_interface,
args=(duthost, interface)
args=(duthost, interface, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread_dut.daemon = True
thread_dut.start()
logger.info("Start flap thread {} dut {} interface {}".format(thread_dut, duthost, interface))
flap_threads.append(thread_dut)

thread_fanout = InterruptableThread(
target=flap_fanout_interface,
args=(fanouthosts, duthost, interface)
)
thread_fanout.daemon = True
thread_fanout.start()
flap_threads.append(thread_fanout)

neighbor = eth_nbrs[interface]["name"]
neighbor_name = eth_nbrs[interface]["name"]
neighbor_port = eth_nbrs[interface]["port"]
thread_neighbor = InterruptableThread(
target=flap_neighbor_interface,
args=(neighbor, neighbor_port)
)
thread_neighbor.daemon = True
thread_neighbor.start()
flap_threads.append(thread_neighbor)
neighbor_host = nbrhosts.get(neighbor_name, {}).get('host', None)
if neighbor_host:
thread_neighbor = InterruptableThread(
target=flap_neighbor_interface,
args=(neighbor_host, neighbor_port, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread_neighbor.daemon = True
thread_neighbor.start()
logger.info("Start flap thread {} neighbor {} port {}".format(
thread_neighbor, neighbor_host, neighbor_port))
flap_threads.append(thread_neighbor)
else:
logger.debug("neighbor host not found for {} port {}".format(neighbor_name, neighbor_port))

thread_fanout = InterruptableThread(
target=flap_fanout_interface,
args=(interface_list, fanouthosts, duthost, stop_event, delay_time, TEST_RUN_DURATION,)
)
thread_fanout.daemon = True
thread_fanout.start()
logger.info("Start flap thread {} fanout {} dut {} ".format(
thread_fanout, fanouthosts, duthost))
flap_threads.append(thread_fanout)

logger.info("flap_threads {} ".format(flap_threads))
time.sleep(600)

end_time = time.time() + TEST_RUN_DURATION
while time.time() < end_time:
time.sleep(30)

cmd = "free -m"
cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None)
logger.info("cmd {} response: {}".format(cmd, cmd_out))
lines = cmd_out.split('\n')
for line in lines:
if line.startswith("Mem:"):
fields = line.split()
total_mem, avail_mem = int(fields[1]), int(fields[-1])
logger.info("Total memory {} Available memory: {}".format(total_mem, avail_mem))
break

if avail_mem < MEMORY_EXHAUST_THRESHOLD:
logger.error("Available memory {} is less than {}, stopping the test".format(
avail_mem, MEMORY_EXHAUST_THRESHOLD))

cmd = "top -b -n 1"
cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None)
logger.info("cmd {} response: {}".format(cmd, cmd_out))

cmd = "sudo monit status"
cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None)
logger.info("cmd {} response: {}".format(cmd, cmd_out))

cmd = "docker stats --no-stream"
cmd_out = duthost.shell(cmd, module_ignore_errors=True).get('stdout', None)
logger.info("cmd {} response: {}".format(cmd, cmd_out))

break

logger.info("Test running for {} seconds".format(time.time() + TEST_RUN_DURATION - end_time))
logger.info("Test run duration dut_flap_count {} fanout_flap_count {} neighbor_flap_count {}".format(
dut_flap_count, fanout_flap_count, neighbor_flap_count))
stop_event.set()
stop_threads = True
time.sleep(60)
logger.info("stop_threads {} ".format(flap_threads))
time.sleep(30)

for thread in flap_threads:
logger.info("waiting thread {} done".format(thread))
try:
thread.join(timeout=30)
logger.info("thread {} joined".format(thread))
if thread.is_alive():
thread.join(timeout=30)
logger.info("thread {} joined".format(thread))
except Exception as e:
logger.debug("Exception occurred in thread %r:", thread)
logger.debug("".join(traceback.format_exception(None, e, e.__traceback__)))

# Clean up the thread list after joining all threads
logger.info("clear threads {} ".format(flap_threads))
flap_threads.clear()

return

0 comments on commit 41cb2f7

Please sign in to comment.