Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix ComQueue Does Not Deallocate Buffers on Overflow #2853

Merged
merged 15 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Svc/ComQueue/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ set(UT_SOURCE_FILES
"${CMAKE_CURRENT_LIST_DIR}/test/ut/ComQueueTestMain.cpp"
"${CMAKE_CURRENT_LIST_DIR}/test/ut/ComQueueTester.cpp"
)

set(UT_AUTO_HELPERS ON)
register_fprime_ut()
31 changes: 25 additions & 6 deletions Svc/ComQueue/ComQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,19 @@
void ComQueue::comQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::ComBuffer& data, U32 context) {
// Ensure that the port number of comQueueIn is consistent with the expectation
FW_ASSERT(portNum >= 0 && portNum < COM_PORT_COUNT, portNum);
this->enqueue(portNum, QueueType::COM_QUEUE, reinterpret_cast<const U8*>(&data), sizeof(Fw::ComBuffer));
(void)this->enqueue(portNum, QueueType::COM_QUEUE, reinterpret_cast<const U8*>(&data), sizeof(Fw::ComBuffer));

Check warning

Code scanning / CodeQL

Unchecked function argument Warning

This use of parameter data has not been checked.
}

void ComQueue::buffQueueIn_handler(const NATIVE_INT_TYPE portNum, Fw::Buffer& fwBuffer) {
const NATIVE_INT_TYPE queueNum = portNum + COM_PORT_COUNT;
// Ensure that the port number of buffQueueIn is consistent with the expectation
FW_ASSERT(portNum >= 0 && portNum < BUFFER_PORT_COUNT, portNum);
FW_ASSERT(queueNum < TOTAL_PORT_COUNT);
this->enqueue(queueNum, QueueType::BUFFER_QUEUE, reinterpret_cast<const U8*>(&fwBuffer), sizeof(Fw::Buffer));
bool status =

Check notice

Code scanning / CodeQL

Use of basic integral type Note

status uses the basic integral type bool rather than a typedef with size and signedness.
this->enqueue(queueNum, QueueType::BUFFER_QUEUE, reinterpret_cast<const U8*>(&fwBuffer), sizeof(Fw::Buffer));

Check warning

Code scanning / CodeQL

Unchecked function argument Warning

This use of parameter fwBuffer has not been checked.
if (!status) {
this->deallocate_out(portNum, fwBuffer);
}
}

void ComQueue::comStatusIn_handler(const NATIVE_INT_TYPE portNum, Fw::Success& condition) {
Expand Down Expand Up @@ -181,29 +185,44 @@
this->tlmWrite_buffQueueDepth(buffQueueDepth);
}

// ----------------------------------------------------------------------
// Hook implementations for typed async input ports
// ----------------------------------------------------------------------

void ComQueue::buffQueueIn_overflowHook(FwIndexType portNum, Fw::Buffer& fwBuffer) {
Fixed Show fixed Hide fixed

Check notice

Code scanning / CodeQL

Use of basic integral type Note

portNum uses the basic integral type int rather than a typedef with size and signedness.
this->deallocate_out(portNum, fwBuffer);
Fixed Show fixed Hide fixed

Check warning

Code scanning / CodeQL

Unchecked function argument Warning

This use of parameter fwBuffer has not been checked.
}

// ----------------------------------------------------------------------
// Private helper methods
// ----------------------------------------------------------------------

void ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8* data, const FwSizeType size) {
bool ComQueue::enqueue(const FwIndexType queueNum, QueueType queueType, const U8* data, const FwSizeType size) {

Check notice

Code scanning / CodeQL

Use of basic integral type Note

enqueue uses the basic integral type bool rather than a typedef with size and signedness.

Check notice

Code scanning / CodeQL

Use of basic integral type Note

queueNum uses the basic integral type int rather than a typedef with size and signedness.

Check notice

Code scanning / CodeQL

Use of basic integral type Note

size uses the basic integral type unsigned long rather than a typedef with size and signedness.

Check notice

Code scanning / CodeQL

Long function without assertion Note

All functions of more than 10 lines should have at least one assertion.
// Enqueue the given message onto the matching queue. When no space is available then emit the queue overflow event,
// set the appropriate throttle, and move on. Will assert if passed a message for a depth 0 queue.
const FwSizeType expectedSize = (queueType == QueueType::COM_QUEUE) ? sizeof(Fw::ComBuffer) : sizeof(Fw::Buffer);
const FwIndexType portNum = queueNum - ((queueType == QueueType::COM_QUEUE) ? 0 : COM_PORT_COUNT);
bool rvStatus = true;

Check notice

Code scanning / CodeQL

Use of basic integral type Note

rvStatus uses the basic integral type bool rather than a typedef with size and signedness.
FW_ASSERT(
expectedSize == size,
static_cast<FwAssertArgType>(size),
static_cast<FwAssertArgType>(expectedSize));
FW_ASSERT(portNum >= 0, portNum);
Fw::SerializeStatus status = this->m_queues[queueNum].enqueue(data, size);
if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT && !this->m_throttle[queueNum]) {
this->log_WARNING_HI_QueueOverflow(queueType, static_cast<U32>(portNum));
this->m_throttle[queueNum] = true;
if (status == Fw::FW_SERIALIZE_NO_ROOM_LEFT) {
if (!this->m_throttle[queueNum]) {
this->log_WARNING_HI_QueueOverflow(queueType, static_cast<U32>(portNum));
this->m_throttle[queueNum] = true;
}

rvStatus = false;
}
// When the component is already in READY state process the queue to send out the next available message immediately
if (this->m_state == READY) {
this->processQueue();
}

return rvStatus;
}

void ComQueue::sendComBuffer(Fw::ComBuffer& comBuffer) {
Expand Down
7 changes: 5 additions & 2 deletions Svc/ComQueue/ComQueue.fpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ module Svc {

@ Array of queue depths for Fw::Com types
array ComQueueDepth = [ComQueueComPorts] U32

@ Array of queue depths for Fw::Buffer types
array BuffQueueDepth = [ComQueueBufferPorts] U32

Expand All @@ -22,14 +22,17 @@ module Svc {
@ Fw::Buffer output port
output port buffQueueSend: Fw.BufferSend

@ Port for deallocating Fw::Buffer on queue overflow
output port deallocate: Fw.BufferSend

@ Port for receiving the status signal
async input port comStatusIn: Fw.SuccessCondition

@ Port array for receiving Fw::ComBuffers
async input port comQueueIn: [ComQueueComPorts] Fw.Com drop

@ Port array for receiving Fw::Buffers
async input port buffQueueIn: [ComQueueBufferPorts] Fw.BufferSend drop
async input port buffQueueIn: [ComQueueBufferPorts] Fw.BufferSend hook

@ Port for scheduling telemetry output
async input port run: Svc.Sched drop
Expand Down
12 changes: 11 additions & 1 deletion Svc/ComQueue/ComQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,23 @@ class ComQueue : public ComQueueComponentBase {
U32 context /*!<The call order*/
);

// ----------------------------------------------------------------------
// Hook implementations for typed async input ports
// ----------------------------------------------------------------------

//! Queue overflow hook method that deallocates the fwBuffer
//!
void buffQueueIn_overflowHook(FwIndexType portNum, //!< The port number
Fw::Buffer& fwBuffer //!< The buffer
);

// ----------------------------------------------------------------------
// Helper Functions
// ----------------------------------------------------------------------

//! Enqueues a message on the appropriate queue
//!
void enqueue(const FwIndexType queueNum, //!< Index of the queue to enqueue the message
bool enqueue(const FwIndexType queueNum, //!< Index of the queue to enqueue the message
QueueType queueType, //!< Type of the queue and message data
const U8* data, //!< Pointer to the message data
const FwSizeType size //!< Size of the message
Expand Down
1 change: 1 addition & 0 deletions Svc/ComQueue/docs/sdd.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ The diagram below shows the `Svc::ComQueue` component.
|---------------|-------------------|---------------------------------------|--------------------------------------------------------|
| `output` | `comQueueSend` | `Fw.Com` | Fw::ComBuffer output port |
| `output` | `buffQueueSend` | `Fw.BufferSend` | Fw::Buffer output port |
| `output` | `deallocate` | `Fw.BufferSend` | Port for deallocating Fw::Buffer on queue overflow |
| `async input` | `comStatusIn` | `Fw.SuccessCondition` | Port for receiving the status signal |
| `async input` | `comQueueIn` | `[ComQueueComPorts] Fw.Com` | Port array for receiving Fw::ComBuffers |
| `async input` | `buffQueueIn` | `[ComQueueBufferPorts] Fw.BufferSend` | Port array for receiving Fw::Buffers |
Expand Down
71 changes: 23 additions & 48 deletions Svc/ComQueue/test/ut/ComQueueTester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,12 @@ void ComQueueTester ::configure() {
component.configure(configurationTable, 0, mallocAllocator);
}

void ComQueueTester ::sendByQueueNumber(NATIVE_INT_TYPE queueNum, NATIVE_INT_TYPE& portNum, QueueType& queueType) {
U8 data[BUFFER_LENGTH] = {0xde, 0xad, 0xbe};
Fw::ComBuffer comBuffer(&data[0], sizeof(data));
Fw::Buffer buffer(&data[0], sizeof(data));
void ComQueueTester ::sendByQueueNumber(Fw::Buffer& buffer,
NATIVE_INT_TYPE queueNum,
NATIVE_INT_TYPE& portNum,
QueueType& queueType) {
if (queueNum < ComQueue::COM_PORT_COUNT) {
Fw::ComBuffer comBuffer(buffer.getData(), buffer.getSize());
portNum = queueNum;
queueType = QueueType::COM_QUEUE;
invoke_to_comQueueIn(portNum, comBuffer, 0);
Expand Down Expand Up @@ -205,25 +206,40 @@ void ComQueueTester::testQueueOverflow(){

component.configure(configurationTable, 0, mallocAllocator);

U8 data[BUFFER_LENGTH] = {0xde, 0xad, 0xbe};
Fw::Buffer buffer(&data[0], sizeof(data));

for(NATIVE_INT_TYPE queueNum = 0; queueNum < ComQueue::TOTAL_PORT_COUNT; queueNum++) {
QueueType overflow_type;
NATIVE_INT_TYPE portNum;
// queue[portNum].depth + 2 to deliberately cause overflow and check throttle of exactly 1
for (NATIVE_UINT_TYPE msgCount = 0; msgCount < configurationTable.entries[queueNum].depth + 2; msgCount++) {
sendByQueueNumber(queueNum, portNum, overflow_type);
sendByQueueNumber(buffer, queueNum, portNum, overflow_type);
dispatchAll();
}

if (QueueType::BUFFER_QUEUE == overflow_type) {
ASSERT_from_deallocate_SIZE(2);
ASSERT_from_deallocate(0, buffer);
ASSERT_from_deallocate(1, buffer);
}

ASSERT_EVENTS_QueueOverflow_SIZE(1);
ASSERT_EVENTS_QueueOverflow(0, overflow_type, portNum);

// Drain a message, and see if throttle resets
emitOne();

// Force another overflow by filling then deliberately overflowing the queue
sendByQueueNumber(queueNum, portNum, overflow_type);
sendByQueueNumber(queueNum, portNum, overflow_type);
sendByQueueNumber(buffer, queueNum, portNum, overflow_type);
sendByQueueNumber(buffer, queueNum, portNum, overflow_type);
dispatchAll();

if (QueueType::BUFFER_QUEUE == overflow_type) {
ASSERT_from_deallocate_SIZE(3);
ASSERT_from_deallocate(2, buffer);
}

ASSERT_EVENTS_QueueOverflow_SIZE(2);
ASSERT_EVENTS_QueueOverflow(1, overflow_type, portNum);

Expand Down Expand Up @@ -283,45 +299,4 @@ void ComQueueTester ::from_comQueueSend_handler(const NATIVE_INT_TYPE portNum, F
// Helper methods
// ----------------------------------------------------------------------

void ComQueueTester ::connectPorts() {
// buffQueueIn
for (NATIVE_INT_TYPE i = 0; i < ComQueue::BUFFER_PORT_COUNT; ++i) {
this->connect_to_buffQueueIn(i, this->component.get_buffQueueIn_InputPort(i));
}

// comQueueIn
for (NATIVE_INT_TYPE i = 0; i < ComQueue::COM_PORT_COUNT; ++i) {
this->connect_to_comQueueIn(i, this->component.get_comQueueIn_InputPort(i));
}

// comStatusIn
this->connect_to_comStatusIn(0, this->component.get_comStatusIn_InputPort(0));

// run
this->connect_to_run(0, this->component.get_run_InputPort(0));

// Log
this->component.set_Log_OutputPort(0, this->get_from_Log(0));

// LogText
this->component.set_LogText_OutputPort(0, this->get_from_LogText(0));

// Time
this->component.set_Time_OutputPort(0, this->get_from_Time(0));

// Tlm
this->component.set_Tlm_OutputPort(0, this->get_from_Tlm(0));

// buffQueueSend
this->component.set_buffQueueSend_OutputPort(0, this->get_from_buffQueueSend(0));

// comQueueSend
this->component.set_comQueueSend_OutputPort(0, this->get_from_comQueueSend(0));
}

void ComQueueTester ::initComponents() {
this->init();
this->component.init(QUEUE_DEPTH, INSTANCE);
}

} // end namespace Svc
18 changes: 17 additions & 1 deletion Svc/ComQueue/test/ut/ComQueueTester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@
namespace Svc {

class ComQueueTester : public ComQueueGTestBase {

public:

// ----------------------------------------------------------------------
// Constants
// ----------------------------------------------------------------------

// Instance ID supplied to the component instance under test
static const NATIVE_INT_TYPE TEST_INSTANCE_ID = 0;

// Queue depth supplied to the component instance under test
static const NATIVE_INT_TYPE TEST_INSTANCE_QUEUE_DEPTH = 10;

private:
// ----------------------------------------------------------------------
// Construction and destruction
Expand All @@ -38,7 +51,10 @@ class ComQueueTester : public ComQueueGTestBase {
// ----------------------------------------------------------------------
void configure();

void sendByQueueNumber(NATIVE_INT_TYPE queueNumber, NATIVE_INT_TYPE& portNum, QueueType& queueType);
void sendByQueueNumber(Fw::Buffer& buffer,
NATIVE_INT_TYPE queueNumber,
NATIVE_INT_TYPE& portNum,
QueueType& queueType);

void emitOne();

Expand Down
26 changes: 13 additions & 13 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ fprime-fpl-convert-xml==1.0.3
fprime-fpl-extract-xml==1.0.3
fprime-fpl-layout==1.0.3
fprime-fpl-write-pic==1.0.3
fprime-fpp-check==2.2.0a1
fprime-fpp-depend==2.2.0a1
fprime-fpp-filenames==2.2.0a1
fprime-fpp-format==2.2.0a1
fprime-fpp-from-xml==2.2.0a1
fprime-fpp-locate-defs==2.2.0a1
fprime-fpp-locate-uses==2.2.0a1
fprime-fpp-syntax==2.2.0a1
fprime-fpp-to-cpp==2.2.0a1
fprime-fpp-to-dict==2.2.0a1
fprime-fpp-to-json==2.2.0a1
fprime-fpp-to-xml==2.2.0a1
fprime-fpp-check==2.2.0a2
fprime-fpp-depend==2.2.0a2
fprime-fpp-filenames==2.2.0a2
fprime-fpp-format==2.2.0a2
fprime-fpp-from-xml==2.2.0a2
fprime-fpp-locate-defs==2.2.0a2
fprime-fpp-locate-uses==2.2.0a2
fprime-fpp-syntax==2.2.0a2
fprime-fpp-to-cpp==2.2.0a2
fprime-fpp-to-dict==2.2.0a2
fprime-fpp-to-json==2.2.0a2
fprime-fpp-to-xml==2.2.0a2
fprime-gds==3.4.4a3
fprime-tools==3.4.4
fprime-tools==v3.4.5a1
fprime-visual==1.0.2
gcovr==6.0
idna==3.4
Expand Down
Loading