Changed analysis plugins to directly read from storage #3439
joachimmetz committed Jan 17, 2021
1 parent a35c896 commit 697af9a
Showing 8 changed files with 196 additions and 177 deletions.
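In outline, this commit stops the analysis worker from consuming events one at a time off a multiprocessing queue; the analysis plugin now reads events directly from the event store through a storage reader. The following is a minimal, self-contained sketch of that before/after shape — the classes are illustrative stand-ins, not the plaso API:

    class QueueDrivenWorker(object):
      """Before: a worker process pops events off a queue and feeds the plugin."""

      def __init__(self, plugin, event_queue):
        self._plugin = plugin
        self._event_queue = event_queue

      def Run(self, mediator):
        while True:
          item = self._event_queue.get()
          if item is None:  # stand-in sentinel for a closed queue
            break
          self._plugin.ExamineEvent(mediator, *item)

    class StoreDrivenPlugin(object):
      """After: the plugin iterates the event store directly."""

      def ExamineEvent(self, mediator, event, event_data, event_data_stream):
        pass  # analysis logic goes here

      def ProcessEventStore(self, mediator, storage_reader, event_filter=None):
        # The real method (see interface.py below) also resolves event data,
        # event data streams and event tags, and applies the event filter.
        for event in storage_reader.GetSortedEvents():
          self.ExamineEvent(mediator, event, None, None)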
69 changes: 68 additions & 1 deletion plaso/analysis/interface.py
@@ -11,10 +11,19 @@
from plaso.containers import events
from plaso.containers import reports
from plaso.lib import definitions
from plaso.storage import event_tag_index


class AnalysisPlugin(object):
"""Class that defines the analysis plugin interface."""
"""Class that defines the analysis plugin interface.
Attributes:
number_of_consumed_events (int): number of events consumed by the analysis
plugin.
number_of_filtered_events (int): number of events filtered by the event
filter during analysis.
plugin_type (str): analysis plugin type.
"""

# The name of the plugin. This is the name that is matched against when
# loading plugins, so it is important that this name is short, concise and
@@ -25,6 +34,10 @@ def __init__(self):
"""Initializes an analysis plugin."""
super(AnalysisPlugin, self).__init__()
self._analysis_counter = collections.Counter()
self._event_tag_index = event_tag_index.EventTagIndex()

self.number_of_consumed_events = 0
self.number_of_filtered_events = 0
self.plugin_type = analysis_definitions.PLUGIN_TYPE_REPORT

@property
@@ -90,3 +103,57 @@ def ExamineEvent(self, mediator, event, event_data, event_data_stream):
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
"""

def ProcessEventStore(self, mediator, storage_reader, event_filter=None):
"""Analyzes an event store.
Args:
mediator (AnalysisMediator): mediates interactions between
analysis plugins and other components, such as storage and dfvfs.
storage_reader (StorageReader): storage reader.
event_filter (Optional[EventObjectFilter]): event filter.
"""
# TODO: determine if filter_limit makes sense for analysis plugins or
# whether it should be removed.
filter_limit = getattr(event_filter, 'limit', None)

# TODO: test if GetEvents is faster for analysis plugins that do not
# require the events to be in chronological order.
# if event_filter:
# event_generator = storage_reader.GetSortedEvents()
# else:
# event_generator = storage_reader.GetEvents()

for event in storage_reader.GetSortedEvents():
if mediator.abort:
break

event_data_identifier = event.GetEventDataIdentifier()
event_data = storage_reader.GetEventDataByIdentifier(
event_data_identifier)

event_data_stream_identifier = event_data.GetEventDataStreamIdentifier()
event_data_stream = None
if event_data_stream_identifier:
event_data_stream = storage_reader.GetEventDataStreamByIdentifier(
event_data_stream_identifier)

event_identifier = event.GetIdentifier()
event_tag = self._event_tag_index.GetEventTagByIdentifier(
storage_reader, event_identifier)

filter_match = None
if event_filter:
filter_match = event_filter.Match(
event, event_data, event_data_stream, event_tag)

# pylint: disable=singleton-comparison
if filter_match == False:
self.number_of_filtered_events += 1
continue

self.ExamineEvent(mediator, event, event_data, event_data_stream)
self.number_of_consumed_events += 1

if filter_limit and filter_limit == self.number_of_consumed_events:
break
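A sketch of how a caller might drive the new method, following the pattern used in analysis_process.py below: compile an optional filter expression, open a read-only view of the store, and hand both to the plugin. The helper function is hypothetical and error handling is omitted; the EventObjectFilter, CreateStorageReader and counter attributes are taken from this diff.

    from plaso.filters import event_filter

    def RunAnalysisPlugin(plugin, mediator, storage_writer, expression=None):
      """Runs an analysis plugin over a written event store (sketch)."""
      filter_object = None
      if expression:
        # EventObjectFilter compiles a plaso filter expression string.
        filter_object = event_filter.EventObjectFilter()
        filter_object.CompileFilter(expression)

      with storage_writer.CreateStorageReader() as storage_reader:
        plugin.ProcessEventStore(
            mediator, storage_reader, event_filter=filter_object)

      # The plugin now tracks its own counters (see interface.py above).
      return (plugin.number_of_consumed_events,
              plugin.number_of_filtered_events)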
3 changes: 1 addition & 2 deletions plaso/cli/psort_tool.py
@@ -70,8 +70,8 @@ def __init__(self, input_reader=None, output_writer=None):
self._analysis_plugins_output_format = None
self._command_line_arguments = None
self._deduplicate_events = True
self._event_filter_expression = None
self._event_filter = None
self._event_filter_expression = None
self._knowledge_base = knowledge_base.KnowledgeBase()
self._number_of_analysis_reports = 0
self._output_time_zone = None
@@ -567,7 +567,6 @@ def ProcessStorage(self):
analysis_engine.AnalyzeEvents(
self._knowledge_base, storage_writer, self._data_location,
self._analysis_plugins, configuration,
event_filter=self._event_filter,
event_filter_expression=self._event_filter_expression,
status_update_callback=status_update_callback)

96 changes: 38 additions & 58 deletions plaso/multi_processing/analysis_process.py
@@ -6,6 +6,7 @@
from plaso.analysis import mediator as analysis_mediator
from plaso.containers import tasks
from plaso.engine import plaso_queue
from plaso.filters import event_filter
from plaso.lib import definitions
from plaso.lib import errors
from plaso.multi_processing import base_process
@@ -49,7 +50,6 @@ def __init__(
self._event_queue = event_queue
self._foreman_status_wait_event = None
self._knowledge_base = knowledge_base
self._number_of_consumed_events = 0
self._status = definitions.STATUS_INDICATOR_INITIALIZED
self._storage_writer = storage_writer
self._task = None
@@ -60,6 +60,11 @@ def _GetStatus(self):
Returns:
dict[str, object]: status attributes, indexed by name.
"""
number_of_consumed_events = 0
if self._analysis_plugin:
number_of_consumed_events = (
self._analysis_plugin.number_of_consumed_events)

if self._analysis_mediator:
number_of_produced_event_tags = (
self._analysis_mediator.number_of_produced_event_tags)
@@ -85,7 +90,7 @@ def _GetStatus(self):
'display_name': '',
'identifier': self._name,
'number_of_consumed_event_tags': None,
'number_of_consumed_events': self._number_of_consumed_events,
'number_of_consumed_events': number_of_consumed_events,
'number_of_consumed_reports': None,
'number_of_consumed_sources': None,
'number_of_consumed_warnings': None,
@@ -98,11 +103,6 @@ def _GetStatus(self):
'task_identifier': None,
'used_memory': used_memory}

if self._status in (
definitions.STATUS_INDICATOR_ABORTED,
definitions.STATUS_INDICATOR_COMPLETED):
self._foreman_status_wait_event.set()

return status

def _Main(self):
@@ -149,30 +149,14 @@ def _Main(self):
storage_writer.WriteTaskStart()

try:
logger.debug(
'{0!s} (PID: {1:d}) started monitoring event queue.'.format(
self._name, self._pid))

while not self._abort:
try:
queued_object = self._event_queue.PopItem()
filter_object = None
if self._event_filter_expression:
filter_object = event_filter.EventObjectFilter()
filter_object.CompileFilter(self._event_filter_expression)

except (errors.QueueClose, errors.QueueEmpty) as exception:
logger.debug('ConsumeItems exiting with exception {0!s}.'.format(
type(exception)))
break

if isinstance(queued_object, plaso_queue.QueueAbort):
logger.debug('ConsumeItems exiting, dequeued QueueAbort object.')
break

self._ProcessEvent(self._analysis_mediator, *queued_object)

self._number_of_consumed_events += 1

logger.debug(
'{0!s} (PID: {1:d}) stopped monitoring event queue.'.format(
self._name, self._pid))
with self._storage_writer.CreateStorageReader() as storage_reader:
self._analysis_plugin.ProcessEventStore(
self._analysis_mediator, storage_reader, event_filter=filter_object)

if not self._abort:
self._status = definitions.STATUS_INDICATOR_REPORTING
@@ -182,6 +166,7 @@ def _Main(self):
# All exceptions need to be caught here to prevent the process
# from being killed by an uncaught exception.
except Exception as exception: # pylint: disable=broad-except
# TODO: write analysis error and change logger to debug only.
logger.warning(
'Unhandled exception in process: {0!s} (PID: {1:d}).'.format(
self._name, self._pid))
@@ -206,23 +191,38 @@ def _Main(self):
logger.warning('Unable to finalize task storage with error: {0!s}'.format(
exception))

if self._serializers_profiler:
self._storage_writer.SetSerializersProfiler(None)

if self._storage_profiler:
self._storage_writer.SetStorageProfiler(None)

self._StopProfiling()

if self._abort:
self._status = definitions.STATUS_INDICATOR_ABORTED
else:
self._status = definitions.STATUS_INDICATOR_COMPLETED

self._foreman_status_wait_event.wait(self._FOREMAN_STATUS_WAIT)
while not self._abort:
try:
queued_object = self._event_queue.PopItem()

logger.debug('Analysis plugin: {0!s} (PID: {1:d}) stopped'.format(
self._name, self._pid))
except (errors.QueueClose, errors.QueueEmpty) as exception:
logger.debug('ConsumeItems exiting with exception {0!s}.'.format(
type(exception)))
break

if self._serializers_profiler:
self._storage_writer.SetSerializersProfiler(None)
if isinstance(queued_object, plaso_queue.QueueAbort):
logger.debug('ConsumeItems exiting, dequeued QueueAbort object.')
break

if self._storage_profiler:
self._storage_writer.SetStorageProfiler(None)
# TODO: is this wait event still needed?
self._foreman_status_wait_event.set()
self._foreman_status_wait_event.wait(self._FOREMAN_STATUS_WAIT)

self._StopProfiling()
logger.debug('Analysis plugin: {0!s} (PID: {1:d}) stopped'.format(
self._name, self._pid))

self._analysis_mediator = None
self._foreman_status_wait_event = None
@@ -234,26 +234,6 @@ def _Main(self):
except errors.QueueAlreadyClosed:
logger.error('Queue for {0:s} was already closed.'.format(self.name))

def _ProcessEvent(self, mediator, event, event_data, event_data_stream):
"""Processes an event.
Args:
mediator (AnalysisMediator): mediates interactions between
analysis plugins and other components, such as storage and dfvfs.
event (EventObject): event.
event_data (EventData): event data.
event_data_stream (EventDataStream): event data stream.
"""
try:
self._analysis_plugin.ExamineEvent(
mediator, event, event_data, event_data_stream)

except Exception as exception: # pylint: disable=broad-except
# TODO: write analysis error and change logger to debug only.

logger.warning('Unhandled exception while processing event object.')
logger.exception(exception)

def SignalAbort(self):
"""Signals the process to abort."""
self._abort = True
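Because the rendered diff above interleaves removed and added lines, the new control flow of _Main can be hard to follow. A condensed sketch of the order of operations after this commit — names mirror analysis_process.py, while setup, profiling and error handling are stubbed out:

    def _Main(self):
      # 1. Compile the optional event filter up front.
      filter_object = None
      if self._event_filter_expression:
        filter_object = event_filter.EventObjectFilter()
        filter_object.CompileFilter(self._event_filter_expression)

      # 2. Run the plugin over the written event store via a storage reader.
      with self._storage_writer.CreateStorageReader() as storage_reader:
        self._analysis_plugin.ProcessEventStore(
            self._analysis_mediator, storage_reader,
            event_filter=filter_object)

      # 3. Drain the event queue; its items are no longer used for analysis,
      #    but draining keeps the foreman from blocking on a full queue.
      while not self._abort:
        try:
          queued_object = self._event_queue.PopItem()
        except (errors.QueueClose, errors.QueueEmpty):
          break
        if isinstance(queued_object, plaso_queue.QueueAbort):
          break

      # 4. Signal and wait on the foreman status event, then clean up.
      self._foreman_status_wait_event.set()
      self._foreman_status_wait_event.wait(self._FOREMAN_STATUS_WAIT)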
(Diffs for the remaining 5 changed files are not shown.)
