#include #include "Protocol.h" #include "Protocols.h" #include "debug.h" #include "extensions.h" #include "Parser.h" using namespace std; TIPsDatabase *Protocol::m_db = 0; //---------------------------------------------- Base Protocol class ------------------------------------------------------- //runs a virtual description() function to access the correct static property ostream& operator<<(ostream &o, Protocol &p) {o << p.description();return o;} Protocol::Protocol(TIPsDatabase *_db, DomainConnection *_dc, InternetURIRequest *_ir, InternetResource **_resource, char *_buffer, const size_t _buffersize, PersistentData *_permData, const bool _manageBuffer): m_dc(_dc), m_ir(_ir), m_resource(_resource), m_buffer(_buffer), m_buffersize(_buffersize), m_permData(_permData), m_manageBuffer(_manageBuffer), Stream(), m_contentType(&ContentType::notStatedContentType), m_contentLanguage(notStatedLanguage), m_charSet(notStatedCharSet), m_compression(noCompression), m_contentEncoding(noContentEncoding), m_conversationEndScheme(notStatedConversationEndScheme), m_totalBytesTransfered(0), m_chunkedCurrentLabelPos(0), m_chunkedLastSize(0), m_keepalive(0), m_connectionKeepAlive(0), m_status(created), m_conversationResult(notStarted), m_domain(m_ir->domain()->m_domain) //cached here as used often { if (!m_db && _db) m_db = _db; //first Protocol ever: sometimes it may not want to use a DB though *m_buffer = 0; //zero end the buffer immediately just in case something goes wrong *m_resource = 0; //invalidate InternetResource immediately m_dc->claim(this); //claim the event sink of the DC (singular): throws Exception if already claimed //pthread stuff to make protocol synchronous as it talks with the asunchronous DC DEBUGPRINT("[%s]: Protocol mutex and cond init", DEBUG_CHECK, m_domain); MEMBER_INIT_COND(m_conversation_cond); MEMBER_INIT_MUTEX(m_conversation_mutex); DEBUG_RESULT_OK; } Protocol::~Protocol() { //will only release the DC if this is the current listener to avoid releasing a different listener //because the listener is removed from the DC no further events will be triggered onto the Protocol from the DC //events from the select will still arrive at the DC but listener will == 0 //mutex ensures that the getResource(...) and finishConversation(...) have fully exited before allowing delete of this int ret = pthread_mutex_lock(&m_conversation_mutex); //mutex owned and specific to this single-threaded Spider, no other threads can claim it if (ret) DEBUGERROR("[%s]: pthread_mutex_lock returned [%i] (%s)", m_domain, ret, ret == EINVAL ? "EINVAL" : "?"); m_dc->release(this); //waits until outside the listener event loop (after all the finishConversation(...)) ret = pthread_mutex_unlock(&m_conversation_mutex); if (ret) DEBUGERROR("[%s]: pthread_mutex_unlock returned [%i] (%s)", m_domain, ret, ret == EINVAL ? "EINVAL" : "?" ); pthread_cond_destroy (&m_conversation_cond); pthread_mutex_destroy(&m_conversation_mutex); } Protocol *Protocol::createProtocol( //caller deletes result TIPsDatabase *_db, //for creating InternetResource DomainConnection *_dc, //for the Protocol to talk to the server on (held by the Spider) InternetURIRequest *_ir, //The URI request (contains breakdown of the URI) InternetResource **_resource, //the resultant resource compiled by getResource() call char *_buffer, //the buffer to pass to the DC for replies from the server (held by the Spider) const size_t _buffersize, PersistentData *_permData, //a previous protocol instance that may need to sync persistent session data const bool _manageBuffer) { //caller responsible for deleting result if (_ir) { switch (_ir->protocolEnum()) { case http: return new HTTP (_db, _dc, _ir, _resource, _buffer, _buffersize, _permData, _manageBuffer); //case https: return new HTTPS(_db, _dc, _ir, _resource, _buffer, _buffersize, _permData, _manageBuffer); //case ftp: return new FTP (_db, _dc, _ir, _resource, _buffer, _buffersize, _permData, _manageBuffer); } } DEBUGPRINT0("UnknownProtocol", DEBUG_LINE); throw UnknownProtocol(); //can mean that a _ir == 0 is invalid return 0; } //-------------------------------------- stream events ---------------------------------------------- Protocol::conversationResult Protocol::getResource(const u_int timeout) { //default: 10 seconds //synchronous call: the asynchronous nature of the underlying communication is hidden here (by a conditional wait) //Called by the Spider thread. The DC event loop thread will release the conditional wait via Protocol callback functions DEBUGPRINT("[%s]: getResource(%s)", DEBUG_LINE, m_domain, m_ir->absoluteURL()); struct timespec ts; ts.tv_nsec = 0; //don't care about the fine ts.tv_sec = (long) (time(0) + timeout); //add the timeout (default 10 secs) //this thread must now wait for the DC static event loop thread to return and complete the conversation in this Protocol //all calls that want to end the conversation MUST call finishConversation() to release this conditional wait //mutex is locked here to make sure that the condition is waited before it is signalled int ret = pthread_mutex_lock(&m_conversation_mutex); //mutex owned and specific to this single-threaded Spider, no other threads can claim it if (ret) DEBUGERROR("[%s]: pthread_mutex_lock returned [%i] (%s)", m_domain, ret, ret==EINVAL?"EINVAL":"?"); //this call will add the DC's fd into the write fd_set thus registering it for callback events (timeout/fail and write events) //start conversation m_status = connecting; m_dc->waitForWrite(generateRequest()); //generated in buffer owned by the caller, not freed by DC //implicit call to pthread_mutex_unlock() in pthread_cond_timedwait() on init ret = pthread_cond_timedwait(&m_conversation_cond, &m_conversation_mutex, &ts); //zero indicates normal exit //implicit call to pthread_mutex_lock() from pthread_cond_timedwait() on exit if (ret) { m_conversationResult = globaltimeout; m_dc->waitForNothing(); //this Protocol may be deleted on return from function so finalise DC comms DEBUGERROR("[%s]: getResource(%s) timed out waiting for the DC response", m_domain, m_ir->absoluteURL()); } DEBUGPRINT("[%s]: Protocol finished", DEBUG_LINE, m_domain); ret = pthread_mutex_unlock(&m_conversation_mutex); if (ret) DEBUGERROR("[%s]: pthread_mutex_unlock returned [%i] (%s)", m_domain, ret, ret == EINVAL ? "EINVAL" : "?" ); if (m_conversationResult == exception) { //exception transfer from DC thread to Spider thread DEBUGERROR("[%s]: rethrow passed DC Exception", m_domain); throw m_exception; } return m_conversationResult; } void Protocol::finishConversation(conversationResult _conversationResult) { //finishes the conversation, releasing the getResource() function //called by the DC event loop thread via Protocol callback functions //signals the Spider thread in Protocol::getResource() to continue (and probably delete this Protocol) int ret; m_conversationResult = _conversationResult; #ifdef _DEBUG const char *results[] = {"notStarted", "unknownConversationResult", "busy", "exception", "globaltimeout", "timeout", "unknownCharset", "unknownContentEncoding", "contentDecodeFailed", "unknownContenttype", "noBody", "failed", "maybe", "some", "ok"}; DEBUGPRINT("[%s]: finishConversation(%s) = [%s]", DEBUG_LINE, m_domain, m_ir->absoluteURL(), results[m_conversationResult]); #endif //cannot cancel the listener here because it must occur outside the critical event loop section //this Protocol may be deleted on return from function so finalise DC comms m_dc->waitForNothing(); //we need to lock the mutex because the finish thread might arrive before the condition is locked //also ~Protocol() may attempt to delete this from under us during this function ret = pthread_mutex_lock(&m_conversation_mutex); //mutex owned and specific to this single-threaded Spider, no other threads can claim it if (ret) DEBUGERROR("[%s]: pthread_mutex_lock returned [%i] (%s)", m_domain, ret, ret == EINVAL ? "EINVAL" : "?"); ret = pthread_cond_signal(&m_conversation_cond); //release the getResource() function from it's lock so that it synchronously returns to the caller if (ret) DEBUGERROR("[%s]: pthread_cond_signal returned [%i] (%s)", m_domain, ret, ret == EINVAL ? "EINVAL" : "?"); ret = pthread_mutex_unlock(&m_conversation_mutex); if (ret) DEBUGERROR("[%s]: pthread_mutex_unlock returned [%i] (%s)", m_domain, ret, ret == EINVAL ? "EINVAL" : "?" ); } const bool Protocol::checkForEOS(const char *eos) const { size_t leneos; if (eos) leneos = strlen(eos); const char *bufPos = m_buffer + m_totalBytesTransfered; return eos && leneos && bufPos-leneos >= m_buffer && !_STRNCMP(bufPos - leneos, eos, leneos); } const int Protocol::chunkSize(const char *chunkLabel, size_t *chunkSize, size_t *labelLength) { const char *currentChunkSizeFieldEnd = 0; size_t currentChunkSizeFieldLength = 0; char chunkSizeFieldStr[18]; //max 32 bit integer with 0 terminator = 9 chars, use 18 for future compat int ret = 1; *chunkSize = 0; if (chunkLabel) { if (currentChunkSizeFieldEnd = strstr(chunkLabel, "\r\n")) { currentChunkSizeFieldLength = currentChunkSizeFieldEnd - chunkLabel; if (labelLength) *labelLength = currentChunkSizeFieldLength; if (currentChunkSizeFieldLength < 18) { strncpy(chunkSizeFieldStr, chunkLabel, currentChunkSizeFieldLength); chunkSizeFieldStr[currentChunkSizeFieldLength] = 0; //0 terminate if (xtoi(chunkSizeFieldStr, chunkSize)) {DEBUGERROR("[%s]: failed to convert chunk to i", m_domain);} else ret = 0; } else DEBUGERROR("[%s]: chunk size field too big (>18)!", m_domain); } else DEBUGERROR("[%s]: can't find end of chunk size field", m_domain); } else DEBUGERROR("[%s]: *chunkLabel = 0!", m_domain); return ret; } Protocol::conversationResult Protocol::finalise() { //management function that just groups functionality for derived Protocol::conversationResult ret = notStarted; if ((ret = decompress()) != ok) return ret; //compresion like gzip (compression enum) if ((ret = contentDecode()) != ok) return ret; //things like chunked transfer encoding (contentEncoding enum) if ((ret = charsetDecode()) != ok) return ret; // * -> UTF-8 (charSet enum) if ((ret = translate()) != ok) return ret; // * -> english (contentLanguage enum) if ((ret = createInternetResource()) != ok) return ret; // **InternetResource (contentType enum) ret = updatePersistentData(); //session data etc. cross-Protocol return ret; } //-------------------------------------- DC event sink ---------------------------------------------- void Protocol::customException(DomainConnection::DomainConnectionException &e) { //custom Exception on event loop thread //e.g. InvalidSocket() //need to transfer the exception to the Protocol (Spider) thread DEBUGERROR("[%s]: custom exception", m_domain); m_exception = e; finishConversation(exception); } void Protocol::timedoutOnRead() { DEBUGERROR("[%s]: timedout waiting for read after %i bytes", m_domain, m_totalBytesTransfered); finalise(); //analyse what we have and then continue finishConversation(m_totalBytesTransfered?some:timeout); } void Protocol::timedoutOnWrite() { DEBUGERROR("[%s]: timedout waiting for write", m_domain); m_status = reconnecting; m_dc->closeConnectedSocket(); m_dc->connect(); m_dc->waitForConnect(); //retry current page request (no finalise) } void Protocol::threwException() { if (m_dc->consecutiveConnectionFails() < SPIDER_MAXCONNFAILS) { DEBUGERROR("[%s]: threw exception, reconnecting", m_domain); m_status = reconnecting; m_dc->connect(); m_dc->waitForConnect(); //retry current page request (no finalise) } else { DEBUGERROR("[%s]: threw exception, given up (>%i fails)", m_domain, SPIDER_MAXCONNFAILS); m_status = stopped; m_dc->waitForNothing(); finalise(); //analyse what we have and then continue finishConversation(m_totalBytesTransfered?some:failed); } } void Protocol::connectionClosedOnWrite() { if (m_dc->consecutiveConnectionFails() < SPIDER_MAXCONNFAILS) { DEBUGERROR("[%s]: dropped connection whilst trying write, reconnecting", m_domain); m_status = reconnecting; //now we set to reconnecting, even if already connected to start listening for write events m_dc->connect(); //start reconnection immediately while current page is being analysed m_dc->waitForConnect(); //re-attempt the same write } else { DEBUGERROR("[%s]: dropped connection whilst trying write, given up (>%i fails)", m_domain, SPIDER_MAXCONNFAILS); m_status = stopped; m_dc->waitForNothing(); finalise(); //analyse what we have and then continue finishConversation(m_totalBytesTransfered?some:failed); } } void Protocol::connectionClosedOnRead(const size_t bufspace) { DEBUGPRINT("[%s]: dropped connection whilst trying read, (expected with %u bytes)", DEBUG_LINE, m_domain, m_totalBytesTransfered); m_status = reconnecting; //now we set to reconnecting, even if already connected to start listening for write events m_dc->connect(); //start reconnection immediately while current page is being analysed m_dc->waitForNothing(); //set DC mode to nothing so that no more events come through finishConversation(finalise()); //release the getResource() call which will delete this Protocol } bool Protocol::outOfBufferSpace() { DEBUGPRINT("[%s]: Protocol::outOfBufferSpace", DEBUG_LINE, m_domain); finalise(); finishConversation(some); return true; //ask for a disconnect } void Protocol::completeFailure() { DEBUGERROR("[%s]: Protocol::completeFailure", m_domain); m_dc->connect(); m_status = reconnecting; //now we set to reconnecting, even if already connected to start listening for write events m_dc->waitForConnect(); } void Protocol::outOfSyncRead() { DEBUGERROR("[%s]: Protocol::outOfSyncRead", m_domain); //read data is available but we are waiting for a write //ignore the data in 1024 chunks, discarding immediately m_dc->ignoreReadData(); } void Protocol::outOfSyncWrite() { DEBUGERROR("[%s]: Protocol::outOfSyncWrite", m_domain); //write is available when we are waiting for more data from the server //finalise what we have and continue //finalise(m_totalBytesTransfered?some:failed); } void Protocol::finishedWrite(const int bytesleft) { //some bytes have been written, maybe not all. //there are bytesleft still to write (maybe 0) //bytesleft of 0 indicates that all bytes have been written switch (m_status) { case connecting: case reconnecting: { //get ready for new page read: tell the DC to wait for the new response if (!bytesleft) { m_totalBytesTransfered = 0; m_status = written; m_dc->waitForRead(m_buffer, m_buffersize); //DomainConnection will null terminate string and use BUFFERSZ-1 } break; } } } void Protocol::finishedRead(const int bytes) { DEBUGPRINT("[%s]: Protocol::finishedRead", DEBUG_LINE, m_domain); //the read has finished: by default assume a single request -> response paradigm and finialise the IR on EOS switch (m_status) { case written: case reading: { m_totalBytesTransfered += bytes; DEBUGPRINT("[%s]: %i bytes recieved [%s]", DEBUG_LINE, m_domain, bytes, m_buffer + m_totalBytesTransfered - 15); if (checkForEOS()) finishConversation(finalise()); break; } } }