Template Class ChunkDistributor

Class Documentation

template<typename ChunkDistributorDataType>
class ChunkDistributor

The ChunkDistributor is the low-level building block for sending SharedChunks to a dynamic number of ChunkQueues. Together with the ChunkQueuePusher, the ChunkDistributor builds the infrastructure to exchange memory chunks between different data producers and consumers that could be located in different processes. Besides a modifiable container of ChunkQueues to which a SharedChunk can be delivered, it holds a configurable history of the last sent chunks. This makes it possible to provide a newly added queue with a number of the most recent chunks to start from. This is needed for the functionality known as latched topic in ROS or field in ara::com. A ChunkDistributor is used to build elements of higher abstraction layers that also do memory management and provide an API towards the real user.

About Concurrency: This ChunkDistributor can be used with different LockingPolicies for different scenarios. When different threads operate on it (e.g. the application sends chunks and RouDi adds and removes queues), a locking policy must be used that ensures consistent data in the ChunkDistributorData.
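The locking-policy idea described above can be illustrated with a small stand-alone sketch. It is not the library's implementation: NoLockPolicy, MutexLockPolicy, ToyDistributorData and ToyDistributor are hypothetical names, and the sketch assumes the policy is a base class of the data struct that exposes lock() and unlock(), so that std::lock_guard can guard the data object directly. A real inter-process setup would need a mutex that works across processes, which std::mutex does not.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical policy for the case where only one thread touches the data.
struct NoLockPolicy
{
    void lock() noexcept {}
    void unlock() noexcept {}
};

// Hypothetical policy for the case where several threads of ONE process
// touch the data (std::mutex is not an inter-process mutex).
struct MutexLockPolicy
{
    void lock() { m_mutex.lock(); }
    void unlock() { m_mutex.unlock(); }

  private:
    std::mutex m_mutex;
};

// Toy stand-in for the distributor data. It inherits the policy, so the
// data object itself satisfies the BasicLockable requirements.
template <typename LockingPolicy>
struct ToyDistributorData : public LockingPolicy
{
    std::vector<std::uint64_t> history; // chunk ids instead of real chunks
};

// Toy stand-in for the distributor: every access to the data is guarded.
template <typename DataType>
class ToyDistributor
{
  public:
    explicit ToyDistributor(DataType& data) noexcept : m_data(&data) {}

    void addToHistory(std::uint64_t chunkId)
    {
        std::lock_guard<DataType> guard(*m_data); // calls the policy's lock()/unlock()
        m_data->history.push_back(chunkId);
    }

    std::size_t historySize()
    {
        std::lock_guard<DataType> guard(*m_data);
        return m_data->history.size();
    }

  private:
    DataType* m_data;
};
```

With this layout the choice between the two policies is made once, in the type of the data object, and the distributor code stays identical for both scenarios.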

Todo:

There are currently some challenges: The containers used for the stored queues and the history are not thread safe, so an inter-process mutex is used. But this can lead to deadlocks if a user process gets terminated while one of its threads is in the ChunkDistributor and holds the lock. An easier setup would be one in which changing the queues by a middleware thread and sending chunks by the user process do not interleave, i.e. there is no concurrent access to the containers. Then a memory synchronization would be sufficient. The cleanup() call is the biggest challenge. It is used to free chunks that are still held by a user application that did not terminate properly. Even if accesses from middleware and user threads do not overlap, the history container to clean up could be in an inconsistent state because the application was hard terminated while changing it. A container like the UsedChunkList would be needed, one that is robust against such inconsistencies. A perfect job for our future selves.

Public Types

using MemberType_t = ChunkDistributorDataType
using ChunkQueueData_t = typename ChunkDistributorDataType::ChunkQueueData_t
using ChunkQueuePusher_t = typename ChunkDistributorDataType::ChunkQueuePusher_t

Public Functions

explicit ChunkDistributor(cxx::not_null<MemberType_t*const> chunkDistrubutorDataPtr) noexcept
ChunkDistributor(const ChunkDistributor &other) = delete
ChunkDistributor &operator=(const ChunkDistributor&) = delete
ChunkDistributor(ChunkDistributor &&rhs) noexcept = default
ChunkDistributor &operator=(ChunkDistributor &&rhs) noexcept = default
virtual ~ChunkDistributor() noexcept = default
cxx::expected<ChunkDistributorError> tryAddQueue(cxx::not_null<ChunkQueueData_t*const> queueToAdd, const uint64_t requestedHistory = 0U) noexcept

Add a queue to the internal list of chunk queues to which chunks are delivered when calling deliverToAllStoredQueues.

Parameters:
  • queueToAdd[in] chunk queue to add to the list

  • requestedHistory[in] number of last chunks from history to send if available. If the history holds fewer chunks than requested, only the available chunks are provided

Returns:

if the queue could be added it returns success, otherwise a ChunkDistributorError
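The interplay between the history and requestedHistory can be sketched with a toy model. This is a stand-alone illustration under stated assumptions, not the library's code: ToyChunk, ToyQueue and ToyDistributor are hypothetical, chunks are plain integers, queues are unbounded vectors, and error handling (for example a full queue container) is left out. The sketch also assumes that history chunks are replayed oldest first.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

using ToyChunk = int;                   // hypothetical stand-in for a SharedChunk
using ToyQueue = std::vector<ToyChunk>; // hypothetical stand-in for a chunk queue

class ToyDistributor
{
  public:
    explicit ToyDistributor(std::size_t historyCapacity) : m_historyCapacity(historyCapacity) {}

    // Sends the chunk to every stored queue and keeps it in the bounded history.
    // Returns the number of queues the chunk was delivered to.
    std::uint64_t deliverToAllStoredQueues(ToyChunk chunk)
    {
        for (ToyQueue* queue : m_queues)
        {
            queue->push_back(chunk);
        }
        if (m_historyCapacity > 0U)
        {
            if (m_history.size() == m_historyCapacity)
            {
                m_history.pop_front(); // drop the oldest chunk
            }
            m_history.push_back(chunk);
        }
        return m_queues.size();
    }

    // Stores the queue and replays at most requestedHistory of the newest
    // chunks; if fewer are available, only those are sent.
    void addQueue(ToyQueue& queue, std::uint64_t requestedHistory = 0U)
    {
        const std::uint64_t available = m_history.size();
        const std::uint64_t toSend = std::min(requestedHistory, available);
        for (std::uint64_t i = available - toSend; i < available; ++i)
        {
            queue.push_back(m_history[static_cast<std::size_t>(i)]);
        }
        m_queues.push_back(&queue);
    }

  private:
    std::size_t m_historyCapacity;
    std::deque<ToyChunk> m_history;
    std::vector<ToyQueue*> m_queues;
};
```

In this model, a distributor with a history capacity of 3 that has sent chunks 1 to 5 holds 3, 4 and 5. A queue added with requestedHistory = 2 starts with 4 and 5, while a queue added with requestedHistory = 10 starts with all three available chunks.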

cxx::expected<ChunkDistributorError> tryRemoveQueue(cxx::not_null<ChunkQueueData_t*const> queueToRemove) noexcept

Remove a queue from the internal list of chunk queues.

Parameters:

queueToRemove[in] is the queue to remove from the list

Returns:

if the queue could be removed it returns success, otherwise a ChunkDistributorError

void removeAllQueues() noexcept

Delete all the stored chunk queues.

bool hasStoredQueues() const noexcept

Get the information whether there are any stored chunk queues.

Returns:

true if there are stored chunk queues, false if not

uint64_t deliverToAllStoredQueues(mepoo::SharedChunk chunk) noexcept

Deliver the provided shared chunk to all the stored chunk queues. The chunk will be added to the chunk history.

Parameters:

chunk[in] is the SharedChunk to be delivered

Returns:

the number of queues the chunk was delivered to

cxx::expected<ChunkDistributorError> deliverToQueue(const cxx::UniqueId uniqueQueueId, const uint32_t lastKnownQueueIndex, mepoo::SharedChunk chunk) noexcept

Deliver the provided shared chunk to the chunk queue with the provided ID. The chunk will NOT be added to the chunk history.

Parameters:
  • uniqueQueueId[in] is a unique ID which identifies the queue to which this chunk shall be delivered

  • lastKnownQueueIndex[in] is used for a fast lookup of the queue with uniqueQueueId

  • chunk[in] is the SharedChunk to be delivered

Returns:

ChunkDistributorError if the queue was not found

cxx::optional<uint32_t> getQueueIndex(const cxx::UniqueId uniqueQueueId, const uint32_t lastKnownQueueIndex) const noexcept

Look up the index of a queue with a specific cxx::UniqueId.

Parameters:
  • uniqueQueueId[in] is the unique ID of the queue whose index is requested

  • lastKnownQueueIndex[in] is used for a fast lookup of the queue with uniqueQueueId; if the queue is not found at the index, the queue is searched by iteration over all stored queues

Returns:

the index of the queue with uniqueQueueId or cxx::nullopt if the queue was not found
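The lookup strategy described for lastKnownQueueIndex (check the hinted index first, then fall back to a linear search) can be sketched as follows. This is a stand-alone illustration, not the library's code: ToyQueue and findQueueIndex are hypothetical names, the unique ID is modelled as a plain integer, and std::optional (C++17) stands in for cxx::optional.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical stand-in for a stored queue; only the ID matters here.
struct ToyQueue
{
    std::uint64_t uniqueId;
};

std::optional<std::uint32_t> findQueueIndex(const std::vector<ToyQueue>& queues,
                                            std::uint64_t uniqueQueueId,
                                            std::uint32_t lastKnownQueueIndex)
{
    // Fast path: the hint is in range and still points at the wanted queue.
    if (lastKnownQueueIndex < queues.size() && queues[lastKnownQueueIndex].uniqueId == uniqueQueueId)
    {
        return lastKnownQueueIndex;
    }

    // Slow path: the hint is stale or out of range, so search all stored queues.
    for (std::uint32_t index = 0U; index < queues.size(); ++index)
    {
        if (queues[index].uniqueId == uniqueQueueId)
        {
            return index;
        }
    }

    return std::nullopt; // no queue with this ID is stored
}
```

A caller would typically remember the returned index and pass it as the hint on the next call, so that the linear search only runs after queues have been added or removed in between.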

void addToHistoryWithoutDelivery(mepoo::SharedChunk chunk) noexcept

Update the chunk history but do not deliver the chunk to any chunk queue. A use case is, e.g., updating a non-offered field in ara::com.

Parameters:

chunk[in] to add to the chunk history

uint64_t getHistorySize() noexcept

Get the current size of the chunk history.

Returns:

chunk history size

uint64_t getHistoryCapacity() const noexcept

Get the capacity of the chunk history.

Returns:

chunk history capacity

void clearHistory() noexcept

Clears the chunk history.

void cleanup() noexcept

Clean up the used shared memory chunks.

Protected Functions

const MemberType_t *getMembers() const noexcept
MemberType_t *getMembers() noexcept
bool pushToQueue(cxx::not_null<ChunkQueueData_t*const> queue, mepoo::SharedChunk chunk) noexcept