Pothos  0.3.3-g32d3017c
The Pothos dataflow programming software suite
 All Classes Namespaces Files Functions Variables Typedefs Friends Macros
InputPortImpl.hpp
Go to the documentation of this file.
1 
11 #pragma once
13 #include <mutex> //lock_guard
14 
15 inline int Pothos::InputPort::index(void) const
16 {
17  return _index;
18 }
19 
20 inline const std::string &Pothos::InputPort::name(void) const
21 {
22  return _name;
23 }
24 
25 inline const Pothos::DType &Pothos::InputPort::dtype(void) const
26 {
27  return _dtype;
28 }
29 
30 inline const std::string &Pothos::InputPort::domain(void) const
31 {
32  return _domain;
33 }
34 
36 {
37  return _buffer;
38 }
39 
40 inline size_t Pothos::InputPort::elements(void) const
41 {
42  return _elements;
43 }
44 
45 inline unsigned long long Pothos::InputPort::totalElements(void) const
46 {
47  return _totalElements;
48 }
49 
50 inline unsigned long long Pothos::InputPort::totalBuffers(void) const
51 {
52  return _totalBuffers;
53 }
54 
55 inline unsigned long long Pothos::InputPort::totalLabels(void) const
56 {
57  return _totalLabels;
58 }
59 
60 inline unsigned long long Pothos::InputPort::totalMessages(void) const
61 {
62  return _totalMessages;
63 }
64 
66 {
67  return _labelIter;
68 }
69 
70 inline void Pothos::InputPort::consume(const size_t numElements)
71 {
72  _pendingElements += numElements;
73 }
74 
76 {
77  return not this->asyncMessagesEmpty();
78 }
79 
80 inline bool Pothos::InputPort::isSlot(void) const
81 {
82  return _isSlot;
83 }
84 
86 {
87  auto msg = this->asyncMessagesPop();
88  _totalMessages++;
89  _workEvents++;
90  return msg;
91 }
92 
93 inline void Pothos::InputPort::removeLabel(const Label &label)
94 {
95  for (auto it = _inlineMessages.begin(); it != _inlineMessages.end(); it++)
96  {
97  if (*it == label)
98  {
99  _inlineMessages.erase(it);
100  _labelIter = _inlineMessages;
101  _totalLabels++;
102  _workEvents++;
103  return;
104  }
105  }
106 }
107 
108 inline void Pothos::InputPort::setReserve(const size_t numElements)
109 {
110  //only mark this change when setting a larger reserve
111  if (numElements > _reserveElements) _workEvents++;
112 
113  _reserveElements = numElements;
114 }
115 
116 inline bool Pothos::InputPort::asyncMessagesEmpty(void)
117 {
118  std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
119  return _asyncMessages.empty();
120 }
121 
122 inline Pothos::Object Pothos::InputPort::asyncMessagesPop(void)
123 {
124  std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
125  if (_asyncMessages.empty()) return Pothos::Object();
126  auto msg = _asyncMessages.front().first;
127  _asyncMessages.pop_front();
128  return msg;
129 }
130 
131 inline void Pothos::InputPort::inlineMessagesPush(const Pothos::Label &label)
132 {
133  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
134  if (_inputInlineMessages.full()) _inputInlineMessages.set_capacity(_inputInlineMessages.capacity()*2);
135  _inputInlineMessages.push_back(label);
136 }
137 
138 inline void Pothos::InputPort::inlineMessagesClear(void)
139 {
140  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
141  _inputInlineMessages.clear();
142  _inlineMessages.clear();
143 }
144 
145 inline void Pothos::InputPort::bufferAccumulatorFront(Pothos::BufferChunk &buff)
146 {
147  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
148  while (not _inputInlineMessages.empty())
149  {
150  const auto &front = _inputInlineMessages.front();
151  _inlineMessages.push_back(front.toAdjusted(1, this->dtype().size()));
152  _inputInlineMessages.pop_front();
153  }
154  buff = _bufferAccumulator.front();
155 }
156 
157 inline void Pothos::InputPort::bufferAccumulatorPush(const BufferChunk &buffer)
158 {
159  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
160  this->bufferAccumulatorPushNoLock(buffer);
161 }
162 
163 inline void Pothos::InputPort::bufferAccumulatorRequire(const size_t numBytes)
164 {
165  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
166  _bufferAccumulator.require(numBytes);
167 }
168 
169 inline void Pothos::InputPort::bufferAccumulatorClear(void)
170 {
171  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
172  _bufferAccumulator = BufferAccumulator();
173 }
Definition: Label.hpp:23
Definition: Label.hpp:86
unsigned long long totalBuffers(void) const
Definition: InputPortImpl.hpp:50
void setReserve(const size_t numElements)
Definition: InputPortImpl.hpp:108
void removeLabel(const Label &label)
Definition: InputPortImpl.hpp:93
int index(void) const
Definition: InputPortImpl.hpp:15
bool isSlot(void) const
Definition: InputPortImpl.hpp:80
bool hasMessage(void)
Does the specified input port have an asynchronous message available?
Definition: InputPortImpl.hpp:75
const std::string & name(void) const
Get the string name identifier for this port.
Definition: InputPortImpl.hpp:20
const LabelIteratorRange & labels(void) const
Definition: InputPortImpl.hpp:65
const BufferChunk & buffer(void) const
Definition: InputPortImpl.hpp:35
unsigned long long totalMessages(void) const
Definition: InputPortImpl.hpp:60
Definition: Object.hpp:55
unsigned long long totalLabels(void) const
Definition: InputPortImpl.hpp:55
Object popMessage(void)
Definition: InputPortImpl.hpp:85
const std::string & domain(void) const
Get the domain information for this port.
Definition: InputPortImpl.hpp:30
size_t elements(void) const
Definition: InputPortImpl.hpp:40
unsigned long long totalElements(void) const
Definition: InputPortImpl.hpp:45
const DType & dtype(void) const
Get the data type information for this port.
Definition: InputPortImpl.hpp:25
Definition: BufferChunk.hpp:26
void consume(const size_t numElements)
Definition: InputPortImpl.hpp:70
Definition: DType.hpp:38