-
Notifications
You must be signed in to change notification settings - Fork 3
/
ORDataProcManager.cc
194 lines (158 loc) · 5.38 KB
/
ORDataProcManager.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// ORDataProcManager.cc
#include "ORDataProcManager.hh"
#include "ORLogger.hh"
#include "ORSocketReader.hh"
#include "ORVWriter.hh"
#include <vector>
// Constructs a manager that reads from `reader`.
//
// reader       - record source; handed to SetReader().
// runDataProc  - optional user-supplied (possibly overloaded)
//                ORRunDataProcessor. When NULL a default one is allocated
//                here and deleted by the destructor.
// headerProc   - optional user-supplied ORHeaderProcessor, useful when the
//                caller wants to modify the header after it has been read.
//                When NULL a default one is allocated here and deleted by
//                the destructor.
//
// Processors passed in by the caller are never deleted by this class.
ORDataProcManager::ORDataProcManager(ORVReader* reader, ORRunDataProcessor* runDataProc, ORHeaderProcessor* headerProc)
{
  // Run-data processor: use the caller's if given, otherwise create our own.
  if (runDataProc != NULL) {
    fRunDataProcessor = runDataProc;
    fIOwnRunDataProcessor = false;
  }
  else {
    fRunDataProcessor = new ORRunDataProcessor;
    fIOwnRunDataProcessor = true;
  }

  // Header processor: use the caller's if given, otherwise create our own.
  if (headerProc != NULL) {
    fHeaderProcessor = headerProc;
    fIOwnHeaderProcessor = false;
  }
  else {
    fHeaderProcessor = new ORHeaderProcessor;
    // Record ownership of the *header* processor. This branch must not touch
    // fIOwnRunDataProcessor: doing so would leave fIOwnHeaderProcessor
    // uninitialised and could make the destructor delete a run-data
    // processor that belongs to the caller.
    fIOwnHeaderProcessor = true;
  }

  SetReader(reader);

  /* Sets fRunContext. This class owns this object. */
  SetRunContext(new ORRunContext);

  // When the reader is an ORSocketReader, ask the run-data processor for
  // more verbose heartbeat output.
  if (dynamic_cast<ORSocketReader*>(fReader) != NULL) {
    fRunDataProcessor->IncreaseHeartbeatVerbosity();
  }

  fRunAsDaemon = false;
}
// Releases the objects this manager is responsible for. The two processors
// are deleted only when their ownership flags say this class allocated them;
// the run context is deleted unconditionally.
ORDataProcManager::~ORDataProcManager()
{
  if (fIOwnRunDataProcessor) {
    delete fRunDataProcessor;
  }

  if (fIOwnHeaderProcessor) {
    delete fHeaderProcessor;
  }

  /* fRunContext always belongs to this class. */
  delete fRunContext;
}
// Drives a complete processing session: StartProcessing(), open the reader,
// call ProcessRun() repeatedly, close the reader, then EndProcessing().
//
// Returns kAlarm when no reader is set, when StartProcessing() reports
// kFailure or worse, when the reader cannot be opened, or when any
// ProcessRun() call reports kAlarm or worse. Otherwise returns whatever
// EndProcessing() returns.
ORDataProcManager::EReturnCode ORDataProcManager::ProcessDataStream()
{
  // Nothing can be done without a reader.
  if (fReader == NULL) {
    ORLog(kError) << "ProcessDataStream(): fReader == NULL: "
                  << "you must set a ORVReader before attempting to process data"
                  << std::endl;
    return kAlarm;
  }

  EReturnCode status = StartProcessing();
  if (status >= kFailure) {
    return kAlarm;
  }

  ORLog(kDebug) << "ProcessDataStream(): calling fReader->Open()..." << std::endl;
  if (!fReader->Open()) {
    return kAlarm;
  }

  // Process one run per iteration. Keep going only while the last run did
  // not ask for a break and the reader reports it can still be read.
  do {
    status = ProcessRun();
    if (status >= kAlarm) {
      return kAlarm;
    }
  } while (status < kBreak && fReader->OKToRead());

  ORLog(kDebug) << "ProcessDataStream(): calling fReader->Close()..." << std::endl;
  fReader->Close();

  return EndProcessing();
}
// Reads records from fReader and processes them until the read loop exits,
// then ends the run.
//
// Each record is first offered to fHeaderProcessor. A record it accepts
// (returns kSuccess) is treated as a header: the run context is configured
// from it, data IDs and the decoder dictionary are set, and the loop moves on
// to the next record. Any other record is processed as data, but only once a
// header has been loaded successfully.
//
// The read loop exits when ReadRecord() returns false, when a header fails to
// load, when a non-header record arrives before a header has been loaded,
// when the run context reports kStopping, or when TestCancel() is true.
//
// Returns:
//   kAlarm   - StartRun(), ProcessDataRecord() or EndRun() reported kAlarm or
//              worse. For the first two, the function returns immediately
//              from inside the loop, so EndRun() is not called.
//   kBreak   - TestCancel() is true after the run has been ended.
//   kSuccess - otherwise.
ORDataProcManager::EReturnCode ORDataProcManager::ProcessRun()
{
// always allow all processors to try to process a run
SetDoProcessRun();
// retCode is assigned before every read below (StartRun() in the loop,
// EndRun() after it).
EReturnCode retCode;
// Initial record buffer size, in UInt_t words.
// NOTE(review): presumably ReadRecord() resizes the vector for larger
// records -- confirm against ORVReader.
size_t nLongsMax = 1024;
std::vector<UInt_t> vecbuffer(nLongsMax);
// True only after a header has been loaded successfully in this call.
Bool_t headerIsReadIn = false;
ORLog(kDebug) << "ProcessRun(): start reading records..." << std::endl;
while (fReader->ReadRecord(vecbuffer)) {
// The raw pointer is re-taken on every iteration rather than cached.
UInt_t* buffer = &vecbuffer[0];
// Check if it is a header
if(fHeaderProcessor->ProcessDataRecord(buffer) == kSuccess) {
// It is a header, perform the setup
// Clear the flag until this header has been loaded successfully
headerIsReadIn = false;
fRunContext->SetMustSwap(fReader->MustSwap());
/* Also check to see if we can write to the reader. */
if (ORVWriter* theMonitor = dynamic_cast<ORVWriter*>(fReader)) {
fRunContext->SetWritableSocket(theMonitor);
} else {
fRunContext->SetWritableSocket(NULL);
}
// Load the header dictionary
ORLog(kDebug) << "ProcessRun(): loading dictionary..." << std::endl;
if (!fRunContext->LoadHeader(fHeaderProcessor->GetHeader(), fRunAsDaemon)) {
/* We have encountered a problem loading the header file. */
/* Leave the read loop; EndRun() below is still called. */
ORLog(kError) << "ProcessRun(): Error loading header file. Stopping run." << std::endl;
break;
}
// Set all the IDs, dictionary
ORLog(kDebug) << "ProcessRun(): setting dataIDs..." << std::endl;
SetDataId();
SetDecoderDictionary();
headerIsReadIn = true;
// Read the next record
continue;
} // End checking if it is a header
// A data record without a successfully loaded header cannot be processed:
// leave the read loop.
if (!headerIsReadIn) break;
fRunContext->ResetRecordFlags();
// The run-data processor is bypassed entirely in daemon mode.
if (!fRunAsDaemon) {
fRunDataProcessor->ProcessDataRecord(buffer);
}
if (fRunContext->GetState() <= ORRunContext::kStarting) {
// Starting a run
retCode = StartRun();
if (retCode >= kFailure) KillRun(); // but keep processing: skips to next run
if (retCode >= kAlarm) return kAlarm;
if (!fRunAsDaemon) {
fRunDataProcessor->OnStartRunComplete();
}
}
if (fDoProcessRun) {
// let all processors process the data record
if (ProcessDataRecord(buffer) >= kAlarm) return kAlarm;
}
// Stop reading once the run context reports that the run is stopping.
if (fRunContext->GetState() == ORRunContext::kStopping) {
break;
}
if (TestCancel()) break;
// Only reached for records that did not end the loop above.
fRunContext->fPacketNumber++;
}
// The read loop has exited: end the run.
ORLog(kDebug) << "ProcessRun(): finished reading records..." << std::endl;
retCode = EndRun();
if (retCode >= kAlarm) return kAlarm;
if (!fRunAsDaemon) {
fRunDataProcessor->OnEndRunComplete();
}
// Report a pending cancel to the caller as kBreak.
if (TestCancel()) return kBreak;
return kSuccess;
}
/*
void ORDataProcManager::Handle(int)
{
ORLog(kWarning) << "Caught ctrl-c, trying to exit nicely" << std::endl;
EndRun();
fRunDataProcessor->OnEndRunComplete();
EndProcessing();
}*/
// Passes `aContext` to the run-data processor and then to the
// ORCompoundDataProcessor implementation of SetRunContext().
// fRunDataProcessor must already be set when this is called.
void ORDataProcManager::SetRunContext(ORRunContext* aContext)
{
  // The run-data processor is updated first, then the base class.
  fRunDataProcessor->SetRunContext(aContext);
  this->ORCompoundDataProcessor::SetRunContext(aContext);
}
void ORDataProcManager::SetDataId()
{
// Not necessary to SetDataId of fHeaderProcessor -- it is always 0x0!
if (!fRunAsDaemon) {
fRunDataProcessor->SetDataId();
}
ORCompoundDataProcessor::SetDataId();
}