diff --git a/protocol_prototype/IcingProtocol.drawio b/protocol_prototype/IcingProtocol.drawio index cbb1e62..8f46988 100644 --- a/protocol_prototype/IcingProtocol.drawio +++ b/protocol_prototype/IcingProtocol.drawio @@ -260,11 +260,11 @@ - + - + @@ -281,55 +281,58 @@ - + - + + + + - + - + - + - + - + - + - + - + - + - + - + - + - + - + @@ -337,28 +340,28 @@ - + - + - + - + - + @@ -366,25 +369,25 @@ - + - + - + - + @@ -392,7 +395,7 @@ - + @@ -400,58 +403,162 @@ - + - + - + - + - + - + - + - + - + - + - - + + - + - + - + - + - + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/protocol_prototype/auto_mode.py b/protocol_prototype/auto_mode.py new file mode 100644 index 0000000..690ba9c --- /dev/null +++ b/protocol_prototype/auto_mode.py @@ -0,0 +1,430 @@ +import time +import threading +import queue +from typing import Optional, Dict, Any, List, Callable, Tuple + +# ANSI colors for logging +RED = "\033[91m" +GREEN = "\033[92m" +YELLOW = "\033[93m" +BLUE = "\033[94m" +RESET = "\033[0m" + +class AutoModeConfig: + """Configuration parameters for the automatic mode behavior.""" + def __init__(self): + # Ping behavior + self.ping_response_accept = True # Whether to accept incoming pings + self.ping_auto_initiate = False # Whether to initiate pings when connected + self.ping_retry_count = 3 # Number of ping retries + self.ping_retry_delay = 5.0 # Seconds between ping retries + self.ping_timeout = 10.0 # Seconds to wait for ping response + self.preferred_cipher = 0 # 0=AES-GCM, 1=ChaCha20-Poly1305 + + # Handshake behavior + self.handshake_retry_count = 3 # Number of handshake retries + self.handshake_retry_delay = 5.0 # Seconds between handshake retries + self.handshake_timeout = 10.0 # Seconds to wait for handshake + + # Messaging behavior + self.auto_message_enabled = False # Whether to auto-send messages + self.message_interval = 10.0 # Seconds between auto messages + self.message_content = "Hello, secure world!" # Default message + + # General behavior + self.active_mode = False # If true, initiates protocol instead of waiting + + +class AutoMode: + """ + Manages automated behavior for the Icing protocol. + Handles automatic progression through the protocol stages: + 1. Connection setup + 2. Ping/discovery + 3. Key exchange + 4. Encrypted communication + """ + + def __init__(self, protocol_interface): + """ + Initialize the AutoMode manager. + + Args: + protocol_interface: An object implementing the required protocol methods + """ + self.protocol = protocol_interface + self.config = AutoModeConfig() + self.active = False + self.state = "idle" + + # Message queue for automated sending + self.message_queue = queue.Queue() + + # Tracking variables + self.ping_attempts = 0 + self.handshake_attempts = 0 + self.last_action_time = 0 + self.timer_tasks = [] # List of active timer tasks (for cleanup) + + def start(self): + """Start the automatic mode.""" + if self.active: + return + + self.active = True + self.state = "idle" + self.ping_attempts = 0 + self.handshake_attempts = 0 + self.last_action_time = time.time() + + self._log_info("Automatic mode started") + + # Start in active mode if configured + if self.config.active_mode and self.protocol.connections: + self._start_ping_sequence() + + def stop(self): + """Stop the automatic mode and clean up any pending tasks.""" + if not self.active: + return + + # Cancel any pending timers + for timer in self.timer_tasks: + if timer.is_alive(): + timer.cancel() + self.timer_tasks = [] + + self.active = False + self.state = "idle" + self._log_info("Automatic mode stopped") + + def handle_connection_established(self): + """Called when a new connection is established.""" + if not self.active: + return + + self._log_info("Connection established") + + # If in active mode, start pinging + if self.config.active_mode: + self._start_ping_sequence() + + def handle_ping_received(self, index: int): + """ + Handle a received ping request. + + Args: + index: Index of the ping request in the protocol's inbound message queue + """ + if not self.active or not self._is_valid_message_index(index): + return + + self._log_info(f"Ping request received (index={index})") + + # Automatically respond to ping if configured to accept + if self.config.ping_response_accept: + self._log_info(f"Auto-responding to ping with accept={self.config.ping_response_accept}") + try: + # Schedule the response with a small delay to simulate real behavior + timer = threading.Timer(0.5, self._respond_to_ping, args=[index]) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + except Exception as e: + self._log_error(f"Failed to auto-respond to ping: {e}") + + def handle_ping_response_received(self, accepted: bool): + """ + Handle a received ping response. + + Args: + accepted: Whether the ping was accepted + """ + if not self.active: + return + + self.ping_attempts = 0 # Reset ping attempts counter + + if accepted: + self._log_info("Ping accepted! Proceeding with handshake") + # Send handshake if not already done + if self.state != "handshake_sent": + self._ensure_ephemeral_keys() + self._start_handshake_sequence() + else: + self._log_info("Ping rejected by peer. Stopping auto-protocol sequence.") + self.state = "idle" + + def handle_handshake_received(self, index: int): + """ + Handle a received handshake. + + Args: + index: Index of the handshake in the protocol's inbound message queue + """ + if not self.active or not self._is_valid_message_index(index): + return + + self._log_info(f"Handshake received (index={index})") + + try: + # Ensure we have ephemeral keys + self._ensure_ephemeral_keys() + + # Process the handshake (compute ECDH) + self.protocol.generate_ecdhe(index) + + # Derive HKDF key + self.protocol.derive_hkdf() + + # If we haven't sent our handshake yet, send it + if self.state != "handshake_sent": + timer = threading.Timer(0.5, self.protocol.send_handshake) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + self.state = "handshake_sent" + else: + self.state = "key_exchange_complete" + + # Start sending queued messages if auto messaging is enabled + if self.config.auto_message_enabled: + self._start_message_sequence() + + except Exception as e: + self._log_error(f"Failed to process handshake: {e}") + + def handle_encrypted_received(self, index: int): + """ + Handle a received encrypted message. + + Args: + index: Index of the encrypted message in the protocol's inbound message queue + """ + if not self.active or not self._is_valid_message_index(index): + return + + # Try to decrypt automatically + try: + plaintext = self.protocol.decrypt_received_message(index) + self._log_info(f"Auto-decrypted message: {plaintext}") + except Exception as e: + self._log_error(f"Failed to auto-decrypt message: {e}") + + def queue_message(self, message: str): + """ + Add a message to the auto-send queue. + + Args: + message: Message text to send + """ + self.message_queue.put(message) + self._log_info(f"Message queued for sending: {message}") + + # If we're in the right state, start sending messages + if self.active and self.state == "key_exchange_complete" and self.config.auto_message_enabled: + self._process_message_queue() + + def _start_ping_sequence(self): + """Start the ping sequence to discover the peer.""" + if self.ping_attempts >= self.config.ping_retry_count: + self._log_warning(f"Maximum ping attempts ({self.config.ping_retry_count}) reached") + self.state = "idle" + return + + self.state = "pinging" + self.ping_attempts += 1 + + self._log_info(f"Sending ping request (attempt {self.ping_attempts}/{self.config.ping_retry_count})") + try: + self.protocol.send_ping_request(self.config.preferred_cipher) + self.last_action_time = time.time() + + # Schedule next ping attempt if needed + timer = threading.Timer( + self.config.ping_retry_delay, + self._check_ping_response + ) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + + except Exception as e: + self._log_error(f"Failed to send ping: {e}") + + def _check_ping_response(self): + """Check if we got a ping response, retry if not.""" + if not self.active or self.state != "pinging": + return + + # If we've waited long enough for a response, retry + if time.time() - self.last_action_time >= self.config.ping_timeout: + self._log_warning("No ping response received, retrying") + self._start_ping_sequence() + + def _respond_to_ping(self, index: int): + """ + Respond to a ping request. + + Args: + index: Index of the ping request in the inbound messages + """ + if not self.active or not self._is_valid_message_index(index): + return + + try: + answer = 1 if self.config.ping_response_accept else 0 + self.protocol.respond_to_ping(index, answer) + + if answer == 1: + # If we accepted, we should expect a handshake + self.state = "accepted_ping" + self._ensure_ephemeral_keys() + + # Set a timer to send our handshake if we don't receive one + timer = threading.Timer( + self.config.handshake_timeout, + self._check_handshake_received + ) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + self.last_action_time = time.time() + + except Exception as e: + self._log_error(f"Failed to respond to ping: {e}") + + def _check_handshake_received(self): + """Check if we've received a handshake after accepting a ping.""" + if not self.active or self.state != "accepted_ping": + return + + # If we've waited long enough and haven't received a handshake, initiate one + if time.time() - self.last_action_time >= self.config.handshake_timeout: + self._log_warning("No handshake received after accepting ping, initiating handshake") + self._start_handshake_sequence() + + def _start_handshake_sequence(self): + """Start the handshake sequence.""" + if self.handshake_attempts >= self.config.handshake_retry_count: + self._log_warning(f"Maximum handshake attempts ({self.config.handshake_retry_count}) reached") + self.state = "idle" + return + + self.state = "handshake_sent" + self.handshake_attempts += 1 + + self._log_info(f"Sending handshake (attempt {self.handshake_attempts}/{self.config.handshake_retry_count})") + try: + self.protocol.send_handshake() + self.last_action_time = time.time() + + # Schedule handshake retry check + timer = threading.Timer( + self.config.handshake_retry_delay, + self._check_handshake_response + ) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + + except Exception as e: + self._log_error(f"Failed to send handshake: {e}") + + def _check_handshake_response(self): + """Check if we've completed the key exchange, retry handshake if not.""" + if not self.active or self.state != "handshake_sent": + return + + # If we've waited long enough for a response, retry + if time.time() - self.last_action_time >= self.config.handshake_timeout: + self._log_warning("No handshake response received, retrying") + self._start_handshake_sequence() + + def _start_message_sequence(self): + """Start the automated message sending sequence.""" + if not self.config.auto_message_enabled: + return + + self._log_info("Starting automated message sequence") + + # Add the default message if queue is empty + if self.message_queue.empty(): + self.message_queue.put(self.config.message_content) + + # Start processing the queue + self._process_message_queue() + + def _process_message_queue(self): + """Process messages in the queue and send them.""" + if not self.active or self.state != "key_exchange_complete" or not self.config.auto_message_enabled: + return + + if not self.message_queue.empty(): + message = self.message_queue.get() + self._log_info(f"Sending queued message: {message}") + + try: + self.protocol.send_encrypted_message(message) + + # Schedule next message send + timer = threading.Timer( + self.config.message_interval, + self._process_message_queue + ) + timer.daemon = True + timer.start() + self.timer_tasks.append(timer) + + except Exception as e: + self._log_error(f"Failed to send queued message: {e}") + # Put the message back in the queue + self.message_queue.put(message) + + def _ensure_ephemeral_keys(self): + """Ensure ephemeral keys are generated if needed.""" + if not hasattr(self.protocol, 'ephemeral_pubkey') or self.protocol.ephemeral_pubkey is None: + self._log_info("Generating ephemeral keys") + self.protocol.generate_ephemeral_keys() + + def _is_valid_message_index(self, index: int) -> bool: + """ + Check if a message index is valid in the protocol's inbound_messages queue. + + Args: + index: The index to check + + Returns: + bool: True if the index is valid, False otherwise + """ + if not hasattr(self.protocol, 'inbound_messages'): + self._log_error("Protocol has no inbound_messages attribute") + return False + + if index < 0 or index >= len(self.protocol.inbound_messages): + self._log_error(f"Invalid message index: {index}") + return False + + return True + + # Helper methods for logging + def _log_info(self, message: str): + print(f"{BLUE}[AUTO]{RESET} {message}") + if hasattr(self, 'verbose_logging') and self.verbose_logging: + state_info = f"(state={self.state})" + if 'pinging' in self.state and hasattr(self, 'ping_attempts'): + state_info += f", attempts={self.ping_attempts}/{self.config.ping_retry_count}" + elif 'handshake' in self.state and hasattr(self, 'handshake_attempts'): + state_info += f", attempts={self.handshake_attempts}/{self.config.handshake_retry_count}" + print(f"{BLUE}[AUTO-DETAIL]{RESET} {state_info}") + + def _log_warning(self, message: str): + print(f"{YELLOW}[AUTO-WARN]{RESET} {message}") + if hasattr(self, 'verbose_logging') and self.verbose_logging: + timer_info = f"Active timers: {len(self.timer_tasks)}" + print(f"{YELLOW}[AUTO-WARN-DETAIL]{RESET} {timer_info}") + + def _log_error(self, message: str): + print(f"{RED}[AUTO-ERROR]{RESET} {message}") + if hasattr(self, 'verbose_logging') and self.verbose_logging: + print(f"{RED}[AUTO-ERROR-DETAIL]{RESET} Current state: {self.state}, Active: {self.active}") \ No newline at end of file diff --git a/protocol_prototype/cli.py b/protocol_prototype/cli.py index 600376b..53c3f01 100644 --- a/protocol_prototype/cli.py +++ b/protocol_prototype/cli.py @@ -1,113 +1,328 @@ import sys +import argparse +import shlex from protocol import IcingProtocol RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" BLUE = "\033[94m" +MAGENTA = "\033[95m" +CYAN = "\033[96m" RESET = "\033[0m" +def print_help(): + """Display all available commands.""" + print(f"\n{YELLOW}=== Available Commands ==={RESET}") + print(f"\n{CYAN}Basic Protocol Commands:{RESET}") + print(" help - Show this help message") + print(" peer_id - Set peer identity public key") + print(" connect - Connect to a peer at the specified port") + print(" show_state - Display current protocol state") + print(" exit - Exit the program") + + print(f"\n{CYAN}Manual Protocol Operation:{RESET}") + print(" generate_ephemeral_keys - Generate ephemeral ECDH keys") + print(" send_ping [cipher] - Send PING request (cipher: 0=AES-GCM, 1=ChaCha20-Poly1305, default: 0)") + print(" respond_ping <0|1> - Respond to a PING (0=reject, 1=accept)") + print(" send_handshake - Send handshake with ephemeral keys") + print(" generate_ecdhe - Process handshake at specified index") + print(" derive_hkdf - Derive encryption key using HKDF") + print(" send_encrypted - Encrypt and send a message") + print(" decrypt <index> - Decrypt received message at index") + + print(f"\n{CYAN}Automatic Mode Commands:{RESET}") + print(" auto start - Start automatic mode") + print(" auto stop - Stop automatic mode") + print(" auto status - Show current auto mode status and configuration") + print(" auto config <param> <value> - Configure auto mode parameters") + print(" auto config list - Show all configurable parameters") + print(" auto message <text> - Queue message for automatic sending") + print(" auto passive - Configure as passive peer (responds to pings but doesn't initiate)") + print(" auto active - Configure as active peer (initiates protocol)") + print(" auto log - Toggle detailed logging for auto mode") + + print(f"\n{CYAN}Debugging Commands:{RESET}") + print(" debug_message <index> - Display detailed information about a message in the queue") + + print(f"\n{CYAN}Legacy Commands:{RESET}") + print(" auto_responder <on|off> - Enable/disable legacy auto responder (deprecated)") + def main(): protocol = IcingProtocol() print(f"{YELLOW}\n======================================") - print(" Icing Protocol - Manual CLI Demo ") + print(" Icing Protocol - Secure Communication ") print("======================================\n" + RESET) - print(f"Listening on port: {protocol.local_port}") print(f"Your identity public key (hex): {protocol.identity_pubkey.hex()}") - print("\nAvailable commands:") - print(" set_peer_identity <hex_pubkey>") - print(" connect <port>") - print(" generate_ephemeral_keys") - print(" send_ping") - print(" send_handshake") - print(" respond_ping <index> <0|1>") - print(" generate_ecdhe <index>") - print(" auto_responder <on|off>") - print(" show_state") - print(" exit\n") + print_help() while True: try: - line = input("Cmd> ").strip() + line = input(f"{MAGENTA}Cmd>{RESET} ").strip() except EOFError: break if not line: continue - - parts = line.split() + + parts = shlex.split(line) # Handle quoted arguments properly cmd = parts[0].lower() - - if cmd == "exit": - protocol.stop() - sys.exit(0) - - elif cmd == "show_state": - protocol.show_state() - - elif cmd == "set_peer_identity": - if len(parts) != 2: - print(f"{RED}Usage: set_peer_identity <hex_pubkey>{RESET}") - continue - protocol.set_peer_identity(parts[1]) - - elif cmd == "connect": - if len(parts) != 2: - print(f"{RED}Usage: connect <port>{RESET}") - continue - try: - port = int(parts[1]) - protocol.connect_to_peer(port) - except ValueError: - print(f"{RED}Invalid port.{RESET}") - - elif cmd == "generate_ephemeral_keys": - protocol.generate_ephemeral_keys() - - elif cmd == "send_ping": - protocol.send_ping_request() - - elif cmd == "send_handshake": - protocol.send_handshake() - - elif cmd == "respond_ping": - if len(parts) != 3: - print(f"{RED}Usage: respond_ping <index> <answer_code>{RESET}") - continue - try: - idx = int(parts[1]) - ac = int(parts[2]) - protocol.respond_to_ping(idx, ac) - except ValueError: - print(f"{RED}Index and answer_code must be integers.{RESET}") - - elif cmd == "generate_ecdhe": - if len(parts) != 2: - print(f"{RED}Usage: generate_ecdhe <index>{RESET}") - continue - try: - idx = int(parts[1]) - protocol.generate_ecdhe(idx) - except ValueError: - print(f"{RED}Index must be an integer.{RESET}") - - elif cmd == "auto_responder": - if len(parts) != 2: - print(f"{RED}Usage: auto_responder <on|off>{RESET}") - continue - arg = parts[1].lower() - if arg == "on": - protocol.enable_auto_responder(True) - elif arg == "off": - protocol.enable_auto_responder(False) + + try: + # Basic commands + if cmd == "exit": + protocol.stop() + break + + elif cmd == "help": + print_help() + + elif cmd == "show_state": + protocol.show_state() + + elif cmd == "peer_id": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: peer_id <hex_pubkey>") + continue + try: + protocol.set_peer_identity(parts[1]) + except ValueError as e: + print(f"{RED}[ERROR]{RESET} Invalid public key: {e}") + + elif cmd == "connect": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: connect <port>") + continue + try: + port = int(parts[1]) + protocol.connect_to_peer(port) + except ValueError: + print(f"{RED}[ERROR]{RESET} Invalid port number.") + except Exception as e: + print(f"{RED}[ERROR]{RESET} Connection failed: {e}") + + # Manual protocol operation + elif cmd == "generate_ephemeral_keys": + protocol.generate_ephemeral_keys() + + elif cmd == "send_ping": + # Optional cipher parameter (0 = AES-GCM, 1 = ChaCha20-Poly1305) + cipher = 0 # Default to AES-GCM + if len(parts) >= 2: + try: + cipher = int(parts[1]) + if cipher not in (0, 1): + print(f"{YELLOW}[WARNING]{RESET} Unsupported cipher code {cipher}. Using AES-GCM (0).") + cipher = 0 + except ValueError: + print(f"{YELLOW}[WARNING]{RESET} Invalid cipher code. Using AES-GCM (0).") + protocol.send_ping_request(cipher) + + elif cmd == "send_handshake": + protocol.send_handshake() + + elif cmd == "respond_ping": + if len(parts) != 3: + print(f"{RED}[ERROR]{RESET} Usage: respond_ping <index> <0|1>") + continue + try: + idx = int(parts[1]) + answer = int(parts[2]) + if answer not in (0, 1): + print(f"{RED}[ERROR]{RESET} Answer must be 0 (reject) or 1 (accept).") + continue + protocol.respond_to_ping(idx, answer) + except ValueError: + print(f"{RED}[ERROR]{RESET} Index and answer must be integers.") + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to respond to ping: {e}") + + elif cmd == "generate_ecdhe": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: generate_ecdhe <index>") + continue + try: + idx = int(parts[1]) + protocol.generate_ecdhe(idx) + except ValueError: + print(f"{RED}[ERROR]{RESET} Index must be an integer.") + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to process handshake: {e}") + + elif cmd == "derive_hkdf": + try: + protocol.derive_hkdf() + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to derive HKDF key: {e}") + + elif cmd == "send_encrypted": + if len(parts) < 2: + print(f"{RED}[ERROR]{RESET} Usage: send_encrypted <plaintext>") + continue + plaintext = " ".join(parts[1:]) + try: + protocol.send_encrypted_message(plaintext) + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to send encrypted message: {e}") + + elif cmd == "decrypt": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: decrypt <index>") + continue + try: + idx = int(parts[1]) + protocol.decrypt_received_message(idx) + except ValueError: + print(f"{RED}[ERROR]{RESET} Index must be an integer.") + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to decrypt message: {e}") + + # Debugging commands + elif cmd == "debug_message": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: debug_message <index>") + continue + try: + idx = int(parts[1]) + protocol.debug_message(idx) + except ValueError: + print(f"{RED}[ERROR]{RESET} Index must be an integer.") + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to debug message: {e}") + + # Automatic mode commands + elif cmd == "auto": + if len(parts) < 2: + print(f"{RED}[ERROR]{RESET} Usage: auto <command> [options]") + print("Available commands: start, stop, status, config, message, passive, active") + continue + + subcmd = parts[1].lower() + + if subcmd == "start": + protocol.start_auto_mode() + print(f"{GREEN}[AUTO]{RESET} Automatic mode started") + + elif subcmd == "stop": + protocol.stop_auto_mode() + print(f"{GREEN}[AUTO]{RESET} Automatic mode stopped") + + elif subcmd == "status": + config = protocol.get_auto_mode_config() + print(f"{YELLOW}=== Auto Mode Status ==={RESET}") + print(f"Active: {protocol.auto_mode.active}") + print(f"State: {protocol.auto_mode.state}") + print(f"\n{YELLOW}--- Configuration ---{RESET}") + for key, value in vars(config).items(): + print(f" {key}: {value}") + + elif subcmd == "config": + if len(parts) < 3: + print(f"{RED}[ERROR]{RESET} Usage: auto config <param> <value> or auto config list") + continue + + if parts[2].lower() == "list": + config = protocol.get_auto_mode_config() + print(f"{YELLOW}=== Auto Mode Configuration Parameters ==={RESET}") + for key, value in vars(config).items(): + print(f" {key} ({type(value).__name__}): {value}") + continue + + if len(parts) != 4: + print(f"{RED}[ERROR]{RESET} Usage: auto config <param> <value>") + continue + + param = parts[2] + value_str = parts[3] + + # Convert the string value to the appropriate type + config = protocol.get_auto_mode_config() + if not hasattr(config, param): + print(f"{RED}[ERROR]{RESET} Unknown parameter: {param}") + print("Use 'auto config list' to see all available parameters") + continue + + current_value = getattr(config, param) + try: + if isinstance(current_value, bool): + if value_str.lower() in ("true", "yes", "on", "1"): + value = True + elif value_str.lower() in ("false", "no", "off", "0"): + value = False + else: + raise ValueError(f"Boolean value must be true/false/yes/no/on/off/1/0") + elif isinstance(current_value, int): + value = int(value_str) + elif isinstance(current_value, float): + value = float(value_str) + elif isinstance(current_value, str): + value = value_str + else: + value = value_str # Default to string + + protocol.configure_auto_mode(**{param: value}) + print(f"{GREEN}[AUTO]{RESET} Set {param} = {value}") + + except ValueError as e: + print(f"{RED}[ERROR]{RESET} Invalid value for {param}: {e}") + + elif subcmd == "message": + if len(parts) < 3: + print(f"{RED}[ERROR]{RESET} Usage: auto message <text>") + continue + + message = " ".join(parts[2:]) + protocol.queue_auto_message(message) + print(f"{GREEN}[AUTO]{RESET} Message queued for sending: {message}") + + elif subcmd == "passive": + # Configure as passive peer (responds but doesn't initiate) + protocol.configure_auto_mode( + ping_response_accept=True, + ping_auto_initiate=False, + active_mode=False + ) + print(f"{GREEN}[AUTO]{RESET} Configured as passive peer") + + elif subcmd == "active": + # Configure as active peer (initiates protocol) + protocol.configure_auto_mode( + ping_response_accept=True, + ping_auto_initiate=True, + active_mode=True + ) + print(f"{GREEN}[AUTO]{RESET} Configured as active peer") + + else: + print(f"{RED}[ERROR]{RESET} Unknown auto mode command: {subcmd}") + print("Available commands: start, stop, status, config, message, passive, active") + + # Legacy commands + elif cmd == "auto_responder": + if len(parts) != 2: + print(f"{RED}[ERROR]{RESET} Usage: auto_responder <on|off>") + continue + val = parts[1].lower() + if val not in ("on", "off"): + print(f"{RED}[ERROR]{RESET} Value must be 'on' or 'off'.") + continue + protocol.enable_auto_responder(val == "on") + print(f"{YELLOW}[WARNING]{RESET} Using legacy auto responder. Consider using 'auto' commands instead.") + else: - print(f"{RED}Usage: auto_responder <on|off>{RESET}") - - else: - print(f"{RED}[ERROR]{RESET} Unknown command: {cmd}") - + print(f"{RED}[ERROR]{RESET} Unknown command: {cmd}") + print("Type 'help' for a list of available commands.") + + except Exception as e: + print(f"{RED}[ERROR]{RESET} Command failed: {e}") if __name__ == "__main__": - main() + try: + main() + except KeyboardInterrupt: + print("\nExiting...") + except Exception as e: + print(f"{RED}[FATAL ERROR]{RESET} {e}") + sys.exit(1) diff --git a/protocol_prototype/crypto_utils.py b/protocol_prototype/crypto_utils.py index 07c5330..8c2e110 100644 --- a/protocol_prototype/crypto_utils.py +++ b/protocol_prototype/crypto_utils.py @@ -1,50 +1,78 @@ import os +from typing import Tuple from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import ec, utils from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature -def generate_identity_keys(): +def generate_identity_keys() -> Tuple[ec.EllipticCurvePrivateKey, bytes]: """ Generate an ECDSA (P-256) identity key pair. - Return (private_key, public_key_bytes). - public_key_bytes is raw x||y each 32 bytes (uncompressed minus the 0x04 prefix). + + Returns: + Tuple containing: + - private_key: EllipticCurvePrivateKey object + - public_key_bytes: Raw x||y format (64 bytes, 512 bits) """ private_key = ec.generate_private_key(ec.SECP256R1()) public_numbers = private_key.public_key().public_numbers() x_bytes = public_numbers.x.to_bytes(32, byteorder='big') y_bytes = public_numbers.y.to_bytes(32, byteorder='big') - pubkey_bytes = x_bytes + y_bytes # 64 bytes + pubkey_bytes = x_bytes + y_bytes # 64 bytes total return private_key, pubkey_bytes -def load_peer_identity_key(pubkey_bytes: bytes): +def load_peer_identity_key(pubkey_bytes: bytes) -> ec.EllipticCurvePublicKey: """ - Given 64 bytes (x||y) for P-256, return a cryptography public key object. + Convert a raw public key (64 bytes, x||y format) to a cryptography public key object. + + Args: + pubkey_bytes: Raw 64-byte public key (x||y format) + + Returns: + EllipticCurvePublicKey object + + Raises: + ValueError: If the pubkey_bytes is not exactly 64 bytes """ if len(pubkey_bytes) != 64: raise ValueError("Peer identity pubkey must be exactly 64 bytes (x||y).") + x_int = int.from_bytes(pubkey_bytes[:32], byteorder='big') y_int = int.from_bytes(pubkey_bytes[32:], byteorder='big') + public_numbers = ec.EllipticCurvePublicNumbers(x_int, y_int, ec.SECP256R1()) return public_numbers.public_key() -def sign_data(private_key, data: bytes) -> bytes: +def sign_data(private_key: ec.EllipticCurvePrivateKey, data: bytes) -> bytes: """ - Sign 'data' with ECDSA using P-256 private key. - Returns DER-encoded signature (variable length, up to ~70-72 bytes). + Sign data with ECDSA using a P-256 private key. + + Args: + private_key: EllipticCurvePrivateKey for signing + data: Bytes to sign + + Returns: + DER-encoded signature (variable length, up to ~70-72 bytes) """ signature = private_key.sign(data, ec.ECDSA(hashes.SHA256())) return signature -def verify_signature(public_key, signature: bytes, data: bytes) -> bool: +def verify_signature(public_key: ec.EllipticCurvePublicKey, signature: bytes, data: bytes) -> bool: """ - Verify DER-encoded ECDSA signature with the given public key. - Return True if valid, False otherwise. + Verify a DER-encoded ECDSA signature. + + Args: + public_key: EllipticCurvePublicKey for verification + signature: DER-encoded signature + data: Original signed data + + Returns: + True if signature is valid, False otherwise """ try: public_key.verify(signature, data, ec.ECDSA(hashes.SHA256())) @@ -53,35 +81,62 @@ def verify_signature(public_key, signature: bytes, data: bytes) -> bool: return False -def get_ephemeral_keypair(): +def get_ephemeral_keypair() -> Tuple[ec.EllipticCurvePrivateKey, bytes]: """ - Generate ephemeral ECDH keypair (P-256). - Return (private_key, pubkey_bytes). + Generate an ephemeral ECDH key pair (P-256). + + Returns: + Tuple containing: + - private_key: EllipticCurvePrivateKey object + - pubkey_bytes: Raw x||y format (64 bytes, 512 bits) """ private_key = ec.generate_private_key(ec.SECP256R1()) numbers = private_key.public_key().public_numbers() + x_bytes = numbers.x.to_bytes(32, 'big') y_bytes = numbers.y.to_bytes(32, 'big') - return private_key, x_bytes + y_bytes # 64 bytes + + return private_key, x_bytes + y_bytes # 64 bytes total -def compute_ecdh_shared_key(private_key, peer_pubkey_bytes: bytes) -> bytes: +def compute_ecdh_shared_key(private_key: ec.EllipticCurvePrivateKey, peer_pubkey_bytes: bytes) -> bytes: """ - Given a local ECDH private_key and the peer's ephemeral pubkey (64 bytes), - compute the shared secret. + Compute a shared secret using ECDH. + + Args: + private_key: Local ECDH private key + peer_pubkey_bytes: Peer's ephemeral public key (64 bytes, raw x||y format) + + Returns: + Shared secret bytes + + Raises: + ValueError: If peer_pubkey_bytes is not 64 bytes """ + if len(peer_pubkey_bytes) != 64: + raise ValueError("Peer public key must be 64 bytes (x||y format)") + x_int = int.from_bytes(peer_pubkey_bytes[:32], 'big') y_int = int.from_bytes(peer_pubkey_bytes[32:], 'big') + + # Create public key object from raw components peer_public_numbers = ec.EllipticCurvePublicNumbers(x_int, y_int, ec.SECP256R1()) peer_public_key = peer_public_numbers.public_key() + + # Perform key exchange shared_key = private_key.exchange(ec.ECDH(), peer_public_key) return shared_key def der_to_raw(der_sig: bytes) -> bytes: """ - Convert a DER-encoded ECDSA signature to a raw 64-byte signature (r||s), - where each component is padded to 32 bytes. + Convert a DER-encoded ECDSA signature to a raw 64-byte signature (r||s). + + Args: + der_sig: DER-encoded signature + + Returns: + Raw 64-byte signature (r||s format), with each component padded to 32 bytes """ r, s = decode_dss_signature(der_sig) r_bytes = r.to_bytes(32, byteorder='big') @@ -92,10 +147,19 @@ def der_to_raw(der_sig: bytes) -> bytes: def raw_signature_to_der(raw_sig: bytes) -> bytes: """ Convert a raw signature (64 bytes, concatenated r||s) to DER-encoded signature. + + Args: + raw_sig: Raw 64-byte signature (r||s format) + + Returns: + DER-encoded signature + + Raises: + ValueError: If raw_sig is not 64 bytes """ if len(raw_sig) != 64: raise ValueError("Raw signature must be 64 bytes (r||s).") - from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature + r = int.from_bytes(raw_sig[:32], 'big') s = int.from_bytes(raw_sig[32:], 'big') return encode_dss_signature(r, s) diff --git a/protocol_prototype/encryption.py b/protocol_prototype/encryption.py new file mode 100644 index 0000000..9aa3730 --- /dev/null +++ b/protocol_prototype/encryption.py @@ -0,0 +1,263 @@ +import os +import struct +from typing import Optional, Tuple +from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305 + +class MessageHeader: + """ + Header of an encrypted message (18 bytes total): + + Clear Text Section (4 bytes): + - flag: 16 bits (0xBEEF by default) + - data_len: 16 bits (length of encrypted payload excluding tag) + + Associated Data (14 bytes): + - retry: 8 bits (retry counter) + - connection_status: 4 bits (e.g., CRC required) + 4 bits padding + - iv/messageID: 96 bits (12 bytes) + """ + def __init__(self, flag: int, data_len: int, retry: int, connection_status: int, iv: bytes): + if not (0 <= flag < 65536): + raise ValueError("Flag must fit in 16 bits (0..65535)") + if not (0 <= data_len < 65536): + raise ValueError("Data length must fit in 16 bits (0..65535)") + if not (0 <= retry < 256): + raise ValueError("Retry must fit in 8 bits (0..255)") + if not (0 <= connection_status < 16): + raise ValueError("Connection status must fit in 4 bits (0..15)") + if len(iv) != 12: + raise ValueError("IV must be 12 bytes (96 bits)") + + self.flag = flag # 16 bits + self.data_len = data_len # 16 bits + self.retry = retry # 8 bits + self.connection_status = connection_status # 4 bits + self.iv = iv # 96 bits (12 bytes) + + def pack(self) -> bytes: + """Pack header into 18 bytes.""" + # Pack flag and data_len (4 bytes) + header = struct.pack('>H H', self.flag, self.data_len) + + # Pack retry and connection_status (2 bytes) + # connection_status in high 4 bits of second byte, 4 bits padding as zero + ad_byte = (self.connection_status & 0x0F) << 4 + ad_packed = struct.pack('>B B', self.retry, ad_byte) + + # Append IV (12 bytes) + return header + ad_packed + self.iv + + def get_associated_data(self) -> bytes: + """Get the associated data for AEAD encryption (retry, conn_status, iv).""" + # Pack retry and connection_status + ad_byte = (self.connection_status & 0x0F) << 4 + ad_packed = struct.pack('>B B', self.retry, ad_byte) + + # Append IV + return ad_packed + self.iv + + @classmethod + def unpack(cls, data: bytes) -> 'MessageHeader': + """Unpack 18 bytes into a MessageHeader object.""" + if len(data) < 18: + raise ValueError(f"Header data too short: {len(data)} bytes, expected 18") + + flag, data_len = struct.unpack('>H H', data[:4]) + retry, ad_byte = struct.unpack('>B B', data[4:6]) + connection_status = (ad_byte >> 4) & 0x0F + iv = data[6:18] + + return cls(flag, data_len, retry, connection_status, iv) + +class EncryptedMessage: + """ + Encrypted message packet format: + + - Header (18 bytes): + * flag: 16 bits + * data_len: 16 bits + * retry: 8 bits + * connection_status: 4 bits (+ 4 bits padding) + * iv/messageID: 96 bits (12 bytes) + + - Payload: variable length encrypted data + + - Footer: + * Authentication tag: 128 bits (16 bytes) + * CRC32: 32 bits (4 bytes) - optional, based on connection_status + """ + def __init__(self, plaintext: bytes, key: bytes, flag: int = 0xBEEF, + retry: int = 0, connection_status: int = 0, iv: bytes = None, + cipher_type: int = 0): + self.plaintext = plaintext + self.key = key + self.flag = flag + self.retry = retry + self.connection_status = connection_status + self.iv = iv or generate_iv(initial=True) + self.cipher_type = cipher_type # 0 = AES-256-GCM, 1 = ChaCha20-Poly1305 + + # Will be set after encryption + self.ciphertext = None + self.tag = None + self.header = None + + def encrypt(self) -> bytes: + """Encrypt the plaintext and return the full encrypted message.""" + # Create header with correct data_len (which will be set after encryption) + self.header = MessageHeader( + flag=self.flag, + data_len=0, # Will be updated after encryption + retry=self.retry, + connection_status=self.connection_status, + iv=self.iv + ) + + # Get associated data for AEAD + aad = self.header.get_associated_data() + + # Encrypt using the appropriate cipher + if self.cipher_type == 0: # AES-256-GCM + cipher = AESGCM(self.key) + ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad) + elif self.cipher_type == 1: # ChaCha20-Poly1305 + cipher = ChaCha20Poly1305(self.key) + ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad) + else: + raise ValueError(f"Unsupported cipher type: {self.cipher_type}") + + # Extract ciphertext and tag + self.tag = ciphertext_with_tag[-16:] + self.ciphertext = ciphertext_with_tag[:-16] + + # Update header with actual data length + self.header.data_len = len(self.ciphertext) + + # Pack everything together + packed_header = self.header.pack() + + # Check if CRC is required (based on connection_status) + if self.connection_status & 0x01: # Lowest bit indicates CRC required + import zlib + # Compute CRC32 of header + ciphertext + tag + crc = zlib.crc32(packed_header + self.ciphertext + self.tag) & 0xffffffff + crc_bytes = struct.pack('>I', crc) + return packed_header + self.ciphertext + self.tag + crc_bytes + else: + return packed_header + self.ciphertext + self.tag + + @classmethod + def decrypt(cls, data: bytes, key: bytes, cipher_type: int = 0) -> Tuple[bytes, MessageHeader]: + """ + Decrypt an encrypted message and return the plaintext and header. + + Args: + data: The full encrypted message + key: The encryption key + cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 + + Returns: + Tuple of (plaintext, header) + """ + if len(data) < 18 + 16: # Header + minimum tag size + raise ValueError("Message too short") + + # Extract header + header_bytes = data[:18] + header = MessageHeader.unpack(header_bytes) + + # Get ciphertext and tag + data_len = header.data_len + ciphertext_start = 18 + ciphertext_end = ciphertext_start + data_len + + if ciphertext_end + 16 > len(data): + raise ValueError("Message length does not match header's data_len") + + ciphertext = data[ciphertext_start:ciphertext_end] + tag = data[ciphertext_end:ciphertext_end + 16] + + # Get associated data for AEAD + aad = header.get_associated_data() + + # Combine ciphertext and tag for decryption + ciphertext_with_tag = ciphertext + tag + + # Decrypt using the appropriate cipher + try: + if cipher_type == 0: # AES-256-GCM + cipher = AESGCM(key) + plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad) + elif cipher_type == 1: # ChaCha20-Poly1305 + cipher = ChaCha20Poly1305(key) + plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad) + else: + raise ValueError(f"Unsupported cipher type: {cipher_type}") + + return plaintext, header + except Exception as e: + raise ValueError(f"Decryption failed: {e}") + +def generate_iv(initial: bool = False, previous_iv: bytes = None) -> bytes: + """ + Generate a 96-bit IV (12 bytes). + + Args: + initial: If True, return a random IV + previous_iv: The previous IV to increment + + Returns: + A new IV + """ + if initial or previous_iv is None: + return os.urandom(12) # 96 bits + else: + # Increment the previous IV by 1 modulo 2^96 + iv_int = int.from_bytes(previous_iv, 'big') + iv_int = (iv_int + 1) % (1 << 96) + return iv_int.to_bytes(12, 'big') + +# Convenience functions to match original API +def encrypt_message(plaintext: bytes, key: bytes, flag: int = 0xBEEF, + retry: int = 0, connection_status: int = 0, + iv: bytes = None, cipher_type: int = 0) -> bytes: + """ + Encrypt a message using the specified parameters. + + Args: + plaintext: The data to encrypt + key: The encryption key (32 bytes for AES-256-GCM, 32 bytes for ChaCha20-Poly1305) + flag: 16-bit flag value (default: 0xBEEF) + retry: 8-bit retry counter + connection_status: 4-bit connection status + iv: Optional 96-bit IV (if None, a random one will be generated) + cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 + + Returns: + The full encrypted message + """ + message = EncryptedMessage( + plaintext=plaintext, + key=key, + flag=flag, + retry=retry, + connection_status=connection_status, + iv=iv, + cipher_type=cipher_type + ) + return message.encrypt() + +def decrypt_message(message: bytes, key: bytes, cipher_type: int = 0) -> bytes: + """ + Decrypt a message. + + Args: + message: The full encrypted message + key: The encryption key + cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 + + Returns: + The decrypted plaintext + """ + plaintext, _ = EncryptedMessage.decrypt(message, key, cipher_type) + return plaintext diff --git a/protocol_prototype/main.py b/protocol_prototype/main.py deleted file mode 100644 index 5596b44..0000000 --- a/protocol_prototype/main.py +++ /dev/null @@ -1,16 +0,0 @@ -# This is a sample Python script. - -# Press Shift+F10 to execute it or replace it with your code. -# Press Double Shift to search everywhere for classes, files, tool windows, actions, and settings. - - -def print_hi(name): - # Use a breakpoint in the code line below to debug your script. - print(f'Hi, {name}') # Press Ctrl+F8 to toggle the breakpoint. - - -# Press the green button in the gutter to run the script. -if __name__ == '__main__': - print_hi('PyCharm') - -# See PyCharm help at https://www.jetbrains.com/help/pycharm/ diff --git a/protocol_prototype/messages.py b/protocol_prototype/messages.py index 7194e73..98151ab 100644 --- a/protocol_prototype/messages.py +++ b/protocol_prototype/messages.py @@ -3,6 +3,7 @@ import struct import time import zlib import hashlib +from typing import Tuple, Optional def crc32_of(data: bytes) -> int: """ @@ -11,144 +12,163 @@ def crc32_of(data: bytes) -> int: return zlib.crc32(data) & 0xffffffff -# ============================================================================= -# 1) Ping Request (295 bits) -# - 256-bit nonce -# - 7-bit version -# - 32-bit CRC -# = 295 bits total -# In practice, we store 37 bytes (296 bits); 1 bit is unused. -# ============================================================================= - -def build_ping_request(version: int) -> bytes: +# --------------------------------------------------------------------------- +# PING REQUEST (new format) +# Fields (in order): +# - session_nonce: 129 bits (from the top 129 bits of 17 random bytes) +# - version: 7 bits +# - cipher: 4 bits (0 = AES-256-GCM, 1 = ChaCha20-poly1305; for now only 0 is used) +# - CRC: 32 bits +# +# Total bits: 129 + 7 + 4 + 32 = 172 bits. We pack into 22 bytes (176 bits) with 4 spare bits. +# --------------------------------------------------------------------------- +class PingRequest: """ - Build a Ping request with: - - 256-bit nonce (32 bytes) - - 7-bit version - - 32-bit CRC - We do bit-packing. The final result is 37 bytes (296 bits), with 1 unused bit. + PING REQUEST format (172 bits / 22 bytes): + - session_nonce: 129 bits (from top 129 bits of 17 random bytes) + - version: 7 bits + - cipher: 4 bits (0 = AES-256-GCM, 1 = ChaCha20-poly1305) + - CRC: 32 bits """ - if not (0 <= version < 128): - raise ValueError("Version must fit in 7 bits (0..127)") - - # 1) Generate 256-bit nonce - nonce = os.urandom(32) # 32 bytes = 256 bits - - # We'll build partial_data = [nonce (256 bits), version (7 bits)] as an integer - # Then compute CRC-32 over those bytes, then append 32 bits of CRC. - partial_int = int.from_bytes(nonce, 'big') << 7 # shift left 7 bits - partial_int |= version # put version in the low 7 bits - - # Convert partial to bytes - # partial is 256+7=263 bits => needs 33 bytes to store - partial_bytes = partial_int.to_bytes(33, 'big') - - # Compute CRC over partial_bytes - cval = crc32_of(partial_bytes) - - # Now combine partial_data (263 bits) with 32 bits of CRC => 295 bits - # We'll store that in a single integer - final_int = (int.from_bytes(partial_bytes, 'big') << 32) | cval - # final_int is 263+32=295 bits, needs 37 bytes to store (the last bit is unused). - final_bytes = final_int.to_bytes(37, 'big') - return final_bytes + def __init__(self, version: int, cipher: int, session_nonce: bytes = None): + if not (0 <= version < 128): + raise ValueError("Version must fit in 7 bits (0..127)") + if not (0 <= cipher < 16): + raise ValueError("Cipher must fit in 4 bits (0..15)") + + self.version = version + self.cipher = cipher + + # Generate session nonce if not provided + if session_nonce is None: + # Generate 17 random bytes + nonce_full = os.urandom(17) + # Use top 129 bits + nonce_int_full = int.from_bytes(nonce_full, 'big') + nonce_129_int = nonce_int_full >> 7 # drop lowest 7 bits + self.session_nonce = nonce_129_int.to_bytes(17, 'big') + else: + if len(session_nonce) != 17: + raise ValueError("Session nonce must be 17 bytes (136 bits)") + self.session_nonce = session_nonce + + def serialize(self) -> bytes: + """Serialize the ping request into a 22-byte packet.""" + # Convert session_nonce to integer (129 bits) + nonce_int = int.from_bytes(self.session_nonce, 'big') + + # Pack fields: shift nonce left by 11 bits, add version and cipher + partial_int = (nonce_int << 11) | (self.version << 4) | (self.cipher & 0x0F) + # This creates 129+7+4 = 140 bits; pack into 18 bytes + partial_bytes = partial_int.to_bytes(18, 'big') + + # Compute CRC over these 18 bytes + cval = crc32_of(partial_bytes) + + # Combine partial data with 32-bit CRC + final_int = (int.from_bytes(partial_bytes, 'big') << 32) | cval + return final_int.to_bytes(22, 'big') + + @classmethod + def deserialize(cls, data: bytes) -> Optional['PingRequest']: + """Deserialize a 22-byte packet into a PingRequest object.""" + if len(data) != 22: + return None + + # Extract 176-bit integer + final_int = int.from_bytes(data, 'big') + + # Extract CRC and verify + crc_in = final_int & 0xffffffff + partial_int = final_int >> 32 # 140 bits + partial_bytes = partial_int.to_bytes(18, 'big') + crc_calc = crc32_of(partial_bytes) + + if crc_calc != crc_in: + return None + + # Extract fields + cipher = partial_int & 0x0F + version = (partial_int >> 4) & 0x7F + nonce_129_int = partial_int >> 11 # 129 bits + session_nonce = nonce_129_int.to_bytes(17, 'big') + + return cls(version, cipher, session_nonce) -def parse_ping_request(data: bytes): + +# --------------------------------------------------------------------------- +# PING RESPONSE (new format) +# Fields: +# - timestamp: 32 bits (we take the lower 32 bits of the time in ms) +# - version: 7 bits +# - cipher: 4 bits +# - answer: 1 bit +# - CRC: 32 bits +# +# Total bits: 32 + 7 + 4 + 1 + 32 = 76 bits; pack into 10 bytes (80 bits) with 4 spare bits. +# --------------------------------------------------------------------------- +class PingResponse: """ - Parse a Ping request (37 bytes = 295 bits). - Returns (nonce, version) or None if invalid. + PING RESPONSE format (76 bits / 10 bytes): + - timestamp: 32 bits (milliseconds since epoch, lower 32 bits) + - version: 7 bits + - cipher: 4 bits + - answer: 1 bit (0 = no, 1 = yes) + - CRC: 32 bits """ - if len(data) != 37: - return None - - # Convert to int - val_295 = int.from_bytes(data, 'big') # 295 bits in a 37-byte integer - # Extract CRC (lowest 32 bits) - crc_in = val_295 & 0xffffffff - # Then shift right 32 bits to get partial_data - partial_val = val_295 >> 32 # 263 bits - - # Convert partial_val back to bytes - partial_bytes = partial_val.to_bytes(33, 'big') - - # Recompute CRC - crc_calc = crc32_of(partial_bytes) - if crc_calc != crc_in: - return None - - # Now parse out nonce (256 bits) and version (7 bits) - # partial_val is 263 bits - version = partial_val & 0x7f # low 7 bits - nonce_val = partial_val >> 7 # high 256 bits - nonce_bytes = nonce_val.to_bytes(32, 'big') - - return (nonce_bytes, version) - - -# ============================================================================= -# 2) Ping Response (72 bits) -# - 32-bit timestamp -# - 7-bit version + 1-bit answer => 8 bits -# - 32-bit CRC -# = 72 bits total => 9 bytes -# ============================================================================= - -def build_ping_response(version: int, answer: int) -> bytes: - """ - Build a Ping response: - - 32-bit timestamp (lowest 32 bits of current time in ms) - - 7-bit version + 1-bit answer - - 32-bit CRC - => 72 bits = 9 bytes - """ - if not (0 <= version < 128): - raise ValueError("Version must fit in 7 bits.") - if answer not in (0, 1): - raise ValueError("Answer must be 0 or 1.") - - # 32-bit timestamp = current time in ms, truncated to 32 bits - t_ms = int(time.time() * 1000) & 0xffffffff - - # partial = [timestamp (32 bits), version (7 bits), answer (1 bit)] => 40 bits - partial_val = (t_ms << 8) | ((version << 1) & 0xfe) | (answer & 0x01) - # partial_val is 40 bits => 5 bytes - partial_bytes = partial_val.to_bytes(5, 'big') - - # CRC over these 5 bytes - cval = crc32_of(partial_bytes) - - # Combine partial (40 bits) with 32 bits of CRC => 72 bits total - final_val = (int.from_bytes(partial_bytes, 'big') << 32) | cval - final_bytes = final_val.to_bytes(9, 'big') - return final_bytes - - -def parse_ping_response(data: bytes): - """ - Parse a Ping response (72 bits = 9 bytes). - Return (timestamp_ms, version, answer) or None if invalid. - """ - if len(data) != 9: - return None - - val_72 = int.from_bytes(data, 'big') # 72 bits - crc_in = val_72 & 0xffffffff - partial_val = val_72 >> 32 # 40 bits - - partial_bytes = partial_val.to_bytes(5, 'big') - crc_calc = crc32_of(partial_bytes) - if crc_calc != crc_in: - return None - - # Now parse partial_val - # partial_val = [timestamp(32 bits), version(7 bits), answer(1 bit)] - t_ms = (partial_val >> 8) & 0xffffffff - va = partial_val & 0xff # 8 bits = [7 bits version, 1 bit answer] - version = (va >> 1) & 0x7f - answer = va & 0x01 - - return (t_ms, version, answer) + def __init__(self, version: int, cipher: int, answer: int, timestamp: int = None): + if not (0 <= version < 128): + raise ValueError("Version must fit in 7 bits") + if not (0 <= cipher < 16): + raise ValueError("Cipher must fit in 4 bits") + if answer not in (0, 1): + raise ValueError("Answer must be 0 or 1") + + self.version = version + self.cipher = cipher + self.answer = answer + self.timestamp = timestamp or (int(time.time() * 1000) & 0xffffffff) + + def serialize(self) -> bytes: + """Serialize the ping response into a 10-byte packet.""" + # Pack timestamp, version, cipher, answer: 32+7+4+1 = 44 bits + partial_val = (self.timestamp << (7+4+1)) | (self.version << (4+1)) | (self.cipher << 1) | self.answer + partial_bytes = partial_val.to_bytes(6, 'big') # 6 bytes = 48 bits, 4 spare bits + + # Compute CRC + cval = crc32_of(partial_bytes) + + # Combine with CRC + final_val = (int.from_bytes(partial_bytes, 'big') << 32) | cval + return final_val.to_bytes(10, 'big') + + @classmethod + def deserialize(cls, data: bytes) -> Optional['PingResponse']: + """Deserialize a 10-byte packet into a PingResponse object.""" + if len(data) != 10: + return None + + # Extract 80-bit integer + final_int = int.from_bytes(data, 'big') + + # Extract CRC and verify + crc_in = final_int & 0xffffffff + partial_int = final_int >> 32 # 48 bits + partial_bytes = partial_int.to_bytes(6, 'big') + crc_calc = crc32_of(partial_bytes) + + if crc_calc != crc_in: + return None + + # Extract fields (discard 4 spare bits) + partial_int >>= 4 # now 44 bits + answer = partial_int & 0x01 + cipher = (partial_int >> 1) & 0x0F + version = (partial_int >> (1+4)) & 0x7F + timestamp = partial_int >> (1+4+7) + + return cls(version, cipher, answer, timestamp) # ============================================================================= @@ -161,53 +181,60 @@ def parse_ping_response(data: bytes): # => total 4 + 64 + 64 + 32 + 4 = 168 bytes = 1344 bits # ============================================================================= -def build_handshake_message(timestamp: int, - ephemeral_pubkey: bytes, - ephemeral_signature: bytes, - pfs_hash: bytes) -> bytes: +class Handshake: """ - Build handshake: - - 4 bytes: timestamp - - 64 bytes: ephemeral_pubkey (x||y, raw) - - 64 bytes: ephemeral_signature (r||s, raw) - - 32 bytes: pfs_hash - - 4 bytes: CRC-32 - => 168 bytes total + HANDSHAKE format (1344 bits / 168 bytes): + - timestamp: 32 bits + - ephemeral_pubkey: 512 bits (64 bytes, raw x||y format) + - ephemeral_signature: 512 bits (64 bytes, raw r||s format) + - pfs_hash: 256 bits (32 bytes) + - CRC: 32 bits """ - if len(ephemeral_pubkey) != 64: - raise ValueError("ephemeral_pubkey must be 64 bytes (raw x||y).") - if len(ephemeral_signature) != 64: - raise ValueError("ephemeral_signature must be 64 bytes (raw r||s).") - if len(pfs_hash) != 32: - raise ValueError("pfs_hash must be 32 bytes.") - - partial = struct.pack("!I", timestamp) \ - + ephemeral_pubkey \ - + ephemeral_signature \ - + pfs_hash - cval = crc32_of(partial) - return partial + struct.pack("!I", cval) - - -def parse_handshake_message(data: bytes): - """ - Parse handshake message (168 bytes). - Return (timestamp, ephemeral_pub, ephemeral_sig, pfs_hash) or None if invalid. - """ - if len(data) != 168: - return None - partial = data[:-4] # first 164 bytes - crc_in = struct.unpack("!I", data[-4:])[0] - crc_calc = crc32_of(partial) - if crc_calc != crc_in: - return None - - # Now parse fields - timestamp = struct.unpack("!I", partial[:4])[0] - ephemeral_pub = partial[4:4+64] - ephemeral_sig = partial[68:68+64] - pfs_hash = partial[132:132+32] - return (timestamp, ephemeral_pub, ephemeral_sig, pfs_hash) + def __init__(self, ephemeral_pubkey: bytes, ephemeral_signature: bytes, pfs_hash: bytes, timestamp: int = None): + if len(ephemeral_pubkey) != 64: + raise ValueError("ephemeral_pubkey must be 64 bytes (raw x||y)") + if len(ephemeral_signature) != 64: + raise ValueError("ephemeral_signature must be 64 bytes (raw r||s)") + if len(pfs_hash) != 32: + raise ValueError("pfs_hash must be 32 bytes") + + self.ephemeral_pubkey = ephemeral_pubkey + self.ephemeral_signature = ephemeral_signature + self.pfs_hash = pfs_hash + self.timestamp = timestamp or (int(time.time() * 1000) & 0xffffffff) + + def serialize(self) -> bytes: + """Serialize the handshake into a 168-byte packet.""" + # Pack timestamp and other fields + partial = struct.pack("!I", self.timestamp) + self.ephemeral_pubkey + self.ephemeral_signature + self.pfs_hash + + # Compute CRC + cval = crc32_of(partial) + + # Append CRC + return partial + struct.pack("!I", cval) + + @classmethod + def deserialize(cls, data: bytes) -> Optional['Handshake']: + """Deserialize a 168-byte packet into a Handshake object.""" + if len(data) != 168: + return None + + # Extract and verify CRC + partial = data[:-4] + crc_in = struct.unpack("!I", data[-4:])[0] + crc_calc = crc32_of(partial) + + if crc_calc != crc_in: + return None + + # Extract fields + timestamp = struct.unpack("!I", partial[:4])[0] + ephemeral_pubkey = partial[4:4+64] + ephemeral_signature = partial[68:68+64] + pfs_hash = partial[132:132+32] + + return cls(ephemeral_pubkey, ephemeral_signature, pfs_hash, timestamp) # ============================================================================= @@ -218,15 +245,18 @@ def parse_handshake_message(data: bytes): def compute_pfs_hash(session_number: int, shared_secret_hex: str) -> bytes: """ - Return 32 bytes (256 bits) for the PFS field. - If session_number < 0 => means no previous session => 32 zero bytes. - Otherwise => sha256( session_number (4 bytes) || shared_secret ). + Compute the PFS hash field for handshake messages: + - If no previous session (session_number < 0), return 32 zero bytes + - Otherwise, compute sha256(session_number || shared_secret) """ if session_number < 0: return b"\x00" * 32 # Convert shared_secret_hex to raw bytes secret_bytes = bytes.fromhex(shared_secret_hex) + # Pack session_number as 4 bytes sn_bytes = struct.pack("!I", session_number) + + # Compute hash return hashlib.sha256(sn_bytes + secret_bytes).digest() diff --git a/protocol_prototype/protocol.py b/protocol_prototype/protocol.py index 99e22e3..1887476 100644 --- a/protocol_prototype/protocol.py +++ b/protocol_prototype/protocol.py @@ -1,8 +1,8 @@ import random +import os import time import threading -from typing import List, Dict, Any -from crypto_utils import raw_signature_to_der +from typing import List, Dict, Any, Optional, Tuple from crypto_utils import ( generate_identity_keys, @@ -10,15 +10,20 @@ from crypto_utils import ( sign_data, verify_signature, get_ephemeral_keypair, - compute_ecdh_shared_key + compute_ecdh_shared_key, + der_to_raw, + raw_signature_to_der ) from messages import ( - build_ping_request, parse_ping_request, - build_ping_response, parse_ping_response, - build_handshake_message, parse_handshake_message, + PingRequest, PingResponse, Handshake, compute_pfs_hash ) import transmission +from encryption import ( + EncryptedMessage, MessageHeader, + generate_iv, encrypt_message, decrypt_message +) +from auto_mode import AutoMode, AutoModeConfig # ANSI colors RED = "\033[91m" @@ -30,44 +35,56 @@ RESET = "\033[0m" class IcingProtocol: def __init__(self): - # Identity keys + # Identity keys (each 512 bits when printed as hex of 64 bytes) self.identity_privkey, self.identity_pubkey = generate_identity_keys() - # Peer identity + # Peer identity for verifying ephemeral signatures self.peer_identity_pubkey_obj = None self.peer_identity_pubkey_bytes = None - # Ephemeral keys + # Ephemeral keys (our side) self.ephemeral_privkey = None self.ephemeral_pubkey = None - # Last computed shared secret (hex) + # Last computed shared secret (hex string) self.shared_secret = None - # For PFS: track session_number + last_shared_secret per peer identity - # Key: bytes(64) peer identity pubkey - # Value: (int session_number, str last_shared_secret_hex) - self.pfs_history: Dict[bytes, (int, str)] = {} + # Derived HKDF key (hex string, 256 bits) + self.hkdf_key = None + + # Negotiated cipher (0 = AES-256-GCM, 1 = ChaCha20-Poly1305) + self.cipher_type = 0 - # Inbound messages are stored for manual or auto handling - # Each entry: { 'type': str, 'raw': bytes, 'parsed': Any, 'connection': PeerConnection } - self.inbound_messages: List[Dict[str, Any]] = [] + # For PFS: track per-peer session info (session number and last shared secret) + self.pfs_history: Dict[bytes, Tuple[int, str]] = {} - # Simple dictionary to track protocol flags + # Protocol flags self.state = { "ping_sent": False, "ping_received": False, "handshake_sent": False, "handshake_received": False, + "key_exchange_complete": False } - # Auto-responder toggle + # Auto mode for automated protocol operation + self.auto_mode = AutoMode(self) + + # Legacy auto-responder toggle (kept for backward compatibility) self.auto_responder = False - # Connections + # Active connections list self.connections = [] - # Listening port + # Inbound messages (each message is a dict with keys: type, raw, parsed, connection) + self.inbound_messages: List[Dict[str, Any]] = [] + + # Store the session nonce (17 bytes but only 129 bits are valid) from first sent or received PING + self.session_nonce: bytes = None + + # Last used IV for encrypted messages + self.last_iv: bytes = None + self.local_port = random.randint(30000, 40000) self.server_listener = transmission.ServerListener( host="127.0.0.1", @@ -84,82 +101,128 @@ class IcingProtocol: def on_new_connection(self, conn: transmission.PeerConnection): print(f"{GREEN}[IcingProtocol]{RESET} New incoming connection.") self.connections.append(conn) + + # Notify auto mode + self.auto_mode.handle_connection_established() def on_data_received(self, conn: transmission.PeerConnection, data: bytes): - """ - Called whenever data arrives on any open PeerConnection. - We'll parse and store the message, then handle automatically if auto_responder is on. - """ - # Print data size in bits, not bytes - bits_count = len(data)*8 - print(f"{GREEN}[RECV]{RESET} {bits_count} bits from peer: {data.hex()[:60]}{'...' if len(data.hex())>60 else ''}") + bits_count = len(data) * 8 + print( + f"{GREEN}[RECV]{RESET} {bits_count} bits from peer: {data.hex()[:60]}{'...' if len(data.hex()) > 60 else ''}") - # Attempt to parse Ping request - if len(data) == 37: - parsed = parse_ping_request(data) - if parsed: - nonce, version = parsed + # PING REQUEST (22 bytes) + if len(data) == 22: + ping_request = PingRequest.deserialize(data) + if ping_request: self.state["ping_received"] = True + + # If received cipher is not supported, force to 0 (AES-256-GCM) + if ping_request.cipher != 0 and ping_request.cipher != 1: + print(f"{YELLOW}[NOTICE]{RESET} Received PING with unsupported cipher ({ping_request.cipher}); forcing cipher to 0 in response.") + ping_request.cipher = 0 + + # Store cipher type for future encrypted messages + self.cipher_type = ping_request.cipher + + # Store session nonce if not already set + if self.session_nonce is None: + self.session_nonce = ping_request.session_nonce + print(f"{YELLOW}[NOTICE]{RESET} Stored session nonce from received PING.") + index = len(self.inbound_messages) msg = { "type": "PING_REQUEST", "raw": data, - "parsed": {"nonce": nonce, "version": version}, + "parsed": ping_request, "connection": conn } self.inbound_messages.append(msg) - print(f"{YELLOW}[NOTICE]{RESET} Stored inbound PING request (nonce={nonce.hex()}) at index={index}.") - - if self.auto_responder: - # Schedule an automatic response after 2 seconds - threading.Timer(2.0, self._auto_respond_ping, args=(index,)).start() - + + # Handle in auto mode (if active) + self.auto_mode.handle_ping_received(index) + + # Legacy auto-responder (for backward compatibility) + if self.auto_responder and not self.auto_mode.active: + timer = threading.Timer(2.0, self._auto_respond_ping, args=[index]) + timer.daemon = True + timer.start() return - # Attempt to parse Ping response - if len(data) == 9: - parsed = parse_ping_response(data) - if parsed: - ts, version, answer_code = parsed + # PING RESPONSE (10 bytes) + elif len(data) == 10: + ping_response = PingResponse.deserialize(data) + if ping_response: + # Store negotiated cipher type + self.cipher_type = ping_response.cipher + index = len(self.inbound_messages) msg = { "type": "PING_RESPONSE", "raw": data, - "parsed": {"timestamp": ts, "version": version, "answer_code": answer_code}, + "parsed": ping_response, "connection": conn } self.inbound_messages.append(msg) - print(f"{YELLOW}[NOTICE]{RESET} Stored inbound PING response (answer_code={answer_code}) at index={index}.") + + # Notify auto mode (if active) + self.auto_mode.handle_ping_response_received(ping_response.answer == 1) return - # Attempt to parse handshake - if len(data) == 168: - parsed = parse_handshake_message(data) - if parsed: - ts, ephemeral_pub, ephemeral_sig, pfs_hash = parsed + # HANDSHAKE message (168 bytes) + elif len(data) == 168: + handshake = Handshake.deserialize(data) + if handshake: self.state["handshake_received"] = True index = len(self.inbound_messages) msg = { "type": "HANDSHAKE", "raw": data, - "parsed": { - "ephemeral_pub": ephemeral_pub, - "ephemeral_sig": ephemeral_sig, - "timestamp": ts, - "pfs hash": pfs_hash - }, + "parsed": handshake, "connection": conn } self.inbound_messages.append(msg) - print(f"{YELLOW}[NOTICE]{RESET} Stored inbound HANDSHAKE at index={index}. ephemeral_pub={ephemeral_pub.hex()[:20]}...") - - if self.auto_responder: - # Schedule an automatic handshake "response" after 2 seconds - threading.Timer(2.0, self._auto_respond_handshake, args=(index,)).start() - + + # Notify auto mode (if active) + self.auto_mode.handle_handshake_received(index) + + # Legacy auto-responder + if self.auto_responder and not self.auto_mode.active: + timer = threading.Timer(2.0, self._auto_respond_handshake, args=[index]) + timer.daemon = True + timer.start() return - # Otherwise, unrecognized + # Check if the message might be an encrypted message (e.g. header of 18 bytes at start) + elif len(data) >= 18: + # Try to parse header + try: + header = MessageHeader.unpack(data[:18]) + # If header unpacking is successful and data length matches header expectations + expected_len = 18 + header.data_len + 16 # Header + payload + tag + + # Check if CRC is included + has_crc = (header.connection_status & 0x01) != 0 + if has_crc: + expected_len += 4 # Add CRC32 length + + if len(data) >= expected_len: + index = len(self.inbound_messages) + msg = { + "type": "ENCRYPTED_MESSAGE", + "raw": data, + "parsed": header, + "connection": conn + } + self.inbound_messages.append(msg) + print(f"{YELLOW}[NOTICE]{RESET} Stored inbound ENCRYPTED_MESSAGE at index={index}.") + + # Notify auto mode + self.auto_mode.handle_encrypted_received(index) + return + except Exception as e: + print(f"{RED}[ERROR]{RESET} Failed to parse message header: {e}") + + # Otherwise, unrecognized/malformed message. index = len(self.inbound_messages) msg = { "type": "UNKNOWN", @@ -170,8 +233,68 @@ class IcingProtocol: self.inbound_messages.append(msg) print(f"{RED}[WARNING]{RESET} Unrecognized or malformed message stored at index={index}.") + # ------------------------------------------------------------------------- - # Auto-responder helpers + # HKDF Derivation + # ------------------------------------------------------------------------- + + def derive_hkdf(self): + """ + Derives a 256-bit key using HKDF. + Uses as input keying material (IKM) the shared secret from ECDH. + The salt is computed as SHA256(session_nonce || pfs_param), where: + - session_nonce is taken from self.session_nonce (17 bytes, 129 bits) or defaults to zeros. + - pfs_param is taken from the first inbound HANDSHAKE's pfs_hash field (32 bytes) or zeros. + """ + if not self.shared_secret: + print(f"{RED}[ERROR]{RESET} No shared secret available; cannot derive HKDF key.") + return + + # IKM: shared secret converted from hex to bytes. + ikm = bytes.fromhex(self.shared_secret) + # Use stored session_nonce if available; otherwise default to zeros. + session_nonce = self.session_nonce if self.session_nonce is not None else (b"\x00" * 17) + + # Determine pfs_param from first HANDSHAKE message (if any) + pfs_param = None + for msg in self.inbound_messages: + if msg["type"] == "HANDSHAKE": + try: + handshake = msg["parsed"] + pfs_param = handshake.pfs_hash + except Exception: + pfs_param = None + break + if pfs_param is None: + print(f"{RED}[WARNING]{RESET} No HANDSHAKE found; using 32 zero bytes for pfs_param.") + pfs_param = b"\x00" * 32 # 256-bit zeros + + # Ensure both are bytes + if isinstance(session_nonce, str): + session_nonce = session_nonce.encode() + if isinstance(pfs_param, str): + pfs_param = pfs_param.encode() + + from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.kdf.hkdf import HKDF + hasher = hashes.Hash(hashes.SHA256()) + hasher.update(session_nonce + pfs_param) + salt_value = hasher.finalize() + + hkdf = HKDF( + algorithm=hashes.SHA256(), + length=32, # 256 bits + salt=salt_value, + info=b"", + ) + derived_key = hkdf.derive(ikm) + self.hkdf_key = derived_key.hex() + self.state["key_exchange_complete"] = True + print(f"{GREEN}[HKDF]{RESET} Derived HKDF key: {self.hkdf_key}") + return True + + # ------------------------------------------------------------------------- + # Legacy Auto-responder helpers (kept for backward compatibility) # ------------------------------------------------------------------------- def _auto_respond_ping(self, index: int): @@ -179,7 +302,7 @@ class IcingProtocol: Called by a Timer to respond automatically to a PING_REQUEST after 2s. """ print(f"{BLUE}[AUTO]{RESET} Delayed responding to PING at index={index}") - self.respond_to_ping(index, answer_code=0) + self.respond_to_ping(index, answer=1) # Accept by default self.show_state() def _auto_respond_handshake(self, index: int): @@ -205,6 +328,140 @@ class IcingProtocol: # 4) Show final state self.show_state() + # ------------------------------------------------------------------------- + # Public Methods for Auto Mode Management + # ------------------------------------------------------------------------- + + def start_auto_mode(self): + """Start the automatic protocol operation mode.""" + self.auto_mode.start() + + def stop_auto_mode(self): + """Stop the automatic protocol operation mode.""" + self.auto_mode.stop() + + def configure_auto_mode(self, **kwargs): + """ + Configure the automatic mode parameters. + + Args: + **kwargs: Configuration parameters to set. Supported parameters: + - ping_response_accept: bool, whether to accept incoming pings + - ping_auto_initiate: bool, whether to initiate pings on connection + - ping_retry_count: int, number of ping retries + - ping_retry_delay: float, seconds between ping retries + - ping_timeout: float, seconds to wait for ping response + - preferred_cipher: int, preferred cipher (0=AES-GCM, 1=ChaCha20-Poly1305) + - handshake_retry_count: int, number of handshake retries + - handshake_retry_delay: float, seconds between handshake retries + - handshake_timeout: float, seconds to wait for handshake + - auto_message_enabled: bool, whether to auto-send messages + - message_interval: float, seconds between auto messages + - message_content: str, default message content + - active_mode: bool, whether to actively initiate protocol + """ + for key, value in kwargs.items(): + if hasattr(self.auto_mode.config, key): + setattr(self.auto_mode.config, key, value) + print(f"{BLUE}[CONFIG]{RESET} Set auto mode {key} = {value}") + else: + print(f"{RED}[ERROR]{RESET} Unknown auto mode configuration parameter: {key}") + + def get_auto_mode_config(self): + """Return the current auto mode configuration.""" + return self.auto_mode.config + + def queue_auto_message(self, message: str): + """ + Add a message to the auto-send queue. + + Args: + message: Message text to send + """ + self.auto_mode.queue_message(message) + + def toggle_auto_mode_logging(self): + """ + Toggle detailed logging for auto mode. + When enabled, will show more information about state transitions and decision making. + """ + if not hasattr(self.auto_mode, 'verbose_logging'): + self.auto_mode.verbose_logging = True + else: + self.auto_mode.verbose_logging = not self.auto_mode.verbose_logging + + status = "enabled" if self.auto_mode.verbose_logging else "disabled" + print(f"{BLUE}[AUTO-LOG]{RESET} Detailed logging {status}") + + def debug_message(self, index: int): + """ + Debug a message in the inbound message queue. + Prints detailed information about the message. + + Args: + index: The index of the message in the inbound_messages queue + """ + if index < 0 or index >= len(self.inbound_messages): + print(f"{RED}[ERROR]{RESET} Invalid message index {index}") + return + + msg = self.inbound_messages[index] + print(f"\n{YELLOW}=== Message Debug [{index}] ==={RESET}") + print(f"Type: {msg['type']}") + print(f"Length: {len(msg['raw'])} bytes = {len(msg['raw'])*8} bits") + print(f"Raw data: {msg['raw'].hex()}") + + if msg['parsed'] is not None: + print(f"\n{YELLOW}--- Parsed Data ---{RESET}") + if msg['type'] == 'PING_REQUEST': + ping = msg['parsed'] + print(f"Version: {ping.version}") + print(f"Cipher: {ping.cipher} ({'AES-256-GCM' if ping.cipher == 0 else 'ChaCha20-Poly1305' if ping.cipher == 1 else 'Unknown'})") + print(f"Session nonce: {ping.session_nonce.hex()}") + print(f"CRC32: {ping.crc32:08x}") + + elif msg['type'] == 'PING_RESPONSE': + resp = msg['parsed'] + print(f"Version: {resp.version}") + print(f"Cipher: {resp.cipher} ({'AES-256-GCM' if resp.cipher == 0 else 'ChaCha20-Poly1305' if resp.cipher == 1 else 'Unknown'})") + print(f"Answer: {resp.answer} ({'Accept' if resp.answer == 1 else 'Reject'})") + print(f"CRC32: {resp.crc32:08x}") + + elif msg['type'] == 'HANDSHAKE': + hs = msg['parsed'] + print(f"Ephemeral pubkey: {hs.ephemeral_pubkey.hex()[:16]}...") + print(f"Ephemeral signature: {hs.ephemeral_signature.hex()[:16]}...") + print(f"PFS hash: {hs.pfs_hash.hex()[:16]}...") + print(f"Timestamp: {hs.timestamp}") + print(f"CRC32: {hs.crc32:08x}") + + elif msg['type'] == 'ENCRYPTED_MESSAGE': + header = msg['parsed'] + print(f"Flag: 0x{header.flag:04x}") + print(f"Data length: {header.data_len} bytes") + print(f"Retry: {header.retry}") + print(f"Connection status: {header.connection_status} ({'CRC included' if header.connection_status & 0x01 else 'No CRC'})") + print(f"IV: {header.iv.hex()}") + + # Calculate expected message size + expected_len = 18 + header.data_len + 16 # Header + payload + tag + if header.connection_status & 0x01: + expected_len += 4 # Add CRC + + print(f"Expected total length: {expected_len} bytes") + print(f"Actual length: {len(msg['raw'])} bytes") + + # If we have a key, try to decrypt + if self.hkdf_key: + print("\nAttempting decryption...") + try: + key = bytes.fromhex(self.hkdf_key) + plaintext = decrypt_message(msg['raw'], key, self.cipher_type) + print(f"Decrypted: {plaintext.decode('utf-8')}") + except Exception as e: + print(f"Decryption failed: {e}") + print() + # ------------------------------------------------------------------------- # Public Methods # ------------------------------------------------------------------------- @@ -213,6 +470,9 @@ class IcingProtocol: conn = transmission.connect_to_peer("127.0.0.1", port, self.on_data_received) self.connections.append(conn) print(f"{GREEN}[IcingProtocol]{RESET} Outgoing connection to port {port} established.") + + # Notify auto mode + self.auto_mode.handle_connection_established() def set_peer_identity(self, peer_pubkey_hex: str): pubkey_bytes = bytes.fromhex(peer_pubkey_hex) @@ -224,127 +484,167 @@ class IcingProtocol: self.ephemeral_privkey, self.ephemeral_pubkey = get_ephemeral_keypair() print(f"{GREEN}[IcingProtocol]{RESET} Generated ephemeral key pair: pubkey={self.ephemeral_pubkey.hex()[:16]}...") - def send_ping_request(self): + # Send PING (session discovery and cipher negotiation) + def send_ping_request(self, cipher_type=0): + """ + Send a ping request to the first connected peer. + + Args: + cipher_type: Preferred cipher type (0 = AES-256-GCM, 1 = ChaCha20-Poly1305) + """ if not self.connections: print(f"{RED}[ERROR]{RESET} No active connections.") - return - pkt = build_ping_request(version=0) + return False + + # Validate cipher type + if cipher_type not in (0, 1): + print(f"{YELLOW}[WARNING]{RESET} Invalid cipher type {cipher_type}, defaulting to AES-256-GCM (0)") + cipher_type = 0 + + # Create ping request with specified cipher + ping_request = PingRequest(version=0, cipher=cipher_type) + + # Store session nonce if not already set + if self.session_nonce is None: + self.session_nonce = ping_request.session_nonce + print(f"{YELLOW}[NOTICE]{RESET} Stored session nonce from sent PING.") + + # Serialize and send + pkt = ping_request.serialize() self._send_packet(self.connections[0], pkt, "PING_REQUEST") self.state["ping_sent"] = True + return True def send_handshake(self): """ Build and send handshake: - - 32-bit timestamp - ephemeral_pubkey (64 bytes, raw x||y) - ephemeral_signature (64 bytes, raw r||s) - pfs_hash (32 bytes) - - 32-bit CRC + - timestamp (32 bits) + - CRC (32 bits) """ if not self.connections: print(f"{RED}[ERROR]{RESET} No active connections.") - return + return False if not self.ephemeral_privkey or not self.ephemeral_pubkey: print(f"{RED}[ERROR]{RESET} Ephemeral keys not generated.") - return + return False if self.peer_identity_pubkey_bytes is None: print(f"{RED}[ERROR]{RESET} Peer identity not set; needed for PFS tracking.") - return + return False - # 1) Sign ephemeral_pubkey as r||s - # Instead of DER, we do raw r||s each 32 bytes + # 1) Sign ephemeral_pubkey using identity key sig_der = sign_data(self.identity_privkey, self.ephemeral_pubkey) - # Convert DER -> (r, s) -> raw 64 bytes - # Quick approach to parse DER using cryptography, or do a custom parse - from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature - r_int, s_int = decode_dss_signature(sig_der) - r_bytes = r_int.to_bytes(32, 'big') - s_bytes = s_int.to_bytes(32, 'big') - raw_signature = r_bytes + s_bytes # 64 bytes + # Convert DER signature to raw r||s format (64 bytes) + raw_signature = der_to_raw(sig_der) - # 2) PFS hash + # 2) Compute PFS hash session_number, last_secret_hex = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, "")) pfs = compute_pfs_hash(session_number, last_secret_hex) - # 3) Build handshake - timestamp_32 = int(time.time() * 1000) & 0xffffffff - pkt = build_handshake_message( - timestamp_32, - self.ephemeral_pubkey, # 64 bytes raw - raw_signature, # 64 bytes raw - pfs # 32 bytes + # 3) Create handshake object + handshake = Handshake( + ephemeral_pubkey=self.ephemeral_pubkey, + ephemeral_signature=raw_signature, + pfs_hash=pfs ) - # 4) Send + # 4) Serialize and send + pkt = handshake.serialize() self._send_packet(self.connections[0], pkt, "HANDSHAKE") self.state["handshake_sent"] = True + return True def enable_auto_responder(self, enable: bool): + """ + Legacy method for enabling/disabling auto responder. + For new code, use start_auto_mode() and stop_auto_mode() instead. + """ self.auto_responder = enable - print(f"{BLUE}[AUTO]{RESET} Auto responder set to {enable}.") + print(f"{YELLOW}[LEGACY]{RESET} Auto responder set to {enable}. Consider using auto_mode instead.") # ------------------------------------------------------------------------- # Manual Responses # ------------------------------------------------------------------------- - def respond_to_ping(self, index: int, answer_code: int): + def respond_to_ping(self, index: int, answer: int): """ - Manually respond to an inbound PING_REQUEST in inbound_messages[index]. + Respond to a ping request with the specified answer (0 = no, 1 = yes). + If answer is 1, we accept the connection and use the cipher specified in the request. """ if index < 0 or index >= len(self.inbound_messages): print(f"{RED}[ERROR]{RESET} Invalid index {index}.") - return + return False msg = self.inbound_messages[index] if msg["type"] != "PING_REQUEST": print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a PING_REQUEST.") - return + return False + + ping_request = msg["parsed"] + version = ping_request.version + cipher = ping_request.cipher + + # Force cipher to 0 or 1 (only AES-256-GCM and ChaCha20-Poly1305 are supported) + if cipher != 0 and cipher != 1: + print(f"{YELLOW}[NOTICE]{RESET} Received PING with unsupported cipher ({cipher}); forcing cipher to 0 in response.") + cipher = 0 + + # Store the negotiated cipher type if we're accepting + if answer == 1: + self.cipher_type = cipher - version = msg["parsed"]["version"] conn = msg["connection"] - resp = build_ping_response(version, answer_code) + # Create ping response + ping_response = PingResponse(version, cipher, answer) + resp = ping_response.serialize() self._send_packet(conn, resp, "PING_RESPONSE") - print(f"{BLUE}[MANUAL]{RESET} Responded to ping with answer_code={answer_code}.") + print(f"{BLUE}[MANUAL]{RESET} Responded to ping with answer={answer}.") + return True def generate_ecdhe(self, index: int): """ - Formerly 'respond_to_handshake'. Verifies the inbound ephemeral signature - and computes the ECDH shared secret, updating PFS history. + Process a handshake message: + 1. Verify the ephemeral signature + 2. Compute the ECDH shared secret + 3. Update PFS history """ if index < 0 or index >= len(self.inbound_messages): print(f"{RED}[ERROR]{RESET} Invalid index {index}.") - return + return False msg = self.inbound_messages[index] if msg["type"] != "HANDSHAKE": print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a HANDSHAKE.") - return + return False - ephemeral_pub = msg["parsed"]["ephemeral_pub"] - ephemeral_sig = msg["parsed"]["ephemeral_sig"] + handshake = msg["parsed"] + + # Convert raw signature to DER for verification + raw_sig = handshake.ephemeral_signature + sig_der = raw_signature_to_der(raw_sig) - # Use our raw_signature_to_der wrapper only if signature is 64 bytes. - # Otherwise, assume the signature is already DER-encoded. - from crypto_utils import raw_signature_to_der - if len(ephemeral_sig) == 64: - sig_der = raw_signature_to_der(ephemeral_sig) - else: - sig_der = ephemeral_sig - - ok = verify_signature(self.peer_identity_pubkey_obj, sig_der, ephemeral_pub) + # Verify signature + ok = verify_signature(self.peer_identity_pubkey_obj, sig_der, handshake.ephemeral_pubkey) if not ok: print(f"{RED}[ERROR]{RESET} Ephemeral signature invalid.") - return + return False print(f"{GREEN}[OK]{RESET} Ephemeral signature verified.") + # Check if we have ephemeral keys if not self.ephemeral_privkey: print(f"{YELLOW}[WARN]{RESET} No ephemeral_privkey available, cannot compute shared secret.") - return - shared = compute_ecdh_shared_key(self.ephemeral_privkey, ephemeral_pub) + return False + + # Compute ECDH shared secret + shared = compute_ecdh_shared_key(self.ephemeral_privkey, handshake.ephemeral_pubkey) self.shared_secret = shared.hex() print(f"{GREEN}[OK]{RESET} Computed ECDH shared key = {self.shared_secret}") + # Update PFS history old_session, _ = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, "")) new_session = 1 if old_session < 0 else old_session + 1 self.pfs_history[self.peer_identity_pubkey_bytes] = (new_session, self.shared_secret) + return True # ------------------------------------------------------------------------- # Utility @@ -373,11 +673,30 @@ class IcingProtocol: print(f"\nShared Secret: {self.shared_secret if self.shared_secret else '[None]'}") + if self.hkdf_key: + print(f"HKDF Derived Key: {self.hkdf_key} (size: {len(self.hkdf_key)*8} bits)") + else: + print("HKDF Derived Key: [None]") + + print(f"Negotiated Cipher: {'AES-256-GCM' if self.cipher_type == 0 else 'ChaCha20-Poly1305'} (code: {self.cipher_type})") + + if self.session_nonce: + print(f"Session Nonce: {self.session_nonce.hex()} (129 bits)") + else: + print("Session Nonce: [None]") + + if self.last_iv: + print(f"Last IV: {self.last_iv.hex()} (96 bits)") + else: + print("Last IV: [None]") + print("\nProtocol Flags:") for k, v in self.state.items(): print(f" {k}: {v}") - print("\nAuto Responder:", self.auto_responder) + print("\nAuto Mode Active:", self.auto_mode.active) + print("Auto Mode State:", self.auto_mode.state) + print("Legacy Auto Responder:", self.auto_responder) print("\nActive Connections:") for i, c in enumerate(self.connections): @@ -389,9 +708,108 @@ class IcingProtocol: print() def stop(self): + """Stop the protocol and clean up resources.""" + # Stop auto mode first + self.auto_mode.stop() + + # Stop server listener self.server_listener.stop() + + # Close all connections for c in self.connections: c.close() self.connections.clear() self.inbound_messages.clear() print(f"{RED}[STOP]{RESET} Protocol stopped.") + + # ------------------------------------------------------------------------- + # Encrypted Messaging + # ------------------------------------------------------------------------- + + def send_encrypted_message(self, plaintext: str): + """ + Encrypts and sends a message using the derived HKDF key and negotiated cipher. + The message format is: + - Header (18 bytes): flag, data_len, retry, connection_status, IV + - Payload: variable length encrypted data + - Footer: Authentication tag (16 bytes) + optional CRC32 (4 bytes) + """ + if not self.connections: + print(f"{RED}[ERROR]{RESET} No active connections.") + return False + if not self.hkdf_key: + print(f"{RED}[ERROR]{RESET} No HKDF key derived. Cannot encrypt message.") + return False + + # Get the encryption key + key = bytes.fromhex(self.hkdf_key) + + # Convert plaintext to bytes + plaintext_bytes = plaintext.encode('utf-8') + + # Generate or increment the IV + if self.last_iv is None: + # First message, generate random IV + iv = generate_iv(initial=True) + else: + # Subsequent message, increment previous IV + iv = generate_iv(initial=False, previous_iv=self.last_iv) + + # Store the new IV + self.last_iv = iv + + # Create encrypted message (connection_status 0 = no CRC) + encrypted = encrypt_message( + plaintext=plaintext_bytes, + key=key, + flag=0xBEEF, # Default flag + retry=0, + connection_status=0, # No CRC + iv=iv, + cipher_type=self.cipher_type + ) + + # Send the encrypted message + self._send_packet(self.connections[0], encrypted, "ENCRYPTED_MESSAGE") + print(f"{GREEN}[SEND_ENCRYPTED]{RESET} Encrypted message sent.") + return True + + def decrypt_received_message(self, index: int): + """ + Decrypt a received encrypted message using the HKDF key and negotiated cipher. + """ + if index < 0 or index >= len(self.inbound_messages): + print(f"{RED}[ERROR]{RESET} Invalid message index.") + return None + + msg = self.inbound_messages[index] + if msg["type"] != "ENCRYPTED_MESSAGE": + print(f"{RED}[ERROR]{RESET} Message at index {index} is not an ENCRYPTED_MESSAGE.") + return None + + # Get the encrypted message + encrypted = msg["raw"] + + if not self.hkdf_key: + print(f"{RED}[ERROR]{RESET} No HKDF key derived. Cannot decrypt message.") + return None + + # Get the encryption key + key = bytes.fromhex(self.hkdf_key) + + try: + # Decrypt the message + plaintext = decrypt_message(encrypted, key, self.cipher_type) + + # Convert to string + plaintext_str = plaintext.decode('utf-8') + + # Update last IV from the header + header = MessageHeader.unpack(encrypted[:18]) + self.last_iv = header.iv + + print(f"{GREEN}[DECRYPTED]{RESET} Decrypted message: {plaintext_str}") + return plaintext_str + except Exception as e: + print(f"{RED}[ERROR]{RESET} Decryption failed: {e}") + return None