Pothos  0.2.1-g9f04573d
The Pothos dataflow programming software suite
 All Classes Namespaces Files Functions Variables Typedefs Friends Macros
InputPort.hpp
Go to the documentation of this file.
1 
11 #pragma once
12 #include <Pothos/Config.hpp>
13 #include <Pothos/Object/Object.hpp>
19 #include <Pothos/Util/SpinLock.hpp>
20 #include <string>
21 
22 namespace Pothos {
23 
24 class WorkerActor;
25 class OutputPort;
26 
31 {
32 public:
33 
// Public interface of InputPort: const accessors, consumption calls, and push/clear entry points. Only declarations are visible here; behavior notes below are hedged accordingly.
35  ~InputPort(void); // destructor is public, while the constructors declared later in the class are private
36 
42  int index(void) const; // presumably returns _index (port configuration) -- definition not in this header
43 
45  const std::string &name(void) const; // presumably returns _name -- confirm in the implementation
46 
48  const DType &dtype(void) const; // presumably returns _dtype -- confirm
49 
51  const std::string &domain(void) const; // presumably returns _domain -- confirm
52 
57  const BufferChunk &buffer(void) const; // presumably returns _buffer (member grouped under "state set in pre-work") -- confirm
58 
63  size_t elements(void) const; // presumably returns _elements -- confirm
64 
70  unsigned long long totalElements(void) const; // presumably returns the _totalElements port stat -- confirm
71 
77  unsigned long long totalBuffers(void) const; // presumably returns the _totalBuffers port stat -- confirm
78 
84  unsigned long long totalLabels(void) const; // presumably returns the _totalLabels port stat -- confirm
85 
91  unsigned long long totalMessages(void) const; // presumably returns the _totalMessages port stat -- confirm
92 
94  bool hasMessage(void); // NOTE(review): unlike the accessors around it, not declared const; presumably consults the async message queue -- confirm
95 
101  const LabelIteratorRange &labels(void) const; // presumably returns _labelIter -- confirm
102 
111  void removeLabel(const Label &label); // label is taken by const reference; removal semantics live in the implementation (not shown)
112 
119  void consume(const size_t numElements); // count is in elements; presumably recorded in _pendingElements ("state changes from work") -- TODO confirm
120 
126  Object popMessage(void); // returns an Object by value; behavior when no message is queued is not visible here
127 
136  void setReserve(const size_t numElements); // count is in elements; presumably stored in _reserveElements -- TODO confirm
137 
141  bool isSlot(void) const; // presumably returns _isSlot -- confirm
142 
150  void pushBuffer(const BufferChunk &buffer); // presumably feeds the buffer accumulator (cf. bufferAccumulatorPush) -- TODO confirm
151 
158  void pushLabel(const Label &label); // presumably queues via inlineMessagesPush, which also takes a Label -- TODO confirm
159 
164  void pushMessage(const Object &message); // presumably queues via asyncMessagesPush, which also takes an Object -- TODO confirm
165 
170  void clear(void); // what gets cleared is defined in the implementation (not shown); several private *Clear helpers are declared below
171 
172 private:
// Private state of InputPort, grouped by the role named in each section comment.
173  WorkerActor *_actor; // raw pointer; ownership and lifetime are not visible in this header
174 
175  //port configuration (each field has a same-named public accessor)
176  bool _isSlot;
177  int _index;
178  std::string _name;
179  DType _dtype;
180  std::string _domain;
181 
182  //state set in pre-work
183  BufferChunk _buffer;
184  size_t _elements;
185  LabelIteratorRange _labelIter;
186 
187  //port stats (presumably what the public total*() accessors report -- confirm)
188  unsigned long long _totalElements;
189  unsigned long long _totalBuffers;
190  unsigned long long _totalLabels;
191  unsigned long long _totalMessages;
192 
193  //state changes from work
194  size_t _pendingElements;
195  size_t _reserveElements;
196 
197  //counts work actions which we will use to establish activity
198  size_t _workEvents;
199 
200  Util::SpinLock _asyncMessagesLock; // NOTE(review): listing jumps from 200 to 202; the guarded container is presumably on the missing line -- confirm against the real header
202 
203  Util::SpinLock _slotCallsLock; // NOTE(review): listing jumps from 203 to 205; the guarded container is presumably on the missing line -- confirm against the real header
205 
206  std::vector<Label> _inlineMessages; //user api structure (element type is Label, despite the "Messages" name)
207  Util::RingDeque<Label> _inputInlineMessages; //shared structure (also holds Label)
208 
209  Util::SpinLock _bufferAccumulatorLock; // presumably guards _bufferAccumulator; note the separate bufferAccumulatorPushNoLock helper -- confirm
210  BufferAccumulator _bufferAccumulator;
211 
212  std::vector<OutputPort *> _subscribers; // raw OutputPort pointers; presumably non-owning -- confirm
213 
// Private helpers, one family per internal queue (async messages, slot calls, inline messages, buffer accumulator). Declarations only; definitions are not in this header.
215  void asyncMessagesPush(const Object &message, const BufferChunk &token = BufferChunk::null()); // token defaults to BufferChunk::null(), a null/empty BufferChunk
216  bool asyncMessagesEmpty(void); // not const; presumably because it takes _asyncMessagesLock -- confirm
217  Object asyncMessagesPop(void); // returns the popped Object by value
218  void asyncMessagesClear(void);
219 
221  void slotCallsPush(const Object &args, const BufferChunk &token); // unlike asyncMessagesPush, token has no default here
222  bool slotCallsEmpty(void); // not const; presumably because it takes _slotCallsLock -- confirm
223  Object slotCallsPop(void); // returns the popped Object by value
224  void slotCallsClear(void);
225 
227  void inlineMessagesPush(const Label &label); // takes a Label, matching the element type of _inlineMessages / _inputInlineMessages
228  void inlineMessagesClear(void);
229 
231  void bufferAccumulatorFront(BufferChunk &); // unnamed non-const reference: presumably an out-parameter receiving the front buffer -- confirm
232  void bufferAccumulatorPush(const BufferChunk &buffer);
233  void bufferAccumulatorPushNoLock(const BufferChunk &buffer); // presumably for callers that already hold _bufferAccumulatorLock -- confirm
234  void bufferAccumulatorPop(const size_t numBytes); // sized in bytes, whereas the public consume()/setReserve() take element counts
235  void bufferAccumulatorRequire(const size_t numBytes); // sized in bytes as well
236  void bufferAccumulatorClear(void);
237 
239  void bufferLabelPush( // both containers are taken by const reference; how they are applied is in the implementation (not shown)
240  const std::vector<Label> &postedLabels,
241  const Util::RingDeque<BufferChunk> &postedBuffers);
242 
243  InputPort(void);
244  InputPort(const InputPort &){} // non construction-copyable
245  InputPort &operator=(const InputPort &){return *this;} // non copyable
246  friend class WorkerActor;
247  friend class OutputPort;
248 };
249 
250 } //namespace Pothos
Definition: Label.hpp:23
Definition: BufferAccumulator.hpp:31
Definition: Label.hpp:86
#define POTHOS_API
Definition: Config.hpp:41
Definition: OutputPort.hpp:30
Definition: SpinLock.hpp:26
Definition: Object.hpp:55
Definition: InputPort.hpp:30
Definition: RingDeque.hpp:28
Definition: BufferChunk.hpp:26
Definition: DType.hpp:38
static const BufferChunk & null(void)
Get a const reference to a null/empty BufferChunk.