Pothos  0.3.0-ga8f2d4e2
The Pothos dataflow programming software suite
 All Classes Namespaces Files Functions Variables Typedefs Friends Macros
InputPortImpl.hpp
Go to the documentation of this file.
1 
11 #pragma once
13 #include <mutex> //lock_guard
14 
15 inline int Pothos::InputPort::index(void) const
16 {
17  return _index;
18 }
19 
20 inline const std::string &Pothos::InputPort::name(void) const
21 {
22  return _name;
23 }
24 
25 inline const Pothos::DType &Pothos::InputPort::dtype(void) const
26 {
27  return _dtype;
28 }
29 
30 inline const std::string &Pothos::InputPort::domain(void) const
31 {
32  return _domain;
33 }
34 
36 {
37  return _buffer;
38 }
39 
40 inline size_t Pothos::InputPort::elements(void) const
41 {
42  return _elements;
43 }
44 
45 inline unsigned long long Pothos::InputPort::totalElements(void) const
46 {
47  return _totalElements;
48 }
49 
50 inline unsigned long long Pothos::InputPort::totalBuffers(void) const
51 {
52  return _totalBuffers;
53 }
54 
55 inline unsigned long long Pothos::InputPort::totalLabels(void) const
56 {
57  return _totalLabels;
58 }
59 
60 inline unsigned long long Pothos::InputPort::totalMessages(void) const
61 {
62  return _totalMessages;
63 }
64 
66 {
67  return _labelIter;
68 }
69 
70 inline void Pothos::InputPort::consume(const size_t numElements)
71 {
72  _pendingElements += numElements;
73 }
74 
76 {
77  return not this->asyncMessagesEmpty();
78 }
79 
80 inline bool Pothos::InputPort::isSlot(void) const
81 {
82  return _isSlot;
83 }
84 
86 {
87  auto msg = this->asyncMessagesPop();
88  _totalMessages++;
89  _workEvents++;
90  return msg;
91 }
92 
93 inline void Pothos::InputPort::removeLabel(const Label &label)
94 {
95  for (auto it = _inlineMessages.begin(); it != _inlineMessages.end(); it++)
96  {
97  if (*it == label)
98  {
99  _inlineMessages.erase(it);
100  _labelIter = _inlineMessages;
101  _totalLabels++;
102  _workEvents++;
103  return;
104  }
105  }
106 }
107 
108 inline void Pothos::InputPort::setReserve(const size_t numElements)
109 {
110  _reserveElements = numElements;
111  _workEvents++;
112 }
113 
114 inline bool Pothos::InputPort::asyncMessagesEmpty(void)
115 {
116  std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
117  return _asyncMessages.empty();
118 }
119 
120 inline Pothos::Object Pothos::InputPort::asyncMessagesPop(void)
121 {
122  std::lock_guard<Util::SpinLock> lock(_asyncMessagesLock);
123  if (_asyncMessages.empty()) return Pothos::Object();
124  auto msg = _asyncMessages.front().first;
125  _asyncMessages.pop_front();
126  return msg;
127 }
128 
129 inline void Pothos::InputPort::inlineMessagesPush(const Pothos::Label &label)
130 {
131  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
132  if (_inputInlineMessages.full()) _inputInlineMessages.set_capacity(_inputInlineMessages.capacity()*2);
133  _inputInlineMessages.push_back(label);
134 }
135 
136 inline void Pothos::InputPort::inlineMessagesClear(void)
137 {
138  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
139  _inputInlineMessages.clear();
140  _inlineMessages.clear();
141 }
142 
143 inline void Pothos::InputPort::bufferAccumulatorFront(Pothos::BufferChunk &buff)
144 {
145  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
146  while (not _inputInlineMessages.empty())
147  {
148  const auto &front = _inputInlineMessages.front();
149  _inlineMessages.push_back(front.toAdjusted(1, this->dtype().size()));
150  _inputInlineMessages.pop_front();
151  }
152  buff = _bufferAccumulator.front();
153 }
154 
155 inline void Pothos::InputPort::bufferAccumulatorPush(const BufferChunk &buffer)
156 {
157  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
158  this->bufferAccumulatorPushNoLock(buffer);
159 }
160 
161 inline void Pothos::InputPort::bufferAccumulatorRequire(const size_t numBytes)
162 {
163  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
164  _bufferAccumulator.require(numBytes);
165 }
166 
167 inline void Pothos::InputPort::bufferAccumulatorClear(void)
168 {
169  std::lock_guard<Util::SpinLock> lock(_bufferAccumulatorLock);
170  _bufferAccumulator = BufferAccumulator();
171 }
Definition: Label.hpp:23
Definition: Label.hpp:86
unsigned long long totalBuffers(void) const
Definition: InputPortImpl.hpp:50
void setReserve(const size_t numElements)
Definition: InputPortImpl.hpp:108
void removeLabel(const Label &label)
Definition: InputPortImpl.hpp:93
int index(void) const
Definition: InputPortImpl.hpp:15
bool isSlot(void) const
Definition: InputPortImpl.hpp:80
bool hasMessage(void)
Does the specified input port have an asynchronous message available?
Definition: InputPortImpl.hpp:75
const std::string & name(void) const
Get the string name identifier for this port.
Definition: InputPortImpl.hpp:20
const LabelIteratorRange & labels(void) const
Definition: InputPortImpl.hpp:65
const BufferChunk & buffer(void) const
Definition: InputPortImpl.hpp:35
unsigned long long totalMessages(void) const
Definition: InputPortImpl.hpp:60
Definition: Object.hpp:55
unsigned long long totalLabels(void) const
Definition: InputPortImpl.hpp:55
Object popMessage(void)
Definition: InputPortImpl.hpp:85
const std::string & domain(void) const
Get the domain information for this port.
Definition: InputPortImpl.hpp:30
size_t elements(void) const
Definition: InputPortImpl.hpp:40
unsigned long long totalElements(void) const
Definition: InputPortImpl.hpp:45
const DType & dtype(void) const
Get the data type information for this port.
Definition: InputPortImpl.hpp:25
Definition: BufferChunk.hpp:26
void consume(const size_t numElements)
Definition: InputPortImpl.hpp:70
Definition: DType.hpp:38