Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a read-only current_file attribute for G3MultiFileWriter #155

Merged
merged 1 commit into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions core/src/G3MultiFileWriter.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ class G3MultiFileWriter : public G3Module {
boost::python::object divide_on = boost::python::object());
virtual ~G3MultiFileWriter();
void Process(G3FramePtr frame, std::deque<G3FramePtr> &out);
std::string CurrentFile() { return current_filename_; }
private:
bool CheckNewFile(G3FramePtr frame);

std::string filename_;
boost::python::object filename_callback_;
std::string current_filename_;
size_t size_limit_;

std::vector<G3Frame::FrameType> always_break_on_;
Expand Down Expand Up @@ -141,6 +143,8 @@ G3MultiFileWriter::CheckNewFile(G3FramePtr frame)

}

current_filename_ = filename;

if (boost::algorithm::ends_with(filename, ".gz"))
stream_.push(boost::iostreams::gzip_compressor());
if (boost::algorithm::ends_with(filename, ".bz2")) {
Expand Down Expand Up @@ -201,8 +205,34 @@ void G3MultiFileWriter::Process(G3FramePtr frame, std::deque<G3FramePtr> &out)
out.push_back(frame);
}

// Exports G3MultiFileWriter to Python under the "core" module name, with a
// constructor taking (filename, size_limit[, divide_on]). The string literals
// that follow are the user-facing docstring.
// NOTE(review): EXPORT_G3MODULE is defined outside this view; the description
// above is read off its arguments -- confirm against the macro definition.
// NOTE(review): the PYBINDINGS("core") block further down declares a class_
// named "G3MultiFileWriter" with the same constructor and docstring;
// presumably only one of the two registrations is meant to remain -- confirm.
EXPORT_G3MODULE("core", G3MultiFileWriter, (init<boost::python::object, size_t, optional<boost::python::object> >(args("filename", "size_limit", "divide_on"))),
"Writes frames to disk into a sequence of files. Once a file exceeds the number of bytes specified in size_limit, it will start a new file. Files are named based on filename. If passed a string for filename with a printf-style specifier, that specifier will be replaced by a zero-indexed sequence number. For example, outfile-%03u.g3.gz would produce a sequence of files named outfile-000.g3.gz, outfile-001.g3.gz, etc. Alternatively, you can pass a callable that is passed the first frame in the new file and the sequence number and returns a path to the new file. Any frames besides Timepoint and Scan frames have the most recent frame of each type prepended to all new files.\n\n"
"More complex behavior can be obtained with the optional divide_on argument. This can be an iterable of frame types (e.g. [core.G3FrameType.Observation]) or a callable. In the iterable case, the presence of any frame with a type in the list will cause the creation of a new file even if the file size threshold has not yet been met. This is useful to create files based on, for example, observation boundaries. For more flexibility, you can also pass a python callable as divide_on. This callable will be passed each frame in turn. If it returns True (or something with positive truth-value), a new file will be started at that frame."
);

// Python bindings for G3MultiFileWriter in the "core" module, written out as an
// explicit boost::python class_<> declaration. It exposes:
//   - the constructor (filename, size_limit[, divide_on]), where divide_on
//     defaults to a default-constructed boost::python::object;
//   - a read-only "current_file" attribute backed by CurrentFile();
//   - a read-only "__g3module__" attribute with the value true.
// NOTE(review): presumably spelled out by hand rather than through
// EXPORT_G3MODULE so that the extra current_file attribute can be attached --
// confirm.
PYBINDINGS("core") {
using namespace boost::python;

// Declared with G3Module as its Python base, held through boost::shared_ptr,
// and marked boost::noncopyable. The adjacent string literals below
// concatenate into the class docstring shown to Python users.
class_<G3MultiFileWriter, bases<G3Module>, boost::shared_ptr<G3MultiFileWriter>,
boost::noncopyable>("G3MultiFileWriter",
"Writes frames to disk into a sequence of files. Once a file exceeds "
"the number of bytes specified in size_limit, it will start a new file. "
"Files are named based on filename. If passed a string for filename "
"with a printf-style specifier, that specifier will be replaced by a "
"zero-indexed sequence number. For example, outfile-%03u.g3.gz would "
"produce a sequence of files named outfile-000.g3.gz, outfile-001.g3.gz, "
"etc. Alternatively, you can pass a callable that is passed the first "
"frame in the new file and the sequence number and returns a path to "
"the new file. Any frames besides Timepoint and Scan frames have the "
"most recent frame of each type prepended to all new files.\n\n"
"More complex behavior can be obtained with the optional divide_on "
"argument. This can be an iterable of frame types (e.g. "
"[core.G3FrameType.Observation]) or a callable. In the iterable case, "
"the presence of any frame with a type in the list will cause the "
"creation of a new file even if the file size threshold has not yet "
"been met. This is useful to create files based on, for example, "
"observation boundaries. For more flexibility, you can also pass a "
"python callable as divide_on. This callable will be passed each "
"frame in turn. If it returns True (or something with positive "
"truth-value), a new file will be started at that frame.",
init<object, size_t, optional<object> >((arg("filename"),
arg("size_limit"), arg("divide_on")=object())))
// current_file reads through CurrentFile(), which returns current_filename_,
// i.e. the filename most recently assigned in CheckNewFile.
.def_readonly("current_file", &G3MultiFileWriter::CurrentFile)
// NOTE(review): __g3module__ looks like a marker the framework uses to
// recognize module classes; its consumer is outside this view -- confirm.
.def_readonly("__g3module__", true)
;
}
16 changes: 12 additions & 4 deletions core/tests/multifileio.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@ def addinfo(fr):
fr['count'] = n
n += 1
pipe.Add(addinfo)

class checkwriter(core.G3MultiFileWriter):
    """
    G3MultiFileWriter subclass used by this test to exercise ``current_file``.

    When called with an EndProcessing frame, it asserts that the path reported
    by the writer's ``current_file`` attribute exists on disk. Every frame,
    EndProcessing or not, is then handed to the parent class's ``__call__``.
    """

    def __call__(self, frame):
        at_end = frame.type == core.G3FrameType.EndProcessing
        # The existence check runs before delegating to the parent writer and
        # is only evaluated for the EndProcessing frame (short-circuit `or`).
        assert not at_end or os.path.exists(self.current_file)
        super().__call__(frame)

pipe.Add(lambda fr: fr.type != core.G3FrameType.PipelineInfo) # Avoid extra frames that complicate accounting
pipe.Add(core.G3MultiFileWriter, filename='multitest-%02u.g3', size_limit=20*1024)
pipe.Add(core.G3MultiFileWriter, filename=lambda frame,seq: 'multitest2-%02d.g3' % seq, size_limit=20*1024)
pipe.Add(core.G3MultiFileWriter, filename='multitest3-%02u.g3', size_limit=20*1024, divide_on=[core.G3FrameType.Timepoint])
pipe.Add(core.G3MultiFileWriter, filename='multitest4-%02u.g3', size_limit=2000000*1024, divide_on=lambda fr: fr['count'] % 200 == 0)
pipe.Add(checkwriter, filename='multitest-%02u.g3', size_limit=20*1024)
pipe.Add(checkwriter, filename=lambda frame,seq: 'multitest2-%02d.g3' % seq, size_limit=20*1024)
pipe.Add(checkwriter, filename='multitest3-%02u.g3', size_limit=20*1024, divide_on=[core.G3FrameType.Timepoint])
pipe.Add(checkwriter, filename='multitest4-%02u.g3', size_limit=2000000*1024, divide_on=lambda fr: fr['count'] % 200 == 0)

pipe.Run()

# Check that various ways of splitting produce the right number of files
Expand Down
Loading