Pothos  0.7.0-gf7fbae99
The Pothos dataflow programming software suite
InputPortImpl.hpp
Go to the documentation of this file.
1 
11 #pragma once
13 #include <mutex> //lock_guard
14 
15 inline int Pothos::InputPort::index(void) const
16 {
17  return _index;
18 }
19 
20 inline const std::string &Pothos::InputPort::name(void) const
21 {
22  return _name;
23 }
24 
25 inline const Pothos::DType &Pothos::InputPort::dtype(void) const
26 {
27  return _dtype;
28 }
29 
30 inline const std::string &Pothos::InputPort::domain(void) const
31 {
32  return _domain;
33 }
34 
36 {
37  return _buffer;
38 }
39 
40 inline size_t Pothos::InputPort::elements(void) const
41 {
42  return _elements;
43 }
44 
45 inline unsigned long long Pothos::InputPort::totalElements(void) const
46 {
47  return _totalElements;
48 }
49 
50 inline unsigned long long Pothos::InputPort::totalBuffers(void) const
51 {
52  return _totalBuffers;
53 }
54 
55 inline unsigned long long Pothos::InputPort::totalLabels(void) const
56 {
57  return _totalLabels;
58 }
59 
60 inline unsigned long long Pothos::InputPort::totalMessages(void) const
61 {
62  return _totalMessages;
63 }
64 
66 {
67  return _labelIter;
68 }
69 
70 inline void Pothos::InputPort::consume(const size_t numElements)
71 {
72  _pendingElements += numElements;
73 }
74 
76 {
77  return std::move(_buffer);
78 }
79 
81 {
82  return not this->asyncMessagesEmpty();
83 }
84 
85 inline bool Pothos::InputPort::isSlot(void) const
86 {
87  return _isSlot;
88 }
89 
91 {
92  auto msg = this->asyncMessagesPop();
93  _totalMessages++;
94  _workEvents++;
95  return msg;
96 }
97 
99 {
100  return this->asyncMessagesPeek();
101 }
102 
103 inline void Pothos::InputPort::removeLabel(const Label &label)
104 {
105  for (auto it = _inlineMessages.begin(); it != _inlineMessages.end(); it++)
106  {
107  if (*it == label)
108  {
109  _inlineMessages.erase(it);
110  _labelIter = _inlineMessages;
111  _totalLabels++;
112  _workEvents++;
113  return;
114  }
115  }
116 }
117 
118 inline void Pothos::InputPort::setReserve(const size_t numElements)
119 {
120  //only mark this change when setting a larger reserve
121  if (numElements > _reserveElements) _workEvents++;
122 
123  _reserveElements = numElements;
124 }
125 
126 inline bool Pothos::InputPort::asyncMessagesEmpty(void)
127 {
128  std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
129  return _asyncMessages.empty();
130 }
131 
132 inline Pothos::Object Pothos::InputPort::asyncMessagesPop(void)
133 {
134  std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
135  if (_asyncMessages.empty()) return Pothos::Object();
136  auto msg = std::move(_asyncMessages.front().first);
137  _asyncMessages.pop_front();
138  return msg;
139 }
140 
141 inline Pothos::Object Pothos::InputPort::asyncMessagesPeek(void)
142 {
143  std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
144  if (_asyncMessages.empty()) return Pothos::Object();
145  return _asyncMessages.front().first;
146 }
147 
148 inline void Pothos::InputPort::inlineMessagesPush(const Pothos::Label &label)
149 {
150  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
151  if (_inputInlineMessages.full()) _inputInlineMessages.set_capacity(_inputInlineMessages.capacity()*2);
152  _inputInlineMessages.push_back(label);
153 }
154 
155 inline void Pothos::InputPort::inlineMessagesClear(void)
156 {
157  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
158  _inputInlineMessages.clear();
159  _inlineMessages.clear();
160 }
161 
162 inline void Pothos::InputPort::bufferAccumulatorFront(Pothos::BufferChunk &buff)
163 {
164  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
165  while (not _inputInlineMessages.empty())
166  {
167  _inlineMessages.push_back(std::move(_inputInlineMessages.front()));
168  _inlineMessages.back().adjust(1, this->dtype().size());
169  _inputInlineMessages.pop_front();
170  }
171  buff = _bufferAccumulator.front();
172 }
173 
174 inline void Pothos::InputPort::bufferAccumulatorPush(const BufferChunk &buffer)
175 {
176  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
177  this->bufferAccumulatorPushNoLock(BufferChunk(buffer));
178 }
179 
180 inline void Pothos::InputPort::bufferAccumulatorRequire(const size_t numBytes)
181 {
182  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
183  _bufferAccumulator.require(numBytes);
184 }
185 
186 inline void Pothos::InputPort::bufferAccumulatorClear(void)
187 {
188  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
189  _bufferAccumulator = BufferAccumulator();
190 }
Definition: Label.hpp:23
const std::string & domain(void) const
Get the domain information for this port.
Definition: InputPortImpl.hpp:30
Definition: BufferAccumulator.hpp:31
Definition: Label.hpp:95
const BufferChunk & buffer(void) const
Definition: InputPortImpl.hpp:35
const std::string & name(void) const
Get the string name identifier for this port.
Definition: InputPortImpl.hpp:20
void setReserve(const size_t numElements)
Definition: InputPortImpl.hpp:118
void removeLabel(const Label &label)
Definition: InputPortImpl.hpp:103
const LabelIteratorRange & labels(void) const
Definition: InputPortImpl.hpp:65
const BufferChunk & front(void) const
Definition: BufferAccumulator.hpp:100
bool hasMessage(void)
Does the specified input port have an asynchronous message available?
Definition: InputPortImpl.hpp:80
unsigned long long totalBuffers(void) const
Definition: InputPortImpl.hpp:50
unsigned long long totalElements(void) const
Definition: InputPortImpl.hpp:45
unsigned long long totalLabels(void) const
Definition: InputPortImpl.hpp:55
int index(void) const
Definition: InputPortImpl.hpp:15
Definition: Object.hpp:47
void require(const size_t numBytes)
const DType & dtype(void) const
Get the data type information for this port.
Definition: InputPortImpl.hpp:25
bool isSlot(void) const
Definition: InputPortImpl.hpp:85
unsigned long long totalMessages(void) const
Definition: InputPortImpl.hpp:60
BufferChunk takeBuffer(void)
Definition: InputPortImpl.hpp:75
Object popMessage(void)
Definition: InputPortImpl.hpp:90
Object peekMessage(void)
Definition: InputPortImpl.hpp:98
Definition: BufferChunk.hpp:30
void consume(const size_t numElements)
Definition: InputPortImpl.hpp:70
Definition: DType.hpp:38
size_t elements(void) const
Definition: InputPortImpl.hpp:40