Skip to content

Commit

Permalink
fix bug interrupting WriterMT.RecordWriter in java, remove prints
Browse files Browse the repository at this point in the history
  • Loading branch information
carltimmer committed Dec 17, 2024
1 parent 4a39a74 commit a8ea448
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 42 deletions.
62 changes: 32 additions & 30 deletions java/org/jlab/coda/hipo/WriterMT.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -224,18 +225,8 @@ else if (firstEventLen < 1) {
haveDictionary = (dictionary != null) && (dictionary.length() > 0);
haveFirstEvent = (firstEvent != null) && (firstEventLen > 0);

if (haveDictionary) {
System.out.println("WriterMT CON: create dict, len = " + dictionary.length());
}

if (haveFirstEvent) {
System.out.println("WriterMT CON: create first event, len = " + firstEventLen);
}

if (haveDictionary || haveFirstEvent) {
dictionaryFirstEventBuffer = createDictionaryRecord();
System.out.println("WriterMT CON: created dict/firstEv buffer, order = " + byteOrder +
", dic/fe buff remaining = " + dictionaryFirstEventBuffer.remaining());
}

// Number of ring items must be >= compressionThreads
Expand All @@ -246,9 +237,10 @@ else if (firstEventLen < 1) {
// AND is must be multiple of 2
finalRingSize = Utilities.powerOfTwo(finalRingSize, true);

if (finalRingSize != ringSize) {
System.out.println("WriterMT: change to ring size = " + finalRingSize);
}
// if (finalRingSize != ringSize) {
// System.out.println("WriterMT: change to ring size from " +
// ringSize + " to " + finalRingSize);
// }

supply = new RecordSupply(ringSize, byteOrder, compressionThreads,
maxEventCount, maxBufferSize, compressionType);
Expand Down Expand Up @@ -372,7 +364,8 @@ public void run() {
class RecordWriter extends Thread {

/** The highest sequence to have been currently processed. */
private volatile long lastSeqProcessed = -1;
// private volatile long lastSeqProcessed = -1;
private AtomicLong lastSeqProcessed = new AtomicLong(-1);

/** Stop the thread. */
void stopThread() {
Expand All @@ -381,23 +374,25 @@ void stopThread() {
this.interrupt();

// Wait for it to stop
this.join(1);
this.join();

if (this.isAlive()) {
// If that didn't work, send Alert signal to ring
supply.errorAlert();
// Do NOT call supply.errorAlert since run() will exit
// in a way that lastSeqProcessed doesn't get
// properly set so waitForLastItem routine never returns.

this.join();
//std::cout << "RecordWriter JOINED from alert" << std::endl;
}
// if (this.isAlive()) {
// // If that didn't work, send Alert signal to ring
// supply.errorAlert();
// this.join();
// }
}
catch (InterruptedException e) {}
}

/** Wait for the last item to be processed, then exit thread. */
void waitForLastItem() {
try {
while (supply.getLastSequence() > lastSeqProcessed) {
while (supply.getLastSequence() > lastSeqProcessed.get()) {
Thread.yield();
}
}
Expand All @@ -417,7 +412,7 @@ public void run() {
return;
}

//System.out.println(" Writer: try getting record to write");
//System.out.println(" RecordWriter: try getting record to write");
// Get the next record for this thread to write
RecordRingItem item = supply.getToWrite();
currentSeq = item.getSequence();
Expand All @@ -426,7 +421,7 @@ public void run() {

// Do write
RecordHeader header = record.getHeader();
//System.out.println(" Writer: got record, header = \n" + header);
//System.out.println(" RecordWriter: got record, header = \n" + header);
int bytesToWrite = header.getLength();
// Record length of this record
recordLengths.add(bytesToWrite);
Expand All @@ -437,12 +432,12 @@ public void run() {
try {
ByteBuffer buf = record.getBinaryBuffer();
if (buf.hasArray()) {
//System.out.println(" Writer: use outStream to write file, buf pos = " + buf.position() +
//System.out.println(" RecordWriter: use outStream to write file, buf pos = " + buf.position() +
// ", lim = " + buf.limit() + ", bytesToWrite = " + bytesToWrite);
outStream.write(buf.array(), 0, bytesToWrite);
}
else {
//System.out.println(" Writer: use fileChannel to write file");
//System.out.println(" RecordWriter: use fileChannel to write file");
// binary buffer is ready to read after build()
fileChannel.write(buf);
}
Expand All @@ -453,21 +448,28 @@ public void run() {
}

// Release back to supply
//System.out.println(" Writer: release ring item back to supply");
//System.out.println(" RecordWriter: release ring item back to supply");
supply.releaseWriter(item);

// Now we're done with this sequence
lastSeqProcessed = currentSeq;
lastSeqProcessed.set(currentSeq);
}
}
catch (InterruptedException e) {
// We've been interrupted while blocked in getToWrite
// which means we're all done.
//System.out.println(" Writer: thread INTERRUPTED");
//System.out.println(" RecordWriter: thread INTERRUPTED");

// Make sure waitForLastItem() returns
if (supply.getLastSequence() > lastSeqProcessed.get()) {
lastSeqProcessed.set(supply.getLastSequence());
// System.out.println(" RecordWriter: thread INTERRUPTED, set lastSeqProcessed to " +
// (supply.getLastSequence() + 1));
}
}
catch (AlertException e) {
// We've been notified that an error has occurred
//System.out.println(" Writer: thread exiting due to ring alert exception");
//System.out.println(" RecordWriter: thread exiting due to ring alert exception");
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/libsrc++/WriterMT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ namespace evio {
// AND is must be multiple of 2
finalRingSize = Disruptor::Util::ceilingNextPowerOfTwo(finalRingSize);

if (finalRingSize != ringSize) {
std::cout << "WriterMT: started w/ ring size = " << ringSize <<
", change to " << finalRingSize << std::endl;
}
// if (finalRingSize != ringSize) {
// std::cout << "WriterMT: change ring size from " << ringSize <<
// " to " << finalRingSize << std::endl;
// }

supply = std::make_shared<RecordSupply>(finalRingSize, byteOrder,
compressionThreads,
Expand Down
26 changes: 18 additions & 8 deletions src/libsrc++/WriterMT.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,23 @@ namespace evio {
thd.interrupt();

// Wait for it to stop
if (thd.try_join_for(boost::chrono::milliseconds(1))) {
// if (thd.try_join_for(boost::chrono::milliseconds(1))) {
// //std::cout << "RecordWriter JOINED from interrupt" << std::endl;
// return;
// }
if (thd.joinable()) {
thd.join();
//std::cout << "RecordWriter JOINED from interrupt" << std::endl;
return;
}

// If that didn't work, send Alert signal to ring
supply->errorAlert();

if (thd.joinable()) {
thd.join();
//std::cout << "RecordWriter JOINED from alert" << std::endl;
}
// // If that didn't work, send Alert signal to ring
// supply->errorAlert();
//
// if (thd.joinable()) {
// thd.join();
// //std::cout << "RecordWriter JOINED from alert" << std::endl;
// }
}
}

Expand Down Expand Up @@ -225,6 +230,11 @@ namespace evio {
catch (boost::thread_interrupted & e) {
// Interrupted while blocked in getToWrite which means we're all done
//std::cout << "RecordWriter: quit thread through interrupt" << std::endl;
if (supply->getLastSequence() > lastSeqProcessed.load()) {
lastSeqProcessed = supply->getLastSequence();
// std::cout << "RecordWriter: thread INTERRUPTED, set lastSeqProcessed to " <<
// (supply->getLastSequence() + 1) << std::endl;
}
}
catch (std::runtime_error & e) {
std::string err = e.what();
Expand Down

0 comments on commit a8ea448

Please sign in to comment.