Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConnectionProvider: Create a Connection object which includes the iio_context and the CommandQueue. #1500

Merged
merged 3 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 3 additions & 11 deletions iioutil/include/iioutil/commandqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,21 @@ class SCOPY_IIOUTIL_EXPORT CommandQueue : public QObject
* @param numberOfThreads
* @param parent
*/
explicit CommandQueue(int numberOfThreads = 1, QObject *parent = nullptr);
explicit CommandQueue(QObject *parent = nullptr);
~CommandQueue();
void enqueue(Command *newCmd);
void start();
void wait();
void requestStop();
Q_SIGNALS:
void started(scopy::Command *);
void finished(scopy::Command *);
void runCmd();
private Q_SLOTS:
void work();
void cmdStarted(scopy::Command *cmd);
void cmdFinished(scopy::Command *cmd);
void resolveNext(scopy::Command *cmd);

private:
Command *m_currentCommand;
std::deque<Command *> m_commandQueue;
std::mutex m_commandMutex;
std::atomic<bool> m_running;
QThreadPool m_commandExecThreadPool;
int m_nbThreads;
bool m_async;
bool m_workNewThread;
};
} // namespace scopy
#endif // IIOCOMMANDQUEUE_H
65 changes: 65 additions & 0 deletions iioutil/include/iioutil/connection.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#ifndef CONNECTION_H
#define CONNECTION_H
#include "scopy-iioutil_export.h"
#include "commandqueue.h"
#include <iio.h>
#include <QObject>

namespace scopy {
class SCOPY_IIOUTIL_EXPORT Connection : public QObject
{
Q_OBJECT
public:
Connection(QString uri);

const QString &uri() const;
CommandQueue *commandQueue() const;
struct iio_context *context() const;
int refCount() const;

protected:
~Connection();

/**
* @brief open
* Initialize the connection if not previously opened.
* If previously opened, increase the internal refCount.
*/
void open();

/**
* @brief close
* Decrement the internal refCount.
* Emit the aboutToBeDestroyed() signal if refCount is zero.
*/
void close();

/**
* @brief closeAll
* Reset the internal refCount to zero.
* Force close the Connection.
* Emit the aboutToBeDestroyed() signal.
*/
void closeAll();

Q_SIGNALS:
/**
* @brief aboutToBeDestroyed
* Connection clients should handle deinitialization
* of their iio_context/CommandQueue related operations
* in a slot connected to this signal.
* After the signal is emitted, the Connection object
* will no longer be valid.
*/
void aboutToBeDestroyed();

private:
friend class ConnectionProvider;
QString m_uri;
CommandQueue *m_commandQueue;
struct iio_context *m_context;
int m_refCount = 0;
};
} // namespace scopy

#endif // CONNECTION_H
39 changes: 39 additions & 0 deletions iioutil/include/iioutil/connectionprovider.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#ifndef CONNECTIONPROVIDER_H
#define CONNECTIONPROVIDER_H

#include "scopy-iioutil_export.h"
#include "connection.h"

#include <QObject>
#include <QMap>
#include <mutex>

namespace scopy {
class SCOPY_IIOUTIL_EXPORT ConnectionProvider : public QObject
{
Q_OBJECT
protected:
ConnectionProvider(QObject *parent = nullptr);
~ConnectionProvider();

public:
ConnectionProvider(ConnectionProvider &other) = delete;
void operator=(const ConnectionProvider &) = delete;

static ConnectionProvider *GetInstance();
static Connection *open(QString uri);
static void close(QString uri);
static void closeAll(QString uri);

private:
Connection *_open(QString uri);
void _close(QString uri);
void _closeAll(QString uri);
void _closeAndRemove(QString uri);
static ConnectionProvider *pinstance_;
static std::mutex mutex_;
QMap<QString, Connection *> map;
};
} // namespace scopy

#endif // CONNECTIONPROVIDER_H
112 changes: 33 additions & 79 deletions iioutil/src/commandqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,14 @@ using namespace scopy;

Q_LOGGING_CATEGORY(CAT_COMMANDQUEUE, "CommandQueue");

CommandQueue::CommandQueue(int numberOfThreads, QObject *parent)
CommandQueue::CommandQueue(QObject *parent)
: QObject(parent)
, m_running(false)
, m_nbThreads(numberOfThreads)
, m_async(m_nbThreads > 1)
, m_workNewThread(m_nbThreads != 0)
, m_currentCommand(nullptr)
{
m_commandExecThreadPool.setMaxThreadCount(std::min(m_nbThreads, QThread::idealThreadCount()));
}
{}

CommandQueue::~CommandQueue()
{
requestStop();
wait();

for(auto c : m_commandQueue) {
delete c;
}
Expand All @@ -34,96 +26,58 @@ CommandQueue::~CommandQueue()

void CommandQueue::enqueue(Command *command)
{
std::lock_guard<std::mutex> lock(m_commandMutex);
start();
connect(command, &Command::started, this, &CommandQueue::cmdStarted,
static_cast<Qt::ConnectionType>(Qt::QueuedConnection | Qt::UniqueConnection));
connect(command, &Command::finished, this, &CommandQueue::cmdFinished,
static_cast<Qt::ConnectionType>(Qt::QueuedConnection | Qt::UniqueConnection));
m_commandQueue.push_back(command);
}
qDebug(CAT_COMMANDQUEUE) << "enqueued " << command << " " << m_commandQueue.size();

void CommandQueue::start()
{
if(!m_running) {
qDebug(CAT_COMMANDQUEUE) << "CommandQueue set running to true (start)";
m_running = true;
if(m_workNewThread) {
m_commandExecThreadPool.start(std::bind(&CommandQueue::work, this));
} else {
// trigger work on Main Thread
QMetaObject::invokeMethod(this, "work", Qt::QueuedConnection);
}
start();
}
}

void CommandQueue::requestStop()
void CommandQueue::start()
{
if(m_running) {
qDebug(CAT_COMMANDQUEUE) << "CommandQueue set running to false (stop)";
m_running = false;
if(m_workNewThread) {
std::unique_lock<std::mutex> lock(m_commandMutex);
m_commandQueue.clear();
}
}
m_running = true;
runCmd();
}

void CommandQueue::wait()
void CommandQueue::resolveNext(scopy::Command *cmd)
{
if(m_running) {
qDebug(CAT_COMMANDQUEUE) << "CommandQueue set running to true (wait)";
m_commandQueue.pop_front(); // also delete/disconnect
qDebug(CAT_COMMANDQUEUE) << "delete " << cmd;
disconnect(cmd, &Command::finished, this, &CommandQueue::resolveNext);
cmd->deleteLater();

if(m_commandQueue.size() == 0) {
m_running = false;
}
if(m_workNewThread) {
m_commandExecThreadPool.waitForDone();
} else {
runCmd();
}
}

void CommandQueue::work()
void CommandQueue::runCmd()
{
while(m_running) {
std::unique_lock<std::mutex> lock(m_commandMutex);
if(m_commandQueue.empty()) {
m_running = false;
break;
}

m_currentCommand = m_commandQueue.front();
if(m_async) {
QtConcurrent::run(&m_commandExecThreadPool, std::bind([=]() { m_currentCommand->execute(); }));
} else {
int size = m_commandQueue.size();
lock.unlock();
qDebug(CAT_COMMANDQUEUE)
<< "CommandQueue executing " << m_currentCommand << " cmdq size: " << size;
m_currentCommand->execute();
m_currentCommand->deleteLater();
m_currentCommand = nullptr;
lock.lock();
}
m_commandQueue.pop_front();
std::lock_guard<std::mutex> lock(m_commandMutex);
qDebug(CAT_COMMANDQUEUE) << "run cmd " << m_commandQueue.at(0);
if(m_running) {
connect(m_commandQueue.at(0), &Command::finished, this, &CommandQueue::resolveNext);
QtConcurrent::run(QThreadPool::globalInstance(), std::bind([=]() {
std::unique_lock<std::mutex> lock(m_commandMutex);
qDebug(CAT_COMMANDQUEUE) << "execute start " << m_commandQueue.at(0);
m_commandQueue.at(0)->execute();
qDebug(CAT_COMMANDQUEUE) << "execute stop " << m_commandQueue.at(0);
}));
}
}

void CommandQueue::cmdFinished(scopy::Command *cmd)
void CommandQueue::requestStop()
{
if(!cmd) {
cmd = dynamic_cast<Command *>(QObject::sender());
}
if(cmd) {
Q_EMIT finished(cmd);
std::lock_guard<std::mutex> lock(m_commandMutex);
qDebug(CAT_COMMANDQUEUE) << "request stop " << m_commandQueue.size();
if(m_running) {
m_running = false;
}
}

void CommandQueue::cmdStarted(scopy::Command *cmd)
{
if(!cmd) {
cmd = dynamic_cast<Command *>(QObject::sender());
}
if(cmd) {
Q_EMIT started(cmd);
}
}
void CommandQueue::wait() { QThreadPool::globalInstance()->waitForDone(); }

#include "moc_commandqueue.cpp"
2 changes: 1 addition & 1 deletion iioutil/src/commandqueueprovider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ CommandQueueRefCounter::CommandQueueRefCounter(struct iio_context *ctx)
this->ctx = ctx;
this->refcnt++;
// TBD: automatically check the iio_context to see if multiple threads are possible (iiod vs tinyiiod)
this->cmdQueue = new CommandQueue(1);
this->cmdQueue = new CommandQueue();
}

CommandQueueRefCounter::~CommandQueueRefCounter()
Expand Down
64 changes: 64 additions & 0 deletions iioutil/src/connection.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#include "connection.h"

using namespace scopy;

Connection::Connection(QString uri)
{
this->m_uri = uri;
this->m_context = nullptr;
this->m_commandQueue = nullptr;
this->m_refCount = 0;
}

Connection::~Connection()
{
if(this->m_commandQueue) {
delete this->m_commandQueue;
this->m_commandQueue = nullptr;
}
if(this->m_context) {
iio_context_destroy(this->m_context);
this->m_context = nullptr;
}
}

const QString &Connection::uri() const { return m_uri; }

CommandQueue *Connection::commandQueue() const { return m_commandQueue; }

iio_context *Connection::context() const { return m_context; }

int Connection::refCount() const { return m_refCount; }

void Connection::open()
{
if(!this->m_context) {
this->m_context = iio_create_context_from_uri(this->m_uri.toStdString().c_str());
if(this->m_context) {
this->m_commandQueue = new CommandQueue();
this->m_refCount++;
}
} else {
this->m_refCount++;
}
}

void Connection::closeAll()
{
this->m_refCount = 0;
close();
}

void Connection::close()
{
this->m_refCount--;
if(this->m_refCount <= 0) {
/* If the open() and close() number of calls done by a client
* is mismatched, all the remaining clients should be notified of the
* destruction. */
this->m_refCount = 0;
Q_EMIT aboutToBeDestroyed();
}
}

#include "moc_connection.cpp"
Loading