Skip to content

Commit

Permalink
Merge commit '26a65ef24069510df9473b19954ef7ea10ceab3d' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
maksis committed Aug 27, 2024
2 parents eff7555 + 26a65ef commit 7ecb9fc
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 137 deletions.
161 changes: 84 additions & 77 deletions airdcpp-core/airdcpp/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,58 +138,63 @@ bool ConnectionQueueItem::allowNewConnections(int aRunning) const noexcept {
*/
void ConnectionManager::getDownloadConnection(const HintedUser& aUser, bool aSmallSlot) noexcept {
dcassert(aUser.user);

if (DownloadManager::getInstance()->checkIdle(aUser.user, aSmallSlot)) {
return;
}

bool supportMcn = false;

if (!DownloadManager::getInstance()->checkIdle(aUser.user, aSmallSlot)) {
ConnectionQueueItem* cqi = nullptr;
int running = 0;
ConnectionQueueItem* cqi = nullptr;
int running = 0;

{
WLock l(cs);
for(const auto& i: downloads) {
cqi = i;
if (cqi->getUser() == aUser.user && !cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
if (cqi->isSet(ConnectionQueueItem::FLAG_MCN1)) {
supportMcn = true;
if (cqi->getState() != ConnectionQueueItem::RUNNING) {
//already has a waiting item? small slot doesn't count
if (!aSmallSlot) {
// force in case we joined a new hub and there was a protocol error
if (cqi->getLastAttempt() == -1) {
cqi->setLastAttempt(0);
}
return;
}
} else {
running++;
}
} else if (cqi->getDownloadType() == ConnectionQueueItem::TYPE_SMALL_CONF) {
supportMcn = true;
//no need to continue with small slot if an item with the same type exists already (no mather whether it's running or not)
if (aSmallSlot) {
// force in case we joined a new hub and there was a protocol error
if (cqi->getLastAttempt() == -1) {
cqi->setLastAttempt(0);
}
return;
{
WLock l(cs);
for(const auto& i: downloads) {
cqi = i;
if (cqi->getUser() != aUser.user || cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
continue;
}

if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::MCN_NORMAL) {
supportMcn = true;
if (cqi->getState() != ConnectionQueueItem::RUNNING) {
//already has a waiting item? small slot doesn't count
if (!aSmallSlot) {
// force in case we joined a new hub and there was a protocol error
if (cqi->getLastAttempt() == -1) {
cqi->setLastAttempt(0);
}
} else {
//no need to continue with non-MCN users
return;
}
} else {
running++;
}
}

if (supportMcn && !aSmallSlot && !cqi->allowNewConnections(running)) {
} else if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL_CONF) {
supportMcn = true;
//no need to continue with small slot if an item with the same type exists already (no mather whether it's running or not)
if (aSmallSlot) {
// force in case we joined a new hub and there was a protocol error
if (cqi->getLastAttempt() == -1) {
cqi->setLastAttempt(0);
}
return;
}
} else {
//no need to continue with non-MCN users
return;
}
}

//WLock l (cs);
dcdebug("Get cqi");
cqi = getCQI(aUser, CONNECTION_TYPE_DOWNLOAD);
if (aSmallSlot) {
cqi->setDownloadType(supportMcn ? ConnectionQueueItem::TYPE_SMALL_CONF : ConnectionQueueItem::TYPE_SMALL);
}
if (supportMcn && !aSmallSlot && !cqi->allowNewConnections(running)) {
return;
}

//WLock l (cs);
dcdebug("Get cqi");
cqi = getCQI(aUser, CONNECTION_TYPE_DOWNLOAD);
if (aSmallSlot) {
cqi->setDownloadType(supportMcn ? ConnectionQueueItem::DownloadType::SMALL_CONF : ConnectionQueueItem::DownloadType::SMALL);
}
}
}
Expand All @@ -213,8 +218,9 @@ void ConnectionManager::putCQI(ConnectionQueueItem* cqi) noexcept {
dcassert(find(container.begin(), container.end(), cqi) != container.end());
container.erase(remove(container.begin(), container.end(), cqi), container.end());

if (cqi->getConnType() == CONNECTION_TYPE_DOWNLOAD)
delayedTokens[cqi->getToken()] = GET_TICK();
if (cqi->getConnType() == CONNECTION_TYPE_DOWNLOAD && !cqi->isActive()) {
removedDownloadTokens[cqi->getToken()] = GET_TICK();
}

tokens.removeToken(cqi->getToken());
delete cqi;
Expand Down Expand Up @@ -288,7 +294,7 @@ void ConnectionManager::attemptDownloads(uint64_t aTick, StringList& removedToke
int attemptLimit = SETTING(DOWNCONN_PER_SEC);
uint16_t attempts = 0;
for (auto cqi : downloads) {
if (cqi->getState() != ConnectionQueueItem::ACTIVE && cqi->getState() != ConnectionQueueItem::RUNNING) {
if (!cqi->isActive()) {
if (!cqi->getUser().user->isOnline() || cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
removedTokens_.push_back(cqi->getToken());
continue;
Expand All @@ -310,21 +316,21 @@ void ConnectionManager::attemptDownloads(uint64_t aTick, StringList& removedToke
bool allowUrlChange = true;
bool hasDownload = false;

auto type = cqi->getDownloadType() == ConnectionQueueItem::TYPE_SMALL || cqi->getDownloadType() == ConnectionQueueItem::TYPE_SMALL_CONF ? QueueItem::TYPE_SMALL : cqi->getDownloadType() == ConnectionQueueItem::TYPE_MCN_NORMAL ? QueueItem::TYPE_MCN_NORMAL : QueueItem::TYPE_ANY;
auto type = cqi->isSmallSlot() ? QueueItem::TYPE_SMALL : cqi->getDownloadType() == ConnectionQueueItem::DownloadType::MCN_NORMAL ? QueueItem::TYPE_MCN_NORMAL : QueueItem::TYPE_ANY;

//we'll also validate the hubhint (and that the user is online) before making any connection attempt
auto startDown = QueueManager::getInstance()->startDownload(cqi->getUser(), hubHint, type, bundleToken, allowUrlChange, hasDownload, lastError);
if (!hasDownload && cqi->getDownloadType() == ConnectionQueueItem::TYPE_SMALL && count_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* aCQI) { return aCQI != cqi && aCQI->getUser() == cqi->getUser(); }) == 0) {
if (!hasDownload && cqi->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL && count_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* aCQI) { return aCQI != cqi && aCQI->getUser() == cqi->getUser(); }) == 0) {
//the small file finished already? try with any type
cqi->setDownloadType(ConnectionQueueItem::TYPE_ANY);
cqi->setDownloadType(ConnectionQueueItem::DownloadType::ANY);
startDown = QueueManager::getInstance()->startDownload(cqi->getUser(), hubHint, QueueItem::TYPE_ANY,
bundleToken, allowUrlChange, hasDownload, lastError);
} else if (cqi->getDownloadType() == ConnectionQueueItem::TYPE_ANY && startDown.first == QueueItem::TYPE_SMALL &&
} else if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::ANY && startDown.first == QueueItem::TYPE_SMALL &&
count_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* aCQI) {
return aCQI->getUser() == cqi->getUser() && (cqi->getDownloadType() == ConnectionQueueItem::TYPE_SMALL || cqi->getDownloadType() == ConnectionQueueItem::TYPE_SMALL_CONF);
return aCQI->getUser() == cqi->getUser() && cqi->isSmallSlot();
}) == 0) {
// a small file has been added after the CQI was created
cqi->setDownloadType(ConnectionQueueItem::TYPE_SMALL);
cqi->setDownloadType(ConnectionQueueItem::DownloadType::SMALL);
}


Expand Down Expand Up @@ -390,21 +396,22 @@ void ConnectionManager::addRunningMCN(const UserConnection *aSource) noexcept {

bool ConnectionManager::allowNewMCN(const ConnectionQueueItem* aCQI) noexcept {
//we need to check if we have queued something also while the small file connection was being established
if (!aCQI->isSet(ConnectionQueueItem::FLAG_MCN1) && aCQI->getDownloadType() != ConnectionQueueItem::TYPE_SMALL_CONF)
if (!aCQI->isMcn()) {
return false;
}

//count the running MCN connections
int running = 0;
for(const auto& cqi: downloads) {
if (cqi->getUser() == aCQI->getUser() && cqi->getDownloadType() != ConnectionQueueItem::TYPE_SMALL_CONF && !cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
if (cqi->getState() != ConnectionQueueItem::RUNNING && cqi->getState() != ConnectionQueueItem::ACTIVE) {
if (cqi->getUser() == aCQI->getUser() && cqi->getDownloadType() != ConnectionQueueItem::DownloadType::SMALL_CONF && !cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
if (!cqi->isActive()) {
return false;
}
running++;
}
}

if (running > 0 && aCQI->getDownloadType() == ConnectionQueueItem::TYPE_SMALL_CONF)
if (running > 0 && aCQI->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL_CONF)
return false;

if (!aCQI->allowNewConnections(running) && !aCQI->isSet(ConnectionQueueItem::FLAG_REMOVE))
Expand All @@ -422,31 +429,29 @@ void ConnectionManager::createNewMCN(const HintedUser& aUser) noexcept {
if (start) {
WLock l (cs);
ConnectionQueueItem* cqiNew = getCQI(aUser, CONNECTION_TYPE_DOWNLOAD);
cqiNew->setFlag(ConnectionQueueItem::FLAG_MCN1);
cqiNew->setDownloadType(ConnectionQueueItem::TYPE_MCN_NORMAL);
cqiNew->setDownloadType(ConnectionQueueItem::DownloadType::MCN_NORMAL);
}
}

void ConnectionManager::on(TimerManagerListener::Minute, uint64_t aTick) noexcept {
WLock l(cs);
for(auto i = delayedTokens.begin(); i != delayedTokens.end();) {
for (auto i = removedDownloadTokens.begin(); i != removedDownloadTokens.end();) {
if((i->second + (90 * 1000)) < aTick) {
delayedTokens.erase(i++);
} else
removedDownloadTokens.erase(i++);
} else {
++i;
}
}

for(auto j: userConnections) {

if (j->isSet(UserConnection::FLAG_PM)) {
//Send a write check to the socket to detect half connected state, a good interval?
if ((j->getLastActivity() + 180 * 1000) < aTick) {
AdcCommand c(AdcCommand::CMD_PMI);
c.addParam("\n");
j->send(c);
}
}
else if ((j->getLastActivity() + 180 * 1000) < aTick) {
} else if ((j->getLastActivity() + 180 * 1000) < aTick) {
j->disconnect(true);
}
}
Expand Down Expand Up @@ -874,12 +879,11 @@ void ConnectionManager::addDownloadConnection(UserConnection* uc) noexcept {
if(cqi->getState() == ConnectionQueueItem::WAITING || cqi->getState() == ConnectionQueueItem::CONNECTING) {
cqi->setState(ConnectionQueueItem::ACTIVE);
if (uc->isMCN()) {
if (cqi->getDownloadType() == ConnectionQueueItem::TYPE_SMALL || cqi->getDownloadType() == ConnectionQueueItem::TYPE_SMALL_CONF) {
if (cqi->isSmallSlot()) {
uc->setFlag(UserConnection::FLAG_SMALL_SLOT);
cqi->setDownloadType(ConnectionQueueItem::TYPE_SMALL_CONF);
cqi->setDownloadType(ConnectionQueueItem::DownloadType::SMALL_CONF);
} else {
cqi->setDownloadType(ConnectionQueueItem::TYPE_MCN_NORMAL);
cqi->setFlag(ConnectionQueueItem::FLAG_MCN1);
cqi->setDownloadType(ConnectionQueueItem::DownloadType::MCN_NORMAL);
}
}

Expand Down Expand Up @@ -1024,7 +1028,7 @@ void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCo

dcassert(!token.empty());

bool delayedToken = false;
bool isRemovedDownload = false;
{
RLock l(cs);
auto i = find(downloads.begin(), downloads.end(), token);
Expand All @@ -1039,7 +1043,7 @@ void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCo
cqi->setErrors(0);
aSource->setFlag(UserConnection::FLAG_DOWNLOAD);
} else {
delayedToken = delayedTokens.find(token) != delayedTokens.end();
isRemovedDownload = removedDownloadTokens.find(token) != removedDownloadTokens.end();
}
}

Expand All @@ -1063,7 +1067,7 @@ void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCo
}

addPMConnection(aSource);
} else if (!delayedToken) {
} else if (!isRemovedDownload) {
if (!aSource->isSet(UserConnection::FLAG_UPLOAD))
aSource->setFlag(UserConnection::FLAG_UPLOAD);
addUploadConnection(aSource);
Expand Down Expand Up @@ -1106,18 +1110,21 @@ void ConnectionManager::failDownload(const string& aToken, const string& aError,
return;


if (cqi->isSet(ConnectionQueueItem::FLAG_MCN1) && !cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
//remove an existing waiting item, if exists
if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::MCN_NORMAL && !cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
// Remove an existing waiting item (if exists)
auto s = find_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* c) {
return c->getUser() == cqi->getUser() && c->getDownloadType() != ConnectionQueueItem::TYPE_SMALL_CONF && c->getDownloadType() != ConnectionQueueItem::TYPE_SMALL &&
c->getState() != ConnectionQueueItem::RUNNING && c->getState() != ConnectionQueueItem::ACTIVE && c != cqi && !c->isSet(ConnectionQueueItem::FLAG_REMOVE);
return c->getUser() == cqi->getUser() && !c->isSmallSlot() &&
!c->isActive() && c != cqi && !c->isSet(ConnectionQueueItem::FLAG_REMOVE);
});

if (s != downloads.end())
(*s)->setFlag(ConnectionQueueItem::FLAG_REMOVE);
if (s != downloads.end()) {
// (*s)->setFlag(ConnectionQueueItem::FLAG_REMOVE);
dcdebug("ConnectionManager::failDownload: removing an existing inactive CQI %s\n", (*s)->getToken().c_str());
putCQI(*s);
}
}

if (cqi->getDownloadType() == ConnectionQueueItem::TYPE_SMALL_CONF && cqi->getState() == ConnectionQueueItem::ACTIVE) {
if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL_CONF && cqi->getState() == ConnectionQueueItem::ACTIVE) {
//small slot item that was never used for downloading anything? check if we have normal files to download
if (allowNewMCN(cqi))
mcnUser = cqi->getUser();
Expand Down
35 changes: 24 additions & 11 deletions airdcpp-core/airdcpp/ConnectionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,20 @@ class ConnectionQueueItem : boost::noncopyable, public Flags {
};

enum Flags {
FLAG_MCN1 = 0x01,
FLAG_REMOVE = 0x08
FLAG_REMOVE = 0x01
};

enum DownloadType {
TYPE_ANY,
TYPE_SMALL,
TYPE_SMALL_CONF,
TYPE_MCN_NORMAL
enum class DownloadType {
ANY,
SMALL,
SMALL_CONF,
MCN_NORMAL
};

ConnectionQueueItem(const HintedUser& aUser, ConnectionType aConntype, const string& aToken);

GETSET(string, token, Token);
IGETSET(DownloadType, downloadType, DownloadType, TYPE_ANY);
IGETSET(DownloadType, downloadType, DownloadType, DownloadType::ANY);
GETSET(string, lastBundle, LastBundle);
IGETSET(uint64_t, lastAttempt, LastAttempt, 0);
IGETSET(int, errors, Errors, 0); // Number of connection errors, or -1 after a protocol error
Expand All @@ -88,6 +87,18 @@ class ConnectionQueueItem : boost::noncopyable, public Flags {
void setHubUrl(const string& aHubUrl) noexcept { user.hint = aHubUrl; }
const HintedUser& getUser() const noexcept { return user; }
bool allowNewConnections(int running) const noexcept;

bool isSmallSlot() const noexcept {
return downloadType == DownloadType::SMALL_CONF || downloadType == DownloadType::SMALL;
}

bool isActive() const noexcept {
return state == ACTIVE || state == RUNNING;
}

bool isMcn() const noexcept {
return downloadType == DownloadType::SMALL_CONF || downloadType == DownloadType::MCN_NORMAL;
}
private:
HintedUser user;
};
Expand Down Expand Up @@ -213,9 +224,11 @@ class ConnectionManager : public Speaker<ConnectionManagerListener>, public Clie
StringList adcFeatures;

ExpectedMap expectedConnections;
typedef unordered_map<string, uint64_t> delayMap;
typedef delayMap::iterator delayIter;
delayMap delayedTokens;
typedef unordered_map<string, uint64_t> DelayMap;

// Keep track own our own downloads if they are removed before the handshake is finished
// (unknown tokens would be shown as uploads)
DelayMap removedDownloadTokens;

unique_ptr<Server> server;
unique_ptr<Server> secureServer;
Expand Down
2 changes: 2 additions & 0 deletions airdcpp-core/airdcpp/FavoriteManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,8 @@ void FavoriteManager::loadFavoriteDirectories(SimpleXML& aXml) {
}
aXml.stepOut();
}

aXml.resetCurrentChild();
}

FavoriteHubEntryList FavoriteManager::getFavoriteHubs(const string& group) const noexcept {
Expand Down
Loading

0 comments on commit 7ecb9fc

Please sign in to comment.