Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SONiC FM (Fault Mgmt) infrastructure -Base version #421

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions sonic-faultmgrd/scripts/fault_policy.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"chassis": [
{
"name": "Cisco-8102-C64",
"faults": [
{
"type" : "CUSTOM_EVPROFILE_CHANGE",
"severity" : "MAJOR",
"action" : ["syslog"]
},
{
"type" : "TEMPERATURE_EXCEEDED",
"severity" : "CRITICAL",
"action" : ["syslog", "obfl", "reload"]
},
{
"type": "FANS MISSING",
"severity": "CRITICAL",
"action" : ["syslog", "obfl", "shutdown"]
},
{
"type": "EVENTS_MONITORING",
"severity": "WARNING",
"action" : ["syslog"]
}
]
}
]
}
293 changes: 293 additions & 0 deletions sonic-faultmgrd/scripts/faultmgrd
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
#!/usr/bin/env python3

import os
import re
import yaml
import argparse
import json

import redis
from sonic_py_common import daemon_base, logger
from swsscommon import swsscommon

import sys
import syslog
import time
import threading
from enum import Enum
import subprocess
import uuid

#####################################
## Host microservice fault_manager ##
#####################################
# Identifier under which this service emits syslog messages
SYSLOG_IDENTIFIER = 'faultMgrd'
# Shared logger instance used by every routine in this module
helper_logger = logger.Logger(SYSLOG_IDENTIFIER)
# Timeout (milliseconds) for swsscommon.Select.select() so pending signals
# (e.g. SIGTERM) are not blocked indefinitely
SELECT_TIMEOUT = 1000
# Parsed contents of the fault-action policy JSON file; populated by
# faultManager.determine_fault_action_policies() at service start-up
json_data = {}

##############################################################################
# Purpose:
#########
# Have a generic Fault Management (FM) infrastructure which can get/fetch all
# live system events (e.g. faults) and process them against the fault-action
# policy file to determine the right set of system-level action(s) needed.
# Then, perform those action(s).
#
# SONiC system's present state:
##############################
# 1. FDR (Fault Detector and Reporter) can publish events (faults) via event
# framework's event_publish() based on specific sonic-events*.yang file.
# source location: /usr/local/yang-models/
# 2. In absence of eventDB at event-framework, events' consumer to listen to
# live streaming of events over ZMQ, and then parse them in real-time.
# In near future:
################
# a. #1 above would have a generic event SCHEMA yang file sonic-event.yang
# for FDRs to publish their events via event-framework
# b. #2 would have eventDB to capture live events. This FM service can then
# subscribe to eventDB to get all those live events and then parse them
# to take needed action(s)
# Plan (this workflow):
######################
# - Due to #2, fault_publisher (faultpubd) service populates events as
# EVENT_TABLE entry in host's STATE redisDB
# - FM subscribes to EVENT_TABLE of host's STATE DB to get the events.
# - Process them on SET and DEL EVENT_TABLE operations
# a) Parse them against sonic-event.yang file and formulate a local unique
# event (fault) entry for each notified EVENT_TABLE entry
# b) This entry contains entire fault payload (id, type, severity, cause,
# operation etc.)
# c) Parse the fault entry against the fault-action policy file to assess
# action(s) needed i.e. perform lookup for fault type and severity in
# policy file.
# d) Once a match is found, take the action(s) as specified by the match
#############################################################################

class faultManager():
    """
    Fault Management (FM) microservice.

    Subscribes to EVENT_TABLE updates in the host's STATE DB, matches each
    received fault entry against the fault-action policy JSON file and
    performs the policy-specified action(s) (syslog / obfl / reload / ...).
    """
    # Interval to run this microservice
    FM_INTERVAL = 15
    # Policy file defining fault policies and their respective actions
    FM_POLICY_FILE = '/usr/share/sonic/platform/fault_policy.json'

    # Connection handle to the STATE redis DB (set in analyze_db_events)
    state_db = None

    def determine_fault_action_policies(self):
        '''
        Load and cache the fault-action policy JSON file into the
        module-level json_data dictionary consumed by map_db_event_to_action().
        '''
        global json_data
        try:
            # Load, parse fault_policy json file
            helper_logger.log_notice("Populating data from JSON file:{}".format(json_data))
            # Prefer the installed platform policy file; fall back to a local
            # copy so the service still works from a development workspace.
            policy_file = self.FM_POLICY_FILE if os.path.exists(self.FM_POLICY_FILE) else './fault_policy.json'
            with open(policy_file, 'r') as f:
                json_data = json.load(f)
            helper_logger.log_notice("Populated data from JSON file:{}".format(json_data))
            for entry in json_data['chassis']:
                helper_logger.log_notice("chassis entry fetched: {}".format(entry))
                for fault_entry in entry['faults']:
                    helper_logger.log_notice("fault entry fetched: {}".format(fault_entry))
        # OSError: missing/unreadable file; ValueError: invalid JSON
        # (json.JSONDecodeError subclasses it); KeyError: malformed layout.
        except (OSError, ValueError, KeyError) as exc:
            helper_logger.log_notice('issue with opening JSON policy file or parsing its contents: {}'.format(exc))

    def __init__(self, id):
        """
        Initializer of faultManager microservice(process)

        :param id: syslog identifier of this service (currently unused here)
        """
        super(faultManager, self).__init__()

        self.stop_event = threading.Event()

        # Load the fault-action policies once at start-up
        self.determine_fault_action_policies()

    def stop_fault_manager_algorithm(self):
        '''
        Stop fault manager algorithm
        '''
        self._algorithm_running = False

    def stop(self):
        '''
        Stop fault manager instance (algorithm first, then the service flag)
        '''
        self.stop_fault_manager_algorithm()
        self._running = False

    def deinitialize(self):
        '''
        Destroy fault manager instance and drop the STATE DB handle
        '''
        self.stop()
        self.state_db = None

    def map_dict_fvm(self, s, d):
        '''
        Copy every key/value pair from source mapping s into dict d.
        '''
        d.update(s)

    def analyze_db_events(self, event_obj, cnt):
        '''
        Listen for EVENT_TABLE updates in the STATE DB and process them.

        Each SET operation is cached and mapped against the fault-action
        policies; each DEL operation evicts the cached entry.

        :param event_obj: threading.Event owned by the spawning thread
        :param cnt: count of events to receive (currently unused)
        '''
        global json_data

        # Connect to redis state DB
        self.state_db = daemon_base.db_connect("STATE_DB")
        # Subscribe to EVENT_TABLE notifications in state DB
        sel = swsscommon.Select()
        sst = swsscommon.SubscriberStateTable(self.state_db, 'EVENT_TABLE')
        sst.db_name = self.state_db
        sst.table_name = 'EVENT_TABLE'
        sel.addSelectable(sst)

        # Listen indefinitely for changes to the EVENT_TABLE in state DB
        event_entry_cache = {}
        while True:
            # Use timeout to prevent ignoring the signals to be handled here
            # in signal handler() (e.g. SIGTERM for graceful shutdown)
            (state, c) = sel.select(SELECT_TIMEOUT)

            if state == swsscommon.Select.TIMEOUT:
                # Do not flood log when select times out
                continue
            if state != swsscommon.Select.OBJECT:
                helper_logger.log_notice("sel.select() did not return swsscommon.Select.OBJECT")
                continue

            # pop the data updates
            (key, op, fvp) = sst.pop()
            if not key:
                # Selectable fired but no entry was pending: stop listening.
                break
            fvp = dict(fvp) if fvp is not None else {}
            helper_logger.log_notice("$$$ {} handle_event_table_updates() : op={} DB:{} Table:{} fvp {}".format(
                key, op, sst.db_name, sst.table_name, fvp))

            if 'id' not in fvp:
                helper_logger.log_notice("alert - id field not found in received event_table entry!, setting to key in EVENT_TABLE|{}".format(key))
                fvp['id'] = key
            helper_logger.log_notice("event id:fvp[{}]".format(fvp['id']))
            fvp['op'] = op
            if op == swsscommon.SET_COMMAND:
                # save the received event_table entry, along with DB operation
                event_entry_cache[key] = fvp
                helper_logger.log_notice("event_entry_cache[{}]".format(event_entry_cache[key]))
                helper_logger.log_notice("event_entry_cache after adding the new event_table entry with key:{} {}".format((key), event_entry_cache))
                helper_logger.log_notice('Map state DB EVENT_TABLE|{} fault entry to FM action...'.format(key))
                self.map_db_event_to_action(key, event_entry_cache, json_data)
            elif op == swsscommon.DEL_COMMAND:
                # remove the received event_table entry from the local cache;
                # pop() tolerates a DEL for a key that was never cached
                event_entry_cache.pop(key, None)
                helper_logger.log_notice("event_entry_cache after del of tuple with key:{} {}".format((key), event_entry_cache))

    def map_db_event_to_action(self, key, event_entry_cache, json_data):
        '''
        Retrieve fault data from the cached event entry and parse it against
        the fault-action policies from the fault_policy JSON file; execute
        the action(s) of the first matching policy entry.

        :param key: EVENT_TABLE key of the event being processed
        :param event_entry_cache: cache of received event entries, keyed by key
        :param json_data: parsed fault-action policy dictionary
        '''
        try:
            helper_logger.log_notice("Parse event_entry with key:{} {}".format(key, event_entry_cache[key]))
            event_entry = event_entry_cache[key]
            helper_logger.log_notice("Populated event_entry with key:{} {}".format(key, event_entry))

            # Perform lookup for the received event(fault) match in the
            # fault-action policy dictionary derived from the policy JSON:
            # a match requires both fault type and severity to agree.
            match = False
            for entry in json_data['chassis']:
                for fault_entry in entry['faults']:
                    if ((fault_entry['type'] == event_entry['type-id']) and (fault_entry['severity'] == event_entry['severity'])):
                        match = True
                        helper_logger.log_notice("Keys matched at fault_seq_id: {}".format(fault_entry))
                        helper_logger.log_notice("Fault type:{}, cause:{}, severity:{}, occured@:{}"
                                                 .format(event_entry['type-id'], event_entry['text'], event_entry['severity'], event_entry['time-created']))
                        helper_logger.log_notice("Action(s) to be taken: {}".format(fault_entry['action']))
                        for action in fault_entry['action']:
                            # 'syslog' needs no further handling: the fault is
                            # already logged above
                            if (action == 'obfl'):
                                helper_logger.log_notice("log to OBFL: Fault type:{}, cause:{}, severity:{}, occured@:{}"
                                                         .format(event_entry['type-id'], event_entry['text'], event_entry['severity'], event_entry['time-created']))
                                # TBA (To Be Added) once obfl filesystem support is in place
                            # The policy file spells this action 'reload';
                            # accept the legacy 'reboot' spelling as well.
                            if (action in ('reload', 'reboot')):
                                helper_logger.log_notice("Initiate system reboot with cause: {}".format(event_entry['text']))
                                # NOTE(review): shell interpolation of the fault
                                # text; consider subprocess.run([...]) if the
                                # text can contain shell metacharacters.
                                os.system('reboot -r "{}"'.format(event_entry['text']))
                                helper_logger.log_notice("Due to system reboot action request, shutting down FM service")
                                self.deinitialize()
                                sys.exit(1)
                            # TODO: 'shutdown' action from the policy file is
                            # not handled yet.
                        break
            if not match:
                helper_logger.log_notice("No match found in fault-policy JSON file for the received fault!")
        # KeyError: mandatory fault field (type-id/severity/text/...) missing
        except KeyError as exc:
            helper_logger.log_notice('missing field while processing fault event: {}'.format(exc))
        except redis.exceptions.ConnectionError as exc:
            helper_logger.log_notice('state DB currently unavailable: {}'.format(exc))

    def map_event_to_action(self, cnt):
        '''
        Spawn the events' consumer thread that analyzes EVENT_TABLE updates.

        :param cnt: count of events to receive, forwarded to the consumer
        '''
        # Initialising event consumer object
        event_consume = threading.Event()

        # Start events' consumer thread to consume events from eventDB
        thread_consume = threading.Thread(target=self.analyze_db_events, args=(event_consume, cnt))
        thread_consume.start()
        helper_logger.log_notice("analyze_db_events thread started")
        event_consume.wait(1)
        event_consume.clear()
        helper_logger.log_notice("event_consume clear call is through")

    def start_fault_manager_algorithm(self, cnt):
        '''
        Start fault management algorithm

        :param cnt: count of events to receive
        '''
        self._algorithm_running = True
        helper_logger.log_notice("Entered start_fault_manager_algorithm...")
        try:
            helper_logger.log_notice("Initiating FM algorithm sub-tasks")
            helper_logger.log_notice("Task1: spawning map_event_to_action THREAD")
            self.map_event_to_action(cnt)
            helper_logger.log_notice("Task2: main THREAD returned to its NORMAL course")
        except:
            # Flag the algorithm as stopped, then re-raise to the caller
            self.stop_fault_manager_algorithm()
            raise

    # primary logic to run fault management service
    def run(self, cnt):
        """
        Run main logic of this fault management service

        :param cnt: count of events to receive
        :return: True on success, False when an exception was caught
        """
        try:
            helper_logger.log_notice("FM start_fault_manager_algorithm starting up...")
            self.start_fault_manager_algorithm(cnt)
            return True
        except Exception as e:
            helper_logger.log_error('Caught exception while executing FM run_policy() - {}'.format(repr(e)))
            return False

def main():
    """Entry point: parse CLI arguments and run the FM service loop."""
    helper_logger.log_notice("FM (Fault Management) service starting up...")

    arg_parser = argparse.ArgumentParser(
        description="Check events published, receive and parse them")
    arg_parser.add_argument('-n', "--cnt", default=0, type=int,
                            help="count of events to receive")
    parsed_args = arg_parser.parse_args()

    # Build the fault manager service object
    fault_mgr = faultManager(SYSLOG_IDENTIFIER)

    # A False return means run() caught an exception; tear down and exit
    if fault_mgr.run(parsed_args.cnt):
        return
    helper_logger.log_notice("Shutting down FM service with exit code ...")
    fault_mgr.deinitialize()
    helper_logger.log_notice("FM service exiting")
    sys.exit(1)

if __name__ == '__main__':
    main()
16 changes: 16 additions & 0 deletions sonic-faultmgrd/scripts/sonic-host-services-data.faultmgrd.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[Unit]
Description=Fault Manager daemon
Requires=updategraph.service
After=updategraph.service
BindsTo=sonic.target
After=sonic.target
After=database-chassis.service

[Service]
Type=simple
ExecStart=/usr/local/bin/faultmgrd
Restart=always
RestartSec=30

[Install]
WantedBy=sonic.target
11 changes: 11 additions & 0 deletions sonic-faultmgrd/scripts/sonic-host-services-data.faultmgrd.timer
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[Unit]
Description=Delays faultmgrd daemon until SONiC, other services have started
PartOf=faultmgrd.service

[Timer]
OnUnitActiveSec=0 sec
OnBootSec=1min 30 sec
Unit=faultmgrd.service

[Install]
WantedBy=timers.target sonic.target
Loading