Pothos  0.6.0-g9da168ef
The Pothos dataflow programming software suite
OutputPort.hpp
Go to the documentation of this file.
1 
11 #pragma once
12 #include <Pothos/Config.hpp>
13 #include <Pothos/Object/Object.hpp>
20 #include <Pothos/Util/SpinLock.hpp>
21 #include <string>
22 
23 namespace Pothos {
24 
25 class WorkerActor;
26 class InputPort;
27 
32 {
33 public:
34 
36  ~OutputPort(void);
37 
43  int index(void) const;
44 
46  const std::string &name(void) const;
47 
49  const std::string &alias(void) const;
50 
52  void setAlias(const std::string &alias);
53 
55  const DType &dtype(void) const;
56 
58  const std::string &domain(void) const;
59 
64  const BufferChunk &buffer(void) const;
65 
70  size_t elements(void) const;
71 
77  unsigned long long totalElements(void) const;
78 
85  unsigned long long totalBuffers(void) const;
86 
91  unsigned long long totalLabels(void) const;
92 
98  unsigned long long totalMessages(void) const;
99 
106  void produce(const size_t numElements);
107 
117  void popElements(const size_t numElements);
118 
129  BufferChunk getBuffer(const size_t numElements);
130 
141  template <typename... ValueType>
142  void postLabel(ValueType&&... label);
143 
148  template <typename ValueType>
149  void postMessage(ValueType &&message);
150 
160  template <typename T, typename... Args>
161  void emplaceMessage(Args&&... args);
162 
172  template <typename ValueType>
173  void postBuffer(ValueType &&buffer);
174 
187  void setReserve(const size_t numElements);
188 
192  bool isSignal(void) const;
193 
212  void setReadBeforeWrite(InputPort *port);
213 
214 private:
215  WorkerActor *_actor;
216 
217  //port configuration
218  bool _isSignal;
219  int _index;
220  std::string _name;
221  std::string _alias;
222  DType _dtype;
223 
224  //state set in pre-work
225  std::string _domain;
226  BufferChunk _buffer;
227  size_t _elements;
228 
229  //port stats
230  unsigned long long _totalElements;
231  unsigned long long _totalBuffers;
232  unsigned long long _totalLabels;
233  unsigned long long _totalMessages;
234 
235  //state changes from work
236  size_t _pendingElements;
237  size_t _reserveElements;
238  std::vector<Label> _postedLabels;
239  Util::RingDeque<BufferChunk> _postedBuffers;
240 
241  //counts work actions which we will use to establish activity
242  size_t _workEvents;
243 
244  Util::SpinLock _bufferManagerLock;
245  BufferManager::Sptr _bufferManager;
246 
247  Util::SpinLock _tokenManagerLock;
248  BufferManager::Sptr _tokenManager; //used for message backpressure
249 
251  void bufferManagerSetup(const BufferManager::Sptr &manager);
252  bool bufferManagerEmpty(void);
253  void bufferManagerFront(BufferChunk &);
254  void bufferManagerPop(const size_t numBytes);
255  void bufferManagerPush(Pothos::Util::SpinLock *mutex, const ManagedBuffer &buff);
256 
258  void tokenManagerInit(void);
259  bool tokenManagerEmpty(void);
260  BufferChunk tokenManagerPop(void);
261  void tokenManagerPop(const size_t numBytes);
262 
263  std::vector<InputPort *> _subscribers;
264  InputPort *_readBeforeWritePort;
265  bool _bufferFromManager;
266  BufferPool _bufferPool;
267 
268  OutputPort(void);
269  OutputPort(const OutputPort &) = delete; // non construction-copyable
270  OutputPort &operator=(const OutputPort &) = delete; // non copyable
271  friend class WorkerActor;
272  friend class InputPort;
273  void _postMessage(const Object &message);
274 };
275 
276 } //namespace Pothos
#define POTHOS_API
Definition: Config.hpp:41
Definition: OutputPort.hpp:31
Definition: SpinLock.hpp:26
Definition: ArchiveEntry.hpp:20
Definition: Object.hpp:47
Definition: InputPort.hpp:30
Definition: ManagedBuffer.hpp:31
Definition: RingDeque.hpp:29
Definition: BufferPool.hpp:24
std::shared_ptr< BufferManager > Sptr
Definition: BufferManager.hpp:60
Definition: BufferChunk.hpp:30
Definition: DType.hpp:38