Program Listing for File DataQueue.hpp
↰ Return to documentation for file (include/depthai/device/DataQueue.hpp)
#pragma once
// std
#include <atomic>
#include <memory>
#include <vector>
// project
#include "depthai/pipeline/datatype/ADatatype.hpp"
#include "depthai/utility/LockingQueue.hpp"
#include "depthai/xlink/XLinkConnection.hpp"
// shared
#include "depthai-shared/datatype/RawBuffer.hpp"
#include "depthai-shared/xlink/XLinkConstants.hpp"
namespace dai {
class DataOutputQueue {
public:
using CallbackId = int;
private:
LockingQueue<std::shared_ptr<ADatatype>> queue;
std::thread readingThread;
std::atomic<bool> running{true};
std::string exceptionMessage{""};
const std::string name{""};
std::mutex callbacksMtx;
std::unordered_map<CallbackId, std::function<void(std::string, std::shared_ptr<ADatatype>)>> callbacks;
CallbackId uniqueCallbackId{0};
// const std::chrono::milliseconds READ_TIMEOUT{500};
public:
// DataOutputQueue constructor
DataOutputQueue(const std::shared_ptr<XLinkConnection> conn, const std::string& streamName, unsigned int maxSize = 16, bool blocking = true);
~DataOutputQueue();
bool isClosed() const;
void close();
void setBlocking(bool blocking);
bool getBlocking() const;
void setMaxSize(unsigned int maxSize);
unsigned int getMaxSize() const;
std::string getName() const;
CallbackId addCallback(std::function<void(std::string, std::shared_ptr<ADatatype>)>);
CallbackId addCallback(std::function<void(std::shared_ptr<ADatatype>)>);
CallbackId addCallback(std::function<void()> callback);
bool removeCallback(CallbackId callbackId);
template <class T>
bool has() {
if(!running) throw std::runtime_error(exceptionMessage.c_str());
std::shared_ptr<ADatatype> val = nullptr;
if(queue.front(val) && dynamic_cast<T*>(val.get())) {
return true;
}
return false;
}
bool has() {
if(!running) throw std::runtime_error(exceptionMessage.c_str());
return !queue.empty();
}
template <class T>
std::shared_ptr<T> tryGet() {
if(!running) throw std::runtime_error(exceptionMessage.c_str());
std::shared_ptr<ADatatype> val = nullptr;
if(!queue.tryPop(val)) return nullptr;
return std::dynamic_pointer_cast<T>(val);
}
std::shared_ptr<ADatatype> tryGet() {
return tryGet<ADatatype>();
}
template <class T>
std::shared_ptr<T> get() {
if(!running) throw std::runtime_error(exceptionMessage.c_str());
std::shared_ptr<ADatatype> val = nullptr;
if(!queue.waitAndPop(val)) {
throw std::runtime_error(exceptionMessage.c_str());
}
return std::dynamic_pointer_cast<T>(val);
}
std::shared_ptr<ADatatype> get() {
return get<ADatatype>();
}
template <class T>
std::shared_ptr<T> front() {
if(!running) throw std::runtime_error(exceptionMessage.c_str());
std::shared_ptr<ADatatype> val = nullptr;
if(!queue.front(val)) return nullptr;
return std::dynamic_pointer_cast<T>(val);
}
std::shared_ptr<ADatatype> front() {
return front<ADatatype>();
}
template <class T, typename Rep, typename Period>
std::shared_ptr<T> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
if(!running) throw std::runtime_error(exceptionMessage.c_str());
std::shared_ptr<ADatatype> val = nullptr;
if(!queue.tryWaitAndPop(val, timeout)) {
hasTimedout = true;
return nullptr;
}
hasTimedout = false;
return std::dynamic_pointer_cast<T>(val);
}
template <typename Rep, typename Period>
std::shared_ptr<ADatatype> get(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
return get<ADatatype>(timeout, hasTimedout);
}
template <class T>
std::vector<std::shared_ptr<T>> tryGetAll() {
if(!running) throw std::runtime_error(exceptionMessage.c_str());
std::vector<std::shared_ptr<T>> messages;
queue.consumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
// dynamic pointer cast may return nullptr
// in which case that message in vector will be nullptr
messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
});
return messages;
}
std::vector<std::shared_ptr<ADatatype>> tryGetAll() {
return tryGetAll<ADatatype>();
}
template <class T>
std::vector<std::shared_ptr<T>> getAll() {
if(!running) throw std::runtime_error(exceptionMessage.c_str());
std::vector<std::shared_ptr<T>> messages;
queue.waitAndConsumeAll([&messages](std::shared_ptr<ADatatype>& msg) {
// dynamic pointer cast may return nullptr
// in which case that message in vector will be nullptr
messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
});
return messages;
}
std::vector<std::shared_ptr<ADatatype>> getAll() {
return getAll<ADatatype>();
}
template <class T, typename Rep, typename Period>
std::vector<std::shared_ptr<T>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
if(!running) throw std::runtime_error(exceptionMessage.c_str());
std::vector<std::shared_ptr<T>> messages;
hasTimedout = !queue.waitAndConsumeAll(
[&messages](std::shared_ptr<ADatatype>& msg) {
// dynamic pointer cast may return nullptr
// in which case that message in vector will be nullptr
messages.push_back(std::dynamic_pointer_cast<T>(std::move(msg)));
},
timeout);
return messages;
}
template <typename Rep, typename Period>
std::vector<std::shared_ptr<ADatatype>> getAll(std::chrono::duration<Rep, Period> timeout, bool& hasTimedout) {
return getAll<ADatatype>(timeout, hasTimedout);
}
};
class DataInputQueue {
LockingQueue<std::shared_ptr<RawBuffer>> queue;
std::thread writingThread;
std::atomic<bool> running{true};
std::string exceptionMessage;
const std::string name;
std::atomic<std::size_t> maxDataSize{device::XLINK_USB_BUFFER_MAX_SIZE};
public:
DataInputQueue(const std::shared_ptr<XLinkConnection> conn,
const std::string& streamName,
unsigned int maxSize = 16,
bool blocking = true,
std::size_t maxDataSize = device::XLINK_USB_BUFFER_MAX_SIZE);
~DataInputQueue();
bool isClosed() const;
void close();
void setMaxDataSize(std::size_t maxSize);
std::size_t getMaxDataSize();
void setBlocking(bool blocking);
bool getBlocking() const;
void setMaxSize(unsigned int maxSize);
unsigned int getMaxSize() const;
std::string getName() const;
void send(const std::shared_ptr<RawBuffer>& rawMsg);
void send(const std::shared_ptr<ADatatype>& msg);
void send(const ADatatype& msg);
bool send(const std::shared_ptr<RawBuffer>& rawMsg, std::chrono::milliseconds timeout);
bool send(const std::shared_ptr<ADatatype>& msg, std::chrono::milliseconds timeout);
bool send(const ADatatype& msg, std::chrono::milliseconds timeout);
};
} // namespace dai