From f67c408712fb16efadbae7b09a88acf95b7f9ea5 Mon Sep 17 00:00:00 2001 From: carltimmer Date: Mon, 1 Jul 2024 17:06:01 -0400 Subject: [PATCH] add mutex protection to a few methods --- src/libsrc++/EventWriterV4.cpp | 90 +++++++++++++++++++++------------- src/libsrc++/EventWriterV4.h | 8 ++- 2 files changed, 63 insertions(+), 35 deletions(-) diff --git a/src/libsrc++/EventWriterV4.cpp b/src/libsrc++/EventWriterV4.cpp index 1f0f8d531..429823a91 100644 --- a/src/libsrc++/EventWriterV4.cpp +++ b/src/libsrc++/EventWriterV4.cpp @@ -769,6 +769,8 @@ namespace evio { // most probably won't contain the full file contents. if (toFile2()) return nullptr; + std::lock_guard lock(mtx); + // TODO: We synchronize here so we do not write/close in the middle // of our messing with the buffer. std::shared_ptr buf = buffer->duplicate(); @@ -794,7 +796,10 @@ namespace evio { * * @return {@code true} if this object closed, else {@code false}. */ - bool EventWriterV4::isClosed() const {return closed;} + bool EventWriterV4::isClosed() { + std::lock_guard lock(mtx); + return closed; + } /** @@ -920,6 +925,8 @@ namespace evio { */ void EventWriterV4::setFirstEvent(std::shared_ptr node) { + std::lock_guard lock(mtx); + // If getting rid of the first event ... if (node == nullptr) { if (!xmlDictionary.empty()) { @@ -1006,6 +1013,8 @@ namespace evio { /*synchronized*/ void EventWriterV4::setFirstEvent(std::shared_ptr buffer) { + std::lock_guard lock(mtx); + // If getting rid of the first event ... if (buffer == nullptr) { if (!xmlDictionary.empty()) { @@ -1084,27 +1093,12 @@ namespace evio { * if file exists but user requested no over-writing; * if no room when writing to user-given buffer; */ - void EventWriterV4::setFirstEvent(std::shared_ptr bank) - { + void EventWriterV4::setFirstEvent(std::shared_ptr bank) { - // If getting rid of the first event ... - if (bank == nullptr) { - if (!xmlDictionary.empty()) { - commonBlockCount = 1; - commonBlockByteSize = dictionaryBytes; - } - else { - commonBlockCount = 0; - commonBlockByteSize = 0; - } - firstEventBytes = 0; - firstEventByteArray.clear(); - haveFirstEvent = false; - return; - } + std::lock_guard lock(mtx); - // Find the first event's bytes and the memory size needed - // to contain the common events (dictionary + first event). + // If getting rid of the first event ... + if (bank == nullptr) { if (!xmlDictionary.empty()) { commonBlockCount = 1; commonBlockByteSize = dictionaryBytes; @@ -1113,22 +1107,38 @@ namespace evio { commonBlockCount = 0; commonBlockByteSize = 0; } + firstEventBytes = 0; + firstEventByteArray.clear(); + haveFirstEvent = false; + return; + } - firstEventBytes = bank->getTotalBytes(); - std::shared_ptr firstEventBuf = std::make_shared(firstEventBytes); - firstEventBuf->order(byteOrder); - bank->write(firstEventBuf); - firstEventBuf->flip(); - uint8_t* data = firstEventBuf->array(); - firstEventByteArray.assign(data, data + firstEventBytes); - commonBlockByteSize += firstEventBytes; - commonBlockCount++; - haveFirstEvent = true; + // Find the first event's bytes and the memory size needed + // to contain the common events (dictionary + first event). + if (!xmlDictionary.empty()) { + commonBlockCount = 1; + commonBlockByteSize = dictionaryBytes; + } + else { + commonBlockCount = 0; + commonBlockByteSize = 0; + } - // Write it to the file/buffer now. In this case it may not be the - // first event written and some splits may not even have it - // (depending on how many events have been written so far). - writeEvent(nullptr, firstEventBuf, false); + firstEventBytes = bank->getTotalBytes(); + std::shared_ptr firstEventBuf = std::make_shared(firstEventBytes); + firstEventBuf->order(byteOrder); + bank->write(firstEventBuf); + firstEventBuf->flip(); + uint8_t* data = firstEventBuf->array(); + firstEventByteArray.assign(data, data + firstEventBytes); + commonBlockByteSize += firstEventBytes; + commonBlockCount++; + haveFirstEvent = true; + + // Write it to the file/buffer now. In this case it may not be the + // first event written and some splits may not even have it + // (depending on how many events have been written so far). + writeEvent(nullptr, firstEventBuf, false); } @@ -1141,6 +1151,9 @@ namespace evio { * Calling this can kill performance. May not call this when simultaneously * calling writeEvent, close, setFirstEvent, or getByteBuffer. */ void EventWriterV4::flush() { + + std::lock_guard lock(mtx); + // If lastEmptyBlockHeaderExists is true, then resetBuffer // has been called and no events have been written into buffer yet. // In other words, no need to flush an empty, last block header. @@ -1166,6 +1179,9 @@ namespace evio { * May not call this when simultaneously calling * writeEvent, flush, setFirstEvent, or getByteBuffer. */ void EventWriterV4::close() { + + std::lock_guard lock(mtx); + if (closed) { return; } @@ -1219,6 +1235,8 @@ namespace evio { */ void EventWriterV4::examineFirstBlockHeader() { + std::lock_guard lock(mtx); + // Only for append mode if (!append) { // Internal logic error, should never have gotten here @@ -2161,6 +2179,8 @@ namespace evio { bool EventWriterV4::writeEvent(std::shared_ptr bank, std::shared_ptr bankBuffer, bool force) { + std::lock_guard lock(mtx); + if (closed) { throw EvioException("close() has already been called"); } @@ -2488,6 +2508,8 @@ namespace evio { bool EventWriterV4::writeEventToFile(std::shared_ptr bank, std::shared_ptr bankBuffer, bool force) { + std::lock_guard lock(mtx); + if (closed) { throw EvioException("close() has already been called"); } diff --git a/src/libsrc++/EventWriterV4.h b/src/libsrc++/EventWriterV4.h index cfda6ab5e..44c8d41e7 100644 --- a/src/libsrc++/EventWriterV4.h +++ b/src/libsrc++/EventWriterV4.h @@ -30,6 +30,7 @@ #include #include #include +#include #ifdef USE_FILESYSTEMLIB @@ -124,6 +125,11 @@ namespace evio { private: + /** + * Mutex to protect close, flush, isClosed, writeEvent, getByteBuffer, setFirstEvent, + * and examineFirstBlockHeader from stepping on each other's toes. + */ + std::recursive_mutex mtx; /** * Offset to where the block length is written in the byte buffer, @@ -538,7 +544,7 @@ namespace evio { void setBuffer(std::shared_ptr buf); std::shared_ptr getByteBuffer(); bool toFile2() const; - bool isClosed() const; + bool isClosed(); std::string getCurrentFilename() const; std::string getCurrentFilePath() const; uint32_t getSplitNumber() const;