#include "SpiderManager.h"
#include "DBObjects.h"
//NOTE(review): the header name of the original third #include was lost when this file was
//extracted; restore it from version control. The standard headers below cover the library
//names this file uses directly (vector, free, sysconf).
#include <vector>
#include <stdlib.h>
#ifdef _LINUX
#include <unistd.h>
#endif

TIPsDatabase *SpiderManager::m_db = 0;

//Sets up the synchronisation primitives, stores the database handle and accounts
//for the manager's own footprint via memoryDelta().
SpiderManager::SpiderManager(TIPsDatabase* _db): SpiderEventSink() {
  MEMBER_INIT_MUTEX(m_objects_mutex);    //for adding/deleting from m_objects
  MEMBER_INIT_MUTEX(m_hasObjects_mutex); //sleep while no work to do
  MEMBER_INIT_COND( m_hasObjects_cond);
  m_db = _db;
  memoryDelta((int) sizeof(SpiderManager));
}

//Releases every primitive created by the constructor and reverses the memory
//accounting done by the constructor and by run().
SpiderManager::~SpiderManager() {
  pthread_cond_destroy(&m_hasObjects_cond);
  pthread_mutex_destroy(&m_hasObjects_mutex);
  pthread_mutex_destroy(&m_objects_mutex);
  memoryDelta(-(int) (sizeof(SpiderManager) + m_domains.capacity()*sizeof(Domain*)));
}

//Thread entry point handed to pthread_create() by run().
//lpParam = the SpiderManager whose runDBThread() should be executed; always returns 0.
THREAD_CALLBACK_TYPE SpiderManager::staticRunDBThreadasync(LPVOID lpParam) {
  SpiderManager *sm = static_cast<SpiderManager*>(lpParam);
  if (sm) sm->runDBThread();
  return 0;
}

//Body of the DB thread: sleeps until finishedPage() queues objects or stop() clears
//m_runDB, then pops the queued objects one by one, calls addToDB() on each and deletes it.
//Objects still queued when the stop request arrives are drained before returning 0.
int SpiderManager::runDBThread() {
  m_runDB = true;
  while (m_runDB) {
    //wait for work or a stop request. The predicate is re-checked under m_hasObjects_mutex,
    //and both signallers take that mutex, so a signal sent while this thread is busy
    //cannot be lost (lock order is always m_hasObjects_mutex -> m_objects_mutex)
    pthread_mutex_lock(&m_hasObjects_mutex);
    for (;;) {
      pthread_mutex_lock(&m_objects_mutex);
      const bool queueEmpty = m_objects.empty();
      pthread_mutex_unlock(&m_objects_mutex);
      if (!queueEmpty || !m_runDB) break;
      pthread_cond_wait(&m_hasObjects_cond, &m_hasObjects_mutex);
    }
    pthread_mutex_unlock(&m_hasObjects_mutex);

    for (;;) {
      //the queue is only touched under m_objects_mutex; the lock is held just long enough
      //to pop one pointer, the DB access itself happens outside of it
      DBObject *object = 0;
      bool popped = false;
      size_t remaining = 0;
      pthread_mutex_lock(&m_objects_mutex);
      if (!m_objects.empty()) {
        object = m_objects.back();
        m_objects.pop_back();
        remaining = m_objects.size();
        popped = true;
      }
      pthread_mutex_unlock(&m_objects_mutex);
      if (!popped) break;

      //break out if the object queue exceeds MAX_OBJECTS
      //temporary fix until we understand how to manage this...
      //stop() waits for all other threads to exit; it does not join this thread when
      //called from it, so execution continues here and drains the queue
      if (m_runDB) {
        if (remaining > MAX_OBJECTS) {
          MSG("[SpiderManager]: more than %i objects - auto-Stop", MAX_OBJECTS);
          stop();
        }
#ifdef _LINUX
        const long availPages = sysconf(_SC_AVPHYS_PAGES);
        const long pageBytes = sysconf(_SC_PAGE_SIZE);
        //sysconf() returns -1 when a value is unavailable: skip the check rather than
        //treat that as "no memory left". The product is formed in 64 bits so that it
        //cannot overflow where long is 32 bits wide
        if (availPages >= 0 && pageBytes > 0) {
          const unsigned int freeMB = static_cast<unsigned int>(
            (static_cast<unsigned long long>(availPages) * static_cast<unsigned long long>(pageBytes)) / (1024ULL*1024ULL));
          //m_runDB is re-tested so that stop() is not run twice for the same object
          if (m_runDB && freeMB < MIN_MEMORY) {
            MSG("[SpiderManager]: less than %i MB free memory left (%i) - auto-Stop", MIN_MEMORY, freeMB);
            stop();
          }
        }
#endif
      }

      //serialise the object to DB
      if (!m_runDB) PROGRESS;
      try {object->addToDB();} //uses big escaped POST of copied data (in LIVEDB mode)
      catch (...) {
        //best effort: report the failure, drop this object and carry on with the next one
        DEBUGERROR0("[SpiderManager]: addToDB failed - object discarded");
      }
      delete object; //frees all the copied elements
    }
  }
  return 0;
}

//Stops every Spider, deletes every Domain and ends the DB thread. Always returns 0.
//May be called from the DB thread itself (auto-stop in runDBThread()).
int SpiderManager::stop() {
  //work on a snapshot so that entries can be erased from this container while iterating
  //NOTE(review): "StringMap" is written exactly as found. It is valid as-is only if
  //SpiderManager derives from a StringMap specialisation; if a template argument list
  //was lost in extraction, restore it here.
  StringMap spiders;
  spiders.insert(begin(), end());

  MSG0("[SpiderManager]: sending all Spiders the stop signal");
  for (SpiderManager::iterator i = spiders.begin(); i != spiders.end(); ++i) {
    i->second->stop(); //requests end asynchronously, use waitStop() to wait for the result
  }

  MSG1("[SpiderManager]: spider wait: ");
  for (SpiderManager::iterator i = spiders.begin(); i != spiders.end(); ++i) {
    PROGRESS;
    Spider *sp = i->second;
    sp->waitStop(); //needs to synchronously kill the thread...
    //delete sp; //implicitly deletes associated DC (outside of eventLoop)
    erase(i->first);
  }
  FINISHPROGRESS;

  //save and delete all the domains
  MSG1("[SpiderManager]: saving all domains: ");
  for (size_t idx = 0; idx < m_domains.size(); ++idx) {
    PROGRESS;
    delete m_domains[idx]; //implicitly saves the domain to file
  }
  m_domains.clear();
  FINISHPROGRESS;

  //cancel the DBThread
  MSG1("[SpiderManager]: cancel DB Thread: ");
  //the flag is changed and signalled under the condition's mutex so that the DB thread
  //cannot test m_runDB and then go to sleep after the signal was already sent
  pthread_mutex_lock(&m_hasObjects_mutex);
  m_runDB = false; //end DB loop on next run
  pthread_cond_signal(&m_hasObjects_cond); //wake up DBThread (if need be)
  pthread_mutex_unlock(&m_hasObjects_mutex);
  //a thread cannot join itself: skip the join when stop() runs on the DB thread
  if (!pthread_equal(m_DBThread, pthread_self())) pthread_join(m_DBThread, 0); //synchronous wait for end
  FINISHPROGRESS;

  return 0;
}

//Loads the domains through the database, starts one Spider per domain and then creates
//the DB thread. A non-zero result from loadExternalSources() is reported as a DB error
//and nothing is started. Always returns 0.
int SpiderManager::run() {
  char *sError = 0;

  m_domains.reserve(FD_SETSIZE);
  if (!m_db->loadExternalSources(SPIDERMANAGER_DOMAINS, &m_domains, &sError)) {
    MSG1("[SpiderManager]: start spiders (auto-load domains files): ");
    for (size_t idx = 0; idx < m_domains.size(); ++idx) {
      PROGRESS;
      Domain *domain = m_domains[idx];
      domain->createRootPage(); //all pages hang off the Domain
      Spider *spider = new Spider(this, domain, m_db, SPIDER_MAXBODYBUFFER * 1024);
      //NOTE(review): the original pair<...> template arguments were lost in extraction;
      //value_type is the element type inherited from the container base - confirm
      insert(value_type(domain->m_domain, spider));
      spider->run(); //ASYNC: returns immediately as runs on a separate thread
    }
    FINISHPROGRESS;

    //create DB Thread (for saving objects to the DB)
    //DB thread priority
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    struct sched_param param;
    param.sched_priority = SM_DBTHREADPRIORITY;
    if (pthread_attr_setschedparam(&attr, &param)) DEBUGERROR0("[SpiderManager]: cannot set Spider thread priority");
    if (pthread_create(&m_DBThread, &attr, SpiderManager::staticRunDBThreadasync, this)) {
      DEBUGERROR0("[SpiderManager]: cannot create DB thread");
    }
    pthread_attr_destroy(&attr); //the attributes are only needed while creating the thread
  } else {
    DEBUGERROR("DB Error [%s]", sError);
    free(sError);sError=0;
  }

  memoryDelta((int) (m_domains.capacity()*sizeof(Domain*)));

  return 0;
}

//Event sink callback; nothing to do here. Always returns 0.
int SpiderManager::finishedDomain(Spider *spider) {return 0;}

//Event sink callback: parses a valid resource into DB objects, queues them for the
//DB thread and wakes it. Always returns 0.
int SpiderManager::finishedPage(Spider *spider, InternetResource *resource) {
  //this is the same thread that retrieved the resource so it, and the Spider,
  //will not be deleted until this function returns

  //check the document is valid for parsing (size, return code, etc.)
  if (!resource || !resource->validDocument()) return 0;

  //get new objects (all values are copied so that they are not freed by the Spider)
  //NOTE(review): element type restored from how m_objects is consumed in runDBThread()
  vector<DBObject*> newobjects;
  if (resource->parse(&newobjects)) { //returns number of valid DB objects in document
    resource->internetURIRequest()->domain()->addDocument(resource->internetURIRequest());

    //add the new objects to objects member group (making sure that they are not being removed at the same time)
    //note that these are pointers to new objects so the objects themselves are not copied
    //again during the transfer
    pthread_mutex_lock(&m_objects_mutex);
    m_objects.insert(m_objects.end(), newobjects.begin(), newobjects.end());
    pthread_mutex_unlock(&m_objects_mutex);

    //signal under the condition's mutex: the DB thread is then either already waiting
    //or has not yet checked the queue, so the wake-up cannot be missed
    pthread_mutex_lock(&m_hasObjects_mutex);
    pthread_cond_signal(&m_hasObjects_cond); //wake up DBThread (if need be)
    pthread_mutex_unlock(&m_hasObjects_mutex);
  }

  return 0;
}