MtCoder.c (14259B)
1 /* MtCoder.c -- Multi-thread Coder 2 2018-02-21 : Igor Pavlov : Public domain */ 3 4 #include "Precomp.h" 5 6 #include "MtCoder.h" 7 8 #ifndef _7ZIP_ST 9 10 SRes MtProgressThunk_Progress(const ICompressProgress *pp, UInt64 inSize, UInt64 outSize) 11 { 12 CMtProgressThunk *thunk = CONTAINER_FROM_VTBL(pp, CMtProgressThunk, vt); 13 UInt64 inSize2 = 0; 14 UInt64 outSize2 = 0; 15 if (inSize != (UInt64)(Int64)-1) 16 { 17 inSize2 = inSize - thunk->inSize; 18 thunk->inSize = inSize; 19 } 20 if (outSize != (UInt64)(Int64)-1) 21 { 22 outSize2 = outSize - thunk->outSize; 23 thunk->outSize = outSize; 24 } 25 return MtProgress_ProgressAdd(thunk->mtProgress, inSize2, outSize2); 26 } 27 28 29 void MtProgressThunk_CreateVTable(CMtProgressThunk *p) 30 { 31 p->vt.Progress = MtProgressThunk_Progress; 32 } 33 34 35 36 #define RINOK_THREAD(x) { if ((x) != 0) return SZ_ERROR_THREAD; } 37 38 39 static WRes ArEvent_OptCreate_And_Reset(CEvent *p) 40 { 41 if (Event_IsCreated(p)) 42 return Event_Reset(p); 43 return AutoResetEvent_CreateNotSignaled(p); 44 } 45 46 47 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp); 48 49 50 static SRes MtCoderThread_CreateAndStart(CMtCoderThread *t) 51 { 52 WRes wres = ArEvent_OptCreate_And_Reset(&t->startEvent); 53 if (wres == 0) 54 { 55 t->stop = False; 56 if (!Thread_WasCreated(&t->thread)) 57 wres = Thread_Create(&t->thread, ThreadFunc, t); 58 if (wres == 0) 59 wres = Event_Set(&t->startEvent); 60 } 61 if (wres == 0) 62 return SZ_OK; 63 return MY_SRes_HRESULT_FROM_WRes(wres); 64 } 65 66 67 static void MtCoderThread_Destruct(CMtCoderThread *t) 68 { 69 if (Thread_WasCreated(&t->thread)) 70 { 71 t->stop = 1; 72 Event_Set(&t->startEvent); 73 Thread_Wait(&t->thread); 74 Thread_Close(&t->thread); 75 } 76 77 Event_Close(&t->startEvent); 78 79 if (t->inBuf) 80 { 81 ISzAlloc_Free(t->mtCoder->allocBig, t->inBuf); 82 t->inBuf = NULL; 83 } 84 } 85 86 87 88 static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize) 89 { 90 size_t size = *processedSize; 91 *processedSize = 0; 92 while (size != 0) 93 { 94 size_t cur = size; 95 SRes res = ISeqInStream_Read(stream, data, &cur); 96 *processedSize += cur; 97 data += cur; 98 size -= cur; 99 RINOK(res); 100 if (cur == 0) 101 return SZ_OK; 102 } 103 return SZ_OK; 104 } 105 106 107 /* 108 ThreadFunc2() returns: 109 SZ_OK - in all normal cases (even for stream error or memory allocation error) 110 SZ_ERROR_THREAD - in case of failure in system synch function 111 */ 112 113 static SRes ThreadFunc2(CMtCoderThread *t) 114 { 115 CMtCoder *mtc = t->mtCoder; 116 117 for (;;) 118 { 119 unsigned bi; 120 SRes res; 121 SRes res2; 122 Bool finished; 123 unsigned bufIndex; 124 size_t size; 125 const Byte *inData; 126 UInt64 readProcessed = 0; 127 128 RINOK_THREAD(Event_Wait(&mtc->readEvent)) 129 130 /* after Event_Wait(&mtc->readEvent) we must call Event_Set(&mtc->readEvent) in any case to unlock another threads */ 131 132 if (mtc->stopReading) 133 { 134 return Event_Set(&mtc->readEvent) == 0 ? SZ_OK : SZ_ERROR_THREAD; 135 } 136 137 res = MtProgress_GetError(&mtc->mtProgress); 138 139 size = 0; 140 inData = NULL; 141 finished = True; 142 143 if (res == SZ_OK) 144 { 145 size = mtc->blockSize; 146 if (mtc->inStream) 147 { 148 if (!t->inBuf) 149 { 150 t->inBuf = (Byte *)ISzAlloc_Alloc(mtc->allocBig, mtc->blockSize); 151 if (!t->inBuf) 152 res = SZ_ERROR_MEM; 153 } 154 if (res == SZ_OK) 155 { 156 res = FullRead(mtc->inStream, t->inBuf, &size); 157 readProcessed = mtc->readProcessed + size; 158 mtc->readProcessed = readProcessed; 159 } 160 if (res != SZ_OK) 161 { 162 mtc->readRes = res; 163 /* after reading error - we can stop encoding of previous blocks */ 164 MtProgress_SetError(&mtc->mtProgress, res); 165 } 166 else 167 finished = (size != mtc->blockSize); 168 } 169 else 170 { 171 size_t rem; 172 readProcessed = mtc->readProcessed; 173 rem = mtc->inDataSize - (size_t)readProcessed; 174 if (size > rem) 175 size = rem; 176 inData = mtc->inData + (size_t)readProcessed; 177 readProcessed += size; 178 mtc->readProcessed = readProcessed; 179 finished = (mtc->inDataSize == (size_t)readProcessed); 180 } 181 } 182 183 /* we must get some block from blocksSemaphore before Event_Set(&mtc->readEvent) */ 184 185 res2 = SZ_OK; 186 187 if (Semaphore_Wait(&mtc->blocksSemaphore) != 0) 188 { 189 res2 = SZ_ERROR_THREAD; 190 if (res == SZ_OK) 191 { 192 res = res2; 193 // MtProgress_SetError(&mtc->mtProgress, res); 194 } 195 } 196 197 bi = mtc->blockIndex; 198 199 if (++mtc->blockIndex >= mtc->numBlocksMax) 200 mtc->blockIndex = 0; 201 202 bufIndex = (unsigned)(int)-1; 203 204 if (res == SZ_OK) 205 res = MtProgress_GetError(&mtc->mtProgress); 206 207 if (res != SZ_OK) 208 finished = True; 209 210 if (!finished) 211 { 212 if (mtc->numStartedThreads < mtc->numStartedThreadsLimit 213 && mtc->expectedDataSize != readProcessed) 214 { 215 res = MtCoderThread_CreateAndStart(&mtc->threads[mtc->numStartedThreads]); 216 if (res == SZ_OK) 217 mtc->numStartedThreads++; 218 else 219 { 220 MtProgress_SetError(&mtc->mtProgress, res); 221 finished = True; 222 } 223 } 224 } 225 226 if (finished) 227 mtc->stopReading = True; 228 229 RINOK_THREAD(Event_Set(&mtc->readEvent)) 230 231 if (res2 != SZ_OK) 232 return res2; 233 234 if (res == SZ_OK) 235 { 236 CriticalSection_Enter(&mtc->cs); 237 bufIndex = mtc->freeBlockHead; 238 mtc->freeBlockHead = mtc->freeBlockList[bufIndex]; 239 CriticalSection_Leave(&mtc->cs); 240 241 res = mtc->mtCallback->Code(mtc->mtCallbackObject, t->index, bufIndex, 242 mtc->inStream ? t->inBuf : inData, size, finished); 243 244 // MtProgress_Reinit(&mtc->mtProgress, t->index); 245 246 if (res != SZ_OK) 247 MtProgress_SetError(&mtc->mtProgress, res); 248 } 249 250 { 251 CMtCoderBlock *block = &mtc->blocks[bi]; 252 block->res = res; 253 block->bufIndex = bufIndex; 254 block->finished = finished; 255 } 256 257 #ifdef MTCODER__USE_WRITE_THREAD 258 RINOK_THREAD(Event_Set(&mtc->writeEvents[bi])) 259 #else 260 { 261 unsigned wi; 262 { 263 CriticalSection_Enter(&mtc->cs); 264 wi = mtc->writeIndex; 265 if (wi == bi) 266 mtc->writeIndex = (unsigned)(int)-1; 267 else 268 mtc->ReadyBlocks[bi] = True; 269 CriticalSection_Leave(&mtc->cs); 270 } 271 272 if (wi != bi) 273 { 274 if (res != SZ_OK || finished) 275 return 0; 276 continue; 277 } 278 279 if (mtc->writeRes != SZ_OK) 280 res = mtc->writeRes; 281 282 for (;;) 283 { 284 if (res == SZ_OK && bufIndex != (unsigned)(int)-1) 285 { 286 res = mtc->mtCallback->Write(mtc->mtCallbackObject, bufIndex); 287 if (res != SZ_OK) 288 { 289 mtc->writeRes = res; 290 MtProgress_SetError(&mtc->mtProgress, res); 291 } 292 } 293 294 if (++wi >= mtc->numBlocksMax) 295 wi = 0; 296 { 297 Bool isReady; 298 299 CriticalSection_Enter(&mtc->cs); 300 301 if (bufIndex != (unsigned)(int)-1) 302 { 303 mtc->freeBlockList[bufIndex] = mtc->freeBlockHead; 304 mtc->freeBlockHead = bufIndex; 305 } 306 307 isReady = mtc->ReadyBlocks[wi]; 308 309 if (isReady) 310 mtc->ReadyBlocks[wi] = False; 311 else 312 mtc->writeIndex = wi; 313 314 CriticalSection_Leave(&mtc->cs); 315 316 RINOK_THREAD(Semaphore_Release1(&mtc->blocksSemaphore)) 317 318 if (!isReady) 319 break; 320 } 321 322 { 323 CMtCoderBlock *block = &mtc->blocks[wi]; 324 if (res == SZ_OK && block->res != SZ_OK) 325 res = block->res; 326 bufIndex = block->bufIndex; 327 finished = block->finished; 328 } 329 } 330 } 331 #endif 332 333 if (finished || res != SZ_OK) 334 return 0; 335 } 336 } 337 338 339 static THREAD_FUNC_RET_TYPE THREAD_FUNC_CALL_TYPE ThreadFunc(void *pp) 340 { 341 CMtCoderThread *t = (CMtCoderThread *)pp; 342 for (;;) 343 { 344 if (Event_Wait(&t->startEvent) != 0) 345 return SZ_ERROR_THREAD; 346 if (t->stop) 347 return 0; 348 { 349 SRes res = ThreadFunc2(t); 350 CMtCoder *mtc = t->mtCoder; 351 if (res != SZ_OK) 352 { 353 MtProgress_SetError(&mtc->mtProgress, res); 354 } 355 356 #ifndef MTCODER__USE_WRITE_THREAD 357 { 358 unsigned numFinished = (unsigned)InterlockedIncrement(&mtc->numFinishedThreads); 359 if (numFinished == mtc->numStartedThreads) 360 if (Event_Set(&mtc->finishedEvent) != 0) 361 return SZ_ERROR_THREAD; 362 } 363 #endif 364 } 365 } 366 } 367 368 369 370 void MtCoder_Construct(CMtCoder *p) 371 { 372 unsigned i; 373 374 p->blockSize = 0; 375 p->numThreadsMax = 0; 376 p->expectedDataSize = (UInt64)(Int64)-1; 377 378 p->inStream = NULL; 379 p->inData = NULL; 380 p->inDataSize = 0; 381 382 p->progress = NULL; 383 p->allocBig = NULL; 384 385 p->mtCallback = NULL; 386 p->mtCallbackObject = NULL; 387 388 p->allocatedBufsSize = 0; 389 390 Event_Construct(&p->readEvent); 391 Semaphore_Construct(&p->blocksSemaphore); 392 393 for (i = 0; i < MTCODER__THREADS_MAX; i++) 394 { 395 CMtCoderThread *t = &p->threads[i]; 396 t->mtCoder = p; 397 t->index = i; 398 t->inBuf = NULL; 399 t->stop = False; 400 Event_Construct(&t->startEvent); 401 Thread_Construct(&t->thread); 402 } 403 404 #ifdef MTCODER__USE_WRITE_THREAD 405 for (i = 0; i < MTCODER__BLOCKS_MAX; i++) 406 Event_Construct(&p->writeEvents[i]); 407 #else 408 Event_Construct(&p->finishedEvent); 409 #endif 410 411 CriticalSection_Init(&p->cs); 412 CriticalSection_Init(&p->mtProgress.cs); 413 } 414 415 416 417 418 static void MtCoder_Free(CMtCoder *p) 419 { 420 unsigned i; 421 422 /* 423 p->stopReading = True; 424 if (Event_IsCreated(&p->readEvent)) 425 Event_Set(&p->readEvent); 426 */ 427 428 for (i = 0; i < MTCODER__THREADS_MAX; i++) 429 MtCoderThread_Destruct(&p->threads[i]); 430 431 Event_Close(&p->readEvent); 432 Semaphore_Close(&p->blocksSemaphore); 433 434 #ifdef MTCODER__USE_WRITE_THREAD 435 for (i = 0; i < MTCODER__BLOCKS_MAX; i++) 436 Event_Close(&p->writeEvents[i]); 437 #else 438 Event_Close(&p->finishedEvent); 439 #endif 440 } 441 442 443 void MtCoder_Destruct(CMtCoder *p) 444 { 445 MtCoder_Free(p); 446 447 CriticalSection_Delete(&p->cs); 448 CriticalSection_Delete(&p->mtProgress.cs); 449 } 450 451 452 SRes MtCoder_Code(CMtCoder *p) 453 { 454 unsigned numThreads = p->numThreadsMax; 455 unsigned numBlocksMax; 456 unsigned i; 457 SRes res = SZ_OK; 458 459 if (numThreads > MTCODER__THREADS_MAX) 460 numThreads = MTCODER__THREADS_MAX; 461 numBlocksMax = MTCODER__GET_NUM_BLOCKS_FROM_THREADS(numThreads); 462 463 if (p->blockSize < ((UInt32)1 << 26)) numBlocksMax++; 464 if (p->blockSize < ((UInt32)1 << 24)) numBlocksMax++; 465 if (p->blockSize < ((UInt32)1 << 22)) numBlocksMax++; 466 467 if (numBlocksMax > MTCODER__BLOCKS_MAX) 468 numBlocksMax = MTCODER__BLOCKS_MAX; 469 470 if (p->blockSize != p->allocatedBufsSize) 471 { 472 for (i = 0; i < MTCODER__THREADS_MAX; i++) 473 { 474 CMtCoderThread *t = &p->threads[i]; 475 if (t->inBuf) 476 { 477 ISzAlloc_Free(p->allocBig, t->inBuf); 478 t->inBuf = NULL; 479 } 480 } 481 p->allocatedBufsSize = p->blockSize; 482 } 483 484 p->readRes = SZ_OK; 485 486 MtProgress_Init(&p->mtProgress, p->progress); 487 488 #ifdef MTCODER__USE_WRITE_THREAD 489 for (i = 0; i < numBlocksMax; i++) 490 { 491 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->writeEvents[i])); 492 } 493 #else 494 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->finishedEvent)); 495 #endif 496 497 { 498 RINOK_THREAD(ArEvent_OptCreate_And_Reset(&p->readEvent)); 499 500 if (Semaphore_IsCreated(&p->blocksSemaphore)) 501 { 502 RINOK_THREAD(Semaphore_Close(&p->blocksSemaphore)); 503 } 504 RINOK_THREAD(Semaphore_Create(&p->blocksSemaphore, numBlocksMax, numBlocksMax)); 505 } 506 507 for (i = 0; i < MTCODER__BLOCKS_MAX - 1; i++) 508 p->freeBlockList[i] = i + 1; 509 p->freeBlockList[MTCODER__BLOCKS_MAX - 1] = (unsigned)(int)-1; 510 p->freeBlockHead = 0; 511 512 p->readProcessed = 0; 513 p->blockIndex = 0; 514 p->numBlocksMax = numBlocksMax; 515 p->stopReading = False; 516 517 #ifndef MTCODER__USE_WRITE_THREAD 518 p->writeIndex = 0; 519 p->writeRes = SZ_OK; 520 for (i = 0; i < MTCODER__BLOCKS_MAX; i++) 521 p->ReadyBlocks[i] = False; 522 p->numFinishedThreads = 0; 523 #endif 524 525 p->numStartedThreadsLimit = numThreads; 526 p->numStartedThreads = 0; 527 528 // for (i = 0; i < numThreads; i++) 529 { 530 CMtCoderThread *nextThread = &p->threads[p->numStartedThreads++]; 531 RINOK(MtCoderThread_CreateAndStart(nextThread)); 532 } 533 534 RINOK_THREAD(Event_Set(&p->readEvent)) 535 536 #ifdef MTCODER__USE_WRITE_THREAD 537 { 538 unsigned bi = 0; 539 540 for (;; bi++) 541 { 542 if (bi >= numBlocksMax) 543 bi = 0; 544 545 RINOK_THREAD(Event_Wait(&p->writeEvents[bi])) 546 547 { 548 const CMtCoderBlock *block = &p->blocks[bi]; 549 unsigned bufIndex = block->bufIndex; 550 Bool finished = block->finished; 551 if (res == SZ_OK && block->res != SZ_OK) 552 res = block->res; 553 554 if (bufIndex != (unsigned)(int)-1) 555 { 556 if (res == SZ_OK) 557 { 558 res = p->mtCallback->Write(p->mtCallbackObject, bufIndex); 559 if (res != SZ_OK) 560 MtProgress_SetError(&p->mtProgress, res); 561 } 562 563 CriticalSection_Enter(&p->cs); 564 { 565 p->freeBlockList[bufIndex] = p->freeBlockHead; 566 p->freeBlockHead = bufIndex; 567 } 568 CriticalSection_Leave(&p->cs); 569 } 570 571 RINOK_THREAD(Semaphore_Release1(&p->blocksSemaphore)) 572 573 if (finished) 574 break; 575 } 576 } 577 } 578 #else 579 { 580 WRes wres = Event_Wait(&p->finishedEvent); 581 res = MY_SRes_HRESULT_FROM_WRes(wres); 582 } 583 #endif 584 585 if (res == SZ_OK) 586 res = p->readRes; 587 588 if (res == SZ_OK) 589 res = p->mtProgress.res; 590 591 #ifndef MTCODER__USE_WRITE_THREAD 592 if (res == SZ_OK) 593 res = p->writeRes; 594 #endif 595 596 if (res != SZ_OK) 597 MtCoder_Free(p); 598 return res; 599 } 600 601 #endif