#include "Spider.h"
//NOTE(review): the header name of the second #include was missing from the reviewed text.
//The standard headers below are the ones this file visibly relies on
//(free/atoi, strlen/memset, std::vector) - confirm against version control.
#include <stdlib.h>
#include <string.h>
#include <vector>

//---------------------------------------------------------------- static members
//Shared by every Spider: the database handle plus one connection/domain/filter used for Google counts
TIPsDatabase     *Spider::m_db = 0;
DomainConnection  Spider::m_googleDC("www.google.co.uk", 0, 300, 0);
Domain            Spider::m_googleDomain(0, "www.google.co.uk");
//NOTE(review): this pattern may have lost HTML tags (e.g. <b>..</b>) in the reviewed text - confirm before relying on it
Filter            Spider::m_fGetGoogleCount(1, 0, "Results 1 - 10 of about ([0-9,]+) from ");

//Constructs a Spider working on _domain.
//  _sm:         sink that receives Spider events (finishedPage / finishedDomain) and resource information
//  _domain:     the Domain whose pages are fetched; must be non-null (it is dereferenced in the initialiser list)
//  _db:         database handle; only stored the first time a Spider is constructed (while m_db is still 0)
//  _buffersize: size in bytes of the working buffer allocated here with mallocCheck()
Spider::Spider(SpiderEventSink *_sm, Domain *_domain, TIPsDatabase *_db, const size_t _buffersize):
  ResourceMonitor(_sm),
  m_run(true),
  m_sm(_sm),                          //listener to Spider events
  m_buffersize(_buffersize),
  m_domain(_domain),
  m_dc(_domain->m_domain, 0, 300, 0), //DC connect to the current domain with 300 keep-alive and 0 linger (off)
  m_currentProtocol(0),               //State: not set until a request is required
  m_currentPage(0),
  m_status(created)
{
  pthread_invalidate(m_runThread);
  m_buffer = (char*)mallocCheck(m_buffersize);

  //static first time initialisations
  if (m_db == 0) {
    m_db = _db;
    m_googleDC.DNSLookup();
    m_googleDC.connect();
  }

  //monitor resources on the Domain to pass it up to the SM
  attachToDomain();
  memoryDelta((int)(m_buffersize + sizeof(Spider)), this);
}

//these are more for moving to new Domains
//Registers this Spider as the Domain's resource monitor event sink
void Spider::attachToDomain() {
  m_domain->setResourceMonitorEventSink(this);
}

//Clears the Domain's resource monitor event sink
void Spider::detachFromDomain() {
  m_domain->setResourceMonitorEventSink(0);
}

//Detaches from the Domain, frees the working buffer and reports the released memory.
//Throws StillTalking (before any cleanup) if a Protocol is still assigned.
//NOTE(review): under C++11 and later a destructor is implicitly noexcept, so this throw
//would call std::terminate unless the declaration says noexcept(false) - confirm the build standard.
Spider::~Spider() {
  if (m_currentProtocol) {
    DEBUGERROR("[%s]: StillTalking()", m_domain->m_domain);
    throw StillTalking();
  }
  detachFromDomain();
  if (m_buffer) {
    free((void*)m_buffer);
    m_buffer = 0;
  }
  memoryDelta(-(int)(m_buffersize + sizeof(Spider)), this);
}

//Asks Google how many pages it reports for this domain.
//Returns the parsed count, or 0 when the request fails or the count cannot be found in the body.
unsigned int Spider::googleCount() {
  //ask Google how many pages there are in this domain (http://www.google.co.uk/search?as_sitesearch=)
  DEBUGPRINT("[%s]: get Google count", DEBUG_CHECK, m_domain->m_domain);

  //create URL
  size_t googleCountURLLen = strlen(m_domain->m_domain) + 50;
  char *googleCountURL = (char*)mallocCheck(googleCountURLLen+1);
  _SNPRINTF(googleCountURL, googleCountURLLen, "http://www.google.co.uk/search?as_sitesearch=%s", m_domain->m_domain);
  googleCountURL[googleCountURLLen] = 0; //the spare byte was never written: terminate explicitly

  unsigned int googlePageCount = 0;
  const char *googlePageCountText = 0;
  size_t pos = 0;
  char googlePageCountTextWithoutCommas[32], c;

  //get the page
  InternetResource *googleResource = 0;
  InternetURIRequest *ir = new InternetURIRequest(&m_googleDomain, googleCountURL);
  ir->parse();

  //this section is effectively critical because the claiming and release of the DC is in a mutex
  //only one Protocol can own a DC at a time. The second will wait.
  Protocol *pGoogle = Protocol::createProtocol(m_db, &m_googleDC, ir, &googleResource, m_buffer, m_buffersize); //DC ownership mutex claimed
  if (pGoogle && pGoogle->getResource() == Protocol::ok && googleResource) { //synchronous
    DEBUG_RESULT_OK;

    //analyse the body for the count info
    //NOTE(review): element type restored as const char* (inferred from the assignments/frees below) - confirm against Filter::submatches
    std::vector<const char*> parts;
    parts.reserve(2);
    if (!m_fGetGoogleCount.submatches(googleResource->body(), &parts)) {
      //did not match at all!
      DEBUGPRINT("[%s]: Google count not found", DEBUG_LINE, m_domain->m_domain);
    } else {
      if (parts.size() > 1 && parts[1]) {
        //convert number into an int e.g. "7,123" -> 7123
        googlePageCountText = parts[1]; //with commas in (confuses atoi())
        while ((c = *googlePageCountText++) && pos < 31) {
          if (c >= '0' && c <= '9') googlePageCountTextWithoutCommas[pos++] = c;
        }
        googlePageCountTextWithoutCommas[pos] = 0;
        googlePageCount = atoi(googlePageCountTextWithoutCommas);
        DEBUGPRINT("[%s]: Google count: [%u]", DEBUG_LINE, m_domain->m_domain, googlePageCount);
      } else {
        //matched, but the count sub-expression was not handed back
        DEBUGPRINT("[%s]: Google count not found", DEBUG_LINE, m_domain->m_domain);
      }
      //the submatch strings are released here with free()
      for (std::vector<const char*>::iterator part = parts.begin(); part != parts.end(); ++part) {
        free((void*)*part);
      }
    }
  } else DEBUG_RESULT_FAIL;

  //clear up
  free(googleCountURL);
  if (pGoogle) delete pGoogle; //DC ownership mutex released (persistent connection maintained)
  if (ir) delete ir;
  if (googleResource) delete googleResource;

  return googlePageCount;
}

//Starts the Spider thread (staticrunasync) with the configured priority and stack size.
//Returns the result of pthread_create() (0 on success).
int Spider::run() { //public function
  //---------------- pre-async run stuff
#ifdef GOOGLECOUNT
  if (!m_domain->googleCount()) m_domain->googleCount(googleCount());
#endif

  //need to have a big stack size for the GRETA analysis of the 50k pages
  pthread_attr_t attr;
  pthread_attr_init(&attr);

  //Spider thread priority
  struct sched_param param;
  param.sched_priority = SPIDER_THREADPRIORITY;
  if (pthread_attr_setschedparam(&attr, &param)) {
    DEBUGERROR("[%s]: cannot set Spider thread priority", m_domain->m_domain);
  }

  //stacksize
#ifdef _DEBUG
  size_t stacksize = 0;
  pthread_attr_getstacksize(&attr, &stacksize);
  //casts keep the arguments in step with the %u conversions
  DEBUGPRINT("[%s]: Adjusting stacksize for GRETA [%u] -> [%u]", DEBUG_LINE, m_domain->m_domain, (unsigned int)stacksize, (unsigned int)SPIDER_THREADSTACKSIZE);
#endif
  if (pthread_attr_setstacksize(&attr, SPIDER_THREADSTACKSIZE)) {
    DEBUGERROR("[%s]: cannot set thread stacksize", m_domain->m_domain);
  }

  //note that m_runThread is invalidated by staticrunCleanup() when staticrunasync() finishes
  const int createResult = pthread_create(&m_runThread, &attr, Spider::staticrunasync, this);

  //the attributes object is only needed while creating the thread: release it
  pthread_attr_destroy(&attr);
  return createResult;
}

//Requests that the run loop stops; does not wait for the thread. Always returns 0.
int Spider::stop() { //public function
  //just request the event loop stop
  DEBUGPRINT("[%s]: stopping Spider: setting run to false", DEBUG_LINE, m_domain->m_domain);
  m_run = false; //asynchronous signal to exit at the end of next loop
  return 0;
}

//Joins the Spider thread if it is still valid, detaches from the Domain and marks the Spider stopped.
//Always returns 0.
int Spider::waitStop() {
  //Spider will be deleted after this, so make sure all contracts are closed
  //synchronous wait for thread to end naturally (not a cancel)
  if (pthread_isvalid(m_runThread)) {
    DEBUGPRINT("[%s]: stopping Spider: waiting for thread", DEBUG_LINE, m_domain->m_domain);
    pthread_join(m_runThread, 0);
    DEBUGPRINT("[%s]: stopping Spider: thread exited", DEBUG_LINE, m_domain->m_domain);
  } else {
    DEBUGPRINT("[%s]: stopping Spider: thread already finished", DEBUG_LINE, m_domain->m_domain);
  }

  detachFromDomain(); //stop listening for resource information
  m_status = stopped;
  return 0;
}

//private threaded functions
//Thread cleanup handler: marks the Spider's thread handle as invalid
void Spider::staticrunCleanup(LPVOID lpParam) {
  Spider *spider = (Spider*)lpParam;
  pthread_invalidate(spider->m_runThread);
}

//Thread entry point: runs runasync() on the Spider passed in lpParam with staticrunCleanup registered
THREAD_CALLBACK_TYPE Spider::staticrunasync(LPVOID lpParam) {
  Spider *spider = (Spider*)lpParam;

  //thread cleanup
  pthread_cleanup_push(Spider::staticrunCleanup, spider);
  spider->runasync();
  pthread_cleanup_pop(1); //and invoke

  return 0;
}

//The Spider work loop, executed on the Spider thread.
//Returns 1 if the DNS lookup for the domain fails, otherwise 0 once the loop has ended.
int Spider::runasync() {
  //Spider uses the Transcend method to traverse the website hierarchy.
  //There are 3 arrays: newlinks (m_newlinks), children and finished (m_pages) (includes errors)
  //The Spider fetches all the children of the current page first before progressing to the next in newlinks
  //The SpiderManager is responsible for adding the first (root page /) into the newlinks
  DEBUGPRINT("[%s]: starting Spider run (Transcend method)", DEBUG_LINE, m_domain->m_domain);

  //these are the default:
  //if (pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, 0)) //PTHREAD_CANCEL_ENABLE, PTHREAD_CANCEL_DISABLE
  //  DEBUGERROR("[%s]: cannot set cancel type", m_domain->m_domain);
  //if (pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, 0)) //PTHREAD_CANCEL_DEFERRED, PTHREAD_CANCEL_ASYNCHRONOUS
  //  DEBUGERROR("[%s]: cannot set cancel type", m_domain->m_domain);

  //The Spider manages the DomainConnection
  try {
    m_dc.DNSLookup(); //will throw DNSFailure on fail
  } catch (DomainConnection::DNSFailure&) {
    //write out and then ignore
    DEBUGERROR("[%s]: DNS Lookup failed", m_domain->m_domain);
    return 1;
  }
  m_dc.connect(); //pre-emptive connect as the Protocol will want to write

  //kick off the first request loop: this will trigger a response/timeout event on the Spider when the server responds
  Protocol::PersistentData permData;
  memset(&permData, 0, sizeof(permData));            //maintained entirely by the Protocol
  InternetResource *resource = 0;                     //returned by the Protocol
  //NOTE(review): element type restored as const char* (inferred from finallink and the frees below) - confirm against the headers
  const std::vector<const char*> *protocolLinks = 0;  //returned by the Protocol (from headers)
  const std::vector<const char*> *resourceLinks = 0;  //from the InternetResource (in the body)
  std::vector<const char*>::const_iterator i;
  m_status = working;
  const char *finallink = 0;
  Protocol::conversationResult ret = Protocol::notStarted;

  DEBUGPRINT("[%s]: starting Spider run", DEBUG_LINE, m_domain->m_domain);
  while (m_domain->popNewPage(&m_currentPage) != Domain::noWork && m_currentPage && m_run) {
    DEBUGPRINT("[%s]: Job [%s]", DEBUG_LINE, m_domain->m_domain, m_currentPage->absoluteURL());
    ret = Protocol::notStarted;

    if (!m_currentPage->type()->process()) {
      DEBUGPRINT("[%s]: ignored (pdf|image|...)", DEBUG_LINE, m_domain->m_domain);
    } else {
      try {
        //the assignment of the Protocol to the URL will initiate a write to the DC and a listen to its events
        m_currentProtocol = Protocol::createProtocol(m_db, &m_dc, m_currentPage, &resource, m_buffer, m_buffersize, &permData);
        //googleCount() treats a null Protocol as possible, so do not dereference it blindly here
        if (m_currentProtocol) {
          DEBUGPRINT("[%s]: Request Protocol [%s] selected", DEBUG_LINE, m_domain->m_domain, m_currentProtocol->description());
        }
      } catch (Protocol::UnknownProtocol&) {
        DEBUGPRINT("[%s]: failed to set initial request Protocol for [%s], continue with next job", DEBUG_LINE, m_domain->m_domain, m_currentPage->absoluteURL());
        m_currentProtocol = 0;
      }

      if (m_currentProtocol) {
        ret = m_currentProtocol->getResource();
        if (!m_currentPage->isRepeating()) {
          switch (ret) { //synchronous
            case Protocol::ok:
            case Protocol::some:
            case Protocol::maybe: {
              //compile the sub-links from the Protocol
              protocolLinks = m_currentProtocol->links();
              if (protocolLinks) { //Protocol frees this vector in destructor
                for (i = protocolLinks->begin(); i != protocolLinks->end(); ++i) {
                  finallink = *i;
                  try {
                    m_domain->createPage(finallink, m_currentPage); //DomainMismatch& or InvalidAbsoluteURL&
                  } catch (...) {
                    //a link that cannot become a page is skipped
                  }
                  if (finallink) free((void*)finallink);
                }
              }

              //compile the sub-links from the Resource
              if (resource) {
                resourceLinks = resource->links();
                if (resourceLinks) { //we free this vector
                  for (i = resourceLinks->begin(); i != resourceLinks->end(); ++i) {
                    finallink = *i;
                    try {
                      m_domain->createPage(finallink, m_currentPage); //DomainMismatch& or InvalidAbsoluteURL&
                    } catch (...) {
                      //a link that cannot become a page is skipped
                    }
                    if (finallink) free((void*)finallink);
                  }
                  delete resourceLinks;
                  resourceLinks = 0;
                }
                m_domain->addResourceSize(resource->size());
                //resource->writeToFile();
                m_sm->finishedPage(this, resource); //inform the SpiderManager that a page has been finished
              } else {
                DEBUGPRINT("[%s]: no resource", DEBUG_LINE, m_domain->m_domain);
              }
              break;
            }
            default: {
#ifdef _DEBUG
              const char *results[] = {"notStarted", "unknownConversationResult", "busy", "exception", "globaltimeout", "timeout", "unknownCharset", "unknownContentEncoding", "contentDecodeFailed", "unknownContenttype", "noBody", "failed", "maybe", "some", "ok"};
              //guard the lookup: the table is assumed to mirror the enum order, so never index past it
              const size_t resultCount = sizeof(results) / sizeof(results[0]);
              const char *resultName = ((size_t)ret < resultCount) ? results[ret] : "out of range";
              DEBUGPRINT("[%s]: unhandled Protocol result [%s]", DEBUG_LINE, m_domain->m_domain, resultName);
#endif
              break;
            }
          }
        }

        //no release of the IR is necessary because it uses a shared buffer space for the body
        //if (resource) {delete resource;resource = 0;resourceLinks = 0;} //filled out even in some failure cases
        //if (resource) resource->freeBody();
        //m_currentPage->releaseInternetResource(); //sets the m_ir to 0
        delete m_currentProtocol;
        m_currentProtocol = 0;
        protocolLinks = 0; //zero pointers also to internal arrays
      }
    }

    //finalise page
    m_currentPage->setProcessed();
    m_domain->updateLastPage(m_currentPage);
  }

  DEBUGPRINT("[%s]: finishing Spider run", DEBUG_LINE, m_domain->m_domain);
  m_sm->finishedDomain(this);
  m_status = stopped;

  return 0;
}