Skip to content

Commit

Permalink
Converting 'netmiko_grep' over to concurrent futures
Browse files Browse the repository at this point in the history
  • Loading branch information
ktbyers committed Sep 20, 2024
1 parent b725eb4 commit e7d095c
Showing 1 changed file with 27 additions and 36 deletions.
63 changes: 27 additions & 36 deletions netmiko/cli_tools/netmiko_grep.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@
import sys
import os
import subprocess
import threading

try:
from Queue import Queue
except ImportError:
from queue import Queue
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from getpass import getpass

Expand All @@ -27,6 +22,8 @@
ERROR_PATTERN = "%%%failed%%%"
__version__ = "5.0.0"

max_workers = int(os.environ.get("NETMIKO_MAX_THREADS", 10))


def grepx(files, pattern, grep_options, use_colors=True):
"""Call system grep"""
Expand All @@ -48,14 +45,14 @@ def grepx(files, pattern, grep_options, use_colors=True):
return ""


def ssh_conn(device_name, device_params, cli_command, output_q):
def ssh_conn(device_name, device_params, cli_command):
try:
with ConnectHandler(**device_params) as net_connect:
net_connect.enable()
output = net_connect.send_command(cli_command)
return device_name, output
except Exception:
output = ERROR_PATTERN
output_q.put({device_name: output})
return device_name, ERROR_PATTERN


def parse_arguments(args):
Expand Down Expand Up @@ -131,18 +128,18 @@ def main(args):
use_cached_files = cli_args.use_cache
hide_failed = cli_args.hide_failed

output_q = Queue()

# DEVICE LOADING #####
devices = obtain_devices(device_or_group)

# Retrieve output from devices
my_files = []
failed_devices = []
results = {}
if not use_cached_files:

# UPDATE DEVICE PARAMS (WITH CLI ARGS) #####
device_tasks = []
for device_name, device_params in devices.items():
# UPDATE DEVICE PARAMS (WITH CLI ARGS) #####
update_device_params(
device_params,
username=cli_username,
Expand All @@ -152,31 +149,25 @@ def main(args):
if not cmd_arg:
device_type = device_params["device_type"]
cli_command = SHOW_RUN_MAPPER.get(device_type, "show run")
device_tasks.append((device_name, device_params, cli_command))

# THREADING #####
my_thread = threading.Thread(
target=ssh_conn,
args=(device_name, device_params, cli_command, output_q),
)
my_thread.start()

# Make sure all threads have finished
main_thread = threading.current_thread()
for some_thread in threading.enumerate():
if some_thread != main_thread:
some_thread.join()
# Write files
while not output_q.empty():
my_dict = output_q.get()
netmiko_base_dir, netmiko_full_dir = find_netmiko_dir()
ensure_dir_exists(netmiko_base_dir)
ensure_dir_exists(netmiko_full_dir)
for device_name, output in my_dict.items():
file_name = write_tmp_file(device_name, output)
if ERROR_PATTERN not in output:
my_files.append(file_name)
else:
failed_devices.append(device_name)
# THREADING #####
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(ssh_conn, *args) for args in device_tasks]
for future in as_completed(futures):
device_name, output = future.result()
results[device_name] = output

netmiko_base_dir, netmiko_full_dir = find_netmiko_dir()
ensure_dir_exists(netmiko_base_dir)
ensure_dir_exists(netmiko_full_dir)
for device_name, output in results.items():

file_name = write_tmp_file(device_name, output)
if ERROR_PATTERN not in output:
my_files.append(file_name)
else:
failed_devices.append(device_name)
else:
for device_name in devices:
file_name = obtain_netmiko_filename(device_name)
Expand Down

0 comments on commit e7d095c

Please sign in to comment.