SDRAngel  4.11.5
Developer docs for <a href="https://github.com/f4exb/sdrangel">SDRangel<\a>, an Open Source Qt5 / OpenGL 3.0+ SDR and signal analyzer frontend to various hardware.
remotesinkthread.cpp
Go to the documentation of this file.
1 // Copyright (C) 2018-2019 Edouard Griffiths, F4EXB. //
3 // //
4 // Remote sink channel (Rx) UDP sender thread //
5 // //
6 // SDRangel can work as a detached SDR front end. With this plugin it can //
7 // sends the I/Q samples stream to another SDRangel instance via UDP. //
8 // It is controlled via a Web REST API. //
9 // //
10 // This program is free software; you can redistribute it and/or modify //
11 // it under the terms of the GNU General Public License as published by //
12 // the Free Software Foundation as version 3 of the License, or //
13 // (at your option) any later version. //
14 // //
15 // This program is distributed in the hope that it will be useful, //
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of //
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
18 // GNU General Public License V3 for more details. //
19 // //
20 // You should have received a copy of the GNU General Public License //
21 // along with this program. If not, see <http://www.gnu.org/licenses/>. //
23 
24 #include "remotesinkthread.h"
25 
27 #include <QUdpSocket>
28 
29 #include "cm256cc/cm256.h"
30 
32 
34  QThread(parent),
35  m_running(false),
36  m_address(QHostAddress::LocalHost),
37  m_socket(0)
38 {
39 
40  m_cm256p = m_cm256.isInitialized() ? &m_cm256 : 0;
41  connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
42 }
43 
45 {
46  qDebug("RemoteSinkThread::~RemoteSinkThread");
47 }
48 
50 {
51  MsgStartStop *msg = MsgStartStop::create(start);
53 }
54 
56 {
57  qDebug("RemoteSinkThread::startWork");
58  m_startWaitMutex.lock();
59  m_socket = new QUdpSocket(this);
60  start();
61  while(!m_running)
62  m_startWaiter.wait(&m_startWaitMutex, 100);
63  m_startWaitMutex.unlock();
64 }
65 
67 {
68  qDebug("RemoteSinkThread::stopWork");
69  delete m_socket;
70  m_socket = 0;
71  m_running = false;
72  wait();
73 }
74 
76 {
77  qDebug("RemoteSinkThread::run: begin");
78  m_running = true;
79  m_startWaiter.wakeAll();
80 
81  while (m_running)
82  {
83  sleep(1); // Do nothing as everything is in the data handler (dequeuer)
84  }
85 
86  m_running = false;
87  qDebug("RemoteSinkThread::run: end");
88 }
89 
91 {
92  handleDataBlock(*dataBlock);
93  delete dataBlock;
94 }
95 
97 {
98  CM256::cm256_encoder_params cm256Params;
99  CM256::cm256_block descriptorBlocks[256];
100  RemoteProtectedBlock fecBlocks[256];
101 
102  uint16_t frameIndex = dataBlock.m_txControlBlock.m_frameIndex;
103  int nbBlocksFEC = dataBlock.m_txControlBlock.m_nbBlocksFEC;
104  int txDelay = dataBlock.m_txControlBlock.m_txDelay;
105  m_address.setAddress(dataBlock.m_txControlBlock.m_dataAddress);
106  uint16_t dataPort = dataBlock.m_txControlBlock.m_dataPort;
107  RemoteSuperBlock *txBlockx = dataBlock.m_superBlocks;
108 
109  if ((nbBlocksFEC == 0) || !m_cm256p) // Do not FEC encode
110  {
111  if (m_socket)
112  {
113  for (int i = 0; i < RemoteNbOrginalBlocks; i++)
114  {
115  // send block via UDP
116  m_socket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_address, dataPort);
117  usleep(txDelay);
118  }
119  }
120  }
121  else
122  {
123  cm256Params.BlockBytes = sizeof(RemoteProtectedBlock);
124  cm256Params.OriginalCount = RemoteNbOrginalBlocks;
125  cm256Params.RecoveryCount = nbBlocksFEC;
126 
127  // Fill pointers to data
128  for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; ++i)
129  {
130  if (i >= cm256Params.OriginalCount) {
131  memset((void *) &txBlockx[i].m_protectedBlock, 0, sizeof(RemoteProtectedBlock));
132  }
133 
134  txBlockx[i].m_header.m_frameIndex = frameIndex;
135  txBlockx[i].m_header.m_blockIndex = i;
136  txBlockx[i].m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4);
138  descriptorBlocks[i].Block = (void *) &(txBlockx[i].m_protectedBlock);
139  descriptorBlocks[i].Index = txBlockx[i].m_header.m_blockIndex;
140  }
141 
142  // Encode FEC blocks
143  if (m_cm256p->cm256_encode(cm256Params, descriptorBlocks, fecBlocks))
144  {
145  qWarning("RemoteSinkThread::handleDataBlock: CM256 encode failed. No transmission.");
146  // TODO: send without FEC changing meta data to set indication of no FEC
147  }
148 
149  // Merge FEC with data to transmit
150  for (int i = 0; i < cm256Params.RecoveryCount; i++)
151  {
152  txBlockx[i + cm256Params.OriginalCount].m_protectedBlock = fecBlocks[i];
153  }
154 
155  // Transmit all blocks
156  if (m_socket)
157  {
158  for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++)
159  {
160  // send block via UDP
161  m_socket->writeDatagram((const char*)&txBlockx[i], (qint64 ) RemoteUdpSize, m_address, dataPort);
162  usleep(txDelay);
163  }
164  }
165  }
166 
167  dataBlock.m_txControlBlock.m_processed = true;
168 }
169 
171 {
172  Message* message;
173 
174  while ((message = m_inputMessageQueue.pop()) != 0)
175  {
176  if (MsgStartStop::match(*message))
177  {
178  MsgStartStop* notif = (MsgStartStop*) message;
179  qDebug("RemoteSinkThread::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop");
180 
181  if (notif->getStartStop()) {
182  startWork();
183  } else {
184  stopWork();
185  }
186 
187  delete message;
188  }
189  }
190 }
Message * pop()
Pop message from queue.
void push(Message *message, bool emitSignal=true)
Push message onto queue.
QUdpSocket * m_socket
RemoteSuperBlock * m_superBlocks
QHostAddress m_address
RemoteProtectedBlock m_protectedBlock
static MsgStartStop * create(bool startStop)
#define SDR_RX_SAMP_SZ
Definition: dsptypes.h:32
MessageQueue m_inputMessageQueue
#define MESSAGE_CLASS_DEFINITION(Name, BaseClass)
Definition: message.h:52
void handleDataBlock(RemoteDataBlock &dataBlock)
unsigned short uint16_t
Definition: rtptypes_win.h:44
uint8_t m_sampleBytes
number of bytes per sample (2 or 4) for this block
int32_t i
Definition: decimators.h:244
QWaitCondition m_startWaiter
uint8_t m_blockIndex
static bool match(const Message *message)
Definition: message.cpp:45
volatile bool m_running
RemoteTxControlBlock m_txControlBlock
RemoteHeader m_header
uint16_t m_frameIndex
void startStop(bool start)
void processDataBlock(RemoteDataBlock *dataBlock)
uint8_t m_sampleBits
number of bits per sample