Pothos  0.2.1-g9f04573d
The Pothos dataflow programming software suite
 All Classes Namespaces Files Functions Variables Typedefs Friends Macros
OutputPort.hpp
Go to the documentation of this file.
1 
11 #pragma once
12 #include <Pothos/Config.hpp>
13 #include <Pothos/Object/Object.hpp>
19 #include <Pothos/Util/SpinLock.hpp>
20 #include <string>
21 
22 namespace Pothos {
23 
24 class WorkerActor;
25 class InputPort;
26 
31 {
32 public:
33 
35  ~OutputPort(void);
36 
42  int index(void) const;
43 
45  const std::string &name(void) const;
46 
48  const DType &dtype(void) const;
49 
51  const std::string &domain(void) const;
52 
57  const BufferChunk &buffer(void) const;
58 
63  size_t elements(void) const;
64 
70  unsigned long long totalElements(void) const;
71 
78  unsigned long long totalBuffers(void) const;
79 
84  unsigned long long totalLabels(void) const;
85 
91  unsigned long long totalMessages(void) const;
92 
99  void produce(const size_t numElements);
100 
110  void popBuffer(const size_t numBytes);
111 
116  void postLabel(const Label &label);
117 
122  template <typename ValueType>
123  void postMessage(ValueType &&message);
124 
134  void postBuffer(const BufferChunk &buffer);
135 
139  bool isSignal(void) const;
140 
159  void setReadBeforeWrite(InputPort *port);
160 
161 private:
162  WorkerActor *_actor;
163 
164  //port configuration
165  bool _isSignal;
166  int _index;
167  std::string _name;
168  DType _dtype;
169 
170  //state set in pre-work
171  std::string _domain;
172  BufferChunk _buffer;
173  size_t _elements;
174 
175  //port stats
176  unsigned long long _totalElements;
177  unsigned long long _totalBuffers;
178  unsigned long long _totalLabels;
179  unsigned long long _totalMessages;
180 
181  //state changes from work
182  size_t _pendingElements;
183  std::vector<Label> _postedLabels;
184  Util::RingDeque<BufferChunk> _postedBuffers;
185 
186  //counts work actions which we will use to establish activity
187  size_t _workEvents;
188 
189  Util::SpinLock _bufferManagerLock;
190  BufferManager::Sptr _bufferManager;
191 
192  Util::SpinLock _tokenManagerLock;
193  BufferManager::Sptr _tokenManager; //used for message backpressure
194 
196  void bufferManagerSetup(const BufferManager::Sptr &manager);
197  bool bufferManagerEmpty(void);
198  void bufferManagerFront(BufferChunk &);
199  void bufferManagerPop(const size_t numBytes);
200  void bufferManagerPush(Pothos::Util::SpinLock *mutex, const ManagedBuffer &buff);
201 
203  void tokenManagerInit(void);
204  bool tokenManagerEmpty(void);
205  BufferChunk tokenManagerPop(void);
206  void tokenManagerPop(const size_t numBytes);
207 
208  std::vector<InputPort *> _subscribers;
209  InputPort *_readBeforeWritePort;
210  bool _bufferFromManager;
211 
212  OutputPort(void);
213  OutputPort(const OutputPort &){} // non construction-copyable
214  OutputPort &operator=(const OutputPort &){return *this;} // non copyable
215  friend class WorkerActor;
216  friend class InputPort;
217  void _postMessage(const Object &message);
218 };
219 
220 } //namespace Pothos
Definition: Label.hpp:23
#define POTHOS_API
Definition: Config.hpp:41
Definition: OutputPort.hpp:30
Definition: SpinLock.hpp:26
Definition: Object.hpp:55
Definition: InputPort.hpp:30
Definition: ManagedBuffer.hpp:27
Definition: RingDeque.hpp:28
std::shared_ptr< BufferManager > Sptr
Definition: BufferManager.hpp:60
Definition: BufferChunk.hpp:26
Definition: DType.hpp:38