Cleaning a bit more

This commit is contained in:
STCB 2025-07-07 00:00:05 +02:00
parent 6b517f6a46
commit 4832ba751f
25 changed files with 0 additions and 2242 deletions

View File

@ -1,52 +0,0 @@
# Audio Setup Guide for DryBox
## Installing PyAudio on Fedora
PyAudio requires system dependencies before installation:
```bash
# Install required system packages
sudo dnf install python3-devel portaudio-devel
# Then install PyAudio
pip install pyaudio
```
## Alternative: Run Without PyAudio
If you prefer not to install PyAudio, the application will still work but without real-time playback. You can still:
- Record audio to files
- Process and export audio
- Use all other features
To run without PyAudio, the audio_player.py module will gracefully handle the missing dependency.
## Ubuntu/Debian Installation
```bash
sudo apt-get install python3-dev portaudio19-dev
pip install pyaudio
```
## macOS Installation
```bash
brew install portaudio
pip install pyaudio
```
## Troubleshooting
If you see "No module named 'pyaudio'" errors:
1. The app will continue to work without playback
2. Recording and processing features remain available
3. Install PyAudio later when convenient
## Testing Audio Features
1. Run the application: `python UI/main.py`
2. Start a call between phones
3. Test features:
- Recording: Works without PyAudio
- Playback: Requires PyAudio
- Processing: Works without PyAudio

View File

@ -1,118 +0,0 @@
# Audio Testing Guide for DryBox
## Setup Verification
1. **Start the server first**:
```bash
python server.py
```
2. **Run the UI**:
```bash
python UI/main.py
```
## Testing Audio Playback
### Step 1: Test PyAudio is Working
When you enable playback (Ctrl+1 or Ctrl+2), you should hear a short beep (100ms, 1kHz tone). This confirms:
- PyAudio is properly installed
- Audio output device is working
- Stream format is correct
### Step 2: Test During Call
1. Click "Run Automatic Test" or press Space
2. **Immediately** enable playback on Phone 2 (Ctrl+2)
- You should hear the test beep
3. Watch the debug console for:
- "Phone 2 playback started"
- "Phone 2 sent test beep to verify audio"
4. Wait for handshake to complete (steps 4-5 in test)
5. Once voice session starts, you should see:
- "Phone 2 received audio data: XXX bytes"
- "Phone 2 forwarding audio to player"
- "Client 1 playback thread got XXX bytes"
### What to Look For in Debug Console
**Good signs:**
```
[AudioPlayer] Client 1 add_audio_data called with 640 bytes
[AudioPlayer] Client 1 added to buffer, queue size: 1
[AudioPlayer] Client 1 playback thread got 640 bytes
```
**Problem signs:**
```
[AudioPlayer] Client 1 has no buffer (playback not started?)
Low confidence demodulation: 0.XX
Codec decode returned None or empty
```
## Troubleshooting
### No Test Beep
- Check system volume
- Verify PyAudio: `python test_audio_setup.py`
- Check audio device: `python -c "import pyaudio; p=pyaudio.PyAudio(); print(p.get_default_output_device_info())"`
### Test Beep Works but No Voice Audio
1. **Check if audio is being transmitted:**
- Phone 1 should show: "sent N voice frames"
- Phone 2 should show: "Received voice data frame #N"
2. **Check if audio is being decoded:**
- Look for: "Decoded PCM samples: type=<class 'numpy.ndarray'>, len=320"
- Look for: "Emitting PCM bytes: 640 bytes"
3. **Check if audio reaches the player:**
- Look for: "Phone 2 received audio data: 640 bytes"
- Look for: "Client 1 add_audio_data called with 640 bytes"
### Audio Sounds Distorted
This is normal! The system uses:
- Codec2 at 1200bps (very low bitrate)
- 4FSK modulation
- This creates robotic/vocoder-like sound
### Manual Testing Commands
Test just the codec:
```python
python test_audio_pipeline.py
```
Play the test outputs:
```bash
# Original
aplay wav/input.wav
# Codec only (should sound robotic)
aplay wav/test_codec_only.wav
# Full pipeline (codec + FSK)
aplay wav/test_full_pipeline.wav
```
## Expected Audio Flow
1. Phone 1 reads `wav/input.wav` (8kHz mono)
2. Encodes 320 samples (40ms) with Codec2 → 6 bytes
3. Modulates with 4FSK → ~1112 float samples
4. Encrypts with Noise XK
5. Sends to server
6. Server routes to Phone 2
7. Phone 2 decrypts with Noise XK
8. Demodulates FSK → 6 bytes
9. Decodes with Codec2 → 320 samples (640 bytes PCM)
10. Sends to PyAudio for playback
## Recording Feature
To save received audio:
1. Press Alt+1 or Alt+2 to start recording
2. Let it run during the call
3. Press again to stop and save
4. Check `wav/` directory for saved files
This helps verify if audio is being received even if playback isn't working.

View File

@ -1,58 +0,0 @@
# Phone 2 Playback - What It Actually Plays
## The Complete Audio Flow for Phone 2
When Phone 2 receives audio, it goes through this exact process:
### 1. Network Reception
- Encrypted data arrives from server
- Data includes Noise XK encrypted voice frames
### 2. Decryption (Noise XK)
- `protocol_phone_client.py` line 156-165: Noise wrapper decrypts the data
- Result: Decrypted voice message containing FSK modulated signal
### 3. Demodulation (4FSK)
- `_handle_voice_data()` line 223: FSK demodulation
- Converts modulated signal back to 6 bytes of compressed data
- Only processes if confidence > 0.5
### 4. Decompression (Codec2 Decode)
- Line 236: `pcm_samples = self.codec.decode(frame)`
- Converts 6 bytes → 320 samples (640 bytes PCM)
- This is the final audio ready for playback
### 5. Playback
- Line 264: `self.data_received.emit(pcm_bytes, self.client_id)`
- PCM audio sent to audio player
- PyAudio plays the 16-bit, 8kHz mono audio
## What You Hear on Phone 2
Phone 2 plays audio that has been:
- ✅ Encrypted → Decrypted (Noise XK)
- ✅ Modulated → Demodulated (4FSK)
- ✅ Compressed → Decompressed (Codec2)
The audio will sound:
- **Robotic/Vocoder-like** due to 1200bps Codec2 compression
- **Slightly delayed** due to processing pipeline
- **But intelligible** - you can understand speech
## Fixed Issues
1. **Silent beginning**: Now skips first second of silence in input.wav
2. **Control messages**: No longer sent to audio player
3. **Debug spam**: Reduced to show only important frames
## Testing Phone 2 Playback
1. Run automatic test (Space)
2. Enable Phone 2 playback (Ctrl+2)
3. Wait for handshake to complete
4. You should hear:
- Audio starting from 1 second into input.wav
- Processed through full protocol stack
- Robotic but understandable audio
The key point: Phone 2 IS playing fully processed audio (decrypted + demodulated + decompressed)!

View File

@ -1,67 +0,0 @@
# Fixed Audio Playback Guide
## How Playback Now Works
### Phone 1 (Sender) Playback
- **What it plays**: Original audio from `input.wav` BEFORE encoding
- **When to enable**: During a call to hear what you're sending
- **Audio quality**: Clear, unprocessed 8kHz mono audio
### Phone 2 (Receiver) Playback
- **What it plays**: Decoded audio AFTER the full pipeline (Codec2 → FSK → Noise XK → transmission → decryption → demodulation → decoding)
- **When to enable**: During a call to hear what's being received
- **Audio quality**: Robotic/vocoder sound due to 1200bps Codec2 compression
## Changes Made
1. **Fixed control message routing** - 8-byte control messages no longer sent to audio player
2. **Phone 1 now plays original audio** when sending (before encoding)
3. **Removed test beep** - you'll hear actual audio immediately
4. **Added size filter** - only audio data (≥320 bytes) is processed
## Testing Steps
1. Start server: `python server.py`
2. Start UI: `python UI/main.py`
3. Run automatic test (Space key)
4. **For Phone 1 playback**: Press Ctrl+1 to hear the original `input.wav` being sent
5. **For Phone 2 playback**: Press Ctrl+2 to hear the decoded audio after transmission
## Expected Debug Output
**Good signs for Phone 1 (sender):**
```
Phone 1 playing original audio (sender playback)
[AudioPlayer] Client 0 add_audio_data called with 640 bytes
```
**Good signs for Phone 2 (receiver):**
```
Phone 2 received audio data: 640 bytes
Phone 2 forwarding audio to player (playback enabled)
[AudioPlayer] Client 1 add_audio_data called with 640 bytes
```
**Fixed issues:**
```
Phone 2 received non-audio data: 8 bytes (ignoring) # Control messages now filtered out
```
## Audio Quality Expectations
- **Phone 1**: Should sound identical to `input.wav`
- **Phone 2**: Will sound robotic/compressed due to:
- Codec2 compression at 1200bps (very low bitrate)
- 4FSK modulation/demodulation
- This is normal and proves the protocol is working!
## Troubleshooting
If you still don't hear audio:
1. **Check debug console** for the messages above
2. **Verify handshake completes** before expecting audio
3. **Try recording** (Alt+1/2) to save audio for offline playback
4. **Check system volume** and audio device
The most important fix: control messages are no longer sent to the audio player, so you should only receive actual 640-byte audio frames.

View File

@ -1,83 +0,0 @@
# Audio Playback Implementation Summary
## Key Fixes Applied
### 1. Separated Sender vs Receiver Playback
- **Phone 1 (Sender)**: Now plays the original `input.wav` audio when transmitting
- **Phone 2 (Receiver)**: Plays the decoded audio after full protocol processing
### 2. Fixed Control Message Routing
- Control messages (like "CALL_END" - 8 bytes) no longer sent to audio player
- Added size filter: only data ≥320 bytes is considered audio
- Removed problematic `data_received.emit()` for non-audio messages
### 3. Improved Debug Logging
- Reduced verbosity: logs only first frame and every 25th frame
- Clear indication of what's happening at each stage
- Separate logging for sender vs receiver playback
### 4. Code Changes Made
**phone_manager.py**:
- Added original audio playback for sender
- Added size filter for received data
- Improved debug logging with frame counters
**protocol_phone_client.py**:
- Removed control message emission to data_received
- Added confidence logging for demodulation
- Reduced debug verbosity
**audio_player.py**:
- Added frame counting for debug
- Reduced playback thread logging
- Better buffer status reporting
**main.py**:
- Fixed lambda signal connection issue
- Improved UI scaling with flexible layouts
## How to Test
1. Start server and UI
2. Run automatic test (Space)
3. Enable playback:
- **Ctrl+1**: Hear original audio from Phone 1
- **Ctrl+2**: Hear decoded audio on Phone 2
## Expected Behavior
**Phone 1 with playback enabled:**
- Clear audio matching `input.wav`
- Shows "playing original audio (sender playback)"
**Phone 2 with playback enabled:**
- Robotic/compressed audio (normal for 1200bps)
- Shows "received audio frame #N: 640 bytes"
- No more "8 bytes" messages
## Audio Flow
```
Phone 1: Phone 2:
input.wav (8kHz)
[Playback here if enabled]
Codec2 encode (1200bps)
4FSK modulate
Noise XK encrypt
→ Network transmission →
Noise XK decrypt
4FSK demodulate
Codec2 decode
[Playback here if enabled]
```
The playback now correctly plays audio at the right points in the pipeline!

View File

@ -1,60 +0,0 @@
# DryBox - Secure Voice Over GSM Protocol
A secure voice communication protocol that transmits encrypted voice data over standard GSM voice channels.
## Architecture
- **Noise XK Protocol**: Provides authenticated key exchange and secure channel
- **Codec2**: Voice compression (1200 bps mode)
- **4FSK Modulation**: Converts digital data to audio tones
- **Encryption**: ChaCha20-Poly1305 for secure communication
## Project Structure
```
DryBox/
├── UI/ # User interface components
│ ├── main.py # Main PyQt5 application
│ ├── phone_manager.py # Phone state management
│ ├── protocol_phone_client.py # Protocol client implementation
│ ├── noise_wrapper.py # Noise XK wrapper
│ └── ...
├── simulator/ # GSM channel simulator
│ └── gsm_simulator.py # Simulates GSM voice channel
├── voice_codec.py # Codec2 and FSK modem implementation
├── encryption.py # Encryption utilities
└── wav/ # Audio test files
```
## Running the Protocol
1. Start the GSM simulator:
```bash
cd simulator
python3 gsm_simulator.py
```
2. Run the UI application:
```bash
./run_ui.sh
# or
python3 UI/main.py
```
## Usage
1. Click "Call" on Phone 1 to initiate
2. Click "Answer" on Phone 2 to accept
3. The protocol will automatically:
- Establish secure connection via Noise XK
- Start voice session
- Compress and encrypt voice data
- Transmit over simulated GSM channel
## Requirements
- Python 3.6+
- PyQt5
- dissononce (Noise protocol)
- numpy (optional, for optimized audio processing)

View File

@ -1,109 +0,0 @@
import socket
import time
import select
from PyQt5.QtCore import QThread, pyqtSignal
from client_state import ClientState
class PhoneClient(QThread):
data_received = pyqtSignal(bytes, int)
state_changed = pyqtSignal(str, str, int)
def __init__(self, client_id):
super().__init__()
self.host = "localhost"
self.port = 12345
self.client_id = client_id
self.sock = None
self.running = True
self.state = ClientState(client_id)
def connect_socket(self):
retries = 3
for attempt in range(retries):
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.sock.settimeout(120)
self.sock.connect((self.host, self.port))
print(f"Client {self.client_id} connected to {self.host}:{self.port}")
return True
except Exception as e:
print(f"Client {self.client_id} connection attempt {attempt + 1} failed: {e}")
if attempt < retries - 1:
time.sleep(1)
self.sock = None
return False
def run(self):
while self.running:
if not self.sock:
if not self.connect_socket():
print(f"Client {self.client_id} failed to connect after retries")
self.state_changed.emit("CALL_END", "", self.client_id)
break
try:
while self.running:
self.state.process_command(self)
self.state.check_handshake_timeout(self)
# Always check for incoming data, even during handshake
if self.sock is None:
print(f"Client {self.client_id} socket is None, exiting inner loop")
break
readable, _, _ = select.select([self.sock], [], [], 0.01)
if readable:
try:
if self.sock is None:
print(f"Client {self.client_id} socket is None before recv, exiting")
break
data = self.sock.recv(1024)
if not data:
print(f"Client {self.client_id} disconnected")
self.state_changed.emit("CALL_END", "", self.client_id)
break
self.state.handle_data(self, data)
except socket.error as e:
print(f"Client {self.client_id} socket error: {e}")
self.state_changed.emit("CALL_END", "", self.client_id)
break
self.msleep(1)
except Exception as e:
print(f"Client {self.client_id} unexpected error in run loop: {e}")
self.state_changed.emit("CALL_END", "", self.client_id)
break
finally:
if self.sock:
try:
self.sock.close()
except Exception as e:
print(f"Client {self.client_id} error closing socket: {e}")
self.sock = None
def send(self, message):
if self.sock and self.running:
try:
if isinstance(message, str):
data = message.encode('utf-8')
self.sock.send(data)
print(f"Client {self.client_id} sent: {message}, length={len(data)}")
else:
self.sock.send(message)
print(f"Client {self.client_id} sent binary data, length={len(message)}")
except socket.error as e:
print(f"Client {self.client_id} send error: {e}")
self.state_changed.emit("CALL_END", "", self.client_id)
def stop(self):
self.running = False
if self.sock:
try:
self.sock.close()
except Exception as e:
print(f"Client {self.client_id} error closing socket in stop: {e}")
self.sock = None
self.quit()
self.wait(1000)
def start_handshake(self, initiator, keypair, peer_pubkey):
self.state.start_handshake(initiator, keypair, peer_pubkey)

View File

@ -1,105 +0,0 @@
# DryBox UI Features Guide
## UI Improvements
The UI has been updated with responsive layouts that scale better:
- Phone displays now use flexible sizing (min/max constraints)
- Waveform widgets adapt to available space
- Buttons have flexible widths that scale with window size
- Better margins and padding for improved visual appearance
## Audio Playback Feature
The DryBox UI includes real-time audio playback capabilities that allow you to hear the decoded audio as it's received.
### How to Use Playback
#### Manual Control
1. **During a Call**: Once a secure voice session is established, click the "🔊 Playback" button under either phone
2. **Button States**:
- Gray (unchecked): Playback disabled
- Green (checked): Playback active
3. **Toggle Anytime**: You can enable/disable playback at any time during a call
#### Keyboard Shortcuts
- `Ctrl+1`: Toggle playback for Phone 1
- `Ctrl+2`: Toggle playback for Phone 2
### Using Playback with Automatic Test
The automatic test feature demonstrates the complete protocol flow. Here's how to use it with playback:
1. **Start the Test**: Click "🧪 Run Automatic Test" or press `Space`
2. **Enable Playback Early**:
- As soon as the test starts, enable playback on Phone 2 (Ctrl+2)
- This ensures you'll hear audio as soon as the secure channel is established
3. **What You'll Hear**:
- Once handshake completes (step 4-5), Phone 1 starts transmitting test audio
- Phone 2 will play the received, decoded audio through your speakers
- The audio goes through: Codec2 encoding → 4FSK modulation → Noise XK encryption → transmission → decryption → demodulation → Codec2 decoding
### Audio Recording Feature
You can also record received audio for later analysis:
1. **Start Recording**: Click "⏺ Record" button (or press Alt+1/Alt+2)
2. **Stop Recording**: Click the button again
3. **Files Saved**: Recordings are saved to `wav/` directory with timestamps
### Audio Processing Options
Access advanced audio features via "Audio Options" button (Ctrl+A):
- **Export Buffer**: Save current audio buffer to file
- **Clear Buffer**: Clear accumulated audio data
- **Processing Options**:
- Normalize Audio
- Apply Gain (adjustable dB)
- Noise Gate
- Low/High Pass Filters
- Remove Silence
### Requirements
For playback to work, you need PyAudio installed:
```bash
# Fedora/RHEL
sudo dnf install python3-devel portaudio-devel
pip install pyaudio
# Ubuntu/Debian
sudo apt-get install python3-dev portaudio19-dev
pip install pyaudio
```
If PyAudio isn't installed, recording will still work but playback will be disabled.
### Troubleshooting
1. **No Sound**:
- Check PyAudio is installed
- Ensure system volume is up
- Verify audio device is working
2. **Choppy Audio**:
- Normal for low-bitrate codec (1200bps)
- Represents actual protocol performance
3. **Delayed Start**:
- Audio only flows after secure handshake
- Wait for "🔒 Secure Channel Established" status
### Test Sequence Overview
The automatic test goes through these steps:
1. Initial state check
2. Phone 1 calls Phone 2
3. Phone 2 answers
4. Noise XK handshake begins
5. Handshake completes, secure channel established
6. Voice session starts (Codec2 + 4FSK)
7. Audio transmission begins
8. Protocol details logged
9. Transmission continues for observation
10. Final statistics
11. Call ends, cleanup
Enable playback on the receiving phone to hear the transmitted audio in real-time!

View File

@ -1,150 +0,0 @@
#!/usr/bin/env python3
"""Test script for audio features in DryBox"""
import sys
import os
import wave
import struct
import time
# Add parent directory to path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from UI.audio_player import AudioPlayer, PYAUDIO_AVAILABLE
from UI.audio_processor import AudioProcessor
def create_test_audio(filename="test_tone.wav", duration=2, frequency=440):
"""Create a test audio file with a sine wave"""
sample_rate = 8000
num_samples = int(sample_rate * duration)
# Generate sine wave
import math
samples = []
for i in range(num_samples):
t = float(i) / sample_rate
value = int(32767 * 0.5 * math.sin(2 * math.pi * frequency * t))
samples.append(value)
# Save to WAV file
with wave.open(filename, 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(sample_rate)
wav_file.writeframes(struct.pack(f'{len(samples)}h', *samples))
print(f"Created test audio file: {filename}")
return filename
def test_audio_player():
"""Test audio player functionality"""
print("\n=== Testing Audio Player ===")
player = AudioPlayer()
player.set_debug_callback(print)
if PYAUDIO_AVAILABLE:
print("PyAudio is available - testing playback")
# Test playback
client_id = 0
if player.start_playback(client_id):
print(f"Started playback for client {client_id}")
# Create and play test audio
test_file = create_test_audio()
with wave.open(test_file, 'rb') as wav:
data = wav.readframes(wav.getnframes())
# Add audio data
chunk_size = 640 # 320 samples * 2 bytes
for i in range(0, len(data), chunk_size):
chunk = data[i:i+chunk_size]
player.add_audio_data(client_id, chunk)
time.sleep(0.04) # 40ms per chunk
time.sleep(0.5) # Let playback finish
player.stop_playback(client_id)
print(f"Stopped playback for client {client_id}")
# Clean up
os.remove(test_file)
else:
print("PyAudio not available - skipping playback test")
# Test recording (works without PyAudio)
print("\n=== Testing Recording ===")
client_id = 1
player.start_recording(client_id)
# Add some test data
test_data = b'\x00\x01' * 320 # Simple test pattern
for i in range(10):
player.add_audio_data(client_id, test_data)
save_path = player.stop_recording(client_id, "test_recording.wav")
if save_path and os.path.exists(save_path):
print(f"Recording saved successfully: {save_path}")
os.remove(save_path)
else:
print("Recording failed")
player.cleanup()
print("Audio player test complete")
def test_audio_processor():
"""Test audio processor functionality"""
print("\n=== Testing Audio Processor ===")
processor = AudioProcessor()
processor.set_debug_callback(print)
# Create test audio
test_file = create_test_audio("test_input.wav", duration=1, frequency=1000)
# Read test audio
with wave.open(test_file, 'rb') as wav:
test_data = wav.readframes(wav.getnframes())
# Test various processing functions
print("\nTesting normalize:")
normalized = processor.normalize_audio(test_data, target_db=-6)
save_path = processor.save_processed_audio(normalized, test_file, "normalized")
if save_path:
print(f"Saved: {save_path}")
os.remove(save_path)
print("\nTesting gain:")
gained = processor.apply_gain(test_data, gain_db=6)
save_path = processor.save_processed_audio(gained, test_file, "gained")
if save_path:
print(f"Saved: {save_path}")
os.remove(save_path)
print("\nTesting filters:")
filtered = processor.apply_low_pass_filter(test_data)
save_path = processor.save_processed_audio(filtered, test_file, "lowpass")
if save_path:
print(f"Saved: {save_path}")
os.remove(save_path)
# Clean up
os.remove(test_file)
print("\nAudio processor test complete")
def main():
"""Run all tests"""
print("DryBox Audio Features Test")
print("==========================")
if not PYAUDIO_AVAILABLE:
print("\nNOTE: PyAudio not installed. Playback tests will be skipped.")
print("To install: sudo dnf install python3-devel portaudio-devel && pip install pyaudio")
test_audio_player()
test_audio_processor()
print("\nAll tests complete!")
if __name__ == "__main__":
main()

View File

@ -1,67 +0,0 @@
#!/usr/bin/env python3
"""
Test to verify audio is flowing through the system
"""
import os
import wave
import struct
def check_audio_file():
"""Verify input.wav has actual audio content"""
print("Checking input.wav content...")
with wave.open("wav/input.wav", 'rb') as wf:
# Read multiple frames to check for silence
total_frames = wf.getnframes()
print(f"Total frames: {total_frames}")
# Check beginning
wf.setpos(0)
frames = wf.readframes(320)
samples = struct.unpack('320h', frames)
max_val = max(abs(s) for s in samples)
print(f"Frame 0 (beginning): max amplitude = {max_val}")
# Check middle
wf.setpos(total_frames // 2)
frames = wf.readframes(320)
samples = struct.unpack('320h', frames)
max_val = max(abs(s) for s in samples)
print(f"Frame {total_frames//2} (middle): max amplitude = {max_val}")
# Check near end
wf.setpos(total_frames - 640)
frames = wf.readframes(320)
samples = struct.unpack('320h', frames)
max_val = max(abs(s) for s in samples)
print(f"Frame {total_frames-640} (near end): max amplitude = {max_val}")
# Find first non-silent frame
wf.setpos(0)
for i in range(0, total_frames, 320):
frames = wf.readframes(320)
if len(frames) < 640:
break
samples = struct.unpack('320h', frames)
max_val = max(abs(s) for s in samples)
if max_val > 100: # Not silence
print(f"\nFirst non-silent frame at position {i}")
print(f"First 10 samples: {samples[:10]}")
break
def main():
# Change to DryBox directory if needed
if os.path.basename(os.getcwd()) != 'DryBox':
if os.path.exists('DryBox'):
os.chdir('DryBox')
check_audio_file()
print("\nTo fix silence at beginning of file:")
print("1. Skip initial silence in phone_manager.py")
print("2. Or use a different test file")
print("3. Or trim the silence from input.wav")
if __name__ == "__main__":
main()

View File

@ -1,193 +0,0 @@
#!/usr/bin/env python3
"""
Test the audio pipeline (Codec2 + FSK) independently
"""
import sys
import os
import wave
import struct
import numpy as np
# Add parent directory to path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from voice_codec import Codec2Wrapper, FSKModem, Codec2Mode, Codec2Frame
def test_codec_only():
"""Test just the codec2 encode/decode"""
print("\n1. Testing Codec2 only...")
codec = Codec2Wrapper(mode=Codec2Mode.MODE_1200)
# Read test audio
with wave.open("wav/input.wav", 'rb') as wf:
# Read 320 samples (40ms at 8kHz)
frames = wf.readframes(320)
if len(frames) < 640: # 320 samples * 2 bytes
print("Not enough audio data")
return False
# Convert to samples
samples = struct.unpack(f'{len(frames)//2}h', frames)
print(f"Input: {len(samples)} samples, first 10: {samples[:10]}")
# Encode
encoded = codec.encode(frames)
if encoded:
print(f"Encoded: {len(encoded.bits)} bytes")
print(f"First 10 bytes: {encoded.bits[:10].hex()}")
else:
print("Encoding failed!")
return False
# Decode
decoded = codec.decode(encoded)
if decoded is not None:
print(f"Decoded: type={type(decoded)}, len={len(decoded)}")
if hasattr(decoded, '__getitem__'):
print(f"First 10 samples: {list(decoded[:10])}")
# Save decoded audio
with wave.open("wav/test_codec_only.wav", 'wb') as out:
out.setnchannels(1)
out.setsampwidth(2)
out.setframerate(8000)
if hasattr(decoded, 'tobytes'):
out.writeframes(decoded.tobytes())
else:
# Convert to bytes
import array
arr = array.array('h', decoded)
out.writeframes(arr.tobytes())
print("Saved decoded audio to wav/test_codec_only.wav")
return True
else:
print("Decoding failed!")
return False
def test_full_pipeline():
"""Test the full Codec2 + FSK pipeline"""
print("\n2. Testing full pipeline (Codec2 + FSK)...")
codec = Codec2Wrapper(mode=Codec2Mode.MODE_1200)
modem = FSKModem()
# Read test audio
with wave.open("wav/input.wav", 'rb') as wf:
frames = wf.readframes(320)
if len(frames) < 640:
print("Not enough audio data")
return False
# Encode with Codec2
encoded = codec.encode(frames)
if not encoded:
print("Codec encoding failed!")
return False
print(f"Codec2 encoded: {len(encoded.bits)} bytes")
# Modulate with FSK
modulated = modem.modulate(encoded.bits)
print(f"FSK modulated: {len(modulated)} float samples")
# Demodulate
demodulated, confidence = modem.demodulate(modulated)
print(f"FSK demodulated: {len(demodulated)} bytes, confidence: {confidence:.2f}")
if confidence < 0.5:
print("Low confidence demodulation!")
return False
# Create frame for decoding
frame = Codec2Frame(
mode=Codec2Mode.MODE_1200,
bits=demodulated,
timestamp=0,
frame_number=0
)
# Decode with Codec2
decoded = codec.decode(frame)
if decoded is not None:
print(f"Decoded: type={type(decoded)}, len={len(decoded)}")
# Save decoded audio
with wave.open("wav/test_full_pipeline.wav", 'wb') as out:
out.setnchannels(1)
out.setsampwidth(2)
out.setframerate(8000)
if hasattr(decoded, 'tobytes'):
out.writeframes(decoded.tobytes())
else:
# Convert to bytes
import array
arr = array.array('h', decoded)
out.writeframes(arr.tobytes())
print("Saved decoded audio to wav/test_full_pipeline.wav")
return True
else:
print("Codec decoding failed!")
return False
def test_byte_conversion():
"""Test the byte conversion that happens in the protocol"""
print("\n3. Testing byte conversion...")
# Create test PCM data
test_samples = [100, -100, 200, -200, 300, -300, 0, 0, 1000, -1000]
# Method 1: array.tobytes()
import array
arr = array.array('h', test_samples)
bytes1 = arr.tobytes()
print(f"array.tobytes(): {len(bytes1)} bytes, hex: {bytes1.hex()}")
# Method 2: struct.pack
bytes2 = struct.pack(f'{len(test_samples)}h', *test_samples)
print(f"struct.pack(): {len(bytes2)} bytes, hex: {bytes2.hex()}")
# They should be the same
print(f"Bytes match: {bytes1 == bytes2}")
# Test unpacking
unpacked = struct.unpack(f'{len(bytes1)//2}h', bytes1)
print(f"Unpacked: {unpacked}")
print(f"Matches original: {list(unpacked) == test_samples}")
return True
def main():
print("Audio Pipeline Test")
print("=" * 50)
# Change to DryBox directory if needed
if os.path.basename(os.getcwd()) != 'DryBox':
if os.path.exists('DryBox'):
os.chdir('DryBox')
# Ensure wav directory exists
os.makedirs("wav", exist_ok=True)
# Run tests
codec_ok = test_codec_only()
pipeline_ok = test_full_pipeline()
bytes_ok = test_byte_conversion()
print("\n" + "=" * 50)
print("Test Results:")
print(f" Codec2 only: {'✅ PASS' if codec_ok else '❌ FAIL'}")
print(f" Full pipeline: {'✅ PASS' if pipeline_ok else '❌ FAIL'}")
print(f" Byte conversion: {'✅ PASS' if bytes_ok else '❌ FAIL'}")
if codec_ok and pipeline_ok and bytes_ok:
print("\n✅ All tests passed!")
print("\nIf playback still doesn't work, check:")
print("1. Is the audio data actually being sent? (check debug logs)")
print("2. Is PyAudio stream format correct? (16-bit, 8kHz, mono)")
print("3. Is the volume turned up?")
else:
print("\n❌ Some tests failed - this explains the playback issue")
if __name__ == "__main__":
main()

View File

@ -1,127 +0,0 @@
#!/usr/bin/env python3
"""
Test script to verify audio setup for DryBox
"""
import os
import sys
import wave
def check_audio_file():
"""Check if input.wav exists and has correct format"""
wav_path = "wav/input.wav"
if not os.path.exists(wav_path):
print(f"{wav_path} not found!")
return False
try:
with wave.open(wav_path, 'rb') as wf:
channels = wf.getnchannels()
framerate = wf.getframerate()
sampwidth = wf.getsampwidth()
nframes = wf.getnframes()
duration = nframes / framerate
print(f"✅ Audio file: {wav_path}")
print(f" Channels: {channels} {'' if channels == 1 else '❌ (should be 1)'}")
print(f" Sample rate: {framerate}Hz {'' if framerate == 8000 else '❌ (should be 8000)'}")
print(f" Sample width: {sampwidth * 8} bits {'' if sampwidth == 2 else ''}")
print(f" Duration: {duration:.2f} seconds")
print(f" Size: {os.path.getsize(wav_path) / 1024:.1f} KB")
return channels == 1 and framerate == 8000
except Exception as e:
print(f"❌ Error reading {wav_path}: {e}")
return False
def check_pyaudio():
"""Check if PyAudio is installed and working"""
try:
import pyaudio
p = pyaudio.PyAudio()
# Check for output devices
output_devices = 0
for i in range(p.get_device_count()):
info = p.get_device_info_by_index(i)
if info['maxOutputChannels'] > 0:
output_devices += 1
p.terminate()
print(f"✅ PyAudio installed")
print(f" Output devices available: {output_devices}")
return True
except ImportError:
print("❌ PyAudio not installed")
print(" To enable playback, run:")
print(" sudo dnf install python3-devel portaudio-devel")
print(" pip install pyaudio")
return False
except Exception as e:
print(f"❌ PyAudio error: {e}")
return False
def check_dependencies():
"""Check all required dependencies"""
deps = {
'PyQt5': 'PyQt5',
'numpy': 'numpy',
'struct': None, # Built-in
'wave': None, # Built-in
}
print("\nDependency check:")
all_good = True
for module_name, pip_name in deps.items():
try:
__import__(module_name)
print(f"{module_name}")
except ImportError:
print(f"{module_name} not found")
if pip_name:
print(f" Install with: pip install {pip_name}")
all_good = False
return all_good
def main():
print("DryBox Audio Setup Test")
print("=" * 40)
# Change to DryBox directory if needed
if os.path.basename(os.getcwd()) != 'DryBox':
if os.path.exists('DryBox'):
os.chdir('DryBox')
print(f"Changed to DryBox directory: {os.getcwd()}")
print("\nChecking audio file...")
audio_ok = check_audio_file()
print("\nChecking PyAudio...")
pyaudio_ok = check_pyaudio()
print("\nChecking dependencies...")
deps_ok = check_dependencies()
print("\n" + "=" * 40)
if audio_ok and deps_ok:
print("✅ Audio setup is ready!")
if not pyaudio_ok:
print("⚠️ Playback disabled (PyAudio not available)")
print(" Recording will still work")
else:
print("❌ Audio setup needs attention")
print("\nUsage tips:")
print("1. Run the UI: python UI/main.py")
print("2. Click 'Run Automatic Test' or press Space")
print("3. Enable playback on Phone 2 with Ctrl+2")
print("4. You'll hear the decoded audio after handshake completes")
if __name__ == "__main__":
main()

View File

@ -1,714 +0,0 @@
"""
Voice codec integration for encrypted voice over GSM.
Implements Codec2 compression with FSK modulation for transmitting
encrypted voice data over standard GSM voice channels.
"""
import array
import math
import struct
from typing import Optional, Tuple, List
from dataclasses import dataclass
from enum import IntEnum
try:
import numpy as np
HAS_NUMPY = True
except ImportError:
HAS_NUMPY = False
# ANSI colors
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
RESET = "\033[0m"
class Codec2Mode(IntEnum):
"""Codec2 bitrate modes."""
MODE_3200 = 0 # 3200 bps
MODE_2400 = 1 # 2400 bps
MODE_1600 = 2 # 1600 bps
MODE_1400 = 3 # 1400 bps
MODE_1300 = 4 # 1300 bps
MODE_1200 = 5 # 1200 bps (recommended for robustness)
MODE_700C = 6 # 700 bps
@dataclass
class Codec2Frame:
"""Represents a single Codec2 compressed voice frame."""
mode: Codec2Mode
bits: bytes
timestamp: float
frame_number: int
class Codec2Wrapper:
"""
Wrapper for Codec2 voice codec.
In production, this would use py_codec2 or ctypes bindings to libcodec2.
This is a simulation interface for protocol development.
"""
# Frame sizes in bits for each mode
FRAME_BITS = {
Codec2Mode.MODE_3200: 64,
Codec2Mode.MODE_2400: 48,
Codec2Mode.MODE_1600: 64,
Codec2Mode.MODE_1400: 56,
Codec2Mode.MODE_1300: 52,
Codec2Mode.MODE_1200: 48,
Codec2Mode.MODE_700C: 28
}
# Frame duration in ms
FRAME_MS = {
Codec2Mode.MODE_3200: 20,
Codec2Mode.MODE_2400: 20,
Codec2Mode.MODE_1600: 40,
Codec2Mode.MODE_1400: 40,
Codec2Mode.MODE_1300: 40,
Codec2Mode.MODE_1200: 40,
Codec2Mode.MODE_700C: 40
}
def __init__(self, mode: Codec2Mode = Codec2Mode.MODE_1200):
"""
Initialize Codec2 wrapper.
Args:
mode: Codec2 bitrate mode (default 1200 bps for robustness)
"""
self.mode = mode
self.frame_bits = self.FRAME_BITS[mode]
self.frame_bytes = (self.frame_bits + 7) // 8
self.frame_ms = self.FRAME_MS[mode]
self.frame_samples = int(8000 * self.frame_ms / 1000) # 8kHz sampling
self.frame_counter = 0
# Quiet initialization - no print
def encode(self, audio_samples) -> Optional[Codec2Frame]:
"""
Encode PCM audio samples to Codec2 frame.
Args:
audio_samples: PCM samples (8kHz, 16-bit signed)
Returns:
Codec2Frame or None if insufficient samples
"""
if len(audio_samples) < self.frame_samples:
return None
# In production: call codec2_encode(state, bits, samples)
# Simulation: create pseudo-compressed data
compressed = self._simulate_compression(audio_samples[:self.frame_samples])
frame = Codec2Frame(
mode=self.mode,
bits=compressed,
timestamp=self.frame_counter * self.frame_ms / 1000.0,
frame_number=self.frame_counter
)
self.frame_counter += 1
return frame
def decode(self, frame: Codec2Frame):
"""
Decode Codec2 frame to PCM audio samples.
Args:
frame: Codec2 compressed frame
Returns:
PCM samples (8kHz, 16-bit signed)
"""
if frame.mode != self.mode:
raise ValueError(f"Frame mode {frame.mode} doesn't match decoder mode {self.mode}")
# In production: call codec2_decode(state, samples, bits)
# Simulation: decompress to audio
return self._simulate_decompression(frame.bits)
def _simulate_compression(self, samples) -> bytes:
"""Simulate Codec2 compression (for testing)."""
# Convert to list if needed
if hasattr(samples, 'tolist'):
sample_list = samples.tolist()
elif hasattr(samples, '__iter__'):
sample_list = list(samples)
else:
sample_list = samples
# Extract basic features for simulation
if HAS_NUMPY and hasattr(samples, '__array__'):
# Convert to numpy array if needed
np_samples = np.asarray(samples, dtype=np.float32)
if len(np_samples) > 0:
mean_square = np.mean(np_samples ** 2)
energy = np.sqrt(mean_square) if not np.isnan(mean_square) else 0.0
zero_crossings = np.sum(np.diff(np.sign(np_samples)) != 0)
else:
energy = 0.0
zero_crossings = 0
else:
# Manual calculation without numpy
if sample_list and len(sample_list) > 0:
energy = math.sqrt(sum(s**2 for s in sample_list) / len(sample_list))
zero_crossings = sum(1 for i in range(1, len(sample_list))
if (sample_list[i-1] >= 0) != (sample_list[i] >= 0))
else:
energy = 0.0
zero_crossings = 0
# Pack into bytes (simplified)
# Ensure values are valid
energy_int = max(0, min(65535, int(energy)))
zc_int = max(0, min(65535, int(zero_crossings)))
data = struct.pack('<HH', energy_int, zc_int)
# Pad to expected frame size
data += b'\x00' * (self.frame_bytes - len(data))
return data[:self.frame_bytes]
def _simulate_decompression(self, compressed: bytes):
"""Simulate Codec2 decompression (for testing)."""
# Unpack features
if len(compressed) >= 4:
energy, zero_crossings = struct.unpack('<HH', compressed[:4])
else:
energy, zero_crossings = 1000, 100
# Generate synthetic speech-like signal
if HAS_NUMPY:
t = np.linspace(0, self.frame_ms/1000, self.frame_samples)
# Base frequency from zero crossings
freq = zero_crossings * 10 # Simplified mapping
# Generate harmonics
signal = np.zeros(self.frame_samples)
for harmonic in range(1, 4):
signal += np.sin(2 * np.pi * freq * harmonic * t) / harmonic
# Apply energy envelope
signal *= energy / 10000.0
# Convert to 16-bit PCM
return (signal * 32767).astype(np.int16)
else:
# Manual generation without numpy
samples = []
freq = zero_crossings * 10
for i in range(self.frame_samples):
t = i / 8000.0 # 8kHz sample rate
value = 0
for harmonic in range(1, 4):
value += math.sin(2 * math.pi * freq * harmonic * t) / harmonic
value *= energy / 10000.0
# Clamp to 16-bit range
sample = int(value * 32767)
sample = max(-32768, min(32767, sample))
samples.append(sample)
return array.array('h', samples)
class FSKModem:
"""
4-FSK modem for transmitting digital data over voice channels.
Designed to survive GSM/AMR/EVS vocoders.
"""
def __init__(self, sample_rate: int = 8000, baud_rate: int = 600):
"""
Initialize FSK modem.
Args:
sample_rate: Audio sample rate (Hz)
baud_rate: Symbol rate (baud)
"""
self.sample_rate = sample_rate
self.baud_rate = baud_rate
self.samples_per_symbol = int(sample_rate / baud_rate)
# 4-FSK frequencies (300-3400 Hz band)
self.frequencies = [
600, # 00
1200, # 01
1800, # 10
2400 # 11
]
# Preamble for synchronization (800 Hz, 100ms)
self.preamble_freq = 800
self.preamble_duration = 0.1 # seconds
# Quiet initialization - no print
def modulate(self, data: bytes, add_preamble: bool = True):
"""
Modulate binary data to FSK audio signal.
Args:
data: Binary data to modulate
add_preamble: Whether to add synchronization preamble
Returns:
Audio signal (normalized float32 array or list)
"""
# Convert bytes to dibits (2-bit symbols)
symbols = []
for byte in data:
symbols.extend([
(byte >> 6) & 0x03,
(byte >> 4) & 0x03,
(byte >> 2) & 0x03,
byte & 0x03
])
# Generate audio signal
signal = []
# Add preamble
if add_preamble:
preamble_samples = int(self.preamble_duration * self.sample_rate)
if HAS_NUMPY:
t = np.arange(preamble_samples) / self.sample_rate
preamble = np.sin(2 * np.pi * self.preamble_freq * t)
signal.extend(preamble)
else:
for i in range(preamble_samples):
t = i / self.sample_rate
value = math.sin(2 * math.pi * self.preamble_freq * t)
signal.append(value)
# Modulate symbols
for symbol in symbols:
freq = self.frequencies[symbol]
if HAS_NUMPY:
t = np.arange(self.samples_per_symbol) / self.sample_rate
tone = np.sin(2 * np.pi * freq * t)
signal.extend(tone)
else:
for i in range(self.samples_per_symbol):
t = i / self.sample_rate
value = math.sin(2 * math.pi * freq * t)
signal.append(value)
# Apply smoothing to reduce clicks
if HAS_NUMPY:
audio = np.array(signal, dtype=np.float32)
else:
audio = array.array('f', signal)
audio = self._apply_envelope(audio)
return audio
def demodulate(self, audio) -> Tuple[bytes, float]:
"""
Demodulate FSK audio signal to binary data.
Args:
audio: Audio signal
Returns:
Tuple of (demodulated data, confidence score)
"""
# Find preamble
preamble_start = self._find_preamble(audio)
if preamble_start < 0:
return b'', 0.0
# Skip preamble
data_start = preamble_start + int(self.preamble_duration * self.sample_rate)
# Demodulate symbols
symbols = []
confidence_scores = []
pos = data_start
while pos + self.samples_per_symbol <= len(audio):
symbol_audio = audio[pos:pos + self.samples_per_symbol]
symbol, confidence = self._demodulate_symbol(symbol_audio)
symbols.append(symbol)
confidence_scores.append(confidence)
pos += self.samples_per_symbol
# Convert symbols to bytes
data = bytearray()
for i in range(0, len(symbols), 4):
if i + 3 < len(symbols):
byte = (symbols[i] << 6) | (symbols[i+1] << 4) | (symbols[i+2] << 2) | symbols[i+3]
data.append(byte)
if HAS_NUMPY and confidence_scores:
avg_confidence = np.mean(confidence_scores)
else:
avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0.0
return bytes(data), avg_confidence
def _find_preamble(self, audio) -> int:
"""Find preamble in audio signal."""
# Simple energy-based detection
window_size = int(0.01 * self.sample_rate) # 10ms window
if HAS_NUMPY:
for i in range(0, len(audio) - window_size, window_size // 2):
window = audio[i:i + window_size]
# Check for preamble frequency
fft = np.fft.fft(window)
freqs = np.fft.fftfreq(len(window), 1/self.sample_rate)
# Find peak near preamble frequency
idx = np.argmax(np.abs(fft[:len(fft)//2]))
peak_freq = abs(freqs[idx])
if abs(peak_freq - self.preamble_freq) < 50: # 50 Hz tolerance
return i
else:
# Simple zero-crossing based detection without FFT
for i in range(0, len(audio) - window_size, window_size // 2):
window = list(audio[i:i + window_size])
# Count zero crossings
zero_crossings = 0
for j in range(1, len(window)):
if (window[j-1] >= 0) != (window[j] >= 0):
zero_crossings += 1
# Estimate frequency from zero crossings
estimated_freq = (zero_crossings * self.sample_rate) / (2 * len(window))
if abs(estimated_freq - self.preamble_freq) < 100: # 100 Hz tolerance
return i
return -1
def _demodulate_symbol(self, audio) -> Tuple[int, float]:
"""Demodulate a single FSK symbol."""
if HAS_NUMPY:
# FFT-based demodulation
fft = np.fft.fft(audio)
freqs = np.fft.fftfreq(len(audio), 1/self.sample_rate)
magnitude = np.abs(fft[:len(fft)//2])
# Find energy at each FSK frequency
energies = []
for freq in self.frequencies:
idx = np.argmin(np.abs(freqs[:len(freqs)//2] - freq))
energy = magnitude[idx]
energies.append(energy)
# Select symbol with highest energy
symbol = np.argmax(energies)
else:
# Goertzel algorithm for specific frequency detection
audio_list = list(audio) if hasattr(audio, '__iter__') else audio
energies = []
for freq in self.frequencies:
# Goertzel algorithm
omega = 2 * math.pi * freq / self.sample_rate
coeff = 2 * math.cos(omega)
s_prev = 0
s_prev2 = 0
for sample in audio_list:
s = sample + coeff * s_prev - s_prev2
s_prev2 = s_prev
s_prev = s
# Calculate magnitude
power = s_prev2 * s_prev2 + s_prev * s_prev - coeff * s_prev * s_prev2
energies.append(math.sqrt(abs(power)))
# Select symbol with highest energy
symbol = energies.index(max(energies))
# Confidence is ratio of strongest to second strongest
sorted_energies = sorted(energies, reverse=True)
confidence = sorted_energies[0] / (sorted_energies[1] + 1e-6)
return symbol, min(confidence, 10.0) / 10.0
def _apply_envelope(self, audio):
"""Apply smoothing envelope to reduce clicks."""
# Simple raised cosine envelope
ramp_samples = int(0.002 * self.sample_rate) # 2ms ramps
if len(audio) > 2 * ramp_samples:
if HAS_NUMPY:
# Fade in
t = np.linspace(0, np.pi/2, ramp_samples)
audio[:ramp_samples] *= np.sin(t) ** 2
# Fade out
audio[-ramp_samples:] *= np.sin(t[::-1]) ** 2
else:
# Manual fade in
for i in range(ramp_samples):
t = (i / ramp_samples) * (math.pi / 2)
factor = math.sin(t) ** 2
audio[i] *= factor
# Manual fade out
for i in range(ramp_samples):
t = ((ramp_samples - 1 - i) / ramp_samples) * (math.pi / 2)
factor = math.sin(t) ** 2
audio[-(i+1)] *= factor
return audio
class VoiceProtocol:
"""
Integrates voice codec and modem with the Icing protocol
for encrypted voice transmission over GSM.
"""
def __init__(self, protocol_instance):
"""
Initialize voice protocol handler.
Args:
protocol_instance: IcingProtocol instance
"""
self.protocol = protocol_instance
self.codec = Codec2Wrapper(Codec2Mode.MODE_1200)
self.modem = FSKModem(sample_rate=8000, baud_rate=600)
# Voice crypto state
self.voice_iv_counter = 0
self.voice_sequence = 0
# Buffers
if HAS_NUMPY:
self.audio_buffer = np.array([], dtype=np.int16)
else:
self.audio_buffer = array.array('h') # 16-bit signed integers
self.frame_buffer = []
print(f"{GREEN}[VOICE]{RESET} Voice protocol initialized")
def process_voice_input(self, audio_samples):
"""
Process voice input: compress, encrypt, and modulate.
Args:
audio_samples: PCM audio samples (8kHz, 16-bit)
Returns:
Modulated audio signal ready for transmission (numpy array or array.array)
"""
# Add to buffer
if HAS_NUMPY:
self.audio_buffer = np.concatenate([self.audio_buffer, audio_samples])
else:
self.audio_buffer.extend(audio_samples)
# Process complete frames
modulated_audio = []
while len(self.audio_buffer) >= self.codec.frame_samples:
# Extract frame
if HAS_NUMPY:
frame_audio = self.audio_buffer[:self.codec.frame_samples]
self.audio_buffer = self.audio_buffer[self.codec.frame_samples:]
else:
frame_audio = array.array('h', self.audio_buffer[:self.codec.frame_samples])
del self.audio_buffer[:self.codec.frame_samples]
# Compress with Codec2
compressed_frame = self.codec.encode(frame_audio)
if not compressed_frame:
continue
# Encrypt frame
encrypted = self._encrypt_voice_frame(compressed_frame)
# Add FEC
protected = self._add_fec(encrypted)
# Modulate to audio
audio_signal = self.modem.modulate(protected, add_preamble=True)
modulated_audio.append(audio_signal)
if modulated_audio:
if HAS_NUMPY:
return np.concatenate(modulated_audio)
else:
# Concatenate array.array objects
result = array.array('f')
for audio in modulated_audio:
result.extend(audio)
return result
return None
def process_voice_output(self, modulated_audio):
"""
Process received audio: demodulate, decrypt, and decompress.
Args:
modulated_audio: Received FSK-modulated audio
Returns:
Decoded PCM audio samples (numpy array or array.array)
"""
# Demodulate
data, confidence = self.modem.demodulate(modulated_audio)
if confidence < 0.5:
print(f"{YELLOW}[VOICE]{RESET} Low demodulation confidence: {confidence:.2f}")
return None
# Remove FEC
frame_data = self._remove_fec(data)
if not frame_data:
return None
# Decrypt
compressed_frame = self._decrypt_voice_frame(frame_data)
if not compressed_frame:
return None
# Decompress
audio_samples = self.codec.decode(compressed_frame)
return audio_samples
def _encrypt_voice_frame(self, frame: Codec2Frame) -> bytes:
"""Encrypt a voice frame using ChaCha20-CTR."""
if not self.protocol.hkdf_key:
raise ValueError("No encryption key available")
# Prepare frame data
frame_data = struct.pack('<BIH',
frame.mode,
frame.frame_number,
len(frame.bits)
) + frame.bits
# Generate IV for this frame (ChaCha20 needs 16 bytes)
iv = struct.pack('<Q', self.voice_iv_counter) + b'\x00' * 8 # 8 + 8 = 16 bytes
self.voice_iv_counter += 1
# Encrypt using ChaCha20
from encryption import chacha20_encrypt
key = bytes.fromhex(self.protocol.hkdf_key)
encrypted = chacha20_encrypt(frame_data, key, iv)
# Add sequence number and IV hint
return struct.pack('<HQ', self.voice_sequence, self.voice_iv_counter) + encrypted
def _decrypt_voice_frame(self, data: bytes) -> Optional[Codec2Frame]:
"""Decrypt a voice frame."""
if len(data) < 10:
return None
# Extract sequence and IV hint
sequence, iv_hint = struct.unpack('<HQ', data[:10])
encrypted = data[10:]
# Generate IV (16 bytes for ChaCha20)
iv = struct.pack('<Q', iv_hint) + b'\x00' * 8
# Decrypt
from encryption import chacha20_decrypt
key = bytes.fromhex(self.protocol.hkdf_key)
try:
decrypted = chacha20_decrypt(encrypted, key, iv)
# Parse frame
mode, frame_num, bits_len = struct.unpack('<BIH', decrypted[:7])
bits = decrypted[7:7+bits_len]
return Codec2Frame(
mode=Codec2Mode(mode),
bits=bits,
timestamp=0, # Will be set by caller
frame_number=frame_num
)
except Exception as e:
print(f"{RED}[VOICE]{RESET} Decryption failed: {e}")
return None
def _add_fec(self, data: bytes) -> bytes:
"""Add forward error correction."""
# Simple repetition code (3x) for testing
# In production: use convolutional code or LDPC
fec_data = bytearray()
for byte in data:
# Repeat each byte 3 times
fec_data.extend([byte, byte, byte])
return bytes(fec_data)
def _remove_fec(self, data: bytes) -> Optional[bytes]:
"""Remove FEC and correct errors."""
if len(data) % 3 != 0:
return None
corrected = bytearray()
for i in range(0, len(data), 3):
# Majority voting
votes = [data[i], data[i+1], data[i+2]]
byte_value = max(set(votes), key=votes.count)
corrected.append(byte_value)
return bytes(corrected)
# Example usage
if __name__ == "__main__":
# Test Codec2 wrapper
print(f"\n{BLUE}=== Testing Codec2 Wrapper ==={RESET}")
codec = Codec2Wrapper(Codec2Mode.MODE_1200)
# Generate test audio
if HAS_NUMPY:
t = np.linspace(0, 0.04, 320) # 40ms at 8kHz
test_audio = (np.sin(2 * np.pi * 440 * t) * 16384).astype(np.int16)
else:
test_audio = array.array('h')
for i in range(320):
t = i * 0.04 / 320
value = int(math.sin(2 * math.pi * 440 * t) * 16384)
test_audio.append(value)
# Encode
frame = codec.encode(test_audio)
print(f"Encoded frame: {len(frame.bits)} bytes")
# Decode
decoded = codec.decode(frame)
print(f"Decoded audio: {len(decoded)} samples")
# Test FSK modem
print(f"\n{BLUE}=== Testing FSK Modem ==={RESET}")
modem = FSKModem()
# Test data
test_data = b"Hello, secure voice!"
# Modulate
modulated = modem.modulate(test_data)
print(f"Modulated: {len(modulated)} samples ({len(modulated)/8000:.2f}s)")
# Demodulate
demodulated, confidence = modem.demodulate(modulated)
print(f"Demodulated: {demodulated}")
print(f"Confidence: {confidence:.2%}")
print(f"Match: {demodulated == test_data}")

View File

@ -1,307 +0,0 @@
import os
import struct
from typing import Optional, Tuple
from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305
class MessageHeader:
"""
Header of an encrypted message (18 bytes total):
Clear Text Section (4 bytes):
- flag: 16 bits (0xBEEF by default)
- data_len: 16 bits (length of encrypted payload excluding tag)
Associated Data (14 bytes):
- retry: 8 bits (retry counter)
- connection_status: 4 bits (e.g., CRC required) + 4 bits padding
- iv/messageID: 96 bits (12 bytes)
"""
def __init__(self, flag: int, data_len: int, retry: int, connection_status: int, iv: bytes):
if not (0 <= flag < 65536):
raise ValueError("Flag must fit in 16 bits (0..65535)")
if not (0 <= data_len < 65536):
raise ValueError("Data length must fit in 16 bits (0..65535)")
if not (0 <= retry < 256):
raise ValueError("Retry must fit in 8 bits (0..255)")
if not (0 <= connection_status < 16):
raise ValueError("Connection status must fit in 4 bits (0..15)")
if len(iv) != 12:
raise ValueError("IV must be 12 bytes (96 bits)")
self.flag = flag # 16 bits
self.data_len = data_len # 16 bits
self.retry = retry # 8 bits
self.connection_status = connection_status # 4 bits
self.iv = iv # 96 bits (12 bytes)
def pack(self) -> bytes:
"""Pack header into 18 bytes."""
# Pack flag and data_len (4 bytes)
header = struct.pack('>H H', self.flag, self.data_len)
# Pack retry and connection_status (2 bytes)
# connection_status in high 4 bits of second byte, 4 bits padding as zero
ad_byte = (self.connection_status & 0x0F) << 4
ad_packed = struct.pack('>B B', self.retry, ad_byte)
# Append IV (12 bytes)
return header + ad_packed + self.iv
def get_associated_data(self) -> bytes:
"""Get the associated data for AEAD encryption (retry, conn_status, iv)."""
# Pack retry and connection_status
ad_byte = (self.connection_status & 0x0F) << 4
ad_packed = struct.pack('>B B', self.retry, ad_byte)
# Append IV
return ad_packed + self.iv
@classmethod
def unpack(cls, data: bytes) -> 'MessageHeader':
"""Unpack 18 bytes into a MessageHeader object."""
if len(data) < 18:
raise ValueError(f"Header data too short: {len(data)} bytes, expected 18")
flag, data_len = struct.unpack('>H H', data[:4])
retry, ad_byte = struct.unpack('>B B', data[4:6])
connection_status = (ad_byte >> 4) & 0x0F
iv = data[6:18]
return cls(flag, data_len, retry, connection_status, iv)
class EncryptedMessage:
"""
Encrypted message packet format:
- Header (18 bytes):
* flag: 16 bits
* data_len: 16 bits
* retry: 8 bits
* connection_status: 4 bits (+ 4 bits padding)
* iv/messageID: 96 bits (12 bytes)
- Payload: variable length encrypted data
- Footer:
* Authentication tag: 128 bits (16 bytes)
* CRC32: 32 bits (4 bytes) - optional, based on connection_status
"""
def __init__(self, plaintext: bytes, key: bytes, flag: int = 0xBEEF,
retry: int = 0, connection_status: int = 0, iv: bytes = None,
cipher_type: int = 0):
self.plaintext = plaintext
self.key = key
self.flag = flag
self.retry = retry
self.connection_status = connection_status
self.iv = iv or generate_iv(initial=True)
self.cipher_type = cipher_type # 0 = AES-256-GCM, 1 = ChaCha20-Poly1305
# Will be set after encryption
self.ciphertext = None
self.tag = None
self.header = None
def encrypt(self) -> bytes:
"""Encrypt the plaintext and return the full encrypted message."""
# Create header with correct data_len (which will be set after encryption)
self.header = MessageHeader(
flag=self.flag,
data_len=0, # Will be updated after encryption
retry=self.retry,
connection_status=self.connection_status,
iv=self.iv
)
# Get associated data for AEAD
aad = self.header.get_associated_data()
# Encrypt using the appropriate cipher
if self.cipher_type == 0: # AES-256-GCM
cipher = AESGCM(self.key)
ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad)
elif self.cipher_type == 1: # ChaCha20-Poly1305
cipher = ChaCha20Poly1305(self.key)
ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad)
else:
raise ValueError(f"Unsupported cipher type: {self.cipher_type}")
# Extract ciphertext and tag
self.tag = ciphertext_with_tag[-16:]
self.ciphertext = ciphertext_with_tag[:-16]
# Update header with actual data length
self.header.data_len = len(self.ciphertext)
# Pack everything together
packed_header = self.header.pack()
# Check if CRC is required (based on connection_status)
if self.connection_status & 0x01: # Lowest bit indicates CRC required
import zlib
# Compute CRC32 of header + ciphertext + tag
crc = zlib.crc32(packed_header + self.ciphertext + self.tag) & 0xffffffff
crc_bytes = struct.pack('>I', crc)
return packed_header + self.ciphertext + self.tag + crc_bytes
else:
return packed_header + self.ciphertext + self.tag
@classmethod
def decrypt(cls, data: bytes, key: bytes, cipher_type: int = 0) -> Tuple[bytes, MessageHeader]:
"""
Decrypt an encrypted message and return the plaintext and header.
Args:
data: The full encrypted message
key: The encryption key
cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305
Returns:
Tuple of (plaintext, header)
"""
if len(data) < 18 + 16: # Header + minimum tag size
raise ValueError("Message too short")
# Extract header
header_bytes = data[:18]
header = MessageHeader.unpack(header_bytes)
# Get ciphertext and tag
data_len = header.data_len
ciphertext_start = 18
ciphertext_end = ciphertext_start + data_len
if ciphertext_end + 16 > len(data):
raise ValueError("Message length does not match header's data_len")
ciphertext = data[ciphertext_start:ciphertext_end]
tag = data[ciphertext_end:ciphertext_end + 16]
# Get associated data for AEAD
aad = header.get_associated_data()
# Combine ciphertext and tag for decryption
ciphertext_with_tag = ciphertext + tag
# Decrypt using the appropriate cipher
try:
if cipher_type == 0: # AES-256-GCM
cipher = AESGCM(key)
plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad)
elif cipher_type == 1: # ChaCha20-Poly1305
cipher = ChaCha20Poly1305(key)
plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad)
else:
raise ValueError(f"Unsupported cipher type: {cipher_type}")
return plaintext, header
except Exception as e:
raise ValueError(f"Decryption failed: {e}")
def generate_iv(initial: bool = False, previous_iv: bytes = None) -> bytes:
"""
Generate a 96-bit IV (12 bytes).
Args:
initial: If True, return a random IV
previous_iv: The previous IV to increment
Returns:
A new IV
"""
if initial or previous_iv is None:
return os.urandom(12) # 96 bits
else:
# Increment the previous IV by 1 modulo 2^96
iv_int = int.from_bytes(previous_iv, 'big')
iv_int = (iv_int + 1) % (1 << 96)
return iv_int.to_bytes(12, 'big')
# Convenience functions to match original API
def encrypt_message(plaintext: bytes, key: bytes, flag: int = 0xBEEF,
retry: int = 0, connection_status: int = 0,
iv: bytes = None, cipher_type: int = 0) -> bytes:
"""
Encrypt a message using the specified parameters.
Args:
plaintext: The data to encrypt
key: The encryption key (32 bytes for AES-256-GCM, 32 bytes for ChaCha20-Poly1305)
flag: 16-bit flag value (default: 0xBEEF)
retry: 8-bit retry counter
connection_status: 4-bit connection status
iv: Optional 96-bit IV (if None, a random one will be generated)
cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305
Returns:
The full encrypted message
"""
message = EncryptedMessage(
plaintext=plaintext,
key=key,
flag=flag,
retry=retry,
connection_status=connection_status,
iv=iv,
cipher_type=cipher_type
)
return message.encrypt()
def decrypt_message(message: bytes, key: bytes, cipher_type: int = 0) -> bytes:
"""
Decrypt a message.
Args:
message: The full encrypted message
key: The encryption key
cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305
Returns:
The decrypted plaintext
"""
plaintext, _ = EncryptedMessage.decrypt(message, key, cipher_type)
return plaintext
# ChaCha20-CTR functions for voice streaming (without authentication)
def chacha20_encrypt(plaintext: bytes, key: bytes, nonce: bytes) -> bytes:
"""
Encrypt plaintext using ChaCha20 in CTR mode (no authentication).
Args:
plaintext: Data to encrypt
key: 32-byte key
nonce: 16-byte nonce (for ChaCha20 in cryptography library)
Returns:
Ciphertext
"""
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
if len(key) != 32:
raise ValueError("ChaCha20 key must be 32 bytes")
if len(nonce) != 16:
raise ValueError("ChaCha20 nonce must be 16 bytes")
cipher = Cipher(
algorithms.ChaCha20(key, nonce),
mode=None,
backend=default_backend()
)
encryptor = cipher.encryptor()
return encryptor.update(plaintext) + encryptor.finalize()
def chacha20_decrypt(ciphertext: bytes, key: bytes, nonce: bytes) -> bytes:
"""
Decrypt ciphertext using ChaCha20 in CTR mode (no authentication).
Args:
ciphertext: Data to decrypt
key: 32-byte key
nonce: 12-byte nonce
Returns:
Plaintext
"""
# ChaCha20 is symmetrical - encryption and decryption are the same
return chacha20_encrypt(ciphertext, key, nonce)

View File

@ -1,32 +0,0 @@
#!/usr/bin/env python3
"""Debug script to trace the UI behavior"""
import sys
from pathlib import Path
# Monkey patch the integrated_protocol to see what's being called
orig_file = Path(__file__).parent / "DryBox" / "integrated_protocol.py"
backup_file = Path(__file__).parent / "DryBox" / "integrated_protocol_backup.py"
# Read the original file
with open(orig_file, 'r') as f:
content = f.read()
# Add debug prints
debug_content = content.replace(
'def initiate_key_exchange(self, cipher_type=1, is_initiator=True):',
'''def initiate_key_exchange(self, cipher_type=1, is_initiator=True):
import traceback
print(f"\\n[DEBUG] initiate_key_exchange called with is_initiator={is_initiator}")
print("[DEBUG] Call stack:")
for line in traceback.format_stack()[:-1]:
print(line.strip())
print()'''
)
# Write the debug version
with open(orig_file, 'w') as f:
f.write(debug_content)
print("Debug patch applied. Run the UI now to see the trace.")
print("To restore: cp DryBox/integrated_protocol_backup.py DryBox/integrated_protocol.py")