SDRAngel  4.11.5
Developer docs for <a href="https://github.com/f4exb/sdrangel">SDRangel</a>, an Open Source Qt5 / OpenGL 3.0+ SDR and signal analyzer frontend to various hardware.
remoteinputbuffer.cpp
Go to the documentation of this file.
1 // Copyright (C) 2016 Edouard Griffiths, F4EXB //
3 // //
4 // This program is free software; you can redistribute it and/or modify //
5 // it under the terms of the GNU General Public License as published by //
6 // the Free Software Foundation as version 3 of the License, or //
7 // (at your option) any later version. //
8 // //
9 // This program is distributed in the hope that it will be useful, //
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of //
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the //
12 // GNU General Public License V3 for more details. //
13 // //
14 // You should have received a copy of the GNU General Public License //
15 // along with this program. If not, see <http://www.gnu.org/licenses/>. //
17 
18 #include <QDebug>
19 #include <cassert>
20 #include <cstring>
21 #include <cmath>
22 #include <algorithm>
23 #include <boost/crc.hpp>
24 #include <boost/cstdint.hpp>
25 #include "remoteinputbuffer.h"
26 
27 
28 
30  m_decoderIndexHead(nbDecoderSlots/2),
31  m_frameHead(0),
32  m_curNbBlocks(0),
33  m_minNbBlocks(256),
34  m_curOriginalBlocks(0),
35  m_minOriginalBlocks(128),
36  m_curNbRecovery(0),
37  m_maxNbRecovery(0),
38  m_framesDecoded(true),
39  m_readIndex(0),
40  m_readBuffer(0),
41  m_readSize(0),
42  m_bufferLenSec(0.0f),
43  m_nbReads(0),
44  m_nbWrites(0),
45  m_balCorrection(0),
46  m_balCorrLimit(0)
47 {
51  m_tvOut_sec = 0;
52  m_tvOut_usec = 0;
53  m_readNbBytes = 1;
54  m_paramsCM256.BlockBytes = sizeof(RemoteProtectedBlock); // never changes
55  m_paramsCM256.OriginalCount = RemoteNbOrginalBlocks; // never changes
56 
57  if (!m_cm256.isInitialized()) {
58  m_cm256_OK = false;
59  qDebug() << "RemoteInputBuffer::RemoteInputBuffer: cannot initialize CM256 library";
60  } else {
61  m_cm256_OK = true;
62  }
63 
66 }
67 
69 {
70  if (m_readBuffer) {
71  delete[] m_readBuffer;
72  }
73 }
74 
76 {
77  for (int i = 0; i < nbDecoderSlots; i++)
78  {
82  m_decoderSlots[i].m_decoded = false;
85  memset((void *) m_decoderSlots[i].m_recoveryBlocks, 0, RemoteNbOrginalBlocks * sizeof(RemoteProtectedBlock));
86  }
87 }
88 
90 {
91  // collect stats before voiding the slot
92 
100 
103  }
104 
107  }
108 
111  }
112 
113  // void the slot
114 
115  m_decoderSlots[slotIndex].m_blockCount = 0;
116  m_decoderSlots[slotIndex].m_originalCount = 0;
117  m_decoderSlots[slotIndex].m_recoveryCount = 0;
118  m_decoderSlots[slotIndex].m_decoded = false;
119  m_decoderSlots[slotIndex].m_metaRetrieved = false;
120 
121  resetOriginalBlocks(slotIndex);
122  memset((void *) m_decoderSlots[slotIndex].m_recoveryBlocks, 0, RemoteNbOrginalBlocks * sizeof(RemoteProtectedBlock));
123 }
124 
126 {
129  m_nbReads = 0;
130  m_nbWrites = 0;
131 }
132 
134 {
135  if (m_nbReads >= 40) // check every ~1s as tick is ~50ms
136  {
137  int targetPivotSlot = (slotIndex + (nbDecoderSlots/2)) % nbDecoderSlots; // slot at half buffer opposite of current write slot
138  int targetPivotIndex = targetPivotSlot * sizeof(BufferFrame); // buffer index corresponding to start of above slot
139  int normalizedReadIndex = (m_readIndex < targetPivotIndex ? m_readIndex + nbDecoderSlots * sizeof(BufferFrame) : m_readIndex)
140  - (targetPivotSlot * sizeof(BufferFrame)); // normalize read index so it is positive and zero at start of pivot slot
141  int dBytes;
142  int rwDelta = (m_nbReads * m_readNbBytes) - (m_nbWrites * sizeof(BufferFrame));
143 
144  if (normalizedReadIndex < (nbDecoderSlots/ 2) * (int) sizeof(BufferFrame)) // read leads
145  {
146  dBytes = - normalizedReadIndex - rwDelta;
147  }
148  else // read lags
149  {
150  int bufSize = (nbDecoderSlots * sizeof(BufferFrame));
151  dBytes = bufSize - normalizedReadIndex - rwDelta;
152  }
153 
154  // calculate exponential moving average on floating point for better accuracy (was int)
155  double newCorrection = ((double) dBytes) / (((int) m_currentMeta.m_sampleBytes) * 2 * m_nbReads);
156  m_balCorrection = 0.25*m_balCorrection + 0.75*newCorrection; // exponential average with alpha = 0.75 (original is wrong)
157  //m_balCorrection = (m_balCorrection / 4) + (dBytes / (int) (m_currentMeta.m_sampleBytes * 2 * m_nbReads)); // correction is in number of samples. Alpha = 0.25
158 
161  } else if (m_balCorrection > m_balCorrLimit) {
163  }
164 
165  m_nbReads = 0;
166  m_nbWrites = 0;
167  }
168 }
169 
171 {
172  int pseudoWriteIndex = slotIndex * sizeof(BufferFrame);
173  m_wrDeltaEstimate = pseudoWriteIndex - m_readIndex;
174  int rwDelayBytes = (m_wrDeltaEstimate > 0 ? m_wrDeltaEstimate : sizeof(BufferFrame) * nbDecoderSlots + m_wrDeltaEstimate);
175  int sampleRate = m_currentMeta.m_sampleRate;
176 
177  if (sampleRate > 0)
178  {
180  ts -= (rwDelayBytes * 1000000LL) / (sampleRate * 2 * m_currentMeta.m_sampleBytes);
181  m_tvOut_sec = ts / 1000000LL;
182  m_tvOut_usec = ts - (m_tvOut_sec * 1000000LL);
183  }
184 
185  if (!m_decoderSlots[slotIndex].m_decoded)
186  {
187  qDebug() << "RemoteInputBuffer::checkSlotData: incomplete frame:"
188  << " slotIndex: " << slotIndex
189  << " m_blockCount: " << m_decoderSlots[slotIndex].m_blockCount
190  << " m_recoveryCount: " << m_decoderSlots[slotIndex].m_recoveryCount;
191  }
192 }
193 
195 {
196  RemoteSuperBlock *superBlock = (RemoteSuperBlock *) array;
197  int frameIndex = superBlock->m_header.m_frameIndex;
198  int decoderIndex = frameIndex % nbDecoderSlots;
199 
200  // frame break
201 
202  if (m_frameHead == -1) // initial state
203  {
204  m_decoderIndexHead = decoderIndex; // new decoder slot head
205  m_frameHead = frameIndex;
206  initReadIndex(); // reset read index
207  initDecodeAllSlots(); // initialize all slots
208  }
209  else if (m_frameHead != frameIndex) // frame break => new frame starts
210  {
211  m_decoderIndexHead = decoderIndex; // new decoder slot head
212  m_frameHead = frameIndex; // new frame head
213  checkSlotData(decoderIndex); // check slot before re-init
214  rwCorrectionEstimate(decoderIndex);
215  m_nbWrites++;
216  initDecodeSlot(decoderIndex); // collect stats and re-initialize current slot
217  }
218 
219  // Block processing
220 
221  if (m_decoderSlots[decoderIndex].m_blockCount < RemoteNbOrginalBlocks) // not enough blocks to decode -> store data
222  {
223  int blockIndex = superBlock->m_header.m_blockIndex;
224  int blockCount = m_decoderSlots[decoderIndex].m_blockCount;
225  int recoveryCount = m_decoderSlots[decoderIndex].m_recoveryCount;
226  m_decoderSlots[decoderIndex].m_cm256DescriptorBlocks[blockCount].Index = blockIndex;
227 
228  if (blockIndex == 0) // first block with meta
229  {
230  m_decoderSlots[decoderIndex].m_metaRetrieved = true;
231  }
232 
233  if (blockIndex < RemoteNbOrginalBlocks) // original data
234  {
235  m_decoderSlots[decoderIndex].m_cm256DescriptorBlocks[blockCount].Block = (void *) storeOriginalBlock(decoderIndex, blockIndex, superBlock->m_protectedBlock);
236  m_decoderSlots[decoderIndex].m_originalCount++;
237  }
238  else // recovery data
239  {
240  m_decoderSlots[decoderIndex].m_recoveryBlocks[recoveryCount] = superBlock->m_protectedBlock;
241  m_decoderSlots[decoderIndex].m_cm256DescriptorBlocks[blockCount].Block = (void *) &m_decoderSlots[decoderIndex].m_recoveryBlocks[recoveryCount];
242  m_decoderSlots[decoderIndex].m_recoveryCount++;
243  }
244  }
245 
246  m_decoderSlots[decoderIndex].m_blockCount++;
247 
248  if (m_decoderSlots[decoderIndex].m_blockCount == RemoteNbOrginalBlocks) // ready to decode
249  {
250  m_decoderSlots[decoderIndex].m_decoded = true;
251 
252  if (m_cm256_OK && (m_decoderSlots[decoderIndex].m_recoveryCount > 0)) // recovery data used => need to decode FEC
253  {
254  m_paramsCM256.BlockBytes = sizeof(RemoteProtectedBlock); // never changes
255  m_paramsCM256.OriginalCount = RemoteNbOrginalBlocks; // never changes
256 
257  if (m_decoderSlots[decoderIndex].m_metaRetrieved) {
258  m_paramsCM256.RecoveryCount = m_currentMeta.m_nbFECBlocks;
259  } else {
260  m_paramsCM256.RecoveryCount = m_decoderSlots[decoderIndex].m_recoveryCount;
261  }
262 
263  if (m_cm256.cm256_decode(m_paramsCM256, m_decoderSlots[decoderIndex].m_cm256DescriptorBlocks)) // CM256 decode
264  {
265  qDebug() << "RemoteInputBuffer::writeData: decode CM256 error:"
266  << " decoderIndex: " << decoderIndex
267  << " m_blockCount: " << m_decoderSlots[decoderIndex].m_blockCount
268  << " m_originalCount: " << m_decoderSlots[decoderIndex].m_originalCount
269  << " m_recoveryCount: " << m_decoderSlots[decoderIndex].m_recoveryCount;
270  }
271  else
272  {
273  qDebug() << "RemoteInputBuffer::writeData: decode CM256 success:"
274  << " decoderIndex: " << decoderIndex
275  << " m_blockCount: " << m_decoderSlots[decoderIndex].m_blockCount
276  << " m_originalCount: " << m_decoderSlots[decoderIndex].m_originalCount
277  << " m_recoveryCount: " << m_decoderSlots[decoderIndex].m_recoveryCount;
278 
279  for (int ir = 0; ir < m_decoderSlots[decoderIndex].m_recoveryCount; ir++) // restore missing blocks
280  {
281  int recoveryIndex = RemoteNbOrginalBlocks - m_decoderSlots[decoderIndex].m_recoveryCount + ir;
282  int blockIndex = m_decoderSlots[decoderIndex].m_cm256DescriptorBlocks[recoveryIndex].Index;
283  RemoteProtectedBlock *recoveredBlock = (RemoteProtectedBlock *) m_decoderSlots[decoderIndex].m_cm256DescriptorBlocks[recoveryIndex].Block;
284 
285  if (blockIndex == 0) // first block with meta
286  {
287  RemoteMetaDataFEC *metaData = (RemoteMetaDataFEC *) recoveredBlock;
288 
289  boost::crc_32_type crc32;
290  crc32.process_bytes(metaData, sizeof(RemoteMetaDataFEC)-4);
291 
292  if (crc32.checksum() == metaData->m_crc32)
293  {
294  m_decoderSlots[decoderIndex].m_metaRetrieved = true;
295  printMeta("RemoteInputBuffer::writeData: recovered meta", metaData);
296  }
297  else
298  {
299  qDebug() << "RemoteInputBuffer::writeData: recovered meta: invalid CRC32";
300  }
301  }
302 
303  storeOriginalBlock(decoderIndex, blockIndex, *recoveredBlock);
304 
305  qDebug() << "RemoteInputBuffer::writeData: recovered block #" << blockIndex;
306  } // restore missing blocks
307  } // CM256 decode
308  } // recovery
309 
310  if (m_decoderSlots[decoderIndex].m_metaRetrieved) // block zero with its meta data has been received
311  {
312  RemoteMetaDataFEC *metaData = getMetaData(decoderIndex);
313 
314  if (!(*metaData == m_currentMeta))
315  {
316  uint32_t sampleRate = metaData->m_sampleRate;
317 
318  if (sampleRate != 0)
319  {
320  m_bufferLenSec = (float) m_framesNbBytes / (float) (sampleRate * metaData->m_sampleBytes * 2);
321  m_balCorrLimit = sampleRate / 400; // +/- 5% correction max per read
322  m_readNbBytes = (sampleRate * metaData->m_sampleBytes * 2) / 20;
323  }
324 
325  printMeta("RemoteInputBuffer::writeData: new meta", metaData); // print for change other than timestamp
326  }
327 
328  m_currentMeta = *metaData; // renew current meta
329  } // check block 0
330  } // decode
331 }
332 
334 {
335  uint8_t *buffer = (uint8_t *) m_frames;
336  uint32_t readIndex = m_readIndex;
337 
338  m_nbReads++;
339 
340  // SEGFAULT FIX: arbitrarily truncate so that it does not exceed buffer length
341  if (length > framesSize) {
342  length = framesSize;
343  }
344 
345  if (m_readIndex + length < m_framesNbBytes) // ends before buffer bound
346  {
347  m_readIndex += length;
348  return &buffer[readIndex];
349  }
350  else if (m_readIndex + length == m_framesNbBytes) // ends at buffer bound
351  {
352  m_readIndex = 0;
353  return &buffer[readIndex];
354  }
355  else // ends after buffer bound
356  {
357  if (length > m_readSize) // reallocate composition buffer if necessary
358  {
359  if (m_readBuffer) {
360  delete[] m_readBuffer;
361  }
362 
363  m_readBuffer = new uint8_t[length];
364  m_readSize = length;
365  }
366 
367  std::memcpy((void *) m_readBuffer, (const void *) &buffer[m_readIndex], m_framesNbBytes - m_readIndex); // copy end of buffer
368  length -= m_framesNbBytes - m_readIndex;
369  std::memcpy((void *) &m_readBuffer[m_framesNbBytes - m_readIndex], (const void *) buffer, length); // copy start of buffer
370  m_readIndex = length;
371  return m_readBuffer;
372  }
373 }
374 
// Write the content of a meta data block to the debug log.
//
// Produces a single qDebug() entry: the caller supplied header, then the meta
// data fields grouped by '|' and separated by ':' in this order:
//   |centerFrequency:sampleRate:sampleBytes:sampleBits:nbOriginalBlocks:nbFECBlocks|tv_sec:tv_usec|
//
// header:   text streamed first on the log line.
// metaData: meta data block to print; it is dereferenced without a null check,
//           so a valid pointer must be passed.
375 void RemoteInputBuffer::printMeta(const QString& header, RemoteMetaDataFEC *metaData)
376 {
377  qDebug() << header << ": "
378  << "|" << metaData->m_centerFrequency
379  << ":" << metaData->m_sampleRate
380  << ":" << (int) (metaData->m_sampleBytes & 0xF) // mask keeps only the 4 LSB of m_sampleBytes
381  << ":" << (int) metaData->m_sampleBits // uint8_t fields are cast to int before being streamed
382  << ":" << (int) metaData->m_nbOriginalBlocks
383  << ":" << (int) metaData->m_nbFECBlocks
384  << "|" << metaData->m_tv_sec // timestamp: seconds part, then microseconds part
385  << ":" << metaData->m_tv_usec
386  << "|";
387 }
uint32_t m_sampleRate
12 sample rate in Hz
int m_readNbBytes
Nominal number of bytes per read (50ms)
int m_readIndex
current byte read index in frames buffer
RemoteMetaDataFEC m_currentMeta
Stored current meta data.
static void printMeta(const QString &header, RemoteMetaDataFEC *metaData)
int m_minOriginalBlocks
(stats) minimum number of original blocks received since last poll
CM256::cm256_block m_cm256DescriptorBlocks[RemoteNbOrginalBlocks]
CM256 decoder descriptors (block addresses and block indexes)
void resetOriginalBlocks(int slotIndex)
uint8_t * readData(int32_t length)
Read data from buffer.
RemoteProtectedBlock m_protectedBlock
int m_maxNbRecovery
(stats) maximum number of recovery blocks used since last poll
static const int nbDecoderSlots
int m_blockCount
number of blocks received for this frame
MovingAverageUtil< int, int, 10 > m_avgNbBlocks
(stats) average number of blocks received
RemoteProtectedBlock * storeOriginalBlock(int slotIndex, int blockIndex, const RemoteProtectedBlock &protectedBlock)
int m_balCorrection
R/W balance correction in number of samples.
__int64 int64_t
Definition: rtptypes_win.h:47
CM256 m_cm256
CM256 library.
int m_curNbRecovery
(stats) instantaneous number of recovery blocks used
int m_curNbBlocks
(stats) instantaneous number of blocks received
unsigned int uint32_t
Definition: rtptypes_win.h:46
bool m_framesDecoded
[stats] true if all frames were decoded since last poll
uint8_t * m_readBuffer
Read buffer to hold samples when looping back to beginning of raw buffer.
int m_framesNbBytes
Number of bytes in samples buffer.
int m_recoveryCount
number of recovery blocks received
uint8_t m_sampleBytes
13 4 LSB: number of bytes per sample (2 or 4)
unsigned char uint8_t
Definition: rtptypes_win.h:42
int m_originalCount
number of original blocks received
static const int framesSize
int m_readSize
Read buffer size.
void initDecodeSlot(int slotIndex)
uint32_t m_crc32
28 CRC32 of the above
int m_nbWrites
Number of buffer writes since start of auto R/W balance correction period.
int m_balCorrLimit
Correction absolute value limit in number of samples.
RemoteProtectedBlock m_recoveryBlocks[RemoteNbOrginalBlocks]
Recovery blocks (FEC blocks) with max size.
uint8_t m_nbFECBlocks
16 number of blocks carrying FEC
int32_t i
Definition: decimators.h:244
uint32_t m_tv_usec
24 microseconds of timestamp at start time of super-frame processing
uint8_t m_blockIndex
uint8_t m_sampleBits
14 number of effective bits per sample (deprecated)
MovingAverageUtil< int, int, 10 > m_avgOrigBlocks
(stats) average number of original blocks received
int int32_t
Definition: rtptypes_win.h:45
CM256::cm256_encoder_params m_paramsCM256
CM256 decoder parameters block.
int m_frameHead
index of the current head frame sent
BufferFrame m_frames[nbDecoderSlots]
Samples buffer.
RemoteMetaDataFEC * getMetaData(int slotIndex)
uint32_t m_tv_sec
20 seconds of timestamp at start time of super-frame processing
uint32_t m_tvOut_usec
Estimated returned samples timestamp (microseconds)
uint32_t m_tvOut_sec
Estimated returned samples timestamp (seconds)
int m_nbReads
Number of buffer reads since start of auto R/W balance correction period.
RemoteHeader m_header
void writeData(char *array)
Write data into buffer.
void rwCorrectionEstimate(int slotIndex)
int m_minNbBlocks
(stats) minimum number of blocks received since last poll
uint16_t m_frameIndex
uint8_t m_nbOriginalBlocks
15 number of blocks with original (protected) data
void checkSlotData(int slotIndex)
int m_decoderIndexHead
index of the current head frame slot in decoding slots
bool m_metaRetrieved
true if meta data (block zero) was retrieved
DecoderSlot m_decoderSlots[nbDecoderSlots]
CM256 decoding control/buffer slots.
uint32_t crc32(const uint8_t *buf, int len)
Definition: crc.h:58
uint64_t m_centerFrequency
8 center frequency in kHz
bool m_cm256_OK
CM256 library initialized OK.
int m_curOriginalBlocks
(stats) instantaneous number of original blocks received
int m_wrDeltaEstimate
Sampled estimate of write to read indexes difference.
MovingAverageUtil< int, int, 10 > m_avgNbRecovery
(stats) average number of recovery blocks used