From 75f54dc90ab8783fc03d5b72bc6c19f85fdaea31 Mon Sep 17 00:00:00 2001 From: Bartosz Date: Sun, 15 Jun 2025 11:59:27 +0100 Subject: [PATCH] add protocole into drybox --- .../Prototype/Protocol/IcingProtocol.drawio | 566 +++++++++ .../Protocol/VOICE_PROTOCOL_README.md | 119 ++ .../Prototype/Protocol/auto_mode.py | 430 +++++++ protocol_prototype/Prototype/Protocol/cli.py | 328 +++++ .../Prototype/Protocol/crypto_utils.py | 165 +++ .../Prototype/Protocol/encryption.py | 307 +++++ .../Prototype/Protocol/messages.py | 463 +++++++ .../Prototype/Protocol/protocol.py | 1069 +++++++++++++++++ .../Prototype/Protocol/transmission.py | 100 ++ .../Prototype/Protocol/voice_codec.py | 716 +++++++++++ 10 files changed, 4263 insertions(+) create mode 100644 protocol_prototype/Prototype/Protocol/IcingProtocol.drawio create mode 100644 protocol_prototype/Prototype/Protocol/VOICE_PROTOCOL_README.md create mode 100644 protocol_prototype/Prototype/Protocol/auto_mode.py create mode 100644 protocol_prototype/Prototype/Protocol/cli.py create mode 100644 protocol_prototype/Prototype/Protocol/crypto_utils.py create mode 100644 protocol_prototype/Prototype/Protocol/encryption.py create mode 100644 protocol_prototype/Prototype/Protocol/messages.py create mode 100644 protocol_prototype/Prototype/Protocol/protocol.py create mode 100644 protocol_prototype/Prototype/Protocol/transmission.py create mode 100644 protocol_prototype/Prototype/Protocol/voice_codec.py diff --git a/protocol_prototype/Prototype/Protocol/IcingProtocol.drawio b/protocol_prototype/Prototype/Protocol/IcingProtocol.drawio new file mode 100644 index 0000000..8f46988 --- /dev/null +++ b/protocol_prototype/Prototype/Protocol/IcingProtocol.drawio @@ -0,0 +1,566 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/protocol_prototype/Prototype/Protocol/VOICE_PROTOCOL_README.md b/protocol_prototype/Prototype/Protocol/VOICE_PROTOCOL_README.md new file mode 100644 index 0000000..885e798 --- /dev/null +++ b/protocol_prototype/Prototype/Protocol/VOICE_PROTOCOL_README.md @@ -0,0 +1,119 @@ +# Voice-over-GSM Protocol Implementation + +This implementation provides encrypted voice communication over standard GSM voice channels without requiring CSD/HSCSD. + +## Architecture + +### 1. Voice Codec (`voice_codec.py`) +- **Codec2Wrapper**: Simulates Codec2 compression + - Supports multiple bitrates (700-3200 bps) + - Default: 1200 bps for GSM robustness + - 40ms frames (48 bits/frame at 1200 bps) + +- **FSKModem**: 4-FSK modulation for voice channels + - Frequency band: 300-3400 Hz (GSM compatible) + - Symbol rate: 600 baud + - 4 frequencies: 600, 1200, 1800, 2400 Hz + - Preamble: 800 Hz for 100ms + +- **VoiceProtocol**: Integration layer + - Manages codec and modem + - Handles encryption with ChaCha20-CTR + - Frame-based processing + +### 2. Protocol Messages (`messages.py`) +- **VoiceStart** (20 bytes): Initiates voice call + - Version, codec mode, FEC type + - Session ID (64 bits) + - Initial sequence number + +- **VoiceAck** (16 bytes): Accepts/rejects call + - Status (accept/reject) + - Negotiated codec and FEC + +- **VoiceEnd** (12 bytes): Terminates call + - Session ID for confirmation + +- **VoiceSync** (20 bytes): Synchronization + - Sequence number and timestamp + - For jitter buffer management + +### 3. Encryption (`encryption.py`) +- **ChaCha20-CTR**: Stream cipher for voice + - No authentication overhead (HMAC per second) + - 12-byte nonce with frame counter + - Uses HKDF-derived key from main protocol + +### 4. Protocol Integration (`protocol.py`) +- Voice session management +- Message handlers for all voice messages +- Methods: + - `start_voice_call()`: Initiate call + - `accept_voice_call()`: Accept incoming + - `end_voice_call()`: Terminate + - `send_voice_audio()`: Process audio + +## Usage Example + +```python +# After key exchange is complete +alice.start_voice_call(codec_mode=5, fec_type=0) + +# Bob automatically accepts if in auto mode +# Or manually: bob.accept_voice_call(session_id, codec_mode, fec_type) + +# Send audio +audio_samples = generate_audio() # 8kHz, 16-bit PCM +alice.send_voice_audio(audio_samples) + +# End call +alice.end_voice_call() +``` + +## Key Features + +1. **Codec2 @ 1200 bps** + - Optimal for GSM vocoder survival + - Intelligible but "robotic" quality + +2. **4-FSK Modulation** + - Survives GSM/AMR/EVS vocoders + - 2400 baud with FEC + +3. **ChaCha20-CTR Encryption** + - Low latency stream cipher + - Frame-based IV management + +4. **Forward Error Correction** + - Repetition code (3x) + - Future: Convolutional or LDPC + +5. **No Special Requirements** + - Works over standard voice calls + - Compatible with any phone + - Software-only solution + +## Testing + +Run the test scripts: +- `test_voice_simple.py`: Basic voice call setup +- `test_voice_protocol.py`: Full test with audio simulation (requires numpy) + +## Implementation Notes + +1. Message disambiguation: VoiceStart sets high bit in flags field to distinguish from VoiceSync (both 20 bytes) + +2. The actual Codec2 library would need to be integrated for production use + +3. FEC implementation is simplified (repetition code) - production would use convolutional codes + +4. Audio I/O integration needed for real voice calls + +5. Jitter buffer and timing recovery needed for production + +## Security Considerations + +- Voice frames use ChaCha20-CTR without per-frame authentication +- HMAC computed over 1-second blocks for efficiency +- Session binding through encrypted session ID +- PFS maintained through main protocol key rotation \ No newline at end of file diff --git a/protocol_prototype/Prototype/Protocol/auto_mode.py b/protocol_prototype/Prototype/Protocol/auto_mode.py new file mode 100644 index 0000000..690ba9c --- /dev/null +++ b/protocol_prototype/Prototype/Protocol/auto_mode.py @@ -0,0 +1,430 @@ +import time +import threading +import queue +from typing import Optional, Dict, Any, List, Callable, Tuple + +# ANSI colors for logging +RED = "\033[91m" +GREEN = "\033[92m" +YELLOW = "\033[93m" +BLUE = "\033[94m" +RESET = "\033[0m" + +class AutoModeConfig: + """Configuration parameters for the automatic mode behavior.""" + def __init__(self): + # Ping behavior + self.ping_response_accept = True # Whether to accept incoming pings + self.ping_auto_initiate = False # Whether to initiate pings when connected + self.ping_retry_count = 3 # Number of ping retries + self.ping_retry_delay = 5.0 # Seconds between ping retries + self.ping_timeout = 10.0 # Seconds to wait for ping response + self.preferred_cipher = 0 # 0=AES-GCM, 1=ChaCha20-Poly1305 + + # Handshake behavior + self.handshake_retry_count = 3 # Number of handshake retries + self.handshake_retry_delay = 5.0 # Seconds between handshake retries + self.handshake_timeout = 10.0 # Seconds to wait for handshake + + # Messaging behavior + self.auto_message_enabled = False # Whether to auto-send messages + self.message_interval = 10.0 # Seconds between auto messages + self.message_content = "Hello, secure world!" # Default message + + # General behavior + self.active_mode = False # If true, initiates protocol instead of waiting + + +class AutoMode: + """ + Manages automated behavior for the Icing protocol. + Handles automatic progression through the protocol stages: + 1. Connection setup + 2. Ping/discovery + 3. Key exchange + 4. Encrypted communication + """ + + def __init__(self, protocol_interface): + """ + Initialize the AutoMode manager. + + Args: + protocol_interface: An object implementing the required protocol methods + """ + self.protocol = protocol_interface + self.config = AutoModeConfig() + self.active = False + self.state = "idle" + + # Message queue for automated sending + self.message_queue = queue.Queue() + + # Tracking variables + self.ping_attempts = 0 + self.handshake_attempts = 0 + self.last_action_time = 0 + self.timer_tasks = [] # List of active timer tasks (for cleanup) + + def start(self): + """Start the automatic mode.""" + if self.active: + return + + self.active = True + self.state = "idle" + self.ping_attempts = 0 + self.handshake_attempts = 0 + self.last_action_time = time.time() + + self._log_info("Automatic mode started") + + # Start in active mode if configured + if self.config.active_mode and self.protocol.connections: + self._start_ping_sequence() + + def stop(self): + """Stop the automatic mode and clean up any pending tasks.""" + if not self.active: + return + + # Cancel any pending timers + for timer in self.timer_tasks: + if timer.is_alive(): + timer.cancel() + self.timer_tasks = [] + + self.active = False + self.state = "idle" + self._log_info("Automatic mode stopped") + + def handle_connection_established(self): + """Called when a new connection is established.""" + if not self.active: + return + + self._log_info("Connection established") + + # If in active mode, start pinging + if self.config.active_mode: + self._start_ping_sequence() + + def handle_ping_received(self, index: int): + """ + Handle a received ping request. + + Args: + index: Index of the ping request in the protocol's inbound message queue + """ + if not self.active or not self._is_valid_message_index(index): + return + + self._log_info(f"Ping request received (index={index})") + + # Automatically respond to ping if configured to accept + if self.config.ping_response_accept: + self._log_info(f"Auto-responding to ping with accept={self.config.ping_response_accept}") + try: + # Schedule the response with a small delay to simulate real behavior + timer = threading.Timer(0.5, self._respond_to_ping, args=[index]) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + except Exception as e: + self._log_error(f"Failed to auto-respond to ping: {e}") + + def handle_ping_response_received(self, accepted: bool): + """ + Handle a received ping response. + + Args: + accepted: Whether the ping was accepted + """ + if not self.active: + return + + self.ping_attempts = 0 # Reset ping attempts counter + + if accepted: + self._log_info("Ping accepted! Proceeding with handshake") + # Send handshake if not already done + if self.state != "handshake_sent": + self._ensure_ephemeral_keys() + self._start_handshake_sequence() + else: + self._log_info("Ping rejected by peer. Stopping auto-protocol sequence.") + self.state = "idle" + + def handle_handshake_received(self, index: int): + """ + Handle a received handshake. + + Args: + index: Index of the handshake in the protocol's inbound message queue + """ + if not self.active or not self._is_valid_message_index(index): + return + + self._log_info(f"Handshake received (index={index})") + + try: + # Ensure we have ephemeral keys + self._ensure_ephemeral_keys() + + # Process the handshake (compute ECDH) + self.protocol.generate_ecdhe(index) + + # Derive HKDF key + self.protocol.derive_hkdf() + + # If we haven't sent our handshake yet, send it + if self.state != "handshake_sent": + timer = threading.Timer(0.5, self.protocol.send_handshake) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + self.state = "handshake_sent" + else: + self.state = "key_exchange_complete" + + # Start sending queued messages if auto messaging is enabled + if self.config.auto_message_enabled: + self._start_message_sequence() + + except Exception as e: + self._log_error(f"Failed to process handshake: {e}") + + def handle_encrypted_received(self, index: int): + """ + Handle a received encrypted message. + + Args: + index: Index of the encrypted message in the protocol's inbound message queue + """ + if not self.active or not self._is_valid_message_index(index): + return + + # Try to decrypt automatically + try: + plaintext = self.protocol.decrypt_received_message(index) + self._log_info(f"Auto-decrypted message: {plaintext}") + except Exception as e: + self._log_error(f"Failed to auto-decrypt message: {e}") + + def queue_message(self, message: str): + """ + Add a message to the auto-send queue. + + Args: + message: Message text to send + """ + self.message_queue.put(message) + self._log_info(f"Message queued for sending: {message}") + + # If we're in the right state, start sending messages + if self.active and self.state == "key_exchange_complete" and self.config.auto_message_enabled: + self._process_message_queue() + + def _start_ping_sequence(self): + """Start the ping sequence to discover the peer.""" + if self.ping_attempts >= self.config.ping_retry_count: + self._log_warning(f"Maximum ping attempts ({self.config.ping_retry_count}) reached") + self.state = "idle" + return + + self.state = "pinging" + self.ping_attempts += 1 + + self._log_info(f"Sending ping request (attempt {self.ping_attempts}/{self.config.ping_retry_count})") + try: + self.protocol.send_ping_request(self.config.preferred_cipher) + self.last_action_time = time.time() + + # Schedule next ping attempt if needed + timer = threading.Timer( + self.config.ping_retry_delay, + self._check_ping_response + ) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + + except Exception as e: + self._log_error(f"Failed to send ping: {e}") + + def _check_ping_response(self): + """Check if we got a ping response, retry if not.""" + if not self.active or self.state != "pinging": + return + + # If we've waited long enough for a response, retry + if time.time() - self.last_action_time >= self.config.ping_timeout: + self._log_warning("No ping response received, retrying") + self._start_ping_sequence() + + def _respond_to_ping(self, index: int): + """ + Respond to a ping request. + + Args: + index: Index of the ping request in the inbound messages + """ + if not self.active or not self._is_valid_message_index(index): + return + + try: + answer = 1 if self.config.ping_response_accept else 0 + self.protocol.respond_to_ping(index, answer) + + if answer == 1: + # If we accepted, we should expect a handshake + self.state = "accepted_ping" + self._ensure_ephemeral_keys() + + # Set a timer to send our handshake if we don't receive one + timer = threading.Timer( + self.config.handshake_timeout, + self._check_handshake_received + ) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + self.last_action_time = time.time() + + except Exception as e: + self._log_error(f"Failed to respond to ping: {e}") + + def _check_handshake_received(self): + """Check if we've received a handshake after accepting a ping.""" + if not self.active or self.state != "accepted_ping": + return + + # If we've waited long enough and haven't received a handshake, initiate one + if time.time() - self.last_action_time >= self.config.handshake_timeout: + self._log_warning("No handshake received after accepting ping, initiating handshake") + self._start_handshake_sequence() + + def _start_handshake_sequence(self): + """Start the handshake sequence.""" + if self.handshake_attempts >= self.config.handshake_retry_count: + self._log_warning(f"Maximum handshake attempts ({self.config.handshake_retry_count}) reached") + self.state = "idle" + return + + self.state = "handshake_sent" + self.handshake_attempts += 1 + + self._log_info(f"Sending handshake (attempt {self.handshake_attempts}/{self.config.handshake_retry_count})") + try: + self.protocol.send_handshake() + self.last_action_time = time.time() + + # Schedule handshake retry check + timer = threading.Timer( + self.config.handshake_retry_delay, + self._check_handshake_response + ) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + + except Exception as e: + self._log_error(f"Failed to send handshake: {e}") + + def _check_handshake_response(self): + """Check if we've completed the key exchange, retry handshake if not.""" + if not self.active or self.state != "handshake_sent": + return + + # If we've waited long enough for a response, retry + if time.time() - self.last_action_time >= self.config.handshake_timeout: + self._log_warning("No handshake response received, retrying") + self._start_handshake_sequence() + + def _start_message_sequence(self): + """Start the automated message sending sequence.""" + if not self.config.auto_message_enabled: + return + + self._log_info("Starting automated message sequence") + + # Add the default message if queue is empty + if self.message_queue.empty(): + self.message_queue.put(self.config.message_content) + + # Start processing the queue + self._process_message_queue() + + def _process_message_queue(self): + """Process messages in the queue and send them.""" + if not self.active or self.state != "key_exchange_complete" or not self.config.auto_message_enabled: + return + + if not self.message_queue.empty(): + message = self.message_queue.get() + self._log_info(f"Sending queued message: {message}") + + try: + self.protocol.send_encrypted_message(message) + + # Schedule next message send + timer = threading.Timer( + self.config.message_interval, + self._process_message_queue + ) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + + except Exception as e: + self._log_error(f"Failed to send queued message: {e}") + # Put the message back in the queue + self.message_queue.put(message) + + def _ensure_ephemeral_keys(self): + """Ensure ephemeral keys are generated if needed.""" + if not hasattr(self.protocol, 'ephemeral_pubkey') or self.protocol.ephemeral_pubkey is None: + self._log_info("Generating ephemeral keys") + self.protocol.generate_ephemeral_keys() + + def _is_valid_message_index(self, index: int) -> bool: + """ + Check if a message index is valid in the protocol's inbound_messages queue. + + Args: + index: The index to check + + Returns: + bool: True if the index is valid, False otherwise + """ + if not hasattr(self.protocol, 'inbound_messages'): + self._log_error("Protocol has no inbound_messages attribute") + return False + + if index < 0 or index >= len(self.protocol.inbound_messages): + self._log_error(f"Invalid message index: {index}") + return False + + return True + + # Helper methods for logging + def _log_info(self, message: str): + print(f"{BLUE}[AUTO]{RESET} {message}") + if hasattr(self, 'verbose_logging') and self.verbose_logging: + state_info = f"(state={self.state})" + if 'pinging' in self.state and hasattr(self, 'ping_attempts'): + state_info += f", attempts={self.ping_attempts}/{self.config.ping_retry_count}" + elif 'handshake' in self.state and hasattr(self, 'handshake_attempts'): + state_info += f", attempts={self.handshake_attempts}/{self.config.handshake_retry_count}" + print(f"{BLUE}[AUTO-DETAIL]{RESET} {state_info}") + + def _log_warning(self, message: str): + print(f"{YELLOW}[AUTO-WARN]{RESET} {message}") + if hasattr(self, 'verbose_logging') and self.verbose_logging: + timer_info = f"Active timers: {len(self.timer_tasks)}" + print(f"{YELLOW}[AUTO-WARN-DETAIL]{RESET} {timer_info}") + + def _log_error(self, message: str): + print(f"{RED}[AUTO-ERROR]{RESET} {message}") + if hasattr(self, 'verbose_logging') and self.verbose_logging: + print(f"{RED}[AUTO-ERROR-DETAIL]{RESET} Current state: {self.state}, Active: {self.active}") \ No newline at end of file diff --git a/protocol_prototype/Prototype/Protocol/cli.py b/protocol_prototype/Prototype/Protocol/cli.py new file mode 100644 index 0000000..53c3f01 --- /dev/null +++ b/protocol_prototype/Prototype/Protocol/cli.py @@ -0,0 +1,328 @@ +import sys +import argparse +import shlex +from protocol import IcingProtocol + +RED = "\033[91m" +GREEN = "\033[92m" +YELLOW = "\033[93m" +BLUE = "\033[94m" +MAGENTA = "\033[95m" +CYAN = "\033[96m" +RESET = "\033[0m" + +def print_help(): + """Display all available commands.""" + print(f"\n{YELLOW}=== Available Commands ==={RESET}") + print(f"\n{CYAN}Basic Protocol Commands:{RESET}") + print(" help - Show this help message") + print(" peer_id - Set peer identity public key") + print(" connect - Connect to a peer at the specified port") + print(" show_state - Display current protocol state") + print(" exit - Exit the program") + + print(f"\n{CYAN}Manual Protocol Operation:{RESET}") + print(" generate_ephemeral_keys - Generate ephemeral ECDH keys") + print(" send_ping [cipher] - Send PING request (cipher: 0=AES-GCM, 1=ChaCha20-Poly1305, default: 0)") + print(" respond_ping <0|1> - Respond to a PING (0=reject, 1=accept)") + print(" send_handshake - Send handshake with ephemeral keys") + print(" generate_ecdhe - Process handshake at specified index") + print(" derive_hkdf - Derive encryption key using HKDF") + print(" send_encrypted - Encrypt and send a message") + print(" decrypt <index> - Decrypt received message at index") + + print(f"\n{CYAN}Automatic Mode Commands:{RESET}") + print(" auto start - Start automatic mode") + print(" auto stop - Stop automatic mode") + print(" auto status - Show current auto mode status and configuration") + print(" auto config <param> <value> - Configure auto mode parameters") + print(" auto config list - Show all configurable parameters") + print(" auto message <text> - Queue message for automatic sending") + print(" auto passive - Configure as passive peer (responds to pings but doesn't initiate)") + print(" auto active - Configure as active peer (initiates protocol)") + print(" auto log - Toggle detailed logging for auto mode") + + print(f"\n{CYAN}Debugging Commands:{RESET}") + print(" debug_message <index> - Display detailed information about a message in the queue") + + print(f"\n{CYAN}Legacy Commands:{RESET}") + print(" auto_responder <on|off> - Enable/disable legacy auto responder (deprecated)") + + +def main(): + protocol = IcingProtocol() + + print(f"{YELLOW}\n======================================") + print(" Icing Protocol - Secure Communication ") + print("======================================\n" + RESET) + print(f"Listening on port: {protocol.local_port}") + print(f"Your identity public key (hex): {protocol.identity_pubkey.hex()}") + print_help() + + while True: + try: + line = input(f"{MAGENTA}Cmd>{RESET} ").strip() + except EOFError: + break + if not line: + continue + + parts = shlex.split(line) # Handle quoted arguments properly + cmd = parts[0].lower() + + try: + # Basic commands + if cmd == "exit": + protocol.stop() + break + + elif cmd == "help": + print_help() + + elif cmd == "show_state": + protocol.show_state() + + elif cmd == "peer_id": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: peer_id <hex_pubkey>") + continue + try: + protocol.set_peer_identity(parts[1]) + except ValueError as e: + print(f"{RED}[ERROR]{RESET} Invalid public key: {e}") + + elif cmd == "connect": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: connect <port>") + continue + try: + port = int(parts[1]) + protocol.connect_to_peer(port) + except ValueError: + print(f"{RED}[ERROR]{RESET} Invalid port number.") + except Exception as e: + print(f"{RED}[ERROR]{RESET} Connection failed: {e}") + + # Manual protocol operation + elif cmd == "generate_ephemeral_keys": + protocol.generate_ephemeral_keys() + + elif cmd == "send_ping": + # Optional cipher parameter (0 = AES-GCM, 1 = ChaCha20-Poly1305) + cipher = 0 # Default to AES-GCM + if len(parts) >= 2: + try: + cipher = int(parts[1]) + if cipher not in (0, 1): + print(f"{YELLOW}[WARNING]{RESET} Unsupported cipher code {cipher}. Using AES-GCM (0).") + cipher = 0 + except ValueError: + print(f"{YELLOW}[WARNING]{RESET} Invalid cipher code. Using AES-GCM (0).") + protocol.send_ping_request(cipher) + + elif cmd == "send_handshake": + protocol.send_handshake() + + elif cmd == "respond_ping": + if len(parts) != 3: + print(f"{RED}[ERROR]{RESET} Usage: respond_ping <index> <0|1>") + continue + try: + idx = int(parts[1]) + answer = int(parts[2]) + if answer not in (0, 1): + print(f"{RED}[ERROR]{RESET} Answer must be 0 (reject) or 1 (accept).") + continue + protocol.respond_to_ping(idx, answer) + except ValueError: + print(f"{RED}[ERROR]{RESET} Index and answer must be integers.") + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to respond to ping: {e}") + + elif cmd == "generate_ecdhe": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: generate_ecdhe <index>") + continue + try: + idx = int(parts[1]) + protocol.generate_ecdhe(idx) + except ValueError: + print(f"{RED}[ERROR]{RESET} Index must be an integer.") + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to process handshake: {e}") + + elif cmd == "derive_hkdf": + try: + protocol.derive_hkdf() + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to derive HKDF key: {e}") + + elif cmd == "send_encrypted": + if len(parts) < 2: + print(f"{RED}[ERROR]{RESET} Usage: send_encrypted <plaintext>") + continue + plaintext = " ".join(parts[1:]) + try: + protocol.send_encrypted_message(plaintext) + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to send encrypted message: {e}") + + elif cmd == "decrypt": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: decrypt <index>") + continue + try: + idx = int(parts[1]) + protocol.decrypt_received_message(idx) + except ValueError: + print(f"{RED}[ERROR]{RESET} Index must be an integer.") + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to decrypt message: {e}") + + # Debugging commands + elif cmd == "debug_message": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: debug_message <index>") + continue + try: + idx = int(parts[1]) + protocol.debug_message(idx) + except ValueError: + print(f"{RED}[ERROR]{RESET} Index must be an integer.") + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to debug message: {e}") + + # Automatic mode commands + elif cmd == "auto": + if len(parts) < 2: + print(f"{RED}[ERROR]{RESET} Usage: auto <command> [options]") + print("Available commands: start, stop, status, config, message, passive, active") + continue + + subcmd = parts[1].lower() + + if subcmd == "start": + protocol.start_auto_mode() + print(f"{GREEN}[AUTO]{RESET} Automatic mode started") + + elif subcmd == "stop": + protocol.stop_auto_mode() + print(f"{GREEN}[AUTO]{RESET} Automatic mode stopped") + + elif subcmd == "status": + config = protocol.get_auto_mode_config() + print(f"{YELLOW}=== Auto Mode Status ==={RESET}") + print(f"Active: {protocol.auto_mode.active}") + print(f"State: {protocol.auto_mode.state}") + print(f"\n{YELLOW}--- Configuration ---{RESET}") + for key, value in vars(config).items(): + print(f" {key}: {value}") + + elif subcmd == "config": + if len(parts) < 3: + print(f"{RED}[ERROR]{RESET} Usage: auto config <param> <value> or auto config list") + continue + + if parts[2].lower() == "list": + config = protocol.get_auto_mode_config() + print(f"{YELLOW}=== Auto Mode Configuration Parameters ==={RESET}") + for key, value in vars(config).items(): + print(f" {key} ({type(value).__name__}): {value}") + continue + + if len(parts) != 4: + print(f"{RED}[ERROR]{RESET} Usage: auto config <param> <value>") + continue + + param = parts[2] + value_str = parts[3] + + # Convert the string value to the appropriate type + config = protocol.get_auto_mode_config() + if not hasattr(config, param): + print(f"{RED}[ERROR]{RESET} Unknown parameter: {param}") + print("Use 'auto config list' to see all available parameters") + continue + + current_value = getattr(config, param) + try: + if isinstance(current_value, bool): + if value_str.lower() in ("true", "yes", "on", "1"): + value = True + elif value_str.lower() in ("false", "no", "off", "0"): + value = False + else: + raise ValueError(f"Boolean value must be true/false/yes/no/on/off/1/0") + elif isinstance(current_value, int): + value = int(value_str) + elif isinstance(current_value, float): + value = float(value_str) + elif isinstance(current_value, str): + value = value_str + else: + value = value_str # Default to string + + protocol.configure_auto_mode(**{param: value}) + print(f"{GREEN}[AUTO]{RESET} Set {param} = {value}") + + except ValueError as e: + print(f"{RED}[ERROR]{RESET} Invalid value for {param}: {e}") + + elif subcmd == "message": + if len(parts) < 3: + print(f"{RED}[ERROR]{RESET} Usage: auto message <text>") + continue + + message = " ".join(parts[2:]) + protocol.queue_auto_message(message) + print(f"{GREEN}[AUTO]{RESET} Message queued for sending: {message}") + + elif subcmd == "passive": + # Configure as passive peer (responds but doesn't initiate) + protocol.configure_auto_mode( + ping_response_accept=True, + ping_auto_initiate=False, + active_mode=False + ) + print(f"{GREEN}[AUTO]{RESET} Configured as passive peer") + + elif subcmd == "active": + # Configure as active peer (initiates protocol) + protocol.configure_auto_mode( + ping_response_accept=True, + ping_auto_initiate=True, + active_mode=True + ) + print(f"{GREEN}[AUTO]{RESET} Configured as active peer") + + else: + print(f"{RED}[ERROR]{RESET} Unknown auto mode command: {subcmd}") + print("Available commands: start, stop, status, config, message, passive, active") + + # Legacy commands + elif cmd == "auto_responder": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: auto_responder <on|off>") + continue + val = parts[1].lower() + if val not in ("on", "off"): + print(f"{RED}[ERROR]{RESET} Value must be 'on' or 'off'.") + continue + protocol.enable_auto_responder(val == "on") + print(f"{YELLOW}[WARNING]{RESET} Using legacy auto responder. Consider using 'auto' commands instead.") + + else: + print(f"{RED}[ERROR]{RESET} Unknown command: {cmd}") + print("Type 'help' for a list of available commands.") + + except Exception as e: + print(f"{RED}[ERROR]{RESET} Command failed: {e}") + +if __name__ == "__main__": + try: + main() + except KeyboardInterrupt: + print("\nExiting...") + except Exception as e: + print(f"{RED}[FATAL ERROR]{RESET} {e}") + sys.exit(1) diff --git a/protocol_prototype/Prototype/Protocol/crypto_utils.py b/protocol_prototype/Prototype/Protocol/crypto_utils.py new file mode 100644 index 0000000..8c2e110 --- /dev/null +++ b/protocol_prototype/Prototype/Protocol/crypto_utils.py @@ -0,0 +1,165 @@ +import os +from typing import Tuple +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec, utils +from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature + +def generate_identity_keys() -> Tuple[ec.EllipticCurvePrivateKey, bytes]: + """ + Generate an ECDSA (P-256) identity key pair. + + Returns: + Tuple containing: + - private_key: EllipticCurvePrivateKey object + - public_key_bytes: Raw x||y format (64 bytes, 512 bits) + """ + private_key = ec.generate_private_key(ec.SECP256R1()) + public_numbers = private_key.public_key().public_numbers() + + x_bytes = public_numbers.x.to_bytes(32, byteorder='big') + y_bytes = public_numbers.y.to_bytes(32, byteorder='big') + pubkey_bytes = x_bytes + y_bytes # 64 bytes total + + return private_key, pubkey_bytes + + +def load_peer_identity_key(pubkey_bytes: bytes) -> ec.EllipticCurvePublicKey: + """ + Convert a raw public key (64 bytes, x||y format) to a cryptography public key object. + + Args: + pubkey_bytes: Raw 64-byte public key (x||y format) + + Returns: + EllipticCurvePublicKey object + + Raises: + ValueError: If the pubkey_bytes is not exactly 64 bytes + """ + if len(pubkey_bytes) != 64: + raise ValueError("Peer identity pubkey must be exactly 64 bytes (x||y).") + + x_int = int.from_bytes(pubkey_bytes[:32], byteorder='big') + y_int = int.from_bytes(pubkey_bytes[32:], byteorder='big') + + public_numbers = ec.EllipticCurvePublicNumbers(x_int, y_int, ec.SECP256R1()) + return public_numbers.public_key() + + +def sign_data(private_key: ec.EllipticCurvePrivateKey, data: bytes) -> bytes: + """ + Sign data with ECDSA using a P-256 private key. + + Args: + private_key: EllipticCurvePrivateKey for signing + data: Bytes to sign + + Returns: + DER-encoded signature (variable length, up to ~70-72 bytes) + """ + signature = private_key.sign(data, ec.ECDSA(hashes.SHA256())) + return signature + + +def verify_signature(public_key: ec.EllipticCurvePublicKey, signature: bytes, data: bytes) -> bool: + """ + Verify a DER-encoded ECDSA signature. + + Args: + public_key: EllipticCurvePublicKey for verification + signature: DER-encoded signature + data: Original signed data + + Returns: + True if signature is valid, False otherwise + """ + try: + public_key.verify(signature, data, ec.ECDSA(hashes.SHA256())) + return True + except InvalidSignature: + return False + + +def get_ephemeral_keypair() -> Tuple[ec.EllipticCurvePrivateKey, bytes]: + """ + Generate an ephemeral ECDH key pair (P-256). + + Returns: + Tuple containing: + - private_key: EllipticCurvePrivateKey object + - pubkey_bytes: Raw x||y format (64 bytes, 512 bits) + """ + private_key = ec.generate_private_key(ec.SECP256R1()) + numbers = private_key.public_key().public_numbers() + + x_bytes = numbers.x.to_bytes(32, 'big') + y_bytes = numbers.y.to_bytes(32, 'big') + + return private_key, x_bytes + y_bytes # 64 bytes total + + +def compute_ecdh_shared_key(private_key: ec.EllipticCurvePrivateKey, peer_pubkey_bytes: bytes) -> bytes: + """ + Compute a shared secret using ECDH. + + Args: + private_key: Local ECDH private key + peer_pubkey_bytes: Peer's ephemeral public key (64 bytes, raw x||y format) + + Returns: + Shared secret bytes + + Raises: + ValueError: If peer_pubkey_bytes is not 64 bytes + """ + if len(peer_pubkey_bytes) != 64: + raise ValueError("Peer public key must be 64 bytes (x||y format)") + + x_int = int.from_bytes(peer_pubkey_bytes[:32], 'big') + y_int = int.from_bytes(peer_pubkey_bytes[32:], 'big') + + # Create public key object from raw components + peer_public_numbers = ec.EllipticCurvePublicNumbers(x_int, y_int, ec.SECP256R1()) + peer_public_key = peer_public_numbers.public_key() + + # Perform key exchange + shared_key = private_key.exchange(ec.ECDH(), peer_public_key) + return shared_key + + +def der_to_raw(der_sig: bytes) -> bytes: + """ + Convert a DER-encoded ECDSA signature to a raw 64-byte signature (r||s). + + Args: + der_sig: DER-encoded signature + + Returns: + Raw 64-byte signature (r||s format), with each component padded to 32 bytes + """ + r, s = decode_dss_signature(der_sig) + r_bytes = r.to_bytes(32, byteorder='big') + s_bytes = s.to_bytes(32, byteorder='big') + return r_bytes + s_bytes + + +def raw_signature_to_der(raw_sig: bytes) -> bytes: + """ + Convert a raw signature (64 bytes, concatenated r||s) to DER-encoded signature. + + Args: + raw_sig: Raw 64-byte signature (r||s format) + + Returns: + DER-encoded signature + + Raises: + ValueError: If raw_sig is not 64 bytes + """ + if len(raw_sig) != 64: + raise ValueError("Raw signature must be 64 bytes (r||s).") + + r = int.from_bytes(raw_sig[:32], 'big') + s = int.from_bytes(raw_sig[32:], 'big') + return encode_dss_signature(r, s) diff --git a/protocol_prototype/Prototype/Protocol/encryption.py b/protocol_prototype/Prototype/Protocol/encryption.py new file mode 100644 index 0000000..87a6b32 --- /dev/null +++ b/protocol_prototype/Prototype/Protocol/encryption.py @@ -0,0 +1,307 @@ +import os +import struct +from typing import Optional, Tuple +from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305 + +class MessageHeader: + """ + Header of an encrypted message (18 bytes total): + + Clear Text Section (4 bytes): + - flag: 16 bits (0xBEEF by default) + - data_len: 16 bits (length of encrypted payload excluding tag) + + Associated Data (14 bytes): + - retry: 8 bits (retry counter) + - connection_status: 4 bits (e.g., CRC required) + 4 bits padding + - iv/messageID: 96 bits (12 bytes) + """ + def __init__(self, flag: int, data_len: int, retry: int, connection_status: int, iv: bytes): + if not (0 <= flag < 65536): + raise ValueError("Flag must fit in 16 bits (0..65535)") + if not (0 <= data_len < 65536): + raise ValueError("Data length must fit in 16 bits (0..65535)") + if not (0 <= retry < 256): + raise ValueError("Retry must fit in 8 bits (0..255)") + if not (0 <= connection_status < 16): + raise ValueError("Connection status must fit in 4 bits (0..15)") + if len(iv) != 12: + raise ValueError("IV must be 12 bytes (96 bits)") + + self.flag = flag # 16 bits + self.data_len = data_len # 16 bits + self.retry = retry # 8 bits + self.connection_status = connection_status # 4 bits + self.iv = iv # 96 bits (12 bytes) + + def pack(self) -> bytes: + """Pack header into 18 bytes.""" + # Pack flag and data_len (4 bytes) + header = struct.pack('>H H', self.flag, self.data_len) + + # Pack retry and connection_status (2 bytes) + # connection_status in high 4 bits of second byte, 4 bits padding as zero + ad_byte = (self.connection_status & 0x0F) << 4 + ad_packed = struct.pack('>B B', self.retry, ad_byte) + + # Append IV (12 bytes) + return header + ad_packed + self.iv + + def get_associated_data(self) -> bytes: + """Get the associated data for AEAD encryption (retry, conn_status, iv).""" + # Pack retry and connection_status + ad_byte = (self.connection_status & 0x0F) << 4 + ad_packed = struct.pack('>B B', self.retry, ad_byte) + + # Append IV + return ad_packed + self.iv + + @classmethod + def unpack(cls, data: bytes) -> 'MessageHeader': + """Unpack 18 bytes into a MessageHeader object.""" + if len(data) < 18: + raise ValueError(f"Header data too short: {len(data)} bytes, expected 18") + + flag, data_len = struct.unpack('>H H', data[:4]) + retry, ad_byte = struct.unpack('>B B', data[4:6]) + connection_status = (ad_byte >> 4) & 0x0F + iv = data[6:18] + + return cls(flag, data_len, retry, connection_status, iv) + +class EncryptedMessage: + """ + Encrypted message packet format: + + - Header (18 bytes): + * flag: 16 bits + * data_len: 16 bits + * retry: 8 bits + * connection_status: 4 bits (+ 4 bits padding) + * iv/messageID: 96 bits (12 bytes) + + - Payload: variable length encrypted data + + - Footer: + * Authentication tag: 128 bits (16 bytes) + * CRC32: 32 bits (4 bytes) - optional, based on connection_status + """ + def __init__(self, plaintext: bytes, key: bytes, flag: int = 0xBEEF, + retry: int = 0, connection_status: int = 0, iv: bytes = None, + cipher_type: int = 0): + self.plaintext = plaintext + self.key = key + self.flag = flag + self.retry = retry + self.connection_status = connection_status + self.iv = iv or generate_iv(initial=True) + self.cipher_type = cipher_type # 0 = AES-256-GCM, 1 = ChaCha20-Poly1305 + + # Will be set after encryption + self.ciphertext = None + self.tag = None + self.header = None + + def encrypt(self) -> bytes: + """Encrypt the plaintext and return the full encrypted message.""" + # Create header with correct data_len (which will be set after encryption) + self.header = MessageHeader( + flag=self.flag, + data_len=0, # Will be updated after encryption + retry=self.retry, + connection_status=self.connection_status, + iv=self.iv + ) + + # Get associated data for AEAD + aad = self.header.get_associated_data() + + # Encrypt using the appropriate cipher + if self.cipher_type == 0: # AES-256-GCM + cipher = AESGCM(self.key) + ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad) + elif self.cipher_type == 1: # ChaCha20-Poly1305 + cipher = ChaCha20Poly1305(self.key) + ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad) + else: + raise ValueError(f"Unsupported cipher type: {self.cipher_type}") + + # Extract ciphertext and tag + self.tag = ciphertext_with_tag[-16:] + self.ciphertext = ciphertext_with_tag[:-16] + + # Update header with actual data length + self.header.data_len = len(self.ciphertext) + + # Pack everything together + packed_header = self.header.pack() + + # Check if CRC is required (based on connection_status) + if self.connection_status & 0x01: # Lowest bit indicates CRC required + import zlib + # Compute CRC32 of header + ciphertext + tag + crc = zlib.crc32(packed_header + self.ciphertext + self.tag) & 0xffffffff + crc_bytes = struct.pack('>I', crc) + return packed_header + self.ciphertext + self.tag + crc_bytes + else: + return packed_header + self.ciphertext + self.tag + + @classmethod + def decrypt(cls, data: bytes, key: bytes, cipher_type: int = 0) -> Tuple[bytes, MessageHeader]: + """ + Decrypt an encrypted message and return the plaintext and header. + + Args: + data: The full encrypted message + key: The encryption key + cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 + + Returns: + Tuple of (plaintext, header) + """ + if len(data) < 18 + 16: # Header + minimum tag size + raise ValueError("Message too short") + + # Extract header + header_bytes = data[:18] + header = MessageHeader.unpack(header_bytes) + + # Get ciphertext and tag + data_len = header.data_len + ciphertext_start = 18 + ciphertext_end = ciphertext_start + data_len + + if ciphertext_end + 16 > len(data): + raise ValueError("Message length does not match header's data_len") + + ciphertext = data[ciphertext_start:ciphertext_end] + tag = data[ciphertext_end:ciphertext_end + 16] + + # Get associated data for AEAD + aad = header.get_associated_data() + + # Combine ciphertext and tag for decryption + ciphertext_with_tag = ciphertext + tag + + # Decrypt using the appropriate cipher + try: + if cipher_type == 0: # AES-256-GCM + cipher = AESGCM(key) + plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad) + elif cipher_type == 1: # ChaCha20-Poly1305 + cipher = ChaCha20Poly1305(key) + plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad) + else: + raise ValueError(f"Unsupported cipher type: {cipher_type}") + + return plaintext, header + except Exception as e: + raise ValueError(f"Decryption failed: {e}") + +def generate_iv(initial: bool = False, previous_iv: bytes = None) -> bytes: + """ + Generate a 96-bit IV (12 bytes). + + Args: + initial: If True, return a random IV + previous_iv: The previous IV to increment + + Returns: + A new IV + """ + if initial or previous_iv is None: + return os.urandom(12) # 96 bits + else: + # Increment the previous IV by 1 modulo 2^96 + iv_int = int.from_bytes(previous_iv, 'big') + iv_int = (iv_int + 1) % (1 << 96) + return iv_int.to_bytes(12, 'big') + +# Convenience functions to match original API +def encrypt_message(plaintext: bytes, key: bytes, flag: int = 0xBEEF, + retry: int = 0, connection_status: int = 0, + iv: bytes = None, cipher_type: int = 0) -> bytes: + """ + Encrypt a message using the specified parameters. + + Args: + plaintext: The data to encrypt + key: The encryption key (32 bytes for AES-256-GCM, 32 bytes for ChaCha20-Poly1305) + flag: 16-bit flag value (default: 0xBEEF) + retry: 8-bit retry counter + connection_status: 4-bit connection status + iv: Optional 96-bit IV (if None, a random one will be generated) + cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 + + Returns: + The full encrypted message + """ + message = EncryptedMessage( + plaintext=plaintext, + key=key, + flag=flag, + retry=retry, + connection_status=connection_status, + iv=iv, + cipher_type=cipher_type + ) + return message.encrypt() + +def decrypt_message(message: bytes, key: bytes, cipher_type: int = 0) -> bytes: + """ + Decrypt a message. + + Args: + message: The full encrypted message + key: The encryption key + cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 + + Returns: + The decrypted plaintext + """ + plaintext, _ = EncryptedMessage.decrypt(message, key, cipher_type) + return plaintext + +# ChaCha20-CTR functions for voice streaming (without authentication) +def chacha20_encrypt(plaintext: bytes, key: bytes, nonce: bytes) -> bytes: + """ + Encrypt plaintext using ChaCha20 in CTR mode (no authentication). + + Args: + plaintext: Data to encrypt + key: 32-byte key + nonce: 16-byte nonce (for ChaCha20 in cryptography library) + + Returns: + Ciphertext + """ + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + from cryptography.hazmat.backends import default_backend + + if len(key) != 32: + raise ValueError("ChaCha20 key must be 32 bytes") + if len(nonce) != 16: + raise ValueError("ChaCha20 nonce must be 16 bytes") + + cipher = Cipher( + algorithms.ChaCha20(key, nonce), + mode=None, + backend=default_backend() + ) + encryptor = cipher.encryptor() + return encryptor.update(plaintext) + encryptor.finalize() + +def chacha20_decrypt(ciphertext: bytes, key: bytes, nonce: bytes) -> bytes: + """ + Decrypt ciphertext using ChaCha20 in CTR mode (no authentication). + + Args: + ciphertext: Data to decrypt + key: 32-byte key + nonce: 12-byte nonce + + Returns: + Plaintext + """ + # ChaCha20 is symmetrical - encryption and decryption are the same + return chacha20_encrypt(ciphertext, key, nonce) diff --git a/protocol_prototype/Prototype/Protocol/messages.py b/protocol_prototype/Prototype/Protocol/messages.py new file mode 100644 index 0000000..5521499 --- /dev/null +++ b/protocol_prototype/Prototype/Protocol/messages.py @@ -0,0 +1,463 @@ +import os +import struct +import time +import zlib +import hashlib +from typing import Tuple, Optional + +def crc32_of(data: bytes) -> int: + """ + Compute CRC-32 of 'data'. + """ + return zlib.crc32(data) & 0xffffffff + + +# --------------------------------------------------------------------------- +# PING REQUEST (new format) +# Fields (in order): +# - session_nonce: 129 bits (from the top 129 bits of 17 random bytes) +# - version: 7 bits +# - cipher: 4 bits (0 = AES-256-GCM, 1 = ChaCha20-poly1305; for now only 0 is used) +# - CRC: 32 bits +# +# Total bits: 129 + 7 + 4 + 32 = 172 bits. We pack into 22 bytes (176 bits) with 4 spare bits. +# --------------------------------------------------------------------------- +class PingRequest: + """ + PING REQUEST format (172 bits / 22 bytes): + - session_nonce: 129 bits (from top 129 bits of 17 random bytes) + - version: 7 bits + - cipher: 4 bits (0 = AES-256-GCM, 1 = ChaCha20-poly1305) + - CRC: 32 bits + """ + def __init__(self, version: int, cipher: int, session_nonce: bytes = None): + if not (0 <= version < 128): + raise ValueError("Version must fit in 7 bits (0..127)") + if not (0 <= cipher < 16): + raise ValueError("Cipher must fit in 4 bits (0..15)") + + self.version = version + self.cipher = cipher + + # Generate session nonce if not provided + if session_nonce is None: + # Generate 17 random bytes + nonce_full = os.urandom(17) + # Use top 129 bits + nonce_int_full = int.from_bytes(nonce_full, 'big') + nonce_129_int = nonce_int_full >> 7 # drop lowest 7 bits + self.session_nonce = nonce_129_int.to_bytes(17, 'big') + else: + if len(session_nonce) != 17: + raise ValueError("Session nonce must be 17 bytes (136 bits)") + self.session_nonce = session_nonce + + def serialize(self) -> bytes: + """Serialize the ping request into a 22-byte packet.""" + # Convert session_nonce to integer (129 bits) + nonce_int = int.from_bytes(self.session_nonce, 'big') + + # Pack fields: shift nonce left by 11 bits, add version and cipher + partial_int = (nonce_int << 11) | (self.version << 4) | (self.cipher & 0x0F) + # This creates 129+7+4 = 140 bits; pack into 18 bytes + partial_bytes = partial_int.to_bytes(18, 'big') + + # Compute CRC over these 18 bytes + cval = crc32_of(partial_bytes) + + # Combine partial data with 32-bit CRC + final_int = (int.from_bytes(partial_bytes, 'big') << 32) | cval + return final_int.to_bytes(22, 'big') + + @classmethod + def deserialize(cls, data: bytes) -> Optional['PingRequest']: + """Deserialize a 22-byte packet into a PingRequest object.""" + if len(data) != 22: + return None + + # Extract 176-bit integer + final_int = int.from_bytes(data, 'big') + + # Extract CRC and verify + crc_in = final_int & 0xffffffff + partial_int = final_int >> 32 # 140 bits + partial_bytes = partial_int.to_bytes(18, 'big') + crc_calc = crc32_of(partial_bytes) + + if crc_calc != crc_in: + return None + + # Extract fields + cipher = partial_int & 0x0F + version = (partial_int >> 4) & 0x7F + nonce_129_int = partial_int >> 11 # 129 bits + session_nonce = nonce_129_int.to_bytes(17, 'big') + + return cls(version, cipher, session_nonce) + + + +# --------------------------------------------------------------------------- +# PING RESPONSE (new format) +# Fields: +# - timestamp: 32 bits (we take the lower 32 bits of the time in ms) +# - version: 7 bits +# - cipher: 4 bits +# - answer: 1 bit +# - CRC: 32 bits +# +# Total bits: 32 + 7 + 4 + 1 + 32 = 76 bits; pack into 10 bytes (80 bits) with 4 spare bits. +# --------------------------------------------------------------------------- +class PingResponse: + """ + PING RESPONSE format (76 bits / 10 bytes): + - timestamp: 32 bits (milliseconds since epoch, lower 32 bits) + - version: 7 bits + - cipher: 4 bits + - answer: 1 bit (0 = no, 1 = yes) + - CRC: 32 bits + """ + def __init__(self, version: int, cipher: int, answer: int, timestamp: int = None): + if not (0 <= version < 128): + raise ValueError("Version must fit in 7 bits") + if not (0 <= cipher < 16): + raise ValueError("Cipher must fit in 4 bits") + if answer not in (0, 1): + raise ValueError("Answer must be 0 or 1") + + self.version = version + self.cipher = cipher + self.answer = answer + self.timestamp = timestamp or (int(time.time() * 1000) & 0xffffffff) + + def serialize(self) -> bytes: + """Serialize the ping response into a 10-byte packet.""" + # Pack timestamp, version, cipher, answer: 32+7+4+1 = 44 bits + # Shift left by 4 to put spare bits at the end + partial_val = (self.timestamp << (7+4+1)) | (self.version << (4+1)) | (self.cipher << 1) | self.answer + partial_val_shifted = partial_val << 4 # Add 4 spare bits at the end + partial_bytes = partial_val_shifted.to_bytes(6, 'big') # 6 bytes = 48 bits + + # Compute CRC + cval = crc32_of(partial_bytes) + + # Combine with CRC + final_val = (int.from_bytes(partial_bytes, 'big') << 32) | cval + return final_val.to_bytes(10, 'big') + + @classmethod + def deserialize(cls, data: bytes) -> Optional['PingResponse']: + """Deserialize a 10-byte packet into a PingResponse object.""" + if len(data) != 10: + return None + + # Extract 80-bit integer + final_int = int.from_bytes(data, 'big') + + # Extract CRC and verify + crc_in = final_int & 0xffffffff + partial_int = final_int >> 32 # 48 bits + partial_bytes = partial_int.to_bytes(6, 'big') + crc_calc = crc32_of(partial_bytes) + + if crc_calc != crc_in: + return None + + # Extract fields (discard 4 spare bits) + partial_int >>= 4 # now 44 bits + answer = partial_int & 0x01 + cipher = (partial_int >> 1) & 0x0F + version = (partial_int >> (1+4)) & 0x7F + timestamp = partial_int >> (1+4+7) + + return cls(version, cipher, answer, timestamp) + + +# ============================================================================= +# 3) Handshake +# - 32-bit timestamp +# - 64-byte ephemeral pubkey (raw x||y = 512 bits) +# - 64-byte ephemeral signature (raw r||s = 512 bits) +# - 32-byte PFS hash (256 bits) +# - 32-bit CRC +# => total 4 + 64 + 64 + 32 + 4 = 168 bytes = 1344 bits +# ============================================================================= + +class Handshake: + """ + HANDSHAKE format (1344 bits / 168 bytes): + - timestamp: 32 bits + - ephemeral_pubkey: 512 bits (64 bytes, raw x||y format) + - ephemeral_signature: 512 bits (64 bytes, raw r||s format) + - pfs_hash: 256 bits (32 bytes) + - CRC: 32 bits + """ + def __init__(self, ephemeral_pubkey: bytes, ephemeral_signature: bytes, pfs_hash: bytes, timestamp: int = None): + if len(ephemeral_pubkey) != 64: + raise ValueError("ephemeral_pubkey must be 64 bytes (raw x||y)") + if len(ephemeral_signature) != 64: + raise ValueError("ephemeral_signature must be 64 bytes (raw r||s)") + if len(pfs_hash) != 32: + raise ValueError("pfs_hash must be 32 bytes") + + self.ephemeral_pubkey = ephemeral_pubkey + self.ephemeral_signature = ephemeral_signature + self.pfs_hash = pfs_hash + self.timestamp = timestamp or (int(time.time() * 1000) & 0xffffffff) + + def serialize(self) -> bytes: + """Serialize the handshake into a 168-byte packet.""" + # Pack timestamp and other fields + partial = struct.pack("!I", self.timestamp) + self.ephemeral_pubkey + self.ephemeral_signature + self.pfs_hash + + # Compute CRC + cval = crc32_of(partial) + + # Append CRC + return partial + struct.pack("!I", cval) + + @classmethod + def deserialize(cls, data: bytes) -> Optional['Handshake']: + """Deserialize a 168-byte packet into a Handshake object.""" + if len(data) != 168: + return None + + # Extract and verify CRC + partial = data[:-4] + crc_in = struct.unpack("!I", data[-4:])[0] + crc_calc = crc32_of(partial) + + if crc_calc != crc_in: + return None + + # Extract fields + timestamp = struct.unpack("!I", partial[:4])[0] + ephemeral_pubkey = partial[4:4+64] + ephemeral_signature = partial[68:68+64] + pfs_hash = partial[132:132+32] + + return cls(ephemeral_pubkey, ephemeral_signature, pfs_hash, timestamp) + + +# ============================================================================= +# 4) PFS Hash Helper +# If no previous session, return 32 zero bytes +# Otherwise, compute sha256(session_number || last_shared_secret). +# ============================================================================= + +def compute_pfs_hash(session_number: int, shared_secret_hex: str) -> bytes: + """ + Compute the PFS hash field for handshake messages: + - If no previous session (session_number < 0), return 32 zero bytes + - Otherwise, compute sha256(session_number || shared_secret) + """ + if session_number < 0: + return b"\x00" * 32 + + # Convert shared_secret_hex to raw bytes + secret_bytes = bytes.fromhex(shared_secret_hex) + + # Pack session_number as 4 bytes + sn_bytes = struct.pack("!I", session_number) + + # Compute hash + return hashlib.sha256(sn_bytes + secret_bytes).digest() + + +# Helper function for CRC32 calculations +def compute_crc32(data: bytes) -> int: + """Compute CRC32 of data (for consistency with crc32_of).""" + return zlib.crc32(data) & 0xffffffff + + +# ============================================================================= +# Voice Protocol Messages +# ============================================================================= + +class VoiceStart: + """ + Voice call initiation message (20 bytes). + + Fields: + - version: 8 bits (protocol version) + - codec_mode: 8 bits (Codec2 mode) + - fec_type: 8 bits (0=repetition, 1=convolutional, 2=LDPC) + - flags: 8 bits (reserved for future use) + - session_id: 64 bits (unique voice session identifier) + - initial_sequence: 32 bits (starting sequence number) + - crc32: 32 bits + """ + + def __init__(self, version: int = 0, codec_mode: int = 5, fec_type: int = 0, + flags: int = 0, session_id: int = None, initial_sequence: int = 0): + self.version = version + self.codec_mode = codec_mode + self.fec_type = fec_type + self.flags = flags | 0x80 # Set high bit to distinguish from VoiceSync + self.session_id = session_id or int.from_bytes(os.urandom(8), 'big') + self.initial_sequence = initial_sequence + + def serialize(self) -> bytes: + """Serialize to 20 bytes.""" + # Pack all fields except CRC + data = struct.pack('>BBBBQII', + self.version, + self.codec_mode, + self.fec_type, + self.flags, + self.session_id, + self.initial_sequence, + 0 # CRC placeholder + ) + + # Calculate and append CRC + crc = compute_crc32(data[:-4]) + return data[:-4] + struct.pack('>I', crc) + + @classmethod + def deserialize(cls, data: bytes) -> Optional['VoiceStart']: + """Deserialize from bytes.""" + if len(data) != 20: + return None + + try: + version, codec_mode, fec_type, flags, session_id, initial_seq, crc = struct.unpack('>BBBBQII', data) + + # Verify CRC + expected_crc = compute_crc32(data[:-4]) + if crc != expected_crc: + return None + + return cls(version, codec_mode, fec_type, flags, session_id, initial_seq) + except struct.error: + return None + + +class VoiceAck: + """ + Voice call acknowledgment message (16 bytes). + + Fields: + - version: 8 bits + - status: 8 bits (0=reject, 1=accept) + - codec_mode: 8 bits (negotiated codec mode) + - fec_type: 8 bits (negotiated FEC type) + - session_id: 64 bits (echo of received session_id) + - crc32: 32 bits + """ + + def __init__(self, version: int = 0, status: int = 1, codec_mode: int = 5, + fec_type: int = 0, session_id: int = 0): + self.version = version + self.status = status + self.codec_mode = codec_mode + self.fec_type = fec_type + self.session_id = session_id + + def serialize(self) -> bytes: + """Serialize to 16 bytes.""" + data = struct.pack('>BBBBQI', + self.version, + self.status, + self.codec_mode, + self.fec_type, + self.session_id, + 0 # CRC placeholder + ) + + crc = compute_crc32(data[:-4]) + return data[:-4] + struct.pack('>I', crc) + + @classmethod + def deserialize(cls, data: bytes) -> Optional['VoiceAck']: + """Deserialize from bytes.""" + if len(data) != 16: + return None + + try: + version, status, codec_mode, fec_type, session_id, crc = struct.unpack('>BBBBQI', data) + + expected_crc = compute_crc32(data[:-4]) + if crc != expected_crc: + return None + + return cls(version, status, codec_mode, fec_type, session_id) + except struct.error: + return None + + +class VoiceEnd: + """ + Voice call termination message (12 bytes). + + Fields: + - session_id: 64 bits + - crc32: 32 bits + """ + + def __init__(self, session_id: int): + self.session_id = session_id + + def serialize(self) -> bytes: + """Serialize to 12 bytes.""" + data = struct.pack('>QI', self.session_id, 0) + crc = compute_crc32(data[:-4]) + return data[:-4] + struct.pack('>I', crc) + + @classmethod + def deserialize(cls, data: bytes) -> Optional['VoiceEnd']: + """Deserialize from bytes.""" + if len(data) != 12: + return None + + try: + session_id, crc = struct.unpack('>QI', data) + + expected_crc = compute_crc32(data[:-4]) + if crc != expected_crc: + return None + + return cls(session_id) + except struct.error: + return None + + +class VoiceSync: + """ + Voice synchronization frame (20 bytes). + Used for maintaining sync and providing timing information. + + Fields: + - session_id: 64 bits + - sequence: 32 bits + - timestamp: 32 bits (milliseconds since voice start) + - crc32: 32 bits + """ + + def __init__(self, session_id: int, sequence: int, timestamp: int): + self.session_id = session_id + self.sequence = sequence + self.timestamp = timestamp + + def serialize(self) -> bytes: + """Serialize to 20 bytes.""" + data = struct.pack('>QIII', self.session_id, self.sequence, self.timestamp, 0) + crc = compute_crc32(data[:-4]) + return data[:-4] + struct.pack('>I', crc) + + @classmethod + def deserialize(cls, data: bytes) -> Optional['VoiceSync']: + """Deserialize from bytes.""" + if len(data) != 20: + return None + + try: + session_id, sequence, timestamp, crc = struct.unpack('>QIII', data) + + expected_crc = compute_crc32(data[:-4]) + if crc != expected_crc: + return None + + return cls(session_id, sequence, timestamp) + except struct.error: + return None diff --git a/protocol_prototype/Prototype/Protocol/protocol.py b/protocol_prototype/Prototype/Protocol/protocol.py new file mode 100644 index 0000000..28ed168 --- /dev/null +++ b/protocol_prototype/Prototype/Protocol/protocol.py @@ -0,0 +1,1069 @@ +import random +import os +import time +import threading +from typing import List, Dict, Any, Optional, Tuple + +from crypto_utils import ( + generate_identity_keys, + load_peer_identity_key, + sign_data, + verify_signature, + get_ephemeral_keypair, + compute_ecdh_shared_key, + der_to_raw, + raw_signature_to_der +) +from messages import ( + PingRequest, PingResponse, Handshake, + compute_pfs_hash, + VoiceStart, VoiceAck, VoiceEnd, VoiceSync +) +import transmission +from encryption import ( + EncryptedMessage, MessageHeader, + generate_iv, encrypt_message, decrypt_message +) +from auto_mode import AutoMode, AutoModeConfig +from voice_codec import VoiceProtocol + +# ANSI colors +RED = "\033[91m" +GREEN = "\033[92m" +YELLOW = "\033[93m" +BLUE = "\033[94m" +RESET = "\033[0m" + + +class IcingProtocol: + def __init__(self): + # Identity keys (each 512 bits when printed as hex of 64 bytes) + self.identity_privkey, self.identity_pubkey = generate_identity_keys() + + # Peer identity for verifying ephemeral signatures + self.peer_identity_pubkey_obj = None + self.peer_identity_pubkey_bytes = None + + # Ephemeral keys (our side) + self.ephemeral_privkey = None + self.ephemeral_pubkey = None + + # Last computed shared secret (hex string) + self.shared_secret = None + + # Derived HKDF key (hex string, 256 bits) + self.hkdf_key = None + + # Negotiated cipher (0 = AES-256-GCM, 1 = ChaCha20-Poly1305) + self.cipher_type = 0 + + # For PFS: track per-peer session info (session number and last shared secret) + self.pfs_history: Dict[bytes, Tuple[int, str]] = {} + + # Protocol flags + self.state = { + "ping_sent": False, + "ping_received": False, + "handshake_sent": False, + "handshake_received": False, + "key_exchange_complete": False + } + + # Auto mode for automated protocol operation + self.auto_mode = AutoMode(self) + + # Legacy auto-responder toggle (kept for backward compatibility) + self.auto_responder = False + + # Voice protocol handler + self.voice_protocol = None # Will be initialized after key exchange + self.voice_session_active = False + self.voice_session_id = None + + # Active connections list + self.connections = [] + + # Inbound messages (each message is a dict with keys: type, raw, parsed, connection) + self.inbound_messages: List[Dict[str, Any]] = [] + + # Store the session nonce (17 bytes but only 129 bits are valid) from first sent or received PING + self.session_nonce: bytes = None + + # Last used IV for encrypted messages + self.last_iv: bytes = None + + self.local_port = random.randint(30000, 40000) + self.server_listener = transmission.ServerListener( + host="127.0.0.1", + port=self.local_port, + on_new_connection=self.on_new_connection, + on_data_received=self.on_data_received + ) + self.server_listener.start() + + # ------------------------------------------------------------------------- + # Transport callbacks + # ------------------------------------------------------------------------- + + def on_new_connection(self, conn: transmission.PeerConnection): + print(f"{GREEN}[IcingProtocol]{RESET} New incoming connection.") + self.connections.append(conn) + + # Notify auto mode + self.auto_mode.handle_connection_established() + + def on_data_received(self, conn: transmission.PeerConnection, data: bytes): + bits_count = len(data) * 8 + print( + f"{GREEN}[RECV]{RESET} {bits_count} bits from peer: {data.hex()[:60]}{'...' if len(data.hex()) > 60 else ''}") + + # PING REQUEST (22 bytes) + if len(data) == 22: + ping_request = PingRequest.deserialize(data) + if ping_request: + self.state["ping_received"] = True + + # If received cipher is not supported, force to 0 (AES-256-GCM) + if ping_request.cipher != 0 and ping_request.cipher != 1: + print(f"{YELLOW}[NOTICE]{RESET} Received PING with unsupported cipher ({ping_request.cipher}); forcing cipher to 0 in response.") + ping_request.cipher = 0 + + # Store cipher type for future encrypted messages + self.cipher_type = ping_request.cipher + + # Store session nonce if not already set + if self.session_nonce is None: + self.session_nonce = ping_request.session_nonce + print(f"{YELLOW}[NOTICE]{RESET} Stored session nonce from received PING.") + + index = len(self.inbound_messages) + msg = { + "type": "PING_REQUEST", + "raw": data, + "parsed": ping_request, + "connection": conn + } + self.inbound_messages.append(msg) + + # Handle in auto mode (if active) + self.auto_mode.handle_ping_received(index) + + # Legacy auto-responder (for backward compatibility) + if self.auto_responder and not self.auto_mode.active: + timer = threading.Timer(2.0, self._auto_respond_ping, args=[index]) + timer.daemon = True + timer.start() + return + + # PING RESPONSE (10 bytes) + elif len(data) == 10: + ping_response = PingResponse.deserialize(data) + if ping_response: + # Store negotiated cipher type + self.cipher_type = ping_response.cipher + + index = len(self.inbound_messages) + msg = { + "type": "PING_RESPONSE", + "raw": data, + "parsed": ping_response, + "connection": conn + } + self.inbound_messages.append(msg) + + # Notify auto mode (if active) + self.auto_mode.handle_ping_response_received(ping_response.answer == 1) + return + + # HANDSHAKE message (168 bytes) + elif len(data) == 168: + handshake = Handshake.deserialize(data) + if handshake: + self.state["handshake_received"] = True + index = len(self.inbound_messages) + msg = { + "type": "HANDSHAKE", + "raw": data, + "parsed": handshake, + "connection": conn + } + self.inbound_messages.append(msg) + + # Notify auto mode (if active) + self.auto_mode.handle_handshake_received(index) + + # Legacy auto-responder + if self.auto_responder and not self.auto_mode.active: + timer = threading.Timer(2.0, self._auto_respond_handshake, args=[index]) + timer.daemon = True + timer.start() + return + + # VOICE_START or VOICE_SYNC message (20 bytes) + elif len(data) == 20: + # Check fourth byte (flags field) to distinguish between messages + # VOICE_START has high bit set in flags (byte 3) + # VOICE_SYNC doesn't have this structure + if len(data) >= 4 and (data[3] & 0x80): + # Try VOICE_START first + voice_start = VoiceStart.deserialize(data) + if voice_start: + index = len(self.inbound_messages) + msg = { + "type": "VOICE_START", + "raw": data, + "parsed": voice_start, + "connection": conn + } + self.inbound_messages.append(msg) + print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_START at index={index}.") + + # Handle voice call initiation + self.handle_voice_start(index) + return + + # Try VOICE_SYNC + voice_sync = VoiceSync.deserialize(data) + if voice_sync: + index = len(self.inbound_messages) + msg = { + "type": "VOICE_SYNC", + "raw": data, + "parsed": voice_sync, + "connection": conn + } + self.inbound_messages.append(msg) + print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_SYNC at index={index}.") + + # Handle voice synchronization + self.handle_voice_sync(index) + return + + # VOICE_ACK message (16 bytes) + elif len(data) == 16: + # Try VOICE_ACK first, then fall back to PING_RESPONSE + voice_ack = VoiceAck.deserialize(data) + if voice_ack: + index = len(self.inbound_messages) + msg = { + "type": "VOICE_ACK", + "raw": data, + "parsed": voice_ack, + "connection": conn + } + self.inbound_messages.append(msg) + print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_ACK at index={index}.") + + # Handle voice call acknowledgment + self.handle_voice_ack(index) + return + + # VOICE_END message (12 bytes) + elif len(data) == 12: + voice_end = VoiceEnd.deserialize(data) + if voice_end: + index = len(self.inbound_messages) + msg = { + "type": "VOICE_END", + "raw": data, + "parsed": voice_end, + "connection": conn + } + self.inbound_messages.append(msg) + print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_END at index={index}.") + + # Handle voice call termination + self.handle_voice_end(index) + return + + + # Check if the message might be an encrypted message (e.g. header of 18 bytes at start) + elif len(data) >= 18: + # Try to parse header + try: + header = MessageHeader.unpack(data[:18]) + # If header unpacking is successful and data length matches header expectations + expected_len = 18 + header.data_len + 16 # Header + payload + tag + + # Check if CRC is included + has_crc = (header.connection_status & 0x01) != 0 + if has_crc: + expected_len += 4 # Add CRC32 length + + if len(data) >= expected_len: + index = len(self.inbound_messages) + msg = { + "type": "ENCRYPTED_MESSAGE", + "raw": data, + "parsed": header, + "connection": conn + } + self.inbound_messages.append(msg) + print(f"{YELLOW}[NOTICE]{RESET} Stored inbound ENCRYPTED_MESSAGE at index={index}.") + + # Notify auto mode + self.auto_mode.handle_encrypted_received(index) + return + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to parse message header: {e}") + + # Otherwise, unrecognized/malformed message. + index = len(self.inbound_messages) + msg = { + "type": "UNKNOWN", + "raw": data, + "parsed": None, + "connection": conn + } + self.inbound_messages.append(msg) + print(f"{RED}[WARNING]{RESET} Unrecognized or malformed message stored at index={index}.") + + + # ------------------------------------------------------------------------- + # HKDF Derivation + # ------------------------------------------------------------------------- + + def derive_hkdf(self): + """ + Derives a 256-bit key using HKDF. + Uses as input keying material (IKM) the shared secret from ECDH. + The salt is computed as SHA256(session_nonce || pfs_param), where: + - session_nonce is taken from self.session_nonce (17 bytes, 129 bits) or defaults to zeros. + - pfs_param is taken from the first inbound HANDSHAKE's pfs_hash field (32 bytes) or zeros. + """ + if not self.shared_secret: + print(f"{RED}[ERROR]{RESET} No shared secret available; cannot derive HKDF key.") + return + + # IKM: shared secret converted from hex to bytes. + ikm = bytes.fromhex(self.shared_secret) + # Use stored session_nonce if available; otherwise default to zeros. + session_nonce = self.session_nonce if self.session_nonce is not None else (b"\x00" * 17) + + # For now, use a simpler approach: just use session_nonce for salt + # This ensures both peers derive the same key + # PFS is still maintained through the shared secret rotation + pfs_param = b"\x00" * 32 # Will use session_nonce only for salt + + # Ensure both are bytes + if isinstance(session_nonce, str): + session_nonce = session_nonce.encode() + if isinstance(pfs_param, str): + pfs_param = pfs_param.encode() + + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.kdf.hkdf import HKDF + hasher = hashes.Hash(hashes.SHA256()) + hasher.update(session_nonce + pfs_param) + salt_value = hasher.finalize() + + hkdf = HKDF( + algorithm=hashes.SHA256(), + length=32, # 256 bits + salt=salt_value, + info=b"", + ) + derived_key = hkdf.derive(ikm) + self.hkdf_key = derived_key.hex() + self.state["key_exchange_complete"] = True + print(f"{GREEN}[HKDF]{RESET} Derived HKDF key: {self.hkdf_key}") + return True + + # ------------------------------------------------------------------------- + # Legacy Auto-responder helpers (kept for backward compatibility) + # ------------------------------------------------------------------------- + + def _auto_respond_ping(self, index: int): + """ + Called by a Timer to respond automatically to a PING_REQUEST after 2s. + """ + print(f"{BLUE}[AUTO]{RESET} Delayed responding to PING at index={index}") + self.respond_to_ping(index, answer=1) # Accept by default + self.show_state() + + def _auto_respond_handshake(self, index: int): + """ + Called by a Timer to handle inbound HANDSHAKE automatically. + 1) Generate ephemeral keys if not already set + 2) Compute ECDH with the inbound ephemeral pub (generate_ecdhe) + 3) Send our handshake back + 4) Show state + """ + print(f"{BLUE}[AUTO]{RESET} Delayed ECDH process for HANDSHAKE at index={index}") + + # 1) Generate ephemeral keys if we don't have them + if not self.ephemeral_privkey or not self.ephemeral_pubkey: + self.generate_ephemeral_keys() + + # 2) Compute ECDH from inbound ephemeral pub + self.generate_ecdhe(index) + + # 3) Send our handshake to the peer + self.send_handshake() + + # 4) Show final state + self.show_state() + + # ------------------------------------------------------------------------- + # Public Methods for Auto Mode Management + # ------------------------------------------------------------------------- + + def start_auto_mode(self): + """Start the automatic protocol operation mode.""" + self.auto_mode.start() + + def stop_auto_mode(self): + """Stop the automatic protocol operation mode.""" + self.auto_mode.stop() + + def configure_auto_mode(self, **kwargs): + """ + Configure the automatic mode parameters. + + Args: + **kwargs: Configuration parameters to set. Supported parameters: + - ping_response_accept: bool, whether to accept incoming pings + - ping_auto_initiate: bool, whether to initiate pings on connection + - ping_retry_count: int, number of ping retries + - ping_retry_delay: float, seconds between ping retries + - ping_timeout: float, seconds to wait for ping response + - preferred_cipher: int, preferred cipher (0=AES-GCM, 1=ChaCha20-Poly1305) + - handshake_retry_count: int, number of handshake retries + - handshake_retry_delay: float, seconds between handshake retries + - handshake_timeout: float, seconds to wait for handshake + - auto_message_enabled: bool, whether to auto-send messages + - message_interval: float, seconds between auto messages + - message_content: str, default message content + - active_mode: bool, whether to actively initiate protocol + """ + for key, value in kwargs.items(): + if hasattr(self.auto_mode.config, key): + setattr(self.auto_mode.config, key, value) + print(f"{BLUE}[CONFIG]{RESET} Set auto mode {key} = {value}") + else: + print(f"{RED}[ERROR]{RESET} Unknown auto mode configuration parameter: {key}") + + def get_auto_mode_config(self): + """Return the current auto mode configuration.""" + return self.auto_mode.config + + def queue_auto_message(self, message: str): + """ + Add a message to the auto-send queue. + + Args: + message: Message text to send + """ + self.auto_mode.queue_message(message) + + def toggle_auto_mode_logging(self): + """ + Toggle detailed logging for auto mode. + When enabled, will show more information about state transitions and decision making. + """ + if not hasattr(self.auto_mode, 'verbose_logging'): + self.auto_mode.verbose_logging = True + else: + self.auto_mode.verbose_logging = not self.auto_mode.verbose_logging + + status = "enabled" if self.auto_mode.verbose_logging else "disabled" + print(f"{BLUE}[AUTO-LOG]{RESET} Detailed logging {status}") + + def debug_message(self, index: int): + """ + Debug a message in the inbound message queue. + Prints detailed information about the message. + + Args: + index: The index of the message in the inbound_messages queue + """ + if index < 0 or index >= len(self.inbound_messages): + print(f"{RED}[ERROR]{RESET} Invalid message index {index}") + return + + msg = self.inbound_messages[index] + print(f"\n{YELLOW}=== Message Debug [{index}] ==={RESET}") + print(f"Type: {msg['type']}") + print(f"Length: {len(msg['raw'])} bytes = {len(msg['raw'])*8} bits") + print(f"Raw data: {msg['raw'].hex()}") + + if msg['parsed'] is not None: + print(f"\n{YELLOW}--- Parsed Data ---{RESET}") + if msg['type'] == 'PING_REQUEST': + ping = msg['parsed'] + print(f"Version: {ping.version}") + print(f"Cipher: {ping.cipher} ({'AES-256-GCM' if ping.cipher == 0 else 'ChaCha20-Poly1305' if ping.cipher == 1 else 'Unknown'})") + print(f"Session nonce: {ping.session_nonce.hex()}") + print(f"CRC32: {ping.crc32:08x}") + + elif msg['type'] == 'PING_RESPONSE': + resp = msg['parsed'] + print(f"Version: {resp.version}") + print(f"Cipher: {resp.cipher} ({'AES-256-GCM' if resp.cipher == 0 else 'ChaCha20-Poly1305' if resp.cipher == 1 else 'Unknown'})") + print(f"Answer: {resp.answer} ({'Accept' if resp.answer == 1 else 'Reject'})") + print(f"CRC32: {resp.crc32:08x}") + + elif msg['type'] == 'HANDSHAKE': + hs = msg['parsed'] + print(f"Ephemeral pubkey: {hs.ephemeral_pubkey.hex()[:16]}...") + print(f"Ephemeral signature: {hs.ephemeral_signature.hex()[:16]}...") + print(f"PFS hash: {hs.pfs_hash.hex()[:16]}...") + print(f"Timestamp: {hs.timestamp}") + print(f"CRC32: {hs.crc32:08x}") + + elif msg['type'] == 'ENCRYPTED_MESSAGE': + header = msg['parsed'] + print(f"Flag: 0x{header.flag:04x}") + print(f"Data length: {header.data_len} bytes") + print(f"Retry: {header.retry}") + print(f"Connection status: {header.connection_status} ({'CRC included' if header.connection_status & 0x01 else 'No CRC'})") + print(f"IV: {header.iv.hex()}") + + # Calculate expected message size + expected_len = 18 + header.data_len + 16 # Header + payload + tag + if header.connection_status & 0x01: + expected_len += 4 # Add CRC + + print(f"Expected total length: {expected_len} bytes") + print(f"Actual length: {len(msg['raw'])} bytes") + + # If we have a key, try to decrypt + if self.hkdf_key: + print("\nAttempting decryption...") + try: + key = bytes.fromhex(self.hkdf_key) + plaintext = decrypt_message(msg['raw'], key, self.cipher_type) + print(f"Decrypted: {plaintext.decode('utf-8')}") + except Exception as e: + print(f"Decryption failed: {e}") + print() + + # ------------------------------------------------------------------------- + # Public Methods + # ------------------------------------------------------------------------- + + def connect_to_peer(self, port: int): + conn = transmission.connect_to_peer("127.0.0.1", port, self.on_data_received) + self.connections.append(conn) + print(f"{GREEN}[IcingProtocol]{RESET} Outgoing connection to port {port} established.") + + # Notify auto mode + self.auto_mode.handle_connection_established() + + def set_peer_identity(self, peer_pubkey_hex: str): + pubkey_bytes = bytes.fromhex(peer_pubkey_hex) + self.peer_identity_pubkey_obj = load_peer_identity_key(pubkey_bytes) + self.peer_identity_pubkey_bytes = pubkey_bytes + print(f"{GREEN}[IcingProtocol]{RESET} Stored peer identity pubkey (hex={peer_pubkey_hex[:16]}...).") + + def generate_ephemeral_keys(self): + self.ephemeral_privkey, self.ephemeral_pubkey = get_ephemeral_keypair() + print(f"{GREEN}[IcingProtocol]{RESET} Generated ephemeral key pair: pubkey={self.ephemeral_pubkey.hex()[:16]}...") + + # Send PING (session discovery and cipher negotiation) + def send_ping_request(self, cipher_type=0): + """ + Send a ping request to the first connected peer. + + Args: + cipher_type: Preferred cipher type (0 = AES-256-GCM, 1 = ChaCha20-Poly1305) + """ + if not self.connections: + print(f"{RED}[ERROR]{RESET} No active connections.") + return False + + # Validate cipher type + if cipher_type not in (0, 1): + print(f"{YELLOW}[WARNING]{RESET} Invalid cipher type {cipher_type}, defaulting to AES-256-GCM (0)") + cipher_type = 0 + + # Create ping request with specified cipher + ping_request = PingRequest(version=0, cipher=cipher_type) + + # Store session nonce if not already set + if self.session_nonce is None: + self.session_nonce = ping_request.session_nonce + print(f"{YELLOW}[NOTICE]{RESET} Stored session nonce from sent PING.") + + # Serialize and send + pkt = ping_request.serialize() + self._send_packet(self.connections[0], pkt, "PING_REQUEST") + self.state["ping_sent"] = True + return True + + def send_handshake(self): + """ + Build and send handshake: + - ephemeral_pubkey (64 bytes, raw x||y) + - ephemeral_signature (64 bytes, raw r||s) + - pfs_hash (32 bytes) + - timestamp (32 bits) + - CRC (32 bits) + """ + if not self.connections: + print(f"{RED}[ERROR]{RESET} No active connections.") + return False + if not self.ephemeral_privkey or not self.ephemeral_pubkey: + print(f"{RED}[ERROR]{RESET} Ephemeral keys not generated.") + return False + if self.peer_identity_pubkey_bytes is None: + print(f"{RED}[ERROR]{RESET} Peer identity not set; needed for PFS tracking.") + return False + + # 1) Sign ephemeral_pubkey using identity key + sig_der = sign_data(self.identity_privkey, self.ephemeral_pubkey) + # Convert DER signature to raw r||s format (64 bytes) + raw_signature = der_to_raw(sig_der) + + # 2) Compute PFS hash + session_number, last_secret_hex = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, "")) + pfs = compute_pfs_hash(session_number, last_secret_hex) + + # 3) Create handshake object + handshake = Handshake( + ephemeral_pubkey=self.ephemeral_pubkey, + ephemeral_signature=raw_signature, + pfs_hash=pfs + ) + + # 4) Serialize and send + pkt = handshake.serialize() + self._send_packet(self.connections[0], pkt, "HANDSHAKE") + self.state["handshake_sent"] = True + return True + + def enable_auto_responder(self, enable: bool): + """ + Legacy method for enabling/disabling auto responder. + For new code, use start_auto_mode() and stop_auto_mode() instead. + """ + self.auto_responder = enable + print(f"{YELLOW}[LEGACY]{RESET} Auto responder set to {enable}. Consider using auto_mode instead.") + + # ------------------------------------------------------------------------- + # Manual Responses + # ------------------------------------------------------------------------- + + def respond_to_ping(self, index: int, answer: int): + """ + Respond to a ping request with the specified answer (0 = no, 1 = yes). + If answer is 1, we accept the connection and use the cipher specified in the request. + """ + if index < 0 or index >= len(self.inbound_messages): + print(f"{RED}[ERROR]{RESET} Invalid index {index}.") + return False + msg = self.inbound_messages[index] + if msg["type"] != "PING_REQUEST": + print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a PING_REQUEST.") + return False + + ping_request = msg["parsed"] + version = ping_request.version + cipher = ping_request.cipher + + # Force cipher to 0 or 1 (only AES-256-GCM and ChaCha20-Poly1305 are supported) + if cipher != 0 and cipher != 1: + print(f"{YELLOW}[NOTICE]{RESET} Received PING with unsupported cipher ({cipher}); forcing cipher to 0 in response.") + cipher = 0 + + # Store the negotiated cipher type if we're accepting + if answer == 1: + self.cipher_type = cipher + + conn = msg["connection"] + # Create ping response + ping_response = PingResponse(version, cipher, answer) + resp = ping_response.serialize() + self._send_packet(conn, resp, "PING_RESPONSE") + print(f"{BLUE}[MANUAL]{RESET} Responded to ping with answer={answer}.") + return True + + def generate_ecdhe(self, index: int): + """ + Process a handshake message: + 1. Verify the ephemeral signature + 2. Compute the ECDH shared secret + 3. Update PFS history + """ + if index < 0 or index >= len(self.inbound_messages): + print(f"{RED}[ERROR]{RESET} Invalid index {index}.") + return False + msg = self.inbound_messages[index] + if msg["type"] != "HANDSHAKE": + print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a HANDSHAKE.") + return False + + handshake = msg["parsed"] + + # Convert raw signature to DER for verification + raw_sig = handshake.ephemeral_signature + sig_der = raw_signature_to_der(raw_sig) + + # Verify signature + ok = verify_signature(self.peer_identity_pubkey_obj, sig_der, handshake.ephemeral_pubkey) + if not ok: + print(f"{RED}[ERROR]{RESET} Ephemeral signature invalid.") + return False + print(f"{GREEN}[OK]{RESET} Ephemeral signature verified.") + + # Check if we have ephemeral keys + if not self.ephemeral_privkey: + print(f"{YELLOW}[WARN]{RESET} No ephemeral_privkey available, cannot compute shared secret.") + return False + + # Compute ECDH shared secret + shared = compute_ecdh_shared_key(self.ephemeral_privkey, handshake.ephemeral_pubkey) + self.shared_secret = shared.hex() + print(f"{GREEN}[OK]{RESET} Computed ECDH shared key = {self.shared_secret}") + + # Update PFS history + old_session, _ = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, "")) + new_session = 1 if old_session < 0 else old_session + 1 + self.pfs_history[self.peer_identity_pubkey_bytes] = (new_session, self.shared_secret) + return True + + # ------------------------------------------------------------------------- + # Utility + # ------------------------------------------------------------------------- + + def _send_packet(self, conn: transmission.PeerConnection, data: bytes, label: str): + bits_count = len(data) * 8 + print(f"{BLUE}[SEND]{RESET} {label} -> {bits_count} bits: {data.hex()[:60]}{'...' if len(data.hex())>60 else ''}") + conn.send(data) + + def show_state(self): + print(f"\n{YELLOW}=== Global State ==={RESET}") + print(f"Listening Port: {self.local_port}") + print(f"Identity PubKey: 512 bits => {self.identity_pubkey.hex()[:16]}...") + + if self.peer_identity_pubkey_bytes: + print(f"Peer Identity PubKey: 512 bits => {self.peer_identity_pubkey_bytes.hex()[:16]}...") + else: + print("Peer Identity PubKey: [None]") + + print("\nEphemeral Keys:") + if self.ephemeral_pubkey: + print(f" ephemeral_pubkey: 512 bits => {self.ephemeral_pubkey.hex()[:16]}...") + else: + print(" ephemeral_pubkey: [None]") + + print(f"\nShared Secret: {self.shared_secret if self.shared_secret else '[None]'}") + + if self.hkdf_key: + print(f"HKDF Derived Key: {self.hkdf_key} (size: {len(self.hkdf_key)*8} bits)") + else: + print("HKDF Derived Key: [None]") + + print(f"Negotiated Cipher: {'AES-256-GCM' if self.cipher_type == 0 else 'ChaCha20-Poly1305'} (code: {self.cipher_type})") + + if self.session_nonce: + print(f"Session Nonce: {self.session_nonce.hex()} (129 bits)") + else: + print("Session Nonce: [None]") + + if self.last_iv: + print(f"Last IV: {self.last_iv.hex()} (96 bits)") + else: + print("Last IV: [None]") + + print("\nProtocol Flags:") + for k, v in self.state.items(): + print(f" {k}: {v}") + + print("\nAuto Mode Active:", self.auto_mode.active) + print("Auto Mode State:", self.auto_mode.state) + print("Legacy Auto Responder:", self.auto_responder) + + print("\nVoice Status:") + print(f" Active: {self.voice_session_active}") + if self.voice_session_id: + print(f" Session ID: {self.voice_session_id:016x}") + print(f" Voice Protocol: {'Initialized' if self.voice_protocol else 'Not initialized'}") + + print("\nActive Connections:") + for i, c in enumerate(self.connections): + print(f" [{i}] Alive={c.alive}") + + print("\nInbound Message Queue:") + for i, m in enumerate(self.inbound_messages): + print(f" [{i}] type={m['type']} len={len(m['raw'])} bytes => {len(m['raw']) * 8} bits") + print() + + def stop(self): + """Stop the protocol and clean up resources.""" + # Stop auto mode first + self.auto_mode.stop() + + # Stop server listener + self.server_listener.stop() + + # Close all connections + for c in self.connections: + c.close() + self.connections.clear() + self.inbound_messages.clear() + print(f"{RED}[STOP]{RESET} Protocol stopped.") + + # ------------------------------------------------------------------------- + # Encrypted Messaging + # ------------------------------------------------------------------------- + + def send_encrypted_message(self, plaintext: str): + """ + Encrypts and sends a message using the derived HKDF key and negotiated cipher. + The message format is: + - Header (18 bytes): flag, data_len, retry, connection_status, IV + - Payload: variable length encrypted data + - Footer: Authentication tag (16 bytes) + optional CRC32 (4 bytes) + """ + if not self.connections: + print(f"{RED}[ERROR]{RESET} No active connections.") + return False + if not self.hkdf_key: + print(f"{RED}[ERROR]{RESET} No HKDF key derived. Cannot encrypt message.") + return False + + # Get the encryption key + key = bytes.fromhex(self.hkdf_key) + + # Convert plaintext to bytes + plaintext_bytes = plaintext.encode('utf-8') + + # Generate or increment the IV + if self.last_iv is None: + # First message, generate random IV + iv = generate_iv(initial=True) + else: + # Subsequent message, increment previous IV + iv = generate_iv(initial=False, previous_iv=self.last_iv) + + # Store the new IV + self.last_iv = iv + + # Create encrypted message (connection_status 0 = no CRC) + encrypted = encrypt_message( + plaintext=plaintext_bytes, + key=key, + flag=0xBEEF, # Default flag + retry=0, + connection_status=0, # No CRC + iv=iv, + cipher_type=self.cipher_type + ) + + # Send the encrypted message + self._send_packet(self.connections[0], encrypted, "ENCRYPTED_MESSAGE") + print(f"{GREEN}[SEND_ENCRYPTED]{RESET} Encrypted message sent.") + return True + + def decrypt_received_message(self, index: int): + """ + Decrypt a received encrypted message using the HKDF key and negotiated cipher. + """ + if index < 0 or index >= len(self.inbound_messages): + print(f"{RED}[ERROR]{RESET} Invalid message index.") + return None + + msg = self.inbound_messages[index] + if msg["type"] != "ENCRYPTED_MESSAGE": + print(f"{RED}[ERROR]{RESET} Message at index {index} is not an ENCRYPTED_MESSAGE.") + return None + + # Get the encrypted message + encrypted = msg["raw"] + + if not self.hkdf_key: + print(f"{RED}[ERROR]{RESET} No HKDF key derived. Cannot decrypt message.") + return None + + # Get the encryption key + key = bytes.fromhex(self.hkdf_key) + + try: + # Decrypt the message + plaintext = decrypt_message(encrypted, key, self.cipher_type) + + # Convert to string + plaintext_str = plaintext.decode('utf-8') + + # Update last IV from the header + header = MessageHeader.unpack(encrypted[:18]) + self.last_iv = header.iv + + print(f"{GREEN}[DECRYPTED]{RESET} Decrypted message: {plaintext_str}") + return plaintext_str + except Exception as e: + print(f"{RED}[ERROR]{RESET} Decryption failed: {e}") + return None + + # ------------------------------------------------------------------------- + # Voice Protocol Methods + # ------------------------------------------------------------------------- + + def handle_voice_start(self, index: int): + """Handle incoming voice call initiation.""" + if index < 0 or index >= len(self.inbound_messages): + return + + msg = self.inbound_messages[index] + voice_start = msg["parsed"] + + print(f"{BLUE}[VOICE]{RESET} Incoming voice call (session_id={voice_start.session_id:016x})") + print(f" Codec mode: {voice_start.codec_mode}") + print(f" FEC type: {voice_start.fec_type}") + + # Auto-accept if in auto mode (or implement your own logic) + if self.auto_mode.active: + self.accept_voice_call(voice_start.session_id, voice_start.codec_mode, voice_start.fec_type) + + def handle_voice_ack(self, index: int): + """Handle voice call acknowledgment.""" + if index < 0 or index >= len(self.inbound_messages): + return + + msg = self.inbound_messages[index] + voice_ack = msg["parsed"] + + if voice_ack.status == 1: + print(f"{GREEN}[VOICE]{RESET} Voice call accepted (session_id={voice_ack.session_id:016x})") + self.voice_session_active = True + self.voice_session_id = voice_ack.session_id + + # Initialize voice protocol if not already done + if not self.voice_protocol: + self.voice_protocol = VoiceProtocol(self) + else: + print(f"{RED}[VOICE]{RESET} Voice call rejected") + + def handle_voice_end(self, index: int): + """Handle voice call termination.""" + if index < 0 or index >= len(self.inbound_messages): + return + + msg = self.inbound_messages[index] + voice_end = msg["parsed"] + + print(f"{YELLOW}[VOICE]{RESET} Voice call ended (session_id={voice_end.session_id:016x})") + + if self.voice_session_id == voice_end.session_id: + self.voice_session_active = False + self.voice_session_id = None + + def handle_voice_sync(self, index: int): + """Handle voice synchronization frame.""" + if index < 0 or index >= len(self.inbound_messages): + return + + msg = self.inbound_messages[index] + voice_sync = msg["parsed"] + + # Use sync info for timing/jitter buffer management + print(f"{BLUE}[VOICE-SYNC]{RESET} seq={voice_sync.sequence}, ts={voice_sync.timestamp}ms") + + def start_voice_call(self, codec_mode: int = 5, fec_type: int = 0): + """ + Initiate a voice call. + + Args: + codec_mode: Codec2 mode (default 5 = 1200bps) + fec_type: FEC type (0=repetition, 1=convolutional, 2=LDPC) + """ + if not self.connections: + print(f"{RED}[ERROR]{RESET} No active connections.") + return False + + if not self.state.get("key_exchange_complete"): + print(f"{RED}[ERROR]{RESET} Key exchange not complete. Cannot start voice call.") + return False + + # Create VOICE_START message + voice_start = VoiceStart( + version=0, + codec_mode=codec_mode, + fec_type=fec_type + ) + + self.voice_session_id = voice_start.session_id + + # Send the message + pkt = voice_start.serialize() + self._send_packet(self.connections[0], pkt, "VOICE_START") + + print(f"{GREEN}[VOICE]{RESET} Initiating voice call (session_id={self.voice_session_id:016x})") + return True + + def accept_voice_call(self, session_id: int, codec_mode: int, fec_type: int): + """Accept an incoming voice call.""" + if not self.connections: + return False + + # Send VOICE_ACK + voice_ack = VoiceAck( + version=0, + status=1, # Accept + codec_mode=codec_mode, + fec_type=fec_type, + session_id=session_id + ) + + pkt = voice_ack.serialize() + self._send_packet(self.connections[0], pkt, "VOICE_ACK") + + self.voice_session_active = True + self.voice_session_id = session_id + + # Initialize voice protocol + if not self.voice_protocol: + self.voice_protocol = VoiceProtocol(self) + + return True + + def end_voice_call(self): + """End the current voice call.""" + if not self.voice_session_active or not self.voice_session_id: + print(f"{YELLOW}[VOICE]{RESET} No active voice call to end") + return False + + if not self.connections: + return False + + # Send VOICE_END + voice_end = VoiceEnd(self.voice_session_id) + pkt = voice_end.serialize() + self._send_packet(self.connections[0], pkt, "VOICE_END") + + self.voice_session_active = False + self.voice_session_id = None + + print(f"{YELLOW}[VOICE]{RESET} Voice call ended") + return True + + def send_voice_audio(self, audio_samples): + """ + Send voice audio samples. + + Args: + audio_samples: PCM audio samples (8kHz, 16-bit) + """ + if not self.voice_session_active: + print(f"{RED}[ERROR]{RESET} No active voice session") + return False + + if not self.voice_protocol: + print(f"{RED}[ERROR]{RESET} Voice protocol not initialized") + return False + + try: + # Process and send audio + modulated = self.voice_protocol.process_voice_input(audio_samples) + if modulated is not None: + # In real implementation, this would go through the audio channel + # For now, we could send it as encrypted data + print(f"{BLUE}[VOICE-AUDIO]{RESET} Processed {len(modulated)} samples") + return True + except Exception as e: + print(f"{RED}[ERROR]{RESET} Voice audio processing failed: {e}") + import traceback + traceback.print_exc() + + return False diff --git a/protocol_prototype/Prototype/Protocol/transmission.py b/protocol_prototype/Prototype/Protocol/transmission.py new file mode 100644 index 0000000..35f3a21 --- /dev/null +++ b/protocol_prototype/Prototype/Protocol/transmission.py @@ -0,0 +1,100 @@ +import socket +import threading +from typing import Callable + +class PeerConnection: + """ + Represents a live, two-way connection to a peer. + We keep a socket open, read data in a background thread, + and can send data from the main thread at any time. + """ + def __init__(self, sock: socket.socket, on_data_received: Callable[['PeerConnection', bytes], None]): + self.sock = sock + self.on_data_received = on_data_received + self.alive = True + + self.read_thread = threading.Thread(target=self.read_loop, daemon=True) + self.read_thread.start() + + def read_loop(self): + while self.alive: + try: + data = self.sock.recv(4096) + if not data: + break + self.on_data_received(self, data) + except OSError: + break + self.alive = False + self.sock.close() + print("[PeerConnection] Connection closed.") + + def send(self, data: bytes): + if not self.alive: + print("[PeerConnection.send] Cannot send, connection not alive.") + return + try: + self.sock.sendall(data) + except OSError: + print("[PeerConnection.send] Send failed, connection might be closed.") + self.alive = False + + def close(self): + self.alive = False + try: + self.sock.shutdown(socket.SHUT_RDWR) + except OSError: + pass + self.sock.close() + + +class ServerListener(threading.Thread): + """ + A thread that listens on a given port. When a new client connects, + it creates a PeerConnection for that client. + """ + def __init__(self, host: str, port: int, + on_new_connection: Callable[[PeerConnection], None], + on_data_received: Callable[[PeerConnection, bytes], None]): + super().__init__(daemon=True) + self.host = host + self.port = port + self.on_new_connection = on_new_connection + self.on_data_received = on_data_received + self.server_socket = None + self.stop_event = threading.Event() + + def run(self): + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.bind((self.host, self.port)) + self.server_socket.listen(5) + self.server_socket.settimeout(1.0) + print(f"[ServerListener] Listening on {self.host}:{self.port}") + + while not self.stop_event.is_set(): + try: + client_sock, addr = self.server_socket.accept() + print(f"[ServerListener] Accepted connection from {addr}") + conn = PeerConnection(client_sock, self.on_data_received) + self.on_new_connection(conn) + except socket.timeout: + pass + except OSError: + break + + if self.server_socket: + self.server_socket.close() + + def stop(self): + self.stop_event.set() + if self.server_socket: + self.server_socket.close() + + +def connect_to_peer(host: str, port: int, + on_data_received: Callable[[PeerConnection, bytes], None]) -> PeerConnection: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + print(f"[connect_to_peer] Connected to {host}:{port}") + conn = PeerConnection(sock, on_data_received) + return conn diff --git a/protocol_prototype/Prototype/Protocol/voice_codec.py b/protocol_prototype/Prototype/Protocol/voice_codec.py new file mode 100644 index 0000000..c8e70be --- /dev/null +++ b/protocol_prototype/Prototype/Protocol/voice_codec.py @@ -0,0 +1,716 @@ +""" +Voice codec integration for encrypted voice over GSM. +Implements Codec2 compression with FSK modulation for transmitting +encrypted voice data over standard GSM voice channels. +""" + +import array +import math +import struct +from typing import Optional, Tuple, List +from dataclasses import dataclass +from enum import IntEnum + +try: + import numpy as np + HAS_NUMPY = True +except ImportError: + HAS_NUMPY = False + +# ANSI colors +RED = "\033[91m" +GREEN = "\033[92m" +YELLOW = "\033[93m" +BLUE = "\033[94m" +RESET = "\033[0m" + + +class Codec2Mode(IntEnum): + """Codec2 bitrate modes.""" + MODE_3200 = 0 # 3200 bps + MODE_2400 = 1 # 2400 bps + MODE_1600 = 2 # 1600 bps + MODE_1400 = 3 # 1400 bps + MODE_1300 = 4 # 1300 bps + MODE_1200 = 5 # 1200 bps (recommended for robustness) + MODE_700C = 6 # 700 bps + + +@dataclass +class Codec2Frame: + """Represents a single Codec2 compressed voice frame.""" + mode: Codec2Mode + bits: bytes + timestamp: float + frame_number: int + + +class Codec2Wrapper: + """ + Wrapper for Codec2 voice codec. + In production, this would use py_codec2 or ctypes bindings to libcodec2. + This is a simulation interface for protocol development. + """ + + # Frame sizes in bits for each mode + FRAME_BITS = { + Codec2Mode.MODE_3200: 64, + Codec2Mode.MODE_2400: 48, + Codec2Mode.MODE_1600: 64, + Codec2Mode.MODE_1400: 56, + Codec2Mode.MODE_1300: 52, + Codec2Mode.MODE_1200: 48, + Codec2Mode.MODE_700C: 28 + } + + # Frame duration in ms + FRAME_MS = { + Codec2Mode.MODE_3200: 20, + Codec2Mode.MODE_2400: 20, + Codec2Mode.MODE_1600: 40, + Codec2Mode.MODE_1400: 40, + Codec2Mode.MODE_1300: 40, + Codec2Mode.MODE_1200: 40, + Codec2Mode.MODE_700C: 40 + } + + def __init__(self, mode: Codec2Mode = Codec2Mode.MODE_1200): + """ + Initialize Codec2 wrapper. + + Args: + mode: Codec2 bitrate mode (default 1200 bps for robustness) + """ + self.mode = mode + self.frame_bits = self.FRAME_BITS[mode] + self.frame_bytes = (self.frame_bits + 7) // 8 + self.frame_ms = self.FRAME_MS[mode] + self.frame_samples = int(8000 * self.frame_ms / 1000) # 8kHz sampling + self.frame_counter = 0 + + print(f"{GREEN}[CODEC2]{RESET} Initialized in mode {mode.name} " + f"({self.frame_bits} bits/frame, {self.frame_ms}ms duration)") + + def encode(self, audio_samples) -> Optional[Codec2Frame]: + """ + Encode PCM audio samples to Codec2 frame. + + Args: + audio_samples: PCM samples (8kHz, 16-bit signed) + + Returns: + Codec2Frame or None if insufficient samples + """ + if len(audio_samples) < self.frame_samples: + return None + + # In production: call codec2_encode(state, bits, samples) + # Simulation: create pseudo-compressed data + compressed = self._simulate_compression(audio_samples[:self.frame_samples]) + + frame = Codec2Frame( + mode=self.mode, + bits=compressed, + timestamp=self.frame_counter * self.frame_ms / 1000.0, + frame_number=self.frame_counter + ) + + self.frame_counter += 1 + return frame + + def decode(self, frame: Codec2Frame): + """ + Decode Codec2 frame to PCM audio samples. + + Args: + frame: Codec2 compressed frame + + Returns: + PCM samples (8kHz, 16-bit signed) + """ + if frame.mode != self.mode: + raise ValueError(f"Frame mode {frame.mode} doesn't match decoder mode {self.mode}") + + # In production: call codec2_decode(state, samples, bits) + # Simulation: decompress to audio + return self._simulate_decompression(frame.bits) + + def _simulate_compression(self, samples) -> bytes: + """Simulate Codec2 compression (for testing).""" + # Convert to list if needed + if hasattr(samples, 'tolist'): + sample_list = samples.tolist() + elif hasattr(samples, '__iter__'): + sample_list = list(samples) + else: + sample_list = samples + + # Extract basic features for simulation + if HAS_NUMPY and hasattr(samples, '__array__'): + # Convert to numpy array if needed + np_samples = np.asarray(samples, dtype=np.float32) + if len(np_samples) > 0: + mean_square = np.mean(np_samples ** 2) + energy = np.sqrt(mean_square) if not np.isnan(mean_square) else 0.0 + zero_crossings = np.sum(np.diff(np.sign(np_samples)) != 0) + else: + energy = 0.0 + zero_crossings = 0 + else: + # Manual calculation without numpy + if sample_list and len(sample_list) > 0: + energy = math.sqrt(sum(s**2 for s in sample_list) / len(sample_list)) + zero_crossings = sum(1 for i in range(1, len(sample_list)) + if (sample_list[i-1] >= 0) != (sample_list[i] >= 0)) + else: + energy = 0.0 + zero_crossings = 0 + + # Pack into bytes (simplified) + # Ensure values are valid + energy_int = max(0, min(65535, int(energy))) + zc_int = max(0, min(65535, int(zero_crossings))) + data = struct.pack('<HH', energy_int, zc_int) + + # Pad to expected frame size + data += b'\x00' * (self.frame_bytes - len(data)) + + return data[:self.frame_bytes] + + def _simulate_decompression(self, compressed: bytes): + """Simulate Codec2 decompression (for testing).""" + # Unpack features + if len(compressed) >= 4: + energy, zero_crossings = struct.unpack('<HH', compressed[:4]) + else: + energy, zero_crossings = 1000, 100 + + # Generate synthetic speech-like signal + if HAS_NUMPY: + t = np.linspace(0, self.frame_ms/1000, self.frame_samples) + + # Base frequency from zero crossings + freq = zero_crossings * 10 # Simplified mapping + + # Generate harmonics + signal = np.zeros(self.frame_samples) + for harmonic in range(1, 4): + signal += np.sin(2 * np.pi * freq * harmonic * t) / harmonic + + # Apply energy envelope + signal *= energy / 10000.0 + + # Convert to 16-bit PCM + return (signal * 32767).astype(np.int16) + else: + # Manual generation without numpy + samples = [] + freq = zero_crossings * 10 + + for i in range(self.frame_samples): + t = i / 8000.0 # 8kHz sample rate + value = 0 + for harmonic in range(1, 4): + value += math.sin(2 * math.pi * freq * harmonic * t) / harmonic + + value *= energy / 10000.0 + # Clamp to 16-bit range + sample = int(value * 32767) + sample = max(-32768, min(32767, sample)) + samples.append(sample) + + return array.array('h', samples) + + +class FSKModem: + """ + 4-FSK modem for transmitting digital data over voice channels. + Designed to survive GSM/AMR/EVS vocoders. + """ + + def __init__(self, sample_rate: int = 8000, baud_rate: int = 600): + """ + Initialize FSK modem. + + Args: + sample_rate: Audio sample rate (Hz) + baud_rate: Symbol rate (baud) + """ + self.sample_rate = sample_rate + self.baud_rate = baud_rate + self.samples_per_symbol = int(sample_rate / baud_rate) + + # 4-FSK frequencies (300-3400 Hz band) + self.frequencies = [ + 600, # 00 + 1200, # 01 + 1800, # 10 + 2400 # 11 + ] + + # Preamble for synchronization (800 Hz, 100ms) + self.preamble_freq = 800 + self.preamble_duration = 0.1 # seconds + + print(f"{GREEN}[FSK]{RESET} Initialized 4-FSK modem " + f"({baud_rate} baud, frequencies: {self.frequencies})") + + def modulate(self, data: bytes, add_preamble: bool = True): + """ + Modulate binary data to FSK audio signal. + + Args: + data: Binary data to modulate + add_preamble: Whether to add synchronization preamble + + Returns: + Audio signal (normalized float32 array or list) + """ + # Convert bytes to dibits (2-bit symbols) + symbols = [] + for byte in data: + symbols.extend([ + (byte >> 6) & 0x03, + (byte >> 4) & 0x03, + (byte >> 2) & 0x03, + byte & 0x03 + ]) + + # Generate audio signal + signal = [] + + # Add preamble + if add_preamble: + preamble_samples = int(self.preamble_duration * self.sample_rate) + if HAS_NUMPY: + t = np.arange(preamble_samples) / self.sample_rate + preamble = np.sin(2 * np.pi * self.preamble_freq * t) + signal.extend(preamble) + else: + for i in range(preamble_samples): + t = i / self.sample_rate + value = math.sin(2 * math.pi * self.preamble_freq * t) + signal.append(value) + + # Modulate symbols + for symbol in symbols: + freq = self.frequencies[symbol] + if HAS_NUMPY: + t = np.arange(self.samples_per_symbol) / self.sample_rate + tone = np.sin(2 * np.pi * freq * t) + signal.extend(tone) + else: + for i in range(self.samples_per_symbol): + t = i / self.sample_rate + value = math.sin(2 * math.pi * freq * t) + signal.append(value) + + # Apply smoothing to reduce clicks + if HAS_NUMPY: + audio = np.array(signal, dtype=np.float32) + else: + audio = array.array('f', signal) + audio = self._apply_envelope(audio) + + return audio + + def demodulate(self, audio) -> Tuple[bytes, float]: + """ + Demodulate FSK audio signal to binary data. + + Args: + audio: Audio signal + + Returns: + Tuple of (demodulated data, confidence score) + """ + # Find preamble + preamble_start = self._find_preamble(audio) + if preamble_start < 0: + return b'', 0.0 + + # Skip preamble + data_start = preamble_start + int(self.preamble_duration * self.sample_rate) + + # Demodulate symbols + symbols = [] + confidence_scores = [] + + pos = data_start + while pos + self.samples_per_symbol <= len(audio): + symbol_audio = audio[pos:pos + self.samples_per_symbol] + symbol, confidence = self._demodulate_symbol(symbol_audio) + symbols.append(symbol) + confidence_scores.append(confidence) + pos += self.samples_per_symbol + + # Convert symbols to bytes + data = bytearray() + for i in range(0, len(symbols), 4): + if i + 3 < len(symbols): + byte = (symbols[i] << 6) | (symbols[i+1] << 4) | (symbols[i+2] << 2) | symbols[i+3] + data.append(byte) + + if HAS_NUMPY and confidence_scores: + avg_confidence = np.mean(confidence_scores) + else: + avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0.0 + return bytes(data), avg_confidence + + def _find_preamble(self, audio) -> int: + """Find preamble in audio signal.""" + # Simple energy-based detection + window_size = int(0.01 * self.sample_rate) # 10ms window + + if HAS_NUMPY: + for i in range(0, len(audio) - window_size, window_size // 2): + window = audio[i:i + window_size] + + # Check for preamble frequency + fft = np.fft.fft(window) + freqs = np.fft.fftfreq(len(window), 1/self.sample_rate) + + # Find peak near preamble frequency + idx = np.argmax(np.abs(fft[:len(fft)//2])) + peak_freq = abs(freqs[idx]) + + if abs(peak_freq - self.preamble_freq) < 50: # 50 Hz tolerance + return i + else: + # Simple zero-crossing based detection without FFT + for i in range(0, len(audio) - window_size, window_size // 2): + window = list(audio[i:i + window_size]) + + # Count zero crossings + zero_crossings = 0 + for j in range(1, len(window)): + if (window[j-1] >= 0) != (window[j] >= 0): + zero_crossings += 1 + + # Estimate frequency from zero crossings + estimated_freq = (zero_crossings * self.sample_rate) / (2 * len(window)) + + if abs(estimated_freq - self.preamble_freq) < 100: # 100 Hz tolerance + return i + + return -1 + + def _demodulate_symbol(self, audio) -> Tuple[int, float]: + """Demodulate a single FSK symbol.""" + if HAS_NUMPY: + # FFT-based demodulation + fft = np.fft.fft(audio) + freqs = np.fft.fftfreq(len(audio), 1/self.sample_rate) + magnitude = np.abs(fft[:len(fft)//2]) + + # Find energy at each FSK frequency + energies = [] + for freq in self.frequencies: + idx = np.argmin(np.abs(freqs[:len(freqs)//2] - freq)) + energy = magnitude[idx] + energies.append(energy) + + # Select symbol with highest energy + symbol = np.argmax(energies) + else: + # Goertzel algorithm for specific frequency detection + audio_list = list(audio) if hasattr(audio, '__iter__') else audio + energies = [] + + for freq in self.frequencies: + # Goertzel algorithm + omega = 2 * math.pi * freq / self.sample_rate + coeff = 2 * math.cos(omega) + + s_prev = 0 + s_prev2 = 0 + + for sample in audio_list: + s = sample + coeff * s_prev - s_prev2 + s_prev2 = s_prev + s_prev = s + + # Calculate magnitude + power = s_prev2 * s_prev2 + s_prev * s_prev - coeff * s_prev * s_prev2 + energies.append(math.sqrt(abs(power))) + + # Select symbol with highest energy + symbol = energies.index(max(energies)) + + # Confidence is ratio of strongest to second strongest + sorted_energies = sorted(energies, reverse=True) + confidence = sorted_energies[0] / (sorted_energies[1] + 1e-6) + + return symbol, min(confidence, 10.0) / 10.0 + + def _apply_envelope(self, audio): + """Apply smoothing envelope to reduce clicks.""" + # Simple raised cosine envelope + ramp_samples = int(0.002 * self.sample_rate) # 2ms ramps + + if len(audio) > 2 * ramp_samples: + if HAS_NUMPY: + # Fade in + t = np.linspace(0, np.pi/2, ramp_samples) + audio[:ramp_samples] *= np.sin(t) ** 2 + + # Fade out + audio[-ramp_samples:] *= np.sin(t[::-1]) ** 2 + else: + # Manual fade in + for i in range(ramp_samples): + t = (i / ramp_samples) * (math.pi / 2) + factor = math.sin(t) ** 2 + audio[i] *= factor + + # Manual fade out + for i in range(ramp_samples): + t = ((ramp_samples - 1 - i) / ramp_samples) * (math.pi / 2) + factor = math.sin(t) ** 2 + audio[-(i+1)] *= factor + + return audio + + +class VoiceProtocol: + """ + Integrates voice codec and modem with the Icing protocol + for encrypted voice transmission over GSM. + """ + + def __init__(self, protocol_instance): + """ + Initialize voice protocol handler. + + Args: + protocol_instance: IcingProtocol instance + """ + self.protocol = protocol_instance + self.codec = Codec2Wrapper(Codec2Mode.MODE_1200) + self.modem = FSKModem(sample_rate=8000, baud_rate=600) + + # Voice crypto state + self.voice_iv_counter = 0 + self.voice_sequence = 0 + + # Buffers + if HAS_NUMPY: + self.audio_buffer = np.array([], dtype=np.int16) + else: + self.audio_buffer = array.array('h') # 16-bit signed integers + self.frame_buffer = [] + + print(f"{GREEN}[VOICE]{RESET} Voice protocol initialized") + + def process_voice_input(self, audio_samples): + """ + Process voice input: compress, encrypt, and modulate. + + Args: + audio_samples: PCM audio samples (8kHz, 16-bit) + + Returns: + Modulated audio signal ready for transmission (numpy array or array.array) + """ + # Add to buffer + if HAS_NUMPY: + self.audio_buffer = np.concatenate([self.audio_buffer, audio_samples]) + else: + self.audio_buffer.extend(audio_samples) + + # Process complete frames + modulated_audio = [] + + while len(self.audio_buffer) >= self.codec.frame_samples: + # Extract frame + if HAS_NUMPY: + frame_audio = self.audio_buffer[:self.codec.frame_samples] + self.audio_buffer = self.audio_buffer[self.codec.frame_samples:] + else: + frame_audio = array.array('h', self.audio_buffer[:self.codec.frame_samples]) + del self.audio_buffer[:self.codec.frame_samples] + + # Compress with Codec2 + compressed_frame = self.codec.encode(frame_audio) + if not compressed_frame: + continue + + # Encrypt frame + encrypted = self._encrypt_voice_frame(compressed_frame) + + # Add FEC + protected = self._add_fec(encrypted) + + # Modulate to audio + audio_signal = self.modem.modulate(protected, add_preamble=True) + modulated_audio.append(audio_signal) + + if modulated_audio: + if HAS_NUMPY: + return np.concatenate(modulated_audio) + else: + # Concatenate array.array objects + result = array.array('f') + for audio in modulated_audio: + result.extend(audio) + return result + return None + + def process_voice_output(self, modulated_audio): + """ + Process received audio: demodulate, decrypt, and decompress. + + Args: + modulated_audio: Received FSK-modulated audio + + Returns: + Decoded PCM audio samples (numpy array or array.array) + """ + # Demodulate + data, confidence = self.modem.demodulate(modulated_audio) + + if confidence < 0.5: + print(f"{YELLOW}[VOICE]{RESET} Low demodulation confidence: {confidence:.2f}") + return None + + # Remove FEC + frame_data = self._remove_fec(data) + if not frame_data: + return None + + # Decrypt + compressed_frame = self._decrypt_voice_frame(frame_data) + if not compressed_frame: + return None + + # Decompress + audio_samples = self.codec.decode(compressed_frame) + + return audio_samples + + def _encrypt_voice_frame(self, frame: Codec2Frame) -> bytes: + """Encrypt a voice frame using ChaCha20-CTR.""" + if not self.protocol.hkdf_key: + raise ValueError("No encryption key available") + + # Prepare frame data + frame_data = struct.pack('<BIH', + frame.mode, + frame.frame_number, + len(frame.bits) + ) + frame.bits + + # Generate IV for this frame (ChaCha20 needs 16 bytes) + iv = struct.pack('<Q', self.voice_iv_counter) + b'\x00' * 8 # 8 + 8 = 16 bytes + self.voice_iv_counter += 1 + + # Encrypt using ChaCha20 + from encryption import chacha20_encrypt + key = bytes.fromhex(self.protocol.hkdf_key) + encrypted = chacha20_encrypt(frame_data, key, iv) + + # Add sequence number and IV hint + return struct.pack('<HQ', self.voice_sequence, self.voice_iv_counter) + encrypted + + def _decrypt_voice_frame(self, data: bytes) -> Optional[Codec2Frame]: + """Decrypt a voice frame.""" + if len(data) < 10: + return None + + # Extract sequence and IV hint + sequence, iv_hint = struct.unpack('<HQ', data[:10]) + encrypted = data[10:] + + # Generate IV (16 bytes for ChaCha20) + iv = struct.pack('<Q', iv_hint) + b'\x00' * 8 + + # Decrypt + from encryption import chacha20_decrypt + key = bytes.fromhex(self.protocol.hkdf_key) + + try: + decrypted = chacha20_decrypt(encrypted, key, iv) + + # Parse frame + mode, frame_num, bits_len = struct.unpack('<BIH', decrypted[:7]) + bits = decrypted[7:7+bits_len] + + return Codec2Frame( + mode=Codec2Mode(mode), + bits=bits, + timestamp=0, # Will be set by caller + frame_number=frame_num + ) + except Exception as e: + print(f"{RED}[VOICE]{RESET} Decryption failed: {e}") + return None + + def _add_fec(self, data: bytes) -> bytes: + """Add forward error correction.""" + # Simple repetition code (3x) for testing + # In production: use convolutional code or LDPC + fec_data = bytearray() + + for byte in data: + # Repeat each byte 3 times + fec_data.extend([byte, byte, byte]) + + return bytes(fec_data) + + def _remove_fec(self, data: bytes) -> Optional[bytes]: + """Remove FEC and correct errors.""" + if len(data) % 3 != 0: + return None + + corrected = bytearray() + + for i in range(0, len(data), 3): + # Majority voting + votes = [data[i], data[i+1], data[i+2]] + byte_value = max(set(votes), key=votes.count) + corrected.append(byte_value) + + return bytes(corrected) + + +# Example usage +if __name__ == "__main__": + # Test Codec2 wrapper + print(f"\n{BLUE}=== Testing Codec2 Wrapper ==={RESET}") + codec = Codec2Wrapper(Codec2Mode.MODE_1200) + + # Generate test audio + if HAS_NUMPY: + t = np.linspace(0, 0.04, 320) # 40ms at 8kHz + test_audio = (np.sin(2 * np.pi * 440 * t) * 16384).astype(np.int16) + else: + test_audio = array.array('h') + for i in range(320): + t = i * 0.04 / 320 + value = int(math.sin(2 * math.pi * 440 * t) * 16384) + test_audio.append(value) + + # Encode + frame = codec.encode(test_audio) + print(f"Encoded frame: {len(frame.bits)} bytes") + + # Decode + decoded = codec.decode(frame) + print(f"Decoded audio: {len(decoded)} samples") + + # Test FSK modem + print(f"\n{BLUE}=== Testing FSK Modem ==={RESET}") + modem = FSKModem() + + # Test data + test_data = b"Hello, secure voice!" + + # Modulate + modulated = modem.modulate(test_data) + print(f"Modulated: {len(modulated)} samples ({len(modulated)/8000:.2f}s)") + + # Demodulate + demodulated, confidence = modem.demodulate(modulated) + print(f"Demodulated: {demodulated}") + print(f"Confidence: {confidence:.2%}") + print(f"Match: {demodulated == test_data}") \ No newline at end of file