Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data streamer 1 #313

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 240 additions & 0 deletions DeviceAdapters/DemoCamera/DemoCamera.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const char* g_DADeviceName = "D-DA";
const char* g_DA2DeviceName = "D-DA2";
const char* g_GalvoDeviceName = "DGalvo";
const char* g_MagnifierDeviceName = "DOptovar";
const char* g_DataStreamerDeviceName = "DDataStreamer";
const char* g_HubDeviceName = "DHub";

// constants for naming pixel types (allowed values of the "PixelType" property)
Expand Down Expand Up @@ -89,6 +90,7 @@ MODULE_API void InitializeModuleData()
RegisterDevice(g_DA2DeviceName, MM::SignalIODevice, "Demo DA-2");
RegisterDevice(g_MagnifierDeviceName, MM::MagnifierDevice, "Demo Optovar");
RegisterDevice(g_GalvoDeviceName, MM::GalvoDevice, "Demo Galvo");
RegisterDevice(g_DataStreamerDeviceName, MM::DataStreamerDevice, "Demo DataStreamer");
RegisterDevice("TransposeProcessor", MM::ImageProcessorDevice, "TransposeProcessor");
RegisterDevice("ImageFlipX", MM::ImageProcessorDevice, "ImageFlipX");
RegisterDevice("ImageFlipY", MM::ImageProcessorDevice, "ImageFlipY");
Expand Down Expand Up @@ -167,6 +169,11 @@ MODULE_API MM::Device* CreateDevice(const char* deviceName)
// create Galvo
return new DemoGalvo();
}
else if (strcmp(deviceName, g_DataStreamerDeviceName) == 0)
{
// create DataStreamer
return new DemoDataStreamer();
}

else if(strcmp(deviceName, "TransposeProcessor") == 0)
{
Expand Down Expand Up @@ -4669,6 +4676,239 @@ bool DemoGalvo::PointInTriangle(Point p, Point p0, Point p1, Point p2)
return s > 0 && t > 0 && (s + t) < A;
}

///////////////////////////////////////////////////////////
// DemoDataStreamer
DemoDataStreamer::DemoDataStreamer() :
mockDataSize_(1024),
acqPeriod_(1000),
procPeriod_(1000),
errorGetBufferSizeAt_(65535),
errorGetBufferAt_(65535),
errorProcessBufferAt_(65535),
initialized_(false)
{
// parent ID display
CreateHubIDProperty();
}

DemoDataStreamer::~DemoDataStreamer()
{
Shutdown();
}

void DemoDataStreamer::GetName(char* pszName) const {
CDeviceUtils::CopyLimitedString(pszName, g_DataStreamerDeviceName);
}

bool DemoDataStreamer::Busy() {
return false;
}

int DemoDataStreamer::Initialize()
{

SetErrorText(errorCodeGetBufferSize, "GetBufferSize error message");
SetErrorText(errorCodeGetBuffer, "GetBuffer error message");
SetErrorText(errorCodeProcessBuffer, "ProcessBuffer error message");

int ret;
ret = CreateFloatProperty("Average data value", NAN, false);
assert(ret == DEVICE_OK);

CPropertyAction* pAct = pAct = new CPropertyAction(this, &DemoDataStreamer::OnAcquisitionPeriod);
ret = CreateIntegerProperty("Acquisition period in ms", acqPeriod_, false, pAct);
assert(ret == DEVICE_OK);
SetPropertyLimits("Acquisition period in ms", 0, 10000);

pAct = pAct = new CPropertyAction(this, &DemoDataStreamer::OnProcessingPeriod);
ret = CreateIntegerProperty("Processing period in ms", procPeriod_, false, pAct);
assert(ret == DEVICE_OK);
SetPropertyLimits("Processing period in ms", 0, 10000);

pAct = pAct = new CPropertyAction(this, &DemoDataStreamer::OnGenerateGetBufferSizeErrorAt);
ret = CreateIntegerProperty("Generate GetBufferSize error at", errorGetBufferSizeAt_, false, pAct);
assert(ret == DEVICE_OK);
SetPropertyLimits("Generate GetBufferSize error at", 1, 65535);

pAct = pAct = new CPropertyAction(this, &DemoDataStreamer::OnGenerateGetBufferErrorAt);
ret = CreateIntegerProperty("Generate GetBuffer error at", errorGetBufferAt_, false, pAct);
assert(ret == DEVICE_OK);
SetPropertyLimits("Generate GetBuffer error at", 1, 65535);

pAct = pAct = new CPropertyAction(this, &DemoDataStreamer::OnGenerateProcessBufferErrorAt);
ret = CreateIntegerProperty("Generate ProcessBuffer error at", errorProcessBufferAt_, false, pAct);
assert(ret == DEVICE_OK);
SetPropertyLimits("Generate ProcessBuffer error at", 1, 65535);

initialized_ = true;
return DEVICE_OK;
}

int DemoDataStreamer::Shutdown() {
initialized_ = false;
return DEVICE_OK;
}

int DemoDataStreamer::StartStream() {
// calls to hardware should be implemented here
counter_ = 1;
int ret;
ret = this->StartDataStreamerThreads(); // this line must be present in every StartStream implementation
return ret;
}

int DemoDataStreamer::StopStream() {
// calls to hardware should be implemented here
int ret;
ret = this->StopDataStreamerThreads(); // this line must be present in every StopStream implementation
return ret;
}

int DemoDataStreamer::GetBufferSize(unsigned& dataBufferSize) {
if (counter_ == errorGetBufferSizeAt_) return errorCodeGetBufferSize;
dataBufferSize = mockDataSize_;
return DEVICE_OK;
}

std::unique_ptr<char[]> DemoDataStreamer::GetBuffer(unsigned expectedDataBufferSize, unsigned& actualDataBufferSize, int& exitStatus) {

if (counter_ == errorGetBufferAt_) {
actualDataBufferSize = 0;
exitStatus = errorCodeGetBuffer;
return 0;
}

if (expectedDataBufferSize <= mockDataSize_) {
actualDataBufferSize = expectedDataBufferSize;
}
else {
actualDataBufferSize = mockDataSize_;
}

// allocate a new data array and put data in it
std::unique_ptr<char[]> data(new char[actualDataBufferSize]);
int* ptr = (int*)data.get();
for (size_t ii = 0; ii < actualDataBufferSize/4; ii++) {
*ptr = counter_;
ptr++;
}
counter_++;
exitStatus = DEVICE_OK;
Sleep(acqPeriod_);

return data;
}

int DemoDataStreamer::ProcessBuffer(std::unique_ptr<char[]>& pDataBuffer, unsigned dataSize) {
if (counter_ == errorProcessBufferAt_) return errorCodeProcessBuffer;
double ave = 0;
int* ptr = (int*)pDataBuffer.get();
for (size_t ii = 0; ii < dataSize/4; ii++) {
ave += *ptr;
ptr++;
}
ave = ave / (dataSize/4);
SetProperty("Average data value", to_string(ave).c_str());
Sleep(procPeriod_);
return DEVICE_OK;
}

/**
* Handles "Acquisition period in ms" property.
*/
int DemoDataStreamer::OnAcquisitionPeriod(MM::PropertyBase* pProp, MM::ActionType eAct)
{
if (eAct == MM::BeforeGet)
{
pProp->Set(acqPeriod_);
}
else if (eAct == MM::AfterSet)
{
if (this->IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING;
long newPeriod;
pProp->Get(newPeriod);
if (newPeriod>pProp->GetUpperLimit() || newPeriod<pProp->GetLowerLimit())
{
pProp->Set(acqPeriod_); // revert
return DEVICE_INVALID_PROPERTY_VALUE;
}
acqPeriod_ = newPeriod;
}
return DEVICE_OK;
}

/**
* Handles "Acquisition period in ms" property.
*/
int DemoDataStreamer::OnProcessingPeriod(MM::PropertyBase* pProp, MM::ActionType eAct)
{
if (eAct == MM::BeforeGet)
{
pProp->Set(procPeriod_);
}
else if (eAct == MM::AfterSet)
{
if (this->IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING;
long newPeriod;
pProp->Get(newPeriod);
if (newPeriod > pProp->GetUpperLimit() || newPeriod < pProp->GetLowerLimit())
{
pProp->Set(procPeriod_); // revert
return DEVICE_INVALID_PROPERTY_VALUE;
}
procPeriod_ = newPeriod;
}
return DEVICE_OK;
}

int DemoDataStreamer::OnGenerateGetBufferSizeErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct)
{
if (eAct == MM::BeforeGet)
{
pProp->Set(errorGetBufferSizeAt_);
}
else if (eAct == MM::AfterSet)
{
if (this->IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING;
long newval;
pProp->Get(newval);
errorGetBufferSizeAt_ = newval;
}
return DEVICE_OK;
}

int DemoDataStreamer::OnGenerateGetBufferErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct)
{
if (eAct == MM::BeforeGet)
{
pProp->Set(errorGetBufferAt_);
}
else if (eAct == MM::AfterSet)
{
if (this->IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING;
long newval;
pProp->Get(newval);
errorGetBufferAt_ = newval;
}
return DEVICE_OK;
}

int DemoDataStreamer::OnGenerateProcessBufferErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct)
{
if (eAct == MM::BeforeGet)
{
pProp->Set(errorProcessBufferAt_);
}
else if (eAct == MM::AfterSet)
{
if (this->IsStreaming()) return DEVICE_DATASTREAMER_BUSY_ACQUIRING;
long newval;
pProp->Get(newval);
errorProcessBufferAt_ = newval;
}
return DEVICE_OK;
}

////////// BEGINNING OF POORLY ORGANIZED CODE //////////////
////////// CLEANUP NEEDED ////////////////////////////

Expand Down
43 changes: 43 additions & 0 deletions DeviceAdapters/DemoCamera/DemoCamera.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <algorithm>
#include <stdint.h>
#include <future>
#include <fstream>

//////////////////////////////////////////////////////////////////////////////
// Error codes
Expand Down Expand Up @@ -1194,6 +1195,48 @@ class DemoGalvo : public CGalvoBase<DemoGalvo>, ImgManipulator
double vMaxY_;
};

//////////////////////////////////////////////////////////////////////////////
// DemoDataStreamer class
// Simulation of data streamer device
//////////////////////////////////////////////////////////////////////////////
class DemoDataStreamer : public CDataStreamerBase<DemoDataStreamer>
{
public:
DemoDataStreamer();
~DemoDataStreamer();

int Initialize();
int Shutdown();
void GetName(char* pszName) const;
bool Busy();

int StartStream();
int StopStream();
int GetBufferSize(unsigned& dataBufferSize);
std::unique_ptr<char[]> GetBuffer(unsigned expectedDataBufferSize, unsigned& actualDataBufferSize, int& exitStatus);
int ProcessBuffer(std::unique_ptr<char[]>& pDataBuffer, unsigned dataSize);

// action interface
// ----------------
int OnAcquisitionPeriod(MM::PropertyBase* pProp, MM::ActionType eAct);
int OnProcessingPeriod(MM::PropertyBase* pProp, MM::ActionType eAct);
int OnGenerateGetBufferSizeErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct);
int OnGenerateGetBufferErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct);
int OnGenerateProcessBufferErrorAt(MM::PropertyBase* pProp, MM::ActionType eAct);

private:
bool initialized_;
unsigned mockDataSize_;
unsigned counter_;
long acqPeriod_;
long procPeriod_;
long errorGetBufferSizeAt_;
long errorGetBufferAt_;
long errorProcessBufferAt_;
const int errorCodeGetBufferSize = 901;
const int errorCodeGetBuffer = 902;
const int errorCodeProcessBuffer = 903;
};


#endif //_DEMOCAMERA_H_
1 change: 1 addition & 0 deletions MMCore/CoreUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ inline std::string ToString(const MM::DeviceType d)
case MM::SLMDevice: return "SLM";
case MM::HubDevice: return "Hub";
case MM::GalvoDevice: return "Galvo";
case MM::DataStreamerDevice: return "DataStreamer";
}
return "Invalid";
}
Expand Down
41 changes: 41 additions & 0 deletions MMCore/Devices/DataStreamerInstance.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// PROJECT: Micro-Manager
// SUBSYSTEM: MMCore
//
// DESCRIPTION: Galvo device instance wrapper
//
// COPYRIGHT: University of California, San Francisco, 2014,
// All Rights reserved
//
// LICENSE: This file is distributed under the "Lesser GPL" (LGPL) license.
// License text is included with the source distribution.
//
// This file is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
// INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES.
//
// AUTHOR: Mark Tsuchida

#include "DataStreamerInstance.h"


int DataStreamerInstance::GetBufferSize(unsigned& dataBufferSiize) { return GetImpl()->GetBufferSize(dataBufferSiize); }
std::unique_ptr<char[]> DataStreamerInstance::GetBuffer(unsigned expectedDataBufferSize, unsigned& actualDataBufferSize, int& exitStatus) { return GetImpl()->GetBuffer(expectedDataBufferSize,actualDataBufferSize,exitStatus); }
int DataStreamerInstance::ProcessBuffer(std::unique_ptr<char[]>& pDataBuffer, unsigned dataSize) { return GetImpl()->ProcessBuffer(pDataBuffer, dataSize); }
int DataStreamerInstance::StartStream() { return GetImpl()->StartStream(); }
int DataStreamerInstance::StopStream() { return GetImpl()->StopStream(); }
bool DataStreamerInstance::GetOverflowStatus() { return GetImpl()->GetOverflowStatus(); }
int DataStreamerInstance::ResetOverflowStatus() { return GetImpl()->ResetOverflowStatus(); }
int DataStreamerInstance::GetAcquisitionExitStatus() { return GetImpl()->GetAcquisitionExitStatus(); }
int DataStreamerInstance::GetProcessingExitStatus() { return GetImpl()->GetProcessingExitStatus(); }
int DataStreamerInstance::SetAcquisitionPause(bool pause) { return GetImpl()->SetAcquisitionPause(pause); }
bool DataStreamerInstance::GetAcquisitionPause() { return GetImpl()->GetAcquisitionPause(); }
bool DataStreamerInstance::IsStreaming() { return GetImpl()->IsStreaming(); }
int DataStreamerInstance::SetStreamParameters(bool stopOnOverflow, bool pauseAcquisitionBeforeOverflow, int numberOfBuffers, int durationUs, int updatePeriodUs) { return GetImpl()->SetStreamParameters(stopOnOverflow, pauseAcquisitionBeforeOverflow, numberOfBuffers, durationUs, updatePeriodUs); }
int DataStreamerInstance::GetStreamParameters(bool& stopOnOverflow, bool& pauseAcquisitionBeforeOverflow, int& numberOfBuffers, int& durationUs, int& updatePeriodUs) { return GetImpl()->GetStreamParameters(stopOnOverflow,pauseAcquisitionBeforeOverflow,numberOfBuffers,durationUs,updatePeriodUs); }
int DataStreamerInstance::SetCircularAcquisitionBufferCapacity(int capacity) { return GetImpl()->SetCircularAcquisitionBufferCapacity(capacity); }
int DataStreamerInstance::GetCircularAcquisitionBufferCapacity() { return GetImpl()->GetCircularAcquisitionBufferCapacity(); }

Loading