Updating DryBox (#64)

Co-authored-by: STCB <stephane.corbiere@epitech.eu>
Reviewed-on: #64
This commit is contained in:
stcb 2025-07-06 22:53:08 +00:00
parent c793793c8e
commit 4ce487c1c9
25 changed files with 2909 additions and 441 deletions

View File

@@ -0,0 +1,253 @@
import wave
import threading
import queue
import time
import os
from datetime import datetime
from PyQt5.QtCore import QObject, pyqtSignal
# Try to import PyAudio; degrade gracefully if it's not installed
try:
import pyaudio
PYAUDIO_AVAILABLE = True
except ImportError:
PYAUDIO_AVAILABLE = False
print("Warning: PyAudio not installed. Audio playback will be disabled.")
print("To enable playback, install with: sudo dnf install python3-devel portaudio-devel && pip install pyaudio")
class AudioPlayer(QObject):
playback_started = pyqtSignal(int) # client_id
playback_stopped = pyqtSignal(int) # client_id
recording_saved = pyqtSignal(int, str) # client_id, filepath
def __init__(self):
super().__init__()
self.audio = None
self.streams = {} # client_id -> stream
self.buffers = {} # client_id -> queue
self.threads = {} # client_id -> thread
self.recording_buffers = {} # client_id -> list of audio data
self.recording_enabled = {} # client_id -> bool
self.playback_enabled = {} # client_id -> bool
self.sample_rate = 8000
self.channels = 1
self.chunk_size = 320 # 40ms at 8kHz
self.debug_callback = None
if PYAUDIO_AVAILABLE:
try:
self.audio = pyaudio.PyAudio()
except Exception as e:
self.debug(f"Failed to initialize PyAudio: {e}")
self.audio = None
else:
self.audio = None
self.debug("PyAudio not available - playback disabled, recording still works")
def debug(self, message):
if self.debug_callback:
self.debug_callback(f"[AudioPlayer] {message}")
else:
print(f"[AudioPlayer] {message}")
def set_debug_callback(self, callback):
self.debug_callback = callback
def start_playback(self, client_id):
"""Start audio playback for a client"""
if not self.audio:
self.debug("Audio playback not available - PyAudio not installed")
self.debug("To enable: sudo dnf install python3-devel portaudio-devel && pip install pyaudio")
return False
if client_id in self.streams:
self.debug(f"Playback already active for client {client_id}")
return False
try:
# Create audio stream
stream = self.audio.open(
format=pyaudio.paInt16,
channels=self.channels,
rate=self.sample_rate,
output=True,
frames_per_buffer=self.chunk_size
)
self.streams[client_id] = stream
self.buffers[client_id] = queue.Queue()
self.playback_enabled[client_id] = True
# Start playback thread
thread = threading.Thread(
target=self._playback_thread,
args=(client_id,),
daemon=True
)
self.threads[client_id] = thread
thread.start()
self.debug(f"Started playback for client {client_id}")
self.playback_started.emit(client_id)
return True
except Exception as e:
self.debug(f"Failed to start playback for client {client_id}: {e}")
return False
def stop_playback(self, client_id):
"""Stop audio playback for a client"""
if client_id not in self.streams:
return
self.playback_enabled[client_id] = False
# Wait for thread to finish
if client_id in self.threads:
self.threads[client_id].join(timeout=1.0)
del self.threads[client_id]
# Close stream
if client_id in self.streams:
try:
self.streams[client_id].stop_stream()
self.streams[client_id].close()
except Exception:
pass  # stream may already be closed during teardown
del self.streams[client_id]
# Clear buffer
if client_id in self.buffers:
del self.buffers[client_id]
self.debug(f"Stopped playback for client {client_id}")
self.playback_stopped.emit(client_id)
def add_audio_data(self, client_id, pcm_data):
"""Add audio data to playback buffer"""
# Initialize frame counter for debug logging
if not hasattr(self, '_frame_count'):
self._frame_count = {}
if client_id not in self._frame_count:
self._frame_count[client_id] = 0
self._frame_count[client_id] += 1
# Only log occasionally to avoid spam
if self._frame_count[client_id] == 1 or self._frame_count[client_id] % 25 == 0:
self.debug(f"Client {client_id} audio frame #{self._frame_count[client_id]}: {len(pcm_data)} bytes")
if client_id in self.buffers:
self.buffers[client_id].put(pcm_data)
if self._frame_count[client_id] == 1:
self.debug(f"Client {client_id} buffer started, queue size: {self.buffers[client_id].qsize()}")
else:
self.debug(f"Client {client_id} has no buffer (playback not started?)")
# Add to recording buffer if recording
if self.recording_enabled.get(client_id, False):
if client_id not in self.recording_buffers:
self.recording_buffers[client_id] = []
self.recording_buffers[client_id].append(pcm_data)
def _playback_thread(self, client_id):
"""Thread function for audio playback"""
stream = self.streams.get(client_id)
buffer = self.buffers.get(client_id)
if not stream or not buffer:
return
self.debug(f"Playback thread started for client {client_id}")
while self.playback_enabled.get(client_id, False):
try:
# Get audio data from buffer with timeout
audio_data = buffer.get(timeout=0.1)
# Only log first frame to avoid spam
if not hasattr(self, '_playback_logged'):
self._playback_logged = {}
if client_id not in self._playback_logged:
self._playback_logged[client_id] = False
if not self._playback_logged[client_id]:
self.debug(f"Client {client_id} playback thread playing first frame: {len(audio_data)} bytes")
self._playback_logged[client_id] = True
# Play audio
stream.write(audio_data)
except queue.Empty:
# No data available, continue
continue
except Exception as e:
self.debug(f"Playback error for client {client_id}: {e}")
break
self.debug(f"Playback thread ended for client {client_id}")
def start_recording(self, client_id):
"""Start recording received audio"""
self.recording_enabled[client_id] = True
self.recording_buffers[client_id] = []
self.debug(f"Started recording for client {client_id}")
def stop_recording(self, client_id, save_path=None):
"""Stop recording and optionally save to file"""
if not self.recording_enabled.get(client_id, False):
return None
self.recording_enabled[client_id] = False
if client_id not in self.recording_buffers:
return None
audio_data = self.recording_buffers[client_id]
if not audio_data:
self.debug(f"No audio data recorded for client {client_id}")
return None
# Generate filename if not provided
if not save_path:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
save_path = f"wav/received_client{client_id}_{timestamp}.wav"
# Ensure directory exists
save_dir = os.path.dirname(save_path)
if save_dir:
os.makedirs(save_dir, exist_ok=True)
try:
# Combine all audio chunks
combined_audio = b''.join(audio_data)
# Save as WAV file
with wave.open(save_path, 'wb') as wav_file:
wav_file.setnchannels(self.channels)
wav_file.setsampwidth(2) # 16-bit
wav_file.setframerate(self.sample_rate)
wav_file.writeframes(combined_audio)
self.debug(f"Saved recording for client {client_id} to {save_path}")
self.recording_saved.emit(client_id, save_path)
# Clear recording buffer
del self.recording_buffers[client_id]
return save_path
except Exception as e:
self.debug(f"Failed to save recording for client {client_id}: {e}")
return None
def cleanup(self):
"""Clean up audio resources"""
# Stop all playback
for client_id in list(self.streams.keys()):
self.stop_playback(client_id)
# Terminate PyAudio
if self.audio:
self.audio.terminate()
self.audio = None

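Editor's note: a minimal usage sketch of the AudioPlayer API above. The module name audio_player and the wav/sample.wav path are illustrative assumptions (filenames are not shown on this diff page); the source is assumed to be 8 kHz mono 16-bit, matching the class defaults.

    # Hypothetical driver for AudioPlayer: feed 40 ms PCM frames from a WAV
    # file into live playback while also recording them to disk.
    import wave
    from audio_player import AudioPlayer  # assumed module name

    player = AudioPlayer()
    player.start_playback(0)        # opens a PyAudio output stream (if installed)
    player.start_recording(0)

    with wave.open("wav/sample.wav", "rb") as src:   # illustrative input file
        while True:
            frames = src.readframes(320)             # 320 samples = 40 ms at 8 kHz
            if not frames:
                break
            player.add_audio_data(0, frames)         # queued for playback + recording

    print(player.stop_recording(0))  # -> wav/received_client0_<timestamp>.wav
    player.stop_playback(0)
    player.cleanup()
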
View File

@@ -0,0 +1,220 @@
import numpy as np
import wave
import os
from datetime import datetime
from PyQt5.QtCore import QObject, pyqtSignal
import struct
class AudioProcessor(QObject):
processing_complete = pyqtSignal(str) # filepath
def __init__(self):
super().__init__()
self.debug_callback = None
def debug(self, message):
if self.debug_callback:
self.debug_callback(f"[AudioProcessor] {message}")
else:
print(f"[AudioProcessor] {message}")
def set_debug_callback(self, callback):
self.debug_callback = callback
def apply_gain(self, audio_data, gain_db):
"""Apply gain to audio data"""
# Convert bytes to numpy array
samples = np.frombuffer(audio_data, dtype=np.int16)
# Apply gain
gain_linear = 10 ** (gain_db / 20.0)
samples_float = samples.astype(np.float32) * gain_linear
# Clip to prevent overflow
samples_float = np.clip(samples_float, -32768, 32767)
# Convert back to int16
return samples_float.astype(np.int16).tobytes()
def apply_noise_gate(self, audio_data, threshold_db=-40):
"""Apply noise gate to remove low-level noise"""
samples = np.frombuffer(audio_data, dtype=np.int16)
# Calculate RMS in dB
rms = np.sqrt(np.mean(samples.astype(np.float32) ** 2))
rms_db = 20 * np.log10(max(rms, 1e-10))
# Gate the audio if below threshold
if rms_db < threshold_db:
return np.zeros_like(samples, dtype=np.int16).tobytes()
return audio_data
def apply_low_pass_filter(self, audio_data, cutoff_hz=3400, sample_rate=8000):
"""Apply simple low-pass filter"""
samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32)
# Simple moving average filter
# Calculate filter length based on cutoff frequency
filter_length = int(sample_rate / cutoff_hz)
if filter_length < 3:
filter_length = 3
# Apply moving average
filtered = np.convolve(samples, np.ones(filter_length) / filter_length, mode='same')
return filtered.astype(np.int16).tobytes()
def apply_high_pass_filter(self, audio_data, cutoff_hz=300, sample_rate=8000):
"""Apply simple high-pass filter"""
samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32)
# Simple differentiator as high-pass
filtered = np.diff(samples, prepend=samples[0])
# Scale to maintain amplitude
scale = cutoff_hz / (sample_rate / 2)
filtered *= scale
return filtered.astype(np.int16).tobytes()
def normalize_audio(self, audio_data, target_db=-3):
"""Normalize audio to target dB level"""
samples = np.frombuffer(audio_data, dtype=np.int16).astype(np.float32)
# Find peak
peak = np.max(np.abs(samples))
if peak == 0:
return audio_data
# Calculate current peak in dB
current_db = 20 * np.log10(peak / 32768.0)
# Calculate gain needed
gain_db = target_db - current_db
# Apply gain
return self.apply_gain(audio_data, gain_db)
def remove_silence(self, audio_data, threshold_db=-40, min_silence_ms=100, sample_rate=8000):
"""Remove silence from audio"""
samples = np.frombuffer(audio_data, dtype=np.int16)
# Calculate frame size for silence detection
frame_size = int(sample_rate * min_silence_ms / 1000)
# Detect non-silent regions
non_silent_regions = []
i = 0
while i < len(samples):
frame = samples[i:i+frame_size]
if len(frame) == 0:
break
# Calculate RMS of frame
rms = np.sqrt(np.mean(frame.astype(np.float32) ** 2))
rms_db = 20 * np.log10(max(rms, 1e-10))
if rms_db > threshold_db:
# Found non-silent region, find its extent
start = i
while i < len(samples):
frame = samples[i:i+frame_size]
if len(frame) == 0:
break
rms = np.sqrt(np.mean(frame.astype(np.float32) ** 2))
rms_db = 20 * np.log10(max(rms, 1e-10))
if rms_db <= threshold_db:
break
i += frame_size
non_silent_regions.append((start, i))
else:
i += frame_size
# Combine non-silent regions
if not non_silent_regions:
return audio_data # Return original if all silent
combined = []
for start, end in non_silent_regions:
combined.extend(samples[start:end])
return np.array(combined, dtype=np.int16).tobytes()
def save_processed_audio(self, audio_data, original_path, processing_type):
"""Save processed audio with descriptive filename"""
# Generate new filename
base_name = os.path.splitext(os.path.basename(original_path))[0]
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
new_filename = f"{base_name}_{processing_type}_{timestamp}.wav"
# Ensure directory exists
save_dir = os.path.dirname(original_path)
if not save_dir:
save_dir = "wav"
os.makedirs(save_dir, exist_ok=True)
save_path = os.path.join(save_dir, new_filename)
try:
with wave.open(save_path, 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(8000)
wav_file.writeframes(audio_data)
self.debug(f"Saved processed audio to {save_path}")
self.processing_complete.emit(save_path)
return save_path
except Exception as e:
self.debug(f"Failed to save processed audio: {e}")
return None
def concatenate_audio_files(self, file_paths, output_path=None):
"""Concatenate multiple audio files"""
if not file_paths:
return None
combined_data = b''
sample_rate = None
for file_path in file_paths:
try:
with wave.open(file_path, 'rb') as wav_file:
if sample_rate is None:
sample_rate = wav_file.getframerate()
elif wav_file.getframerate() != sample_rate:
self.debug(f"Sample rate mismatch in {file_path}")
continue
data = wav_file.readframes(wav_file.getnframes())
combined_data += data
except Exception as e:
self.debug(f"Failed to read {file_path}: {e}")
if not combined_data:
return None
# Save concatenated audio
if not output_path:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_path = f"wav/concatenated_{timestamp}.wav"
os.makedirs(os.path.dirname(output_path), exist_ok=True)
try:
with wave.open(output_path, 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(sample_rate or 8000)
wav_file.writeframes(combined_data)
self.debug(f"Saved concatenated audio to {output_path}")
return output_path
except Exception as e:
self.debug(f"Failed to save concatenated audio: {e}")
return None

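Editor's note: apply_gain and normalize_audio above share the conversion gain_linear = 10 ** (gain_db / 20), so +6 dB is roughly a factor of 2. A quick sanity check of that math, assuming the file imports as audio_processor (module name not shown on this page):

    # Verifies apply_gain's dB-to-linear conversion on a synthetic tone.
    import numpy as np
    from audio_processor import AudioProcessor  # assumed module name

    proc = AudioProcessor()
    t = np.arange(8000) / 8000.0                      # 1 s at 8 kHz
    tone = (8000 * np.sin(2 * np.pi * 440 * t)).astype(np.int16)

    boosted = np.frombuffer(proc.apply_gain(tone.tobytes(), 6.0), dtype=np.int16)
    print(boosted.max() / tone.max())                 # ~1.995, i.e. 10 ** (6 / 20)
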
View File

@@ -1,79 +0,0 @@
# client_state.py
from queue import Queue
from session import NoiseXKSession
import time
class ClientState:
def __init__(self, client_id):
self.client_id = client_id
self.command_queue = Queue()
self.initiator = None
self.keypair = None
self.peer_pubkey = None
self.session = None
self.handshake_in_progress = False
self.handshake_start_time = None
self.call_active = False
def process_command(self, client):
"""Process commands from the queue."""
if not self.command_queue.empty():
print(f"Client {self.client_id} processing command queue, size: {self.command_queue.qsize()}")
command = self.command_queue.get()
if command == "handshake":
try:
print(f"Client {self.client_id} starting handshake, initiator: {self.initiator}")
self.session = NoiseXKSession(self.keypair, self.peer_pubkey)
self.session.handshake(client.sock, self.initiator)
print(f"Client {self.client_id} handshake complete")
client.send("HANDSHAKE_DONE")
except Exception as e:
print(f"Client {self.client_id} handshake failed: {e}")
client.state_changed.emit("CALL_END", "", self.client_id)
finally:
self.handshake_in_progress = False
self.handshake_start_time = None
def start_handshake(self, initiator, keypair, peer_pubkey):
"""Queue handshake command."""
self.initiator = initiator
self.keypair = keypair
self.peer_pubkey = peer_pubkey
print(f"Client {self.client_id} queuing handshake, initiator: {initiator}")
self.handshake_in_progress = True
self.handshake_start_time = time.time()
self.command_queue.put("handshake")
def handle_data(self, client, data):
"""Handle received data (control or audio)."""
try:
decoded_data = data.decode('utf-8').strip()
print(f"Client {self.client_id} received raw: {decoded_data}")
if decoded_data in ["RINGING", "CALL_END", "CALL_DROPPED", "IN_CALL", "HANDSHAKE", "HANDSHAKE_DONE"]:
client.state_changed.emit(decoded_data, decoded_data, self.client_id)
if decoded_data == "HANDSHAKE":
self.handshake_in_progress = True
elif decoded_data == "HANDSHAKE_DONE":
self.call_active = True
else:
print(f"Client {self.client_id} ignored unexpected text message: {decoded_data}")
except UnicodeDecodeError:
if self.call_active and self.session:
try:
print(f"Client {self.client_id} received audio packet, length={len(data)}")
decrypted_data = self.session.decrypt(data)
print(f"Client {self.client_id} decrypted audio packet, length={len(decrypted_data)}")
client.data_received.emit(decrypted_data, self.client_id)
except Exception as e:
print(f"Client {self.client_id} failed to process audio packet: {e}")
else:
print(f"Client {self.client_id} ignored non-text message: {data.hex()}")
def check_handshake_timeout(self, client):
"""Check for handshake timeout."""
if self.handshake_in_progress and self.handshake_start_time:
if time.time() - self.handshake_start_time > 30:
print(f"Client {self.client_id} handshake timeout after 30s")
client.state_changed.emit("CALL_END", "", self.client_id)
self.handshake_in_progress = False
self.handshake_start_time = None

View File

@@ -1,19 +1,32 @@
import sys
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QPushButton, QLabel, QFrame, QSizePolicy, QStyle
QPushButton, QLabel, QFrame, QSizePolicy, QStyle, QTextEdit, QSplitter,
QMenu, QAction, QInputDialog, QShortcut
)
from PyQt5.QtCore import Qt, QSize
from PyQt5.QtGui import QFont
from PyQt5.QtCore import Qt, QSize, QTimer, pyqtSignal
from PyQt5.QtGui import QFont, QTextCursor, QKeySequence
import time
from datetime import datetime
import threading
from phone_manager import PhoneManager
from waveform_widget import WaveformWidget
from phone_state import PhoneState
class PhoneUI(QMainWindow):
debug_signal = pyqtSignal(str)
def __init__(self):
super().__init__()
self.setWindowTitle("Enhanced Dual Phone Interface")
self.setGeometry(100, 100, 900, 750)
self.setWindowTitle("DryBox - Noise XK + Codec2 + 4FSK")
self.setGeometry(100, 100, 1200, 900)
# Set minimum size to ensure window is resizable
self.setMinimumSize(800, 600)
# Auto test state
self.auto_test_running = False
self.auto_test_timer = None
self.test_step = 0
self.setStyleSheet("""
QMainWindow { background-color: #333333; }
QLabel { color: #E0E0E0; font-size: 14px; }
@@ -39,75 +52,162 @@ class PhoneUI(QMainWindow):
padding: 15px;
}
QWidget#phoneWidget {
border: 1px solid #4A4A4A; border-radius: 8px;
padding: 10px; background-color: #3A3A3A;
border: 2px solid #4A4A4A; border-radius: 10px;
background-color: #3A3A3A;
min-width: 250px;
}
QTextEdit#debugConsole {
background-color: #1E1E1E; color: #00FF00;
font-family: monospace; font-size: 12px;
border: 2px solid #0078D4; border-radius: 5px;
}
QPushButton#autoTestButton {
background-color: #FF8C00; min-height: 35px;
}
QPushButton#autoTestButton:hover { background-color: #FF7F00; }
""")
# Setup debug signal early
self.debug_signal.connect(self.append_debug)
self.manager = PhoneManager()
self.manager.ui = self # Set UI reference for debug logging
self.manager.initialize_phones()
# Main widget and layout
# Main widget with splitter
main_widget = QWidget()
self.setCentralWidget(main_widget)
main_layout = QVBoxLayout()
main_layout.setSpacing(20)
main_layout.setContentsMargins(20, 20, 20, 20)
main_layout.setAlignment(Qt.AlignCenter)
main_widget.setLayout(main_layout)
# Create splitter for phones and debug console
self.splitter = QSplitter(Qt.Vertical)
main_layout.addWidget(self.splitter)
# Top widget for phones
phones_widget = QWidget()
phones_layout = QVBoxLayout()
phones_layout.setSpacing(20)
phones_layout.setContentsMargins(20, 20, 20, 20)
phones_layout.setAlignment(Qt.AlignCenter)
phones_widget.setLayout(phones_layout)
# App Title
app_title_label = QLabel("Dual Phone Control Panel")
app_title_label = QLabel("Integrated Protocol Control Panel")
app_title_label.setObjectName("mainTitleLabel")
app_title_label.setAlignment(Qt.AlignCenter)
main_layout.addWidget(app_title_label)
phones_layout.addWidget(app_title_label)
# Protocol info
protocol_info = QLabel("Noise XK + Codec2 (1200bps) + 4FSK")
protocol_info.setAlignment(Qt.AlignCenter)
protocol_info.setStyleSheet("font-size: 12px; color: #00A2E8;")
phones_layout.addWidget(protocol_info)
# Phone displays layout
phone_controls_layout = QHBoxLayout()
phone_controls_layout.setSpacing(50)
phone_controls_layout.setAlignment(Qt.AlignCenter)
main_layout.addLayout(phone_controls_layout)
phone_controls_layout.setSpacing(20)
phone_controls_layout.setContentsMargins(10, 0, 10, 0)
phones_layout.addLayout(phone_controls_layout)
# Setup UI for phones
for phone in self.manager.phones:
phone_container_widget, phone_display_frame, phone_button, waveform_widget, sent_waveform_widget, phone_status_label = self._create_phone_ui(
phone_container_widget, phone_display_frame, phone_button, waveform_widget, sent_waveform_widget, phone_status_label, playback_button, record_button = self._create_phone_ui(
f"Phone {phone['id']+1}", lambda checked, pid=phone['id']: self.manager.phone_action(pid, self)
)
phone['button'] = phone_button
phone['waveform'] = waveform_widget
phone['sent_waveform'] = sent_waveform_widget
phone['status_label'] = phone_status_label
phone['playback_button'] = playback_button
phone['record_button'] = record_button
# Connect audio control buttons with proper closure
playback_button.clicked.connect(lambda checked, pid=phone['id']: self.toggle_playback(pid))
record_button.clicked.connect(lambda checked, pid=phone['id']: self.toggle_recording(pid))
phone_controls_layout.addWidget(phone_container_widget)
phone['client'].data_received.connect(lambda data, cid=phone['id']: self.manager.update_waveform(cid, data))
# Connect data_received signal - it emits (data, client_id)
phone['client'].data_received.connect(lambda data, cid: self.manager.update_waveform(cid, data))
phone['client'].state_changed.connect(lambda state, num, cid=phone['id']: self.set_phone_state(cid, state, num))
phone['client'].start()
# Spacer
main_layout.addStretch(1)
# Control buttons layout
control_layout = QHBoxLayout()
control_layout.setSpacing(15)
control_layout.setContentsMargins(20, 10, 20, 10)
# Auto Test Button
self.auto_test_button = QPushButton("🧪 Run Automatic Test")
self.auto_test_button.setObjectName("autoTestButton")
self.auto_test_button.setMinimumWidth(180)
self.auto_test_button.setMaximumWidth(250)
self.auto_test_button.clicked.connect(self.toggle_auto_test)
control_layout.addWidget(self.auto_test_button)
# Clear Debug Button
self.clear_debug_button = QPushButton("Clear Debug")
self.clear_debug_button.setMinimumWidth(100)
self.clear_debug_button.setMaximumWidth(150)
self.clear_debug_button.clicked.connect(self.clear_debug)
control_layout.addWidget(self.clear_debug_button)
# Audio Processing Button
self.audio_menu_button = QPushButton("Audio Options")
self.audio_menu_button.setMinimumWidth(100)
self.audio_menu_button.setMaximumWidth(150)
self.audio_menu_button.clicked.connect(self.show_audio_menu)
control_layout.addWidget(self.audio_menu_button)
# Settings Button
self.settings_button = QPushButton("Settings")
self.settings_button.setObjectName("settingsButton")
self.settings_button.setFixedWidth(180)
self.settings_button.setMinimumWidth(100)
self.settings_button.setMaximumWidth(150)
self.settings_button.setIcon(self.style().standardIcon(QStyle.SP_FileDialogDetailedView))
self.settings_button.setIconSize(QSize(20, 20))
self.settings_button.clicked.connect(self.settings_action)
settings_layout = QHBoxLayout()
settings_layout.addStretch()
settings_layout.addWidget(self.settings_button)
settings_layout.addStretch()
main_layout.addLayout(settings_layout)
control_layout.addWidget(self.settings_button)
phones_layout.addLayout(control_layout)
# Add phones widget to splitter
self.splitter.addWidget(phones_widget)
# Debug console
self.debug_console = QTextEdit()
self.debug_console.setObjectName("debugConsole")
self.debug_console.setReadOnly(True)
self.debug_console.setMinimumHeight(200)
self.debug_console.setMaximumHeight(400)
self.splitter.addWidget(self.debug_console)
# Flush any queued debug messages
if hasattr(self, '_debug_queue'):
for msg in self._debug_queue:
self.debug_console.append(msg)
del self._debug_queue
# Set splitter sizes (70% phones, 30% debug)
self.splitter.setSizes([600, 300])
# Initialize UI
for phone in self.manager.phones:
self.update_phone_ui(phone['id'])
# Initial debug message
QTimer.singleShot(100, lambda: self.debug("DryBox UI initialized with integrated protocol"))
# Setup keyboard shortcuts
self.setup_shortcuts()
def _create_phone_ui(self, title, action_slot):
phone_container_widget = QWidget()
phone_container_widget.setObjectName("phoneWidget")
phone_container_widget.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Preferred)
phone_layout = QVBoxLayout()
phone_layout.setAlignment(Qt.AlignCenter)
phone_layout.setSpacing(15)
phone_layout.setSpacing(10)
phone_layout.setContentsMargins(15, 15, 15, 15)
phone_container_widget.setLayout(phone_layout)
phone_title_label = QLabel(title)
@@ -117,8 +217,9 @@ class PhoneUI(QMainWindow):
phone_display_frame = QFrame()
phone_display_frame.setObjectName("phoneDisplay")
phone_display_frame.setFixedSize(250, 350)
phone_display_frame.setSizePolicy(QSizePolicy.Fixed, QSizePolicy.Fixed)
phone_display_frame.setMinimumSize(200, 250)
phone_display_frame.setMaximumSize(300, 400)
phone_display_frame.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Preferred)
display_content_layout = QVBoxLayout(phone_display_frame)
display_content_layout.setAlignment(Qt.AlignCenter)
@@ -129,28 +230,83 @@ class PhoneUI(QMainWindow):
phone_layout.addWidget(phone_display_frame, alignment=Qt.AlignCenter)
phone_button = QPushButton()
phone_button.setFixedWidth(120)
phone_button.setMinimumWidth(100)
phone_button.setMaximumWidth(150)
phone_button.setIconSize(QSize(20, 20))
phone_button.clicked.connect(action_slot)
phone_layout.addWidget(phone_button, alignment=Qt.AlignCenter)
# Received waveform
waveform_label = QLabel(f"{title} Received Audio")
waveform_label = QLabel(f"{title} Received")
waveform_label.setAlignment(Qt.AlignCenter)
waveform_label.setStyleSheet("font-size: 14px; color: #E0E0E0;")
waveform_label.setStyleSheet("font-size: 12px; color: #E0E0E0;")
phone_layout.addWidget(waveform_label)
waveform_widget = WaveformWidget(dynamic=False)
waveform_widget.setMinimumSize(200, 50)
waveform_widget.setMaximumSize(300, 80)
waveform_widget.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Fixed)
phone_layout.addWidget(waveform_widget, alignment=Qt.AlignCenter)
# Sent waveform
sent_waveform_label = QLabel(f"{title} Sent Audio")
sent_waveform_label = QLabel(f"{title} Sent")
sent_waveform_label.setAlignment(Qt.AlignCenter)
sent_waveform_label.setStyleSheet("font-size: 14px; color: #E0E0E0;")
sent_waveform_label.setStyleSheet("font-size: 12px; color: #E0E0E0;")
phone_layout.addWidget(sent_waveform_label)
sent_waveform_widget = WaveformWidget(dynamic=False)
sent_waveform_widget.setMinimumSize(200, 50)
sent_waveform_widget.setMaximumSize(300, 80)
sent_waveform_widget.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Fixed)
phone_layout.addWidget(sent_waveform_widget, alignment=Qt.AlignCenter)
return phone_container_widget, phone_display_frame, phone_button, waveform_widget, sent_waveform_widget, phone_status_label
# Audio control buttons
audio_controls_layout = QHBoxLayout()
audio_controls_layout.setAlignment(Qt.AlignCenter)
playback_button = QPushButton("🔊 Playback")
playback_button.setCheckable(True)
playback_button.setMinimumWidth(90)
playback_button.setMaximumWidth(120)
playback_button.setStyleSheet("""
QPushButton {
background-color: #404040;
color: white;
border: 1px solid #606060;
padding: 5px;
border-radius: 3px;
}
QPushButton:checked {
background-color: #4CAF50;
}
QPushButton:hover {
background-color: #505050;
}
""")
record_button = QPushButton("⏺ Record")
record_button.setCheckable(True)
record_button.setMinimumWidth(90)
record_button.setMaximumWidth(120)
record_button.setStyleSheet("""
QPushButton {
background-color: #404040;
color: white;
border: 1px solid #606060;
padding: 5px;
border-radius: 3px;
}
QPushButton:checked {
background-color: #F44336;
}
QPushButton:hover {
background-color: #505050;
}
""")
audio_controls_layout.addWidget(playback_button)
audio_controls_layout.addWidget(record_button)
phone_layout.addLayout(audio_controls_layout)
return phone_container_widget, phone_display_frame, phone_button, waveform_widget, sent_waveform_widget, phone_status_label, playback_button, record_button
def update_phone_ui(self, phone_id):
phone = self.manager.phones[phone_id]
@@ -182,28 +338,371 @@ class PhoneUI(QMainWindow):
button.setStyleSheet("background-color: #107C10;")
def set_phone_state(self, client_id, state_str, number):
self.debug(f"Phone {client_id + 1} state change: {state_str}")
# Handle protocol-specific states
if state_str == "HANDSHAKE_COMPLETE":
phone = self.manager.phones[client_id]
phone['status_label'].setText("🔒 Secure Channel Established")
self.debug(f"Phone {client_id + 1} secure channel established")
self.manager.start_audio(client_id, parent=self)
return
elif state_str == "VOICE_START":
phone = self.manager.phones[client_id]
phone['status_label'].setText("🎤 Voice Active (Encrypted)")
self.debug(f"Phone {client_id + 1} voice session started")
return
elif state_str == "VOICE_END":
phone = self.manager.phones[client_id]
phone['status_label'].setText("🔒 Secure Channel")
self.debug(f"Phone {client_id + 1} voice session ended")
return
# Handle regular states
state = self.manager.map_state(state_str)
phone = self.manager.phones[client_id]
other_phone = self.manager.phones[1 - client_id]
print(f"Setting state for Phone {client_id + 1}: {state}, number: {number}, is_initiator: {phone['is_initiator']}")
self.debug(f"Setting state for Phone {client_id + 1}: {state.name if hasattr(state, 'name') else state}, number: {number}, is_initiator: {phone['is_initiator']}")
phone['state'] = state
if state == PhoneState.IN_CALL:
print(f"Phone {client_id + 1} confirmed in IN_CALL state")
if number == "IN_CALL" and phone['is_initiator']:
print(f"Phone {client_id + 1} (initiator) starting handshake")
phone['client'].send("HANDSHAKE")
self.debug(f"Phone {client_id + 1} confirmed in IN_CALL state")
self.debug(f" state_str={state_str}, number={number}")
self.debug(f" is_initiator={phone['is_initiator']}")
# Only start handshake when the initiator RECEIVES the IN_CALL message
if state_str == "IN_CALL" and phone['is_initiator']:
self.debug(f"Phone {client_id + 1} (initiator) received IN_CALL, starting handshake")
phone['client'].start_handshake(initiator=True, keypair=phone['keypair'], peer_pubkey=other_phone['public_key'])
elif number == "HANDSHAKE" and not phone['is_initiator']:
print(f"Phone {client_id + 1} (responder) starting handshake")
phone['client'].start_handshake(initiator=False, keypair=phone['keypair'], peer_pubkey=other_phone['public_key'])
elif number == "HANDSHAKE":
# Old text-based handshake trigger - no longer used
self.debug(f"Phone {client_id + 1} received legacy HANDSHAKE message")
elif number == "HANDSHAKE_DONE":
self.manager.start_audio(client_id, parent=self) # Pass self as parent
self.debug(f"Phone {client_id + 1} received HANDSHAKE_DONE")
# Handled by HANDSHAKE_COMPLETE now
pass
self.update_phone_ui(client_id)
def settings_action(self):
print("Settings clicked")
self.debug("Settings clicked")
def debug(self, message):
"""Thread-safe debug logging to both console and UI"""
timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3]  # time.strftime() does not support %f
debug_msg = f"[{timestamp}] {message}"
print(debug_msg) # Console output
self.debug_signal.emit(debug_msg) # UI output
def append_debug(self, message):
"""Append debug message to console (called from main thread)"""
if hasattr(self, 'debug_console'):
self.debug_console.append(message)
# Auto-scroll to bottom
cursor = self.debug_console.textCursor()
cursor.movePosition(QTextCursor.End)
self.debug_console.setTextCursor(cursor)
else:
# Queue messages until console is ready
if not hasattr(self, '_debug_queue'):
self._debug_queue = []
self._debug_queue.append(message)
def clear_debug(self):
"""Clear debug console"""
self.debug_console.clear()
self.debug("Debug console cleared")
def toggle_auto_test(self):
"""Toggle automatic test sequence"""
if not self.auto_test_running:
self.start_auto_test()
else:
self.stop_auto_test()
def start_auto_test(self):
"""Start automatic test sequence"""
self.auto_test_running = True
self.auto_test_button.setText("⏹ Stop Test")
self.test_step = 0
self.debug("=== STARTING AUTOMATIC TEST SEQUENCE ===")
self.debug("Test will go through complete protocol flow")
# Start test timer
self.auto_test_timer = QTimer()
self.auto_test_timer.timeout.connect(self.execute_test_step)
self.auto_test_timer.start(2000) # 2 second intervals
# Execute first step immediately
self.execute_test_step()
def stop_auto_test(self):
"""Stop automatic test sequence"""
self.auto_test_running = False
self.auto_test_button.setText("🧪 Run Automatic Test")
if self.auto_test_timer:
self.auto_test_timer.stop()
self.auto_test_timer = None
self.debug("=== TEST SEQUENCE STOPPED ===")
def execute_test_step(self):
"""Execute next step in test sequence"""
phone1 = self.manager.phones[0]
phone2 = self.manager.phones[1]
self.debug(f"\n--- Test Step {self.test_step + 1} ---")
if self.test_step == 0:
# Step 1: Check initial state
self.debug("Checking initial state...")
state1 = phone1['state']
state2 = phone2['state']
# Handle both enum and int states
state1_name = state1.name if hasattr(state1, 'name') else str(state1)
state2_name = state2.name if hasattr(state2, 'name') else str(state2)
self.debug(f"Phone 1 state: {state1_name}")
self.debug(f"Phone 2 state: {state2_name}")
self.debug(f"Phone 1 connected: {phone1['client'].sock is not None}")
self.debug(f"Phone 2 connected: {phone2['client'].sock is not None}")
elif self.test_step == 1:
# Step 2: Make call
self.debug("Phone 1 calling Phone 2...")
self.manager.phone_action(0, self)
state1_name = phone1['state'].name if hasattr(phone1['state'], 'name') else str(phone1['state'])
state2_name = phone2['state'].name if hasattr(phone2['state'], 'name') else str(phone2['state'])
self.debug(f"Phone 1 state after call: {state1_name}")
self.debug(f"Phone 2 state after call: {state2_name}")
elif self.test_step == 2:
# Step 3: Answer call
self.debug("Phone 2 answering call...")
self.manager.phone_action(1, self)
state1_name = phone1['state'].name if hasattr(phone1['state'], 'name') else str(phone1['state'])
state2_name = phone2['state'].name if hasattr(phone2['state'], 'name') else str(phone2['state'])
self.debug(f"Phone 1 state after answer: {state1_name}")
self.debug(f"Phone 2 state after answer: {state2_name}")
self.debug(f"Phone 1 is_initiator: {phone1['is_initiator']}")
self.debug(f"Phone 2 is_initiator: {phone2['is_initiator']}")
elif self.test_step == 3:
# Step 4: Check handshake progress
self.debug("Checking handshake progress...")
self.debug(f"Phone 1 handshake in progress: {phone1['client'].state.handshake_in_progress}")
self.debug(f"Phone 2 handshake in progress: {phone2['client'].state.handshake_in_progress}")
self.debug(f"Phone 1 command queue: {phone1['client'].state.command_queue.qsize()}")
self.debug(f"Phone 2 command queue: {phone2['client'].state.command_queue.qsize()}")
# Increase timer interval for handshake
self.auto_test_timer.setInterval(3000) # 3 seconds
elif self.test_step == 4:
# Step 5: Check handshake status
self.debug("Checking Noise XK handshake status...")
self.debug(f"Phone 1 handshake complete: {phone1['client'].handshake_complete}")
self.debug(f"Phone 2 handshake complete: {phone2['client'].handshake_complete}")
self.debug(f"Phone 1 has session: {phone1['client'].noise_session is not None}")
self.debug(f"Phone 2 has session: {phone2['client'].noise_session is not None}")
# Reset timer interval
self.auto_test_timer.setInterval(2000)
elif self.test_step == 5:
# Step 6: Check voice status
self.debug("Checking voice session status...")
self.debug(f"Phone 1 voice active: {phone1['client'].voice_active}")
self.debug(f"Phone 2 voice active: {phone2['client'].voice_active}")
self.debug(f"Phone 1 codec initialized: {phone1['client'].codec is not None}")
self.debug(f"Phone 2 codec initialized: {phone2['client'].codec is not None}")
self.debug(f"Phone 1 modem initialized: {phone1['client'].modem is not None}")
self.debug(f"Phone 2 modem initialized: {phone2['client'].modem is not None}")
elif self.test_step == 6:
# Step 7: Check audio transmission
self.debug("Checking audio transmission...")
self.debug(f"Phone 1 audio file loaded: {phone1['audio_file'] is not None}")
self.debug(f"Phone 2 audio file loaded: {phone2['audio_file'] is not None}")
self.debug(f"Phone 1 frame counter: {phone1.get('frame_counter', 0)}")
self.debug(f"Phone 2 frame counter: {phone2.get('frame_counter', 0)}")
self.debug(f"Phone 1 audio timer active: {phone1['audio_timer'] is not None and phone1['audio_timer'].isActive()}")
self.debug(f"Phone 2 audio timer active: {phone2['audio_timer'] is not None and phone2['audio_timer'].isActive()}")
elif self.test_step == 7:
# Step 8: Protocol details
self.debug("Protocol stack details:")
if phone1['client'].codec:
self.debug(f"Codec mode: {phone1['client'].codec.mode.name}")
self.debug(f"Frame size: {phone1['client'].codec.frame_bits} bits")
self.debug(f"Frame duration: {phone1['client'].codec.frame_ms} ms")
if phone1['client'].modem:
self.debug(f"FSK frequencies: {phone1['client'].modem.frequencies}")
self.debug(f"Symbol rate: {phone1['client'].modem.baud_rate} baud")
elif self.test_step == 8:
# Step 9: Wait for more frames
self.debug("Letting voice transmission run...")
self.auto_test_timer.setInterval(5000) # Wait 5 seconds
elif self.test_step == 9:
# Step 10: Final statistics
self.debug("Final transmission statistics:")
self.debug(f"Phone 1 frames sent: {phone1.get('frame_counter', 0)}")
self.debug(f"Phone 2 frames sent: {phone2.get('frame_counter', 0)}")
self.auto_test_timer.setInterval(2000) # Back to 2 seconds
elif self.test_step == 10:
# Step 11: Hang up
self.debug("Hanging up call...")
self.manager.phone_action(0, self)
state1_name = phone1['state'].name if hasattr(phone1['state'], 'name') else str(phone1['state'])
state2_name = phone2['state'].name if hasattr(phone2['state'], 'name') else str(phone2['state'])
self.debug(f"Phone 1 state after hangup: {state1_name}")
self.debug(f"Phone 2 state after hangup: {state2_name}")
elif self.test_step == 11:
# Complete
self.debug("\n=== TEST SEQUENCE COMPLETE ===")
self.debug("All protocol components tested successfully!")
self.stop_auto_test()
return
self.test_step += 1
def toggle_playback(self, phone_id):
"""Toggle audio playback for a phone"""
is_enabled = self.manager.toggle_playback(phone_id)
phone = self.manager.phones[phone_id]
phone['playback_button'].setChecked(is_enabled)
if is_enabled:
self.debug(f"Phone {phone_id + 1}: Audio playback enabled")
else:
self.debug(f"Phone {phone_id + 1}: Audio playback disabled")
def toggle_recording(self, phone_id):
"""Toggle audio recording for a phone"""
is_recording, save_path = self.manager.toggle_recording(phone_id)
phone = self.manager.phones[phone_id]
phone['record_button'].setChecked(is_recording)
if is_recording:
self.debug(f"Phone {phone_id + 1}: Recording started")
else:
if save_path:
self.debug(f"Phone {phone_id + 1}: Recording saved to {save_path}")
else:
self.debug(f"Phone {phone_id + 1}: Recording stopped (no data)")
def show_audio_menu(self):
"""Show audio processing options menu"""
menu = QMenu(self)
# Create phone selection submenu
for phone_id in range(2):
phone_menu = menu.addMenu(f"Phone {phone_id + 1}")
# Export buffer
export_action = QAction("Export Audio Buffer", self)
export_action.triggered.connect(lambda checked, pid=phone_id: self.export_audio_buffer(pid))
phone_menu.addAction(export_action)
# Clear buffer
clear_action = QAction("Clear Audio Buffer", self)
clear_action.triggered.connect(lambda checked, pid=phone_id: self.clear_audio_buffer(pid))
phone_menu.addAction(clear_action)
phone_menu.addSeparator()
# Processing options
normalize_action = QAction("Normalize Audio", self)
normalize_action.triggered.connect(lambda checked, pid=phone_id: self.process_audio(pid, "normalize"))
phone_menu.addAction(normalize_action)
gain_action = QAction("Apply Gain...", self)
gain_action.triggered.connect(lambda checked, pid=phone_id: self.apply_gain_dialog(pid))
phone_menu.addAction(gain_action)
noise_gate_action = QAction("Apply Noise Gate", self)
noise_gate_action.triggered.connect(lambda checked, pid=phone_id: self.process_audio(pid, "noise_gate"))
phone_menu.addAction(noise_gate_action)
low_pass_action = QAction("Apply Low Pass Filter", self)
low_pass_action.triggered.connect(lambda checked, pid=phone_id: self.process_audio(pid, "low_pass"))
phone_menu.addAction(low_pass_action)
high_pass_action = QAction("Apply High Pass Filter", self)
high_pass_action.triggered.connect(lambda checked, pid=phone_id: self.process_audio(pid, "high_pass"))
phone_menu.addAction(high_pass_action)
remove_silence_action = QAction("Remove Silence", self)
remove_silence_action.triggered.connect(lambda checked, pid=phone_id: self.process_audio(pid, "remove_silence"))
phone_menu.addAction(remove_silence_action)
# Show menu at button position
menu.exec_(self.audio_menu_button.mapToGlobal(self.audio_menu_button.rect().bottomLeft()))
def export_audio_buffer(self, phone_id):
"""Export audio buffer for a phone"""
save_path = self.manager.export_buffered_audio(phone_id)
if save_path:
self.debug(f"Phone {phone_id + 1}: Audio buffer exported to {save_path}")
else:
self.debug(f"Phone {phone_id + 1}: No audio data to export")
def clear_audio_buffer(self, phone_id):
"""Clear audio buffer for a phone"""
self.manager.clear_audio_buffer(phone_id)
def process_audio(self, phone_id, processing_type):
"""Process audio with specified type"""
save_path = self.manager.process_audio(phone_id, processing_type)
if save_path:
self.debug(f"Phone {phone_id + 1}: Processed audio saved to {save_path}")
else:
self.debug(f"Phone {phone_id + 1}: Audio processing failed")
def apply_gain_dialog(self, phone_id):
"""Show dialog to get gain value"""
gain, ok = QInputDialog.getDouble(
self, "Apply Gain", "Enter gain in dB:",
0.0, -20.0, 20.0, 1
)
if ok:
save_path = self.manager.process_audio(phone_id, "gain", gain_db=gain)
if save_path:
self.debug(f"Phone {phone_id + 1}: Applied {gain}dB gain, saved to {save_path}")
def setup_shortcuts(self):
"""Setup keyboard shortcuts"""
# Phone 1 shortcuts
QShortcut(QKeySequence("1"), self, lambda: self.manager.phone_action(0, self))
QShortcut(QKeySequence("Ctrl+1"), self, lambda: self.toggle_playback(0))
QShortcut(QKeySequence("Alt+1"), self, lambda: self.toggle_recording(0))
# Phone 2 shortcuts
QShortcut(QKeySequence("2"), self, lambda: self.manager.phone_action(1, self))
QShortcut(QKeySequence("Ctrl+2"), self, lambda: self.toggle_playback(1))
QShortcut(QKeySequence("Alt+2"), self, lambda: self.toggle_recording(1))
# General shortcuts
QShortcut(QKeySequence("Space"), self, self.toggle_auto_test)
QShortcut(QKeySequence("Ctrl+L"), self, self.clear_debug)
QShortcut(QKeySequence("Ctrl+A"), self, self.show_audio_menu)
self.debug("Keyboard shortcuts enabled:")
self.debug(" 1/2: Phone action (call/answer/hangup)")
self.debug(" Ctrl+1/2: Toggle playback")
self.debug(" Alt+1/2: Toggle recording")
self.debug(" Space: Toggle auto test")
self.debug(" Ctrl+L: Clear debug")
self.debug(" Ctrl+A: Audio options menu")
def closeEvent(self, event):
if self.auto_test_running:
self.stop_auto_test()
# Clean up audio player
if hasattr(self.manager, 'audio_player'):
self.manager.audio_player.cleanup()
for phone in self.manager.phones:
phone['client'].stop()
event.accept()

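Editor's note: the debug_signal / append_debug pair above is the standard Qt pattern for logging from worker threads; widgets may only be touched on the GUI thread, so emitting a signal lets Qt queue the call across threads. A stripped-down sketch of the same pattern (all names illustrative):

    import threading
    from PyQt5.QtCore import pyqtSignal
    from PyQt5.QtWidgets import QApplication, QTextEdit

    class Console(QTextEdit):
        log_signal = pyqtSignal(str)              # delivered on the GUI thread

        def __init__(self):
            super().__init__()
            self.setReadOnly(True)
            self.log_signal.connect(self.append)  # append() runs on the GUI thread

        def log(self, msg):                       # safe to call from any thread
            self.log_signal.emit(msg)

    app = QApplication([])
    console = Console()
    console.show()
    threading.Thread(target=lambda: console.log("hello from a worker"), daemon=True).start()
    app.exec_()
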
View File

@@ -0,0 +1,127 @@
"""Wrapper for Noise XK handshake over GSM simulator"""
import struct
from dissononce.processing.impl.handshakestate import HandshakeState
from dissononce.processing.impl.symmetricstate import SymmetricState
from dissononce.processing.impl.cipherstate import CipherState
from dissononce.processing.handshakepatterns.interactive.XK import XKHandshakePattern
from dissononce.cipher.chachapoly import ChaChaPolyCipher
from dissononce.dh.x25519.x25519 import X25519DH
from dissononce.dh.keypair import KeyPair
from dissononce.dh.x25519.public import PublicKey
from dissononce.hash.sha256 import SHA256Hash
class NoiseXKWrapper:
"""Wrapper for Noise XK that works over message-passing instead of direct sockets"""
def __init__(self, keypair, peer_pubkey, debug_callback=None):
self.keypair = keypair
self.peer_pubkey = peer_pubkey
self.debug = debug_callback or print
# Build handshake state
cipher = ChaChaPolyCipher()
dh = X25519DH()
hshash = SHA256Hash()
symmetric = SymmetricState(CipherState(cipher), hshash)
self._hs = HandshakeState(symmetric, dh)
self._send_cs = None
self._recv_cs = None
self.handshake_complete = False
self.is_initiator = None # Track initiator status
# Message buffers
self.outgoing_messages = []
self.incoming_messages = []
def start_handshake(self, initiator):
"""Start the handshake process"""
self.debug(f"Starting Noise XK handshake as {'initiator' if initiator else 'responder'}")
self.is_initiator = initiator # Store initiator status
if initiator:
# Initiator knows peer's static out-of-band
self._hs.initialize(
XKHandshakePattern(),
True,
b'',
s=self.keypair,
rs=self.peer_pubkey
)
# Generate first message
buf = bytearray()
self._hs.write_message(b'', buf)
self.outgoing_messages.append(bytes(buf))
self.debug(f"Generated handshake message 1: {len(buf)} bytes")
else:
# Responder doesn't know peer's static yet
self._hs.initialize(
XKHandshakePattern(),
False,
b'',
s=self.keypair
)
self.debug("Responder initialized, waiting for first message")
def process_handshake_message(self, data):
"""Process incoming handshake message and generate response if needed"""
self.debug(f"Processing handshake message: {len(data)} bytes")
try:
# Read the message
payload = bytearray()
cs_pair = self._hs.read_message(data, payload)
# Check if we need to send a response
if not cs_pair:
# More messages needed
buf = bytearray()
cs_pair = self._hs.write_message(b'', buf)
self.outgoing_messages.append(bytes(buf))
self.debug(f"Generated handshake response: {len(buf)} bytes")
# Check if handshake completed after writing (for initiator)
if cs_pair:
self._complete_handshake(cs_pair)
else:
# Handshake complete after reading (for responder)
self._complete_handshake(cs_pair)
except Exception as e:
self.debug(f"Handshake error: {e}")
raise
def get_next_handshake_message(self):
"""Get next outgoing handshake message"""
if self.outgoing_messages:
return self.outgoing_messages.pop(0)
return None
def encrypt(self, plaintext):
"""Encrypt a message"""
if not self.handshake_complete:
raise RuntimeError("Handshake not complete")
return self._send_cs.encrypt_with_ad(b'', plaintext)
def decrypt(self, ciphertext):
"""Decrypt a message"""
if not self.handshake_complete:
raise RuntimeError("Handshake not complete")
return self._recv_cs.decrypt_with_ad(b'', ciphertext)
def _complete_handshake(self, cs_pair):
"""Complete the handshake with the given cipher states"""
self.debug("Handshake complete, setting up cipher states")
cs0, cs1 = cs_pair
# Use stored initiator status
if self.is_initiator:
self._send_cs, self._recv_cs = cs0, cs1
self.debug("Set up cipher states as initiator")
else:
self._send_cs, self._recv_cs = cs1, cs0
self.debug("Set up cipher states as responder")
self.handshake_complete = True
self.debug("Cipher states established")

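Editor's note: a hedged sketch of two NoiseXKWrapper instances completing the three-message XK handshake in-process, run in the same module as the class above. It assumes dissononce's X25519DH().generate_keypair() and the keypair.public attribute, both used elsewhere in this commit.

    # Drives the 3-message XK flow: -> e,es / <- e,ee / -> s,se
    from dissononce.dh.x25519.x25519 import X25519DH

    dh = X25519DH()
    alice_kp, bob_kp = dh.generate_keypair(), dh.generate_keypair()

    # XK: the initiator must know the responder's static public key up front.
    alice = NoiseXKWrapper(alice_kp, bob_kp.public)
    bob = NoiseXKWrapper(bob_kp, None)

    alice.start_handshake(initiator=True)    # queues message 1
    bob.start_handshake(initiator=False)

    bob.process_handshake_message(alice.get_next_handshake_message())   # msg 1, queues msg 2
    alice.process_handshake_message(bob.get_next_handshake_message())   # msg 2, queues msg 3 + completes
    bob.process_handshake_message(alice.get_next_handshake_message())   # msg 3, completes

    assert alice.handshake_complete and bob.handshake_complete
    assert bob.decrypt(alice.encrypt(b"ping")) == b"ping"
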
View File

@@ -1,110 +0,0 @@
import socket
import time
import select
from PyQt5.QtCore import QThread, pyqtSignal
from client_state import ClientState
class PhoneClient(QThread):
data_received = pyqtSignal(bytes, int)
state_changed = pyqtSignal(str, str, int)
def __init__(self, client_id):
super().__init__()
self.host = "localhost"
self.port = 12345
self.client_id = client_id
self.sock = None
self.running = True
self.state = ClientState(client_id)
def connect_socket(self):
retries = 3
for attempt in range(retries):
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.sock.settimeout(120)
self.sock.connect((self.host, self.port))
print(f"Client {self.client_id} connected to {self.host}:{self.port}")
return True
except Exception as e:
print(f"Client {self.client_id} connection attempt {attempt + 1} failed: {e}")
if attempt < retries - 1:
time.sleep(1)
self.sock = None
return False
def run(self):
while self.running:
if not self.sock:
if not self.connect_socket():
print(f"Client {self.client_id} failed to connect after retries")
self.state_changed.emit("CALL_END", "", self.client_id)
break
try:
while self.running:
self.state.process_command(self)
self.state.check_handshake_timeout(self)
if not self.state.handshake_in_progress:
if self.sock is None:
print(f"Client {self.client_id} socket is None, exiting inner loop")
break
readable, _, _ = select.select([self.sock], [], [], 0.01)
if readable:
try:
if self.sock is None:
print(f"Client {self.client_id} socket is None before recv, exiting")
break
data = self.sock.recv(1024)
if not data:
print(f"Client {self.client_id} disconnected")
self.state_changed.emit("CALL_END", "", self.client_id)
break
self.state.handle_data(self, data)
except socket.error as e:
print(f"Client {self.client_id} socket error: {e}")
self.state_changed.emit("CALL_END", "", self.client_id)
break
else:
self.msleep(20)
print(f"Client {self.client_id} yielding during handshake")
self.msleep(1)
except Exception as e:
print(f"Client {self.client_id} unexpected error in run loop: {e}")
self.state_changed.emit("CALL_END", "", self.client_id)
break
finally:
if self.sock:
try:
self.sock.close()
except Exception as e:
print(f"Client {self.client_id} error closing socket: {e}")
self.sock = None
def send(self, message):
if self.sock and self.running:
try:
if isinstance(message, str):
data = message.encode('utf-8')
self.sock.send(data)
print(f"Client {self.client_id} sent: {message}, length={len(data)}")
else:
self.sock.send(message)
print(f"Client {self.client_id} sent binary data, length={len(message)}")
except socket.error as e:
print(f"Client {self.client_id} send error: {e}")
self.state_changed.emit("CALL_END", "", self.client_id)
def stop(self):
self.running = False
if self.sock:
try:
self.sock.close()
except Exception as e:
print(f"Client {self.client_id} error closing socket in stop: {e}")
self.sock = None
self.quit()
self.wait(1000)
def start_handshake(self, initiator, keypair, peer_pubkey):
self.state.start_handshake(initiator, keypair, peer_pubkey)

View File

@@ -1,17 +1,37 @@
import secrets
from PyQt5.QtCore import QTimer
from phone_client import PhoneClient
from protocol_phone_client import ProtocolPhoneClient
from session import NoiseXKSession
from phone_state import PhoneState # Added import
from audio_player import AudioPlayer
from audio_processor import AudioProcessor
import struct
import wave
import os
class PhoneManager:
def __init__(self):
self.phones = []
self.handshake_done_count = 0
self.ui = None # Will be set by UI
self.audio_player = AudioPlayer()
self.audio_player.set_debug_callback(self.debug)
self.audio_processor = AudioProcessor()
self.audio_processor.set_debug_callback(self.debug)
self.audio_buffer = {} # client_id -> list of audio chunks for processing
def debug(self, message):
"""Send debug message to UI if available"""
if self.ui and hasattr(self.ui, 'debug'):
self.ui.debug(f"[PhoneManager] {message}")
else:
print(f"[PhoneManager] {message}")
def initialize_phones(self):
for i in range(2):
client = PhoneClient(i)
client = ProtocolPhoneClient(i) # Use protocol client
client.set_debug_callback(self.debug) # Set debug callback
client.manager = self # Set manager reference for handshake lookup
keypair = NoiseXKSession.generate_keypair()
phone = {
'id': i,
@@ -21,9 +41,15 @@ class PhoneManager:
'audio_timer': None,
'keypair': keypair,
'public_key': keypair.public,
'is_initiator': False
'is_initiator': False,
'audio_file': None, # For test audio
'frame_counter': 0,
'playback_enabled': False,
'recording_enabled': False
}
client.keypair = keypair # Also set keypair on client
self.phones.append(phone)
self.debug(f"Initialized Phone {i+1} with public key: {keypair.public.data.hex()[:32]}...")
self.phones[0]['peer_public_key'] = self.phones[1]['public_key']
self.phones[1]['peer_public_key'] = self.phones[0]['public_key']
@@ -31,16 +57,19 @@ class PhoneManager:
def phone_action(self, phone_id, ui_manager):
phone = self.phones[phone_id]
other_phone = self.phones[1 - phone_id]
print(f"Phone {phone_id + 1} Action, current state: {phone['state']}, is_initiator: {phone['is_initiator']}")
self.debug(f"Phone {phone_id + 1} action triggered, current state: {phone['state'].name}")
if phone['state'] == PhoneState.IDLE:
self.debug(f"Phone {phone_id + 1} initiating call to Phone {2-phone_id}")
phone['state'] = PhoneState.CALLING
other_phone['state'] = PhoneState.RINGING
phone['is_initiator'] = True
other_phone['is_initiator'] = False
phone['client'].send("RINGING")
elif phone['state'] == PhoneState.RINGING:
phone['state'] = other_phone['state'] = PhoneState.IN_CALL
self.debug(f"Phone {phone_id + 1} answering call from Phone {2-phone_id}")
phone['state'] = PhoneState.IN_CALL
# Don't set other_phone state here - let it set when it receives IN_CALL
phone['client'].send("IN_CALL")
elif phone['state'] in [PhoneState.IN_CALL, PhoneState.CALLING]:
if not phone['client'].state.handshake_in_progress and phone['state'] != PhoneState.CALLING:
@@ -49,41 +78,288 @@ class PhoneManager:
for p in [phone, other_phone]:
if p['audio_timer']:
p['audio_timer'].stop()
# End voice session
if p['client'].voice_active:
p['client'].end_voice_session()
# Close audio file
if p['audio_file']:
p['audio_file'].close()
p['audio_file'] = None
p['frame_counter'] = 0
else:
print(f"Phone {phone_id + 1} cannot hang up during handshake or call setup")
self.debug(f"Phone {phone_id + 1} cannot hang up during handshake or call setup")
ui_manager.update_phone_ui(phone_id)
ui_manager.update_phone_ui(1 - phone_id)
def send_audio(self, phone_id):
phone = self.phones[phone_id]
if phone['state'] == PhoneState.IN_CALL and phone['client'].state.session and phone['client'].sock:
mock_audio = secrets.token_bytes(16)
try:
if phone['state'] != PhoneState.IN_CALL:
self.debug(f"Phone {phone_id + 1} not in call, stopping audio timer")
if phone['audio_timer']:
phone['audio_timer'].stop()
return
if not phone['client'].handshake_complete:
self.debug(f"Phone {phone_id + 1} handshake not complete, skipping audio send")
return
if not phone['client'].voice_active:
self.debug(f"Phone {phone_id + 1} voice not active, skipping audio send")
return
if phone['state'] == PhoneState.IN_CALL and phone['client'].handshake_complete and phone['client'].voice_active:
# Load test audio file if not loaded
if phone['audio_file'] is None:
wav_path = "../wav/input.wav"
if not os.path.exists(wav_path):
wav_path = "wav/input.wav"
if os.path.exists(wav_path):
try:
phone['audio_file'] = wave.open(wav_path, 'rb')
self.debug(f"Phone {phone_id + 1} loaded test audio file: {wav_path}")
# Verify it's 8kHz mono
if phone['audio_file'].getframerate() != 8000:
self.debug(f"Warning: {wav_path} is {phone['audio_file'].getframerate()}Hz, expected 8000Hz")
if phone['audio_file'].getnchannels() != 1:
self.debug(f"Warning: {wav_path} has {phone['audio_file'].getnchannels()} channels, expected 1")
# Skip initial silence - jump to 1 second in (8000 samples)
phone['audio_file'].setpos(8000)
self.debug(f"Phone {phone_id + 1} skipped initial silence, starting at 1 second")
except Exception as e:
self.debug(f"Phone {phone_id + 1} failed to load audio: {e}")
# Use mock audio as fallback
phone['audio_file'] = None
# Read audio frame (40ms at 8kHz = 320 samples)
if phone['audio_file']:
try:
frames = phone['audio_file'].readframes(320)
if not frames or len(frames) < 640: # 320 samples * 2 bytes
# Loop back to 1 second (skip silence)
phone['audio_file'].setpos(8000)
frames = phone['audio_file'].readframes(320)
self.debug(f"Phone {phone_id + 1} looped audio back to 1 second mark")
# Send through protocol (codec + 4FSK + encryption)
phone['client'].send_voice_frame(frames)
# Update waveform
if len(frames) >= 2:
self.update_sent_waveform(phone_id, frames)
# If playback is enabled on the sender, play the original audio
if phone['playback_enabled']:
self.audio_player.add_audio_data(phone_id, frames)
if phone['frame_counter'] % 25 == 0:
self.debug(f"Phone {phone_id + 1} playing original audio (sender playback)")
phone['frame_counter'] += 1
if phone['frame_counter'] % 25 == 0: # Log every second
self.debug(f"Phone {phone_id + 1} sent {phone['frame_counter']} voice frames")
except Exception as e:
self.debug(f"Phone {phone_id + 1} audio send error: {e}")
else:
# Fallback: send mock audio
mock_audio = secrets.token_bytes(320)
phone['client'].send_voice_frame(mock_audio)
self.update_sent_waveform(phone_id, mock_audio)
phone['client'].state.session.send(phone['client'].sock, mock_audio)
print(f"Client {phone_id} sent encrypted audio packet, length=32")
except Exception as e:
print(f"Client {phone_id} failed to send audio: {e}")
def start_audio(self, client_id, parent=None):
self.handshake_done_count += 1
print(f"HANDSHAKE_DONE received for client {client_id}, count: {self.handshake_done_count}")
self.debug(f"HANDSHAKE_DONE received for client {client_id}, count: {self.handshake_done_count}")
# Start voice session for this client
phone = self.phones[client_id]
if phone['client'].handshake_complete and not phone['client'].voice_active:
phone['client'].start_voice_session()
if self.handshake_done_count == 2:
            # Start the audio timers after a short delay so both sides are ready
def start_audio_timers():
self.debug("Starting audio timers for both phones")
for phone in self.phones:
if phone['state'] == PhoneState.IN_CALL:
if not phone['audio_timer'] or not phone['audio_timer'].isActive():
phone['audio_timer'] = QTimer(parent) # Parent to PhoneUI
phone['audio_timer'].timeout.connect(lambda pid=phone['id']: self.send_audio(pid))
phone['audio_timer'].start(40) # 40ms for each voice frame
# Delay audio start by 500ms to ensure both sides are ready
QTimer.singleShot(500, start_audio_timers)
self.handshake_done_count = 0
def update_waveform(self, client_id, data):
# Only process actual audio data (should be 640 bytes for 320 samples * 2 bytes)
# Ignore small control messages
if len(data) < 320: # Less than 160 samples (too small for audio)
self.debug(f"Phone {client_id + 1} received non-audio data: {len(data)} bytes (ignoring)")
return
self.phones[client_id]['waveform'].set_data(data)
# Debug log audio data reception (only occasionally to avoid spam)
if not hasattr(self, '_audio_frame_count'):
self._audio_frame_count = {}
if client_id not in self._audio_frame_count:
self._audio_frame_count[client_id] = 0
self._audio_frame_count[client_id] += 1
if self._audio_frame_count[client_id] == 1 or self._audio_frame_count[client_id] % 25 == 0:
self.debug(f"Phone {client_id + 1} received audio frame #{self._audio_frame_count[client_id]}: {len(data)} bytes")
# Store audio data in buffer for potential processing
if client_id not in self.audio_buffer:
self.audio_buffer[client_id] = []
self.audio_buffer[client_id].append(data)
# Keep buffer size reasonable (last 30 seconds at 8kHz)
max_chunks = 30 * 25 # 30 seconds * 25 chunks/second
if len(self.audio_buffer[client_id]) > max_chunks:
self.audio_buffer[client_id] = self.audio_buffer[client_id][-max_chunks:]
# Forward audio data to player if playback is enabled
if self.phones[client_id]['playback_enabled']:
if self._audio_frame_count[client_id] == 1:
self.debug(f"Phone {client_id + 1} forwarding audio to player (playback enabled)")
self.audio_player.add_audio_data(client_id, data)
def update_sent_waveform(self, client_id, data):
self.phones[client_id]['sent_waveform'].set_data(data)
def toggle_playback(self, client_id):
"""Toggle audio playback for a phone"""
phone = self.phones[client_id]
if phone['playback_enabled']:
# Stop playback
self.audio_player.stop_playback(client_id)
phone['playback_enabled'] = False
self.debug(f"Phone {client_id + 1} playback stopped")
else:
# Start playback
if self.audio_player.start_playback(client_id):
phone['playback_enabled'] = True
self.debug(f"Phone {client_id + 1} playback started")
# Removed test beep - we want to hear actual audio
else:
self.debug(f"Phone {client_id + 1} failed to start playback")
return phone['playback_enabled']
def toggle_recording(self, client_id):
"""Toggle audio recording for a phone"""
phone = self.phones[client_id]
if phone['recording_enabled']:
# Stop recording and save
save_path = self.audio_player.stop_recording(client_id)
phone['recording_enabled'] = False
if save_path:
self.debug(f"Phone {client_id + 1} recording saved to {save_path}")
return False, save_path
else:
# Start recording
self.audio_player.start_recording(client_id)
phone['recording_enabled'] = True
self.debug(f"Phone {client_id + 1} recording started")
return True, None
def save_received_audio(self, client_id, filename=None):
"""Save the last received audio to a file"""
if client_id not in self.phones:
return None
save_path = self.audio_player.stop_recording(client_id, filename)
if save_path:
self.debug(f"Phone {client_id + 1} audio saved to {save_path}")
return save_path
def process_audio(self, client_id, processing_type, **kwargs):
"""Process buffered audio with specified processing type"""
if client_id not in self.audio_buffer or not self.audio_buffer[client_id]:
self.debug(f"No audio data available for Phone {client_id + 1}")
return None
# Combine all audio chunks
combined_audio = b''.join(self.audio_buffer[client_id])
# Apply processing based on type
processed_audio = combined_audio
if processing_type == "normalize":
target_db = kwargs.get('target_db', -3)
processed_audio = self.audio_processor.normalize_audio(combined_audio, target_db)
elif processing_type == "gain":
gain_db = kwargs.get('gain_db', 0)
processed_audio = self.audio_processor.apply_gain(combined_audio, gain_db)
elif processing_type == "noise_gate":
threshold_db = kwargs.get('threshold_db', -40)
processed_audio = self.audio_processor.apply_noise_gate(combined_audio, threshold_db)
elif processing_type == "low_pass":
cutoff_hz = kwargs.get('cutoff_hz', 3400)
processed_audio = self.audio_processor.apply_low_pass_filter(combined_audio, cutoff_hz)
elif processing_type == "high_pass":
cutoff_hz = kwargs.get('cutoff_hz', 300)
processed_audio = self.audio_processor.apply_high_pass_filter(combined_audio, cutoff_hz)
elif processing_type == "remove_silence":
threshold_db = kwargs.get('threshold_db', -40)
processed_audio = self.audio_processor.remove_silence(combined_audio, threshold_db)
# Save processed audio
save_path = f"wav/phone{client_id + 1}_received.wav"
processed_path = self.audio_processor.save_processed_audio(
processed_audio, save_path, processing_type
)
return processed_path
def export_buffered_audio(self, client_id, filename=None):
"""Export current audio buffer to file"""
if client_id not in self.audio_buffer or not self.audio_buffer[client_id]:
self.debug(f"No audio data available for Phone {client_id + 1}")
return None
# Combine all audio chunks
combined_audio = b''.join(self.audio_buffer[client_id])
# Generate filename if not provided
if not filename:
from datetime import datetime
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"wav/phone{client_id + 1}_buffer_{timestamp}.wav"
# Ensure directory exists
os.makedirs(os.path.dirname(filename), exist_ok=True)
try:
with wave.open(filename, 'wb') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(8000)
wav_file.writeframes(combined_audio)
self.debug(f"Exported audio buffer for Phone {client_id + 1} to {filename}")
return filename
except Exception as e:
self.debug(f"Failed to export audio buffer: {e}")
return None
def clear_audio_buffer(self, client_id):
"""Clear audio buffer for a phone"""
if client_id in self.audio_buffer:
self.audio_buffer[client_id] = []
self.debug(f"Cleared audio buffer for Phone {client_id + 1}")
def map_state(self, state_str):
if state_str == "RINGING":
return PhoneState.RINGING
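
For reference, the frame arithmetic phone_manager relies on throughout (8 kHz, 16-bit mono PCM, 40 ms frames) works out as follows; a quick illustrative check, not part of the commit:

# Illustrative sketch: frame-size arithmetic for the 8 kHz / 16-bit / 40 ms framing.
SAMPLE_RATE = 8000                                   # Hz
FRAME_MS = 40                                        # one voice frame
SAMPLES_PER_FRAME = SAMPLE_RATE * FRAME_MS // 1000   # 320 samples
BYTES_PER_FRAME = SAMPLES_PER_FRAME * 2              # 640 bytes of 16-bit PCM
FRAMES_PER_SECOND = 1000 // FRAME_MS                 # 25 frames/s
assert SAMPLES_PER_FRAME == 320 and BYTES_PER_FRAME == 640
assert 30 * FRAMES_PER_SECOND == 750                 # the 30 s buffer cap above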


@ -1,4 +1,6 @@
class PhoneState:
from enum import Enum
class PhoneState(Enum):
IDLE = 0
CALLING = 1
IN_CALL = 2


@ -0,0 +1,133 @@
# protocol_client_state.py
from queue import Queue
from session import NoiseXKSession
import time
class ProtocolClientState:
"""Enhanced client state for integrated protocol with voice codec"""
def __init__(self, client_id):
self.client_id = client_id
self.command_queue = Queue()
self.initiator = None
self.keypair = None
self.peer_pubkey = None
self.session = None
self.handshake_in_progress = False
self.handshake_start_time = None
self.call_active = False
self.voice_active = False
self.debug_callback = None
def debug(self, message):
"""Send debug message"""
if self.debug_callback:
self.debug_callback(f"[State{self.client_id+1}] {message}")
else:
print(f"[State{self.client_id+1}] {message}")
def process_command(self, client):
"""Process commands from the queue."""
if not self.command_queue.empty():
self.debug(f"Processing command queue, size: {self.command_queue.qsize()}")
command = self.command_queue.get()
self.debug(f"Processing command: {command}")
if command == "handshake":
# Handshake is now handled by the wrapper in the client
self.debug(f"Handshake command processed")
self.handshake_in_progress = False
self.handshake_start_time = None
elif command == "start_voice":
if client.handshake_complete:
client.start_voice_session()
self.voice_active = True
elif command == "end_voice":
if self.voice_active:
client.end_voice_session()
self.voice_active = False
def start_handshake(self, initiator, keypair, peer_pubkey):
"""Queue handshake command."""
self.initiator = initiator
self.keypair = keypair
self.peer_pubkey = peer_pubkey
self.debug(f"Queuing handshake, initiator: {initiator}")
self.handshake_in_progress = True
self.handshake_start_time = time.time()
self.command_queue.put("handshake")
def handle_data(self, client, data):
"""Handle received data (control or audio)."""
try:
# Try to decode as text first
decoded_data = data.decode('utf-8').strip()
self.debug(f"Received raw: {decoded_data}")
# Handle control messages
if decoded_data in ["RINGING", "CALL_END", "CALL_DROPPED", "IN_CALL", "HANDSHAKE", "HANDSHAKE_DONE"]:
self.debug(f"Emitting state change: {decoded_data}")
# Log which client is receiving what
self.debug(f"Client {self.client_id} received {decoded_data} message")
client.state_changed.emit(decoded_data, decoded_data, self.client_id)
if decoded_data == "IN_CALL":
self.debug(f"Received IN_CALL, setting call_active = True")
self.call_active = True
elif decoded_data == "HANDSHAKE":
self.debug(f"Received HANDSHAKE, setting handshake_in_progress = True")
self.handshake_in_progress = True
elif decoded_data == "HANDSHAKE_DONE":
self.debug(f"Received HANDSHAKE_DONE from peer")
self.call_active = True
# Start voice session on this side too
if client.handshake_complete and not client.voice_active:
self.debug(f"Starting voice session after receiving HANDSHAKE_DONE")
self.command_queue.put("start_voice")
elif decoded_data in ["CALL_END", "CALL_DROPPED"]:
self.debug(f"Received {decoded_data}, ending call")
self.call_active = False
if self.voice_active:
self.command_queue.put("end_voice")
else:
self.debug(f"Ignored unexpected text message: {decoded_data}")
except UnicodeDecodeError:
# Handle binary data (protocol messages or encrypted data)
if len(data) > 0 and data[0] == 0x20 and not client.handshake_complete: # Noise handshake message only before handshake completes
self.debug(f"Received Noise handshake message")
# Initialize responder if not already done
if not client.handshake_initiated:
# Find the other phone's public key
# This is a bit hacky but works for our 2-phone setup
manager = getattr(client, 'manager', None)
if manager:
other_phone = manager.phones[1 - self.client_id]
client.start_handshake(initiator=False,
keypair=client.keypair or manager.phones[self.client_id]['keypair'],
peer_pubkey=other_phone['public_key'])
# Pass to protocol handler
client._handle_protocol_message(data)
elif client.handshake_complete and client.noise_wrapper:
# Pass encrypted data back to client for decryption
client._handle_encrypted_data(data)
else:
# Pass other binary messages to protocol handler only if not yet complete
if not client.handshake_complete:
client._handle_protocol_message(data)
def check_handshake_timeout(self, client):
"""Check for handshake timeout."""
if self.handshake_in_progress and self.handshake_start_time:
if time.time() - self.handshake_start_time > 30:
self.debug(f"Handshake timeout after 30s")
client.state_changed.emit("CALL_END", "", self.client_id)
self.handshake_in_progress = False
self.handshake_start_time = None
def queue_voice_command(self, command):
"""Queue voice-related commands"""
if command in ["start_voice", "end_voice"]:
self.command_queue.put(command)
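
A minimal usage sketch of the queue-driven state machine above; `client`, `my_keypair`, `their_pubkey` and `running` are hypothetical stand-ins, not names from this commit:

# Illustrative sketch: how a client loop would drive ProtocolClientState.
state = ProtocolClientState(client_id=0)
state.debug_callback = print
state.start_handshake(initiator=True, keypair=my_keypair, peer_pubkey=their_pubkey)
while running:
    state.process_command(client)          # drains "handshake"/"start_voice"/"end_voice"
    state.check_handshake_timeout(client)  # emits CALL_END after 30 s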


@ -0,0 +1,456 @@
import socket
import time
import select
import struct
import array
from PyQt5.QtCore import QThread, pyqtSignal
from protocol_client_state import ProtocolClientState
from session import NoiseXKSession
from noise_wrapper import NoiseXKWrapper
from dissononce.dh.keypair import KeyPair
from dissononce.dh.x25519.public import PublicKey
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from voice_codec import Codec2Wrapper, FSKModem, Codec2Mode
# ChaCha20 removed - using only Noise XK encryption
class ProtocolPhoneClient(QThread):
"""Integrated phone client with Noise XK, Codec2, 4FSK, and ChaCha20"""
data_received = pyqtSignal(bytes, int)
state_changed = pyqtSignal(str, str, int)
def __init__(self, client_id):
super().__init__()
self.host = "localhost"
self.port = 12345
self.client_id = client_id
self.sock = None
self.running = True
self.state = ProtocolClientState(client_id)
# Noise XK session
self.noise_session = None
self.noise_wrapper = None
self.handshake_complete = False
self.handshake_initiated = False
# No buffer needed with larger frame size
# Voice codec components
self.codec = Codec2Wrapper(mode=Codec2Mode.MODE_1200)
self.modem = FSKModem()
# Voice encryption handled by Noise XK
# No separate voice key needed
# Voice state
self.voice_active = False
self.voice_frame_counter = 0
# Message buffer for fragmented messages
self.recv_buffer = bytearray()
# Debug callback
self.debug_callback = None
def set_debug_callback(self, callback):
"""Set debug callback function"""
self.debug_callback = callback
self.state.debug_callback = callback
def debug(self, message):
"""Send debug message"""
if self.debug_callback:
self.debug_callback(f"[Phone{self.client_id+1}] {message}")
else:
print(f"[Phone{self.client_id+1}] {message}")
def connect_socket(self):
retries = 3
for attempt in range(retries):
try:
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
self.sock.settimeout(120)
self.sock.connect((self.host, self.port))
self.debug(f"Connected to GSM simulator at {self.host}:{self.port}")
return True
except Exception as e:
self.debug(f"Connection attempt {attempt + 1} failed: {e}")
if attempt < retries - 1:
time.sleep(1)
self.sock = None
return False
def run(self):
while self.running:
if not self.sock:
if not self.connect_socket():
self.debug("Failed to connect after retries")
self.state_changed.emit("CALL_END", "", self.client_id)
break
try:
while self.running:
self.state.process_command(self)
self.state.check_handshake_timeout(self)
if self.handshake_complete and self.voice_active:
# Process voice data if active
self._process_voice_data()
# Always check for incoming data, even during handshake
if self.sock is None:
break
readable, _, _ = select.select([self.sock], [], [], 0.01)
if readable:
try:
if self.sock is None:
break
chunk = self.sock.recv(4096)
if not chunk:
self.debug("Disconnected from server")
self.state_changed.emit("CALL_END", "", self.client_id)
break
# Add to buffer
self.recv_buffer.extend(chunk)
# Process complete messages
while len(self.recv_buffer) >= 4:
# Read message length
msg_len = struct.unpack('>I', self.recv_buffer[:4])[0]
# Check if we have the complete message
if len(self.recv_buffer) >= 4 + msg_len:
# Extract message
data = bytes(self.recv_buffer[4:4+msg_len])
# Remove from buffer
self.recv_buffer = self.recv_buffer[4+msg_len:]
# Pass to state handler
self.state.handle_data(self, data)
else:
# Wait for more data
break
except socket.error as e:
self.debug(f"Socket error: {e}")
self.state_changed.emit("CALL_END", "", self.client_id)
break
self.msleep(1)
except Exception as e:
self.debug(f"Unexpected error in run loop: {e}")
self.state_changed.emit("CALL_END", "", self.client_id)
break
finally:
if self.sock:
try:
self.sock.close()
except Exception as e:
self.debug(f"Error closing socket: {e}")
self.sock = None
def _handle_encrypted_data(self, data):
"""Handle encrypted data after handshake"""
if not self.handshake_complete or not self.noise_wrapper:
self.debug(f"Cannot decrypt - handshake not complete")
return
# All data after handshake is encrypted, decrypt it first
try:
plaintext = self.noise_wrapper.decrypt(data)
# Check if it's a text message
try:
text_msg = plaintext.decode('utf-8').strip()
if text_msg == "HANDSHAKE_DONE":
self.debug(f"Received encrypted HANDSHAKE_DONE")
self.state_changed.emit("HANDSHAKE_DONE", "HANDSHAKE_DONE", self.client_id)
return
            except UnicodeDecodeError:
                pass  # not a text message; fall through to protocol handling
# Otherwise handle as protocol message
self._handle_protocol_message(plaintext)
except Exception as e:
# Suppress common decryption errors
pass
def _handle_protocol_message(self, plaintext):
"""Handle decrypted protocol messages"""
if len(plaintext) < 1:
return
msg_type = plaintext[0]
msg_data = plaintext[1:]
if msg_type == 0x10: # Voice start
self.debug("Received VOICE_START message")
self._handle_voice_start(msg_data)
elif msg_type == 0x11: # Voice data
self._handle_voice_data(msg_data)
elif msg_type == 0x12: # Voice end
self.debug("Received VOICE_END message")
self._handle_voice_end(msg_data)
elif msg_type == 0x20: # Noise handshake
self.debug("Received NOISE_HS message")
self._handle_noise_handshake(msg_data)
else:
self.debug(f"Received unknown protocol message type: 0x{msg_type:02x}")
# Don't emit control messages to data_received - that's only for audio
# Control messages should be handled via state_changed signal
def _handle_voice_start(self, data):
"""Handle voice session start"""
self.debug("Voice session started by peer")
self.voice_active = True
self.voice_frame_counter = 0
self.state_changed.emit("VOICE_START", "", self.client_id)
def _handle_voice_data(self, data):
"""Handle voice frame (already decrypted by Noise)"""
if len(data) < 4:
return
try:
# Data is float array packed as bytes
# Unpack the float array
num_floats = len(data) // 4
modulated_signal = struct.unpack(f'{num_floats}f', data)
# Demodulate FSK
demodulated_data, confidence = self.modem.demodulate(modulated_signal)
if confidence > 0.5: # Only decode if confidence is good
# Create Codec2Frame from demodulated data
from voice_codec import Codec2Frame, Codec2Mode
frame = Codec2Frame(
mode=Codec2Mode.MODE_1200,
bits=demodulated_data,
timestamp=time.time(),
frame_number=self.voice_frame_counter
)
# Decode with Codec2
pcm_samples = self.codec.decode(frame)
if self.voice_frame_counter == 0:
self.debug(f"First voice frame demodulated with confidence {confidence:.2f}")
# Send PCM to UI for playback
if pcm_samples is not None and len(pcm_samples) > 0:
# Only log details for first frame and every 25th frame
if self.voice_frame_counter == 0 or self.voice_frame_counter % 25 == 0:
self.debug(f"Decoded PCM samples: type={type(pcm_samples)}, len={len(pcm_samples)}")
# Convert to bytes if needed
if hasattr(pcm_samples, 'tobytes'):
pcm_bytes = pcm_samples.tobytes()
                    elif isinstance(pcm_samples, (list, array.array)):
                        # Convert to 16-bit PCM bytes ('array' is imported at module level)
                        if isinstance(pcm_samples, list):
                            pcm_bytes = array.array('h', pcm_samples).tobytes()
                        else:
                            pcm_bytes = pcm_samples.tobytes()
else:
pcm_bytes = bytes(pcm_samples)
if self.voice_frame_counter == 0:
self.debug(f"Emitting first PCM frame: {len(pcm_bytes)} bytes")
self.data_received.emit(pcm_bytes, self.client_id)
self.voice_frame_counter += 1
# Log frame reception periodically
if self.voice_frame_counter == 1 or self.voice_frame_counter % 25 == 0:
self.debug(f"Received voice data frame #{self.voice_frame_counter}")
else:
self.debug(f"Codec decode returned None or empty")
else:
if self.voice_frame_counter % 10 == 0:
self.debug(f"Low confidence demodulation: {confidence:.2f}")
except Exception as e:
self.debug(f"Voice decode error: {e}")
def _handle_voice_end(self, data):
"""Handle voice session end"""
self.debug("Voice session ended by peer")
self.voice_active = False
self.state_changed.emit("VOICE_END", "", self.client_id)
def _handle_noise_handshake(self, data):
"""Handle Noise handshake message"""
if not self.noise_wrapper:
self.debug("Received handshake message but no wrapper initialized")
return
try:
# Process the handshake message
self.noise_wrapper.process_handshake_message(data)
# Check if we need to send a response
response = self.noise_wrapper.get_next_handshake_message()
if response:
self.send(b'\x20' + response)
# Check if handshake is complete
if self.noise_wrapper.handshake_complete and not self.handshake_complete:
self.debug("Noise wrapper handshake complete, calling complete_handshake()")
self.complete_handshake()
except Exception as e:
self.debug(f"Handshake processing error: {e}")
self.state_changed.emit("CALL_END", "", self.client_id)
def _process_voice_data(self):
"""Process outgoing voice data"""
# This would be called when we have voice input to send
# For now, this is a placeholder
pass
def send_voice_frame(self, pcm_samples):
"""Send a voice frame through the protocol"""
if not self.handshake_complete:
self.debug("Cannot send voice - handshake not complete")
return
if not self.voice_active:
self.debug("Cannot send voice - voice session not active")
return
try:
# Encode with Codec2
codec_frame = self.codec.encode(pcm_samples)
if not codec_frame:
return
if self.voice_frame_counter % 25 == 0: # Log every 25 frames (1 second)
self.debug(f"Encoding voice frame #{self.voice_frame_counter}: {len(pcm_samples)} bytes PCM → {len(codec_frame.bits)} bytes compressed")
# Modulate with FSK
modulated_data = self.modem.modulate(codec_frame.bits)
# Convert modulated float array to bytes
modulated_bytes = struct.pack(f'{len(modulated_data)}f', *modulated_data)
if self.voice_frame_counter % 25 == 0:
self.debug(f"Voice frame size: {len(modulated_bytes)} bytes")
# Build voice data message (no ChaCha20, will be encrypted by Noise)
msg = bytes([0x11]) + modulated_bytes
# Send through Noise encrypted channel
self.send(msg)
self.voice_frame_counter += 1
except Exception as e:
self.debug(f"Voice encode error: {e}")
def send(self, message):
"""Send data through Noise encrypted channel with proper framing"""
if self.sock and self.running:
try:
# Handshake messages (0x20) bypass Noise encryption
if isinstance(message, bytes) and len(message) > 0 and message[0] == 0x20:
# Add length prefix for framing
framed = struct.pack('>I', len(message)) + message
self.sock.send(framed)
return
if self.handshake_complete and self.noise_wrapper:
# Encrypt everything with Noise after handshake
# Convert string to bytes if needed
if isinstance(message, str):
message = message.encode('utf-8')
encrypted = self.noise_wrapper.encrypt(message)
# Add length prefix for framing
framed = struct.pack('>I', len(encrypted)) + encrypted
self.sock.send(framed)
else:
# During handshake, send raw with framing
if isinstance(message, str):
data = message.encode('utf-8')
framed = struct.pack('>I', len(data)) + data
self.sock.send(framed)
self.debug(f"Sent control message: {message}")
else:
framed = struct.pack('>I', len(message)) + message
self.sock.send(framed)
except socket.error as e:
self.debug(f"Send error: {e}")
self.state_changed.emit("CALL_END", "", self.client_id)
def stop(self):
self.running = False
self.voice_active = False
if self.sock:
try:
self.sock.close()
except Exception as e:
self.debug(f"Error closing socket in stop: {e}")
self.sock = None
self.quit()
self.wait(1000)
def start_handshake(self, initiator, keypair, peer_pubkey):
"""Start Noise XK handshake"""
self.debug(f"Starting Noise XK handshake as {'initiator' if initiator else 'responder'}")
self.debug(f"Our public key: {keypair.public.data.hex()[:32]}...")
self.debug(f"Peer public key: {peer_pubkey.data.hex()[:32]}...")
# Create noise wrapper
self.noise_wrapper = NoiseXKWrapper(keypair, peer_pubkey, self.debug)
self.noise_wrapper.start_handshake(initiator)
self.handshake_initiated = True
# Send first handshake message if initiator
if initiator:
msg = self.noise_wrapper.get_next_handshake_message()
if msg:
# Send as NOISE_HS message type
self.send(b'\x20' + msg) # 0x20 = Noise handshake message
def complete_handshake(self):
"""Called when Noise handshake completes"""
self.handshake_complete = True
self.debug("Noise XK handshake complete!")
self.debug("Secure channel established")
# Send HANDSHAKE_DONE message
self.send("HANDSHAKE_DONE")
self.state_changed.emit("HANDSHAKE_COMPLETE", "", self.client_id)
def start_voice_session(self):
"""Start a voice session"""
if not self.handshake_complete:
self.debug("Cannot start voice - handshake not complete")
return
self.voice_active = True
self.voice_frame_counter = 0
# Send voice start message
msg = bytes([0x10]) # Voice start message type
self.send(msg)
self.debug("Voice session started")
self.state_changed.emit("VOICE_START", "", self.client_id)
def end_voice_session(self):
"""End a voice session"""
if not self.voice_active:
return
self.voice_active = False
# Send voice end message
msg = bytes([0x12]) # Voice end message type
self.send(msg)
self.debug("Voice session ended")
self.state_changed.emit("VOICE_END", "", self.client_id)


@ -10,8 +10,8 @@ from dissononce.dh.keypair import KeyPair
from dissononce.dh.x25519.public import PublicKey
from dissononce.hash.sha256 import SHA256Hash
# Configure root logger for debug output
logging.basicConfig(level=logging.DEBUG, format="%(message)s")
# Configure logging - disabled by default to avoid noise
# logging.basicConfig(level=logging.DEBUG, format="%(message)s")
class NoiseXKSession:
@staticmethod
@ -46,7 +46,7 @@ class NoiseXKSession:
so that each side reads or writes in the correct message order.
On completion, self._send_cs and self._recv_cs hold the two CipherStates.
"""
logging.debug(f"[handshake] start (initiator={initiator})")
# logging.debug(f"[handshake] start (initiator={initiator})")
# initialize with our KeyPair and their PublicKey
if initiator:
# initiator knows peers static out-of-band
@ -58,7 +58,7 @@ class NoiseXKSession:
rs=self.peer_pubkey
)
else:
logging.debug("[handshake] responder initializing without rs")
# logging.debug("[handshake] responder initializing without rs")
# responder must NOT supply rs here
self._hs.initialize(
XKHandshakePattern(),
@ -72,34 +72,34 @@ class NoiseXKSession:
# 1) -> e
buf1 = bytearray()
cs_pair = self._hs.write_message(b'', buf1)
logging.debug(f"[-> e] {buf1.hex()}")
# logging.debug(f"[-> e] {buf1.hex()}")
self._send_all(sock, buf1)
# 2) <- e, es, s, ss
msg2 = self._recv_all(sock)
logging.debug(f"[<- msg2] {msg2.hex()}")
# logging.debug(f"[<- msg2] {msg2.hex()}")
self._hs.read_message(msg2, bytearray())
# 3) -> se (final)
buf3 = bytearray()
cs_pair = self._hs.write_message(b'', buf3)
logging.debug(f"[-> se] {buf3.hex()}")
# logging.debug(f"[-> se] {buf3.hex()}")
self._send_all(sock, buf3)
else:
# 1) <- e
msg1 = self._recv_all(sock)
logging.debug(f"[<- e] {msg1.hex()}")
# logging.debug(f"[<- e] {msg1.hex()}")
self._hs.read_message(msg1, bytearray())
# 2) -> e, es, s, ss
buf2 = bytearray()
cs_pair = self._hs.write_message(b'', buf2)
logging.debug(f"[-> msg2] {buf2.hex()}")
# logging.debug(f"[-> msg2] {buf2.hex()}")
self._send_all(sock, buf2)
# 3) <- se (final)
msg3 = self._recv_all(sock)
logging.debug(f"[<- se] {msg3.hex()}")
# logging.debug(f"[<- se] {msg3.hex()}")
cs_pair = self._hs.read_message(msg3, bytearray())
# on the final step, we must get exactly two CipherStates
@ -168,9 +168,9 @@ class NoiseXKSession:
# Read 2-byte length prefix, then the payload
hdr = self._read_exact(sock, 2)
length = int.from_bytes(hdr, 'big')
logging.debug(f"[RECV] length={length} ({hdr.hex()})")
# logging.debug(f"[RECV] length={length} ({hdr.hex()})")
data = self._read_exact(sock, length)
logging.debug(f"[RECV] data={data.hex()}")
# logging.debug(f"[RECV] data={data.hex()}")
return data
@staticmethod


@ -1,4 +1,5 @@
import random
import struct
from PyQt5.QtWidgets import QWidget
from PyQt5.QtCore import QTimer, QSize, QPointF
from PyQt5.QtGui import QPainter, QColor, QPen, QLinearGradient, QBrush
@ -7,8 +8,8 @@ class WaveformWidget(QWidget):
def __init__(self, parent=None, dynamic=False):
super().__init__(parent)
self.dynamic = dynamic
self.setMinimumSize(200, 80)
self.setMaximumHeight(100)
self.setMinimumSize(200, 60)
self.setMaximumHeight(80)
self.waveform_data = [random.randint(10, 90) for _ in range(50)]
if self.dynamic:
self.timer = QTimer(self)
@ -20,8 +21,28 @@ class WaveformWidget(QWidget):
self.update()
def set_data(self, data):
amplitude = sum(byte for byte in data) % 90 + 10
self.waveform_data = self.waveform_data[1:] + [amplitude]
# Convert audio data to visual amplitude
if isinstance(data, bytes) and len(data) >= 2:
# Extract PCM samples (16-bit signed)
num_samples = min(len(data) // 2, 20) # Take up to 20 samples
samples = []
for i in range(0, num_samples * 2, 2):
if i + 1 < len(data):
sample = struct.unpack('h', data[i:i+2])[0]
# Normalize to 0-100 range
amplitude = abs(sample) / 327.68 # 32768/100
samples.append(min(95, max(5, amplitude)))
if samples:
# Add new samples and maintain fixed size
self.waveform_data.extend(samples)
# Keep last 50 samples
self.waveform_data = self.waveform_data[-50:]
else:
# Fallback for non-audio data
amplitude = sum(byte for byte in data[:20]) % 90 + 10
self.waveform_data = self.waveform_data[1:] + [amplitude]
self.update()
def paintEvent(self, event):
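
As a worked example of the mapping in set_data(): 32768 / 100 = 327.68, so a half-scale 16-bit sample of 16384 lands at amplitude 50, mid-height on the widget; a sketch, not part of the commit:

# Illustrative sketch: 16-bit PCM -> 0-100 display amplitude.
import struct
sample = struct.unpack('h', struct.pack('h', 16384))[0]  # half of full scale
amplitude = min(95, max(5, abs(sample) / 327.68))
assert abs(amplitude - 50.0) < 1e-9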


@ -1,13 +0,0 @@
simulator/
├── gsm_simulator.py # gsm_simulator
├── launch_gsm_simulator.sh # use to start docker and simulator, run in terminal
Two clients connect to gsm_simulator and simulate a call using the Noise protocol
UI/
├── main.py # UI setup and event handling
├── phone_manager.py # Phone state, client init, audio logic
├── phone_client.py # Socket communication and threading
├── client_state.py # Client state and command processing
├── session.py # Noise XK crypto session
├── waveform_widget.py # Waveform UI component
├── phone_state.py # State constants


@ -0,0 +1,58 @@
#!/bin/bash
# Install audio dependencies for DryBox
echo "Installing audio dependencies for DryBox..."
echo
# Detect OS
if [ -f /etc/os-release ]; then
. /etc/os-release
OS=$ID
VER=$VERSION_ID
else
echo "Cannot detect OS. Please install manually."
exit 1
fi
case $OS in
fedora)
echo "Detected Fedora $VER"
echo "Installing python3-devel and portaudio-devel..."
sudo dnf install -y python3-devel portaudio-devel
;;
ubuntu|debian)
echo "Detected $OS $VER"
echo "Installing python3-dev and portaudio19-dev..."
sudo apt-get update
sudo apt-get install -y python3-dev portaudio19-dev
;;
*)
echo "Unsupported OS: $OS"
echo "Please install manually:"
echo " - Python development headers"
echo " - PortAudio development libraries"
exit 1
;;
esac
if [ $? -eq 0 ]; then
echo
echo "System dependencies installed successfully!"
echo "Now installing PyAudio..."
pip install pyaudio
if [ $? -eq 0 ]; then
echo
echo "✅ Audio dependencies installed successfully!"
echo "You can now use real-time audio playback in DryBox."
else
echo
echo "❌ Failed to install PyAudio"
echo "Try: pip install --user pyaudio"
fi
else
echo
echo "❌ Failed to install system dependencies"
fi


@ -0,0 +1,22 @@
# Core dependencies for DryBox integrated protocol
# Noise Protocol Framework
dissononce>=0.34.3
# Cryptography
cryptography>=41.0.0
# Qt GUI
PyQt5>=5.15.0
# Numerical computing (for signal processing)
numpy>=1.24.0
# Audio processing (for real audio I/O)
pyaudio>=0.2.11
# Wave file handling (included in standard library)
# wave
# For future integration with real Codec2
# pycodec2>=1.0.0


@ -0,0 +1,11 @@
#!/bin/bash
# Run DryBox UI with proper Wayland support on Fedora
cd "$(dirname "$0")"
# Use native Wayland if available
export QT_QPA_PLATFORM=wayland
# Run the UI
cd UI
python3 main.py


@ -1,10 +1,11 @@
import socket
import threading
import time
import struct
HOST = "0.0.0.0"
PORT = 12345
FRAME_SIZE = 1000
FRAME_SIZE = 10000 # Increased to avoid fragmenting voice frames
FRAME_DELAY = 0.02
clients = []
@ -12,25 +13,49 @@ clients_lock = threading.Lock()
def handle_client(client_sock, client_id):
print(f"Starting handle_client for Client {client_id}")
recv_buffer = bytearray()
try:
while True:
other_client = None
with clients_lock:
if len(clients) == 2 and client_id < len(clients):
other_client = clients[1 - client_id]
print(f"Client {client_id} waiting for data, other_client exists: {other_client is not None}")
try:
data = client_sock.recv(1024)
if not data:
chunk = client_sock.recv(4096)
if not chunk:
print(f"Client {client_id} disconnected or no data received")
break
if other_client:
for i in range(0, len(data), FRAME_SIZE):
frame = data[i:i + FRAME_SIZE]
other_client.send(frame)
time.sleep(FRAME_DELAY)
print(f"Forwarded {len(data)} bytes from Client {client_id} to Client {1 - client_id}")
# Add to buffer
recv_buffer.extend(chunk)
# Process complete messages
while len(recv_buffer) >= 4:
# Read message length
msg_len = struct.unpack('>I', recv_buffer[:4])[0]
# Check if we have the complete message
if len(recv_buffer) >= 4 + msg_len:
# Extract complete message (including length prefix)
complete_msg = bytes(recv_buffer[:4+msg_len])
# Remove from buffer
recv_buffer = recv_buffer[4+msg_len:]
# Forward complete message to other client
if other_client:
try:
other_client.send(complete_msg)
print(f"Forwarded {len(complete_msg)} bytes from Client {client_id} to Client {1 - client_id}")
except Exception as e:
print(f"Error forwarding from Client {client_id}: {e}")
else:
print(f"No other client to forward to from Client {client_id}")
else:
# Wait for more data
break
except socket.error as e:
print(f"Socket error with Client {client_id}: {e}")
break
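
For scale, one modulated voice frame from the client is far larger than the old 1000-byte FRAME_SIZE: a 6-byte MODE_1200 codec frame becomes 24 dibit symbols plus a 100 ms preamble, packed as float32. A rough estimate assuming the defaults in voice_codec.py, not part of the commit:

# Illustrative sketch: approximate size of one modulated voice frame.
SAMPLE_RATE, BAUD = 8000, 600
PREAMBLE_SAMPLES = int(0.1 * SAMPLE_RATE)            # 800 samples of preamble
SYMBOLS = 6 * 4                                      # 6-byte frame -> 24 dibits
DATA_SAMPLES = SYMBOLS * (SAMPLE_RATE // BAUD)       # 24 * 13 = 312 samples
print((PREAMBLE_SAMPLES + DATA_SAMPLES) * 4)         # 4448 bytes as float32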


@ -1,24 +0,0 @@
#external_caller.py
import socket
import time
def connect():
caller_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
caller_socket.connect(('localhost', 12345))
caller_socket.send("CALLER".encode())
print("Connected to GSM simulator as CALLER")
time.sleep(2) # Wait 2 seconds for receiver to connect
for i in range(5):
message = f"Audio packet {i + 1}"
caller_socket.send(message.encode())
print(f"Sent: {message}")
time.sleep(1)
caller_socket.send("CALL_END".encode())
print("Call ended.")
caller_socket.close()
if __name__ == "__main__":
connect()


@ -1,37 +0,0 @@
#external_receiver.py
import socket
def connect():
receiver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
receiver_socket.settimeout(15) # Increase timeout to 15 seconds
receiver_socket.connect(('localhost', 12345))
receiver_socket.send("RECEIVER".encode())
print("Connected to GSM simulator as RECEIVER")
while True:
try:
data = receiver_socket.recv(1024).decode().strip()
if not data:
print("No data received. Connection closed.")
break
if data == "RINGING":
print("Incoming call... ringing")
elif data == "CALL_END":
print("Call ended by caller.")
break
elif data == "CALL_DROPPED":
print("Call dropped by network.")
break
else:
print(f"Received: {data}")
except socket.timeout:
print("Timed out waiting for data.")
break
except Exception as e:
print(f"Receiver error: {e}")
break
receiver_socket.close()
if __name__ == "__main__":
connect()


@ -1,86 +0,0 @@
import socket
import os
import time
import subprocess
# Configuration
HOST = "localhost"
PORT = 12345
INPUT_FILE = "wav/input.wav"
OUTPUT_FILE = "wav/received.wav"
def encrypt_data(data):
return data # Replace with your encryption protocol
def decrypt_data(data):
return data # Replace with your decryption protocol
def run_protocol(send_mode=True):
"""Connect to the simulator and send/receive data."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((HOST, PORT))
print(f"Connected to simulator at {HOST}:{PORT}")
if send_mode:
# Sender mode: Encode audio with toast
os.system(f"toast -p -l {INPUT_FILE}") # Creates input.wav.gsm
input_gsm_file = f"{INPUT_FILE}.gsm"
if not os.path.exists(input_gsm_file):
print(f"Error: {input_gsm_file} not created")
sock.close()
return
with open(input_gsm_file, "rb") as f:
voice_data = f.read()
encrypted_data = encrypt_data(voice_data)
sock.send(encrypted_data)
print(f"Sent {len(encrypted_data)} bytes")
os.remove(input_gsm_file) # Clean up
else:
# Receiver mode: Wait for and receive data
print("Waiting for data from sender...")
received_data = b""
sock.settimeout(5.0)
try:
while True:
print("Calling recv()...")
data = sock.recv(1024)
print(f"Received {len(data)} bytes")
if not data:
print("Connection closed by sender or simulator")
break
received_data += data
except socket.timeout:
print("Timed out waiting for data")
if received_data:
with open("received.gsm", "wb") as f:
f.write(decrypt_data(received_data))
print(f"Wrote {len(received_data)} bytes to received.gsm")
# Decode with untoast, then convert to WAV with sox
result = subprocess.run(["untoast", "received.gsm"], capture_output=True, text=True)
print(f"untoast return code: {result.returncode}")
print(f"untoast stderr: {result.stderr}")
if result.returncode == 0:
if os.path.exists("received"):
# Convert raw PCM to WAV (8 kHz, mono, 16-bit)
subprocess.run(["sox", "-t", "raw", "-r", "8000", "-e", "signed", "-b", "16", "-c", "1", "received",
OUTPUT_FILE])
os.remove("received")
print(f"Received and saved {len(received_data)} bytes to {OUTPUT_FILE}")
else:
print("Error: 'received' file not created by untoast")
else:
print(f"untoast failed: {result.stderr}")
else:
print("No data received from simulator")
sock.close()
if __name__ == "__main__":
mode = input("Enter 'send' to send data or 'receive' to receive data: ").strip().lower()
run_protocol(send_mode=(mode == "send"))


@ -0,0 +1,714 @@
"""
Voice codec integration for encrypted voice over GSM.
Implements Codec2 compression with FSK modulation for transmitting
encrypted voice data over standard GSM voice channels.
"""
import array
import math
import struct
from typing import Optional, Tuple, List
from dataclasses import dataclass
from enum import IntEnum
try:
import numpy as np
HAS_NUMPY = True
except ImportError:
HAS_NUMPY = False
# ANSI colors
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
RESET = "\033[0m"
class Codec2Mode(IntEnum):
"""Codec2 bitrate modes."""
MODE_3200 = 0 # 3200 bps
MODE_2400 = 1 # 2400 bps
MODE_1600 = 2 # 1600 bps
MODE_1400 = 3 # 1400 bps
MODE_1300 = 4 # 1300 bps
MODE_1200 = 5 # 1200 bps (recommended for robustness)
MODE_700C = 6 # 700 bps
@dataclass
class Codec2Frame:
"""Represents a single Codec2 compressed voice frame."""
mode: Codec2Mode
bits: bytes
timestamp: float
frame_number: int
class Codec2Wrapper:
"""
Wrapper for Codec2 voice codec.
In production, this would use py_codec2 or ctypes bindings to libcodec2.
This is a simulation interface for protocol development.
"""
# Frame sizes in bits for each mode
FRAME_BITS = {
Codec2Mode.MODE_3200: 64,
Codec2Mode.MODE_2400: 48,
Codec2Mode.MODE_1600: 64,
Codec2Mode.MODE_1400: 56,
Codec2Mode.MODE_1300: 52,
Codec2Mode.MODE_1200: 48,
Codec2Mode.MODE_700C: 28
}
# Frame duration in ms
FRAME_MS = {
Codec2Mode.MODE_3200: 20,
Codec2Mode.MODE_2400: 20,
Codec2Mode.MODE_1600: 40,
Codec2Mode.MODE_1400: 40,
Codec2Mode.MODE_1300: 40,
Codec2Mode.MODE_1200: 40,
Codec2Mode.MODE_700C: 40
}
def __init__(self, mode: Codec2Mode = Codec2Mode.MODE_1200):
"""
Initialize Codec2 wrapper.
Args:
mode: Codec2 bitrate mode (default 1200 bps for robustness)
"""
self.mode = mode
self.frame_bits = self.FRAME_BITS[mode]
self.frame_bytes = (self.frame_bits + 7) // 8
self.frame_ms = self.FRAME_MS[mode]
self.frame_samples = int(8000 * self.frame_ms / 1000) # 8kHz sampling
self.frame_counter = 0
# Quiet initialization - no print
def encode(self, audio_samples) -> Optional[Codec2Frame]:
"""
Encode PCM audio samples to Codec2 frame.
Args:
audio_samples: PCM samples (8kHz, 16-bit signed)
Returns:
Codec2Frame or None if insufficient samples
"""
if len(audio_samples) < self.frame_samples:
return None
# In production: call codec2_encode(state, bits, samples)
# Simulation: create pseudo-compressed data
compressed = self._simulate_compression(audio_samples[:self.frame_samples])
frame = Codec2Frame(
mode=self.mode,
bits=compressed,
timestamp=self.frame_counter * self.frame_ms / 1000.0,
frame_number=self.frame_counter
)
self.frame_counter += 1
return frame
def decode(self, frame: Codec2Frame):
"""
Decode Codec2 frame to PCM audio samples.
Args:
frame: Codec2 compressed frame
Returns:
PCM samples (8kHz, 16-bit signed)
"""
if frame.mode != self.mode:
raise ValueError(f"Frame mode {frame.mode} doesn't match decoder mode {self.mode}")
# In production: call codec2_decode(state, samples, bits)
# Simulation: decompress to audio
return self._simulate_decompression(frame.bits)
def _simulate_compression(self, samples) -> bytes:
"""Simulate Codec2 compression (for testing)."""
# Convert to list if needed
if hasattr(samples, 'tolist'):
sample_list = samples.tolist()
elif hasattr(samples, '__iter__'):
sample_list = list(samples)
else:
sample_list = samples
# Extract basic features for simulation
if HAS_NUMPY and hasattr(samples, '__array__'):
# Convert to numpy array if needed
np_samples = np.asarray(samples, dtype=np.float32)
if len(np_samples) > 0:
mean_square = np.mean(np_samples ** 2)
energy = np.sqrt(mean_square) if not np.isnan(mean_square) else 0.0
zero_crossings = np.sum(np.diff(np.sign(np_samples)) != 0)
else:
energy = 0.0
zero_crossings = 0
else:
# Manual calculation without numpy
if sample_list and len(sample_list) > 0:
energy = math.sqrt(sum(s**2 for s in sample_list) / len(sample_list))
zero_crossings = sum(1 for i in range(1, len(sample_list))
if (sample_list[i-1] >= 0) != (sample_list[i] >= 0))
else:
energy = 0.0
zero_crossings = 0
# Pack into bytes (simplified)
# Ensure values are valid
energy_int = max(0, min(65535, int(energy)))
zc_int = max(0, min(65535, int(zero_crossings)))
data = struct.pack('<HH', energy_int, zc_int)
# Pad to expected frame size
data += b'\x00' * (self.frame_bytes - len(data))
return data[:self.frame_bytes]
def _simulate_decompression(self, compressed: bytes):
"""Simulate Codec2 decompression (for testing)."""
# Unpack features
if len(compressed) >= 4:
energy, zero_crossings = struct.unpack('<HH', compressed[:4])
else:
energy, zero_crossings = 1000, 100
# Generate synthetic speech-like signal
if HAS_NUMPY:
t = np.linspace(0, self.frame_ms/1000, self.frame_samples)
# Base frequency from zero crossings
freq = zero_crossings * 10 # Simplified mapping
# Generate harmonics
signal = np.zeros(self.frame_samples)
for harmonic in range(1, 4):
signal += np.sin(2 * np.pi * freq * harmonic * t) / harmonic
# Apply energy envelope
signal *= energy / 10000.0
# Convert to 16-bit PCM
return (signal * 32767).astype(np.int16)
else:
# Manual generation without numpy
samples = []
freq = zero_crossings * 10
for i in range(self.frame_samples):
t = i / 8000.0 # 8kHz sample rate
value = 0
for harmonic in range(1, 4):
value += math.sin(2 * math.pi * freq * harmonic * t) / harmonic
value *= energy / 10000.0
# Clamp to 16-bit range
sample = int(value * 32767)
sample = max(-32768, min(32767, sample))
samples.append(sample)
return array.array('h', samples)
class FSKModem:
"""
4-FSK modem for transmitting digital data over voice channels.
Designed to survive GSM/AMR/EVS vocoders.
"""
def __init__(self, sample_rate: int = 8000, baud_rate: int = 600):
"""
Initialize FSK modem.
Args:
sample_rate: Audio sample rate (Hz)
baud_rate: Symbol rate (baud)
"""
self.sample_rate = sample_rate
self.baud_rate = baud_rate
self.samples_per_symbol = int(sample_rate / baud_rate)
# 4-FSK frequencies (300-3400 Hz band)
self.frequencies = [
600, # 00
1200, # 01
1800, # 10
2400 # 11
]
# Preamble for synchronization (800 Hz, 100ms)
self.preamble_freq = 800
self.preamble_duration = 0.1 # seconds
# Quiet initialization - no print
def modulate(self, data: bytes, add_preamble: bool = True):
"""
Modulate binary data to FSK audio signal.
Args:
data: Binary data to modulate
add_preamble: Whether to add synchronization preamble
Returns:
Audio signal (normalized float32 array or list)
"""
# Convert bytes to dibits (2-bit symbols)
symbols = []
for byte in data:
symbols.extend([
(byte >> 6) & 0x03,
(byte >> 4) & 0x03,
(byte >> 2) & 0x03,
byte & 0x03
])
# Generate audio signal
signal = []
# Add preamble
if add_preamble:
preamble_samples = int(self.preamble_duration * self.sample_rate)
if HAS_NUMPY:
t = np.arange(preamble_samples) / self.sample_rate
preamble = np.sin(2 * np.pi * self.preamble_freq * t)
signal.extend(preamble)
else:
for i in range(preamble_samples):
t = i / self.sample_rate
value = math.sin(2 * math.pi * self.preamble_freq * t)
signal.append(value)
# Modulate symbols
for symbol in symbols:
freq = self.frequencies[symbol]
if HAS_NUMPY:
t = np.arange(self.samples_per_symbol) / self.sample_rate
tone = np.sin(2 * np.pi * freq * t)
signal.extend(tone)
else:
for i in range(self.samples_per_symbol):
t = i / self.sample_rate
value = math.sin(2 * math.pi * freq * t)
signal.append(value)
# Apply smoothing to reduce clicks
if HAS_NUMPY:
audio = np.array(signal, dtype=np.float32)
else:
audio = array.array('f', signal)
audio = self._apply_envelope(audio)
return audio
def demodulate(self, audio) -> Tuple[bytes, float]:
"""
Demodulate FSK audio signal to binary data.
Args:
audio: Audio signal
Returns:
Tuple of (demodulated data, confidence score)
"""
# Find preamble
preamble_start = self._find_preamble(audio)
if preamble_start < 0:
return b'', 0.0
# Skip preamble
data_start = preamble_start + int(self.preamble_duration * self.sample_rate)
# Demodulate symbols
symbols = []
confidence_scores = []
pos = data_start
while pos + self.samples_per_symbol <= len(audio):
symbol_audio = audio[pos:pos + self.samples_per_symbol]
symbol, confidence = self._demodulate_symbol(symbol_audio)
symbols.append(symbol)
confidence_scores.append(confidence)
pos += self.samples_per_symbol
# Convert symbols to bytes
data = bytearray()
for i in range(0, len(symbols), 4):
if i + 3 < len(symbols):
byte = (symbols[i] << 6) | (symbols[i+1] << 4) | (symbols[i+2] << 2) | symbols[i+3]
data.append(byte)
if HAS_NUMPY and confidence_scores:
avg_confidence = np.mean(confidence_scores)
else:
avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0.0
return bytes(data), avg_confidence
def _find_preamble(self, audio) -> int:
"""Find preamble in audio signal."""
# Simple energy-based detection
window_size = int(0.01 * self.sample_rate) # 10ms window
if HAS_NUMPY:
for i in range(0, len(audio) - window_size, window_size // 2):
window = audio[i:i + window_size]
# Check for preamble frequency
fft = np.fft.fft(window)
freqs = np.fft.fftfreq(len(window), 1/self.sample_rate)
# Find peak near preamble frequency
idx = np.argmax(np.abs(fft[:len(fft)//2]))
peak_freq = abs(freqs[idx])
if abs(peak_freq - self.preamble_freq) < 50: # 50 Hz tolerance
return i
else:
# Simple zero-crossing based detection without FFT
for i in range(0, len(audio) - window_size, window_size // 2):
window = list(audio[i:i + window_size])
# Count zero crossings
zero_crossings = 0
for j in range(1, len(window)):
if (window[j-1] >= 0) != (window[j] >= 0):
zero_crossings += 1
# Estimate frequency from zero crossings
estimated_freq = (zero_crossings * self.sample_rate) / (2 * len(window))
if abs(estimated_freq - self.preamble_freq) < 100: # 100 Hz tolerance
return i
return -1
def _demodulate_symbol(self, audio) -> Tuple[int, float]:
"""Demodulate a single FSK symbol."""
if HAS_NUMPY:
# FFT-based demodulation
fft = np.fft.fft(audio)
freqs = np.fft.fftfreq(len(audio), 1/self.sample_rate)
magnitude = np.abs(fft[:len(fft)//2])
# Find energy at each FSK frequency
energies = []
for freq in self.frequencies:
idx = np.argmin(np.abs(freqs[:len(freqs)//2] - freq))
energy = magnitude[idx]
energies.append(energy)
# Select symbol with highest energy
symbol = np.argmax(energies)
else:
# Goertzel algorithm for specific frequency detection
audio_list = list(audio) if hasattr(audio, '__iter__') else audio
energies = []
for freq in self.frequencies:
# Goertzel algorithm
omega = 2 * math.pi * freq / self.sample_rate
coeff = 2 * math.cos(omega)
s_prev = 0
s_prev2 = 0
for sample in audio_list:
s = sample + coeff * s_prev - s_prev2
s_prev2 = s_prev
s_prev = s
# Calculate magnitude
power = s_prev2 * s_prev2 + s_prev * s_prev - coeff * s_prev * s_prev2
energies.append(math.sqrt(abs(power)))
# Select symbol with highest energy
symbol = energies.index(max(energies))
# Confidence is ratio of strongest to second strongest
sorted_energies = sorted(energies, reverse=True)
confidence = sorted_energies[0] / (sorted_energies[1] + 1e-6)
return symbol, min(confidence, 10.0) / 10.0
def _apply_envelope(self, audio):
"""Apply smoothing envelope to reduce clicks."""
# Simple raised cosine envelope
ramp_samples = int(0.002 * self.sample_rate) # 2ms ramps
if len(audio) > 2 * ramp_samples:
if HAS_NUMPY:
# Fade in
t = np.linspace(0, np.pi/2, ramp_samples)
audio[:ramp_samples] *= np.sin(t) ** 2
# Fade out
audio[-ramp_samples:] *= np.sin(t[::-1]) ** 2
else:
# Manual fade in
for i in range(ramp_samples):
t = (i / ramp_samples) * (math.pi / 2)
factor = math.sin(t) ** 2
audio[i] *= factor
# Manual fade out
for i in range(ramp_samples):
t = ((ramp_samples - 1 - i) / ramp_samples) * (math.pi / 2)
factor = math.sin(t) ** 2
audio[-(i+1)] *= factor
return audio
class VoiceProtocol:
"""
Integrates voice codec and modem with the Icing protocol
for encrypted voice transmission over GSM.
"""
def __init__(self, protocol_instance):
"""
Initialize voice protocol handler.
Args:
protocol_instance: IcingProtocol instance
"""
self.protocol = protocol_instance
self.codec = Codec2Wrapper(Codec2Mode.MODE_1200)
self.modem = FSKModem(sample_rate=8000, baud_rate=600)
# Voice crypto state
self.voice_iv_counter = 0
self.voice_sequence = 0
# Buffers
if HAS_NUMPY:
self.audio_buffer = np.array([], dtype=np.int16)
else:
self.audio_buffer = array.array('h') # 16-bit signed integers
self.frame_buffer = []
print(f"{GREEN}[VOICE]{RESET} Voice protocol initialized")
def process_voice_input(self, audio_samples):
"""
Process voice input: compress, encrypt, and modulate.
Args:
audio_samples: PCM audio samples (8kHz, 16-bit)
Returns:
Modulated audio signal ready for transmission (numpy array or array.array)
"""
# Add to buffer
if HAS_NUMPY:
self.audio_buffer = np.concatenate([self.audio_buffer, audio_samples])
else:
self.audio_buffer.extend(audio_samples)
# Process complete frames
modulated_audio = []
while len(self.audio_buffer) >= self.codec.frame_samples:
# Extract frame
if HAS_NUMPY:
frame_audio = self.audio_buffer[:self.codec.frame_samples]
self.audio_buffer = self.audio_buffer[self.codec.frame_samples:]
else:
frame_audio = array.array('h', self.audio_buffer[:self.codec.frame_samples])
del self.audio_buffer[:self.codec.frame_samples]
# Compress with Codec2
compressed_frame = self.codec.encode(frame_audio)
if not compressed_frame:
continue
# Encrypt frame
encrypted = self._encrypt_voice_frame(compressed_frame)
# Add FEC
protected = self._add_fec(encrypted)
# Modulate to audio
audio_signal = self.modem.modulate(protected, add_preamble=True)
modulated_audio.append(audio_signal)
if modulated_audio:
if HAS_NUMPY:
return np.concatenate(modulated_audio)
else:
# Concatenate array.array objects
result = array.array('f')
for audio in modulated_audio:
result.extend(audio)
return result
return None
def process_voice_output(self, modulated_audio):
"""
Process received audio: demodulate, decrypt, and decompress.
Args:
modulated_audio: Received FSK-modulated audio
Returns:
Decoded PCM audio samples (numpy array or array.array)
"""
# Demodulate
data, confidence = self.modem.demodulate(modulated_audio)
if confidence < 0.5:
print(f"{YELLOW}[VOICE]{RESET} Low demodulation confidence: {confidence:.2f}")
return None
# Remove FEC
frame_data = self._remove_fec(data)
if not frame_data:
return None
# Decrypt
compressed_frame = self._decrypt_voice_frame(frame_data)
if not compressed_frame:
return None
# Decompress
audio_samples = self.codec.decode(compressed_frame)
return audio_samples
def _encrypt_voice_frame(self, frame: Codec2Frame) -> bytes:
"""Encrypt a voice frame using ChaCha20-CTR."""
if not self.protocol.hkdf_key:
raise ValueError("No encryption key available")
# Prepare frame data
frame_data = struct.pack('<BIH',
frame.mode,
frame.frame_number,
len(frame.bits)
) + frame.bits
        # Generate IV for this frame (ChaCha20 needs 16 bytes)
        iv_value = self.voice_iv_counter
        iv = struct.pack('<Q', iv_value) + b'\x00' * 8  # 8 + 8 = 16 bytes
        self.voice_iv_counter += 1
        # Encrypt using ChaCha20
        from encryption import chacha20_encrypt
        key = bytes.fromhex(self.protocol.hkdf_key)
        encrypted = chacha20_encrypt(frame_data, key, iv)
        # Add sequence number and the IV counter used for this frame so the
        # receiver can rebuild the identical IV
        return struct.pack('<HQ', self.voice_sequence, iv_value) + encrypted
def _decrypt_voice_frame(self, data: bytes) -> Optional[Codec2Frame]:
"""Decrypt a voice frame."""
if len(data) < 10:
return None
# Extract sequence and IV hint
sequence, iv_hint = struct.unpack('<HQ', data[:10])
encrypted = data[10:]
# Generate IV (16 bytes for ChaCha20)
iv = struct.pack('<Q', iv_hint) + b'\x00' * 8
# Decrypt
from encryption import chacha20_decrypt
key = bytes.fromhex(self.protocol.hkdf_key)
try:
decrypted = chacha20_decrypt(encrypted, key, iv)
# Parse frame
mode, frame_num, bits_len = struct.unpack('<BIH', decrypted[:7])
bits = decrypted[7:7+bits_len]
return Codec2Frame(
mode=Codec2Mode(mode),
bits=bits,
timestamp=0, # Will be set by caller
frame_number=frame_num
)
except Exception as e:
print(f"{RED}[VOICE]{RESET} Decryption failed: {e}")
return None
def _add_fec(self, data: bytes) -> bytes:
"""Add forward error correction."""
# Simple repetition code (3x) for testing
# In production: use convolutional code or LDPC
fec_data = bytearray()
for byte in data:
# Repeat each byte 3 times
fec_data.extend([byte, byte, byte])
return bytes(fec_data)
def _remove_fec(self, data: bytes) -> Optional[bytes]:
"""Remove FEC and correct errors."""
if len(data) % 3 != 0:
return None
corrected = bytearray()
for i in range(0, len(data), 3):
# Majority voting
votes = [data[i], data[i+1], data[i+2]]
byte_value = max(set(votes), key=votes.count)
corrected.append(byte_value)
return bytes(corrected)
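
A quick round trip of the repetition code above, showing one corrupted copy per byte being outvoted; a sketch, not part of the commit:

# Illustrative sketch: 3x repetition FEC survives one bad copy per byte.
protected = bytearray(b'\x42' * 3)   # what _add_fec emits for a single byte
protected[1] ^= 0xFF                 # corrupt the middle copy
votes = [protected[0], protected[1], protected[2]]
assert max(set(votes), key=votes.count) == 0x42
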
# Example usage
if __name__ == "__main__":
# Test Codec2 wrapper
print(f"\n{BLUE}=== Testing Codec2 Wrapper ==={RESET}")
codec = Codec2Wrapper(Codec2Mode.MODE_1200)
# Generate test audio
if HAS_NUMPY:
t = np.linspace(0, 0.04, 320) # 40ms at 8kHz
test_audio = (np.sin(2 * np.pi * 440 * t) * 16384).astype(np.int16)
else:
test_audio = array.array('h')
for i in range(320):
t = i * 0.04 / 320
value = int(math.sin(2 * math.pi * 440 * t) * 16384)
test_audio.append(value)
# Encode
frame = codec.encode(test_audio)
print(f"Encoded frame: {len(frame.bits)} bytes")
# Decode
decoded = codec.decode(frame)
print(f"Decoded audio: {len(decoded)} samples")
# Test FSK modem
print(f"\n{BLUE}=== Testing FSK Modem ==={RESET}")
modem = FSKModem()
# Test data
test_data = b"Hello, secure voice!"
# Modulate
modulated = modem.modulate(test_data)
print(f"Modulated: {len(modulated)} samples ({len(modulated)/8000:.2f}s)")
# Demodulate
demodulated, confidence = modem.demodulate(modulated)
print(f"Demodulated: {demodulated}")
print(f"Confidence: {confidence:.2%}")
print(f"Match: {demodulated == test_data}")
