tor-browser

The Tor Browser
git clone https://git.dasho.dev/tor-browser.git
Log | Files | Refs | README | LICENSE

MtDec.c (29807B)


      1 /* MtDec.c -- Multi-thread Decoder
      2 2018-03-02 : Igor Pavlov : Public domain */
      3 
      4 #include "Precomp.h"
      5 
      6 // #define SHOW_DEBUG_INFO
      7 
      8 // #include <stdio.h>
      9 
     10 #ifdef SHOW_DEBUG_INFO
     11 #include <stdio.h>
     12 #endif
     13 
     14 #ifdef SHOW_DEBUG_INFO
     15 #define PRF(x) x
     16 #else
     17 #define PRF(x)
     18 #endif
     19 
     20 #define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
     21 
     22 #include "MtDec.h"
     23 
     24 #ifndef _7ZIP_ST
     25 
     26 void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
     27 {
     28  p->progress = progress;
     29  p->res = SZ_OK;
     30  p->totalInSize = 0;
     31  p->totalOutSize = 0;
     32 }
     33 
     34 
     35 SRes MtProgress_Progress_ST(CMtProgress *p)
     36 {
     37  if (p->res == SZ_OK && p->progress)
     38    if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
     39      p->res = SZ_ERROR_PROGRESS;
     40  return p->res;
     41 }
     42 
     43 
     44 SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
     45 {
     46  SRes res;
     47  CriticalSection_Enter(&p->cs);
     48  
     49  p->totalInSize += inSize;
     50  p->totalOutSize += outSize;
     51  if (p->res == SZ_OK && p->progress)
     52    if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
     53      p->res = SZ_ERROR_PROGRESS;
     54  res = p->res;
     55  
     56  CriticalSection_Leave(&p->cs);
     57  return res;
     58 }
     59 
     60 
     61 SRes MtProgress_GetError(CMtProgress *p)
     62 {
     63  SRes res;
     64  CriticalSection_Enter(&p->cs);
     65  res = p->res;
     66  CriticalSection_Leave(&p->cs);
     67  return res;
     68 }
     69 
     70 
     71 void MtProgress_SetError(CMtProgress *p, SRes res)
     72 {
     73  CriticalSection_Enter(&p->cs);
     74  if (p->res == SZ_OK)
     75    p->res = res;
     76  CriticalSection_Leave(&p->cs);
     77 }
     78 
     79 
     80 #define RINOK_THREAD(x) RINOK(x)
     81 
     82 
     83 static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
     84 {
     85  if (Event_IsCreated(p))
     86    return Event_Reset(p);
     87  return AutoResetEvent_CreateNotSignaled(p);
     88 }
     89 
     90 
     91 
     92 typedef struct
     93 {
     94  void *next;
     95  void *pad[3];
     96 } CMtDecBufLink;
     97 
     98 #define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
     99 #define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
    100 
    101 
    102 
    103 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp);
    104 
    105 
    106 static WRes MtDecThread_CreateEvents(CMtDecThread *t)
    107 {
    108  WRes wres = ArEvent_OptCreate_And_Reset(&t->canWrite);
    109  if (wres == 0)
    110  {
    111    wres = ArEvent_OptCreate_And_Reset(&t->canRead);
    112    if (wres == 0)
    113      return SZ_OK;
    114  }
    115  return wres;
    116 }
    117 
    118 
    119 static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
    120 {
    121  WRes wres = MtDecThread_CreateEvents(t);
    122  // wres = 17; // for test
    123  if (wres == 0)
    124  {
    125    if (Thread_WasCreated(&t->thread))
    126      return SZ_OK;
    127    wres = Thread_Create(&t->thread, ThreadFunc, t);
    128    if (wres == 0)
    129      return SZ_OK;
    130  }
    131  return MY_SRes_HRESULT_FROM_WRes(wres);
    132 }
    133 
    134 
    135 void MtDecThread_FreeInBufs(CMtDecThread *t)
    136 {
    137  if (t->inBuf)
    138  {
    139    void *link = t->inBuf;
    140    t->inBuf = NULL;
    141    do
    142    {
    143      void *next = ((CMtDecBufLink *)link)->next;
    144      ISzAlloc_Free(t->mtDec->alloc, link);
    145      link = next;
    146    }
    147    while (link);
    148  }
    149 }
    150 
    151 
    152 static void MtDecThread_CloseThread(CMtDecThread *t)
    153 {
    154  if (Thread_WasCreated(&t->thread))
    155  {
    156    Event_Set(&t->canWrite); /* we can disable it. There are no threads waiting canWrite in normal cases */
    157    Event_Set(&t->canRead);
    158    Thread_Wait(&t->thread);
    159    Thread_Close(&t->thread);
    160  }
    161 
    162  Event_Close(&t->canRead);
    163  Event_Close(&t->canWrite);
    164 }
    165 
    166 static void MtDec_CloseThreads(CMtDec *p)
    167 {
    168  unsigned i;
    169  for (i = 0; i < MTDEC__THREADS_MAX; i++)
    170    MtDecThread_CloseThread(&p->threads[i]);
    171 }
    172 
    173 static void MtDecThread_Destruct(CMtDecThread *t)
    174 {
    175  MtDecThread_CloseThread(t);
    176  MtDecThread_FreeInBufs(t);
    177 }
    178 
    179 
    180 
    181 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
    182 {
    183  size_t size = *processedSize;
    184  *processedSize = 0;
    185  while (size != 0)
    186  {
    187    size_t cur = size;
    188    SRes res = ISeqInStream_Read(stream, data, &cur);
    189    *processedSize += cur;
    190    data += cur;
    191    size -= cur;
    192    RINOK(res);
    193    if (cur == 0)
    194      return SZ_OK;
    195  }
    196  return SZ_OK;
    197 }
    198 
    199 
    200 static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, Bool *wasInterrupted)
    201 {
    202  SRes res;
    203  CriticalSection_Enter(&p->mtProgress.cs);
    204  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
    205  res = p->mtProgress.res;
    206  CriticalSection_Leave(&p->mtProgress.cs);
    207  return res;
    208 }
    209 
    210 static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, Bool *wasInterrupted)
    211 {
    212  SRes res;
    213  CriticalSection_Enter(&p->mtProgress.cs);
    214 
    215  p->mtProgress.totalInSize += inSize;
    216  p->mtProgress.totalOutSize += outSize;
    217  if (p->mtProgress.res == SZ_OK && p->mtProgress.progress)
    218    if (ICompressProgress_Progress(p->mtProgress.progress, p->mtProgress.totalInSize, p->mtProgress.totalOutSize) != SZ_OK)
    219      p->mtProgress.res = SZ_ERROR_PROGRESS;
    220 
    221  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
    222  res = p->mtProgress.res;
    223  
    224  CriticalSection_Leave(&p->mtProgress.cs);
    225 
    226  return res;
    227 }
    228 
    229 static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
    230 {
    231  CriticalSection_Enter(&p->mtProgress.cs);
    232  if (!p->needInterrupt || interruptIndex < p->interruptIndex)
    233  {
    234    p->interruptIndex = interruptIndex;
    235    p->needInterrupt = True;
    236  }
    237  CriticalSection_Leave(&p->mtProgress.cs);
    238 }
    239 
    240 Byte *MtDec_GetCrossBuff(CMtDec *p)
    241 {
    242  Byte *cr = p->crossBlock;
    243  if (!cr)
    244  {
    245    cr = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
    246    if (!cr)
    247      return NULL;
    248    p->crossBlock = cr;
    249  }
    250  return MTDEC__DATA_PTR_FROM_LINK(cr);
    251 }
    252 
    253 
    254 /*
    255  ThreadFunc2() returns:
    256  0      - in all normal cases (even for stream error or memory allocation error)
    257  (!= 0) - WRes error return by system threading function
    258 */
    259 
    260 // #define MTDEC_ProgessStep (1 << 22)
    261 #define MTDEC_ProgessStep (1 << 0)
    262 
    263 static WRes ThreadFunc2(CMtDecThread *t)
    264 {
    265  CMtDec *p = t->mtDec;
    266 
    267  PRF_STR_INT("ThreadFunc2", t->index);
    268 
    269  // SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
    270 
    271  for (;;)
    272  {
    273    SRes res, codeRes;
    274    Bool wasInterrupted, isAllocError, overflow, finish;
    275    SRes threadingErrorSRes;
    276    Bool needCode, needWrite, needContinue;
    277    
    278    size_t inDataSize_Start;
    279    UInt64 inDataSize;
    280    // UInt64 inDataSize_Full;
    281    
    282    UInt64 blockIndex;
    283 
    284    UInt64 inPrev = 0;
    285    UInt64 outPrev = 0;
    286    UInt64 inCodePos;
    287    UInt64 outCodePos;
    288    
    289    Byte *afterEndData = NULL;
    290    size_t afterEndData_Size = 0;
    291 
    292    Bool canCreateNewThread = False;
    293    // CMtDecCallbackInfo parse;
    294    CMtDecThread *nextThread;
    295 
    296    PRF_STR_INT("Event_Wait(&t->canRead)", t->index);
    297 
    298    RINOK_THREAD(Event_Wait(&t->canRead));
    299    if (p->exitThread)
    300      return 0;
    301 
    302    PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);
    303 
    304    // if (t->index == 3) return 19; // for test
    305 
    306    blockIndex = p->blockIndex++;
    307 
    308    // PRF(printf("\ncanRead\n"))
    309 
    310    res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
    311 
    312    finish = p->readWasFinished;
    313    needCode = False;
    314    needWrite = False;
    315    isAllocError = False;
    316    overflow = False;
    317 
    318    inDataSize_Start = 0;
    319    inDataSize = 0;
    320    // inDataSize_Full = 0;
    321 
    322    if (res == SZ_OK && !wasInterrupted)
    323    {
    324      // if (p->inStream)
    325      {
    326        CMtDecBufLink *prev = NULL;
    327        CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
    328        size_t crossSize = p->crossEnd - p->crossStart;
    329 
    330        PRF(printf("\ncrossSize = %d\n", crossSize));
    331 
    332        for (;;)
    333        {
    334          if (!link)
    335          {
    336            link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
    337            if (!link)
    338            {
    339              finish = True;
    340              // p->allocError_for_Read_BlockIndex = blockIndex;
    341              isAllocError = True;
    342              break;
    343            }
    344            link->next = NULL;
    345            if (prev)
    346            {
    347              // static unsigned g_num = 0;
    348              // printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
    349              prev->next = link;
    350            }
    351            else
    352              t->inBuf = (void *)link;
    353          }
    354 
    355          {
    356            Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
    357            Byte *parseData = data;
    358            size_t size;
    359 
    360            if (crossSize != 0)
    361            {
    362              inDataSize = crossSize;
    363              // inDataSize_Full = inDataSize;
    364              inDataSize_Start = crossSize;
    365              size = crossSize;
    366              parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
    367              PRF(printf("\ncross : crossStart = %7d  crossEnd = %7d finish = %1d",
    368                  (int)p->crossStart, (int)p->crossEnd, (int)finish));
    369            }
    370            else
    371            {
    372              size = p->inBufSize;
    373              
    374              res = FullRead(p->inStream, data, &size);
    375              
    376              // size = 10; // test
    377 
    378              inDataSize += size;
    379              // inDataSize_Full = inDataSize;
    380              if (!prev)
    381                inDataSize_Start = size;
    382 
    383              p->readProcessed += size;
    384              finish = (size != p->inBufSize);
    385              if (finish)
    386                p->readWasFinished = True;
    387              
    388              // res = E_INVALIDARG; // test
    389 
    390              if (res != SZ_OK)
    391              {
    392                // PRF(printf("\nRead error = %d\n", res))
    393                // we want to decode all data before error
    394                p->readRes = res;
    395                // p->readError_BlockIndex = blockIndex;
    396                p->readWasFinished = True;
    397                finish = True;
    398                res = SZ_OK;
    399                // break;
    400              }
    401 
    402              if (inDataSize - inPrev >= MTDEC_ProgessStep)
    403              {
    404                res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
    405                if (res != SZ_OK || wasInterrupted)
    406                  break;
    407                inPrev = inDataSize;
    408              }
    409            }
    410 
    411            {
    412              CMtDecCallbackInfo parse;
    413 
    414              parse.startCall = (prev == NULL);
    415              parse.src = parseData;
    416              parse.srcSize = size;
    417              parse.srcFinished = finish;
    418              parse.canCreateNewThread = True;
    419 
    420              // PRF(printf("\nParse size = %d\n", (unsigned)size))
    421 
    422              p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
    423 
    424              needWrite = True;
    425              canCreateNewThread = parse.canCreateNewThread;
    426 
    427              // printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
    428              
    429              if (
    430                  // parseRes != SZ_OK ||
    431                  // inDataSize - (size - parse.srcSize) > p->inBlockMax
    432                  // ||
    433                  parse.state == MTDEC_PARSE_OVERFLOW
    434                  // || wasInterrupted
    435                  )
    436              {
    437                // Overflow or Parse error - switch from MT decoding to ST decoding
    438                finish = True;
    439                overflow = True;
    440 
    441                {
    442                  PRF(printf("\n Overflow"));
    443                  // PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
    444                  PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
    445                }
    446                
    447                if (crossSize != 0)
    448                  memcpy(data, parseData, size);
    449                p->crossStart = 0;
    450                p->crossEnd = 0;
    451                break;
    452              }
    453 
    454              if (crossSize != 0)
    455              {
    456                memcpy(data, parseData, parse.srcSize);
    457                p->crossStart += parse.srcSize;
    458              }
    459 
    460              if (parse.state != MTDEC_PARSE_CONTINUE || finish)
    461              {
    462                // we don't need to parse in current thread anymore
    463 
    464                if (parse.state == MTDEC_PARSE_END)
    465                  finish = True;
    466 
    467                needCode = True;
    468                // p->crossFinished = finish;
    469 
    470                if (parse.srcSize == size)
    471                {
    472                  // full parsed - no cross transfer
    473                  p->crossStart = 0;
    474                  p->crossEnd = 0;
    475                  break;
    476                }
    477 
    478                if (parse.state == MTDEC_PARSE_END)
    479                {
    480                  p->crossStart = 0;
    481                  p->crossEnd = 0;
    482 
    483                  if (crossSize != 0)
    484                    memcpy(data + parse.srcSize, parseData + parse.srcSize, size - parse.srcSize); // we need all data
    485                  afterEndData_Size = size - parse.srcSize;
    486                  afterEndData = parseData + parse.srcSize;
    487 
    488                  // we reduce data size to required bytes (parsed only)
    489                  inDataSize -= (size - parse.srcSize);
    490                  if (!prev)
    491                    inDataSize_Start = parse.srcSize;
    492                  break;
    493                }
    494 
    495                {
    496                  // partial parsed - need cross transfer
    497                  if (crossSize != 0)
    498                    inDataSize = parse.srcSize; // it's only parsed now
    499                  else
    500                  {
    501                    // partial parsed - is not in initial cross block - we need to copy new data to cross block
    502                    Byte *cr = MtDec_GetCrossBuff(p);
    503                    if (!cr)
    504                    {
    505                      {
    506                        PRF(printf("\ncross alloc error error\n"));
    507                        // res = SZ_ERROR_MEM;
    508                        finish = True;
    509                        // p->allocError_for_Read_BlockIndex = blockIndex;
    510                        isAllocError = True;
    511                        break;
    512                      }
    513                    }
    514 
    515                    {
    516                      size_t crSize = size - parse.srcSize;
    517                      inDataSize -= crSize;
    518                      p->crossEnd = crSize;
    519                      p->crossStart = 0;
    520                      memcpy(cr, parseData + parse.srcSize, crSize);
    521                    }
    522                  }
    523 
    524                  // inDataSize_Full = inDataSize;
    525                  if (!prev)
    526                    inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
    527 
    528                  finish = False;
    529                  break;
    530                }
    531              }
    532 
    533              if (parse.srcSize != size)
    534              {
    535                res = SZ_ERROR_FAIL;
    536                PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
    537                break;
    538              }
    539            }
    540          }
    541          
    542          prev = link;
    543          link = link->next;
    544 
    545          if (crossSize != 0)
    546          {
    547            crossSize = 0;
    548            p->crossStart = 0;
    549            p->crossEnd = 0;
    550          }
    551        }
    552      }
    553 
    554      if (res == SZ_OK)
    555        res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
    556    }
    557 
    558    codeRes = SZ_OK;
    559 
    560    if (res == SZ_OK && needCode && !wasInterrupted)
    561    {
    562      codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
    563      if (codeRes != SZ_OK)
    564      {
    565        needCode = False;
    566        finish = True;
    567        // SZ_ERROR_MEM is expected error here.
    568        //   if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
    569        //   if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
    570      }
    571    }
    572    
    573    if (res != SZ_OK || wasInterrupted)
    574      finish = True;
    575    
    576    nextThread = NULL;
    577    threadingErrorSRes = SZ_OK;
    578 
    579    if (!finish)
    580    {
    581      if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
    582      {
    583        SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
    584        if (res2 == SZ_OK)
    585        {
    586          // if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
    587          p->numStartedThreads++;
    588        }
    589        else
    590        {
    591          PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
    592          if (p->numStartedThreads == 1)
    593          {
    594            // if only one thread is possible, we leave muti-threading code
    595            finish = True;
    596            needCode = False;
    597            threadingErrorSRes = res2;
    598          }
    599          else
    600            p->numStartedThreads_Limit = p->numStartedThreads;
    601        }
    602      }
    603      
    604      if (!finish)
    605      {
    606        unsigned nextIndex = t->index + 1;
    607        nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
    608        RINOK_THREAD(Event_Set(&nextThread->canRead))
    609        // We have started executing for new iteration (with next thread)
    610        // And that next thread now is responsible for possible exit from decoding (threading_code)
    611      }
    612    }
    613 
    614    // each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
    615    // if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
    616    // if (  finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
    617    //   - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
    618    //   - otherwise we stop decoding and exit from ThreadFunc2()
    619 
    620    // Don't change (finish) variable in the further code
    621 
    622 
    623    // ---------- CODE ----------
    624 
    625    inPrev = 0;
    626    outPrev = 0;
    627    inCodePos = 0;
    628    outCodePos = 0;
    629 
    630    if (res == SZ_OK && needCode && codeRes == SZ_OK)
    631    {
    632      Bool isStartBlock = True;
    633      CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
    634 
    635      for (;;)
    636      {
    637        size_t inSize;
    638        int stop;
    639 
    640        if (isStartBlock)
    641          inSize = inDataSize_Start;
    642        else
    643        {
    644          UInt64 rem = inDataSize - inCodePos;
    645          inSize = p->inBufSize;
    646          if (inSize > rem)
    647            inSize = (size_t)rem;
    648        }
    649 
    650        inCodePos += inSize;
    651        stop = True;
    652 
    653        codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
    654            (const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
    655            (inCodePos == inDataSize), // srcFinished
    656            &inCodePos, &outCodePos, &stop);
    657        
    658        if (codeRes != SZ_OK)
    659        {
    660          PRF(printf("\nCode Interrupt error = %x\n", codeRes));
    661          // we interrupt only later blocks
    662          MtDec_Interrupt(p, blockIndex);
    663          break;
    664        }
    665 
    666        if (stop || inCodePos == inDataSize)
    667          break;
    668  
    669        {
    670          const UInt64 inDelta = inCodePos - inPrev;
    671          const UInt64 outDelta = outCodePos - outPrev;
    672          if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
    673          {
    674            // Sleep(1);
    675            res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
    676            if (res != SZ_OK || wasInterrupted)
    677              break;
    678            inPrev = inCodePos;
    679            outPrev = outCodePos;
    680          }
    681        }
    682 
    683        link = link->next;
    684        isStartBlock = False;
    685      }
    686    }
    687 
    688 
    689    // ---------- WRITE ----------
    690   
    691    RINOK_THREAD(Event_Wait(&t->canWrite));
    692 
    693  {
    694    Bool isErrorMode = False;
    695    Bool canRecode = True;
    696    Bool needWriteToStream = needWrite;
    697 
    698    if (p->exitThread) return 0; // it's never executed in normal cases
    699 
    700    if (p->wasInterrupted)
    701      wasInterrupted = True;
    702    else
    703    {
    704      if (codeRes != SZ_OK) // || !needCode // check it !!!
    705      {
    706        p->wasInterrupted = True;
    707        p->codeRes = codeRes;
    708        if (codeRes == SZ_ERROR_MEM)
    709          isAllocError = True;
    710      }
    711      
    712      if (threadingErrorSRes)
    713      {
    714        p->wasInterrupted = True;
    715        p->threadingErrorSRes = threadingErrorSRes;
    716        needWriteToStream = False;
    717      }
    718      if (isAllocError)
    719      {
    720        p->wasInterrupted = True;
    721        p->isAllocError = True;
    722        needWriteToStream = False;
    723      }
    724      if (overflow)
    725      {
    726        p->wasInterrupted = True;
    727        p->overflow = True;
    728        needWriteToStream = False;
    729      }
    730    }
    731 
    732    if (needCode)
    733    {
    734      if (wasInterrupted)
    735      {
    736        inCodePos = 0;
    737        outCodePos = 0;
    738      }
    739      {
    740        const UInt64 inDelta = inCodePos - inPrev;
    741        const UInt64 outDelta = outCodePos - outPrev;
    742        // if (inDelta != 0 || outDelta != 0)
    743        res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
    744      }
    745    }
    746 
    747    needContinue = (!finish);
    748 
    749    // if (res == SZ_OK && needWrite && !wasInterrupted)
    750    if (needWrite)
    751    {
    752      // p->inProcessed += inCodePos;
    753 
    754      res = p->mtCallback->Write(p->mtCallbackObject, t->index,
    755          res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
    756          afterEndData, afterEndData_Size,
    757          &needContinue,
    758          &canRecode);
    759      
    760      // res= E_INVALIDARG; // for test
    761 
    762      PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
    763      PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
    764 
    765      if (res != SZ_OK)
    766      {
    767        PRF(printf("\nWrite error = %d\n", res));
    768        isErrorMode = True;
    769        p->wasInterrupted = True;
    770      }
    771      if (res != SZ_OK
    772          || (!needContinue && !finish))
    773      {
    774        PRF(printf("\nWrite Interrupt error = %x\n", res));
    775        MtDec_Interrupt(p, blockIndex);
    776      }
    777    }
    778 
    779    if (canRecode)
    780    if (!needCode
    781        || res != SZ_OK
    782        || p->wasInterrupted
    783        || codeRes != SZ_OK
    784        || wasInterrupted
    785        || p->numFilledThreads != 0
    786        || isErrorMode)
    787    {
    788      if (p->numFilledThreads == 0)
    789        p->filledThreadStart = t->index;
    790      if (inDataSize != 0 || !finish)
    791      {
    792        t->inDataSize_Start = inDataSize_Start;
    793        t->inDataSize = inDataSize;
    794        p->numFilledThreads++;
    795      }
    796      PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
    797      PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
    798    }
    799 
    800    if (!finish)
    801    {
    802      RINOK_THREAD(Event_Set(&nextThread->canWrite));
    803    }
    804    else
    805    {
    806      if (needContinue)
    807      {
    808        // we restore decoding with new iteration
    809        RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
    810      }
    811      else
    812      {
    813        // we exit from decoding
    814        if (t->index == 0)
    815          return SZ_OK;
    816        p->exitThread = True;
    817      }
    818      RINOK_THREAD(Event_Set(&p->threads[0].canRead));
    819    }
    820  }
    821  }
    822 }
    823 
    824 #ifdef _WIN32
    825 #define USE_ALLOCA
    826 #endif
    827 
    828 #ifdef USE_ALLOCA
    829 #ifdef _WIN32
    830 #include <malloc.h>
    831 #else
    832 #include <stdlib.h>
    833 #endif
    834 #endif
    835 
    836 
    837 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc1(void *pp)
    838 {
    839  WRes res;
    840 
    841  CMtDecThread *t = (CMtDecThread *)pp;
    842  CMtDec *p;
    843 
    844  // fprintf(stdout, "\n%d = %p\n", t->index, &t);
    845 
    846  res = ThreadFunc2(t);
    847  p = t->mtDec;
    848  if (res == 0)
    849    return p->exitThreadWRes;
    850  {
    851    // it's unexpected situation for some threading function error
    852    if (p->exitThreadWRes == 0)
    853      p->exitThreadWRes = res;
    854    PRF(printf("\nthread exit error = %d\n", res));
    855    p->exitThread = True;
    856    Event_Set(&p->threads[0].canRead);
    857    Event_Set(&p->threads[0].canWrite);
    858    MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));
    859  }
    860  return res;
    861 }
    862 
    863 static MY_NO_INLINE THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp)
    864 {
    865  CMtDecThread *t = (CMtDecThread *)pp;
    866 
    867  // fprintf(stderr, "\n%d = %p - before", t->index, &t);
    868  #ifdef USE_ALLOCA
    869  t->allocaPtr = alloca(t->index * 128);
    870  #endif
    871  return ThreadFunc1(pp);
    872 }
    873 
    874 
    875 int MtDec_PrepareRead(CMtDec *p)
    876 {
    877  if (p->crossBlock && p->crossStart == p->crossEnd)
    878  {
    879    ISzAlloc_Free(p->alloc, p->crossBlock);
    880    p->crossBlock = NULL;
    881  }
    882    
    883  {
    884    unsigned i;
    885    for (i = 0; i < MTDEC__THREADS_MAX; i++)
    886      if (i > p->numStartedThreads
    887          || p->numFilledThreads <=
    888            (i >= p->filledThreadStart ?
    889              i - p->filledThreadStart :
    890              i + p->numStartedThreads - p->filledThreadStart))
    891        MtDecThread_FreeInBufs(&p->threads[i]);
    892  }
    893 
    894  return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
    895 }
    896 
    897    
    898 const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
    899 {
    900  while (p->numFilledThreads != 0)
    901  {
    902    CMtDecThread *t = &p->threads[p->filledThreadStart];
    903    
    904    if (*inLim != 0)
    905    {
    906      {
    907        void *link = t->inBuf;
    908        void *next = ((CMtDecBufLink *)link)->next;
    909        ISzAlloc_Free(p->alloc, link);
    910        t->inBuf = next;
    911      }
    912      
    913      if (t->inDataSize == 0)
    914      {
    915        MtDecThread_FreeInBufs(t);
    916        if (--p->numFilledThreads == 0)
    917          break;
    918        if (++p->filledThreadStart == p->numStartedThreads)
    919          p->filledThreadStart = 0;
    920        t = &p->threads[p->filledThreadStart];
    921      }
    922    }
    923    
    924    {
    925      size_t lim = t->inDataSize_Start;
    926      if (lim != 0)
    927        t->inDataSize_Start = 0;
    928      else
    929      {
    930        UInt64 rem = t->inDataSize;
    931        lim = p->inBufSize;
    932        if (lim > rem)
    933          lim = (size_t)rem;
    934      }
    935      t->inDataSize -= lim;
    936      *inLim = lim;
    937      return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(t->inBuf);
    938    }
    939  }
    940 
    941  {
    942    size_t crossSize = p->crossEnd - p->crossStart;
    943    if (crossSize != 0)
    944    {
    945      const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
    946      *inLim = crossSize;
    947      p->crossStart = 0;
    948      p->crossEnd = 0;
    949      return data;
    950    }
    951    *inLim = 0;
    952    if (p->crossBlock)
    953    {
    954      ISzAlloc_Free(p->alloc, p->crossBlock);
    955      p->crossBlock = NULL;
    956    }
    957    return NULL;
    958  }
    959 }
    960 
    961 
    962 void MtDec_Construct(CMtDec *p)
    963 {
    964  unsigned i;
    965  
    966  p->inBufSize = (size_t)1 << 18;
    967 
    968  p->numThreadsMax = 0;
    969 
    970  p->inStream = NULL;
    971  
    972  // p->inData = NULL;
    973  // p->inDataSize = 0;
    974 
    975  p->crossBlock = NULL;
    976  p->crossStart = 0;
    977  p->crossEnd = 0;
    978 
    979  p->numFilledThreads = 0;
    980 
    981  p->progress = NULL;
    982  p->alloc = NULL;
    983 
    984  p->mtCallback = NULL;
    985  p->mtCallbackObject = NULL;
    986 
    987  p->allocatedBufsSize = 0;
    988 
    989  for (i = 0; i < MTDEC__THREADS_MAX; i++)
    990  {
    991    CMtDecThread *t = &p->threads[i];
    992    t->mtDec = p;
    993    t->index = i;
    994    t->inBuf = NULL;
    995    Event_Construct(&t->canRead);
    996    Event_Construct(&t->canWrite);
    997    Thread_Construct(&t->thread);
    998  }
    999 
   1000  // Event_Construct(&p->finishedEvent);
   1001 
   1002  CriticalSection_Init(&p->mtProgress.cs);
   1003 }
   1004 
   1005 
   1006 static void MtDec_Free(CMtDec *p)
   1007 {
   1008  unsigned i;
   1009 
   1010  p->exitThread = True;
   1011 
   1012  for (i = 0; i < MTDEC__THREADS_MAX; i++)
   1013    MtDecThread_Destruct(&p->threads[i]);
   1014 
   1015  // Event_Close(&p->finishedEvent);
   1016 
   1017  if (p->crossBlock)
   1018  {
   1019    ISzAlloc_Free(p->alloc, p->crossBlock);
   1020    p->crossBlock = NULL;
   1021  }
   1022 }
   1023 
   1024 
   1025 void MtDec_Destruct(CMtDec *p)
   1026 {
   1027  MtDec_Free(p);
   1028 
   1029  CriticalSection_Delete(&p->mtProgress.cs);
   1030 }
   1031 
   1032 
   1033 SRes MtDec_Code(CMtDec *p)
   1034 {
   1035  unsigned i;
   1036 
   1037  p->inProcessed = 0;
   1038 
   1039  p->blockIndex = 1; // it must be larger than not_defined index (0)
   1040  p->isAllocError = False;
   1041  p->overflow = False;
   1042  p->threadingErrorSRes = SZ_OK;
   1043 
   1044  p->needContinue = True;
   1045 
   1046  p->readWasFinished = False;
   1047  p->needInterrupt = False;
   1048  p->interruptIndex = (UInt64)(Int64)-1;
   1049 
   1050  p->readProcessed = 0;
   1051  p->readRes = SZ_OK;
   1052  p->codeRes = SZ_OK;
   1053  p->wasInterrupted = False;
   1054 
   1055  p->crossStart = 0;
   1056  p->crossEnd = 0;
   1057 
   1058  p->filledThreadStart = 0;
   1059  p->numFilledThreads = 0;
   1060 
   1061  {
   1062    unsigned numThreads = p->numThreadsMax;
   1063    if (numThreads > MTDEC__THREADS_MAX)
   1064      numThreads = MTDEC__THREADS_MAX;
   1065    p->numStartedThreads_Limit = numThreads;
   1066    p->numStartedThreads = 0;
   1067  }
   1068 
   1069  if (p->inBufSize != p->allocatedBufsSize)
   1070  {
   1071    for (i = 0; i < MTDEC__THREADS_MAX; i++)
   1072    {
   1073      CMtDecThread *t = &p->threads[i];
   1074      if (t->inBuf)
   1075        MtDecThread_FreeInBufs(t);
   1076    }
   1077    if (p->crossBlock)
   1078    {
   1079      ISzAlloc_Free(p->alloc, p->crossBlock);
   1080      p->crossBlock = NULL;
   1081    }
   1082 
   1083    p->allocatedBufsSize = p->inBufSize;
   1084  }
   1085 
   1086  MtProgress_Init(&p->mtProgress, p->progress);
   1087 
   1088  // RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent));
   1089  p->exitThread = False;
   1090  p->exitThreadWRes = 0;
   1091 
   1092  {
   1093    WRes wres;
   1094    WRes sres;
   1095    CMtDecThread *nextThread = &p->threads[p->numStartedThreads++];
   1096    // wres = MtDecThread_CreateAndStart(nextThread);
   1097    wres = MtDecThread_CreateEvents(nextThread);
   1098    if (wres == 0) { wres = Event_Set(&nextThread->canWrite);
   1099    if (wres == 0) { wres = Event_Set(&nextThread->canRead);
   1100    if (wres == 0) { wres = ThreadFunc(nextThread);
   1101    if (wres != 0)
   1102    {
   1103      p->needContinue = False;
   1104      MtDec_CloseThreads(p);
   1105    }}}}
   1106 
   1107    // wres = 17; // for test
   1108    // wres = Event_Wait(&p->finishedEvent);
   1109 
   1110    sres = MY_SRes_HRESULT_FROM_WRes(wres);
   1111 
   1112    if (sres != 0)
   1113      p->threadingErrorSRes = sres;
   1114 
   1115    if (
   1116        // wres == 0
   1117        // wres != 0
   1118        // || p->mtc.codeRes == SZ_ERROR_MEM
   1119        p->isAllocError
   1120        || p->threadingErrorSRes != SZ_OK
   1121        || p->overflow)
   1122    {
   1123      // p->needContinue = True;
   1124    }
   1125    else
   1126      p->needContinue = False;
   1127    
   1128    if (p->needContinue)
   1129      return SZ_OK;
   1130 
   1131    // if (sres != SZ_OK)
   1132      return sres;
   1133    // return E_FAIL;
   1134  }
   1135 }
   1136 
   1137 #endif