Pothos  0.3.0-ga8f2d4e2
The Pothos dataflow programming software suite
 All Classes Namespaces Files Functions Variables Typedefs Friends Macros
OutputPort.hpp
Go to the documentation of this file.
1 
11 #pragma once
12 #include <Pothos/Config.hpp>
13 #include <Pothos/Object/Object.hpp>
19 #include <Pothos/Util/SpinLock.hpp>
20 #include <string>
21 
22 namespace Pothos {
23 
// Forward declarations: within this header these two types are only used
// through pointers (WorkerActor *, InputPort *) and friend declarations,
// so their full definitions are not needed here.
24 class WorkerActor;
25 class InputPort;
26 
// NOTE(review): the class-head line (listing line 30) is absent from this
// extract. The listing footer indexes "OutputPort.hpp:30" as a definition and
// the members below (~OutputPort, OutputPort(void)) name the class OutputPort.
// Only declarations are visible in this block; comments about what a member
// does are hedged because the definitions live elsewhere.
31 {
32 public:
33 
    // Public destructor; both constructors are declared in the private section below.
35  ~OutputPort(void);
36 
    // Const accessor returning an int by value; presumably reports _index.
42  int index(void) const;
43 
    // Const accessor returning a reference to a string; presumably reports _name.
45  const std::string &name(void) const;
46 
    // Const accessor returning a reference to a string; presumably reports _alias.
48  const std::string &alias(void) const;
49 
    // Non-const setter taking the new alias by const reference.
51  void setAlias(const std::string &alias);
52 
    // Const accessor returning a reference to a DType; presumably reports _dtype.
54  const DType &dtype(void) const;
55 
    // Const accessor returning a reference to a string; presumably reports _domain.
57  const std::string &domain(void) const;
58 
    // Const accessor returning a reference to a BufferChunk; presumably reports _buffer.
63  const BufferChunk &buffer(void) const;
64 
    // Const accessor returning a size_t; presumably reports _elements.
69  size_t elements(void) const;
70 
    // The four total*() accessors below are const and return unsigned long long.
    // Each has a same-named private counter (_totalElements, _totalBuffers,
    // _totalLabels, _totalMessages) which it presumably reports.
76  unsigned long long totalElements(void) const;
77 
84  unsigned long long totalBuffers(void) const;
85 
90  unsigned long long totalLabels(void) const;
91 
97  unsigned long long totalMessages(void) const;
98 
    // Non-const; the argument is named as a count of elements.
105  void produce(const size_t numElements);
106 
    // Non-const; the argument is named as a count of bytes (not elements).
116  void popBuffer(const size_t numBytes);
117 
    // Non-const; takes the label by const reference.
122  void postLabel(const Label &label);
123 
    // Member template taking a forwarding reference, so any value category is accepted.
    // NOTE(review): the template definition is not in this extract; presumably it
    // wraps the value and calls the private _postMessage(const Object &) -- confirm.
128  template <typename ValueType>
129  void postMessage(ValueType &&message);
130 
    // Non-const; takes the buffer by const reference.
140  void postBuffer(const BufferChunk &buffer);
141 
    // Const accessor returning bool; presumably reports _isSignal.
145  bool isSignal(void) const;
146 
    // Takes a raw InputPort pointer; presumably stored in _readBeforeWritePort
    // (ownership and null handling are not visible here -- confirm).
165  void setReadBeforeWrite(InputPort *port);
166 
167 private:
    // Raw pointer to a WorkerActor (forward-declared type); ownership not visible here.
168  WorkerActor *_actor;
169 
170  //port configuration
171  bool _isSignal;
172  int _index;
173  std::string _name;
174  std::string _alias;
175  DType _dtype;
176 
177  //state set in pre-work
178  std::string _domain;
179  BufferChunk _buffer;
180  size_t _elements;
181 
182  //port stats
183  unsigned long long _totalElements;
184  unsigned long long _totalBuffers;
185  unsigned long long _totalLabels;
186  unsigned long long _totalMessages;
187 
188  //state changes from work
189  size_t _pendingElements;
    // NOTE(review): std::vector is used here but <vector> is not among the
    // includes visible in this extract (listing lines 14-18 are missing and
    // may provide it) -- confirm.
190  std::vector<Label> _postedLabels;
191  Util::RingDeque<BufferChunk> _postedBuffers;
192 
193  //counts work actions which we will use to establish activity
194  size_t _workEvents;
195 
    // Each manager pointer is declared next to its own spin lock;
    // presumably the lock guards access to that manager -- confirm in the definitions.
196  Util::SpinLock _bufferManagerLock;
197  BufferManager::Sptr _bufferManager;
198 
199  Util::SpinLock _tokenManagerLock;
200  BufferManager::Sptr _tokenManager; //used for message backpressure
201 
    // Private helpers named after _bufferManager; bodies are not in this extract.
203  void bufferManagerSetup(const BufferManager::Sptr &manager);
204  bool bufferManagerEmpty(void);
    // Takes a non-const BufferChunk reference, i.e. the argument can be written to.
205  void bufferManagerFront(BufferChunk &);
206  void bufferManagerPop(const size_t numBytes);
    // Receives a SpinLock pointer named "mutex" along with the buffer;
    // how the lock is used is not visible here.
207  void bufferManagerPush(Pothos::Util::SpinLock *mutex, const ManagedBuffer &buff);
208 
    // Private helpers named after _tokenManager; bodies are not in this extract.
210  void tokenManagerInit(void);
211  bool tokenManagerEmpty(void);
    // Two overloads: the no-argument form returns a BufferChunk by value,
    // the numBytes form returns nothing.
212  BufferChunk tokenManagerPop(void);
213  void tokenManagerPop(const size_t numBytes);
214 
    // Raw InputPort pointers; ownership/lifetime is not visible in this extract.
215  std::vector<InputPort *> _subscribers;
216  InputPort *_readBeforeWritePort;
217  bool _bufferFromManager;
218 
    // All constructors and copy assignment are private, so only members and
    // the two friend classes below can create or copy an OutputPort.
219  OutputPort(void);
    // NOTE(review): the empty body with no initializer list default-initializes
    // every member and copies nothing from the argument.
220  OutputPort(const OutputPort &){} // non construction-copyable
    // NOTE(review): returns *this without assigning any member.
221  OutputPort &operator=(const OutputPort &){return *this;} // non copyable
222  friend class WorkerActor;
223  friend class InputPort;
    // Non-template counterpart of postMessage() taking an Object by const reference.
224  void _postMessage(const Object &message);
225 };
226 
227 } //namespace Pothos
Definition: Label.hpp:23
#define POTHOS_API
Definition: Config.hpp:41
Definition: OutputPort.hpp:30
Definition: SpinLock.hpp:26
Definition: Object.hpp:55
Definition: InputPort.hpp:30
Definition: ManagedBuffer.hpp:27
Definition: RingDeque.hpp:28
std::shared_ptr< BufferManager > Sptr
Definition: BufferManager.hpp:60
Definition: BufferChunk.hpp:26
Definition: DType.hpp:38