-
Notifications
You must be signed in to change notification settings - Fork 0
/
EventProcessor.cc
99 lines (84 loc) · 1.91 KB
/
EventProcessor.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// -*- C++ -*-
//
// Package: Package
// Class : EventProcessor
//
// Implementation:
// [Notes on implementation]
//
// Original Author: Chris Jones
// Created: Thu, 23 May 2013 21:43:32 GMT
// $Id$
//
// system include files
#include <vector>
#include <tbb/task.h>
// user include files
#include "EventProcessor.h"
#include "Stream.h"
#include "Coordinator.h"
#include "RunCache.h"
//
// constants, enums and typedefs
//
//
// static data member definitions
//
//
// constructors and destructor
//
EventProcessor::EventProcessor(Source* iSource, GlobalWatcher* iWatcher, unsigned int iNStreams, unsigned int iNRuns):
m_source(iSource),
m_watcher(iWatcher),
m_nStreams(iNStreams),
m_nRuns(iNRuns)
{
}
// EventProcessor::EventProcessor(const EventProcessor& rhs)
// {
// // do actual copying here;
// }
//EventProcessor::~EventProcessor()
//{
//}
//
// assignment operators
//
// const EventProcessor& EventProcessor::operator=(const EventProcessor& rhs)
// {
// //An exception safe implementation is
// EventProcessor temp(rhs);
// swap(rhs);
//
// return *this;
// }
//
// member functions
//
void
EventProcessor::processAll()
{
auto eventLoopWaitTask = new (tbb::task::allocate_root()) tbb::empty_task{};
eventLoopWaitTask->increment_ref_count();
std::vector<std::shared_ptr<Stream>> streams;
streams.reserve(m_nStreams);
RunCache runHandler(m_nRuns);
Coordinator coordinator(eventLoopWaitTask,m_source, m_watcher, runHandler);
for(unsigned int i=0; i<m_nStreams;++i) {
std::shared_ptr<Stream> p{ new Stream{i,m_watcher}};
streams.push_back(p);
eventLoopWaitTask->increment_ref_count();
auto t = coordinator.assignWorkTo(p.get());
if(nullptr != t) {
tbb::task::enqueue(*t);
}
}
eventLoopWaitTask->wait_for_all();
tbb::task::destroy(*eventLoopWaitTask);
}
//
// const member functions
//
//
// static member functions
//