tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

MtCoder.c (14259B)


      1 /* MtCoder.c -- Multi-thread Coder
      2 2018-02-21 : Igor Pavlov : Public domain */
      3 
      4 #include "Precomp.h"
      5 
      6 #include "MtCoder.h"
      7 
      8 #ifndef _7ZIP_ST
      9 
     10 SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize)
     11 {
     12  CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt);
     13  UInt64 inSize2 = 0;
     14  UInt64 outSize2 = 0;
     15  if (inSize != (UInt64)(Int64)-1)
     16  {
     17    inSize2 = inSize - thunk->inSize;
     18    thunk->inSize = inSize;
     19  }
     20  if (outSize != (UInt64)(Int64)-1)
     21  {
     22    outSize2 = outSize - thunk->outSize;
     23    thunk->outSize = outSize;
     24  }
     25  return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2);
     26 }
     27 
     28 
     29 void MtProgressThunk_CreateVTable(CMtProgressThunk *p)
     30 {
     31  p->vt.Progress = MtProgressThunk_Progress;
     32 }
     33 
     34 
     35 
        /* RINOK_THREAD(x): evaluates (x) once; if the result is non-zero, returns
           SZ_ERROR_THREAD from the enclosing function.
           The expansion is a brace block, so call sites in this file are written
           both with and without a trailing semicolon. */
     36 #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; }
     37 
     38 
     39 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
     40 {
     41  if (Event_IsCreated(p))
     42    return Event_Reset(p);
     43  return AutoResetEvent_CreateNotSignaled(p);
     44 }
     45 
     46 
     47 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
     48 
     49 
     50 static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t)
     51 {
     52  WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent);
     53  if (wres == 0)
     54  {
     55    t->stop = False;
     56    if (!Thread_WasCreated(&t->thread))
     57      wres = Thread_Create(&t->thread, ThreadFunc, t);
     58    if (wres == 0)
     59      wres = Event_Set(&t->startEvent);
     60  }
     61  if (wres == 0)
     62    return SZ_OK;
     63  return MY_SRes_HRESULT_FROM_WRes(wres);
     64 }
     65 
     66 
     67 static void MtCoderThread_Destruct(CMtCoderThread *t)
     68 {
     69  if (Thread_WasCreated(&t->thread))
     70  {
     71    t->stop = 1;
     72    Event_Set(&t->startEvent);
     73    Thread_Wait(&t->thread);
     74    Thread_Close(&t->thread);
     75  }
     76 
     77  Event_Close(&t->startEvent);
     78 
     79  if (t->inBuf)
     80  {
     81    ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf);
     82    t->inBuf = NULL;
     83  }
     84 }
     85 
     86 
     87 
     88 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
     89 {
     90  size_t size = *processedSize;
     91  *processedSize = 0;
     92  while (size != 0)
     93  {
     94    size_t cur = size;
     95    SRes res = ISeqInStream_Read(stream, data, &cur);
     96    *processedSize += cur;
     97    data += cur;
     98    size -= cur;
     99    RINOK(res);
    100    if (cur == 0)
    101      return SZ_OK;
    102  }
    103  return SZ_OK;
    104 }
    105 
    106 
    107 /*
         ThreadFunc2() is the work loop of one coder thread. Each iteration:
           1) waits for mtc->readEvent and obtains one input block: read from mtc->inStream
              into t->inBuf, or taken as a slice of mtc->inData;
           2) waits on mtc->blocksSemaphore and takes the next slot index from mtc->blockIndex;
           3) optionally starts one more thread, sets mtc->stopReading if this was the last block,
              and sets mtc->readEvent again;
           4) takes a buffer index from the free list, calls mtc->mtCallback->Code() and stores
              the outcome in mtc->blocks[bi];
           5) with MTCODER__USE_WRITE_THREAD: sets mtc->writeEvents[bi];
              otherwise: calls mtc->mtCallback->Write() itself for slot mtc->writeIndex and for the
              following slots that are already marked ready.

    108  ThreadFunc2() returns:
    109  SZ_OK           - in all normal cases (even for stream error or memory allocation error)
    110  SZ_ERROR_THREAD - in case of failure in system synch function
    111 */
    112 
    113 static SRes ThreadFunc2(CMtCoderThread *t)
    114 {
    115  CMtCoder *mtc = t->mtCoder;
    116 
    117  for (;;)
    118  {
    119    unsigned bi;
    120    SRes res;
    121    SRes res2;
    122    Bool finished;
    123    unsigned bufIndex;
    124    size_t size;
    125    const Byte *inData;
    126    UInt64 readProcessed = 0;
    127    
    128    RINOK_THREAD(Event_Wait(&mtc->readEvent))
    129 
    130    /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unblock the other threads */
    131 
    132    if (mtc->stopReading)
    133    {
    134      return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD;
    135    }
    136 
    137    res = MtProgress_GetError(&mtc->mtProgress);
    138    
               /* defaults used when an error is already recorded: empty block, marked as the last one */
    139    size = 0;
    140    inData = NULL;
    141    finished = True;
    142 
    143    if (res == SZ_OK)
    144    {
    145      size = mtc->blockSize;
    146      if (mtc->inStream)
    147      {
                   /* stream input: the per-thread buffer is allocated on first use */
    148        if (!t->inBuf)
    149        {
    150          t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize);
    151          if (!t->inBuf)
    152            res = SZ_ERROR_MEM;
    153        }
    154        if (res == SZ_OK)
    155        {
    156          res = FullRead(mtc->inStream, t->inBuf, &size);
    157          readProcessed = mtc->readProcessed + size;
    158          mtc->readProcessed = readProcessed;
    159        }
    160        if (res != SZ_OK)
    161        {
    162          mtc->readRes = res;
    163          /* after reading error - we can stop encoding of previous blocks */
    164          MtProgress_SetError(&mtc->mtProgress, res);
    165        }
    166        else
                     /* a block shorter than blockSize is treated as the last block */
    167          finished = (size != mtc->blockSize);
    168      }
    169      else
    170      {
                   /* memory input: the block is the next slice of mtc->inData, limited to the remaining bytes */
    171        size_t rem;
    172        readProcessed = mtc->readProcessed;
    173        rem = mtc->inDataSize - (size_t)readProcessed;
    174        if (size > rem)
    175          size = rem;
    176        inData = mtc->inData + (size_t)readProcessed;
    177        readProcessed += size;
    178        mtc->readProcessed = readProcessed;
    179        finished = (mtc->inDataSize == (size_t)readProcessed);
    180      }
    181    }
    182 
    183    /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */
    184 
               /* res2 is non-zero only when Semaphore_Wait fails; it is returned after readEvent has been set again */
    185    res2 = SZ_OK;
    186 
    187    if (Semaphore_Wait(&mtc->blocksSemaphore) != 0)
    188    {
    189      res2 = SZ_ERROR_THREAD;
    190      if (res == SZ_OK)
    191      {
    192        res = res2;
    193        // MtProgress_SetError(&mtc->mtProgress, res);
    194      }
    195    }
    196 
               /* take slot bi of mtc->blocks[]; blockIndex wraps around at numBlocksMax */
    197    bi = mtc->blockIndex;
    198 
    199    if (++mtc->blockIndex >= mtc->numBlocksMax)
    200      mtc->blockIndex = 0;
    201 
               /* (unsigned)(int)-1 marks "no buffer taken from the free list" */
    202    bufIndex = (unsigned)(int)-1;
    203 
    204    if (res == SZ_OK)
    205      res = MtProgress_GetError(&mtc->mtProgress);
    206 
    207    if (res != SZ_OK)
    208      finished = True;
    209 
    210    if (!finished)
    211    {
                 /* more blocks follow: start one more thread if the limit allows it
                    and the expected total size has not been read yet */
    212      if (mtc->numStartedThreads < mtc->numStartedThreadsLimit
    213          && mtc->expectedDataSize != readProcessed)
    214      {
    215        res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]);
    216        if (res == SZ_OK)
    217          mtc->numStartedThreads++;
    218        else
    219        {
    220          MtProgress_SetError(&mtc->mtProgress, res);
    221          finished = True;
    222        }
    223      }
    224    }
    225 
    226    if (finished)
    227      mtc->stopReading = True;
    228 
               /* end of the section guarded by readEvent */
    229    RINOK_THREAD(Event_Set(&mtc->readEvent))
    230 
    231    if (res2 != SZ_OK)
    232      return res2;
    233 
    234    if (res == SZ_OK)
    235    {
                 /* pop a buffer index from the free list (guarded by mtc->cs) and run the Code callback */
    236      CriticalSection_Enter(&mtc->cs);
    237      bufIndex = mtc->freeBlockHead;
    238      mtc->freeBlockHead = mtc->freeBlockList[bufIndex];
    239      CriticalSection_Leave(&mtc->cs);
    240      
    241      res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex,
    242          mtc->inStream ? t->inBuf : inData, size, finished);
    243      
    244      // MtProgress_Reinit(&mtc->mtProgress, t->index);
    245 
    246      if (res != SZ_OK)
    247        MtProgress_SetError(&mtc->mtProgress, res);
    248    }
    249 
               /* store the outcome of this block in its slot */
    250    {
    251      CMtCoderBlock *block = &mtc->blocks[bi];
    252      block->res = res;
    253      block->bufIndex = bufIndex;
    254      block->finished = finished;
    255    }
    256    
    257    #ifdef MTCODER__USE_WRITE_THREAD
    258      RINOK_THREAD(Event_Set(&mtc->writeEvents[bi]))
    259    #else
    260    {
    261      unsigned wi;
    262      {
                   /* if slot bi is the one to be written next, this thread does the writing and
                      writeIndex is set to (unsigned)(int)-1 meanwhile; otherwise the slot is only marked ready */
    263        CriticalSection_Enter(&mtc->cs);
    264        wi = mtc->writeIndex;
    265        if (wi == bi)
    266          mtc->writeIndex = (unsigned)(int)-1;
    267        else
    268          mtc->ReadyBlocks[bi] = True;
    269        CriticalSection_Leave(&mtc->cs);
    270      }
    271 
    272      if (wi != bi)
    273      {
                   /* not written here: leave on error or after the last block, else process the next block */
    274        if (res != SZ_OK || finished)
    275          return 0;
    276        continue;
    277      }
    278 
    279      if (mtc->writeRes != SZ_OK)
    280        res = mtc->writeRes;
    281 
                 /* write slot wi, then keep going with the following slots as long as they are marked ready */
    282      for (;;)
    283      {
    284        if (res == SZ_OK && bufIndex != (unsigned)(int)-1)
    285        {
    286          res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex);
    287          if (res != SZ_OK)
    288          {
    289            mtc->writeRes = res;
    290            MtProgress_SetError(&mtc->mtProgress, res);
    291          }
    292        }
    293 
    294        if (++wi >= mtc->numBlocksMax)
    295          wi = 0;
    296        {
    297          Bool isReady;
    298 
    299          CriticalSection_Enter(&mtc->cs);
    300          
                       /* push the buffer index of the slot just handled back onto the free list */
    301          if (bufIndex != (unsigned)(int)-1)
    302          {
    303            mtc->freeBlockList[bufIndex] = mtc->freeBlockHead;
    304            mtc->freeBlockHead = bufIndex;
    305          }
    306          
    307          isReady = mtc->ReadyBlocks[wi];
    308          
                       /* next slot ready: consume the mark and handle it in this loop;
                          not ready: record it as the next slot to be written */
    309          if (isReady)
    310            mtc->ReadyBlocks[wi] = False;
    311          else
    312            mtc->writeIndex = wi;
    313          
    314          CriticalSection_Leave(&mtc->cs);
    315 
    316          RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore))
    317 
    318          if (!isReady)
    319            break;
    320        }
    321 
                   /* load the stored outcome of the next slot; the first error seen is kept in res */
    322        {
    323          CMtCoderBlock *block = &mtc->blocks[wi];
    324          if (res == SZ_OK && block->res != SZ_OK)
    325            res = block->res;
    326          bufIndex = block->bufIndex;
    327          finished = block->finished;
    328        }
    329      }
    330    }
    331    #endif
    332      
    333    if (finished || res != SZ_OK)
    334      return 0;
    335  }
    336 }
    337 
    338 
    339 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
    340 {
    341  CMtCoderThread *t = (CMtCoderThread *)pp;
    342  for (;;)
    343  {
    344    if (Event_Wait(&t->startEvent) != 0)
    345      return SZ_ERROR_THREAD;
    346    if (t->stop)
    347      return 0;
    348    {
    349      SRes res = ThreadFunc2(t);
    350      CMtCoder *mtc = t->mtCoder;
    351      if (res != SZ_OK)
    352      {
    353        MtProgress_SetError(&mtc->mtProgress, res);
    354      }
    355      
    356      #ifndef MTCODER__USE_WRITE_THREAD
    357      {
    358        unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads);
    359        if (numFinished == mtc->numStartedThreads)
    360          if (Event_Set(&mtc->finishedEvent) != 0)
    361            return SZ_ERROR_THREAD;
    362      }
    363      #endif
    364    }
    365  }
    366 }
    367 
    368 
    369 
    370 void MtCoder_Construct(CMtCoder *p)
    371 {
    372  unsigned i;
    373  
    374  p->blockSize = 0;
    375  p->numThreadsMax = 0;
    376  p->expectedDataSize = (UInt64)(Int64)-1;
    377 
    378  p->inStream = NULL;
    379  p->inData = NULL;
    380  p->inDataSize = 0;
    381 
    382  p->progress = NULL;
    383  p->allocBig = NULL;
    384 
    385  p->mtCallback = NULL;
    386  p->mtCallbackObject = NULL;
    387 
    388  p->allocatedBufsSize = 0;
    389 
    390  Event_Construct(&p->readEvent);
    391  Semaphore_Construct(&p->blocksSemaphore);
    392 
    393  for (i = 0; i < MTCODER__THREADS_MAX; i++)
    394  {
    395    CMtCoderThread *t = &p->threads[i];
    396    t->mtCoder = p;
    397    t->index = i;
    398    t->inBuf = NULL;
    399    t->stop = False;
    400    Event_Construct(&t->startEvent);
    401    Thread_Construct(&t->thread);
    402  }
    403 
    404  #ifdef MTCODER__USE_WRITE_THREAD
    405    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
    406      Event_Construct(&p->writeEvents[i]);
    407  #else
    408    Event_Construct(&p->finishedEvent);
    409  #endif
    410 
    411  CriticalSection_Init(&p->cs);
    412  CriticalSection_Init(&p->mtProgress.cs);
    413 }
    414 
    415 
    416 
    417 
    418 static void MtCoder_Free(CMtCoder *p)
    419 {
    420  unsigned i;
    421 
    422  /*
    423  p->stopReading = True;
    424  if (Event_IsCreated(&p->readEvent))
    425    Event_Set(&p->readEvent);
    426  */
    427 
    428  for (i = 0; i < MTCODER__THREADS_MAX; i++)
    429    MtCoderThread_Destruct(&p->threads[i]);
    430 
    431  Event_Close(&p->readEvent);
    432  Semaphore_Close(&p->blocksSemaphore);
    433 
    434  #ifdef MTCODER__USE_WRITE_THREAD
    435    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
    436      Event_Close(&p->writeEvents[i]);
    437  #else
    438    Event_Close(&p->finishedEvent);
    439  #endif
    440 }
    441 
    442 
    443 void MtCoder_Destruct(CMtCoder *p)
    444 {
    445  MtCoder_Free(p);
    446 
    447  CriticalSection_Delete(&p->cs);
    448  CriticalSection_Delete(&p->mtProgress.cs);
    449 }
    450 
    451 
    452 SRes MtCoder_Code(CMtCoder *p)
    453 {
    454  unsigned numThreads = p->numThreadsMax;
    455  unsigned numBlocksMax;
    456  unsigned i;
    457  SRes res = SZ_OK;
    458 
    459  if (numThreads > MTCODER__THREADS_MAX)
    460    numThreads = MTCODER__THREADS_MAX;
    461  numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads);
    462  
    463  if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++;
    464  if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++;
    465  if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++;
    466 
    467  if (numBlocksMax > MTCODER__BLOCKS_MAX)
    468    numBlocksMax = MTCODER__BLOCKS_MAX;
    469 
    470  if (p->blockSize != p->allocatedBufsSize)
    471  {
    472    for (i = 0; i < MTCODER__THREADS_MAX; i++)
    473    {
    474      CMtCoderThread *t = &p->threads[i];
    475      if (t->inBuf)
    476      {
    477        ISzAlloc_Free(p->allocBig, t->inBuf);
    478        t->inBuf = NULL;
    479      }
    480    }
    481    p->allocatedBufsSize = p->blockSize;
    482  }
    483 
    484  p->readRes = SZ_OK;
    485 
    486  MtProgress_Init(&p->mtProgress, p->progress);
    487 
    488  #ifdef MTCODER__USE_WRITE_THREAD
    489    for (i = 0; i < numBlocksMax; i++)
    490    {
    491      RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i]));
    492    }
    493  #else
    494    RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
    495  #endif
    496 
    497  {
    498    RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent));
    499 
    500    if (Semaphore_IsCreated(&p->blocksSemaphore))
    501    {
    502      RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore));
    503    }
    504    RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax));
    505  }
    506 
    507  for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++)
    508    p->freeBlockList[i] = i + 1;
    509  p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1;
    510  p->freeBlockHead = 0;
    511 
    512  p->readProcessed = 0;
    513  p->blockIndex = 0;
    514  p->numBlocksMax = numBlocksMax;
    515  p->stopReading = False;
    516 
    517  #ifndef MTCODER__USE_WRITE_THREAD
    518    p->writeIndex = 0;
    519    p->writeRes = SZ_OK;
    520    for (i = 0; i < MTCODER__BLOCKS_MAX; i++)
    521      p->ReadyBlocks[i] = False;
    522    p->numFinishedThreads = 0;
    523  #endif
    524 
    525  p->numStartedThreadsLimit = numThreads;
    526  p->numStartedThreads = 0;
    527 
    528  // for (i = 0; i < numThreads; i++)
    529  {
    530    CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++];
    531    RINOK(MtCoderThread_CreateAndStart(nextThread));
    532  }
    533 
    534  RINOK_THREAD(Event_Set(&p->readEvent))
    535 
    536  #ifdef MTCODER__USE_WRITE_THREAD
    537  {
    538    unsigned bi = 0;
    539 
    540    for (;; bi++)
    541    {
    542      if (bi >= numBlocksMax)
    543        bi = 0;
    544 
    545      RINOK_THREAD(Event_Wait(&p->writeEvents[bi]))
    546 
    547      {
    548        const CMtCoderBlock *block = &p->blocks[bi];
    549        unsigned bufIndex = block->bufIndex;
    550        Bool finished = block->finished;
    551        if (res == SZ_OK && block->res != SZ_OK)
    552          res = block->res;
    553 
    554        if (bufIndex != (unsigned)(int)-1)
    555        {
    556          if (res == SZ_OK)
    557          {
    558            res = p->mtCallback->Write(p->mtCallbackObject, bufIndex);
    559            if (res != SZ_OK)
    560              MtProgress_SetError(&p->mtProgress, res);
    561          }
    562          
    563          CriticalSection_Enter(&p->cs);
    564          {
    565            p->freeBlockList[bufIndex] = p->freeBlockHead;
    566            p->freeBlockHead = bufIndex;
    567          }
    568          CriticalSection_Leave(&p->cs);
    569        }
    570        
    571        RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore))
    572 
    573        if (finished)
    574          break;
    575      }
    576    }
    577  }
    578  #else
    579  {
    580    WRes wres = Event_Wait(&p->finishedEvent);
    581    res = MY_SRes_HRESULT_FROM_WRes(wres);
    582  }
    583  #endif
    584 
    585  if (res == SZ_OK)
    586    res = p->readRes;
    587 
    588  if (res == SZ_OK)
    589    res = p->mtProgress.res;
    590 
    591  #ifndef MTCODER__USE_WRITE_THREAD
    592    if (res == SZ_OK)
    593      res = p->writeRes;
    594  #endif
    595 
    596  if (res != SZ_OK)
    597    MtCoder_Free(p);
    598  return res;
    599 }
    600 
    601 #endif