Skip to content

Commit

Permalink
Merge branch 'dev/dynamic_zero_copy' into 'main'
Browse files Browse the repository at this point in the history
Adding an option for zero copy dynamic updates

See merge request lightspeedrtx/bridge-remix-nv!16
  • Loading branch information
yomoma committed Aug 8, 2023
2 parents db762c9 + ee5a98f commit 224ce21
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 29 deletions.
10 changes: 10 additions & 0 deletions src/client/client_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,14 @@ namespace ClientOptions {
bridge_util::Config::getOption<bool>("client.enableDpiAwareness", true);
return enableDpiAwareness;
}

// When enabled, the space for dynamic buffer update data is preallocated directly on the
// data channel so that the redundant copy is avoided. Because D3D applications are not
// obliged to write the entire locked region, this optimization is NOT considered safe and
// may not always work.
// The "client.optimizedDynamicLock" option (default: false) is queried once, on first call,
// and the cached value is returned afterwards.
inline bool getOptimizedDynamicLock() {
  static const bool isEnabled =
    bridge_util::Config::getOption<bool>("client.optimizedDynamicLock", false);
  return isEnabled;
}
}
64 changes: 55 additions & 9 deletions src/client/lockable_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,20 @@
template <typename T>
class LockableBuffer: public Direct3DResource9_LSS<T> {
static constexpr bool bIsVertexBuffer = std::is_same_v<IDirect3DVertexBuffer9, T>;
static constexpr Commands::D3D9Command LockCmd = bIsVertexBuffer ?
Commands::IDirect3DVertexBuffer9_Lock : Commands::IDirect3DIndexBuffer9_Lock;
static constexpr Commands::D3D9Command UnlockCmd = bIsVertexBuffer ?
Commands::IDirect3DVertexBuffer9_Unlock : Commands::IDirect3DIndexBuffer9_Unlock;
static constexpr size_t kSIMDAlign = 16;
static constexpr uint32_t kLockCheckValue = 0xbaadf00d;
using DescType = std::conditional_t<bIsVertexBuffer, D3DVERTEXBUFFER_DESC, D3DINDEXBUFFER_DESC>;

// Bookkeeping for one outstanding Lock() call. Lock() pushes an entry onto m_lockInfos
// using positional (aggregate) initialization, so the member order below matters.
struct LockInfo {
// Offset of the locked region, as recorded by Lock().
UINT offsetToLock;
// Size of the locked region, as recorded by Lock(). The optimized lock path treats a
// value of 0 as "the whole buffer" (m_desc.Size) when reserving space.
UINT sizeToLock;
// Pointer handed back to the application through *ppbData on the non-shared-heap paths;
// Lock() stores nullptr here when the shared heap is used.
void* pbData;
// Lock flags as recorded by Lock(); Unlock checks D3DLOCK_READONLY on them.
DWORD flags;
// On the optimized lock path: address of the kLockCheckValue guard word written in front
// of the reserved data blob, verified again on Unlock. nullptr on every other path.
uint32_t* checkPtr;
// Shared heap allocation backing this lock (m_bufferId at lock time) when the shared heap
// is used; stays kInvalidId otherwise.
SharedHeap::AllocId bufferId = SharedHeap::kInvalidId;
// NOTE(review): presumably the allocation that a D3DLOCK_DISCARD lock replaced - it is
// assigned in a part of Lock() not visible here; confirm before relying on it.
SharedHeap::AllocId discardedBufferId = SharedHeap::kInvalidId;
};
Expand Down Expand Up @@ -78,19 +83,18 @@ class LockableBuffer: public Direct3DResource9_LSS<T> {
protected:
const DescType m_desc;
SharedHeap::AllocId m_bufferId = SharedHeap::kInvalidId;
bool m_sendWhole = false;
const bool m_sendWhole = false;
const bool m_optimizedLock = false;

LockableBuffer(T* const pD3dBuf, BaseDirect3DDevice9Ex_LSS* const pDevice, const DescType& desc)
: Direct3DResource9_LSS<T>(pD3dBuf, pDevice)
, m_desc(desc)
, m_bUseSharedHeap(getSharedHeapPolicy(m_desc)) {
, m_bUseSharedHeap(getSharedHeapPolicy(m_desc))
, m_sendWhole((desc.Usage& D3DUSAGE_DYNAMIC) == 0 && GlobalOptions::getAlwaysCopyEntireStaticBuffer())
, m_optimizedLock((desc.Usage& D3DUSAGE_DYNAMIC) != 0 && ClientOptions::getOptimizedDynamicLock()) {
if (!m_bUseSharedHeap) {
initShadowMem();
}

if ((m_desc.Usage & D3DUSAGE_DYNAMIC) == 0 && GlobalOptions::getAlwaysCopyEntireStaticBuffer()) {
m_sendWhole = true;
}
}

~LockableBuffer() {
Expand All @@ -111,6 +115,9 @@ class LockableBuffer: public Direct3DResource9_LSS<T> {
if (ppbData == nullptr) {
return D3DERR_INVALIDCALL;
}

uint32_t* checkPtr = nullptr;

if (m_bUseSharedHeap) {
SharedHeap::AllocId discardedBufferId = SharedHeap::kInvalidId;
const bool bDiscard = (flags & D3DLOCK_DISCARD) != 0;
Expand All @@ -134,10 +141,30 @@ class LockableBuffer: public Direct3DResource9_LSS<T> {
}
m_bufferId = nextBufId;
*ppbData = SharedHeap::getBuf(m_bufferId) + offset;
m_lockInfos.push({ offset, size, nullptr, flags, m_bufferId, discardedBufferId });
m_lockInfos.push({ offset, size, nullptr, flags, checkPtr, m_bufferId, discardedBufferId });
} else {
*ppbData = m_shadow.get() + offset;
m_lockInfos.push({ offset, size, *ppbData, flags });

if (m_optimizedLock) {
const size_t dataSize = (size == 0) ? m_desc.Size : size;

// Send the buffer lock parameters and handle
ClientMessage c(LockCmd, getId(), 0);
// Reserve blob in data stream
uintptr_t blobAddr = reinterpret_cast<uintptr_t>(
c.begin_data_blob(dataSize + sizeof(kLockCheckValue) + kSIMDAlign));
c.end_data_blob();

// Push the buffer check value in front. If the front gets corrupted, the entire region is deemed invalid.
checkPtr = reinterpret_cast<uint32_t*>(blobAddr);
blobAddr += sizeof(kLockCheckValue);
*checkPtr = kLockCheckValue;

// Align data blob for SIMD ops
blobAddr = align<uintptr_t>(blobAddr, kSIMDAlign);
*ppbData = reinterpret_cast<void*>(blobAddr);
}
m_lockInfos.push({ offset, size, *ppbData, flags, checkPtr });
}
// Store locked buffer pointer locally so we can copy the data on unlock
return S_OK;
Expand All @@ -163,12 +190,31 @@ class LockableBuffer: public Direct3DResource9_LSS<T> {
// If this is a read only access then don't bother sending
if ((lockInfo.flags & D3DLOCK_READONLY) == 0) {
{
Commands::Flags cmdFlags = 0;

if (m_bUseSharedHeap) {
cmdFlags = Commands::FlagBits::DataInSharedHeap;
} else if (m_optimizedLock) {
cmdFlags = Commands::FlagBits::DataIsReserved;

if (kLockCheckValue != *lockInfo.checkPtr) {
Logger::err("Fatal: reserved buffer region has been corrupted! "
"Application will now exit.");
throw;
}
}

// Send the buffer lock parameters and handle
ClientMessage c(UnlockCmd, getId(), m_bUseSharedHeap ? Commands::FlagBits::DataInSharedHeap : 0);
ClientMessage c(UnlockCmd, getId(), cmdFlags);
c.send_many(offset, size, lockInfo.flags);

if (m_bUseSharedHeap) {
c.send_data(lockInfo.bufferId);
} else if (m_optimizedLock) {
// Now send data offset in the channel
const uint32_t dataOffset = static_cast<uint32_t*>(ptr) -
DeviceBridge::getWriterChannel().get_data_ptr();
c.send_many(dataOffset);
} else {
// Now send the buffer bytes
c.send_data(size, ptr);
Expand Down
16 changes: 14 additions & 2 deletions src/server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2227,6 +2227,9 @@ void ProcessDeviceCommandQueue() {
case IDirect3DVertexBuffer9_Lock:
{
// This is a no-op right now because we're doing all the logic on Unlock
GET_HND(pHandle);
void* data = nullptr;
DeviceBridge::get_data(&data);
break;
}
case IDirect3DVertexBuffer9_Unlock:
Expand All @@ -2244,7 +2247,10 @@ void ProcessDeviceCommandQueue() {

// Copy the data over
void* data = nullptr;
if (Commands::IsDataInSharedHeap(rpcHeader.flags)) {
if (Commands::IsDataReserved(rpcHeader.flags)) {
PULL_D(DataOffset);
data = DeviceBridge::Bridge::getReaderChannel().get_data_ptr() + DataOffset;
} else if (Commands::IsDataInSharedHeap(rpcHeader.flags)) {
PULL_U(allocId);
data = SharedHeap::getBuf(allocId) + OffsetToLock;
} else {
Expand Down Expand Up @@ -2309,6 +2315,9 @@ void ProcessDeviceCommandQueue() {
case IDirect3DIndexBuffer9_Lock:
{
// This is a no-op right now because we're doing all the logic on Unlock
GET_HND(pHandle);
void* data = nullptr;
DeviceBridge::get_data(&data);
break;
}
case IDirect3DIndexBuffer9_Unlock:
Expand All @@ -2326,7 +2335,10 @@ void ProcessDeviceCommandQueue() {

// Copy the data over
void* data = nullptr;
if (Commands::IsDataInSharedHeap(rpcHeader.flags)) {
if (Commands::IsDataReserved(rpcHeader.flags)) {
PULL_D(DataOffset);
data = DeviceBridge::Bridge::getReaderChannel().get_data_ptr() + DataOffset;
} else if (Commands::IsDataInSharedHeap(rpcHeader.flags)) {
PULL_U(allocId);
data = SharedHeap::getBuf(allocId) + OffsetToLock;
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/util/util_bridgecommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class Bridge {
inline void send_data(const DataT size, const void* obj) {
ZoneScoped;
if (gbBridgeRunning) {
size_t memUsed = align<size_t>(size, sizeof(DataT)) / sizeof(DataT);
size_t memUsed = align<size_t>(size, sizeof(DataT)) / sizeof(DataT) + 1;
syncDataQueue(memUsed, true);
const auto result = s_pWriterChannel->data->push(size, obj);
if (RESULT_FAILURE(result)) {
Expand Down Expand Up @@ -263,7 +263,7 @@ class Bridge {
ZoneScoped;
uint8_t* blobPacketPtr = nullptr;
if (gbBridgeRunning) {
size_t memUsed = align<size_t>(size, sizeof(DataT)) / sizeof(DataT);
size_t memUsed = align<size_t>(size, sizeof(DataT)) / sizeof(DataT) + 1;
syncDataQueue(memUsed, true);
const auto result = s_pWriterChannel->data->begin_blob_push(size, blobPacketPtr);
if (RESULT_FAILURE(result)) {
Expand Down
1 change: 1 addition & 0 deletions src/util/util_circularbuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace bridge_util {
public:
using CircularQueue::push;
using CircularQueue::pull;
using BaseType = T;

CircularBuffer(const std::string& name, Accessor access, void* pMemory,
const size_t memSize, const size_t queueSize):
Expand Down
4 changes: 4 additions & 0 deletions src/util/util_circularqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ namespace bridge_util {
return batchSize;
}

// Exposes the queue's m_data pointer as-is; no position adjustment is applied here.
T* data() const {
return m_data;
}

private:
template<bool BatchInProgress>
Result pushImpl(const T obj) {
Expand Down
8 changes: 7 additions & 1 deletion src/util/util_commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -842,13 +842,19 @@ namespace Commands {
typedef uint16_t Flags;

enum FlagBits: Flags {
DataInSharedHeap = 0b00000001, // Any data the command operates with is stored in shared heap
DataInSharedHeap = 0b00000001, // Any data a command operates with is stored in shared heap
// and only allocation id(s) is transferred on the queue
DataIsReserved = 0b00000010, // Data was already reserved in data queue and only its
// offset is transferred
};

// True when the DataInSharedHeap bit is set in the given command flags.
inline bool IsDataInSharedHeap(Flags flags) {
  return static_cast<bool>(flags & FlagBits::DataInSharedHeap);
}

// True when the DataIsReserved bit is set in the given command flags.
inline bool IsDataReserved(Flags flags) {
  return static_cast<bool>(flags & FlagBits::DataIsReserved);
}
}

struct Header {
Expand Down
32 changes: 17 additions & 15 deletions src/util/util_ipcchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,24 @@ class IpcChannel {
const size_t memSize,
const size_t cmdQueueSize,
const size_t dataQueueSize)
: m_extraMemForSync(sizeof(*serverDataPos) +
sizeof(*clientDataExpectedPos) +
sizeof(*serverResetPosRequired))
, sharedMem(new bridge_util::SharedMemory(name + "Channel", memSize + m_extraMemForSync))
: sharedMem(new bridge_util::SharedMemory(name + "Channel", memSize + kReservedSpace))
, m_cmdMemSize(sizeof(Header)* cmdQueueSize + CommandQueue::getExtraMemoryRequirements())
, m_dataMemSize(memSize - m_cmdMemSize)
, serverDataPos(static_cast<int64_t*>(sharedMem->data()))
, clientDataExpectedPos(static_cast<int64_t*>(serverDataPos + sizeof(*serverDataPos)))
, serverResetPosRequired(reinterpret_cast<bool*>(clientDataExpectedPos +
sizeof(*clientDataExpectedPos)))
, clientDataExpectedPos(serverDataPos + 1)
, serverResetPosRequired(reinterpret_cast<bool*>(clientDataExpectedPos + 1))
// Offsetting shared memory to account for 3 pointers used above
, commands(new CommandQueue(name + "Command",
reinterpret_cast<void*>(
reinterpret_cast<uintptr_t>(sharedMem->data()) +
m_extraMemForSync),
kReservedSpace),
m_cmdMemSize,
cmdQueueSize))
, data(new bridge_util::DataQueue(name + "Data",
Accessor,
reinterpret_cast<void*>(
reinterpret_cast<uintptr_t>(sharedMem->data()) +
m_extraMemForSync +
m_cmdMemSize),
kReservedSpace + m_cmdMemSize),
m_dataMemSize,
dataQueueSize))
, dataSemaphore(new bridge_util::NamedSemaphore(name + "Semaphore", 0, 1))
Expand All @@ -89,18 +84,25 @@ class IpcChannel {
return data->get_pos();
}

const size_t m_extraMemForSync; // Extra storage needed for data queue synchronization
// Returns the pointer reported by the data queue's data() accessor. The reserved-data
// (DataIsReserved) path uses it as the base for element offsets: the client sends
// (pointer - get_data_ptr()) and the server adds that offset to its own get_data_ptr().
bridge_util::DataQueue::BaseType* get_data_ptr() const {
return data->data();
}

bridge_util::SharedMemory* const sharedMem;
const size_t m_cmdMemSize;
const size_t m_dataMemSize;
int64_t* serverDataPos;
int64_t* clientDataExpectedPos;
bool* serverResetPosRequired;
int64_t* serverDataPos;
int64_t* clientDataExpectedPos;
bool* serverResetPosRequired;
CommandQueue* const commands;
bridge_util::DataQueue* const data;
bridge_util::NamedSemaphore* const dataSemaphore;
std::atomic<bool>* const pbCmdInProgress;
mutable std::mutex m_mutex;
mutable std::mutex m_mutex;

// Extra storage needed for data queue synchronization params
static constexpr size_t kReservedSpace = align<size_t>(sizeof(*serverDataPos) +
sizeof(*clientDataExpectedPos) + sizeof(*serverResetPosRequired), 64);
};
using WriterChannel = IpcChannel<bridge_util::Accessor::Writer>;
using ReaderChannel = IpcChannel<bridge_util::Accessor::Reader>;

0 comments on commit 224ce21

Please sign in to comment.