Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix server-side fragmentation #1239

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions ACE/ace/CDR_Stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class ACE_Export ACE_OutputCDR
ACE_CDR::Octet giop_minor_version = ACE_CDR_GIOP_MINOR_VERSION);

/// destructor
~ACE_OutputCDR (void);
virtual ~ACE_OutputCDR (void);

/**
* Disambiguate overload when inserting booleans, octets, chars, and
Expand Down Expand Up @@ -523,20 +523,14 @@ class ACE_Export ACE_OutputCDR
void unregister_monitor (void);
#endif /* ACE_HAS_MONITOR_POINTS==1 */

private:
// Find the message block in the chain of message blocks
// that the provide location locates.
ACE_Message_Block* find (char* loc);

/// disallow copying...
ACE_OutputCDR (const ACE_OutputCDR& rhs);
ACE_OutputCDR& operator= (const ACE_OutputCDR& rhs);
protected:

ACE_CDR::Boolean write_1 (const ACE_CDR::Octet *x);
ACE_CDR::Boolean write_2 (const ACE_CDR::UShort *x);
ACE_CDR::Boolean write_4 (const ACE_CDR::ULong *x);
ACE_CDR::Boolean write_8 (const ACE_CDR::ULongLong *x);
ACE_CDR::Boolean write_16 (const ACE_CDR::LongDouble *x);
// These are virtual to support GIOP fragmentation at the TAO layer.
virtual ACE_CDR::Boolean write_1 (const ACE_CDR::Octet *x);
virtual ACE_CDR::Boolean write_2 (const ACE_CDR::UShort *x);
virtual ACE_CDR::Boolean write_4 (const ACE_CDR::ULong *x);
virtual ACE_CDR::Boolean write_8 (const ACE_CDR::ULongLong *x);
virtual ACE_CDR::Boolean write_16 (const ACE_CDR::LongDouble *x);

/**
* write an array of @a length elements, each of @a size bytes and the
Expand All @@ -549,12 +543,26 @@ class ACE_Export ACE_OutputCDR
* but for several elements @c memcpy should be more efficient, it
* could be interesting to find the break even point and optimize
* for that case, but that would be too platform dependent.
*
* This is virtual to support GIOP fragmentation at the TAO layer.
*/
ACE_CDR::Boolean write_array (const void *x,
size_t size,
size_t align,
ACE_CDR::ULong length);
virtual ACE_CDR::Boolean write_array (const void *x,
size_t size,
size_t align,
ACE_CDR::ULong length);

// Overrides of the above functions may need to set this bit.
void good_bit (bool bit) { good_bit_ = bit; }

private:

// Find the message block in the chain of message blocks
// that the provide location locates.
ACE_Message_Block* find (char* loc);

/// disallow copying...
ACE_OutputCDR (const ACE_OutputCDR& rhs);
ACE_OutputCDR& operator= (const ACE_OutputCDR& rhs);

ACE_CDR::Boolean write_wchar_array_i (const ACE_CDR::WChar* x,
ACE_CDR::ULong length);
Expand Down Expand Up @@ -1057,7 +1065,6 @@ class ACE_Export ACE_InputCDR
#endif /* ACE_HAS_MONITOR_POINTS==1 */

protected:

/// The start of the chain of message blocks, even though in the
/// current version the chain always has length 1.
ACE_Message_Block start_;
Expand Down Expand Up @@ -1338,7 +1345,6 @@ class ACE_Export ACE_WChar_Codeset_Translator
ACE_CDR::Octet minor_version (ACE_InputCDR& input);
ACE_CDR::Octet major_version (ACE_OutputCDR& output);
ACE_CDR::Octet minor_version (ACE_OutputCDR& output);

};

// @@ These operators should not be inlined since they force SString.h
Expand Down
188 changes: 187 additions & 1 deletion TAO/tao/CDR.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,17 @@ TAO_OutputCDR::fragment_stream (ACE_CDR::ULong pending_alignment,
return true; // Success.
}

ACE_CDR::ULong
TAO_OutputCDR::fragment_bytes_available (ACE_CDR::ULong pending_alignment)
{
if (this->fragmentation_strategy_)
{
return this->fragmentation_strategy_->available (*this,
pending_alignment);
}

return 0xFFFFFFFF;
}

int
TAO_OutputCDR::offset (char* pos)
Expand Down Expand Up @@ -280,6 +290,183 @@ TAO_OutputCDR::offset (char* pos)
return offset;
}

ACE_CDR::Boolean
TAO_OutputCDR::write_1 (const ACE_CDR::Octet *x)
{
return
fragment_stream (ACE_CDR::OCTET_ALIGN, sizeof (CORBA::Octet))
&& ACE_OutputCDR::write_1 (x);
}

ACE_CDR::Boolean
TAO_OutputCDR::write_2 (const ACE_CDR::UShort *x)
{
return
fragment_stream (ACE_CDR::SHORT_ALIGN, sizeof (CORBA::UShort))
&& ACE_OutputCDR::write_2 (x);
}

ACE_CDR::Boolean
TAO_OutputCDR::write_4 (const ACE_CDR::ULong *x)
{
return
fragment_stream (ACE_CDR::LONG_ALIGN, sizeof (CORBA::ULong))
&& ACE_OutputCDR::write_4 (x);
}

ACE_CDR::Boolean
TAO_OutputCDR::write_8 (const ACE_CDR::ULongLong *x)
{
return
fragment_stream (ACE_CDR::LONGLONG_ALIGN, sizeof (CORBA::ULongLong))
&& ACE_OutputCDR::write_8 (x);
}

ACE_CDR::Boolean
TAO_OutputCDR::write_16 (const ACE_CDR::LongDouble *x)
{
if (!fragment_stream (ACE_CDR::LONGLONG_ALIGN, sizeof (CORBA::ULongLong)))
return 0;

ACE_CDR::ULong avail = fragment_bytes_available(ACE_CDR::LONGLONG_ALIGN);

if (avail < sizeof (CORBA::LongDouble))
{
const CORBA::ULongLong* ptr_8
= reinterpret_cast<const CORBA::ULongLong*> (x);

#if !defined (ACE_ENABLE_SWAP_ON_WRITE)

return
ACE_OutputCDR::write_8 (ptr_8)
&& fragment_stream (ACE_CDR::LONGLONG_ALIGN, sizeof (CORBA::ULongLong))
&& ACE_OutputCDR::write_8 (ptr_8 + 1);

#else

if (!this->do_byte_swap_)
{
return
ACE_OutputCDR::write_8 (ptr_8)
&& fragment_stream (ACE_CDR::LONGLONG_ALIGN, sizeof (CORBA::ULongLong))
&& ACE_OutputCDR::write_8 (ptr_8 + 1);
}
else
{
return
ACE_OutputCDR::write_8 (ptr_8 + 1)
&& fragment_stream (ACE_CDR::LONGLONG_ALIGN, sizeof (CORBA::ULongLong))
&& ACE_OutputCDR::write_8 (ptr_8);
}

#endif

}
else
{
return ACE_OutputCDR::write_16 (x);
}
}

ACE_CDR::Boolean
TAO_OutputCDR::write_array (const void *x,
size_t size,
size_t align,
ACE_CDR::ULong length)
{
if (length == 0)
return 1;

if (size <= ACE_CDR::MAX_ALIGNMENT)
{
const char* xPtr = static_cast<const char*> (x);

if (!fragment_stream (align, size))
return 0;

while (true)
{
ACE_CDR::ULong availableBytes = fragment_bytes_available(align);
ACE_CDR::ULong availableLength = availableBytes / size;
bool lastBatch = (availableLength >= length);
ACE_CDR::ULong batchLength = (lastBatch ? length : availableLength);

if (!ACE_OutputCDR::write_array (xPtr, size, align, batchLength))
return 0;

if (lastBatch)
return 1;

if (!fragment_stream (align, size))
return 0;

xPtr += batchLength * size;
length -= batchLength;
}
}
else
{
if (size == 16 && align == ACE_CDR::MAX_ALIGNMENT)
{
return write_array_16 (x, length);
}
else
{
good_bit(false);
return 0;
}
}
}

ACE_CDR::Boolean TAO_OutputCDR::write_array_16 (const void *x,
ACE_CDR::ULong length)
{
// may need to fragment in the middle of an element

const ACE_CDR::LongDouble* xPtr
= static_cast<const ACE_CDR::LongDouble*> (x);

if (!fragment_stream (ACE_CDR::MAX_ALIGNMENT, ACE_CDR::MAX_ALIGNMENT))
return 0;

while (true)
{
ACE_CDR::ULong availableBytes
= fragment_bytes_available(ACE_CDR::MAX_ALIGNMENT);
ACE_CDR::ULong availableLength = availableBytes / 16;
ACE_CDR::ULong batchLength;
bool lastBatch;

if (availableLength == 0)
{
// This will fragment in the middle of the 16-byte element.
if (!write_16 (reinterpret_cast<const ACE_CDR::LongDouble*> (xPtr)))
return 0;
batchLength = 1;
lastBatch = (length == 1);
}
else
{
// We can write a batch of whole elements into the current fragment.
lastBatch = (availableLength >= length);
batchLength = (lastBatch ? length : availableLength);

if (!ACE_OutputCDR::write_array (xPtr, 16,
ACE_CDR::MAX_ALIGNMENT,
batchLength))
return 0;
}

if (lastBatch)
return 1;

if (!fragment_stream (ACE_CDR::MAX_ALIGNMENT, ACE_CDR::MAX_ALIGNMENT))
return 0;

xPtr += batchLength;
length -= batchLength;
}
}

// ****************************************************************

Expand Down Expand Up @@ -346,7 +533,6 @@ TAO_InputCDR::throw_skel_exception (int error_num )

default :
throw ::CORBA::MARSHAL();

}
}

Expand Down
15 changes: 15 additions & 0 deletions TAO/tao/CDR.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ class TAO_Export TAO_OutputCDR : public ACE_OutputCDR
*/
bool fragment_stream (ACE_CDR::ULong pending_alignment,
ACE_CDR::ULong pending_length);
ACE_CDR::ULong fragment_bytes_available(ACE_CDR::ULong pending_alignment);

/// Are there more data fragments to come?
bool more_fragments (void) const;
Expand Down Expand Up @@ -239,12 +240,26 @@ class TAO_Export TAO_OutputCDR : public ACE_OutputCDR
/// Calculate the offset between pos and current wr_ptr.
int offset (char* pos);

protected:
// These are overridden to support GIOP fragmentation.
virtual ACE_CDR::Boolean write_1 (const ACE_CDR::Octet *x);
virtual ACE_CDR::Boolean write_2 (const ACE_CDR::UShort *x);
virtual ACE_CDR::Boolean write_4 (const ACE_CDR::ULong *x);
virtual ACE_CDR::Boolean write_8 (const ACE_CDR::ULongLong *x);
virtual ACE_CDR::Boolean write_16 (const ACE_CDR::LongDouble *x);

virtual ACE_CDR::Boolean write_array (const void *x,
size_t size,
size_t align,
ACE_CDR::ULong length);
private:
// disallow copying...
TAO_OutputCDR (const TAO_OutputCDR& rhs);
TAO_OutputCDR& operator= (const TAO_OutputCDR& rhs);

private:
ACE_CDR::Boolean write_array_16 (const void *x,
ACE_CDR::ULong length);
/**
* @name Outgoing GIOP Fragment Related Attributes
*
Expand Down
Loading