diff --git a/airdcpp.vcxproj b/airdcpp.vcxproj
index 5b38d1da..f70022dd 100644
--- a/airdcpp.vcxproj
+++ b/airdcpp.vcxproj
@@ -389,6 +389,7 @@
+
diff --git a/airdcpp.vcxproj.filters b/airdcpp.vcxproj.filters
index f3874821..7bbb9584 100644
--- a/airdcpp.vcxproj.filters
+++ b/airdcpp.vcxproj.filters
@@ -1195,6 +1195,9 @@
Header Files\favorites
+
+ Header Files\queue
+
diff --git a/airdcpp/Bundle.cpp b/airdcpp/Bundle.cpp
index 1c7f917f..621b2cb0 100644
--- a/airdcpp/Bundle.cpp
+++ b/airdcpp/Bundle.cpp
@@ -360,20 +360,20 @@ bool Bundle::addUserQueue(const QueueItemPtr& qi, const HintedUser& aUser, bool
}
}
-QueueItemPtr Bundle::getNextQI(const UserPtr& aUser, const OrderedStringSet& aOnlineHubs, string& aLastError, Priority aMinPrio, int64_t aWantedSize, int64_t aLastSpeed, QueueItemBase::DownloadType aType, bool aAllowOverlap) noexcept {
+QueueItemPtr Bundle::getNextQI(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept {
int p = static_cast(Priority::LAST) - 1;
do {
- auto i = userQueue[p].find(aUser);
+ auto i = userQueue[p].find(aQuery.user);
if(i != userQueue[p].end()) {
dcassert(!i->second.empty());
for(auto& qi: i->second) {
- if (qi->hasSegment(aUser, aOnlineHubs, aLastError, aWantedSize, aLastSpeed, aType, aAllowOverlap)) {
+ if (qi->hasSegment(aQuery, lastError_, aAllowOverlap)) {
return qi;
}
}
}
p--;
- } while(p >= static_cast(aMinPrio));
+ } while(p >= static_cast(aQuery.minPrio));
return nullptr;
}
diff --git a/airdcpp/Bundle.h b/airdcpp/Bundle.h
index 55d132be..e198da79 100644
--- a/airdcpp/Bundle.h
+++ b/airdcpp/Bundle.h
@@ -26,6 +26,7 @@
#include "MerkleTree.h"
#include "User.h"
+#include "QueueDownloadInfo.h"
#include "QueueItemBase.h"
namespace dcpp {
@@ -215,7 +216,7 @@ class Bundle : public QueueItemBase {
/** All queue items indexed by user */
void addUserQueue(const QueueItemPtr& qi) noexcept;
bool addUserQueue(const QueueItemPtr& qi, const HintedUser& aUser, bool aIsBad = false) noexcept;
- QueueItemPtr getNextQI(const UserPtr& aUser, const OrderedStringSet& aOnlineHubs, string& aLastError, Priority aMinPrio, int64_t aWantedSize, int64_t aLastSpeed, QueueItemBase::DownloadType aType, bool allowOverlap) noexcept;
+ QueueItemPtr getNextQI(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept;
void getItems(const UserPtr& aUser, QueueItemList& ql) const noexcept;
QueueItemList getFailedItems() const noexcept;
diff --git a/airdcpp/ClientManager.cpp b/airdcpp/ClientManager.cpp
index 976bb655..f1d078f6 100644
--- a/airdcpp/ClientManager.cpp
+++ b/airdcpp/ClientManager.cpp
@@ -755,72 +755,71 @@ OnlineUserPtr ClientManager::findOnlineUser(const CID& cid, const string& hintUr
return aAllowFallback ? p.first->second : nullptr;
}
-bool ClientManager::connect(const UserPtr& aUser, const string& aToken, bool aAllowUrlChange, string& lastError_, string& hubHint_, bool& isProtocolError_, ConnectionType aConnType) const noexcept {
+ClientManager::ConnectResult ClientManager::connect(const HintedUser& aUser, const string& aToken, bool aAllowUrlChange, ConnectionType aConnType) const noexcept {
+ ConnectResult result;
+
RLock l(cs);
- auto op = onlineUsers.equal_range(const_cast(&aUser->getCID()));
+ auto op = onlineUsers.equal_range(const_cast(&aUser.user->getCID()));
auto connectUser = [&] (OnlineUser* ou) -> bool {
- isProtocolError_ = false;
+ result.resetError();
+ // result.isProtocolError = false;
- auto ret = ou->getClient()->connect(*ou, aToken, lastError_);
+ string connectError;
+ auto ret = ou->getClient()->connect(*ou, aToken, connectError);
if (ret == AdcCommand::SUCCESS) {
return true;
}
//get the error string
if (ret == AdcCommand::ERROR_TLS_REQUIRED) {
- isProtocolError_ = true;
- lastError_ = STRING(SOURCE_NO_ENCRYPTION);
+ result.onProtocolError(STRING(SOURCE_NO_ENCRYPTION));
} else if (ret == AdcCommand::ERROR_PROTOCOL_UNSUPPORTED) {
- isProtocolError_ = true;
- lastError_ = STRING_F(REMOTE_PROTOCOL_UNSUPPORTED, lastError_);
+ result.onProtocolError(STRING_F(REMOTE_PROTOCOL_UNSUPPORTED, connectError));
} else if (ret == AdcCommand::ERROR_BAD_STATE) {
- lastError_ = STRING(CONNECTING_IN_PROGRESS);
+ result.onMinorError(STRING(CONNECTING_IN_PROGRESS));
} else if (ret == AdcCommand::ERROR_FEATURE_MISSING) {
- isProtocolError_ = true;
- lastError_ = STRING(NO_NATT_SUPPORT);
+ result.onProtocolError(STRING(NO_NATT_SUPPORT));
} else if (ret == AdcCommand::ERROR_PROTOCOL_GENERIC) {
- isProtocolError_ = true;
- lastError_ = STRING(UNABLE_CONNECT_USER);
+ result.onProtocolError(STRING(UNABLE_CONNECT_USER));
}
return false;
};
if (aConnType == CONNECTION_TYPE_PM) {
- if (!aUser->isSet(User::TLS)) {
- isProtocolError_ = true;
- lastError_ = STRING(SOURCE_NO_ENCRYPTION);
- return false;
+ if (!aUser.user->isSet(User::TLS)) {
+ result.onProtocolError(STRING(SOURCE_NO_ENCRYPTION));
+ return result;
}
// We don't care which hub we use to establish the connection all we need to know is the user supports the CCPM feature.
- if (!aUser->isSet(User::CCPM)) {
- isProtocolError_ = true;
- lastError_ = STRING(CCPM_NOT_SUPPORTED);
- return false;
+ if (!aUser.user->isSet(User::CCPM)) {
+ result.onProtocolError(STRING(CCPM_NOT_SUPPORTED));
+ return result;
}
}
// Prefer the hinted hub
- auto p = ranges::find_if(op | pair_to_range, [&hubHint_](const auto& ouc) { return ouc.second->getHubUrl() == hubHint_; });
+ auto p = ranges::find_if(op | pair_to_range, [&aUser](const auto& ouc) { return ouc.second->getHubUrl() == aUser.hint; });
if (p != op.second && connectUser(p->second)) {
- return true;
+ result.onSuccess(aUser.hint);
+ return result;
}
if (!aAllowUrlChange) {
- return false;
+ return result;
}
// Connect via any available hub
for (auto i = op.first; i != op.second; ++i) {
if (connectUser(i->second)) {
- hubHint_ = i->second->getHubUrl();
- return true;
+ result.onSuccess(i->second->getHubUrl());
+ return result;
}
}
- return false;
+ return result;
}
bool ClientManager::privateMessageHooked(const HintedUser& aUser, const OutgoingChatMessage& aMessage, string& error_, bool aEcho) noexcept {
diff --git a/airdcpp/ClientManager.h b/airdcpp/ClientManager.h
index 2c01b5c8..0d18b971 100644
--- a/airdcpp/ClientManager.h
+++ b/airdcpp/ClientManager.h
@@ -230,7 +230,37 @@ class ClientManager : public Speaker,
bool sendUDPHooked(AdcCommand& c, const CID& to, bool aNoCID = false, bool aNoPassive = false, const string& aEncryptionKey = Util::emptyString, const string& aHubUrl = Util::emptyString) noexcept;
bool sendUDP(const string& aData, const string& aIP, const string& aPort) noexcept;
- bool connect(const UserPtr& aUser, const string& aToken, bool aAllowUrlChange, string& lastError_, string& hubHint_, bool& isProtocolError_, ConnectionType type = CONNECTION_TYPE_LAST) const noexcept;
+ struct ConnectResult {
+ public:
+ void onSuccess(const string& aHubHint) noexcept {
+ success = true;
+ hubHint = aHubHint;
+ }
+
+ void onMinorError(const string& aError) noexcept {
+ lastError = aError;
+ protocolError = false;
+ }
+
+ void onProtocolError(const string& aError) noexcept {
+ lastError = aError;
+ protocolError = true;
+ }
+
+ void resetError() noexcept {
+ lastError = Util::emptyString;
+ protocolError = false;
+ }
+
+
+ GETPROP(string, lastError, Error);
+ IGETPROP(bool, protocolError, IsProtocolError, false);
+
+ GETPROP(string, hubHint, HubHint);
+ IGETPROP(bool, success, IsSuccess, false);
+ };
+
+ ConnectResult connect(const HintedUser& aUser, const string& aToken, bool aAllowUrlChange, ConnectionType type = CONNECTION_TYPE_LAST) const noexcept;
bool privateMessageHooked(const HintedUser& aUser, const OutgoingChatMessage& aMessage, string& error_, bool aEcho = true) noexcept;
void userCommand(const HintedUser& aUser, const UserCommand& uc, ParamMap& params_, bool aCompatibility) noexcept;
diff --git a/airdcpp/ConnectionManager.cpp b/airdcpp/ConnectionManager.cpp
index dcb15213..0d25d0b5 100644
--- a/airdcpp/ConnectionManager.cpp
+++ b/airdcpp/ConnectionManager.cpp
@@ -27,7 +27,6 @@
#include "LogManager.h"
#include "QueueManager.h"
#include "ResourceManager.h"
-#include "ScopedFunctor.h"
#include "UploadManager.h"
#include "UserConnection.h"
#include "ValueGenerator.h"
@@ -126,9 +125,50 @@ ConnectionQueueItem::ConnectionQueueItem(const HintedUser& aUser, ConnectionType
}
bool ConnectionQueueItem::allowNewConnections(int aRunning) const noexcept {
- return (aRunning < AirUtil::getSlotsPerUser(true) || AirUtil::getSlotsPerUser(true) == 0) && (aRunning < maxConns || maxConns == 0);
+ if (maxRemoteConns != 0 && aRunning >= maxRemoteConns) {
+ return false;
+ }
+
+ auto maxOwnConns = AirUtil::getSlotsPerUser(true);
+ if (maxOwnConns != 0 && aRunning >= maxOwnConns) {
+ return false;
+ }
+
+ return true;
}
+bool ConnectionQueueItem::isSmallSlot() const noexcept {
+ return downloadType == QueueDownloadType::SMALL;
+}
+
+bool ConnectionQueueItem::isActive() const noexcept {
+ return state == State::ACTIVE;
+}
+
+bool ConnectionQueueItem::isMcn() const noexcept {
+ return isSet(FLAG_MCN);
+}
+
+bool ConnectionQueueItem::allowConnect(int aAttempts, int aAttemptLimit, uint64_t aTick) const noexcept {
+ // No attempts?
+ if (lastAttempt == 0 && aAttempts < aAttemptLimit * 2) {
+ return true;
+ }
+
+ // Enough time ellapsed since the last attempt?
+ return (aAttemptLimit == 0 || aAttempts < aAttemptLimit) &&
+ lastAttempt + 60 * 1000 * max(1, errors) < aTick;
+}
+
+bool ConnectionQueueItem::isTimeout(uint64_t aTick) const noexcept {
+ return state == ConnectionQueueItem::State::CONNECTING && lastAttempt + 50 * 1000 < aTick;
+}
+
+void ConnectionQueueItem::resetFatalError() noexcept {
+ if (getLastAttempt() == -1) {
+ setLastAttempt(0);
+ }
+}
/**
* Request a connection for downloading.
@@ -143,63 +183,79 @@ void ConnectionManager::getDownloadConnection(const HintedUser& aUser, bool aSma
return;
}
- bool supportMcn = false;
+ {
+ WLock l(cs);
+ if (!allowNewMCNUnsafe(aUser, aSmallSlot, [](ConnectionQueueItem* aWaitingCQI) {
+ // Force in case we joined a new hub and there was a protocol error
+ aWaitingCQI->resetFatalError();
+ })) {
+ return;
+ }
+
+ auto cqi = getCQIUnsafe(aUser, CONNECTION_TYPE_DOWNLOAD);
+ if (aSmallSlot) {
+ cqi->setDownloadType(QueueDownloadType::SMALL);
+ }
+
+ dcdebug("DownloadManager::getDownloadConnection: created new item %s for user %s (small slot: %s)\n", cqi->getToken().c_str(), ClientManager::getInstance()->getFormatedNicks(aUser).c_str(), aSmallSlot ? "true" : "false");
+ }
+}
+bool ConnectionManager::allowNewMCNUnsafe(const UserPtr& aUser, bool aSmallSlot, ConnectionQueueItemCallback&& aWaitingCallback) noexcept {
+ // We need to check if we have queued something also while the small file connection was being established
ConnectionQueueItem* cqi = nullptr;
- int running = 0;
+ int runningNormal = 0;
+ auto supportMcn = false;
- {
- WLock l(cs);
- for(const auto& i: downloads) {
- cqi = i;
- if (cqi->getUser() != aUser.user || cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
- continue;
- }
+ for(const auto& i: downloads) {
+ cqi = i;
+ if (cqi->getUser() != aUser) {
+ continue;
+ }
- if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::MCN_NORMAL) {
- supportMcn = true;
- if (cqi->getState() != ConnectionQueueItem::RUNNING) {
- //already has a waiting item? small slot doesn't count
- if (!aSmallSlot) {
- // force in case we joined a new hub and there was a protocol error
- if (cqi->getLastAttempt() == -1) {
- cqi->setLastAttempt(0);
- }
- return;
- }
- } else {
- running++;
- }
- } else if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL_CONF) {
- supportMcn = true;
- //no need to continue with small slot if an item with the same type exists already (no mather whether it's running or not)
- if (aSmallSlot) {
- // force in case we joined a new hub and there was a protocol error
- if (cqi->getLastAttempt() == -1) {
- cqi->setLastAttempt(0);
+ if (!cqi->isMcn()) {
+ // We already have a connection, no need to continue
+ return false;
+ }
+
+ supportMcn = true;
+
+ if (cqi->getDownloadType() == QueueDownloadType::MCN_NORMAL) {
+ if (!cqi->isActive()) {
+ // Already has a waiting item? Small slot doesn't count
+ if (!aSmallSlot) {
+ // Force in case we joined a new hub and there was a protocol error
+ if (aWaitingCallback) {
+ aWaitingCallback(cqi);
}
- return;
+ return false;
}
} else {
- //no need to continue with non-MCN users
- return;
+ runningNormal++;
}
- }
+ } else if (cqi->getDownloadType() == QueueDownloadType::SMALL) {
+ // No need to continue with small slot if an item with the same type exists already
+ // (regardless of whether it's running or not)
+ if (aSmallSlot) {
+ // Force in case we joined a new hub and there was a protocol error
+ if (!cqi->isActive() && aWaitingCallback) {
+ aWaitingCallback(cqi);
+ }
- if (supportMcn && !aSmallSlot && !cqi->allowNewConnections(running)) {
- return;
+ return false;
+ }
}
+ }
- //WLock l (cs);
- dcdebug("Get cqi");
- cqi = getCQI(aUser, CONNECTION_TYPE_DOWNLOAD);
- if (aSmallSlot) {
- cqi->setDownloadType(supportMcn ? ConnectionQueueItem::DownloadType::SMALL_CONF : ConnectionQueueItem::DownloadType::SMALL);
- }
+ if (supportMcn && !aSmallSlot && !cqi->allowNewConnections(runningNormal)) {
+ return false;
}
+
+ return true;
}
-ConnectionQueueItem* ConnectionManager::getCQI(const HintedUser& aUser, ConnectionType aConnType, const string& aToken) noexcept {
+
+ConnectionQueueItem* ConnectionManager::getCQIUnsafe(const HintedUser& aUser, ConnectionType aConnType, const string& aToken) noexcept {
auto& container = cqis[aConnType];
auto cqi = new ConnectionQueueItem(aUser, aConnType, !aToken.empty() ? aToken : tokens.createToken(aConnType));
container.emplace_back(cqi);
@@ -209,9 +265,7 @@ ConnectionQueueItem* ConnectionManager::getCQI(const HintedUser& aUser, Connecti
return cqi;
}
-void ConnectionManager::putCQI(ConnectionQueueItem* cqi) noexcept {
- //allways called from inside lock
-
+void ConnectionManager::putCQIUnsafe(ConnectionQueueItem* cqi) noexcept {
fire(ConnectionManagerListener::Removed(), cqi);
auto& container = cqis[cqi->getConnType()];
@@ -283,156 +337,157 @@ void ConnectionManager::on(TimerManagerListener::Second, uint64_t aTick) noexcep
for(auto& m: removedTokens) {
auto s = find(downloads.begin(), downloads.end(), m);
if (s != downloads.end()) {
- putCQI(*s);
+ putCQIUnsafe(*s);
}
}
}
}
void ConnectionManager::attemptDownloads(uint64_t aTick, StringList& removedTokens_) noexcept {
- RLock l(cs);
int attemptLimit = SETTING(DOWNCONN_PER_SEC);
- uint16_t attempts = 0;
- for (auto cqi : downloads) {
- if (!cqi->isActive()) {
- if (!cqi->getUser().user->isOnline() || cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
- removedTokens_.push_back(cqi->getToken());
- continue;
- }
-
- if (cqi->getErrors() == -1 && cqi->getLastAttempt() != 0) {
- // protocol error, don't reconnect except after a forced attempt
- continue;
- }
-
- if ((cqi->getLastAttempt() == 0 && attempts < attemptLimit * 2) || ((attemptLimit == 0 || attempts < attemptLimit) &&
- cqi->getLastAttempt() + 60 * 1000 * max(1, cqi->getErrors()) < aTick))
- {
- // TODO: no one can understand this code, fix!
- ScopedFunctor([=] { cqi->setLastAttempt(aTick); });
-
- QueueToken bundleToken = 0;
- string lastError, hubHint = cqi->getHubUrl();
- bool allowUrlChange = true;
- bool hasDownload = false;
-
- auto type = cqi->isSmallSlot() ? QueueItem::TYPE_SMALL : cqi->getDownloadType() == ConnectionQueueItem::DownloadType::MCN_NORMAL ? QueueItem::TYPE_MCN_NORMAL : QueueItem::TYPE_ANY;
-
- //we'll also validate the hubhint (and that the user is online) before making any connection attempt
- auto startDown = QueueManager::getInstance()->startDownload(cqi->getUser(), hubHint, type, bundleToken, allowUrlChange, hasDownload, lastError);
- if (!hasDownload && cqi->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL && count_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* aCQI) { return aCQI != cqi && aCQI->getUser() == cqi->getUser(); }) == 0) {
- //the small file finished already? try with any type
- cqi->setDownloadType(ConnectionQueueItem::DownloadType::ANY);
- startDown = QueueManager::getInstance()->startDownload(cqi->getUser(), hubHint, QueueItem::TYPE_ANY,
- bundleToken, allowUrlChange, hasDownload, lastError);
- } else if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::ANY && startDown.first == QueueItem::TYPE_SMALL &&
- count_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* aCQI) {
- return aCQI->getUser() == cqi->getUser() && cqi->isSmallSlot();
- }) == 0) {
- // a small file has been added after the CQI was created
- cqi->setDownloadType(ConnectionQueueItem::DownloadType::SMALL);
- }
+ int attempts = 0;
+ RLock l(cs);
+ for (auto cqi : downloads) {
+ // Already active?
+ if (cqi->isActive()) {
+ continue;
+ }
- if (!hasDownload) {
- removedTokens_.push_back(cqi->getToken());
- continue;
- }
+ // Removing?
+ if (!cqi->getUser().user->isOnline()) {
+ removedTokens_.push_back(cqi->getToken());
+ continue;
+ }
- cqi->setLastBundle(bundleToken != 0 ? Util::toString(bundleToken) : Util::emptyString);
- cqi->setHubUrl(hubHint);
-
- if (cqi->getState() == ConnectionQueueItem::WAITING ||
- // Forcing the connection and it's not connected yet? Retry
- (cqi->getLastAttempt() == 0 && cqi->getState() == ConnectionQueueItem::CONNECTING && find(userConnections.begin(), userConnections.end(), cqi->getToken()) == userConnections.end())
- ) {
- if (startDown.second) {
- cqi->setState(ConnectionQueueItem::CONNECTING);
- bool protocolError = false;
-
- if (!ClientManager::getInstance()->connect(cqi->getUser(), cqi->getToken(), allowUrlChange, lastError, hubHint, protocolError)) {
- cqi->setState(ConnectionQueueItem::WAITING);
- cqi->setErrors(protocolError ? -1 : (cqi->getErrors() + 1)); // protocol error
- dcassert(!lastError.empty());
- fire(ConnectionManagerListener::Failed(), cqi, lastError);
- } else {
- cqi->setHubUrl(hubHint);
- fire(ConnectionManagerListener::Connecting(), cqi);
- attempts++;
- }
- } else {
- fire(ConnectionManagerListener::Failed(), cqi, lastError);
- }
- }
- } else if (cqi->getState() == ConnectionQueueItem::CONNECTING && cqi->getLastAttempt() + 50 * 1000 < aTick) {
+ // No attempts?
+ if (cqi->getErrors() == -1 && cqi->getLastAttempt() != 0) {
+ // protocol error, don't reconnect except after a forced attempt
+ continue;
+ }
+ // Not enough time since the last attempt?
+ if (!cqi->allowConnect(attempts, attemptLimit, aTick)) {
+ if (cqi->isTimeout(aTick)) {
cqi->setErrors(cqi->getErrors() + 1);
fire(ConnectionManagerListener::Failed(), cqi, STRING(CONNECTION_TIMEOUT));
- cqi->setState(ConnectionQueueItem::WAITING);
+ cqi->setState(ConnectionQueueItem::State::WAITING);
}
- } else if (cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
- cqi->unsetFlag(ConnectionQueueItem::FLAG_REMOVE);
+
+ continue;
+ }
+
+ // Try to connect
+ if (attemptDownloadUnsafe(cqi, removedTokens_)) {
+ attempts++;
}
+
+ cqi->setLastAttempt(aTick);
}
}
+bool ConnectionManager::attemptDownloadUnsafe(ConnectionQueueItem* cqi, StringList& removedTokens_) noexcept {
+
+ // We'll also validate the hubhint (and that the user is online) before making any connection attempt
+ auto startResult = QueueManager::getInstance()->startDownload(cqi->getUser(), cqi->getDownloadType());
+ if (!startResult.hasDownload && cqi->getDownloadType() == QueueDownloadType::SMALL && ranges::none_of(downloads, [&](const ConnectionQueueItem* aCQI) { return aCQI != cqi && aCQI->getUser() == cqi->getUser(); })) {
+ // The small file finished already? Try with any type
+ cqi->setDownloadType(QueueDownloadType::ANY);
+ startResult = QueueManager::getInstance()->startDownload(cqi->getUser(), QueueDownloadType::ANY);
+ } else if (
+ cqi->getDownloadType() == QueueDownloadType::ANY &&
+ startResult.downloadType == QueueDownloadType::SMALL &&
+ ranges::none_of(downloads, [&](const ConnectionQueueItem* aCQI) {
+ return aCQI->getUser() == cqi->getUser() && cqi->isSmallSlot();
+ })
+ ) {
+ // a Small file has been added after the CQI was created
+ cqi->setDownloadType(QueueDownloadType::SMALL);
+ }
-void ConnectionManager::addRunningMCN(const UserConnection *aSource) noexcept {
- {
- RLock l(cs);
- auto i = find(downloads.begin(), downloads.end(), aSource->getToken());
- if (i != downloads.end()) {
- ConnectionQueueItem* cqi = *i;
- cqi->setState(ConnectionQueueItem::RUNNING);
- //LogManager::getInstance()->message("Running downloads for the user: " + Util::toString(runningDownloads[aSource->getUser()]));
+ // No files to download from this user?
+ if (!startResult.hasDownload) {
+ dcdebug("ConnectionManager::attemptDownload: no downloads from user %s (conn %s), removing (small slot: %s)\n", ClientManager::getInstance()->getFormatedNicks(cqi->getUser()).c_str(), cqi->getToken().c_str(), cqi->isSmallSlot() ? "true" : "false");
+ removedTokens_.push_back(cqi->getToken());
+ return false;
+ }
- if (!allowNewMCN(cqi))
- return;
+ cqi->setLastBundle(startResult.bundleToken ? Util::toString(*startResult.bundleToken) : Util::emptyString);
+ cqi->setHubUrl(startResult.hubHint);
+
+ if (cqi->getState() == ConnectionQueueItem::State::WAITING ||
+ // Forcing the connection and it's not connected yet? Retry
+ (cqi->getLastAttempt() == 0 && cqi->getState() == ConnectionQueueItem::State::CONNECTING && find(userConnections.begin(), userConnections.end(), cqi->getToken()) == userConnections.end())
+ ) {
+ if (startResult.startDownload) {
+ return connectUnsafe(cqi, startResult.allowUrlChange);
+ } else {
+ // Download limits full or similar temporary error
+ dcdebug("ConnectionManager::attemptDownload: can't start download from user %s (connection %s): %s (small slot: %s)\n", ClientManager::getInstance()->getFormatedNicks(cqi->getUser()).c_str(), cqi->getToken().c_str(), startResult.lastError.c_str(), cqi->isSmallSlot() ? "true" : "false");
+ fire(ConnectionManagerListener::Failed(), cqi, startResult.lastError);
}
}
- createNewMCN(aSource->getHintedUser());
+ return false;
}
-bool ConnectionManager::allowNewMCN(const ConnectionQueueItem* aCQI) noexcept {
- //we need to check if we have queued something also while the small file connection was being established
- if (!aCQI->isMcn()) {
+bool ConnectionManager::connectUnsafe(ConnectionQueueItem* cqi, bool aAllowUrlChange) noexcept {
+ cqi->setState(ConnectionQueueItem::State::CONNECTING);
+
+ auto connectResult = ClientManager::getInstance()->connect(cqi->getUser(), cqi->getToken(), aAllowUrlChange);
+ if (!connectResult.getIsSuccess()) {
+ cqi->setState(ConnectionQueueItem::State::WAITING);
+ cqi->setErrors(connectResult.getIsProtocolError() ? -1 : (cqi->getErrors() + 1)); // protocol error
+ dcassert(!connectResult.getError().empty());
+ fire(ConnectionManagerListener::Failed(), cqi, connectResult.getError());
return false;
}
- //count the running MCN connections
- int running = 0;
- for(const auto& cqi: downloads) {
- if (cqi->getUser() == aCQI->getUser() && cqi->getDownloadType() != ConnectionQueueItem::DownloadType::SMALL_CONF && !cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
- if (!cqi->isActive()) {
- return false;
- }
- running++;
+ // Success
+ cqi->setHubUrl(connectResult.getHubHint());
+ fire(ConnectionManagerListener::Connecting(), cqi);
+ return true;
+}
+
+void ConnectionManager::onDownloadRunning(const UserConnection *aSource) noexcept {
+ {
+ RLock l(cs);
+ auto cqi = findDownloadUnsafe(aSource);
+ if (!cqi || cqi->isSet(ConnectionQueueItem::FLAG_RUNNING)) {
+ return;
}
- }
- if (running > 0 && aCQI->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL_CONF)
- return false;
+ cqi->setFlag(ConnectionQueueItem::FLAG_RUNNING);
+ if (!cqi->isMcn()) {
+ return;
+ }
- if (!aCQI->allowNewConnections(running) && !aCQI->isSet(ConnectionQueueItem::FLAG_REMOVE))
- return false;
+ if (!allowNewMCNUnsafe(cqi->getUser(), false)) {
+ dcdebug("ConnectionManager::addRunningMCN: can't add new connections for user %s, conn %s (small slot: %s)\n", ClientManager::getInstance()->getFormatedNicks(aSource->getHintedUser()).c_str(), cqi->getToken().c_str(), cqi->isSmallSlot() ? "true" : "false");
+ return;
+ }
+ }
- return true;
+ createNewMCN(aSource->getHintedUser());
}
void ConnectionManager::createNewMCN(const HintedUser& aUser) noexcept {
- auto runningBundles = DownloadManager::getInstance()->getRunningBundles();
+ auto result = QueueManager::getInstance()->startDownload(aUser, QueueDownloadType::MCN_NORMAL);
+ if (!result.hasDownload) {
+ dcdebug("ConnectionManager::createNewMCN: no downloads from user %s (type normal)\n", ClientManager::getInstance()->getFormatedNicks(aUser).c_str());
+ return;
+ }
- string lastError;
- auto start = QueueManager::getInstance()->startDownload(aUser, runningBundles,
- ClientManager::getInstance()->getHubSet(aUser.user->getCID()), QueueItem::TYPE_MCN_NORMAL, 0, lastError); // don't overlap...
- if (start) {
+ {
WLock l (cs);
- ConnectionQueueItem* cqiNew = getCQI(aUser, CONNECTION_TYPE_DOWNLOAD);
- cqiNew->setDownloadType(ConnectionQueueItem::DownloadType::MCN_NORMAL);
+ auto cqiNew = getCQIUnsafe(aUser, CONNECTION_TYPE_DOWNLOAD);
+ cqiNew->setDownloadType(QueueDownloadType::MCN_NORMAL);
+
+ dcdebug("ConnectionManager::createNewMCN: creating new connection for user %s\n", ClientManager::getInstance()->getFormatedNicks(aUser).c_str());
}
}
+#define MAX_UC_INACTIVITY_SECONDS 180
void ConnectionManager::on(TimerManagerListener::Minute, uint64_t aTick) noexcept {
WLock l(cs);
for (auto i = removedDownloadTokens.begin(); i != removedDownloadTokens.end();) {
@@ -446,12 +501,13 @@ void ConnectionManager::on(TimerManagerListener::Minute, uint64_t aTick) noexcep
for(auto j: userConnections) {
if (j->isSet(UserConnection::FLAG_PM)) {
//Send a write check to the socket to detect half connected state, a good interval?
- if ((j->getLastActivity() + 180 * 1000) < aTick) {
+ if ((j->getLastActivity() + MAX_UC_INACTIVITY_SECONDS * 1000) < aTick) {
AdcCommand c(AdcCommand::CMD_PMI);
c.addParam("\n");
j->send(c);
}
- } else if ((j->getLastActivity() + 180 * 1000) < aTick) {
+ } else if ((j->getLastActivity() + MAX_UC_INACTIVITY_SECONDS * 1000) < aTick) {
+ dcdebug("ConnectionManager::timer: disconnecting an inactive connection %s for user %s\n", j->getToken().c_str(), ClientManager::getInstance()->getFormatedNicks(j->getHintedUser()).c_str());
j->disconnect(true);
}
}
@@ -754,7 +810,7 @@ void ConnectionManager::on(UserConnectionListener::MyNick, UserConnection* aSour
RLock l(cs);
for(auto cqi: downloads) {
cqi->setErrors(0);
- if((cqi->getState() == ConnectionQueueItem::CONNECTING || cqi->getState() == ConnectionQueueItem::WAITING) &&
+ if(!cqi->isActive() &&
cqi->getUser().user->getCID() == cid)
{
aSource->setUser(cqi->getUser());
@@ -855,8 +911,9 @@ void ConnectionManager::addPMConnection(UserConnection* uc) noexcept {
if (i == container.end()) {
dcassert(!uc->getToken().empty());
uc->setFlag(UserConnection::FLAG_ASSOCIATED);
- auto cqi = getCQI(uc->getHintedUser(), CONNECTION_TYPE_PM, uc->getToken());
- cqi->setState(ConnectionQueueItem::ACTIVE);
+
+ auto cqi = getCQIUnsafe(uc->getHintedUser(), CONNECTION_TYPE_PM, uc->getToken());
+ cqi->setState(ConnectionQueueItem::State::ACTIVE);
fire(ConnectionManagerListener::Connected(), cqi, uc);
@@ -864,6 +921,7 @@ void ConnectionManager::addPMConnection(UserConnection* uc) noexcept {
return;
}
}
+
putConnection(uc);
}
@@ -873,27 +931,25 @@ void ConnectionManager::addDownloadConnection(UserConnection* uc) noexcept {
bool addConn = false;
{
RLock l(cs);
- auto i = uc->isMCN() ? std::find(downloads.begin(), downloads.end(), uc->getToken()) : std::find(downloads.begin(), downloads.end(), uc->getUser());
- if(i != downloads.end()) {
- ConnectionQueueItem* cqi = *i;
- if(cqi->getState() == ConnectionQueueItem::WAITING || cqi->getState() == ConnectionQueueItem::CONNECTING) {
- cqi->setState(ConnectionQueueItem::ACTIVE);
- if (uc->isMCN()) {
- if (cqi->isSmallSlot()) {
- uc->setFlag(UserConnection::FLAG_SMALL_SLOT);
- cqi->setDownloadType(ConnectionQueueItem::DownloadType::SMALL_CONF);
- } else {
- cqi->setDownloadType(ConnectionQueueItem::DownloadType::MCN_NORMAL);
- }
+ auto cqi = findDownloadUnsafe(uc);
+ if (cqi && !cqi->isActive()) {
+ cqi->setState(ConnectionQueueItem::State::ACTIVE);
+ if (uc->isMCN()) {
+ if (cqi->isSmallSlot()) {
+ uc->setFlag(UserConnection::FLAG_SMALL_SLOT);
+ } else {
+ cqi->setDownloadType(QueueDownloadType::MCN_NORMAL);
}
- uc->setToken(cqi->getToken()); // sync for NMDC users
- uc->setHubUrl(cqi->getHubUrl()); //set the correct hint for the uc, it might not even have a hint at first.
- uc->setFlag(UserConnection::FLAG_ASSOCIATED);
- fire(ConnectionManagerListener::Connected(), cqi, uc);
- dcdebug("ConnectionManager::addDownloadConnection, leaving to downloadmanager\n");
- addConn = true;
+ cqi->setFlag(ConnectionQueueItem::FLAG_MCN);
}
+
+ uc->setToken(cqi->getToken()); // sync for NMDC users
+ uc->setHubUrl(cqi->getHubUrl()); //set the correct hint for the uc, it might not even have a hint at first.
+ uc->setFlag(UserConnection::FLAG_ASSOCIATED);
+ fire(ConnectionManagerListener::Connected(), cqi, uc);
+ dcdebug("ConnectionManager::addDownloadConnection, leaving to downloadmanager\n");
+ addConn = true;
}
}
@@ -910,9 +966,9 @@ void ConnectionManager::addUploadConnection(UserConnection* uc) noexcept {
{
WLock l(cs);
- auto &uploads = cqis[CONNECTION_TYPE_UPLOAD];
+ auto& uploads = cqis[CONNECTION_TYPE_UPLOAD];
if (!uc->isMCN() && find(uploads.begin(), uploads.end(), uc->getUser()) != uploads.end()) {
- //one connection per CID for non-mcn users
+ // One connection per CID for non-mcn users
allowAdd = false;
}
@@ -921,8 +977,8 @@ void ConnectionManager::addUploadConnection(UserConnection* uc) noexcept {
if (allowAdd) {
uc->setFlag(UserConnection::FLAG_ASSOCIATED);
- auto cqi = getCQI(uc->getHintedUser(), CONNECTION_TYPE_UPLOAD, uc->getToken());
- cqi->setState(ConnectionQueueItem::ACTIVE);
+ auto cqi = getCQIUnsafe(uc->getHintedUser(), CONNECTION_TYPE_UPLOAD, uc->getToken());
+ cqi->setState(ConnectionQueueItem::State::ACTIVE);
fire(ConnectionManagerListener::Connected(), cqi, uc);
}
}
@@ -986,11 +1042,13 @@ void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCo
fail(AdcCommand::ERROR_GENERIC, "Connection not expected");
return;
}
+
+ // Hub URL
aSource->setHubUrl(i.second);
+ // User
auto user = ClientManager::getInstance()->findUser(CID(i.first));
aSource->setUser(user);
-
if (!aSource->getUser()) {
dcdebug("CM::onINF: User not found");
fail(AdcCommand::ERROR_GENERIC, "User not found");
@@ -1033,11 +1091,11 @@ void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCo
RLock l(cs);
auto i = find(downloads.begin(), downloads.end(), token);
if (i != downloads.end()) {
- ConnectionQueueItem* cqi = *i;
- if(aSource->isMCN()) {
+ auto cqi = *i;
+ if (aSource->isMCN()) {
string slots;
if (cmd.getParam("CO", 0, slots)) {
- cqi->setMaxConns(static_cast(Util::toInt(slots)));
+ cqi->setMaxRemoteConns(static_cast(Util::toInt(slots)));
}
}
cqi->setErrors(0);
@@ -1077,11 +1135,17 @@ void ConnectionManager::on(AdcCommand::INF, UserConnection* aSource, const AdcCo
}
void ConnectionManager::force(const string& aToken) noexcept {
+ if (DownloadManager::getInstance()->checkIdle(aToken)) {
+ dcdebug("ConnectionManager::force: idler %s\n", aToken.c_str());
+ return;
+ }
+
RLock l(cs);
auto i = find(downloads.begin(), downloads.end(), aToken);
if (i != downloads.end()) {
fire(ConnectionManagerListener::Forced(), *i);
(*i)->setLastAttempt(0);
+ dcdebug("ConnectionManager::force: download %s\n", aToken.c_str());
}
}
@@ -1106,67 +1170,89 @@ void ConnectionManager::failDownload(const string& aToken, const string& aError,
}
auto cqi = *i;
- if (cqi->getState() == ConnectionQueueItem::WAITING)
- return;
-
+ if (cqi->isMcn()) {
+ removeExtraMCNUnsafe(cqi);
- if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::MCN_NORMAL && !cqi->isSet(ConnectionQueueItem::FLAG_REMOVE)) {
- // Remove an existing waiting item (if exists)
- auto s = find_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* c) {
- return c->getUser() == cqi->getUser() && !c->isSmallSlot() &&
- !c->isActive() && c != cqi && !c->isSet(ConnectionQueueItem::FLAG_REMOVE);
- });
-
- if (s != downloads.end()) {
- // (*s)->setFlag(ConnectionQueueItem::FLAG_REMOVE);
- dcdebug("ConnectionManager::failDownload: removing an existing inactive CQI %s\n", (*s)->getToken().c_str());
- putCQI(*s);
+ if (cqi->isSmallSlot() && cqi->getState() == ConnectionQueueItem::State::ACTIVE) {
+ // Small slot item that was never used for downloading anything? Check if we have normal files to download
+ if (allowNewMCNUnsafe(cqi->getUser(), false)) {
+ mcnUser = cqi->getUser();
+ }
}
- }
-
- if (cqi->getDownloadType() == ConnectionQueueItem::DownloadType::SMALL_CONF && cqi->getState() == ConnectionQueueItem::ACTIVE) {
- //small slot item that was never used for downloading anything? check if we have normal files to download
- if (allowNewMCN(cqi))
- mcnUser = cqi->getUser();
}
- cqi->setState(ConnectionQueueItem::WAITING);
+ if (cqi->getState() != ConnectionQueueItem::State::WAITING) {
+ cqi->setState(ConnectionQueueItem::State::WAITING);
+
+ cqi->setErrors(aFatalError ? -1 : (cqi->getErrors() + 1));
+ cqi->setLastAttempt(GET_TICK());
+ }
- cqi->setErrors(aFatalError ? -1 : (cqi->getErrors() + 1));
- cqi->setLastAttempt(GET_TICK());
+ cqi->unsetFlag(ConnectionQueueItem::FLAG_RUNNING);
fire(ConnectionManagerListener::Failed(), cqi, aError);
}
- if (mcnUser)
+ if (mcnUser) {
createNewMCN(*mcnUser);
+ }
+}
+
+void ConnectionManager::onIdle(UserConnection* aSource) noexcept {
+ WLock l(cs); //this may flag other user connections as removed which would possibly cause threading issues
+ auto cqi = findDownloadUnsafe(aSource);
+ if (!cqi || !cqi->isSet(ConnectionQueueItem::FLAG_RUNNING)) {
+ return;
+ }
+
+ cqi->unsetFlag(ConnectionQueueItem::FLAG_RUNNING);
+ removeExtraMCNUnsafe(cqi);
+}
+
+void ConnectionManager::removeExtraMCNUnsafe(const ConnectionQueueItem* aFailedCQI) noexcept {
+ if (!aFailedCQI->isMcn()) {
+ return;
+ }
+
+ if (aFailedCQI->getDownloadType() != QueueDownloadType::MCN_NORMAL) {
+ return;
+ }
+
+ // Remove an existing waiting item (if exists)
+ auto s = find_if(downloads.begin(), downloads.end(), [&](const ConnectionQueueItem* c) {
+ return c->getUser() == aFailedCQI->getUser() && !c->isSmallSlot() &&
+ !c->isActive() && c != aFailedCQI;
+ });
+
+ if (s != downloads.end()) {
+ // (*s)->setFlag(ConnectionQueueItem::FLAG_REMOVE);
+ dcdebug("ConnectionManager::disconnectExtraMCN: removing an existing inactive MCN item %s\n", (*s)->getToken().c_str());
+ putCQIUnsafe(*s);
+ }
+}
+
+ConnectionQueueItem* ConnectionManager::findDownloadUnsafe(const UserConnection* aSource) noexcept {
+ // Token may not be synced for NMDC users
+ auto i = aSource->isMCN() ? std::find(downloads.begin(), downloads.end(), aSource->getToken()) : std::find(downloads.begin(), downloads.end(), aSource->getUser());
+ if (i == downloads.end()) {
+ return nullptr;
+ }
+
+ return *i;
+}
+
+void ConnectionManager::on(UserConnectionListener::State, UserConnection* aSource) noexcept {
+ if (aSource->getState() == UserConnection::STATE_IDLE) {
+ onIdle(aSource);
+ } else if (aSource->isSet(UserConnection::FLAG_DOWNLOAD) && aSource->getState() == UserConnection::STATE_RUNNING) {
+ onDownloadRunning(aSource);
+ }
}
void ConnectionManager::failed(UserConnection* aSource, const string& aError, bool aProtocolError) noexcept {
if(aSource->isSet(UserConnection::FLAG_ASSOCIATED)) {
if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) {
- if (aSource->getState() == UserConnection::STATE_IDLE) {
- // don't remove the CQI if we are only out of downloading slots
-
- bool allowChange = false, hasDownload = false;
- QueueToken unusedBundleToken = 0;
- string unusedHubUrl, lastError;
- QueueManager::getInstance()->startDownload(aSource->getHintedUser(), unusedHubUrl,
- aSource->isSet(UserConnection::FLAG_SMALL_SLOT) ? QueueItem::TYPE_SMALL : QueueItem::TYPE_ANY,
- unusedBundleToken, allowChange, hasDownload, lastError);
-
- if (hasDownload) {
- failDownload(aSource->getToken(), lastError, aProtocolError);
- } else {
- WLock l(cs);
- auto i = find(downloads.begin(), downloads.end(), aSource->getToken());
- dcassert(i != downloads.end());
- if (i != downloads.end()) {
- putCQI(*i);
- }
- }
- } else {
- failDownload(aSource->getToken(), aError, aProtocolError);
- }
+ failDownload(aSource->getToken(), aError, aProtocolError);
+ dcdebug("ConnectionManager::failed: download %s failed\n", aSource->getToken().c_str());
} else {
auto type = aSource->isSet(UserConnection::FLAG_UPLOAD) ? CONNECTION_TYPE_UPLOAD :
@@ -1178,7 +1264,7 @@ void ConnectionManager::failed(UserConnection* aSource, const string& aError, bo
auto i = type == CONNECTION_TYPE_PM ? find(container.begin(), container.end(), aSource->getUser()) :
find(container.begin(), container.end(), aSource->getToken());
dcassert(i != container.end());
- putCQI(*i);
+ putCQIUnsafe(*i);
}
}
}
diff --git a/airdcpp/ConnectionManager.h b/airdcpp/ConnectionManager.h
index de652623..ea75a146 100644
--- a/airdcpp/ConnectionManager.h
+++ b/airdcpp/ConnectionManager.h
@@ -28,6 +28,7 @@
#include "CriticalSection.h"
#include "FloodCounter.h"
#include "HintedUser.h"
+#include "QueueDownloadInfo.h"
#include "Singleton.h"
#include "UserConnection.h"
@@ -54,33 +55,26 @@ class ConnectionQueueItem : boost::noncopyable, public Flags {
typedef vector List;
typedef List::const_iterator Iter;
- enum State {
+ enum class State {
CONNECTING, // Recently sent request to connect
WAITING, // Waiting to send request to connect
ACTIVE, // In one up/downmanager
- RUNNING // Running/idle
};
enum Flags {
- FLAG_REMOVE = 0x01
- };
-
- enum class DownloadType {
- ANY,
- SMALL,
- SMALL_CONF,
- MCN_NORMAL
+ FLAG_MCN = 0x02,
+ FLAG_RUNNING = 0x04,
};
ConnectionQueueItem(const HintedUser& aUser, ConnectionType aConntype, const string& aToken);
GETSET(string, token, Token);
- IGETSET(DownloadType, downloadType, DownloadType, DownloadType::ANY);
+ IGETSET(QueueDownloadType, downloadType, DownloadType, QueueDownloadType::ANY);
GETSET(string, lastBundle, LastBundle);
IGETSET(uint64_t, lastAttempt, LastAttempt, 0);
IGETSET(int, errors, Errors, 0); // Number of connection errors, or -1 after a protocol error
- IGETSET(State, state, State, WAITING);
- IGETSET(uint8_t, maxConns, MaxConns, 0);
+ IGETSET(State, state, State, State::WAITING);
+ IGETSET(uint8_t, maxRemoteConns, MaxRemoteConns, 0);
GETSET(ConnectionType, connType, ConnType);
const string& getHubUrl() const noexcept { return user.hint; }
@@ -88,17 +82,14 @@ class ConnectionQueueItem : boost::noncopyable, public Flags {
const HintedUser& getUser() const noexcept { return user; }
bool allowNewConnections(int running) const noexcept;
- bool isSmallSlot() const noexcept {
- return downloadType == DownloadType::SMALL_CONF || downloadType == DownloadType::SMALL;
- }
+ bool isSmallSlot() const noexcept;
+ bool isActive() const noexcept;
+ bool isMcn() const noexcept;
- bool isActive() const noexcept {
- return state == ACTIVE || state == RUNNING;
- }
+ bool allowConnect(int aAttempts, int aAttemptLimit, uint64_t aTick) const noexcept;
+ bool isTimeout(uint64_t aTick) const noexcept;
- bool isMcn() const noexcept {
- return downloadType == DownloadType::SMALL_CONF || downloadType == DownloadType::MCN_NORMAL;
- }
+ void resetFatalError() noexcept;
private:
HintedUser user;
};
@@ -174,8 +165,6 @@ class ConnectionManager : public Speaker, public Clie
const string& getPort() const noexcept;
const string& getSecurePort() const noexcept;
- void addRunningMCN(const UserConnection *aSource) noexcept;
-
// set fatalError to true if the client shouldn't try to reconnect automatically
void failDownload(const string& aToken, const string& aError, bool aFatalError) noexcept;
@@ -190,9 +179,19 @@ class ConnectionManager : public Speaker, public Clie
private:
FloodCounter floodCounter;
- bool allowNewMCN(const ConnectionQueueItem* aCQI) noexcept;
+ typedef std::function ConnectionQueueItemCallback;
+
+ // Can we create a new regular MCN connection?
+ bool allowNewMCNUnsafe(const UserPtr& aUser, bool aSmallSlot, ConnectionQueueItemCallback&& aWaitingCallback = nullptr) noexcept;
+
+ // Create a new regular MCN connection
void createNewMCN(const HintedUser& aUser) noexcept;
+ // Remove an extra waiting MCN connection (we should keep only one waiting connection)
+ void removeExtraMCNUnsafe(const ConnectionQueueItem* aFailedCQI) noexcept;
+
+ void onDownloadRunning(const UserConnection* aSource) noexcept;
+
class Server : public Thread {
public:
Server(bool secure, const string& port_, const string& ipv4, const string& ipv6);
@@ -247,8 +246,8 @@ class ConnectionManager : public Speaker, public Clie
void addDownloadConnection(UserConnection* uc) noexcept;
void addPMConnection(UserConnection* uc) noexcept;
- ConnectionQueueItem* getCQI(const HintedUser& aUser, ConnectionType aConnType, const string& aToken = Util::emptyString) noexcept;
- void putCQI(ConnectionQueueItem* cqi) noexcept;
+ ConnectionQueueItem* getCQIUnsafe(const HintedUser& aUser, ConnectionType aConnType, const string& aToken = Util::emptyString) noexcept;
+ void putCQIUnsafe(ConnectionQueueItem* cqi) noexcept;
void accept(const Socket& sock, bool aSecure) noexcept;
@@ -268,6 +267,8 @@ class ConnectionManager : public Speaker, public Clie
void on(UserConnectionListener::MyNick, UserConnection*, const string&) noexcept override;
void on(UserConnectionListener::Supports, UserConnection*, const StringList&) noexcept override;
void on(UserConnectionListener::UserSet, UserConnection*) noexcept override;
+ // void on(UserConnectionListener::Idle, UserConnection*) noexcept override;
+ void on(UserConnectionListener::State, UserConnection*) noexcept override;
void on(AdcCommand::SUP, UserConnection*, const AdcCommand&) noexcept override;
void on(AdcCommand::INF, UserConnection*, const AdcCommand&) noexcept override;
@@ -282,8 +283,14 @@ class ConnectionManager : public Speaker, public Clie
void on(ClientManagerListener::UserDisconnected, const UserPtr& aUser, bool) noexcept override { onUserUpdated(aUser); }
void onUserUpdated(const UserPtr& aUser) noexcept;
+ void onIdle(UserConnection* aSource) noexcept;
void attemptDownloads(uint64_t aTick, StringList& removedTokens_) noexcept;
+ bool attemptDownloadUnsafe(ConnectionQueueItem* cqi, StringList& removedTokens_) noexcept;
+ bool connectUnsafe(ConnectionQueueItem* cqi, bool aAllowUrlChange) noexcept;
+
+ ConnectionQueueItem* findDownloadUnsafe(const UserConnection* aSource) noexcept;
+
StringList getAdcFeatures() const noexcept;
};
diff --git a/airdcpp/DirectoryListingManager.h b/airdcpp/DirectoryListingManager.h
index 13f06de0..327b10ef 100644
--- a/airdcpp/DirectoryListingManager.h
+++ b/airdcpp/DirectoryListingManager.h
@@ -94,14 +94,13 @@ namespace dcpp {
/** Lists open in the client **/
DirectoryListingMap viewedLists;
- void on(QueueManagerListener::ItemAdded, const QueueItemPtr& aQI) noexcept;
- void on(QueueManagerListener::ItemFinished, const QueueItemPtr& qi, const string& dir, const HintedUser& aUser, int64_t aSpeed) noexcept;
- void on(QueueManagerListener::ItemRemoved, const QueueItemPtr& qi, bool finished) noexcept;
+ void on(QueueManagerListener::ItemAdded, const QueueItemPtr& aQI) noexcept override;
+ void on(QueueManagerListener::ItemFinished, const QueueItemPtr& qi, const string& dir, const HintedUser& aUser, int64_t aSpeed) noexcept override;
+ void on(QueueManagerListener::ItemRemoved, const QueueItemPtr& qi, bool finished) noexcept override;
- void on(QueueManagerListener::PartialListFinished, const HintedUser& aUser, const string& aXml, const string& aBase) noexcept;
+ void on(QueueManagerListener::PartialListFinished, const HintedUser& aUser, const string& aXml, const string& aBase) noexcept override;
-
- void on(TimerManagerListener::Minute, uint64_t aTick) noexcept;
+ void on(TimerManagerListener::Minute, uint64_t aTick) noexcept override;
};
}
diff --git a/airdcpp/Download.cpp b/airdcpp/Download.cpp
index 28a92747..d8356678 100644
--- a/airdcpp/Download.cpp
+++ b/airdcpp/Download.cpp
@@ -36,6 +36,7 @@ namespace dcpp {
Download::Download(UserConnection& conn, QueueItem& qi) noexcept : Transfer(conn, qi.getTarget(), qi.getTTH()),
tempTarget(qi.getTempTarget()), listDirectoryPath(qi.isFilelist() ? qi.getListDirectoryPath() : Util::emptyString)
{
+ dcassert(!conn.getDownload());
conn.setDownload(this);
QueueItem::SourceConstIter source = qi.getSource(getUser());
@@ -101,7 +102,9 @@ Download::Download(UserConnection& conn, QueueItem& qi) noexcept : Transfer(conn
}
Download::~Download() {
- getUserConnection().setDownload(0);
+ dcassert(getUserConnection().getDownload() == this);
+ // dcdebug("Deleting download %s\n", getToken().c_str());
+ getUserConnection().setDownload(nullptr);
}
string Download::getBundleStringToken() const noexcept {
@@ -115,6 +118,17 @@ bool Download::operator==(const Download* d) const {
return compare(getToken(), d->getToken()) == 0;
}
+void Download::flush() noexcept {
+ if (getOutput()) {
+ if (getActual() > 0) {
+ try {
+ getOutput()->flushBuffers(false);
+ } catch (const Exception&) {
+ }
+ }
+ }
+}
+
void Download::appendFlags(OrderedStringSet& flags_) const noexcept {
if (isSet(Download::FLAG_PARTIAL)) {
flags_.insert("P");
diff --git a/airdcpp/Download.h b/airdcpp/Download.h
index 2a303adc..6326ef13 100644
--- a/airdcpp/Download.h
+++ b/airdcpp/Download.h
@@ -94,6 +94,7 @@ class Download : public Transfer, public Flags {
string getBundleStringToken() const noexcept;
void appendFlags(OrderedStringSet& flags_) const noexcept;
+ void flush() noexcept;
private:
Download(const Download&);
Download& operator=(const Download&) = delete;
diff --git a/airdcpp/DownloadManager.cpp b/airdcpp/DownloadManager.cpp
index c2150577..edfa94ea 100644
--- a/airdcpp/DownloadManager.cpp
+++ b/airdcpp/DownloadManager.cpp
@@ -20,7 +20,6 @@
#include "DownloadManager.h"
#include "ClientManager.h"
-#include "ConnectionManager.h"
#include "Download.h"
#include "LogManager.h"
#include "QueueManager.h"
@@ -31,12 +30,6 @@
#include
#include
-
-// some strange mac definition
-#ifdef ff
-#undef ff
-#endif
-
namespace dcpp {
static const string DOWNLOAD_AREA = "Downloads";
@@ -150,16 +143,26 @@ void DownloadManager::on(TimerManagerListener::Second, uint64_t aTick) noexcept
QueueManager::getInstance()->handleSlowDisconnect(dtp.user, dtp.target, dtp.bundle);
}
-bool DownloadManager::checkIdle(const UserPtr& aUser, bool aSmallSlot, bool aReportOnly) {
+bool DownloadManager::checkIdle(const string& aToken) {
+ RLock l(cs);
+ for (auto uc : idlers) {
+ if (uc->getToken() == aToken) {
+ uc->callAsync([this, uc] { revive(uc); });
+ return true;
+ }
+ }
+ return false;
+}
+
+bool DownloadManager::checkIdle(const UserPtr& aUser, bool aSmallSlot) {
RLock l(cs);
for (auto uc: idlers) {
if (uc->getUser() == aUser) {
if (aSmallSlot != uc->isSet(UserConnection::FLAG_SMALL_SLOT) && uc->isMCN())
continue;
- if (!aReportOnly)
- uc->callAsync([this, uc] { revive(uc); });
- //dcdebug("uc updated");
+
+ uc->callAsync([this, uc] { revive(uc); });
return true;
}
}
@@ -178,18 +181,19 @@ void DownloadManager::revive(UserConnection* uc) {
checkDownloads(uc);
}
-void DownloadManager::addConnection(UserConnection* conn) {
- if (!conn->isSet(UserConnection::FLAG_SUPPORTS_TTHF) || !conn->isSet(UserConnection::FLAG_SUPPORTS_ADCGET)) {
+void DownloadManager::addConnection(UserConnection* aSource) {
+ if (!aSource->isSet(UserConnection::FLAG_SUPPORTS_TTHF) || !aSource->isSet(UserConnection::FLAG_SUPPORTS_ADCGET)) {
// Can't download from these...
- conn->getUser()->setFlag(User::OLD_CLIENT);
- QueueManager::getInstance()->removeSource(conn->getUser(), QueueItem::Source::FLAG_NO_TTHF);
- conn->disconnect();
+ aSource->getUser()->setFlag(User::OLD_CLIENT);
+ QueueManager::getInstance()->removeSource(aSource->getUser(), QueueItem::Source::FLAG_NO_TTHF);
+ dcdebug("DownloadManager::addConnection: outdated user (%s)\n", aSource->getToken().c_str());
+ disconnect(aSource);
return;
}
- conn->addListener(this);
- checkDownloads(conn);
+ aSource->addListener(this);
+ checkDownloads(aSource);
}
QueueTokenSet DownloadManager::getRunningBundles(bool aIgnoreHighestPrio) const noexcept {
@@ -225,52 +229,38 @@ void DownloadManager::checkDownloads(UserConnection* aConn) {
//We may have download assigned for a connection if we are downloading in segments
//dcassert(!aConn->getDownload() || aConn->getDownload()->isSet(Download::FLAG_CHUNKED));
- QueueItemBase::DownloadType dlType = QueueItemBase::TYPE_ANY;
- if (aConn->isSet(UserConnection::FLAG_SMALL_SLOT)) {
- dlType = QueueItemBase::TYPE_SMALL;
- } else if (aConn->isMCN()) {
- dlType = QueueItemBase::TYPE_MCN_NORMAL;
- }
-
auto hubs = ClientManager::getInstance()->getHubSet(aConn->getUser()->getCID());
- //always make sure that the current hub is also compared even if it is offline
+ // always make sure that the current hub is also compared even if it is offline
hubs.insert(aConn->getHubUrl());
- string errorMessage;
auto runningBundles = getRunningBundles();
- bool start = QueueManager::getInstance()->startDownload(aConn->getHintedUser(), runningBundles, hubs, dlType, aConn->getSpeed(), errorMessage);
-
- // not a finished download?
- if(!start && aConn->getState() != UserConnection::STATE_RUNNING) {
- // removeRunningUser(aConn);
- failDownload(aConn, Util::emptyString, false);
- return;
- }
- string newUrl;
- Download* d = nullptr;
+ auto result = QueueManager::getInstance()->getDownload(*aConn, runningBundles, hubs);
- if (start)
- d = QueueManager::getInstance()->getDownload(*aConn, runningBundles, hubs, errorMessage, newUrl, dlType);
+ // Nothing to download? Skip finished download connections as they should be added in idlers
+ if (!result.hasDownload) {
+ dcdebug("DownloadManager::checkDownloads: no downloads from user %s (small slot: %s)\n", ClientManager::getInstance()->getFormatedNicks(aConn->getHintedUser()).c_str(), aConn->isSet(UserConnection::FLAG_SMALL_SLOT) ? "true" : "false");
+ if (aConn->getState() != UserConnection::STATE_RUNNING) {
+ failDownload(aConn, Util::emptyString, false);
+ return;
+ }
+ }
- if(!d) {
- aConn->unsetFlag(UserConnection::FLAG_RUNNING);
- if(!errorMessage.empty()) {
- fire(DownloadManagerListener::Status(), aConn, errorMessage);
+ auto d = result.download;
+ if (!d) {
+ if (result.hasDownload) {
+ dcdebug("DownloadManager::checkDownloads: can't start download from user %s (%s)\n", ClientManager::getInstance()->getFormatedNicks(aConn->getHintedUser()).c_str(), result.lastError.c_str());
}
- if (!checkIdle(aConn->getUser(), aConn->isSet(UserConnection::FLAG_SMALL_SLOT), true)) {
- aConn->setState(UserConnection::STATE_IDLE);
- fire(DownloadManagerListener::Idle(), aConn);
+ aConn->setState(UserConnection::STATE_IDLE);
+ fire(DownloadManagerListener::Idle(), aConn, result.lastError);
- {
- WLock l(cs);
- idlers.push_back(aConn);
- }
- } else {
- aConn->disconnect(true);
+ {
+ WLock l(cs);
+ idlers.push_back(aConn);
}
+
return;
}
@@ -281,8 +271,8 @@ void DownloadManager::checkDownloads(UserConnection* aConn) {
*/
string mySID;
- if(!aConn->getUser()->isNMDC()) {
- mySID = ClientManager::getInstance()->findMySID(aConn->getUser(), newUrl, false); //no fallback, keep the old hint even if the hub is offline
+ if (!aConn->getUser()->isNMDC()) {
+ mySID = ClientManager::getInstance()->findMySID(aConn->getUser(), result.hubHint, false); //no fallback, keep the old hint even if the hub is offline
}
aConn->setState(UserConnection::STATE_SND);
@@ -300,15 +290,16 @@ void DownloadManager::checkDownloads(UserConnection* aConn) {
}
}
- dcdebug("Requesting " I64_FMT "/" I64_FMT "\n", d->getStartPos(), d->getSegmentSize());
+ dcdebug("DownloadManager::checkDownloads: requesting " I64_FMT "/" I64_FMT " (connection %s)\n", d->getStartPos(), d->getSegmentSize(), d->getToken().c_str());
//only update the hub if it has been changed
- if (compare(newUrl, aConn->getHubUrl()) == 0) {
+ if (compare(result.hubHint, aConn->getHubUrl()) == 0) {
mySID.clear();
- } else if (!newUrl.empty()) {
- aConn->setHubUrl(newUrl);
+ } else if (!result.hubHint.empty()) {
+ aConn->setHubUrl(result.hubHint);
}
+ dcassert(aConn->getDownload());
fire(DownloadManagerListener::Requesting(), d, !mySID.empty());
aConn->send(d->getCommand(aConn->isSet(UserConnection::FLAG_SUPPORTS_ZLIB_GET), mySID));
}
@@ -320,7 +311,8 @@ void DownloadManager::on(AdcCommand::SND, UserConnection* aSource, const AdcComm
}
if(!aSource->getDownload()) {
- aSource->disconnect(true);
+ dcdebug("DownloadManager::AdcCommand::SND: no download (%s)\n", aSource->getToken().c_str());
+ disconnect(aSource, true);
return;
}
@@ -333,7 +325,8 @@ void DownloadManager::on(AdcCommand::SND, UserConnection* aSource, const AdcComm
if(type != Transfer::names[aSource->getDownload()->getType()]) {
// Uhh??? We didn't ask for this...
- aSource->disconnect();
+ dcdebug("DownloadManager::AdcCommand::SND: transfer type mismatch (%s)\n", aSource->getToken().c_str());
+ disconnect(aSource);
return;
}
@@ -344,7 +337,7 @@ void DownloadManager::startData(UserConnection* aSource, int64_t start, int64_t
Download* d = aSource->getDownload();
dcassert(d);
- dcdebug("Preparing " I64_FMT ":" I64_FMT ", " I64_FMT ":" I64_FMT"\n", d->getStartPos(), start, d->getSegmentSize(), bytes);
+ dcdebug("DownloadManager::startData: preparing " I64_FMT ":" I64_FMT ", " I64_FMT ":" I64_FMT"\n", d->getStartPos(), start, d->getSegmentSize(), bytes);
if (d->getSegmentSize() == -1) {
if(bytes >= 0) {
d->setSegmentSize(bytes);
@@ -379,10 +372,6 @@ void DownloadManager::startData(UserConnection* aSource, int64_t start, int64_t
d->setStart(GET_TICK());
d->tick();
- if (!aSource->isSet(UserConnection::FLAG_RUNNING) && aSource->isMCN() && (d->getType() == Download::TYPE_FILE || d->getType() == Download::TYPE_PARTIAL_LIST)) {
- ConnectionManager::getInstance()->addRunningMCN(aSource);
- aSource->setFlag(UserConnection::FLAG_RUNNING);
- }
aSource->setState(UserConnection::STATE_RUNNING);
fire(DownloadManagerListener::Starting(), d);
@@ -401,8 +390,12 @@ void DownloadManager::startData(UserConnection* aSource, int64_t start, int64_t
void DownloadManager::on(UserConnectionListener::Data, UserConnection* aSource, const uint8_t* aData, size_t aLen) noexcept {
Download* d = aSource->getDownload();
- if (!d) //No download but receiving data??
- aSource->disconnect(true);
+ if (!d) {
+ //No download but receiving data??
+ dcassert(0);
+ dcdebug("DownloadManager::UserConnectionListener::Data: no download (%s)\n", aSource->getToken().c_str());
+ disconnect(aSource, true);
+ }
try {
d->addPos(d->getOutput()->write(aData, aLen), aLen);
@@ -444,12 +437,12 @@ void DownloadManager::endData(UserConnection* aSource) {
if(!(d->getTTH() == d->getTigerTree().getRoot())) {
// This tree is for a different file, remove from queue...
- removeDownload(d);
fire(DownloadManagerListener::Failed(), d, STRING(INVALID_TREE));
QueueManager::getInstance()->removeFileSource(d->getPath(), aSource->getUser(), QueueItem::Source::FLAG_BAD_TREE, false);
- QueueManager::getInstance()->putDownloadHooked(d, false);
+ dcdebug("DownloadManager::endData: invalid tree received from user %s (received %s while %s was excpected)\n", ClientManager::getInstance()->getFormatedNicks(d->getHintedUser()).c_str(), d->getTTH().toBase32().c_str(), d->getTigerTree().getRoot().toBase32().c_str());
+ putDownloadHooked(d, false);
checkDownloads(aSource);
return;
}
@@ -458,21 +451,24 @@ void DownloadManager::endData(UserConnection* aSource) {
aSource->setSpeed(static_cast(d->getAverageSpeed()));
aSource->updateChunkSize(d->getTigerTree().getBlockSize(), d->getSegmentSize(), GET_TICK() - d->getStart());
- dcdebug("Download finished: %s, size " I64_FMT ", downloaded " I64_FMT " in " U64_FMT " ms\n", d->getPath().c_str(), d->getSegmentSize(), d->getPos(), GET_TICK() - d->getStart());
+ dcdebug("DownloadManager::endData: %s (connection %s), size " I64_FMT ", downloaded " I64_FMT " in " U64_FMT " ms\n", d->getPath().c_str(), d->getToken().c_str(), d->getSegmentSize(), d->getPos(), GET_TICK() - d->getStart());
}
- removeDownload(d);
-
fire(DownloadManagerListener::Complete(), d, d->getType() == Transfer::TYPE_TREE);
+ putDownloadHooked(d, true);
+ checkDownloads(aSource);
+}
+
+void DownloadManager::putDownloadHooked(Download* aDownload, bool aFinished, bool aNoAccess, bool aRotateQueue) {
+ unique_ptr d(aDownload);
+
+ removeDownload(d.get());
try {
- QueueManager::getInstance()->putDownloadHooked(d, true);
+ QueueManager::getInstance()->putDownloadHooked(d.get(), aFinished, aNoAccess, aRotateQueue);
} catch (const HashException& e) {
- failDownload(aSource, e.getError(), false);
- ConnectionManager::getInstance()->failDownload(aSource->getToken(), e.getError(), true);
+ failDownload(&aDownload->getUserConnection(), e.getError(), false);
return;
}
-
- checkDownloads(aSource);
}
int64_t DownloadManager::getRunningAverage() const {
@@ -508,7 +504,7 @@ void DownloadManager::on(UserConnectionListener::MaxedOut, UserConnection* aSour
void DownloadManager::noSlots(UserConnection* aSource, const string& param) {
if(aSource->getState() != UserConnection::STATE_SND) {
dcdebug("DM::noSlots Bad state, disconnecting\n");
- aSource->disconnect();
+ disconnect(aSource);
return;
}
@@ -527,9 +523,9 @@ void DownloadManager::onFailed(UserConnection* aSource, const string& aError) {
void DownloadManager::failDownload(UserConnection* aSource, const string& aReason, bool aRotateQueue) {
auto d = aSource->getDownload();
if (d) {
- removeDownload(d);
+ dcdebug("DownloadManager::failDownload: %s failed (%s)\n", aSource->getToken().c_str(), aReason.c_str());
fire(DownloadManagerListener::Failed(), d, aReason);
- QueueManager::getInstance()->putDownloadHooked(d, false, false, aRotateQueue);
+ putDownloadHooked(d, false, false, aRotateQueue);
} else {
fire(DownloadManagerListener::Remove(), aSource);
}
@@ -543,16 +539,14 @@ void DownloadManager::removeConnection(UserConnectionPtr aConn) {
aConn->disconnect();
}
+void DownloadManager::disconnect(UserConnectionPtr aConn, bool aGraceless) {
+ dcdebug("DownloadManager::disconnect: %s (graceless: %s)\n", aConn->getToken().c_str(), aGraceless ? "true" : "false");
+ aConn->disconnect(aGraceless);
+}
+
void DownloadManager::removeDownload(Download* d) {
// Write the leftover bytes into file
- if(d->getOutput()) {
- if(d->getActual() > 0) {
- try {
- d->getOutput()->flushBuffers(false);
- } catch(const Exception&) {
- }
- }
- }
+ d->flush();
{
WLock l(cs);
@@ -587,7 +581,7 @@ void DownloadManager::abortDownload(const string& aTarget, const UserPtr& aUser)
continue;
}
}
- dcdebug("Trying to close connection for download %p\n", d);
+ dcdebug("Trying to close connection for download %s\n", d->getToken().c_str());
d->getUserConnection().disconnect(true);
}
}
@@ -595,7 +589,8 @@ void DownloadManager::abortDownload(const string& aTarget, const UserPtr& aUser)
void DownloadManager::on(UserConnectionListener::FileNotAvailable, UserConnection* aSource) noexcept {
if(!aSource->getDownload()) {
- aSource->disconnect(true);
+ dcdebug("DM::FileNotAvailable: no download (%s)\n", aSource->getToken().c_str());
+ disconnect(aSource, true);
return;
}
fileNotAvailable(aSource, false);
@@ -604,20 +599,23 @@ void DownloadManager::on(UserConnectionListener::FileNotAvailable, UserConnectio
/** @todo Handle errors better */
void DownloadManager::on(AdcCommand::STA, UserConnection* aSource, const AdcCommand& cmd) noexcept {
if(cmd.getParameters().size() < 2) {
- aSource->disconnect();
+ dcdebug("DM::AdcCommand::STA: not enough parameters (%s)\n", aSource->getToken().c_str());
+ disconnect(aSource);
return;
}
const string& errorCode = cmd.getParam(0);
const string& errorMessage = cmd.getParam(1);
if(errorCode.length() != 3) {
- aSource->disconnect();
+ dcdebug("DM::AdcCommand::STA: invalid error code (%s)\n", aSource->getToken().c_str());
+ disconnect(aSource);
return;
}
switch(Util::toInt(errorCode.substr(0, 1))) {
case AdcCommand::SEV_FATAL:
- aSource->disconnect();
+ dcdebug("DM::AdcCommand::STA: fatal error (%s)\n", aSource->getToken().c_str());
+ disconnect(aSource);
return;
case AdcCommand::SEV_RECOVERABLE:
switch(Util::toInt(errorCode.substr(1))) {
@@ -642,13 +640,15 @@ void DownloadManager::on(AdcCommand::STA, UserConnection* aSource, const AdcComm
dcdebug("Unknown success message %s %s", errorCode.c_str(), errorMessage.c_str());
return;
}
- aSource->disconnect();
+
+ dcdebug("DM::AdcCommand::STA: disconnecting (%s)\n", aSource->getToken().c_str());
+ disconnect(aSource);
}
void DownloadManager::fileNotAvailable(UserConnection* aSource, bool aNoAccess, const string& aMessage) {
if(aSource->getState() != UserConnection::STATE_SND) {
dcdebug("DM::fileNotAvailable Invalid state, disconnecting");
- aSource->disconnect();
+ disconnect(aSource);
return;
}
@@ -656,8 +656,6 @@ void DownloadManager::fileNotAvailable(UserConnection* aSource, bool aNoAccess,
dcassert(d);
dcdebug("File Not Available: %s\n", d->getPath().c_str());
- removeDownload(d);
-
string error;
if (aNoAccess) {
error = STRING(NO_FILE_ACCESS);
@@ -675,7 +673,7 @@ void DownloadManager::fileNotAvailable(UserConnection* aSource, bool aNoAccess,
QueueManager::getInstance()->removeFileSource(d->getPath(), aSource->getUser(), (Flags::MaskType)(d->getType() == Transfer::TYPE_TREE ? QueueItem::Source::FLAG_NO_TREE : QueueItem::Source::FLAG_FILE_NOT_AVAILABLE), false);
}
- QueueManager::getInstance()->putDownloadHooked(d, false, aNoAccess);
+ putDownloadHooked(d, false, aNoAccess);
checkDownloads(aSource);
}
diff --git a/airdcpp/DownloadManager.h b/airdcpp/DownloadManager.h
index efa6a8ed..ee6591d4 100644
--- a/airdcpp/DownloadManager.h
+++ b/airdcpp/DownloadManager.h
@@ -47,7 +47,8 @@ class DownloadManager : public Speaker,
/** @internal */
void addConnection(UserConnection* conn);
- bool checkIdle(const UserPtr& aUser, bool aSmallSlot, bool aReportOnly = false);
+ bool checkIdle(const UserPtr& aUser, bool aSmallSlot);
+ bool checkIdle(const string& aToken);
/** @internal */
void abortDownload(const string& aTarget, const UserPtr& aUser = nullptr);
@@ -83,11 +84,15 @@ class DownloadManager : public Speaker,
// Bundle::TokenMap bundles;
UserConnectionList idlers;
+ void disconnect(UserConnectionPtr aConn, bool aGraceless = false);
void removeConnection(UserConnectionPtr aConn);
void removeDownload(Download* aDown);
void fileNotAvailable(UserConnection* aSource, bool aNoAccess, const string& aMessage = Util::emptyString);
void noSlots(UserConnection* aSource, const string& param = Util::emptyString);
+
+ void putDownloadHooked(Download* aDownload, bool aFinished, bool aNoAccess = false, bool aRotateQueue = false);
+
void failDownload(UserConnection* aSource, const string& reason, bool rotateQueue);
friend class Singleton;
diff --git a/airdcpp/DownloadManagerListener.h b/airdcpp/DownloadManagerListener.h
index baf2e53a..73205e4d 100644
--- a/airdcpp/DownloadManagerListener.h
+++ b/airdcpp/DownloadManagerListener.h
@@ -84,8 +84,7 @@ class DownloadManagerListener {
* display an error string.
*/
virtual void on(Failed, const Download*, const string&) noexcept { }
- virtual void on(Status, const UserConnection*, const string&) noexcept { }
- virtual void on(Idle, const UserConnection*) noexcept { }
+ virtual void on(Idle, const UserConnection*, const string&) noexcept { }
virtual void on(Remove, const UserConnection*) noexcept { }
};
diff --git a/airdcpp/GetSet.h b/airdcpp/GetSet.h
index 4089f12b..932277be 100644
--- a/airdcpp/GetSet.h
+++ b/airdcpp/GetSet.h
@@ -36,6 +36,15 @@ protected: t name = init; \
public: std::conditional::value, const t&, t>::type get##name2() const noexcept { return name; } \
template void set##name2(GetSetT&& name) { this->name = std::forward(name); }
+
+#define GETPROP(t, name, name2) \
+protected: t name; \
+public: std::conditional::value, const t&, t>::type get##name2() const noexcept { return name; }
+
+#define IGETPROP(t, name, name2, init) \
+protected: t name = init; \
+public: std::conditional::value, const t&, t>::type get##name2() const noexcept { return name; }
+
#else
// This version is for my stupid editor =)
diff --git a/airdcpp/PartialBundleSharingManager.h b/airdcpp/PartialBundleSharingManager.h
index 0d84f312..39f52629 100644
--- a/airdcpp/PartialBundleSharingManager.h
+++ b/airdcpp/PartialBundleSharingManager.h
@@ -27,7 +27,6 @@
#include "CriticalSection.h"
#include "ProtocolCommandManager.h"
#include "Message.h"
-#include "QueueItemBase.h"
namespace dcpp {
diff --git a/airdcpp/PrivateChat.cpp b/airdcpp/PrivateChat.cpp
index 720a9ce7..8c92572f 100644
--- a/airdcpp/PrivateChat.cpp
+++ b/airdcpp/PrivateChat.cpp
@@ -243,28 +243,24 @@ void PrivateChat::close() {
}
void PrivateChat::startCC() {
- bool protocolError;
if (!replyTo.user->isOnline() || ccpmState < DISCONNECTED) {
return;
}
ccpmState = CONNECTING;
- string lastError;
auto token = ConnectionManager::getInstance()->tokens.createToken(CONNECTION_TYPE_PM);
-
- auto newUrl = replyTo.hint;
- bool connecting = ClientManager::getInstance()->connect(replyTo.user, token, true, lastError, newUrl, protocolError, CONNECTION_TYPE_PM);
- if (replyTo.hint != newUrl) {
- setHubUrl(newUrl);
+ auto connectResult = ClientManager::getInstance()->connect(replyTo, token, true, CONNECTION_TYPE_PM);
+ if (replyTo.hint != connectResult.getHubHint()) {
+ setHubUrl(connectResult.getHubHint());
}
- allowAutoCCPM = !protocolError;
+ allowAutoCCPM = !connectResult.getIsProtocolError();
- if (!connecting) {
+ if (!connectResult.getIsSuccess()) {
ccpmState = DISCONNECTED;
- if (!lastError.empty()) {
- statusMessage(lastError, LogMessage::SEV_ERROR, LogMessage::Type::SERVER);
+ if (!connectResult.getError().empty()) {
+ statusMessage(connectResult.getError(), LogMessage::SEV_ERROR, LogMessage::Type::SERVER);
}
} else {
statusMessage(STRING(CCPM_ESTABLISHING), LogMessage::SEV_INFO, LogMessage::Type::SERVER);
diff --git a/airdcpp/QueueDownloadInfo.h b/airdcpp/QueueDownloadInfo.h
new file mode 100644
index 00000000..cf096f1a
--- /dev/null
+++ b/airdcpp/QueueDownloadInfo.h
@@ -0,0 +1,85 @@
+/*
+* Copyright (C) 2011-2024 AirDC++ Project
+*
+* This program is free software; you can redistribute it and/or modify
+* it under the terms of the GNU General Public License as published by
+* the Free Software Foundation; either version 3 of the License, or
+* (at your option) any later version.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU General Public License for more details.
+*
+* You should have received a copy of the GNU General Public License
+* along with this program; if not, write to the Free Software
+* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+*/
+
+#ifndef DCPLUSPLUS_DCPP_QUEUE_DOWNLOAD_INFO_H_
+#define DCPLUSPLUS_DCPP_QUEUE_DOWNLOAD_INFO_H_
+
+#include "typedefs.h"
+
+#include "HintedUser.h"
+#include "Priority.h"
+
+namespace dcpp {
+
+enum class QueueDownloadType {
+ ANY,
+ SMALL,
+ MCN_NORMAL
+};
+
+struct QueueDownloadResultBase {
+ string hubHint;
+
+ // The last error why a file can't be started (not cleared if a download is found afterwards)
+ string lastError;
+
+ // Indicates that there's a valid file even if it can't be temporarily started e.g. due to configured download limits
+ bool hasDownload = false;
+
+ virtual void merge(const QueueDownloadResultBase& aOther) noexcept {
+ hubHint = aOther.hubHint;
+ hasDownload = aOther.hasDownload;
+ lastError = aOther.lastError;
+ }
+};
+
+
+// Queue results
+struct QueueDownloadResult : QueueDownloadResultBase {
+ // Whether the returned hubHint should be strictly followed (e.g. a filelist download)
+ bool allowUrlChange = true;
+
+ // Possible bundle
+ optional bundleToken;
+
+ bool startDownload = false;
+ QueueDownloadType downloadType = QueueDownloadType::ANY;
+
+ QueueItemPtr qi = nullptr;
+};
+
+
+struct QueueDownloadQuery {
+ QueueDownloadQuery(const UserPtr& aUser, const OrderedStringSet& aOnlineHubs, const QueueTokenSet& aRunningBundles) : user(aUser), onlineHubs(aOnlineHubs), runningBundles(aRunningBundles) {}
+
+ const UserPtr user;
+
+ QueueDownloadType downloadType = QueueDownloadType::ANY;
+
+ int64_t wantedSize = 0;
+ int64_t lastSpeed = 0;
+
+ Priority minPrio = Priority::LOWEST;
+
+ const OrderedStringSet& onlineHubs;
+ const QueueTokenSet& runningBundles;
+};
+
+}
+
+#endif /* DCPLUSPLUS_DCPP_BUNDLEINFO_H_ */
diff --git a/airdcpp/QueueItem.cpp b/airdcpp/QueueItem.cpp
index e1635090..13a26ff9 100644
--- a/airdcpp/QueueItem.cpp
+++ b/airdcpp/QueueItem.cpp
@@ -132,9 +132,9 @@ bool QueueItem::isFailedStatus(Status aStatus) noexcept {
}
Priority QueueItem::calculateAutoPriority() const noexcept {
- if(getAutoPriority()) {
+ if (getAutoPriority()) {
Priority p;
- int percent = static_cast(getDownloadedBytes() * 10.0 / size);
+ auto percent = static_cast(getDownloadedBytes() * 10.0 / size);
switch(percent){
case 0:
case 1:
@@ -565,7 +565,7 @@ uint64_t QueueItem::getDownloadedBytes() const noexcept {
void QueueItem::addFinishedSegment(const Segment& aSegment) noexcept {
#ifdef _DEBUG
if (bundle)
- dcdebug("adding segment segment of size " I64_FMT " (" I64_FMT ", " I64_FMT ")...", aSegment.getSize(), aSegment.getStart(), aSegment.getEnd());
+ dcdebug("QueueItem::addFinishedSegment: adding segment of size " I64_FMT " (" I64_FMT ", " I64_FMT ")...", aSegment.getSize(), aSegment.getStart(), aSegment.getEnd());
#endif
dcassert(aSegment.getOverlapped() == false);
@@ -649,54 +649,89 @@ void QueueItem::getChunksVisualisation(vector& running_, vectorgetBlockedHubs().empty() && includes(source->getBlockedHubs().begin(), source->getBlockedHubs().end(), aOnlineHubs.begin(), aOnlineHubs.end())) {
+bool QueueItem::Source::matchesDownloadQuery(const QueueDownloadQuery& aQuery, string& lastError_) const noexcept {
+ // Only blocked hubs?
+ if (!blockedHubs.empty() && includes(blockedHubs.begin(), blockedHubs.end(), aQuery.onlineHubs.begin(), aQuery.onlineHubs.end())) {
lastError_ = STRING(NO_ACCESS_ONLINE_HUBS);
return false;
}
- //can't download a filelist if the hub is offline... don't be too strict with NMDC hubs
- if (!aUser->isSet(User::NMDC) && (isSet(FLAG_USER_LIST) && !isSet(FLAG_TTHLIST_BUNDLE)) && aOnlineHubs.find(source->getUser().hint) == aOnlineHubs.end()) {
+ // Can't download a filelist if the hub is offline... don't be too strict with NMDC hubs
+ if (!aQuery.user->isSet(User::NMDC) && (isSet(FLAG_USER_LIST) && !isSet(FLAG_TTHLIST_BUNDLE)) && aQuery.onlineHubs.find(user.hint) == aQuery.onlineHubs.end()) {
lastError_ = STRING(USER_OFFLINE);
return false;
}
- dcassert(isSource(aUser));
- if (segmentsDone()) {
+ return true;
+}
+
+bool QueueItem::matchesDownloadType(QueueDownloadType aType) const noexcept {
+ if (aType == QueueDownloadType::SMALL && !usesSmallSlot()) {
+ //don't even think of stealing our priority channel
+ return false;
+ } else if (aType == QueueDownloadType::MCN_NORMAL && usesSmallSlot()) {
return false;
}
- if(aType == TYPE_SMALL && !usesSmallSlot()) {
- //don't even think of stealing our priority channel
+ return true;
+}
+
+bool QueueItem::allowSegmentedDownloads() const noexcept {
+ // Don't try to create multiple connections for filelists or files viewed in client
+ if (isSet(QueueItem::FLAG_USER_LIST) || isSet(QueueItem::FLAG_CLIENT_VIEW)) {
+ return false;
+ }
+
+ // No segmented downloading when getting the tree
+ if (getDownloads()[0]->getType() == Transfer::TYPE_TREE) {
+ return false;
+ }
+
+ return true;
+}
+
+bool QueueItem::hasSegment(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept {
+ if (isPausedPrio())
+ return false;
+
+ dcassert(isSource(aQuery.user));
+ auto source = getSource(aQuery.user);
+
+ // Check source
+ if (!source->matchesDownloadQuery(aQuery, lastError_)) {
+ return false;
+ }
+
+ // Finished?
+ if (segmentsDone()) {
return false;
- } else if (aType == TYPE_MCN_NORMAL && usesSmallSlot()) {
+ }
+
+ // Slot type
+ if (!matchesDownloadType(aQuery.downloadType)) {
return false;
}
- if(isWaiting()) {
+ // See if we have an available segment
+
+ if (isWaiting()) {
return true;
}
-
- // No segmented downloading when getting the tree
- if(getDownloads()[0]->getType() == Transfer::TYPE_TREE) {
+
+ // Running item
+
+ if (!allowSegmentedDownloads()) {
return false;
}
- if(!isSet(QueueItem::FLAG_USER_LIST) && !isSet(QueueItem::FLAG_CLIENT_VIEW)) {
- Segment segment = getNextSegment(getBlockSize(), aWantedSize, aLastSpeed, source->getPartsInfo(), aAllowOverlap);
- if(segment.getSize() == 0) {
- lastError_ = (segment.getStart() == -1 || getSize() < Util::convertSize(SETTING(MIN_SEGMENT_SIZE), Util::KB)) ? STRING(NO_FILES_AVAILABLE) : STRING(NO_FREE_BLOCK);
- dcdebug("No segment for %s (%s) in %s, block " I64_FMT "\n", aUser->getCID().toBase32().c_str(), Util::listToString(aOnlineHubs).c_str(), getTarget().c_str(), blockSize);
- return false;
- }
- } else if (!isWaiting()) {
- //don't try to create multiple connections for filelists or files viewed in client
+ // File segment?
+ auto segment = getNextSegment(getBlockSize(), aQuery.wantedSize, aQuery.lastSpeed, source->getPartsInfo(), aAllowOverlap);
+ if (segment.getSize() == 0) {
+ lastError_ = (segment.getStart() == -1 || getSize() < Util::convertSize(SETTING(MIN_SEGMENT_SIZE), Util::KB)) ? STRING(NO_FILES_AVAILABLE) : STRING(NO_FREE_BLOCK);
+ dcdebug("No segment for %s (%s) in %s, block " I64_FMT "\n", aQuery.user->getCID().toBase32().c_str(), Util::listToString(aQuery.onlineHubs).c_str(), getTarget().c_str(), blockSize);
return false;
}
+
return true;
}
diff --git a/airdcpp/QueueItem.h b/airdcpp/QueueItem.h
index eba2a8f9..5a007410 100644
--- a/airdcpp/QueueItem.h
+++ b/airdcpp/QueueItem.h
@@ -20,6 +20,7 @@
#define DCPLUSPLUS_DCPP_QUEUE_ITEM_H
#include "QueueItemBase.h"
+#include "QueueDownloadInfo.h"
#include "FastAlloc.h"
#include "HintedUser.h"
@@ -146,6 +147,8 @@ class QueueItem : public QueueItemBase {
void setPartsInfo(const PartsInfo& aPartsInfo) noexcept {
partsInfo = aPartsInfo;
}
+
+ bool matchesDownloadQuery(const QueueDownloadQuery& aQuery, string& lastError_) const noexcept;
private:
PartsInfo partsInfo;
@@ -174,7 +177,7 @@ class QueueItem : public QueueItemBase {
void save(OutputStream &save, string tmp, string b32tmp);
int countOnlineUsers() const noexcept;
void getOnlineUsers(HintedUserList& l) const noexcept;
- bool hasSegment(const UserPtr& aUser, const OrderedStringSet& onlineHubs, string& lastError, int64_t wantedSize, int64_t lastSpeed, DownloadType aType, bool allowOverlap) noexcept;
+ bool hasSegment(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept;
bool isPausedPrio() const noexcept;
SourceList& getSources() noexcept { return sources; }
@@ -285,6 +288,9 @@ class QueueItem : public QueueItemBase {
bool isHubBlocked(const UserPtr& aUser, const string& aUrl) const noexcept;
void removeSource(const UserPtr& aUser, Flags::MaskType reason) noexcept;
+ bool matchesDownloadType(QueueDownloadType aType) const noexcept;
+ bool allowSegmentedDownloads() const noexcept;
+
static uint8_t getMaxSegments(int64_t aFileSize) noexcept;
int64_t blockSize = -1;
diff --git a/airdcpp/QueueItemBase.h b/airdcpp/QueueItemBase.h
index 0d5e2a55..9e36c0fb 100644
--- a/airdcpp/QueueItemBase.h
+++ b/airdcpp/QueueItemBase.h
@@ -31,17 +31,8 @@ namespace dcpp {
using std::string;
-typedef uint32_t QueueToken;
-typedef unordered_set QueueTokenSet;
class QueueItemBase : public Flags {
public:
- enum DownloadType {
- TYPE_NONE,
- TYPE_ANY,
- TYPE_SMALL,
- TYPE_MCN_NORMAL
- };
-
QueueItemBase(const string& aTarget, int64_t aSize, Priority aPriority, time_t aAdded, QueueToken aToken, Flags::MaskType aFlags);
const DownloadList& getDownloads() { return downloads; }
diff --git a/airdcpp/QueueManager.cpp b/airdcpp/QueueManager.cpp
index 0a8da4e7..b994ce11 100644
--- a/airdcpp/QueueManager.cpp
+++ b/airdcpp/QueueManager.cpp
@@ -1143,175 +1143,221 @@ bool QueueManager::addValidatedSource(const QueueItemPtr& qi, const HintedUser&
}
-Download* QueueManager::getDownload(UserConnection& aSource, const QueueTokenSet& aRunningBundles, const OrderedStringSet& aOnlineHubs, string& lastError_, string& newUrl, QueueItemBase::DownloadType aType) noexcept{
+QueueManager::DownloadResult QueueManager::getDownload(UserConnection& aSource, const QueueTokenSet& aRunningBundles, const OrderedStringSet& aOnlineHubs) noexcept {
+ const auto& user = aSource.getUser();
+
+ DownloadResult result;
+
QueueItemPtr q = nullptr;
- Download* d = nullptr;
{
- WLock l(cs);
- dcdebug("Getting download for %s...", aSource.getUser()->getCID().toBase32().c_str());
+ // Segments shouldn't be assigned simultaneously for multiple connections
+ Lock slotLock(slotAssignCS);
- const UserPtr& u = aSource.getUser();
- bool hasDownload = false;
+ {
+ auto downloadType = QueueDownloadType::ANY;
+ if (aSource.isSet(UserConnection::FLAG_SMALL_SLOT)) {
+ downloadType = QueueDownloadType::SMALL;
+ } else if (aSource.isMCN()) {
+ downloadType = QueueDownloadType::MCN_NORMAL;
+ }
- q = userQueue.getNext(aSource.getUser(), aRunningBundles, aOnlineHubs, lastError_, hasDownload, Priority::LOWEST, aSource.getChunkSize(), aSource.getSpeed(), aType);
- if (!q) {
- dcdebug("none\n");
- return nullptr;
- }
+ auto startResult = startDownload(aSource.getHintedUser(), downloadType, aRunningBundles, aOnlineHubs, aSource.getSpeed());
+ result.merge(startResult);
+
+ if (!startResult.startDownload) {
+ // dcdebug("none\n");
+ return result;
+ }
- auto source = q->getSource(aSource.getUser());
+ q = startResult.qi;
+ dcassert(q);
+ }
- //update the hub hint
- newUrl = aSource.getHubUrl();
- source->updateDownloadHubUrl(aOnlineHubs, newUrl, (q->isSet(QueueItem::FLAG_USER_LIST) && !q->isSet(QueueItem::FLAG_TTHLIST_BUNDLE)));
+ {
+ WLock l(cs);
- //check partial sources
- if (source->isSet(QueueItem::Source::FLAG_PARTIAL)) {
- Segment segment = q->getNextSegment(q->getBlockSize(), aSource.getChunkSize(), aSource.getSpeed(), source->getPartsInfo(), false);
- if (segment.getStart() != -1 && segment.getSize() == 0) {
- // no other partial chunk from this user, remove him from queue
- userQueue.removeQI(q, u);
- q->removeSource(u, QueueItem::Source::FLAG_NO_NEED_PARTS);
- lastError_ = STRING(NO_NEEDED_PART);
- return nullptr;
+ // Check partial sources
+ auto source = q->getSource(user);
+ if (source->isSet(QueueItem::Source::FLAG_PARTIAL)) {
+ auto segment = q->getNextSegment(q->getBlockSize(), aSource.getChunkSize(), aSource.getSpeed(), source->getPartsInfo(), false);
+ if (segment.getStart() != -1 && segment.getSize() == 0) {
+ // dcdebug("no needed chunks)\n");
+ // no other partial chunk from this user, remove him from queue
+ userQueue.removeQI(q, user);
+ q->removeSource(user, QueueItem::Source::FLAG_NO_NEED_PARTS);
+ result.lastError = STRING(NO_NEEDED_PART);
+ return result;
+ }
}
- }
- // Check that the file we will be downloading to exists
- if (q->getDownloadedBytes() > 0) {
- if (!PathUtil::fileExists(q->getTempTarget())) {
- // Temp target gone?
- q->resetDownloaded();
+ // Check that the file we will be downloading to exists
+ if (q->getDownloadedBytes() > 0) {
+ if (!PathUtil::fileExists(q->getTempTarget())) {
+ // Temp target gone?
+ q->resetDownloaded();
+ }
}
- }
- d = new Download(aSource, *q);
- userQueue.addDownload(q, d);
+ result.download = new Download(aSource, *q);
+ userQueue.addDownload(q, result.download);
+ }
}
fire(QueueManagerListener::ItemSources(), q);
- dcdebug("found %s for %s (" I64_FMT ", " I64_FMT ")\n", q->getTarget().c_str(), d->getToken().c_str(), d->getSegment().getStart(), d->getSegment().getEnd());
- return d;
+ dcdebug("QueueManager::getDownload: found %s for %s (segment " I64_FMT ", " I64_FMT ")\n", q->getTarget().c_str(), result.download->getToken().c_str(), result.download->getSegment().getStart(), result.download->getSegment().getEnd());
+ return result;
}
-bool QueueManager::allowStartQI(const QueueItemPtr& aQI, const QueueTokenSet& runningBundles, string& lastError_, bool mcn /*false*/) noexcept{
- // nothing to download?
- if (!aQI)
- return false;
-
- // override the slot settings for partial lists and small files
- if (aQI->usesSmallSlot())
+bool QueueManager::checkLowestPrioRules(const QueueItemPtr& aQI, const QueueTokenSet& aRunningBundles, string& lastError_) const noexcept {
+ auto& b = aQI->getBundle();
+ if (!b) {
return true;
+ }
+ if (b->getPriority() == Priority::LOWEST) {
+ // Don't start if there are other bundles running
+ if (!aRunningBundles.empty() && aRunningBundles.find(b->getToken()) == aRunningBundles.end()) {
+ lastError_ = STRING(LOWEST_PRIO_ERR_BUNDLES);
+ return false;
+ }
+ }
- // paused?
- if (aQI->isPausedPrio())
- return false;
-
+ if (aQI->getPriority() == Priority::LOWEST) {
+ // Start only if there are no other downloads running in this bundle
+ // (or all bundle downloads belong to this file)
+ auto bundleDownloads = DownloadManager::getInstance()->getBundleDownloadConnectionCount(aQI->getBundle());
- //check if we have free space to continue the download now... otherwise results in paused priority..
- if (aQI->getBundle() && (aQI->getBundle()->getStatus() == Bundle::STATUS_DOWNLOAD_ERROR)) {
- if (File::getFreeSpace(aQI->getBundle()->getTarget()) >= static_cast(aQI->getSize() - aQI->getDownloadedBytes())) {
- setBundleStatus(aQI->getBundle(), Bundle::STATUS_QUEUED);
- } else {
- lastError_ = aQI->getBundle()->getError();
- onDownloadError(aQI->getBundle(), lastError_);
+ RLock l(cs);
+ auto start = bundleDownloads == 0 || bundleDownloads == aQI->getDownloads().size();
+ if (!start) {
+ lastError_ = STRING(LOWEST_PRIO_ERR_FILES);
return false;
}
}
- size_t downloadCount = DownloadManager::getInstance()->getFileDownloadConnectionCount();
- bool slotsFull = (AirUtil::getSlots(true) != 0) && (downloadCount >= static_cast(AirUtil::getSlots(true)));
- bool speedFull = (AirUtil::getSpeedLimitKbps(true) != 0) && (DownloadManager::getInstance()->getRunningAverage() >= Util::convertSize(AirUtil::getSpeedLimitKbps(true), Util::KB));
+ return true;
+}
+
+bool QueueManager::checkDownloadLimits(const QueueItemPtr& aQI, string& lastError_) const noexcept {
+ auto downloadSlots = AirUtil::getSlots(true);
+ auto downloadCount = static_cast(DownloadManager::getInstance()->getFileDownloadConnectionCount());
+ bool slotsFull = downloadSlots != 0 && downloadCount >= downloadSlots;
+
+ auto speedLimit = Util::convertSize(AirUtil::getSpeedLimitKbps(true), Util::KB);
+ auto downloadSpeed = DownloadManager::getInstance()->getRunningAverage();
+ bool speedFull = speedLimit != 0 && downloadSpeed >= speedLimit;
//log("Speedlimit: " + Util::toString(Util::getSpeedLimit(true)*1024) + " slots: " + Util::toString(Util::getSlots(true)) + " (avg: " + Util::toString(getRunningAverage()) + ")");
if (slotsFull || speedFull) {
- size_t slots = AirUtil::getSlots(true);
- bool extraFull = (slots != 0) && (downloadCount >= (slots + static_cast(SETTING(EXTRA_DOWNLOAD_SLOTS))));
- if (extraFull || mcn || aQI->getPriority() != Priority::HIGHEST) {
- lastError_ = slotsFull ? STRING(ALL_DOWNLOAD_SLOTS_TAKEN) : STRING(MAX_DL_SPEED_REACHED);
+ bool extraFull = downloadSlots != 0 && downloadCount >= downloadSlots + SETTING(EXTRA_DOWNLOAD_SLOTS);
+ if (extraFull || aQI->getPriority() != Priority::HIGHEST) {
+ if (slotsFull) {
+ lastError_ = STRING(ALL_DOWNLOAD_SLOTS_TAKEN);
+ } else {
+ lastError_ = STRING(MAX_DL_SPEED_REACHED);
+ }
return false;
}
- return true;
}
- // bundle with the lowest prio? don't start if there are other bundle running
- if (aQI->getBundle() && aQI->getBundle()->getPriority() == Priority::LOWEST && !runningBundles.empty() && runningBundles.find(aQI->getBundle()->getToken()) == runningBundles.end()) {
- lastError_ = STRING(LOWEST_PRIO_ERR_BUNDLES);
- return false;
- }
-
- if (aQI->getPriority() == Priority::LOWEST) {
- if (aQI->getBundle()) {
- // start only if there are no other downloads running in this bundle (or the downloads belong to this file)
- auto bundleDownloads = DownloadManager::getInstance()->getBundleDownloadConnectionCount(aQI->getBundle());
+ return true;
+}
- RLock l(cs);
- bool start = bundleDownloads == 0 || bundleDownloads == aQI->getDownloads().size();
- if (!start) {
- lastError_ = STRING(LOWEST_PRIO_ERR_FILES);
- }
+bool QueueManager::checkDiskSpace(const QueueItemPtr& aQI, string& lastError_) noexcept {
+ auto& b = aQI->getBundle();
+ if (!b) {
+ return true;
+ }
- return start;
+ //check if we have free space to continue the download now... otherwise results in paused priority..
+ if (aQI->getBundle()->getStatus() == Bundle::STATUS_DOWNLOAD_ERROR) {
+ if (File::getFreeSpace(b->getTarget()) >= static_cast(aQI->getSize() - aQI->getDownloadedBytes())) {
+ setBundleStatus(b, Bundle::STATUS_QUEUED);
} else {
- // shouldn't happen at the moment
- dcassert(0);
- return downloadCount == 0;
+ lastError_ = b->getError();
+ onDownloadError(b, lastError_);
+ return false;
}
}
return true;
}
-bool QueueManager::startDownload(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs,
- QueueItemBase::DownloadType aType, int64_t aLastSpeed, string& lastError_) noexcept{
+bool QueueManager::allowStartQI(const QueueItemPtr& aQI, const QueueTokenSet& aRunningBundles, string& lastError_) noexcept{
+ // nothing to download?
+ if (!aQI)
+ return false;
- bool hasDownload = false;
- QueueItemPtr qi = nullptr;
- {
- RLock l(cs);
- qi = userQueue.getNext(aUser, runningBundles, onlineHubs, lastError_, hasDownload, Priority::LOWEST, 0, aLastSpeed, aType);
+ // override the slot settings for partial lists and small files
+ if (aQI->usesSmallSlot())
+ return true;
+
+
+ // paused?
+ if (aQI->isPausedPrio())
+ return false;
+
+ if (!checkDiskSpace(aQI, lastError_)) {
+ return false;
}
- return allowStartQI(qi, runningBundles, lastError_);
-}
+ if (!checkDownloadLimits(aQI, lastError_)) {
+ return false;
+ }
-pair QueueManager::startDownload(const UserPtr& aUser, string& hubHint, QueueItemBase::DownloadType aType,
- QueueToken& bundleToken, bool& allowUrlChange, bool& hasDownload, string& lastError_) noexcept{
+ if (!checkLowestPrioRules(aQI, aRunningBundles, lastError_)) {
+ return false;
+ }
+ return true;
+}
+
+QueueDownloadResult QueueManager::startDownload(const HintedUser& aUser, QueueDownloadType aType) noexcept{
+ auto hubs = ClientManager::getInstance()->getHubSet(aUser.user->getCID());
auto runningBundleTokens = DownloadManager::getInstance()->getRunningBundles();
- auto hubs = ClientManager::getInstance()->getHubSet(aUser->getCID());
- if (!hubs.empty()) {
- QueueItemPtr qi = nullptr;
- {
- RLock l(cs);
- qi = userQueue.getNext(aUser, runningBundleTokens, hubs, lastError_, hasDownload, Priority::LOWEST, 0, 0, aType);
+ return startDownload(aUser, aType, runningBundleTokens, hubs, 0);
+}
- if (qi) {
- if (qi->getBundle()) {
- bundleToken = qi->getBundle()->getToken();
- }
+QueueDownloadResult QueueManager::startDownload(const HintedUser& aUser, QueueDownloadType aType, const QueueTokenSet& aRunningBundles, const OrderedStringSet& aOnlineHubs, int64_t aLastSpeed) noexcept {
+ QueueDownloadResult result;
+ if (aOnlineHubs.empty()) {
+ result.lastError = STRING(USER_OFFLINE);
+ return result;
+ }
- if (hubs.find(hubHint) == hubs.end()) {
- //we can't connect via a hub that is offline...
- hubHint = *hubs.begin();
- }
+ QueueDownloadQuery query(aUser, aOnlineHubs, aRunningBundles);
+ query.lastSpeed = aLastSpeed;
+ query.downloadType = aType;
- allowUrlChange = !qi->isSet(QueueItem::FLAG_USER_LIST);
- qi->getSource(aUser)->updateDownloadHubUrl(hubs, hubHint, (qi->isSet(QueueItem::FLAG_USER_LIST) && !qi->isSet(QueueItem::FLAG_TTHLIST_BUNDLE)));
- }
- }
+ {
+ RLock l(cs);
+ auto qi = userQueue.getNext(query, result.lastError, result.hasDownload);
if (qi) {
- bool start = allowStartQI(qi, runningBundleTokens, lastError_);
- return { qi->usesSmallSlot() ? QueueItem::TYPE_SMALL : QueueItem::TYPE_ANY, start };
+ result.qi = qi;
+ if (qi->getBundle()) {
+ result.bundleToken = qi->getBundle()->getToken();
+ }
+
+ if (aOnlineHubs.find(aUser.hint) == aOnlineHubs.end()) {
+ //we can't connect via a hub that is offline...
+ result.hubHint = *aOnlineHubs.begin();
+ }
+
+ result.allowUrlChange = !qi->isSet(QueueItem::FLAG_USER_LIST);
+
+ auto isFilelist = qi->isSet(QueueItem::FLAG_USER_LIST) && !qi->isSet(QueueItem::FLAG_TTHLIST_BUNDLE);
+ qi->getSource(aUser)->updateDownloadHubUrl(aOnlineHubs, result.hubHint, isFilelist);
}
- } else {
- lastError_ = STRING(USER_OFFLINE);
}
- return { QueueItem::TYPE_NONE, false };
+ if (result.qi) {
+ result.startDownload = allowStartQI(result.qi, aRunningBundles, result.lastError);
+ result.downloadType = result.qi->usesSmallSlot() ? QueueDownloadType::SMALL : QueueDownloadType::ANY;
+ }
+
+ return result;
}
QueueItemList QueueManager::findFiles(const TTHValue& tth) const noexcept {
@@ -1339,24 +1385,6 @@ void QueueManager::matchListing(const DirectoryListing& dl, int& matchingFiles_,
newFiles_ = addValidatedSources(dl.getHintedUser(), matchingItems, QueueItem::Source::FLAG_FILE_NOT_AVAILABLE, bundles_);
}
-QueueItemPtr QueueManager::getQueueInfo(const HintedUser& aUser) noexcept {
- OrderedStringSet hubs;
- hubs.insert(aUser.hint);
-
- auto runningBundles = DownloadManager::getInstance()->getRunningBundles();
-
- QueueItemPtr qi = nullptr;
- string lastError_;
- bool hasDownload = false;
-
- {
- RLock l(cs);
- qi = userQueue.getNext(aUser, runningBundles, hubs, lastError_, hasDownload);
- }
-
- return qi;
-}
-
void QueueManager::toggleSlowDisconnectBundle(QueueToken aBundleToken) noexcept {
RLock l(cs);
auto b = bundleQueue.findBundle(aBundleToken);
@@ -1623,12 +1651,12 @@ void QueueManager::onDownloadError(const BundlePtr& aBundle, const string& aErro
setBundleStatus(aBundle, Bundle::STATUS_DOWNLOAD_ERROR);
}
-void QueueManager::putDownloadHooked(Download* aDownload, bool aFinished, bool aNoAccess /*false*/, bool aRotateQueue /*false*/) {
+void QueueManager::putDownloadHooked(Download* d, bool aFinished, bool aNoAccess /*false*/, bool aRotateQueue /*false*/) {
QueueItemPtr q = nullptr;
// Make sure the download gets killed
- unique_ptr d(aDownload);
- aDownload = nullptr;
+ // unique_ptr d(aDownload);
+ // aDownload = nullptr;
d->close();
@@ -1656,13 +1684,13 @@ void QueueManager::putDownloadHooked(Download* aDownload, bool aFinished, bool a
}
if (!aFinished) {
- onDownloadFailed(q, d.get(), aNoAccess, aRotateQueue);
+ onDownloadFailed(q, d, aNoAccess, aRotateQueue);
} else if (q->isSet(QueueItem::FLAG_USER_LIST)) {
- onFilelistDownloadCompletedHooked(q, d.get());
+ onFilelistDownloadCompletedHooked(q, d);
} else if (d->getType() == Transfer::TYPE_TREE) {
- onTreeDownloadCompleted(q, d.get());
+ onTreeDownloadCompleted(q, d);
} else {
- onFileDownloadCompleted(q, d.get());
+ onFileDownloadCompleted(q, d);
}
}
@@ -1732,7 +1760,7 @@ void QueueManager::onFileDownloadRemoved(const QueueItemPtr& aQI, bool aFailed)
} else {
delayEvents.addEvent(aQI->getBundle()->getToken(), [this, checkWaiting] {
checkWaiting();
- }, 1000);
+ }, 1000);
}
}
}
@@ -1799,7 +1827,7 @@ void QueueManager::onFileDownloadCompleted(const QueueItemPtr& aQI, Download* aD
aQI->addFinishedSegment(aDownload->getSegment());
wholeFileCompleted = aQI->segmentsDone();
- dcdebug("Finish segment for %s (" I64_FMT ", " I64_FMT ")\n", aDownload->getToken().c_str(), aDownload->getSegment().getStart(), aDownload->getSegment().getEnd());
+ // dcdebug("Finish segment for %s (" I64_FMT ", " I64_FMT ")\n", aDownload->getToken().c_str(), aDownload->getSegment().getStart(), aDownload->getSegment().getEnd());
if (wholeFileCompleted) {
// Disconnect all possible overlapped downloads
@@ -2994,7 +3022,6 @@ void QueueManager::on(TimerManagerListener::Second, uint64_t aTick) noexcept {
void QueueManager::on(TimerManagerListener::Minute, uint64_t aTick) noexcept {
tasks.addTask([=, this] {
- // requestPartialSourceInfo(aTick);
searchAlternates(aTick);
checkResumeBundles();
});
diff --git a/airdcpp/QueueManager.h b/airdcpp/QueueManager.h
index 7d9ff40a..4ab5bc0f 100644
--- a/airdcpp/QueueManager.h
+++ b/airdcpp/QueueManager.h
@@ -27,7 +27,6 @@
#include "TimerManagerListener.h"
#include "ActionHook.h"
-#include "QueueAddInfo.h"
#include "BundleQueue.h"
#include "DelayedEvents.h"
#include "DupeType.h"
@@ -36,6 +35,8 @@
#include "HashBloom.h"
#include "MerkleTree.h"
#include "Message.h"
+#include "QueueAddInfo.h"
+#include "QueueDownloadInfo.h"
#include "Singleton.h"
#include "StringMatch.h"
#include "TaskQueue.h"
@@ -194,30 +195,18 @@ class QueueManager : public Singleton, public SpeakergetSources(); }
Bundle::SourceList getBadBundleSources(const BundlePtr& b) const noexcept { RLock l(cs); return b->getBadSources(); }
- // Get information about the next valid file in the queue
- // Used for displaying initial information for a transfer before the connection has been established and the real download is created
- QueueItemPtr getQueueInfo(const HintedUser& aUser) noexcept;
-
// Check if a download can be started for the specified user
- //
- // lastError_ will contain the last error why a file can't be started (not cleared if a download is found afterwards)
- // TODO: FINISH
- bool startDownload(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs,
- QueueItemBase::DownloadType aType, int64_t aLastSpeed, string& lastError_) noexcept;
+ QueueDownloadResult startDownload(const HintedUser& aUser, QueueDownloadType aType) noexcept;
- // The same thing but only used before any connect requests
- // newUrl can be changed if the download is for a filelist from a different hub
- // lastError_ will contain the last error why a file can't be started (not cleared if a download is found afterwards)
- // hasDownload will be set to true if there are any files queued from the user
- // TODO: FINISH
- pair startDownload(const UserPtr& aUser, string& hubUrl, QueueItemBase::DownloadType aType, QueueToken& bundleToken,
- bool& allowUrlChange_, bool& hasDownload_, string& lastError_) noexcept;
+ struct DownloadResult : QueueDownloadResultBase {
+ Download* download = nullptr;
+ };
// Creates new download for the specified user
- // This won't check various download limits so startDownload should be called first
+ // Runs all necessary validations in startDownload
// newUrl can be changed if the download is for a filelist from a different hub
// lastError_ will contain the last error why a file can't be started (not cleared if a download is found afterwards)
- Download* getDownload(UserConnection& aSource, const QueueTokenSet& aRunningBundles, const OrderedStringSet& aOnlineHubs, string& lastError_, string& newUrl_, QueueItemBase::DownloadType aType) noexcept;
+ DownloadResult getDownload(UserConnection& aSource, const QueueTokenSet& aRunningBundles, const OrderedStringSet& aOnlineHubs) noexcept;
// Handle an ended transfer
// finished should be true if the file/segment was finished successfully (false if disconnected/failed). Always false for finished trees.
@@ -225,7 +214,6 @@ class QueueManager : public Singleton, public Speaker, public Speaker udp;
@@ -443,7 +432,15 @@ class QueueManager : public Singleton, public Speaker, public SpeakergetQueueInfo(aInfo->getHintedUser());
- if (!qi) {
+ auto result = QueueManager::getInstance()->startDownload(aInfo->getHintedUser(), QueueDownloadType::ANY);
+ if (!result.qi) {
return;
}
auto type = Transfer::TYPE_FILE;
- if (qi->getFlags() & QueueItem::FLAG_PARTIAL_LIST)
+ if (result.qi->getFlags() & QueueItem::FLAG_PARTIAL_LIST)
type = Transfer::TYPE_PARTIAL_LIST;
- else if (qi->getFlags() & QueueItem::FLAG_USER_LIST)
+ else if (result.qi->getFlags() & QueueItem::FLAG_USER_LIST)
type = Transfer::TYPE_FULL_LIST;
aInfo->setType(type);
- aInfo->setTarget(qi->getTarget());
- aInfo->setSize(qi->getSize());
- aInfo->setQueueToken(qi->getToken());
+ aInfo->setTarget(result.qi->getTarget());
+ aInfo->setSize(result.qi->getSize());
+ aInfo->setQueueToken(result.qi->getToken());
}
void TransferInfoManager::on(ConnectionManagerListener::Connecting, const ConnectionQueueItem* aCqi) noexcept {
@@ -308,6 +308,20 @@ namespace dcpp {
starting(aDownload, STRING(REQUESTING), true);
}
+ void TransferInfoManager::on(DownloadManagerListener::Idle, const UserConnection* aConn, const string& aError) noexcept {
+ if (aError.empty()) {
+ return;
+ }
+
+ auto t = findTransfer(aConn->getToken());
+ if (!t) {
+ return;
+ }
+
+ t->setStatusString(aError);
+ onTransferUpdated(t, TransferInfo::UpdateFlags::STATUS);
+ }
+
void TransferInfoManager::starting(const Download* aDownload, const string& aStatus, bool aFullUpdate) noexcept {
auto t = findTransfer(aDownload->getToken());
if (!t) {
diff --git a/airdcpp/TransferInfoManager.h b/airdcpp/TransferInfoManager.h
index 7f4c553a..0546f9b8 100644
--- a/airdcpp/TransferInfoManager.h
+++ b/airdcpp/TransferInfoManager.h
@@ -89,6 +89,7 @@ namespace dcpp {
void on(DownloadManagerListener::Complete, const Download* aDownload, bool) noexcept override;
void on(DownloadManagerListener::Failed, const Download* aDownload, const string& reason) noexcept override;
void on(DownloadManagerListener::Requesting, const Download* aDownload, bool hubChanged) noexcept override;
+ void on(DownloadManagerListener::Idle, const UserConnection* aConn, const string& aError) noexcept override;
void on(UploadManagerListener::Starting, const Upload* aUpload) noexcept override;
void on(UploadManagerListener::Complete, const Upload* aUpload) noexcept override;
diff --git a/airdcpp/UploadBundleInfoSender.cpp b/airdcpp/UploadBundleInfoSender.cpp
index 660080a2..ad29914b 100644
--- a/airdcpp/UploadBundleInfoSender.cpp
+++ b/airdcpp/UploadBundleInfoSender.cpp
@@ -144,7 +144,7 @@ void UploadBundleInfoSender::on(DownloadManagerListener::Starting, const Downloa
}
}*/
-void UploadBundleInfoSender::on(DownloadManagerListener::Idle, const UserConnection* aSource) noexcept {
+void UploadBundleInfoSender::on(DownloadManagerListener::Idle, const UserConnection* aSource, const string&) noexcept {
removeRunningUser(aSource, false);
}
diff --git a/airdcpp/UploadBundleInfoSender.h b/airdcpp/UploadBundleInfoSender.h
index 92527041..0f83b728 100644
--- a/airdcpp/UploadBundleInfoSender.h
+++ b/airdcpp/UploadBundleInfoSender.h
@@ -21,7 +21,6 @@
#include "CriticalSection.h"
#include "Message.h"
-#include "QueueItemBase.h"
#include "User.h"
#include "DownloadManagerListener.h"
@@ -91,7 +90,7 @@ class UploadBundleInfoSender: public DownloadManagerListener, public QueueManage
void on(DownloadManagerListener::Failed, const Download*, const string&) noexcept override;
void on(DownloadManagerListener::BundleTick, const BundleList& aBundles, uint64_t aTick) noexcept override;
void on(DownloadManagerListener::Remove, const UserConnection* aConn) noexcept override;
- void on(DownloadManagerListener::Idle, const UserConnection* aConn) noexcept override;
+ void on(DownloadManagerListener::Idle, const UserConnection* aConn, const string& aError) noexcept override;
unordered_map bundleTokenMap;
diff --git a/airdcpp/UploadQueueManager.cpp b/airdcpp/UploadQueueManager.cpp
index 75ccc574..b374f74c 100644
--- a/airdcpp/UploadQueueManager.cpp
+++ b/airdcpp/UploadQueueManager.cpp
@@ -56,12 +56,8 @@ UploadQueueItem::UploadQueueItem(const HintedUser& _user, const string& _file, i
}
void UploadQueueManager::connectUser(const HintedUser& aUser, const string& aToken) noexcept {
- string lastError;
- string hubUrl = aUser.hint;
- bool protocolError = false;
- ClientManager::getInstance()->connect(aUser.user, aToken, true, lastError, hubUrl, protocolError);
-
- //TODO: report errors?
+ // TODO: report errors?
+ ClientManager::getInstance()->connect(aUser, aToken, true);
}
void UploadQueueManager::connectUser(const HintedUser& aUser) noexcept {
diff --git a/airdcpp/UserConnection.cpp b/airdcpp/UserConnection.cpp
index 7a2f3238..d437dcd8 100644
--- a/airdcpp/UserConnection.cpp
+++ b/airdcpp/UserConnection.cpp
@@ -173,6 +173,17 @@ void UserConnection::setUseLimiter(bool aEnabled) noexcept {
}
}
+void UserConnection::setState(States aNewState) noexcept {
+ if (aNewState == state) {
+ return;
+ }
+
+ state = aNewState;
+ callAsync([this] {
+ fire(UserConnectionListener::State(), this);
+ });
+}
+
void UserConnection::setUser(const UserPtr& aUser) noexcept {
user = aUser;
@@ -458,6 +469,7 @@ UserConnection::UserConnection(bool secure_) noexcept : encoding(SETTING(NMDC_EN
UserConnection::~UserConnection() {
BufferedSocket::putSocket(socket);
+ dcdebug("User connection %s was deleted\n", getToken().c_str());
}
} // namespace dcpp
diff --git a/airdcpp/UserConnection.h b/airdcpp/UserConnection.h
index 80b52dee..e380c499 100644
--- a/airdcpp/UserConnection.h
+++ b/airdcpp/UserConnection.h
@@ -72,8 +72,7 @@ class UserConnection : public Speaker,
FLAG_SUPPORTS_ZLIB_GET = FLAG_SUPPORTS_ADCGET << 1,
FLAG_SUPPORTS_TTHL = FLAG_SUPPORTS_ZLIB_GET <<1,
FLAG_SUPPORTS_TTHF = FLAG_SUPPORTS_TTHL << 1,
- FLAG_RUNNING = FLAG_SUPPORTS_TTHF << 1,
- FLAG_SMALL_SLOT = FLAG_RUNNING << 1,
+ FLAG_SMALL_SLOT = FLAG_SUPPORTS_TTHF << 1,
FLAG_TRUSTED = FLAG_SMALL_SLOT << 1
};
@@ -200,7 +199,7 @@ class UserConnection : public Speaker,
IGETSET(int64_t, speed, Speed, 0);
IGETSET(uint64_t, lastActivity, LastActivity, 0);
GETSET(string, encoding, Encoding);
- IGETSET(States, state, State, STATE_UNCONNECTED);
+ IGETPROP(States, state, State, STATE_UNCONNECTED);
IGETSET(uint8_t, slotType, SlotType, NOSLOT);
const BufferedSocket* getSocket() const noexcept { return socket; }
@@ -218,6 +217,7 @@ class UserConnection : public Speaker,
}
void setUseLimiter(bool aEnabled) noexcept;
+ void setState(States aNewState) noexcept;
private:
void initSocket();
diff --git a/airdcpp/UserConnectionListener.h b/airdcpp/UserConnectionListener.h
index dfcfcd74..c5695337 100644
--- a/airdcpp/UserConnectionListener.h
+++ b/airdcpp/UserConnectionListener.h
@@ -49,7 +49,8 @@ class UserConnectionListener {
typedef X<18> Supports;
typedef X<19> ProtocolError;
typedef X<20> FileNotAvailable;
- typedef X<21> ListLength;
+ typedef X<21> ListLength;
+ typedef X<22> State;
virtual void on(BytesSent, UserConnection*, size_t, size_t) noexcept { }
virtual void on(Connected, UserConnection*) noexcept { }
@@ -71,6 +72,7 @@ class UserConnectionListener {
virtual void on(ListLength, UserConnection*, const string&) noexcept { }
virtual void on(PrivateMessage, UserConnection*, const ChatMessagePtr&) noexcept{}
virtual void on(UserSet, UserConnection*) noexcept {}
+ virtual void on(State, UserConnection*) noexcept {}
virtual void on(AdcCommand::SUP, UserConnection*, const AdcCommand&) noexcept { }
virtual void on(AdcCommand::INF, UserConnection*, const AdcCommand&) noexcept { }
diff --git a/airdcpp/UserQueue.cpp b/airdcpp/UserQueue.cpp
index 3bb81af2..baa73e55 100644
--- a/airdcpp/UserQueue.cpp
+++ b/airdcpp/UserQueue.cpp
@@ -69,35 +69,35 @@ void UserQueue::getUserQIs(const UserPtr& aUser, QueueItemList& ql) noexcept{
}
}
-QueueItemPtr UserQueue::getNext(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs,
- string& lastError_, bool& hasDownload, Priority minPrio, int64_t wantedSize, int64_t lastSpeed, QueueItemBase::DownloadType aType,
- bool allowOverlap /*false*/) noexcept {
+QueueItemPtr UserQueue::getNext(const QueueDownloadQuery& aQuery, string& lastError_, bool& hasDownload_, bool aAllowOverlap) noexcept {
- /* Using the PAUSED priority will list all files */
- auto qi = getNextPrioQI(aUser, onlineHubs, 0, 0, aType, allowOverlap, lastError_);
+ // Using the PAUSED priority will list all files (?)
+ auto qi = getNextPrioQI(aQuery, lastError_, aAllowOverlap /*, 0, 0*/);
if(!qi) {
- qi = getNextBundleQI(aUser, runningBundles, onlineHubs, (Priority)minPrio, wantedSize, lastSpeed, aType, allowOverlap, lastError_, hasDownload);
+ qi = getNextBundleQI(aQuery, lastError_, hasDownload_, aAllowOverlap);
}
- if (!qi && !allowOverlap) {
- //no free segments. let's do another round and now check if there are slow sources which can be overlapped
- qi = getNext(aUser, runningBundles, onlineHubs, lastError_, hasDownload, minPrio, wantedSize, lastSpeed, aType, true);
+ if (!qi && !aAllowOverlap) {
+ // No free segments
+ // Let's do another round and check if there are slow sources which can be overlapped
+ qi = getNext(aQuery, lastError_, hasDownload_, true);
+ }
+
+ if (qi) {
+ hasDownload_ = true;
}
- if (qi)
- hasDownload = true;
return qi;
}
-QueueItemPtr UserQueue::getNextPrioQI(const UserPtr& aUser, const OrderedStringSet& onlineHubs, int64_t wantedSize, int64_t lastSpeed,
- QueueItemBase::DownloadType aType, bool allowOverlap, string& lastError_) noexcept{
+QueueItemPtr UserQueue::getNextPrioQI(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept{
lastError_ = Util::emptyString;
- auto i = userPrioQueue.find(aUser);
+ auto i = userPrioQueue.find(aQuery.user);
if(i != userPrioQueue.end()) {
dcassert(!i->second.empty());
for(auto& q: i->second) {
- if (q->hasSegment(aUser, onlineHubs, lastError_, wantedSize, lastSpeed, aType, allowOverlap)) {
+ if (q->hasSegment(aQuery, lastError_, aAllowOverlap)) {
return q;
}
}
@@ -105,28 +105,26 @@ QueueItemPtr UserQueue::getNextPrioQI(const UserPtr& aUser, const OrderedStringS
return nullptr;
}
-QueueItemPtr UserQueue::getNextBundleQI(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs,
- Priority minPrio, int64_t wantedSize, int64_t lastSpeed, QueueItemBase::DownloadType aType, bool allowOverlap,
- string& lastError_, bool& hasDownload) noexcept{
+QueueItemPtr UserQueue::getNextBundleQI(const QueueDownloadQuery& aQuery, string& lastError_, bool& hasDownload_, bool aAllowOverlap) noexcept{
lastError_ = Util::emptyString;
auto bundleLimit = SETTING(MAX_RUNNING_BUNDLES);
- auto i = userBundleQueue.find(aUser);
+ auto i = userBundleQueue.find(aQuery.user);
if(i != userBundleQueue.end()) {
dcassert(!i->second.empty());
for (auto& b: i->second) {
- if (bundleLimit > 0 && static_cast(runningBundles.size()) >= bundleLimit && runningBundles.find(b->getToken()) == runningBundles.end()) {
- hasDownload = true;
+ if (bundleLimit > 0 && static_cast(aQuery.runningBundles.size()) >= bundleLimit && aQuery.runningBundles.find(b->getToken()) == aQuery.runningBundles.end()) {
+ hasDownload_ = true;
lastError_ = STRING(MAX_BUNDLES_RUNNING);
continue;
}
- if (b->getPriority() < minPrio) {
+ if (b->getPriority() < aQuery.minPrio) {
break;
}
- auto qi = b->getNextQI(aUser, onlineHubs, lastError_, minPrio, wantedSize, lastSpeed, aType, allowOverlap);
+ auto qi = b->getNextQI(aQuery, lastError_, aAllowOverlap);
if (qi) {
return qi;
}
diff --git a/airdcpp/UserQueue.h b/airdcpp/UserQueue.h
index bd0ef7aa..34d215a2 100644
--- a/airdcpp/UserQueue.h
+++ b/airdcpp/UserQueue.h
@@ -33,13 +33,9 @@ class UserQueue {
void addQI(const QueueItemPtr& qi, const HintedUser& aUser, bool aIsBadSource = false) noexcept;
void getUserQIs(const UserPtr& aUser, QueueItemList& ql) noexcept;
- QueueItemPtr getNext(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs, string& lastError_, bool& hasDownload,
- Priority minPrio = Priority::LOWEST, int64_t wantedSize = 0, int64_t lastSpeed = 0, QueueItemBase::DownloadType aType = QueueItem::TYPE_ANY, bool allowOverlap = false) noexcept;
- QueueItemPtr getNextPrioQI(const UserPtr& aUser, const OrderedStringSet& onlineHubs, int64_t wantedSize, int64_t lastSpeed,
- QueueItemBase::DownloadType aType, bool allowOverlap, string& lastError_) noexcept;
- QueueItemPtr getNextBundleQI(const UserPtr& aUser, const QueueTokenSet& runningBundles, const OrderedStringSet& onlineHubs,
- Priority minPrio, int64_t wantedSize, int64_t lastSpeed, QueueItemBase::DownloadType aType,
- bool allowOverlap, string& lastError_, bool& hasDownload) noexcept;
+ QueueItemPtr getNext(const QueueDownloadQuery& aQuery, string& lastError_, bool& hasDownload_, bool aAllowOverlap = false) noexcept;
+ QueueItemPtr getNextPrioQI(const QueueDownloadQuery& aQuery, string& lastError_, bool aAllowOverlap) noexcept;
+ QueueItemPtr getNextBundleQI(const QueueDownloadQuery& aQuery, string& lastError_, bool& hasDownload, bool aAllowOverlap) noexcept;
void addDownload(const QueueItemPtr& qi, Download* d) noexcept;
void removeDownload(const QueueItemPtr& qi, const string& aToken) noexcept;
diff --git a/airdcpp/forward.h b/airdcpp/forward.h
index 708b74a1..da306d09 100644
--- a/airdcpp/forward.h
+++ b/airdcpp/forward.h
@@ -150,6 +150,8 @@ class OutputStream;
class PrivateChat;
typedef std::shared_ptr PrivateChatPtr;
+typedef uint32_t QueueToken;
+typedef unordered_set QueueTokenSet;
class QueueItemBase;
class QueueItem;