From cfc6427ef6ac45817e2463acddc897a46a1b4da2 Mon Sep 17 00:00:00 2001 From: Kimball Thurston Date: Mon, 23 Sep 2024 23:32:38 +1200 Subject: [PATCH] Add templated utility class to centralize common pattern All reading code has a (for threaded reads) need to maintain a set of contexts and re-use them while dispatching N threads. Create a class to do that and remove duplicate code. This also lowers the scope of when the contexts are alive to reduce memory bloat Signed-off-by: Kimball Thurston --- src/lib/IlmThread/CMakeLists.txt | 1 + src/lib/IlmThread/IlmThreadProcessGroup.h | 154 +++++++++++++++++++ src/lib/OpenEXR/ImfDeepScanLineInputFile.cpp | 113 +++++--------- src/lib/OpenEXR/ImfDeepTiledInputFile.cpp | 122 ++++++--------- src/lib/OpenEXR/ImfScanLineInputFile.cpp | 119 ++++++-------- src/lib/OpenEXR/ImfTiledInputFile.cpp | 116 ++++++-------- 6 files changed, 335 insertions(+), 290 deletions(-) create mode 100644 src/lib/IlmThread/IlmThreadProcessGroup.h diff --git a/src/lib/IlmThread/CMakeLists.txt b/src/lib/IlmThread/CMakeLists.txt index b87da8ac68..f2ebc95cc4 100644 --- a/src/lib/IlmThread/CMakeLists.txt +++ b/src/lib/IlmThread/CMakeLists.txt @@ -19,6 +19,7 @@ openexr_define_library(IlmThread IlmThreadMutex.h IlmThreadNamespace.h IlmThreadPool.h + IlmThreadProcessGroup.h IlmThreadSemaphore.h DEPENDENCIES OpenEXR::Config diff --git a/src/lib/IlmThread/IlmThreadProcessGroup.h b/src/lib/IlmThread/IlmThreadProcessGroup.h new file mode 100644 index 0000000000..8ef677a658 --- /dev/null +++ b/src/lib/IlmThread/IlmThreadProcessGroup.h @@ -0,0 +1,154 @@ +// +// SPDX-License-Identifier: BSD-3-Clause +// Copyright (c) Contributors to the OpenEXR Project. +// + +#ifndef INCLUDED_ILM_THREAD_PROCESS_GROUP_H +#define INCLUDED_ILM_THREAD_PROCESS_GROUP_H + +//----------------------------------------------------------------------------- +// +// Class ProcessGroup is a templated inline helper for constraining +// task contexts to a number of threads. 
It maintains a list of +// contexts and then can hand them out one at a time, waiting for a +// previous thread request to finish before handing out more, +// preventing over-subscription / allocation of contexts. +// +//----------------------------------------------------------------------------- + +#include "IlmThreadConfig.h" +#include "IlmThreadExport.h" +#include "IlmThreadNamespace.h" +#include "IlmThreadSemaphore.h" + +#include "Iex.h" + +#include +#include +#include +#include + +ILMTHREAD_INTERNAL_NAMESPACE_HEADER_ENTER + +template ::value && + std::is_same ::value, bool> = true> +class ProcessGroup +{ +public: + using Process = P; + + ProcessGroup (unsigned int numThreads) + : _sem (numThreads) + , _avail_head (nullptr) + , _first_failure (nullptr) + { + _fixed_pool.resize (numThreads); + for ( unsigned int i = 0; i < numThreads; ++i ) + { + if (i == (numThreads - 1)) + _fixed_pool[i].next = nullptr; + else + _fixed_pool[i].next = &(_fixed_pool[i+1]); + } + _avail_head = &(_fixed_pool[0]); + } + + ProcessGroup (const ProcessGroup&) = delete; + ProcessGroup& operator= (const ProcessGroup&) = delete; + ProcessGroup (ProcessGroup&&) = default; + ProcessGroup& operator= (ProcessGroup&&) = delete; + ~ProcessGroup() + { + std::string *cur = _first_failure.load (); + delete cur; + } + + void push (Process *p) + { + Process* oldhead = _avail_head.load (std::memory_order_relaxed); + + do + { + p->next = oldhead; + } while (!_avail_head.compare_exchange_weak ( + oldhead, p, + std::memory_order_release, + std::memory_order_relaxed)); + + // notify someone else there's one available + _sem.post (); + } + + // called by the thread dispatching work units, may block + Process* pop () + { + Process* ret = nullptr; + + // ABA is not a problem here: the items live in a static + // pool we own, and pop is only ever called by the single + // dispatching thread, so a head we read cannot be recycled. 
+ + // used for honoring the numThreads, as pop + // should only be called by the one thread + // waiting to submit thread calls + _sem.wait (); + + ret = _avail_head.load (std::memory_order_acquire); + + Process* newhead; + do + { + if (!ret) + std::cerr << "GACK: serious failure case???" << std::endl; + + newhead = ret->next; + } while ( !_avail_head.compare_exchange_weak( + ret, newhead, std::memory_order_acquire)); + + return ret; + } + + void record_failure (const char *e) + { + // should we construct a list of failures if there are + // more than one? seems less confusing to just report + // the first we happened to record + + std::string *cur = _first_failure.load (); + if (!cur) + { + std::string *msg = new std::string (e); + if (! _first_failure.compare_exchange_strong (cur, msg)) + delete msg; + } + } + + void throw_on_failure () + { + std::string *cur = _first_failure.load (); + _first_failure.store (nullptr); + + if (cur) + { + std::string msg (*cur); + delete cur; + + throw IEX_NAMESPACE::IoExc (msg); + } + } +private: + Semaphore _sem; + + std::vector _fixed_pool; + + std::atomic _avail_head; + + std::atomic _first_failure; +}; + + +ILMTHREAD_INTERNAL_NAMESPACE_HEADER_EXIT + +#endif // INCLUDED_ILM_THREAD_PROCESS_GROUP_H diff --git a/src/lib/OpenEXR/ImfDeepScanLineInputFile.cpp b/src/lib/OpenEXR/ImfDeepScanLineInputFile.cpp index 31ec5ad667..1487249622 100644 --- a/src/lib/OpenEXR/ImfDeepScanLineInputFile.cpp +++ b/src/lib/OpenEXR/ImfDeepScanLineInputFile.cpp @@ -16,14 +16,14 @@ #include "IlmThreadPool.h" #if ILMTHREAD_THREADING_ENABLED -# include "IlmThreadSemaphore.h" +# include "IlmThreadProcessGroup.h" +# include #endif #include "Iex.h" #include #include -#include #include #include @@ -86,9 +86,13 @@ struct ScanLineProcess exr_chunk_info_t cinfo; exr_decode_pipeline_t decoder; - std::shared_ptr next; + ScanLineProcess* next; }; +#if ILMTHREAD_THREADING_ENABLED +using ScanLineProcessGroup = ILMTHREAD_NAMESPACE::ProcessGroup; +#endif + } // empty namespace 
struct DeepScanLineInputFile::Data @@ -97,9 +101,6 @@ struct DeepScanLineInputFile::Data : _ctxt (ctxt) , partNumber (pN) , numThreads (nT) -#if ILMTHREAD_THREADING_ENABLED - , _sem ((unsigned int)nT) -#endif {} void initialize () @@ -133,42 +134,16 @@ struct DeepScanLineInputFile::Data DeepFrameBuffer frameBuffer; std::vector fill_list; - std::shared_ptr processStack; - std::shared_ptr getChunkProcess () - { - std::shared_ptr retval; -#if ILMTHREAD_THREADING_ENABLED - std::lock_guard lk (_mx); -#endif - retval = processStack; - if (!retval) - retval = std::make_shared (); - processStack = retval->next; - retval->next.reset(); - return retval; - } - void putChunkProcess (std::shared_ptr sp) - { -#if ILMTHREAD_THREADING_ENABLED - std::lock_guard lk (_mx); -#endif - sp->next = processStack; - processStack = sp; - } - #if ILMTHREAD_THREADING_ENABLED std::mutex _mx; - ILMTHREAD_NAMESPACE::Semaphore _sem; - std::vector _failures; -#endif -#if ILMTHREAD_THREADING_ENABLED class LineBufferTask final : public ILMTHREAD_NAMESPACE::Task { public: LineBufferTask ( ILMTHREAD_NAMESPACE::TaskGroup* group, Data* ifd, + ScanLineProcessGroup* lineg, const DeepFrameBuffer* outfb, const exr_chunk_info_t& cinfo, int fby, @@ -179,7 +154,8 @@ struct DeepScanLineInputFile::Data , _ifd (ifd) , _fby (fby) , _last_fby (endScan) - , _line (ifd->getChunkProcess ()) + , _line (lineg->pop ()) + , _line_group (lineg) { _line->cinfo = cinfo; _line->counts_only = countsOnly; @@ -187,9 +163,7 @@ struct DeepScanLineInputFile::Data ~LineBufferTask () override { - if (_line) - _ifd->putChunkProcess (std::move (_line)); - _ifd->_sem.post (); + _line_group->push (_line); } void execute () override; @@ -201,8 +175,8 @@ struct DeepScanLineInputFile::Data Data* _ifd; int _fby; int _last_fby; - - std::shared_ptr _line; + ScanLineProcess* _line; + ScanLineProcessGroup* _line_group; }; #endif }; @@ -301,7 +275,6 @@ DeepScanLineInputFile::setFrameBuffer (const DeepFrameBuffer& frameBuffer) _data->prepFillList 
(frameBuffer, _data->fill_list); _data->frameBuffer = frameBuffer; _data->frameBufferValid = true; - _data->processStack.reset(); } const DeepFrameBuffer& @@ -539,29 +512,36 @@ DeepScanLineInputFile::Data::readData ( if (nchunks > 1 && numThreads > 1) { - ILMTHREAD_NAMESPACE::TaskGroup tg; + // we need the lifetime of this to last longer than the + // lifetime of the task group below such that we don't get use + // after free type error, so use scope rules to accomplish + // this + ScanLineProcessGroup sg (numThreads); - for (int y = scanLine1; y <= scanLine2; ) { - if (EXR_ERR_SUCCESS != exr_read_scanline_chunk_info (*_ctxt, partNumber, y, &cinfo)) - throw IEX_NAMESPACE::InputExc ("Unable to query scanline information"); + ILMTHREAD_NAMESPACE::TaskGroup tg; - // used for honoring the numThreads - _sem.wait (); + for (int y = scanLine1; y <= scanLine2; ) + { + if (EXR_ERR_SUCCESS != exr_read_scanline_chunk_info (*_ctxt, partNumber, y, &cinfo)) + throw IEX_NAMESPACE::InputExc ("Unable to query scanline information"); - ILMTHREAD_NAMESPACE::ThreadPool::addGlobalTask ( - new LineBufferTask (&tg, this, &fb, cinfo, y, scanLine2, countsOnly) ); + ILMTHREAD_NAMESPACE::ThreadPool::addGlobalTask ( + new LineBufferTask (&tg, this, &sg, &fb, cinfo, y, scanLine2, countsOnly) ); - y += scansperchunk - (y - cinfo.start_y); + y += scansperchunk - (y - cinfo.start_y); + } } + + sg.throw_on_failure (); } else #endif { - auto sp = getChunkProcess (); - bool redo = sp->first || sp->counts_only != countsOnly; + ScanLineProcess sp; + bool redo = true; - sp->counts_only = countsOnly; + sp.counts_only = countsOnly; for (int y = scanLine1; y <= scanLine2; ) { if (EXR_ERR_SUCCESS != exr_read_scanline_chunk_info (*_ctxt, partNumber, y, &cinfo)) @@ -571,10 +551,10 @@ DeepScanLineInputFile::Data::readData ( // re-run the unpack (i.e. 
people reading 1 scan at a time // in a multi-scanline chunk) if (!redo && - sp->cinfo.idx == cinfo.idx && - sp->last_decode_err == EXR_ERR_SUCCESS) + sp.cinfo.idx == cinfo.idx && + sp.last_decode_err == EXR_ERR_SUCCESS) { - sp->run_unpack ( + sp.run_unpack ( *_ctxt, partNumber, &fb, @@ -584,8 +564,8 @@ DeepScanLineInputFile::Data::readData ( } else { - sp->cinfo = cinfo; - sp->run_decode ( + sp.cinfo = cinfo; + sp.run_decode ( *_ctxt, partNumber, &fb, @@ -597,19 +577,7 @@ DeepScanLineInputFile::Data::readData ( y += scansperchunk - (y - cinfo.start_y); } - - putChunkProcess (std::move(sp)); - } - -#if ILMTHREAD_THREADING_ENABLED - std::lock_guard lock (_mx); - if (! _failures.empty()) - { - std::string fail = _failures[0]; - _failures.clear (); - throw IEX_NAMESPACE::IoExc (fail); } -#endif } //////////////////////////////////////// @@ -727,8 +695,11 @@ void DeepScanLineInputFile::Data::LineBufferTask::execute () } catch (std::exception &e) { - std::lock_guard lock (_ifd->_mx); - _ifd->_failures.emplace_back (std::string (e.what())); + _line_group->record_failure (e.what ()); + } + catch (...) 
+ { + _line_group->record_failure ("Unknown exception"); } } #endif diff --git a/src/lib/OpenEXR/ImfDeepTiledInputFile.cpp b/src/lib/OpenEXR/ImfDeepTiledInputFile.cpp index 90de89ca75..3ce47b98eb 100644 --- a/src/lib/OpenEXR/ImfDeepTiledInputFile.cpp +++ b/src/lib/OpenEXR/ImfDeepTiledInputFile.cpp @@ -15,7 +15,8 @@ #include "IlmThreadPool.h" #if ILMTHREAD_THREADING_ENABLED -# include "IlmThreadSemaphore.h" +# include "IlmThreadProcessGroup.h" +# include #endif #include "ImfDeepFrameBuffer.h" @@ -26,7 +27,6 @@ #include "ImfTiledMisc.h" #include -#include #include #include @@ -71,9 +71,13 @@ struct TileProcess exr_chunk_info_t cinfo; exr_decode_pipeline_t decoder; - std::shared_ptr next; + TileProcess* next; }; +#if ILMTHREAD_THREADING_ENABLED +using TileProcessGroup = ILMTHREAD_NAMESPACE::ProcessGroup; +#endif + } // empty namespace // @@ -87,9 +91,6 @@ struct DeepTiledInputFile::Data : _ctxt (ctxt) , partNumber (pN) , numThreads (nT) -#if ILMTHREAD_THREADING_ENABLED - , _sem ((unsigned int)nT) -#endif {} void initialize () @@ -136,49 +137,23 @@ struct DeepTiledInputFile::Data std::vector fill_list; #if ILMTHREAD_THREADING_ENABLED - std::vector _failures; - std::mutex _mx; - ILMTHREAD_NAMESPACE::Semaphore _sem; -#endif - - std::shared_ptr processStack; - std::shared_ptr getChunkProcess () - { - std::shared_ptr retval; -#if ILMTHREAD_THREADING_ENABLED - std::lock_guard lk (_mx); -#endif - retval = processStack; - if (!retval) - retval = std::make_shared (); - processStack = retval->next; - retval->next.reset(); - return retval; - } - void putChunkProcess (std::shared_ptr tp) - { -#if ILMTHREAD_THREADING_ENABLED - std::lock_guard lk (_mx); -#endif - tp->next = processStack; - processStack = tp; - } -#if ILMTHREAD_THREADING_ENABLED class TileBufferTask final : public ILMTHREAD_NAMESPACE::Task { public: TileBufferTask ( ILMTHREAD_NAMESPACE::TaskGroup* group, Data* ifd, + TileProcessGroup* tileg, const DeepFrameBuffer* outfb, const exr_chunk_info_t& cinfo, bool 
countsOnly) : Task (group) , _outfb (outfb) , _ifd (ifd) - , _tile (ifd->getChunkProcess ()) + , _tile (tileg->pop ()) + , _tile_group (tileg) { _tile->cinfo = cinfo; _tile->counts_only = countsOnly; @@ -186,9 +161,7 @@ struct DeepTiledInputFile::Data ~TileBufferTask () override { - if (_tile) - _ifd->putChunkProcess (std::move (_tile)); - _ifd->_sem.post (); + _tile_group->push (_tile); } void execute () override; @@ -199,7 +172,8 @@ struct DeepTiledInputFile::Data const DeepFrameBuffer* _outfb; Data* _ifd; - std::shared_ptr _tile; + TileProcess* _tile; + TileProcessGroup* _tile_group; }; #endif }; @@ -307,7 +281,6 @@ DeepTiledInputFile::setFrameBuffer (const DeepFrameBuffer& frameBuffer) _data->frameBuffer = frameBuffer; _data->frameBufferValid = true; - _data->processStack.reset(); } const DeepFrameBuffer& @@ -924,38 +897,45 @@ void DeepTiledInputFile::Data::readTiles ( #if ILMTHREAD_THREADING_ENABLED if (nTiles > 1 && numThreads > 1) { - ILMTHREAD_NAMESPACE::TaskGroup tg; + // we need the lifetime of this to last longer than the + // lifetime of the task group below such that we don't get use + // after free type error, so use scope rules to accomplish + // this + TileProcessGroup tpg (numThreads); - for (int ty = dy1; ty <= dy2; ++ty) { - for (int tx = dx1; tx <= dx2; ++tx) + ILMTHREAD_NAMESPACE::TaskGroup tg; + + for (int ty = dy1; ty <= dy2; ++ty) { - exr_result_t rv = exr_read_tile_chunk_info ( - *_ctxt, partNumber, tx, ty, lx, ly, &cinfo); - if (EXR_ERR_INCOMPLETE_CHUNK_TABLE == rv) + for (int tx = dx1; tx <= dx2; ++tx) { - THROW ( - IEX_NAMESPACE::InputExc, - "Tile (" << tx << ", " << ty << ", " << lx << ", " << ly - << ") is missing."); - } - else if (EXR_ERR_SUCCESS != rv) - throw IEX_NAMESPACE::InputExc ("Unable to query tile information"); - - // used for honoring the numThreads - _sem.wait (); + exr_result_t rv = exr_read_tile_chunk_info ( + *_ctxt, partNumber, tx, ty, lx, ly, &cinfo); + if (EXR_ERR_INCOMPLETE_CHUNK_TABLE == rv) + { + THROW ( + 
IEX_NAMESPACE::InputExc, + "Tile (" << tx << ", " << ty << ", " << lx << ", " << ly + << ") is missing."); + } + else if (EXR_ERR_SUCCESS != rv) + throw IEX_NAMESPACE::InputExc ("Unable to query tile information"); - ILMTHREAD_NAMESPACE::ThreadPool::addGlobalTask ( - new TileBufferTask (&tg, this, &frameBuffer, cinfo, countsOnly) ); + ILMTHREAD_NAMESPACE::ThreadPool::addGlobalTask ( + new TileBufferTask (&tg, this, &tpg, &frameBuffer, cinfo, countsOnly) ); + } } } + + tpg.throw_on_failure (); } else #endif { - auto tp = getChunkProcess (); + TileProcess tp; - tp->counts_only = countsOnly; + tp.counts_only = countsOnly; for (int ty = dy1; ty <= dy2; ++ty) { for (int tx = dx1; tx <= dx2; ++tx) @@ -972,26 +952,15 @@ void DeepTiledInputFile::Data::readTiles ( else if (EXR_ERR_SUCCESS != rv) throw IEX_NAMESPACE::InputExc ("Unable to query tile information"); - tp->cinfo = cinfo; - tp->run_decode ( + tp.cinfo = cinfo; + tp.run_decode ( *_ctxt, partNumber, &frameBuffer, fill_list); } } - - putChunkProcess (std::move(tp)); - } - -#if ILMTHREAD_THREADING_ENABLED - if (! _failures.empty()) - { - std::string fail = _failures[0]; - _failures.clear (); - throw IEX_NAMESPACE::IoExc (fail); } -#endif } //////////////////////////////////////// @@ -1009,8 +978,11 @@ void DeepTiledInputFile::Data::TileBufferTask::execute () } catch (std::exception &e) { - std::lock_guard lock (_ifd->_mx); - _ifd->_failures.emplace_back (std::string (e.what())); + _tile_group->record_failure (e.what ()); + } + catch (...) 
+ { + _tile_group->record_failure ("Unknown exception"); } } #endif diff --git a/src/lib/OpenEXR/ImfScanLineInputFile.cpp b/src/lib/OpenEXR/ImfScanLineInputFile.cpp index f7eb144701..e0db6de0b9 100644 --- a/src/lib/OpenEXR/ImfScanLineInputFile.cpp +++ b/src/lib/OpenEXR/ImfScanLineInputFile.cpp @@ -15,13 +15,13 @@ #include "IlmThreadPool.h" #if ILMTHREAD_THREADING_ENABLED -# include "IlmThreadSemaphore.h" +# include "IlmThreadProcessGroup.h" +# include #endif #include "ImfFrameBuffer.h" #include "ImfInputPartData.h" -#include #include OPENEXR_IMF_INTERNAL_NAMESPACE_SOURCE_ENTER @@ -67,9 +67,14 @@ struct ScanLineProcess exr_chunk_info_t cinfo; exr_decode_pipeline_t decoder; - std::shared_ptr next; + // requirement to use process group + ScanLineProcess* next; }; +#if ILMTHREAD_THREADING_ENABLED +using ScanLineProcessGroup = ILMTHREAD_NAMESPACE::ProcessGroup; +#endif + } // empty namespace struct ScanLineInputFile::Data @@ -78,9 +83,6 @@ struct ScanLineInputFile::Data : _ctxt (ctxt) , partNumber (pN) , numThreads (nT) -#if ILMTHREAD_THREADING_ENABLED - , _sem ((unsigned int)nT) -#endif {} void initialize () @@ -105,41 +107,14 @@ struct ScanLineInputFile::Data #if ILMTHREAD_THREADING_ENABLED std::mutex _mx; - ILMTHREAD_NAMESPACE::Semaphore _sem; - std::vector _failures; -#endif - - std::shared_ptr processStack; - std::shared_ptr getChunkProcess () - { - std::shared_ptr retval; -#if ILMTHREAD_THREADING_ENABLED - std::lock_guard lk (_mx); -#endif - retval = processStack; - if (!retval) - retval = std::make_shared (); - processStack = retval->next; - retval->next.reset(); - return retval; - } - void putChunkProcess (std::shared_ptr sp) - { -#if ILMTHREAD_THREADING_ENABLED - std::lock_guard lk (_mx); -#endif - sp->next = processStack; - processStack = std::move (sp); - } - -#if ILMTHREAD_THREADING_ENABLED class LineBufferTask final : public ILMTHREAD_NAMESPACE::Task { public: LineBufferTask ( ILMTHREAD_NAMESPACE::TaskGroup* group, Data* ifd, + ScanLineProcessGroup* lineg, 
const FrameBuffer* outfb, const exr_chunk_info_t& cinfo, int fby, @@ -149,16 +124,15 @@ struct ScanLineInputFile::Data , _ifd (ifd) , _fby (fby) , _last_fby (endScan) - , _line (ifd->getChunkProcess ()) + , _line (lineg->pop ()) + , _line_group (lineg) { _line->cinfo = cinfo; } ~LineBufferTask () override { - if (_line) - _ifd->putChunkProcess (std::move (_line)); - _ifd->_sem.post (); + _line_group->push (_line); } void execute () override; @@ -166,12 +140,12 @@ struct ScanLineInputFile::Data private: void run_decode (); - const FrameBuffer* _outfb; - Data* _ifd; - int _fby; - int _last_fby; - - std::shared_ptr _line; + const FrameBuffer* _outfb; + Data* _ifd; + int _fby; + int _last_fby; + ScanLineProcess* _line; + ScanLineProcessGroup* _line_group; }; #endif }; @@ -279,7 +253,6 @@ ScanLineInputFile::setFrameBuffer (const FrameBuffer& frameBuffer) } _data->frameBuffer = frameBuffer; - _data->processStack.reset(); } const FrameBuffer& @@ -445,26 +418,33 @@ void ScanLineInputFile::Data::readPixels ( if (nchunks > 1 && numThreads > 1) { - ILMTHREAD_NAMESPACE::TaskGroup tg; + // we need the lifetime of this to last longer than the + // lifetime of the task group below such that we don't get use + // after free type error, so use scope rules to accomplish + // this + ScanLineProcessGroup sg (numThreads); - for (int y = scanLine1; y <= scanLine2; ) { - if (EXR_ERR_SUCCESS != exr_read_scanline_chunk_info (*_ctxt, partNumber, y, &cinfo)) - throw IEX_NAMESPACE::InputExc ("Unable to query scanline information"); + ILMTHREAD_NAMESPACE::TaskGroup tg; - // used for honoring the numThreads - _sem.wait (); + for (int y = scanLine1; y <= scanLine2; ) + { + if (EXR_ERR_SUCCESS != exr_read_scanline_chunk_info (*_ctxt, partNumber, y, &cinfo)) + throw IEX_NAMESPACE::InputExc ("Unable to query scanline information"); - ILMTHREAD_NAMESPACE::ThreadPool::addGlobalTask ( - new LineBufferTask (&tg, this, &fb, cinfo, y, scanLine2) ); + ILMTHREAD_NAMESPACE::ThreadPool::addGlobalTask ( + new 
LineBufferTask (&tg, this, &sg, &fb, cinfo, y, scanLine2) ); - y += scansperchunk - (y - cinfo.start_y); + y += scansperchunk - (y - cinfo.start_y); + } } + + sg.throw_on_failure (); } else #endif { - auto sp = getChunkProcess (); + ScanLineProcess sp; for (int y = scanLine1; y <= scanLine2; ) { @@ -474,10 +454,10 @@ void ScanLineInputFile::Data::readPixels ( // check if we have the same chunk where we can just // re-run the unpack (i.e. people reading 1 scan at a time // in a multi-scanline chunk) - if (!sp->first && sp->cinfo.idx == cinfo.idx && - sp->last_decode_err == EXR_ERR_SUCCESS) + if (!sp.first && sp.cinfo.idx == cinfo.idx && + sp.last_decode_err == EXR_ERR_SUCCESS) { - sp->run_unpack ( + sp.run_unpack ( *_ctxt, partNumber, &fb, @@ -487,8 +467,8 @@ void ScanLineInputFile::Data::readPixels ( } else { - sp->cinfo = cinfo; - sp->run_decode ( + sp.cinfo = cinfo; + sp.run_decode ( *_ctxt, partNumber, &fb, @@ -499,19 +479,7 @@ void ScanLineInputFile::Data::readPixels ( y += scansperchunk - (y - cinfo.start_y); } - - putChunkProcess (std::move(sp)); - } - -#if ILMTHREAD_THREADING_ENABLED - std::lock_guard lock (_mx); - if (! _failures.empty()) - { - std::string fail = _failures[0]; - _failures.clear (); - throw IEX_NAMESPACE::IoExc (fail); } -#endif } //////////////////////////////////////// @@ -531,8 +499,11 @@ void ScanLineInputFile::Data::LineBufferTask::execute () } catch (std::exception &e) { - std::lock_guard lock (_ifd->_mx); - _ifd->_failures.emplace_back (std::string (e.what())); + _line_group->record_failure (e.what ()); + } + catch (...) 
+ { + _line_group->record_failure ("Unknown exception"); } } #endif diff --git a/src/lib/OpenEXR/ImfTiledInputFile.cpp b/src/lib/OpenEXR/ImfTiledInputFile.cpp index d37115312d..7bf4c5ebe1 100644 --- a/src/lib/OpenEXR/ImfTiledInputFile.cpp +++ b/src/lib/OpenEXR/ImfTiledInputFile.cpp @@ -15,7 +15,8 @@ #include "IlmThreadPool.h" #if ILMTHREAD_THREADING_ENABLED -# include "IlmThreadSemaphore.h" +# include "IlmThreadProcessGroup.h" +# include #endif #include "ImfFrameBuffer.h" @@ -26,7 +27,6 @@ #include "ImfTiledMisc.h" #include -#include #include OPENEXR_IMF_INTERNAL_NAMESPACE_SOURCE_ENTER @@ -62,9 +62,13 @@ struct TileProcess exr_chunk_info_t cinfo; exr_decode_pipeline_t decoder; - std::shared_ptr next; + TileProcess* next; }; +#if ILMTHREAD_THREADING_ENABLED +using TileProcessGroup = ILMTHREAD_NAMESPACE::ProcessGroup; +#endif + } // empty namespace // @@ -78,9 +82,6 @@ struct TiledInputFile::Data : _ctxt (ctxt) , partNumber (pN) , numThreads (nT) -#if ILMTHREAD_THREADING_ENABLED - , _sem ((unsigned int)nT) -#endif {} void initialize () @@ -131,54 +132,28 @@ struct TiledInputFile::Data #if ILMTHREAD_THREADING_ENABLED std::mutex _mx; - ILMTHREAD_NAMESPACE::Semaphore _sem; -#endif - - std::shared_ptr processStack; - std::shared_ptr getChunkProcess () - { - std::shared_ptr retval; -#if ILMTHREAD_THREADING_ENABLED - std::lock_guard lk (_mx); -#endif - retval = processStack; - if (!retval) - retval = std::make_shared (); - processStack = retval->next; - retval->next.reset(); - return retval; - } - void putChunkProcess (std::shared_ptr tp) - { -#if ILMTHREAD_THREADING_ENABLED - std::lock_guard lk (_mx); -#endif - tp->next = processStack; - processStack = tp; - } -#if ILMTHREAD_THREADING_ENABLED class TileBufferTask final : public ILMTHREAD_NAMESPACE::Task { public: TileBufferTask ( ILMTHREAD_NAMESPACE::TaskGroup* group, Data* ifd, + TileProcessGroup* tileg, const FrameBuffer* outfb, const exr_chunk_info_t& cinfo) : Task (group) , _outfb (outfb) , _ifd (ifd) - , _tile 
(ifd->getChunkProcess ()) + , _tile (tileg->pop ()) + , _tile_group (tileg) { _tile->cinfo = cinfo; } ~TileBufferTask () override { - if (_tile) - _ifd->putChunkProcess (std::move (_tile)); - _ifd->_sem.post (); + _tile_group->push (_tile); } void execute () override; @@ -189,7 +164,8 @@ struct TiledInputFile::Data const FrameBuffer* _outfb; Data* _ifd; - std::shared_ptr _tile; + TileProcess* _tile; + TileProcessGroup* _tile_group; }; #endif }; @@ -296,7 +272,6 @@ TiledInputFile::setFrameBuffer (const FrameBuffer& frameBuffer) } _data->frameBuffer = frameBuffer; - _data->processStack.reset(); } const FrameBuffer& @@ -792,36 +767,43 @@ void TiledInputFile::Data::readTiles (int dx1, int dx2, int dy1, int dy2, int lx #if ILMTHREAD_THREADING_ENABLED if (nTiles > 1 && numThreads > 1) { - ILMTHREAD_NAMESPACE::TaskGroup tg; + // we need the lifetime of this to last longer than the + // lifetime of the task group below such that we don't get use + // after free type error, so use scope rules to accomplish + // this + TileProcessGroup tpg (numThreads); - for (int ty = dy1; ty <= dy2; ++ty) { - for (int tx = dx1; tx <= dx2; ++tx) + ILMTHREAD_NAMESPACE::TaskGroup tg; + + for (int ty = dy1; ty <= dy2; ++ty) { - exr_result_t rv = exr_read_tile_chunk_info ( - *_ctxt, partNumber, tx, ty, lx, ly, &cinfo); - if (EXR_ERR_INCOMPLETE_CHUNK_TABLE == rv) + for (int tx = dx1; tx <= dx2; ++tx) { - THROW ( - IEX_NAMESPACE::InputExc, - "Tile (" << tx << ", " << ty << ", " << lx << ", " << ly - << ") is missing."); - } - else if (EXR_ERR_SUCCESS != rv) - throw IEX_NAMESPACE::InputExc ("Unable to query tile information"); - - // used for honoring the numThreads - _sem.wait (); + exr_result_t rv = exr_read_tile_chunk_info ( + *_ctxt, partNumber, tx, ty, lx, ly, &cinfo); + if (EXR_ERR_INCOMPLETE_CHUNK_TABLE == rv) + { + THROW ( + IEX_NAMESPACE::InputExc, + "Tile (" << tx << ", " << ty << ", " << lx << ", " << ly + << ") is missing."); + } + else if (EXR_ERR_SUCCESS != rv) + throw 
IEX_NAMESPACE::InputExc ("Unable to query tile information"); - ILMTHREAD_NAMESPACE::ThreadPool::addGlobalTask ( - new TileBufferTask (&tg, this, &frameBuffer, cinfo) ); + ILMTHREAD_NAMESPACE::ThreadPool::addGlobalTask ( + new TileBufferTask (&tg, this, &tpg, &frameBuffer, cinfo) ); + } } } + + tpg.throw_on_failure (); } else #endif { - auto tp = getChunkProcess (); + TileProcess tp; for (int ty = dy1; ty <= dy2; ++ty) { @@ -839,23 +821,14 @@ void TiledInputFile::Data::readTiles (int dx1, int dx2, int dy1, int dy2, int lx else if (EXR_ERR_SUCCESS != rv) throw IEX_NAMESPACE::InputExc ("Unable to query tile information"); - tp->cinfo = cinfo; - tp->run_decode ( + tp.cinfo = cinfo; + tp.run_decode ( *_ctxt, partNumber, &frameBuffer, fill_list); } } - - putChunkProcess (std::move(tp)); - } - - if (! _failures.empty()) - { - std::string fail = _failures[0]; - _failures.clear (); - throw IEX_NAMESPACE::IoExc (fail); } } @@ -874,8 +847,11 @@ void TiledInputFile::Data::TileBufferTask::execute () } catch (std::exception &e) { - std::lock_guard lock (_ifd->_mx); - _ifd->_failures.emplace_back (std::string (e.what())); + _tile_group->record_failure (e.what ()); + } + catch (...) + { + _tile_group->record_failure ("Unknown exception"); } } #endif