How to make a processor

Processors

4CAT is a modular tool. Its modules come in two varieties: data sources and processors. This article covers the latter.

Processors are bits of code that produce a dataset. Typically, their input is another dataset. As such they can be used to analyse data; for example, a processor can take a CSV file containing posts as input, count how many posts occur per month, and produce another CSV file with the number of posts per month (one month per row) as output. Processors always produce the following things:

  • A set of metadata for the Dataset the processor will produce. This is stored in 4CAT's PostgreSQL database. The database record is created when the processor's job is first queued, and is updated by the processor as it runs.
  • A result file, which may have an arbitrary format. This file contains whatever the processor produces, e.g. a list of frequencies, an image wall or a zip archive containing word embedding models.
  • A log file, with the same file name as the result file but with a '.log' extension. This documents any output from the processor while it was producing the result file.

4CAT has an API that does most of this scaffolding for you, so processors can be quite lightweight and focus mostly on the analysis, while 4CAT's back-end takes care of scheduling, determining where the output should go, et cetera.

Example

This is a minimal, annotated example of a 4CAT processor:

"""
A minimal example 4CAT processor
"""
from backend.abstract.processor import BasicProcessor

class ExampleProcessor(BasicProcessor):
	"""
	Example Processor
	"""
	type = "example-processor"  # job type ID
	category = "Examples" # category
	title = "A simple example"  # title displayed in UI
	description = "This doesn't do much"  # description displayed in UI
	extension = "csv"  # extension of result file, used internally and in UI

	input = "csv:body"
	output = "csv:value"

	def process(self):
		"""
		Saves a CSV file with one column ("value") and one row with a value ("Hello
		world") and marks the dataset as finished.
		"""
		data = {"value": "Hello world!"}
		self.write_csv_items_and_finish(data)

Processor properties

Processor settings and metadata are stored as class properties. The following properties are available and can be set:

  • type (string) - A unique identifier for this processor type. Used internally to queue jobs, store results, et cetera. Should be a string without any spaces in it.
  • category (string) - A category for this processor, used in the 4CAT web interface to group processors thematically. Displayed as-is in the web interface.
  • title (string) - Processor title, displayed as-is in the web interface.
  • description (string) - Description, displayed in the web interface. Markdown in the description will be parsed.
  • extension (string) - Extension for files produced by this processor, e.g. csv or ndjson.
  • options (dictionary) - A dictionary of configurable input fields for this processor. See the page on input fields for more details, and the sketch after this list for an illustration.
  • interrupted (bool) - False by default. This may be set to True by 4CAT's scheduler. Once it is True, the processor should return as soon as possible, even if processing is not complete yet. In that case, any partial result should be discarded and the dataset status updated to reflect that processing was interrupted. Since abort procedures differ per processor, this is the processor's responsibility. The simplest way of addressing this is raising a ProcessorInterruptedException (found in backend.lib.exceptions); this will gracefully stop the processor and clean up so the job may be attempted again later.
  • followups (list) - Optional. A list of processor type IDs that can be listed in the interface as being possible to run on the result of this processor. This is purely informative to the user - actual compatibility is determined by the is_compatible_with method (see below).
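
As an illustration of the options property, a configuration for a hypothetical post-counting processor might look like the sketch below. The option names and the "type" values used here are assumptions made for the example; the page on input fields documents the formats that 4CAT actually supports.

options = {
	"timeframe": {
		"type": "choice",  # assumed type identifier, see the input fields page
		"options": {"day": "Day", "month": "Month", "year": "Year"},
		"default": "month",
		"help": "Count posts per"
	},
	"include-empty": {
		"type": "toggle",  # assumed type identifier
		"default": False,
		"help": "Include periods without any posts"
	}
}

Whatever the user selects for these options is available inside process() through the processor's parameters dictionary (see the Processor API below).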

Processor API

The full API available to processors is as follows. All of these are members of the processor object, i.e. they are accessed via self.property within the processor code. While other methods or classes may be available within processors, relying on them is discouraged and unsupported; when possible, use only the API documented here.

Methods

  • process() -> void: This method should contain the processor's logic. Other methods may be defined, but this one will be called when an analysis with this processor is queued.
  • iterate_archive_contents(Path source_file) -> Generator: This yields Path objects for each file in the given archive. Files are temporarily uncompressed so they can be worked with, and then the temporary file is deleted afterwards.
  • write_csv_items_and_finish(list data) -> void: Writes a list of dictionaries as a CSV as the result of this processor, and finishes processing. Raises a ProcessorInterruptedException if the processor has been interrupted while writing.
  • write_archive_and_finish(list|Path files) -> void: Compresses all files in list files, or all files in folder files, into a zip archive and saves that as the dataset result. Files are deleted after adding them, and if files is a folder the folder is deleted too. Useful when processor output cannot be conveniently saved into one file.
  • map_item(item) -> dict: Optional. If defined, any item yielded by DataSet.iterate_items() (see below) for datasets produced by this processor will be passed through this method first, and the result of this method will be passed instead. This is especially useful for datasets that are stored as NDJSON, as items may be stored as nested objects with arbitrary depth in that format, and map_item() can be used to reduce them to 'flat' dictionaries before processing.
  • is_compatible_with(DataSet) -> bool: Optional. If defined, this method should be decorated as a @classmethod and take cls as its first argument. For every dataset, this method is then used to determine if the processor can run on that dataset. If the processor does not define this method, it will be available for any 'top-level' dataset (i.e. any dataset created via 'Create Dataset'). A sketch of this method and of map_item() follows this list.
  • get_options(DataSet) -> dict: Optional. If defined, this method should be decorated as a @classmethod and take cls as its first argument. The method should then return a processor options definition (see below) that is compatible with the given dataset. If this method is not defined, the (static) options attribute of the processor class is used instead.
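
To make the two optional methods more concrete, the sketch below shows one possible shape for is_compatible_with() and map_item(). The dataset attribute inspected (type), the data source ID it is compared to, and the item fields being mapped are assumptions made for the example; map_item() is also written as a static method here, which may not match the convention in your 4CAT version, so check existing processors before copying this.

from backend.abstract.processor import BasicProcessor

class ExampleNdjsonProcessor(BasicProcessor):
	"""
	Sketch of a processor that only runs on one (hypothetical) data source
	"""
	type = "example-ndjson-processor"
	category = "Examples"
	title = "Another example"
	description = "Illustrates is_compatible_with() and map_item()"
	extension = "csv"

	@classmethod
	def is_compatible_with(cls, dataset):
		# Hypothetical compatibility check: only offer this processor for
		# datasets produced by a made-up data source with this type ID
		return dataset.type == "example-datasource-search"

	@staticmethod
	def map_item(item):
		# Reduce a (hypothetical) nested NDJSON item to a flat dictionary,
		# so later processing can treat it like a CSV row
		return {
			"id": item.get("id", ""),
			"body": item.get("text", ""),
			"author": item.get("author", {}).get("name", "")
		}

	def process(self):
		# analysis would go here, as in the earlier example
		self.write_csv_items_and_finish([{"value": "Hello world!"}])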

Properties

  • dataset (DataSet): The dataset produced by this processor
    • dataset.finish(int num_items_in_result) -> void: Manually mark the dataset as finished. Either this method or one of the write_..._and_finish() methods (see above) should always be called at the end of process(). If num_items_in_result is 0 or less, no download link is displayed for the result in the web interface. If it is 0, "No results" is additionally displayed in the interface.
    • dataset.get_result_path() -> pathlib.Path: Returns a Path object referencing the location the processor result should be saved to.
    • dataset.update_status(str status, bool is_final=False) -> void: Updates the dataset status with the given message, which is shown to the user through the web interface and can be used to indicate processor progress. Note that this updates the database, which is relatively expensive, so you should not call it too often (for example, not every iteration of a loop, but only every 500 iterations). If is_final is set, subsequent calls to this method have no effect.
    • dataset.iterate_items(BasicProcessor) -> Generator: This yields one item from the dataset's data file per call and raises a ProcessorInterruptedException if the processor has been interrupted while iterating. Can be used on datasets that are CSV or NDJSON files. If possible, you should always use this method instead of interacting with the source file directly. If the map_item() method (see above) is defined for the processor that produced the dataset, the result of calling that method on each item is yielded instead. The sketch after this list shows this method being used to iterate over a processor's input dataset.
    • dataset.get_staging_area() -> pathlib.Path: Returns a Path object referencing a newly created folder that can be used as a staging area; files can be stored here while the processor is active. After the processor finishes, files in this folder may be deleted and cannot be relied on. You should nevertheless clean up the staging area within the processor yourself: delete the path (e.g. with shutil.rmtree()) once it is no longer needed, or pass it to write_archive_and_finish(), which will also delete it.
  • parent (DataSet): The Dataset used as input by this processor
  • source_file (pathlib.Path): The file to be processed as input, i.e. the data file corresponding to the parent Dataset
  • parameters (dict): Options set by the user for this processor
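
Putting these pieces together, a more realistic process() implementation might look like the sketch below (shown as a single method that would sit inside a processor class). It counts posts per month in the input dataset; the "timestamp" field and its "YYYY-MM-DD ..." format are assumptions made for the example, not a guarantee about real datasets.

def process(self):
	"""
	Count posts per month in the input dataset and save the counts as a CSV
	"""
	# ProcessorInterruptedException lives in backend.lib.exceptions; in a real
	# processor, import it at the top of the file instead
	from backend.lib.exceptions import ProcessorInterruptedException

	counts = {}
	processed = 0

	# self.parent is the input dataset; iterate_items() yields one item per
	# call and applies map_item() where the originating processor defines it
	for item in self.parent.iterate_items(self):
		# stop gracefully if 4CAT's scheduler has asked the processor to abort
		# (iterate_items() performs a similar check itself; it is repeated here
		# to illustrate the interrupted flag)
		if self.interrupted:
			raise ProcessorInterruptedException("Interrupted while counting posts")

		# assumes a "timestamp" value starting with "YYYY-MM", e.g. "2022-01-25"
		month = str(item.get("timestamp", ""))[:7]
		counts[month] = counts.get(month, 0) + 1

		processed += 1
		if processed % 500 == 0:
			# status updates write to the database, so do not send one per item
			self.dataset.update_status("Counted %i posts so far" % processed)

	# one dictionary per CSV row; write_csv_items_and_finish() also marks the
	# dataset as finished
	rows = [{"month": month, "posts": counts[month]} for month in sorted(counts)]
	self.write_csv_items_and_finish(rows)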