Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #315: JTest uses a flawed RNG setup #347

Merged
merged 5 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/examples/EventGroupExample/GroupedEventProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include <JANA/JEventProcessor.h>
#include <JANA/JLogger.h>
#include <JANA/Services/JEventGroupTracker.h>
#include <JANA/Utils/JPerfUtils.h>
#include <JANA/Utils/JBenchUtils.h>

#include "TridasEvent.h"

Expand All @@ -26,8 +26,9 @@ class GroupedEventProcessor : public JEventProcessor {

void Process(const std::shared_ptr<const JEvent>& event) override {

std::unique_ptr<JBenchUtils> bench_utils = std::make_unique<JBenchUtils>(event->GetEventNumber(), NAME_OF_THIS);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor for JBenchUtils might be expensive, so I'd consider adding a method to reset the seed rather than recreating the object on every event. If you make bench_utils be a member variable, you can construct it exactly once and then reset the seed on every event. Also, you don't need the unique_ptr here because ownership stays with the component and there's no inheritance hierarchy to watch out for.

// In parallel, perform a random amount of (slow) computation
consume_cpu_ms(100, 1.0);
bench_utils->consume_cpu_ms(100, 1.0);

auto tridas_event = event->GetSingle<TridasEvent>();
tridas_event->should_keep = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#define _ADCSampleFactory_h_

#include <JANA/JFactoryT.h>
#include <JANA/Utils/JPerfUtils.h>

#include "ADCSample.h"
#include "INDRAMessage.h"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <JANA/Streaming/JStreamingEventSource.h>
#include <JANA/JCsvWriter.h>
#include <JANA/JEventSourceGeneratorT.h>
#include <JANA/Utils/JBenchUtils.h>


#include "RootProcessor.h"
#include "MonitoringProcessor.h"
Expand All @@ -19,6 +21,7 @@ void dummy_publisher_loop(JApplication* app) {

size_t delay_ms = 1;
auto logger = app->GetService<JLoggingService>()->get_logger("dummy_publisher_loop");
std::unique_ptr<JBenchUtils> bench_utils = std::make_unique<JBenchUtils>(7, "InteractiveStreamingExample.cc:dummy_publisher_loop");

std::this_thread::yield();
//std::this_thread::sleep_for(std::chrono::milliseconds(10)); // Wait for JANA to fire up so we don't lose data
Expand Down Expand Up @@ -51,7 +54,7 @@ void dummy_publisher_loop(JApplication* app) {
//LOG_DEBUG(logger) << "Send: " << message << " (" << message.get_buffer_size() << " bytes)" << LOG_END;
std::cout << "dummy_producer_loop: Sending '" << message << "' (" << message.get_buffer_size() << " bytes)" << std::endl;
transport.send(message);
consume_cpu_ms(delay_ms, 0, false);
bench_utils->consume_cpu_ms(delay_ms, 0, false);
std::this_thread::yield();
}

Expand Down
9 changes: 5 additions & 4 deletions src/examples/RootDatamodelExample/JTestRootEventSource.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

#include <JANA/JApplication.h>
#include <JANA/JEvent.h>
#include <JANA/Utils/JPerfUtils.h>
#include <JANA/Utils/JBenchUtils.h>

JTestRootEventSource::JTestRootEventSource() {
SetTypeName(NAME_OF_THIS); // Provide JANA with class name
Expand All @@ -24,13 +24,14 @@ JEventSource::Result JTestRootEventSource::Emit(JEvent& event) {
/// Generate an event by inserting objects into "event".
/// (n.b. a normal event source would read these from a file or stream)

// Spin the CPU a bit to limit the rate
consume_cpu_ms(5);

// Configure event and run numbers
static size_t current_event_number = 1;
event.SetEventNumber(current_event_number++);
event.SetRunNumber(222);

std::unique_ptr<JBenchUtils> bench_utils = std::make_unique<JBenchUtils>(event.GetEventNumber(), NAME_OF_THIS);
// Spin the CPU a bit to limit the rate
bench_utils->consume_cpu_ms(5);

// Generate hit objects. We use random numbers to give some variation
// and make things look a little more realistic
Expand Down
5 changes: 3 additions & 2 deletions src/examples/StreamingExample/AHitAnomalyDetector.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <JANA/JEventProcessor.h>
#include <JANA/JEvent.h>
#include <JANA/Utils/JPerfUtils.h>
#include <JANA/Utils/JBenchUtils.h>
#include "AHit.h"

class AHitAnomalyDetector : public JEventProcessor {
Expand All @@ -33,7 +33,8 @@ class AHitAnomalyDetector : public JEventProcessor {
}
ss << "}" << std::endl;
std::cout << ss.str();
consume_cpu_ms(m_delay_ms);
std::unique_ptr<JBenchUtils> bench_utils = std::make_unique<JBenchUtils>(event.GetEventNumber(), NAME_OF_THIS);
bench_utils->consume_cpu_ms(m_delay_ms);
}
void Finish() override {
std::cout << "Anomaly detection: Done!" << std::endl;
Expand Down
8 changes: 5 additions & 3 deletions src/examples/StreamingExample/AHitBHitFuser.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

#include <JANA/JEventProcessor.h>
#include <JANA/JEvent.h>
#include <JANA/JPerfUtils.h>
#include <JANA/JBenchUtils.h>
#include "AHit.h"

/// AHitBHitFuser
Expand Down Expand Up @@ -39,7 +39,9 @@ class AHitBHitFuser : public JEventProcessor {
}
ss << "}" << std::endl;
std::cout << ss.str();
consume_cpu_ms(m_delay_ms);

std::unique_ptr<JBenchUtils> bench_utils = std::make_unique<JBenchUtils>(event.GetEventNumber(), NAME_OF_THIS);
bench_utils->consume_cpu_ms(m_delay_ms);


auto raw_hits = event.Get<AHit>("raw_hits");
Expand All @@ -52,7 +54,7 @@ class AHitBHitFuser : public JEventProcessor {
calibrated_hit->V += 7;
std::cout << serializer.serialize(*calibrated_hit) << std::endl;
}
consume_cpu_ms(m_delay_ms);
bench_utils->consume_cpu_ms(m_delay_ms);
}
void Finish() override {
std::cout << "Done!" << std::endl;
Expand Down
15 changes: 9 additions & 6 deletions src/examples/StreamingExample/ZmqMain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
#include <JANA/JApplication.h>
#include <JANA/JFactoryGenerator.h>
#include <JANA/JEventSourceGeneratorT.h>
#include <JANA/Utils/JBenchUtils.h>
#include <JANA/Streaming/JEventBuilder.h>
#include <JANA/Streaming/JStreamingEventSource.h>


#include "ReadoutMessageAuto.h"
#include "ZmqTransport.h"
#include "AHitParser.h"
Expand All @@ -18,7 +20,8 @@

void dummy_publisher_loop() {

consume_cpu_ms(3000, 0, false);
std::unique_ptr<JBenchUtils> bench_utils = std::make_unique<JBenchUtils>(6, "ZmqMain.cc:dummy_publisher_loop");
bench_utils->consume_cpu_ms(3000, 0, false);

auto transport = ZmqTransport("tcp://127.0.0.1:5555", true);
transport.initialize();
Expand All @@ -30,14 +33,14 @@ void dummy_publisher_loop() {
message.event_number = counter;

message.payload_size = 4;
message.payload[0] = randfloat(0,1);
message.payload[1] = randfloat(-100,100);
message.payload[2] = randfloat(-100,100);
message.payload[3] = randfloat(-100,100);
message.payload[0] = bench_utils->randfloat(0,1);
message.payload[1] = bench_utils->randfloat(-100,100);
message.payload[2] = bench_utils->randfloat(-100,100);
message.payload[3] = bench_utils->randfloat(-100,100);

transport.send(message);
std::cout << "Send: " << message << "(" << message.get_buffer_capacity() << " bytes)" << std::endl;
consume_cpu_ms(1000, 0, false);
bench_utils->consume_cpu_ms(1000, 0, false);
}

// Send end-of-stream message so that JANA knows to shut down
Expand Down
2 changes: 2 additions & 0 deletions src/libraries/JANA/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ set(JANA2_SOURCES
Utils/JProcessorMapping.cc
Utils/JPerfUtils.cc
Utils/JPerfUtils.h
Utils/JBenchUtils.cc
Utils/JBenchUtils.h
Utils/JStringification.cc
Utils/JStringification.h
Utils/JAutoActivator.cc
Expand Down
91 changes: 91 additions & 0 deletions src/libraries/JANA/Utils/JBenchUtils.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@

// Copyright 2020, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.


#include "JBenchUtils.h"


JBenchUtils::JBenchUtils(size_t event_number, std::string caller_name)
{
std::hash<std::string> hasher;
long seed = event_number ^ hasher(caller_name);
m_generator = std::mt19937(seed);
}


size_t JBenchUtils::rand_size(size_t avg, double spread) {
auto delta = static_cast<size_t>(avg*spread);
std::uniform_int_distribution<size_t> distribution(avg-delta, avg+delta);
return distribution(m_generator);
}


int JBenchUtils::randint(int min, int max) {
std::uniform_int_distribution<int> distribution(min, max);
return distribution(m_generator);
}

double JBenchUtils::randdouble(double min, double max) {
std::uniform_real_distribution<double> dist(min, max);
return dist(m_generator);
}

float JBenchUtils::randfloat(float min, float max) {
std::uniform_real_distribution<float> dist(min, max);
return dist(m_generator);
}

uint64_t JBenchUtils::consume_cpu_ms(uint64_t millisecs, double spread, bool fix_flops) {

uint64_t sampled = rand_size(millisecs, spread);
uint64_t result = 0;

if (fix_flops) {
// Perform a fixed amount of work in a variable time
const uint64_t appx_iters_per_millisec = 14000;
sampled *= appx_iters_per_millisec;

for (uint64_t i=0; i<sampled; ++i) {
double a = (m_generator)();
double b = sqrt(a * pow(1.23, -a)) / a;
result += long(b);
}
}
else {
// Perform a variable amount of work in a fixed time
auto duration = std::chrono::milliseconds(sampled);
auto start_time = std::chrono::steady_clock::now();
while ((std::chrono::steady_clock::now() - start_time) < duration) {

double a = (m_generator)();
double b = sqrt(a * pow(1.23, -a)) / a;
result += long(b);
}
}
return result;
}

uint64_t JBenchUtils::read_memory(const std::vector<char>& buffer) {

auto length = buffer.size();
uint64_t sum = 0;
for (unsigned i=0; i<length; ++i) {
sum += buffer[i];
}
return sum;
}

uint64_t JBenchUtils::write_memory(std::vector<char>& buffer, uint64_t bytes, double spread) {

uint64_t sampled = rand_size(bytes, spread);
for (unsigned i=0; i<sampled; ++i) {
buffer.push_back(2);
}
return sampled*2;
}





33 changes: 33 additions & 0 deletions src/libraries/JANA/Utils/JBenchUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

// Copyright 2020, Jefferson Science Associates, LLC.
// Subject to the terms in the LICENSE file found in the top-level directory.

#pragma once

#include <vector>
#include <random>
#include <string>
#include <algorithm>
#include <string>
#include <typeinfo>
#include <chrono>


class JBenchUtils {

std::mt19937 m_generator;

public:

JBenchUtils(size_t event_number, std::string caller_name);

size_t rand_size(size_t avg, double spread);
int randint(int min, int max);
double randdouble(double min=0.0, double max=1000.0);
float randfloat(float min=0.0, float max=1000.0);

uint64_t consume_cpu_ms(uint64_t millisecs, double spread=0.0, bool fix_flops=true);
uint64_t read_memory(const std::vector<char>& buffer);
uint64_t write_memory(std::vector<char>& buffer, uint64_t bytes, double spread=0.0);

};
9 changes: 5 additions & 4 deletions src/plugins/JTest/JTestDisentangler.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

#include <JANA/JFactoryT.h>
#include <JANA/JEvent.h>
#include <JANA/Utils/JPerfUtils.h>
#include <JANA/Utils/JBenchUtils.h>

#include "JTestDataObjects.h"
#include "JTestCalibrationService.h"
Expand Down Expand Up @@ -38,19 +38,20 @@ class JTestDisentangler : public JFactoryT<JTestEventData> {

void Process(const std::shared_ptr<const JEvent> &aEvent) override {

std::unique_ptr<JBenchUtils> bench_utils = std::make_unique<JBenchUtils>(aEvent->GetEventNumber(), NAME_OF_THIS);
// Read (large) entangled event data
auto eed = aEvent->GetSingle<JTestEntangledEventData>();
read_memory(*eed->buffer);
bench_utils->read_memory(*eed->buffer);

// Read calibration data
m_calibration_service->getCalibration();

// Do a little bit of computation
consume_cpu_ms(m_cputime_ms, m_cputime_spread);
bench_utils->consume_cpu_ms(m_cputime_ms, m_cputime_spread);

// Write (large) event data
auto ed = new JTestEventData;
write_memory(ed->buffer, m_write_bytes, m_write_spread);
bench_utils->write_memory(ed->buffer, m_write_bytes, m_write_spread);
Insert(ed);
}
};
Expand Down
18 changes: 12 additions & 6 deletions src/plugins/JTest/JTestParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
#include <vector>
#include <memory>


#include <JANA/JApplication.h>
#include <JANA/JEventSource.h>
#include <JANA/JEventSourceGeneratorT.h>

#include <JANA/Utils/JPerfUtils.h>
#include <JANA/Utils/JBenchUtils.h>


#include "JTestDataObjects.h"

Expand Down Expand Up @@ -50,23 +52,27 @@ class JTestParser : public JEventSource {

Result Emit(JEvent& event) override {

if ((m_events_generated % 40) == 0) {
const auto prev_m_events_generated = m_events_generated;
m_events_generated++;
event.SetEventNumber(m_events_generated);

std::unique_ptr<JBenchUtils> bench_utils = std::make_unique<JBenchUtils>(m_events_generated, typeid(*this).name());

if ((prev_m_events_generated % 40) == 0) {
// "Read" new entangled event every 40 events
m_latest_entangled_buffer = std::shared_ptr<std::vector<char>>(new std::vector<char>);
write_memory(*m_latest_entangled_buffer, m_write_bytes, m_write_spread);
bench_utils->write_memory(*m_latest_entangled_buffer, m_write_bytes, m_write_spread);
}

// Spin the CPU
consume_cpu_ms(m_cputime_ms, m_cputime_spread);
bench_utils->consume_cpu_ms(m_cputime_ms, m_cputime_spread);

// Emit a shared pointer to the entangled event buffer
auto eec = new JTestEntangledEventData;
eec->buffer = m_latest_entangled_buffer;
event.Insert<JTestEntangledEventData>(eec);

m_events_generated++;

event.SetEventNumber(m_events_generated);
event.SetRunNumber(1);
return Result::Success;
}
Expand Down
Loading
Loading