Pothos  0.4.3-gabce2ce6
The Pothos dataflow programming software suite
OutputPort.hpp
Go to the documentation of this file.
1 
11 #pragma once
12 #include <Pothos/Config.hpp>
13 #include <Pothos/Object/Object.hpp>
20 #include <Pothos/Util/SpinLock.hpp>
21 #include <string>
22 
23 namespace Pothos {
24 
25 class WorkerActor;
26 class InputPort;
27 
32 {
33 public:
34 
36  ~OutputPort(void);
37 
43  int index(void) const;
44 
46  const std::string &name(void) const;
47 
49  const std::string &alias(void) const;
50 
52  void setAlias(const std::string &alias);
53 
55  const DType &dtype(void) const;
56 
58  const std::string &domain(void) const;
59 
64  const BufferChunk &buffer(void) const;
65 
70  size_t elements(void) const;
71 
77  unsigned long long totalElements(void) const;
78 
85  unsigned long long totalBuffers(void) const;
86 
91  unsigned long long totalLabels(void) const;
92 
98  unsigned long long totalMessages(void) const;
99 
106  void produce(const size_t numElements);
107 
118  void popBuffer(const size_t numBytes);
119 
129  void popElements(const size_t numElements);
130 
140  BufferChunk getBuffer(const size_t numElements);
141 
146  void postLabel(const Label &label);
147 
152  template <typename ValueType>
153  void postMessage(ValueType &&message);
154 
164  void postBuffer(const BufferChunk &buffer);
165 
178  void setReserve(const size_t numElements);
179 
183  bool isSignal(void) const;
184 
203  void setReadBeforeWrite(InputPort *port);
204 
205 private:
206  WorkerActor *_actor;
207 
208  //port configuration
209  bool _isSignal;
210  int _index;
211  std::string _name;
212  std::string _alias;
213  DType _dtype;
214 
215  //state set in pre-work
216  std::string _domain;
217  BufferChunk _buffer;
218  size_t _elements;
219 
220  //port stats
221  unsigned long long _totalElements;
222  unsigned long long _totalBuffers;
223  unsigned long long _totalLabels;
224  unsigned long long _totalMessages;
225 
226  //state changes from work
227  size_t _pendingElements;
228  size_t _reserveElements;
229  std::vector<Label> _postedLabels;
230  Util::RingDeque<BufferChunk> _postedBuffers;
231 
232  //counts work actions which we will use to establish activity
233  size_t _workEvents;
234 
235  Util::SpinLock _bufferManagerLock;
236  BufferManager::Sptr _bufferManager;
237 
238  Util::SpinLock _tokenManagerLock;
239  BufferManager::Sptr _tokenManager; //used for message backpressure
240 
242  void bufferManagerSetup(const BufferManager::Sptr &manager);
243  bool bufferManagerEmpty(void);
244  void bufferManagerFront(BufferChunk &);
245  void bufferManagerPop(const size_t numBytes);
246  void bufferManagerPush(Pothos::Util::SpinLock *mutex, const ManagedBuffer &buff);
247 
249  void tokenManagerInit(void);
250  bool tokenManagerEmpty(void);
251  BufferChunk tokenManagerPop(void);
252  void tokenManagerPop(const size_t numBytes);
253 
254  std::vector<InputPort *> _subscribers;
255  InputPort *_readBeforeWritePort;
256  bool _bufferFromManager;
257  BufferPool _bufferPool;
258 
259  OutputPort(void);
260  OutputPort(const OutputPort &){} // non construction-copyable
261  OutputPort &operator=(const OutputPort &){return *this;} // non copyable
262  friend class WorkerActor;
263  friend class InputPort;
264  void _postMessage(const Object &message);
265 };
266 
267 } //namespace Pothos
Definition: Label.hpp:23
#define POTHOS_API
Definition: Config.hpp:41
Definition: OutputPort.hpp:31
Definition: SpinLock.hpp:26
Definition: CallInterface.hpp:15
Definition: Object.hpp:55
Definition: InputPort.hpp:30
Definition: ManagedBuffer.hpp:31
Definition: RingDeque.hpp:29
Definition: BufferPool.hpp:24
std::shared_ptr< BufferManager > Sptr
Definition: BufferManager.hpp:60
Definition: BufferChunk.hpp:30
Definition: DType.hpp:38