SDRAngel  4.11.5
Developer docs for <a href="https://github.com/f4exb/sdrangel">SDRangel<\a>, an Open Source Qt5 / OpenGL 3.0+ SDR and signal analyzer frontend to various hardware.
udpsinkfecworker.cpp
Go to the documentation of this file.
1 // Copyright (C) 2017 Edouard Griffiths, F4EXB //
3 // //
4 // This program is free software; you can redistribute it and/or modify //
5 // it under the terms of the GNU General Public License as published by //
6 // the Free Software Foundation as version 3 of the License, or //
7 // (at your option) any later version. //
8 // //
9 // This program is distributed in the hope that it will be useful, //
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of //
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
12 // GNU General Public License V3 for more details. //
13 // //
14 // You should have received a copy of the GNU General Public License //
15 // along with this program. If not, see <http://www.gnu.org/licenses/>. //
17 
18 #include "udpsinkfecworker.h"
19 
20 #include <QUdpSocket>
21 
22 
26 
28  m_running(false),
29  m_udpSocket(0),
30  m_remotePort(9090)
31 {
32  m_cm256Valid = m_cm256.isInitialized();
33  connect(&m_inputMessageQueue, SIGNAL(messageEnqueued()), this, SLOT(handleInputMessages()), Qt::QueuedConnection);
34 }
35 
37 {
38 }
39 
41 {
42  MsgStartStop *msg = MsgStartStop::create(start);
44 }
45 
47 {
48  qDebug("UDPSinkFECWorker::startWork");
49  m_startWaitMutex.lock();
50  m_udpSocket = new QUdpSocket(this);
51 
52  start();
53 
54  while(!m_running) {
55  m_startWaiter.wait(&m_startWaitMutex, 100);
56  }
57 
58  m_startWaitMutex.unlock();
59 }
60 
62 {
63  qDebug("UDPSinkFECWorker::stopWork");
64  delete m_udpSocket;
65  m_udpSocket = 0;
66  m_running = false;
67  wait();
68 }
69 
71 {
72  m_running = true;
73  m_startWaiter.wakeAll();
74 
75  qDebug("UDPSinkFECWorker::process: started");
76 
77  while (m_running)
78  {
79  sleep(1);
80  }
81  m_running = false;
82 
83  qDebug("UDPSinkFECWorker::process: stopped");
84 }
85 
87  uint32_t nbBlocksFEC,
88  uint32_t txDelay,
89  uint16_t frameIndex)
90 {
91  //qDebug("UDPSinkFECWorker::pushTxFrame. %d", m_inputMessageQueue.size());
92  m_inputMessageQueue.push(MsgUDPFECEncodeAndSend::create(txBlocks, nbBlocksFEC, txDelay, frameIndex));
93 }
94 
95 void UDPSinkFECWorker::setRemoteAddress(const QString& address, uint16_t port)
96 {
98 }
99 
101 {
102  Message* message;
103 
104  while ((message = m_inputMessageQueue.pop()) != 0)
105  {
106  if (MsgUDPFECEncodeAndSend::match(*message))
107  {
108  MsgUDPFECEncodeAndSend *sendMsg = (MsgUDPFECEncodeAndSend *) message;
109  encodeAndTransmit(sendMsg->getTxBlocks(), sendMsg->getFrameIndex(), sendMsg->getNbBlocsFEC(), sendMsg->getTxDelay());
110  }
111  else if (MsgConfigureRemoteAddress::match(*message))
112  {
113  qDebug("UDPSinkFECWorker::handleInputMessages: %s", message->getIdentifier());
114  MsgConfigureRemoteAddress *addressMsg = (MsgConfigureRemoteAddress *) message;
115  m_remoteAddress = addressMsg->getAddress();
116  m_remotePort = addressMsg->getPort();
117  m_remoteHostAddress.setAddress(addressMsg->getAddress());
118  }
119  else if (MsgStartStop::match(*message))
120  {
121  MsgStartStop* notif = (MsgStartStop*) message;
122  qDebug("UDPSinkFECWorker::handleInputMessages: MsgStartStop: %s", notif->getStartStop() ? "start" : "stop");
123 
124  if (notif->getStartStop()) {
125  startWork();
126  } else {
127  stopWork();
128  }
129  }
130 
131  delete message;
132  }
133 }
134 
135 void UDPSinkFECWorker::encodeAndTransmit(RemoteSuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay)
136 {
137  CM256::cm256_encoder_params cm256Params;
138  CM256::cm256_block descriptorBlocks[256];
139  RemoteProtectedBlock fecBlocks[256];
140 
141  if ((nbBlocksFEC == 0) || !m_cm256Valid)
142  {
143  if (m_udpSocket)
144  {
145  for (unsigned int i = 0; i < RemoteNbOrginalBlocks; i++)
146  {
147  m_udpSocket->writeDatagram((const char *) &txBlockx[i], RemoteUdpSize, m_remoteHostAddress, m_remotePort);
148  usleep(txDelay);
149  }
150  }
151  }
152  else
153  {
154  cm256Params.BlockBytes = sizeof(RemoteProtectedBlock);
155  cm256Params.OriginalCount = RemoteNbOrginalBlocks;
156  cm256Params.RecoveryCount = nbBlocksFEC;
157 
158 
159  // Fill pointers to data
160  for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; ++i)
161  {
162  if (i >= cm256Params.OriginalCount) {
163  memset((char *) &txBlockx[i].m_protectedBlock, 0, sizeof(RemoteProtectedBlock));
164  }
165 
166  txBlockx[i].m_header.m_frameIndex = frameIndex;
167  txBlockx[i].m_header.m_blockIndex = i;
168  txBlockx[i].m_header.m_sampleBytes = (SDR_RX_SAMP_SZ <= 16 ? 2 : 4);
170  descriptorBlocks[i].Block = (void *) &(txBlockx[i].m_protectedBlock);
171  descriptorBlocks[i].Index = txBlockx[i].m_header.m_blockIndex;
172  }
173 
174  // Encode FEC blocks
175  if (m_cm256.cm256_encode(cm256Params, descriptorBlocks, fecBlocks))
176  {
177  qDebug("UDPSinkFECWorker::encodeAndTransmit: CM256 encode failed. No transmission.");
178  return;
179  }
180 
181  // Merge FEC with data to transmit
182  for (int i = 0; i < cm256Params.RecoveryCount; i++)
183  {
184  txBlockx[i + cm256Params.OriginalCount].m_protectedBlock = fecBlocks[i];
185  }
186 
187  // Transmit all blocks
188  if (m_udpSocket)
189  {
190  for (int i = 0; i < cm256Params.OriginalCount + cm256Params.RecoveryCount; i++)
191  {
192  #ifdef REMOTE_PUNCTURE
193  if (i == REMOTE_PUNCTURE) {
194  continue;
195  }
196  #endif
197 
198  m_udpSocket->writeDatagram((const char *) &txBlockx[i], RemoteUdpSize, m_remoteHostAddress, m_remotePort);
199  usleep(txDelay);
200  }
201  }
202  }
203 }
204 
205 
206 
207 
Message * pop()
Pop message from queue.
void push(Message *message, bool emitSignal=true)
Push message onto queue.
QWaitCondition m_startWaiter
static MsgStartStop * create(bool startStop)
QHostAddress m_remoteHostAddress
void pushTxFrame(RemoteSuperBlock *txBlocks, uint32_t nbBlocksFEC, uint32_t txDelay, uint16_t frameIndex)
static MsgUDPFECEncodeAndSend * create(RemoteSuperBlock *txBlocks, uint32_t nbBlocksFEC, uint32_t txDelay, uint16_t frameIndex)
QUdpSocket * m_udpSocket
void encodeAndTransmit(RemoteSuperBlock *txBlockx, uint16_t frameIndex, uint32_t nbBlocksFEC, uint32_t txDelay)
RemoteProtectedBlock m_protectedBlock
unsigned int uint32_t
Definition: rtptypes_win.h:46
void startStop(bool start)
#define SDR_RX_SAMP_SZ
Definition: dsptypes.h:32
#define MESSAGE_CLASS_DEFINITION(Name, BaseClass)
Definition: message.h:52
unsigned short uint16_t
Definition: rtptypes_win.h:44
MessageQueue m_inputMessageQueue
Queue for asynchronous inbound communication.
uint8_t m_sampleBytes
number of bytes per sample (2 or 4) for this block
int32_t i
Definition: decimators.h:244
uint8_t m_blockIndex
static bool match(const Message *message)
Definition: message.cpp:45
void setRemoteAddress(const QString &address, uint16_t port)
RemoteSuperBlock * getTxBlocks() const
RemoteHeader m_header
CM256 m_cm256
CM256 library object.
uint16_t m_frameIndex
virtual const char * getIdentifier() const
Definition: message.cpp:35
bool m_cm256Valid
true if CM256 library is initialized correctly
static MsgConfigureRemoteAddress * create(const QString &address, uint16_t port)
uint8_t m_sampleBits
number of bits per sample
volatile bool m_running