SDRAngel  4.11.5
Developer docs for <a href="https://github.com/f4exb/sdrangel">SDRangel<\a>, an Open Source Qt5 / OpenGL 3.0+ SDR and signal analyzer frontend to various hardware.
dspdevicesinkengine.cpp
Go to the documentation of this file.
1 // Copyright (C) 2016 F4EXB //
3 // written by Edouard Griffiths //
4 // //
5 // This program is free software; you can redistribute it and/or modify //
6 // it under the terms of the GNU General Public License as published by //
7 // the Free Software Foundation as version 3 of the License, or //
8 // (at your option) any later version. //
9 // //
10 // This program is distributed in the hope that it will be useful, //
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of //
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
13 // GNU General Public License V3 for more details. //
14 // //
15 // You should have received a copy of the GNU General Public License //
16 // along with this program. If not, see <http://www.gnu.org/licenses/>. //
18 
19 #include <stdio.h>
20 #include <QDebug>
21 #include <QThread>
22 
23 #include "dspdevicesinkengine.h"
24 
26 #include "dsp/basebandsamplesink.h"
27 #include "dsp/devicesamplesink.h"
28 #include "dsp/dspcommands.h"
29 #include "samplesourcefifo.h"
31 
33  QThread(parent),
34  m_uid(uid),
35  m_state(StNotStarted),
36  m_deviceSampleSink(nullptr),
37  m_sampleSinkSequence(0),
38  m_basebandSampleSources(),
39  m_spectrumSink(nullptr),
40  m_sampleRate(0),
41  m_centerFrequency(0),
42  m_multipleSourcesDivisionFactor(1)
43 {
44  connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
45  connect(&m_syncMessenger, SIGNAL(messageSent()), this, SLOT(handleSynchronousMessages()), Qt::QueuedConnection);
46 
47  moveToThread(this);
48 }
49 
51 {
52  stop();
53  wait();
54 }
55 
57 {
58  qDebug() << "DSPDeviceSinkEngine::run";
59  m_state = StIdle;
60  exec();
61 }
62 
64 {
65  qDebug() << "DSPDeviceSinkEngine::start";
66  QThread::start();
67 }
68 
70 {
71  qDebug() << "DSPDeviceSinkEngine::stop";
72  gotoIdle();
74  QThread::exit();
75 // DSPExit cmd;
76 // m_syncMessenger.sendWait(cmd);
77 }
78 
80 {
81  qDebug() << "DSPDeviceSinkEngine::initGeneration";
83 
84  return m_syncMessenger.sendWait(cmd) == StReady;
85 }
86 
88 {
89  qDebug() << "DSPDeviceSinkEngine::startGeneration";
91 
92  return m_syncMessenger.sendWait(cmd) == StRunning;
93 }
94 
96 {
97  qDebug() << "DSPDeviceSinkEngine::stopGeneration";
101 }
102 
104 {
105  qDebug() << "DSPDeviceSinkEngine::setSink";
106  DSPSetSink cmd(sink);
108 }
109 
111 {
112  qDebug("DSPDeviceSinkEngine::setSinkSequence: seq: %d", sequence);
113  m_sampleSinkSequence = sequence;
114 }
115 
117 {
118  qDebug() << "DSPDeviceSinkEngine::addThreadedSource: " << source->objectName().toStdString().c_str();
121 }
122 
124 {
125  qDebug() << "DSPDeviceSinkEngine::removeThreadedSource: " << source->objectName().toStdString().c_str();
128 }
129 
131 {
132  qDebug() << "DSPDeviceSinkEngine::addSpectrumSink: " << spectrumSink->objectName().toStdString().c_str();
133  DSPAddSpectrumSink cmd(spectrumSink);
135 }
136 
138 {
139  qDebug() << "DSPDeviceSinkEngine::removeSpectrumSink: " << spectrumSink->objectName().toStdString().c_str();
140  DSPRemoveSpectrumSink cmd(spectrumSink);
142 }
143 
145 {
146  qDebug() << "DSPDeviceSinkEngine::errorMessage";
147  DSPGetErrorMessage cmd;
149  return cmd.getErrorMessage();
150 }
151 
153 {
154  qDebug() << "DSPDeviceSinkEngine::sinkDeviceDescription";
157  return cmd.getDeviceDescription();
158 }
159 
160 void DSPDeviceSinkEngine::work(int nbWriteSamples)
161 {
162  // multiple channel sources handling
164  {
165 // qDebug("DSPDeviceSinkEngine::work: multiple channel sources handling: %u", m_multipleSourcesDivisionFactor);
166 
167  SampleVector::iterator writeBegin;
169  sampleFifo->getWriteIterator(writeBegin);
170  SampleVector::iterator writeAt = writeBegin;
171  std::vector<SampleVector::iterator> sampleSourceIterators;
172 
173  for (ThreadedBasebandSampleSources::iterator it = m_threadedBasebandSampleSources.begin(); it != m_threadedBasebandSampleSources.end(); ++it)
174  {
175  sampleSourceIterators.push_back(SampleVector::iterator());
176  (*it)->getSampleSourceFifo().readAdvance(sampleSourceIterators.back(), nbWriteSamples);
177  sampleSourceIterators.back() -= nbWriteSamples;
178  }
179 
180  for (BasebandSampleSources::iterator it = m_basebandSampleSources.begin(); it != m_basebandSampleSources.end(); ++it)
181  {
182  sampleSourceIterators.push_back(SampleVector::iterator());
183  (*it)->getSampleSourceFifo().readAdvance(sampleSourceIterators.back(), nbWriteSamples);
184  sampleSourceIterators.back() -= nbWriteSamples;
185  }
186 
187  for (int is = 0; is < nbWriteSamples; is++)
188  {
189  // pull data from sources FIFOs and merge them in the device sample FIFO
190  for (std::vector<SampleVector::iterator>::iterator it = sampleSourceIterators.begin(); it != sampleSourceIterators.end(); ++it)
191  {
192  Sample s = (**it);
194  ++(*it);
195 
196  if (it == sampleSourceIterators.begin()) {
197  (*writeAt) = s;
198  } else {
199  (*writeAt) += s;
200  }
201  }
202 
203  sampleFifo->bumpIndex(writeAt);
204  }
205  }
206 }
207 
208 // notStarted -> idle -> init -> running -+
209 // ^ |
210 // +-----------------------+
211 
213 {
214  qDebug() << "DSPDeviceSinkEngine::gotoIdle";
215 
216  switch(m_state) {
217  case StNotStarted:
218  return StNotStarted;
219 
220  case StIdle:
221  case StError:
222  return StIdle;
223 
224  case StReady:
225  case StRunning:
226  break;
227  }
228 
229  if(m_deviceSampleSink == 0)
230  {
231  return StIdle;
232  }
233 
234  // stop everything
235 
236  for(BasebandSampleSources::const_iterator it = m_basebandSampleSources.begin(); it != m_basebandSampleSources.end(); it++)
237  {
238  qDebug() << "DSPDeviceSinkEngine::gotoIdle: stopping " << (*it)->objectName().toStdString().c_str();
239  (*it)->stop();
240  }
241 
242  for(ThreadedBasebandSampleSources::const_iterator it = m_threadedBasebandSampleSources.begin(); it != m_threadedBasebandSampleSources.end(); it++)
243  {
244  qDebug() << "DSPDeviceSinkEngine::gotoIdle: stopping ThreadedSampleSource(" << (*it)->getSampleSourceObjectName().toStdString().c_str() << ")";
245  (*it)->stop();
246  }
247 
248  if (m_spectrumSink)
249  {
250  disconnect(m_deviceSampleSink->getSampleFifo(), SIGNAL(dataRead(int)), this, SLOT(handleForwardToSpectrumSink(int)));
251  m_spectrumSink->stop();
252  }
253 
255  m_deviceDescription.clear();
256  m_sampleRate = 0;
257 
258  return StIdle;
259 }
260 
262 {
263  switch(m_state) {
264  case StNotStarted:
265  return StNotStarted;
266 
267  case StRunning: // FIXME: assumes it goes first through idle state. Could we get back to init from running directly?
268  return StRunning;
269 
270  case StReady:
271  return StReady;
272 
273  case StIdle:
274  case StError:
275  break;
276  }
277 
278  if (m_deviceSampleSink == 0)
279  {
280  return gotoError("DSPDeviceSinkEngine::gotoInit: No sample source configured");
281  }
282 
283  // init: pass sample rate and center frequency to all sample rate and/or center frequency dependent sinks and wait for completion
284 
288 
289  qDebug() << "DSPDeviceSinkEngine::gotoInit: "
290  << " m_deviceDescription: " << m_deviceDescription.toStdString().c_str()
291  << " sampleRate: " << m_sampleRate
292  << " centerFrequency: " << m_centerFrequency;
293 
294  DSPSignalNotification notif(m_sampleRate, m_centerFrequency);
295 
296  for (BasebandSampleSources::const_iterator it = m_basebandSampleSources.begin(); it != m_basebandSampleSources.end(); ++it)
297  {
298  qDebug() << "DSPDeviceSinkEngine::gotoInit: initializing " << (*it)->objectName().toStdString().c_str();
299  (*it)->handleMessage(notif);
300  }
301 
302  for (ThreadedBasebandSampleSources::const_iterator it = m_threadedBasebandSampleSources.begin(); it != m_threadedBasebandSampleSources.end(); ++it)
303  {
304  qDebug() << "DSPDeviceSinkEngine::gotoInit: initializing ThreadedSampleSource(" << (*it)->getSampleSourceObjectName().toStdString().c_str() << ")";
305  (*it)->handleSourceMessage(notif);
306  }
307 
308  if (m_spectrumSink) {
310  }
311 
312  // pass data to listeners
314  {
315  DSPSignalNotification* rep = new DSPSignalNotification(notif); // make a copy for the output queue
317  }
318 
319  return StReady;
320 }
321 
323 {
324  qDebug() << "DSPDeviceSinkEngine::gotoRunning";
325 
326  switch(m_state)
327  {
328  case StNotStarted:
329  return StNotStarted;
330 
331  case StIdle:
332  return StIdle;
333 
334  case StRunning:
335  return StRunning;
336 
337  case StReady:
338  case StError:
339  break;
340  }
341 
342  if(m_deviceSampleSink == 0) {
343  return gotoError("DSPDeviceSinkEngine::gotoRunning: No sample source configured");
344  }
345 
346  qDebug() << "DSPDeviceSinkEngine::gotoRunning: " << m_deviceDescription.toStdString().c_str() << " started";
347 
348  // Start everything
349 
350  if(!m_deviceSampleSink->start())
351  {
352  return gotoError("DSPDeviceSinkEngine::gotoRunning: Could not start sample sink");
353  }
354 
355  for(BasebandSampleSources::const_iterator it = m_basebandSampleSources.begin(); it != m_basebandSampleSources.end(); it++)
356  {
357  qDebug() << "DSPDeviceSinkEngine::gotoRunning: starting " << (*it)->objectName().toStdString().c_str();
358  (*it)->start();
359  }
360 
361  for (ThreadedBasebandSampleSources::const_iterator it = m_threadedBasebandSampleSources.begin(); it != m_threadedBasebandSampleSources.end(); ++it)
362  {
363  qDebug() << "DSPDeviceSinkEngine::gotoRunning: starting ThreadedSampleSource(" << (*it)->getSampleSourceObjectName().toStdString().c_str() << ")";
364  (*it)->start();
365  }
366 
367  if (m_spectrumSink)
368  {
369  connect(m_deviceSampleSink->getSampleFifo(), SIGNAL(dataRead(int)), this, SLOT(handleForwardToSpectrumSink(int)));
371  }
372 
373  qDebug() << "DSPDeviceSinkEngine::gotoRunning: input message queue pending: " << m_inputMessageQueue.size();
374 
375  return StRunning;
376 }
377 
379 {
380  qDebug() << "DSPDeviceSinkEngine::gotoError";
381 
383  m_deviceDescription.clear();
384  m_state = StError;
385  return StError;
386 }
387 
389 {
390  gotoIdle();
391 
392  m_deviceSampleSink = sink;
393 
394  if(m_deviceSampleSink != 0) {
395  qDebug("DSPDeviceSinkEngine::handleSetSink: set %s", qPrintable(sink->getDeviceDescription()));
396  } else {
397  qDebug("DSPDeviceSinkEngine::handleSetSource: set none");
398  }
399 }
400 
402 {
403  if(m_state == StRunning)
404  {
405  work(nbSamples);
406  }
407 }
408 
410 {
411  Message *message = m_syncMessenger.getMessage();
412  qDebug() << "DSPDeviceSinkEngine::handleSynchronousMessages: " << message->getIdentifier();
413 
414  if (DSPGenerationInit::match(*message))
415  {
416  m_state = gotoIdle();
417 
418  if(m_state == StIdle) {
419  m_state = gotoInit(); // State goes ready if init is performed
420  }
421  }
422  else if (DSPGenerationStart::match(*message))
423  {
424  if(m_state == StReady) {
425  m_state = gotoRunning();
426  }
427  }
428  else if (DSPGenerationStop::match(*message))
429  {
430  m_state = gotoIdle();
431  }
432  else if (DSPGetSinkDeviceDescription::match(*message))
433  {
434  ((DSPGetSinkDeviceDescription*) message)->setDeviceDescription(m_deviceDescription);
435  }
436  else if (DSPGetErrorMessage::match(*message))
437  {
438  ((DSPGetErrorMessage*) message)->setErrorMessage(m_errorMessage);
439  }
440  else if (DSPSetSink::match(*message)) {
441  handleSetSink(((DSPSetSink*) message)->getSampleSink());
442  }
443  else if (DSPAddSpectrumSink::match(*message))
444  {
445  m_spectrumSink = ((DSPAddSpectrumSink*) message)->getSampleSink();
446  }
447  else if (DSPRemoveSpectrumSink::match(*message))
448  {
449  BasebandSampleSink* spectrumSink = ((DSPRemoveSpectrumSink*) message)->getSampleSink();
450 
451  if(m_state == StRunning) {
452  spectrumSink->stop();
453  }
454 
455  m_spectrumSink = 0;
456  }
457  else if (DSPAddBasebandSampleSource::match(*message))
458  {
459  BasebandSampleSource* source = ((DSPAddBasebandSampleSource*) message)->getSampleSource();
460  m_basebandSampleSources.push_back(source);
462  source->handleMessage(notif);
464 
465  if (m_state == StRunning)
466  {
467  source->start();
468  }
469  }
470  else if (DSPRemoveBasebandSampleSource::match(*message))
471  {
472  BasebandSampleSource* source = ((DSPRemoveBasebandSampleSource*) message)->getSampleSource();
473 
474  if(m_state == StRunning) {
475  source->stop();
476  }
477 
478  m_basebandSampleSources.remove(source);
480  }
482  {
483  ThreadedBasebandSampleSource *threadedSource = ((DSPAddThreadedBasebandSampleSource*) message)->getThreadedSampleSource();
484  m_threadedBasebandSampleSources.push_back(threadedSource);
486  threadedSource->handleSourceMessage(notif);
488 
489  if (m_state == StRunning)
490  {
491  threadedSource->start();
492  }
493  }
495  {
496  ThreadedBasebandSampleSource* threadedSource = ((DSPRemoveThreadedBasebandSampleSource*) message)->getThreadedSampleSource();
497  if (m_state == StRunning) {
498  threadedSource->stop();
499  }
500 
501  m_threadedBasebandSampleSources.remove(threadedSource);
503  }
504 
506 }
507 
509 {
510  Message* message;
511 
512  while ((message = m_inputMessageQueue.pop()) != 0)
513  {
514  qDebug("DSPDeviceSinkEngine::handleInputMessages: message: %s", message->getIdentifier());
515 
516  if (DSPSignalNotification::match(*message))
517  {
518  DSPSignalNotification *notif = (DSPSignalNotification *) message;
519 
520  // update DSP values
521 
522  m_sampleRate = notif->getSampleRate();
524 
525  qDebug() << "DSPDeviceSinkEngine::handleInputMessages: DSPSignalNotification:"
526  << " m_sampleRate: " << m_sampleRate
527  << " m_centerFrequency: " << m_centerFrequency;
528 
529  // forward source changes to sources with immediate execution
530 
531  for(BasebandSampleSources::const_iterator it = m_basebandSampleSources.begin(); it != m_basebandSampleSources.end(); it++)
532  {
533  qDebug() << "DSPDeviceSinkEngine::handleInputMessages: forward message to " << (*it)->objectName().toStdString().c_str();
534  (*it)->handleMessage(*message);
535  }
536 
537  for (ThreadedBasebandSampleSources::const_iterator it = m_threadedBasebandSampleSources.begin(); it != m_threadedBasebandSampleSources.end(); ++it)
538  {
539  qDebug() << "DSPDeviceSinkEngine::handleSourceMessages: forward message to ThreadedSampleSource(" << (*it)->getSampleSourceObjectName().toStdString().c_str() << ")";
540  (*it)->handleSourceMessage(*message);
541  }
542 
543  // forward changes to listeners on DSP output queue
544 
546  qDebug("DSPDeviceSinkEngine::handleInputMessages: DSPSignalNotification: guiMessageQueue: %p", guiMessageQueue);
547 
548  if (guiMessageQueue)
549  {
550  DSPSignalNotification* rep = new DSPSignalNotification(*notif); // make a copy for the output queue
551  guiMessageQueue->push(rep);
552  }
553 
554  delete message;
555  }
556  }
557 }
558 
560 {
561  if (m_spectrumSink)
562  {
564  SampleVector::iterator readUntil;
565  sampleFifo->getReadIterator(readUntil);
566  m_spectrumSink->feed(readUntil - nbSamples, readUntil, false);
567  }
568 }
569 
571 {
573 
574  // single channel source handling
575  if ((m_threadedBasebandSampleSources.size() + m_basebandSampleSources.size()) == 1)
576  {
577  qDebug("DSPDeviceSinkEngine::checkNumberOfBasebandSources: single channel mode");
578  disconnect(sampleFifo, SIGNAL(dataWrite(int)), this, SLOT(handleData(int)));
579 
580  if (m_threadedBasebandSampleSources.size() == 1) {
581  m_threadedBasebandSampleSources.back()->setDeviceSampleSourceFifo(sampleFifo);
582  } else if (m_basebandSampleSources.size() == 1) {
583  m_basebandSampleSources.back()->setDeviceSampleSourceFifo(sampleFifo);
584  }
585 
586  m_multipleSourcesDivisionFactor = 1; // for consistency but it is not used in this case
587  }
588  // null or multiple channel sources handling
589  else
590  {
591  int nbSources = 0;
592 
593  for (ThreadedBasebandSampleSources::iterator it = m_threadedBasebandSampleSources.begin(); it != m_threadedBasebandSampleSources.end(); ++it)
594  {
595  (*it)->setDeviceSampleSourceFifo(0);
596  nbSources++;
597  }
598 
599  for (BasebandSampleSources::iterator it = m_basebandSampleSources.begin(); it != m_basebandSampleSources.end(); ++it)
600  {
601  (*it)->setDeviceSampleSourceFifo(0);
602  nbSources++;
603  }
604 
605  if (nbSources == 0) {
607  } else if (nbSources < 3) {
609  } else {
610  m_multipleSourcesDivisionFactor = 1<<nbSources;
611  }
612 
613  if (nbSources > 1) {
614  connect(sampleFifo, SIGNAL(dataWrite(int)), this, SLOT(handleData(int)), Qt::QueuedConnection);
615  }
616 
617  qDebug("DSPDeviceSinkEngine::checkNumberOfBasebandSources: handle %d channel(s)", nbSources);
618  }
619 }
void setSink(DeviceSampleSink *sink)
Set the sample sink type.
Message * pop()
Pop message from queue.
engine is before initialization
engine is ready to run
const QString & getErrorMessage() const
Definition: dspcommands.h:86
void push(Message *message, bool emitSignal=true)
Push message onto queue.
int size()
Returns queue size.
State gotoRunning()
Go to the running state from ready state.
virtual bool handleMessage(const Message &cmd)=0
Processing of a message. Returns true if message has actually been processed.
void handleSynchronousMessages()
Handle synchronous messages with the thread.
DeviceSampleSink * m_deviceSampleSink
void removeSpectrumSink(BasebandSampleSink *spectrumSink)
Add a spectrum vis baseband sample sink.
void storeMessage(Message &message)
Definition: syncmessenger.h:42
virtual void stop()=0
BasebandSampleSink * m_spectrumSink
void start()
This thread start.
virtual void stop()=0
virtual void feed(const SampleVector::const_iterator &begin, const SampleVector::const_iterator &end, bool positiveOnly)=0
unsigned int uint32_t
Definition: rtptypes_win.h:46
bool initGeneration()
Initialize generation sequence.
void bumpIndex(SampleVector::iterator &writeAt)
copy current item to second buffer and bump write index - write phase 2
virtual void start()=0
void stopGeneration()
Stop generation sequence.
virtual void start()=0
qint64 getCenterFrequency() const
Definition: dspcommands.h:329
virtual bool handleMessage(const Message &cmd)=0
Processing of a message. Returns true if message has actually been processed.
BasebandSampleSources m_basebandSampleSources
baseband sample sources within main thread (usually file input)
void removeThreadedSource(ThreadedBasebandSampleSource *source)
Remove a baseband sample source that runs on its own thread.
void handleForwardToSpectrumSink(int nbSamples)
SyncMessenger m_syncMessenger
Used to process messages synchronously with the thread.
virtual quint64 getCenterFrequency() const =0
Center frequency exposed by the sink.
void handleSetSink(DeviceSampleSink *sink)
Manage sink setting.
void addThreadedSource(ThreadedBasebandSampleSource *source)
Add a baseband sample source that will run on its own thread.
void handleData(int nbSamples)
Handle data when samples have to be written to the sample FIFO.
SampleSourceFifo * getSampleFifo()
const QString & getDeviceDescription() const
Definition: dspcommands.h:75
MessageQueue m_inputMessageQueue
void stop()
this thread exit() and wait()
static bool match(const Message *message)
Definition: message.cpp:45
State gotoIdle()
Go to the idle state.
virtual bool start()=0
void work(int nbWriteSamples)
transfer samples from beseband sources to sink if in running state
QString errorMessage()
Return the current error message.
uint32_t m_multipleSourcesDivisionFactor
State gotoInit()
Go to the acquisition init state from idle.
void handleInputMessages()
Handle input message queue.
Message * getMessage() const
Definition: syncmessenger.h:41
void addSpectrumSink(BasebandSampleSink *spectrumSink)
Add a spectrum vis baseband sample sink.
DSPDeviceSinkEngine(uint32_t uid, QObject *parent=NULL)
void done(int result=0)
Processing of the message is complete.
int getSampleRate() const
Definition: dspcommands.h:328
QString sinkDeviceDescription()
Return the sink device description.
State gotoError(const QString &errorMsg)
Go to an error state.
virtual const char * getIdentifier() const
Definition: message.cpp:35
void setSinkSequence(int sequence)
Set the sample sink sequence in type.
virtual int getSampleRate() const =0
Sample rate exposed by the sink.
int sendWait(Message &message, unsigned long msPollTime=100)
Send message and waits for its process completion.
virtual void stop()=0
MessageQueue * getMessageQueueToGUI()
ThreadedBasebandSampleSources m_threadedBasebandSampleSources
baseband sample sources on their own threads (usually channels)
void getWriteIterator(SampleVector::iterator &writeAt)
get iterator to current item for update - write phase 1
virtual const QString & getDeviceDescription() const =0
void getReadIterator(SampleVector::iterator &readUntil)
get iterator past the last sample of a read advance operation (i.e. current read iterator) ...
bool startGeneration()
Start generation sequence.
bool handleSourceMessage(const Message &cmd)
Send message to source synchronously.
void stop()
This thread stop.