Skip to content

Commit

Permalink
сохранение данных теперь в др потоке + фиксы
Browse files Browse the repository at this point in the history
  • Loading branch information
medvedev committed Nov 27, 2023
1 parent cc38591 commit 91268f2
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 126 deletions.
205 changes: 114 additions & 91 deletions src/SVServer/src/archive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <cstdio>
#include <fstream>
#include <cstring>
#include <thread>

using namespace std;

Expand Down Expand Up @@ -62,125 +63,147 @@ void Archive::setConfig(const SV_Srv::Config& cng_){

void Archive::addSignal(const std::string& sname, const std::string& module, SV_Base::ValueType stype) {

std::string sign = sname + module;
if (m_archiveData.find(sign) != m_archiveData.end()) return;
for (int aIndex = 0; aIndex < 2; ++aIndex){
std::string sign = sname + module;
auto& archiveData = m_archiveData[aIndex];
if (archiveData.find(sign) != archiveData.end()) return;

m_archiveData[sign] = vector<SV_Base::RecData>(m_copySz);
archiveData[sign] = vector<SV_Base::RecData>(m_copySz);

m_valPos[sign] = 0;
auto& vpos = m_valPos[aIndex];
vpos[sign] = 0;

SV_Base::Value* buff = new SV_Base::Value[SV_PACKETSZ * m_copySz];
memset(buff, 0, SV_PACKETSZ * m_copySz * sizeof(SV_Base::Value));
for (size_t i = 0; i < m_copySz; ++i){
m_archiveData[sign][i].vals = &buff[i * SV_PACKETSZ];
}
if (m_chdb){
m_chdb->addSignal(sname, module, stype);
SV_Base::Value* buff = new SV_Base::Value[SV_PACKETSZ * m_copySz];
memset(buff, 0, SV_PACKETSZ * m_copySz * sizeof(SV_Base::Value));
for (size_t i = 0; i < m_copySz; ++i){
archiveData[sign][i].vals = &buff[i * SV_PACKETSZ];
}
if (m_chdb){
m_chdb->addSignal(sname, module, stype);
}
}
}

void Archive::addValue(const string& sign, const SV_Base::RecData& rd) {

int vp = m_valPos[sign];

m_archiveData[sign][vp].beginTime = rd.beginTime;
memcpy(m_archiveData[sign][vp].vals, rd.vals, SV_PACKETSZ * sizeof(SV_Base::Value));
auto& valPos = m_valPos[m_archiveIndex];
int vp = valPos[sign];
auto& archiveData = m_archiveData[m_archiveIndex];
archiveData[sign][vp].beginTime = rd.beginTime;
memcpy(archiveData[sign][vp].vals, rd.vals, SV_PACKETSZ * sizeof(SV_Base::Value));

++m_valPos[sign];

if (m_valPos[sign] == m_copySz) {
++valPos[sign];

if (valPos[sign] == m_copySz) {
copyToDisk(false);
}
}

bool Archive::copyToDisk(bool isStop){

const size_t dataSz = m_archiveData.size();
if (dataSz == 0){
return true;
}

if (cng.outArchiveEna){
const size_t SMAXCNT = 100; // макс кол-во сигналов в посылке

const size_t intSz = sizeof(int32_t),
tmSz = sizeof(uint64_t),
vlSz = sizeof(SV_Base::Value) * SV_PACKETSZ;
void Archive::copyToDisk(bool isStop){
if (m_saveThread && m_saveThread->joinable()){
m_saveThread->join();
}
if (!isStop){
m_saveThread = std::make_shared<std::thread>([this, aIndex = m_archiveIndex]{
copyToDiskImpl(false, aIndex);
});
m_archiveIndex = m_archiveIndex ? 0 : 1;
}else{
copyToDiskImpl(isStop, m_archiveIndex);
}
}

// name module group comment type vCnt
const size_t headSz = SV_NAMESZ + SV_NAMESZ + SV_NAMESZ + SV_COMMENTSZ + intSz + intSz;
void Archive::copyToDiskImpl(bool isStop, int archiveIndex){

vector<char> inArr((tmSz + vlSz) * m_copySz * SMAXCNT + headSz * SMAXCNT);
vector<char> compArr;
auto& valPos = m_valPos[archiveIndex];
auto& archiveData = m_archiveData[archiveIndex];
const size_t dataSz = archiveData.size();
if (dataSz == 0){
return;
}

vector<string> keys;
keys.reserve(dataSz);
for (auto &it : m_archiveData){
keys.push_back(it.first);
}
if (cng.outArchiveEna){
const size_t SMAXCNT = 100; // макс кол-во сигналов в посылке

const size_t intSz = sizeof(int32_t),
tmSz = sizeof(uint64_t),
vlSz = sizeof(SV_Base::Value) * SV_PACKETSZ;

// name module group comment type vCnt
const size_t headSz = SV_NAMESZ + SV_NAMESZ + SV_NAMESZ + SV_COMMENTSZ + intSz + intSz;

vector<char> inArr((tmSz + vlSz) * m_copySz * SMAXCNT + headSz * SMAXCNT);
vector<char> compArr;

vector<string> keys;
keys.reserve(dataSz);
for (auto &it : archiveData){
keys.push_back(it.first);
}

const auto fpath = getOutPath(isStop);
fstream file(fpath, std::fstream::binary | std::fstream::app);
if (!file.good()){
if (pfStatusCBack) pfStatusCBack("Archive::copyToDisk file not open for write, fpath " + fpath);
return;
}

size_t sCnt = 0, csize = 0;
for (size_t i = 0; i < dataSz; ++i) {

auto sign = SV_Srv::getSignalData(keys[i]);

string sn = sign->name + sign->module;

char* pIn = inArr.data();

int vCnt = valPos[sn];
if (vCnt > 0) {
memcpy(pIn + csize, sign->name.c_str(), SV_NAMESZ); csize += SV_NAMESZ;
memcpy(pIn + csize, sign->module.c_str(), SV_NAMESZ); csize += SV_NAMESZ;
memcpy(pIn + csize, sign->group.c_str(), SV_NAMESZ); csize += SV_NAMESZ;
memcpy(pIn + csize, sign->comment.c_str(), SV_COMMENTSZ); csize += SV_COMMENTSZ;
memcpy(pIn + csize, &sign->type, intSz); csize += intSz;
memcpy(pIn + csize, &vCnt, intSz); csize += intSz;

for (int j = 0; j < vCnt; ++j) {
memcpy(pIn + csize, &archiveData[sn][j].beginTime, tmSz); csize += tmSz;
memcpy(pIn + csize, archiveData[sn][j].vals, vlSz); csize += vlSz;
}

fstream file(getOutPath(isStop), std::fstream::binary | std::fstream::app);
if (!file.good()){
return false;
++sCnt;
}

size_t sCnt = 0, csize = 0;
for (size_t i = 0; i < dataSz; ++i) {
if ((sCnt > 0) && ((sCnt >= SMAXCNT) || (i == (dataSz - 1)))) {
sCnt = 0;

auto sign = SV_Srv::getSignalData(keys[i]);
size_t compSz = 0;

string sn = sign->name + sign->module;
if (!compressData(csize, inArr, compSz, compArr)) {
if (pfStatusCBack) pfStatusCBack("Archive::copyToDisk compressData error");
file.close();
return;
};

char* pIn = inArr.data();
file.write((char *)&compSz, sizeof(int));
file.write((char *)&csize, sizeof(int));
file.write((char *)compArr.data(), compSz);

int vCnt = m_valPos[sn];
if (vCnt > 0) {
memcpy(pIn + csize, sign->name.c_str(), SV_NAMESZ); csize += SV_NAMESZ;
memcpy(pIn + csize, sign->module.c_str(), SV_NAMESZ); csize += SV_NAMESZ;
memcpy(pIn + csize, sign->group.c_str(), SV_NAMESZ); csize += SV_NAMESZ;
memcpy(pIn + csize, sign->comment.c_str(), SV_COMMENTSZ); csize += SV_COMMENTSZ;
memcpy(pIn + csize, &sign->type, intSz); csize += intSz;
memcpy(pIn + csize, &vCnt, intSz); csize += intSz;

for (int j = 0; j < vCnt; ++j) {
memcpy(pIn + csize, &m_archiveData[sn][j].beginTime, tmSz); csize += tmSz;
memcpy(pIn + csize, m_archiveData[sn][j].vals, vlSz); csize += vlSz;
}

++sCnt;
}

if ((sCnt > 0) && ((sCnt >= SMAXCNT) || (i == (dataSz - 1)))) {
sCnt = 0;

size_t compSz = 0;

if (!compressData(csize, inArr, compSz, compArr)) {
if (pfStatusCBack) pfStatusCBack("Archive::copyToDisk compressData error");
file.close();
return false;
};

file.write((char *)&compSz, sizeof(int));
file.write((char *)&csize, sizeof(int));
file.write((char *)compArr.data(), compSz);

csize = 0;
}
csize = 0;
}
file.close();
}
}
file.close();
}

if (m_chdb && cng.outDataBaseEna){
m_chdb->saveSData(isStop, m_valPos, m_archiveData);
}
if (m_chdb && cng.outDataBaseEna){
m_chdb->saveSData(isStop, valPos, archiveData);
}

for(auto& v : m_valPos){
v.second = 0;
}
for(auto& v : valPos){
v.second = 0;
}

return true;
return;
}

bool Archive::compressData(size_t inSz, const vector<char>& inArr, size_t& outsz, vector<char>& outArr) {
Expand Down
10 changes: 7 additions & 3 deletions src/SVServer/src/archive.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "SVServer/sv_server.h"

#include <map>
#include <thread>

class ClickHouseDB;

Expand All @@ -40,14 +41,15 @@ class Archive

void init(const SV_Srv::Config&);

bool copyToDisk(bool isStop);
void copyToDisk(bool isStop);

void addSignal(const std::string& sname, const std::string& module, SV_Base::ValueType stype);
void addValue(const std::string& sign, const SV_Base::RecData& rd);

void setConfig(const SV_Srv::Config&);

private:
void copyToDiskImpl(bool isStop, int archiveIndex);
bool isCopyTimeHour();
std::string getOutPath(bool isStop);
bool compressData(size_t insz, const std::vector<char>& inArr, size_t& outsz, std::vector<char>& outArr);
Expand All @@ -60,13 +62,15 @@ class Archive
std::string m_copyDateMem = "";

int m_crtFileHour = 0;
int m_archiveIndex = 0;
size_t m_copySz = 0;
std::map<std::string, int> m_valPos;
std::map<std::string, int> m_valPos[2];

SV_Misc::Front m_front;

std::map<std::string, std::vector<SV_Base::RecData>> m_archiveData;
std::map<std::string, std::vector<SV_Base::RecData>> m_archiveData[2];

ClickHouseDB* m_chdb{};
std::shared_ptr<std::thread> m_saveThread;

};
50 changes: 31 additions & 19 deletions src/SVServer/src/buffer_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,47 +46,59 @@ void BufferData::updateDataSignals(const std::string& indata, uint64_t bTm){
valSz = SV_NAMESZ + sizeof(SV_Base::ValueType) + sizeof(SV_Base::Value) * SV_PACKETSZ,
cPos = SV_NAMESZ;

size_t valCnt = std::max(size_t(0), std::min((dsz - cPos) / valSz, BUFF_SZ));
size_t valCnt = std::max(size_t(0), std::min((dsz - cPos) / valSz, BUFF_SZ / 10)); // 10 сек - запас

m_mtx.lock();

size_t buffWr = m_buffWritePos;
m_mtxWrite.lock();
size_t buffWritePos = m_buffWritePos;
m_buffWritePos += valCnt;

if (m_buffWritePos >= BUFF_SZ) m_buffWritePos -= BUFF_SZ;

m_mtx.unlock();
m_mtxWrite.unlock();

size_t wPos = buffWritePos;
size_t vlsz = sizeof(SV_Base::Value) * SV_PACKETSZ;
while (cPos < dsz){

m_buffer[buffWr].name = indata.data() + cPos;
m_buffer[buffWr].module = indata.c_str();
m_buffer[buffWr].type = SV_Base::ValueType(*(indata.data() + cPos + SV_NAMESZ));
m_buffer[buffWr].data.beginTime = bTm;

memcpy(m_buffer[buffWr].data.vals, indata.data() + cPos + SV_NAMESZ + sizeof(SV_Base::ValueType), vlsz);
m_buffer[wPos].module = indata.c_str();
m_buffer[wPos].name = indata.data() + cPos;
m_buffer[wPos].type = SV_Base::ValueType(*(indata.data() + cPos + SV_NAMESZ));
m_buffer[wPos].data.beginTime = bTm;

m_buffer[buffWr].isActive = true;

++buffWr;
if (buffWr == BUFF_SZ) buffWr = 0;
memcpy(m_buffer[wPos].data.vals, indata.data() + cPos + SV_NAMESZ + sizeof(SV_Base::ValueType), vlsz);
++wPos;
if (wPos == BUFF_SZ) wPos = 0;

cPos += valSz;
}
{std::lock_guard<std::mutex> lck(m_mtxRead);
size_t wPos = buffWritePos;
for (size_t i = 0; i < valCnt; ++i){
m_buffer[wPos].isActive = true;
++wPos;
if (wPos == BUFF_SZ) wPos = 0;
}
}

}

void BufferData::incReadPos(){

std::lock_guard<std::mutex> lck(m_mtxRead);
m_buffer[m_buffReadPos].isActive = false;

++m_buffReadPos;

if (m_buffReadPos == BUFF_SZ) m_buffReadPos = 0;
}

BufferData::InputData BufferData::getDataByReadPos(){
BufferData::InputData BufferData::getDataByReadPos()const{

return m_buffer[m_buffReadPos];
}

bool BufferData::hasNewData(){
std::lock_guard<std::mutex> lck(m_mtxRead);

return m_buffer[m_buffReadPos].isActive;
}

12 changes: 7 additions & 5 deletions src/SVServer/src/buffer_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,24 @@ class BufferData

/// вернуть данные по текущей позиции чтения
/// \return
InputData getDataByReadPos();
InputData getDataByReadPos()const;

/// инкремент позиции чтения
void incReadPos();

const size_t BUFF_SZ = SV_VALUE_MAX_CNT * 10; // 10 сек - запас
bool hasNewData();

static const size_t BUFF_SZ = SV_VALUE_MAX_CNT * 10; // 10 сек - запас

private:

SV_Srv::Config cng;

/// данные
InputData m_buffer[SV_VALUE_MAX_CNT * 10]; // 10 сек - запас;
InputData m_buffer[BUFF_SZ];

size_t m_buffReadPos = 0; ///< тек позиция чтения
size_t m_buffWritePos = 0; ///< тек позиция записи

std::mutex m_mtx;
std::mutex m_mtxRead, m_mtxWrite;
};
Loading

0 comments on commit 91268f2

Please sign in to comment.