Skip to content

Commit

Permalink
Merge pull request #341 from JeffersonLab/nbrei_examples_testing
Browse files Browse the repository at this point in the history
Fix examples and add testing to CI
  • Loading branch information
nathanwbrei committed Aug 19, 2024
2 parents ef43454 + 9f682f3 commit b65ac3c
Show file tree
Hide file tree
Showing 19 changed files with 193 additions and 49 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/ccpp-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,15 @@ jobs:
run: |
export LD_LIBRARY_PATH=/app/podio/install/lib:$LD_LIBRARY_PATH
ctest --test-dir build --output-on-failure -R jana-example-timeslices-complex-tests
- name: Janadot
run: |
export LD_LIBRARY_PATH=/app/podio/install/lib:$LD_LIBRARY_PATH
ctest --test-dir build --output-on-failure -R jana-plugin-janadot-tests
- name: Other examples
run: |
export LD_LIBRARY_PATH=/app/podio/install/lib:$LD_LIBRARY_PATH
ctest --test-dir build --output-on-failure -R "jana-example-dst-tests|jana-example-tutorial-tests|jana-example-eventgroup-tests|jana-example-unit-tests"
1 change: 0 additions & 1 deletion src/examples/EventGroupExample/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@ add_jana_plugin(EventGroupExample)
add_test(NAME jana-example-eventgroup-tests
COMMAND jana -Pplugins=EventGroupExample)

set_tests_properties(jana-example-eventgroup-tests PROPERTIES DISABLED TRUE)
10 changes: 5 additions & 5 deletions src/examples/EventGroupExample/GroupedEventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,23 @@ class GroupedEventProcessor : public JEventProcessor {
public:
GroupedEventProcessor() {
SetTypeName(NAME_OF_THIS);
SetCallbackStyle(CallbackStyle::ExpertMode);
SetCallbackStyle(CallbackStyle::LegacyMode);
}

void Process(const JEvent& event) override {
void Process(const std::shared_ptr<const JEvent>& event) override {

// In parallel, perform a random amount of (slow) computation
consume_cpu_ms(100, 1.0);

auto tridas_event = event.GetSingle<TridasEvent>();
auto tridas_event = event->GetSingle<TridasEvent>();
tridas_event->should_keep = true;

auto group = event.GetSingle<JEventGroup>();
auto group = event->GetSingle<JEventGroup>();

// Sequentially, process each event and report when a group finishes
std::lock_guard<std::mutex> lock(m_mutex);

LOG << "Processed group #" << group->GetGroupId() << ", event #" << event.GetEventNumber() << LOG_END;
LOG << "Processed group #" << group->GetGroupId() << ", event #" << event->GetEventNumber() << LOG_END;

bool finishes_group = group->FinishEvent();
if (finishes_group) {
Expand Down
2 changes: 0 additions & 2 deletions src/examples/MetadataExample/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,3 @@ add_jana_plugin(MetadataExample)
add_test(NAME jana-example-metadata-tests
COMMAND jana -Pplugins=MetadataExample)

set_tests_properties(jana-example-metadata-tests
PROPERTIES DISABLED TRUE)
12 changes: 6 additions & 6 deletions src/examples/MetadataExample/MetadataAggregator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

MetadataAggregator::MetadataAggregator() {
SetTypeName(NAME_OF_THIS); // Provide JANA with this class's name
SetCallbackStyle(CallbackStyle::ExpertMode);
SetCallbackStyle(CallbackStyle::LegacyMode);
}

void MetadataAggregator::Init() {
Expand All @@ -21,17 +21,17 @@ void MetadataAggregator::Init() {
LOG << "MetadataAggregator::Init" << LOG_END;
}

void MetadataAggregator::Process(const JEvent& event) {
LOG << "MetadataAggregator::Process, Run #" << event.GetRunNumber() << ", Event #" << event.GetEventNumber() << LOG_END;
void MetadataAggregator::Process(const std::shared_ptr<const JEvent>& event) {
LOG << "MetadataAggregator::Process, Run #" << event->GetRunNumber() << ", Event #" << event->GetEventNumber() << LOG_END;

// Acquire tracks in parallel
auto tracks = event.Get<Track>(m_track_factory);
auto tracks = event->Get<Track>(m_track_factory);

// Lock mutex, so we can update shared state sequentially
std::lock_guard<std::mutex>lock(m_mutex);

// Since the run number probably doesn't change too frequently we cache the last entry
int run_nr = event.GetRunNumber();
int run_nr = event->GetRunNumber();
if (run_nr != m_last_run_nr) {
m_last_run_nr = run_nr;
m_last_statistics = &m_statistics[m_last_run_nr]; // Get-or-create
Expand All @@ -40,7 +40,7 @@ void MetadataAggregator::Process(const JEvent& event) {
// Update the statistics accumulator using the metadata from this event
m_last_statistics->event_count += 1;
m_last_statistics->total_track_count += tracks.size();
m_last_statistics->total_latency_ns += event.GetMetadata<Track>(m_track_factory).elapsed_time_ns;
m_last_statistics->total_latency_ns += event->GetMetadata<Track>(m_track_factory).elapsed_time_ns;
}

void MetadataAggregator::Finish() {
Expand Down
2 changes: 1 addition & 1 deletion src/examples/MetadataExample/MetadataAggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class MetadataAggregator : public JEventProcessor {
virtual ~MetadataAggregator() = default;

void Init() override;
void Process(const JEvent&) override;
void Process(const std::shared_ptr<const JEvent>&) override;
void Finish() override;

};
Expand Down
3 changes: 1 addition & 2 deletions src/examples/Tutorial/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
add_jana_plugin(Tutorial)

add_test(NAME jana-example-tutorial-tests
COMMAND jana -Pplugins=Tutorial)
COMMAND jana -Pplugins=Tutorial -Pjana:nevents=50)

set_tests_properties(jana-example-tutorial-tests PROPERTIES DISABLED TRUE)

8 changes: 4 additions & 4 deletions src/examples/Tutorial/TutorialProcessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

TutorialProcessor::TutorialProcessor() {
SetTypeName(NAME_OF_THIS); // Provide JANA with this class's name
SetCallbackStyle(CallbackStyle::ExpertMode);
SetCallbackStyle(CallbackStyle::LegacyMode);
}

void TutorialProcessor::Init() {
Expand All @@ -21,12 +21,12 @@ void TutorialProcessor::Init() {
}
}

void TutorialProcessor::Process(const JEvent& event) {
LOG << "TutorialProcessor::Process, Event #" << event.GetEventNumber() << LOG_END;
void TutorialProcessor::Process(const std::shared_ptr<const JEvent>& event) {
LOG << "TutorialProcessor::Process, Event #" << event->GetEventNumber() << LOG_END;

/// Do everything we can in parallel
/// Warning: We are only allowed to use local variables and `event` here
auto hits = event.Get<Hit>();
auto hits = event->Get<Hit>();

/// Lock mutex
std::lock_guard<std::mutex>lock(m_mutex);
Expand Down
2 changes: 1 addition & 1 deletion src/examples/Tutorial/TutorialProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class TutorialProcessor : public JEventProcessor {
virtual ~TutorialProcessor() = default;

void Init() override;
void Process(const JEvent& event) override;
void Process(const std::shared_ptr<const JEvent>& event) override;
void Finish() override;

};
Expand Down
65 changes: 49 additions & 16 deletions src/libraries/JANA/JEventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,31 +47,39 @@ class JEventProcessor : public jana::omni::JComponent,

virtual void DoMap(const std::shared_ptr<const JEvent>& e) {

if (m_callback_style == CallbackStyle::LegacyMode) {
throw JException("Called DoMap() on a legacy-mode JEventProcessor");
}

for (auto* input : m_inputs) {
input->PrefetchCollection(*e);
}
// JExceptions with factory info will be furnished by the callee,
// so we don't need to try/catch here.

// Also we don't have
// a Preprocess(), so we don't technically need Init() here even

if (m_callback_style != CallbackStyle::DeclarativeMode) {
DoReduce(e); // This does all the locking!
}
}


virtual void DoReduce(const std::shared_ptr<const JEvent>& e) {
auto run_number = e->GetRunNumber();
virtual void DoTap(const std::shared_ptr<const JEvent>& e) {

if (m_callback_style == CallbackStyle::LegacyMode) {
throw JException("Called DoReduce() on a legacy-mode JEventProcessor");
}
std::lock_guard<std::mutex> lock(m_mutex);
// In principle DoReduce() is being called by one thread at a time, but we hold a lock anyway
// so that this runs correctly even if that isn't happening. This lock shouldn't experience
// any contention.

if (m_status == Status::Uninitialized) {
DoInitialize();
}
else if (m_status == Status::Finalized) {
throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
}
for (auto* input : m_inputs) {
// This collection should have already been computed during DoMap()
// We do this before ChangeRun() just in case we will need to pull data out of
// a begin-of-run event.
input->GetCollection(*e);
}
auto run_number = e->GetRunNumber();
if (m_last_run_number != run_number) {
if (m_last_run_number != -1) {
CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
Expand All @@ -82,9 +90,6 @@ class JEventProcessor : public jana::omni::JComponent,
m_last_run_number = run_number;
CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(e); });
}
for (auto* input : m_inputs) {
input->GetCollection(*e);
}
if (m_callback_style == CallbackStyle::DeclarativeMode) {
CallWithJExceptionWrapper("JEventProcessor::Process", [&](){
Process(e->GetRunNumber(), e->GetEventNumber(), e->GetEventIndex());
Expand All @@ -93,9 +98,37 @@ class JEventProcessor : public jana::omni::JComponent,
else if (m_callback_style == CallbackStyle::ExpertMode) {
CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(*e); });
}
else {
CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(e); });
m_event_count += 1;
}


virtual void DoLegacyProcess(const std::shared_ptr<const JEvent>& event) {

// DoLegacyProcess doesn't hold any locks, as it requires the user to hold a lock for it.
// Because of this,
if (m_callback_style != CallbackStyle::LegacyMode) {
throw JException("Called DoLegacyProcess() on a non-legacy-mode JEventProcessor");
}

auto run_number = event->GetRunNumber();

if (m_status == Status::Uninitialized) {
DoInitialize();
}
else if (m_status == Status::Finalized) {
throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()");
}
if (m_last_run_number != run_number) {
if (m_last_run_number != -1) {
CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); });
}
for (auto* resource : m_resources) {
resource->ChangeRun(event->GetRunNumber(), m_app);
}
m_last_run_number = run_number;
CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); });
}
CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); });
m_event_count += 1;
}

Expand Down
2 changes: 2 additions & 0 deletions src/libraries/JANA/Omni/JComponentFwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ struct JComponent {

virtual void Summarize(JComponentSummary&) const {};

CallbackStyle GetCallbackStyle() const { return m_callback_style; }

Status GetStatus() const {
std::lock_guard<std::mutex> lock(m_mutex);
return m_status;
Expand Down
15 changes: 14 additions & 1 deletion src/libraries/JANA/Topology/JEventMapArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Subject to the terms in the LICENSE file found in the top-level directory.


#include "JANA/JEventProcessor.h"
#include <JANA/Topology/JEventMapArrow.h>

#include <JANA/JEventSource.h>
Expand All @@ -28,8 +29,11 @@ void JEventMapArrow::add_unfolder(JEventUnfolder* unfolder) {
m_unfolders.push_back(unfolder);
}

void JEventMapArrow::add_processor(JEventProcessor* processor) {
m_procs.push_back(processor);
}

void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) {


LOG_DEBUG(m_logger) << "JEventMapArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END;
for (JEventSource* source : m_sources) {
Expand All @@ -40,6 +44,15 @@ void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status&
JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), unfolder->GetTypeName()); // times execution until this goes out of scope
unfolder->Preprocess(**event);
}
for (JEventProcessor* processor : m_procs) {
JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), processor->GetTypeName()); // times execution until this goes out of scope
if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) {
processor->DoLegacyProcess(*event);
}
else {
processor->DoMap(*event);
}
}
LOG_DEBUG(m_logger) << "JEventMapArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END;
success = true;
status = JArrowMetrics::Status::KeepGoing;
Expand Down
3 changes: 3 additions & 0 deletions src/libraries/JANA/Topology/JEventMapArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
class JEventPool;
class JEventSource;
class JEventUnfolder;
class JEventProcessor;
class JEvent;

using Event = std::shared_ptr<JEvent>;
Expand All @@ -18,12 +19,14 @@ class JEventMapArrow : public JPipelineArrow<JEventMapArrow, Event> {
private:
std::vector<JEventSource*> m_sources;
std::vector<JEventUnfolder*> m_unfolders;
std::vector<JEventProcessor*> m_procs;

public:
JEventMapArrow(std::string name, EventQueue *input_queue, EventQueue *output_queue);

void add_source(JEventSource* source);
void add_unfolder(JEventUnfolder* unfolder);
void add_processor(JEventProcessor* proc);

void process(Event* event, bool& success, JArrowMetrics::Status& status);

Expand Down
9 changes: 8 additions & 1 deletion src/libraries/JANA/Topology/JEventProcessorArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ void JEventProcessorArrow::process(Event* event, bool& success, JArrowMetrics::S
for (JEventProcessor* processor : m_processors) {
// TODO: Move me into JEventProcessor::DoMap
JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), processor->GetTypeName()); // times execution until this goes out of scope
processor->DoMap(*event);
if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) {
processor->DoLegacyProcess(*event);
}
else {
processor->DoMap(*event);
processor->DoTap(*event);

}
}
LOG_DEBUG(m_logger) << "JEventProcessorArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END;
success = true;
Expand Down
57 changes: 57 additions & 0 deletions src/libraries/JANA/Topology/JEventTapArrow.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2024, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.


#include <JANA/Topology/JEventTapArrow.h>
#include <JANA/Utils/JEventPool.h>
#include <JANA/JEventProcessor.h>
#include <JANA/JEventUnfolder.h>
#include <JANA/JEvent.h>


JEventTapArrow::JEventTapArrow(std::string name,
EventQueue *input_queue,
EventQueue *output_queue,
JEventPool *pool)
: JPipelineArrow(std::move(name),
false,
false,
false,
input_queue,
output_queue,
pool) {}

void JEventTapArrow::add_processor(JEventProcessor* proc) {
m_procs.push_back(proc);
}

void JEventTapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) {

LOG_DEBUG(m_logger) << "JEventTapArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END;
for (JEventProcessor* proc : m_procs) {
JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), proc->GetTypeName()); // times execution until this goes out of scope
if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) {
proc->DoTap(*event);
}
}
LOG_DEBUG(m_logger) << "JEventTapArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END;
success = true;
status = JArrowMetrics::Status::KeepGoing;
}

void JEventTapArrow::initialize() {
LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END;
for (auto processor : m_procs) {
processor->DoInitialize();
LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END;
}
}

void JEventTapArrow::finalize() {
LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END;
for (auto processor : m_procs) {
processor->DoFinalize();
LOG_INFO(m_logger) << "Finalized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END;
}
}

Loading

0 comments on commit b65ac3c

Please sign in to comment.