[DAQ] Input source improvements and initial Phase-2 DTH format support #47068

Merged: 20 commits, Jan 12, 2025

Commits
05bfe1a
WIP filelist improvements
smorovic May 15, 2024
7a99b4b
file deleter thread instead of DAQ director
smorovic Jul 4, 2024
2f81cb5
port deleter thread and thread synchronization changes to DAQSource
smorovic Jul 4, 2024
e1ecb24
fix two deadlock scenarios
smorovic Jul 7, 2024
667935c
fixes to DAQSource striped mode and EvFDaqDirector detection of mount…
smorovic Jul 9, 2024
e727c63
add new fields to EoR json of the (unit) test writer
smorovic Jul 11, 2024
ebcb616
New DAQSourceModel "PreUnpack" where FRD collection is unpacked in th…
smorovic Jul 12, 2024
cbd0ba8
Adding DTH DAQ source model:
smorovic Nov 22, 2024
0693cc1
update error conditions and monitoring for new file deletion thread
smorovic Dec 12, 2024
9ae4ec3
*port held files monitoring to DAQSource, drop support for very old F…
smorovic Dec 12, 2024
d976961
fix FRDPreUnpack to not modify file structure variables when running …
smorovic Dec 12, 2024
59f07e1
add README for DTH source
smorovic Dec 18, 2024
70a3af4
fix header ifdef
smorovic Dec 18, 2024
cc8f5dd
Add reworked LS-based file closure. This is complementary to stream-b…
smorovic Jan 9, 2025
7ed2304
fix leftover asserts and clean up commented code
smorovic Jan 9, 2025
32a1fee
code-format/checks
smorovic Jan 9, 2025
ec8c303
code-format
smorovic Jan 9, 2025
7026f88
override virtual function
smorovic Jan 9, 2025
5ee9ae3
move implementation to cc files
smorovic Jan 9, 2025
06fdce0
reorganize input file and chunk classes in separate header
smorovic Jan 9, 2025
55 changes: 55 additions & 0 deletions EventFilter/Utilities/doc/README-DTH.md
@@ -0,0 +1,55 @@

# DTH orbit/event unpacker for DAQSource

https://github.com/smorovic/cmssw/tree/15_0_0_pre1-source-improvements
<br>
This patch implements unpacking of the DTH data format by `DAQSource` into `FEDRawDataCollection`.

It is rebased over CMSSW master (compatible with 15_0_0_pre1 at the time this file was committed), but it also builds and runs in 14_2_0. All changes are contained in `EventFilter/Utilities`.

## Fetching the code

```
scram project CMSSW_15_0_0_pre1 #or CMSSW_14_2_0 (currently it compiles and runs also in 14_X releases)
git cms-addpkg EventFilter/Utilities
git remote add smorovic https://github.com/smorovic/cmssw.git
git fetch smorovic 15_0_0_pre1-source-improvements:15_0_0_pre1-source-improvements
git checkout 15_0_0_pre1-source-improvements
scram b
```

Run the unit test (generates and consumes files with DTH format):
```
cmsenv
cd src/EventFilter/Utilities/test
./RunBUFU.sh
```

## Important code and scripts in `EventFilter/Utilities`:

Definition of DTH orbit header, fragment trailer and SLinkRocket header/trailer (could potentially be moved to DataFormats or another package in the future):
<br>
[interface/DTHHeaders.h](../interface/DTHHeaders.h)

Plugin for DAQSource (input source) which parses the DTH format:
<br>
[src/DAQSourceModelsDTH.cc](../src/DAQSourceModelsDTH.cc)

Generator of dummy DTH payload for the fake "BU" process used in unit tests:
<br>
[plugins/DTHFakeReader.cc](../plugins/DTHFakeReader.cc)

Script which runs the unit test, with a "fakeBU" process generating payload from multiple DTH sources (per orbit) and an "FU" CMSSW job consuming it:
<br>
[test/testDTH.sh](../test/testDTH.sh)

FU cmsRun configuration used in the above tests:
<br>
[test/unittest_FU_daqsource.py](../test/unittest_FU_daqsource.py)

## Running on custom input files
The `unittest_FU_daqsource.py` script can be used as a starting point for a custom runner with inputs such as DTH dumps (rather than files generated as in the unit test). Set DAQSource to `dataMode = cms.untracked.string("DTH")` to process the DTH format. To run on custom files, change `fileListMode` to `True` and fill the `fileList` parameter with the file paths. The files should be named similarly to those in the unit test, and can also be placed in a similar directory structure, `ramdisk/runXX`, to provide the initial run and lumisection to the source. The run number and the working directory are also passed to the source via the command line (see the `testDTH.sh` script).

Note on the file format: apart from parsing a single DTH orbit dump, the input source plugin can also build events from multiple DTH orbit blocks, but blocks belonging to the same orbit must appear sequentially in the file. The source scans the file and collects all blocks whose orbit headers carry the same orbit number, until it finds a different orbit number or EOF. It then builds events from them, starting from the last DTH event fragment trailer in each of the orbit blocks found. This is repeated for the next set of orbit blocks with the same orbit number until the whole file is processed.
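
The grouping rule described in the note above can be illustrated with a minimal Python sketch. This is not the `DataModeDTH` implementation: `OrbitBlock` and `group_sequential_orbits` are hypothetical names, and each block is reduced to an `(orbit_number, source_id)` pair purely for illustration. It only shows why blocks of the same orbit have to be adjacent in the file.

```python
from itertools import groupby
from typing import Iterable, Iterator, List, Tuple

# Hypothetical stand-in for a parsed orbit block: (orbit_number, source_id).
# The real orbit header (evf::DTHOrbitHeader_v1) carries more fields.
OrbitBlock = Tuple[int, int]


def group_sequential_orbits(
    blocks: Iterable[OrbitBlock],
) -> Iterator[Tuple[int, List[OrbitBlock]]]:
    """Yield (orbit_number, blocks) for each run of consecutive blocks
    that share the same orbit number, in file order."""
    # groupby only merges *adjacent* items with equal keys, which mirrors
    # the "until a different orbit number is found or EOF" scan.
    for orbit, run in groupby(blocks, key=lambda block: block[0]):
        yield orbit, list(run)


# Two sources per orbit, written sequentially: one group per orbit.
sequential = [(100, 0), (100, 1), (101, 0), (101, 1), (102, 0)]
groups = list(group_sequential_orbits(sequential))

# Blocks of orbit 100 interleaved with orbit 101: they end up in separate groups.
interleaved = [(100, 0), (101, 0), (100, 1)]
split_groups = list(group_sequential_orbits(interleaved))
```

In the interleaved case the two blocks of orbit 100 are never seen together, which is the reason for the sequential-ordering requirement.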

It is possible that another DAQ-specific header will be added, both per file and per orbit, to better encapsulate the data (similar to what is done for Run2/3 RAW data) and to provide additional metadata that improves integrity and completeness checks after aggregation of data in DAQ. At present, only RAW DTH is supported by the "DTH" format.
34 changes: 8 additions & 26 deletions EventFilter/Utilities/interface/DAQSource.h
@@ -7,6 +7,7 @@
#include <memory>
#include <mutex>
#include <thread>
#include <queue>

#include "oneapi/tbb/concurrent_queue.h"
#include "oneapi/tbb/concurrent_vector.h"
@@ -20,7 +21,8 @@
#include "EventFilter/Utilities/interface/EvFDaqDirector.h"

//import InputChunk
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
#include "EventFilter/Utilities/interface/SourceRawFile.h"

class FEDRawDataCollection;
class InputSourceDescription;
@@ -72,6 +74,7 @@ class DAQSource : public edm::RawInputSource {
void maybeOpenNewLumiSection(const uint32_t lumiSection);

void readSupervisor();
void fileDeleter();
void dataArranger();
void readWorker(unsigned int tid);
void threadError();
@@ -92,10 +95,11 @@ class DAQSource : public edm::RawInputSource {
uint64_t maxChunkSize_; // for buffered read-ahead
uint64_t eventChunkBlock_; // how much read(2) asks at the time
unsigned int readBlocks_;
int numConcurrentReads_;
unsigned int numBuffers_;
unsigned int maxBufferedFiles_;
unsigned int numConcurrentReads_;
std::atomic<unsigned int> readingFilesCount_;
std::atomic<unsigned int> heldFilesCount_;

// get LS from filename instead of event header
const bool alwaysStartFromFirstLS_;
@@ -136,6 +140,7 @@ class DAQSource : public edm::RawInputSource {

bool startedSupervisorThread_ = false;
std::unique_ptr<std::thread> readSupervisorThread_;
std::unique_ptr<std::thread> fileDeleterThread_;
std::unique_ptr<std::thread> dataArrangerThread_;
std::vector<std::thread*> workerThreads_;

@@ -164,6 +169,7 @@ class DAQSource : public edm::RawInputSource {
//supervisor thread wakeup
std::mutex mWakeup_;
std::condition_variable cvWakeup_;
std::condition_variable cvWakeupAll_;

//variables for the single buffered mode
int fileDescriptor_ = -1;
@@ -176,28 +182,4 @@ class DAQSource : public edm::RawInputSource {
std::shared_ptr<DataMode> dataMode_;
};

class RawInputFile : public InputFile {
public:
RawInputFile(evf::EvFDaqDirector::FileStatus status,
unsigned int lumi = 0,
std::string const& name = std::string(),
bool deleteFile = true,
int rawFd = -1,
uint64_t fileSize = 0,
uint16_t rawHeaderSize = 0,
uint32_t nChunks = 0,
int nEvents = 0,
DAQSource* parent = nullptr)
: InputFile(status, lumi, name, deleteFile, rawFd, fileSize, rawHeaderSize, nChunks, nEvents, nullptr),
sourceParent_(parent) {}
bool advance(unsigned char*& dataPosition, const size_t size);
void advance(const size_t size) {
chunkPosition_ += size;
bufferPosition_ += size;
}

private:
DAQSource* sourceParent_;
};

#endif // EventFilter_Utilities_DAQSource_h
17 changes: 10 additions & 7 deletions EventFilter/Utilities/interface/DAQSourceModels.h
@@ -24,8 +24,10 @@
#include "DataFormats/Provenance/interface/LuminosityBlockAuxiliary.h"

//import InputChunk
#include "EventFilter/Utilities/interface/FedRawDataInputSource.h"
#include "EventFilter/Utilities/interface/SourceRawFile.h"

class RawInputFile;
class UnpackedRawEventWrapper;
class DAQSource;

//evf?
@@ -40,18 +42,16 @@ class DataMode {
virtual uint32_t headerSize() const = 0;
virtual bool versionCheck() const = 0;
virtual uint64_t dataBlockSize() const = 0;
virtual void makeDataBlockView(unsigned char* addr,
size_t maxSize,
std::vector<uint64_t> const& fileSizes,
size_t fileHeaderSize) = 0;
virtual bool nextEventView() = 0;
virtual void makeDataBlockView(unsigned char* addr, RawInputFile* rawFile) = 0;
virtual bool nextEventView(RawInputFile*) = 0;
virtual bool blockChecksumValid() = 0;
virtual bool checksumValid() = 0;
virtual std::string getChecksumError() const = 0;
virtual bool isRealData() const = 0;
virtual uint32_t run() const = 0;
virtual bool dataBlockCompleted() const = 0;
virtual bool requireHeader() const = 0;
virtual bool fitToBuffer() const = 0;
virtual void unpackFile(RawInputFile* file) = 0;

virtual bool dataBlockInitialized() const = 0;
virtual void setDataBlockInitialized(bool) = 0;
@@ -66,9 +66,12 @@ class DataMode {
std::string const& runDir) = 0;
void setTesting(bool testing) { testing_ = testing; }

bool errorDetected() { return errorDetected_; }

protected:
DAQSource* daqSource_;
bool testing_ = false;
bool errorDetected_ = false;
};

#endif // EventFilter_Utilities_DAQSourceModels_h
89 changes: 89 additions & 0 deletions EventFilter/Utilities/interface/DAQSourceModelsDTH.h
@@ -0,0 +1,89 @@
#ifndef EventFilter_Utilities_DAQSourceModelsDTH_h
#define EventFilter_Utilities_DAQSourceModelsDTH_h

#include <filesystem>
#include <queue>

#include "EventFilter/Utilities/interface/DAQSourceModels.h"
#include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
#include "EventFilter/Utilities/interface/DTHHeaders.h"

class FEDRawDataCollection;

class DataModeDTH : public DataMode {
public:
DataModeDTH(DAQSource* daqSource, bool verifyChecksum) : DataMode(daqSource), verifyChecksum_(verifyChecksum) {}
~DataModeDTH() override {}
std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>>& makeDaqProvenanceHelpers() override;
void readEvent(edm::EventPrincipal& eventPrincipal) override;

//non-virtual
edm::Timestamp fillFEDRawDataCollection(FEDRawDataCollection& rawData);

int dataVersion() const override { return detectedDTHversion_; }
void detectVersion(unsigned char* fileBuf, uint32_t fileHeaderOffset) override {
detectedDTHversion_ = 1; //TODO: read version
}

uint32_t headerSize() const override { return sizeof(evf::DTHOrbitHeader_v1); }

bool versionCheck() const override { return detectedDTHversion_ == 1; }

uint64_t dataBlockSize() const override { return dataBlockSize_; }

void makeDataBlockView(unsigned char* addr, RawInputFile* rawFile) override;

bool nextEventView(RawInputFile*) override;
bool blockChecksumValid() override { return checksumValid_; }
bool checksumValid() override { return checksumValid_; }
std::string getChecksumError() const override { return checksumError_; }

bool isRealData() const { return true; } //this flag could be added to RU/BU-generated index

uint32_t run() const override { return firstOrbitHeader_->runNumber(); }

bool dataBlockCompleted() const override { return blockCompleted_; }

bool requireHeader() const override { return false; }

bool fitToBuffer() const override { return true; }

void unpackFile(RawInputFile*) override {}

bool dataBlockInitialized() const override { return dataBlockInitialized_; }

void setDataBlockInitialized(bool val) override { dataBlockInitialized_ = val; }

void setTCDSSearchRange(uint16_t MINTCDSuTCAFEDID, uint16_t MAXTCDSuTCAFEDID) override {}

void makeDirectoryEntries(std::vector<std::string> const& baseDirs,
std::vector<int> const& numSources,
std::string const& runDir) override {}

std::pair<bool, std::vector<std::string>> defineAdditionalFiles(std::string const& primaryName, bool) const override {
return std::make_pair(true, std::vector<std::string>());
}

private:
bool verifyChecksum_;
std::vector<std::shared_ptr<const edm::DaqProvenanceHelper>> daqProvenanceHelpers_;
uint16_t detectedDTHversion_ = 0;
evf::DTHOrbitHeader_v1* firstOrbitHeader_ = nullptr;
uint64_t nextEventID_ = 0;
std::vector<evf::DTHFragmentTrailer_v1*> eventFragments_; //events in block (DTH trailer)
bool dataBlockInitialized_ = false;
bool blockCompleted_ = true;

std::vector<uint8_t*> addrsStart_; //start of orbit payloads per source
std::vector<uint8_t*> addrsEnd_; //dth trailers per source (go through events from the end)

bool checksumValid_ = false;
std::string checksumError_;
//total
size_t dataBlockSize_ = 0;
//uint16_t MINTCDSuTCAFEDID_ = FEDNumbering::MINTCDSuTCAFEDID;
//uint16_t MAXTCDSuTCAFEDID_ = FEDNumbering::MAXTCDSuTCAFEDID;
bool eventCached_ = false;
};

#endif // EventFilter_Utilities_DAQSourceModelsDTH_h