23
Sep
2010

Pornind de la o problema concreta, am incercat sa evidentiez anumite tehnici care pot fi utilizate pentru optimizarea executiei unui program, beneficiind astfel de avantajul tehnologic oferit de platformele hardware care suporta procesoare multiple, nuclee multiple per procesor sau tehnologii de tipul Hyper Threading.

Introducere

Visual C++ 2010 include asa numitul Concurrency Runtime, suportat incepand cu sistemul de operare Wndows XP SP3, care pune la dispozitia programatorilor algoritmi pentru procesare concurenta, definirea de task-uri care pot rula in paralel, containere, agenti etc. care utilizeaza Concurrency Runtime Scheduler implicat in crearea si managementul firelor de executie. In plus, Visual C++ 2010 extinde suportul pentru standardul C++0x prin adaugarea de noi caracteristici, printre care si expresiile lambda. Cu ajutorul acestora se pot crea functii anonime. In exemplul de mai jos sunt prezentate alternative la un cod care se executa serial prin implementarea unora dintre algoritmii pentru rularea concurenta, task-uri care ruleaza in paralel si prin utilizarea expresiilor lambda care structureaza codul intru-un mod logic si il fac mai usor de citit si de inteles.

Detalii de implementare

Se pune problema indexarii unui fisier text de mari dimensiuni si determinarea cuvintelor care au cele mai putine aparitii. Aceasta implica separarea acestui proces in trei etape:

  • incarcare fisier
  • indexare cuvinte
  • cautare

In exemplul de fata nu s-a pus problema optimizarii citirii textului din fisier sau a cautarii cuvintelor indexate, dar mecanismele utilizare sunt aceleasi pentru rularea seriala, cat si pentru cea concurenta pentru a putea evidentia performantele fiecarei tehnici in parte. In plus, timpii de rulare sunt dependenti de masina pe care se fac testele, in functie de configuratia hardware, in special a procesorului, cat si a sistemului de operare utilizat. Cat despre modul in care sunt stocate in memorie cuvintele indexate, am optat pentru containerele de tip colectie std::map si stdext::hash_map in care, pe post de cheie, este utilizata clasa CString. Acest lucru necesita definirea unei functii operator pentru std::map si a unei functii hash pentru std::hash_map.

struct CStringCompare
{
public:
   bool operator()(const CString &s1, const CString &s2) const
   {
      //return s1.CompareNoCase(s2) < 0; // daca se doreste indexarea fara a tine 
                                         // seama de litere mari sau mici
      return s1 < s2;
   }
};
 
typedef map<CString,int,CStringCompare> CStringMap;
 
template< > inline
size_t stdext::hash_value< CString >(const CString &s)
{
   return stdext::hash_value((LPCTSTR)s);
}
 
typedef hash_map<CString,int> CStringHashMap;

Mai jos sunt prezentate secvente de cod in care este utilizata doar colectia stdext::hash_map, dar proiectul exemplu atasat le include pe amandoua.

Procesare seriala

Un cod clasic, serial, poate grupa primele doua etape intr-una singura, astfel ca in bucla de incarcare a textului din fisier fiecare rand este indexat.

BOOL IncarcareIndexare(LPCTSTR sFisName, LPCTSTR sMetoda)
{
   //...
   int nLinii=0;
   CString sLinie;
   int pos;
   CString t;
   CStringHashMap::iterator iter_c;

   ReseteazaCautare();
    
   timer.start();
   while(fisier.ReadString(sLinie))
   {
      pos=0;
      t=sLinie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
      while(!t.IsEmpty())
      {
         iter_c=m_cuvinte.find(t);
         if(iter_c==m_cuvinte.end())
            m_cuvinte.insert(CStringHashMap::value_type(t,1));
         else
            iter_c->second++;
         t=sLinie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
      }
      nLinii++;
   }
   double end=timer.stop();
   fisier.Close();
   //...
   return TRUE;
}

Ca alternativa se poate separa incarcarea fisierului de indexare, relevanta la utilizarea tehnicii de procesare concurenta in care este utilizata functia parallel_for_each.

static BOOL Incarcare(LPCTSTR sFisName, LPCTSTR sMetoda)
{
   //...
   CString sLinie;
 
   timer.start();
   while(fisier.ReadString(sLinie))
      m_randuri.push_back((LPCTSTR)sLinie);
   double end=timer.stop();
   fisier.Close();
   //...
   return TRUE;
}

double Serial()
{
   //...
   timer.start();
   for(iter_s=m_randuri.begin();iter_s!=m_randuri.end();iter_s++)
   {
      pos=0;
      t=iter_s->Tokenize(_T(".,;:'\"-?!() \\/"),pos);
      while(!t.IsEmpty())
      {
         iter_c=m_cuvinte.find(t);
         if(iter_c==m_cuvinte.end())
            m_cuvinte.insert(CStringHashMap::value_type(t,1));
         else
            iter_c->second++;
         t=iter_s->Tokenize(_T(".,;:'\"-?!() \\/"),pos);
      }
   }
   return timer.stop();
}

Dupa indexare, colectia contine cuvintele distincte din fisier si numarul de aparitii pentru fiecare dintre acestea. Determinarea cuvintelor cu cel mai mic numar de aparitii se realizeaza prin parcurgerea colectiei si construirea listei de cuvinte, separate prin virgula, cu cele mai putine aparitii. Daca la o noua iteratie numarul de aparitii asociat cuvantului curent este mai mic decat cel memorat in variabila m_numarAparitii, atunci lista de cuvinte este reinitializata si apoi se continua parcurgerea colectiei.

double Cautare()
{
   //...
   timer.start();

   for(iter_c=m_cuvinte.begin();iter_c!=m_cuvinte.end();iter_c++)
   {
      if(iter_c->second<=m_numarAparitii)
      {
         if(iter_c->second < m_numarAparitii)
            m_listaCuvinte=iter_c->first,m_numarCuvinte=1;
         else
            m_listaCuvinte+=_T(",")+iter_c->first,m_numarCuvinte++;
         m_numarAparitii=iter_c->second;
      }
   }

   return timer.stop();
}

Procesare concurenta

Concurrency Runtime din Visual C++ 2010 pune la dispozitia programatorilor algoritmi pentru procesarea concurenta, printre care si functia parallel_for_each din namespace-ul Concurrency care poate fi aplicata pe vectorul de linii de rand din fisierul text, incarcat in prealabil. Functia primeste ca parametri iteratorii de inceput si cel de dupa sfarsit ai vectorului plus o functie lambda in care se indexeaza o linie de text din vector asa cum este distribuita de Concurrency Runtime Scheduler.

double Paralel()
{
   //...
   CRITICAL_SECTION cs;
   InitializeCriticalSection(&cs);
 
   timer.start();
 
   parallel_for_each(m_randuri.begin(),m_randuri.end(),[&](const CString &linie)
   {
      CStringHashMap::iterator iter_c;
      int pos=0;
      CString t=linie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
      while(!t.IsEmpty())
      {
         EnterCriticalSection(&cs);
         iter_c=m_cuvinte.find(t);
         if(iter_c==m_cuvinte.end())
            m_cuvinte.insert(CStringHashMap::value_type(t,1));
         else
            iter_c->second++;
         LeaveCriticalSection(&cs);
         t=linie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
      }
   });
 
   return timer.stop();
 
   DeleteCriticalSection(&cs);
}

Implementarea de mai sus nu este o abordare tocmai corecta din punct de vedere al procesarii paralele deoarece este utilizata o sectiune critica care blocheaza celelalte fire de executie pana ce lista de cuvinte indexate a fost actualizata cu un nou cuvant din firul de executie curent. A fost necesara utilizarea sectiunii critice pentru a asigura ca aceasta colectie este actualizata o singura data la un moment dat, prevenindu-se astfel generarea unor erori la executie sau generarea unor rezultate eronate. O metoda mai eficienta este utilizarea clasei combinable care asigura, pentru fiecare fir de executie creat in mecanismul de procesare paralela, o copie locala a rezultatelor care apoi sunt reunite intr-un rezultat final prin utilizarea functiei membre combine.

double ParalelEx()
{
   //...
   combinable<CStringHashMap> cuvinte;

   timer.start();

   parallel_for_each(m_randuri.begin(),m_randuri.end(),[&](const CString &linie)
   {
      CStringHashMap::iterator iter_c;
      int pos=0;

      CStringHashMap &local=cuvinte.local();

      CString t=linie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
      while(!t.IsEmpty())
      {
         iter_c=local.find(t);
         if(iter_c==local.end())
            local.insert(CStringHashMap::value_type(t,1));
         else
            iter_c->second++;
         t=linie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
      }
   });
	
   m_cuvinte=cuvinte.combine(
      [](const CStringHashMap &sm1, const CStringHashMap &sm2)->CStringHashMap
   {
      static CStringHashMap::const_iterator iter_m;
      static CStringHashMap::iterator iter;
      CStringHashMap sm(sm1);
      for(iter_m=sm2.begin();iter_m!=sm2.end();iter_m++)
      {
         iter=sm.find(iter_m->first);
         if(iter==sm.end())
            sm.insert(CStringHashMap::value_type(iter_m->first,iter_m->second));
         else
            iter->second+=iter_m->second;
      }

      return sm;
   });
 
   return timer.stop();
}

Determinarea concurenta a cuvintelor cu cel mai mic numar de aparitii se realizeaza cu ajutorul functiei parallel_for_each care este aplicata pe colectia de cuvinte indexate si cu ajutorul clasei combinable pentru generarea unei copii locale de rezultate pentru fiecare fir de executie; ulterior se combina rezultatele locale intr-un rezultat final.

double CautareEx()
{
   //...
   struct cvParam
   {
      int numarCuvinte;
      int numarAparitii;
      cvParam()
      {
         numarAparitii=MAXINT32;
         numarCuvinte=0;
      }
   };
 
   typedef pair<CString,cvParam> cuvParam;
 
   combinable<cuvParam> listaCuvinte;
 
   timer.start();
 
   parallel_for_each(m_cuvinte.begin(),m_cuvinte.end(),
               [&](const pair<CString,int> &cuvant)
   {
      if(cuvant.second<=listaCuvinte.local().second.numarAparitii)
      {
         if(cuvant.second<listaCuvinte.local().second.numarAparitii)
            listaCuvinte.local().first=cuvant.first,
            listaCuvinte.local().second.numarCuvinte=1;
         else
            listaCuvinte.local().first+=_T(",")+cuvant.first,
            listaCuvinte.local().second.numarCuvinte++;
         listaCuvinte.local().second.numarAparitii=cuvant.second;
      }
   });
 
   cuvParam cvP=listaCuvinte.combine(
            [](const cuvParam &cv1, const cuvParam &cv2)->cuvParam
   {
      cuvParam cv=cv1;
      if(cv2.second.numarAparitii<=cv.second.numarAparitii)
      {
         if(cv2.second.numarAparitii<cv.second.numarAparitii)
         {
            cv.first=cv2.first;
            cv.second.numarAparitii=cv2.second.numarAparitii;
            cv.second.numarCuvinte=cv2.second.numarCuvinte;
         }
         else
         {
            cv.first+=_T(",")+cv2.first;
            cv.second.numarCuvinte+=cv2.second.numarCuvinte;
         }
      }
      return cv;
   });

   m_listaCuvinte=cvP.first;
   m_numarCuvinte=cvP.second.numarCuvinte;
   m_numarAparitii=cvP.second.numarAparitii;

   return timer.stop();
}

Functia parallel_for_each necesita incarcarea in prealabil a unei colectii de date care, in cazul exemplului nostru, nu este cea mai buna solutie. Concurrency Runtime include algoritmi pentru definirea de task-uri cu ajutorul clasei task_group. Astfel indexarea textului poate fi efectuata direct in functia de incarcare a fisierului.

BOOL IncarcareIndexareEx(LPCTSTR sFisName, LPCTSTR sMetoda)
{
   //...
   task_group tg;
 
   CRITICAL_SECTION cs;
   InitializeCriticalSection(&cs);
 
   timer.start();
    while(fisier.ReadString(sLinie))
   {
      nLinii++;
      tg.run([sLinie,&cs,this]()
      {
         CStringHashMap::iterator iter_c;
         int pos=0;
         CString t=sLinie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
         while(!t.IsEmpty())
         {
            EnterCriticalSection(&cs);
            iter_c=m_cuvinte.find(t);
            if(iter_c==m_cuvinte.end())
               m_cuvinte.insert(CStringHashMap::value_type(t,1));
            else
               iter_c->second++;
            LeaveCriticalSection(&cs);
            t=sLinie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
         }
      });
   }
   tg.wait();
   end=timer.stop();

   DeleteCriticalSection(&cs);
   //...
   return TRUE;
}

Mai mult decat atat, similar cu implementarea functiei parallel_for_each, se poate utiliza clasa combinable pentru a genera rezultate locale care apoi sunt reunite intr-un rezultat final.

BOOL IncarcareIndexareEx(LPCTSTR sFisName)
{   
   //...
   task_group tg;
 
   combinable<CStringHashMap> cuvinte;
 
   timer.start();
 
   while(fisier.ReadString(sLinie))
   {
      nLinii++;
      tg.run([sLinie,&cuvinte,this]()
      {
         static CStringHashMap::iterator iter_c;
         int pos=0;

         CStringHashMap &local=cuvinte.local();

         CString t=sLinie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
         while(!t.IsEmpty())
         {
            iter_c=local.find(t);
            if(iter_c==local.end())
               local.insert(CStringHashMap::value_type(t,1));
            else
               iter_c->second++;
            t=sLinie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
         }
      });
   }
 
   tg.wait();
 
   m_cuvinte=cuvinte.combine(
      [](const CStringHashMap &sm1, const CStringHashMap &sm2)->CStringHashMap
   {
      static CStringHashMap::const_iterator iter_m;
      static CStringHashMap::iterator iter;
      CStringHashMap sm(sm1);
      for(iter_m=sm2.begin();iter_m!=sm2.end();iter_m++)
      {
         iter=sm.find(iter_m->first);
         if(iter==sm.end())
            sm.insert(CStringHashMap::value_type(
                  iter_m->first,iter_m->second));
         else
            iter->second+=iter_m->second;
      }
 
      return sm;
   });

   end=timer.stop();
   //...
   return TRUE;
}

Daca nu sunteti familiarizati cu noile facilitati de procesare concurenta introduse odata cu Visual C++ 2010, puteti utiliza clasa CThreadPool inclusa in header-ul atlutil.h introdusa incepand cu Visual Studio 2005 pe care am adaptat-o sa functioneze ca si clasa task_group prin implementarea unei functii de asteptare pana ce toate task-urile au fost executate. Aceasta poate fi utilizata in procesul de incarcare si indexare. Ca detalii de implementare, mai intai se defineste clasa CTask in care este implementata metoda DoTask insarcinata cu indexarea unui rand de text.

class CTask
{
   static CCriticalSection m_cs;
public:
   CString m_sLinie;
   int m_nLinie;
   CTask(const CString &sLinie, int nLinie):m_sLinie(sLinie),m_nLinie(nLinie){}
   ~CTask(){}
   virtual void DoTask(void *pvParam)
   {
      CStringHashMap *cv=reinterpret_cast<CStringHashMap*>(pvParam);
      CStringHashMap::iterator iter_c;
 
      int pos=0;
 
      CSingleLock sl(&m_cs);
      sl.Lock();
      CString t=m_sLinie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
      while(!t.IsEmpty())
      {
         iter_c=cv->find(t);
         if(iter_c==cv->end())
            cv->insert(CStringHashMap::value_type(t,1));
         else
            iter_c->second++;
         t=m_sLinie.Tokenize(_T(".,;:'\"-?!() \\/"),pos);
      }
      sl.Unlock();
      delete this;
   }
};

Apoi se defineste clasa CThreadPoolWorker care este asociata fiecarui fir de executie creat si care are rolul de a gestiona rularea acestuia.

class CThreadPoolWorker
{
public:
   //...
   void Execute(RequestType dw, void *pvParam, OVERLAPPED* pOverlapped) throw()
   {
      UNREFERENCED_PARAMETER(pOverlapped);
      ATLASSERT(pvParam != NULL);
 
      CTask *pJob = (CTask*)(DWORD_PTR)dw;
      pJob->DoTask(pvParam);
      LONG doneNo=InterlockedIncrement(&_DoneNo);
      LONG taskNo=InterlockedExchange(&_TaskNo,_TaskNo);
      if(m_event!=NULL && taskNo!=-1 && doneNo>=taskNo)
      {
         InterlockedExchange(&_TaskNo,-1);
         SetEvent(m_event);
      }
   }
   //...
   static void WaitUntilLastTaskIsDone()
   {
      WaitForSingleObject(m_event,INFINITE);
      CloseHandle(m_event);
   }
    static void SetNumberOfTasks(const LONG &taskNo)
   {
      InterlockedExchange(&_TaskNo,taskNo);
   }
   //...
};

Functia membra Execute primeste un task disponibil din coada de asteptare si executa functia DoTask asociata. Instanta clasei CThreadPool este initializata, asociindu-se astfel un numar de fire de executie pentru procesarea paralela a task-urilor asociate.

BOOL IncarcareIndexareEx(LPCTSTR sFisName, int poolSize, LPCTSTR sMetoda)
{
   //...
   CTask *job=NULL;
   CThreadPool<CThreadPoolWorker> thPool;
 
   timer.start();
 
   HRESULT hr = thPool.Initialize((void*)&m_cuvinte,poolSize);
   if(SUCCEEDED(hr))
   {
      while(fisier.ReadString(sLinie))
      {
         job=new CTask(sLinie,nLinii+1);
         if(thPool.GetQueueHandle()!=NULL)
         {
            nLinii++;
            thPool.QueueRequest((CThreadPoolWorker::RequestType)job);
         }
      }
      CThreadPoolWorker::SetNumberOfTasks(nLinii);
      CThreadPoolWorker::WaitUntilLastTaskIsDone();
      thPool.Shutdown();

      end=timer.stop();
 
      fisier.Close();
   }
   else
      fisier.Close();
   //...
   return TRUE;
}

Dupa ce toate task-urile au fost executate, adica toate randurile din fisierul text au fost indexate, toate firele de executie implicate in procesul de indexare sunt "terminate".

Exemplu de utilizare

In exemplul Teste am folosit toate tehnicile prezentate mai sus, utilizand atat colectia std::map, cat si stdext::hash_map. Clasa CString este utilizata ca si cheie pentru aceste colectii din considerente pur personale, poate si datorita faptului ca utilizez in mod curent librariile MFC si STL in proiectele mele. In plus, clasa CString include functia Tokenize, utilizata frecvent in acest exemplu. Aplicatia primeste la intrare unul sau mai multi parametri. Primul parametru este calea unui fisier text, al doilea reprezinta dimensiunea setului de fire de executie gestionat de CThreadPool, al treilea, un intreg 0 sau 1 daca se doreste asteptarea intre etapele procesului de incarcare, indexare si cautare, iar ultimul parametru indica timpul de asteptare exprimat in milisecunde. Utilizare:

Teste.exe <FisPath> <PoolSize> <WaitBefore> <WaitTimeout>

Am inclus aceasta asteptare intre etape, dorind sa evit, pe cat posibil, ca rezultatele testului sa nu fie influentate de etapele executate anterior. Am definit clasa de baza CCuvinteBaza, si doua clase derivate din aceasta (CCuvinteMap, CCuvinteHashMap) pentru cele doua tipuri de colectii std::map si stdext::hash_map. Functiile evidentiate in articol sunt functii membre in aceste clase. La rulare, pe baza codului de mai jos,

CCuvinteBaza::AsteaptaInainteDeProcesareActiune(bWaitBefore,bWaitTimeout);
CCuvinteBaza::Incarcare((LPCTSTR)sFisName,_T("Incarcare"));
 
CCuvinteMap cvMap;
cvMap.Afisare(_T("CStringMap - Serial, Cautare"),
         cvMap.Cautare(),cvMap.Serial()); // ,TRUE);
cvMap.Afisare(_T("CStringMap - Paralel, Cautare"),
         cvMap.Cautare(),cvMap.Paralel());
cvMap.Afisare(_T("CStringMap - ParalelEx, Cautare"),
         cvMap.Cautare(),cvMap.ParalelEx());
cvMap.Afisare(_T("CStringMap - ParalelEx, CautareEx"),
         cvMap.CautareEx(),cvMap.ParalelEx());
 
CCuvinteHashMap cvHashMap;
cvHashMap.Afisare(_T("CStringHashMap - Serial, Cautare"),
         cvHashMap.Cautare(),cvHashMap.Serial()); // ,TRUE);
cvHashMap.Afisare(_T("CStringHashMap - Paralel, Cautare"),
         cvHashMap.Cautare(),cvHashMap.Paralel());
cvHashMap.Afisare(_T("CStringHashMap - ParalelEx, Cautare"),
         cvHashMap.Cautare(),cvHashMap.ParalelEx());
cvHashMap.Afisare(_T("CStringHashMap - ParalelEx, CautareEx"),
         cvHashMap.CautareEx(),cvHashMap.ParalelEx());
 
cvHashMap.IncarcareIndexare((LPCTSTR)sFisName,_T("IncarcareIndexare"));
cvHashMap.IncarcareIndexareEx((LPCTSTR)sFisName,nPoolSize,
               _T("IncarcareIndexareEx (CThreadPool)"));
cvHashMap.IncarcareIndexareEx((LPCTSTR)sFisName,
               _T("IncarcareIndexareEx (task_group)"));
cvHashMap.IncarcareIndexareEx((LPCTSTR)sFisName); // task_group + combinable
cvHashMap.Afisare(_T("CStringHashMap - CautareEx"),cvHashMap.CautareEx(),0.0); // ,TRUE);

au fost generate urmatoarele rezultate:

Teste rulate pe o statie cu procesor Intel Dual Core (Windows XP)
Vezi imaginea la dimenisunea normala

Figura 1. Teste rulate pe o statie cu procesor Intel Dual Core (Windows XP)

Teste rulate pe o statie cu procesor AMD II X4 (Windows 7)
Vezi imaginea la dimenisunea normala

Figura 2. Teste rulate pe o statie cu procesor AMD II X4 (Windows 7)

Toate exemplele de cod prezentate in articol sunt incluse intr-un proiect creat in Microsoft Visual Studio 2010.

Concluzii

Utilizarea colectiei stdext::hash_map da un spor de viteza executiei codului in care este folosita, fata de colectiei std::map. Daca este utilizata clasa combinable, se elimina necesitatea blocarii, deblocarii accesului firelor de executie la resursa comuna, astfel incat timpul de executie se reduce simtitor. Utilizarea clasei task_grup in procedul de incarcare, indexare reduce pana la jumatate timpul necesar efectuarii acestor operatiuni, fata de alte metode. Implementarea care include clasa CThreadPool se comporta similar si genereaza timpi de executie apropiati.

Utilizarea algoritmilor pentru procesare concurenta aduc un plus de performanta daca sunt aplicati pe fisiere de mari dimensiuni. Acest lucru este dependent si de configuratia hardware, in special a procesorului, pe care se ruleaza testul.