-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
UID2-4391 Major refactor to improve performance and maintainability
- Removed redundant separation of IO and Worker threads keeping a single pool IO worker threads - Removed memory arena and custom memory management for IO buffers - Removed half-baked support for multiple inflight output buffers - Increased IO buffer size to 1024 bytes - Simplified and cleaned up a lot of code - Removed support for socks proxy in config as this was not an implemented feature - Simplified socket implementation, especially around how sockets get closed in case of errors - Make proxy a bit fairer by round-robining across all connections with a single IO operation at a time, instead of draining each socket in order - Make support for handling async connections more explicit - Add unit test coverage, most importantly - for the sockets IO logic - Improved/corrected README Manually verified through various tests as well as using tcptunnelchecker.
- Loading branch information
1 parent
92ae49d
commit c1784a6
Showing
31 changed files
with
1,332 additions
and
1,278 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,4 @@ | |
cmake_minimum_required (VERSION 3.8) | ||
|
||
add_subdirectory (src) | ||
|
||
enable_testing () | ||
add_subdirectory (test) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,222 +1,63 @@ | ||
#pragma once | ||
|
||
#include "logger.h" | ||
|
||
#include <algorithm> | ||
#include <array> | ||
#include <cassert> | ||
#include <cstdint> | ||
#include <iostream> | ||
#include <list> | ||
#include <memory> | ||
#include <vector> | ||
|
||
#include <unistd.h> | ||
|
||
namespace vsockio | ||
{ | ||
struct MemoryBlock | ||
{ | ||
MemoryBlock(int size, class MemoryArena* region) | ||
: _startPtr(std::make_unique<uint8_t[]>(size)), _region(region) {} | ||
|
||
uint8_t* offset(int x) const | ||
{ | ||
return _startPtr.get() + x; | ||
} | ||
|
||
std::unique_ptr<uint8_t[]> _startPtr; | ||
class MemoryArena* _region; | ||
}; | ||
|
||
struct MemoryArena | ||
{ | ||
std::vector<MemoryBlock> _blocks; | ||
std::list<MemoryBlock*> _handles; | ||
uint32_t _blockSizeInBytes = 0; | ||
bool _initialized = false; | ||
|
||
MemoryArena() = default; | ||
|
||
void init(int blockSize, int numBlocks) | ||
{ | ||
if (_initialized) throw; | ||
|
||
Logger::instance->Log(Logger::INFO, "Thread-local memory arena init: blockSize=", blockSize, ", numBlocks=", numBlocks); | ||
|
||
_blockSizeInBytes = blockSize; | ||
|
||
for (int i = 0; i < numBlocks; i++) | ||
{ | ||
_blocks.emplace_back(blockSize, this); | ||
} | ||
|
||
for (int i = 0; i < numBlocks; i++) | ||
{ | ||
_handles.push_back(&_blocks[i]); | ||
} | ||
|
||
_initialized = true; | ||
} | ||
|
||
MemoryBlock* get() | ||
{ | ||
if (!_handles.empty()) | ||
{ | ||
auto mb = _handles.front(); | ||
_handles.pop_front(); | ||
return mb; | ||
} | ||
else | ||
{ | ||
return new MemoryBlock(_blockSizeInBytes, nullptr); | ||
} | ||
} | ||
|
||
void put(MemoryBlock* mb) | ||
{ | ||
if (mb->_region == this) | ||
{ | ||
_handles.push_front(mb); | ||
} | ||
else if (mb->_region == nullptr) | ||
{ | ||
delete mb; | ||
} | ||
else | ||
{ | ||
throw; | ||
} | ||
} | ||
|
||
int blockSize() const { return _blockSizeInBytes; } | ||
}; | ||
|
||
struct Buffer | ||
{ | ||
constexpr static int MAX_PAGES = 20; | ||
int _pageCount; | ||
int _cursor; | ||
int _size; | ||
int _pageSize; | ||
MemoryBlock* _pages[MAX_PAGES]; | ||
MemoryArena* _arena; | ||
|
||
explicit Buffer(MemoryArena* arena) : _arena(arena), _pageCount{ 0 }, _cursor{ 0 }, _size{ 0 }, _pageSize(arena->blockSize()) {} | ||
|
||
Buffer(Buffer&& b) : _arena(b._arena), _pageCount(b._pageCount), _cursor(b._cursor), _size(b._size), _pageSize(b._arena->blockSize()) | ||
{ | ||
for (int i = 0; i < _pageCount; i++) | ||
{ | ||
_pages[i] = b._pages[i]; | ||
} | ||
b._pageCount = 0; // prevent _pages being destructed by old object | ||
} | ||
static constexpr int BUFFER_SIZE = 1024; | ||
std::array<std::uint8_t, BUFFER_SIZE> _data; | ||
std::uint8_t* _head = _data.data(); | ||
std::uint8_t* _tail = _data.data(); | ||
|
||
Buffer(const Buffer&) = delete; | ||
Buffer& operator=(const Buffer&) = delete; | ||
std::uint8_t* head() const | ||
{ | ||
return _head; | ||
} | ||
|
||
~Buffer() | ||
std::uint8_t* tail() const | ||
{ | ||
for (int i = 0; i < _pageCount; i++) | ||
{ | ||
_arena->put(_pages[i]); | ||
} | ||
return _tail; | ||
} | ||
|
||
uint8_t* tail() const | ||
{ | ||
return offset(_size); | ||
} | ||
bool hasRemainingCapacity() const | ||
{ | ||
return _tail < _data.end(); | ||
} | ||
|
||
int remainingCapacity() const | ||
{ | ||
return capacity() - _size; | ||
return _data.end() - _tail; | ||
} | ||
|
||
void produce(int size) | ||
{ | ||
_size += size; | ||
} | ||
int remainingDataSize() const | ||
{ | ||
return _tail - _head; | ||
} | ||
|
||
bool ensureCapacity() | ||
{ | ||
return remainingCapacity() > 0 || tryNewPage(); | ||
} | ||
|
||
uint8_t* head() const | ||
{ | ||
return offset(_cursor); | ||
} | ||
|
||
int headLimit() const | ||
void produce(int size) | ||
{ | ||
return std::min(pageLimit(_cursor), _size - _cursor); | ||
assert(remainingCapacity() >= size); | ||
_tail += size; | ||
} | ||
|
||
void consume(int size) | ||
{ | ||
_cursor += size; | ||
} | ||
|
||
bool tryNewPage() | ||
{ | ||
if (_pageCount >= MAX_PAGES) return false; | ||
_pages[_pageCount++] = _arena->get(); | ||
return true; | ||
} | ||
|
||
uint8_t* offset(int x) const | ||
{ | ||
return _pages[x / _pageSize]->offset(x % _pageSize); | ||
} | ||
|
||
int capacity() const | ||
{ | ||
return _pageCount * _pageSize; | ||
} | ||
|
||
int pageLimit(int x) const | ||
{ | ||
return _pageSize - (x % _pageSize); | ||
} | ||
|
||
int cursor() const | ||
{ | ||
return _cursor; | ||
assert(remainingDataSize() >= size); | ||
_head += size; | ||
} | ||
|
||
int size() const | ||
{ | ||
return _size; | ||
} | ||
|
||
bool empty() const | ||
{ | ||
return _size <= 0; | ||
} | ||
void reset() | ||
{ | ||
_head = _tail = _data.data(); | ||
} | ||
|
||
bool consumed() const | ||
{ | ||
return _cursor >= _size; | ||
} | ||
}; | ||
|
||
struct BufferManager | ||
{ | ||
thread_local static MemoryArena* arena; | ||
|
||
static std::unique_ptr<Buffer> getBuffer() | ||
{ | ||
auto b = std::make_unique<Buffer>(arena); | ||
b->tryNewPage(); | ||
return b; | ||
} | ||
|
||
static std::unique_ptr<Buffer> getEmptyBuffer() | ||
{ | ||
return std::make_unique<Buffer>(arena); | ||
return _head >= _tail; | ||
} | ||
}; | ||
|
||
|
||
} | ||
} |
Oops, something went wrong.