Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increased python functionality #1

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 40 additions & 21 deletions ch_vdif_assembler.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,42 +48,53 @@ class constants:
num_disks = ch_vdif_assembler_cython.num_disks # 10 (moose)


class assembler:
class assembler(object):
def __init__(self, write_to_disk=False, rbuf_size=constants.num_disks, abuf_size=4, assembler_nt=65536):
self._assembler = ch_vdif_assembler_cython.assembler(write_to_disk, rbuf_size, abuf_size, assembler_nt)
self.python_processor = None
self.python_processors = []


def register_processor(self, p):
if isinstance(p, ch_vdif_assembler_cython.cpp_processor):
self._assembler.register_cpp_processor(p) # register C++ processor (this actually spawns a processing thread)
elif not isinstance(p, processor):
raise RuntimeError('Argument to assembler.register_processor() must be either an object of class ch_vdif_assembler.processor, or a C++ processor (e.g. returned by make_waterfall_plotter)')
elif self.python_processor is not None:
raise RuntimeError('Currently, ch_vdif_assembler only allows registering one python processor (but an arbitrary number of C++ processors)')
else:
self.python_processor = p
self.python_processors.append(p)


def run(self, stream):
if self.python_processor is None:
if not self.python_processors:
self._assembler.start_async(stream)
self._assembler.wait_until_end()
return

self._assembler.register_python_processor()

processors_byte_data = [p.byte_data for p in self.python_processors]
need_byte_data = True in processors_byte_data
need_complex_data = False in processors_byte_data

try:
self._assembler.start_async(stream)

while True:
chunk = self._assembler.get_next_python_chunk()
if chunk is None:
break
(t0, nt, efield, mask) = chunk.get_data()
self.python_processor.process_chunk(t0, nt, efield, mask)

self.python_processor.finalize()
if need_byte_data:
byte_data = chunk.get_byte_data()
if need_complex_data:
complex_data = chunk.get_data()
for p in self.python_processors:
if p.byte_data:
t0, nt, efield = byte_data
mask = None
else:
t0, nt, efield, mask = complex_data
p.process_chunk(t0, nt, efield, mask)

for p in self.python_processors:
p.finalize()

finally:
self._assembler.unregister_python_processor()
Expand Down Expand Up @@ -142,7 +153,7 @@ def cpp_waterfall_plotter(outdir, is_critical=False):
return ch_vdif_assembler_cython.cpp_waterfall_plotter(outdir, is_critical)


class processor:
class processor(object):
"""
To define a python processor, you subclass this base class.

Expand All @@ -158,17 +169,25 @@ class processor:
interruption in data stream, then a timestamp gap will appear.

The 'efield' arg is a shape (nfreq,2,nt) complex array with electric field values, where
the middle index is polarziation. Missing data is represented by (0+0j). The 'mask' arg
the middle index is polarization. Missing data is represented by (0+0j). The 'mask' arg
is a shape (nfreq,2,nt) integer array which is 0 for missing data, and 1 for non-missing.

WARNING 2: Handling missing data is an important aspect of the vdif_processor since it
happens all the time. If a GPU correlator node is down, which is a frequent occurrence,
then some frequencies will be "all missing". There are also routine packet loss events
on second-timescales which result in some high-speed samples being flagged as missing data.
"""
For subclasses with attribute `byte_data = True`, 'efield' is a shape
(nfreq,2,nt) byte array, as in the C++ versions. 'mask' is None.

WARNING 2: Handling missing data is an important aspect of the vdif_processor since it
happens all the time. If a GPU correlator node is down, which is a frequent occurrence,
then some frequencies will be "all missing". There are also routine packet loss events
on second-timescales which result in some high-speed samples being flagged as missing data.
"""

byte_data = False

def process_chunk(self, t0, nt, efield, mask):
print 'process_chunk called! t0=%s nt=%s efield (%s,%s) mask (%s,%s)' % (t0, nt, efield.dtype, efield.shape, mask.dtype, mask.shape)
if mask is None:
print 'process_chunk called! t0=%s nt=%s efield (%s,%s)' % (t0, nt, efield.dtype, efield.shape)
else:
print 'process_chunk called! t0=%s nt=%s efield (%s,%s) mask (%s,%s)' % (t0, nt, efield.dtype, efield.shape, mask.dtype, mask.shape)

def finalize(self):
pass
Expand All @@ -181,7 +200,7 @@ def finalize(self):
# See also the script show-moose-acquisitions.py


class moose_inventory:
class moose_inventory(object):
def __init__(self):
self.topdirs = [ ('/drives/E/%d' % i) for i in xrange(10) ]
self.subdirs = set()
Expand Down
10 changes: 10 additions & 0 deletions ch_vdif_assembler_cython.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ struct cython_assembled_chunk {
std::complex<float> *efield = reinterpret_cast<std::complex<float> *> (efield_hack);
p->fill_efield_array_reference(efield, mask);
}

inline uint8_t* get_buf(void)
{
// This const_cast is super sketchy but I can't figure out a way to get around it since I want
// to use this buffer as a numpy array (in the python assembled_chunk.get_byte_data). That array
// is set as read only. It would be marginally better if the const_cast could be done later in cython,
// but I can't figure out how to do that. -KM
return const_cast<uint8_t*> (p->buf);
}

};


Expand Down
48 changes: 47 additions & 1 deletion ch_vdif_assembler_cython.pyx
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from libc.stdint cimport int32_t, int64_t
from libc.stdint cimport int32_t, int64_t, uint8_t
from libc.stdlib cimport free, malloc
from libcpp.vector cimport vector
from libcpp cimport bool

import numpy as np
cimport numpy as np

# Oddly, things compile without this, but segfaults. -KM
np.import_array() # Exposes the numpy c-api.

cimport ch_vdif_assembler_pxd


Expand Down Expand Up @@ -95,6 +99,48 @@ cdef class assembled_chunk:
self.p[0].fill_efield(&efield[0,0,0], &mask[0,0,0])
return (t0, nt, efield, mask)

def get_byte_data(self):
if self.p == NULL:
return None

t0 = self.p[0].t0
nt = self.p[0].nt

shape = (chime_nfreq, 2, nt)
# Get all the dimensions in the right format.
cdef int nd = len(shape)
# Very important to use this type.
cdef np.npy_intp *shape_c = <np.npy_intp *> malloc(nd * sizeof(np.npy_intp))
cdef int64_t size = 1
for ii, s in enumerate(shape):
shape_c[ii] = s
size *= s

# XXX I'm note sure how the memory for self.p[0].buf is handled. I'm
# worried that it is freed when self.p[0] goes out of scope, which may be
# before efield goes out of scope. The fix is to store a reference of
# self.p[0] in efield. This is done below, but doesn't seem to be
# nessisary. -KM
cdef np.ndarray[np.uint8_t,ndim=3,mode='c'] efield
cdef uint8_t *buf
buf = self.p[0].get_buf()
#buf = <uint8_t *> malloc(size * sizeof(uint8_t))
efield = np.PyArray_SimpleNewFromData(nd, shape_c, np.NPY_UINT8, <void *>buf)
efield.flags.writeable = False

# Store reference to self to keep it in scope and from freeing
# memory.
# This is the wrong place for class definition if this ends up being
# needed (should be top level).
#class pyobj_array(np.ndarray):
# pass
#efield = efield.view(pyobj_array)
#pyobj_array._ref_to_memory = self

free(shape_c)
return (t0, nt, efield)



############################################## Assembler #########################################

Expand Down
3 changes: 2 additions & 1 deletion ch_vdif_assembler_pxd.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from libc.stdint cimport int32_t, int64_t
from libc.stdint cimport int32_t, int64_t, uint8_t

from libcpp cimport bool
from libcpp.string cimport string
Expand All @@ -24,6 +24,7 @@ cdef extern from "ch_vdif_assembler_cython.hpp" namespace "ch_vdif_assembler":
int nt

void fill_efield(float complex *efield, int32_t *mask) except +
uint8_t* get_buf() except +


cdef cppclass cython_assembler:
Expand Down