From 1788075368b62e153e7a7c24b0156a57d588dad9 Mon Sep 17 00:00:00 2001 From: maksis Date: Sat, 31 Aug 2024 19:22:25 +0300 Subject: [PATCH] Squashed 'airdcpp-core/' changes from cd8de9936..857b11aeb 857b11aeb Remove ConnectionQueueItem::FLAG_REMOVE 1da4842e5 Refactor download connection management/starting of downloads git-subtree-dir: airdcpp-core git-subtree-split: 857b11aeb22f6355ecd47a77869dad3d36f7e07b --- airdcpp.vcxproj | 1 + airdcpp.vcxproj.filters | 3 + airdcpp/Bundle.cpp | 8 +- airdcpp/Bundle.h | 3 +- airdcpp/ClientManager.cpp | 53 ++- airdcpp/ClientManager.h | 32 +- airdcpp/ConnectionManager.cpp | 560 +++++++++++++++----------- airdcpp/ConnectionManager.h | 61 +-- airdcpp/DirectoryListingManager.h | 11 +- airdcpp/Download.cpp | 16 +- airdcpp/Download.h | 1 + airdcpp/DownloadManager.cpp | 194 +++++---- airdcpp/DownloadManager.h | 7 +- airdcpp/DownloadManagerListener.h | 3 +- airdcpp/GetSet.h | 9 + airdcpp/PartialBundleSharingManager.h | 1 - airdcpp/PrivateChat.cpp | 18 +- airdcpp/QueueDownloadInfo.h | 85 ++++ airdcpp/QueueItem.cpp | 93 +++-- airdcpp/QueueItem.h | 8 +- airdcpp/QueueItemBase.h | 9 - airdcpp/QueueManager.cpp | 311 +++++++------- airdcpp/QueueManager.h | 42 +- airdcpp/TrackableDownloadItem.cpp | 2 +- airdcpp/TransferInfo.h | 1 - airdcpp/TransferInfoManager.cpp | 28 +- airdcpp/TransferInfoManager.h | 1 + airdcpp/UploadBundleInfoSender.cpp | 2 +- airdcpp/UploadBundleInfoSender.h | 3 +- airdcpp/UploadQueueManager.cpp | 8 +- airdcpp/UserConnection.cpp | 12 + airdcpp/UserConnection.h | 6 +- airdcpp/UserConnectionListener.h | 4 +- airdcpp/UserQueue.cpp | 44 +- airdcpp/UserQueue.h | 10 +- airdcpp/forward.h | 2 + 36 files changed, 978 insertions(+), 674 deletions(-) create mode 100644 airdcpp/QueueDownloadInfo.h diff --git a/airdcpp.vcxproj b/airdcpp.vcxproj index 5b38d1da..f70022dd 100644 --- a/airdcpp.vcxproj +++ b/airdcpp.vcxproj @@ -389,6 +389,7 @@ + diff --git a/airdcpp.vcxproj.filters b/airdcpp.vcxproj.filters index f3874821..7bbb9584 100644 --- a/airdcpp.vcxproj.filters +++ b/airdcpp.vcxproj.filters @@ -1195,6 +1195,9 @@ Header Files\favorites + + Header Files\queue + diff --git a/airdcpp/Bundle.cpp b/airdcpp/Bundle.cpp index 1c7f917f..621b2cb0 100644 --- a/airdcpp/Bundle.cpp +++ b/airdcpp/Bundle.cpp @@ -360,20 +360,20 @@ bool Bundle::addUserQueue(const QueueItemPtr& qi, const HintedUser& aUser, bool } } -QueueItemPtr Bundle::getNextQI(const UserPtr& aUser, const OrderedStringSet& aOnlineHubs, string& aLastError, Priority aMinPrio, int64_t aWantedSize, int64_t aLastSpeed, QueueItemBase::DownloadType aType, bool aAllowOverlap) noexcept { +QueueItemPtr Bundle::getNextQI(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept { int p = static_cast(Priority::LAST) - 1; do { - auto i = userQueue[p].find(aUser); + auto i = userQueue[p].find(aQuery.user); if(i != userQueue[p].end()) { dcassert(!i->second.empty()); for(auto& qi: i->second) { - if (qi->hasSegment(aUser, aOnlineHubs, aLastError, aWantedSize, aLastSpeed, aType, aAllowOverlap)) { + if (qi->hasSegment(aQuery, lastError_, aAllowOverlap)) { return qi; } } } p--; - } while(p >= static_cast(aMinPrio)); + } while(p >= static_cast(aQuery.minPrio)); return nullptr; } diff --git a/airdcpp/Bundle.h b/airdcpp/Bundle.h index 55d132be..e198da79 100644 --- a/airdcpp/Bundle.h +++ b/airdcpp/Bundle.h @@ -26,6 +26,7 @@ #include "MerkleTree.h" #include "User.h" +#include "QueueDownloadInfo.h" #include "QueueItemBase.h" namespace dcpp { @@ -215,7 +216,7 @@ class Bundle : public QueueItemBase { /** All queue items indexed by user */ void addUserQueue(const QueueItemPtr& qi) noexcept; bool addUserQueue(const QueueItemPtr& qi, const HintedUser& aUser, bool aIsBad = false) noexcept; - QueueItemPtr getNextQI(const UserPtr& aUser, const OrderedStringSet& aOnlineHubs, string& aLastError, Priority aMinPrio, int64_t aWantedSize, int64_t aLastSpeed, QueueItemBase::DownloadType aType, bool allowOverlap) noexcept; + QueueItemPtr getNextQI(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept; void getItems(const UserPtr& aUser, QueueItemList& ql) const noexcept; QueueItemList getFailedItems() const noexcept; diff --git a/airdcpp/ClientManager.cpp b/airdcpp/ClientManager.cpp index 976bb655..f1d078f6 100644 --- a/airdcpp/ClientManager.cpp +++ b/airdcpp/ClientManager.cpp @@ -755,72 +755,71 @@ OnlineUserPtr ClientManager::findOnlineUser(const CID& cid, const string& hintUr return aAllowFallback ? p.first->second : nullptr; } -bool ClientManager::connect(const UserPtr& aUser, const string& aToken, bool aAllowUrlChange, string& lastError_, string& hubHint_, bool& isProtocolError_, ConnectionType aConnType) const noexcept { +ClientManager::ConnectResult ClientManager::connect(const HintedUser& aUser, const string& aToken, bool aAllowUrlChange, ConnectionType aConnType) const noexcept { + ConnectResult result; + RLock l(cs); - auto op = onlineUsers.equal_range(const_cast(&aUser->getCID())); + auto op = onlineUsers.equal_range(const_cast(&aUser.user->getCID())); auto connectUser = [&] (OnlineUser* ou) -> bool { - isProtocolError_ = false; + result.resetError(); + // result.isProtocolError = false; - auto ret = ou->getClient()->connect(*ou, aToken, lastError_); + string connectError; + auto ret = ou->getClient()->connect(*ou, aToken, connectError); if (ret == AdcCommand::SUCCESS) { return true; } //get the error string if (ret == AdcCommand::ERROR_TLS_REQUIRED) { - isProtocolError_ = true; - lastError_ = STRING(SOURCE_NO_ENCRYPTION); + result.onProtocolError(STRING(SOURCE_NO_ENCRYPTION)); } else if (ret == AdcCommand::ERROR_PROTOCOL_UNSUPPORTED) { - isProtocolError_ = true; - lastError_ = STRING_F(REMOTE_PROTOCOL_UNSUPPORTED, lastError_); + result.onProtocolError(STRING_F(REMOTE_PROTOCOL_UNSUPPORTED, connectError)); } else if (ret == AdcCommand::ERROR_BAD_STATE) { - lastError_ = STRING(CONNECTING_IN_PROGRESS); + result.onMinorError(STRING(CONNECTING_IN_PROGRESS)); } else if (ret == AdcCommand::ERROR_FEATURE_MISSING) { - isProtocolError_ = true; - lastError_ = STRING(NO_NATT_SUPPORT); + result.onProtocolError(STRING(NO_NATT_SUPPORT)); } else if (ret == AdcCommand::ERROR_PROTOCOL_GENERIC) { - isProtocolError_ = true; - lastError_ = STRING(UNABLE_CONNECT_USER); + result.onProtocolError(STRING(UNABLE_CONNECT_USER)); } return false; }; if (aConnType == CONNECTION_TYPE_PM) { - if (!aUser->isSet(User::TLS)) { - isProtocolError_ = true; - lastError_ = STRING(SOURCE_NO_ENCRYPTION); - return false; + if (!aUser.user->isSet(User::TLS)) { + result.onProtocolError(STRING(SOURCE_NO_ENCRYPTION)); + return result; } // We don't care which hub we use to establish the connection all we need to know is the user supports the CCPM feature. - if (!aUser->isSet(User::CCPM)) { - isProtocolError_ = true; - lastError_ = STRING(CCPM_NOT_SUPPORTED); - return false; + if (!aUser.user->isSet(User::CCPM)) { + result.onProtocolError(STRING(CCPM_NOT_SUPPORTED)); + return result; } } // Prefer the hinted hub - auto p = ranges::find_if(op | pair_to_range, [&hubHint_](const auto& ouc) { return ouc.second->getHubUrl() == hubHint_; }); + auto p = ranges::find_if(op | pair_to_range, [&aUser](const auto& ouc) { return ouc.second->getHubUrl() == aUser.hint; }); if (p != op.second && connectUser(p->second)) { - return true; + result.onSuccess(aUser.hint); + return result; } if (!aAllowUrlChange) { - return false; + return result; } // Connect via any available hub for (auto i = op.first; i != op.second; ++i) { if (connectUser(i->second)) { - hubHint_ = i->second->getHubUrl(); - return true; + result.onSuccess(i->second->getHubUrl()); + return result; } } - return false; + return result; } bool ClientManager::privateMessageHooked(const HintedUser& aUser, const OutgoingChatMessage& aMessage, string& error_, bool aEcho) noexcept { diff --git a/airdcpp/ClientManager.h b/airdcpp/ClientManager.h index 2c01b5c8..0d18b971 100644 --- a/airdcpp/ClientManager.h +++ b/airdcpp/ClientManager.h @@ -230,7 +230,37 @@ class ClientManager : public Speaker, bool sendUDPHooked(AdcCommand& c, const CID& to, bool aNoCID = false, bool aNoPassive = false, const string& aEncryptionKey = Util::emptyString, const string& aHubUrl = Util::emptyString) noexcept; bool sendUDP(const string& aData, const string& aIP, const string& aPort) noexcept; - bool connect(const UserPtr& aUser, const string& aToken, bool aAllowUrlChange, string& lastError_, string& hubHint_, bool& isProtocolError_, ConnectionType type = CONNECTION_TYPE_LAST) const noexcept; + struct ConnectResult { + public: + void onSuccess(const string& aHubHint) noexcept { + success = true; + hubHint = aHubHint; + } + + void onMinorError(const string& aError) noexcept { + lastError = aError; + protocolError = false; + } + + void onProtocolError(const string& aError) noexcept { + lastError = aError; + protocolError = true; + } + + void resetError() noexcept { + lastError = Util::emptyString; + protocolError = false; + } + + + GETPROP(string, lastError, Error); + IGETPROP(bool, protocolError, IsProtocolError, false); + + GETPROP(string, hubHint, HubHint); + IGETPROP(bool, success, IsSuccess, false); + }; + + ConnectResult connect(const HintedUser& aUser, const string& aToken, bool aAllowUrlChange, ConnectionType type = CONNECTION_TYPE_LAST) const noexcept; bool privateMessageHooked(const HintedUser& aUser, const OutgoingChatMessage& aMessage, string& error_, bool aEcho = true) noexcept; void userCommand(const HintedUser& aUser, const UserCommand& uc, ParamMap& params_, bool aCompatibility) noexcept; diff --git a/airdcpp/ConnectionManager.cpp b/airdcpp/ConnectionManager.cpp index dcb15213..0d25d0b5 100644 --- a/airdcpp/ConnectionManager.cpp +++ b/airdcpp/ConnectionManager.cpp @@ -27,7 +27,6 @@ #include "LogManager.h" #include "QueueManager.h" #include "ResourceManager.h" -#include "ScopedFunctor.h" #include "UploadManager.h" #include "UserConnection.h" #include "ValueGenerator.h" @@ -126,9 +125,50 @@ ConnectionQueueItem::ConnectionQueueItem(const HintedUser& aUser, ConnectionType } bool ConnectionQueueItem::allowNewConnections(int aRunning) const noexcept { - return (aRunning < AirUtil::getSlotsPerUser(true) || AirUtil::getSlotsPerUser(true) == 0) && (aRunning < maxConns || maxConns == 0); + if (maxRemoteConns != 0 && aRunning >= maxRemoteConns) { + return false; + } + + auto maxOwnConns = AirUtil::getSlotsPerUser(true); + if (maxOwnConns != 0 && aRunning >= maxOwnConns) { + return false; + } + + return true; } +bool ConnectionQueueItem::isSmallSlot() const noexcept { + return downloadType == QueueDownloadType::SMALL; +} + +bool ConnectionQueueItem::isActive() const noexcept { + return state == State::ACTIVE; +} + +bool ConnectionQueueItem::isMcn() const noexcept { + return isSet(FLAG_MCN); +} + +bool ConnectionQueueItem::allowConnect(int aAttempts, int aAttemptLimit, uint64_t aTick) const noexcept { + // No attempts? + if (lastAttempt == 0 && aAttempts < aAttemptLimit * 2) { + return true; + } + + // Enough time ellapsed since the last attempt? + return (aAttemptLimit == 0 || aAttempts < aAttemptLimit) && + lastAttempt + 60 * 1000 * max(1, errors) < aTick; +} + +bool ConnectionQueueItem::isTimeout(uint64_t aTick) const noexcept { + return state == ConnectionQueueItem::State::CONNECTING && lastAttempt + 50 * 1000 < aTick; +} + +void ConnectionQueueItem::resetFatalError() noexcept { + if (getLastAttempt() == -1) { + setLastAttempt(0); + } +} /** * Request a connection for downloading. @@ -143,63 +183,79 @@ void ConnectionManager::getDownloadConnection(const HintedUser& aUser, bool aSma return; } - bool supportMcn = false; + { + WLock l(cs); + if (!allowNewMCNUnsafe(aUser, aSmallSlot, [](ConnectionQueueItem* aWaitingCQI) { + // Force in case we joined a new hub and there was a protocol error + aWaitingCQI->resetFatalError(); + })) { + return; + } + + auto cqi = getCQIUnsafe(aUser, CONNECTION_TYPE_DOWNLOAD); + if (aSmallSlot) { + cqi->setDownloadType(QueueDownloadType::SMALL); + } + + dcdebug("DownloadManager::getDownloadConnection: created new item %s for user %s (small slot: %s)\n", cqi->getToken().c_str(), ClientManager::getInstance()->getFormatedNicks(aUser).c_str(), aSmallSlot ? "true" : "false"); + } +} +bool ConnectionManager::allowNewMCNUnsafe(const UserPtr& aUser, bool aSmallSlot, ConnectionQueueItemCallback&& aWaitingCallback) noexcept { + // We need to check if we have queued something also while the small file connection was being established ConnectionQueueItem* cqi = nullptr; - int running = 0; + int runningNormal = 0; + auto supportMcn = false; - { - WLock l(cs); - for(const auto& i: downloads) { - cqi = i; - if (cqi->getUser() != aUser.user || cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) { - continue; - } + for(const auto& i: downloads) { + cqi = i; + if (cqi->getUser() != aUser) { + continue; + } - if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::MCN_NORMAL) { - supportMcn = true; - if (cqi->getState() != ConnectionQueueItem::RUNNING) { - //already has a waiting item? small slot doesn't count - if (!aSmallSlot) { - // force in case we joined a new hub and there was a protocol error - if (cqi->getLastAttempt() == -1) { - cqi->setLastAttempt(0); - } - return; - } - } else { - running++; - } - } else if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL_CONF) { - supportMcn = true; - //no need to continue with small slot if an item with the same type exists already (no mather whether it's running or not) - if (aSmallSlot) { - // force in case we joined a new hub and there was a protocol error - if (cqi->getLastAttempt() == -1) { - cqi->setLastAttempt(0); + if (!cqi->isMcn()) { + // We already have a connection, no need to continue + return false; + } + + supportMcn = true; + + if (cqi->getDownloadType() == QueueDownloadType::MCN_NORMAL) { + if (!cqi->isActive()) { + // Already has a waiting item? Small slot doesn't count + if (!aSmallSlot) { + // Force in case we joined a new hub and there was a protocol error + if (aWaitingCallback) { + aWaitingCallback(cqi); } - return; + return false; } } else { - //no need to continue with non-MCN users - return; + runningNormal++; } - } + } else if (cqi->getDownloadType() == QueueDownloadType::SMALL) { + // No need to continue with small slot if an item with the same type exists already + // (regardless of whether it's running or not) + if (aSmallSlot) { + // Force in case we joined a new hub and there was a protocol error + if (!cqi->isActive() && aWaitingCallback) { + aWaitingCallback(cqi); + } - if (supportMcn && !aSmallSlot && !cqi->allowNewConnections(running)) { - return; + return false; + } } + } - //WLock l (cs); - dcdebug("Get cqi"); - cqi = getCQI(aUser, CONNECTION_TYPE_DOWNLOAD); - if (aSmallSlot) { - cqi->setDownloadType(supportMcn ? ConnectionQueueItem::DownloadType::SMALL_CONF : ConnectionQueueItem::DownloadType::SMALL); - } + if (supportMcn && !aSmallSlot && !cqi->allowNewConnections(runningNormal)) { + return false; } + + return true; } -ConnectionQueueItem* ConnectionManager::getCQI(const HintedUser& aUser, ConnectionType aConnType, const string& aToken) noexcept { + +ConnectionQueueItem* ConnectionManager::getCQIUnsafe(const HintedUser& aUser, ConnectionType aConnType, const string& aToken) noexcept { auto& container = cqis[aConnType]; auto cqi = new ConnectionQueueItem(aUser, aConnType, !aToken.empty() ? aToken : tokens.createToken(aConnType)); container.emplace_back(cqi); @@ -209,9 +265,7 @@ ConnectionQueueItem* ConnectionManager::getCQI(const HintedUser& aUser, Connecti return cqi; } -void ConnectionManager::putCQI(ConnectionQueueItem* cqi) noexcept { - //allways called from inside lock - +void ConnectionManager::putCQIUnsafe(ConnectionQueueItem* cqi) noexcept { fire(ConnectionManagerListener::Removed(), cqi); auto& container = cqis[cqi->getConnType()]; @@ -283,156 +337,157 @@ void ConnectionManager::on(TimerManagerListener::Second, uint64_t aTick) noexcep for(auto& m: removedTokens) { auto s = find(downloads.begin(), downloads.end(), m); if (s != downloads.end()) { - putCQI(*s); + putCQIUnsafe(*s); } } } } void ConnectionManager::attemptDownloads(uint64_t aTick, StringList& removedTokens_) noexcept { - RLock l(cs); int attemptLimit = SETTING(DOWNCONN_PER_SEC); - uint16_t attempts = 0; - for (auto cqi : downloads) { - if (!cqi->isActive()) { - if (!cqi->getUser().user->isOnline() || cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) { - removedTokens_.push_back(cqi->getToken()); - continue; - } - - if (cqi->getErrors() == -1 && cqi->getLastAttempt() != 0) { - // protocol error, don't reconnect except after a forced attempt - continue; - } - - if ((cqi->getLastAttempt() == 0 && attempts < attemptLimit * 2) || ((attemptLimit == 0 || attempts < attemptLimit) && - cqi->getLastAttempt() + 60 * 1000 * max(1, cqi->getErrors()) < aTick)) - { - // TODO: no one can understand this code, fix! - ScopedFunctor([=] { cqi->setLastAttempt(aTick); }); - - QueueToken bundleToken = 0; - string lastError, hubHint = cqi->getHubUrl(); - bool allowUrlChange = true; - bool hasDownload = false; - - auto type = cqi->isSmallSlot() ? QueueItem::TYPE_SMALL : cqi->getDownloadType() == ConnectionQueueItem::DownloadType::MCN_NORMAL ? QueueItem::TYPE_MCN_NORMAL : QueueItem::TYPE_ANY; - - //we'll also validate the hubhint (and that the user is online) before making any connection attempt - auto startDown = QueueManager::getInstance()->startDownload(cqi->getUser(), hubHint, type, bundleToken, allowUrlChange, hasDownload, lastError); - if (!hasDownload && cqi->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL && count_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* aCQI) { return aCQI != cqi && aCQI->getUser() == cqi->getUser(); }) == 0) { - //the small file finished already? try with any type - cqi->setDownloadType(ConnectionQueueItem::DownloadType::ANY); - startDown = QueueManager::getInstance()->startDownload(cqi->getUser(), hubHint, QueueItem::TYPE_ANY, - bundleToken, allowUrlChange, hasDownload, lastError); - } else if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::ANY && startDown.first == QueueItem::TYPE_SMALL && - count_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* aCQI) { - return aCQI->getUser() == cqi->getUser() && cqi->isSmallSlot(); - }) == 0) { - // a small file has been added after the CQI was created - cqi->setDownloadType(ConnectionQueueItem::DownloadType::SMALL); - } + int attempts = 0; + RLock l(cs); + for (auto cqi : downloads) { + // Already active? + if (cqi->isActive()) { + continue; + } - if (!hasDownload) { - removedTokens_.push_back(cqi->getToken()); - continue; - } + // Removing? + if (!cqi->getUser().user->isOnline()) { + removedTokens_.push_back(cqi->getToken()); + continue; + } - cqi->setLastBundle(bundleToken != 0 ? Util::toString(bundleToken) : Util::emptyString); - cqi->setHubUrl(hubHint); - - if (cqi->getState() == ConnectionQueueItem::WAITING || - // Forcing the connection and it's not connected yet? Retry - (cqi->getLastAttempt() == 0 && cqi->getState() == ConnectionQueueItem::CONNECTING && find(userConnections.begin(), userConnections.end(), cqi->getToken()) == userConnections.end()) - ) { - if (startDown.second) { - cqi->setState(ConnectionQueueItem::CONNECTING); - bool protocolError = false; - - if (!ClientManager::getInstance()->connect(cqi->getUser(), cqi->getToken(), allowUrlChange, lastError, hubHint, protocolError)) { - cqi->setState(ConnectionQueueItem::WAITING); - cqi->setErrors(protocolError ? -1 : (cqi->getErrors() + 1)); // protocol error - dcassert(!lastError.empty()); - fire(ConnectionManagerListener::Failed(), cqi, lastError); - } else { - cqi->setHubUrl(hubHint); - fire(ConnectionManagerListener::Connecting(), cqi); - attempts++; - } - } else { - fire(ConnectionManagerListener::Failed(), cqi, lastError); - } - } - } else if (cqi->getState() == ConnectionQueueItem::CONNECTING && cqi->getLastAttempt() + 50 * 1000 < aTick) { + // No attempts? + if (cqi->getErrors() == -1 && cqi->getLastAttempt() != 0) { + // protocol error, don't reconnect except after a forced attempt + continue; + } + // Not enough time since the last attempt? + if (!cqi->allowConnect(attempts, attemptLimit, aTick)) { + if (cqi->isTimeout(aTick)) { cqi->setErrors(cqi->getErrors() + 1); fire(ConnectionManagerListener::Failed(), cqi, STRING(CONNECTION_TIMEOUT)); - cqi->setState(ConnectionQueueItem::WAITING); + cqi->setState(ConnectionQueueItem::State::WAITING); } - } else if (cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) { - cqi->unsetFlag(ConnectionQueueItem::FLAG_REMOVE); + + continue; + } + + // Try to connect + if (attemptDownloadUnsafe(cqi, removedTokens_)) { + attempts++; } + + cqi->setLastAttempt(aTick); } } +bool ConnectionManager::attemptDownloadUnsafe(ConnectionQueueItem* cqi, StringList& removedTokens_) noexcept { + + // We'll also validate the hubhint (and that the user is online) before making any connection attempt + auto startResult = QueueManager::getInstance()->startDownload(cqi->getUser(), cqi->getDownloadType()); + if (!startResult.hasDownload && cqi->getDownloadType() == QueueDownloadType::SMALL && ranges::none_of(downloads, [&](const ConnectionQueueItem* aCQI) { return aCQI != cqi && aCQI->getUser() == cqi->getUser(); })) { + // The small file finished already? Try with any type + cqi->setDownloadType(QueueDownloadType::ANY); + startResult = QueueManager::getInstance()->startDownload(cqi->getUser(), QueueDownloadType::ANY); + } else if ( + cqi->getDownloadType() == QueueDownloadType::ANY && + startResult.downloadType == QueueDownloadType::SMALL && + ranges::none_of(downloads, [&](const ConnectionQueueItem* aCQI) { + return aCQI->getUser() == cqi->getUser() && cqi->isSmallSlot(); + }) + ) { + // a Small file has been added after the CQI was created + cqi->setDownloadType(QueueDownloadType::SMALL); + } -void ConnectionManager::addRunningMCN(const UserConnection *aSource) noexcept { - { - RLock l(cs); - auto i = find(downloads.begin(), downloads.end(), aSource->getToken()); - if (i != downloads.end()) { - ConnectionQueueItem* cqi = *i; - cqi->setState(ConnectionQueueItem::RUNNING); - //LogManager::getInstance()->message("Running downloads for the user: " + Util::toString(runningDownloads[aSource->getUser()])); + // No files to download from this user? + if (!startResult.hasDownload) { + dcdebug("ConnectionManager::attemptDownload: no downloads from user %s (conn %s), removing (small slot: %s)\n", ClientManager::getInstance()->getFormatedNicks(cqi->getUser()).c_str(), cqi->getToken().c_str(), cqi->isSmallSlot() ? "true" : "false"); + removedTokens_.push_back(cqi->getToken()); + return false; + } - if (!allowNewMCN(cqi)) - return; + cqi->setLastBundle(startResult.bundleToken ? Util::toString(*startResult.bundleToken) : Util::emptyString); + cqi->setHubUrl(startResult.hubHint); + + if (cqi->getState() == ConnectionQueueItem::State::WAITING || + // Forcing the connection and it's not connected yet? Retry + (cqi->getLastAttempt() == 0 && cqi->getState() == ConnectionQueueItem::State::CONNECTING && find(userConnections.begin(), userConnections.end(), cqi->getToken()) == userConnections.end()) + ) { + if (startResult.startDownload) { + return connectUnsafe(cqi, startResult.allowUrlChange); + } else { + // Download limits full or similar temporary error + dcdebug("ConnectionManager::attemptDownload: can't start download from user %s (connection %s): %s (small slot: %s)\n", ClientManager::getInstance()->getFormatedNicks(cqi->getUser()).c_str(), cqi->getToken().c_str(), startResult.lastError.c_str(), cqi->isSmallSlot() ? "true" : "false"); + fire(ConnectionManagerListener::Failed(), cqi, startResult.lastError); } } - createNewMCN(aSource->getHintedUser()); + return false; } -bool ConnectionManager::allowNewMCN(const ConnectionQueueItem* aCQI) noexcept { - //we need to check if we have queued something also while the small file connection was being established - if (!aCQI->isMcn()) { +bool ConnectionManager::connectUnsafe(ConnectionQueueItem* cqi, bool aAllowUrlChange) noexcept { + cqi->setState(ConnectionQueueItem::State::CONNECTING); + + auto connectResult = ClientManager::getInstance()->connect(cqi->getUser(), cqi->getToken(), aAllowUrlChange); + if (!connectResult.getIsSuccess()) { + cqi->setState(ConnectionQueueItem::State::WAITING); + cqi->setErrors(connectResult.getIsProtocolError() ? -1 : (cqi->getErrors() + 1)); // protocol error + dcassert(!connectResult.getError().empty()); + fire(ConnectionManagerListener::Failed(), cqi, connectResult.getError()); return false; } - //count the running MCN connections - int running = 0; - for(const auto& cqi: downloads) { - if (cqi->getUser() == aCQI->getUser() && cqi->getDownloadType() != ConnectionQueueItem::DownloadType::SMALL_CONF && !cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) { - if (!cqi->isActive()) { - return false; - } - running++; + // Success + cqi->setHubUrl(connectResult.getHubHint()); + fire(ConnectionManagerListener::Connecting(), cqi); + return true; +} + +void ConnectionManager::onDownloadRunning(const UserConnection *aSource) noexcept { + { + RLock l(cs); + auto cqi = findDownloadUnsafe(aSource); + if (!cqi || cqi->isSet(ConnectionQueueItem::FLAG_RUNNING)) { + return; } - } - if (running > 0 && aCQI->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL_CONF) - return false; + cqi->setFlag(ConnectionQueueItem::FLAG_RUNNING); + if (!cqi->isMcn()) { + return; + } - if (!aCQI->allowNewConnections(running) && !aCQI->isSet(ConnectionQueueItem::FLAG_REMOVE)) - return false; + if (!allowNewMCNUnsafe(cqi->getUser(), false)) { + dcdebug("ConnectionManager::addRunningMCN: can't add new connections for user %s, conn %s (small slot: %s)\n", ClientManager::getInstance()->getFormatedNicks(aSource->getHintedUser()).c_str(), cqi->getToken().c_str(), cqi->isSmallSlot() ? "true" : "false"); + return; + } + } - return true; + createNewMCN(aSource->getHintedUser()); } void ConnectionManager::createNewMCN(const HintedUser& aUser) noexcept { - auto runningBundles = DownloadManager::getInstance()->getRunningBundles(); + auto result = QueueManager::getInstance()->startDownload(aUser, QueueDownloadType::MCN_NORMAL); + if (!result.hasDownload) { + dcdebug("ConnectionManager::createNewMCN: no downloads from user %s (type normal)\n", ClientManager::getInstance()->getFormatedNicks(aUser).c_str()); + return; + } - string lastError; - auto start = QueueManager::getInstance()->startDownload(aUser, runningBundles, - ClientManager::getInstance()->getHubSet(aUser.user->getCID()), QueueItem::TYPE_MCN_NORMAL, 0, lastError); // don't overlap... - if (start) { + { WLock l (cs); - ConnectionQueueItem* cqiNew = getCQI(aUser, CONNECTION_TYPE_DOWNLOAD); - cqiNew->setDownloadType(ConnectionQueueItem::DownloadType::MCN_NORMAL); + auto cqiNew = getCQIUnsafe(aUser, CONNECTION_TYPE_DOWNLOAD); + cqiNew->setDownloadType(QueueDownloadType::MCN_NORMAL); + + dcdebug("ConnectionManager::createNewMCN: creating new connection for user %s\n", ClientManager::getInstance()->getFormatedNicks(aUser).c_str()); } } +#define MAX_UC_INACTIVITY_SECONDS 180 void ConnectionManager::on(TimerManagerListener::Minute, uint64_t aTick) noexcept { WLock l(cs); for (auto i = removedDownloadTokens.begin(); i != removedDownloadTokens.end();) { @@ -446,12 +501,13 @@ void ConnectionManager::on(TimerManagerListener::Minute, uint64_t aTick) noexcep for(auto j: userConnections) { if (j->isSet(UserConnection::FLAG_PM)) { //Send a write check to the socket to detect half connected state, a good interval? - if ((j->getLastActivity() + 180 * 1000) < aTick) { + if ((j->getLastActivity() + MAX_UC_INACTIVITY_SECONDS * 1000) < aTick) { AdcCommand c(AdcCommand::CMD_PMI); c.addParam("\n"); j->send(c); } - } else if ((j->getLastActivity() + 180 * 1000) < aTick) { + } else if ((j->getLastActivity() + MAX_UC_INACTIVITY_SECONDS * 1000) < aTick) { + dcdebug("ConnectionManager::timer: disconnecting an inactive connection %s for user %s\n", j->getToken().c_str(), ClientManager::getInstance()->getFormatedNicks(j->getHintedUser()).c_str()); j->disconnect(true); } } @@ -754,7 +810,7 @@ void ConnectionManager::on(UserConnectionListener::MyNick, UserConnection* aSour RLock l(cs); for(auto cqi: downloads) { cqi->setErrors(0); - if((cqi->getState() == ConnectionQueueItem::CONNECTING || cqi->getState() == ConnectionQueueItem::WAITING) && + if(!cqi->isActive() && cqi->getUser().user->getCID() == cid) { aSource->setUser(cqi->getUser()); @@ -855,8 +911,9 @@ void ConnectionManager::addPMConnection(UserConnection* uc) noexcept { if (i == container.end()) { dcassert(!uc->getToken().empty()); uc->setFlag(UserConnection::FLAG_ASSOCIATED); - auto cqi = getCQI(uc->getHintedUser(), CONNECTION_TYPE_PM, uc->getToken()); - cqi->setState(ConnectionQueueItem::ACTIVE); + + auto cqi = getCQIUnsafe(uc->getHintedUser(), CONNECTION_TYPE_PM, uc->getToken()); + cqi->setState(ConnectionQueueItem::State::ACTIVE); fire(ConnectionManagerListener::Connected(), cqi, uc); @@ -864,6 +921,7 @@ void ConnectionManager::addPMConnection(UserConnection* uc) noexcept { return; } } + putConnection(uc); } @@ -873,27 +931,25 @@ void ConnectionManager::addDownloadConnection(UserConnection* uc) noexcept { bool addConn = false; { RLock l(cs); - auto i = uc->isMCN() ? std::find(downloads.begin(), downloads.end(), uc->getToken()) : std::find(downloads.begin(), downloads.end(), uc->getUser()); - if(i != downloads.end()) { - ConnectionQueueItem* cqi = *i; - if(cqi->getState() == ConnectionQueueItem::WAITING || cqi->getState() == ConnectionQueueItem::CONNECTING) { - cqi->setState(ConnectionQueueItem::ACTIVE); - if (uc->isMCN()) { - if (cqi->isSmallSlot()) { - uc->setFlag(UserConnection::FLAG_SMALL_SLOT); - cqi->setDownloadType(ConnectionQueueItem::DownloadType::SMALL_CONF); - } else { - cqi->setDownloadType(ConnectionQueueItem::DownloadType::MCN_NORMAL); - } + auto cqi = findDownloadUnsafe(uc); + if (cqi && !cqi->isActive()) { + cqi->setState(ConnectionQueueItem::State::ACTIVE); + if (uc->isMCN()) { + if (cqi->isSmallSlot()) { + uc->setFlag(UserConnection::FLAG_SMALL_SLOT); + } else { + cqi->setDownloadType(QueueDownloadType::MCN_NORMAL); } - uc->setToken(cqi->getToken()); // sync for NMDC users - uc->setHubUrl(cqi->getHubUrl()); //set the correct hint for the uc, it might not even have a hint at first. - uc->setFlag(UserConnection::FLAG_ASSOCIATED); - fire(ConnectionManagerListener::Connected(), cqi, uc); - dcdebug("ConnectionManager::addDownloadConnection, leaving to downloadmanager\n"); - addConn = true; + cqi->setFlag(ConnectionQueueItem::FLAG_MCN); } + + uc->setToken(cqi->getToken()); // sync for NMDC users + uc->setHubUrl(cqi->getHubUrl()); //set the correct hint for the uc, it might not even have a hint at first. + uc->setFlag(UserConnection::FLAG_ASSOCIATED); + fire(ConnectionManagerListener::Connected(), cqi, uc); + dcdebug("ConnectionManager::addDownloadConnection, leaving to downloadmanager\n"); + addConn = true; } } @@ -910,9 +966,9 @@ void ConnectionManager::addUploadConnection(UserConnection* uc) noexcept { { WLock l(cs); - auto &uploads = cqis[CONNECTION_TYPE_UPLOAD]; + auto& uploads = cqis[CONNECTION_TYPE_UPLOAD]; if (!uc->isMCN() && find(uploads.begin(), uploads.end(), uc->getUser()) != uploads.end()) { - //one connection per CID for non-mcn users + // One connection per CID for non-mcn users allowAdd = false; } @@ -921,8 +977,8 @@ void ConnectionManager::addUploadConnection(UserConnection* uc) noexcept { if (allowAdd) { uc->setFlag(UserConnection::FLAG_ASSOCIATED); - auto cqi = getCQI(uc->getHintedUser(), CONNECTION_TYPE_UPLOAD, uc->getToken()); - cqi->setState(ConnectionQueueItem::ACTIVE); + auto cqi = getCQIUnsafe(uc->getHintedUser(), CONNECTION_TYPE_UPLOAD, uc->getToken()); + cqi->setState(ConnectionQueueItem::State::ACTIVE); fire(ConnectionManagerListener::Connected(), cqi, uc); } } @@ -986,11 +1042,13 @@ void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCo fail(AdcCommand::ERROR_GENERIC, "Connection not expected"); return; } + + // Hub URL aSource->setHubUrl(i.second); + // User auto user = ClientManager::getInstance()->findUser(CID(i.first)); aSource->setUser(user); - if (!aSource->getUser()) { dcdebug("CM::onINF: User not found"); fail(AdcCommand::ERROR_GENERIC, "User not found"); @@ -1033,11 +1091,11 @@ void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCo RLock l(cs); auto i = find(downloads.begin(), downloads.end(), token); if (i != downloads.end()) { - ConnectionQueueItem* cqi = *i; - if(aSource->isMCN()) { + auto cqi = *i; + if (aSource->isMCN()) { string slots; if (cmd.getParam("CO", 0, slots)) { - cqi->setMaxConns(static_cast(Util::toInt(slots))); + cqi->setMaxRemoteConns(static_cast(Util::toInt(slots))); } } cqi->setErrors(0); @@ -1077,11 +1135,17 @@ void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCo } void ConnectionManager::force(const string& aToken) noexcept { + if (DownloadManager::getInstance()->checkIdle(aToken)) { + dcdebug("ConnectionManager::force: idler %s\n", aToken.c_str()); + return; + } + RLock l(cs); auto i = find(downloads.begin(), downloads.end(), aToken); if (i != downloads.end()) { fire(ConnectionManagerListener::Forced(), *i); (*i)->setLastAttempt(0); + dcdebug("ConnectionManager::force: download %s\n", aToken.c_str()); } } @@ -1106,67 +1170,89 @@ void ConnectionManager::failDownload(const string& aToken, const string& aError, } auto cqi = *i; - if (cqi->getState() == ConnectionQueueItem::WAITING) - return; - + if (cqi->isMcn()) { + removeExtraMCNUnsafe(cqi); - if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::MCN_NORMAL && !cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) { - // Remove an existing waiting item (if exists) - auto s = find_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* c) { - return c->getUser() == cqi->getUser() && !c->isSmallSlot() && - !c->isActive() && c != cqi && !c->isSet(ConnectionQueueItem::FLAG_REMOVE); - }); - - if (s != downloads.end()) { - // (*s)->setFlag(ConnectionQueueItem::FLAG_REMOVE); - dcdebug("ConnectionManager::failDownload: removing an existing inactive CQI %s\n", (*s)->getToken().c_str()); - putCQI(*s); + if (cqi->isSmallSlot() && cqi->getState() == ConnectionQueueItem::State::ACTIVE) { + // Small slot item that was never used for downloading anything? Check if we have normal files to download + if (allowNewMCNUnsafe(cqi->getUser(), false)) { + mcnUser = cqi->getUser(); + } } - } - - if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL_CONF && cqi->getState() == ConnectionQueueItem::ACTIVE) { - //small slot item that was never used for downloading anything? check if we have normal files to download - if (allowNewMCN(cqi)) - mcnUser = cqi->getUser(); } - cqi->setState(ConnectionQueueItem::WAITING); + if (cqi->getState() != ConnectionQueueItem::State::WAITING) { + cqi->setState(ConnectionQueueItem::State::WAITING); + + cqi->setErrors(aFatalError ? -1 : (cqi->getErrors() + 1)); + cqi->setLastAttempt(GET_TICK()); + } - cqi->setErrors(aFatalError ? -1 : (cqi->getErrors() + 1)); - cqi->setLastAttempt(GET_TICK()); + cqi->unsetFlag(ConnectionQueueItem::FLAG_RUNNING); fire(ConnectionManagerListener::Failed(), cqi, aError); } - if (mcnUser) + if (mcnUser) { createNewMCN(*mcnUser); + } +} + +void ConnectionManager::onIdle(UserConnection* aSource) noexcept { + WLock l(cs); //this may flag other user connections as removed which would possibly cause threading issues + auto cqi = findDownloadUnsafe(aSource); + if (!cqi || !cqi->isSet(ConnectionQueueItem::FLAG_RUNNING)) { + return; + } + + cqi->unsetFlag(ConnectionQueueItem::FLAG_RUNNING); + removeExtraMCNUnsafe(cqi); +} + +void ConnectionManager::removeExtraMCNUnsafe(const ConnectionQueueItem* aFailedCQI) noexcept { + if (!aFailedCQI->isMcn()) { + return; + } + + if (aFailedCQI->getDownloadType() != QueueDownloadType::MCN_NORMAL) { + return; + } + + // Remove an existing waiting item (if exists) + auto s = find_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* c) { + return c->getUser() == aFailedCQI->getUser() && !c->isSmallSlot() && + !c->isActive() && c != aFailedCQI; + }); + + if (s != downloads.end()) { + // (*s)->setFlag(ConnectionQueueItem::FLAG_REMOVE); + dcdebug("ConnectionManager::disconnectExtraMCN: removing an existing inactive MCN item %s\n", (*s)->getToken().c_str()); + putCQIUnsafe(*s); + } +} + +ConnectionQueueItem* ConnectionManager::findDownloadUnsafe(const UserConnection* aSource) noexcept { + // Token may not be synced for NMDC users + auto i = aSource->isMCN() ? std::find(downloads.begin(), downloads.end(), aSource->getToken()) : std::find(downloads.begin(), downloads.end(), aSource->getUser()); + if (i == downloads.end()) { + return nullptr; + } + + return *i; +} + +void ConnectionManager::on(UserConnectionListener::State, UserConnection* aSource) noexcept { + if (aSource->getState() == UserConnection::STATE_IDLE) { + onIdle(aSource); + } else if (aSource->isSet(UserConnection::FLAG_DOWNLOAD) && aSource->getState() == UserConnection::STATE_RUNNING) { + onDownloadRunning(aSource); + } } void ConnectionManager::failed(UserConnection* aSource, const string& aError, bool aProtocolError) noexcept { if(aSource->isSet(UserConnection::FLAG_ASSOCIATED)) { if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) { - if (aSource->getState() == UserConnection::STATE_IDLE) { - // don't remove the CQI if we are only out of downloading slots - - bool allowChange = false, hasDownload = false; - QueueToken unusedBundleToken = 0; - string unusedHubUrl, lastError; - QueueManager::getInstance()->startDownload(aSource->getHintedUser(), unusedHubUrl, - aSource->isSet(UserConnection::FLAG_SMALL_SLOT) ? QueueItem::TYPE_SMALL : QueueItem::TYPE_ANY, - unusedBundleToken, allowChange, hasDownload, lastError); - - if (hasDownload) { - failDownload(aSource->getToken(), lastError, aProtocolError); - } else { - WLock l(cs); - auto i = find(downloads.begin(), downloads.end(), aSource->getToken()); - dcassert(i != downloads.end()); - if (i != downloads.end()) { - putCQI(*i); - } - } - } else { - failDownload(aSource->getToken(), aError, aProtocolError); - } + failDownload(aSource->getToken(), aError, aProtocolError); + dcdebug("ConnectionManager::failed: download %s failed\n", aSource->getToken().c_str()); } else { auto type = aSource->isSet(UserConnection::FLAG_UPLOAD) ? CONNECTION_TYPE_UPLOAD : @@ -1178,7 +1264,7 @@ void ConnectionManager::failed(UserConnection* aSource, const string& aError, bo auto i = type == CONNECTION_TYPE_PM ? find(container.begin(), container.end(), aSource->getUser()) : find(container.begin(), container.end(), aSource->getToken()); dcassert(i != container.end()); - putCQI(*i); + putCQIUnsafe(*i); } } } diff --git a/airdcpp/ConnectionManager.h b/airdcpp/ConnectionManager.h index de652623..ea75a146 100644 --- a/airdcpp/ConnectionManager.h +++ b/airdcpp/ConnectionManager.h @@ -28,6 +28,7 @@ #include "CriticalSection.h" #include "FloodCounter.h" #include "HintedUser.h" +#include "QueueDownloadInfo.h" #include "Singleton.h" #include "UserConnection.h" @@ -54,33 +55,26 @@ class ConnectionQueueItem : boost::noncopyable, public Flags { typedef vector List; typedef List::const_iterator Iter; - enum State { + enum class State { CONNECTING, // Recently sent request to connect WAITING, // Waiting to send request to connect ACTIVE, // In one up/downmanager - RUNNING // Running/idle }; enum Flags { - FLAG_REMOVE = 0x01 - }; - - enum class DownloadType { - ANY, - SMALL, - SMALL_CONF, - MCN_NORMAL + FLAG_MCN = 0x02, + FLAG_RUNNING = 0x04, }; ConnectionQueueItem(const HintedUser& aUser, ConnectionType aConntype, const string& aToken); GETSET(string, token, Token); - IGETSET(DownloadType, downloadType, DownloadType, DownloadType::ANY); + IGETSET(QueueDownloadType, downloadType, DownloadType, QueueDownloadType::ANY); GETSET(string, lastBundle, LastBundle); IGETSET(uint64_t, lastAttempt, LastAttempt, 0); IGETSET(int, errors, Errors, 0); // Number of connection errors, or -1 after a protocol error - IGETSET(State, state, State, WAITING); - IGETSET(uint8_t, maxConns, MaxConns, 0); + IGETSET(State, state, State, State::WAITING); + IGETSET(uint8_t, maxRemoteConns, MaxRemoteConns, 0); GETSET(ConnectionType, connType, ConnType); const string& getHubUrl() const noexcept { return user.hint; } @@ -88,17 +82,14 @@ class ConnectionQueueItem : boost::noncopyable, public Flags { const HintedUser& getUser() const noexcept { return user; } bool allowNewConnections(int running) const noexcept; - bool isSmallSlot() const noexcept { - return downloadType == DownloadType::SMALL_CONF || downloadType == DownloadType::SMALL; - } + bool isSmallSlot() const noexcept; + bool isActive() const noexcept; + bool isMcn() const noexcept; - bool isActive() const noexcept { - return state == ACTIVE || state == RUNNING; - } + bool allowConnect(int aAttempts, int aAttemptLimit, uint64_t aTick) const noexcept; + bool isTimeout(uint64_t aTick) const noexcept; - bool isMcn() const noexcept { - return downloadType == DownloadType::SMALL_CONF || downloadType == DownloadType::MCN_NORMAL; - } + void resetFatalError() noexcept; private: HintedUser user; }; @@ -174,8 +165,6 @@ class ConnectionManager : public Speaker, public Clie const string& getPort() const noexcept; const string& getSecurePort() const noexcept; - void addRunningMCN(const UserConnection *aSource) noexcept; - // set fatalError to true if the client shouldn't try to reconnect automatically void failDownload(const string& aToken, const string& aError, bool aFatalError) noexcept; @@ -190,9 +179,19 @@ class ConnectionManager : public Speaker, public Clie private: FloodCounter floodCounter; - bool allowNewMCN(const ConnectionQueueItem* aCQI) noexcept; + typedef std::function ConnectionQueueItemCallback; + + // Can we create a new regular MCN connection? + bool allowNewMCNUnsafe(const UserPtr& aUser, bool aSmallSlot, ConnectionQueueItemCallback&& aWaitingCallback = nullptr) noexcept; + + // Create a new regular MCN connection void createNewMCN(const HintedUser& aUser) noexcept; + // Remove an extra waiting MCN connection (we should keep only one waiting connection) + void removeExtraMCNUnsafe(const ConnectionQueueItem* aFailedCQI) noexcept; + + void onDownloadRunning(const UserConnection* aSource) noexcept; + class Server : public Thread { public: Server(bool secure, const string& port_, const string& ipv4, const string& ipv6); @@ -247,8 +246,8 @@ class ConnectionManager : public Speaker, public Clie void addDownloadConnection(UserConnection* uc) noexcept; void addPMConnection(UserConnection* uc) noexcept; - ConnectionQueueItem* getCQI(const HintedUser& aUser, ConnectionType aConnType, const string& aToken = Util::emptyString) noexcept; - void putCQI(ConnectionQueueItem* cqi) noexcept; + ConnectionQueueItem* getCQIUnsafe(const HintedUser& aUser, ConnectionType aConnType, const string& aToken = Util::emptyString) noexcept; + void putCQIUnsafe(ConnectionQueueItem* cqi) noexcept; void accept(const Socket& sock, bool aSecure) noexcept; @@ -268,6 +267,8 @@ class ConnectionManager : public Speaker, public Clie void on(UserConnectionListener::MyNick, UserConnection*, const string&) noexcept override; void on(UserConnectionListener::Supports, UserConnection*, const StringList&) noexcept override; void on(UserConnectionListener::UserSet, UserConnection*) noexcept override; + // void on(UserConnectionListener::Idle, UserConnection*) noexcept override; + void on(UserConnectionListener::State, UserConnection*) noexcept override; void on(AdcCommand::SUP, UserConnection*, const AdcCommand&) noexcept override; void on(AdcCommand::INF, UserConnection*, const AdcCommand&) noexcept override; @@ -282,8 +283,14 @@ class ConnectionManager : public Speaker, public Clie void on(ClientManagerListener::UserDisconnected, const UserPtr& aUser, bool) noexcept override { onUserUpdated(aUser); } void onUserUpdated(const UserPtr& aUser) noexcept; + void onIdle(UserConnection* aSource) noexcept; void attemptDownloads(uint64_t aTick, StringList& removedTokens_) noexcept; + bool attemptDownloadUnsafe(ConnectionQueueItem* cqi, StringList& removedTokens_) noexcept; + bool connectUnsafe(ConnectionQueueItem* cqi, bool aAllowUrlChange) noexcept; + + ConnectionQueueItem* findDownloadUnsafe(const UserConnection* aSource) noexcept; + StringList getAdcFeatures() const noexcept; }; diff --git a/airdcpp/DirectoryListingManager.h b/airdcpp/DirectoryListingManager.h index 13f06de0..327b10ef 100644 --- a/airdcpp/DirectoryListingManager.h +++ b/airdcpp/DirectoryListingManager.h @@ -94,14 +94,13 @@ namespace dcpp { /** Lists open in the client **/ DirectoryListingMap viewedLists; - void on(QueueManagerListener::ItemAdded, const QueueItemPtr& aQI) noexcept; - void on(QueueManagerListener::ItemFinished, const QueueItemPtr& qi, const string& dir, const HintedUser& aUser, int64_t aSpeed) noexcept; - void on(QueueManagerListener::ItemRemoved, const QueueItemPtr& qi, bool finished) noexcept; + void on(QueueManagerListener::ItemAdded, const QueueItemPtr& aQI) noexcept override; + void on(QueueManagerListener::ItemFinished, const QueueItemPtr& qi, const string& dir, const HintedUser& aUser, int64_t aSpeed) noexcept override; + void on(QueueManagerListener::ItemRemoved, const QueueItemPtr& qi, bool finished) noexcept override; - void on(QueueManagerListener::PartialListFinished, const HintedUser& aUser, const string& aXml, const string& aBase) noexcept; + void on(QueueManagerListener::PartialListFinished, const HintedUser& aUser, const string& aXml, const string& aBase) noexcept override; - - void on(TimerManagerListener::Minute, uint64_t aTick) noexcept; + void on(TimerManagerListener::Minute, uint64_t aTick) noexcept override; }; } diff --git a/airdcpp/Download.cpp b/airdcpp/Download.cpp index 28a92747..d8356678 100644 --- a/airdcpp/Download.cpp +++ b/airdcpp/Download.cpp @@ -36,6 +36,7 @@ namespace dcpp { Download::Download(UserConnection& conn, QueueItem& qi) noexcept : Transfer(conn, qi.getTarget(), qi.getTTH()), tempTarget(qi.getTempTarget()), listDirectoryPath(qi.isFilelist() ? qi.getListDirectoryPath() : Util::emptyString) { + dcassert(!conn.getDownload()); conn.setDownload(this); QueueItem::SourceConstIter source = qi.getSource(getUser()); @@ -101,7 +102,9 @@ Download::Download(UserConnection& conn, QueueItem& qi) noexcept : Transfer(conn } Download::~Download() { - getUserConnection().setDownload(0); + dcassert(getUserConnection().getDownload() == this); + // dcdebug("Deleting download %s\n", getToken().c_str()); + getUserConnection().setDownload(nullptr); } string Download::getBundleStringToken() const noexcept { @@ -115,6 +118,17 @@ bool Download::operator==(const Download* d) const { return compare(getToken(), d->getToken()) == 0; } +void Download::flush() noexcept { + if (getOutput()) { + if (getActual() > 0) { + try { + getOutput()->flushBuffers(false); + } catch (const Exception&) { + } + } + } +} + void Download::appendFlags(OrderedStringSet& flags_) const noexcept { if (isSet(Download::FLAG_PARTIAL)) { flags_.insert("P"); diff --git a/airdcpp/Download.h b/airdcpp/Download.h index 2a303adc..6326ef13 100644 --- a/airdcpp/Download.h +++ b/airdcpp/Download.h @@ -94,6 +94,7 @@ class Download : public Transfer, public Flags { string getBundleStringToken() const noexcept; void appendFlags(OrderedStringSet& flags_) const noexcept; + void flush() noexcept; private: Download(const Download&); Download& operator=(const Download&) = delete; diff --git a/airdcpp/DownloadManager.cpp b/airdcpp/DownloadManager.cpp index c2150577..edfa94ea 100644 --- a/airdcpp/DownloadManager.cpp +++ b/airdcpp/DownloadManager.cpp @@ -20,7 +20,6 @@ #include "DownloadManager.h" #include "ClientManager.h" -#include "ConnectionManager.h" #include "Download.h" #include "LogManager.h" #include "QueueManager.h" @@ -31,12 +30,6 @@ #include #include - -// some strange mac definition -#ifdef ff -#undef ff -#endif - namespace dcpp { static const string DOWNLOAD_AREA = "Downloads"; @@ -150,16 +143,26 @@ void DownloadManager::on(TimerManagerListener::Second, uint64_t aTick) noexcept QueueManager::getInstance()->handleSlowDisconnect(dtp.user, dtp.target, dtp.bundle); } -bool DownloadManager::checkIdle(const UserPtr& aUser, bool aSmallSlot, bool aReportOnly) { +bool DownloadManager::checkIdle(const string& aToken) { + RLock l(cs); + for (auto uc : idlers) { + if (uc->getToken() == aToken) { + uc->callAsync([this, uc] { revive(uc); }); + return true; + } + } + return false; +} + +bool DownloadManager::checkIdle(const UserPtr& aUser, bool aSmallSlot) { RLock l(cs); for (auto uc: idlers) { if (uc->getUser() == aUser) { if (aSmallSlot != uc->isSet(UserConnection::FLAG_SMALL_SLOT) && uc->isMCN()) continue; - if (!aReportOnly) - uc->callAsync([this, uc] { revive(uc); }); - //dcdebug("uc updated"); + + uc->callAsync([this, uc] { revive(uc); }); return true; } } @@ -178,18 +181,19 @@ void DownloadManager::revive(UserConnection* uc) { checkDownloads(uc); } -void DownloadManager::addConnection(UserConnection* conn) { - if (!conn->isSet(UserConnection::FLAG_SUPPORTS_TTHF) || !conn->isSet(UserConnection::FLAG_SUPPORTS_ADCGET)) { +void DownloadManager::addConnection(UserConnection* aSource) { + if (!aSource->isSet(UserConnection::FLAG_SUPPORTS_TTHF) || !aSource->isSet(UserConnection::FLAG_SUPPORTS_ADCGET)) { // Can't download from these... - conn->getUser()->setFlag(User::OLD_CLIENT); - QueueManager::getInstance()->removeSource(conn->getUser(), QueueItem::Source::FLAG_NO_TTHF); - conn->disconnect(); + aSource->getUser()->setFlag(User::OLD_CLIENT); + QueueManager::getInstance()->removeSource(aSource->getUser(), QueueItem::Source::FLAG_NO_TTHF); + dcdebug("DownloadManager::addConnection: outdated user (%s)\n", aSource->getToken().c_str()); + disconnect(aSource); return; } - conn->addListener(this); - checkDownloads(conn); + aSource->addListener(this); + checkDownloads(aSource); } QueueTokenSet DownloadManager::getRunningBundles(bool aIgnoreHighestPrio) const noexcept { @@ -225,52 +229,38 @@ void DownloadManager::checkDownloads(UserConnection* aConn) { //We may have download assigned for a connection if we are downloading in segments //dcassert(!aConn->getDownload() || aConn->getDownload()->isSet(Download::FLAG_CHUNKED)); - QueueItemBase::DownloadType dlType = QueueItemBase::TYPE_ANY; - if (aConn->isSet(UserConnection::FLAG_SMALL_SLOT)) { - dlType = QueueItemBase::TYPE_SMALL; - } else if (aConn->isMCN()) { - dlType = QueueItemBase::TYPE_MCN_NORMAL; - } - auto hubs = ClientManager::getInstance()->getHubSet(aConn->getUser()->getCID()); - //always make sure that the current hub is also compared even if it is offline + // always make sure that the current hub is also compared even if it is offline hubs.insert(aConn->getHubUrl()); - string errorMessage; auto runningBundles = getRunningBundles(); - bool start = QueueManager::getInstance()->startDownload(aConn->getHintedUser(), runningBundles, hubs, dlType, aConn->getSpeed(), errorMessage); - - // not a finished download? - if(!start && aConn->getState() != UserConnection::STATE_RUNNING) { - // removeRunningUser(aConn); - failDownload(aConn, Util::emptyString, false); - return; - } - string newUrl; - Download* d = nullptr; + auto result = QueueManager::getInstance()->getDownload(*aConn, runningBundles, hubs); - if (start) - d = QueueManager::getInstance()->getDownload(*aConn, runningBundles, hubs, errorMessage, newUrl, dlType); + // Nothing to download? Skip finished download connections as they should be added in idlers + if (!result.hasDownload) { + dcdebug("DownloadManager::checkDownloads: no downloads from user %s (small slot: %s)\n", ClientManager::getInstance()->getFormatedNicks(aConn->getHintedUser()).c_str(), aConn->isSet(UserConnection::FLAG_SMALL_SLOT) ? "true" : "false"); + if (aConn->getState() != UserConnection::STATE_RUNNING) { + failDownload(aConn, Util::emptyString, false); + return; + } + } - if(!d) { - aConn->unsetFlag(UserConnection::FLAG_RUNNING); - if(!errorMessage.empty()) { - fire(DownloadManagerListener::Status(), aConn, errorMessage); + auto d = result.download; + if (!d) { + if (result.hasDownload) { + dcdebug("DownloadManager::checkDownloads: can't start download from user %s (%s)\n", ClientManager::getInstance()->getFormatedNicks(aConn->getHintedUser()).c_str(), result.lastError.c_str()); } - if (!checkIdle(aConn->getUser(), aConn->isSet(UserConnection::FLAG_SMALL_SLOT), true)) { - aConn->setState(UserConnection::STATE_IDLE); - fire(DownloadManagerListener::Idle(), aConn); + aConn->setState(UserConnection::STATE_IDLE); + fire(DownloadManagerListener::Idle(), aConn, result.lastError); - { - WLock l(cs); - idlers.push_back(aConn); - } - } else { - aConn->disconnect(true); + { + WLock l(cs); + idlers.push_back(aConn); } + return; } @@ -281,8 +271,8 @@ void DownloadManager::checkDownloads(UserConnection* aConn) { */ string mySID; - if(!aConn->getUser()->isNMDC()) { - mySID = ClientManager::getInstance()->findMySID(aConn->getUser(), newUrl, false); //no fallback, keep the old hint even if the hub is offline + if (!aConn->getUser()->isNMDC()) { + mySID = ClientManager::getInstance()->findMySID(aConn->getUser(), result.hubHint, false); //no fallback, keep the old hint even if the hub is offline } aConn->setState(UserConnection::STATE_SND); @@ -300,15 +290,16 @@ void DownloadManager::checkDownloads(UserConnection* aConn) { } } - dcdebug("Requesting " I64_FMT "/" I64_FMT "\n", d->getStartPos(), d->getSegmentSize()); + dcdebug("DownloadManager::checkDownloads: requesting " I64_FMT "/" I64_FMT " (connection %s)\n", d->getStartPos(), d->getSegmentSize(), d->getToken().c_str()); //only update the hub if it has been changed - if (compare(newUrl, aConn->getHubUrl()) == 0) { + if (compare(result.hubHint, aConn->getHubUrl()) == 0) { mySID.clear(); - } else if (!newUrl.empty()) { - aConn->setHubUrl(newUrl); + } else if (!result.hubHint.empty()) { + aConn->setHubUrl(result.hubHint); } + dcassert(aConn->getDownload()); fire(DownloadManagerListener::Requesting(), d, !mySID.empty()); aConn->send(d->getCommand(aConn->isSet(UserConnection::FLAG_SUPPORTS_ZLIB_GET), mySID)); } @@ -320,7 +311,8 @@ void DownloadManager::on(AdcCommand::SND, UserConnection* aSource, const AdcComm } if(!aSource->getDownload()) { - aSource->disconnect(true); + dcdebug("DownloadManager::AdcCommand::SND: no download (%s)\n", aSource->getToken().c_str()); + disconnect(aSource, true); return; } @@ -333,7 +325,8 @@ void DownloadManager::on(AdcCommand::SND, UserConnection* aSource, const AdcComm if(type != Transfer::names[aSource->getDownload()->getType()]) { // Uhh??? We didn't ask for this... - aSource->disconnect(); + dcdebug("DownloadManager::AdcCommand::SND: transfer type mismatch (%s)\n", aSource->getToken().c_str()); + disconnect(aSource); return; } @@ -344,7 +337,7 @@ void DownloadManager::startData(UserConnection* aSource, int64_t start, int64_t Download* d = aSource->getDownload(); dcassert(d); - dcdebug("Preparing " I64_FMT ":" I64_FMT ", " I64_FMT ":" I64_FMT"\n", d->getStartPos(), start, d->getSegmentSize(), bytes); + dcdebug("DownloadManager::startData: preparing " I64_FMT ":" I64_FMT ", " I64_FMT ":" I64_FMT"\n", d->getStartPos(), start, d->getSegmentSize(), bytes); if (d->getSegmentSize() == -1) { if(bytes >= 0) { d->setSegmentSize(bytes); @@ -379,10 +372,6 @@ void DownloadManager::startData(UserConnection* aSource, int64_t start, int64_t d->setStart(GET_TICK()); d->tick(); - if (!aSource->isSet(UserConnection::FLAG_RUNNING) && aSource->isMCN() && (d->getType() == Download::TYPE_FILE || d->getType() == Download::TYPE_PARTIAL_LIST)) { - ConnectionManager::getInstance()->addRunningMCN(aSource); - aSource->setFlag(UserConnection::FLAG_RUNNING); - } aSource->setState(UserConnection::STATE_RUNNING); fire(DownloadManagerListener::Starting(), d); @@ -401,8 +390,12 @@ void DownloadManager::startData(UserConnection* aSource, int64_t start, int64_t void DownloadManager::on(UserConnectionListener::Data, UserConnection* aSource, const uint8_t* aData, size_t aLen) noexcept { Download* d = aSource->getDownload(); - if (!d) //No download but receiving data?? - aSource->disconnect(true); + if (!d) { + //No download but receiving data?? + dcassert(0); + dcdebug("DownloadManager::UserConnectionListener::Data: no download (%s)\n", aSource->getToken().c_str()); + disconnect(aSource, true); + } try { d->addPos(d->getOutput()->write(aData, aLen), aLen); @@ -444,12 +437,12 @@ void DownloadManager::endData(UserConnection* aSource) { if(!(d->getTTH() == d->getTigerTree().getRoot())) { // This tree is for a different file, remove from queue... - removeDownload(d); fire(DownloadManagerListener::Failed(), d, STRING(INVALID_TREE)); QueueManager::getInstance()->removeFileSource(d->getPath(), aSource->getUser(), QueueItem::Source::FLAG_BAD_TREE, false); - QueueManager::getInstance()->putDownloadHooked(d, false); + dcdebug("DownloadManager::endData: invalid tree received from user %s (received %s while %s was excpected)\n", ClientManager::getInstance()->getFormatedNicks(d->getHintedUser()).c_str(), d->getTTH().toBase32().c_str(), d->getTigerTree().getRoot().toBase32().c_str()); + putDownloadHooked(d, false); checkDownloads(aSource); return; } @@ -458,21 +451,24 @@ void DownloadManager::endData(UserConnection* aSource) { aSource->setSpeed(static_cast(d->getAverageSpeed())); aSource->updateChunkSize(d->getTigerTree().getBlockSize(), d->getSegmentSize(), GET_TICK() - d->getStart()); - dcdebug("Download finished: %s, size " I64_FMT ", downloaded " I64_FMT " in " U64_FMT " ms\n", d->getPath().c_str(), d->getSegmentSize(), d->getPos(), GET_TICK() - d->getStart()); + dcdebug("DownloadManager::endData: %s (connection %s), size " I64_FMT ", downloaded " I64_FMT " in " U64_FMT " ms\n", d->getPath().c_str(), d->getToken().c_str(), d->getSegmentSize(), d->getPos(), GET_TICK() - d->getStart()); } - removeDownload(d); - fire(DownloadManagerListener::Complete(), d, d->getType() == Transfer::TYPE_TREE); + putDownloadHooked(d, true); + checkDownloads(aSource); +} + +void DownloadManager::putDownloadHooked(Download* aDownload, bool aFinished, bool aNoAccess, bool aRotateQueue) { + unique_ptr d(aDownload); + + removeDownload(d.get()); try { - QueueManager::getInstance()->putDownloadHooked(d, true); + QueueManager::getInstance()->putDownloadHooked(d.get(), aFinished, aNoAccess, aRotateQueue); } catch (const HashException& e) { - failDownload(aSource, e.getError(), false); - ConnectionManager::getInstance()->failDownload(aSource->getToken(), e.getError(), true); + failDownload(&aDownload->getUserConnection(), e.getError(), false); return; } - - checkDownloads(aSource); } int64_t DownloadManager::getRunningAverage() const { @@ -508,7 +504,7 @@ void DownloadManager::on(UserConnectionListener::MaxedOut, UserConnection* aSour void DownloadManager::noSlots(UserConnection* aSource, const string& param) { if(aSource->getState() != UserConnection::STATE_SND) { dcdebug("DM::noSlots Bad state, disconnecting\n"); - aSource->disconnect(); + disconnect(aSource); return; } @@ -527,9 +523,9 @@ void DownloadManager::onFailed(UserConnection* aSource, const string& aError) { void DownloadManager::failDownload(UserConnection* aSource, const string& aReason, bool aRotateQueue) { auto d = aSource->getDownload(); if (d) { - removeDownload(d); + dcdebug("DownloadManager::failDownload: %s failed (%s)\n", aSource->getToken().c_str(), aReason.c_str()); fire(DownloadManagerListener::Failed(), d, aReason); - QueueManager::getInstance()->putDownloadHooked(d, false, false, aRotateQueue); + putDownloadHooked(d, false, false, aRotateQueue); } else { fire(DownloadManagerListener::Remove(), aSource); } @@ -543,16 +539,14 @@ void DownloadManager::removeConnection(UserConnectionPtr aConn) { aConn->disconnect(); } +void DownloadManager::disconnect(UserConnectionPtr aConn, bool aGraceless) { + dcdebug("DownloadManager::disconnect: %s (graceless: %s)\n", aConn->getToken().c_str(), aGraceless ? "true" : "false"); + aConn->disconnect(aGraceless); +} + void DownloadManager::removeDownload(Download* d) { // Write the leftover bytes into file - if(d->getOutput()) { - if(d->getActual() > 0) { - try { - d->getOutput()->flushBuffers(false); - } catch(const Exception&) { - } - } - } + d->flush(); { WLock l(cs); @@ -587,7 +581,7 @@ void DownloadManager::abortDownload(const string& aTarget, const UserPtr& aUser) continue; } } - dcdebug("Trying to close connection for download %p\n", d); + dcdebug("Trying to close connection for download %s\n", d->getToken().c_str()); d->getUserConnection().disconnect(true); } } @@ -595,7 +589,8 @@ void DownloadManager::abortDownload(const string& aTarget, const UserPtr& aUser) void DownloadManager::on(UserConnectionListener::FileNotAvailable, UserConnection* aSource) noexcept { if(!aSource->getDownload()) { - aSource->disconnect(true); + dcdebug("DM::FileNotAvailable: no download (%s)\n", aSource->getToken().c_str()); + disconnect(aSource, true); return; } fileNotAvailable(aSource, false); @@ -604,20 +599,23 @@ void DownloadManager::on(UserConnectionListener::FileNotAvailable, UserConnectio /** @todo Handle errors better */ void DownloadManager::on(AdcCommand::STA, UserConnection* aSource, const AdcCommand& cmd) noexcept { if(cmd.getParameters().size() < 2) { - aSource->disconnect(); + dcdebug("DM::AdcCommand::STA: not enough parameters (%s)\n", aSource->getToken().c_str()); + disconnect(aSource); return; } const string& errorCode = cmd.getParam(0); const string& errorMessage = cmd.getParam(1); if(errorCode.length() != 3) { - aSource->disconnect(); + dcdebug("DM::AdcCommand::STA: invalid error code (%s)\n", aSource->getToken().c_str()); + disconnect(aSource); return; } switch(Util::toInt(errorCode.substr(0, 1))) { case AdcCommand::SEV_FATAL: - aSource->disconnect(); + dcdebug("DM::AdcCommand::STA: fatal error (%s)\n", aSource->getToken().c_str()); + disconnect(aSource); return; case AdcCommand::SEV_RECOVERABLE: switch(Util::toInt(errorCode.substr(1))) { @@ -642,13 +640,15 @@ void DownloadManager::on(AdcCommand::STA, UserConnection* aSource, const AdcComm dcdebug("Unknown success message %s %s", errorCode.c_str(), errorMessage.c_str()); return; } - aSource->disconnect(); + + dcdebug("DM::AdcCommand::STA: disconnecting (%s)\n", aSource->getToken().c_str()); + disconnect(aSource); } void DownloadManager::fileNotAvailable(UserConnection* aSource, bool aNoAccess, const string& aMessage) { if(aSource->getState() != UserConnection::STATE_SND) { dcdebug("DM::fileNotAvailable Invalid state, disconnecting"); - aSource->disconnect(); + disconnect(aSource); return; } @@ -656,8 +656,6 @@ void DownloadManager::fileNotAvailable(UserConnection* aSource, bool aNoAccess, dcassert(d); dcdebug("File Not Available: %s\n", d->getPath().c_str()); - removeDownload(d); - string error; if (aNoAccess) { error = STRING(NO_FILE_ACCESS); @@ -675,7 +673,7 @@ void DownloadManager::fileNotAvailable(UserConnection* aSource, bool aNoAccess, QueueManager::getInstance()->removeFileSource(d->getPath(), aSource->getUser(), (Flags::MaskType)(d->getType() == Transfer::TYPE_TREE ? QueueItem::Source::FLAG_NO_TREE : QueueItem::Source::FLAG_FILE_NOT_AVAILABLE), false); } - QueueManager::getInstance()->putDownloadHooked(d, false, aNoAccess); + putDownloadHooked(d, false, aNoAccess); checkDownloads(aSource); } diff --git a/airdcpp/DownloadManager.h b/airdcpp/DownloadManager.h index efa6a8ed..ee6591d4 100644 --- a/airdcpp/DownloadManager.h +++ b/airdcpp/DownloadManager.h @@ -47,7 +47,8 @@ class DownloadManager : public Speaker, /** @internal */ void addConnection(UserConnection* conn); - bool checkIdle(const UserPtr& aUser, bool aSmallSlot, bool aReportOnly = false); + bool checkIdle(const UserPtr& aUser, bool aSmallSlot); + bool checkIdle(const string& aToken); /** @internal */ void abortDownload(const string& aTarget, const UserPtr& aUser = nullptr); @@ -83,11 +84,15 @@ class DownloadManager : public Speaker, // Bundle::TokenMap bundles; UserConnectionList idlers; + void disconnect(UserConnectionPtr aConn, bool aGraceless = false); void removeConnection(UserConnectionPtr aConn); void removeDownload(Download* aDown); void fileNotAvailable(UserConnection* aSource, bool aNoAccess, const string& aMessage = Util::emptyString); void noSlots(UserConnection* aSource, const string& param = Util::emptyString); + + void putDownloadHooked(Download* aDownload, bool aFinished, bool aNoAccess = false, bool aRotateQueue = false); + void failDownload(UserConnection* aSource, const string& reason, bool rotateQueue); friend class Singleton; diff --git a/airdcpp/DownloadManagerListener.h b/airdcpp/DownloadManagerListener.h index baf2e53a..73205e4d 100644 --- a/airdcpp/DownloadManagerListener.h +++ b/airdcpp/DownloadManagerListener.h @@ -84,8 +84,7 @@ class DownloadManagerListener { * display an error string. */ virtual void on(Failed, const Download*, const string&) noexcept { } - virtual void on(Status, const UserConnection*, const string&) noexcept { } - virtual void on(Idle, const UserConnection*) noexcept { } + virtual void on(Idle, const UserConnection*, const string&) noexcept { } virtual void on(Remove, const UserConnection*) noexcept { } }; diff --git a/airdcpp/GetSet.h b/airdcpp/GetSet.h index 4089f12b..932277be 100644 --- a/airdcpp/GetSet.h +++ b/airdcpp/GetSet.h @@ -36,6 +36,15 @@ protected: t name = init; \ public: std::conditional::value, const t&, t>::type get##name2() const noexcept { return name; } \ template void set##name2(GetSetT&& name) { this->name = std::forward(name); } + +#define GETPROP(t, name, name2) \ +protected: t name; \ +public: std::conditional::value, const t&, t>::type get##name2() const noexcept { return name; } + +#define IGETPROP(t, name, name2, init) \ +protected: t name = init; \ +public: std::conditional::value, const t&, t>::type get##name2() const noexcept { return name; } + #else // This version is for my stupid editor =) diff --git a/airdcpp/PartialBundleSharingManager.h b/airdcpp/PartialBundleSharingManager.h index 0d84f312..39f52629 100644 --- a/airdcpp/PartialBundleSharingManager.h +++ b/airdcpp/PartialBundleSharingManager.h @@ -27,7 +27,6 @@ #include "CriticalSection.h" #include "ProtocolCommandManager.h" #include "Message.h" -#include "QueueItemBase.h" namespace dcpp { diff --git a/airdcpp/PrivateChat.cpp b/airdcpp/PrivateChat.cpp index 720a9ce7..8c92572f 100644 --- a/airdcpp/PrivateChat.cpp +++ b/airdcpp/PrivateChat.cpp @@ -243,28 +243,24 @@ void PrivateChat::close() { } void PrivateChat::startCC() { - bool protocolError; if (!replyTo.user->isOnline() || ccpmState < DISCONNECTED) { return; } ccpmState = CONNECTING; - string lastError; auto token = ConnectionManager::getInstance()->tokens.createToken(CONNECTION_TYPE_PM); - - auto newUrl = replyTo.hint; - bool connecting = ClientManager::getInstance()->connect(replyTo.user, token, true, lastError, newUrl, protocolError, CONNECTION_TYPE_PM); - if (replyTo.hint != newUrl) { - setHubUrl(newUrl); + auto connectResult = ClientManager::getInstance()->connect(replyTo, token, true, CONNECTION_TYPE_PM); + if (replyTo.hint != connectResult.getHubHint()) { + setHubUrl(connectResult.getHubHint()); } - allowAutoCCPM = !protocolError; + allowAutoCCPM = !connectResult.getIsProtocolError(); - if (!connecting) { + if (!connectResult.getIsSuccess()) { ccpmState = DISCONNECTED; - if (!lastError.empty()) { - statusMessage(lastError, LogMessage::SEV_ERROR, LogMessage::Type::SERVER); + if (!connectResult.getError().empty()) { + statusMessage(connectResult.getError(), LogMessage::SEV_ERROR, LogMessage::Type::SERVER); } } else { statusMessage(STRING(CCPM_ESTABLISHING), LogMessage::SEV_INFO, LogMessage::Type::SERVER); diff --git a/airdcpp/QueueDownloadInfo.h b/airdcpp/QueueDownloadInfo.h new file mode 100644 index 00000000..cf096f1a --- /dev/null +++ b/airdcpp/QueueDownloadInfo.h @@ -0,0 +1,85 @@ +/* +* Copyright (C) 2011-2024 AirDC++ Project +* +* This program is free software; you can redistribute it and/or modify +* it under the terms of the GNU General Public License as published by +* the Free Software Foundation; either version 3 of the License, or +* (at your option) any later version. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU General Public License for more details. +* +* You should have received a copy of the GNU General Public License +* along with this program; if not, write to the Free Software +* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +*/ + +#ifndef DCPLUSPLUS_DCPP_QUEUE_DOWNLOAD_INFO_H_ +#define DCPLUSPLUS_DCPP_QUEUE_DOWNLOAD_INFO_H_ + +#include "typedefs.h" + +#include "HintedUser.h" +#include "Priority.h" + +namespace dcpp { + +enum class QueueDownloadType { + ANY, + SMALL, + MCN_NORMAL +}; + +struct QueueDownloadResultBase { + string hubHint; + + // The last error why a file can't be started (not cleared if a download is found afterwards) + string lastError; + + // Indicates that there's a valid file even if it can't be temporarily started e.g. due to configured download limits + bool hasDownload = false; + + virtual void merge(const QueueDownloadResultBase& aOther) noexcept { + hubHint = aOther.hubHint; + hasDownload = aOther.hasDownload; + lastError = aOther.lastError; + } +}; + + +// Queue results +struct QueueDownloadResult : QueueDownloadResultBase { + // Whether the returned hubHint should be strictly followed (e.g. a filelist download) + bool allowUrlChange = true; + + // Possible bundle + optional bundleToken; + + bool startDownload = false; + QueueDownloadType downloadType = QueueDownloadType::ANY; + + QueueItemPtr qi = nullptr; +}; + + +struct QueueDownloadQuery { + QueueDownloadQuery(const UserPtr& aUser, const OrderedStringSet& aOnlineHubs, const QueueTokenSet& aRunningBundles) : user(aUser), onlineHubs(aOnlineHubs), runningBundles(aRunningBundles) {} + + const UserPtr user; + + QueueDownloadType downloadType = QueueDownloadType::ANY; + + int64_t wantedSize = 0; + int64_t lastSpeed = 0; + + Priority minPrio = Priority::LOWEST; + + const OrderedStringSet& onlineHubs; + const QueueTokenSet& runningBundles; +}; + +} + +#endif /* DCPLUSPLUS_DCPP_BUNDLEINFO_H_ */ diff --git a/airdcpp/QueueItem.cpp b/airdcpp/QueueItem.cpp index e1635090..13a26ff9 100644 --- a/airdcpp/QueueItem.cpp +++ b/airdcpp/QueueItem.cpp @@ -132,9 +132,9 @@ bool QueueItem::isFailedStatus(Status aStatus) noexcept { } Priority QueueItem::calculateAutoPriority() const noexcept { - if(getAutoPriority()) { + if (getAutoPriority()) { Priority p; - int percent = static_cast(getDownloadedBytes() * 10.0 / size); + auto percent = static_cast(getDownloadedBytes() * 10.0 / size); switch(percent){ case 0: case 1: @@ -565,7 +565,7 @@ uint64_t QueueItem::getDownloadedBytes() const noexcept { void QueueItem::addFinishedSegment(const Segment& aSegment) noexcept { #ifdef _DEBUG if (bundle) - dcdebug("adding segment segment of size " I64_FMT " (" I64_FMT ", " I64_FMT ")...", aSegment.getSize(), aSegment.getStart(), aSegment.getEnd()); + dcdebug("QueueItem::addFinishedSegment: adding segment of size " I64_FMT " (" I64_FMT ", " I64_FMT ")...", aSegment.getSize(), aSegment.getStart(), aSegment.getEnd()); #endif dcassert(aSegment.getOverlapped() == false); @@ -649,54 +649,89 @@ void QueueItem::getChunksVisualisation(vector& running_, vectorgetBlockedHubs().empty() && includes(source->getBlockedHubs().begin(), source->getBlockedHubs().end(), aOnlineHubs.begin(), aOnlineHubs.end())) { +bool QueueItem::Source::matchesDownloadQuery(const QueueDownloadQuery& aQuery, string& lastError_) const noexcept { + // Only blocked hubs? + if (!blockedHubs.empty() && includes(blockedHubs.begin(), blockedHubs.end(), aQuery.onlineHubs.begin(), aQuery.onlineHubs.end())) { lastError_ = STRING(NO_ACCESS_ONLINE_HUBS); return false; } - //can't download a filelist if the hub is offline... don't be too strict with NMDC hubs - if (!aUser->isSet(User::NMDC) && (isSet(FLAG_USER_LIST) && !isSet(FLAG_TTHLIST_BUNDLE)) && aOnlineHubs.find(source->getUser().hint) == aOnlineHubs.end()) { + // Can't download a filelist if the hub is offline... don't be too strict with NMDC hubs + if (!aQuery.user->isSet(User::NMDC) && (isSet(FLAG_USER_LIST) && !isSet(FLAG_TTHLIST_BUNDLE)) && aQuery.onlineHubs.find(user.hint) == aQuery.onlineHubs.end()) { lastError_ = STRING(USER_OFFLINE); return false; } - dcassert(isSource(aUser)); - if (segmentsDone()) { + return true; +} + +bool QueueItem::matchesDownloadType(QueueDownloadType aType) const noexcept { + if (aType == QueueDownloadType::SMALL && !usesSmallSlot()) { + //don't even think of stealing our priority channel + return false; + } else if (aType == QueueDownloadType::MCN_NORMAL && usesSmallSlot()) { return false; } - if(aType == TYPE_SMALL && !usesSmallSlot()) { - //don't even think of stealing our priority channel + return true; +} + +bool QueueItem::allowSegmentedDownloads() const noexcept { + // Don't try to create multiple connections for filelists or files viewed in client + if (isSet(QueueItem::FLAG_USER_LIST) || isSet(QueueItem::FLAG_CLIENT_VIEW)) { + return false; + } + + // No segmented downloading when getting the tree + if (getDownloads()[0]->getType() == Transfer::TYPE_TREE) { + return false; + } + + return true; +} + +bool QueueItem::hasSegment(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept { + if (isPausedPrio()) + return false; + + dcassert(isSource(aQuery.user)); + auto source = getSource(aQuery.user); + + // Check source + if (!source->matchesDownloadQuery(aQuery, lastError_)) { + return false; + } + + // Finished? + if (segmentsDone()) { return false; - } else if (aType == TYPE_MCN_NORMAL && usesSmallSlot()) { + } + + // Slot type + if (!matchesDownloadType(aQuery.downloadType)) { return false; } - if(isWaiting()) { + // See if we have an available segment + + if (isWaiting()) { return true; } - - // No segmented downloading when getting the tree - if(getDownloads()[0]->getType() == Transfer::TYPE_TREE) { + + // Running item + + if (!allowSegmentedDownloads()) { return false; } - if(!isSet(QueueItem::FLAG_USER_LIST) && !isSet(QueueItem::FLAG_CLIENT_VIEW)) { - Segment segment = getNextSegment(getBlockSize(), aWantedSize, aLastSpeed, source->getPartsInfo(), aAllowOverlap); - if(segment.getSize() == 0) { - lastError_ = (segment.getStart() == -1 || getSize() < Util::convertSize(SETTING(MIN_SEGMENT_SIZE), Util::KB)) ? STRING(NO_FILES_AVAILABLE) : STRING(NO_FREE_BLOCK); - dcdebug("No segment for %s (%s) in %s, block " I64_FMT "\n", aUser->getCID().toBase32().c_str(), Util::listToString(aOnlineHubs).c_str(), getTarget().c_str(), blockSize); - return false; - } - } else if (!isWaiting()) { - //don't try to create multiple connections for filelists or files viewed in client + // File segment? + auto segment = getNextSegment(getBlockSize(), aQuery.wantedSize, aQuery.lastSpeed, source->getPartsInfo(), aAllowOverlap); + if (segment.getSize() == 0) { + lastError_ = (segment.getStart() == -1 || getSize() < Util::convertSize(SETTING(MIN_SEGMENT_SIZE), Util::KB)) ? STRING(NO_FILES_AVAILABLE) : STRING(NO_FREE_BLOCK); + dcdebug("No segment for %s (%s) in %s, block " I64_FMT "\n", aQuery.user->getCID().toBase32().c_str(), Util::listToString(aQuery.onlineHubs).c_str(), getTarget().c_str(), blockSize); return false; } + return true; } diff --git a/airdcpp/QueueItem.h b/airdcpp/QueueItem.h index eba2a8f9..5a007410 100644 --- a/airdcpp/QueueItem.h +++ b/airdcpp/QueueItem.h @@ -20,6 +20,7 @@ #define DCPLUSPLUS_DCPP_QUEUE_ITEM_H #include "QueueItemBase.h" +#include "QueueDownloadInfo.h" #include "FastAlloc.h" #include "HintedUser.h" @@ -146,6 +147,8 @@ class QueueItem : public QueueItemBase { void setPartsInfo(const PartsInfo& aPartsInfo) noexcept { partsInfo = aPartsInfo; } + + bool matchesDownloadQuery(const QueueDownloadQuery& aQuery, string& lastError_) const noexcept; private: PartsInfo partsInfo; @@ -174,7 +177,7 @@ class QueueItem : public QueueItemBase { void save(OutputStream &save, string tmp, string b32tmp); int countOnlineUsers() const noexcept; void getOnlineUsers(HintedUserList& l) const noexcept; - bool hasSegment(const UserPtr& aUser, const OrderedStringSet& onlineHubs, string& lastError, int64_t wantedSize, int64_t lastSpeed, DownloadType aType, bool allowOverlap) noexcept; + bool hasSegment(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept; bool isPausedPrio() const noexcept; SourceList& getSources() noexcept { return sources; } @@ -285,6 +288,9 @@ class QueueItem : public QueueItemBase { bool isHubBlocked(const UserPtr& aUser, const string& aUrl) const noexcept; void removeSource(const UserPtr& aUser, Flags::MaskType reason) noexcept; + bool matchesDownloadType(QueueDownloadType aType) const noexcept; + bool allowSegmentedDownloads() const noexcept; + static uint8_t getMaxSegments(int64_t aFileSize) noexcept; int64_t blockSize = -1; diff --git a/airdcpp/QueueItemBase.h b/airdcpp/QueueItemBase.h index 0d5e2a55..9e36c0fb 100644 --- a/airdcpp/QueueItemBase.h +++ b/airdcpp/QueueItemBase.h @@ -31,17 +31,8 @@ namespace dcpp { using std::string; -typedef uint32_t QueueToken; -typedef unordered_set QueueTokenSet; class QueueItemBase : public Flags { public: - enum DownloadType { - TYPE_NONE, - TYPE_ANY, - TYPE_SMALL, - TYPE_MCN_NORMAL - }; - QueueItemBase(const string& aTarget, int64_t aSize, Priority aPriority, time_t aAdded, QueueToken aToken, Flags::MaskType aFlags); const DownloadList& getDownloads() { return downloads; } diff --git a/airdcpp/QueueManager.cpp b/airdcpp/QueueManager.cpp index 0a8da4e7..b994ce11 100644 --- a/airdcpp/QueueManager.cpp +++ b/airdcpp/QueueManager.cpp @@ -1143,175 +1143,221 @@ bool QueueManager::addValidatedSource(const QueueItemPtr& qi, const HintedUser& } -Download* QueueManager::getDownload(UserConnection& aSource, const QueueTokenSet& aRunningBundles, const OrderedStringSet& aOnlineHubs, string& lastError_, string& newUrl, QueueItemBase::DownloadType aType) noexcept{ +QueueManager::DownloadResult QueueManager::getDownload(UserConnection& aSource, const QueueTokenSet& aRunningBundles, const OrderedStringSet& aOnlineHubs) noexcept { + const auto& user = aSource.getUser(); + + DownloadResult result; + QueueItemPtr q = nullptr; - Download* d = nullptr; { - WLock l(cs); - dcdebug("Getting download for %s...", aSource.getUser()->getCID().toBase32().c_str()); + // Segments shouldn't be assigned simultaneously for multiple connections + Lock slotLock(slotAssignCS); - const UserPtr& u = aSource.getUser(); - bool hasDownload = false; + { + auto downloadType = QueueDownloadType::ANY; + if (aSource.isSet(UserConnection::FLAG_SMALL_SLOT)) { + downloadType = QueueDownloadType::SMALL; + } else if (aSource.isMCN()) { + downloadType = QueueDownloadType::MCN_NORMAL; + } - q = userQueue.getNext(aSource.getUser(), aRunningBundles, aOnlineHubs, lastError_, hasDownload, Priority::LOWEST, aSource.getChunkSize(), aSource.getSpeed(), aType); - if (!q) { - dcdebug("none\n"); - return nullptr; - } + auto startResult = startDownload(aSource.getHintedUser(), downloadType, aRunningBundles, aOnlineHubs, aSource.getSpeed()); + result.merge(startResult); + + if (!startResult.startDownload) { + // dcdebug("none\n"); + return result; + } - auto source = q->getSource(aSource.getUser()); + q = startResult.qi; + dcassert(q); + } - //update the hub hint - newUrl = aSource.getHubUrl(); - source->updateDownloadHubUrl(aOnlineHubs, newUrl, (q->isSet(QueueItem::FLAG_USER_LIST) && !q->isSet(QueueItem::FLAG_TTHLIST_BUNDLE))); + { + WLock l(cs); - //check partial sources - if (source->isSet(QueueItem::Source::FLAG_PARTIAL)) { - Segment segment = q->getNextSegment(q->getBlockSize(), aSource.getChunkSize(), aSource.getSpeed(), source->getPartsInfo(), false); - if (segment.getStart() != -1 && segment.getSize() == 0) { - // no other partial chunk from this user, remove him from queue - userQueue.removeQI(q, u); - q->removeSource(u, QueueItem::Source::FLAG_NO_NEED_PARTS); - lastError_ = STRING(NO_NEEDED_PART); - return nullptr; + // Check partial sources + auto source = q->getSource(user); + if (source->isSet(QueueItem::Source::FLAG_PARTIAL)) { + auto segment = q->getNextSegment(q->getBlockSize(), aSource.getChunkSize(), aSource.getSpeed(), source->getPartsInfo(), false); + if (segment.getStart() != -1 && segment.getSize() == 0) { + // dcdebug("no needed chunks)\n"); + // no other partial chunk from this user, remove him from queue + userQueue.removeQI(q, user); + q->removeSource(user, QueueItem::Source::FLAG_NO_NEED_PARTS); + result.lastError = STRING(NO_NEEDED_PART); + return result; + } } - } - // Check that the file we will be downloading to exists - if (q->getDownloadedBytes() > 0) { - if (!PathUtil::fileExists(q->getTempTarget())) { - // Temp target gone? - q->resetDownloaded(); + // Check that the file we will be downloading to exists + if (q->getDownloadedBytes() > 0) { + if (!PathUtil::fileExists(q->getTempTarget())) { + // Temp target gone? + q->resetDownloaded(); + } } - } - d = new Download(aSource, *q); - userQueue.addDownload(q, d); + result.download = new Download(aSource, *q); + userQueue.addDownload(q, result.download); + } } fire(QueueManagerListener::ItemSources(), q); - dcdebug("found %s for %s (" I64_FMT ", " I64_FMT ")\n", q->getTarget().c_str(), d->getToken().c_str(), d->getSegment().getStart(), d->getSegment().getEnd()); - return d; + dcdebug("QueueManager::getDownload: found %s for %s (segment " I64_FMT ", " I64_FMT ")\n", q->getTarget().c_str(), result.download->getToken().c_str(), result.download->getSegment().getStart(), result.download->getSegment().getEnd()); + return result; } -bool QueueManager::allowStartQI(const QueueItemPtr& aQI, const QueueTokenSet& runningBundles, string& lastError_, bool mcn /*false*/) noexcept{ - // nothing to download? - if (!aQI) - return false; - - // override the slot settings for partial lists and small files - if (aQI->usesSmallSlot()) +bool QueueManager::checkLowestPrioRules(const QueueItemPtr& aQI, const QueueTokenSet& aRunningBundles, string& lastError_) const noexcept { + auto& b = aQI->getBundle(); + if (!b) { return true; + } + if (b->getPriority() == Priority::LOWEST) { + // Don't start if there are other bundles running + if (!aRunningBundles.empty() && aRunningBundles.find(b->getToken()) == aRunningBundles.end()) { + lastError_ = STRING(LOWEST_PRIO_ERR_BUNDLES); + return false; + } + } - // paused? - if (aQI->isPausedPrio()) - return false; - + if (aQI->getPriority() == Priority::LOWEST) { + // Start only if there are no other downloads running in this bundle + // (or all bundle downloads belong to this file) + auto bundleDownloads = DownloadManager::getInstance()->getBundleDownloadConnectionCount(aQI->getBundle()); - //check if we have free space to continue the download now... otherwise results in paused priority.. - if (aQI->getBundle() && (aQI->getBundle()->getStatus() == Bundle::STATUS_DOWNLOAD_ERROR)) { - if (File::getFreeSpace(aQI->getBundle()->getTarget()) >= static_cast(aQI->getSize() - aQI->getDownloadedBytes())) { - setBundleStatus(aQI->getBundle(), Bundle::STATUS_QUEUED); - } else { - lastError_ = aQI->getBundle()->getError(); - onDownloadError(aQI->getBundle(), lastError_); + RLock l(cs); + auto start = bundleDownloads == 0 || bundleDownloads == aQI->getDownloads().size(); + if (!start) { + lastError_ = STRING(LOWEST_PRIO_ERR_FILES); return false; } } - size_t downloadCount = DownloadManager::getInstance()->getFileDownloadConnectionCount(); - bool slotsFull = (AirUtil::getSlots(true) != 0) && (downloadCount >= static_cast(AirUtil::getSlots(true))); - bool speedFull = (AirUtil::getSpeedLimitKbps(true) != 0) && (DownloadManager::getInstance()->getRunningAverage() >= Util::convertSize(AirUtil::getSpeedLimitKbps(true), Util::KB)); + return true; +} + +bool QueueManager::checkDownloadLimits(const QueueItemPtr& aQI, string& lastError_) const noexcept { + auto downloadSlots = AirUtil::getSlots(true); + auto downloadCount = static_cast(DownloadManager::getInstance()->getFileDownloadConnectionCount()); + bool slotsFull = downloadSlots != 0 && downloadCount >= downloadSlots; + + auto speedLimit = Util::convertSize(AirUtil::getSpeedLimitKbps(true), Util::KB); + auto downloadSpeed = DownloadManager::getInstance()->getRunningAverage(); + bool speedFull = speedLimit != 0 && downloadSpeed >= speedLimit; //log("Speedlimit: " + Util::toString(Util::getSpeedLimit(true)*1024) + " slots: " + Util::toString(Util::getSlots(true)) + " (avg: " + Util::toString(getRunningAverage()) + ")"); if (slotsFull || speedFull) { - size_t slots = AirUtil::getSlots(true); - bool extraFull = (slots != 0) && (downloadCount >= (slots + static_cast(SETTING(EXTRA_DOWNLOAD_SLOTS)))); - if (extraFull || mcn || aQI->getPriority() != Priority::HIGHEST) { - lastError_ = slotsFull ? STRING(ALL_DOWNLOAD_SLOTS_TAKEN) : STRING(MAX_DL_SPEED_REACHED); + bool extraFull = downloadSlots != 0 && downloadCount >= downloadSlots + SETTING(EXTRA_DOWNLOAD_SLOTS); + if (extraFull || aQI->getPriority() != Priority::HIGHEST) { + if (slotsFull) { + lastError_ = STRING(ALL_DOWNLOAD_SLOTS_TAKEN); + } else { + lastError_ = STRING(MAX_DL_SPEED_REACHED); + } return false; } - return true; } - // bundle with the lowest prio? don't start if there are other bundle running - if (aQI->getBundle() && aQI->getBundle()->getPriority() == Priority::LOWEST && !runningBundles.empty() && runningBundles.find(aQI->getBundle()->getToken()) == runningBundles.end()) { - lastError_ = STRING(LOWEST_PRIO_ERR_BUNDLES); - return false; - } - - if (aQI->getPriority() == Priority::LOWEST) { - if (aQI->getBundle()) { - // start only if there are no other downloads running in this bundle (or the downloads belong to this file) - auto bundleDownloads = DownloadManager::getInstance()->getBundleDownloadConnectionCount(aQI->getBundle()); + return true; +} - RLock l(cs); - bool start = bundleDownloads == 0 || bundleDownloads == aQI->getDownloads().size(); - if (!start) { - lastError_ = STRING(LOWEST_PRIO_ERR_FILES); - } +bool QueueManager::checkDiskSpace(const QueueItemPtr& aQI, string& lastError_) noexcept { + auto& b = aQI->getBundle(); + if (!b) { + return true; + } - return start; + //check if we have free space to continue the download now... otherwise results in paused priority.. + if (aQI->getBundle()->getStatus() == Bundle::STATUS_DOWNLOAD_ERROR) { + if (File::getFreeSpace(b->getTarget()) >= static_cast(aQI->getSize() - aQI->getDownloadedBytes())) { + setBundleStatus(b, Bundle::STATUS_QUEUED); } else { - // shouldn't happen at the moment - dcassert(0); - return downloadCount == 0; + lastError_ = b->getError(); + onDownloadError(b, lastError_); + return false; } } return true; } -bool QueueManager::startDownload(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs, - QueueItemBase::DownloadType aType, int64_t aLastSpeed, string& lastError_) noexcept{ +bool QueueManager::allowStartQI(const QueueItemPtr& aQI, const QueueTokenSet& aRunningBundles, string& lastError_) noexcept{ + // nothing to download? + if (!aQI) + return false; - bool hasDownload = false; - QueueItemPtr qi = nullptr; - { - RLock l(cs); - qi = userQueue.getNext(aUser, runningBundles, onlineHubs, lastError_, hasDownload, Priority::LOWEST, 0, aLastSpeed, aType); + // override the slot settings for partial lists and small files + if (aQI->usesSmallSlot()) + return true; + + + // paused? + if (aQI->isPausedPrio()) + return false; + + if (!checkDiskSpace(aQI, lastError_)) { + return false; } - return allowStartQI(qi, runningBundles, lastError_); -} + if (!checkDownloadLimits(aQI, lastError_)) { + return false; + } -pair QueueManager::startDownload(const UserPtr& aUser, string& hubHint, QueueItemBase::DownloadType aType, - QueueToken& bundleToken, bool& allowUrlChange, bool& hasDownload, string& lastError_) noexcept{ + if (!checkLowestPrioRules(aQI, aRunningBundles, lastError_)) { + return false; + } + return true; +} + +QueueDownloadResult QueueManager::startDownload(const HintedUser& aUser, QueueDownloadType aType) noexcept{ + auto hubs = ClientManager::getInstance()->getHubSet(aUser.user->getCID()); auto runningBundleTokens = DownloadManager::getInstance()->getRunningBundles(); - auto hubs = ClientManager::getInstance()->getHubSet(aUser->getCID()); - if (!hubs.empty()) { - QueueItemPtr qi = nullptr; - { - RLock l(cs); - qi = userQueue.getNext(aUser, runningBundleTokens, hubs, lastError_, hasDownload, Priority::LOWEST, 0, 0, aType); + return startDownload(aUser, aType, runningBundleTokens, hubs, 0); +} - if (qi) { - if (qi->getBundle()) { - bundleToken = qi->getBundle()->getToken(); - } +QueueDownloadResult QueueManager::startDownload(const HintedUser& aUser, QueueDownloadType aType, const QueueTokenSet& aRunningBundles, const OrderedStringSet& aOnlineHubs, int64_t aLastSpeed) noexcept { + QueueDownloadResult result; + if (aOnlineHubs.empty()) { + result.lastError = STRING(USER_OFFLINE); + return result; + } - if (hubs.find(hubHint) == hubs.end()) { - //we can't connect via a hub that is offline... - hubHint = *hubs.begin(); - } + QueueDownloadQuery query(aUser, aOnlineHubs, aRunningBundles); + query.lastSpeed = aLastSpeed; + query.downloadType = aType; - allowUrlChange = !qi->isSet(QueueItem::FLAG_USER_LIST); - qi->getSource(aUser)->updateDownloadHubUrl(hubs, hubHint, (qi->isSet(QueueItem::FLAG_USER_LIST) && !qi->isSet(QueueItem::FLAG_TTHLIST_BUNDLE))); - } - } + { + RLock l(cs); + auto qi = userQueue.getNext(query, result.lastError, result.hasDownload); if (qi) { - bool start = allowStartQI(qi, runningBundleTokens, lastError_); - return { qi->usesSmallSlot() ? QueueItem::TYPE_SMALL : QueueItem::TYPE_ANY, start }; + result.qi = qi; + if (qi->getBundle()) { + result.bundleToken = qi->getBundle()->getToken(); + } + + if (aOnlineHubs.find(aUser.hint) == aOnlineHubs.end()) { + //we can't connect via a hub that is offline... + result.hubHint = *aOnlineHubs.begin(); + } + + result.allowUrlChange = !qi->isSet(QueueItem::FLAG_USER_LIST); + + auto isFilelist = qi->isSet(QueueItem::FLAG_USER_LIST) && !qi->isSet(QueueItem::FLAG_TTHLIST_BUNDLE); + qi->getSource(aUser)->updateDownloadHubUrl(aOnlineHubs, result.hubHint, isFilelist); } - } else { - lastError_ = STRING(USER_OFFLINE); } - return { QueueItem::TYPE_NONE, false }; + if (result.qi) { + result.startDownload = allowStartQI(result.qi, aRunningBundles, result.lastError); + result.downloadType = result.qi->usesSmallSlot() ? QueueDownloadType::SMALL : QueueDownloadType::ANY; + } + + return result; } QueueItemList QueueManager::findFiles(const TTHValue& tth) const noexcept { @@ -1339,24 +1385,6 @@ void QueueManager::matchListing(const DirectoryListing& dl, int& matchingFiles_, newFiles_ = addValidatedSources(dl.getHintedUser(), matchingItems, QueueItem::Source::FLAG_FILE_NOT_AVAILABLE, bundles_); } -QueueItemPtr QueueManager::getQueueInfo(const HintedUser& aUser) noexcept { - OrderedStringSet hubs; - hubs.insert(aUser.hint); - - auto runningBundles = DownloadManager::getInstance()->getRunningBundles(); - - QueueItemPtr qi = nullptr; - string lastError_; - bool hasDownload = false; - - { - RLock l(cs); - qi = userQueue.getNext(aUser, runningBundles, hubs, lastError_, hasDownload); - } - - return qi; -} - void QueueManager::toggleSlowDisconnectBundle(QueueToken aBundleToken) noexcept { RLock l(cs); auto b = bundleQueue.findBundle(aBundleToken); @@ -1623,12 +1651,12 @@ void QueueManager::onDownloadError(const BundlePtr& aBundle, const string& aErro setBundleStatus(aBundle, Bundle::STATUS_DOWNLOAD_ERROR); } -void QueueManager::putDownloadHooked(Download* aDownload, bool aFinished, bool aNoAccess /*false*/, bool aRotateQueue /*false*/) { +void QueueManager::putDownloadHooked(Download* d, bool aFinished, bool aNoAccess /*false*/, bool aRotateQueue /*false*/) { QueueItemPtr q = nullptr; // Make sure the download gets killed - unique_ptr d(aDownload); - aDownload = nullptr; + // unique_ptr d(aDownload); + // aDownload = nullptr; d->close(); @@ -1656,13 +1684,13 @@ void QueueManager::putDownloadHooked(Download* aDownload, bool aFinished, bool a } if (!aFinished) { - onDownloadFailed(q, d.get(), aNoAccess, aRotateQueue); + onDownloadFailed(q, d, aNoAccess, aRotateQueue); } else if (q->isSet(QueueItem::FLAG_USER_LIST)) { - onFilelistDownloadCompletedHooked(q, d.get()); + onFilelistDownloadCompletedHooked(q, d); } else if (d->getType() == Transfer::TYPE_TREE) { - onTreeDownloadCompleted(q, d.get()); + onTreeDownloadCompleted(q, d); } else { - onFileDownloadCompleted(q, d.get()); + onFileDownloadCompleted(q, d); } } @@ -1732,7 +1760,7 @@ void QueueManager::onFileDownloadRemoved(const QueueItemPtr& aQI, bool aFailed) } else { delayEvents.addEvent(aQI->getBundle()->getToken(), [this, checkWaiting] { checkWaiting(); - }, 1000); + }, 1000); } } } @@ -1799,7 +1827,7 @@ void QueueManager::onFileDownloadCompleted(const QueueItemPtr& aQI, Download* aD aQI->addFinishedSegment(aDownload->getSegment()); wholeFileCompleted = aQI->segmentsDone(); - dcdebug("Finish segment for %s (" I64_FMT ", " I64_FMT ")\n", aDownload->getToken().c_str(), aDownload->getSegment().getStart(), aDownload->getSegment().getEnd()); + // dcdebug("Finish segment for %s (" I64_FMT ", " I64_FMT ")\n", aDownload->getToken().c_str(), aDownload->getSegment().getStart(), aDownload->getSegment().getEnd()); if (wholeFileCompleted) { // Disconnect all possible overlapped downloads @@ -2994,7 +3022,6 @@ void QueueManager::on(TimerManagerListener::Second, uint64_t aTick) noexcept { void QueueManager::on(TimerManagerListener::Minute, uint64_t aTick) noexcept { tasks.addTask([=, this] { - // requestPartialSourceInfo(aTick); searchAlternates(aTick); checkResumeBundles(); }); diff --git a/airdcpp/QueueManager.h b/airdcpp/QueueManager.h index 7d9ff40a..4ab5bc0f 100644 --- a/airdcpp/QueueManager.h +++ b/airdcpp/QueueManager.h @@ -27,7 +27,6 @@ #include "TimerManagerListener.h" #include "ActionHook.h" -#include "QueueAddInfo.h" #include "BundleQueue.h" #include "DelayedEvents.h" #include "DupeType.h" @@ -36,6 +35,8 @@ #include "HashBloom.h" #include "MerkleTree.h" #include "Message.h" +#include "QueueAddInfo.h" +#include "QueueDownloadInfo.h" #include "Singleton.h" #include "StringMatch.h" #include "TaskQueue.h" @@ -194,30 +195,18 @@ class QueueManager : public Singleton, public SpeakergetSources(); } Bundle::SourceList getBadBundleSources(const BundlePtr& b) const noexcept { RLock l(cs); return b->getBadSources(); } - // Get information about the next valid file in the queue - // Used for displaying initial information for a transfer before the connection has been established and the real download is created - QueueItemPtr getQueueInfo(const HintedUser& aUser) noexcept; - // Check if a download can be started for the specified user - // - // lastError_ will contain the last error why a file can't be started (not cleared if a download is found afterwards) - // TODO: FINISH - bool startDownload(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs, - QueueItemBase::DownloadType aType, int64_t aLastSpeed, string& lastError_) noexcept; + QueueDownloadResult startDownload(const HintedUser& aUser, QueueDownloadType aType) noexcept; - // The same thing but only used before any connect requests - // newUrl can be changed if the download is for a filelist from a different hub - // lastError_ will contain the last error why a file can't be started (not cleared if a download is found afterwards) - // hasDownload will be set to true if there are any files queued from the user - // TODO: FINISH - pair startDownload(const UserPtr& aUser, string& hubUrl, QueueItemBase::DownloadType aType, QueueToken& bundleToken, - bool& allowUrlChange_, bool& hasDownload_, string& lastError_) noexcept; + struct DownloadResult : QueueDownloadResultBase { + Download* download = nullptr; + }; // Creates new download for the specified user - // This won't check various download limits so startDownload should be called first + // Runs all necessary validations in startDownload // newUrl can be changed if the download is for a filelist from a different hub // lastError_ will contain the last error why a file can't be started (not cleared if a download is found afterwards) - Download* getDownload(UserConnection& aSource, const QueueTokenSet& aRunningBundles, const OrderedStringSet& aOnlineHubs, string& lastError_, string& newUrl_, QueueItemBase::DownloadType aType) noexcept; + DownloadResult getDownload(UserConnection& aSource, const QueueTokenSet& aRunningBundles, const OrderedStringSet& aOnlineHubs) noexcept; // Handle an ended transfer // finished should be true if the file/segment was finished successfully (false if disconnected/failed). Always false for finished trees. @@ -225,7 +214,6 @@ class QueueManager : public Singleton, public Speaker, public Speaker udp; @@ -443,7 +432,15 @@ class QueueManager : public Singleton, public Speaker, public SpeakergetQueueInfo(aInfo->getHintedUser()); - if (!qi) { + auto result = QueueManager::getInstance()->startDownload(aInfo->getHintedUser(), QueueDownloadType::ANY); + if (!result.qi) { return; } auto type = Transfer::TYPE_FILE; - if (qi->getFlags() & QueueItem::FLAG_PARTIAL_LIST) + if (result.qi->getFlags() & QueueItem::FLAG_PARTIAL_LIST) type = Transfer::TYPE_PARTIAL_LIST; - else if (qi->getFlags() & QueueItem::FLAG_USER_LIST) + else if (result.qi->getFlags() & QueueItem::FLAG_USER_LIST) type = Transfer::TYPE_FULL_LIST; aInfo->setType(type); - aInfo->setTarget(qi->getTarget()); - aInfo->setSize(qi->getSize()); - aInfo->setQueueToken(qi->getToken()); + aInfo->setTarget(result.qi->getTarget()); + aInfo->setSize(result.qi->getSize()); + aInfo->setQueueToken(result.qi->getToken()); } void TransferInfoManager::on(ConnectionManagerListener::Connecting, const ConnectionQueueItem* aCqi) noexcept { @@ -308,6 +308,20 @@ namespace dcpp { starting(aDownload, STRING(REQUESTING), true); } + void TransferInfoManager::on(DownloadManagerListener::Idle, const UserConnection* aConn, const string& aError) noexcept { + if (aError.empty()) { + return; + } + + auto t = findTransfer(aConn->getToken()); + if (!t) { + return; + } + + t->setStatusString(aError); + onTransferUpdated(t, TransferInfo::UpdateFlags::STATUS); + } + void TransferInfoManager::starting(const Download* aDownload, const string& aStatus, bool aFullUpdate) noexcept { auto t = findTransfer(aDownload->getToken()); if (!t) { diff --git a/airdcpp/TransferInfoManager.h b/airdcpp/TransferInfoManager.h index 7f4c553a..0546f9b8 100644 --- a/airdcpp/TransferInfoManager.h +++ b/airdcpp/TransferInfoManager.h @@ -89,6 +89,7 @@ namespace dcpp { void on(DownloadManagerListener::Complete, const Download* aDownload, bool) noexcept override; void on(DownloadManagerListener::Failed, const Download* aDownload, const string& reason) noexcept override; void on(DownloadManagerListener::Requesting, const Download* aDownload, bool hubChanged) noexcept override; + void on(DownloadManagerListener::Idle, const UserConnection* aConn, const string& aError) noexcept override; void on(UploadManagerListener::Starting, const Upload* aUpload) noexcept override; void on(UploadManagerListener::Complete, const Upload* aUpload) noexcept override; diff --git a/airdcpp/UploadBundleInfoSender.cpp b/airdcpp/UploadBundleInfoSender.cpp index 660080a2..ad29914b 100644 --- a/airdcpp/UploadBundleInfoSender.cpp +++ b/airdcpp/UploadBundleInfoSender.cpp @@ -144,7 +144,7 @@ void UploadBundleInfoSender::on(DownloadManagerListener::Starting, const Downloa } }*/ -void UploadBundleInfoSender::on(DownloadManagerListener::Idle, const UserConnection* aSource) noexcept { +void UploadBundleInfoSender::on(DownloadManagerListener::Idle, const UserConnection* aSource, const string&) noexcept { removeRunningUser(aSource, false); } diff --git a/airdcpp/UploadBundleInfoSender.h b/airdcpp/UploadBundleInfoSender.h index 92527041..0f83b728 100644 --- a/airdcpp/UploadBundleInfoSender.h +++ b/airdcpp/UploadBundleInfoSender.h @@ -21,7 +21,6 @@ #include "CriticalSection.h" #include "Message.h" -#include "QueueItemBase.h" #include "User.h" #include "DownloadManagerListener.h" @@ -91,7 +90,7 @@ class UploadBundleInfoSender: public DownloadManagerListener, public QueueManage void on(DownloadManagerListener::Failed, const Download*, const string&) noexcept override; void on(DownloadManagerListener::BundleTick, const BundleList& aBundles, uint64_t aTick) noexcept override; void on(DownloadManagerListener::Remove, const UserConnection* aConn) noexcept override; - void on(DownloadManagerListener::Idle, const UserConnection* aConn) noexcept override; + void on(DownloadManagerListener::Idle, const UserConnection* aConn, const string& aError) noexcept override; unordered_map bundleTokenMap; diff --git a/airdcpp/UploadQueueManager.cpp b/airdcpp/UploadQueueManager.cpp index 75ccc574..b374f74c 100644 --- a/airdcpp/UploadQueueManager.cpp +++ b/airdcpp/UploadQueueManager.cpp @@ -56,12 +56,8 @@ UploadQueueItem::UploadQueueItem(const HintedUser& _user, const string& _file, i } void UploadQueueManager::connectUser(const HintedUser& aUser, const string& aToken) noexcept { - string lastError; - string hubUrl = aUser.hint; - bool protocolError = false; - ClientManager::getInstance()->connect(aUser.user, aToken, true, lastError, hubUrl, protocolError); - - //TODO: report errors? + // TODO: report errors? + ClientManager::getInstance()->connect(aUser, aToken, true); } void UploadQueueManager::connectUser(const HintedUser& aUser) noexcept { diff --git a/airdcpp/UserConnection.cpp b/airdcpp/UserConnection.cpp index 7a2f3238..d437dcd8 100644 --- a/airdcpp/UserConnection.cpp +++ b/airdcpp/UserConnection.cpp @@ -173,6 +173,17 @@ void UserConnection::setUseLimiter(bool aEnabled) noexcept { } } +void UserConnection::setState(States aNewState) noexcept { + if (aNewState == state) { + return; + } + + state = aNewState; + callAsync([this] { + fire(UserConnectionListener::State(), this); + }); +} + void UserConnection::setUser(const UserPtr& aUser) noexcept { user = aUser; @@ -458,6 +469,7 @@ UserConnection::UserConnection(bool secure_) noexcept : encoding(SETTING(NMDC_EN UserConnection::~UserConnection() { BufferedSocket::putSocket(socket); + dcdebug("User connection %s was deleted\n", getToken().c_str()); } } // namespace dcpp diff --git a/airdcpp/UserConnection.h b/airdcpp/UserConnection.h index 80b52dee..e380c499 100644 --- a/airdcpp/UserConnection.h +++ b/airdcpp/UserConnection.h @@ -72,8 +72,7 @@ class UserConnection : public Speaker, FLAG_SUPPORTS_ZLIB_GET = FLAG_SUPPORTS_ADCGET << 1, FLAG_SUPPORTS_TTHL = FLAG_SUPPORTS_ZLIB_GET <<1, FLAG_SUPPORTS_TTHF = FLAG_SUPPORTS_TTHL << 1, - FLAG_RUNNING = FLAG_SUPPORTS_TTHF << 1, - FLAG_SMALL_SLOT = FLAG_RUNNING << 1, + FLAG_SMALL_SLOT = FLAG_SUPPORTS_TTHF << 1, FLAG_TRUSTED = FLAG_SMALL_SLOT << 1 }; @@ -200,7 +199,7 @@ class UserConnection : public Speaker, IGETSET(int64_t, speed, Speed, 0); IGETSET(uint64_t, lastActivity, LastActivity, 0); GETSET(string, encoding, Encoding); - IGETSET(States, state, State, STATE_UNCONNECTED); + IGETPROP(States, state, State, STATE_UNCONNECTED); IGETSET(uint8_t, slotType, SlotType, NOSLOT); const BufferedSocket* getSocket() const noexcept { return socket; } @@ -218,6 +217,7 @@ class UserConnection : public Speaker, } void setUseLimiter(bool aEnabled) noexcept; + void setState(States aNewState) noexcept; private: void initSocket(); diff --git a/airdcpp/UserConnectionListener.h b/airdcpp/UserConnectionListener.h index dfcfcd74..c5695337 100644 --- a/airdcpp/UserConnectionListener.h +++ b/airdcpp/UserConnectionListener.h @@ -49,7 +49,8 @@ class UserConnectionListener { typedef X<18> Supports; typedef X<19> ProtocolError; typedef X<20> FileNotAvailable; - typedef X<21> ListLength; + typedef X<21> ListLength; + typedef X<22> State; virtual void on(BytesSent, UserConnection*, size_t, size_t) noexcept { } virtual void on(Connected, UserConnection*) noexcept { } @@ -71,6 +72,7 @@ class UserConnectionListener { virtual void on(ListLength, UserConnection*, const string&) noexcept { } virtual void on(PrivateMessage, UserConnection*, const ChatMessagePtr&) noexcept{} virtual void on(UserSet, UserConnection*) noexcept {} + virtual void on(State, UserConnection*) noexcept {} virtual void on(AdcCommand::SUP, UserConnection*, const AdcCommand&) noexcept { } virtual void on(AdcCommand::INF, UserConnection*, const AdcCommand&) noexcept { } diff --git a/airdcpp/UserQueue.cpp b/airdcpp/UserQueue.cpp index 3bb81af2..baa73e55 100644 --- a/airdcpp/UserQueue.cpp +++ b/airdcpp/UserQueue.cpp @@ -69,35 +69,35 @@ void UserQueue::getUserQIs(const UserPtr& aUser, QueueItemList& ql) noexcept{ } } -QueueItemPtr UserQueue::getNext(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs, - string& lastError_, bool& hasDownload, Priority minPrio, int64_t wantedSize, int64_t lastSpeed, QueueItemBase::DownloadType aType, - bool allowOverlap /*false*/) noexcept { +QueueItemPtr UserQueue::getNext(const QueueDownloadQuery& aQuery, string& lastError_, bool& hasDownload_, bool aAllowOverlap) noexcept { - /* Using the PAUSED priority will list all files */ - auto qi = getNextPrioQI(aUser, onlineHubs, 0, 0, aType, allowOverlap, lastError_); + // Using the PAUSED priority will list all files (?) + auto qi = getNextPrioQI(aQuery, lastError_, aAllowOverlap /*, 0, 0*/); if(!qi) { - qi = getNextBundleQI(aUser, runningBundles, onlineHubs, (Priority)minPrio, wantedSize, lastSpeed, aType, allowOverlap, lastError_, hasDownload); + qi = getNextBundleQI(aQuery, lastError_, hasDownload_, aAllowOverlap); } - if (!qi && !allowOverlap) { - //no free segments. let's do another round and now check if there are slow sources which can be overlapped - qi = getNext(aUser, runningBundles, onlineHubs, lastError_, hasDownload, minPrio, wantedSize, lastSpeed, aType, true); + if (!qi && !aAllowOverlap) { + // No free segments + // Let's do another round and check if there are slow sources which can be overlapped + qi = getNext(aQuery, lastError_, hasDownload_, true); + } + + if (qi) { + hasDownload_ = true; } - if (qi) - hasDownload = true; return qi; } -QueueItemPtr UserQueue::getNextPrioQI(const UserPtr& aUser, const OrderedStringSet& onlineHubs, int64_t wantedSize, int64_t lastSpeed, - QueueItemBase::DownloadType aType, bool allowOverlap, string& lastError_) noexcept{ +QueueItemPtr UserQueue::getNextPrioQI(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept{ lastError_ = Util::emptyString; - auto i = userPrioQueue.find(aUser); + auto i = userPrioQueue.find(aQuery.user); if(i != userPrioQueue.end()) { dcassert(!i->second.empty()); for(auto& q: i->second) { - if (q->hasSegment(aUser, onlineHubs, lastError_, wantedSize, lastSpeed, aType, allowOverlap)) { + if (q->hasSegment(aQuery, lastError_, aAllowOverlap)) { return q; } } @@ -105,28 +105,26 @@ QueueItemPtr UserQueue::getNextPrioQI(const UserPtr& aUser, const OrderedStringS return nullptr; } -QueueItemPtr UserQueue::getNextBundleQI(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs, - Priority minPrio, int64_t wantedSize, int64_t lastSpeed, QueueItemBase::DownloadType aType, bool allowOverlap, - string& lastError_, bool& hasDownload) noexcept{ +QueueItemPtr UserQueue::getNextBundleQI(const QueueDownloadQuery& aQuery, string& lastError_, bool& hasDownload_, bool aAllowOverlap) noexcept{ lastError_ = Util::emptyString; auto bundleLimit = SETTING(MAX_RUNNING_BUNDLES); - auto i = userBundleQueue.find(aUser); + auto i = userBundleQueue.find(aQuery.user); if(i != userBundleQueue.end()) { dcassert(!i->second.empty()); for (auto& b: i->second) { - if (bundleLimit > 0 && static_cast(runningBundles.size()) >= bundleLimit && runningBundles.find(b->getToken()) == runningBundles.end()) { - hasDownload = true; + if (bundleLimit > 0 && static_cast(aQuery.runningBundles.size()) >= bundleLimit && aQuery.runningBundles.find(b->getToken()) == aQuery.runningBundles.end()) { + hasDownload_ = true; lastError_ = STRING(MAX_BUNDLES_RUNNING); continue; } - if (b->getPriority() < minPrio) { + if (b->getPriority() < aQuery.minPrio) { break; } - auto qi = b->getNextQI(aUser, onlineHubs, lastError_, minPrio, wantedSize, lastSpeed, aType, allowOverlap); + auto qi = b->getNextQI(aQuery, lastError_, aAllowOverlap); if (qi) { return qi; } diff --git a/airdcpp/UserQueue.h b/airdcpp/UserQueue.h index bd0ef7aa..34d215a2 100644 --- a/airdcpp/UserQueue.h +++ b/airdcpp/UserQueue.h @@ -33,13 +33,9 @@ class UserQueue { void addQI(const QueueItemPtr& qi, const HintedUser& aUser, bool aIsBadSource = false) noexcept; void getUserQIs(const UserPtr& aUser, QueueItemList& ql) noexcept; - QueueItemPtr getNext(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs, string& lastError_, bool& hasDownload, - Priority minPrio = Priority::LOWEST, int64_t wantedSize = 0, int64_t lastSpeed = 0, QueueItemBase::DownloadType aType = QueueItem::TYPE_ANY, bool allowOverlap = false) noexcept; - QueueItemPtr getNextPrioQI(const UserPtr& aUser, const OrderedStringSet& onlineHubs, int64_t wantedSize, int64_t lastSpeed, - QueueItemBase::DownloadType aType, bool allowOverlap, string& lastError_) noexcept; - QueueItemPtr getNextBundleQI(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs, - Priority minPrio, int64_t wantedSize, int64_t lastSpeed, QueueItemBase::DownloadType aType, - bool allowOverlap, string& lastError_, bool& hasDownload) noexcept; + QueueItemPtr getNext(const QueueDownloadQuery& aQuery, string& lastError_, bool& hasDownload_, bool aAllowOverlap = false) noexcept; + QueueItemPtr getNextPrioQI(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept; + QueueItemPtr getNextBundleQI(const QueueDownloadQuery& aQuery, string& lastError_, bool& hasDownload, bool aAllowOverlap) noexcept; void addDownload(const QueueItemPtr& qi, Download* d) noexcept; void removeDownload(const QueueItemPtr& qi, const string& aToken) noexcept; diff --git a/airdcpp/forward.h b/airdcpp/forward.h index 708b74a1..da306d09 100644 --- a/airdcpp/forward.h +++ b/airdcpp/forward.h @@ -150,6 +150,8 @@ class OutputStream; class PrivateChat; typedef std::shared_ptr PrivateChatPtr; +typedef uint32_t QueueToken; +typedef unordered_set QueueTokenSet; class QueueItemBase; class QueueItem;