From 91268f2bbf892b625025d62498839ca66dceecaf Mon Sep 17 00:00:00 2001 From: medvedev Date: Mon, 27 Nov 2023 11:13:53 +0500 Subject: [PATCH] =?UTF-8?q?=D1=81=D0=BE=D1=85=D1=80=D0=B0=D0=BD=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5=20=D0=B4=D0=B0=D0=BD=D0=BD=D1=8B=D1=85=20?= =?UTF-8?q?=D1=82=D0=B5=D0=BF=D0=B5=D1=80=D1=8C=20=D0=B2=20=D0=B4=D1=80=20?= =?UTF-8?q?=D0=BF=D0=BE=D1=82=D0=BE=D0=BA=D0=B5=20+=20=D1=84=D0=B8=D0=BA?= =?UTF-8?q?=D1=81=D1=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/SVServer/src/archive.cpp | 205 ++++++++++++++++------------- src/SVServer/src/archive.h | 10 +- src/SVServer/src/buffer_data.cpp | 50 ++++--- src/SVServer/src/buffer_data.h | 12 +- src/SVServer/src/thread_update.cpp | 15 +-- 5 files changed, 166 insertions(+), 126 deletions(-) diff --git a/src/SVServer/src/archive.cpp b/src/SVServer/src/archive.cpp index 4cca7f8..e9c62be 100644 --- a/src/SVServer/src/archive.cpp +++ b/src/SVServer/src/archive.cpp @@ -32,6 +32,7 @@ #include #include #include +#include using namespace std; @@ -62,125 +63,147 @@ void Archive::setConfig(const SV_Srv::Config& cng_){ void Archive::addSignal(const std::string& sname, const std::string& module, SV_Base::ValueType stype) { - std::string sign = sname + module; - if (m_archiveData.find(sign) != m_archiveData.end()) return; + for (int aIndex = 0; aIndex < 2; ++aIndex){ + std::string sign = sname + module; + auto& archiveData = m_archiveData[aIndex]; + if (archiveData.find(sign) != archiveData.end()) return; - m_archiveData[sign] = vector(m_copySz); + archiveData[sign] = vector(m_copySz); - m_valPos[sign] = 0; + auto& vpos = m_valPos[aIndex]; + vpos[sign] = 0; - SV_Base::Value* buff = new SV_Base::Value[SV_PACKETSZ * m_copySz]; - memset(buff, 0, SV_PACKETSZ * m_copySz * sizeof(SV_Base::Value)); - for (size_t i = 0; i < m_copySz; ++i){ - m_archiveData[sign][i].vals = &buff[i * SV_PACKETSZ]; - } - if (m_chdb){ - m_chdb->addSignal(sname, module, stype); + SV_Base::Value* buff = new SV_Base::Value[SV_PACKETSZ * m_copySz]; + memset(buff, 0, SV_PACKETSZ * m_copySz * sizeof(SV_Base::Value)); + for (size_t i = 0; i < m_copySz; ++i){ + archiveData[sign][i].vals = &buff[i * SV_PACKETSZ]; + } + if (m_chdb){ + m_chdb->addSignal(sname, module, stype); + } } } void Archive::addValue(const string& sign, const SV_Base::RecData& rd) { - int vp = m_valPos[sign]; - - m_archiveData[sign][vp].beginTime = rd.beginTime; - memcpy(m_archiveData[sign][vp].vals, rd.vals, SV_PACKETSZ * sizeof(SV_Base::Value)); + auto& valPos = m_valPos[m_archiveIndex]; + int vp = valPos[sign]; + auto& archiveData = m_archiveData[m_archiveIndex]; + archiveData[sign][vp].beginTime = rd.beginTime; + memcpy(archiveData[sign][vp].vals, rd.vals, SV_PACKETSZ * sizeof(SV_Base::Value)); - ++m_valPos[sign]; - - if (m_valPos[sign] == m_copySz) { + ++valPos[sign]; + if (valPos[sign] == m_copySz) { copyToDisk(false); } } -bool Archive::copyToDisk(bool isStop){ - - const size_t dataSz = m_archiveData.size(); - if (dataSz == 0){ - return true; - } - - if (cng.outArchiveEna){ - const size_t SMAXCNT = 100; // макс кол-во сигналов в посылке - - const size_t intSz = sizeof(int32_t), - tmSz = sizeof(uint64_t), - vlSz = sizeof(SV_Base::Value) * SV_PACKETSZ; +void Archive::copyToDisk(bool isStop){ + if (m_saveThread && m_saveThread->joinable()){ + m_saveThread->join(); + } + if (!isStop){ + m_saveThread = std::make_shared([this, aIndex = m_archiveIndex]{ + copyToDiskImpl(false, aIndex); + }); + m_archiveIndex = m_archiveIndex ? 0 : 1; + }else{ + copyToDiskImpl(isStop, m_archiveIndex); + } +} - // name module group comment type vCnt - const size_t headSz = SV_NAMESZ + SV_NAMESZ + SV_NAMESZ + SV_COMMENTSZ + intSz + intSz; +void Archive::copyToDiskImpl(bool isStop, int archiveIndex){ - vector inArr((tmSz + vlSz) * m_copySz * SMAXCNT + headSz * SMAXCNT); - vector compArr; + auto& valPos = m_valPos[archiveIndex]; + auto& archiveData = m_archiveData[archiveIndex]; + const size_t dataSz = archiveData.size(); + if (dataSz == 0){ + return; + } - vector keys; - keys.reserve(dataSz); - for (auto &it : m_archiveData){ - keys.push_back(it.first); - } + if (cng.outArchiveEna){ + const size_t SMAXCNT = 100; // макс кол-во сигналов в посылке + + const size_t intSz = sizeof(int32_t), + tmSz = sizeof(uint64_t), + vlSz = sizeof(SV_Base::Value) * SV_PACKETSZ; + + // name module group comment type vCnt + const size_t headSz = SV_NAMESZ + SV_NAMESZ + SV_NAMESZ + SV_COMMENTSZ + intSz + intSz; + + vector inArr((tmSz + vlSz) * m_copySz * SMAXCNT + headSz * SMAXCNT); + vector compArr; + + vector keys; + keys.reserve(dataSz); + for (auto &it : archiveData){ + keys.push_back(it.first); + } + + const auto fpath = getOutPath(isStop); + fstream file(fpath, std::fstream::binary | std::fstream::app); + if (!file.good()){ + if (pfStatusCBack) pfStatusCBack("Archive::copyToDisk file not open for write, fpath " + fpath); + return; + } + + size_t sCnt = 0, csize = 0; + for (size_t i = 0; i < dataSz; ++i) { + + auto sign = SV_Srv::getSignalData(keys[i]); + + string sn = sign->name + sign->module; + + char* pIn = inArr.data(); + + int vCnt = valPos[sn]; + if (vCnt > 0) { + memcpy(pIn + csize, sign->name.c_str(), SV_NAMESZ); csize += SV_NAMESZ; + memcpy(pIn + csize, sign->module.c_str(), SV_NAMESZ); csize += SV_NAMESZ; + memcpy(pIn + csize, sign->group.c_str(), SV_NAMESZ); csize += SV_NAMESZ; + memcpy(pIn + csize, sign->comment.c_str(), SV_COMMENTSZ); csize += SV_COMMENTSZ; + memcpy(pIn + csize, &sign->type, intSz); csize += intSz; + memcpy(pIn + csize, &vCnt, intSz); csize += intSz; + + for (int j = 0; j < vCnt; ++j) { + memcpy(pIn + csize, &archiveData[sn][j].beginTime, tmSz); csize += tmSz; + memcpy(pIn + csize, archiveData[sn][j].vals, vlSz); csize += vlSz; + } - fstream file(getOutPath(isStop), std::fstream::binary | std::fstream::app); - if (!file.good()){ - return false; + ++sCnt; } - size_t sCnt = 0, csize = 0; - for (size_t i = 0; i < dataSz; ++i) { + if ((sCnt > 0) && ((sCnt >= SMAXCNT) || (i == (dataSz - 1)))) { + sCnt = 0; - auto sign = SV_Srv::getSignalData(keys[i]); + size_t compSz = 0; - string sn = sign->name + sign->module; + if (!compressData(csize, inArr, compSz, compArr)) { + if (pfStatusCBack) pfStatusCBack("Archive::copyToDisk compressData error"); + file.close(); + return; + }; - char* pIn = inArr.data(); + file.write((char *)&compSz, sizeof(int)); + file.write((char *)&csize, sizeof(int)); + file.write((char *)compArr.data(), compSz); - int vCnt = m_valPos[sn]; - if (vCnt > 0) { - memcpy(pIn + csize, sign->name.c_str(), SV_NAMESZ); csize += SV_NAMESZ; - memcpy(pIn + csize, sign->module.c_str(), SV_NAMESZ); csize += SV_NAMESZ; - memcpy(pIn + csize, sign->group.c_str(), SV_NAMESZ); csize += SV_NAMESZ; - memcpy(pIn + csize, sign->comment.c_str(), SV_COMMENTSZ); csize += SV_COMMENTSZ; - memcpy(pIn + csize, &sign->type, intSz); csize += intSz; - memcpy(pIn + csize, &vCnt, intSz); csize += intSz; - - for (int j = 0; j < vCnt; ++j) { - memcpy(pIn + csize, &m_archiveData[sn][j].beginTime, tmSz); csize += tmSz; - memcpy(pIn + csize, m_archiveData[sn][j].vals, vlSz); csize += vlSz; - } - - ++sCnt; - } - - if ((sCnt > 0) && ((sCnt >= SMAXCNT) || (i == (dataSz - 1)))) { - sCnt = 0; - - size_t compSz = 0; - - if (!compressData(csize, inArr, compSz, compArr)) { - if (pfStatusCBack) pfStatusCBack("Archive::copyToDisk compressData error"); - file.close(); - return false; - }; - - file.write((char *)&compSz, sizeof(int)); - file.write((char *)&csize, sizeof(int)); - file.write((char *)compArr.data(), compSz); - - csize = 0; - } + csize = 0; } - file.close(); - } + } + file.close(); + } - if (m_chdb && cng.outDataBaseEna){ - m_chdb->saveSData(isStop, m_valPos, m_archiveData); - } + if (m_chdb && cng.outDataBaseEna){ + m_chdb->saveSData(isStop, valPos, archiveData); + } - for(auto& v : m_valPos){ - v.second = 0; - } + for(auto& v : valPos){ + v.second = 0; + } - return true; + return; } bool Archive::compressData(size_t inSz, const vector& inArr, size_t& outsz, vector& outArr) { diff --git a/src/SVServer/src/archive.h b/src/SVServer/src/archive.h index 6c76fb0..f4f3dfa 100644 --- a/src/SVServer/src/archive.h +++ b/src/SVServer/src/archive.h @@ -28,6 +28,7 @@ #include "SVServer/sv_server.h" #include +#include class ClickHouseDB; @@ -40,7 +41,7 @@ class Archive void init(const SV_Srv::Config&); - bool copyToDisk(bool isStop); + void copyToDisk(bool isStop); void addSignal(const std::string& sname, const std::string& module, SV_Base::ValueType stype); void addValue(const std::string& sign, const SV_Base::RecData& rd); @@ -48,6 +49,7 @@ class Archive void setConfig(const SV_Srv::Config&); private: + void copyToDiskImpl(bool isStop, int archiveIndex); bool isCopyTimeHour(); std::string getOutPath(bool isStop); bool compressData(size_t insz, const std::vector& inArr, size_t& outsz, std::vector& outArr); @@ -60,13 +62,15 @@ class Archive std::string m_copyDateMem = ""; int m_crtFileHour = 0; + int m_archiveIndex = 0; size_t m_copySz = 0; - std::map m_valPos; + std::map m_valPos[2]; SV_Misc::Front m_front; - std::map> m_archiveData; + std::map> m_archiveData[2]; ClickHouseDB* m_chdb{}; + std::shared_ptr m_saveThread; }; diff --git a/src/SVServer/src/buffer_data.cpp b/src/SVServer/src/buffer_data.cpp index 1ac14b5..5cfa932 100644 --- a/src/SVServer/src/buffer_data.cpp +++ b/src/SVServer/src/buffer_data.cpp @@ -46,47 +46,59 @@ void BufferData::updateDataSignals(const std::string& indata, uint64_t bTm){ valSz = SV_NAMESZ + sizeof(SV_Base::ValueType) + sizeof(SV_Base::Value) * SV_PACKETSZ, cPos = SV_NAMESZ; - size_t valCnt = std::max(size_t(0), std::min((dsz - cPos) / valSz, BUFF_SZ)); + size_t valCnt = std::max(size_t(0), std::min((dsz - cPos) / valSz, BUFF_SZ / 10)); // 10 сек - запас - m_mtx.lock(); - - size_t buffWr = m_buffWritePos; + m_mtxWrite.lock(); + + size_t buffWritePos = m_buffWritePos; m_buffWritePos += valCnt; - if (m_buffWritePos >= BUFF_SZ) m_buffWritePos -= BUFF_SZ; - m_mtx.unlock(); + m_mtxWrite.unlock(); + size_t wPos = buffWritePos; size_t vlsz = sizeof(SV_Base::Value) * SV_PACKETSZ; while (cPos < dsz){ - m_buffer[buffWr].name = indata.data() + cPos; - m_buffer[buffWr].module = indata.c_str(); - m_buffer[buffWr].type = SV_Base::ValueType(*(indata.data() + cPos + SV_NAMESZ)); - m_buffer[buffWr].data.beginTime = bTm; - - memcpy(m_buffer[buffWr].data.vals, indata.data() + cPos + SV_NAMESZ + sizeof(SV_Base::ValueType), vlsz); + m_buffer[wPos].module = indata.c_str(); + m_buffer[wPos].name = indata.data() + cPos; + m_buffer[wPos].type = SV_Base::ValueType(*(indata.data() + cPos + SV_NAMESZ)); + m_buffer[wPos].data.beginTime = bTm; - m_buffer[buffWr].isActive = true; - - ++buffWr; - if (buffWr == BUFF_SZ) buffWr = 0; + memcpy(m_buffer[wPos].data.vals, indata.data() + cPos + SV_NAMESZ + sizeof(SV_Base::ValueType), vlsz); + + ++wPos; + if (wPos == BUFF_SZ) wPos = 0; cPos += valSz; + } + {std::lock_guard lck(m_mtxRead); + size_t wPos = buffWritePos; + for (size_t i = 0; i < valCnt; ++i){ + m_buffer[wPos].isActive = true; + ++wPos; + if (wPos == BUFF_SZ) wPos = 0; + } } + } void BufferData::incReadPos(){ - + std::lock_guard lck(m_mtxRead); m_buffer[m_buffReadPos].isActive = false; + ++m_buffReadPos; - if (m_buffReadPos == BUFF_SZ) m_buffReadPos = 0; } -BufferData::InputData BufferData::getDataByReadPos(){ +BufferData::InputData BufferData::getDataByReadPos()const{ return m_buffer[m_buffReadPos]; } +bool BufferData::hasNewData(){ + std::lock_guard lck(m_mtxRead); + + return m_buffer[m_buffReadPos].isActive; +} diff --git a/src/SVServer/src/buffer_data.h b/src/SVServer/src/buffer_data.h index 2f3f9cb..bc0c1d9 100644 --- a/src/SVServer/src/buffer_data.h +++ b/src/SVServer/src/buffer_data.h @@ -56,22 +56,24 @@ class BufferData /// вернуть данные по текущей позиции чтения /// \return - InputData getDataByReadPos(); + InputData getDataByReadPos()const; /// инкремент позиции чтения void incReadPos(); - const size_t BUFF_SZ = SV_VALUE_MAX_CNT * 10; // 10 сек - запас + bool hasNewData(); + + static const size_t BUFF_SZ = SV_VALUE_MAX_CNT * 10; // 10 сек - запас private: SV_Srv::Config cng; /// данные - InputData m_buffer[SV_VALUE_MAX_CNT * 10]; // 10 сек - запас; + InputData m_buffer[BUFF_SZ]; size_t m_buffReadPos = 0; ///< тек позиция чтения size_t m_buffWritePos = 0; ///< тек позиция записи - - std::mutex m_mtx; + + std::mutex m_mtxRead, m_mtxWrite; }; diff --git a/src/SVServer/src/thread_update.cpp b/src/SVServer/src/thread_update.cpp index e073138..645f267 100644 --- a/src/SVServer/src/thread_update.cpp +++ b/src/SVServer/src/thread_update.cpp @@ -112,9 +112,10 @@ void ThreadUpdate::updateSignals(std::map& sr const size_t buffSz = 2 * 3600000 / SV_CYCLESAVE_MS; // 2 часа const size_t packSz = SV_PACKETSZ * sizeof(Value); - - BufferData::InputData bufPos = m_buffData.getDataByReadPos(); - while (bufPos.isActive){ + + while (m_buffData.hasNewData()){ + BufferData::InputData bufPos = m_buffData.getDataByReadPos(); + isBuffActive = true; string sign = bufPos.name + bufPos.module; @@ -148,12 +149,10 @@ void ThreadUpdate::updateSignals(std::map& sr if (sdata->buffBeginPos >= buffSz) sdata->buffBeginPos = 0; } } - if (cng.outArchiveEna || cng.outDataBaseEna){ m_archive.addValue(sign, bufPos.data); } m_buffData.incReadPos(); - bufPos = m_buffData.getDataByReadPos(); } if (isBuffActive && pfUpdateSignalsCBack) { @@ -220,7 +219,7 @@ void ThreadUpdate::updateCycle(){ SV_Misc::TimerDelay tmDelay; tmDelay.update(); - int checkConnectTout = 5 * SV_CYCLESAVE_MS / 1000; + int checkConnectTout = 5 * SV_CYCLESAVE_MS; while (!m_thrStop){ @@ -234,8 +233,8 @@ void ThreadUpdate::updateCycle(){ } // проверка связи - if (tmDelay.onDelaySec(true, checkConnectTout, 0)){ - tmDelay.onDelaySec(false, 0, 0); + if (tmDelay.onDelayMS(true, checkConnectTout, 0)){ + tmDelay.onDelayMS(false, 0, 0); std::lock_guard lock(SV_Srv::m_mtxRW);