SDRAngel  4.11.5
Developer docs for <a href="https://github.com/f4exb/sdrangel">SDRangel</a>, an Open Source Qt5 / OpenGL 3.0+ SDR and signal analyzer frontend to various hardware.
dspdevicemimoengine.cpp
Go to the documentation of this file.
1 // Copyright (C) 2019 F4EXB //
3 // written by Edouard Griffiths //
4 // //
5 // This program is free software; you can redistribute it and/or modify //
6 // it under the terms of the GNU General Public License as published by //
7 // the Free Software Foundation as version 3 of the License, or //
8 // (at your option) any later version. //
9 // //
10 // This program is distributed in the hope that it will be useful, //
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of //
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
13 // GNU General Public License V3 for more details. //
14 // //
15 // You should have received a copy of the GNU General Public License //
16 // along with this program. If not, see <http://www.gnu.org/licenses/>. //
18 
19 #include <QDebug>
20 
21 #include "dsp/dspcommands.h"
24 #include "devicesamplemimo.h"
25 
26 #include "dspdevicemimoengine.h"
27 
45 
47  QThread(parent),
48  m_uid(uid),
49  m_state(StNotStarted),
50  m_deviceSampleMIMO(nullptr),
51  m_spectrumInputSourceElseSink(true),
52  m_spectrumInputIndex(0)
53 {
54  connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
55  connect(&m_syncMessenger, SIGNAL(messageSent()), this, SLOT(handleSynchronousMessages()), Qt::QueuedConnection);
56 
57  moveToThread(this);
58 }
59 
61 {
62  stop();
63  wait();
64 }
65 
67 {
68  qDebug() << "DSPDeviceMIMOEngine::run";
69  m_state = StIdle;
70  exec();
71 }
72 
74 {
75  qDebug() << "DSPDeviceMIMOEngine::start";
76  QThread::start();
77 }
78 
80 {
81  qDebug() << "DSPDeviceMIMOEngine::stop";
82  gotoIdle();
84  QThread::exit();
85 }
86 
88 {
89  qDebug() << "DSPDeviceMIMOEngine::initGeneration";
91 
92  return m_syncMessenger.sendWait(cmd) == StReady;
93 }
94 
96 {
97  qDebug() << "DSPDeviceMIMOEngine::startGeneration";
99 
100  return m_syncMessenger.sendWait(cmd) == StRunning;
101 }
102 
104 {
105  qDebug() << "DSPDeviceMIMOEngine::stopGeneration";
106  DSPGenerationStop cmd;
109 }
110 
112 {
113  qDebug() << "DSPDeviceMIMOEngine::setMIMO";
114  SetSampleMIMO cmd(mimo);
116 }
117 
119 {
120  qDebug("DSPDeviceMIMOEngine::setSinkSequence: seq: %d", sequence);
121  m_sampleMIMOSequence = sequence;
122 }
123 
125 {
126  qDebug("DSPDeviceMIMOEngine::addSourceStream");
127  AddSourceStream cmd(connect);
129 }
130 
132 {
133  qDebug("DSPDeviceMIMOEngine::removeLastSourceStream");
136 }
137 
139 {
140  qDebug("DSPDeviceMIMOEngine::addSinkStream");
141  AddSinkStream cmd(connect);
143 }
144 
146 {
147  qDebug("DSPDeviceMIMOEngine::removeLastSinkStream");
150 }
151 
153 {
154  qDebug() << "DSPDeviceMIMOEngine::addThreadedSource: "
155  << source->objectName().toStdString().c_str()
156  << " at: "
157  << index;
158  AddThreadedBasebandSampleSource cmd(source, index);
160 }
161 
163 {
164  qDebug() << "DSPDeviceMIMOEngine::removeThreadedSource: "
165  << source->objectName().toStdString().c_str()
166  << " at: "
167  << index;
168  RemoveThreadedBasebandSampleSource cmd(source, index);
170 }
171 
173 {
174  qDebug() << "DSPDeviceMIMOEngine::addThreadedSink: "
175  << sink->objectName().toStdString().c_str()
176  << " at: "
177  << index;
178  AddThreadedBasebandSampleSink cmd(sink, index);
180 }
181 
183 {
184  qDebug() << "DSPDeviceMIMOEngine::removeThreadedSink: "
185  << sink->objectName().toStdString().c_str()
186  << " at: "
187  << index;
188  RemoveThreadedBasebandSampleSink cmd(sink, index);
190 }
191 
193 {
194  qDebug() << "DSPDeviceMIMOEngine::addSink: "
195  << sink->objectName().toStdString().c_str()
196  << " at: "
197  << index;
198  AddBasebandSampleSink cmd(sink, index);
200 }
201 
203 {
204  qDebug() << "DSPDeviceMIMOEngine::removeSink: "
205  << sink->objectName().toStdString().c_str()
206  << " at: "
207  << index;
208  RemoveBasebandSampleSink cmd(sink, index);
210 }
211 
213 {
214  qDebug() << "DSPDeviceMIMOEngine::addSpectrumSink: " << spectrumSink->objectName().toStdString().c_str();
215  AddSpectrumSink cmd(spectrumSink);
217 }
218 
220 {
221  qDebug() << "DSPDeviceSinkEngine::removeSpectrumSink: " << spectrumSink->objectName().toStdString().c_str();
222  DSPRemoveSpectrumSink cmd(spectrumSink);
224 }
225 
// Selects which stream feeds the spectrum sink.
//   sourceElseSink: true to take the input from a source stream, false to take it from a sink stream
//   index:          index of the stream on the selected side
// The request is logged and packed into a SetSpectrumSinkInput command.
// NOTE(review): this listing skips line 232, so the statement that actually dispatches cmd
// is not visible here - presumably a synchronous send through m_syncMessenger; confirm in the source.
226 void DSPDeviceMIMOEngine::setSpectrumSinkInput(bool sourceElseSink, int index)
227 {
// the log prefix names this class so the trace can be matched to the MIMO engine
228  qDebug() << "DSPDeviceMIMOEngine::setSpectrumSinkInput: "
229  << " sourceElseSink: " << sourceElseSink
230  << " index: " << index;
231  SetSpectrumSinkInput cmd(sourceElseSink, index);
233 }
234 
236 {
237  qDebug() << "DSPDeviceMIMOEngine::errorMessage";
238  GetErrorMessage cmd;
240  return cmd.getErrorMessage();
241 }
242 
244 {
245  qDebug() << "DSPDeviceMIMOEngine::deviceDescription";
248  return cmd.getDeviceDescription();
249 }
250 
// Drains the Rx FIFO of every source stream of m_deviceSampleMIMO and hands the samples to the
// direct sinks, the spectrum sink and the threaded sinks registered for that stream.
// nbWriteSamples is currently unused (cast to void); the sink (Tx) side is still a TODO at the end.
// NOTE(review): this listing skips lines 288 and 315, so the opener of the brace closed at
// lines 290 and 317 is not visible here - presumably the condition guarding the spectrum sink feed.
255 void DSPDeviceMIMOEngine::work(int nbWriteSamples)
256 {
257  (void) nbWriteSamples;
258  // Sources
259  for (unsigned int isource = 0; isource < m_deviceSampleMIMO->getNbSourceStreams(); isource++)
260  {
261  SampleSinkFifo* sampleFifo = m_deviceSampleMIMO->getSampleSinkFifo(isource); // sink FIFO is for Rx
262  int samplesDone = 0;
263  bool positiveOnly = false;
264 
// keep reading while the FIFO holds data, no input message is pending, and fewer samples than
// the value returned by getSourceSampleRate(isource) have been handled for this stream in this call
265  while ((sampleFifo->fill() > 0) && (m_inputMessageQueue.size() == 0) && (samplesDone < m_deviceSampleMIMO->getSourceSampleRate(isource)))
266  {
267  SampleVector::iterator part1begin;
268  SampleVector::iterator part1end;
269  SampleVector::iterator part2begin;
270  SampleVector::iterator part2end;
271 
// request everything currently in the FIFO; the data comes back as up to two iterator ranges
272  std::size_t count = sampleFifo->readBegin(sampleFifo->fill(), &part1begin, &part1end, &part2begin, &part2end);
273 
274  // first part of FIFO data
275  if (part1begin != part1end)
276  {
277  // TODO: DC and IQ corrections
278 
279  // feed data to direct sinks
// the size check skips streams that have no direct-sink list
280  if (isource < m_basebandSampleSinks.size())
281  {
282  for (BasebandSampleSinks::const_iterator it = m_basebandSampleSinks[isource].begin(); it != m_basebandSampleSinks[isource].end(); ++it) {
283  (*it)->feed(part1begin, part1end, positiveOnly);
284  }
285  }
286 
287  // possibly feed data to spectrum sink
289  m_spectrumSink->feed(part1begin, part1end, positiveOnly);
290  }
291 
292  // feed data to threaded sinks
293  if (isource < m_threadedBasebandSampleSinks.size())
294  {
295  for (ThreadedBasebandSampleSinks::const_iterator it = m_threadedBasebandSampleSinks[isource].begin(); it != m_threadedBasebandSampleSinks[isource].end(); ++it) {
296  (*it)->feed(part1begin, part1end, positiveOnly);
297  }
298  }
299  }
300 
301  // second part of FIFO data (used when block wraps around)
302  if(part2begin != part2end)
303  {
304  // TODO: DC and IQ corrections
305 
306  // feed data to direct sinks
307  if (isource < m_basebandSampleSinks.size())
308  {
309  for (BasebandSampleSinks::const_iterator it = m_basebandSampleSinks[isource].begin(); it != m_basebandSampleSinks[isource].end(); ++it) {
310  (*it)->feed(part2begin, part2end, positiveOnly);
311  }
312  }
313 
314  // possibly feed data to spectrum sink
316  m_spectrumSink->feed(part2begin, part2end, positiveOnly);
317  }
318 
319  // feed data to threaded sinks
320  if (isource < m_threadedBasebandSampleSinks.size())
321  {
322  for (ThreadedBasebandSampleSinks::const_iterator it = m_threadedBasebandSampleSinks[isource].begin(); it != m_threadedBasebandSampleSinks[isource].end(); ++it) {
323  (*it)->feed(part2begin, part2end, positiveOnly);
324  }
325  }
326  }
327 
328  // adjust FIFO pointers
// commit exactly what readBegin reported and count it against the per-call limit
329  sampleFifo->readCommit((unsigned int) count);
330  samplesDone += count;
331  } // while stream FIFO
332  } // for stream source
333 
334  // TODO: sinks
335 }
336 
// Processes the sample sink FIFO of a single stream (sinkIndex): applies the DC / IQ corrections
// configured for that stream, then feeds the samples to the direct sinks, the spectrum sink and
// the threaded sinks. Does nothing unless the engine is in the StRunning state.
// NOTE(review): this listing skips lines 373 and 401, so the opener of the brace closed at
// lines 375 and 403 is not visible here - presumably the condition guarding the spectrum sink feed.
// NOTE(review): m_sourcesCorrections[sinkIndex] and m_threadedBasebandSampleSinks[sinkIndex] are
// indexed without a size check, whereas m_basebandSampleSinks is guarded here and work() guards
// the threaded sinks too - confirm callers always pass a valid index.
337 void DSPDeviceMIMOEngine::workSampleSink(unsigned int sinkIndex)
338 {
339  if (m_state != StRunning) {
340  return;
341  }
342 
343  SampleSinkFifo* sampleFifo = m_deviceSampleMIMO->getSampleSinkFifo(sinkIndex);
344  int samplesDone = 0;
345  bool positiveOnly = false;
346 
// keep reading while the FIFO holds data, no input message is pending, and fewer samples than
// the value returned by getSourceSampleRate(sinkIndex) have been handled in this call
347  while ((sampleFifo->fill() > 0) && (m_inputMessageQueue.size() == 0) && (samplesDone < m_deviceSampleMIMO->getSourceSampleRate(sinkIndex)))
348  {
349  SampleVector::iterator part1begin;
350  SampleVector::iterator part1end;
351  SampleVector::iterator part2begin;
352  SampleVector::iterator part2end;
353 
// request everything currently in the FIFO; the data comes back as up to two iterator ranges
354  std::size_t count = sampleFifo->readBegin(sampleFifo->fill(), &part1begin, &part1end, &part2begin, &part2end);
355 
356  // first part of FIFO data
357  if (part1begin != part1end)
358  {
359  // DC and IQ corrections
// corrections run only when DC offset correction is enabled; the IQ imbalance flag is passed along
360  if (m_sourcesCorrections[sinkIndex].m_dcOffsetCorrection) {
361  iqCorrections(part1begin, part1end, sinkIndex, m_sourcesCorrections[sinkIndex].m_iqImbalanceCorrection);
362  }
363 
364  // feed data to direct sinks
365  if (sinkIndex < m_basebandSampleSinks.size())
366  {
367  for (BasebandSampleSinks::const_iterator it = m_basebandSampleSinks[sinkIndex].begin(); it != m_basebandSampleSinks[sinkIndex].end(); ++it) {
368  (*it)->feed(part1begin, part1end, positiveOnly);
369  }
370  }
371 
372  // possibly feed data to spectrum sink
374  m_spectrumSink->feed(part1begin, part1end, positiveOnly);
375  }
376 
377  // feed data to threaded sinks
378  for (ThreadedBasebandSampleSinks::const_iterator it = m_threadedBasebandSampleSinks[sinkIndex].begin(); it != m_threadedBasebandSampleSinks[sinkIndex].end(); ++it)
379  {
380  (*it)->feed(part1begin, part1end, positiveOnly);
381  }
382  }
383 
384  // second part of FIFO data (used when block wraps around)
385  if(part2begin != part2end)
386  {
387  // DC and IQ corrections
388  if (m_sourcesCorrections[sinkIndex].m_dcOffsetCorrection) {
389  iqCorrections(part2begin, part2end, sinkIndex, m_sourcesCorrections[sinkIndex].m_iqImbalanceCorrection);
390  }
391 
392  // feed data to direct sinks
393  if (sinkIndex < m_basebandSampleSinks.size())
394  {
395  for (BasebandSampleSinks::const_iterator it = m_basebandSampleSinks[sinkIndex].begin(); it != m_basebandSampleSinks[sinkIndex].end(); ++it) {
396  (*it)->feed(part2begin, part2end, positiveOnly);
397  }
398  }
399 
400  // possibly feed data to spectrum sink
402  m_spectrumSink->feed(part2begin, part2end, positiveOnly);
403  }
404 
405  // feed data to threaded sinks
406  for (ThreadedBasebandSampleSinks::const_iterator it = m_threadedBasebandSampleSinks[sinkIndex].begin(); it != m_threadedBasebandSampleSinks[sinkIndex].end(); ++it)
407  {
408  (*it)->feed(part2begin, part2end, positiveOnly);
409  }
410  }
411 
412  // adjust FIFO pointers
// commit exactly what readBegin reported and count it against the per-call limit
413  sampleFifo->readCommit((unsigned int) count);
414  samplesDone += count;
415  }
416 }
417 
418 // notStarted -> idle -> init -> running -+
419 //                ^                       |
420 //                +-----------------------+
421 
423 {
424  qDebug() << "DSPDeviceMIMOEngine::gotoIdle";
425 
426  switch(m_state) {
427  case StNotStarted:
428  return StNotStarted;
429 
430  case StIdle:
431  case StError:
432  return StIdle;
433 
434  case StReady:
435  case StRunning:
436  break;
437  }
438 
439  if (!m_deviceSampleMIMO) {
440  return StIdle;
441  }
442 
443  // stop everything
444 
445  std::vector<BasebandSampleSinks>::const_iterator vbit = m_basebandSampleSinks.begin();
446 
447  for (; vbit != m_basebandSampleSinks.end(); ++vbit)
448  {
449  for (BasebandSampleSinks::const_iterator it = vbit->begin(); it != vbit->end(); ++it) {
450  (*it)->stop();
451  }
452  }
453 
454  std::vector<ThreadedBasebandSampleSinks>::const_iterator vtit = m_threadedBasebandSampleSinks.begin();
455 
456  for (; vtit != m_threadedBasebandSampleSinks.end(); vtit++)
457  {
458  for (ThreadedBasebandSampleSinks::const_iterator it = vtit->begin(); it != vtit->end(); ++it) {
459  (*it)->stop();
460  }
461  }
462 
464  m_deviceDescription.clear();
465 
466  return StIdle;
467 }
468 
470 {
471  switch(m_state) {
472  case StNotStarted:
473  return StNotStarted;
474 
475  case StRunning: // FIXME: assumes it goes first through idle state. Could we get back to init from running directly?
476  return StRunning;
477 
478  case StReady:
479  return StReady;
480 
481  case StIdle:
482  case StError:
483  break;
484  }
485 
486  if (!m_deviceSampleMIMO) {
487  return gotoError("No sample MIMO configured");
488  }
489 
490  // init: pass sample rate and center frequency to all sample rate and/or center frequency dependent sinks and wait for completion
491 
492 
494 
495  qDebug() << "DSPDeviceMIMOEngine::gotoInit: "
496  << " m_deviceDescription: " << m_deviceDescription.toStdString().c_str();
497 
498  // Rx
499 
500  for (unsigned int isource = 0; isource < m_deviceSampleMIMO->getNbSinkFifos(); isource++)
501  {
502  if (isource < m_sourcesCorrections.size())
503  {
504  m_sourcesCorrections[isource].m_iOffset = 0;
505  m_sourcesCorrections[isource].m_qOffset = 0;
506  m_sourcesCorrections[isource].m_iRange = 1 << 16;
507  m_sourcesCorrections[isource].m_qRange = 1 << 16;
508  }
509 
510  quint64 sourceCenterFrequency = m_deviceSampleMIMO->getSourceCenterFrequency(isource);
511  int sourceStreamSampleRate = m_deviceSampleMIMO->getSourceSampleRate(isource);
512 
513  qDebug("DSPDeviceMIMOEngine::gotoInit: m_sourceCenterFrequencies[%d] = %llu", isource, sourceCenterFrequency);
514  qDebug("DSPDeviceMIMOEngine::gotoInit: m_sourceStreamSampleRates[%d] = %d", isource, sourceStreamSampleRate);
515 
516  DSPSignalNotification notif(sourceStreamSampleRate, sourceCenterFrequency);
517 
518  if (isource < m_basebandSampleSinks.size())
519  {
520  for (BasebandSampleSinks::const_iterator it = m_basebandSampleSinks[isource].begin(); it != m_basebandSampleSinks[isource].end(); ++it)
521  {
522  qDebug() << "DSPDeviceMIMOEngine::gotoInit: initializing " << (*it)->objectName().toStdString().c_str();
523  (*it)->handleMessage(notif);
524  }
525  }
526 
527  if (isource < m_threadedBasebandSampleSinks.size())
528  {
529  for (ThreadedBasebandSampleSinks::const_iterator it = m_threadedBasebandSampleSinks[isource].begin(); it != m_threadedBasebandSampleSinks[isource].end(); ++it)
530  {
531  qDebug() << "DSPDeviceMIMOEngine::gotoInit: initializing ThreadedSampleSink(" << (*it)->getSampleSinkObjectName().toStdString().c_str() << ")";
532  (*it)->handleSinkMessage(notif);
533  }
534  }
535 
536  // Probably not necessary
537  // // possibly forward to spectrum sink
538  // if ((m_spectrumSink) && (m_spectrumInputSourceElseSink) && (isource == m_spectrumInputIndex)) {
539  // m_spectrumSink->handleMessage(notif);
540  // }
541 
542  // // forward changes to MIMO GUI input queue
543  // MessageQueue *guiMessageQueue = m_deviceSampleMIMO->getMessageQueueToGUI();
544 
545  // if (guiMessageQueue) {
546  // DSPMIMOSignalNotification* rep = new DSPMIMOSignalNotification(sourceStreamSampleRate, sourceCenterFrequency, true, isource); // make a copy for the MIMO GUI
547  // guiMessageQueue->push(rep);
548  // }
549  }
550 
551  //TODO: Tx
552 
553  return StReady;
554 }
555 
557 {
558  qDebug() << "DSPDeviceMIMOEngine::gotoRunning";
559 
560  switch(m_state)
561  {
562  case StNotStarted:
563  return StNotStarted;
564 
565  case StIdle:
566  return StIdle;
567 
568  case StRunning:
569  return StRunning;
570 
571  case StReady:
572  case StError:
573  break;
574  }
575 
576  if (!m_deviceSampleMIMO) {
577  return gotoError("DSPDeviceMIMOEngine::gotoRunning: No sample source configured");
578  }
579 
580  qDebug() << "DSPDeviceMIMOEngine::gotoRunning: " << m_deviceDescription.toStdString().c_str() << " started";
581 
582  // Start everything
583 
584  if (!m_deviceSampleMIMO->start()) {
585  return gotoError("Could not start sample source");
586  }
587 
588  std::vector<BasebandSampleSinks>::const_iterator vbit = m_basebandSampleSinks.begin();
589 
590  for (; vbit != m_basebandSampleSinks.end(); ++vbit)
591  {
592  for (BasebandSampleSinks::const_iterator it = vbit->begin(); it != vbit->end(); ++it)
593  {
594  qDebug() << "DSPDeviceMIMOEngine::gotoRunning: starting " << (*it)->objectName().toStdString().c_str();
595  (*it)->start();
596  }
597  }
598 
599  std::vector<ThreadedBasebandSampleSinks>::const_iterator vtit = m_threadedBasebandSampleSinks.begin();
600 
601  for (; vtit != m_threadedBasebandSampleSinks.end(); vtit++)
602  {
603  for (ThreadedBasebandSampleSinks::const_iterator it = vtit->begin(); it != vtit->end(); ++it)
604  {
605  qDebug() << "DSPDeviceMIMOEngine::gotoRunning: starting ThreadedSampleSink(" << (*it)->getSampleSinkObjectName().toStdString().c_str() << ")";
606  (*it)->start();
607  }
608  }
609 
610  qDebug() << "DSPDeviceMIMOEngine::gotoRunning:input message queue pending: " << m_inputMessageQueue.size();
611 
612  return StRunning;
613 }
614 
616 {
617  qDebug() << "DSPDeviceMIMOEngine::gotoError: " << errorMessage;
618 
620  m_deviceDescription.clear();
621  m_state = StError;
622  return StError;
623 }
624 
626 {
627  if (m_state == StRunning)
628  {
629  work(0); // TODO: implement Tx side
630  }
631 }
632 
634 {
635  m_deviceSampleMIMO = mimo;
636 
637  if (mimo)
638  {
639  if ((m_sampleSinkConnectionIndexes.size() == 1) && (m_sampleSourceConnectionIndexes.size() == 0)) // true MIMO (synchronous FIFOs)
640  {
641  qDebug("DSPDeviceMIMOEngine::handleSetMIMO: synchronous set %s", qPrintable(mimo->getDeviceDescription()));
642  // connect(m_deviceSampleMIMO->getSampleSinkFifo(m_sampleSinkConnectionIndexes[0]), SIGNAL(dataReady()), this, SLOT(handleData()), Qt::QueuedConnection);
643  QObject::connect(
646  this,
647  [=](){ this->handleData(); }, // lambda function is not strictly needed here
648  Qt::QueuedConnection
649  );
650  }
651  else if (m_sampleSinkConnectionIndexes.size() != 0) // asynchronous FIFOs
652  {
653  for (unsigned int isink = 0; isink < m_sampleSinkConnectionIndexes.size(); isink++)
654  {
655  qDebug("DSPDeviceMIMOEngine::handleSetMIMO: asynchronous sources set %s channel %u",
656  qPrintable(mimo->getDeviceDescription()), isink);
657  QObject::connect(
660  this,
661  [=](){ this->workSampleSink(isink); },
662  Qt::QueuedConnection
663  );
664  }
665  }
666  }
667 
668  // TODO: Tx
669 }
670 
672 {
673  Message *message = m_syncMessenger.getMessage();
674  qDebug() << "DSPDeviceMIMOEngine::handleSynchronousMessages: " << message->getIdentifier();
675 
676  if (DSPGenerationInit::match(*message))
677  {
678  m_state = gotoIdle();
679 
680  if(m_state == StIdle) {
681  m_state = gotoInit(); // State goes ready if init is performed
682  }
683  }
684  else if (DSPGenerationStart::match(*message))
685  {
686  if(m_state == StReady) {
687  m_state = gotoRunning();
688  }
689  }
690  else if (DSPGenerationStop::match(*message))
691  {
692  m_state = gotoIdle();
693  }
694  else if (GetMIMODeviceDescription::match(*message))
695  {
696  ((GetMIMODeviceDescription*) message)->setDeviceDescription(m_deviceDescription);
697  }
698  else if (DSPGetErrorMessage::match(*message))
699  {
700  ((DSPGetErrorMessage*) message)->setErrorMessage(m_errorMessage);
701  }
702  else if (SetSampleMIMO::match(*message)) {
703  handleSetMIMO(((SetSampleMIMO*) message)->getSampleMIMO());
704  }
705  else if (AddSourceStream::match(*message))
706  {
708  int currentIndex = m_threadedBasebandSampleSinks.size();
711 
712  if (((AddSourceStream *) message)->getConnect()) {
713  m_sampleSinkConnectionIndexes.push_back(currentIndex);
714  }
715  }
716  else if (RemoveLastSourceStream::match(*message))
717  {
718  m_basebandSampleSinks.pop_back();
720  }
721  else if (AddSinkStream::match(*message))
722  {
723  int currentIndex = m_threadedBasebandSampleSources.size();
725 
726  if (((AddSinkStream *) message)->getConnect()) {
727  m_sampleSourceConnectionIndexes.push_back(currentIndex);
728  }
729  }
730  else if (RemoveLastSinkStream::match(*message))
731  {
733  }
734  else if (AddBasebandSampleSink::match(*message))
735  {
736  const AddBasebandSampleSink *msg = (AddBasebandSampleSink *) message;
737  BasebandSampleSink* sink = msg->getSampleSink();
738  unsigned int isource = msg->getIndex();
739 
740  if (isource < m_basebandSampleSinks.size())
741  {
742  m_basebandSampleSinks[isource].push_back(sink);
743  // initialize sample rate and center frequency in the sink:
744  int sourceStreamSampleRate = m_deviceSampleMIMO->getSourceSampleRate(isource);
745  quint64 sourceCenterFrequency = m_deviceSampleMIMO->getSourceCenterFrequency(isource);
746  DSPSignalNotification msg(sourceStreamSampleRate, sourceCenterFrequency);
747  sink->handleMessage(msg);
748  // start the sink:
749  if(m_state == StRunning) {
750  sink->start();
751  }
752  }
753  }
754  else if (RemoveBasebandSampleSink::match(*message))
755  {
756  const RemoveBasebandSampleSink *msg = (RemoveBasebandSampleSink *) message;
757  BasebandSampleSink* sink = ((DSPRemoveBasebandSampleSink*) message)->getSampleSink();
758  unsigned int isource = msg->getIndex();
759 
760  if (isource < m_basebandSampleSinks.size())
761  {
762  if(m_state == StRunning) {
763  sink->stop();
764  }
765 
766  m_basebandSampleSinks[isource].remove(sink);
767  }
768  }
769  else if (AddThreadedBasebandSampleSink::match(*message))
770  {
772  ThreadedBasebandSampleSink *threadedSink = msg->getThreadedSampleSink();
773  unsigned int isource = msg->getIndex();
774 
775  if (isource < m_threadedBasebandSampleSinks.size())
776  {
777  m_threadedBasebandSampleSinks[isource].push_back(threadedSink);
778  // initialize sample rate and center frequency in the sink:
779  int sourceStreamSampleRate = m_deviceSampleMIMO->getSourceSampleRate(isource);
780  quint64 sourceCenterFrequency = m_deviceSampleMIMO->getSourceCenterFrequency(isource);
781  DSPSignalNotification msg(sourceStreamSampleRate, sourceCenterFrequency);
782  threadedSink->handleSinkMessage(msg);
783  // start the sink:
784  if(m_state == StRunning) {
785  threadedSink->start();
786  }
787  }
788  }
789  else if (RemoveThreadedBasebandSampleSink::match(*message))
790  {
792  ThreadedBasebandSampleSink* threadedSink = msg->getThreadedSampleSink();
793  unsigned int isource = msg->getIndex();
794 
795  if (isource < m_threadedBasebandSampleSinks.size())
796  {
797  threadedSink->stop();
798  m_threadedBasebandSampleSinks[isource].remove(threadedSink);
799  }
800  }
801  else if (AddThreadedBasebandSampleSource::match(*message))
802  {
804  ThreadedBasebandSampleSource *threadedSource = msg->getThreadedSampleSource();
805  unsigned int isink = msg->getIndex();
806 
807  if (isink < m_threadedBasebandSampleSources.size())
808  {
809  m_threadedBasebandSampleSources[isink].push_back(threadedSource);
810  // initialize sample rate and center frequency in the sink:
811  int sinkStreamSampleRate = m_deviceSampleMIMO->getSinkSampleRate(isink);
812  quint64 sinkCenterFrequency = m_deviceSampleMIMO->getSinkCenterFrequency(isink);
813  DSPSignalNotification msg(sinkStreamSampleRate, sinkCenterFrequency);
814  threadedSource->handleSourceMessage(msg);
815  // start the sink:
816  if(m_state == StRunning) {
817  threadedSource->start();
818  }
819  }
820  }
822  {
824  ThreadedBasebandSampleSource* threadedSource = msg->getThreadedSampleSource();
825  unsigned int isink = msg->getIndex();
826 
827  if (isink < m_threadedBasebandSampleSources.size())
828  {
829  threadedSource->stop();
830  m_threadedBasebandSampleSources[isink].remove(threadedSource);
831  }
832  }
833  else if (AddSpectrumSink::match(*message))
834  {
835  m_spectrumSink = ((AddSpectrumSink*) message)->getSampleSink();
836  }
837  else if (RemoveSpectrumSink::match(*message))
838  {
839  BasebandSampleSink* spectrumSink = ((DSPRemoveSpectrumSink*) message)->getSampleSink();
840  spectrumSink->stop();
841 
842  if (!m_spectrumInputSourceElseSink && m_deviceSampleMIMO && (m_spectrumInputIndex < m_deviceSampleMIMO->getNbSinkStreams()))
843  {
845  disconnect(inputFIFO, SIGNAL(dataRead(int)), this, SLOT(handleForwardToSpectrumSink(int)));
846  }
847 
848  m_spectrumSink = nullptr;
849  }
850  else if (SetSpectrumSinkInput::match(*message))
851  {
852  const SetSpectrumSinkInput *msg = (SetSpectrumSinkInput *) message;
853  bool spectrumInputSourceElseSink = msg->getSourceElseSink();
854  unsigned int spectrumInputIndex = msg->getIndex();
855 
856  if ((spectrumInputSourceElseSink != m_spectrumInputSourceElseSink) || (spectrumInputIndex != m_spectrumInputIndex))
857  {
858  if (!m_spectrumInputSourceElseSink) // remove the source listener
859  {
861  disconnect(inputFIFO, SIGNAL(dataRead(int)), this, SLOT(handleForwardToSpectrumSink(int)));
862  }
863 
864  if ((!spectrumInputSourceElseSink) && (spectrumInputIndex < m_deviceSampleMIMO->getNbSinkStreams())) // add the source listener
865  {
866  SampleSourceFifo *inputFIFO = m_deviceSampleMIMO->getSampleSourceFifo(spectrumInputIndex);
867  connect(inputFIFO, SIGNAL(dataRead(int)), this, SLOT(handleForwardToSpectrumSink(int)));
868 
869  if (m_spectrumSink)
870  {
871  DSPSignalNotification notif(
872  m_deviceSampleMIMO->getSinkSampleRate(spectrumInputIndex),
873  m_deviceSampleMIMO->getSinkCenterFrequency(spectrumInputIndex));
875  }
876  }
877 
878  if (m_spectrumSink && (spectrumInputSourceElseSink) && (spectrumInputIndex < m_deviceSampleMIMO->getNbSinkFifos()))
879  {
880  DSPSignalNotification notif(
881  m_deviceSampleMIMO->getSourceSampleRate(spectrumInputIndex),
882  m_deviceSampleMIMO->getSourceCenterFrequency(spectrumInputIndex));
884  }
885 
886  m_spectrumInputSourceElseSink = spectrumInputSourceElseSink;
887  m_spectrumInputIndex = spectrumInputIndex;
888  }
889  }
890 
892 }
893 
895 {
896  Message* message;
897 
898  while ((message = m_inputMessageQueue.pop()) != 0)
899  {
900  qDebug("DSPDeviceMIMOEngine::handleInputMessages: message: %s", message->getIdentifier());
901 
902  if (ConfigureCorrection::match(*message))
903  {
904  ConfigureCorrection* conf = (ConfigureCorrection*) message;
905  unsigned int isource = conf->getIndex();
906 
907  if (isource < m_sourcesCorrections.size())
908  {
909  m_sourcesCorrections[isource].m_iqImbalanceCorrection = conf->getIQImbalanceCorrection();
910 
911  if (m_sourcesCorrections[isource].m_dcOffsetCorrection != conf->getDCOffsetCorrection())
912  {
913  m_sourcesCorrections[isource].m_dcOffsetCorrection = conf->getDCOffsetCorrection();
914  m_sourcesCorrections[isource].m_iOffset = 0;
915  m_sourcesCorrections[isource].m_qOffset = 0;
916 
917  if (m_sourcesCorrections[isource].m_iqImbalanceCorrection != conf->getIQImbalanceCorrection())
918  {
919  m_sourcesCorrections[isource].m_iqImbalanceCorrection = conf->getIQImbalanceCorrection();
920  m_sourcesCorrections[isource].m_iRange = 1 << 16;
921  m_sourcesCorrections[isource].m_qRange = 1 << 16;
922  m_sourcesCorrections[isource].m_imbalance = 65536;
923  }
924  }
925  m_sourcesCorrections[isource].m_iBeta.reset();
926  m_sourcesCorrections[isource].m_qBeta.reset();
927  m_sourcesCorrections[isource].m_avgAmp.reset();
928  m_sourcesCorrections[isource].m_avgII.reset();
929  m_sourcesCorrections[isource].m_avgII2.reset();
930  m_sourcesCorrections[isource].m_avgIQ.reset();
931  m_sourcesCorrections[isource].m_avgPhi.reset();
932  m_sourcesCorrections[isource].m_avgQQ2.reset();
933  m_sourcesCorrections[isource].m_iBeta.reset();
934  m_sourcesCorrections[isource].m_qBeta.reset();
935  }
936 
937  delete message;
938  }
939  else if (DSPMIMOSignalNotification::match(*message))
940  {
942 
943  // update DSP values
944 
945  bool sourceElseSink = notif->getSourceOrSink();
946  unsigned int istream = notif->getIndex();
947  int sampleRate = notif->getSampleRate();
948  qint64 centerFrequency = notif->getCenterFrequency();
949 
950  qDebug() << "DeviceMIMOEngine::handleInputMessages: DSPMIMOSignalNotification:"
951  << " sourceElseSink: " << sourceElseSink
952  << " istream: " << istream
953  << " sampleRate: " << sampleRate
954  << " centerFrequency: " << centerFrequency;
955 
956  if (sourceElseSink)
957  {
958  if ((istream < m_deviceSampleMIMO->getNbSourceStreams()))
959  {
960  DSPSignalNotification *message = new DSPSignalNotification(sampleRate, centerFrequency);
961 
962  // forward source changes to ancillary sinks with immediate execution (no queuing)
963  if (istream < m_basebandSampleSinks.size())
964  {
965  for (BasebandSampleSinks::const_iterator it = m_basebandSampleSinks[istream].begin(); it != m_basebandSampleSinks[istream].end(); ++it)
966  {
967  qDebug() << "DSPDeviceMIMOEngine::gotoRunning: starting " << (*it)->objectName().toStdString().c_str();
968  (*it)->handleMessage(*message);
969  }
970  }
971 
972  // forward source changes to channel sinks with immediate execution (no queuing)
973  if (istream < m_threadedBasebandSampleSinks.size())
974  {
975  for (ThreadedBasebandSampleSinks::const_iterator it = m_threadedBasebandSampleSinks[istream].begin(); it != m_threadedBasebandSampleSinks[istream].end(); ++it)
976  {
977  qDebug() << "DSPDeviceMIMOEngine::handleSourceMessages: forward message to ThreadedSampleSink(" << (*it)->getSampleSinkObjectName().toStdString().c_str() << ")";
978  (*it)->handleSinkMessage(*message);
979  }
980  }
981 
982  // forward changes to MIMO GUI input queue
984  qDebug("DeviceMIMOEngine::handleInputMessages: DSPMIMOSignalNotification: guiMessageQueue: %p", guiMessageQueue);
985 
986  if (guiMessageQueue) {
987  DSPMIMOSignalNotification* rep = new DSPMIMOSignalNotification(*notif); // make a copy for the MIMO GUI
988  guiMessageQueue->push(rep);
989  }
990 
991  // forward changes to spectrum sink if currently active
993  {
994  DSPSignalNotification spectrumNotif(sampleRate, centerFrequency);
995  m_spectrumSink->handleMessage(spectrumNotif);
996  }
997  }
998  }
999  else
1000  {
1001  if ((istream < m_deviceSampleMIMO->getNbSinkStreams()))
1002  {
1003  DSPSignalNotification *message = new DSPSignalNotification(sampleRate, centerFrequency);
1004 
1005  // forward source changes to channel sources with immediate execution (no queuing)
1006  if (istream < m_threadedBasebandSampleSources.size())
1007  {
1008  for (ThreadedBasebandSampleSources::const_iterator it = m_threadedBasebandSampleSources[istream].begin(); it != m_threadedBasebandSampleSources[istream].end(); ++it)
1009  {
1010  qDebug() << "DSPDeviceMIMOEngine::handleSinkMessages: forward message to ThreadedSampleSource(" << (*it)->getSampleSourceObjectName().toStdString().c_str() << ")";
1011  (*it)->handleSourceMessage(*message);
1012  }
1013  }
1014 
1015  // forward changes to MIMO GUI input queue
1017  qDebug("DSPDeviceMIMOEngine::handleInputMessages: DSPSignalNotification: guiMessageQueue: %p", guiMessageQueue);
1018 
1019  if (guiMessageQueue) {
1020  DSPMIMOSignalNotification* rep = new DSPMIMOSignalNotification(*notif); // make a copy for the source GUI
1021  guiMessageQueue->push(rep);
1022  }
1023 
1024  // forward changes to spectrum sink if currently active
1026  {
1027  DSPSignalNotification spectrumNotif(sampleRate, centerFrequency);
1028  m_spectrumSink->handleMessage(spectrumNotif);
1029  }
1030  }
1031  }
1032 
1033  delete message;
1034  }
1035  }
1036 }
1037 
// Requests a change of the DC offset / IQ imbalance correction settings of one source stream.
//   dcOffsetCorrection:    enable or disable DC offset correction
//   iqImbalanceCorrection: enable or disable IQ imbalance correction
//   isource:               index of the source stream the settings apply to
// The settings are wrapped in a heap-allocated ConfigureCorrection message; handleInputMessages()
// applies such messages and deletes them.
// NOTE(review): this listing skips line 1042, so the statement that hands cmd off is not visible
// here - presumably a push onto m_inputMessageQueue; confirm in the source.
1038 void DSPDeviceMIMOEngine::configureCorrections(bool dcOffsetCorrection, bool iqImbalanceCorrection, int isource)
1039 {
1040  qDebug() << "DSPDeviceMIMOEngine::configureCorrections";
1041  ConfigureCorrection* cmd = new ConfigureCorrection(dcOffsetCorrection, iqImbalanceCorrection, isource);
1043 }
1044 
1045 // This is used for the Tx (sink streams) side
1047 {
1048  if ((m_spectrumSink) && (m_spectrumInputIndex < m_deviceSampleMIMO->getNbSinkStreams()))
1049  {
1051  SampleVector::iterator readUntil;
1052  sampleFifo->getReadIterator(readUntil);
1053  m_spectrumSink->feed(readUntil - nbSamples, readUntil, false);
1054  }
1055 }
1056 
// Applies DC offset correction, and optionally IQ imbalance correction, in place to the samples
// in [begin, end) using the per-stream state stored in m_sourcesCorrections[isource].
//   begin, end:          range of samples to correct (modified in place)
//   isource:             index of the source stream whose correction state is used
//   imbalanceCorrection: when true the phase / amplitude imbalance correction is applied on top
//                        of the DC correction; when false only the DC estimate is subtracted
// The IMBALANCE_INT preprocessor switch selects an integer or a floating point implementation.
// NOTE(review): isource indexes m_sourcesCorrections without a size check in this function.
// NOTE(review): this listing skips lines 1093-1094 in the IMBALANCE_INT branch, so the
// declaration of sqrtA used at line 1095 is not visible here.
1057 void DSPDeviceMIMOEngine::iqCorrections(SampleVector::iterator begin, SampleVector::iterator end, int isource, bool imbalanceCorrection)
1058 {
1059  for(SampleVector::iterator it = begin; it < end; it++)
1060  {
// feed the raw I and Q values to the per-stream accumulators; their value is subtracted below
// as the DC estimate (presumably running averages - confirm against the accumulator type)
1061  m_sourcesCorrections[isource].m_iBeta(it->real());
1062  m_sourcesCorrections[isource].m_qBeta(it->imag());
1063 
1064  if (imbalanceCorrection)
1065  {
1066 #if IMBALANCE_INT
1067  // acquisition
1068  int64_t xi = (it->m_real - (int32_t) m_sourcesCorrections[isource].m_iBeta) << 5;
1069  int64_t xq = (it->m_imag - (int32_t) m_sourcesCorrections[isource].m_qBeta) << 5;
1070 
1071  // phase imbalance
1072  m_sourcesCorrections[isource].m_avgII((xi*xi)>>28); // <I", I">
1073  m_sourcesCorrections[isource].m_avgIQ((xi*xq)>>28); // <I", Q">
1074 
// skip the phase update when the I power average is zero to avoid dividing by zero
1075  if ((int64_t) m_sourcesCorrections[isource].m_avgII != 0)
1076  {
1077  int64_t phi = (((int64_t) m_sourcesCorrections[isource].m_avgIQ)<<28) / (int64_t) m_sourcesCorrections[isource].m_avgII;
1078  m_sourcesCorrections[isource].m_avgPhi(phi);
1079  }
1080 
1081  int64_t corrPhi = (((int64_t) m_sourcesCorrections[isource].m_avgPhi) * xq) >> 28; //(m_avgPhi.asDouble()/16777216.0) * ((double) xq);
1082 
// NOTE(review): here the phase term is subtracted from I and Q is left untouched, whereas the
// floating point branch below keeps I and corrects Q - confirm this difference is intended
1083  int64_t yi = xi - corrPhi;
1084  int64_t yq = xq;
1085 
1086  // amplitude I/Q imbalance
1087  m_sourcesCorrections[isource].m_avgII2((yi*yi)>>28); // <I, I>
1088  m_sourcesCorrections[isource].m_avgQQ2((yq*yq)>>28); // <Q, Q>
1089 
// skip the amplitude update when the Q power average is zero to avoid dividing by zero
1090  if ((int64_t) m_sourcesCorrections[isource].m_avgQQ2 != 0)
1091  {
1092  int64_t a = (((int64_t) m_sourcesCorrections[isource].m_avgII2)<<28) / (int64_t) m_sourcesCorrections[isource].m_avgQQ2;
1095  m_sourcesCorrections[isource].m_avgAmp(sqrtA.as_internal());
1096  }
1097 
1098  int64_t zq = (((int64_t) m_sourcesCorrections[isource].m_avgAmp) * yq) >> 28;
1099 
// undo the << 5 scaling applied at acquisition before storing the result
1100  it->m_real = yi >> 5;
1101  it->m_imag = zq >> 5;
1102 
1103 #else
1104  // DC correction and conversion
1105  float xi = (it->m_real - (int32_t) m_sourcesCorrections[isource].m_iBeta) / SDR_RX_SCALEF;
1106  float xq = (it->m_imag - (int32_t) m_sourcesCorrections[isource].m_qBeta) / SDR_RX_SCALEF;
1107 
1108  // phase imbalance
1109  m_sourcesCorrections[isource].m_avgII(xi*xi); // <I", I">
1110  m_sourcesCorrections[isource].m_avgIQ(xi*xq); // <I", Q">
1111 
1112 
// skip the phase update when the I power average is zero to avoid dividing by zero
1113  if (m_sourcesCorrections[isource].m_avgII.asDouble() != 0) {
1114  m_sourcesCorrections[isource].m_avgPhi(m_sourcesCorrections[isource].m_avgIQ.asDouble()/m_sourcesCorrections[isource].m_avgII.asDouble());
1115  }
1116 
1117  float& yi = xi; // the in phase remains the reference
1118  float yq = xq - m_sourcesCorrections[isource].m_avgPhi.asDouble()*xi;
1119 
1120  // amplitude I/Q imbalance
1121  m_sourcesCorrections[isource].m_avgII2(yi*yi); // <I, I>
1122  m_sourcesCorrections[isource].m_avgQQ2(yq*yq); // <Q, Q>
1123 
// skip the amplitude update when the Q power average is zero to avoid dividing by zero
1124  if (m_sourcesCorrections[isource].m_avgQQ2.asDouble() != 0) {
1125  m_sourcesCorrections[isource].m_avgAmp(sqrt(m_sourcesCorrections[isource].m_avgII2.asDouble() / m_sourcesCorrections[isource].m_avgQQ2.asDouble()));
1126  }
1127 
1128  // final correction
1129  float& zi = yi; // the in phase remains the reference
1130  float zq = m_sourcesCorrections[isource].m_avgAmp.asDouble() * yq;
1131 
1132  // convert and store
1133  it->m_real = zi * SDR_RX_SCALEF;
1134  it->m_imag = zq * SDR_RX_SCALEF;
1135 #endif
1136  }
1137  else
1138  {
1139  // DC correction only
1140  it->m_real -= (int32_t) m_sourcesCorrections[isource].m_iBeta;
1141  it->m_imag -= (int32_t) m_sourcesCorrections[isource].m_qBeta;
1142  }
1143  }
1144 }
void handleData()
Handle data when samples have to be processed.
Message * pop()
Pop message from queue.
void removeChannelSource(ThreadedBasebandSampleSource *source, int index=0)
Remove a channel source that runs on its own thread.
qint64 getCenterFrequency() const
Definition: dspcommands.h:347
SyncMessenger m_syncMessenger
Used to process messages synchronously with the thread.
std::vector< ThreadedBasebandSampleSinks > m_threadedBasebandSampleSinks
channel sample sinks on their own thread (per input stream)
void setMIMO(DeviceSampleMIMO *mimo)
Set the sample MIMO type.
void push(Message *message, bool emitSignal=true)
Push message onto queue.
int size()
Returns queue size.
QString deviceDescription()
Return the device description.
virtual bool start()=0
void removeChannelSink(ThreadedBasebandSampleSink *sink, int index=0)
Remove a channel sink that runs on its own thread.
void handleInputMessages()
Handle input message queue.
virtual const QString & getDeviceDescription() const =0
bool handleSinkMessage(const Message &cmd)
Send message to sink synchronously.
QString errorMessage()
Return the current error message.
unsigned int getNbSourceStreams() const
Convenience function, same as getNbSinkFifos (Rx or source streams)
void start()
this thread start()
std::list< ThreadedBasebandSampleSink * > ThreadedBasebandSampleSinks
void handleSynchronousMessages()
Handle synchronous messages with the thread.
bool m_spectrumInputSourceElseSink
Source else sink stream to be used as spectrum sink input.
engine has not been initialized yet
void storeMessage(Message &message)
Definition: syncmessenger.h:42
void stopProcess()
Stop process sequence.
virtual quint64 getSourceCenterFrequency(int index) const =0
Center frequency exposed by the source at index.
virtual int getSinkSampleRate(int index) const =0
Sample rate exposed by the sink at index.
virtual void feed(const SampleVector::const_iterator &begin, const SampleVector::const_iterator &end, bool positiveOnly)=0
BasebandSampleSink * m_spectrumSink
The spectrum sink.
bool initProcess()
Initialize process sequence.
__int64 int64_t
Definition: rtptypes_win.h:47
std::vector< SourceCorrection > m_sourcesCorrections
uint readCommit(uint count)
unsigned int uint32_t
Definition: rtptypes_win.h:46
#define SDR_RX_SCALEF
Definition: dsptypes.h:33
bool getSourceOrSink() const
Definition: dspcommands.h:348
virtual void start()=0
std::list< BasebandSampleSink * > BasebandSampleSinks
ThreadedBasebandSampleSource * getThreadedSampleSource() const
State gotoError(const QString &errorMsg)
Go to an error state.
void handleForwardToSpectrumSink(int nbSamples)
virtual bool handleMessage(const Message &cmd)=0
Processing of a message. Returns true if message has actually been processed.
void setSpectrumSinkInput(bool sourceElseSink, int index)
#define MESSAGE_CLASS_DEFINITION(Name, BaseClass)
Definition: message.h:52
std::list< ThreadedBasebandSampleSource * > ThreadedBasebandSampleSources
void removeSpectrumSink(BasebandSampleSink *spectrumSink)
Remove a spectrum vis baseband sample sink.
void configureCorrections(bool dcOffsetCorrection, bool iqImbalanceCorrection, int isource)
Configure source DSP corrections.
std::vector< int > m_sampleSinkConnectionIndexes
DeviceSampleMIMO * m_deviceSampleMIMO
void workSampleSink(unsigned int sinkIndex)
ThreadedBasebandSampleSink * getThreadedSampleSink() const
SampleSinkFifo * getSampleSinkFifo(unsigned int index)
Get Rx FIFO at index.
void handleSetMIMO(DeviceSampleMIMO *mimo)
Manage MIMO device setting.
void removeAncillarySink(BasebandSampleSink *sink, int index=0)
Remove an ancillary sample sink.
void stop()
this thread exit() and wait()
static bool match(const Message *message)
Definition: message.cpp:45
void stop()
This thread stop.
Definition: fixed.h:42
SampleSourceFifo * getSampleSourceFifo(unsigned int index)
Get Tx FIFO at index.
int int32_t
Definition: rtptypes_win.h:45
State gotoIdle()
Go to the idle state.
void iqCorrections(SampleVector::iterator begin, SampleVector::iterator end, int isource, bool imbalanceCorrection)
std::vector< BasebandSampleSinks > m_basebandSampleSinks
ancillary sample sinks on main thread (per input stream)
Fixed< IntType, IntBits > sqrt(Fixed< IntType, IntBits > const &x)
Definition: fixed.h:2283
State gotoRunning()
Go to the running state from ready state.
unsigned int getNbSinkFifos() const
Get the number of Rx FIFOs.
void setMIMOSequence(int sequence)
Set the sample MIMO sequence in type.
IntType as_internal() const
Definition: fixed.h:154
Message * getMessage() const
Definition: syncmessenger.h:41
void addAncillarySink(BasebandSampleSink *sink, int index=0)
Add an ancillary sink like a I/Q recorder.
void addChannelSource(ThreadedBasebandSampleSource *source, int index=0)
Add a channel source that will run on its own thread.
MessageQueue * getMessageQueueToGUI()
virtual quint64 getSinkCenterFrequency(int index) const =0
Center frequency exposed by the sink at index.
void done(int result=0)
Processing of the message is complete.
State gotoInit()
Go to the acquisition init state from idle.
virtual void stop()=0
void stop()
this thread exit() and wait()
unsigned int m_spectrumInputIndex
Index of the stream to be used as spectrum sink input.
void start()
This thread start.
ThreadedBasebandSampleSink * getThreadedSampleSink() const
ThreadedBasebandSampleSource * getThreadedSampleSource() const
virtual const char * getIdentifier() const
Definition: message.cpp:35
virtual int getSourceSampleRate(int index) const =0
Sample rate exposed by the source at index.
void addChannelSink(ThreadedBasebandSampleSink *sink, int index=0)
Add a channel sink that will run on its own thread.
void addSourceStream(bool connect)
int sendWait(Message &message, unsigned long msPollTime=100)
Send message and waits for its process completion.
virtual void stop()=0
unsigned int getIndex() const
Definition: dspcommands.h:349
std::vector< ThreadedBasebandSampleSources > m_threadedBasebandSampleSources
channel sample sources on their own threads (per output stream)
std::vector< int > m_sampleSourceConnectionIndexes
uint readBegin(uint count, SampleVector::iterator *part1Begin, SampleVector::iterator *part1End, SampleVector::iterator *part2Begin, SampleVector::iterator *part2End)
void getReadIterator(SampleVector::iterator &readUntil)
Get the iterator past the last sample of a read advance operation (i.e. the current read iterator).
void addSpectrumSink(BasebandSampleSink *spectrumSink)
Add a spectrum vis baseband sample sink.
void work(int nbWriteSamples)
transfer samples if in running state
bool handleSourceMessage(const Message &cmd)
Send message to source synchronously.
void addSinkStream(bool connect)
MessageQueue m_inputMessageQueue
bool startProcess()
Start process sequence.