Pothos  0.4.0-gd11861cd
The Pothos dataflow programming software suite
 All Classes Namespaces Files Functions Variables Typedefs Friends Macros Pages
InputPortImpl.hpp
Go to the documentation of this file.
1 
11 #pragma once
13 #include <mutex> //lock_guard
14 
// Accessor for the `_index` member of this input port; returns it by value.
15 inline int Pothos::InputPort::index(void) const
16 {
17  return _index;
18 }
19 
// Get the string name identifier for this port.
// Returns a const reference to the `_name` member (no copy is made).
20 inline const std::string &Pothos::InputPort::name(void) const
21 {
22  return _name;
23 }
24 
// Get the data type information for this port.
// Returns a const reference to the `_dtype` member (no copy is made).
25 inline const Pothos::DType &Pothos::InputPort::dtype(void) const
26 {
27  return _dtype;
28 }
29 
// Get the domain information for this port.
// Returns a const reference to the `_domain` member (no copy is made).
30 inline const std::string &Pothos::InputPort::domain(void) const
31 {
32  return _domain;
33 }
34 
// NOTE: the signature line (listing line 35) is missing from this listing.
// The cross-reference index at the end of the listing gives it as
// `const BufferChunk & buffer(void) const`.
// Body: returns the `_buffer` member.
36 {
37  return _buffer;
38 }
39 
// Accessor for the `_elements` member; returns the stored count by value.
40 inline size_t Pothos::InputPort::elements(void) const
41 {
42  return _elements;
43 }
44 
// Accessor for the `_totalElements` counter; returns its current value.
// The counter is not modified anywhere in this listing.
45 inline unsigned long long Pothos::InputPort::totalElements(void) const
46 {
47  return _totalElements;
48 }
49 
// Accessor for the `_totalBuffers` counter; returns its current value.
// The counter is not modified anywhere in this listing.
50 inline unsigned long long Pothos::InputPort::totalBuffers(void) const
51 {
52  return _totalBuffers;
53 }
54 
// Accessor for the `_totalLabels` counter; returns its current value.
// Within this listing the counter is incremented by removeLabel() each time
// a label is actually removed.
55 inline unsigned long long Pothos::InputPort::totalLabels(void) const
56 {
57  return _totalLabels;
58 }
59 
// Accessor for the `_totalMessages` counter; returns its current value.
// Within this listing the counter is incremented once per popMessage() call.
60 inline unsigned long long Pothos::InputPort::totalMessages(void) const
61 {
62  return _totalMessages;
63 }
64 
// NOTE: the signature line (listing line 65) is missing from this listing.
// The cross-reference index at the end of the listing gives it as
// `const LabelIteratorRange & labels(void) const`.
// Body: returns the `_labelIter` member, which removeLabel() re-assigns from
// `_inlineMessages` after erasing an entry.
66 {
67  return _labelIter;
68 }
69 
// Record that `numElements` elements have been consumed from this port.
// The amount is only accumulated into `_pendingElements`; nothing else is
// updated here (presumably the pending total is applied elsewhere by the
// framework -- not visible in this listing, TODO confirm).
70 inline void Pothos::InputPort::consume(const size_t numElements)
71 {
72  _pendingElements += numElements;
73 }
74 
// NOTE: the signature line (listing line 75) is missing from this listing.
// The cross-reference index at the end of the listing gives it as
// `bool hasMessage(void)`.
// Does this input port have an asynchronous message available?
// Body: true when asyncMessagesEmpty() reports a non-empty queue; that helper
// takes `_asyncMessagesLock` for the duration of the check.
76 {
77  return not this->asyncMessagesEmpty();
78 }
79 
// Accessor for the `_isSlot` flag; returns it by value.
80 inline bool Pothos::InputPort::isSlot(void) const
81 {
82  return _isSlot;
83 }
84 
// NOTE: the signature line (listing line 85) is missing from this listing.
// The cross-reference index at the end of the listing gives it as
// `Object popMessage(void)`.
// Body: removes and returns the front asynchronous message through
// asyncMessagesPop(), then bumps `_totalMessages` and `_workEvents`.
// NOTE(review): both counters are incremented unconditionally, and
// asyncMessagesPop() returns a default-constructed Object when the queue is
// empty, so a pop on an empty queue is still counted -- confirm this is
// intended.
86 {
87  auto msg = this->asyncMessagesPop();
88  _totalMessages++;
89  _workEvents++;
90  return msg;
91 }
92 
// NOTE: the signature line (listing line 93) is missing from this listing.
// The cross-reference index at the end of the listing gives it as
// `Object peekMessage(void)`.
// Body: returns the front asynchronous message through asyncMessagesPeek()
// without removing it and without touching any counters.
94 {
95  return this->asyncMessagesPeek();
96 }
97 
// Remove the first entry of `_inlineMessages` that compares equal to `label`.
// On a match the entry is erased, `_labelIter` is re-assigned from the
// modified container, `_totalLabels` and `_workEvents` are incremented, and
// the function returns. If nothing matches, the call changes nothing.
98 inline void Pothos::InputPort::removeLabel(const Label &label)
99 {
100  for (auto it = _inlineMessages.begin(); it != _inlineMessages.end(); it++)
101  {
102  if (*it == label)
103  {
104  _inlineMessages.erase(it);
     // presumably refreshes the range handed out by labels() after the
     // erase -- TODO confirm
105  _labelIter = _inlineMessages;
106  _totalLabels++;
107  _workEvents++;
     // return immediately: `it` is not used again after the erase, and
     // only the first match is removed
108  return;
109  }
110  }
111 }
112 
// Store `numElements` as the new `_reserveElements` value.
// `_workEvents` is incremented only when the new value is strictly larger
// than the current one; the assignment itself always happens.
113 inline void Pothos::InputPort::setReserve(const size_t numElements)
114 {
115  //only mark this change when setting a larger reserve
116  if (numElements > _reserveElements) _workEvents++;
117 
118  _reserveElements = numElements;
119 }
120 
// Report whether the `_asyncMessages` queue is empty.
// The check is made while holding `_asyncMessagesLock` (a Util::SpinLock),
// which the lock_guard releases on return.
121 inline bool Pothos::InputPort::asyncMessagesEmpty(void)
122 {
123  std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
124  return _asyncMessages.empty();
125 }
126 
// Remove the front entry of `_asyncMessages` and return its `.first` member.
// Runs entirely under `_asyncMessagesLock`.
// Returns a default-constructed Pothos::Object when the queue is empty.
127 inline Pothos::Object Pothos::InputPort::asyncMessagesPop(void)
128 {
129  std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
130  if (_asyncMessages.empty()) return Pothos::Object();
     // copy the message out before pop_front() removes the entry
131  auto msg = _asyncMessages.front().first;
132  _asyncMessages.pop_front();
133  return msg;
134 }
135 
// Return the `.first` member of the front entry of `_asyncMessages` without
// removing it. Runs entirely under `_asyncMessagesLock`.
// Returns a default-constructed Pothos::Object when the queue is empty.
136 inline Pothos::Object Pothos::InputPort::asyncMessagesPeek(void)
137 {
138  std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
139  if (_asyncMessages.empty()) return Pothos::Object();
140  return _asyncMessages.front().first;
141 }
142 
// Append `label` to the `_inputInlineMessages` queue.
// Runs under `_bufferAccumulatorLock`, the same lock the bufferAccumulator*
// functions in this listing take; it does not use `_asyncMessagesLock`.
143 inline void Pothos::InputPort::inlineMessagesPush(const Pothos::Label &label)
144 {
145  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
     // grow by doubling when the queue reports itself full
     // NOTE(review): doubling a capacity of 0 would leave it at 0 -- confirm
     // the container type guarantees a non-zero capacity
146  if (_inputInlineMessages.full()) _inputInlineMessages.set_capacity(_inputInlineMessages.capacity()*2);
147  _inputInlineMessages.push_back(label);
148 }
149 
// Discard all inline messages: clears both the incoming queue
// (`_inputInlineMessages`) and the `_inlineMessages` container that
// bufferAccumulatorFront() fills from it. Runs under `_bufferAccumulatorLock`.
150 inline void Pothos::InputPort::inlineMessagesClear(void)
151 {
152  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
153  _inputInlineMessages.clear();
154  _inlineMessages.clear();
155 }
156 
// Drain the incoming inline-message queue and fetch the front buffer.
// Under `_bufferAccumulatorLock`:
//  1. every entry of `_inputInlineMessages` is moved, in order, to the back
//     of `_inlineMessages` after conversion with toAdjusted();
//  2. `buff` (output parameter) is assigned `_bufferAccumulator.front()`.
157 inline void Pothos::InputPort::bufferAccumulatorFront(Pothos::BufferChunk &buff)
158 {
159  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
160  while (not _inputInlineMessages.empty())
161  {
     // `front` refers to the queue entry, so it must be used before the
     // pop_front() below
162  const auto &front = _inputInlineMessages.front();
     // presumably rescales the label by the port's element size
     // (dtype().size()) -- exact toAdjusted() semantics are not visible
     // here, TODO confirm
163  _inlineMessages.push_back(front.toAdjusted(1, this->dtype().size()));
164  _inputInlineMessages.pop_front();
165  }
166  buff = _bufferAccumulator.front();
167 }
168 
// Locking wrapper: takes `_bufferAccumulatorLock` and forwards `buffer` to
// bufferAccumulatorPushNoLock(), which is not defined in this listing.
169 inline void Pothos::InputPort::bufferAccumulatorPush(const BufferChunk &buffer)
170 {
171  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
172  this->bufferAccumulatorPushNoLock(buffer);
173 }
174 
// Locking wrapper: takes `_bufferAccumulatorLock` and forwards `numBytes` to
// `_bufferAccumulator.require()`. Note that the argument is a byte count.
175 inline void Pothos::InputPort::bufferAccumulatorRequire(const size_t numBytes)
176 {
177  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
178  _bufferAccumulator.require(numBytes);
179 }
180 
// Reset the buffer accumulator by assigning a default-constructed
// BufferAccumulator over `_bufferAccumulator`, under `_bufferAccumulatorLock`.
181 inline void Pothos::InputPort::bufferAccumulatorClear(void)
182 {
183  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
184  _bufferAccumulator = BufferAccumulator();
185 }
Definition: Label.hpp:23
Definition: Label.hpp:86
unsigned long long totalBuffers(void) const
Definition: InputPortImpl.hpp:50
void setReserve(const size_t numElements)
Definition: InputPortImpl.hpp:113
void removeLabel(const Label &label)
Definition: InputPortImpl.hpp:98
int index(void) const
Definition: InputPortImpl.hpp:15
bool isSlot(void) const
Definition: InputPortImpl.hpp:80
bool hasMessage(void)
Does the specified input port have an asynchronous message available?
Definition: InputPortImpl.hpp:75
const std::string & name(void) const
Get the string name identifier for this port.
Definition: InputPortImpl.hpp:20
const LabelIteratorRange & labels(void) const
Definition: InputPortImpl.hpp:65
const BufferChunk & buffer(void) const
Definition: InputPortImpl.hpp:35
unsigned long long totalMessages(void) const
Definition: InputPortImpl.hpp:60
Definition: Object.hpp:55
unsigned long long totalLabels(void) const
Definition: InputPortImpl.hpp:55
Object popMessage(void)
Definition: InputPortImpl.hpp:85
const std::string & domain(void) const
Get the domain information for this port.
Definition: InputPortImpl.hpp:30
size_t elements(void) const
Definition: InputPortImpl.hpp:40
Object peekMessage(void)
Definition: InputPortImpl.hpp:93
unsigned long long totalElements(void) const
Definition: InputPortImpl.hpp:45
const DType & dtype(void) const
Get the data type information for this port.
Definition: InputPortImpl.hpp:25
Definition: BufferChunk.hpp:30
void consume(const size_t numElements)
Definition: InputPortImpl.hpp:70
Definition: DType.hpp:38