SDRAngel  4.11.5
Developer docs for <a href="https://github.com/f4exb/sdrangel">SDRangel<\a>, an Open Source Qt5 / OpenGL 3.0+ SDR and signal analyzer frontend to various hardware.
remotesourcethread.cpp
Go to the documentation of this file.
1 // Copyright (C) 2018-2019 Edouard Griffiths, F4EXB //
3 // //
4 // This program is free software; you can redistribute it and/or modify //
5 // it under the terms of the GNU General Public License as published by //
6 // the Free Software Foundation as version 3 of the License, or //
7 // (at your option) any later version. //
8 // //
9 // This program is distributed in the hope that it will be useful, //
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of //
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
12 // GNU General Public License V3 for more details. //
13 // //
14 // You should have received a copy of the GNU General Public License //
15 // along with this program. If not, see <http://www.gnu.org/licenses/>. //
17 
18 #include "remotesourcethread.h"
19 
22 #include <algorithm>
23 
24 #include <QUdpSocket>
25 #include "cm256cc/cm256.h"
26 
27 
28 
31 
32 RemoteSourceThread::RemoteSourceThread(RemoteDataQueue *dataQueue, QObject* parent) :
33  QThread(parent),
34  m_running(false),
35  m_dataQueue(dataQueue),
36  m_address(QHostAddress::LocalHost),
37  m_socket(0)
38 {
39  std::fill(m_dataBlocks, m_dataBlocks+4, (RemoteDataBlock *) 0);
40  connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
41 }
42 
44 {
45  qDebug("RemoteSourceThread::~RemoteSourceThread");
46 }
47 
49 {
50  MsgStartStop *msg = MsgStartStop::create(start);
52 }
53 
54 void RemoteSourceThread::dataBind(const QString& address, uint16_t port)
55 {
56  MsgDataBind *msg = MsgDataBind::create(address, port);
58 }
59 
61 {
62  qDebug("RemoteSourceThread::startWork");
63  m_startWaitMutex.lock();
64  m_socket = new QUdpSocket(this);
65  start();
66  while(!m_running)
67  m_startWaiter.wait(&m_startWaitMutex, 100);
68  m_startWaitMutex.unlock();
69 }
70 
72 {
73  qDebug("RemoteSourceThread::stopWork");
74  delete m_socket;
75  m_socket = 0;
76  m_running = false;
77  wait();
78 }
79 
81 {
82  qDebug("RemoteSourceThread::run: begin");
83  m_running = true;
84  m_startWaiter.wakeAll();
85 
86  while (m_running)
87  {
88  sleep(1); // Do nothing as everything is in the data handler (dequeuer)
89  }
90 
91  m_running = false;
92  qDebug("RemoteSourceThread::run: end");
93 }
94 
95 
97 {
98  Message* message;
99 
100  while ((message = m_inputMessageQueue.pop()) != 0)
101  {
102  if (MsgStartStop::match(*message))
103  {
104  MsgStartStop* notif = (MsgStartStop*) message;
105  qDebug("RemoteSourceThread::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop");
106 
107  if (notif->getStartStop()) {
108  startWork();
109  } else {
110  stopWork();
111  }
112 
113  delete message;
114  }
115  else if (MsgDataBind::match(*message))
116  {
117  MsgDataBind* notif = (MsgDataBind*) message;
118  qDebug("RemoteSourceThread::handleInputMessages: MsgDataBind: %s:%d", qPrintable(notif->getAddress().toString()), notif->getPort());
119 
120  if (m_socket)
121  {
122  disconnect(m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams()));
123  m_socket->bind(notif->getAddress(), notif->getPort());
124  connect(m_socket, SIGNAL(readyRead()), this, SLOT(readPendingDatagrams()));
125  }
126  }
127  }
128 }
129 
131 {
132  RemoteSuperBlock superBlock;
133  qint64 size;
134 
135  while (m_socket->hasPendingDatagrams())
136  {
137  QHostAddress sender;
138  quint16 senderPort = 0;
139  //qint64 pendingDataSize = m_socket->pendingDatagramSize();
140  size = m_socket->readDatagram((char *) &superBlock, (long long int) sizeof(RemoteSuperBlock), &sender, &senderPort);
141 
142  if (size == sizeof(RemoteSuperBlock))
143  {
144  unsigned int dataBlockIndex = superBlock.m_header.m_frameIndex % m_nbDataBlocks;
145 
146  // create the first block for this index
147  if (m_dataBlocks[dataBlockIndex] == 0) {
148  m_dataBlocks[dataBlockIndex] = new RemoteDataBlock();
149  }
150 
151  if (m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex < 0)
152  {
153  // initialize virgin block with the frame index
154  m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex;
155  }
156  else
157  {
158  // if the frame index is not the same for the same slot it means we are starting a new frame
159  uint32_t frameIndex = m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex;
160 
161  if (superBlock.m_header.m_frameIndex != frameIndex)
162  {
163  //qDebug("RemoteSourceThread::readPendingDatagrams: push frame %u", frameIndex);
164  m_dataQueue->push(m_dataBlocks[dataBlockIndex]);
165  m_dataBlocks[dataBlockIndex] = new RemoteDataBlock();
166  m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_frameIndex = superBlock.m_header.m_frameIndex;
167  }
168  }
169 
170  m_dataBlocks[dataBlockIndex]->m_superBlocks[superBlock.m_header.m_blockIndex] = superBlock;
171 
172  if (superBlock.m_header.m_blockIndex == 0) {
173  m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_metaRetrieved = true;
174  }
175 
176  if (superBlock.m_header.m_blockIndex < RemoteNbOrginalBlocks) {
177  m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_originalCount++;
178  } else {
179  m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_recoveryCount++;
180  }
181 
182  m_dataBlocks[dataBlockIndex]->m_rxControlBlock.m_blockCount++;
183  }
184  else
185  {
186  qWarning("RemoteSourceThread::readPendingDatagrams: wrong super block size not processing");
187  }
188  }
189 }
190 
Message * pop()
Pop message from queue.
void push(Message *message, bool emitSignal=true)
Push message onto queue.
RemoteSuperBlock * m_superBlocks
QWaitCondition m_startWaiter
RemoteDataQueue * m_dataQueue
void push(RemoteDataBlock *dataBlock, bool emitSignal=true)
Push daa block onto queue.
int m_recoveryCount
number of recovery blocks received
volatile bool m_running
static MsgDataBind * create(const QString &address, uint16_t port)
static const uint32_t m_nbDataBlocks
number of data blocks in the ring buffer
unsigned int uint32_t
Definition: rtptypes_win.h:46
#define MESSAGE_CLASS_DEFINITION(Name, BaseClass)
Definition: message.h:52
RemoteRxControlBlock m_rxControlBlock
int m_originalCount
number of original blocks received
unsigned short uint16_t
Definition: rtptypes_win.h:44
MessageQueue m_inputMessageQueue
static bool match(const Message *message)
Definition: message.cpp:45
void dataBind(const QString &address, uint16_t port)
static MsgStartStop * create(bool startStop)
RemoteDataBlock * m_dataBlocks[m_nbDataBlocks]
ring buffer of data blocks indexed by frame affinity
void startStop(bool start)
int m_frameIndex
this frame index or -1 if unset
int m_blockCount
number of blocks received for this frame
bool m_metaRetrieved
true if meta data (block zero) was retrieved