Skip to content

Commit

Permalink
add mutex protection to a few methods
Browse files Browse the repository at this point in the history
  • Loading branch information
carltimmer committed Jul 1, 2024
1 parent 7da8bd8 commit f67c408
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 35 deletions.
90 changes: 56 additions & 34 deletions src/libsrc++/EventWriterV4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,8 @@ namespace evio {
// most probably won't contain the full file contents.
if (toFile2()) return nullptr;

std::lock_guard<std::recursive_mutex> lock(mtx);

// TODO: We synchronize here so we do not write/close in the middle
// of our messing with the buffer.
std::shared_ptr<ByteBuffer> buf = buffer->duplicate();
Expand All @@ -794,7 +796,10 @@ namespace evio {
*
* @return {@code true} if this object closed, else {@code false}.
*/
bool EventWriterV4::isClosed() const {return closed;}
bool EventWriterV4::isClosed() {
std::lock_guard<std::recursive_mutex> lock(mtx);
return closed;
}


/**
Expand Down Expand Up @@ -920,6 +925,8 @@ namespace evio {
*/
void EventWriterV4::setFirstEvent(std::shared_ptr<EvioNode> node) {

std::lock_guard<std::recursive_mutex> lock(mtx);

// If getting rid of the first event ...
if (node == nullptr) {
if (!xmlDictionary.empty()) {
Expand Down Expand Up @@ -1006,6 +1013,8 @@ namespace evio {
/*synchronized*/
void EventWriterV4::setFirstEvent(std::shared_ptr<ByteBuffer> buffer) {

std::lock_guard<std::recursive_mutex> lock(mtx);

// If getting rid of the first event ...
if (buffer == nullptr) {
if (!xmlDictionary.empty()) {
Expand Down Expand Up @@ -1084,27 +1093,12 @@ namespace evio {
* if file exists but user requested no over-writing;
* if no room when writing to user-given buffer;
*/
void EventWriterV4::setFirstEvent(std::shared_ptr<EvioBank> bank)
{
void EventWriterV4::setFirstEvent(std::shared_ptr<EvioBank> bank) {

// If getting rid of the first event ...
if (bank == nullptr) {
if (!xmlDictionary.empty()) {
commonBlockCount = 1;
commonBlockByteSize = dictionaryBytes;
}
else {
commonBlockCount = 0;
commonBlockByteSize = 0;
}
firstEventBytes = 0;
firstEventByteArray.clear();
haveFirstEvent = false;
return;
}
std::lock_guard<std::recursive_mutex> lock(mtx);

// Find the first event's bytes and the memory size needed
// to contain the common events (dictionary + first event).
// If getting rid of the first event ...
if (bank == nullptr) {
if (!xmlDictionary.empty()) {
commonBlockCount = 1;
commonBlockByteSize = dictionaryBytes;
Expand All @@ -1113,22 +1107,38 @@ namespace evio {
commonBlockCount = 0;
commonBlockByteSize = 0;
}
firstEventBytes = 0;
firstEventByteArray.clear();
haveFirstEvent = false;
return;
}

firstEventBytes = bank->getTotalBytes();
std::shared_ptr<ByteBuffer> firstEventBuf = std::make_shared<ByteBuffer>(firstEventBytes);
firstEventBuf->order(byteOrder);
bank->write(firstEventBuf);
firstEventBuf->flip();
uint8_t* data = firstEventBuf->array();
firstEventByteArray.assign(data, data + firstEventBytes);
commonBlockByteSize += firstEventBytes;
commonBlockCount++;
haveFirstEvent = true;
// Find the first event's bytes and the memory size needed
// to contain the common events (dictionary + first event).
if (!xmlDictionary.empty()) {
commonBlockCount = 1;
commonBlockByteSize = dictionaryBytes;
}
else {
commonBlockCount = 0;
commonBlockByteSize = 0;
}

// Write it to the file/buffer now. In this case it may not be the
// first event written and some splits may not even have it
// (depending on how many events have been written so far).
writeEvent(nullptr, firstEventBuf, false);
firstEventBytes = bank->getTotalBytes();
std::shared_ptr<ByteBuffer> firstEventBuf = std::make_shared<ByteBuffer>(firstEventBytes);
firstEventBuf->order(byteOrder);
bank->write(firstEventBuf);
firstEventBuf->flip();
uint8_t* data = firstEventBuf->array();
firstEventByteArray.assign(data, data + firstEventBytes);
commonBlockByteSize += firstEventBytes;
commonBlockCount++;
haveFirstEvent = true;

// Write it to the file/buffer now. In this case it may not be the
// first event written and some splits may not even have it
// (depending on how many events have been written so far).
writeEvent(nullptr, firstEventBuf, false);
}


Expand All @@ -1141,6 +1151,9 @@ namespace evio {
* Calling this can kill performance. May not call this when simultaneously
* calling writeEvent, close, setFirstEvent, or getByteBuffer. */
void EventWriterV4::flush() {

std::lock_guard<std::recursive_mutex> lock(mtx);

// If lastEmptyBlockHeaderExists is true, then resetBuffer
// has been called and no events have been written into buffer yet.
// In other words, no need to flush an empty, last block header.
Expand All @@ -1166,6 +1179,9 @@ namespace evio {
* May not call this when simultaneously calling
* writeEvent, flush, setFirstEvent, or getByteBuffer. */
void EventWriterV4::close() {

std::lock_guard<std::recursive_mutex> lock(mtx);

if (closed) {
return;
}
Expand Down Expand Up @@ -1219,6 +1235,8 @@ namespace evio {
*/
void EventWriterV4::examineFirstBlockHeader() {

std::lock_guard<std::recursive_mutex> lock(mtx);

// Only for append mode
if (!append) {
// Internal logic error, should never have gotten here
Expand Down Expand Up @@ -2161,6 +2179,8 @@ namespace evio {
bool EventWriterV4::writeEvent(std::shared_ptr<EvioBank> bank,
std::shared_ptr<ByteBuffer> bankBuffer, bool force) {

std::lock_guard<std::recursive_mutex> lock(mtx);

if (closed) {
throw EvioException("close() has already been called");
}
Expand Down Expand Up @@ -2488,6 +2508,8 @@ namespace evio {
bool EventWriterV4::writeEventToFile(std::shared_ptr<EvioBank> bank,
std::shared_ptr<ByteBuffer> bankBuffer, bool force) {

std::lock_guard<std::recursive_mutex> lock(mtx);

if (closed) {
throw EvioException("close() has already been called");
}
Expand Down
8 changes: 7 additions & 1 deletion src/libsrc++/EventWriterV4.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <sys/stat.h>
#include <sys/statvfs.h>
#include <stdexcept>
#include <mutex>


#ifdef USE_FILESYSTEMLIB
Expand Down Expand Up @@ -124,6 +125,11 @@ namespace evio {

private:

/**
* Mutex to protect close, flush, isClosed, writeEvent, getByteBuffer, setFirstEvent,
* and examineFirstBlockHeader from stepping on each other's toes.
*/
std::recursive_mutex mtx;

/**
* Offset to where the block length is written in the byte buffer,
Expand Down Expand Up @@ -538,7 +544,7 @@ namespace evio {
void setBuffer(std::shared_ptr<ByteBuffer> buf);
std::shared_ptr<ByteBuffer> getByteBuffer();
bool toFile2() const;
bool isClosed() const;
bool isClosed();
std::string getCurrentFilename() const;
std::string getCurrentFilePath() const;
uint32_t getSplitNumber() const;
Expand Down

0 comments on commit f67c408

Please sign in to comment.