diff --git a/.github/workflows/ccpp-docker.yml b/.github/workflows/ccpp-docker.yml index 716092016..1498e2fc2 100644 --- a/.github/workflows/ccpp-docker.yml +++ b/.github/workflows/ccpp-docker.yml @@ -57,5 +57,15 @@ jobs: run: | export LD_LIBRARY_PATH=/app/podio/install/lib:$LD_LIBRARY_PATH ctest --test-dir build --output-on-failure -R jana-example-timeslices-complex-tests + - name: Janadot + run: | + export LD_LIBRARY_PATH=/app/podio/install/lib:$LD_LIBRARY_PATH + ctest --test-dir build --output-on-failure -R jana-plugin-janadot-tests + - name: Other examples + run: | + export LD_LIBRARY_PATH=/app/podio/install/lib:$LD_LIBRARY_PATH + ctest --test-dir build --output-on-failure -R "jana-example-dst-tests|jana-example-tutorial-tests|jana-example-eventgroup-tests|jana-example-unit-tests" + + diff --git a/src/examples/EventGroupExample/CMakeLists.txt b/src/examples/EventGroupExample/CMakeLists.txt index 8b761b3aa..63b34dd36 100644 --- a/src/examples/EventGroupExample/CMakeLists.txt +++ b/src/examples/EventGroupExample/CMakeLists.txt @@ -5,4 +5,3 @@ add_jana_plugin(EventGroupExample) add_test(NAME jana-example-eventgroup-tests COMMAND jana -Pplugins=EventGroupExample) -set_tests_properties(jana-example-eventgroup-tests PROPERTIES DISABLED TRUE) diff --git a/src/examples/EventGroupExample/GroupedEventProcessor.h b/src/examples/EventGroupExample/GroupedEventProcessor.h index d4bda7d24..795fac81d 100644 --- a/src/examples/EventGroupExample/GroupedEventProcessor.h +++ b/src/examples/EventGroupExample/GroupedEventProcessor.h @@ -21,23 +21,23 @@ class GroupedEventProcessor : public JEventProcessor { public: GroupedEventProcessor() { SetTypeName(NAME_OF_THIS); - SetCallbackStyle(CallbackStyle::ExpertMode); + SetCallbackStyle(CallbackStyle::LegacyMode); } - void Process(const JEvent& event) override { + void Process(const std::shared_ptr& event) override { // In parallel, perform a random amount of (slow) computation consume_cpu_ms(100, 1.0); - auto tridas_event = event.GetSingle(); + auto tridas_event = event->GetSingle(); tridas_event->should_keep = true; - auto group = event.GetSingle(); + auto group = event->GetSingle(); // Sequentially, process each event and report when a group finishes std::lock_guard lock(m_mutex); - LOG << "Processed group #" << group->GetGroupId() << ", event #" << event.GetEventNumber() << LOG_END; + LOG << "Processed group #" << group->GetGroupId() << ", event #" << event->GetEventNumber() << LOG_END; bool finishes_group = group->FinishEvent(); if (finishes_group) { diff --git a/src/examples/MetadataExample/CMakeLists.txt b/src/examples/MetadataExample/CMakeLists.txt index 67da55203..0358a8aed 100644 --- a/src/examples/MetadataExample/CMakeLists.txt +++ b/src/examples/MetadataExample/CMakeLists.txt @@ -5,5 +5,3 @@ add_jana_plugin(MetadataExample) add_test(NAME jana-example-metadata-tests COMMAND jana -Pplugins=MetadataExample) -set_tests_properties(jana-example-metadata-tests - PROPERTIES DISABLED TRUE) diff --git a/src/examples/MetadataExample/MetadataAggregator.cc b/src/examples/MetadataExample/MetadataAggregator.cc index aa785cf23..0cca79e4a 100644 --- a/src/examples/MetadataExample/MetadataAggregator.cc +++ b/src/examples/MetadataExample/MetadataAggregator.cc @@ -9,7 +9,7 @@ MetadataAggregator::MetadataAggregator() { SetTypeName(NAME_OF_THIS); // Provide JANA with this class's name - SetCallbackStyle(CallbackStyle::ExpertMode); + SetCallbackStyle(CallbackStyle::LegacyMode); } void MetadataAggregator::Init() { @@ -21,17 +21,17 @@ void MetadataAggregator::Init() { LOG << "MetadataAggregator::Init" << LOG_END; } -void MetadataAggregator::Process(const JEvent& event) { - LOG << "MetadataAggregator::Process, Run #" << event.GetRunNumber() << ", Event #" << event.GetEventNumber() << LOG_END; +void MetadataAggregator::Process(const std::shared_ptr& event) { + LOG << "MetadataAggregator::Process, Run #" << event->GetRunNumber() << ", Event #" << event->GetEventNumber() << LOG_END; // Acquire tracks in parallel - auto tracks = event.Get(m_track_factory); + auto tracks = event->Get(m_track_factory); // Lock mutex, so we can update shared state sequentially std::lock_guardlock(m_mutex); // Since the run number probably doesn't change too frequently we cache the last entry - int run_nr = event.GetRunNumber(); + int run_nr = event->GetRunNumber(); if (run_nr != m_last_run_nr) { m_last_run_nr = run_nr; m_last_statistics = &m_statistics[m_last_run_nr]; // Get-or-create @@ -40,7 +40,7 @@ void MetadataAggregator::Process(const JEvent& event) { // Update the statistics accumulator using the metadata from this event m_last_statistics->event_count += 1; m_last_statistics->total_track_count += tracks.size(); - m_last_statistics->total_latency_ns += event.GetMetadata(m_track_factory).elapsed_time_ns; + m_last_statistics->total_latency_ns += event->GetMetadata(m_track_factory).elapsed_time_ns; } void MetadataAggregator::Finish() { diff --git a/src/examples/MetadataExample/MetadataAggregator.h b/src/examples/MetadataExample/MetadataAggregator.h index 32beca74a..749643176 100644 --- a/src/examples/MetadataExample/MetadataAggregator.h +++ b/src/examples/MetadataExample/MetadataAggregator.h @@ -41,7 +41,7 @@ class MetadataAggregator : public JEventProcessor { virtual ~MetadataAggregator() = default; void Init() override; - void Process(const JEvent&) override; + void Process(const std::shared_ptr&) override; void Finish() override; }; diff --git a/src/examples/Tutorial/CMakeLists.txt b/src/examples/Tutorial/CMakeLists.txt index 93c115f98..e5c57d9f0 100644 --- a/src/examples/Tutorial/CMakeLists.txt +++ b/src/examples/Tutorial/CMakeLists.txt @@ -3,7 +3,6 @@ add_jana_plugin(Tutorial) add_test(NAME jana-example-tutorial-tests - COMMAND jana -Pplugins=Tutorial) + COMMAND jana -Pplugins=Tutorial -Pjana:nevents=50) -set_tests_properties(jana-example-tutorial-tests PROPERTIES DISABLED TRUE) diff --git a/src/examples/Tutorial/TutorialProcessor.cc b/src/examples/Tutorial/TutorialProcessor.cc index 48133e46d..ec0d46b7d 100644 --- a/src/examples/Tutorial/TutorialProcessor.cc +++ b/src/examples/Tutorial/TutorialProcessor.cc @@ -8,7 +8,7 @@ TutorialProcessor::TutorialProcessor() { SetTypeName(NAME_OF_THIS); // Provide JANA with this class's name - SetCallbackStyle(CallbackStyle::ExpertMode); + SetCallbackStyle(CallbackStyle::LegacyMode); } void TutorialProcessor::Init() { @@ -21,12 +21,12 @@ void TutorialProcessor::Init() { } } -void TutorialProcessor::Process(const JEvent& event) { - LOG << "TutorialProcessor::Process, Event #" << event.GetEventNumber() << LOG_END; +void TutorialProcessor::Process(const std::shared_ptr& event) { + LOG << "TutorialProcessor::Process, Event #" << event->GetEventNumber() << LOG_END; /// Do everything we can in parallel /// Warning: We are only allowed to use local variables and `event` here - auto hits = event.Get(); + auto hits = event->Get(); /// Lock mutex std::lock_guardlock(m_mutex); diff --git a/src/examples/Tutorial/TutorialProcessor.h b/src/examples/Tutorial/TutorialProcessor.h index 40c6749b6..dd46d677f 100644 --- a/src/examples/Tutorial/TutorialProcessor.h +++ b/src/examples/Tutorial/TutorialProcessor.h @@ -18,7 +18,7 @@ class TutorialProcessor : public JEventProcessor { virtual ~TutorialProcessor() = default; void Init() override; - void Process(const JEvent& event) override; + void Process(const std::shared_ptr& event) override; void Finish() override; }; diff --git a/src/libraries/JANA/JEventProcessor.h b/src/libraries/JANA/JEventProcessor.h index bebe7d039..65db49a30 100644 --- a/src/libraries/JANA/JEventProcessor.h +++ b/src/libraries/JANA/JEventProcessor.h @@ -47,24 +47,25 @@ class JEventProcessor : public jana::omni::JComponent, virtual void DoMap(const std::shared_ptr& e) { + if (m_callback_style == CallbackStyle::LegacyMode) { + throw JException("Called DoMap() on a legacy-mode JEventProcessor"); + } + for (auto* input : m_inputs) { input->PrefetchCollection(*e); } - // JExceptions with factory info will be furnished by the callee, - // so we don't need to try/catch here. - - // Also we don't have - // a Preprocess(), so we don't technically need Init() here even - - if (m_callback_style != CallbackStyle::DeclarativeMode) { - DoReduce(e); // This does all the locking! - } } - virtual void DoReduce(const std::shared_ptr& e) { - auto run_number = e->GetRunNumber(); + virtual void DoTap(const std::shared_ptr& e) { + + if (m_callback_style == CallbackStyle::LegacyMode) { + throw JException("Called DoReduce() on a legacy-mode JEventProcessor"); + } std::lock_guard lock(m_mutex); + // In principle DoReduce() is being called by one thread at a time, but we hold a lock anyway + // so that this runs correctly even if that isn't happening. This lock shouldn't experience + // any contention. if (m_status == Status::Uninitialized) { DoInitialize(); @@ -72,6 +73,13 @@ class JEventProcessor : public jana::omni::JComponent, else if (m_status == Status::Finalized) { throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()"); } + for (auto* input : m_inputs) { + // This collection should have already been computed during DoMap() + // We do this before ChangeRun() just in case we will need to pull data out of + // a begin-of-run event. + input->GetCollection(*e); + } + auto run_number = e->GetRunNumber(); if (m_last_run_number != run_number) { if (m_last_run_number != -1) { CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); }); @@ -82,9 +90,6 @@ class JEventProcessor : public jana::omni::JComponent, m_last_run_number = run_number; CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(e); }); } - for (auto* input : m_inputs) { - input->GetCollection(*e); - } if (m_callback_style == CallbackStyle::DeclarativeMode) { CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(e->GetRunNumber(), e->GetEventNumber(), e->GetEventIndex()); @@ -93,9 +98,37 @@ class JEventProcessor : public jana::omni::JComponent, else if (m_callback_style == CallbackStyle::ExpertMode) { CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(*e); }); } - else { - CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(e); }); + m_event_count += 1; + } + + + virtual void DoLegacyProcess(const std::shared_ptr& event) { + + // DoLegacyProcess doesn't hold any locks, as it requires the user to hold a lock for it. + // Because of this, + if (m_callback_style != CallbackStyle::LegacyMode) { + throw JException("Called DoLegacyProcess() on a non-legacy-mode JEventProcessor"); + } + + auto run_number = event->GetRunNumber(); + + if (m_status == Status::Uninitialized) { + DoInitialize(); + } + else if (m_status == Status::Finalized) { + throw JException("JEventProcessor: Attempted to call DoMap() after Finalize()"); + } + if (m_last_run_number != run_number) { + if (m_last_run_number != -1) { + CallWithJExceptionWrapper("JEventProcessor::EndRun", [&](){ EndRun(); }); + } + for (auto* resource : m_resources) { + resource->ChangeRun(event->GetRunNumber(), m_app); + } + m_last_run_number = run_number; + CallWithJExceptionWrapper("JEventProcessor::BeginRun", [&](){ BeginRun(event); }); } + CallWithJExceptionWrapper("JEventProcessor::Process", [&](){ Process(event); }); m_event_count += 1; } diff --git a/src/libraries/JANA/Omni/JComponentFwd.h b/src/libraries/JANA/Omni/JComponentFwd.h index be8da4871..521e0cd38 100644 --- a/src/libraries/JANA/Omni/JComponentFwd.h +++ b/src/libraries/JANA/Omni/JComponentFwd.h @@ -83,6 +83,8 @@ struct JComponent { virtual void Summarize(JComponentSummary&) const {}; + CallbackStyle GetCallbackStyle() const { return m_callback_style; } + Status GetStatus() const { std::lock_guard lock(m_mutex); return m_status; diff --git a/src/libraries/JANA/Topology/JEventMapArrow.cc b/src/libraries/JANA/Topology/JEventMapArrow.cc index 73636fee5..9713e3cb2 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.cc +++ b/src/libraries/JANA/Topology/JEventMapArrow.cc @@ -2,6 +2,7 @@ // Subject to the terms in the LICENSE file found in the top-level directory. +#include "JANA/JEventProcessor.h" #include #include @@ -28,8 +29,11 @@ void JEventMapArrow::add_unfolder(JEventUnfolder* unfolder) { m_unfolders.push_back(unfolder); } +void JEventMapArrow::add_processor(JEventProcessor* processor) { + m_procs.push_back(processor); +} + void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) { - LOG_DEBUG(m_logger) << "JEventMapArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END; for (JEventSource* source : m_sources) { @@ -40,6 +44,15 @@ void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status& JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), unfolder->GetTypeName()); // times execution until this goes out of scope unfolder->Preprocess(**event); } + for (JEventProcessor* processor : m_procs) { + JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), processor->GetTypeName()); // times execution until this goes out of scope + if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) { + processor->DoLegacyProcess(*event); + } + else { + processor->DoMap(*event); + } + } LOG_DEBUG(m_logger) << "JEventMapArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END; success = true; status = JArrowMetrics::Status::KeepGoing; diff --git a/src/libraries/JANA/Topology/JEventMapArrow.h b/src/libraries/JANA/Topology/JEventMapArrow.h index 7da94014b..a0d521bb8 100644 --- a/src/libraries/JANA/Topology/JEventMapArrow.h +++ b/src/libraries/JANA/Topology/JEventMapArrow.h @@ -8,6 +8,7 @@ class JEventPool; class JEventSource; class JEventUnfolder; +class JEventProcessor; class JEvent; using Event = std::shared_ptr; @@ -18,12 +19,14 @@ class JEventMapArrow : public JPipelineArrow { private: std::vector m_sources; std::vector m_unfolders; + std::vector m_procs; public: JEventMapArrow(std::string name, EventQueue *input_queue, EventQueue *output_queue); void add_source(JEventSource* source); void add_unfolder(JEventUnfolder* unfolder); + void add_processor(JEventProcessor* proc); void process(Event* event, bool& success, JArrowMetrics::Status& status); diff --git a/src/libraries/JANA/Topology/JEventProcessorArrow.cc b/src/libraries/JANA/Topology/JEventProcessorArrow.cc index 3efdb43bd..5ff2a2361 100644 --- a/src/libraries/JANA/Topology/JEventProcessorArrow.cc +++ b/src/libraries/JANA/Topology/JEventProcessorArrow.cc @@ -32,7 +32,14 @@ void JEventProcessorArrow::process(Event* event, bool& success, JArrowMetrics::S for (JEventProcessor* processor : m_processors) { // TODO: Move me into JEventProcessor::DoMap JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), processor->GetTypeName()); // times execution until this goes out of scope - processor->DoMap(*event); + if (processor->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode) { + processor->DoLegacyProcess(*event); + } + else { + processor->DoMap(*event); + processor->DoTap(*event); + + } } LOG_DEBUG(m_logger) << "JEventProcessorArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END; success = true; diff --git a/src/libraries/JANA/Topology/JEventTapArrow.cc b/src/libraries/JANA/Topology/JEventTapArrow.cc new file mode 100644 index 000000000..ee91db984 --- /dev/null +++ b/src/libraries/JANA/Topology/JEventTapArrow.cc @@ -0,0 +1,57 @@ +// Copyright 2024, Jefferson Science Associates, LLC. +// Subject to the terms in the LICENSE file found in the top-level directory. + + +#include +#include +#include +#include +#include + + +JEventTapArrow::JEventTapArrow(std::string name, + EventQueue *input_queue, + EventQueue *output_queue, + JEventPool *pool) + : JPipelineArrow(std::move(name), + false, + false, + false, + input_queue, + output_queue, + pool) {} + +void JEventTapArrow::add_processor(JEventProcessor* proc) { + m_procs.push_back(proc); +} + +void JEventTapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) { + + LOG_DEBUG(m_logger) << "JEventTapArrow '" << get_name() << "': Starting event# " << (*event)->GetEventNumber() << LOG_END; + for (JEventProcessor* proc : m_procs) { + JCallGraphEntryMaker cg_entry(*(*event)->GetJCallGraphRecorder(), proc->GetTypeName()); // times execution until this goes out of scope + if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) { + proc->DoTap(*event); + } + } + LOG_DEBUG(m_logger) << "JEventTapArrow '" << get_name() << "': Finished event# " << (*event)->GetEventNumber() << LOG_END; + success = true; + status = JArrowMetrics::Status::KeepGoing; +} + +void JEventTapArrow::initialize() { + LOG_DEBUG(m_logger) << "Initializing arrow '" << get_name() << "'" << LOG_END; + for (auto processor : m_procs) { + processor->DoInitialize(); + LOG_INFO(m_logger) << "Initialized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; + } +} + +void JEventTapArrow::finalize() { + LOG_DEBUG(m_logger) << "Finalizing arrow '" << get_name() << "'" << LOG_END; + for (auto processor : m_procs) { + processor->DoFinalize(); + LOG_INFO(m_logger) << "Finalized JEventProcessor '" << processor->GetTypeName() << "'" << LOG_END; + } +} + diff --git a/src/libraries/JANA/Topology/JEventTapArrow.h b/src/libraries/JANA/Topology/JEventTapArrow.h new file mode 100644 index 000000000..ad862411e --- /dev/null +++ b/src/libraries/JANA/Topology/JEventTapArrow.h @@ -0,0 +1,28 @@ +// Copyright 2024, Jefferson Science Associates, LLC. +// Subject to the terms in the LICENSE file found in the top-level directory. + +#pragma once + +#include + +class JEventPool; +class JEventProcessor; +class JEvent; + +using Event = std::shared_ptr; +using EventQueue = JMailbox; + +class JEventTapArrow : public JPipelineArrow { + +private: + std::vector m_procs; + +public: + JEventTapArrow(std::string name, EventQueue *input_queue, EventQueue *output_queue, JEventPool *pool); + + void add_processor(JEventProcessor* proc); + void process(Event* event, bool& success, JArrowMetrics::Status& status); + void initialize() final; + void finalize() final; +}; + diff --git a/src/plugins/JTest/JTestPlotter.h b/src/plugins/JTest/JTestPlotter.h index a8b882ef6..0c643f925 100644 --- a/src/plugins/JTest/JTestPlotter.h +++ b/src/plugins/JTest/JTestPlotter.h @@ -23,7 +23,6 @@ class JTestPlotter : public JEventProcessor { JTestPlotter() { SetPrefix("jtest:plotter"); SetTypeName(NAME_OF_THIS); - SetCallbackStyle(CallbackStyle::ExpertMode); } void Init() override { @@ -34,14 +33,14 @@ class JTestPlotter : public JEventProcessor { app->SetDefaultParameter("jtest:plotter_bytes_spread", m_write_spread, "Spread of bytes written during plotting"); } - void Process(const JEvent& event) override { + void Process(const std::shared_ptr& event) override { // Read the track data - auto td = event.GetSingle(); + auto td = event->GetSingle(); read_memory(td->buffer); // Read the extra data objects inserted by JTestTracker - event.Get(); + event->Get(); // Everything that happens after here is in a critical section std::lock_guard lock(m_mutex); @@ -52,7 +51,7 @@ class JTestPlotter : public JEventProcessor { // Write the histogram data auto hd = new JTestHistogramData; write_memory(hd->buffer, m_write_bytes, m_write_spread); - event.Insert(hd); + event->Insert(hd); } }; diff --git a/src/programs/perf_tests/CMakeLists.txt b/src/programs/perf_tests/CMakeLists.txt index d12240f1f..558516bbd 100644 --- a/src/programs/perf_tests/CMakeLists.txt +++ b/src/programs/perf_tests/CMakeLists.txt @@ -1,7 +1,6 @@ add_jana_test(jana-perf-tests) -set_tests_properties(jana-perf-tests PROPERTIES DISABLED TRUE) if (USE_PODIO) find_package(podio REQUIRED) diff --git a/src/programs/perf_tests/PerfTests.cc b/src/programs/perf_tests/PerfTests.cc index b1f9f12a7..974a57731 100644 --- a/src/programs/perf_tests/PerfTests.cc +++ b/src/programs/perf_tests/PerfTests.cc @@ -13,9 +13,6 @@ #include #endif -#include -#include - int main() {