Upload queuing code.

Technical discussion about the NMDC and <a href="http://dcpp.net/ADC.html">ADC</a> protocols. The NMDC protocol is documented in the <a href="http://dcpp.net/wiki/">Wiki</a>, so feel free to refer to it.

Moderator: Moderators

Locked
cologic
Programmer
Posts: 337
Joined: 2003-01-06 13:32
Contact:

Upload queuing code.

Post by cologic » 2003-02-25 23:37

I'm not sure of the etiquette of posting largeish chunks of code here, but I took a diff of my .232 and the stock .232 and extracted what should be the upload queuing parts, as there's been interest expressed in the ratings server thread about it. There are a couple of other changes interwoven into it, but removing those would be more work than I cared to do at 11:30 at night... Er, I guess I could put it on a web server, too, oh well.

Code: Select all

diff -burNd DCPlusPlus-0.232-src/client/ClientManager.h DCPlusPlus-0.232-src-bc/client/ClientManager.h
--- DCPlusPlus-0.232-src/client/ClientManager.h	2003-02-03 18:10:00.000000000 -0500
+++ DCPlusPlus-0.232-src-bc/client/ClientManager.h	2003-02-16 13:52:34.000000000 -0500
@@ -107,10 +109,48 @@
 		fire(ClientManagerListener::USER_UPDATED, aUser);
 	}
 	
+	string getNickIP(const string& Nick) const {
+		StringMap::const_iterator it;
+		if ((it = nickList.find(Nick)) != nickList.end())
+			return it->second;
+		else
+			return Nick;
+	}
+
+	string getIPNick(const string& IP) const {
+		HubNickMap::const_iterator it;
+		if ((it = ipList.find(IP)) != ipList.end() && !it->second.nick.empty())
+			return it->second.nick;
+		else
+			return IP;
+	}
+
+	int getIPClientEmulation(const string& IP) const {
+		HubNickMap::const_iterator it;
+		if ((it = ipList.find(IP)) != ipList.end())
+			return it->second.clientEmulation;
+		else
+			return SETTING(CLIENT_EMULATION);
+	}
+
+	void setIPNick(const string& IP, const string& Nick) {
+		ipList[IP].nick = Nick;
+		nickList[Nick] = IP;
+	}
+
+	void setIPClientEmulation(const string& IP, int clientEmulation) {
+		ipList[IP].clientEmulation = clientEmulation;
+	}
 private:
+	struct HubNick {
+		string nick;
+		int clientEmulation;
+	};
+
 	typedef HASH_MULTIMAP<string, User::Ptr> UserMap;
 	typedef UserMap::iterator UserIter;
 	typedef pair<UserIter, UserIter> UserPair;
+	typedef map<string, HubNick> HubNickMap;
 
 	Client::List clients;
 	CriticalSection cs;
@@ -119,6 +159,9 @@
 	UserMap users;
 	Socket s;
 
+	StringMap nickList;
+	HubNickMap ipList;
+	
 	friend class Singleton<ClientManager>;
 	ClientManager() : minutes(0) { 
 		TimerManager::getInstance()->addListener(this); 
@@ -140,6 +183,9 @@
 	// TimerManagerListener
 	void onAction(TimerManagerListener::Types type, u_int8_t aTick) throw();
 	void onTimerMinute(u_int8_t aTick);
+
+	// SearchManagerListener
+	void onAction(SearchManagerListener::Types types, const string &aLine) throw();
 };
 
 #endif // !defined(AFX_CLIENTMANAGER_H__8EF173E1_F7DC_40B5_B2F3_F92297701034__INCLUDED_)
diff -burNd DCPlusPlus-0.232-src/client/ConnectionManager.cpp DCPlusPlus-0.232-src-bc/client/ConnectionManager.cpp
--- DCPlusPlus-0.232-src/client/ConnectionManager.cpp	2003-02-03 18:10:00.000000000 -0500
+++ DCPlusPlus-0.232-src-bc/client/ConnectionManager.cpp	2003-02-18 10:18:24.000000000 -0500
@@ -335,7 +336,12 @@
 
 	if( aSource->isSet(UserConnection::FLAG_INCOMING) ) {
 		aSource->myNick(aSource->getUser()->getClientNick()); 
-		aSource->lock(CryptoManager::getInstance()->getLock(), CryptoManager::getInstance()->getPk());
+		int clientEmu = aSource->getUser()->getClientEmulation();
+		aSource->lock(CryptoManager::getInstance()->makeLock(clientEmu), CryptoManager::getInstance()->makePk(clientEmu));
+ 	} else {
+ 		//we know the user's IP and port now
+ 		ClientManager::getInstance()->setIPNick(aSource->getRemoteIp(), aNick);
+		ClientManager::getInstance()->setIPClientEmulation(aSource->getRemoteIp(), aSource->getUser()->getClientEmulation());
 	}
 
 	aSource->setState(UserConnection::STATE_LOCK);
diff -burNd DCPlusPlus-0.232-src/client/ConnectionManager.h DCPlusPlus-0.232-src-bc/client/ConnectionManager.h
--- DCPlusPlus-0.232-src/client/ConnectionManager.h	2003-02-04 09:45:20.000000000 -0500
+++ DCPlusPlus-0.232-src-bc/client/ConnectionManager.h	2003-02-16 13:28:33.000000000 -0500
@@ -92,6 +92,7 @@
 		socket.waitForConnections(aPort);
 	}
 
+	ServerSocket GetServerSocket() const { return socket; }
 private:
 
 	CriticalSection cs;
diff -burNd DCPlusPlus-0.232-src/client/Socket.cpp DCPlusPlus-0.232-src-bc/client/Socket.cpp
--- DCPlusPlus-0.232-src/client/Socket.cpp	2003-02-03 18:10:00.000000000 -0500
+++ DCPlusPlus-0.232-src-bc/client/Socket.cpp	2003-02-23 22:17:04.000000000 -0500
@@ -44,6 +46,54 @@
 
 Socket::Stats Socket::stats = { 0, 0, 0, 0 };
 
+// Set storage for max_send_bytes and cs
+u_int32_t Socket::max_send_bytes = 0;
+u_int32_t Socket::max_receive_bytes = 0;
+CriticalSection Socket::cs;
+ 
+// Set max_send_bytes (make sure access is synchronized)
+// The CriticalSection may not be needed (being cautious)
+void Socket::setSendRate(u_int32_t rate)
+{
+	Socket::cs.enter();
+	Socket::max_send_bytes = rate;
+	Socket::cs.leave();
+}
+
+// Get max_send_bytes (make sure access is synchronized)
+// The CriticalSection may not be needed (being cautious)
+u_int32_t Socket::getSendRate()
+{
+	Socket::cs.enter();
+	u_int32_t i = Socket::max_send_bytes;
+	Socket::cs.leave();
+	return i;
+}
+
+void Socket::setReceiveRate(u_int32_t rate)
+{
+	Socket::cs.enter();
+	Socket::max_receive_bytes = rate;
+	Socket::cs.leave();
+}
+
+u_int32_t Socket::getReceiveRate()
+{
+	Socket::cs.enter();
+	u_int32_t i = Socket::max_receive_bytes;
+	Socket::cs.leave();
+	return i;
+}
+
+string Socket::getRemoteIp() const {
+	sockaddr_in sock_addr;
+	socklen_t len = sizeof(sock_addr);
+	if(getpeername(sock, (sockaddr*)&sock_addr, &len) == 0) {
+		return string(inet_ntoa(sock_addr.sin_addr)) + ":" + string(Util::toString((sock_addr.sin_port >> 8) | (sock_addr.sin_port << 8 & 0xffff)));
+	}
+	return Util::emptyString;
+}
+
 string SocketException::errorToString(int aError) {
 	switch(aError) {
 	case EWOULDBLOCK:
diff -burNd DCPlusPlus-0.232-src/client/Socket.h DCPlusPlus-0.232-src-bc/client/Socket.h
--- DCPlusPlus-0.232-src/client/Socket.h	2003-02-03 18:25:36.000000000 -0500
+++ DCPlusPlus-0.232-src-bc/client/Socket.h	2003-02-23 22:05:56.000000000 -0500
@@ -214,6 +216,17 @@
 	/** When socks settings are updated, this has to be called... */
 	static void socksUpdated();
 
+ 	string getRemoteIp() const;
+ 
+ 	static void setSendRate(u_int32_t rate);
+ 	static u_int32_t getSendRate();
+ 	static void setReceiveRate(u_int32_t rate);
+ 	static u_int32_t getReceiveRate();
+ 
+ 	static u_int32_t max_send_bytes;	// Maximum number of bytes to send per second
+ 	static u_int32_t max_receive_bytes;	// Maximum number of bytes to receive per second
+ 	static CriticalSection cs;			// Critical section for R/W protection
+
 	GETSETREF(string, ip, Ip);
 protected:
 	SOCKET sock;
diff -burNd DCPlusPlus-0.232-src/client/UploadManager.cpp DCPlusPlus-0.232-src-bc/client/UploadManager.cpp
--- DCPlusPlus-0.232-src/client/UploadManager.cpp	2003-02-04 13:04:04.000000000 -0500
+++ DCPlusPlus-0.232-src-bc/client/UploadManager.cpp	2003-02-15 01:48:15.000000000 -0500
@@ -24,6 +24,7 @@
 #include "LogManager.h"
 #include "ShareManager.h"
 #include "ClientManager.h"
+#include "HubManager.h"
 
 UploadManager* Singleton<UploadManager>::instance = NULL;
 
@@ -80,18 +87,19 @@
 
 	if( (!aSource->isSet(UserConnection::FLAG_HASSLOT)) && 
 		(getFreeSlots()<=0) && 
-		(ui == reservedSlots.end()) ) 
+		(ui == reservedSlots.end()) && !HubManager::getInstance()->isFavoriteUser(aSource->getUser()) ) 
 	{
 		dcdebug("Average speed: %s/s\n", Util::formatBytes(UploadManager::getInstance()->getAverageSpeed()).c_str());
 		if( ((getLastAutoGrant() + 30*1000) > GET_TICK()) || (SETTING(MIN_UPLOAD_SPEED) == 0) || ( (SETTING(MIN_UPLOAD_SPEED)*1024) < UploadManager::getInstance()->getAverageSpeed() ) ) {
 			if( !(smallfile || userlist) ||
 				!(aSource->isSet(UserConnection::FLAG_HASEXTRASLOT) || (getFreeExtraSlots() > 0) || (aSource->getUser()->isSet(User::OP)) ) || 
-				!(aSource->getUser()->isSet(User::DCPLUSPLUS)) 
+				(!(aSource->getUser()->isSet(User::DCPLUSPLUS)) && (aSource->getUser()->getClientEmulation() != SettingsManager::CLIENT_EMULATION_NMDC))
 				) 
 			{
 
 				cs.leave();
 				aSource->maxedOut();
+				addFailedUpload(aSource, aFile);
 				removeConnection(aSource);
 				return false;
 			}
@@ -246,12 +253,59 @@
 	removeUpload(u);
 }
 
+void UploadManager::notifyQueuedUsers()
+{
+	if (!BOOLSETTING(UPLOAD_QUEUING)) return;
+ 	int freeSlots = getFreeSlots();
+ 
+ 	//while all contacted users may not connect, many probably will; it's fine that the rest are filled with randomly allocated slots
+ 	while (freeSlots) {
+ 		//get rid of offline users
+ 		while (!waitingUsers.empty() && !waitingUsers.front()->isOnline()) waitingUsers.pop_front();
+ 		if (waitingUsers.empty()) break;		//no users to notify
+ 
+ 		waitingUsers.front()->halfconnect();
+ 		--freeSlots;
+ 
+ 		waitingUsers.pop_front();
+ 	}
+}
+ 
+void UploadManager::addFailedUpload(UserConnection::Ptr source, string filename)
+{
+ 	SlotQueue::iterator userPos = find(waitingUsers.begin(), waitingUsers.end(), source->getUser());
+ 
+ 	if (userPos == waitingUsers.end())
+ 	{
+ 		//user is not in queue. add him.
+ 		waitingUsers.push_back(source->getUser());
+ 
+ 		//set userPos to point to him for later use
+ 		userPos = waitingUsers.end() - 1;
+ 	}
+ 
+ 	//maintain list of files the user's searched for
+ 	waitingFiles[source->getUser()].insert(filename);
+}
+ 
+void UploadManager::clearUserFiles(UserConnection::Ptr source)
+{
+ 	//run this when a user's got a slot. It clears the list of files he attempted unsuccessfully download, as well as the user himself from the queue
+ 
+ 	SlotQueue::iterator sit = find(waitingUsers.begin(), waitingUsers.end(), source->getUser());
+ 	if (sit != waitingUsers.end()) waitingUsers.erase(sit);
+ 
+ 	FilesMap::iterator fit = waitingFiles.find(source->getUser());
+ 	if (fit != waitingFiles.end()) waitingFiles.erase(source->getUser());
+}
+
 void UploadManager::removeConnection(UserConnection::Ptr aConn) {
 	dcassert(aConn->getUpload() == NULL);
 	aConn->removeListener(this);
 	if(aConn->isSet(UserConnection::FLAG_HASSLOT)) {
 		running--;
 		aConn->unsetFlag(UserConnection::FLAG_HASSLOT);
+		notifyQueuedUsers();
 	} 
 	if(aConn->isSet(UserConnection::FLAG_HASEXTRASLOT)) {
 		extra--;
diff -burNd DCPlusPlus-0.232-src/client/UploadManager.h DCPlusPlus-0.232-src-bc/client/UploadManager.h
--- DCPlusPlus-0.232-src/client/UploadManager.h	2003-02-03 18:10:00.000000000 -0500
+++ DCPlusPlus-0.232-src-bc/client/UploadManager.h	2003-02-15 01:48:15.000000000 -0500
@@ -94,10 +116,14 @@
 		conn->setState(UserConnection::STATE_GET);
 	}
 
-	GETSET(int, running, Running);
+	void notifyQueuedUsers();
+	void clearQueue() { waitingUsers.clear(); waitingFiles.clear(); }
+	void setRunning(int _running) { running = _running; notifyQueuedUsers(); }
+	
 	GETSET(int, extra, Extra);
 	GETSET(u_int32_t, lastAutoGrant, LastAutoGrant);
 private:
+	int running;
 	Upload::List uploads;
 	CriticalSection cs;
 
@@ -105,6 +131,20 @@
 	typedef SlotMap::iterator SlotIter;
 	SlotMap reservedSlots;
 
+	//two parallel structures to implement the users-waiting list
+	//this one merely lists the user who are waiting for slots
+	typedef deque<User::Ptr> SlotQueue;
+	SlotQueue waitingUsers;
+ 
+	//this keeps track of which files they've requested
+	//set of files which this user's searched for
+	typedef hash_map<User::Ptr, set<string>, User::HashFunction> FilesMap;
+	FilesMap waitingFiles;
+
+	//functions for manipulating waitingFiles and waitingUsers
+	void addFailedUpload(UserConnection::Ptr source, string filename);
+	void clearUserFiles(UserConnection::Ptr source);
+	
 	friend class Singleton<UploadManager>;
 	UploadManager() throw();
 	virtual ~UploadManager() throw();
diff -burNd DCPlusPlus-0.232-src/client/UserConnection.h DCPlusPlus-0.232-src-bc/client/UserConnection.h
--- DCPlusPlus-0.232-src/client/UserConnection.h	2003-02-03 18:10:00.000000000 -0500
+++ DCPlusPlus-0.232-src-bc/client/UserConnection.h	2003-02-23 22:05:35.000000000 -0500
@@ -56,6 +56,7 @@
 		MY_NICK,
 		TRANSMIT_DONE,
 		SUPPORTS,
+		PING,
 		FILE_NOT_AVAILABLE
 	};
 
@@ -164,7 +165,8 @@
 		FLAG_HASEXTRASLOT = FLAG_HASSLOT << 1,
 		FLAG_INVALIDKEY = FLAG_HASEXTRASLOT << 1,
 		FLAG_SUPPORTS_BZLIST = FLAG_INVALIDKEY << 1,
-		FLAG_SUPPORTS_GETZBLOCK = FLAG_SUPPORTS_BZLIST << 1
+		FLAG_SUPPORTS_GETZBLOCK = FLAG_SUPPORTS_BZLIST << 1,
+		FLAG_SUPPORTS_ZEBEDEE = FLAG_SUPPORTS_GETZBLOCK << 1
 	};
 	
 	enum States {
@@ -237,6 +239,7 @@
 
 	User::Ptr& getUser() { return user; };
 
+	string getRemoteIp() const { return socket->getRemoteIp(); }
 	Download* getDownload() { dcassert(isSet(FLAG_DOWNLOAD)); return download; };
 	void setDownload(Download* d) { dcassert(isSet(FLAG_DOWNLOAD)); download = d; };
 	Upload* getUpload() { dcassert(isSet(FLAG_UPLOAD)); return upload; };
diff -burNd DCPlusPlus-0.232-src/windows/UploadPage.cpp DCPlusPlus-0.232-src-bc/windows/UploadPage.cpp
--- DCPlusPlus-0.232-src/windows/UploadPage.cpp	2003-02-13 22:08:58.000000000 -0500
+++ DCPlusPlus-0.232-src-bc/windows/UploadPage.cpp	2003-02-15 01:48:16.000000000 -0500
@@ -26,6 +26,7 @@
 #include "../client/Util.h"
 #include "../client/ShareManager.h"
 #include "../client/SettingsManager.h"
+#include "../client/UploadManager.h"
 
 #ifdef _DEBUG
 #define new DEBUG_NEW
@@ -79,6 +81,7 @@
 
 	if(SETTING(SLOTS) < 1)
 		settings->set(SettingsManager::SLOTS, 1);
+	UploadManager::getInstance()->notifyQueuedUsers();
 
 	// Do specialized writing here
 	ShareManager::getInstance()->refresh();

GargoyleMT
DC++ Contributor
Posts: 3212
Joined: 2003-01-07 21:46
Location: .pa.us

Re: Upload queuing code.

Post by GargoyleMT » 2003-03-03 20:30

cologic wrote: I'm not sure of the etiquette of posting largeish chunks of code here, but I took a diff of my .232 and the stock .232 and extracted what should be the upload queuing parts, as there's been interest expressed in the ratings server thread about it. There are a couple of other changes interwoven into it, but removing those would be more work than I cared to do at 11:30 at night... Er, I guess I could put it on a web server, too, oh well.
Could you explain about half-connect, and the need to do the queue by IPs? If you send an ${Rev}ConnectToMe, will the client not pull a queued download and start transferring when it establishes a connection to you?

Gratch06
Posts: 141
Joined: 2003-05-25 01:48
Location: USA

Post by Gratch06 » 2003-08-17 18:17

bump!

I'm curious about GargoyleMT's questions as well, and whether there is a "new and improved" upload queueing system that reports the queue position to the client (and then displays it on the client side for clients that support it).
Sedulus (<a href="http://dcplusplus.sourceforge.net/forum/viewtopic.php?t=4460">here</a>) wrote: first of all it would require a $Error You are #n in queue|
Would this handle the reporting issues?

-Gratch06

Locked