Pothos  0.7.0-gf7fbae99
The Pothos dataflow programming software suite
OutputPort.hpp
Go to the documentation of this file.
1 
11 #pragma once
12 #include <Pothos/Config.hpp>
13 #include <Pothos/Object/Object.hpp>
20 #include <Pothos/Util/SpinLock.hpp>
21 #include <string>
22 
23 namespace Pothos {
24 
25 class WorkerActor;
26 class InputPort;
27 
32 {
33 public:
34 
36  ~OutputPort(void);
37 
43  int index(void) const;
44 
46  const std::string &name(void) const;
47 
49  const std::string &alias(void) const;
50 
52  void setAlias(const std::string &alias);
53 
55  const DType &dtype(void) const;
56 
58  const std::string &domain(void) const;
59 
64  const BufferChunk &buffer(void) const;
65 
70  size_t elements(void) const;
71 
77  unsigned long long totalElements(void) const;
78 
85  unsigned long long totalBuffers(void) const;
86 
91  unsigned long long totalLabels(void) const;
92 
98  unsigned long long totalMessages(void) const;
99 
106  void produce(const size_t numElements);
107 
117  void popElements(const size_t numElements);
118 
130  BufferChunk getBuffer(const size_t numElements);
131 
140  BufferChunk getBuffer(const DType &dtype, const size_t numElements);
141 
152  template <typename... ValueType>
153  void postLabel(ValueType&&... label);
154 
159  template <typename ValueType>
160  void postMessage(ValueType &&message);
161 
171  template <typename T, typename... Args>
172  void emplaceMessage(Args&&... args);
173 
183  template <typename ValueType>
184  void postBuffer(ValueType &&buffer);
185 
198  void setReserve(const size_t numElements);
199 
203  bool isSignal(void) const;
204 
223  void setReadBeforeWrite(InputPort *port);
224 
225 private:
226  WorkerActor *_actor;
227 
228  //port configuration
229  bool _isSignal;
230  int _index;
231  std::string _name;
232  std::string _alias;
233  DType _dtype;
234 
235  //state set in pre-work
236  std::string _domain;
237  BufferChunk _buffer;
238  size_t _elements;
239 
240  //port stats
241  unsigned long long _totalElements;
242  unsigned long long _totalBuffers;
243  unsigned long long _totalLabels;
244  unsigned long long _totalMessages;
245 
246  //state changes from work
247  size_t _pendingElements;
248  size_t _reserveElements;
249  std::vector<Label> _postedLabels;
250  Util::RingDeque<BufferChunk> _postedBuffers;
251 
252  //counts work actions which we will use to establish activity
253  size_t _workEvents;
254 
255  Util::SpinLock _bufferManagerLock;
256  BufferManager::Sptr _bufferManager;
257 
258  Util::SpinLock _tokenManagerLock;
259  BufferManager::Sptr _tokenManager; //used for message backpressure
260 
262  void bufferManagerSetup(const BufferManager::Sptr &manager);
263  bool bufferManagerEmpty(void);
264  void bufferManagerFront(BufferChunk &);
265  void bufferManagerPop(const size_t numBytes);
266  void bufferManagerPush(Pothos::Util::SpinLock *mutex, const ManagedBuffer &buff);
267 
269  void tokenManagerInit(void);
270  bool tokenManagerEmpty(void);
271  BufferChunk tokenManagerPop(void);
272  void tokenManagerPop(const size_t numBytes);
273 
274  std::vector<InputPort *> _subscribers;
275  InputPort *_readBeforeWritePort;
276  bool _bufferFromManager;
277  BufferPool _bufferPool;
278 
279  OutputPort(void);
280  OutputPort(const OutputPort &) = delete; // non construction-copyable
281  OutputPort &operator=(const OutputPort &) = delete; // non copyable
282  friend class WorkerActor;
283  friend class InputPort;
284  void _postMessage(const Object &message);
285 };
286 
287 } //namespace Pothos
#define POTHOS_API
Definition: Config.hpp:41
Definition: OutputPort.hpp:31
Definition: SpinLock.hpp:26
Definition: ArchiveEntry.hpp:20
Definition: Object.hpp:47
Definition: InputPort.hpp:30
Definition: ManagedBuffer.hpp:31
Definition: RingDeque.hpp:29
Definition: BufferPool.hpp:24
std::shared_ptr< BufferManager > Sptr
Definition: BufferManager.hpp:60
Definition: BufferChunk.hpp:30
Definition: DType.hpp:38