diff --git a/doc-6.0/doxygen/CC/html/dd/db4/_writer_m_t_8h_source.html b/doc-6.0/doxygen/CC/html/dd/db4/_writer_m_t_8h_source.html index 6d304edf..c33153c3 100644 --- a/doc-6.0/doxygen/CC/html/dd/db4/_writer_m_t_8h_source.html +++ b/doc-6.0/doxygen/CC/html/dd/db4/_writer_m_t_8h_source.html @@ -191,221 +191,231 @@
140  thd.interrupt();
141 
142  // Wait for it to stop
-
143  if (thd.try_join_for(boost::chrono::milliseconds(1))) {
-
144  //std::cout << "RecordWriter JOINED from interrupt" << std::endl;
-
145  return;
-
146  }
-
147 
-
148  // If that didn't work, send Alert signal to ring
-
149  supply->errorAlert();
-
150 
-
151  if (thd.joinable()) {
-
152  thd.join();
-
153  //std::cout << "RecordWriter JOINED from alert" << std::endl;
-
154  }
-
155  }
-
156  }
-
157 
-
159  void waitForLastItem() {
-
160  try {
-
161  //cout << "WRITE: supply last = " << supply->getLastSequence() << ", lasSeqProcessed = " << lastSeqProcessed <<
-
162  //" supply->getLast > lastSeq = " << (supply->getLastSequence() > lastSeqProcessed) << endl;
-
163  while (supply->getLastSequence() > lastSeqProcessed.load()) {
-
164  std::this_thread::sleep_for(std::chrono::milliseconds(1));
-
165  }
-
166 
-
167  // Stop this thread, not the calling thread
-
168  stopThread();
-
169  }
-
170  catch (Disruptor::AlertException & e) {
-
171  // Woken up in getToWrite through user call to supply.errorAlert()
-
172  //std::cout << "RecordWriter: quit thread through alert" << std::endl;
-
173  }
-
174  }
-
175 
-
177  void run() {
-
178 
-
179  try {
-
180  while (true) {
-
181  //std::cout << " RecordWriter: try getting record to write" << std::endl;
-
182  // Get the next record for this thread to write
-
183  auto item = supply->getToWrite();
-
184 
-
185  {
-
186  // Only allow interruption when blocked on trying to get item
-
187  boost::this_thread::interruption_point();
-
188  boost::this_thread::disable_interruption d1;
+
143 // if (thd.try_join_for(boost::chrono::milliseconds(1))) {
+
144 // //std::cout << "RecordWriter JOINED from interrupt" << std::endl;
+
145 // return;
+
146 // }
+
147  if (thd.joinable()) {
+
148  thd.join();
+
149  //std::cout << "RecordWriter JOINED from interrupt" << std::endl;
+
150  }
+
151 
+
152 
+
153  // // If that didn't work, send Alert signal to ring
+
154 // supply->errorAlert();
+
155 //
+
156 // if (thd.joinable()) {
+
157 // thd.join();
+
158 // //std::cout << "RecordWriter JOINED from alert" << std::endl;
+
159 // }
+
160  }
+
161  }
+
162 
+
164  void waitForLastItem() {
+
165  try {
+
166  //cout << "WRITE: supply last = " << supply->getLastSequence() << ", lasSeqProcessed = " << lastSeqProcessed <<
+
167  //" supply->getLast > lastSeq = " << (supply->getLastSequence() > lastSeqProcessed) << endl;
+
168  while (supply->getLastSequence() > lastSeqProcessed.load()) {
+
169  std::this_thread::sleep_for(std::chrono::milliseconds(1));
+
170  }
+
171 
+
172  // Stop this thread, not the calling thread
+
173  stopThread();
+
174  }
+
175  catch (Disruptor::AlertException & e) {
+
176  // Woken up in getToWrite through user call to supply.errorAlert()
+
177  //std::cout << "RecordWriter: quit thread through alert" << std::endl;
+
178  }
+
179  }
+
180 
+
182  void run() {
+
183 
+
184  try {
+
185  while (true) {
+
186  //std::cout << " RecordWriter: try getting record to write" << std::endl;
+
187  // Get the next record for this thread to write
+
188  auto item = supply->getToWrite();
189 
-
190  int64_t currentSeq = item->getSequence();
-
191  // Pull record out of wrapping object
-
192  std::shared_ptr<RecordOutput> record = item->getRecord();
-
193 
-
194  // Do write
-
195  auto header = record->getHeader();
-
196  int bytesToWrite = header->getLength();
-
197  // Record length of this record
-
198  writer->recordLengths->push_back(bytesToWrite);
-
199  // Followed by events in record
-
200  writer->recordLengths->push_back(header->getEntries());
-
201  writer->writerBytesWritten += bytesToWrite;
-
202 
-
203  auto buf = record->getBinaryBuffer();
-
204 // std::cout << " RecordWriter: use outFile to write file, buf pos = " << buf->position() <<
-
205 // ", lim = " << buf->limit() << ", bytesToWrite = " << bytesToWrite << std::endl;
-
206  writer->outFile.write(reinterpret_cast<const char *>(buf->array()), bytesToWrite);
-
207  if (writer->outFile.fail()) {
-
208  throw EvioException("failed write to file");
-
209  }
-
210 
-
211  record->reset();
-
212 
-
213  // Now we're done with this sequence
-
214  lastSeqProcessed = currentSeq;
+
190  {
+
191  // Only allow interruption when blocked on trying to get item
+
192  boost::this_thread::interruption_point();
+
193  boost::this_thread::disable_interruption d1;
+
194 
+
195  int64_t currentSeq = item->getSequence();
+
196  // Pull record out of wrapping object
+
197  std::shared_ptr<RecordOutput> record = item->getRecord();
+
198 
+
199  // Do write
+
200  auto header = record->getHeader();
+
201  int bytesToWrite = header->getLength();
+
202  // Record length of this record
+
203  writer->recordLengths->push_back(bytesToWrite);
+
204  // Followed by events in record
+
205  writer->recordLengths->push_back(header->getEntries());
+
206  writer->writerBytesWritten += bytesToWrite;
+
207 
+
208  auto buf = record->getBinaryBuffer();
+
209 // std::cout << " RecordWriter: use outFile to write file, buf pos = " << buf->position() <<
+
210 // ", lim = " << buf->limit() << ", bytesToWrite = " << bytesToWrite << std::endl;
+
211  writer->outFile.write(reinterpret_cast<const char *>(buf->array()), bytesToWrite);
+
212  if (writer->outFile.fail()) {
+
213  throw EvioException("failed write to file");
+
214  }
215 
-
216  // Release back to supply
-
217  supply->releaseWriter(item);
-
218  }
-
219  }
-
220  }
-
221  catch (Disruptor::AlertException & e) {
-
222  // Woken up in getToWrite through user call to supply.errorAlert()
-
223  //std::cout << "RecordWriter: quit thread through alert" << std::endl;
-
224  }
-
225  catch (boost::thread_interrupted & e) {
-
226  // Interrupted while blocked in getToWrite which means we're all done
-
227  //std::cout << "RecordWriter: quit thread through interrupt" << std::endl;
-
228  }
-
229  catch (std::runtime_error & e) {
-
230  std::string err = e.what();
-
231  supply->haveError(true);
-
232  supply->setError(err);
-
233  }
-
234  }
-
235  };
-
236 
-
237 
-
238  private:
-
239 
-
240 
-
242  size_t writerBytesWritten = 0ULL;
-
244  uint8_t* firstEvent = nullptr;
-
246  uint32_t firstEventLength = 0;
-
248  uint32_t maxEventCount = 0;
-
250  uint32_t maxBufferSize = 0;
-
252  uint32_t recordNumber = 1;
-
254  uint32_t compressionThreadCount = 1;
-
255 
-
257  std::string fileName;
-
258 
-
260  std::ofstream outFile;
-
261 
-
263  FileHeader fileHeader;
-
264 
-
266  std::string dictionary;
-
267 
-
269  std::shared_ptr<ByteBuffer> dictionaryFirstEventBuffer;
-
270 
-
272  ByteOrder byteOrder {ByteOrder::ENDIAN_LOCAL};
-
273 
-
275  std::shared_ptr<RecordOutput> outputRecord;
-
276 
-
278  std::vector<uint8_t> headerArray;
-
279 
-
281  Compressor::CompressionType compressionType {Compressor::UNCOMPRESSED};
-
282 
-
285  std::shared_ptr<std::vector<uint32_t>> recordLengths;
+
216  record->reset();
+
217 
+
218  // Now we're done with this sequence
+
219  lastSeqProcessed = currentSeq;
+
220 
+
221  // Release back to supply
+
222  supply->releaseWriter(item);
+
223  }
+
224  }
+
225  }
+
226  catch (Disruptor::AlertException & e) {
+
227  // Woken up in getToWrite through user call to supply.errorAlert()
+
228  //std::cout << "RecordWriter: quit thread through alert" << std::endl;
+
229  }
+
230  catch (boost::thread_interrupted & e) {
+
231  // Interrupted while blocked in getToWrite which means we're all done
+
232  //std::cout << "RecordWriter: quit thread through interrupt" << std::endl;
+
233  if (supply->getLastSequence() > lastSeqProcessed.load()) {
+
234  lastSeqProcessed = supply->getLastSequence();
+
235 // std::cout << "RecordWriter: thread INTERRUPTED, set lastSeqProcessed to " <<
+
236 // (supply->getLastSequence() + 1) << std::endl;
+
237  }
+
238  }
+
239  catch (std::runtime_error & e) {
+
240  std::string err = e.what();
+
241  supply->haveError(true);
+
242  supply->setError(err);
+
243  }
+
244  }
+
245  };
+
246 
+
247 
+
248  private:
+
249 
+
250 
+
252  size_t writerBytesWritten = 0ULL;
+
254  uint8_t* firstEvent = nullptr;
+
256  uint32_t firstEventLength = 0;
+
258  uint32_t maxEventCount = 0;
+
260  uint32_t maxBufferSize = 0;
+
262  uint32_t recordNumber = 1;
+
264  uint32_t compressionThreadCount = 1;
+
265 
+
267  std::string fileName;
+
268 
+
270  std::ofstream outFile;
+
271 
+
273  FileHeader fileHeader;
+
274 
+
276  std::string dictionary;
+
277 
+
279  std::shared_ptr<ByteBuffer> dictionaryFirstEventBuffer;
+
280 
+
282  ByteOrder byteOrder {ByteOrder::ENDIAN_LOCAL};
+
283 
+
285  std::shared_ptr<RecordOutput> outputRecord;
286 
-
288  std::shared_ptr<RecordSupply> supply;
+
288  std::vector<uint8_t> headerArray;
289 
-
292  std::vector<RecordWriter> recordWriterThreads;
-
293 
-
295  std::vector<RecordCompressor> recordCompressorThreads;
+
291  Compressor::CompressionType compressionType {Compressor::UNCOMPRESSED};
+
292 
+
295  std::shared_ptr<std::vector<uint32_t>> recordLengths;
296 
-
298  std::shared_ptr<RecordRingItem> ringItem;
+
298  std::shared_ptr<RecordSupply> supply;
299 
-
300 
-
302  bool addingTrailer = true;
-
304  bool addTrailerIndex = false;
-
306  bool closed = false;
-
308  bool opened = false;
-
310  bool haveDictionary = false;
-
312  bool haveFirstEvent = false;
-
314  bool haveUserHeader = false;
-
315 
-
316 
-
317  public:
-
318 
-
319  WriterMT();
-
320 
-
321  WriterMT(const ByteOrder & order, uint32_t maxEventCount, uint32_t maxBufferSize,
-
322  Compressor::CompressionType compType, uint32_t compressionThreads);
-
323 
-
324  explicit WriterMT(
-
325  const HeaderType & hType,
-
326  const ByteOrder & order = ByteOrder::ENDIAN_LITTLE,
-
327  uint32_t maxEventCount = 0,
-
328  uint32_t maxBufferSize = 0,
-
329  const std::string & dictionary = "",
-
330  uint8_t* firstEvent = nullptr,
-
331  uint32_t firstEventLen = 0,
-
332  Compressor::CompressionType compressionType = Compressor::UNCOMPRESSED,
-
333  uint32_t compressionThreads = 1,
-
334  bool addTrailerIndex = false,
-
335  uint32_t ringSize = 16);
-
336 
-
337  explicit WriterMT(const std::string & filename);
-
338 
-
339  WriterMT(const std::string & filename, const ByteOrder & order, uint32_t maxEventCount, uint32_t maxBufferSize,
-
340  Compressor::CompressionType compressionType, uint32_t compressionThreads);
-
341 
-
342  ~WriterMT() = default;
-
343 
-
345 
-
346  private:
-
347 
-
348  std::shared_ptr<ByteBuffer> createDictionaryRecord();
-
349  void writeTrailer(bool writeIndex, uint32_t recordNum);
-
350  void clear();
+
302  std::vector<RecordWriter> recordWriterThreads;
+
303 
+
305  std::vector<RecordCompressor> recordCompressorThreads;
+
306 
+
308  std::shared_ptr<RecordRingItem> ringItem;
+
309 
+
310 
+
312  bool addingTrailer = true;
+
314  bool addTrailerIndex = false;
+
316  bool closed = false;
+
318  bool opened = false;
+
320  bool haveDictionary = false;
+
322  bool haveFirstEvent = false;
+
324  bool haveUserHeader = false;
+
325 
+
326 
+
327  public:
+
328 
+
329  WriterMT();
+
330 
+
331  WriterMT(const ByteOrder & order, uint32_t maxEventCount, uint32_t maxBufferSize,
+
332  Compressor::CompressionType compType, uint32_t compressionThreads);
+
333 
+
334  explicit WriterMT(
+
335  const HeaderType & hType,
+
336  const ByteOrder & order = ByteOrder::ENDIAN_LITTLE,
+
337  uint32_t maxEventCount = 0,
+
338  uint32_t maxBufferSize = 0,
+
339  const std::string & dictionary = "",
+
340  uint8_t* firstEvent = nullptr,
+
341  uint32_t firstEventLen = 0,
+
342  Compressor::CompressionType compressionType = Compressor::UNCOMPRESSED,
+
343  uint32_t compressionThreads = 1,
+
344  bool addTrailerIndex = false,
+
345  uint32_t ringSize = 16);
+
346 
+
347  explicit WriterMT(const std::string & filename);
+
348 
+
349  WriterMT(const std::string & filename, const ByteOrder & order, uint32_t maxEventCount, uint32_t maxBufferSize,
+
350  Compressor::CompressionType compressionType, uint32_t compressionThreads);
351 
-
352  public:
+
352  ~WriterMT() = default;
353 
-
354  const ByteOrder & getByteOrder() const;
-
355 // ByteBuffer & getBuffer();
-
356  FileHeader & getFileHeader();
-
357 // RecordHeader & getRecordHeader();
-
358 // RecordOutput & getRecord();
-
359  Compressor::CompressionType getCompressionType();
-
360 
-
361  bool addTrailer() const;
-
362  void addTrailer(bool add);
-
363  bool addTrailerWithIndex();
-
364  void addTrailerWithIndex(bool addTrailingIndex);
-
365 
-
366  void open(const std::string & filename);
-
367  void open(const std::string & filename, uint8_t* userHdr, uint32_t userLen, bool overwrite = true);
-
368 
-
369  std::shared_ptr<ByteBuffer> createHeader(uint8_t* userHdr, uint32_t userLen);
-
370  std::shared_ptr<ByteBuffer> createHeader(ByteBuffer & userHdr);
-
371 
-
372  void writeRecord(RecordOutput & record);
-
373 
-
374  // Use internal RecordOutput to write individual events
+
355 
+
356  private:
+
357 
+
358  std::shared_ptr<ByteBuffer> createDictionaryRecord();
+
359  void writeTrailer(bool writeIndex, uint32_t recordNum);
+
360  void clear();
+
361 
+
362  public:
+
363 
+
364  const ByteOrder & getByteOrder() const;
+
365 // ByteBuffer & getBuffer();
+
366  FileHeader & getFileHeader();
+
367 // RecordHeader & getRecordHeader();
+
368 // RecordOutput & getRecord();
+
369  Compressor::CompressionType getCompressionType();
+
370 
+
371  bool addTrailer() const;
+
372  void addTrailer(bool add);
+
373  bool addTrailerWithIndex();
+
374  void addTrailerWithIndex(bool addTrailingIndex);
375 
-
376  void addEvent(uint8_t* buffer, uint32_t offset, uint32_t length);
-
377  void addEvent(std::shared_ptr<ByteBuffer> buffer);
-
378  void addEvent(ByteBuffer & buffer);
-
379  void addEvent(std::shared_ptr<EvioBank> bank);
-
380  void addEvent(std::shared_ptr<EvioNode> node);
-
381  void addEvent(EvioNode & node);
-
382 
-
383  void close();
-
384 
-
385  };
-
386 
-
387 }
-
388 
-
389 
-
390 #endif //EVIO_6_0_WRITERMT_H
+
376  void open(const std::string & filename);
+
377  void open(const std::string & filename, uint8_t* userHdr, uint32_t userLen, bool overwrite = true);
+
378 
+
379  std::shared_ptr<ByteBuffer> createHeader(uint8_t* userHdr, uint32_t userLen);
+
380  std::shared_ptr<ByteBuffer> createHeader(ByteBuffer & userHdr);
+
381 
+
382  void writeRecord(RecordOutput & record);
+
383 
+
384  // Use internal RecordOutput to write individual events
+
385 
+
386  void addEvent(uint8_t* buffer, uint32_t offset, uint32_t length);
+
387  void addEvent(std::shared_ptr<ByteBuffer> buffer);
+
388  void addEvent(ByteBuffer & buffer);
+
389  void addEvent(std::shared_ptr<EvioBank> bank);
+
390  void addEvent(std::shared_ptr<EvioNode> node);
+
391  void addEvent(EvioNode & node);
+
392 
+
393  void close();
+
394 
+
395  };
+
396 
+
397 }
+
398 
+
399 
+
400 #endif //EVIO_6_0_WRITERMT_H
ByteBuffer.h
ByteOrder.h
Compressor.h