From dee13a2bccc4fec0a4cec7ed47a1ac33ce7cfc27 Mon Sep 17 00:00:00 2001 From: Kirk Byers Date: Wed, 7 Aug 2024 12:15:27 -0700 Subject: [PATCH] Unifying old/new pysnmp library support --- netmiko/snmp_autodetect.py | 67 +++++++++++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 5 deletions(-) diff --git a/netmiko/snmp_autodetect.py b/netmiko/snmp_autodetect.py index 72549f3dd..7f147bf04 100644 --- a/netmiko/snmp_autodetect.py +++ b/netmiko/snmp_autodetect.py @@ -23,12 +23,18 @@ from typing import Optional, Dict, List from typing.re import Pattern +import asyncio import re import socket -import asyncio try: + from pysnmp.entity.rfc3413.oneliner import cmdgen + + SNMP_MODE = "legacy" +except ImportError: from pysnmp.hlapi.asyncio import cmdgen + + SNMP_MODE = "v6_async" except ImportError: raise ImportError("pysnmp not installed; please install it: 'pip install pysnmp'") @@ -333,6 +339,24 @@ async def _run_query(self, creds: object, oid: str) -> str: return str(varBinds[0][1]) return "" + def _get_snmpv3_asyncwr(self, oid: str) -> str: + """ + This is an asynchronous wrapper to call code in newer versions of the pysnmp library + (V6 and later). + """ + return asyncio.run( + self._run_query( + cmdgen.UsmUserData( + self.user, + self.auth_key, + self.encrypt_key, + authProtocol=self.auth_proto, + privProtocol=self.encryp_proto, + ), + oid, + ) + ) + def _get_snmpv3(self, oid: str) -> str: """ Try to send an SNMP GET operation using SNMPv3 for the specified OID. @@ -347,9 +371,10 @@ def _get_snmpv3(self, oid: str) -> str: string : str The string as part of the value from the OID you are trying to retrieve. """ + if SNMP_MODE == "legacy": + cmd_gen = cmdgen.CommandGenerator() - return asyncio.run( - self._run_query( + (error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd( cmdgen.UsmUserData( self.user, self.auth_key, @@ -357,9 +382,26 @@ def _get_snmpv3(self, oid: str) -> str: authProtocol=self.auth_proto, privProtocol=self.encryp_proto, ), + self.udp_transport_target, oid, + lookupNames=True, + lookupValues=True, ) - ) + + if not error_detected and snmp_data[0][1]: + return str(snmp_data[0][1]) + return "" + elif SNMP_MODE == "v6_async": + return self._get_snmpv3_asyncwr(oid=oid) + else: + raise ValueError("SNMP mode must be set to 'legacy' or 'v6_async'") + + def _get_snmpv2c_asyncwr(self, oid: str) -> str: + """ + This is an asynchronous wrapper to call code in newer versions of the pysnmp library + (V6 and later). + """ + return asyncio.run(self._run_query(cmdgen.CommunityData(self.community), oid)) def _get_snmpv2c(self, oid: str) -> str: """ @@ -375,8 +417,23 @@ def _get_snmpv2c(self, oid: str) -> str: string : str The string as part of the value from the OID you are trying to retrieve. """ + if SNMP_MODE == "legacy": + cmd_gen = cmdgen.CommandGenerator() + (error_detected, error_status, error_index, snmp_data) = cmd_gen.getCmd( + cmdgen.CommunityData(self.community), + self.udp_transport_target, + oid, + lookupNames=True, + lookupValues=True, + ) + if not error_detected and snmp_data[0][1]: + return str(snmp_data[0][1]) + return "" - return asyncio.run(self._run_query(cmdgen.CommunityData(self.community), oid)) + elif SNMP_MODE == "v6_async": + return self._get_snmpv2c_asyncwr(oid=oid) + else: + raise ValueError("SNMP mode must be set to 'legacy' or 'v6_async'") def _get_snmp(self, oid: str) -> str: """Wrapper for generic SNMP call."""