Pothos  0.7.0-gf7fbae99
The Pothos dataflow programming software suite
InputPort.hpp
Go to the documentation of this file.
1 
11 #pragma once
12 #include <Pothos/Config.hpp>
13 #include <Pothos/Object/Object.hpp>
19 #include <Pothos/Util/SpinLock.hpp>
20 #include <string>
21 
22 namespace Pothos {
23 
    // Forward declarations. In this header both classes are only used as
    // pointer targets (_actor, _subscribers) and as friends of the class below.
24 class WorkerActor;
25 class OutputPort;
26 
    // NOTE(review): listing lines 27-30 are absent here, so the class head is
    // not visible. Judging by the constructor/destructor names below it is the
    // declaration of InputPort -- confirm against the real header.
    //
    // Summary of what the declarations below show:
    //  - a public API of read-only accessors, running totals, and
    //    consume/take/pop/push style operations on buffers, labels and messages;
    //  - private state plus helper methods grouped by name prefix
    //    (asyncMessages*, slotCalls*, inlineMessages*, bufferAccumulator*);
    //  - a private default constructor and deleted copy operations, with
    //    WorkerActor and OutputPort as friends, so only those two classes
    //    (and InputPort itself) can create instances.
31 {
32 public:
33 
    // Public destructor (construction itself is private, see the end of the class).
35  ~InputPort(void);
36 
    // Returns the port's index as an int.
    // Presumably backed by _index -- TODO confirm in the implementation.
42  int index(void) const;
43 
    // Returns a const reference to the port's name string (presumably _name).
45  const std::string &name(void) const;
46 
    // Returns a const reference to the port's alias string (presumably _alias).
48  const std::string &alias(void) const;
49 
    // Sets the alias from the given string; the only public setter among the
    // name/alias/dtype/domain configuration values.
51  void setAlias(const std::string &alias);
52 
    // Returns a const reference to the port's DType (presumably _dtype).
54  const DType &dtype(void) const;
55 
    // Returns a const reference to the port's domain string (presumably _domain).
57  const std::string &domain(void) const;
58 
    // Returns a const reference to a BufferChunk. Presumably the _buffer member,
    // which the private section labels "state set in pre-work" -- confirm.
63  const BufferChunk &buffer(void) const;
64 
    // Returns an element count as size_t (presumably _elements, also
    // "state set in pre-work").
69  size_t elements(void) const;
70 
    // The four total*() accessors return unsigned long long counters; the
    // private section groups the matching _total* members under "port stats".
76  unsigned long long totalElements(void) const;
77 
83  unsigned long long totalBuffers(void) const;
84 
90  unsigned long long totalLabels(void) const;
91 
97  unsigned long long totalMessages(void) const;
98 
    // Reports whether a message is available. Unlike the other queries this
    // method is not const. NOTE(review): presumably because it takes
    // _asyncMessagesLock -- confirm.
100  bool hasMessage(void);
101 
    // Returns a const reference to a LabelIteratorRange (presumably _labelIter).
107  const LabelIteratorRange &labels(void) const;
108 
    // Removes the given label from this port. How the label is matched is not
    // visible from this declaration.
117  void removeLabel(const Label &label);
118 
    // Consumes numElements from the port. Note the unit is elements, whereas
    // the private bufferAccumulator* helpers work in bytes.
    // NOTE(review): _pendingElements ("state changes from work") suggests the
    // consumption is recorded and applied later -- confirm.
125  void consume(const size_t numElements);
126 
    // Returns a BufferChunk by value (the caller receives its own object, in
    // contrast to buffer() which returns a const reference).
140  BufferChunk takeBuffer(void);
141 
    // popMessage()/peekMessage() both return an Object by value; by their names
    // pop removes the message and peek leaves it in place -- confirm.
    // Behaviour when no message is queued is not visible here.
147  Object popMessage(void);
148 
154  Object peekMessage(void);
155 
    // Sets a reserve expressed in elements (presumably stored in
    // _reserveElements). Its effect on scheduling is not visible here.
168  void setReserve(const size_t numElements);
169 
    // Returns the slot flag (presumably _isSlot).
173  bool isSlot(void) const;
174 
    // pushBuffer/pushLabel/pushMessage hand a buffer, label or message to this
    // port from the outside. NOTE(review): the private locks below suggest
    // these may be called from another thread -- confirm before relying on it.
182  void pushBuffer(const BufferChunk &buffer);
183 
190  void pushLabel(const Label &label);
191 
196  void pushMessage(const Object &message);
197 
    // Clears the port. Which pieces of state are reset is defined in the
    // implementation (see the private *Clear() helpers below).
202  void clear(void);
203 
204 private:
    // Non-owning raw pointer as far as this header shows (no smart pointer is
    // used); ownership is not established here.
205  WorkerActor *_actor;
206 
207  //port configuration
208  bool _isSlot;
209  int _index;
210  std::string _name;
211  std::string _alias;
212  DType _dtype;
213  std::string _domain;
214 
215  //state set in pre-work
216  BufferChunk _buffer;
217  size_t _elements;
218  LabelIteratorRange _labelIter;
219 
220  //port stats
221  unsigned long long _totalElements;
222  unsigned long long _totalBuffers;
223  unsigned long long _totalLabels;
224  unsigned long long _totalMessages;
225 
226  //state changes from work
227  size_t _pendingElements;
228  size_t _reserveElements;
229 
230  //counts work actions which we will use to establish activity
231  size_t _workEvents;
232 
    // NOTE(review): listing lines 234 and 237 are missing after each of the
    // next two locks. Presumably they declare the containers these locks guard
    // (used by asyncMessages*() and slotCalls*() below) -- restore them from
    // the real header.
233  Util::SpinLock _asyncMessagesLock;
235 
236  Util::SpinLock _slotCallsLock;
238 
239  std::vector<Label> _inlineMessages; //user api structure
240  Util::RingDeque<Label> _inputInlineMessages; //shared structure
241 
    // Lock declared next to the accumulator; presumably guards it, with
    // bufferAccumulatorPushNoLock() as the variant that skips locking.
242  Util::SpinLock _bufferAccumulatorLock;
243  BufferAccumulator _bufferAccumulator;
244 
    // Raw, non-owning-looking pointers to output ports; who maintains this
    // list is not visible here (OutputPort is a friend).
245  std::vector<OutputPort *> _subscribers;
246 
    // Async message helpers. The push takes an optional BufferChunk token that
    // defaults to BufferChunk::null(); the token's purpose is not visible here.
248  void asyncMessagesPush(const Object &message, const BufferChunk &token = BufferChunk::null());
249  bool asyncMessagesEmpty(void);
250  Object asyncMessagesPop(void);
251  Object asyncMessagesPeek(void);
252  void asyncMessagesClear(void);
253 
    // Slot-call helpers. Same shape as the async message helpers, except that
    // the token has no default and there is no peek.
255  void slotCallsPush(const Object &args, const BufferChunk &token);
256  bool slotCallsEmpty(void);
257  Object slotCallsPop(void);
258  void slotCallsClear(void);
259 
    // Inline message helpers; "inline messages" are stored as Label objects
    // (see _inlineMessages / _inputInlineMessages above).
261  void inlineMessagesPush(const Label &label);
262  void inlineMessagesClear(void);
263 
    // Buffer accumulator helpers. Front() writes its result through the
    // BufferChunk reference parameter; Pop()/Require() take sizes in bytes,
    // not elements; PushNoLock() takes an rvalue so the chunk can be moved in.
265  void bufferAccumulatorFront(BufferChunk &);
266  void bufferAccumulatorPush(const BufferChunk &buffer);
267  void bufferAccumulatorPushNoLock(BufferChunk &&buffer);
268  void bufferAccumulatorPop(const size_t numBytes);
269  void bufferAccumulatorRequire(const size_t numBytes);
270  void bufferAccumulatorClear(void);
271 
    // Pushes a batch of posted labels and buffers in one call. Both containers
    // are taken by non-const reference, so the callee may modify them;
    // presumably enableMove allows moving out of them instead of copying --
    // TODO confirm in the implementation.
273  void bufferLabelPush(
274  const bool enableMove,
275  std::vector<Label> &postedLabels,
276  Util::RingDeque<BufferChunk> &postedBuffers);
277 
    // Private default constructor + deleted copy operations: instances can only
    // be created by the friends declared below and can never be copied.
278  InputPort(void);
279  InputPort(const InputPort &) = delete; // non construction-copyable
280  InputPort &operator=(const InputPort &) = delete; // non copyable
281  friend class WorkerActor;
282  friend class OutputPort;
283 };
284 
285 } //namespace Pothos
Definition: Label.hpp:23
Definition: BufferAccumulator.hpp:31
Definition: Label.hpp:95
#define POTHOS_API
Definition: Config.hpp:41
Definition: OutputPort.hpp:31
Definition: SpinLock.hpp:26
Definition: ArchiveEntry.hpp:20
Definition: Object.hpp:47
Definition: InputPort.hpp:30
Definition: RingDeque.hpp:29
Definition: BufferChunk.hpp:30
Definition: DType.hpp:38
static const BufferChunk & null(void)
Get a const reference to a null/empty BufferChunk.