Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor AMQP logic to better isolate rust AMQP code from uAMQP code. #6008

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions .vscode/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@
"*nlohmann-json*",
"eng/docs/api/assets/**/*",
"eng/CredScanSuppression.json",
"sdk/core/azure-core-amqp/vendor/**/*",
"sdk/core/azure-core-amqp/rust_amqp/azure_core_amqp/**/*",
"sdk/core/azure-core-amqp/**/vendor/**/*",
"sdk/core/azure-core-amqp/**/rust_amqp/azure_core_amqp/**/*",
"*.toml",
"sdk/storage/*/NOTICE.txt",
"sdk/tables/*/NOTICE.txt"
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[workspace]
resolver = "2"
members = [
"sdk/core/azure-core-amqp/rust_amqp/azure_core_amqp",
"sdk/core/azure-core-amqp/rust_amqp/rust_wrapper",
"sdk/core/azure-core-amqp/src/impl/rust_amqp/rust_amqp/azure_core_amqp",
"sdk/core/azure-core-amqp/src/impl/rust_amqp/rust_amqp/rust_wrapper",
]

[workspace.package]
Expand All @@ -14,7 +14,7 @@ rust-version = "1.76"


[workspace.dependencies.azure_core_amqp]
path = "sdk/core/azure-core-amqp/rust_amqp/azure_core_amqp"
path = "sdk/core/azure-core-amqp/src/impl/rust_amqp/rust_amqp/azure_core_amqp"


[workspace.dependencies]
Expand Down
1 change: 1 addition & 0 deletions cmake-modules/AzureDoxygen.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ function(generate_documentation PROJECT_NAME PROJECT_VERSION)
set(DOXYGEN_REPEAT_BRIEF NO)

doxygen_add_docs(${PROJECT_NAME}-docs
./inc ./README.md
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
ALL
COMMENT "Generate documentation for ${PROJECT_NAME} with Doxygen Version ${DOXYGEN_VERSION}")
endif()
Expand Down
122 changes: 100 additions & 22 deletions sdk/core/azure-core-amqp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ if (USE_UAMQP)
set(build_as_object_library ON CACHE BOOL "Produce object library" FORCE)
set(atomic_refcount ON CACHE BOOL "Use atomic refcount" FORCE)

add_subdirectory(vendor/azure-uamqp-c SYSTEM)
add_subdirectory(src/impl/uamqp/vendor/azure-uamqp-c SYSTEM)

# uAMQP specific compiler settings. Primarily used to disable warnings in the uAMQP codebase.
if (MSVC)
Expand All @@ -67,8 +67,8 @@ endif()

if (USE_RUST_AMQP)

include (${CMAKE_SOURCE_DIR}/cmake-modules/acquire_rust.cmake)
install_rustup()
# include (${CMAKE_SOURCE_DIR}/cmake-modules/acquire_rust.cmake)
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
# install_rustup()

include(FetchContent)
FetchContent_Declare(
Expand All @@ -79,7 +79,7 @@ if (USE_RUST_AMQP)
FetchContent_MakeAvailable(Corrosion)

# Import targets defined in a package or workspace manifest `Cargo.toml` file
corrosion_import_crate(MANIFEST_PATH ${CMAKE_CURRENT_SOURCE_DIR}/rust_amqp/rust_wrapper/Cargo.toml)
corrosion_import_crate(MANIFEST_PATH ${CMAKE_CURRENT_SOURCE_DIR}/src/impl/rust_amqp/rust_amqp/rust_wrapper/Cargo.toml)
endif()


Expand Down Expand Up @@ -123,21 +123,13 @@ set (AZURE_CORE_AMQP_HEADER
)

set(AZURE_CORE_AMQP_SOURCE
src/amqp/cancellable.cpp
src/amqp/claim_based_security.cpp
src/amqp/connection.cpp
src/amqp/connection_string_credential.cpp
src/amqp/link.cpp
src/amqp/management.cpp
src/amqp/message_receiver.cpp
src/amqp/message_sender.cpp
src/amqp/private/claims_based_security_impl.hpp
src/amqp/private/connection_impl.hpp
src/amqp/private/link_impl.hpp
src/amqp/private/management_impl.hpp
src/amqp/private/message_receiver_impl.hpp
src/amqp/private/message_sender_impl.hpp
src/amqp/private/session_impl.hpp
src/amqp/private/unique_handle.hpp
src/amqp/session.cpp
src/common/global_state.cpp
Expand All @@ -160,25 +152,111 @@ set(AZURE_CORE_AMQP_SOURCE
src/models/private/source_impl.hpp
src/models/private/target_impl.hpp
src/models/private/value_impl.hpp
src/network/amqp_header_transport.cpp
src/network/private/transport_impl.hpp
src/network/private/transport_impl.hpp
src/network/sasl_transport.cpp
src/network/socket_listener.cpp
src/network/socket_transport.cpp
src/network/tls_transport.cpp
src/network/transport.cpp
src/private/package_version.hpp
)

if (USE_UAMQP)
add_library(azure-core-amqp ${AZURE_CORE_AMQP_SOURCE} ${AZURE_CORE_AMQP_HEADER} $<TARGET_OBJECTS:uamqp>)
set(AZURE_UAMQP_SOURCE
src/impl/uamqp/amqp/cancellable.cpp
src/impl/uamqp/amqp/claim_based_security.cpp
src/impl/uamqp/amqp/connection.cpp
src/impl/uamqp/amqp/connection_string_credential.cpp
src/impl/uamqp/amqp/link.cpp
src/impl/uamqp/amqp/management.cpp
src/impl/uamqp/amqp/message_receiver.cpp
src/impl/uamqp/amqp/message_sender.cpp
src/impl/uamqp/amqp/session.cpp
src/impl/uamqp/amqp/private/claims_based_security_impl.hpp
src/impl/uamqp/amqp/private/connection_impl.hpp
src/impl/uamqp/amqp/private/link_impl.hpp
src/impl/uamqp/amqp/private/management_impl.hpp
src/impl/uamqp/amqp/private/message_receiver_impl.hpp
src/impl/uamqp/amqp/private/message_sender_impl.hpp
src/impl/uamqp/amqp/private/session_impl.hpp
# src/impl/uamqp/models/amqp_detach.cpp
# src/impl/uamqp/models/amqp_error.cpp
# src/impl/uamqp/models/amqp_header.cpp
# src/impl/uamqp/models/amqp_message.cpp
# src/impl/uamqp/models/amqp_properties.cpp
# src/impl/uamqp/models/amqp_transfer.cpp
# src/impl/uamqp/models/amqp_value.cpp
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
# src/impl/uamqp/models/message_source.cpp
# src/impl/uamqp/models/message_target.cpp
# src/impl/uamqp/models/messaging_values.cpp
# src/impl/uamqp/models/private/error_impl.hpp
# src/impl/uamqp/models/private/header_impl.hpp
# src/impl/uamqp/models/private/message_impl.hpp
# src/impl/uamqp/models/private/performatives
# src/impl/uamqp/models/private/properties_impl.hpp
# src/impl/uamqp/models/private/source_impl.hpp
# src/impl/uamqp/models/private/target_impl.hpp
# src/impl/uamqp/models/private/value_impl.hpp
# src/impl/uamqp/models/private/performatives/detach_impl.hpp
# src/impl/uamqp/models/private/performatives/transfer_impl.hpp
src/impl/uamqp/network/amqp_header_transport.cpp
src/impl/uamqp/network/sasl_transport.cpp
src/impl/uamqp/network/socket_listener.cpp
src/impl/uamqp/network/socket_transport.cpp
src/impl/uamqp/network/tls_transport.cpp
src/impl/uamqp/network/transport.cpp
src/impl/uamqp/network/private/transport_impl.hpp
)
endif()

if(USE_RUST_AMQP)
set(AZURE_RUST_AMQP_SOURCE
src/impl/rust_amqp/amqp/claim_based_security.cpp
src/impl/rust_amqp/amqp/connection.cpp
src/impl/rust_amqp/amqp/connection_string_credential.cpp
src/impl/rust_amqp/amqp/link.cpp
src/impl/rust_amqp/amqp/management.cpp
src/impl/rust_amqp/amqp/message_receiver.cpp
src/impl/rust_amqp/amqp/message_sender.cpp
src/impl/rust_amqp/amqp/session.cpp
src/impl/rust_amqp/amqp/private/claims_based_security_impl.hpp
src/impl/rust_amqp/amqp/private/connection_impl.hpp
src/impl/rust_amqp/amqp/private/link_impl.hpp
src/impl/rust_amqp/amqp/private/management_impl.hpp
src/impl/rust_amqp/amqp/private/message_receiver_impl.hpp
src/impl/rust_amqp/amqp/private/message_sender_impl.hpp
src/impl/rust_amqp/amqp/private/session_impl.hpp
# src/impl/rust_amqp/models/amqp_detach.cpp
# src/impl/rust_amqp/models/amqp_error.cpp
# src/impl/rust_amqp/models/amqp_header.cpp
# src/impl/rust_amqp/models/amqp_message.cpp
# src/impl/rust_amqp/models/amqp_properties.cpp
# src/impl/rust_amqp/models/amqp_transfer.cpp
# src/impl/rust_amqp/models/amqp_value.cpp
# src/impl/rust_amqp/models/message_source.cpp
# src/impl/rust_amqp/models/message_target.cpp
# src/impl/rust_amqp/models/messaging_values.cpp
# src/impl/rust_amqp/models/private/error_impl.hpp
# src/impl/rust_amqp/models/private/header_impl.hpp
# src/impl/rust_amqp/models/private/message_impl.hpp
# src/impl/rust_amqp/models/private/performatives
# src/impl/rust_amqp/models/private/properties_impl.hpp
# src/impl/rust_amqp/models/private/source_impl.hpp
# src/impl/rust_amqp/models/private/target_impl.hpp
# src/impl/rust_amqp/models/private/value_impl.hpp
# src/impl/rust_amqp/models/private/performatives/detach_impl.hpp
# src/impl/rust_amqp/models/private/performatives/transfer_impl.hpp
)

endif()

if (USE_UAMQP)
add_library(azure-core-amqp ${AZURE_CORE_AMQP_SOURCE} ${AZURE_CORE_AMQP_HEADER} ${AZURE_UAMQP_SOURCE} $<TARGET_OBJECTS:uamqp>)
elseif(USE_RUST_AMQP)
add_library(azure-core-amqp ${AZURE_CORE_AMQP_SOURCE} ${AZURE_CORE_AMQP_HEADER})
add_library(azure-core-amqp ${AZURE_CORE_AMQP_SOURCE} ${AZURE_CORE_AMQP_HEADER} ${AZURE_RUST_AMQP_SOURCE})
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
endif()

if (USE_UAMQP)
target_include_directories(azure-core-amqp SYSTEM PRIVATE ${UAMQP_INC_FOLDER})
target_include_directories(azure-core-amqp PRIVATE src/impl/uamqp/amqp/private src/impl/uamqp/amqp/network ${UAMQP_INC_FOLDER})
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
endif()

if (USE_RUST_AMQP)
target_include_directories(azure-core-amqp PRIVATE src/impl/rust_amqp/amqp/private src/impl/rust_amqp/rust_amqp/rust_wrapper)
endif()

target_include_directories(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,11 @@
#include <thread>

#if ENABLE_RUST_AMQP
#include "thread_context.hpp"
#include "runtime_context.hpp"
#endif

namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace _detail {

#if ENABLE_RUST_AMQP
extern thread_local RustThreadContext RustThreadContextInstance;
#endif

#if ENABLE_UAMQP
/**
* uAMQP and azure-c-shared-util require that the platform_init and platform_uninit
Expand Down Expand Up @@ -55,6 +51,8 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
std::thread m_pollingThread;
std::atomic<bool> m_activelyPolling;
bool m_stopped{false};
#elif ENABLE_RUST_AMQP
RustRuntimeContext m_runtimeContext;
#endif

public:
Expand All @@ -70,6 +68,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
void AddPollable(std::shared_ptr<Pollable> pollable);

void RemovePollable(std::shared_ptr<Pollable> pollable);
#elif ENABLE_RUST_AMQP
Azure::Core::Amqp::_detail::RustRuntimeContext* GetRuntimeContext()
{
return m_runtimeContext.GetRuntimeContext();
}
#endif

void AssertIdle()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
// Licensed under the MIT License.

#pragma once

#if ENABLE_RUST_AMQP
#include "../rust_amqp/rust_wrapper/rust_amqp_wrapper.h"
#include "../src/amqp/private/unique_handle.hpp"
#include "rust_amqp_wrapper.h"

#include <azure/core/azure_assert.hpp>

Expand All @@ -31,16 +30,23 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace

using UniqueRustRuntimeContext = Azure::Core::Amqp::_detail::UniqueHandleHelper<
Azure::Core::Amqp::_detail::RustRuntimeContext>::type;
class RustThreadContext final {

/**
* @brief Represents the an implementation of the rust multithreaded runtime.
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
*
* Needed to implement blocking Rust API calls.
*/
class RustRuntimeContext final {

UniqueRustRuntimeContext m_runtimeContext;

public:
RustThreadContext()
: m_runtimeContext(Azure::Core::Amqp::_detail::RustInterop::runtime_context_new())
RustRuntimeContext()
: m_runtimeContext{Azure::Core::Amqp::_detail::RustInterop::runtime_context_new()}
{
}
Azure::Core::Amqp::_detail::RustRuntimeContext* GetRuntimeContext() const noexcept

Azure::Core::Amqp::_detail::RustRuntimeContext* GetRuntimeContext()
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
{
return m_runtimeContext.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
namespace Azure { namespace Core { namespace Amqp { namespace _detail {
class LinkImpl;

#if ENABLE_UAMQP
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
enum class LinkState
{
Invalid,
Expand All @@ -48,6 +49,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
};

std::ostream& operator<<(std::ostream& stream, LinkState linkState);
#endif

enum class LinkTransferResult
{
Expand All @@ -65,11 +67,11 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Invalid
};

#if defined(_azure_TESTING_BUILD)

// Note that this entire class is a test hook to enable testing of the Link family of apis. It is
// not exposed to customers because there are no customer scenarios for it.
#if defined(_azure_TESTING_BUILD)
class Link;
#if ENABLE_UAMQP
class LinkImplEvents;
class LinkImplEventsImpl;

Expand All @@ -89,6 +91,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
virtual void OnLinkFlowOn(Link const& link) = 0;
virtual ~LinkEvents() = default;
};
#endif
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved

class Link final {
public:
Expand Down Expand Up @@ -165,7 +168,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
friend class LinkImpl;
friend class LinkImplEventsImpl;
Link(std::shared_ptr<LinkImpl> impl) : m_impl{impl} {}
#if ENABLE_UAMQP
std::shared_ptr<LinkImplEvents> m_implEvents;
#endif
std::shared_ptr<LinkImpl> m_impl;
};
#endif // _azure_TESTING_BUILD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
}}}} // namespace Azure::Core::Amqp::_detail

namespace Azure { namespace Core { namespace Amqp { namespace _internal {
#if ENABLE_UAMQP
enum class MessageSendStatus
{
Invalid,
Expand All @@ -51,8 +52,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Error,
};
std::ostream& operator<<(std::ostream& stream, MessageSenderState state);

#endif
class MessageSender;
#if ENABLE_UAMQP
class MessageSenderEvents {
protected:
~MessageSenderEvents() = default;
Expand All @@ -68,6 +70,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
Models::_internal::AmqpError const& error)
= 0;
};
#endif

struct MessageSenderOptions final
{
Expand Down Expand Up @@ -128,9 +131,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {

class MessageSender final {
public:
#if ENABLE_UAMQP
using MessageSendCompleteCallback
= std::function<void(MessageSendStatus sendResult, Models::AmqpValue const& deliveryState)>;

#endif
~MessageSender() noexcept;

MessageSender(MessageSender const&) = default;
Expand Down Expand Up @@ -166,6 +170,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
*/
std::uint64_t GetMaxMessageSize() const;

#if ENABLE_UAMQP
/** @brief Send a message synchronously to the target of the message sender.
*
* @param message The message to send.
Expand All @@ -176,7 +181,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
_azure_NODISCARD std::tuple<MessageSendStatus, Models::_internal::AmqpError> Send(
Models::AmqpMessage const& message,
Context const& context = {});
#elif ENABLE_RUST_AMQP
_azure_NODISCARD Models::_internal::AmqpError Send(
Models::AmqpMessage const& message,
Context const& context = {});

#endif
LarryOsterman marked this conversation as resolved.
Show resolved Hide resolved
private:
// Half-open the message sender (does not block waiting on the Open to complete).
_azure_NODISCARD Models::_internal::AmqpError HalfOpen(Context const& context = {});
Expand Down
Loading
Loading