diff --git a/protocol_prototype/DryBox/AUDIO_SETUP.md b/protocol_prototype/DryBox/AUDIO_SETUP.md deleted file mode 100644 index 05c38db..0000000 --- a/protocol_prototype/DryBox/AUDIO_SETUP.md +++ /dev/null @@ -1,52 +0,0 @@ -# Audio Setup Guide for DryBox - -## Installing PyAudio on Fedora - -PyAudio requires system dependencies before installation: - -```bash -# Install required system packages -sudo dnf install python3-devel portaudio-devel - -# Then install PyAudio -pip install pyaudio -``` - -## Alternative: Run Without PyAudio - -If you prefer not to install PyAudio, the application will still work but without real-time playback. You can still: -- Record audio to files -- Process and export audio -- Use all other features - -To run without PyAudio, the audio_player.py module will gracefully handle the missing dependency. - -## Ubuntu/Debian Installation - -```bash -sudo apt-get install python3-dev portaudio19-dev -pip install pyaudio -``` - -## macOS Installation - -```bash -brew install portaudio -pip install pyaudio -``` - -## Troubleshooting - -If you see "No module named 'pyaudio'" errors: -1. The app will continue to work without playback -2. Recording and processing features remain available -3. Install PyAudio later when convenient - -## Testing Audio Features - -1. Run the application: `python UI/main.py` -2. Start a call between phones -3. Test features: - - Recording: Works without PyAudio - - Playback: Requires PyAudio - - Processing: Works without PyAudio \ No newline at end of file diff --git a/protocol_prototype/DryBox/AUDIO_TESTING_GUIDE.md b/protocol_prototype/DryBox/AUDIO_TESTING_GUIDE.md deleted file mode 100644 index 7b9501c..0000000 --- a/protocol_prototype/DryBox/AUDIO_TESTING_GUIDE.md +++ /dev/null @@ -1,118 +0,0 @@ -# Audio Testing Guide for DryBox - -## Setup Verification - -1. **Start the server first**: - ```bash - python server.py - ``` - -2. **Run the UI**: - ```bash - python UI/main.py - ``` - -## Testing Audio Playback - -### Step 1: Test PyAudio is Working -When you enable playback (Ctrl+1 or Ctrl+2), you should hear a short beep (100ms, 1kHz tone). This confirms: -- PyAudio is properly installed -- Audio output device is working -- Stream format is correct - -### Step 2: Test During Call -1. Click "Run Automatic Test" or press Space -2. **Immediately** enable playback on Phone 2 (Ctrl+2) - - You should hear the test beep -3. Watch the debug console for: - - "Phone 2 playback started" - - "Phone 2 sent test beep to verify audio" -4. Wait for handshake to complete (steps 4-5 in test) -5. Once voice session starts, you should see: - - "Phone 2 received audio data: XXX bytes" - - "Phone 2 forwarding audio to player" - - "Client 1 playback thread got XXX bytes" - -### What to Look For in Debug Console - -**Good signs:** -``` -[AudioPlayer] Client 1 add_audio_data called with 640 bytes -[AudioPlayer] Client 1 added to buffer, queue size: 1 -[AudioPlayer] Client 1 playback thread got 640 bytes -``` - -**Problem signs:** -``` -[AudioPlayer] Client 1 has no buffer (playback not started?) -Low confidence demodulation: 0.XX -Codec decode returned None or empty -``` - -## Troubleshooting - -### No Test Beep -- Check system volume -- Verify PyAudio: `python test_audio_setup.py` -- Check audio device: `python -c "import pyaudio; p=pyaudio.PyAudio(); print(p.get_default_output_device_info())"` - -### Test Beep Works but No Voice Audio -1. **Check if audio is being transmitted:** - - Phone 1 should show: "sent N voice frames" - - Phone 2 should show: "Received voice data frame #N" - -2. **Check if audio is being decoded:** - - Look for: "Decoded PCM samples: type=, len=320" - - Look for: "Emitting PCM bytes: 640 bytes" - -3. **Check if audio reaches the player:** - - Look for: "Phone 2 received audio data: 640 bytes" - - Look for: "Client 1 add_audio_data called with 640 bytes" - -### Audio Sounds Distorted -This is normal! The system uses: -- Codec2 at 1200bps (very low bitrate) -- 4FSK modulation -- This creates robotic/vocoder-like sound - -### Manual Testing Commands - -Test just the codec: -```python -python test_audio_pipeline.py -``` - -Play the test outputs: -```bash -# Original -aplay wav/input.wav - -# Codec only (should sound robotic) -aplay wav/test_codec_only.wav - -# Full pipeline (codec + FSK) -aplay wav/test_full_pipeline.wav -``` - -## Expected Audio Flow - -1. Phone 1 reads `wav/input.wav` (8kHz mono) -2. Encodes 320 samples (40ms) with Codec2 → 6 bytes -3. Modulates with 4FSK → ~1112 float samples -4. Encrypts with Noise XK -5. Sends to server -6. Server routes to Phone 2 -7. Phone 2 decrypts with Noise XK -8. Demodulates FSK → 6 bytes -9. Decodes with Codec2 → 320 samples (640 bytes PCM) -10. Sends to PyAudio for playback - -## Recording Feature - -To save received audio: -1. Press Alt+1 or Alt+2 to start recording -2. Let it run during the call -3. Press again to stop and save -4. Check `wav/` directory for saved files - -This helps verify if audio is being received even if playback isn't working. \ No newline at end of file diff --git a/protocol_prototype/DryBox/PHONE2_PLAYBACK.md b/protocol_prototype/DryBox/PHONE2_PLAYBACK.md deleted file mode 100644 index e0735b8..0000000 --- a/protocol_prototype/DryBox/PHONE2_PLAYBACK.md +++ /dev/null @@ -1,58 +0,0 @@ -# Phone 2 Playback - What It Actually Plays - -## The Complete Audio Flow for Phone 2 - -When Phone 2 receives audio, it goes through this exact process: - -### 1. Network Reception -- Encrypted data arrives from server -- Data includes Noise XK encrypted voice frames - -### 2. Decryption (Noise XK) -- `protocol_phone_client.py` line 156-165: Noise wrapper decrypts the data -- Result: Decrypted voice message containing FSK modulated signal - -### 3. Demodulation (4FSK) -- `_handle_voice_data()` line 223: FSK demodulation -- Converts modulated signal back to 6 bytes of compressed data -- Only processes if confidence > 0.5 - -### 4. Decompression (Codec2 Decode) -- Line 236: `pcm_samples = self.codec.decode(frame)` -- Converts 6 bytes → 320 samples (640 bytes PCM) -- This is the final audio ready for playback - -### 5. Playback -- Line 264: `self.data_received.emit(pcm_bytes, self.client_id)` -- PCM audio sent to audio player -- PyAudio plays the 16-bit, 8kHz mono audio - -## What You Hear on Phone 2 - -Phone 2 plays audio that has been: -- ✅ Encrypted → Decrypted (Noise XK) -- ✅ Modulated → Demodulated (4FSK) -- ✅ Compressed → Decompressed (Codec2) - -The audio will sound: -- **Robotic/Vocoder-like** due to 1200bps Codec2 compression -- **Slightly delayed** due to processing pipeline -- **But intelligible** - you can understand speech - -## Fixed Issues - -1. **Silent beginning**: Now skips first second of silence in input.wav -2. **Control messages**: No longer sent to audio player -3. **Debug spam**: Reduced to show only important frames - -## Testing Phone 2 Playback - -1. Run automatic test (Space) -2. Enable Phone 2 playback (Ctrl+2) -3. Wait for handshake to complete -4. You should hear: - - Audio starting from 1 second into input.wav - - Processed through full protocol stack - - Robotic but understandable audio - -The key point: Phone 2 IS playing fully processed audio (decrypted + demodulated + decompressed)! \ No newline at end of file diff --git a/protocol_prototype/DryBox/PLAYBACK_FIXED.md b/protocol_prototype/DryBox/PLAYBACK_FIXED.md deleted file mode 100644 index 2710bd3..0000000 --- a/protocol_prototype/DryBox/PLAYBACK_FIXED.md +++ /dev/null @@ -1,67 +0,0 @@ -# Fixed Audio Playback Guide - -## How Playback Now Works - -### Phone 1 (Sender) Playback -- **What it plays**: Original audio from `input.wav` BEFORE encoding -- **When to enable**: During a call to hear what you're sending -- **Audio quality**: Clear, unprocessed 8kHz mono audio - -### Phone 2 (Receiver) Playback -- **What it plays**: Decoded audio AFTER the full pipeline (Codec2 → FSK → Noise XK → transmission → decryption → demodulation → decoding) -- **When to enable**: During a call to hear what's being received -- **Audio quality**: Robotic/vocoder sound due to 1200bps Codec2 compression - -## Changes Made - -1. **Fixed control message routing** - 8-byte control messages no longer sent to audio player -2. **Phone 1 now plays original audio** when sending (before encoding) -3. **Removed test beep** - you'll hear actual audio immediately -4. **Added size filter** - only audio data (≥320 bytes) is processed - -## Testing Steps - -1. Start server: `python server.py` -2. Start UI: `python UI/main.py` -3. Run automatic test (Space key) -4. **For Phone 1 playback**: Press Ctrl+1 to hear the original `input.wav` being sent -5. **For Phone 2 playback**: Press Ctrl+2 to hear the decoded audio after transmission - -## Expected Debug Output - -**Good signs for Phone 1 (sender):** -``` -Phone 1 playing original audio (sender playback) -[AudioPlayer] Client 0 add_audio_data called with 640 bytes -``` - -**Good signs for Phone 2 (receiver):** -``` -Phone 2 received audio data: 640 bytes -Phone 2 forwarding audio to player (playback enabled) -[AudioPlayer] Client 1 add_audio_data called with 640 bytes -``` - -**Fixed issues:** -``` -Phone 2 received non-audio data: 8 bytes (ignoring) # Control messages now filtered out -``` - -## Audio Quality Expectations - -- **Phone 1**: Should sound identical to `input.wav` -- **Phone 2**: Will sound robotic/compressed due to: - - Codec2 compression at 1200bps (very low bitrate) - - 4FSK modulation/demodulation - - This is normal and proves the protocol is working! - -## Troubleshooting - -If you still don't hear audio: - -1. **Check debug console** for the messages above -2. **Verify handshake completes** before expecting audio -3. **Try recording** (Alt+1/2) to save audio for offline playback -4. **Check system volume** and audio device - -The most important fix: control messages are no longer sent to the audio player, so you should only receive actual 640-byte audio frames. \ No newline at end of file diff --git a/protocol_prototype/DryBox/PLAYBACK_SUMMARY.md b/protocol_prototype/DryBox/PLAYBACK_SUMMARY.md deleted file mode 100644 index b8c0ee6..0000000 --- a/protocol_prototype/DryBox/PLAYBACK_SUMMARY.md +++ /dev/null @@ -1,83 +0,0 @@ -# Audio Playback Implementation Summary - -## Key Fixes Applied - -### 1. Separated Sender vs Receiver Playback -- **Phone 1 (Sender)**: Now plays the original `input.wav` audio when transmitting -- **Phone 2 (Receiver)**: Plays the decoded audio after full protocol processing - -### 2. Fixed Control Message Routing -- Control messages (like "CALL_END" - 8 bytes) no longer sent to audio player -- Added size filter: only data ≥320 bytes is considered audio -- Removed problematic `data_received.emit()` for non-audio messages - -### 3. Improved Debug Logging -- Reduced verbosity: logs only first frame and every 25th frame -- Clear indication of what's happening at each stage -- Separate logging for sender vs receiver playback - -### 4. Code Changes Made - -**phone_manager.py**: -- Added original audio playback for sender -- Added size filter for received data -- Improved debug logging with frame counters - -**protocol_phone_client.py**: -- Removed control message emission to data_received -- Added confidence logging for demodulation -- Reduced debug verbosity - -**audio_player.py**: -- Added frame counting for debug -- Reduced playback thread logging -- Better buffer status reporting - -**main.py**: -- Fixed lambda signal connection issue -- Improved UI scaling with flexible layouts - -## How to Test - -1. Start server and UI -2. Run automatic test (Space) -3. Enable playback: - - **Ctrl+1**: Hear original audio from Phone 1 - - **Ctrl+2**: Hear decoded audio on Phone 2 - -## Expected Behavior - -**Phone 1 with playback enabled:** -- Clear audio matching `input.wav` -- Shows "playing original audio (sender playback)" - -**Phone 2 with playback enabled:** -- Robotic/compressed audio (normal for 1200bps) -- Shows "received audio frame #N: 640 bytes" -- No more "8 bytes" messages - -## Audio Flow -``` -Phone 1: Phone 2: -input.wav (8kHz) - ↓ -[Playback here if enabled] - ↓ -Codec2 encode (1200bps) - ↓ -4FSK modulate - ↓ -Noise XK encrypt - ↓ -→ Network transmission → - ↓ - Noise XK decrypt - ↓ - 4FSK demodulate - ↓ - Codec2 decode - ↓ - [Playback here if enabled] -``` - -The playback now correctly plays audio at the right points in the pipeline! \ No newline at end of file diff --git a/protocol_prototype/DryBox/README.md b/protocol_prototype/DryBox/README.md deleted file mode 100644 index 0731eb5..0000000 --- a/protocol_prototype/DryBox/README.md +++ /dev/null @@ -1,60 +0,0 @@ -# DryBox - Secure Voice Over GSM Protocol - -A secure voice communication protocol that transmits encrypted voice data over standard GSM voice channels. - -## Architecture - -- **Noise XK Protocol**: Provides authenticated key exchange and secure channel -- **Codec2**: Voice compression (1200 bps mode) -- **4FSK Modulation**: Converts digital data to audio tones -- **Encryption**: ChaCha20-Poly1305 for secure communication - -## Project Structure - -``` -DryBox/ -├── UI/ # User interface components -│ ├── main.py # Main PyQt5 application -│ ├── phone_manager.py # Phone state management -│ ├── protocol_phone_client.py # Protocol client implementation -│ ├── noise_wrapper.py # Noise XK wrapper -│ └── ... -├── simulator/ # GSM channel simulator -│ └── gsm_simulator.py # Simulates GSM voice channel -├── voice_codec.py # Codec2 and FSK modem implementation -├── encryption.py # Encryption utilities -└── wav/ # Audio test files - -``` - -## Running the Protocol - -1. Start the GSM simulator: -```bash -cd simulator -python3 gsm_simulator.py -``` - -2. Run the UI application: -```bash -./run_ui.sh -# or -python3 UI/main.py -``` - -## Usage - -1. Click "Call" on Phone 1 to initiate -2. Click "Answer" on Phone 2 to accept -3. The protocol will automatically: - - Establish secure connection via Noise XK - - Start voice session - - Compress and encrypt voice data - - Transmit over simulated GSM channel - -## Requirements - -- Python 3.6+ -- PyQt5 -- dissononce (Noise protocol) -- numpy (optional, for optimized audio processing) \ No newline at end of file diff --git a/protocol_prototype/DryBox/UI/phone_client.py b/protocol_prototype/DryBox/UI/phone_client.py deleted file mode 100644 index 657452b..0000000 --- a/protocol_prototype/DryBox/UI/phone_client.py +++ /dev/null @@ -1,109 +0,0 @@ -import socket -import time -import select -from PyQt5.QtCore import QThread, pyqtSignal -from client_state import ClientState - -class PhoneClient(QThread): - data_received = pyqtSignal(bytes, int) - state_changed = pyqtSignal(str, str, int) - - def __init__(self, client_id): - super().__init__() - self.host = "localhost" - self.port = 12345 - self.client_id = client_id - self.sock = None - self.running = True - self.state = ClientState(client_id) - - def connect_socket(self): - retries = 3 - for attempt in range(retries): - try: - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) - self.sock.settimeout(120) - self.sock.connect((self.host, self.port)) - print(f"Client {self.client_id} connected to {self.host}:{self.port}") - return True - except Exception as e: - print(f"Client {self.client_id} connection attempt {attempt + 1} failed: {e}") - if attempt < retries - 1: - time.sleep(1) - self.sock = None - return False - - def run(self): - while self.running: - if not self.sock: - if not self.connect_socket(): - print(f"Client {self.client_id} failed to connect after retries") - self.state_changed.emit("CALL_END", "", self.client_id) - break - try: - while self.running: - self.state.process_command(self) - self.state.check_handshake_timeout(self) - - # Always check for incoming data, even during handshake - if self.sock is None: - print(f"Client {self.client_id} socket is None, exiting inner loop") - break - readable, _, _ = select.select([self.sock], [], [], 0.01) - if readable: - try: - if self.sock is None: - print(f"Client {self.client_id} socket is None before recv, exiting") - break - data = self.sock.recv(1024) - if not data: - print(f"Client {self.client_id} disconnected") - self.state_changed.emit("CALL_END", "", self.client_id) - break - self.state.handle_data(self, data) - except socket.error as e: - print(f"Client {self.client_id} socket error: {e}") - self.state_changed.emit("CALL_END", "", self.client_id) - break - - self.msleep(1) - except Exception as e: - print(f"Client {self.client_id} unexpected error in run loop: {e}") - self.state_changed.emit("CALL_END", "", self.client_id) - break - finally: - if self.sock: - try: - self.sock.close() - except Exception as e: - print(f"Client {self.client_id} error closing socket: {e}") - self.sock = None - - def send(self, message): - if self.sock and self.running: - try: - if isinstance(message, str): - data = message.encode('utf-8') - self.sock.send(data) - print(f"Client {self.client_id} sent: {message}, length={len(data)}") - else: - self.sock.send(message) - print(f"Client {self.client_id} sent binary data, length={len(message)}") - except socket.error as e: - print(f"Client {self.client_id} send error: {e}") - self.state_changed.emit("CALL_END", "", self.client_id) - - def stop(self): - self.running = False - if self.sock: - try: - self.sock.close() - except Exception as e: - print(f"Client {self.client_id} error closing socket in stop: {e}") - self.sock = None - self.quit() - self.wait(1000) - - def start_handshake(self, initiator, keypair, peer_pubkey): - self.state.start_handshake(initiator, keypair, peer_pubkey) \ No newline at end of file diff --git a/protocol_prototype/DryBox/UI_FEATURES_GUIDE.md b/protocol_prototype/DryBox/UI_FEATURES_GUIDE.md deleted file mode 100644 index 0dc16af..0000000 --- a/protocol_prototype/DryBox/UI_FEATURES_GUIDE.md +++ /dev/null @@ -1,105 +0,0 @@ -# DryBox UI Features Guide - -## UI Improvements -The UI has been updated with responsive layouts that scale better: -- Phone displays now use flexible sizing (min/max constraints) -- Waveform widgets adapt to available space -- Buttons have flexible widths that scale with window size -- Better margins and padding for improved visual appearance - -## Audio Playback Feature - -The DryBox UI includes real-time audio playback capabilities that allow you to hear the decoded audio as it's received. - -### How to Use Playback - -#### Manual Control -1. **During a Call**: Once a secure voice session is established, click the "🔊 Playback" button under either phone -2. **Button States**: - - Gray (unchecked): Playback disabled - - Green (checked): Playback active -3. **Toggle Anytime**: You can enable/disable playback at any time during a call - -#### Keyboard Shortcuts -- `Ctrl+1`: Toggle playback for Phone 1 -- `Ctrl+2`: Toggle playback for Phone 2 - -### Using Playback with Automatic Test - -The automatic test feature demonstrates the complete protocol flow. Here's how to use it with playback: - -1. **Start the Test**: Click "🧪 Run Automatic Test" or press `Space` -2. **Enable Playback Early**: - - As soon as the test starts, enable playback on Phone 2 (Ctrl+2) - - This ensures you'll hear audio as soon as the secure channel is established -3. **What You'll Hear**: - - Once handshake completes (step 4-5), Phone 1 starts transmitting test audio - - Phone 2 will play the received, decoded audio through your speakers - - The audio goes through: Codec2 encoding → 4FSK modulation → Noise XK encryption → transmission → decryption → demodulation → Codec2 decoding - -### Audio Recording Feature - -You can also record received audio for later analysis: - -1. **Start Recording**: Click "⏺ Record" button (or press Alt+1/Alt+2) -2. **Stop Recording**: Click the button again -3. **Files Saved**: Recordings are saved to `wav/` directory with timestamps - -### Audio Processing Options - -Access advanced audio features via "Audio Options" button (Ctrl+A): -- **Export Buffer**: Save current audio buffer to file -- **Clear Buffer**: Clear accumulated audio data -- **Processing Options**: - - Normalize Audio - - Apply Gain (adjustable dB) - - Noise Gate - - Low/High Pass Filters - - Remove Silence - -### Requirements - -For playback to work, you need PyAudio installed: -```bash -# Fedora/RHEL -sudo dnf install python3-devel portaudio-devel -pip install pyaudio - -# Ubuntu/Debian -sudo apt-get install python3-dev portaudio19-dev -pip install pyaudio -``` - -If PyAudio isn't installed, recording will still work but playback will be disabled. - -### Troubleshooting - -1. **No Sound**: - - Check PyAudio is installed - - Ensure system volume is up - - Verify audio device is working - -2. **Choppy Audio**: - - Normal for low-bitrate codec (1200bps) - - Represents actual protocol performance - -3. **Delayed Start**: - - Audio only flows after secure handshake - - Wait for "🔒 Secure Channel Established" status - -### Test Sequence Overview - -The automatic test goes through these steps: -1. Initial state check -2. Phone 1 calls Phone 2 -3. Phone 2 answers -4. Noise XK handshake begins -5. Handshake completes, secure channel established -6. Voice session starts (Codec2 + 4FSK) -7. Audio transmission begins -8. Protocol details logged -9. Transmission continues for observation -10. Final statistics -11. Call ends, cleanup - -Enable playback on the receiving phone to hear the transmitted audio in real-time! \ No newline at end of file diff --git a/protocol_prototype/DryBox/test_audio_features.py b/protocol_prototype/DryBox/test_audio_features.py deleted file mode 100755 index 36636e7..0000000 --- a/protocol_prototype/DryBox/test_audio_features.py +++ /dev/null @@ -1,150 +0,0 @@ -#!/usr/bin/env python3 -"""Test script for audio features in DryBox""" - -import sys -import os -import wave -import struct -import time - -# Add parent directory to path -sys.path.append(os.path.dirname(os.path.abspath(__file__))) - -from UI.audio_player import AudioPlayer, PYAUDIO_AVAILABLE -from UI.audio_processor import AudioProcessor - -def create_test_audio(filename="test_tone.wav", duration=2, frequency=440): - """Create a test audio file with a sine wave""" - sample_rate = 8000 - num_samples = int(sample_rate * duration) - - # Generate sine wave - import math - samples = [] - for i in range(num_samples): - t = float(i) / sample_rate - value = int(32767 * 0.5 * math.sin(2 * math.pi * frequency * t)) - samples.append(value) - - # Save to WAV file - with wave.open(filename, 'wb') as wav_file: - wav_file.setnchannels(1) - wav_file.setsampwidth(2) - wav_file.setframerate(sample_rate) - wav_file.writeframes(struct.pack(f'{len(samples)}h', *samples)) - - print(f"Created test audio file: {filename}") - return filename - -def test_audio_player(): - """Test audio player functionality""" - print("\n=== Testing Audio Player ===") - - player = AudioPlayer() - player.set_debug_callback(print) - - if PYAUDIO_AVAILABLE: - print("PyAudio is available - testing playback") - - # Test playback - client_id = 0 - if player.start_playback(client_id): - print(f"Started playback for client {client_id}") - - # Create and play test audio - test_file = create_test_audio() - with wave.open(test_file, 'rb') as wav: - data = wav.readframes(wav.getnframes()) - - # Add audio data - chunk_size = 640 # 320 samples * 2 bytes - for i in range(0, len(data), chunk_size): - chunk = data[i:i+chunk_size] - player.add_audio_data(client_id, chunk) - time.sleep(0.04) # 40ms per chunk - - time.sleep(0.5) # Let playback finish - player.stop_playback(client_id) - print(f"Stopped playback for client {client_id}") - - # Clean up - os.remove(test_file) - else: - print("PyAudio not available - skipping playback test") - - # Test recording (works without PyAudio) - print("\n=== Testing Recording ===") - client_id = 1 - player.start_recording(client_id) - - # Add some test data - test_data = b'\x00\x01' * 320 # Simple test pattern - for i in range(10): - player.add_audio_data(client_id, test_data) - - save_path = player.stop_recording(client_id, "test_recording.wav") - if save_path and os.path.exists(save_path): - print(f"Recording saved successfully: {save_path}") - os.remove(save_path) - else: - print("Recording failed") - - player.cleanup() - print("Audio player test complete") - -def test_audio_processor(): - """Test audio processor functionality""" - print("\n=== Testing Audio Processor ===") - - processor = AudioProcessor() - processor.set_debug_callback(print) - - # Create test audio - test_file = create_test_audio("test_input.wav", duration=1, frequency=1000) - - # Read test audio - with wave.open(test_file, 'rb') as wav: - test_data = wav.readframes(wav.getnframes()) - - # Test various processing functions - print("\nTesting normalize:") - normalized = processor.normalize_audio(test_data, target_db=-6) - save_path = processor.save_processed_audio(normalized, test_file, "normalized") - if save_path: - print(f"Saved: {save_path}") - os.remove(save_path) - - print("\nTesting gain:") - gained = processor.apply_gain(test_data, gain_db=6) - save_path = processor.save_processed_audio(gained, test_file, "gained") - if save_path: - print(f"Saved: {save_path}") - os.remove(save_path) - - print("\nTesting filters:") - filtered = processor.apply_low_pass_filter(test_data) - save_path = processor.save_processed_audio(filtered, test_file, "lowpass") - if save_path: - print(f"Saved: {save_path}") - os.remove(save_path) - - # Clean up - os.remove(test_file) - print("\nAudio processor test complete") - -def main(): - """Run all tests""" - print("DryBox Audio Features Test") - print("==========================") - - if not PYAUDIO_AVAILABLE: - print("\nNOTE: PyAudio not installed. Playback tests will be skipped.") - print("To install: sudo dnf install python3-devel portaudio-devel && pip install pyaudio") - - test_audio_player() - test_audio_processor() - - print("\nAll tests complete!") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/protocol_prototype/DryBox/test_audio_flow.py b/protocol_prototype/DryBox/test_audio_flow.py deleted file mode 100644 index cb7c7bf..0000000 --- a/protocol_prototype/DryBox/test_audio_flow.py +++ /dev/null @@ -1,67 +0,0 @@ -#!/usr/bin/env python3 -""" -Test to verify audio is flowing through the system -""" - -import os -import wave -import struct - -def check_audio_file(): - """Verify input.wav has actual audio content""" - print("Checking input.wav content...") - - with wave.open("wav/input.wav", 'rb') as wf: - # Read multiple frames to check for silence - total_frames = wf.getnframes() - print(f"Total frames: {total_frames}") - - # Check beginning - wf.setpos(0) - frames = wf.readframes(320) - samples = struct.unpack('320h', frames) - max_val = max(abs(s) for s in samples) - print(f"Frame 0 (beginning): max amplitude = {max_val}") - - # Check middle - wf.setpos(total_frames // 2) - frames = wf.readframes(320) - samples = struct.unpack('320h', frames) - max_val = max(abs(s) for s in samples) - print(f"Frame {total_frames//2} (middle): max amplitude = {max_val}") - - # Check near end - wf.setpos(total_frames - 640) - frames = wf.readframes(320) - samples = struct.unpack('320h', frames) - max_val = max(abs(s) for s in samples) - print(f"Frame {total_frames-640} (near end): max amplitude = {max_val}") - - # Find first non-silent frame - wf.setpos(0) - for i in range(0, total_frames, 320): - frames = wf.readframes(320) - if len(frames) < 640: - break - samples = struct.unpack('320h', frames) - max_val = max(abs(s) for s in samples) - if max_val > 100: # Not silence - print(f"\nFirst non-silent frame at position {i}") - print(f"First 10 samples: {samples[:10]}") - break - -def main(): - # Change to DryBox directory if needed - if os.path.basename(os.getcwd()) != 'DryBox': - if os.path.exists('DryBox'): - os.chdir('DryBox') - - check_audio_file() - - print("\nTo fix silence at beginning of file:") - print("1. Skip initial silence in phone_manager.py") - print("2. Or use a different test file") - print("3. Or trim the silence from input.wav") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/protocol_prototype/DryBox/test_audio_pipeline.py b/protocol_prototype/DryBox/test_audio_pipeline.py deleted file mode 100644 index ff9dbdf..0000000 --- a/protocol_prototype/DryBox/test_audio_pipeline.py +++ /dev/null @@ -1,193 +0,0 @@ -#!/usr/bin/env python3 -""" -Test the audio pipeline (Codec2 + FSK) independently -""" - -import sys -import os -import wave -import struct -import numpy as np - -# Add parent directory to path -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -from voice_codec import Codec2Wrapper, FSKModem, Codec2Mode, Codec2Frame - -def test_codec_only(): - """Test just the codec2 encode/decode""" - print("\n1. Testing Codec2 only...") - - codec = Codec2Wrapper(mode=Codec2Mode.MODE_1200) - - # Read test audio - with wave.open("wav/input.wav", 'rb') as wf: - # Read 320 samples (40ms at 8kHz) - frames = wf.readframes(320) - if len(frames) < 640: # 320 samples * 2 bytes - print("Not enough audio data") - return False - - # Convert to samples - samples = struct.unpack(f'{len(frames)//2}h', frames) - print(f"Input: {len(samples)} samples, first 10: {samples[:10]}") - - # Encode - encoded = codec.encode(frames) - if encoded: - print(f"Encoded: {len(encoded.bits)} bytes") - print(f"First 10 bytes: {encoded.bits[:10].hex()}") - else: - print("Encoding failed!") - return False - - # Decode - decoded = codec.decode(encoded) - if decoded is not None: - print(f"Decoded: type={type(decoded)}, len={len(decoded)}") - if hasattr(decoded, '__getitem__'): - print(f"First 10 samples: {list(decoded[:10])}") - - # Save decoded audio - with wave.open("wav/test_codec_only.wav", 'wb') as out: - out.setnchannels(1) - out.setsampwidth(2) - out.setframerate(8000) - if hasattr(decoded, 'tobytes'): - out.writeframes(decoded.tobytes()) - else: - # Convert to bytes - import array - arr = array.array('h', decoded) - out.writeframes(arr.tobytes()) - print("Saved decoded audio to wav/test_codec_only.wav") - return True - else: - print("Decoding failed!") - return False - -def test_full_pipeline(): - """Test the full Codec2 + FSK pipeline""" - print("\n2. Testing full pipeline (Codec2 + FSK)...") - - codec = Codec2Wrapper(mode=Codec2Mode.MODE_1200) - modem = FSKModem() - - # Read test audio - with wave.open("wav/input.wav", 'rb') as wf: - frames = wf.readframes(320) - if len(frames) < 640: - print("Not enough audio data") - return False - - # Encode with Codec2 - encoded = codec.encode(frames) - if not encoded: - print("Codec encoding failed!") - return False - print(f"Codec2 encoded: {len(encoded.bits)} bytes") - - # Modulate with FSK - modulated = modem.modulate(encoded.bits) - print(f"FSK modulated: {len(modulated)} float samples") - - # Demodulate - demodulated, confidence = modem.demodulate(modulated) - print(f"FSK demodulated: {len(demodulated)} bytes, confidence: {confidence:.2f}") - - if confidence < 0.5: - print("Low confidence demodulation!") - return False - - # Create frame for decoding - frame = Codec2Frame( - mode=Codec2Mode.MODE_1200, - bits=demodulated, - timestamp=0, - frame_number=0 - ) - - # Decode with Codec2 - decoded = codec.decode(frame) - if decoded is not None: - print(f"Decoded: type={type(decoded)}, len={len(decoded)}") - - # Save decoded audio - with wave.open("wav/test_full_pipeline.wav", 'wb') as out: - out.setnchannels(1) - out.setsampwidth(2) - out.setframerate(8000) - if hasattr(decoded, 'tobytes'): - out.writeframes(decoded.tobytes()) - else: - # Convert to bytes - import array - arr = array.array('h', decoded) - out.writeframes(arr.tobytes()) - print("Saved decoded audio to wav/test_full_pipeline.wav") - return True - else: - print("Codec decoding failed!") - return False - -def test_byte_conversion(): - """Test the byte conversion that happens in the protocol""" - print("\n3. Testing byte conversion...") - - # Create test PCM data - test_samples = [100, -100, 200, -200, 300, -300, 0, 0, 1000, -1000] - - # Method 1: array.tobytes() - import array - arr = array.array('h', test_samples) - bytes1 = arr.tobytes() - print(f"array.tobytes(): {len(bytes1)} bytes, hex: {bytes1.hex()}") - - # Method 2: struct.pack - bytes2 = struct.pack(f'{len(test_samples)}h', *test_samples) - print(f"struct.pack(): {len(bytes2)} bytes, hex: {bytes2.hex()}") - - # They should be the same - print(f"Bytes match: {bytes1 == bytes2}") - - # Test unpacking - unpacked = struct.unpack(f'{len(bytes1)//2}h', bytes1) - print(f"Unpacked: {unpacked}") - print(f"Matches original: {list(unpacked) == test_samples}") - - return True - -def main(): - print("Audio Pipeline Test") - print("=" * 50) - - # Change to DryBox directory if needed - if os.path.basename(os.getcwd()) != 'DryBox': - if os.path.exists('DryBox'): - os.chdir('DryBox') - - # Ensure wav directory exists - os.makedirs("wav", exist_ok=True) - - # Run tests - codec_ok = test_codec_only() - pipeline_ok = test_full_pipeline() - bytes_ok = test_byte_conversion() - - print("\n" + "=" * 50) - print("Test Results:") - print(f" Codec2 only: {'✅ PASS' if codec_ok else '❌ FAIL'}") - print(f" Full pipeline: {'✅ PASS' if pipeline_ok else '❌ FAIL'}") - print(f" Byte conversion: {'✅ PASS' if bytes_ok else '❌ FAIL'}") - - if codec_ok and pipeline_ok and bytes_ok: - print("\n✅ All tests passed!") - print("\nIf playback still doesn't work, check:") - print("1. Is the audio data actually being sent? (check debug logs)") - print("2. Is PyAudio stream format correct? (16-bit, 8kHz, mono)") - print("3. Is the volume turned up?") - else: - print("\n❌ Some tests failed - this explains the playback issue") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/protocol_prototype/DryBox/test_audio_setup.py b/protocol_prototype/DryBox/test_audio_setup.py deleted file mode 100755 index 9e37cde..0000000 --- a/protocol_prototype/DryBox/test_audio_setup.py +++ /dev/null @@ -1,127 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script to verify audio setup for DryBox -""" - -import os -import sys -import wave - -def check_audio_file(): - """Check if input.wav exists and has correct format""" - wav_path = "wav/input.wav" - - if not os.path.exists(wav_path): - print(f"❌ {wav_path} not found!") - return False - - try: - with wave.open(wav_path, 'rb') as wf: - channels = wf.getnchannels() - framerate = wf.getframerate() - sampwidth = wf.getsampwidth() - nframes = wf.getnframes() - duration = nframes / framerate - - print(f"✅ Audio file: {wav_path}") - print(f" Channels: {channels} {'✅' if channels == 1 else '❌ (should be 1)'}") - print(f" Sample rate: {framerate}Hz {'✅' if framerate == 8000 else '❌ (should be 8000)'}") - print(f" Sample width: {sampwidth * 8} bits {'✅' if sampwidth == 2 else '❌'}") - print(f" Duration: {duration:.2f} seconds") - print(f" Size: {os.path.getsize(wav_path) / 1024:.1f} KB") - - return channels == 1 and framerate == 8000 - - except Exception as e: - print(f"❌ Error reading {wav_path}: {e}") - return False - -def check_pyaudio(): - """Check if PyAudio is installed and working""" - try: - import pyaudio - p = pyaudio.PyAudio() - - # Check for output devices - output_devices = 0 - for i in range(p.get_device_count()): - info = p.get_device_info_by_index(i) - if info['maxOutputChannels'] > 0: - output_devices += 1 - - p.terminate() - - print(f"✅ PyAudio installed") - print(f" Output devices available: {output_devices}") - return True - - except ImportError: - print("❌ PyAudio not installed") - print(" To enable playback, run:") - print(" sudo dnf install python3-devel portaudio-devel") - print(" pip install pyaudio") - return False - except Exception as e: - print(f"❌ PyAudio error: {e}") - return False - -def check_dependencies(): - """Check all required dependencies""" - deps = { - 'PyQt5': 'PyQt5', - 'numpy': 'numpy', - 'struct': None, # Built-in - 'wave': None, # Built-in - } - - print("\nDependency check:") - all_good = True - - for module_name, pip_name in deps.items(): - try: - __import__(module_name) - print(f"✅ {module_name}") - except ImportError: - print(f"❌ {module_name} not found") - if pip_name: - print(f" Install with: pip install {pip_name}") - all_good = False - - return all_good - -def main(): - print("DryBox Audio Setup Test") - print("=" * 40) - - # Change to DryBox directory if needed - if os.path.basename(os.getcwd()) != 'DryBox': - if os.path.exists('DryBox'): - os.chdir('DryBox') - print(f"Changed to DryBox directory: {os.getcwd()}") - - print("\nChecking audio file...") - audio_ok = check_audio_file() - - print("\nChecking PyAudio...") - pyaudio_ok = check_pyaudio() - - print("\nChecking dependencies...") - deps_ok = check_dependencies() - - print("\n" + "=" * 40) - if audio_ok and deps_ok: - print("✅ Audio setup is ready!") - if not pyaudio_ok: - print("⚠️ Playback disabled (PyAudio not available)") - print(" Recording will still work") - else: - print("❌ Audio setup needs attention") - - print("\nUsage tips:") - print("1. Run the UI: python UI/main.py") - print("2. Click 'Run Automatic Test' or press Space") - print("3. Enable playback on Phone 2 with Ctrl+2") - print("4. You'll hear the decoded audio after handshake completes") - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/protocol_prototype/DryBox/voice_codec.py b/protocol_prototype/DryBox/voice_codec.py deleted file mode 100644 index 796dea1..0000000 --- a/protocol_prototype/DryBox/voice_codec.py +++ /dev/null @@ -1,714 +0,0 @@ -""" -Voice codec integration for encrypted voice over GSM. -Implements Codec2 compression with FSK modulation for transmitting -encrypted voice data over standard GSM voice channels. -""" - -import array -import math -import struct -from typing import Optional, Tuple, List -from dataclasses import dataclass -from enum import IntEnum - -try: - import numpy as np - HAS_NUMPY = True -except ImportError: - HAS_NUMPY = False - -# ANSI colors -RED = "\033[91m" -GREEN = "\033[92m" -YELLOW = "\033[93m" -BLUE = "\033[94m" -RESET = "\033[0m" - - -class Codec2Mode(IntEnum): - """Codec2 bitrate modes.""" - MODE_3200 = 0 # 3200 bps - MODE_2400 = 1 # 2400 bps - MODE_1600 = 2 # 1600 bps - MODE_1400 = 3 # 1400 bps - MODE_1300 = 4 # 1300 bps - MODE_1200 = 5 # 1200 bps (recommended for robustness) - MODE_700C = 6 # 700 bps - - -@dataclass -class Codec2Frame: - """Represents a single Codec2 compressed voice frame.""" - mode: Codec2Mode - bits: bytes - timestamp: float - frame_number: int - - -class Codec2Wrapper: - """ - Wrapper for Codec2 voice codec. - In production, this would use py_codec2 or ctypes bindings to libcodec2. - This is a simulation interface for protocol development. - """ - - # Frame sizes in bits for each mode - FRAME_BITS = { - Codec2Mode.MODE_3200: 64, - Codec2Mode.MODE_2400: 48, - Codec2Mode.MODE_1600: 64, - Codec2Mode.MODE_1400: 56, - Codec2Mode.MODE_1300: 52, - Codec2Mode.MODE_1200: 48, - Codec2Mode.MODE_700C: 28 - } - - # Frame duration in ms - FRAME_MS = { - Codec2Mode.MODE_3200: 20, - Codec2Mode.MODE_2400: 20, - Codec2Mode.MODE_1600: 40, - Codec2Mode.MODE_1400: 40, - Codec2Mode.MODE_1300: 40, - Codec2Mode.MODE_1200: 40, - Codec2Mode.MODE_700C: 40 - } - - def __init__(self, mode: Codec2Mode = Codec2Mode.MODE_1200): - """ - Initialize Codec2 wrapper. - - Args: - mode: Codec2 bitrate mode (default 1200 bps for robustness) - """ - self.mode = mode - self.frame_bits = self.FRAME_BITS[mode] - self.frame_bytes = (self.frame_bits + 7) // 8 - self.frame_ms = self.FRAME_MS[mode] - self.frame_samples = int(8000 * self.frame_ms / 1000) # 8kHz sampling - self.frame_counter = 0 - - # Quiet initialization - no print - - def encode(self, audio_samples) -> Optional[Codec2Frame]: - """ - Encode PCM audio samples to Codec2 frame. - - Args: - audio_samples: PCM samples (8kHz, 16-bit signed) - - Returns: - Codec2Frame or None if insufficient samples - """ - if len(audio_samples) < self.frame_samples: - return None - - # In production: call codec2_encode(state, bits, samples) - # Simulation: create pseudo-compressed data - compressed = self._simulate_compression(audio_samples[:self.frame_samples]) - - frame = Codec2Frame( - mode=self.mode, - bits=compressed, - timestamp=self.frame_counter * self.frame_ms / 1000.0, - frame_number=self.frame_counter - ) - - self.frame_counter += 1 - return frame - - def decode(self, frame: Codec2Frame): - """ - Decode Codec2 frame to PCM audio samples. - - Args: - frame: Codec2 compressed frame - - Returns: - PCM samples (8kHz, 16-bit signed) - """ - if frame.mode != self.mode: - raise ValueError(f"Frame mode {frame.mode} doesn't match decoder mode {self.mode}") - - # In production: call codec2_decode(state, samples, bits) - # Simulation: decompress to audio - return self._simulate_decompression(frame.bits) - - def _simulate_compression(self, samples) -> bytes: - """Simulate Codec2 compression (for testing).""" - # Convert to list if needed - if hasattr(samples, 'tolist'): - sample_list = samples.tolist() - elif hasattr(samples, '__iter__'): - sample_list = list(samples) - else: - sample_list = samples - - # Extract basic features for simulation - if HAS_NUMPY and hasattr(samples, '__array__'): - # Convert to numpy array if needed - np_samples = np.asarray(samples, dtype=np.float32) - if len(np_samples) > 0: - mean_square = np.mean(np_samples ** 2) - energy = np.sqrt(mean_square) if not np.isnan(mean_square) else 0.0 - zero_crossings = np.sum(np.diff(np.sign(np_samples)) != 0) - else: - energy = 0.0 - zero_crossings = 0 - else: - # Manual calculation without numpy - if sample_list and len(sample_list) > 0: - energy = math.sqrt(sum(s**2 for s in sample_list) / len(sample_list)) - zero_crossings = sum(1 for i in range(1, len(sample_list)) - if (sample_list[i-1] >= 0) != (sample_list[i] >= 0)) - else: - energy = 0.0 - zero_crossings = 0 - - # Pack into bytes (simplified) - # Ensure values are valid - energy_int = max(0, min(65535, int(energy))) - zc_int = max(0, min(65535, int(zero_crossings))) - data = struct.pack('= 4: - energy, zero_crossings = struct.unpack('> 6) & 0x03, - (byte >> 4) & 0x03, - (byte >> 2) & 0x03, - byte & 0x03 - ]) - - # Generate audio signal - signal = [] - - # Add preamble - if add_preamble: - preamble_samples = int(self.preamble_duration * self.sample_rate) - if HAS_NUMPY: - t = np.arange(preamble_samples) / self.sample_rate - preamble = np.sin(2 * np.pi * self.preamble_freq * t) - signal.extend(preamble) - else: - for i in range(preamble_samples): - t = i / self.sample_rate - value = math.sin(2 * math.pi * self.preamble_freq * t) - signal.append(value) - - # Modulate symbols - for symbol in symbols: - freq = self.frequencies[symbol] - if HAS_NUMPY: - t = np.arange(self.samples_per_symbol) / self.sample_rate - tone = np.sin(2 * np.pi * freq * t) - signal.extend(tone) - else: - for i in range(self.samples_per_symbol): - t = i / self.sample_rate - value = math.sin(2 * math.pi * freq * t) - signal.append(value) - - # Apply smoothing to reduce clicks - if HAS_NUMPY: - audio = np.array(signal, dtype=np.float32) - else: - audio = array.array('f', signal) - audio = self._apply_envelope(audio) - - return audio - - def demodulate(self, audio) -> Tuple[bytes, float]: - """ - Demodulate FSK audio signal to binary data. - - Args: - audio: Audio signal - - Returns: - Tuple of (demodulated data, confidence score) - """ - # Find preamble - preamble_start = self._find_preamble(audio) - if preamble_start < 0: - return b'', 0.0 - - # Skip preamble - data_start = preamble_start + int(self.preamble_duration * self.sample_rate) - - # Demodulate symbols - symbols = [] - confidence_scores = [] - - pos = data_start - while pos + self.samples_per_symbol <= len(audio): - symbol_audio = audio[pos:pos + self.samples_per_symbol] - symbol, confidence = self._demodulate_symbol(symbol_audio) - symbols.append(symbol) - confidence_scores.append(confidence) - pos += self.samples_per_symbol - - # Convert symbols to bytes - data = bytearray() - for i in range(0, len(symbols), 4): - if i + 3 < len(symbols): - byte = (symbols[i] << 6) | (symbols[i+1] << 4) | (symbols[i+2] << 2) | symbols[i+3] - data.append(byte) - - if HAS_NUMPY and confidence_scores: - avg_confidence = np.mean(confidence_scores) - else: - avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0.0 - return bytes(data), avg_confidence - - def _find_preamble(self, audio) -> int: - """Find preamble in audio signal.""" - # Simple energy-based detection - window_size = int(0.01 * self.sample_rate) # 10ms window - - if HAS_NUMPY: - for i in range(0, len(audio) - window_size, window_size // 2): - window = audio[i:i + window_size] - - # Check for preamble frequency - fft = np.fft.fft(window) - freqs = np.fft.fftfreq(len(window), 1/self.sample_rate) - - # Find peak near preamble frequency - idx = np.argmax(np.abs(fft[:len(fft)//2])) - peak_freq = abs(freqs[idx]) - - if abs(peak_freq - self.preamble_freq) < 50: # 50 Hz tolerance - return i - else: - # Simple zero-crossing based detection without FFT - for i in range(0, len(audio) - window_size, window_size // 2): - window = list(audio[i:i + window_size]) - - # Count zero crossings - zero_crossings = 0 - for j in range(1, len(window)): - if (window[j-1] >= 0) != (window[j] >= 0): - zero_crossings += 1 - - # Estimate frequency from zero crossings - estimated_freq = (zero_crossings * self.sample_rate) / (2 * len(window)) - - if abs(estimated_freq - self.preamble_freq) < 100: # 100 Hz tolerance - return i - - return -1 - - def _demodulate_symbol(self, audio) -> Tuple[int, float]: - """Demodulate a single FSK symbol.""" - if HAS_NUMPY: - # FFT-based demodulation - fft = np.fft.fft(audio) - freqs = np.fft.fftfreq(len(audio), 1/self.sample_rate) - magnitude = np.abs(fft[:len(fft)//2]) - - # Find energy at each FSK frequency - energies = [] - for freq in self.frequencies: - idx = np.argmin(np.abs(freqs[:len(freqs)//2] - freq)) - energy = magnitude[idx] - energies.append(energy) - - # Select symbol with highest energy - symbol = np.argmax(energies) - else: - # Goertzel algorithm for specific frequency detection - audio_list = list(audio) if hasattr(audio, '__iter__') else audio - energies = [] - - for freq in self.frequencies: - # Goertzel algorithm - omega = 2 * math.pi * freq / self.sample_rate - coeff = 2 * math.cos(omega) - - s_prev = 0 - s_prev2 = 0 - - for sample in audio_list: - s = sample + coeff * s_prev - s_prev2 - s_prev2 = s_prev - s_prev = s - - # Calculate magnitude - power = s_prev2 * s_prev2 + s_prev * s_prev - coeff * s_prev * s_prev2 - energies.append(math.sqrt(abs(power))) - - # Select symbol with highest energy - symbol = energies.index(max(energies)) - - # Confidence is ratio of strongest to second strongest - sorted_energies = sorted(energies, reverse=True) - confidence = sorted_energies[0] / (sorted_energies[1] + 1e-6) - - return symbol, min(confidence, 10.0) / 10.0 - - def _apply_envelope(self, audio): - """Apply smoothing envelope to reduce clicks.""" - # Simple raised cosine envelope - ramp_samples = int(0.002 * self.sample_rate) # 2ms ramps - - if len(audio) > 2 * ramp_samples: - if HAS_NUMPY: - # Fade in - t = np.linspace(0, np.pi/2, ramp_samples) - audio[:ramp_samples] *= np.sin(t) ** 2 - - # Fade out - audio[-ramp_samples:] *= np.sin(t[::-1]) ** 2 - else: - # Manual fade in - for i in range(ramp_samples): - t = (i / ramp_samples) * (math.pi / 2) - factor = math.sin(t) ** 2 - audio[i] *= factor - - # Manual fade out - for i in range(ramp_samples): - t = ((ramp_samples - 1 - i) / ramp_samples) * (math.pi / 2) - factor = math.sin(t) ** 2 - audio[-(i+1)] *= factor - - return audio - - -class VoiceProtocol: - """ - Integrates voice codec and modem with the Icing protocol - for encrypted voice transmission over GSM. - """ - - def __init__(self, protocol_instance): - """ - Initialize voice protocol handler. - - Args: - protocol_instance: IcingProtocol instance - """ - self.protocol = protocol_instance - self.codec = Codec2Wrapper(Codec2Mode.MODE_1200) - self.modem = FSKModem(sample_rate=8000, baud_rate=600) - - # Voice crypto state - self.voice_iv_counter = 0 - self.voice_sequence = 0 - - # Buffers - if HAS_NUMPY: - self.audio_buffer = np.array([], dtype=np.int16) - else: - self.audio_buffer = array.array('h') # 16-bit signed integers - self.frame_buffer = [] - - print(f"{GREEN}[VOICE]{RESET} Voice protocol initialized") - - def process_voice_input(self, audio_samples): - """ - Process voice input: compress, encrypt, and modulate. - - Args: - audio_samples: PCM audio samples (8kHz, 16-bit) - - Returns: - Modulated audio signal ready for transmission (numpy array or array.array) - """ - # Add to buffer - if HAS_NUMPY: - self.audio_buffer = np.concatenate([self.audio_buffer, audio_samples]) - else: - self.audio_buffer.extend(audio_samples) - - # Process complete frames - modulated_audio = [] - - while len(self.audio_buffer) >= self.codec.frame_samples: - # Extract frame - if HAS_NUMPY: - frame_audio = self.audio_buffer[:self.codec.frame_samples] - self.audio_buffer = self.audio_buffer[self.codec.frame_samples:] - else: - frame_audio = array.array('h', self.audio_buffer[:self.codec.frame_samples]) - del self.audio_buffer[:self.codec.frame_samples] - - # Compress with Codec2 - compressed_frame = self.codec.encode(frame_audio) - if not compressed_frame: - continue - - # Encrypt frame - encrypted = self._encrypt_voice_frame(compressed_frame) - - # Add FEC - protected = self._add_fec(encrypted) - - # Modulate to audio - audio_signal = self.modem.modulate(protected, add_preamble=True) - modulated_audio.append(audio_signal) - - if modulated_audio: - if HAS_NUMPY: - return np.concatenate(modulated_audio) - else: - # Concatenate array.array objects - result = array.array('f') - for audio in modulated_audio: - result.extend(audio) - return result - return None - - def process_voice_output(self, modulated_audio): - """ - Process received audio: demodulate, decrypt, and decompress. - - Args: - modulated_audio: Received FSK-modulated audio - - Returns: - Decoded PCM audio samples (numpy array or array.array) - """ - # Demodulate - data, confidence = self.modem.demodulate(modulated_audio) - - if confidence < 0.5: - print(f"{YELLOW}[VOICE]{RESET} Low demodulation confidence: {confidence:.2f}") - return None - - # Remove FEC - frame_data = self._remove_fec(data) - if not frame_data: - return None - - # Decrypt - compressed_frame = self._decrypt_voice_frame(frame_data) - if not compressed_frame: - return None - - # Decompress - audio_samples = self.codec.decode(compressed_frame) - - return audio_samples - - def _encrypt_voice_frame(self, frame: Codec2Frame) -> bytes: - """Encrypt a voice frame using ChaCha20-CTR.""" - if not self.protocol.hkdf_key: - raise ValueError("No encryption key available") - - # Prepare frame data - frame_data = struct.pack(' Optional[Codec2Frame]: - """Decrypt a voice frame.""" - if len(data) < 10: - return None - - # Extract sequence and IV hint - sequence, iv_hint = struct.unpack(' bytes: - """Add forward error correction.""" - # Simple repetition code (3x) for testing - # In production: use convolutional code or LDPC - fec_data = bytearray() - - for byte in data: - # Repeat each byte 3 times - fec_data.extend([byte, byte, byte]) - - return bytes(fec_data) - - def _remove_fec(self, data: bytes) -> Optional[bytes]: - """Remove FEC and correct errors.""" - if len(data) % 3 != 0: - return None - - corrected = bytearray() - - for i in range(0, len(data), 3): - # Majority voting - votes = [data[i], data[i+1], data[i+2]] - byte_value = max(set(votes), key=votes.count) - corrected.append(byte_value) - - return bytes(corrected) - - -# Example usage -if __name__ == "__main__": - # Test Codec2 wrapper - print(f"\n{BLUE}=== Testing Codec2 Wrapper ==={RESET}") - codec = Codec2Wrapper(Codec2Mode.MODE_1200) - - # Generate test audio - if HAS_NUMPY: - t = np.linspace(0, 0.04, 320) # 40ms at 8kHz - test_audio = (np.sin(2 * np.pi * 440 * t) * 16384).astype(np.int16) - else: - test_audio = array.array('h') - for i in range(320): - t = i * 0.04 / 320 - value = int(math.sin(2 * math.pi * 440 * t) * 16384) - test_audio.append(value) - - # Encode - frame = codec.encode(test_audio) - print(f"Encoded frame: {len(frame.bits)} bytes") - - # Decode - decoded = codec.decode(frame) - print(f"Decoded audio: {len(decoded)} samples") - - # Test FSK modem - print(f"\n{BLUE}=== Testing FSK Modem ==={RESET}") - modem = FSKModem() - - # Test data - test_data = b"Hello, secure voice!" - - # Modulate - modulated = modem.modulate(test_data) - print(f"Modulated: {len(modulated)} samples ({len(modulated)/8000:.2f}s)") - - # Demodulate - demodulated, confidence = modem.demodulate(modulated) - print(f"Demodulated: {demodulated}") - print(f"Confidence: {confidence:.2%}") - print(f"Match: {demodulated == test_data}") \ No newline at end of file diff --git a/protocol_prototype/Prototype/Protocol/encryption.py b/protocol_prototype/Prototype/Protocol/encryption.py deleted file mode 100644 index 87a6b32..0000000 --- a/protocol_prototype/Prototype/Protocol/encryption.py +++ /dev/null @@ -1,307 +0,0 @@ -import os -import struct -from typing import Optional, Tuple -from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305 - -class MessageHeader: - """ - Header of an encrypted message (18 bytes total): - - Clear Text Section (4 bytes): - - flag: 16 bits (0xBEEF by default) - - data_len: 16 bits (length of encrypted payload excluding tag) - - Associated Data (14 bytes): - - retry: 8 bits (retry counter) - - connection_status: 4 bits (e.g., CRC required) + 4 bits padding - - iv/messageID: 96 bits (12 bytes) - """ - def __init__(self, flag: int, data_len: int, retry: int, connection_status: int, iv: bytes): - if not (0 <= flag < 65536): - raise ValueError("Flag must fit in 16 bits (0..65535)") - if not (0 <= data_len < 65536): - raise ValueError("Data length must fit in 16 bits (0..65535)") - if not (0 <= retry < 256): - raise ValueError("Retry must fit in 8 bits (0..255)") - if not (0 <= connection_status < 16): - raise ValueError("Connection status must fit in 4 bits (0..15)") - if len(iv) != 12: - raise ValueError("IV must be 12 bytes (96 bits)") - - self.flag = flag # 16 bits - self.data_len = data_len # 16 bits - self.retry = retry # 8 bits - self.connection_status = connection_status # 4 bits - self.iv = iv # 96 bits (12 bytes) - - def pack(self) -> bytes: - """Pack header into 18 bytes.""" - # Pack flag and data_len (4 bytes) - header = struct.pack('>H H', self.flag, self.data_len) - - # Pack retry and connection_status (2 bytes) - # connection_status in high 4 bits of second byte, 4 bits padding as zero - ad_byte = (self.connection_status & 0x0F) << 4 - ad_packed = struct.pack('>B B', self.retry, ad_byte) - - # Append IV (12 bytes) - return header + ad_packed + self.iv - - def get_associated_data(self) -> bytes: - """Get the associated data for AEAD encryption (retry, conn_status, iv).""" - # Pack retry and connection_status - ad_byte = (self.connection_status & 0x0F) << 4 - ad_packed = struct.pack('>B B', self.retry, ad_byte) - - # Append IV - return ad_packed + self.iv - - @classmethod - def unpack(cls, data: bytes) -> 'MessageHeader': - """Unpack 18 bytes into a MessageHeader object.""" - if len(data) < 18: - raise ValueError(f"Header data too short: {len(data)} bytes, expected 18") - - flag, data_len = struct.unpack('>H H', data[:4]) - retry, ad_byte = struct.unpack('>B B', data[4:6]) - connection_status = (ad_byte >> 4) & 0x0F - iv = data[6:18] - - return cls(flag, data_len, retry, connection_status, iv) - -class EncryptedMessage: - """ - Encrypted message packet format: - - - Header (18 bytes): - * flag: 16 bits - * data_len: 16 bits - * retry: 8 bits - * connection_status: 4 bits (+ 4 bits padding) - * iv/messageID: 96 bits (12 bytes) - - - Payload: variable length encrypted data - - - Footer: - * Authentication tag: 128 bits (16 bytes) - * CRC32: 32 bits (4 bytes) - optional, based on connection_status - """ - def __init__(self, plaintext: bytes, key: bytes, flag: int = 0xBEEF, - retry: int = 0, connection_status: int = 0, iv: bytes = None, - cipher_type: int = 0): - self.plaintext = plaintext - self.key = key - self.flag = flag - self.retry = retry - self.connection_status = connection_status - self.iv = iv or generate_iv(initial=True) - self.cipher_type = cipher_type # 0 = AES-256-GCM, 1 = ChaCha20-Poly1305 - - # Will be set after encryption - self.ciphertext = None - self.tag = None - self.header = None - - def encrypt(self) -> bytes: - """Encrypt the plaintext and return the full encrypted message.""" - # Create header with correct data_len (which will be set after encryption) - self.header = MessageHeader( - flag=self.flag, - data_len=0, # Will be updated after encryption - retry=self.retry, - connection_status=self.connection_status, - iv=self.iv - ) - - # Get associated data for AEAD - aad = self.header.get_associated_data() - - # Encrypt using the appropriate cipher - if self.cipher_type == 0: # AES-256-GCM - cipher = AESGCM(self.key) - ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad) - elif self.cipher_type == 1: # ChaCha20-Poly1305 - cipher = ChaCha20Poly1305(self.key) - ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad) - else: - raise ValueError(f"Unsupported cipher type: {self.cipher_type}") - - # Extract ciphertext and tag - self.tag = ciphertext_with_tag[-16:] - self.ciphertext = ciphertext_with_tag[:-16] - - # Update header with actual data length - self.header.data_len = len(self.ciphertext) - - # Pack everything together - packed_header = self.header.pack() - - # Check if CRC is required (based on connection_status) - if self.connection_status & 0x01: # Lowest bit indicates CRC required - import zlib - # Compute CRC32 of header + ciphertext + tag - crc = zlib.crc32(packed_header + self.ciphertext + self.tag) & 0xffffffff - crc_bytes = struct.pack('>I', crc) - return packed_header + self.ciphertext + self.tag + crc_bytes - else: - return packed_header + self.ciphertext + self.tag - - @classmethod - def decrypt(cls, data: bytes, key: bytes, cipher_type: int = 0) -> Tuple[bytes, MessageHeader]: - """ - Decrypt an encrypted message and return the plaintext and header. - - Args: - data: The full encrypted message - key: The encryption key - cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 - - Returns: - Tuple of (plaintext, header) - """ - if len(data) < 18 + 16: # Header + minimum tag size - raise ValueError("Message too short") - - # Extract header - header_bytes = data[:18] - header = MessageHeader.unpack(header_bytes) - - # Get ciphertext and tag - data_len = header.data_len - ciphertext_start = 18 - ciphertext_end = ciphertext_start + data_len - - if ciphertext_end + 16 > len(data): - raise ValueError("Message length does not match header's data_len") - - ciphertext = data[ciphertext_start:ciphertext_end] - tag = data[ciphertext_end:ciphertext_end + 16] - - # Get associated data for AEAD - aad = header.get_associated_data() - - # Combine ciphertext and tag for decryption - ciphertext_with_tag = ciphertext + tag - - # Decrypt using the appropriate cipher - try: - if cipher_type == 0: # AES-256-GCM - cipher = AESGCM(key) - plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad) - elif cipher_type == 1: # ChaCha20-Poly1305 - cipher = ChaCha20Poly1305(key) - plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad) - else: - raise ValueError(f"Unsupported cipher type: {cipher_type}") - - return plaintext, header - except Exception as e: - raise ValueError(f"Decryption failed: {e}") - -def generate_iv(initial: bool = False, previous_iv: bytes = None) -> bytes: - """ - Generate a 96-bit IV (12 bytes). - - Args: - initial: If True, return a random IV - previous_iv: The previous IV to increment - - Returns: - A new IV - """ - if initial or previous_iv is None: - return os.urandom(12) # 96 bits - else: - # Increment the previous IV by 1 modulo 2^96 - iv_int = int.from_bytes(previous_iv, 'big') - iv_int = (iv_int + 1) % (1 << 96) - return iv_int.to_bytes(12, 'big') - -# Convenience functions to match original API -def encrypt_message(plaintext: bytes, key: bytes, flag: int = 0xBEEF, - retry: int = 0, connection_status: int = 0, - iv: bytes = None, cipher_type: int = 0) -> bytes: - """ - Encrypt a message using the specified parameters. - - Args: - plaintext: The data to encrypt - key: The encryption key (32 bytes for AES-256-GCM, 32 bytes for ChaCha20-Poly1305) - flag: 16-bit flag value (default: 0xBEEF) - retry: 8-bit retry counter - connection_status: 4-bit connection status - iv: Optional 96-bit IV (if None, a random one will be generated) - cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 - - Returns: - The full encrypted message - """ - message = EncryptedMessage( - plaintext=plaintext, - key=key, - flag=flag, - retry=retry, - connection_status=connection_status, - iv=iv, - cipher_type=cipher_type - ) - return message.encrypt() - -def decrypt_message(message: bytes, key: bytes, cipher_type: int = 0) -> bytes: - """ - Decrypt a message. - - Args: - message: The full encrypted message - key: The encryption key - cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 - - Returns: - The decrypted plaintext - """ - plaintext, _ = EncryptedMessage.decrypt(message, key, cipher_type) - return plaintext - -# ChaCha20-CTR functions for voice streaming (without authentication) -def chacha20_encrypt(plaintext: bytes, key: bytes, nonce: bytes) -> bytes: - """ - Encrypt plaintext using ChaCha20 in CTR mode (no authentication). - - Args: - plaintext: Data to encrypt - key: 32-byte key - nonce: 16-byte nonce (for ChaCha20 in cryptography library) - - Returns: - Ciphertext - """ - from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - from cryptography.hazmat.backends import default_backend - - if len(key) != 32: - raise ValueError("ChaCha20 key must be 32 bytes") - if len(nonce) != 16: - raise ValueError("ChaCha20 nonce must be 16 bytes") - - cipher = Cipher( - algorithms.ChaCha20(key, nonce), - mode=None, - backend=default_backend() - ) - encryptor = cipher.encryptor() - return encryptor.update(plaintext) + encryptor.finalize() - -def chacha20_decrypt(ciphertext: bytes, key: bytes, nonce: bytes) -> bytes: - """ - Decrypt ciphertext using ChaCha20 in CTR mode (no authentication). - - Args: - ciphertext: Data to decrypt - key: 32-byte key - nonce: 12-byte nonce - - Returns: - Plaintext - """ - # ChaCha20 is symmetrical - encryption and decryption are the same - return chacha20_encrypt(ciphertext, key, nonce) diff --git a/protocol_prototype/Prototype/Protocol/IcingProtocol.drawio b/protocol_prototype/Prototype/Protocol_Alpha_0/IcingProtocol.drawio similarity index 100% rename from protocol_prototype/Prototype/Protocol/IcingProtocol.drawio rename to protocol_prototype/Prototype/Protocol_Alpha_0/IcingProtocol.drawio diff --git a/protocol_prototype/Prototype/Protocol/VOICE_PROTOCOL_README.md b/protocol_prototype/Prototype/Protocol_Alpha_0/VOICE_PROTOCOL_README.md similarity index 100% rename from protocol_prototype/Prototype/Protocol/VOICE_PROTOCOL_README.md rename to protocol_prototype/Prototype/Protocol_Alpha_0/VOICE_PROTOCOL_README.md diff --git a/protocol_prototype/Prototype/Protocol/auto_mode.py b/protocol_prototype/Prototype/Protocol_Alpha_0/auto_mode.py similarity index 100% rename from protocol_prototype/Prototype/Protocol/auto_mode.py rename to protocol_prototype/Prototype/Protocol_Alpha_0/auto_mode.py diff --git a/protocol_prototype/Prototype/Protocol/cli.py b/protocol_prototype/Prototype/Protocol_Alpha_0/cli.py similarity index 100% rename from protocol_prototype/Prototype/Protocol/cli.py rename to protocol_prototype/Prototype/Protocol_Alpha_0/cli.py diff --git a/protocol_prototype/Prototype/Protocol/crypto_utils.py b/protocol_prototype/Prototype/Protocol_Alpha_0/crypto_utils.py similarity index 100% rename from protocol_prototype/Prototype/Protocol/crypto_utils.py rename to protocol_prototype/Prototype/Protocol_Alpha_0/crypto_utils.py diff --git a/protocol_prototype/DryBox/encryption.py b/protocol_prototype/Prototype/Protocol_Alpha_0/encryption.py similarity index 100% rename from protocol_prototype/DryBox/encryption.py rename to protocol_prototype/Prototype/Protocol_Alpha_0/encryption.py diff --git a/protocol_prototype/Prototype/Protocol/messages.py b/protocol_prototype/Prototype/Protocol_Alpha_0/messages.py similarity index 100% rename from protocol_prototype/Prototype/Protocol/messages.py rename to protocol_prototype/Prototype/Protocol_Alpha_0/messages.py diff --git a/protocol_prototype/Prototype/Protocol/protocol.py b/protocol_prototype/Prototype/Protocol_Alpha_0/protocol.py similarity index 100% rename from protocol_prototype/Prototype/Protocol/protocol.py rename to protocol_prototype/Prototype/Protocol_Alpha_0/protocol.py diff --git a/protocol_prototype/Prototype/Protocol/transmission.py b/protocol_prototype/Prototype/Protocol_Alpha_0/transmission.py similarity index 100% rename from protocol_prototype/Prototype/Protocol/transmission.py rename to protocol_prototype/Prototype/Protocol_Alpha_0/transmission.py diff --git a/protocol_prototype/Prototype/Protocol/voice_codec.py b/protocol_prototype/Prototype/Protocol_Alpha_0/voice_codec.py similarity index 100% rename from protocol_prototype/Prototype/Protocol/voice_codec.py rename to protocol_prototype/Prototype/Protocol_Alpha_0/voice_codec.py diff --git a/protocol_prototype/debug_ui.py b/protocol_prototype/debug_ui.py deleted file mode 100644 index bbad088..0000000 --- a/protocol_prototype/debug_ui.py +++ /dev/null @@ -1,32 +0,0 @@ -#!/usr/bin/env python3 -"""Debug script to trace the UI behavior""" - -import sys -from pathlib import Path - -# Monkey patch the integrated_protocol to see what's being called -orig_file = Path(__file__).parent / "DryBox" / "integrated_protocol.py" -backup_file = Path(__file__).parent / "DryBox" / "integrated_protocol_backup.py" - -# Read the original file -with open(orig_file, 'r') as f: - content = f.read() - -# Add debug prints -debug_content = content.replace( - 'def initiate_key_exchange(self, cipher_type=1, is_initiator=True):', - '''def initiate_key_exchange(self, cipher_type=1, is_initiator=True): - import traceback - print(f"\\n[DEBUG] initiate_key_exchange called with is_initiator={is_initiator}") - print("[DEBUG] Call stack:") - for line in traceback.format_stack()[:-1]: - print(line.strip()) - print()''' -) - -# Write the debug version -with open(orig_file, 'w') as f: - f.write(debug_content) - -print("Debug patch applied. Run the UI now to see the trace.") -print("To restore: cp DryBox/integrated_protocol_backup.py DryBox/integrated_protocol.py") \ No newline at end of file