Skip to content

Commit

Permalink
Add connection support; restructured tests to fail on RUST AMQP rathe…
Browse files Browse the repository at this point in the history
…r than attempting to run; removed some uAMQP-only features (#5986)

* Checkpoint of connection logic

* Started implementing Rust based Connection by pulling out uAMQP artifacts

* Implemented AMQP Connection in Rust; started API surface refactoring for Rust APIs; Refactored tests to remove some uAMQP only elements.

* Don't leak runtime context on calls

* export UUID from AMQP

* clang-format
  • Loading branch information
LarryOsterman committed Sep 18, 2024
1 parent 3fa6db9 commit f72edbe
Show file tree
Hide file tree
Showing 75 changed files with 2,158 additions and 630 deletions.
7 changes: 2 additions & 5 deletions sdk/core/azure-core-amqp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ set(AZURE_CORE_AMQP_SOURCE
src/private/package_version.hpp
)


if (USE_UAMQP)
add_library(azure-core-amqp ${AZURE_CORE_AMQP_SOURCE} ${AZURE_CORE_AMQP_HEADER} $<TARGET_OBJECTS:uamqp>)
elseif(USE_RUST_AMQP)
Expand All @@ -193,8 +192,10 @@ target_include_directories(
add_library(Azure::azure-core-amqp ALIAS azure-core-amqp)

if(USE_UAMQP)
# target_compile_definitions(azure-core-amqp PRIVATE ENABLE_UAMQP)
add_compile_definitions(ENABLE_UAMQP)
elseif(USE_RUST_AMQP)
# target_compile_definitions(azure-core-amqp PRIVATE ENABLE_RUST_AMQP)
add_compile_definitions(ENABLE_RUST_AMQP)
endif()

Expand All @@ -220,10 +221,6 @@ elseif(USE_RUST_AMQP)
if (MSVC)
target_link_libraries(azure-core-amqp PRIVATE Secur32 ncrypt)
endif()
else()
target_link_libraries(azure-core-amqp
PRIVATE
PUBLIC Azure::azure-core)
endif()

get_az_version("${CMAKE_CURRENT_SOURCE_DIR}/src/private/package_version.hpp")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,20 @@
#include <mutex>
#include <thread>

#if ENABLE_RUST_AMQP
#include "thread_context.hpp"
#endif

namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace _detail {

#if ENABLE_RUST_AMQP
extern thread_local RustThreadContext RustThreadContextInstance;
#endif

#if ENABLE_UAMQP
/**
* uAMQP and azure-c-shared-util require that the platform_init and platform_uninit functions be
* called before using the various API functions.
* uAMQP and azure-c-shared-util require that the platform_init and platform_uninit
* functions be called before using the various API functions.
*
* The GlobalState class maintains a singleton static local variable using [static local
* variables](https://en.cppreference.com/w/cpp/language/storage_duration#Static_local_variables),
Expand All @@ -34,15 +43,19 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
virtual void Poll() = 0;
virtual ~Pollable() = default;
};

#endif
class GlobalStateHolder final {
GlobalStateHolder();
~GlobalStateHolder();

#if ENABLE_UAMQP
std::list<std::shared_ptr<Pollable>> m_pollables;
std::mutex m_pollablesMutex;
std::thread m_pollingThread;
std::atomic<bool> m_activelyPolling;
bool m_stopped{false};
#endif

public:
static GlobalStateHolder* GlobalStateInstance();
Expand All @@ -53,18 +66,22 @@ namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace
GlobalStateHolder(GlobalStateHolder&&) = delete;
GlobalStateHolder& operator=(GlobalStateHolder&&) = delete;

#if ENABLE_UAMQP
void AddPollable(std::shared_ptr<Pollable> pollable);

void RemovePollable(std::shared_ptr<Pollable> pollable);
#endif

void AssertIdle()
{
#if ENABLE_UAMQP
std::lock_guard<std::mutex> lock(m_pollablesMutex);
AZURE_ASSERT(m_pollables.empty());
if (!m_pollables.empty())
{
Azure::Core::_internal::AzureNoReturnPath("Global state is not idle.");
}
#endif
}
};
}}}}} // namespace Azure::Core::Amqp::Common::_detail
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#pragma once

#if ENABLE_RUST_AMQP
#include "../rust_amqp/rust_wrapper/rust_amqp_wrapper.h"
#include "../src/amqp/private/unique_handle.hpp"

#include <azure/core/azure_assert.hpp>

#include <atomic>
#include <list>
#include <memory>
#include <mutex>
#include <thread>

namespace Azure { namespace Core { namespace Amqp { namespace _detail {

using RustRuntimeContext = RustInterop::RuntimeContext;

template <> struct UniqueHandleHelper<RustRuntimeContext>
{
static void FreeRuntimeContext(RustRuntimeContext* obj);

using type = Core::_internal::BasicUniqueHandle<RustRuntimeContext, FreeRuntimeContext>;
};
}}}} // namespace Azure::Core::Amqp::_detail

namespace Azure { namespace Core { namespace Amqp { namespace Common { namespace _detail {

using UniqueRustRuntimeContext = Azure::Core::Amqp::_detail::UniqueHandleHelper<
Azure::Core::Amqp::_detail::RustRuntimeContext>::type;
class RustThreadContext final {

UniqueRustRuntimeContext m_runtimeContext;

public:
RustThreadContext()
: m_runtimeContext(Azure::Core::Amqp::_detail::RustInterop::runtime_context_new())
{
}
Azure::Core::Amqp::_detail::RustRuntimeContext* GetRuntimeContext() const noexcept
{
return m_runtimeContext.get();
}
};

}}}}} // namespace Azure::Core::Amqp::Common::_detail
#endif // ENABLE_RUST_AMQP
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
#if defined(_azure_TESTING_BUILD)
// Define the test classes dependant on this class here.
namespace Azure { namespace Core { namespace Amqp { namespace Tests {
#if ENABLE_UAMQP
namespace MessageTests {
class AmqpServerMock;
class MessageListenerEvents;
} // namespace MessageTests

#endif
class TestConnections_ConnectionAttributes_Test;
class TestConnections_ConnectionOpenClose_Test;
class TestConnections_ConnectionListenClose_Test;
Expand Down Expand Up @@ -62,6 +63,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
/** @brief The default port to use to connect to an AMQP server using TLS. */
constexpr uint16_t AmqpTlsPort = 5671;

#if ENABLE_UAMQP
/**
* @brief The state of the connection.
*
Expand Down Expand Up @@ -205,6 +207,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
*/
virtual bool OnNewEndpoint(Connection const& connection, Endpoint& endpoint) = 0;
};
#endif

/** @brief Options used to create a connection. */
struct ConnectionOptions final
Expand Down Expand Up @@ -262,6 +265,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
* - desired-capabilities
*
*/
#if ENABLE_RUST_AMQP
std::vector<std::string> OutgoingLocales;
std::vector<std::string> IncomingLocales;
std::vector<std::string> OfferedCapabilities;
std::vector<std::string> DesiredCapabilities;
uint32_t BufferSize;
#endif

/** @brief Defines the ID of the container for this connection. If empty, a unique 128 bit value
* will be used.
Expand All @@ -275,6 +285,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {

class Connection final {
public:
#if ENABLE_UAMQP
/** @brief Construct a new AMQP Connection.
*
* @param hostName The name of the host to connect to.
Expand All @@ -288,7 +299,21 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
std::shared_ptr<Credentials::TokenCredential> credential,
ConnectionOptions const& options,
ConnectionEvents* eventHandler = nullptr);
#else
/** @brief Construct a new AMQP Connection.
*
* @param hostName The name of the host to connect to.
* @param options The options to use when creating the connection.
*
* @remarks The requestUri must be a valid AMQP URI.
*/
Connection(
std::string const& hostName,
std::shared_ptr<Credentials::TokenCredential> credential,
ConnectionOptions const& options);
#endif

#if ENABLE_UAMQP
/** @brief Construct a new AMQP Connection.
*
* @param transport The transport to use for the connection.
Expand All @@ -303,10 +328,12 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
ConnectionOptions const& options,
ConnectionEvents* eventHandler,
ConnectionEndpointEvents* endpointEvents);
#endif

/** @brief Destroy an AMQP connection */
~Connection();

#if ENABLE_UAMQP
/** @brief Create a session on the current Connection object.
*
* An AMQP Session provides a context for sending and receiving messages. A single connection
Expand All @@ -318,7 +345,20 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
*/
Session CreateSession(SessionOptions const& options = {}, SessionEvents* eventHandler = nullptr)
const;
#else
/** @brief Create a session on the current Connection object.
*
* An AMQP Session provides a context for sending and receiving messages. A single connection
* may have multiple independent sessions active simultaneously up to the negotiated maximum
* channel count.
*
* @param options The options to use when creating the session.
*/
Session CreateSession(SessionOptions const& options = {}) const;

#endif

#if ENABLE_UAMQP
/** @brief Construct a new session associated with the specified connection over the specified
* endpoint.
*
Expand All @@ -335,8 +375,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
SessionEvents* eventHandler = nullptr) const;

void Poll();

#endif
#if ENABLE_RUST_AMQP
private:
#endif
/** @brief Opens the current connection.
*
* @remarks In general, a customer will not need to call this method, instead the connection
Expand All @@ -349,6 +391,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
*/
void Open();

#if ENABLE_UAMQP
/** @brief Starts listening for incoming connections.
*
* @remarks This method should only be called on a connection that was created with a transport
Expand All @@ -362,6 +405,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
* the connection, BEFORE destroying it.
*/
void Listen();
#endif

/** @brief Closes the current connection.
*
Expand All @@ -380,7 +424,9 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
std::string const& condition = {},
std::string const& description = {},
Models::AmqpValue info = {});

#if ENABLE_RUST_AMQP
private:
#endif
/** @brief Gets host configured by the connection.
*
* @return The host used in the connection.
Expand All @@ -399,11 +445,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
*/
uint32_t GetMaxFrameSize() const;

#if ENABLE_UAMQP
/** @brief Gets the max frame size configured for the remote node.
*
* @return The configured maximum frame size for the remote node.
*/
uint32_t GetRemoteMaxFrameSize() const;
#endif

/** @brief Gets the max channel count configured for the connection.
*
Expand All @@ -423,6 +471,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
*/
Models::AmqpMap GetProperties() const;

#if ENABLE_UAMQP
/** @brief Sets the percentage of the idle timeout before an empty frame is sent to the remote
* node.
*
Expand All @@ -440,6 +489,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
* connection is opened.
*/
void SetIdleEmptyFrameSendPercentage(double idleTimeoutEmptyFrameSendRatio);
#endif

private:
/** @brief Create an AMQP Connection from an existing connection implementation.
Expand All @@ -453,8 +503,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
std::shared_ptr<_detail::ConnectionImpl> m_impl;
friend class _detail::ConnectionFactory;
#if _azure_TESTING_BUILD
#if ENABLE_UAMQP
friend class Azure::Core::Amqp::Tests::MessageTests::AmqpServerMock;
friend class Azure::Core::Amqp::Tests::MessageTests::MessageListenerEvents;
#endif
friend class Azure::Core::Amqp::Tests::TestSocketListenerEvents;
friend class Azure::Core::Amqp::Tests::LinkSocketListenerEvents;
friend class Azure::Core::Amqp::Tests::TestConnections_ConnectionAttributes_Test;
Expand All @@ -468,8 +520,10 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
friend class Azure::Core::Amqp::Tests::TestMessages_SenderOpenClose_Test;

#endif // _azure_TESTING_BUILD
#if ENABLE_UAMQP
#if SAMPLES_BUILD
friend int LocalServerSample::LocalServerSampleMain();
#endif // SAMPLES_BUILD
#endif
};
}}}} // namespace Azure::Core::Amqp::_internal
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,14 @@ namespace Azure { namespace Core { namespace Amqp { namespace _internal {
*/
std::string GetAudience();

#if ENABLE_UAMQP
/** @brief Return a SASL transport configured for SASL Anonymous which will be used to
* communicate with the AMQP service.
*
* @return A SASL transport configured for SASL Anonymous.
*/
virtual std::shared_ptr<Network::_internal::Transport> GetTransport() const;
#endif

std::string const& GetEndpoint() const { return m_connectionParser.GetEndpoint(); }
std::string const& GetSharedAccessKeyName() const
Expand Down
10 changes: 8 additions & 2 deletions sdk/core/azure-core-amqp/inc/azure/core/amqp/internal/link.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,13 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
std::string const& name,
_internal::SessionRole role,
Models::_internal::MessageSource const& source,
Models::_internal::MessageTarget const& target,
LinkEvents* events = nullptr);
Models::_internal::MessageTarget const& target
#if ENABLE_UAMQP
,
LinkEvents* events = nullptr
#endif
);
#if ENABLE_UAMQP
Link(
_internal::Session const& session,
_internal::LinkEndpoint& linkEndpoint,
Expand All @@ -107,6 +112,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace _detail {
Models::_internal::MessageSource const& source,
Models::_internal::MessageTarget const& target,
LinkEvents* events = nullptr);
#endif
~Link() noexcept;

Link(Link const&) = default;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#if ENABLE_UAMQP
#include "transport.hpp"

#include <exception>
Expand All @@ -28,3 +29,5 @@ namespace Azure { namespace Core { namespace Amqp { namespace Network { namespac
};

}}}}} // namespace Azure::Core::Amqp::Network::_internal

#endif
Loading

0 comments on commit f72edbe

Please sign in to comment.