diff --git a/EventFilter/Utilities/interface/DAQSource.h b/EventFilter/Utilities/interface/DAQSource.h index b3200bf77434b..10f9ae2f7264e 100644 --- a/EventFilter/Utilities/interface/DAQSource.h +++ b/EventFilter/Utilities/interface/DAQSource.h @@ -21,8 +21,8 @@ #include "EventFilter/Utilities/interface/EvFDaqDirector.h" //import InputChunk -#include "EventFilter/Utilities/interface/FedRawDataInputSource.h" #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h" +#include "EventFilter/Utilities/interface/SourceRawFile.h" class FEDRawDataCollection; class InputSourceDescription; @@ -182,73 +182,4 @@ class DAQSource : public edm::RawInputSource { std::shared_ptr dataMode_; }; -//used by some models that use FEDRawDataCollection -class UnpackedRawEventWrapper { -public: - UnpackedRawEventWrapper() {} - ~UnpackedRawEventWrapper() {} - void setError(std::string msg) { - errmsg_ = msg; - error_ = true; - } - void setChecksumError(std::string msg) { - errmsg_ = msg; - checksumError_ = true; - } - void setRawData(FEDRawDataCollection* rawData) { rawData_.reset(rawData); } - void setAux(edm::EventAuxiliary* aux) { aux_.reset(aux); } - void setRun(uint32_t run) { run_ = run; } - FEDRawDataCollection* rawData() { return rawData_.get(); } - std::unique_ptr& rawDataRef() { return rawData_; } - edm::EventAuxiliary* aux() { return aux_.get(); } - uint32_t run() const { return run_; } - bool checksumError() const { return checksumError_; } - bool error() const { return error_; } - std::string const& errmsg() { return errmsg_; } - -private: - std::unique_ptr rawData_; - std::unique_ptr aux_; - uint32_t run_; - bool checksumError_ = false; - bool error_ = false; - std::string errmsg_; -}; - -class RawInputFile : public InputFile { -public: - RawInputFile(evf::EvFDaqDirector::FileStatus status, - unsigned int lumi = 0, - std::string const& name = std::string(), - bool deleteFile = true, - int rawFd = -1, - uint64_t fileSize = 0, - uint16_t rawHeaderSize = 0, - uint32_t nChunks = 0, - int nEvents = 0, - DAQSource* parent = nullptr) - : InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr), - sourceParent_(parent) {} - bool advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size); - void advance(const size_t size) { - chunkPosition_ += size; - bufferPosition_ += size; - } - void queue(UnpackedRawEventWrapper* ec) { - if (!frdcQueue_.get()) - frdcQueue_.reset(new std::queue>()); - std::unique_ptr uptr(ec); - frdcQueue_->push(std::move(uptr)); - } - void popQueue(std::unique_ptr& uptr) { - uptr = std::move(frdcQueue_->front()); - frdcQueue_->pop(); - } - -private: - DAQSource* sourceParent_; - //optional unpacked raw data queue (currently here because DAQSource controls lifetime of the RawInputfile) - std::unique_ptr>> frdcQueue_; -}; - #endif // EventFilter_Utilities_DAQSource_h diff --git a/EventFilter/Utilities/interface/DAQSourceModels.h b/EventFilter/Utilities/interface/DAQSourceModels.h index 0d90dcf7160a9..c17509b3ae1c1 100644 --- a/EventFilter/Utilities/interface/DAQSourceModels.h +++ b/EventFilter/Utilities/interface/DAQSourceModels.h @@ -24,8 +24,10 @@ #include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h" //import InputChunk -#include "EventFilter/Utilities/interface/FedRawDataInputSource.h" +#include "EventFilter/Utilities/interface/SourceRawFile.h" +class RawInputFile; +class UnpackedRawEventWrapper; class DAQSource; //evf? diff --git a/EventFilter/Utilities/interface/EvFDaqDirector.h b/EventFilter/Utilities/interface/EvFDaqDirector.h index dfaa7443d6d03..4631fdacda585 100644 --- a/EventFilter/Utilities/interface/EvFDaqDirector.h +++ b/EventFilter/Utilities/interface/EvFDaqDirector.h @@ -33,9 +33,6 @@ class SystemBounds; class GlobalContext; class StreamID; -class InputFile; -struct InputChunk; - namespace edm { class PathsAndConsumesOfModulesBase; class ProcessContext; diff --git a/EventFilter/Utilities/interface/FedRawDataInputSource.h b/EventFilter/Utilities/interface/FedRawDataInputSource.h index d989ef731514a..295bc4647a87f 100644 --- a/EventFilter/Utilities/interface/FedRawDataInputSource.h +++ b/EventFilter/Utilities/interface/FedRawDataInputSource.h @@ -186,148 +186,6 @@ class FedRawDataInputSource : public edm::RawInputSource { std::mutex monlock_; }; -struct InputChunk { - unsigned char* buf_; - InputChunk* next_ = nullptr; - uint64_t size_; - uint64_t usedSize_ = 0; - //unsigned int index_; - uint64_t offset_; - unsigned int fileIndex_; - std::atomic readComplete_; - - InputChunk(uint64_t size) : size_(size) { - buf_ = new unsigned char[size_]; - reset(0, 0, 0); - } - void reset(uint64_t newOffset, uint64_t toRead, unsigned int fileIndex) { - offset_ = newOffset; - usedSize_ = toRead; - fileIndex_ = fileIndex; - readComplete_ = false; - } - - bool resize(uint64_t wantedSize, uint64_t maxSize) { - if (wantedSize > maxSize) - return false; - if (size_ < wantedSize) { - size_ = uint64_t(wantedSize * 1.05); - delete[] buf_; - buf_ = new unsigned char[size_]; - } - return true; - } - - ~InputChunk() { delete[] buf_; } -}; - -class InputFile { -public: - FedRawDataInputSource* parent_; - evf::EvFDaqDirector::FileStatus status_; - unsigned int lumi_; - std::string fileName_; - //used by DAQSource - std::vector fileNames_; - std::vector diskFileSizes_; - std::vector bufferOffsets_; - std::vector fileSizes_; - std::vector fileOrder_; - bool deleteFile_; - int rawFd_; - uint64_t fileSize_; - uint16_t rawHeaderSize_; - uint16_t nChunks_; - uint16_t numFiles_; - int nEvents_; - unsigned int nProcessed_; - - tbb::concurrent_vector chunks_; - - uint32_t bufferPosition_ = 0; - uint32_t chunkPosition_ = 0; - unsigned int currentChunk_ = 0; - - InputFile(evf::EvFDaqDirector::FileStatus status, - unsigned int lumi = 0, - std::string const& name = std::string(), - bool deleteFile = true, - int rawFd = -1, - uint64_t fileSize = 0, - uint16_t rawHeaderSize = 0, - uint16_t nChunks = 0, - int nEvents = 0, - FedRawDataInputSource* parent = nullptr) - : parent_(parent), - status_(status), - lumi_(lumi), - fileName_(name), - deleteFile_(deleteFile), - rawFd_(rawFd), - fileSize_(fileSize), - rawHeaderSize_(rawHeaderSize), - nChunks_(nChunks), - numFiles_(1), - nEvents_(nEvents), - nProcessed_(0) { - fileNames_.push_back(name); - fileOrder_.push_back(fileOrder_.size()); - diskFileSizes_.push_back(fileSize); - fileSizes_.push_back(0); - bufferOffsets_.push_back(0); - chunks_.reserve(nChunks_); - for (unsigned int i = 0; i < nChunks; i++) - chunks_.push_back(nullptr); - } - virtual ~InputFile(); - - void setChunks(uint16_t nChunks) { - nChunks_ = nChunks; - chunks_.clear(); - chunks_.reserve(nChunks_); - for (unsigned int i = 0; i < nChunks_; i++) - chunks_.push_back(nullptr); - } - - void appendFile(std::string const& name, uint64_t size) { - size_t prevOffset = bufferOffsets_.back(); - size_t prevSize = diskFileSizes_.back(); - numFiles_++; - fileNames_.push_back(name); - fileOrder_.push_back(fileOrder_.size()); - diskFileSizes_.push_back(size); - fileSizes_.push_back(0); - bufferOffsets_.push_back(prevOffset + prevSize); - } - - bool waitForChunk(unsigned int chunkid) { - //some atomics to make sure everything is cache synchronized for the main thread - return chunks_[chunkid] != nullptr && chunks_[chunkid]->readComplete_; - } - bool advance(std::mutex& m, std::condition_variable& cv, unsigned char*& dataPosition, const size_t size); - bool advanceSimple(unsigned char*& dataPosition, const size_t size) { - size_t currentLeft = chunks_[currentChunk_]->size_ - chunkPosition_; - if (currentLeft < size) - return true; - dataPosition = chunks_[currentChunk_]->buf_ + chunkPosition_; - chunkPosition_ += size; - bufferPosition_ += size; - return false; - } - void resetPos() { - chunkPosition_ = 0; - bufferPosition_ = 0; - } - void moveToPreviousChunk(const size_t size, const size_t offset); - void rewindChunk(const size_t size); - void unsetDeleteFile() { deleteFile_ = false; } - void randomizeOrder(std::default_random_engine& rng) { - std::shuffle(std::begin(fileOrder_), std::end(fileOrder_), rng); - } - uint64_t currentChunkSize() const { return chunks_[currentChunk_]->size_; } - int64_t fileSizeLeft() const { return (int64_t)fileSize_ - (int64_t)bufferPosition_; } -}; - #endif // EventFilter_Utilities_FedRawDataInputSource_h /// emacs configuration diff --git a/EventFilter/Utilities/interface/SourceRawFile.h b/EventFilter/Utilities/interface/SourceRawFile.h index 83b57b1467ba5..df87ef79f0fbf 100644 --- a/EventFilter/Utilities/interface/SourceRawFile.h +++ b/EventFilter/Utilities/interface/SourceRawFile.h @@ -46,8 +46,6 @@ class UnpackedRawEventWrapper { std::string errmsg_; }; - - struct InputChunk { unsigned char* buf_; InputChunk* next_ = nullptr; diff --git a/EventFilter/Utilities/src/DAQSource.cc b/EventFilter/Utilities/src/DAQSource.cc index a2574296ac47f..07c062d7b3375 100644 --- a/EventFilter/Utilities/src/DAQSource.cc +++ b/EventFilter/Utilities/src/DAQSource.cc @@ -5,6 +5,7 @@ #include #include +#include "EventFilter/Utilities/interface/SourceRawFile.h" #include "EventFilter/Utilities/interface/DAQSource.h" #include "EventFilter/Utilities/interface/DAQSourceModels.h" #include "EventFilter/Utilities/interface/DAQSourceModelsFRD.h" diff --git a/EventFilter/Utilities/src/FedRawDataInputSource.cc b/EventFilter/Utilities/src/FedRawDataInputSource.cc index 6e98286377829..4ccf58f9990a0 100644 --- a/EventFilter/Utilities/src/FedRawDataInputSource.cc +++ b/EventFilter/Utilities/src/FedRawDataInputSource.cc @@ -30,6 +30,7 @@ #include "EventFilter/Utilities/interface/GlobalEventNumber.h" +#include "EventFilter/Utilities/interface/SourceRawFile.h" #include "EventFilter/Utilities/interface/FedRawDataInputSource.h" #include "EventFilter/Utilities/interface/SourceCommon.h"