From f8a7aa014753ee9988faa5f7dce46297f75cd1e4 Mon Sep 17 00:00:00 2001 From: STCB <21@stcb.cc> Date: Mon, 7 Jul 2025 00:37:55 +0200 Subject: [PATCH] Reverse adding Protocol_Alpha_0 --- .../Protocol_Alpha_0/IcingProtocol.drawio | 566 --------- .../Protocol_Alpha_0/VOICE_PROTOCOL_README.md | 119 -- .../Prototype/Protocol_Alpha_0/auto_mode.py | 430 ------- .../Prototype/Protocol_Alpha_0/cli.py | 328 ----- .../Protocol_Alpha_0/crypto_utils.py | 165 --- .../Prototype/Protocol_Alpha_0/encryption.py | 307 ----- .../Prototype/Protocol_Alpha_0/messages.py | 463 ------- .../Prototype/Protocol_Alpha_0/protocol.py | 1069 ----------------- .../Protocol_Alpha_0/transmission.py | 100 -- .../Prototype/Protocol_Alpha_0/voice_codec.py | 716 ----------- 10 files changed, 4263 deletions(-) delete mode 100644 protocol_prototype/Prototype/Protocol_Alpha_0/IcingProtocol.drawio delete mode 100644 protocol_prototype/Prototype/Protocol_Alpha_0/VOICE_PROTOCOL_README.md delete mode 100644 protocol_prototype/Prototype/Protocol_Alpha_0/auto_mode.py delete mode 100644 protocol_prototype/Prototype/Protocol_Alpha_0/cli.py delete mode 100644 protocol_prototype/Prototype/Protocol_Alpha_0/crypto_utils.py delete mode 100644 protocol_prototype/Prototype/Protocol_Alpha_0/encryption.py delete mode 100644 protocol_prototype/Prototype/Protocol_Alpha_0/messages.py delete mode 100644 protocol_prototype/Prototype/Protocol_Alpha_0/protocol.py delete mode 100644 protocol_prototype/Prototype/Protocol_Alpha_0/transmission.py delete mode 100644 protocol_prototype/Prototype/Protocol_Alpha_0/voice_codec.py diff --git a/protocol_prototype/Prototype/Protocol_Alpha_0/IcingProtocol.drawio b/protocol_prototype/Prototype/Protocol_Alpha_0/IcingProtocol.drawio deleted file mode 100644 index 8f46988..0000000 --- a/protocol_prototype/Prototype/Protocol_Alpha_0/IcingProtocol.drawio +++ /dev/null @@ -1,566 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/protocol_prototype/Prototype/Protocol_Alpha_0/VOICE_PROTOCOL_README.md b/protocol_prototype/Prototype/Protocol_Alpha_0/VOICE_PROTOCOL_README.md deleted file mode 100644 index 885e798..0000000 --- a/protocol_prototype/Prototype/Protocol_Alpha_0/VOICE_PROTOCOL_README.md +++ /dev/null @@ -1,119 +0,0 @@ -# Voice-over-GSM Protocol Implementation - -This implementation provides encrypted voice communication over standard GSM voice channels without requiring CSD/HSCSD. - -## Architecture - -### 1. Voice Codec (`voice_codec.py`) -- **Codec2Wrapper**: Simulates Codec2 compression - - Supports multiple bitrates (700-3200 bps) - - Default: 1200 bps for GSM robustness - - 40ms frames (48 bits/frame at 1200 bps) - -- **FSKModem**: 4-FSK modulation for voice channels - - Frequency band: 300-3400 Hz (GSM compatible) - - Symbol rate: 600 baud - - 4 frequencies: 600, 1200, 1800, 2400 Hz - - Preamble: 800 Hz for 100ms - -- **VoiceProtocol**: Integration layer - - Manages codec and modem - - Handles encryption with ChaCha20-CTR - - Frame-based processing - -### 2. Protocol Messages (`messages.py`) -- **VoiceStart** (20 bytes): Initiates voice call - - Version, codec mode, FEC type - - Session ID (64 bits) - - Initial sequence number - -- **VoiceAck** (16 bytes): Accepts/rejects call - - Status (accept/reject) - - Negotiated codec and FEC - -- **VoiceEnd** (12 bytes): Terminates call - - Session ID for confirmation - -- **VoiceSync** (20 bytes): Synchronization - - Sequence number and timestamp - - For jitter buffer management - -### 3. Encryption (`encryption.py`) -- **ChaCha20-CTR**: Stream cipher for voice - - No authentication overhead (HMAC per second) - - 12-byte nonce with frame counter - - Uses HKDF-derived key from main protocol - -### 4. Protocol Integration (`protocol.py`) -- Voice session management -- Message handlers for all voice messages -- Methods: - - `start_voice_call()`: Initiate call - - `accept_voice_call()`: Accept incoming - - `end_voice_call()`: Terminate - - `send_voice_audio()`: Process audio - -## Usage Example - -```python -# After key exchange is complete -alice.start_voice_call(codec_mode=5, fec_type=0) - -# Bob automatically accepts if in auto mode -# Or manually: bob.accept_voice_call(session_id, codec_mode, fec_type) - -# Send audio -audio_samples = generate_audio() # 8kHz, 16-bit PCM -alice.send_voice_audio(audio_samples) - -# End call -alice.end_voice_call() -``` - -## Key Features - -1. **Codec2 @ 1200 bps** - - Optimal for GSM vocoder survival - - Intelligible but "robotic" quality - -2. **4-FSK Modulation** - - Survives GSM/AMR/EVS vocoders - - 2400 baud with FEC - -3. **ChaCha20-CTR Encryption** - - Low latency stream cipher - - Frame-based IV management - -4. **Forward Error Correction** - - Repetition code (3x) - - Future: Convolutional or LDPC - -5. **No Special Requirements** - - Works over standard voice calls - - Compatible with any phone - - Software-only solution - -## Testing - -Run the test scripts: -- `test_voice_simple.py`: Basic voice call setup -- `test_voice_protocol.py`: Full test with audio simulation (requires numpy) - -## Implementation Notes - -1. Message disambiguation: VoiceStart sets high bit in flags field to distinguish from VoiceSync (both 20 bytes) - -2. The actual Codec2 library would need to be integrated for production use - -3. FEC implementation is simplified (repetition code) - production would use convolutional codes - -4. Audio I/O integration needed for real voice calls - -5. Jitter buffer and timing recovery needed for production - -## Security Considerations - -- Voice frames use ChaCha20-CTR without per-frame authentication -- HMAC computed over 1-second blocks for efficiency -- Session binding through encrypted session ID -- PFS maintained through main protocol key rotation \ No newline at end of file diff --git a/protocol_prototype/Prototype/Protocol_Alpha_0/auto_mode.py b/protocol_prototype/Prototype/Protocol_Alpha_0/auto_mode.py deleted file mode 100644 index 690ba9c..0000000 --- a/protocol_prototype/Prototype/Protocol_Alpha_0/auto_mode.py +++ /dev/null @@ -1,430 +0,0 @@ -import time -import threading -import queue -from typing import Optional, Dict, Any, List, Callable, Tuple - -# ANSI colors for logging -RED = "\033[91m" -GREEN = "\033[92m" -YELLOW = "\033[93m" -BLUE = "\033[94m" -RESET = "\033[0m" - -class AutoModeConfig: - """Configuration parameters for the automatic mode behavior.""" - def __init__(self): - # Ping behavior - self.ping_response_accept = True # Whether to accept incoming pings - self.ping_auto_initiate = False # Whether to initiate pings when connected - self.ping_retry_count = 3 # Number of ping retries - self.ping_retry_delay = 5.0 # Seconds between ping retries - self.ping_timeout = 10.0 # Seconds to wait for ping response - self.preferred_cipher = 0 # 0=AES-GCM, 1=ChaCha20-Poly1305 - - # Handshake behavior - self.handshake_retry_count = 3 # Number of handshake retries - self.handshake_retry_delay = 5.0 # Seconds between handshake retries - self.handshake_timeout = 10.0 # Seconds to wait for handshake - - # Messaging behavior - self.auto_message_enabled = False # Whether to auto-send messages - self.message_interval = 10.0 # Seconds between auto messages - self.message_content = "Hello, secure world!" # Default message - - # General behavior - self.active_mode = False # If true, initiates protocol instead of waiting - - -class AutoMode: - """ - Manages automated behavior for the Icing protocol. - Handles automatic progression through the protocol stages: - 1. Connection setup - 2. Ping/discovery - 3. Key exchange - 4. Encrypted communication - """ - - def __init__(self, protocol_interface): - """ - Initialize the AutoMode manager. - - Args: - protocol_interface: An object implementing the required protocol methods - """ - self.protocol = protocol_interface - self.config = AutoModeConfig() - self.active = False - self.state = "idle" - - # Message queue for automated sending - self.message_queue = queue.Queue() - - # Tracking variables - self.ping_attempts = 0 - self.handshake_attempts = 0 - self.last_action_time = 0 - self.timer_tasks = [] # List of active timer tasks (for cleanup) - - def start(self): - """Start the automatic mode.""" - if self.active: - return - - self.active = True - self.state = "idle" - self.ping_attempts = 0 - self.handshake_attempts = 0 - self.last_action_time = time.time() - - self._log_info("Automatic mode started") - - # Start in active mode if configured - if self.config.active_mode and self.protocol.connections: - self._start_ping_sequence() - - def stop(self): - """Stop the automatic mode and clean up any pending tasks.""" - if not self.active: - return - - # Cancel any pending timers - for timer in self.timer_tasks: - if timer.is_alive(): - timer.cancel() - self.timer_tasks = [] - - self.active = False - self.state = "idle" - self._log_info("Automatic mode stopped") - - def handle_connection_established(self): - """Called when a new connection is established.""" - if not self.active: - return - - self._log_info("Connection established") - - # If in active mode, start pinging - if self.config.active_mode: - self._start_ping_sequence() - - def handle_ping_received(self, index: int): - """ - Handle a received ping request. - - Args: - index: Index of the ping request in the protocol's inbound message queue - """ - if not self.active or not self._is_valid_message_index(index): - return - - self._log_info(f"Ping request received (index={index})") - - # Automatically respond to ping if configured to accept - if self.config.ping_response_accept: - self._log_info(f"Auto-responding to ping with accept={self.config.ping_response_accept}") - try: - # Schedule the response with a small delay to simulate real behavior - timer = threading.Timer(0.5, self._respond_to_ping, args=[index]) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - except Exception as e: - self._log_error(f"Failed to auto-respond to ping: {e}") - - def handle_ping_response_received(self, accepted: bool): - """ - Handle a received ping response. - - Args: - accepted: Whether the ping was accepted - """ - if not self.active: - return - - self.ping_attempts = 0 # Reset ping attempts counter - - if accepted: - self._log_info("Ping accepted! Proceeding with handshake") - # Send handshake if not already done - if self.state != "handshake_sent": - self._ensure_ephemeral_keys() - self._start_handshake_sequence() - else: - self._log_info("Ping rejected by peer. Stopping auto-protocol sequence.") - self.state = "idle" - - def handle_handshake_received(self, index: int): - """ - Handle a received handshake. - - Args: - index: Index of the handshake in the protocol's inbound message queue - """ - if not self.active or not self._is_valid_message_index(index): - return - - self._log_info(f"Handshake received (index={index})") - - try: - # Ensure we have ephemeral keys - self._ensure_ephemeral_keys() - - # Process the handshake (compute ECDH) - self.protocol.generate_ecdhe(index) - - # Derive HKDF key - self.protocol.derive_hkdf() - - # If we haven't sent our handshake yet, send it - if self.state != "handshake_sent": - timer = threading.Timer(0.5, self.protocol.send_handshake) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - self.state = "handshake_sent" - else: - self.state = "key_exchange_complete" - - # Start sending queued messages if auto messaging is enabled - if self.config.auto_message_enabled: - self._start_message_sequence() - - except Exception as e: - self._log_error(f"Failed to process handshake: {e}") - - def handle_encrypted_received(self, index: int): - """ - Handle a received encrypted message. - - Args: - index: Index of the encrypted message in the protocol's inbound message queue - """ - if not self.active or not self._is_valid_message_index(index): - return - - # Try to decrypt automatically - try: - plaintext = self.protocol.decrypt_received_message(index) - self._log_info(f"Auto-decrypted message: {plaintext}") - except Exception as e: - self._log_error(f"Failed to auto-decrypt message: {e}") - - def queue_message(self, message: str): - """ - Add a message to the auto-send queue. - - Args: - message: Message text to send - """ - self.message_queue.put(message) - self._log_info(f"Message queued for sending: {message}") - - # If we're in the right state, start sending messages - if self.active and self.state == "key_exchange_complete" and self.config.auto_message_enabled: - self._process_message_queue() - - def _start_ping_sequence(self): - """Start the ping sequence to discover the peer.""" - if self.ping_attempts >= self.config.ping_retry_count: - self._log_warning(f"Maximum ping attempts ({self.config.ping_retry_count}) reached") - self.state = "idle" - return - - self.state = "pinging" - self.ping_attempts += 1 - - self._log_info(f"Sending ping request (attempt {self.ping_attempts}/{self.config.ping_retry_count})") - try: - self.protocol.send_ping_request(self.config.preferred_cipher) - self.last_action_time = time.time() - - # Schedule next ping attempt if needed - timer = threading.Timer( - self.config.ping_retry_delay, - self._check_ping_response - ) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - - except Exception as e: - self._log_error(f"Failed to send ping: {e}") - - def _check_ping_response(self): - """Check if we got a ping response, retry if not.""" - if not self.active or self.state != "pinging": - return - - # If we've waited long enough for a response, retry - if time.time() - self.last_action_time >= self.config.ping_timeout: - self._log_warning("No ping response received, retrying") - self._start_ping_sequence() - - def _respond_to_ping(self, index: int): - """ - Respond to a ping request. - - Args: - index: Index of the ping request in the inbound messages - """ - if not self.active or not self._is_valid_message_index(index): - return - - try: - answer = 1 if self.config.ping_response_accept else 0 - self.protocol.respond_to_ping(index, answer) - - if answer == 1: - # If we accepted, we should expect a handshake - self.state = "accepted_ping" - self._ensure_ephemeral_keys() - - # Set a timer to send our handshake if we don't receive one - timer = threading.Timer( - self.config.handshake_timeout, - self._check_handshake_received - ) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - self.last_action_time = time.time() - - except Exception as e: - self._log_error(f"Failed to respond to ping: {e}") - - def _check_handshake_received(self): - """Check if we've received a handshake after accepting a ping.""" - if not self.active or self.state != "accepted_ping": - return - - # If we've waited long enough and haven't received a handshake, initiate one - if time.time() - self.last_action_time >= self.config.handshake_timeout: - self._log_warning("No handshake received after accepting ping, initiating handshake") - self._start_handshake_sequence() - - def _start_handshake_sequence(self): - """Start the handshake sequence.""" - if self.handshake_attempts >= self.config.handshake_retry_count: - self._log_warning(f"Maximum handshake attempts ({self.config.handshake_retry_count}) reached") - self.state = "idle" - return - - self.state = "handshake_sent" - self.handshake_attempts += 1 - - self._log_info(f"Sending handshake (attempt {self.handshake_attempts}/{self.config.handshake_retry_count})") - try: - self.protocol.send_handshake() - self.last_action_time = time.time() - - # Schedule handshake retry check - timer = threading.Timer( - self.config.handshake_retry_delay, - self._check_handshake_response - ) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - - except Exception as e: - self._log_error(f"Failed to send handshake: {e}") - - def _check_handshake_response(self): - """Check if we've completed the key exchange, retry handshake if not.""" - if not self.active or self.state != "handshake_sent": - return - - # If we've waited long enough for a response, retry - if time.time() - self.last_action_time >= self.config.handshake_timeout: - self._log_warning("No handshake response received, retrying") - self._start_handshake_sequence() - - def _start_message_sequence(self): - """Start the automated message sending sequence.""" - if not self.config.auto_message_enabled: - return - - self._log_info("Starting automated message sequence") - - # Add the default message if queue is empty - if self.message_queue.empty(): - self.message_queue.put(self.config.message_content) - - # Start processing the queue - self._process_message_queue() - - def _process_message_queue(self): - """Process messages in the queue and send them.""" - if not self.active or self.state != "key_exchange_complete" or not self.config.auto_message_enabled: - return - - if not self.message_queue.empty(): - message = self.message_queue.get() - self._log_info(f"Sending queued message: {message}") - - try: - self.protocol.send_encrypted_message(message) - - # Schedule next message send - timer = threading.Timer( - self.config.message_interval, - self._process_message_queue - ) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - - except Exception as e: - self._log_error(f"Failed to send queued message: {e}") - # Put the message back in the queue - self.message_queue.put(message) - - def _ensure_ephemeral_keys(self): - """Ensure ephemeral keys are generated if needed.""" - if not hasattr(self.protocol, 'ephemeral_pubkey') or self.protocol.ephemeral_pubkey is None: - self._log_info("Generating ephemeral keys") - self.protocol.generate_ephemeral_keys() - - def _is_valid_message_index(self, index: int) -> bool: - """ - Check if a message index is valid in the protocol's inbound_messages queue. - - Args: - index: The index to check - - Returns: - bool: True if the index is valid, False otherwise - """ - if not hasattr(self.protocol, 'inbound_messages'): - self._log_error("Protocol has no inbound_messages attribute") - return False - - if index < 0 or index >= len(self.protocol.inbound_messages): - self._log_error(f"Invalid message index: {index}") - return False - - return True - - # Helper methods for logging - def _log_info(self, message: str): - print(f"{BLUE}[AUTO]{RESET} {message}") - if hasattr(self, 'verbose_logging') and self.verbose_logging: - state_info = f"(state={self.state})" - if 'pinging' in self.state and hasattr(self, 'ping_attempts'): - state_info += f", attempts={self.ping_attempts}/{self.config.ping_retry_count}" - elif 'handshake' in self.state and hasattr(self, 'handshake_attempts'): - state_info += f", attempts={self.handshake_attempts}/{self.config.handshake_retry_count}" - print(f"{BLUE}[AUTO-DETAIL]{RESET} {state_info}") - - def _log_warning(self, message: str): - print(f"{YELLOW}[AUTO-WARN]{RESET} {message}") - if hasattr(self, 'verbose_logging') and self.verbose_logging: - timer_info = f"Active timers: {len(self.timer_tasks)}" - print(f"{YELLOW}[AUTO-WARN-DETAIL]{RESET} {timer_info}") - - def _log_error(self, message: str): - print(f"{RED}[AUTO-ERROR]{RESET} {message}") - if hasattr(self, 'verbose_logging') and self.verbose_logging: - print(f"{RED}[AUTO-ERROR-DETAIL]{RESET} Current state: {self.state}, Active: {self.active}") \ No newline at end of file diff --git a/protocol_prototype/Prototype/Protocol_Alpha_0/cli.py b/protocol_prototype/Prototype/Protocol_Alpha_0/cli.py deleted file mode 100644 index 53c3f01..0000000 --- a/protocol_prototype/Prototype/Protocol_Alpha_0/cli.py +++ /dev/null @@ -1,328 +0,0 @@ -import sys -import argparse -import shlex -from protocol import IcingProtocol - -RED = "\033[91m" -GREEN = "\033[92m" -YELLOW = "\033[93m" -BLUE = "\033[94m" -MAGENTA = "\033[95m" -CYAN = "\033[96m" -RESET = "\033[0m" - -def print_help(): - """Display all available commands.""" - print(f"\n{YELLOW}=== Available Commands ==={RESET}") - print(f"\n{CYAN}Basic Protocol Commands:{RESET}") - print(" help - Show this help message") - print(" peer_id - Set peer identity public key") - print(" connect - Connect to a peer at the specified port") - print(" show_state - Display current protocol state") - print(" exit - Exit the program") - - print(f"\n{CYAN}Manual Protocol Operation:{RESET}") - print(" generate_ephemeral_keys - Generate ephemeral ECDH keys") - print(" send_ping [cipher] - Send PING request (cipher: 0=AES-GCM, 1=ChaCha20-Poly1305, default: 0)") - print(" respond_ping <0|1> - Respond to a PING (0=reject, 1=accept)") - print(" send_handshake - Send handshake with ephemeral keys") - print(" generate_ecdhe - Process handshake at specified index") - print(" derive_hkdf - Derive encryption key using HKDF") - print(" send_encrypted - Encrypt and send a message") - print(" decrypt <index> - Decrypt received message at index") - - print(f"\n{CYAN}Automatic Mode Commands:{RESET}") - print(" auto start - Start automatic mode") - print(" auto stop - Stop automatic mode") - print(" auto status - Show current auto mode status and configuration") - print(" auto config <param> <value> - Configure auto mode parameters") - print(" auto config list - Show all configurable parameters") - print(" auto message <text> - Queue message for automatic sending") - print(" auto passive - Configure as passive peer (responds to pings but doesn't initiate)") - print(" auto active - Configure as active peer (initiates protocol)") - print(" auto log - Toggle detailed logging for auto mode") - - print(f"\n{CYAN}Debugging Commands:{RESET}") - print(" debug_message <index> - Display detailed information about a message in the queue") - - print(f"\n{CYAN}Legacy Commands:{RESET}") - print(" auto_responder <on|off> - Enable/disable legacy auto responder (deprecated)") - - -def main(): - protocol = IcingProtocol() - - print(f"{YELLOW}\n======================================") - print(" Icing Protocol - Secure Communication ") - print("======================================\n" + RESET) - print(f"Listening on port: {protocol.local_port}") - print(f"Your identity public key (hex): {protocol.identity_pubkey.hex()}") - print_help() - - while True: - try: - line = input(f"{MAGENTA}Cmd>{RESET} ").strip() - except EOFError: - break - if not line: - continue - - parts = shlex.split(line) # Handle quoted arguments properly - cmd = parts[0].lower() - - try: - # Basic commands - if cmd == "exit": - protocol.stop() - break - - elif cmd == "help": - print_help() - - elif cmd == "show_state": - protocol.show_state() - - elif cmd == "peer_id": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: peer_id <hex_pubkey>") - continue - try: - protocol.set_peer_identity(parts[1]) - except ValueError as e: - print(f"{RED}[ERROR]{RESET} Invalid public key: {e}") - - elif cmd == "connect": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: connect <port>") - continue - try: - port = int(parts[1]) - protocol.connect_to_peer(port) - except ValueError: - print(f"{RED}[ERROR]{RESET} Invalid port number.") - except Exception as e: - print(f"{RED}[ERROR]{RESET} Connection failed: {e}") - - # Manual protocol operation - elif cmd == "generate_ephemeral_keys": - protocol.generate_ephemeral_keys() - - elif cmd == "send_ping": - # Optional cipher parameter (0 = AES-GCM, 1 = ChaCha20-Poly1305) - cipher = 0 # Default to AES-GCM - if len(parts) >= 2: - try: - cipher = int(parts[1]) - if cipher not in (0, 1): - print(f"{YELLOW}[WARNING]{RESET} Unsupported cipher code {cipher}. Using AES-GCM (0).") - cipher = 0 - except ValueError: - print(f"{YELLOW}[WARNING]{RESET} Invalid cipher code. Using AES-GCM (0).") - protocol.send_ping_request(cipher) - - elif cmd == "send_handshake": - protocol.send_handshake() - - elif cmd == "respond_ping": - if len(parts) != 3: - print(f"{RED}[ERROR]{RESET} Usage: respond_ping <index> <0|1>") - continue - try: - idx = int(parts[1]) - answer = int(parts[2]) - if answer not in (0, 1): - print(f"{RED}[ERROR]{RESET} Answer must be 0 (reject) or 1 (accept).") - continue - protocol.respond_to_ping(idx, answer) - except ValueError: - print(f"{RED}[ERROR]{RESET} Index and answer must be integers.") - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to respond to ping: {e}") - - elif cmd == "generate_ecdhe": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: generate_ecdhe <index>") - continue - try: - idx = int(parts[1]) - protocol.generate_ecdhe(idx) - except ValueError: - print(f"{RED}[ERROR]{RESET} Index must be an integer.") - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to process handshake: {e}") - - elif cmd == "derive_hkdf": - try: - protocol.derive_hkdf() - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to derive HKDF key: {e}") - - elif cmd == "send_encrypted": - if len(parts) < 2: - print(f"{RED}[ERROR]{RESET} Usage: send_encrypted <plaintext>") - continue - plaintext = " ".join(parts[1:]) - try: - protocol.send_encrypted_message(plaintext) - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to send encrypted message: {e}") - - elif cmd == "decrypt": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: decrypt <index>") - continue - try: - idx = int(parts[1]) - protocol.decrypt_received_message(idx) - except ValueError: - print(f"{RED}[ERROR]{RESET} Index must be an integer.") - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to decrypt message: {e}") - - # Debugging commands - elif cmd == "debug_message": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: debug_message <index>") - continue - try: - idx = int(parts[1]) - protocol.debug_message(idx) - except ValueError: - print(f"{RED}[ERROR]{RESET} Index must be an integer.") - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to debug message: {e}") - - # Automatic mode commands - elif cmd == "auto": - if len(parts) < 2: - print(f"{RED}[ERROR]{RESET} Usage: auto <command> [options]") - print("Available commands: start, stop, status, config, message, passive, active") - continue - - subcmd = parts[1].lower() - - if subcmd == "start": - protocol.start_auto_mode() - print(f"{GREEN}[AUTO]{RESET} Automatic mode started") - - elif subcmd == "stop": - protocol.stop_auto_mode() - print(f"{GREEN}[AUTO]{RESET} Automatic mode stopped") - - elif subcmd == "status": - config = protocol.get_auto_mode_config() - print(f"{YELLOW}=== Auto Mode Status ==={RESET}") - print(f"Active: {protocol.auto_mode.active}") - print(f"State: {protocol.auto_mode.state}") - print(f"\n{YELLOW}--- Configuration ---{RESET}") - for key, value in vars(config).items(): - print(f" {key}: {value}") - - elif subcmd == "config": - if len(parts) < 3: - print(f"{RED}[ERROR]{RESET} Usage: auto config <param> <value> or auto config list") - continue - - if parts[2].lower() == "list": - config = protocol.get_auto_mode_config() - print(f"{YELLOW}=== Auto Mode Configuration Parameters ==={RESET}") - for key, value in vars(config).items(): - print(f" {key} ({type(value).__name__}): {value}") - continue - - if len(parts) != 4: - print(f"{RED}[ERROR]{RESET} Usage: auto config <param> <value>") - continue - - param = parts[2] - value_str = parts[3] - - # Convert the string value to the appropriate type - config = protocol.get_auto_mode_config() - if not hasattr(config, param): - print(f"{RED}[ERROR]{RESET} Unknown parameter: {param}") - print("Use 'auto config list' to see all available parameters") - continue - - current_value = getattr(config, param) - try: - if isinstance(current_value, bool): - if value_str.lower() in ("true", "yes", "on", "1"): - value = True - elif value_str.lower() in ("false", "no", "off", "0"): - value = False - else: - raise ValueError(f"Boolean value must be true/false/yes/no/on/off/1/0") - elif isinstance(current_value, int): - value = int(value_str) - elif isinstance(current_value, float): - value = float(value_str) - elif isinstance(current_value, str): - value = value_str - else: - value = value_str # Default to string - - protocol.configure_auto_mode(**{param: value}) - print(f"{GREEN}[AUTO]{RESET} Set {param} = {value}") - - except ValueError as e: - print(f"{RED}[ERROR]{RESET} Invalid value for {param}: {e}") - - elif subcmd == "message": - if len(parts) < 3: - print(f"{RED}[ERROR]{RESET} Usage: auto message <text>") - continue - - message = " ".join(parts[2:]) - protocol.queue_auto_message(message) - print(f"{GREEN}[AUTO]{RESET} Message queued for sending: {message}") - - elif subcmd == "passive": - # Configure as passive peer (responds but doesn't initiate) - protocol.configure_auto_mode( - ping_response_accept=True, - ping_auto_initiate=False, - active_mode=False - ) - print(f"{GREEN}[AUTO]{RESET} Configured as passive peer") - - elif subcmd == "active": - # Configure as active peer (initiates protocol) - protocol.configure_auto_mode( - ping_response_accept=True, - ping_auto_initiate=True, - active_mode=True - ) - print(f"{GREEN}[AUTO]{RESET} Configured as active peer") - - else: - print(f"{RED}[ERROR]{RESET} Unknown auto mode command: {subcmd}") - print("Available commands: start, stop, status, config, message, passive, active") - - # Legacy commands - elif cmd == "auto_responder": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: auto_responder <on|off>") - continue - val = parts[1].lower() - if val not in ("on", "off"): - print(f"{RED}[ERROR]{RESET} Value must be 'on' or 'off'.") - continue - protocol.enable_auto_responder(val == "on") - print(f"{YELLOW}[WARNING]{RESET} Using legacy auto responder. Consider using 'auto' commands instead.") - - else: - print(f"{RED}[ERROR]{RESET} Unknown command: {cmd}") - print("Type 'help' for a list of available commands.") - - except Exception as e: - print(f"{RED}[ERROR]{RESET} Command failed: {e}") - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - print("\nExiting...") - except Exception as e: - print(f"{RED}[FATAL ERROR]{RESET} {e}") - sys.exit(1) diff --git a/protocol_prototype/Prototype/Protocol_Alpha_0/crypto_utils.py b/protocol_prototype/Prototype/Protocol_Alpha_0/crypto_utils.py deleted file mode 100644 index 8c2e110..0000000 --- a/protocol_prototype/Prototype/Protocol_Alpha_0/crypto_utils.py +++ /dev/null @@ -1,165 +0,0 @@ -import os -from typing import Tuple -from cryptography.exceptions import InvalidSignature -from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import ec, utils -from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature - -def generate_identity_keys() -> Tuple[ec.EllipticCurvePrivateKey, bytes]: - """ - Generate an ECDSA (P-256) identity key pair. - - Returns: - Tuple containing: - - private_key: EllipticCurvePrivateKey object - - public_key_bytes: Raw x||y format (64 bytes, 512 bits) - """ - private_key = ec.generate_private_key(ec.SECP256R1()) - public_numbers = private_key.public_key().public_numbers() - - x_bytes = public_numbers.x.to_bytes(32, byteorder='big') - y_bytes = public_numbers.y.to_bytes(32, byteorder='big') - pubkey_bytes = x_bytes + y_bytes # 64 bytes total - - return private_key, pubkey_bytes - - -def load_peer_identity_key(pubkey_bytes: bytes) -> ec.EllipticCurvePublicKey: - """ - Convert a raw public key (64 bytes, x||y format) to a cryptography public key object. - - Args: - pubkey_bytes: Raw 64-byte public key (x||y format) - - Returns: - EllipticCurvePublicKey object - - Raises: - ValueError: If the pubkey_bytes is not exactly 64 bytes - """ - if len(pubkey_bytes) != 64: - raise ValueError("Peer identity pubkey must be exactly 64 bytes (x||y).") - - x_int = int.from_bytes(pubkey_bytes[:32], byteorder='big') - y_int = int.from_bytes(pubkey_bytes[32:], byteorder='big') - - public_numbers = ec.EllipticCurvePublicNumbers(x_int, y_int, ec.SECP256R1()) - return public_numbers.public_key() - - -def sign_data(private_key: ec.EllipticCurvePrivateKey, data: bytes) -> bytes: - """ - Sign data with ECDSA using a P-256 private key. - - Args: - private_key: EllipticCurvePrivateKey for signing - data: Bytes to sign - - Returns: - DER-encoded signature (variable length, up to ~70-72 bytes) - """ - signature = private_key.sign(data, ec.ECDSA(hashes.SHA256())) - return signature - - -def verify_signature(public_key: ec.EllipticCurvePublicKey, signature: bytes, data: bytes) -> bool: - """ - Verify a DER-encoded ECDSA signature. - - Args: - public_key: EllipticCurvePublicKey for verification - signature: DER-encoded signature - data: Original signed data - - Returns: - True if signature is valid, False otherwise - """ - try: - public_key.verify(signature, data, ec.ECDSA(hashes.SHA256())) - return True - except InvalidSignature: - return False - - -def get_ephemeral_keypair() -> Tuple[ec.EllipticCurvePrivateKey, bytes]: - """ - Generate an ephemeral ECDH key pair (P-256). - - Returns: - Tuple containing: - - private_key: EllipticCurvePrivateKey object - - pubkey_bytes: Raw x||y format (64 bytes, 512 bits) - """ - private_key = ec.generate_private_key(ec.SECP256R1()) - numbers = private_key.public_key().public_numbers() - - x_bytes = numbers.x.to_bytes(32, 'big') - y_bytes = numbers.y.to_bytes(32, 'big') - - return private_key, x_bytes + y_bytes # 64 bytes total - - -def compute_ecdh_shared_key(private_key: ec.EllipticCurvePrivateKey, peer_pubkey_bytes: bytes) -> bytes: - """ - Compute a shared secret using ECDH. - - Args: - private_key: Local ECDH private key - peer_pubkey_bytes: Peer's ephemeral public key (64 bytes, raw x||y format) - - Returns: - Shared secret bytes - - Raises: - ValueError: If peer_pubkey_bytes is not 64 bytes - """ - if len(peer_pubkey_bytes) != 64: - raise ValueError("Peer public key must be 64 bytes (x||y format)") - - x_int = int.from_bytes(peer_pubkey_bytes[:32], 'big') - y_int = int.from_bytes(peer_pubkey_bytes[32:], 'big') - - # Create public key object from raw components - peer_public_numbers = ec.EllipticCurvePublicNumbers(x_int, y_int, ec.SECP256R1()) - peer_public_key = peer_public_numbers.public_key() - - # Perform key exchange - shared_key = private_key.exchange(ec.ECDH(), peer_public_key) - return shared_key - - -def der_to_raw(der_sig: bytes) -> bytes: - """ - Convert a DER-encoded ECDSA signature to a raw 64-byte signature (r||s). - - Args: - der_sig: DER-encoded signature - - Returns: - Raw 64-byte signature (r||s format), with each component padded to 32 bytes - """ - r, s = decode_dss_signature(der_sig) - r_bytes = r.to_bytes(32, byteorder='big') - s_bytes = s.to_bytes(32, byteorder='big') - return r_bytes + s_bytes - - -def raw_signature_to_der(raw_sig: bytes) -> bytes: - """ - Convert a raw signature (64 bytes, concatenated r||s) to DER-encoded signature. - - Args: - raw_sig: Raw 64-byte signature (r||s format) - - Returns: - DER-encoded signature - - Raises: - ValueError: If raw_sig is not 64 bytes - """ - if len(raw_sig) != 64: - raise ValueError("Raw signature must be 64 bytes (r||s).") - - r = int.from_bytes(raw_sig[:32], 'big') - s = int.from_bytes(raw_sig[32:], 'big') - return encode_dss_signature(r, s) diff --git a/protocol_prototype/Prototype/Protocol_Alpha_0/encryption.py b/protocol_prototype/Prototype/Protocol_Alpha_0/encryption.py deleted file mode 100644 index 87a6b32..0000000 --- a/protocol_prototype/Prototype/Protocol_Alpha_0/encryption.py +++ /dev/null @@ -1,307 +0,0 @@ -import os -import struct -from typing import Optional, Tuple -from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305 - -class MessageHeader: - """ - Header of an encrypted message (18 bytes total): - - Clear Text Section (4 bytes): - - flag: 16 bits (0xBEEF by default) - - data_len: 16 bits (length of encrypted payload excluding tag) - - Associated Data (14 bytes): - - retry: 8 bits (retry counter) - - connection_status: 4 bits (e.g., CRC required) + 4 bits padding - - iv/messageID: 96 bits (12 bytes) - """ - def __init__(self, flag: int, data_len: int, retry: int, connection_status: int, iv: bytes): - if not (0 <= flag < 65536): - raise ValueError("Flag must fit in 16 bits (0..65535)") - if not (0 <= data_len < 65536): - raise ValueError("Data length must fit in 16 bits (0..65535)") - if not (0 <= retry < 256): - raise ValueError("Retry must fit in 8 bits (0..255)") - if not (0 <= connection_status < 16): - raise ValueError("Connection status must fit in 4 bits (0..15)") - if len(iv) != 12: - raise ValueError("IV must be 12 bytes (96 bits)") - - self.flag = flag # 16 bits - self.data_len = data_len # 16 bits - self.retry = retry # 8 bits - self.connection_status = connection_status # 4 bits - self.iv = iv # 96 bits (12 bytes) - - def pack(self) -> bytes: - """Pack header into 18 bytes.""" - # Pack flag and data_len (4 bytes) - header = struct.pack('>H H', self.flag, self.data_len) - - # Pack retry and connection_status (2 bytes) - # connection_status in high 4 bits of second byte, 4 bits padding as zero - ad_byte = (self.connection_status & 0x0F) << 4 - ad_packed = struct.pack('>B B', self.retry, ad_byte) - - # Append IV (12 bytes) - return header + ad_packed + self.iv - - def get_associated_data(self) -> bytes: - """Get the associated data for AEAD encryption (retry, conn_status, iv).""" - # Pack retry and connection_status - ad_byte = (self.connection_status & 0x0F) << 4 - ad_packed = struct.pack('>B B', self.retry, ad_byte) - - # Append IV - return ad_packed + self.iv - - @classmethod - def unpack(cls, data: bytes) -> 'MessageHeader': - """Unpack 18 bytes into a MessageHeader object.""" - if len(data) < 18: - raise ValueError(f"Header data too short: {len(data)} bytes, expected 18") - - flag, data_len = struct.unpack('>H H', data[:4]) - retry, ad_byte = struct.unpack('>B B', data[4:6]) - connection_status = (ad_byte >> 4) & 0x0F - iv = data[6:18] - - return cls(flag, data_len, retry, connection_status, iv) - -class EncryptedMessage: - """ - Encrypted message packet format: - - - Header (18 bytes): - * flag: 16 bits - * data_len: 16 bits - * retry: 8 bits - * connection_status: 4 bits (+ 4 bits padding) - * iv/messageID: 96 bits (12 bytes) - - - Payload: variable length encrypted data - - - Footer: - * Authentication tag: 128 bits (16 bytes) - * CRC32: 32 bits (4 bytes) - optional, based on connection_status - """ - def __init__(self, plaintext: bytes, key: bytes, flag: int = 0xBEEF, - retry: int = 0, connection_status: int = 0, iv: bytes = None, - cipher_type: int = 0): - self.plaintext = plaintext - self.key = key - self.flag = flag - self.retry = retry - self.connection_status = connection_status - self.iv = iv or generate_iv(initial=True) - self.cipher_type = cipher_type # 0 = AES-256-GCM, 1 = ChaCha20-Poly1305 - - # Will be set after encryption - self.ciphertext = None - self.tag = None - self.header = None - - def encrypt(self) -> bytes: - """Encrypt the plaintext and return the full encrypted message.""" - # Create header with correct data_len (which will be set after encryption) - self.header = MessageHeader( - flag=self.flag, - data_len=0, # Will be updated after encryption - retry=self.retry, - connection_status=self.connection_status, - iv=self.iv - ) - - # Get associated data for AEAD - aad = self.header.get_associated_data() - - # Encrypt using the appropriate cipher - if self.cipher_type == 0: # AES-256-GCM - cipher = AESGCM(self.key) - ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad) - elif self.cipher_type == 1: # ChaCha20-Poly1305 - cipher = ChaCha20Poly1305(self.key) - ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad) - else: - raise ValueError(f"Unsupported cipher type: {self.cipher_type}") - - # Extract ciphertext and tag - self.tag = ciphertext_with_tag[-16:] - self.ciphertext = ciphertext_with_tag[:-16] - - # Update header with actual data length - self.header.data_len = len(self.ciphertext) - - # Pack everything together - packed_header = self.header.pack() - - # Check if CRC is required (based on connection_status) - if self.connection_status & 0x01: # Lowest bit indicates CRC required - import zlib - # Compute CRC32 of header + ciphertext + tag - crc = zlib.crc32(packed_header + self.ciphertext + self.tag) & 0xffffffff - crc_bytes = struct.pack('>I', crc) - return packed_header + self.ciphertext + self.tag + crc_bytes - else: - return packed_header + self.ciphertext + self.tag - - @classmethod - def decrypt(cls, data: bytes, key: bytes, cipher_type: int = 0) -> Tuple[bytes, MessageHeader]: - """ - Decrypt an encrypted message and return the plaintext and header. - - Args: - data: The full encrypted message - key: The encryption key - cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 - - Returns: - Tuple of (plaintext, header) - """ - if len(data) < 18 + 16: # Header + minimum tag size - raise ValueError("Message too short") - - # Extract header - header_bytes = data[:18] - header = MessageHeader.unpack(header_bytes) - - # Get ciphertext and tag - data_len = header.data_len - ciphertext_start = 18 - ciphertext_end = ciphertext_start + data_len - - if ciphertext_end + 16 > len(data): - raise ValueError("Message length does not match header's data_len") - - ciphertext = data[ciphertext_start:ciphertext_end] - tag = data[ciphertext_end:ciphertext_end + 16] - - # Get associated data for AEAD - aad = header.get_associated_data() - - # Combine ciphertext and tag for decryption - ciphertext_with_tag = ciphertext + tag - - # Decrypt using the appropriate cipher - try: - if cipher_type == 0: # AES-256-GCM - cipher = AESGCM(key) - plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad) - elif cipher_type == 1: # ChaCha20-Poly1305 - cipher = ChaCha20Poly1305(key) - plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad) - else: - raise ValueError(f"Unsupported cipher type: {cipher_type}") - - return plaintext, header - except Exception as e: - raise ValueError(f"Decryption failed: {e}") - -def generate_iv(initial: bool = False, previous_iv: bytes = None) -> bytes: - """ - Generate a 96-bit IV (12 bytes). - - Args: - initial: If True, return a random IV - previous_iv: The previous IV to increment - - Returns: - A new IV - """ - if initial or previous_iv is None: - return os.urandom(12) # 96 bits - else: - # Increment the previous IV by 1 modulo 2^96 - iv_int = int.from_bytes(previous_iv, 'big') - iv_int = (iv_int + 1) % (1 << 96) - return iv_int.to_bytes(12, 'big') - -# Convenience functions to match original API -def encrypt_message(plaintext: bytes, key: bytes, flag: int = 0xBEEF, - retry: int = 0, connection_status: int = 0, - iv: bytes = None, cipher_type: int = 0) -> bytes: - """ - Encrypt a message using the specified parameters. - - Args: - plaintext: The data to encrypt - key: The encryption key (32 bytes for AES-256-GCM, 32 bytes for ChaCha20-Poly1305) - flag: 16-bit flag value (default: 0xBEEF) - retry: 8-bit retry counter - connection_status: 4-bit connection status - iv: Optional 96-bit IV (if None, a random one will be generated) - cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 - - Returns: - The full encrypted message - """ - message = EncryptedMessage( - plaintext=plaintext, - key=key, - flag=flag, - retry=retry, - connection_status=connection_status, - iv=iv, - cipher_type=cipher_type - ) - return message.encrypt() - -def decrypt_message(message: bytes, key: bytes, cipher_type: int = 0) -> bytes: - """ - Decrypt a message. - - Args: - message: The full encrypted message - key: The encryption key - cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 - - Returns: - The decrypted plaintext - """ - plaintext, _ = EncryptedMessage.decrypt(message, key, cipher_type) - return plaintext - -# ChaCha20-CTR functions for voice streaming (without authentication) -def chacha20_encrypt(plaintext: bytes, key: bytes, nonce: bytes) -> bytes: - """ - Encrypt plaintext using ChaCha20 in CTR mode (no authentication). - - Args: - plaintext: Data to encrypt - key: 32-byte key - nonce: 16-byte nonce (for ChaCha20 in cryptography library) - - Returns: - Ciphertext - """ - from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - from cryptography.hazmat.backends import default_backend - - if len(key) != 32: - raise ValueError("ChaCha20 key must be 32 bytes") - if len(nonce) != 16: - raise ValueError("ChaCha20 nonce must be 16 bytes") - - cipher = Cipher( - algorithms.ChaCha20(key, nonce), - mode=None, - backend=default_backend() - ) - encryptor = cipher.encryptor() - return encryptor.update(plaintext) + encryptor.finalize() - -def chacha20_decrypt(ciphertext: bytes, key: bytes, nonce: bytes) -> bytes: - """ - Decrypt ciphertext using ChaCha20 in CTR mode (no authentication). - - Args: - ciphertext: Data to decrypt - key: 32-byte key - nonce: 12-byte nonce - - Returns: - Plaintext - """ - # ChaCha20 is symmetrical - encryption and decryption are the same - return chacha20_encrypt(ciphertext, key, nonce) diff --git a/protocol_prototype/Prototype/Protocol_Alpha_0/messages.py b/protocol_prototype/Prototype/Protocol_Alpha_0/messages.py deleted file mode 100644 index 5521499..0000000 --- a/protocol_prototype/Prototype/Protocol_Alpha_0/messages.py +++ /dev/null @@ -1,463 +0,0 @@ -import os -import struct -import time -import zlib -import hashlib -from typing import Tuple, Optional - -def crc32_of(data: bytes) -> int: - """ - Compute CRC-32 of 'data'. - """ - return zlib.crc32(data) & 0xffffffff - - -# --------------------------------------------------------------------------- -# PING REQUEST (new format) -# Fields (in order): -# - session_nonce: 129 bits (from the top 129 bits of 17 random bytes) -# - version: 7 bits -# - cipher: 4 bits (0 = AES-256-GCM, 1 = ChaCha20-poly1305; for now only 0 is used) -# - CRC: 32 bits -# -# Total bits: 129 + 7 + 4 + 32 = 172 bits. We pack into 22 bytes (176 bits) with 4 spare bits. -# --------------------------------------------------------------------------- -class PingRequest: - """ - PING REQUEST format (172 bits / 22 bytes): - - session_nonce: 129 bits (from top 129 bits of 17 random bytes) - - version: 7 bits - - cipher: 4 bits (0 = AES-256-GCM, 1 = ChaCha20-poly1305) - - CRC: 32 bits - """ - def __init__(self, version: int, cipher: int, session_nonce: bytes = None): - if not (0 <= version < 128): - raise ValueError("Version must fit in 7 bits (0..127)") - if not (0 <= cipher < 16): - raise ValueError("Cipher must fit in 4 bits (0..15)") - - self.version = version - self.cipher = cipher - - # Generate session nonce if not provided - if session_nonce is None: - # Generate 17 random bytes - nonce_full = os.urandom(17) - # Use top 129 bits - nonce_int_full = int.from_bytes(nonce_full, 'big') - nonce_129_int = nonce_int_full >> 7 # drop lowest 7 bits - self.session_nonce = nonce_129_int.to_bytes(17, 'big') - else: - if len(session_nonce) != 17: - raise ValueError("Session nonce must be 17 bytes (136 bits)") - self.session_nonce = session_nonce - - def serialize(self) -> bytes: - """Serialize the ping request into a 22-byte packet.""" - # Convert session_nonce to integer (129 bits) - nonce_int = int.from_bytes(self.session_nonce, 'big') - - # Pack fields: shift nonce left by 11 bits, add version and cipher - partial_int = (nonce_int << 11) | (self.version << 4) | (self.cipher & 0x0F) - # This creates 129+7+4 = 140 bits; pack into 18 bytes - partial_bytes = partial_int.to_bytes(18, 'big') - - # Compute CRC over these 18 bytes - cval = crc32_of(partial_bytes) - - # Combine partial data with 32-bit CRC - final_int = (int.from_bytes(partial_bytes, 'big') << 32) | cval - return final_int.to_bytes(22, 'big') - - @classmethod - def deserialize(cls, data: bytes) -> Optional['PingRequest']: - """Deserialize a 22-byte packet into a PingRequest object.""" - if len(data) != 22: - return None - - # Extract 176-bit integer - final_int = int.from_bytes(data, 'big') - - # Extract CRC and verify - crc_in = final_int & 0xffffffff - partial_int = final_int >> 32 # 140 bits - partial_bytes = partial_int.to_bytes(18, 'big') - crc_calc = crc32_of(partial_bytes) - - if crc_calc != crc_in: - return None - - # Extract fields - cipher = partial_int & 0x0F - version = (partial_int >> 4) & 0x7F - nonce_129_int = partial_int >> 11 # 129 bits - session_nonce = nonce_129_int.to_bytes(17, 'big') - - return cls(version, cipher, session_nonce) - - - -# --------------------------------------------------------------------------- -# PING RESPONSE (new format) -# Fields: -# - timestamp: 32 bits (we take the lower 32 bits of the time in ms) -# - version: 7 bits -# - cipher: 4 bits -# - answer: 1 bit -# - CRC: 32 bits -# -# Total bits: 32 + 7 + 4 + 1 + 32 = 76 bits; pack into 10 bytes (80 bits) with 4 spare bits. -# --------------------------------------------------------------------------- -class PingResponse: - """ - PING RESPONSE format (76 bits / 10 bytes): - - timestamp: 32 bits (milliseconds since epoch, lower 32 bits) - - version: 7 bits - - cipher: 4 bits - - answer: 1 bit (0 = no, 1 = yes) - - CRC: 32 bits - """ - def __init__(self, version: int, cipher: int, answer: int, timestamp: int = None): - if not (0 <= version < 128): - raise ValueError("Version must fit in 7 bits") - if not (0 <= cipher < 16): - raise ValueError("Cipher must fit in 4 bits") - if answer not in (0, 1): - raise ValueError("Answer must be 0 or 1") - - self.version = version - self.cipher = cipher - self.answer = answer - self.timestamp = timestamp or (int(time.time() * 1000) & 0xffffffff) - - def serialize(self) -> bytes: - """Serialize the ping response into a 10-byte packet.""" - # Pack timestamp, version, cipher, answer: 32+7+4+1 = 44 bits - # Shift left by 4 to put spare bits at the end - partial_val = (self.timestamp << (7+4+1)) | (self.version << (4+1)) | (self.cipher << 1) | self.answer - partial_val_shifted = partial_val << 4 # Add 4 spare bits at the end - partial_bytes = partial_val_shifted.to_bytes(6, 'big') # 6 bytes = 48 bits - - # Compute CRC - cval = crc32_of(partial_bytes) - - # Combine with CRC - final_val = (int.from_bytes(partial_bytes, 'big') << 32) | cval - return final_val.to_bytes(10, 'big') - - @classmethod - def deserialize(cls, data: bytes) -> Optional['PingResponse']: - """Deserialize a 10-byte packet into a PingResponse object.""" - if len(data) != 10: - return None - - # Extract 80-bit integer - final_int = int.from_bytes(data, 'big') - - # Extract CRC and verify - crc_in = final_int & 0xffffffff - partial_int = final_int >> 32 # 48 bits - partial_bytes = partial_int.to_bytes(6, 'big') - crc_calc = crc32_of(partial_bytes) - - if crc_calc != crc_in: - return None - - # Extract fields (discard 4 spare bits) - partial_int >>= 4 # now 44 bits - answer = partial_int & 0x01 - cipher = (partial_int >> 1) & 0x0F - version = (partial_int >> (1+4)) & 0x7F - timestamp = partial_int >> (1+4+7) - - return cls(version, cipher, answer, timestamp) - - -# ============================================================================= -# 3) Handshake -# - 32-bit timestamp -# - 64-byte ephemeral pubkey (raw x||y = 512 bits) -# - 64-byte ephemeral signature (raw r||s = 512 bits) -# - 32-byte PFS hash (256 bits) -# - 32-bit CRC -# => total 4 + 64 + 64 + 32 + 4 = 168 bytes = 1344 bits -# ============================================================================= - -class Handshake: - """ - HANDSHAKE format (1344 bits / 168 bytes): - - timestamp: 32 bits - - ephemeral_pubkey: 512 bits (64 bytes, raw x||y format) - - ephemeral_signature: 512 bits (64 bytes, raw r||s format) - - pfs_hash: 256 bits (32 bytes) - - CRC: 32 bits - """ - def __init__(self, ephemeral_pubkey: bytes, ephemeral_signature: bytes, pfs_hash: bytes, timestamp: int = None): - if len(ephemeral_pubkey) != 64: - raise ValueError("ephemeral_pubkey must be 64 bytes (raw x||y)") - if len(ephemeral_signature) != 64: - raise ValueError("ephemeral_signature must be 64 bytes (raw r||s)") - if len(pfs_hash) != 32: - raise ValueError("pfs_hash must be 32 bytes") - - self.ephemeral_pubkey = ephemeral_pubkey - self.ephemeral_signature = ephemeral_signature - self.pfs_hash = pfs_hash - self.timestamp = timestamp or (int(time.time() * 1000) & 0xffffffff) - - def serialize(self) -> bytes: - """Serialize the handshake into a 168-byte packet.""" - # Pack timestamp and other fields - partial = struct.pack("!I", self.timestamp) + self.ephemeral_pubkey + self.ephemeral_signature + self.pfs_hash - - # Compute CRC - cval = crc32_of(partial) - - # Append CRC - return partial + struct.pack("!I", cval) - - @classmethod - def deserialize(cls, data: bytes) -> Optional['Handshake']: - """Deserialize a 168-byte packet into a Handshake object.""" - if len(data) != 168: - return None - - # Extract and verify CRC - partial = data[:-4] - crc_in = struct.unpack("!I", data[-4:])[0] - crc_calc = crc32_of(partial) - - if crc_calc != crc_in: - return None - - # Extract fields - timestamp = struct.unpack("!I", partial[:4])[0] - ephemeral_pubkey = partial[4:4+64] - ephemeral_signature = partial[68:68+64] - pfs_hash = partial[132:132+32] - - return cls(ephemeral_pubkey, ephemeral_signature, pfs_hash, timestamp) - - -# ============================================================================= -# 4) PFS Hash Helper -# If no previous session, return 32 zero bytes -# Otherwise, compute sha256(session_number || last_shared_secret). -# ============================================================================= - -def compute_pfs_hash(session_number: int, shared_secret_hex: str) -> bytes: - """ - Compute the PFS hash field for handshake messages: - - If no previous session (session_number < 0), return 32 zero bytes - - Otherwise, compute sha256(session_number || shared_secret) - """ - if session_number < 0: - return b"\x00" * 32 - - # Convert shared_secret_hex to raw bytes - secret_bytes = bytes.fromhex(shared_secret_hex) - - # Pack session_number as 4 bytes - sn_bytes = struct.pack("!I", session_number) - - # Compute hash - return hashlib.sha256(sn_bytes + secret_bytes).digest() - - -# Helper function for CRC32 calculations -def compute_crc32(data: bytes) -> int: - """Compute CRC32 of data (for consistency with crc32_of).""" - return zlib.crc32(data) & 0xffffffff - - -# ============================================================================= -# Voice Protocol Messages -# ============================================================================= - -class VoiceStart: - """ - Voice call initiation message (20 bytes). - - Fields: - - version: 8 bits (protocol version) - - codec_mode: 8 bits (Codec2 mode) - - fec_type: 8 bits (0=repetition, 1=convolutional, 2=LDPC) - - flags: 8 bits (reserved for future use) - - session_id: 64 bits (unique voice session identifier) - - initial_sequence: 32 bits (starting sequence number) - - crc32: 32 bits - """ - - def __init__(self, version: int = 0, codec_mode: int = 5, fec_type: int = 0, - flags: int = 0, session_id: int = None, initial_sequence: int = 0): - self.version = version - self.codec_mode = codec_mode - self.fec_type = fec_type - self.flags = flags | 0x80 # Set high bit to distinguish from VoiceSync - self.session_id = session_id or int.from_bytes(os.urandom(8), 'big') - self.initial_sequence = initial_sequence - - def serialize(self) -> bytes: - """Serialize to 20 bytes.""" - # Pack all fields except CRC - data = struct.pack('>BBBBQII', - self.version, - self.codec_mode, - self.fec_type, - self.flags, - self.session_id, - self.initial_sequence, - 0 # CRC placeholder - ) - - # Calculate and append CRC - crc = compute_crc32(data[:-4]) - return data[:-4] + struct.pack('>I', crc) - - @classmethod - def deserialize(cls, data: bytes) -> Optional['VoiceStart']: - """Deserialize from bytes.""" - if len(data) != 20: - return None - - try: - version, codec_mode, fec_type, flags, session_id, initial_seq, crc = struct.unpack('>BBBBQII', data) - - # Verify CRC - expected_crc = compute_crc32(data[:-4]) - if crc != expected_crc: - return None - - return cls(version, codec_mode, fec_type, flags, session_id, initial_seq) - except struct.error: - return None - - -class VoiceAck: - """ - Voice call acknowledgment message (16 bytes). - - Fields: - - version: 8 bits - - status: 8 bits (0=reject, 1=accept) - - codec_mode: 8 bits (negotiated codec mode) - - fec_type: 8 bits (negotiated FEC type) - - session_id: 64 bits (echo of received session_id) - - crc32: 32 bits - """ - - def __init__(self, version: int = 0, status: int = 1, codec_mode: int = 5, - fec_type: int = 0, session_id: int = 0): - self.version = version - self.status = status - self.codec_mode = codec_mode - self.fec_type = fec_type - self.session_id = session_id - - def serialize(self) -> bytes: - """Serialize to 16 bytes.""" - data = struct.pack('>BBBBQI', - self.version, - self.status, - self.codec_mode, - self.fec_type, - self.session_id, - 0 # CRC placeholder - ) - - crc = compute_crc32(data[:-4]) - return data[:-4] + struct.pack('>I', crc) - - @classmethod - def deserialize(cls, data: bytes) -> Optional['VoiceAck']: - """Deserialize from bytes.""" - if len(data) != 16: - return None - - try: - version, status, codec_mode, fec_type, session_id, crc = struct.unpack('>BBBBQI', data) - - expected_crc = compute_crc32(data[:-4]) - if crc != expected_crc: - return None - - return cls(version, status, codec_mode, fec_type, session_id) - except struct.error: - return None - - -class VoiceEnd: - """ - Voice call termination message (12 bytes). - - Fields: - - session_id: 64 bits - - crc32: 32 bits - """ - - def __init__(self, session_id: int): - self.session_id = session_id - - def serialize(self) -> bytes: - """Serialize to 12 bytes.""" - data = struct.pack('>QI', self.session_id, 0) - crc = compute_crc32(data[:-4]) - return data[:-4] + struct.pack('>I', crc) - - @classmethod - def deserialize(cls, data: bytes) -> Optional['VoiceEnd']: - """Deserialize from bytes.""" - if len(data) != 12: - return None - - try: - session_id, crc = struct.unpack('>QI', data) - - expected_crc = compute_crc32(data[:-4]) - if crc != expected_crc: - return None - - return cls(session_id) - except struct.error: - return None - - -class VoiceSync: - """ - Voice synchronization frame (20 bytes). - Used for maintaining sync and providing timing information. - - Fields: - - session_id: 64 bits - - sequence: 32 bits - - timestamp: 32 bits (milliseconds since voice start) - - crc32: 32 bits - """ - - def __init__(self, session_id: int, sequence: int, timestamp: int): - self.session_id = session_id - self.sequence = sequence - self.timestamp = timestamp - - def serialize(self) -> bytes: - """Serialize to 20 bytes.""" - data = struct.pack('>QIII', self.session_id, self.sequence, self.timestamp, 0) - crc = compute_crc32(data[:-4]) - return data[:-4] + struct.pack('>I', crc) - - @classmethod - def deserialize(cls, data: bytes) -> Optional['VoiceSync']: - """Deserialize from bytes.""" - if len(data) != 20: - return None - - try: - session_id, sequence, timestamp, crc = struct.unpack('>QIII', data) - - expected_crc = compute_crc32(data[:-4]) - if crc != expected_crc: - return None - - return cls(session_id, sequence, timestamp) - except struct.error: - return None diff --git a/protocol_prototype/Prototype/Protocol_Alpha_0/protocol.py b/protocol_prototype/Prototype/Protocol_Alpha_0/protocol.py deleted file mode 100644 index 28ed168..0000000 --- a/protocol_prototype/Prototype/Protocol_Alpha_0/protocol.py +++ /dev/null @@ -1,1069 +0,0 @@ -import random -import os -import time -import threading -from typing import List, Dict, Any, Optional, Tuple - -from crypto_utils import ( - generate_identity_keys, - load_peer_identity_key, - sign_data, - verify_signature, - get_ephemeral_keypair, - compute_ecdh_shared_key, - der_to_raw, - raw_signature_to_der -) -from messages import ( - PingRequest, PingResponse, Handshake, - compute_pfs_hash, - VoiceStart, VoiceAck, VoiceEnd, VoiceSync -) -import transmission -from encryption import ( - EncryptedMessage, MessageHeader, - generate_iv, encrypt_message, decrypt_message -) -from auto_mode import AutoMode, AutoModeConfig -from voice_codec import VoiceProtocol - -# ANSI colors -RED = "\033[91m" -GREEN = "\033[92m" -YELLOW = "\033[93m" -BLUE = "\033[94m" -RESET = "\033[0m" - - -class IcingProtocol: - def __init__(self): - # Identity keys (each 512 bits when printed as hex of 64 bytes) - self.identity_privkey, self.identity_pubkey = generate_identity_keys() - - # Peer identity for verifying ephemeral signatures - self.peer_identity_pubkey_obj = None - self.peer_identity_pubkey_bytes = None - - # Ephemeral keys (our side) - self.ephemeral_privkey = None - self.ephemeral_pubkey = None - - # Last computed shared secret (hex string) - self.shared_secret = None - - # Derived HKDF key (hex string, 256 bits) - self.hkdf_key = None - - # Negotiated cipher (0 = AES-256-GCM, 1 = ChaCha20-Poly1305) - self.cipher_type = 0 - - # For PFS: track per-peer session info (session number and last shared secret) - self.pfs_history: Dict[bytes, Tuple[int, str]] = {} - - # Protocol flags - self.state = { - "ping_sent": False, - "ping_received": False, - "handshake_sent": False, - "handshake_received": False, - "key_exchange_complete": False - } - - # Auto mode for automated protocol operation - self.auto_mode = AutoMode(self) - - # Legacy auto-responder toggle (kept for backward compatibility) - self.auto_responder = False - - # Voice protocol handler - self.voice_protocol = None # Will be initialized after key exchange - self.voice_session_active = False - self.voice_session_id = None - - # Active connections list - self.connections = [] - - # Inbound messages (each message is a dict with keys: type, raw, parsed, connection) - self.inbound_messages: List[Dict[str, Any]] = [] - - # Store the session nonce (17 bytes but only 129 bits are valid) from first sent or received PING - self.session_nonce: bytes = None - - # Last used IV for encrypted messages - self.last_iv: bytes = None - - self.local_port = random.randint(30000, 40000) - self.server_listener = transmission.ServerListener( - host="127.0.0.1", - port=self.local_port, - on_new_connection=self.on_new_connection, - on_data_received=self.on_data_received - ) - self.server_listener.start() - - # ------------------------------------------------------------------------- - # Transport callbacks - # ------------------------------------------------------------------------- - - def on_new_connection(self, conn: transmission.PeerConnection): - print(f"{GREEN}[IcingProtocol]{RESET} New incoming connection.") - self.connections.append(conn) - - # Notify auto mode - self.auto_mode.handle_connection_established() - - def on_data_received(self, conn: transmission.PeerConnection, data: bytes): - bits_count = len(data) * 8 - print( - f"{GREEN}[RECV]{RESET} {bits_count} bits from peer: {data.hex()[:60]}{'...' if len(data.hex()) > 60 else ''}") - - # PING REQUEST (22 bytes) - if len(data) == 22: - ping_request = PingRequest.deserialize(data) - if ping_request: - self.state["ping_received"] = True - - # If received cipher is not supported, force to 0 (AES-256-GCM) - if ping_request.cipher != 0 and ping_request.cipher != 1: - print(f"{YELLOW}[NOTICE]{RESET} Received PING with unsupported cipher ({ping_request.cipher}); forcing cipher to 0 in response.") - ping_request.cipher = 0 - - # Store cipher type for future encrypted messages - self.cipher_type = ping_request.cipher - - # Store session nonce if not already set - if self.session_nonce is None: - self.session_nonce = ping_request.session_nonce - print(f"{YELLOW}[NOTICE]{RESET} Stored session nonce from received PING.") - - index = len(self.inbound_messages) - msg = { - "type": "PING_REQUEST", - "raw": data, - "parsed": ping_request, - "connection": conn - } - self.inbound_messages.append(msg) - - # Handle in auto mode (if active) - self.auto_mode.handle_ping_received(index) - - # Legacy auto-responder (for backward compatibility) - if self.auto_responder and not self.auto_mode.active: - timer = threading.Timer(2.0, self._auto_respond_ping, args=[index]) - timer.daemon = True - timer.start() - return - - # PING RESPONSE (10 bytes) - elif len(data) == 10: - ping_response = PingResponse.deserialize(data) - if ping_response: - # Store negotiated cipher type - self.cipher_type = ping_response.cipher - - index = len(self.inbound_messages) - msg = { - "type": "PING_RESPONSE", - "raw": data, - "parsed": ping_response, - "connection": conn - } - self.inbound_messages.append(msg) - - # Notify auto mode (if active) - self.auto_mode.handle_ping_response_received(ping_response.answer == 1) - return - - # HANDSHAKE message (168 bytes) - elif len(data) == 168: - handshake = Handshake.deserialize(data) - if handshake: - self.state["handshake_received"] = True - index = len(self.inbound_messages) - msg = { - "type": "HANDSHAKE", - "raw": data, - "parsed": handshake, - "connection": conn - } - self.inbound_messages.append(msg) - - # Notify auto mode (if active) - self.auto_mode.handle_handshake_received(index) - - # Legacy auto-responder - if self.auto_responder and not self.auto_mode.active: - timer = threading.Timer(2.0, self._auto_respond_handshake, args=[index]) - timer.daemon = True - timer.start() - return - - # VOICE_START or VOICE_SYNC message (20 bytes) - elif len(data) == 20: - # Check fourth byte (flags field) to distinguish between messages - # VOICE_START has high bit set in flags (byte 3) - # VOICE_SYNC doesn't have this structure - if len(data) >= 4 and (data[3] & 0x80): - # Try VOICE_START first - voice_start = VoiceStart.deserialize(data) - if voice_start: - index = len(self.inbound_messages) - msg = { - "type": "VOICE_START", - "raw": data, - "parsed": voice_start, - "connection": conn - } - self.inbound_messages.append(msg) - print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_START at index={index}.") - - # Handle voice call initiation - self.handle_voice_start(index) - return - - # Try VOICE_SYNC - voice_sync = VoiceSync.deserialize(data) - if voice_sync: - index = len(self.inbound_messages) - msg = { - "type": "VOICE_SYNC", - "raw": data, - "parsed": voice_sync, - "connection": conn - } - self.inbound_messages.append(msg) - print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_SYNC at index={index}.") - - # Handle voice synchronization - self.handle_voice_sync(index) - return - - # VOICE_ACK message (16 bytes) - elif len(data) == 16: - # Try VOICE_ACK first, then fall back to PING_RESPONSE - voice_ack = VoiceAck.deserialize(data) - if voice_ack: - index = len(self.inbound_messages) - msg = { - "type": "VOICE_ACK", - "raw": data, - "parsed": voice_ack, - "connection": conn - } - self.inbound_messages.append(msg) - print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_ACK at index={index}.") - - # Handle voice call acknowledgment - self.handle_voice_ack(index) - return - - # VOICE_END message (12 bytes) - elif len(data) == 12: - voice_end = VoiceEnd.deserialize(data) - if voice_end: - index = len(self.inbound_messages) - msg = { - "type": "VOICE_END", - "raw": data, - "parsed": voice_end, - "connection": conn - } - self.inbound_messages.append(msg) - print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_END at index={index}.") - - # Handle voice call termination - self.handle_voice_end(index) - return - - - # Check if the message might be an encrypted message (e.g. header of 18 bytes at start) - elif len(data) >= 18: - # Try to parse header - try: - header = MessageHeader.unpack(data[:18]) - # If header unpacking is successful and data length matches header expectations - expected_len = 18 + header.data_len + 16 # Header + payload + tag - - # Check if CRC is included - has_crc = (header.connection_status & 0x01) != 0 - if has_crc: - expected_len += 4 # Add CRC32 length - - if len(data) >= expected_len: - index = len(self.inbound_messages) - msg = { - "type": "ENCRYPTED_MESSAGE", - "raw": data, - "parsed": header, - "connection": conn - } - self.inbound_messages.append(msg) - print(f"{YELLOW}[NOTICE]{RESET} Stored inbound ENCRYPTED_MESSAGE at index={index}.") - - # Notify auto mode - self.auto_mode.handle_encrypted_received(index) - return - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to parse message header: {e}") - - # Otherwise, unrecognized/malformed message. - index = len(self.inbound_messages) - msg = { - "type": "UNKNOWN", - "raw": data, - "parsed": None, - "connection": conn - } - self.inbound_messages.append(msg) - print(f"{RED}[WARNING]{RESET} Unrecognized or malformed message stored at index={index}.") - - - # ------------------------------------------------------------------------- - # HKDF Derivation - # ------------------------------------------------------------------------- - - def derive_hkdf(self): - """ - Derives a 256-bit key using HKDF. - Uses as input keying material (IKM) the shared secret from ECDH. - The salt is computed as SHA256(session_nonce || pfs_param), where: - - session_nonce is taken from self.session_nonce (17 bytes, 129 bits) or defaults to zeros. - - pfs_param is taken from the first inbound HANDSHAKE's pfs_hash field (32 bytes) or zeros. - """ - if not self.shared_secret: - print(f"{RED}[ERROR]{RESET} No shared secret available; cannot derive HKDF key.") - return - - # IKM: shared secret converted from hex to bytes. - ikm = bytes.fromhex(self.shared_secret) - # Use stored session_nonce if available; otherwise default to zeros. - session_nonce = self.session_nonce if self.session_nonce is not None else (b"\x00" * 17) - - # For now, use a simpler approach: just use session_nonce for salt - # This ensures both peers derive the same key - # PFS is still maintained through the shared secret rotation - pfs_param = b"\x00" * 32 # Will use session_nonce only for salt - - # Ensure both are bytes - if isinstance(session_nonce, str): - session_nonce = session_nonce.encode() - if isinstance(pfs_param, str): - pfs_param = pfs_param.encode() - - from cryptography.hazmat.primitives import hashes - from cryptography.hazmat.primitives.kdf.hkdf import HKDF - hasher = hashes.Hash(hashes.SHA256()) - hasher.update(session_nonce + pfs_param) - salt_value = hasher.finalize() - - hkdf = HKDF( - algorithm=hashes.SHA256(), - length=32, # 256 bits - salt=salt_value, - info=b"", - ) - derived_key = hkdf.derive(ikm) - self.hkdf_key = derived_key.hex() - self.state["key_exchange_complete"] = True - print(f"{GREEN}[HKDF]{RESET} Derived HKDF key: {self.hkdf_key}") - return True - - # ------------------------------------------------------------------------- - # Legacy Auto-responder helpers (kept for backward compatibility) - # ------------------------------------------------------------------------- - - def _auto_respond_ping(self, index: int): - """ - Called by a Timer to respond automatically to a PING_REQUEST after 2s. - """ - print(f"{BLUE}[AUTO]{RESET} Delayed responding to PING at index={index}") - self.respond_to_ping(index, answer=1) # Accept by default - self.show_state() - - def _auto_respond_handshake(self, index: int): - """ - Called by a Timer to handle inbound HANDSHAKE automatically. - 1) Generate ephemeral keys if not already set - 2) Compute ECDH with the inbound ephemeral pub (generate_ecdhe) - 3) Send our handshake back - 4) Show state - """ - print(f"{BLUE}[AUTO]{RESET} Delayed ECDH process for HANDSHAKE at index={index}") - - # 1) Generate ephemeral keys if we don't have them - if not self.ephemeral_privkey or not self.ephemeral_pubkey: - self.generate_ephemeral_keys() - - # 2) Compute ECDH from inbound ephemeral pub - self.generate_ecdhe(index) - - # 3) Send our handshake to the peer - self.send_handshake() - - # 4) Show final state - self.show_state() - - # ------------------------------------------------------------------------- - # Public Methods for Auto Mode Management - # ------------------------------------------------------------------------- - - def start_auto_mode(self): - """Start the automatic protocol operation mode.""" - self.auto_mode.start() - - def stop_auto_mode(self): - """Stop the automatic protocol operation mode.""" - self.auto_mode.stop() - - def configure_auto_mode(self, **kwargs): - """ - Configure the automatic mode parameters. - - Args: - **kwargs: Configuration parameters to set. Supported parameters: - - ping_response_accept: bool, whether to accept incoming pings - - ping_auto_initiate: bool, whether to initiate pings on connection - - ping_retry_count: int, number of ping retries - - ping_retry_delay: float, seconds between ping retries - - ping_timeout: float, seconds to wait for ping response - - preferred_cipher: int, preferred cipher (0=AES-GCM, 1=ChaCha20-Poly1305) - - handshake_retry_count: int, number of handshake retries - - handshake_retry_delay: float, seconds between handshake retries - - handshake_timeout: float, seconds to wait for handshake - - auto_message_enabled: bool, whether to auto-send messages - - message_interval: float, seconds between auto messages - - message_content: str, default message content - - active_mode: bool, whether to actively initiate protocol - """ - for key, value in kwargs.items(): - if hasattr(self.auto_mode.config, key): - setattr(self.auto_mode.config, key, value) - print(f"{BLUE}[CONFIG]{RESET} Set auto mode {key} = {value}") - else: - print(f"{RED}[ERROR]{RESET} Unknown auto mode configuration parameter: {key}") - - def get_auto_mode_config(self): - """Return the current auto mode configuration.""" - return self.auto_mode.config - - def queue_auto_message(self, message: str): - """ - Add a message to the auto-send queue. - - Args: - message: Message text to send - """ - self.auto_mode.queue_message(message) - - def toggle_auto_mode_logging(self): - """ - Toggle detailed logging for auto mode. - When enabled, will show more information about state transitions and decision making. - """ - if not hasattr(self.auto_mode, 'verbose_logging'): - self.auto_mode.verbose_logging = True - else: - self.auto_mode.verbose_logging = not self.auto_mode.verbose_logging - - status = "enabled" if self.auto_mode.verbose_logging else "disabled" - print(f"{BLUE}[AUTO-LOG]{RESET} Detailed logging {status}") - - def debug_message(self, index: int): - """ - Debug a message in the inbound message queue. - Prints detailed information about the message. - - Args: - index: The index of the message in the inbound_messages queue - """ - if index < 0 or index >= len(self.inbound_messages): - print(f"{RED}[ERROR]{RESET} Invalid message index {index}") - return - - msg = self.inbound_messages[index] - print(f"\n{YELLOW}=== Message Debug [{index}] ==={RESET}") - print(f"Type: {msg['type']}") - print(f"Length: {len(msg['raw'])} bytes = {len(msg['raw'])*8} bits") - print(f"Raw data: {msg['raw'].hex()}") - - if msg['parsed'] is not None: - print(f"\n{YELLOW}--- Parsed Data ---{RESET}") - if msg['type'] == 'PING_REQUEST': - ping = msg['parsed'] - print(f"Version: {ping.version}") - print(f"Cipher: {ping.cipher} ({'AES-256-GCM' if ping.cipher == 0 else 'ChaCha20-Poly1305' if ping.cipher == 1 else 'Unknown'})") - print(f"Session nonce: {ping.session_nonce.hex()}") - print(f"CRC32: {ping.crc32:08x}") - - elif msg['type'] == 'PING_RESPONSE': - resp = msg['parsed'] - print(f"Version: {resp.version}") - print(f"Cipher: {resp.cipher} ({'AES-256-GCM' if resp.cipher == 0 else 'ChaCha20-Poly1305' if resp.cipher == 1 else 'Unknown'})") - print(f"Answer: {resp.answer} ({'Accept' if resp.answer == 1 else 'Reject'})") - print(f"CRC32: {resp.crc32:08x}") - - elif msg['type'] == 'HANDSHAKE': - hs = msg['parsed'] - print(f"Ephemeral pubkey: {hs.ephemeral_pubkey.hex()[:16]}...") - print(f"Ephemeral signature: {hs.ephemeral_signature.hex()[:16]}...") - print(f"PFS hash: {hs.pfs_hash.hex()[:16]}...") - print(f"Timestamp: {hs.timestamp}") - print(f"CRC32: {hs.crc32:08x}") - - elif msg['type'] == 'ENCRYPTED_MESSAGE': - header = msg['parsed'] - print(f"Flag: 0x{header.flag:04x}") - print(f"Data length: {header.data_len} bytes") - print(f"Retry: {header.retry}") - print(f"Connection status: {header.connection_status} ({'CRC included' if header.connection_status & 0x01 else 'No CRC'})") - print(f"IV: {header.iv.hex()}") - - # Calculate expected message size - expected_len = 18 + header.data_len + 16 # Header + payload + tag - if header.connection_status & 0x01: - expected_len += 4 # Add CRC - - print(f"Expected total length: {expected_len} bytes") - print(f"Actual length: {len(msg['raw'])} bytes") - - # If we have a key, try to decrypt - if self.hkdf_key: - print("\nAttempting decryption...") - try: - key = bytes.fromhex(self.hkdf_key) - plaintext = decrypt_message(msg['raw'], key, self.cipher_type) - print(f"Decrypted: {plaintext.decode('utf-8')}") - except Exception as e: - print(f"Decryption failed: {e}") - print() - - # ------------------------------------------------------------------------- - # Public Methods - # ------------------------------------------------------------------------- - - def connect_to_peer(self, port: int): - conn = transmission.connect_to_peer("127.0.0.1", port, self.on_data_received) - self.connections.append(conn) - print(f"{GREEN}[IcingProtocol]{RESET} Outgoing connection to port {port} established.") - - # Notify auto mode - self.auto_mode.handle_connection_established() - - def set_peer_identity(self, peer_pubkey_hex: str): - pubkey_bytes = bytes.fromhex(peer_pubkey_hex) - self.peer_identity_pubkey_obj = load_peer_identity_key(pubkey_bytes) - self.peer_identity_pubkey_bytes = pubkey_bytes - print(f"{GREEN}[IcingProtocol]{RESET} Stored peer identity pubkey (hex={peer_pubkey_hex[:16]}...).") - - def generate_ephemeral_keys(self): - self.ephemeral_privkey, self.ephemeral_pubkey = get_ephemeral_keypair() - print(f"{GREEN}[IcingProtocol]{RESET} Generated ephemeral key pair: pubkey={self.ephemeral_pubkey.hex()[:16]}...") - - # Send PING (session discovery and cipher negotiation) - def send_ping_request(self, cipher_type=0): - """ - Send a ping request to the first connected peer. - - Args: - cipher_type: Preferred cipher type (0 = AES-256-GCM, 1 = ChaCha20-Poly1305) - """ - if not self.connections: - print(f"{RED}[ERROR]{RESET} No active connections.") - return False - - # Validate cipher type - if cipher_type not in (0, 1): - print(f"{YELLOW}[WARNING]{RESET} Invalid cipher type {cipher_type}, defaulting to AES-256-GCM (0)") - cipher_type = 0 - - # Create ping request with specified cipher - ping_request = PingRequest(version=0, cipher=cipher_type) - - # Store session nonce if not already set - if self.session_nonce is None: - self.session_nonce = ping_request.session_nonce - print(f"{YELLOW}[NOTICE]{RESET} Stored session nonce from sent PING.") - - # Serialize and send - pkt = ping_request.serialize() - self._send_packet(self.connections[0], pkt, "PING_REQUEST") - self.state["ping_sent"] = True - return True - - def send_handshake(self): - """ - Build and send handshake: - - ephemeral_pubkey (64 bytes, raw x||y) - - ephemeral_signature (64 bytes, raw r||s) - - pfs_hash (32 bytes) - - timestamp (32 bits) - - CRC (32 bits) - """ - if not self.connections: - print(f"{RED}[ERROR]{RESET} No active connections.") - return False - if not self.ephemeral_privkey or not self.ephemeral_pubkey: - print(f"{RED}[ERROR]{RESET} Ephemeral keys not generated.") - return False - if self.peer_identity_pubkey_bytes is None: - print(f"{RED}[ERROR]{RESET} Peer identity not set; needed for PFS tracking.") - return False - - # 1) Sign ephemeral_pubkey using identity key - sig_der = sign_data(self.identity_privkey, self.ephemeral_pubkey) - # Convert DER signature to raw r||s format (64 bytes) - raw_signature = der_to_raw(sig_der) - - # 2) Compute PFS hash - session_number, last_secret_hex = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, "")) - pfs = compute_pfs_hash(session_number, last_secret_hex) - - # 3) Create handshake object - handshake = Handshake( - ephemeral_pubkey=self.ephemeral_pubkey, - ephemeral_signature=raw_signature, - pfs_hash=pfs - ) - - # 4) Serialize and send - pkt = handshake.serialize() - self._send_packet(self.connections[0], pkt, "HANDSHAKE") - self.state["handshake_sent"] = True - return True - - def enable_auto_responder(self, enable: bool): - """ - Legacy method for enabling/disabling auto responder. - For new code, use start_auto_mode() and stop_auto_mode() instead. - """ - self.auto_responder = enable - print(f"{YELLOW}[LEGACY]{RESET} Auto responder set to {enable}. Consider using auto_mode instead.") - - # ------------------------------------------------------------------------- - # Manual Responses - # ------------------------------------------------------------------------- - - def respond_to_ping(self, index: int, answer: int): - """ - Respond to a ping request with the specified answer (0 = no, 1 = yes). - If answer is 1, we accept the connection and use the cipher specified in the request. - """ - if index < 0 or index >= len(self.inbound_messages): - print(f"{RED}[ERROR]{RESET} Invalid index {index}.") - return False - msg = self.inbound_messages[index] - if msg["type"] != "PING_REQUEST": - print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a PING_REQUEST.") - return False - - ping_request = msg["parsed"] - version = ping_request.version - cipher = ping_request.cipher - - # Force cipher to 0 or 1 (only AES-256-GCM and ChaCha20-Poly1305 are supported) - if cipher != 0 and cipher != 1: - print(f"{YELLOW}[NOTICE]{RESET} Received PING with unsupported cipher ({cipher}); forcing cipher to 0 in response.") - cipher = 0 - - # Store the negotiated cipher type if we're accepting - if answer == 1: - self.cipher_type = cipher - - conn = msg["connection"] - # Create ping response - ping_response = PingResponse(version, cipher, answer) - resp = ping_response.serialize() - self._send_packet(conn, resp, "PING_RESPONSE") - print(f"{BLUE}[MANUAL]{RESET} Responded to ping with answer={answer}.") - return True - - def generate_ecdhe(self, index: int): - """ - Process a handshake message: - 1. Verify the ephemeral signature - 2. Compute the ECDH shared secret - 3. Update PFS history - """ - if index < 0 or index >= len(self.inbound_messages): - print(f"{RED}[ERROR]{RESET} Invalid index {index}.") - return False - msg = self.inbound_messages[index] - if msg["type"] != "HANDSHAKE": - print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a HANDSHAKE.") - return False - - handshake = msg["parsed"] - - # Convert raw signature to DER for verification - raw_sig = handshake.ephemeral_signature - sig_der = raw_signature_to_der(raw_sig) - - # Verify signature - ok = verify_signature(self.peer_identity_pubkey_obj, sig_der, handshake.ephemeral_pubkey) - if not ok: - print(f"{RED}[ERROR]{RESET} Ephemeral signature invalid.") - return False - print(f"{GREEN}[OK]{RESET} Ephemeral signature verified.") - - # Check if we have ephemeral keys - if not self.ephemeral_privkey: - print(f"{YELLOW}[WARN]{RESET} No ephemeral_privkey available, cannot compute shared secret.") - return False - - # Compute ECDH shared secret - shared = compute_ecdh_shared_key(self.ephemeral_privkey, handshake.ephemeral_pubkey) - self.shared_secret = shared.hex() - print(f"{GREEN}[OK]{RESET} Computed ECDH shared key = {self.shared_secret}") - - # Update PFS history - old_session, _ = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, "")) - new_session = 1 if old_session < 0 else old_session + 1 - self.pfs_history[self.peer_identity_pubkey_bytes] = (new_session, self.shared_secret) - return True - - # ------------------------------------------------------------------------- - # Utility - # ------------------------------------------------------------------------- - - def _send_packet(self, conn: transmission.PeerConnection, data: bytes, label: str): - bits_count = len(data) * 8 - print(f"{BLUE}[SEND]{RESET} {label} -> {bits_count} bits: {data.hex()[:60]}{'...' if len(data.hex())>60 else ''}") - conn.send(data) - - def show_state(self): - print(f"\n{YELLOW}=== Global State ==={RESET}") - print(f"Listening Port: {self.local_port}") - print(f"Identity PubKey: 512 bits => {self.identity_pubkey.hex()[:16]}...") - - if self.peer_identity_pubkey_bytes: - print(f"Peer Identity PubKey: 512 bits => {self.peer_identity_pubkey_bytes.hex()[:16]}...") - else: - print("Peer Identity PubKey: [None]") - - print("\nEphemeral Keys:") - if self.ephemeral_pubkey: - print(f" ephemeral_pubkey: 512 bits => {self.ephemeral_pubkey.hex()[:16]}...") - else: - print(" ephemeral_pubkey: [None]") - - print(f"\nShared Secret: {self.shared_secret if self.shared_secret else '[None]'}") - - if self.hkdf_key: - print(f"HKDF Derived Key: {self.hkdf_key} (size: {len(self.hkdf_key)*8} bits)") - else: - print("HKDF Derived Key: [None]") - - print(f"Negotiated Cipher: {'AES-256-GCM' if self.cipher_type == 0 else 'ChaCha20-Poly1305'} (code: {self.cipher_type})") - - if self.session_nonce: - print(f"Session Nonce: {self.session_nonce.hex()} (129 bits)") - else: - print("Session Nonce: [None]") - - if self.last_iv: - print(f"Last IV: {self.last_iv.hex()} (96 bits)") - else: - print("Last IV: [None]") - - print("\nProtocol Flags:") - for k, v in self.state.items(): - print(f" {k}: {v}") - - print("\nAuto Mode Active:", self.auto_mode.active) - print("Auto Mode State:", self.auto_mode.state) - print("Legacy Auto Responder:", self.auto_responder) - - print("\nVoice Status:") - print(f" Active: {self.voice_session_active}") - if self.voice_session_id: - print(f" Session ID: {self.voice_session_id:016x}") - print(f" Voice Protocol: {'Initialized' if self.voice_protocol else 'Not initialized'}") - - print("\nActive Connections:") - for i, c in enumerate(self.connections): - print(f" [{i}] Alive={c.alive}") - - print("\nInbound Message Queue:") - for i, m in enumerate(self.inbound_messages): - print(f" [{i}] type={m['type']} len={len(m['raw'])} bytes => {len(m['raw']) * 8} bits") - print() - - def stop(self): - """Stop the protocol and clean up resources.""" - # Stop auto mode first - self.auto_mode.stop() - - # Stop server listener - self.server_listener.stop() - - # Close all connections - for c in self.connections: - c.close() - self.connections.clear() - self.inbound_messages.clear() - print(f"{RED}[STOP]{RESET} Protocol stopped.") - - # ------------------------------------------------------------------------- - # Encrypted Messaging - # ------------------------------------------------------------------------- - - def send_encrypted_message(self, plaintext: str): - """ - Encrypts and sends a message using the derived HKDF key and negotiated cipher. - The message format is: - - Header (18 bytes): flag, data_len, retry, connection_status, IV - - Payload: variable length encrypted data - - Footer: Authentication tag (16 bytes) + optional CRC32 (4 bytes) - """ - if not self.connections: - print(f"{RED}[ERROR]{RESET} No active connections.") - return False - if not self.hkdf_key: - print(f"{RED}[ERROR]{RESET} No HKDF key derived. Cannot encrypt message.") - return False - - # Get the encryption key - key = bytes.fromhex(self.hkdf_key) - - # Convert plaintext to bytes - plaintext_bytes = plaintext.encode('utf-8') - - # Generate or increment the IV - if self.last_iv is None: - # First message, generate random IV - iv = generate_iv(initial=True) - else: - # Subsequent message, increment previous IV - iv = generate_iv(initial=False, previous_iv=self.last_iv) - - # Store the new IV - self.last_iv = iv - - # Create encrypted message (connection_status 0 = no CRC) - encrypted = encrypt_message( - plaintext=plaintext_bytes, - key=key, - flag=0xBEEF, # Default flag - retry=0, - connection_status=0, # No CRC - iv=iv, - cipher_type=self.cipher_type - ) - - # Send the encrypted message - self._send_packet(self.connections[0], encrypted, "ENCRYPTED_MESSAGE") - print(f"{GREEN}[SEND_ENCRYPTED]{RESET} Encrypted message sent.") - return True - - def decrypt_received_message(self, index: int): - """ - Decrypt a received encrypted message using the HKDF key and negotiated cipher. - """ - if index < 0 or index >= len(self.inbound_messages): - print(f"{RED}[ERROR]{RESET} Invalid message index.") - return None - - msg = self.inbound_messages[index] - if msg["type"] != "ENCRYPTED_MESSAGE": - print(f"{RED}[ERROR]{RESET} Message at index {index} is not an ENCRYPTED_MESSAGE.") - return None - - # Get the encrypted message - encrypted = msg["raw"] - - if not self.hkdf_key: - print(f"{RED}[ERROR]{RESET} No HKDF key derived. Cannot decrypt message.") - return None - - # Get the encryption key - key = bytes.fromhex(self.hkdf_key) - - try: - # Decrypt the message - plaintext = decrypt_message(encrypted, key, self.cipher_type) - - # Convert to string - plaintext_str = plaintext.decode('utf-8') - - # Update last IV from the header - header = MessageHeader.unpack(encrypted[:18]) - self.last_iv = header.iv - - print(f"{GREEN}[DECRYPTED]{RESET} Decrypted message: {plaintext_str}") - return plaintext_str - except Exception as e: - print(f"{RED}[ERROR]{RESET} Decryption failed: {e}") - return None - - # ------------------------------------------------------------------------- - # Voice Protocol Methods - # ------------------------------------------------------------------------- - - def handle_voice_start(self, index: int): - """Handle incoming voice call initiation.""" - if index < 0 or index >= len(self.inbound_messages): - return - - msg = self.inbound_messages[index] - voice_start = msg["parsed"] - - print(f"{BLUE}[VOICE]{RESET} Incoming voice call (session_id={voice_start.session_id:016x})") - print(f" Codec mode: {voice_start.codec_mode}") - print(f" FEC type: {voice_start.fec_type}") - - # Auto-accept if in auto mode (or implement your own logic) - if self.auto_mode.active: - self.accept_voice_call(voice_start.session_id, voice_start.codec_mode, voice_start.fec_type) - - def handle_voice_ack(self, index: int): - """Handle voice call acknowledgment.""" - if index < 0 or index >= len(self.inbound_messages): - return - - msg = self.inbound_messages[index] - voice_ack = msg["parsed"] - - if voice_ack.status == 1: - print(f"{GREEN}[VOICE]{RESET} Voice call accepted (session_id={voice_ack.session_id:016x})") - self.voice_session_active = True - self.voice_session_id = voice_ack.session_id - - # Initialize voice protocol if not already done - if not self.voice_protocol: - self.voice_protocol = VoiceProtocol(self) - else: - print(f"{RED}[VOICE]{RESET} Voice call rejected") - - def handle_voice_end(self, index: int): - """Handle voice call termination.""" - if index < 0 or index >= len(self.inbound_messages): - return - - msg = self.inbound_messages[index] - voice_end = msg["parsed"] - - print(f"{YELLOW}[VOICE]{RESET} Voice call ended (session_id={voice_end.session_id:016x})") - - if self.voice_session_id == voice_end.session_id: - self.voice_session_active = False - self.voice_session_id = None - - def handle_voice_sync(self, index: int): - """Handle voice synchronization frame.""" - if index < 0 or index >= len(self.inbound_messages): - return - - msg = self.inbound_messages[index] - voice_sync = msg["parsed"] - - # Use sync info for timing/jitter buffer management - print(f"{BLUE}[VOICE-SYNC]{RESET} seq={voice_sync.sequence}, ts={voice_sync.timestamp}ms") - - def start_voice_call(self, codec_mode: int = 5, fec_type: int = 0): - """ - Initiate a voice call. - - Args: - codec_mode: Codec2 mode (default 5 = 1200bps) - fec_type: FEC type (0=repetition, 1=convolutional, 2=LDPC) - """ - if not self.connections: - print(f"{RED}[ERROR]{RESET} No active connections.") - return False - - if not self.state.get("key_exchange_complete"): - print(f"{RED}[ERROR]{RESET} Key exchange not complete. Cannot start voice call.") - return False - - # Create VOICE_START message - voice_start = VoiceStart( - version=0, - codec_mode=codec_mode, - fec_type=fec_type - ) - - self.voice_session_id = voice_start.session_id - - # Send the message - pkt = voice_start.serialize() - self._send_packet(self.connections[0], pkt, "VOICE_START") - - print(f"{GREEN}[VOICE]{RESET} Initiating voice call (session_id={self.voice_session_id:016x})") - return True - - def accept_voice_call(self, session_id: int, codec_mode: int, fec_type: int): - """Accept an incoming voice call.""" - if not self.connections: - return False - - # Send VOICE_ACK - voice_ack = VoiceAck( - version=0, - status=1, # Accept - codec_mode=codec_mode, - fec_type=fec_type, - session_id=session_id - ) - - pkt = voice_ack.serialize() - self._send_packet(self.connections[0], pkt, "VOICE_ACK") - - self.voice_session_active = True - self.voice_session_id = session_id - - # Initialize voice protocol - if not self.voice_protocol: - self.voice_protocol = VoiceProtocol(self) - - return True - - def end_voice_call(self): - """End the current voice call.""" - if not self.voice_session_active or not self.voice_session_id: - print(f"{YELLOW}[VOICE]{RESET} No active voice call to end") - return False - - if not self.connections: - return False - - # Send VOICE_END - voice_end = VoiceEnd(self.voice_session_id) - pkt = voice_end.serialize() - self._send_packet(self.connections[0], pkt, "VOICE_END") - - self.voice_session_active = False - self.voice_session_id = None - - print(f"{YELLOW}[VOICE]{RESET} Voice call ended") - return True - - def send_voice_audio(self, audio_samples): - """ - Send voice audio samples. - - Args: - audio_samples: PCM audio samples (8kHz, 16-bit) - """ - if not self.voice_session_active: - print(f"{RED}[ERROR]{RESET} No active voice session") - return False - - if not self.voice_protocol: - print(f"{RED}[ERROR]{RESET} Voice protocol not initialized") - return False - - try: - # Process and send audio - modulated = self.voice_protocol.process_voice_input(audio_samples) - if modulated is not None: - # In real implementation, this would go through the audio channel - # For now, we could send it as encrypted data - print(f"{BLUE}[VOICE-AUDIO]{RESET} Processed {len(modulated)} samples") - return True - except Exception as e: - print(f"{RED}[ERROR]{RESET} Voice audio processing failed: {e}") - import traceback - traceback.print_exc() - - return False diff --git a/protocol_prototype/Prototype/Protocol_Alpha_0/transmission.py b/protocol_prototype/Prototype/Protocol_Alpha_0/transmission.py deleted file mode 100644 index 35f3a21..0000000 --- a/protocol_prototype/Prototype/Protocol_Alpha_0/transmission.py +++ /dev/null @@ -1,100 +0,0 @@ -import socket -import threading -from typing import Callable - -class PeerConnection: - """ - Represents a live, two-way connection to a peer. - We keep a socket open, read data in a background thread, - and can send data from the main thread at any time. - """ - def __init__(self, sock: socket.socket, on_data_received: Callable[['PeerConnection', bytes], None]): - self.sock = sock - self.on_data_received = on_data_received - self.alive = True - - self.read_thread = threading.Thread(target=self.read_loop, daemon=True) - self.read_thread.start() - - def read_loop(self): - while self.alive: - try: - data = self.sock.recv(4096) - if not data: - break - self.on_data_received(self, data) - except OSError: - break - self.alive = False - self.sock.close() - print("[PeerConnection] Connection closed.") - - def send(self, data: bytes): - if not self.alive: - print("[PeerConnection.send] Cannot send, connection not alive.") - return - try: - self.sock.sendall(data) - except OSError: - print("[PeerConnection.send] Send failed, connection might be closed.") - self.alive = False - - def close(self): - self.alive = False - try: - self.sock.shutdown(socket.SHUT_RDWR) - except OSError: - pass - self.sock.close() - - -class ServerListener(threading.Thread): - """ - A thread that listens on a given port. When a new client connects, - it creates a PeerConnection for that client. - """ - def __init__(self, host: str, port: int, - on_new_connection: Callable[[PeerConnection], None], - on_data_received: Callable[[PeerConnection, bytes], None]): - super().__init__(daemon=True) - self.host = host - self.port = port - self.on_new_connection = on_new_connection - self.on_data_received = on_data_received - self.server_socket = None - self.stop_event = threading.Event() - - def run(self): - self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.server_socket.bind((self.host, self.port)) - self.server_socket.listen(5) - self.server_socket.settimeout(1.0) - print(f"[ServerListener] Listening on {self.host}:{self.port}") - - while not self.stop_event.is_set(): - try: - client_sock, addr = self.server_socket.accept() - print(f"[ServerListener] Accepted connection from {addr}") - conn = PeerConnection(client_sock, self.on_data_received) - self.on_new_connection(conn) - except socket.timeout: - pass - except OSError: - break - - if self.server_socket: - self.server_socket.close() - - def stop(self): - self.stop_event.set() - if self.server_socket: - self.server_socket.close() - - -def connect_to_peer(host: str, port: int, - on_data_received: Callable[[PeerConnection, bytes], None]) -> PeerConnection: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((host, port)) - print(f"[connect_to_peer] Connected to {host}:{port}") - conn = PeerConnection(sock, on_data_received) - return conn diff --git a/protocol_prototype/Prototype/Protocol_Alpha_0/voice_codec.py b/protocol_prototype/Prototype/Protocol_Alpha_0/voice_codec.py deleted file mode 100644 index c8e70be..0000000 --- a/protocol_prototype/Prototype/Protocol_Alpha_0/voice_codec.py +++ /dev/null @@ -1,716 +0,0 @@ -""" -Voice codec integration for encrypted voice over GSM. -Implements Codec2 compression with FSK modulation for transmitting -encrypted voice data over standard GSM voice channels. -""" - -import array -import math -import struct -from typing import Optional, Tuple, List -from dataclasses import dataclass -from enum import IntEnum - -try: - import numpy as np - HAS_NUMPY = True -except ImportError: - HAS_NUMPY = False - -# ANSI colors -RED = "\033[91m" -GREEN = "\033[92m" -YELLOW = "\033[93m" -BLUE = "\033[94m" -RESET = "\033[0m" - - -class Codec2Mode(IntEnum): - """Codec2 bitrate modes.""" - MODE_3200 = 0 # 3200 bps - MODE_2400 = 1 # 2400 bps - MODE_1600 = 2 # 1600 bps - MODE_1400 = 3 # 1400 bps - MODE_1300 = 4 # 1300 bps - MODE_1200 = 5 # 1200 bps (recommended for robustness) - MODE_700C = 6 # 700 bps - - -@dataclass -class Codec2Frame: - """Represents a single Codec2 compressed voice frame.""" - mode: Codec2Mode - bits: bytes - timestamp: float - frame_number: int - - -class Codec2Wrapper: - """ - Wrapper for Codec2 voice codec. - In production, this would use py_codec2 or ctypes bindings to libcodec2. - This is a simulation interface for protocol development. - """ - - # Frame sizes in bits for each mode - FRAME_BITS = { - Codec2Mode.MODE_3200: 64, - Codec2Mode.MODE_2400: 48, - Codec2Mode.MODE_1600: 64, - Codec2Mode.MODE_1400: 56, - Codec2Mode.MODE_1300: 52, - Codec2Mode.MODE_1200: 48, - Codec2Mode.MODE_700C: 28 - } - - # Frame duration in ms - FRAME_MS = { - Codec2Mode.MODE_3200: 20, - Codec2Mode.MODE_2400: 20, - Codec2Mode.MODE_1600: 40, - Codec2Mode.MODE_1400: 40, - Codec2Mode.MODE_1300: 40, - Codec2Mode.MODE_1200: 40, - Codec2Mode.MODE_700C: 40 - } - - def __init__(self, mode: Codec2Mode = Codec2Mode.MODE_1200): - """ - Initialize Codec2 wrapper. - - Args: - mode: Codec2 bitrate mode (default 1200 bps for robustness) - """ - self.mode = mode - self.frame_bits = self.FRAME_BITS[mode] - self.frame_bytes = (self.frame_bits + 7) // 8 - self.frame_ms = self.FRAME_MS[mode] - self.frame_samples = int(8000 * self.frame_ms / 1000) # 8kHz sampling - self.frame_counter = 0 - - print(f"{GREEN}[CODEC2]{RESET} Initialized in mode {mode.name} " - f"({self.frame_bits} bits/frame, {self.frame_ms}ms duration)") - - def encode(self, audio_samples) -> Optional[Codec2Frame]: - """ - Encode PCM audio samples to Codec2 frame. - - Args: - audio_samples: PCM samples (8kHz, 16-bit signed) - - Returns: - Codec2Frame or None if insufficient samples - """ - if len(audio_samples) < self.frame_samples: - return None - - # In production: call codec2_encode(state, bits, samples) - # Simulation: create pseudo-compressed data - compressed = self._simulate_compression(audio_samples[:self.frame_samples]) - - frame = Codec2Frame( - mode=self.mode, - bits=compressed, - timestamp=self.frame_counter * self.frame_ms / 1000.0, - frame_number=self.frame_counter - ) - - self.frame_counter += 1 - return frame - - def decode(self, frame: Codec2Frame): - """ - Decode Codec2 frame to PCM audio samples. - - Args: - frame: Codec2 compressed frame - - Returns: - PCM samples (8kHz, 16-bit signed) - """ - if frame.mode != self.mode: - raise ValueError(f"Frame mode {frame.mode} doesn't match decoder mode {self.mode}") - - # In production: call codec2_decode(state, samples, bits) - # Simulation: decompress to audio - return self._simulate_decompression(frame.bits) - - def _simulate_compression(self, samples) -> bytes: - """Simulate Codec2 compression (for testing).""" - # Convert to list if needed - if hasattr(samples, 'tolist'): - sample_list = samples.tolist() - elif hasattr(samples, '__iter__'): - sample_list = list(samples) - else: - sample_list = samples - - # Extract basic features for simulation - if HAS_NUMPY and hasattr(samples, '__array__'): - # Convert to numpy array if needed - np_samples = np.asarray(samples, dtype=np.float32) - if len(np_samples) > 0: - mean_square = np.mean(np_samples ** 2) - energy = np.sqrt(mean_square) if not np.isnan(mean_square) else 0.0 - zero_crossings = np.sum(np.diff(np.sign(np_samples)) != 0) - else: - energy = 0.0 - zero_crossings = 0 - else: - # Manual calculation without numpy - if sample_list and len(sample_list) > 0: - energy = math.sqrt(sum(s**2 for s in sample_list) / len(sample_list)) - zero_crossings = sum(1 for i in range(1, len(sample_list)) - if (sample_list[i-1] >= 0) != (sample_list[i] >= 0)) - else: - energy = 0.0 - zero_crossings = 0 - - # Pack into bytes (simplified) - # Ensure values are valid - energy_int = max(0, min(65535, int(energy))) - zc_int = max(0, min(65535, int(zero_crossings))) - data = struct.pack('<HH', energy_int, zc_int) - - # Pad to expected frame size - data += b'\x00' * (self.frame_bytes - len(data)) - - return data[:self.frame_bytes] - - def _simulate_decompression(self, compressed: bytes): - """Simulate Codec2 decompression (for testing).""" - # Unpack features - if len(compressed) >= 4: - energy, zero_crossings = struct.unpack('<HH', compressed[:4]) - else: - energy, zero_crossings = 1000, 100 - - # Generate synthetic speech-like signal - if HAS_NUMPY: - t = np.linspace(0, self.frame_ms/1000, self.frame_samples) - - # Base frequency from zero crossings - freq = zero_crossings * 10 # Simplified mapping - - # Generate harmonics - signal = np.zeros(self.frame_samples) - for harmonic in range(1, 4): - signal += np.sin(2 * np.pi * freq * harmonic * t) / harmonic - - # Apply energy envelope - signal *= energy / 10000.0 - - # Convert to 16-bit PCM - return (signal * 32767).astype(np.int16) - else: - # Manual generation without numpy - samples = [] - freq = zero_crossings * 10 - - for i in range(self.frame_samples): - t = i / 8000.0 # 8kHz sample rate - value = 0 - for harmonic in range(1, 4): - value += math.sin(2 * math.pi * freq * harmonic * t) / harmonic - - value *= energy / 10000.0 - # Clamp to 16-bit range - sample = int(value * 32767) - sample = max(-32768, min(32767, sample)) - samples.append(sample) - - return array.array('h', samples) - - -class FSKModem: - """ - 4-FSK modem for transmitting digital data over voice channels. - Designed to survive GSM/AMR/EVS vocoders. - """ - - def __init__(self, sample_rate: int = 8000, baud_rate: int = 600): - """ - Initialize FSK modem. - - Args: - sample_rate: Audio sample rate (Hz) - baud_rate: Symbol rate (baud) - """ - self.sample_rate = sample_rate - self.baud_rate = baud_rate - self.samples_per_symbol = int(sample_rate / baud_rate) - - # 4-FSK frequencies (300-3400 Hz band) - self.frequencies = [ - 600, # 00 - 1200, # 01 - 1800, # 10 - 2400 # 11 - ] - - # Preamble for synchronization (800 Hz, 100ms) - self.preamble_freq = 800 - self.preamble_duration = 0.1 # seconds - - print(f"{GREEN}[FSK]{RESET} Initialized 4-FSK modem " - f"({baud_rate} baud, frequencies: {self.frequencies})") - - def modulate(self, data: bytes, add_preamble: bool = True): - """ - Modulate binary data to FSK audio signal. - - Args: - data: Binary data to modulate - add_preamble: Whether to add synchronization preamble - - Returns: - Audio signal (normalized float32 array or list) - """ - # Convert bytes to dibits (2-bit symbols) - symbols = [] - for byte in data: - symbols.extend([ - (byte >> 6) & 0x03, - (byte >> 4) & 0x03, - (byte >> 2) & 0x03, - byte & 0x03 - ]) - - # Generate audio signal - signal = [] - - # Add preamble - if add_preamble: - preamble_samples = int(self.preamble_duration * self.sample_rate) - if HAS_NUMPY: - t = np.arange(preamble_samples) / self.sample_rate - preamble = np.sin(2 * np.pi * self.preamble_freq * t) - signal.extend(preamble) - else: - for i in range(preamble_samples): - t = i / self.sample_rate - value = math.sin(2 * math.pi * self.preamble_freq * t) - signal.append(value) - - # Modulate symbols - for symbol in symbols: - freq = self.frequencies[symbol] - if HAS_NUMPY: - t = np.arange(self.samples_per_symbol) / self.sample_rate - tone = np.sin(2 * np.pi * freq * t) - signal.extend(tone) - else: - for i in range(self.samples_per_symbol): - t = i / self.sample_rate - value = math.sin(2 * math.pi * freq * t) - signal.append(value) - - # Apply smoothing to reduce clicks - if HAS_NUMPY: - audio = np.array(signal, dtype=np.float32) - else: - audio = array.array('f', signal) - audio = self._apply_envelope(audio) - - return audio - - def demodulate(self, audio) -> Tuple[bytes, float]: - """ - Demodulate FSK audio signal to binary data. - - Args: - audio: Audio signal - - Returns: - Tuple of (demodulated data, confidence score) - """ - # Find preamble - preamble_start = self._find_preamble(audio) - if preamble_start < 0: - return b'', 0.0 - - # Skip preamble - data_start = preamble_start + int(self.preamble_duration * self.sample_rate) - - # Demodulate symbols - symbols = [] - confidence_scores = [] - - pos = data_start - while pos + self.samples_per_symbol <= len(audio): - symbol_audio = audio[pos:pos + self.samples_per_symbol] - symbol, confidence = self._demodulate_symbol(symbol_audio) - symbols.append(symbol) - confidence_scores.append(confidence) - pos += self.samples_per_symbol - - # Convert symbols to bytes - data = bytearray() - for i in range(0, len(symbols), 4): - if i + 3 < len(symbols): - byte = (symbols[i] << 6) | (symbols[i+1] << 4) | (symbols[i+2] << 2) | symbols[i+3] - data.append(byte) - - if HAS_NUMPY and confidence_scores: - avg_confidence = np.mean(confidence_scores) - else: - avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0.0 - return bytes(data), avg_confidence - - def _find_preamble(self, audio) -> int: - """Find preamble in audio signal.""" - # Simple energy-based detection - window_size = int(0.01 * self.sample_rate) # 10ms window - - if HAS_NUMPY: - for i in range(0, len(audio) - window_size, window_size // 2): - window = audio[i:i + window_size] - - # Check for preamble frequency - fft = np.fft.fft(window) - freqs = np.fft.fftfreq(len(window), 1/self.sample_rate) - - # Find peak near preamble frequency - idx = np.argmax(np.abs(fft[:len(fft)//2])) - peak_freq = abs(freqs[idx]) - - if abs(peak_freq - self.preamble_freq) < 50: # 50 Hz tolerance - return i - else: - # Simple zero-crossing based detection without FFT - for i in range(0, len(audio) - window_size, window_size // 2): - window = list(audio[i:i + window_size]) - - # Count zero crossings - zero_crossings = 0 - for j in range(1, len(window)): - if (window[j-1] >= 0) != (window[j] >= 0): - zero_crossings += 1 - - # Estimate frequency from zero crossings - estimated_freq = (zero_crossings * self.sample_rate) / (2 * len(window)) - - if abs(estimated_freq - self.preamble_freq) < 100: # 100 Hz tolerance - return i - - return -1 - - def _demodulate_symbol(self, audio) -> Tuple[int, float]: - """Demodulate a single FSK symbol.""" - if HAS_NUMPY: - # FFT-based demodulation - fft = np.fft.fft(audio) - freqs = np.fft.fftfreq(len(audio), 1/self.sample_rate) - magnitude = np.abs(fft[:len(fft)//2]) - - # Find energy at each FSK frequency - energies = [] - for freq in self.frequencies: - idx = np.argmin(np.abs(freqs[:len(freqs)//2] - freq)) - energy = magnitude[idx] - energies.append(energy) - - # Select symbol with highest energy - symbol = np.argmax(energies) - else: - # Goertzel algorithm for specific frequency detection - audio_list = list(audio) if hasattr(audio, '__iter__') else audio - energies = [] - - for freq in self.frequencies: - # Goertzel algorithm - omega = 2 * math.pi * freq / self.sample_rate - coeff = 2 * math.cos(omega) - - s_prev = 0 - s_prev2 = 0 - - for sample in audio_list: - s = sample + coeff * s_prev - s_prev2 - s_prev2 = s_prev - s_prev = s - - # Calculate magnitude - power = s_prev2 * s_prev2 + s_prev * s_prev - coeff * s_prev * s_prev2 - energies.append(math.sqrt(abs(power))) - - # Select symbol with highest energy - symbol = energies.index(max(energies)) - - # Confidence is ratio of strongest to second strongest - sorted_energies = sorted(energies, reverse=True) - confidence = sorted_energies[0] / (sorted_energies[1] + 1e-6) - - return symbol, min(confidence, 10.0) / 10.0 - - def _apply_envelope(self, audio): - """Apply smoothing envelope to reduce clicks.""" - # Simple raised cosine envelope - ramp_samples = int(0.002 * self.sample_rate) # 2ms ramps - - if len(audio) > 2 * ramp_samples: - if HAS_NUMPY: - # Fade in - t = np.linspace(0, np.pi/2, ramp_samples) - audio[:ramp_samples] *= np.sin(t) ** 2 - - # Fade out - audio[-ramp_samples:] *= np.sin(t[::-1]) ** 2 - else: - # Manual fade in - for i in range(ramp_samples): - t = (i / ramp_samples) * (math.pi / 2) - factor = math.sin(t) ** 2 - audio[i] *= factor - - # Manual fade out - for i in range(ramp_samples): - t = ((ramp_samples - 1 - i) / ramp_samples) * (math.pi / 2) - factor = math.sin(t) ** 2 - audio[-(i+1)] *= factor - - return audio - - -class VoiceProtocol: - """ - Integrates voice codec and modem with the Icing protocol - for encrypted voice transmission over GSM. - """ - - def __init__(self, protocol_instance): - """ - Initialize voice protocol handler. - - Args: - protocol_instance: IcingProtocol instance - """ - self.protocol = protocol_instance - self.codec = Codec2Wrapper(Codec2Mode.MODE_1200) - self.modem = FSKModem(sample_rate=8000, baud_rate=600) - - # Voice crypto state - self.voice_iv_counter = 0 - self.voice_sequence = 0 - - # Buffers - if HAS_NUMPY: - self.audio_buffer = np.array([], dtype=np.int16) - else: - self.audio_buffer = array.array('h') # 16-bit signed integers - self.frame_buffer = [] - - print(f"{GREEN}[VOICE]{RESET} Voice protocol initialized") - - def process_voice_input(self, audio_samples): - """ - Process voice input: compress, encrypt, and modulate. - - Args: - audio_samples: PCM audio samples (8kHz, 16-bit) - - Returns: - Modulated audio signal ready for transmission (numpy array or array.array) - """ - # Add to buffer - if HAS_NUMPY: - self.audio_buffer = np.concatenate([self.audio_buffer, audio_samples]) - else: - self.audio_buffer.extend(audio_samples) - - # Process complete frames - modulated_audio = [] - - while len(self.audio_buffer) >= self.codec.frame_samples: - # Extract frame - if HAS_NUMPY: - frame_audio = self.audio_buffer[:self.codec.frame_samples] - self.audio_buffer = self.audio_buffer[self.codec.frame_samples:] - else: - frame_audio = array.array('h', self.audio_buffer[:self.codec.frame_samples]) - del self.audio_buffer[:self.codec.frame_samples] - - # Compress with Codec2 - compressed_frame = self.codec.encode(frame_audio) - if not compressed_frame: - continue - - # Encrypt frame - encrypted = self._encrypt_voice_frame(compressed_frame) - - # Add FEC - protected = self._add_fec(encrypted) - - # Modulate to audio - audio_signal = self.modem.modulate(protected, add_preamble=True) - modulated_audio.append(audio_signal) - - if modulated_audio: - if HAS_NUMPY: - return np.concatenate(modulated_audio) - else: - # Concatenate array.array objects - result = array.array('f') - for audio in modulated_audio: - result.extend(audio) - return result - return None - - def process_voice_output(self, modulated_audio): - """ - Process received audio: demodulate, decrypt, and decompress. - - Args: - modulated_audio: Received FSK-modulated audio - - Returns: - Decoded PCM audio samples (numpy array or array.array) - """ - # Demodulate - data, confidence = self.modem.demodulate(modulated_audio) - - if confidence < 0.5: - print(f"{YELLOW}[VOICE]{RESET} Low demodulation confidence: {confidence:.2f}") - return None - - # Remove FEC - frame_data = self._remove_fec(data) - if not frame_data: - return None - - # Decrypt - compressed_frame = self._decrypt_voice_frame(frame_data) - if not compressed_frame: - return None - - # Decompress - audio_samples = self.codec.decode(compressed_frame) - - return audio_samples - - def _encrypt_voice_frame(self, frame: Codec2Frame) -> bytes: - """Encrypt a voice frame using ChaCha20-CTR.""" - if not self.protocol.hkdf_key: - raise ValueError("No encryption key available") - - # Prepare frame data - frame_data = struct.pack('<BIH', - frame.mode, - frame.frame_number, - len(frame.bits) - ) + frame.bits - - # Generate IV for this frame (ChaCha20 needs 16 bytes) - iv = struct.pack('<Q', self.voice_iv_counter) + b'\x00' * 8 # 8 + 8 = 16 bytes - self.voice_iv_counter += 1 - - # Encrypt using ChaCha20 - from encryption import chacha20_encrypt - key = bytes.fromhex(self.protocol.hkdf_key) - encrypted = chacha20_encrypt(frame_data, key, iv) - - # Add sequence number and IV hint - return struct.pack('<HQ', self.voice_sequence, self.voice_iv_counter) + encrypted - - def _decrypt_voice_frame(self, data: bytes) -> Optional[Codec2Frame]: - """Decrypt a voice frame.""" - if len(data) < 10: - return None - - # Extract sequence and IV hint - sequence, iv_hint = struct.unpack('<HQ', data[:10]) - encrypted = data[10:] - - # Generate IV (16 bytes for ChaCha20) - iv = struct.pack('<Q', iv_hint) + b'\x00' * 8 - - # Decrypt - from encryption import chacha20_decrypt - key = bytes.fromhex(self.protocol.hkdf_key) - - try: - decrypted = chacha20_decrypt(encrypted, key, iv) - - # Parse frame - mode, frame_num, bits_len = struct.unpack('<BIH', decrypted[:7]) - bits = decrypted[7:7+bits_len] - - return Codec2Frame( - mode=Codec2Mode(mode), - bits=bits, - timestamp=0, # Will be set by caller - frame_number=frame_num - ) - except Exception as e: - print(f"{RED}[VOICE]{RESET} Decryption failed: {e}") - return None - - def _add_fec(self, data: bytes) -> bytes: - """Add forward error correction.""" - # Simple repetition code (3x) for testing - # In production: use convolutional code or LDPC - fec_data = bytearray() - - for byte in data: - # Repeat each byte 3 times - fec_data.extend([byte, byte, byte]) - - return bytes(fec_data) - - def _remove_fec(self, data: bytes) -> Optional[bytes]: - """Remove FEC and correct errors.""" - if len(data) % 3 != 0: - return None - - corrected = bytearray() - - for i in range(0, len(data), 3): - # Majority voting - votes = [data[i], data[i+1], data[i+2]] - byte_value = max(set(votes), key=votes.count) - corrected.append(byte_value) - - return bytes(corrected) - - -# Example usage -if __name__ == "__main__": - # Test Codec2 wrapper - print(f"\n{BLUE}=== Testing Codec2 Wrapper ==={RESET}") - codec = Codec2Wrapper(Codec2Mode.MODE_1200) - - # Generate test audio - if HAS_NUMPY: - t = np.linspace(0, 0.04, 320) # 40ms at 8kHz - test_audio = (np.sin(2 * np.pi * 440 * t) * 16384).astype(np.int16) - else: - test_audio = array.array('h') - for i in range(320): - t = i * 0.04 / 320 - value = int(math.sin(2 * math.pi * 440 * t) * 16384) - test_audio.append(value) - - # Encode - frame = codec.encode(test_audio) - print(f"Encoded frame: {len(frame.bits)} bytes") - - # Decode - decoded = codec.decode(frame) - print(f"Decoded audio: {len(decoded)} samples") - - # Test FSK modem - print(f"\n{BLUE}=== Testing FSK Modem ==={RESET}") - modem = FSKModem() - - # Test data - test_data = b"Hello, secure voice!" - - # Modulate - modulated = modem.modulate(test_data) - print(f"Modulated: {len(modulated)} samples ({len(modulated)/8000:.2f}s)") - - # Demodulate - demodulated, confidence = modem.demodulate(modulated) - print(f"Demodulated: {demodulated}") - print(f"Confidence: {confidence:.2%}") - print(f"Match: {demodulated == test_data}") \ No newline at end of file