Restoring voice_codec.py
All checks were successful
/ mirror (push) Successful in 4s

This commit is contained in:
STCB 2025-07-07 00:47:17 +02:00
parent f8a7aa0147
commit 07361b8448

View File

@ -0,0 +1,714 @@
"""
Voice codec integration for encrypted voice over GSM.
Implements Codec2 compression with FSK modulation for transmitting
encrypted voice data over standard GSM voice channels.
"""
import array
import math
import struct
from typing import Optional, Tuple, List
from dataclasses import dataclass
from enum import IntEnum
try:
import numpy as np
HAS_NUMPY = True
except ImportError:
HAS_NUMPY = False
# ANSI colors
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
RESET = "\033[0m"
class Codec2Mode(IntEnum):
"""Codec2 bitrate modes."""
MODE_3200 = 0 # 3200 bps
MODE_2400 = 1 # 2400 bps
MODE_1600 = 2 # 1600 bps
MODE_1400 = 3 # 1400 bps
MODE_1300 = 4 # 1300 bps
MODE_1200 = 5 # 1200 bps (recommended for robustness)
MODE_700C = 6 # 700 bps
@dataclass
class Codec2Frame:
"""Represents a single Codec2 compressed voice frame."""
mode: Codec2Mode
bits: bytes
timestamp: float
frame_number: int
class Codec2Wrapper:
"""
Wrapper for Codec2 voice codec.
In production, this would use py_codec2 or ctypes bindings to libcodec2.
This is a simulation interface for protocol development.
"""
# Frame sizes in bits for each mode
FRAME_BITS = {
Codec2Mode.MODE_3200: 64,
Codec2Mode.MODE_2400: 48,
Codec2Mode.MODE_1600: 64,
Codec2Mode.MODE_1400: 56,
Codec2Mode.MODE_1300: 52,
Codec2Mode.MODE_1200: 48,
Codec2Mode.MODE_700C: 28
}
# Frame duration in ms
FRAME_MS = {
Codec2Mode.MODE_3200: 20,
Codec2Mode.MODE_2400: 20,
Codec2Mode.MODE_1600: 40,
Codec2Mode.MODE_1400: 40,
Codec2Mode.MODE_1300: 40,
Codec2Mode.MODE_1200: 40,
Codec2Mode.MODE_700C: 40
}
def __init__(self, mode: Codec2Mode = Codec2Mode.MODE_1200):
"""
Initialize Codec2 wrapper.
Args:
mode: Codec2 bitrate mode (default 1200 bps for robustness)
"""
self.mode = mode
self.frame_bits = self.FRAME_BITS[mode]
self.frame_bytes = (self.frame_bits + 7) // 8
self.frame_ms = self.FRAME_MS[mode]
self.frame_samples = int(8000 * self.frame_ms / 1000) # 8kHz sampling
self.frame_counter = 0
# Quiet initialization - no print
def encode(self, audio_samples) -> Optional[Codec2Frame]:
"""
Encode PCM audio samples to Codec2 frame.
Args:
audio_samples: PCM samples (8kHz, 16-bit signed)
Returns:
Codec2Frame or None if insufficient samples
"""
if len(audio_samples) < self.frame_samples:
return None
# In production: call codec2_encode(state, bits, samples)
# Simulation: create pseudo-compressed data
compressed = self._simulate_compression(audio_samples[:self.frame_samples])
frame = Codec2Frame(
mode=self.mode,
bits=compressed,
timestamp=self.frame_counter * self.frame_ms / 1000.0,
frame_number=self.frame_counter
)
self.frame_counter += 1
return frame
def decode(self, frame: Codec2Frame):
"""
Decode Codec2 frame to PCM audio samples.
Args:
frame: Codec2 compressed frame
Returns:
PCM samples (8kHz, 16-bit signed)
"""
if frame.mode != self.mode:
raise ValueError(f"Frame mode {frame.mode} doesn't match decoder mode {self.mode}")
# In production: call codec2_decode(state, samples, bits)
# Simulation: decompress to audio
return self._simulate_decompression(frame.bits)
def _simulate_compression(self, samples) -> bytes:
"""Simulate Codec2 compression (for testing)."""
# Convert to list if needed
if hasattr(samples, 'tolist'):
sample_list = samples.tolist()
elif hasattr(samples, '__iter__'):
sample_list = list(samples)
else:
sample_list = samples
# Extract basic features for simulation
if HAS_NUMPY and hasattr(samples, '__array__'):
# Convert to numpy array if needed
np_samples = np.asarray(samples, dtype=np.float32)
if len(np_samples) > 0:
mean_square = np.mean(np_samples ** 2)
energy = np.sqrt(mean_square) if not np.isnan(mean_square) else 0.0
zero_crossings = np.sum(np.diff(np.sign(np_samples)) != 0)
else:
energy = 0.0
zero_crossings = 0
else:
# Manual calculation without numpy
if sample_list and len(sample_list) > 0:
energy = math.sqrt(sum(s**2 for s in sample_list) / len(sample_list))
zero_crossings = sum(1 for i in range(1, len(sample_list))
if (sample_list[i-1] >= 0) != (sample_list[i] >= 0))
else:
energy = 0.0
zero_crossings = 0
# Pack into bytes (simplified)
# Ensure values are valid
energy_int = max(0, min(65535, int(energy)))
zc_int = max(0, min(65535, int(zero_crossings)))
data = struct.pack('<HH', energy_int, zc_int)
# Pad to expected frame size
data += b'\x00' * (self.frame_bytes - len(data))
return data[:self.frame_bytes]
def _simulate_decompression(self, compressed: bytes):
"""Simulate Codec2 decompression (for testing)."""
# Unpack features
if len(compressed) >= 4:
energy, zero_crossings = struct.unpack('<HH', compressed[:4])
else:
energy, zero_crossings = 1000, 100
# Generate synthetic speech-like signal
if HAS_NUMPY:
t = np.linspace(0, self.frame_ms/1000, self.frame_samples)
# Base frequency from zero crossings
freq = zero_crossings * 10 # Simplified mapping
# Generate harmonics
signal = np.zeros(self.frame_samples)
for harmonic in range(1, 4):
signal += np.sin(2 * np.pi * freq * harmonic * t) / harmonic
# Apply energy envelope
signal *= energy / 10000.0
# Convert to 16-bit PCM
return (signal * 32767).astype(np.int16)
else:
# Manual generation without numpy
samples = []
freq = zero_crossings * 10
for i in range(self.frame_samples):
t = i / 8000.0 # 8kHz sample rate
value = 0
for harmonic in range(1, 4):
value += math.sin(2 * math.pi * freq * harmonic * t) / harmonic
value *= energy / 10000.0
# Clamp to 16-bit range
sample = int(value * 32767)
sample = max(-32768, min(32767, sample))
samples.append(sample)
return array.array('h', samples)
class FSKModem:
"""
4-FSK modem for transmitting digital data over voice channels.
Designed to survive GSM/AMR/EVS vocoders.
"""
def __init__(self, sample_rate: int = 8000, baud_rate: int = 600):
"""
Initialize FSK modem.
Args:
sample_rate: Audio sample rate (Hz)
baud_rate: Symbol rate (baud)
"""
self.sample_rate = sample_rate
self.baud_rate = baud_rate
self.samples_per_symbol = int(sample_rate / baud_rate)
# 4-FSK frequencies (300-3400 Hz band)
self.frequencies = [
600, # 00
1200, # 01
1800, # 10
2400 # 11
]
# Preamble for synchronization (800 Hz, 100ms)
self.preamble_freq = 800
self.preamble_duration = 0.1 # seconds
# Quiet initialization - no print
def modulate(self, data: bytes, add_preamble: bool = True):
"""
Modulate binary data to FSK audio signal.
Args:
data: Binary data to modulate
add_preamble: Whether to add synchronization preamble
Returns:
Audio signal (normalized float32 array or list)
"""
# Convert bytes to dibits (2-bit symbols)
symbols = []
for byte in data:
symbols.extend([
(byte >> 6) & 0x03,
(byte >> 4) & 0x03,
(byte >> 2) & 0x03,
byte & 0x03
])
# Generate audio signal
signal = []
# Add preamble
if add_preamble:
preamble_samples = int(self.preamble_duration * self.sample_rate)
if HAS_NUMPY:
t = np.arange(preamble_samples) / self.sample_rate
preamble = np.sin(2 * np.pi * self.preamble_freq * t)
signal.extend(preamble)
else:
for i in range(preamble_samples):
t = i / self.sample_rate
value = math.sin(2 * math.pi * self.preamble_freq * t)
signal.append(value)
# Modulate symbols
for symbol in symbols:
freq = self.frequencies[symbol]
if HAS_NUMPY:
t = np.arange(self.samples_per_symbol) / self.sample_rate
tone = np.sin(2 * np.pi * freq * t)
signal.extend(tone)
else:
for i in range(self.samples_per_symbol):
t = i / self.sample_rate
value = math.sin(2 * math.pi * freq * t)
signal.append(value)
# Apply smoothing to reduce clicks
if HAS_NUMPY:
audio = np.array(signal, dtype=np.float32)
else:
audio = array.array('f', signal)
audio = self._apply_envelope(audio)
return audio
def demodulate(self, audio) -> Tuple[bytes, float]:
"""
Demodulate FSK audio signal to binary data.
Args:
audio: Audio signal
Returns:
Tuple of (demodulated data, confidence score)
"""
# Find preamble
preamble_start = self._find_preamble(audio)
if preamble_start < 0:
return b'', 0.0
# Skip preamble
data_start = preamble_start + int(self.preamble_duration * self.sample_rate)
# Demodulate symbols
symbols = []
confidence_scores = []
pos = data_start
while pos + self.samples_per_symbol <= len(audio):
symbol_audio = audio[pos:pos + self.samples_per_symbol]
symbol, confidence = self._demodulate_symbol(symbol_audio)
symbols.append(symbol)
confidence_scores.append(confidence)
pos += self.samples_per_symbol
# Convert symbols to bytes
data = bytearray()
for i in range(0, len(symbols), 4):
if i + 3 < len(symbols):
byte = (symbols[i] << 6) | (symbols[i+1] << 4) | (symbols[i+2] << 2) | symbols[i+3]
data.append(byte)
if HAS_NUMPY and confidence_scores:
avg_confidence = np.mean(confidence_scores)
else:
avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0.0
return bytes(data), avg_confidence
def _find_preamble(self, audio) -> int:
"""Find preamble in audio signal."""
# Simple energy-based detection
window_size = int(0.01 * self.sample_rate) # 10ms window
if HAS_NUMPY:
for i in range(0, len(audio) - window_size, window_size // 2):
window = audio[i:i + window_size]
# Check for preamble frequency
fft = np.fft.fft(window)
freqs = np.fft.fftfreq(len(window), 1/self.sample_rate)
# Find peak near preamble frequency
idx = np.argmax(np.abs(fft[:len(fft)//2]))
peak_freq = abs(freqs[idx])
if abs(peak_freq - self.preamble_freq) < 50: # 50 Hz tolerance
return i
else:
# Simple zero-crossing based detection without FFT
for i in range(0, len(audio) - window_size, window_size // 2):
window = list(audio[i:i + window_size])
# Count zero crossings
zero_crossings = 0
for j in range(1, len(window)):
if (window[j-1] >= 0) != (window[j] >= 0):
zero_crossings += 1
# Estimate frequency from zero crossings
estimated_freq = (zero_crossings * self.sample_rate) / (2 * len(window))
if abs(estimated_freq - self.preamble_freq) < 100: # 100 Hz tolerance
return i
return -1
def _demodulate_symbol(self, audio) -> Tuple[int, float]:
"""Demodulate a single FSK symbol."""
if HAS_NUMPY:
# FFT-based demodulation
fft = np.fft.fft(audio)
freqs = np.fft.fftfreq(len(audio), 1/self.sample_rate)
magnitude = np.abs(fft[:len(fft)//2])
# Find energy at each FSK frequency
energies = []
for freq in self.frequencies:
idx = np.argmin(np.abs(freqs[:len(freqs)//2] - freq))
energy = magnitude[idx]
energies.append(energy)
# Select symbol with highest energy
symbol = np.argmax(energies)
else:
# Goertzel algorithm for specific frequency detection
audio_list = list(audio) if hasattr(audio, '__iter__') else audio
energies = []
for freq in self.frequencies:
# Goertzel algorithm
omega = 2 * math.pi * freq / self.sample_rate
coeff = 2 * math.cos(omega)
s_prev = 0
s_prev2 = 0
for sample in audio_list:
s = sample + coeff * s_prev - s_prev2
s_prev2 = s_prev
s_prev = s
# Calculate magnitude
power = s_prev2 * s_prev2 + s_prev * s_prev - coeff * s_prev * s_prev2
energies.append(math.sqrt(abs(power)))
# Select symbol with highest energy
symbol = energies.index(max(energies))
# Confidence is ratio of strongest to second strongest
sorted_energies = sorted(energies, reverse=True)
confidence = sorted_energies[0] / (sorted_energies[1] + 1e-6)
return symbol, min(confidence, 10.0) / 10.0
def _apply_envelope(self, audio):
"""Apply smoothing envelope to reduce clicks."""
# Simple raised cosine envelope
ramp_samples = int(0.002 * self.sample_rate) # 2ms ramps
if len(audio) > 2 * ramp_samples:
if HAS_NUMPY:
# Fade in
t = np.linspace(0, np.pi/2, ramp_samples)
audio[:ramp_samples] *= np.sin(t) ** 2
# Fade out
audio[-ramp_samples:] *= np.sin(t[::-1]) ** 2
else:
# Manual fade in
for i in range(ramp_samples):
t = (i / ramp_samples) * (math.pi / 2)
factor = math.sin(t) ** 2
audio[i] *= factor
# Manual fade out
for i in range(ramp_samples):
t = ((ramp_samples - 1 - i) / ramp_samples) * (math.pi / 2)
factor = math.sin(t) ** 2
audio[-(i+1)] *= factor
return audio
class VoiceProtocol:
"""
Integrates voice codec and modem with the Icing protocol
for encrypted voice transmission over GSM.
"""
def __init__(self, protocol_instance):
"""
Initialize voice protocol handler.
Args:
protocol_instance: IcingProtocol instance
"""
self.protocol = protocol_instance
self.codec = Codec2Wrapper(Codec2Mode.MODE_1200)
self.modem = FSKModem(sample_rate=8000, baud_rate=600)
# Voice crypto state
self.voice_iv_counter = 0
self.voice_sequence = 0
# Buffers
if HAS_NUMPY:
self.audio_buffer = np.array([], dtype=np.int16)
else:
self.audio_buffer = array.array('h') # 16-bit signed integers
self.frame_buffer = []
print(f"{GREEN}[VOICE]{RESET} Voice protocol initialized")
def process_voice_input(self, audio_samples):
"""
Process voice input: compress, encrypt, and modulate.
Args:
audio_samples: PCM audio samples (8kHz, 16-bit)
Returns:
Modulated audio signal ready for transmission (numpy array or array.array)
"""
# Add to buffer
if HAS_NUMPY:
self.audio_buffer = np.concatenate([self.audio_buffer, audio_samples])
else:
self.audio_buffer.extend(audio_samples)
# Process complete frames
modulated_audio = []
while len(self.audio_buffer) >= self.codec.frame_samples:
# Extract frame
if HAS_NUMPY:
frame_audio = self.audio_buffer[:self.codec.frame_samples]
self.audio_buffer = self.audio_buffer[self.codec.frame_samples:]
else:
frame_audio = array.array('h', self.audio_buffer[:self.codec.frame_samples])
del self.audio_buffer[:self.codec.frame_samples]
# Compress with Codec2
compressed_frame = self.codec.encode(frame_audio)
if not compressed_frame:
continue
# Encrypt frame
encrypted = self._encrypt_voice_frame(compressed_frame)
# Add FEC
protected = self._add_fec(encrypted)
# Modulate to audio
audio_signal = self.modem.modulate(protected, add_preamble=True)
modulated_audio.append(audio_signal)
if modulated_audio:
if HAS_NUMPY:
return np.concatenate(modulated_audio)
else:
# Concatenate array.array objects
result = array.array('f')
for audio in modulated_audio:
result.extend(audio)
return result
return None
def process_voice_output(self, modulated_audio):
"""
Process received audio: demodulate, decrypt, and decompress.
Args:
modulated_audio: Received FSK-modulated audio
Returns:
Decoded PCM audio samples (numpy array or array.array)
"""
# Demodulate
data, confidence = self.modem.demodulate(modulated_audio)
if confidence < 0.5:
print(f"{YELLOW}[VOICE]{RESET} Low demodulation confidence: {confidence:.2f}")
return None
# Remove FEC
frame_data = self._remove_fec(data)
if not frame_data:
return None
# Decrypt
compressed_frame = self._decrypt_voice_frame(frame_data)
if not compressed_frame:
return None
# Decompress
audio_samples = self.codec.decode(compressed_frame)
return audio_samples
def _encrypt_voice_frame(self, frame: Codec2Frame) -> bytes:
"""Encrypt a voice frame using ChaCha20-CTR."""
if not self.protocol.hkdf_key:
raise ValueError("No encryption key available")
# Prepare frame data
frame_data = struct.pack('<BIH',
frame.mode,
frame.frame_number,
len(frame.bits)
) + frame.bits
# Generate IV for this frame (ChaCha20 needs 16 bytes)
iv = struct.pack('<Q', self.voice_iv_counter) + b'\x00' * 8 # 8 + 8 = 16 bytes
self.voice_iv_counter += 1
# Encrypt using ChaCha20
from encryption import chacha20_encrypt
key = bytes.fromhex(self.protocol.hkdf_key)
encrypted = chacha20_encrypt(frame_data, key, iv)
# Add sequence number and IV hint
return struct.pack('<HQ', self.voice_sequence, self.voice_iv_counter) + encrypted
def _decrypt_voice_frame(self, data: bytes) -> Optional[Codec2Frame]:
"""Decrypt a voice frame."""
if len(data) < 10:
return None
# Extract sequence and IV hint
sequence, iv_hint = struct.unpack('<HQ', data[:10])
encrypted = data[10:]
# Generate IV (16 bytes for ChaCha20)
iv = struct.pack('<Q', iv_hint) + b'\x00' * 8
# Decrypt
from encryption import chacha20_decrypt
key = bytes.fromhex(self.protocol.hkdf_key)
try:
decrypted = chacha20_decrypt(encrypted, key, iv)
# Parse frame
mode, frame_num, bits_len = struct.unpack('<BIH', decrypted[:7])
bits = decrypted[7:7+bits_len]
return Codec2Frame(
mode=Codec2Mode(mode),
bits=bits,
timestamp=0, # Will be set by caller
frame_number=frame_num
)
except Exception as e:
print(f"{RED}[VOICE]{RESET} Decryption failed: {e}")
return None
def _add_fec(self, data: bytes) -> bytes:
"""Add forward error correction."""
# Simple repetition code (3x) for testing
# In production: use convolutional code or LDPC
fec_data = bytearray()
for byte in data:
# Repeat each byte 3 times
fec_data.extend([byte, byte, byte])
return bytes(fec_data)
def _remove_fec(self, data: bytes) -> Optional[bytes]:
"""Remove FEC and correct errors."""
if len(data) % 3 != 0:
return None
corrected = bytearray()
for i in range(0, len(data), 3):
# Majority voting
votes = [data[i], data[i+1], data[i+2]]
byte_value = max(set(votes), key=votes.count)
corrected.append(byte_value)
return bytes(corrected)
# Example usage
if __name__ == "__main__":
# Test Codec2 wrapper
print(f"\n{BLUE}=== Testing Codec2 Wrapper ==={RESET}")
codec = Codec2Wrapper(Codec2Mode.MODE_1200)
# Generate test audio
if HAS_NUMPY:
t = np.linspace(0, 0.04, 320) # 40ms at 8kHz
test_audio = (np.sin(2 * np.pi * 440 * t) * 16384).astype(np.int16)
else:
test_audio = array.array('h')
for i in range(320):
t = i * 0.04 / 320
value = int(math.sin(2 * math.pi * 440 * t) * 16384)
test_audio.append(value)
# Encode
frame = codec.encode(test_audio)
print(f"Encoded frame: {len(frame.bits)} bytes")
# Decode
decoded = codec.decode(frame)
print(f"Decoded audio: {len(decoded)} samples")
# Test FSK modem
print(f"\n{BLUE}=== Testing FSK Modem ==={RESET}")
modem = FSKModem()
# Test data
test_data = b"Hello, secure voice!"
# Modulate
modulated = modem.modulate(test_data)
print(f"Modulated: {len(modulated)} samples ({len(modulated)/8000:.2f}s)")
# Demodulate
demodulated, confidence = modem.demodulate(modulated)
print(f"Demodulated: {demodulated}")
print(f"Confidence: {confidence:.2%}")
print(f"Match: {demodulated == test_data}")