SDRAngel  4.11.5
Developer docs for <a href="https://github.com/f4exb/sdrangel">SDRangel<\a>, an Open Source Qt5 / OpenGL 3.0+ SDR and signal analyzer frontend to various hardware.
udpsourceudphandler.cpp
Go to the documentation of this file.
1 // Copyright (C) 2017 Edouard Griffiths, F4EXB //
3 // //
4 // This program is free software; you can redistribute it and/or modify //
5 // it under the terms of the GNU General Public License as published by //
6 // the Free Software Foundation as version 3 of the License, or //
7 // (at your option) any later version. //
8 // //
9 // This program is distributed in the hope that it will be useful, //
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of //
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
12 // GNU General Public License V3 for more details. //
13 // //
14 // You should have received a copy of the GNU General Public License //
15 // along with this program. If not, see <http://www.gnu.org/licenses/>. //
17 
18 #include "udpsourceudphandler.h"
19 
20 #include <QDebug>
21 #include <stdint.h>
22 #include <algorithm>
23 
24 #include "udpsourcemsg.h"
25 
27 
29  m_dataSocket(0),
30  m_dataAddress(QHostAddress::LocalHost),
31  m_remoteAddress(QHostAddress::LocalHost),
32  m_dataPort(9999),
33  m_remotePort(0),
34  m_dataConnected(false),
35  m_udpDumpIndex(0),
36  m_nbUDPFrames(m_minNbUDPFrames),
37  m_nbAllocatedUDPFrames(m_minNbUDPFrames),
38  m_writeFrameIndex(0),
39  m_readFrameIndex(m_minNbUDPFrames/2),
40  m_readIndex(0),
41  m_rwDelta(m_minNbUDPFrames/2),
42  m_d(0),
43  m_autoRWBalance(true),
44  m_feedbackMessageQueue(0)
45 {
46  m_udpBuf = new udpBlk_t[m_minNbUDPFrames];
47  std::fill(m_udpDump, m_udpDump + m_udpBlockSize + 8192, 0);
48  connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleMessages()));
49 }
50 
52 {
53  delete[] m_udpBuf;
54 }
55 
57 {
58  qDebug("UDPSourceUDPHandler::start");
59 
60  if (!m_dataSocket)
61  {
62  m_dataSocket = new QUdpSocket(this);
63  }
64 
65  if (!m_dataConnected)
66  {
67 
69  {
70  qDebug("UDPSourceUDPHandler::start: bind data socket to %s:%d", m_dataAddress.toString().toStdString().c_str(), m_dataPort);
71  connect(m_dataSocket, SIGNAL(readyRead()), this, SLOT(dataReadyRead())); // , Qt::QueuedConnection gets stuck since Qt 5.8.0
72  m_dataConnected = true;
73  }
74  else
75  {
76  qWarning("UDPSourceUDPHandler::start: cannot bind data socket to %s:%d", m_dataAddress.toString().toStdString().c_str(), m_dataPort);
77  m_dataConnected = false;
78  }
79  }
80 }
81 
83 {
84  qDebug("UDPSourceUDPHandler::stop");
85 
86  if (m_dataConnected)
87  {
88  m_dataConnected = false;
89  disconnect(m_dataSocket, SIGNAL(readyRead()), this, SLOT(dataReadyRead()));
90  }
91 
92  if (m_dataSocket)
93  {
94  delete m_dataSocket;
95  m_dataSocket = 0;
96  }
97 }
98 
100 {
101  while (m_dataSocket->hasPendingDatagrams() && m_dataConnected)
102  {
103  qint64 pendingDataSize = m_dataSocket->pendingDatagramSize();
104  qint64 bytesRead = m_dataSocket->readDatagram(&m_udpDump[m_udpDumpIndex], pendingDataSize, &m_remoteAddress, &m_remotePort);
105 
106  if (bytesRead < 0)
107  {
108  qWarning("UDPSourceUDPHandler::dataReadyRead: UDP read error");
109  }
110  else
111  {
112  int udpDumpSize = m_udpDumpIndex + bytesRead;
113  int udpDumpPtr = 0;
114 
115  while (udpDumpSize >= m_udpBlockSize)
116  {
117  moveData(&m_udpDump[udpDumpPtr]);
118  udpDumpPtr += m_udpBlockSize;
119  udpDumpSize -= m_udpBlockSize;
120  }
121 
122  if (udpDumpSize > 0)
123  {
124  memcpy(m_udpDump, &m_udpDump[udpDumpPtr], udpDumpSize);
125  }
126 
127  m_udpDumpIndex = udpDumpSize;
128  }
129  }
130 }
131 
133 {
135 
136  if (m_writeFrameIndex < m_nbUDPFrames - 1) {
137  m_writeFrameIndex++;
138  } else {
139  m_writeFrameIndex = 0;
140  }
141 }
142 
144 {
145  if (m_readFrameIndex == m_writeFrameIndex) // block until more writes
146  {
147  t = 0;
148  }
149  else
150  {
151  memcpy(&t, &m_udpBuf[m_readFrameIndex][m_readIndex], sizeof(qint16));
152  advanceReadPointer((int) sizeof(qint16));
153  }
154 }
155 
157 {
158  if (m_readFrameIndex == m_writeFrameIndex) // block until more writes
159  {
160  a.l = 0;
161  a.r = 0;
162  }
163  else
164  {
165  memcpy(&a, &m_udpBuf[m_readFrameIndex][m_readIndex], sizeof(AudioSample));
166  advanceReadPointer((int) sizeof(AudioSample));
167  }
168 }
169 
171 {
172  if (m_readFrameIndex == m_writeFrameIndex) // block until more writes
173  {
174  s.m_real = 0;
175  s.m_imag = 0;
176  }
177  else
178  {
179  memcpy(&s, &m_udpBuf[m_readFrameIndex][m_readIndex], sizeof(Sample));
180  advanceReadPointer((int) sizeof(Sample));
181  }
182 }
183 
185 {
186  if (m_readIndex < m_udpBlockSize - 2*nbBytes)
187  {
188  m_readIndex += nbBytes;
189  }
190  else
191  {
192  m_readIndex = 0;
193 
195  {
197  }
198  else
199  {
200  m_rwDelta = m_writeFrameIndex; // raw R/W delta estimate
201  int nbUDPFrames2 = m_nbUDPFrames/2;
202  float d = (m_rwDelta - nbUDPFrames2)/(float) m_nbUDPFrames;
203  //qDebug("UDPSourceUDPHandler::advanceReadPointer: w: %02d d: %f", m_writeIndex, d);
204 
205  if ((d < -0.45) || (d > 0.45))
206  {
207  resetReadIndex();
208  }
209  else
210  {
211  float dd = d - m_d; // derivative
212  float c = (d / 15.0) + (dd / 20.0); // damping and scaling
213  c = c < -0.05 ? -0.05 : c > 0.05 ? 0.05 : c; // limit
215 
218  }
219 
220  m_readFrameIndex = 0;
221  m_d = d;
222  }
223  }
224  }
225 }
226 
227 void UDPSourceUDPHandler::configureUDPLink(const QString& address, quint16 port)
228 {
229  Message* msg = MsgUDPAddressAndPort::create(address, port);
231 }
232 
233 void UDPSourceUDPHandler::applyUDPLink(const QString& address, quint16 port)
234 {
235  qDebug("UDPSourceUDPHandler::configureUDPLink: %s:%d", address.toStdString().c_str(), port);
236  bool addressOK = m_dataAddress.setAddress(address);
237 
238  if (!addressOK)
239  {
240  qWarning("UDPSourceUDPHandler::configureUDPLink: invalid address %s. Set to localhost.", address.toStdString().c_str());
241  m_dataAddress = QHostAddress::LocalHost;
242  }
243 
244  stop();
245  m_dataPort = port;
246  resetReadIndex();
247  start();
248 }
249 
251 {
254  m_readIndex = 0;
255  m_d = 0.0f;
256 }
257 
258 void UDPSourceUDPHandler::resizeBuffer(float sampleRate)
259 {
260  int halfNbFrames = std::max((sampleRate / 375.0), (m_minNbUDPFrames / 2.0));
261  qDebug("UDPSourceUDPHandler::resizeBuffer: nb_frames: %d", 2*halfNbFrames);
262 
263  if (2*halfNbFrames > m_nbAllocatedUDPFrames)
264  {
265  delete[] m_udpBuf;
266  m_udpBuf = new udpBlk_t[2*halfNbFrames];
267  m_nbAllocatedUDPFrames = 2*halfNbFrames;
268  }
269 
270  m_nbUDPFrames = 2*halfNbFrames;
271  m_writeFrameIndex = 0;
272 
273  resetReadIndex();
274 }
275 
277 {
278  Message* message;
279 
280  while ((message = m_inputMessageQueue.pop()) != 0)
281  {
282  if (handleMessage(*message))
283  {
284  delete message;
285  }
286  }
287 }
288 
290 {
292  {
294  applyUDPLink(notif.getAddress(), notif.getPort());
295  return true;
296  }
297  else
298  {
299  return false;
300  }
301 }
302 
303 
304 
void applyUDPLink(const QString &address, quint16 port)
Message * pop()
Pop message from queue.
void push(Message *message, bool emitSignal=true)
Push message onto queue.
static MsgUDPAddressAndPort * create(QString address, quint16 port)
void advanceReadPointer(int nbBytes)
void readSample(qint16 &t)
audio mono
qint16 r
Definition: dsptypes.h:92
#define MESSAGE_CLASS_DEFINITION(Name, BaseClass)
Definition: message.h:52
void configureUDPLink(const QString &address, quint16 port)
FixReal m_real
Definition: dsptypes.h:64
static const int m_minNbUDPFrames
static bool match(const Message *message)
Definition: message.cpp:45
char m_udpDump[m_udpBlockSize+8192]
static MsgSampleRateCorrection * create(float correctionFactor, float rawDeltaRatio)
Definition: udpsourcemsg.h:36
MessageQueue m_inputMessageQueue
qint16 l
Definition: dsptypes.h:91
void resizeBuffer(float sampleRate)
static const int m_udpBlockSize
FixReal m_imag
Definition: dsptypes.h:65
MessageQueue * m_feedbackMessageQueue
bool handleMessage(const Message &message)
T max(const T &x, const T &y)
Definition: framework.h:446