From 56183f194891cf47e5c38f650160b9f4ce50a069 Mon Sep 17 00:00:00 2001 From: stcb <21@stcb.cc> Date: Wed, 14 May 2025 15:03:47 +0300 Subject: [PATCH] Cleaning --- protocol_prototype/auto_mode.py | 430 --------------- protocol_prototype/cli.py | 328 ------------ protocol_prototype/crypto_utils.py | 165 ------ protocol_prototype/encryption.py | 263 ---------- protocol_prototype/messages.py | 262 ---------- protocol_prototype/protocol.py | 815 ----------------------------- protocol_prototype/transmission.py | 100 ---- 7 files changed, 2363 deletions(-) delete mode 100644 protocol_prototype/auto_mode.py delete mode 100644 protocol_prototype/cli.py delete mode 100644 protocol_prototype/crypto_utils.py delete mode 100644 protocol_prototype/encryption.py delete mode 100644 protocol_prototype/messages.py delete mode 100644 protocol_prototype/protocol.py delete mode 100644 protocol_prototype/transmission.py diff --git a/protocol_prototype/auto_mode.py b/protocol_prototype/auto_mode.py deleted file mode 100644 index 690ba9c..0000000 --- a/protocol_prototype/auto_mode.py +++ /dev/null @@ -1,430 +0,0 @@ -import time -import threading -import queue -from typing import Optional, Dict, Any, List, Callable, Tuple - -# ANSI colors for logging -RED = "\033[91m" -GREEN = "\033[92m" -YELLOW = "\033[93m" -BLUE = "\033[94m" -RESET = "\033[0m" - -class AutoModeConfig: - """Configuration parameters for the automatic mode behavior.""" - def __init__(self): - # Ping behavior - self.ping_response_accept = True # Whether to accept incoming pings - self.ping_auto_initiate = False # Whether to initiate pings when connected - self.ping_retry_count = 3 # Number of ping retries - self.ping_retry_delay = 5.0 # Seconds between ping retries - self.ping_timeout = 10.0 # Seconds to wait for ping response - self.preferred_cipher = 0 # 0=AES-GCM, 1=ChaCha20-Poly1305 - - # Handshake behavior - self.handshake_retry_count = 3 # Number of handshake retries - self.handshake_retry_delay = 5.0 # Seconds between handshake retries - self.handshake_timeout = 10.0 # Seconds to wait for handshake - - # Messaging behavior - self.auto_message_enabled = False # Whether to auto-send messages - self.message_interval = 10.0 # Seconds between auto messages - self.message_content = "Hello, secure world!" # Default message - - # General behavior - self.active_mode = False # If true, initiates protocol instead of waiting - - -class AutoMode: - """ - Manages automated behavior for the Icing protocol. - Handles automatic progression through the protocol stages: - 1. Connection setup - 2. Ping/discovery - 3. Key exchange - 4. Encrypted communication - """ - - def __init__(self, protocol_interface): - """ - Initialize the AutoMode manager. - - Args: - protocol_interface: An object implementing the required protocol methods - """ - self.protocol = protocol_interface - self.config = AutoModeConfig() - self.active = False - self.state = "idle" - - # Message queue for automated sending - self.message_queue = queue.Queue() - - # Tracking variables - self.ping_attempts = 0 - self.handshake_attempts = 0 - self.last_action_time = 0 - self.timer_tasks = [] # List of active timer tasks (for cleanup) - - def start(self): - """Start the automatic mode.""" - if self.active: - return - - self.active = True - self.state = "idle" - self.ping_attempts = 0 - self.handshake_attempts = 0 - self.last_action_time = time.time() - - self._log_info("Automatic mode started") - - # Start in active mode if configured - if self.config.active_mode and self.protocol.connections: - self._start_ping_sequence() - - def stop(self): - """Stop the automatic mode and clean up any pending tasks.""" - if not self.active: - return - - # Cancel any pending timers - for timer in self.timer_tasks: - if timer.is_alive(): - timer.cancel() - self.timer_tasks = [] - - self.active = False - self.state = "idle" - self._log_info("Automatic mode stopped") - - def handle_connection_established(self): - """Called when a new connection is established.""" - if not self.active: - return - - self._log_info("Connection established") - - # If in active mode, start pinging - if self.config.active_mode: - self._start_ping_sequence() - - def handle_ping_received(self, index: int): - """ - Handle a received ping request. - - Args: - index: Index of the ping request in the protocol's inbound message queue - """ - if not self.active or not self._is_valid_message_index(index): - return - - self._log_info(f"Ping request received (index={index})") - - # Automatically respond to ping if configured to accept - if self.config.ping_response_accept: - self._log_info(f"Auto-responding to ping with accept={self.config.ping_response_accept}") - try: - # Schedule the response with a small delay to simulate real behavior - timer = threading.Timer(0.5, self._respond_to_ping, args=[index]) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - except Exception as e: - self._log_error(f"Failed to auto-respond to ping: {e}") - - def handle_ping_response_received(self, accepted: bool): - """ - Handle a received ping response. - - Args: - accepted: Whether the ping was accepted - """ - if not self.active: - return - - self.ping_attempts = 0 # Reset ping attempts counter - - if accepted: - self._log_info("Ping accepted! Proceeding with handshake") - # Send handshake if not already done - if self.state != "handshake_sent": - self._ensure_ephemeral_keys() - self._start_handshake_sequence() - else: - self._log_info("Ping rejected by peer. Stopping auto-protocol sequence.") - self.state = "idle" - - def handle_handshake_received(self, index: int): - """ - Handle a received handshake. - - Args: - index: Index of the handshake in the protocol's inbound message queue - """ - if not self.active or not self._is_valid_message_index(index): - return - - self._log_info(f"Handshake received (index={index})") - - try: - # Ensure we have ephemeral keys - self._ensure_ephemeral_keys() - - # Process the handshake (compute ECDH) - self.protocol.generate_ecdhe(index) - - # Derive HKDF key - self.protocol.derive_hkdf() - - # If we haven't sent our handshake yet, send it - if self.state != "handshake_sent": - timer = threading.Timer(0.5, self.protocol.send_handshake) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - self.state = "handshake_sent" - else: - self.state = "key_exchange_complete" - - # Start sending queued messages if auto messaging is enabled - if self.config.auto_message_enabled: - self._start_message_sequence() - - except Exception as e: - self._log_error(f"Failed to process handshake: {e}") - - def handle_encrypted_received(self, index: int): - """ - Handle a received encrypted message. - - Args: - index: Index of the encrypted message in the protocol's inbound message queue - """ - if not self.active or not self._is_valid_message_index(index): - return - - # Try to decrypt automatically - try: - plaintext = self.protocol.decrypt_received_message(index) - self._log_info(f"Auto-decrypted message: {plaintext}") - except Exception as e: - self._log_error(f"Failed to auto-decrypt message: {e}") - - def queue_message(self, message: str): - """ - Add a message to the auto-send queue. - - Args: - message: Message text to send - """ - self.message_queue.put(message) - self._log_info(f"Message queued for sending: {message}") - - # If we're in the right state, start sending messages - if self.active and self.state == "key_exchange_complete" and self.config.auto_message_enabled: - self._process_message_queue() - - def _start_ping_sequence(self): - """Start the ping sequence to discover the peer.""" - if self.ping_attempts >= self.config.ping_retry_count: - self._log_warning(f"Maximum ping attempts ({self.config.ping_retry_count}) reached") - self.state = "idle" - return - - self.state = "pinging" - self.ping_attempts += 1 - - self._log_info(f"Sending ping request (attempt {self.ping_attempts}/{self.config.ping_retry_count})") - try: - self.protocol.send_ping_request(self.config.preferred_cipher) - self.last_action_time = time.time() - - # Schedule next ping attempt if needed - timer = threading.Timer( - self.config.ping_retry_delay, - self._check_ping_response - ) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - - except Exception as e: - self._log_error(f"Failed to send ping: {e}") - - def _check_ping_response(self): - """Check if we got a ping response, retry if not.""" - if not self.active or self.state != "pinging": - return - - # If we've waited long enough for a response, retry - if time.time() - self.last_action_time >= self.config.ping_timeout: - self._log_warning("No ping response received, retrying") - self._start_ping_sequence() - - def _respond_to_ping(self, index: int): - """ - Respond to a ping request. - - Args: - index: Index of the ping request in the inbound messages - """ - if not self.active or not self._is_valid_message_index(index): - return - - try: - answer = 1 if self.config.ping_response_accept else 0 - self.protocol.respond_to_ping(index, answer) - - if answer == 1: - # If we accepted, we should expect a handshake - self.state = "accepted_ping" - self._ensure_ephemeral_keys() - - # Set a timer to send our handshake if we don't receive one - timer = threading.Timer( - self.config.handshake_timeout, - self._check_handshake_received - ) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - self.last_action_time = time.time() - - except Exception as e: - self._log_error(f"Failed to respond to ping: {e}") - - def _check_handshake_received(self): - """Check if we've received a handshake after accepting a ping.""" - if not self.active or self.state != "accepted_ping": - return - - # If we've waited long enough and haven't received a handshake, initiate one - if time.time() - self.last_action_time >= self.config.handshake_timeout: - self._log_warning("No handshake received after accepting ping, initiating handshake") - self._start_handshake_sequence() - - def _start_handshake_sequence(self): - """Start the handshake sequence.""" - if self.handshake_attempts >= self.config.handshake_retry_count: - self._log_warning(f"Maximum handshake attempts ({self.config.handshake_retry_count}) reached") - self.state = "idle" - return - - self.state = "handshake_sent" - self.handshake_attempts += 1 - - self._log_info(f"Sending handshake (attempt {self.handshake_attempts}/{self.config.handshake_retry_count})") - try: - self.protocol.send_handshake() - self.last_action_time = time.time() - - # Schedule handshake retry check - timer = threading.Timer( - self.config.handshake_retry_delay, - self._check_handshake_response - ) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - - except Exception as e: - self._log_error(f"Failed to send handshake: {e}") - - def _check_handshake_response(self): - """Check if we've completed the key exchange, retry handshake if not.""" - if not self.active or self.state != "handshake_sent": - return - - # If we've waited long enough for a response, retry - if time.time() - self.last_action_time >= self.config.handshake_timeout: - self._log_warning("No handshake response received, retrying") - self._start_handshake_sequence() - - def _start_message_sequence(self): - """Start the automated message sending sequence.""" - if not self.config.auto_message_enabled: - return - - self._log_info("Starting automated message sequence") - - # Add the default message if queue is empty - if self.message_queue.empty(): - self.message_queue.put(self.config.message_content) - - # Start processing the queue - self._process_message_queue() - - def _process_message_queue(self): - """Process messages in the queue and send them.""" - if not self.active or self.state != "key_exchange_complete" or not self.config.auto_message_enabled: - return - - if not self.message_queue.empty(): - message = self.message_queue.get() - self._log_info(f"Sending queued message: {message}") - - try: - self.protocol.send_encrypted_message(message) - - # Schedule next message send - timer = threading.Timer( - self.config.message_interval, - self._process_message_queue - ) - timer.daemon = True - timer.start() - self.timer_tasks.append(timer) - - except Exception as e: - self._log_error(f"Failed to send queued message: {e}") - # Put the message back in the queue - self.message_queue.put(message) - - def _ensure_ephemeral_keys(self): - """Ensure ephemeral keys are generated if needed.""" - if not hasattr(self.protocol, 'ephemeral_pubkey') or self.protocol.ephemeral_pubkey is None: - self._log_info("Generating ephemeral keys") - self.protocol.generate_ephemeral_keys() - - def _is_valid_message_index(self, index: int) -> bool: - """ - Check if a message index is valid in the protocol's inbound_messages queue. - - Args: - index: The index to check - - Returns: - bool: True if the index is valid, False otherwise - """ - if not hasattr(self.protocol, 'inbound_messages'): - self._log_error("Protocol has no inbound_messages attribute") - return False - - if index < 0 or index >= len(self.protocol.inbound_messages): - self._log_error(f"Invalid message index: {index}") - return False - - return True - - # Helper methods for logging - def _log_info(self, message: str): - print(f"{BLUE}[AUTO]{RESET} {message}") - if hasattr(self, 'verbose_logging') and self.verbose_logging: - state_info = f"(state={self.state})" - if 'pinging' in self.state and hasattr(self, 'ping_attempts'): - state_info += f", attempts={self.ping_attempts}/{self.config.ping_retry_count}" - elif 'handshake' in self.state and hasattr(self, 'handshake_attempts'): - state_info += f", attempts={self.handshake_attempts}/{self.config.handshake_retry_count}" - print(f"{BLUE}[AUTO-DETAIL]{RESET} {state_info}") - - def _log_warning(self, message: str): - print(f"{YELLOW}[AUTO-WARN]{RESET} {message}") - if hasattr(self, 'verbose_logging') and self.verbose_logging: - timer_info = f"Active timers: {len(self.timer_tasks)}" - print(f"{YELLOW}[AUTO-WARN-DETAIL]{RESET} {timer_info}") - - def _log_error(self, message: str): - print(f"{RED}[AUTO-ERROR]{RESET} {message}") - if hasattr(self, 'verbose_logging') and self.verbose_logging: - print(f"{RED}[AUTO-ERROR-DETAIL]{RESET} Current state: {self.state}, Active: {self.active}") \ No newline at end of file diff --git a/protocol_prototype/cli.py b/protocol_prototype/cli.py deleted file mode 100644 index 53c3f01..0000000 --- a/protocol_prototype/cli.py +++ /dev/null @@ -1,328 +0,0 @@ -import sys -import argparse -import shlex -from protocol import IcingProtocol - -RED = "\033[91m" -GREEN = "\033[92m" -YELLOW = "\033[93m" -BLUE = "\033[94m" -MAGENTA = "\033[95m" -CYAN = "\033[96m" -RESET = "\033[0m" - -def print_help(): - """Display all available commands.""" - print(f"\n{YELLOW}=== Available Commands ==={RESET}") - print(f"\n{CYAN}Basic Protocol Commands:{RESET}") - print(" help - Show this help message") - print(" peer_id - Set peer identity public key") - print(" connect - Connect to a peer at the specified port") - print(" show_state - Display current protocol state") - print(" exit - Exit the program") - - print(f"\n{CYAN}Manual Protocol Operation:{RESET}") - print(" generate_ephemeral_keys - Generate ephemeral ECDH keys") - print(" send_ping [cipher] - Send PING request (cipher: 0=AES-GCM, 1=ChaCha20-Poly1305, default: 0)") - print(" respond_ping <0|1> - Respond to a PING (0=reject, 1=accept)") - print(" send_handshake - Send handshake with ephemeral keys") - print(" generate_ecdhe - Process handshake at specified index") - print(" derive_hkdf - Derive encryption key using HKDF") - print(" send_encrypted - Encrypt and send a message") - print(" decrypt <index> - Decrypt received message at index") - - print(f"\n{CYAN}Automatic Mode Commands:{RESET}") - print(" auto start - Start automatic mode") - print(" auto stop - Stop automatic mode") - print(" auto status - Show current auto mode status and configuration") - print(" auto config <param> <value> - Configure auto mode parameters") - print(" auto config list - Show all configurable parameters") - print(" auto message <text> - Queue message for automatic sending") - print(" auto passive - Configure as passive peer (responds to pings but doesn't initiate)") - print(" auto active - Configure as active peer (initiates protocol)") - print(" auto log - Toggle detailed logging for auto mode") - - print(f"\n{CYAN}Debugging Commands:{RESET}") - print(" debug_message <index> - Display detailed information about a message in the queue") - - print(f"\n{CYAN}Legacy Commands:{RESET}") - print(" auto_responder <on|off> - Enable/disable legacy auto responder (deprecated)") - - -def main(): - protocol = IcingProtocol() - - print(f"{YELLOW}\n======================================") - print(" Icing Protocol - Secure Communication ") - print("======================================\n" + RESET) - print(f"Listening on port: {protocol.local_port}") - print(f"Your identity public key (hex): {protocol.identity_pubkey.hex()}") - print_help() - - while True: - try: - line = input(f"{MAGENTA}Cmd>{RESET} ").strip() - except EOFError: - break - if not line: - continue - - parts = shlex.split(line) # Handle quoted arguments properly - cmd = parts[0].lower() - - try: - # Basic commands - if cmd == "exit": - protocol.stop() - break - - elif cmd == "help": - print_help() - - elif cmd == "show_state": - protocol.show_state() - - elif cmd == "peer_id": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: peer_id <hex_pubkey>") - continue - try: - protocol.set_peer_identity(parts[1]) - except ValueError as e: - print(f"{RED}[ERROR]{RESET} Invalid public key: {e}") - - elif cmd == "connect": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: connect <port>") - continue - try: - port = int(parts[1]) - protocol.connect_to_peer(port) - except ValueError: - print(f"{RED}[ERROR]{RESET} Invalid port number.") - except Exception as e: - print(f"{RED}[ERROR]{RESET} Connection failed: {e}") - - # Manual protocol operation - elif cmd == "generate_ephemeral_keys": - protocol.generate_ephemeral_keys() - - elif cmd == "send_ping": - # Optional cipher parameter (0 = AES-GCM, 1 = ChaCha20-Poly1305) - cipher = 0 # Default to AES-GCM - if len(parts) >= 2: - try: - cipher = int(parts[1]) - if cipher not in (0, 1): - print(f"{YELLOW}[WARNING]{RESET} Unsupported cipher code {cipher}. Using AES-GCM (0).") - cipher = 0 - except ValueError: - print(f"{YELLOW}[WARNING]{RESET} Invalid cipher code. Using AES-GCM (0).") - protocol.send_ping_request(cipher) - - elif cmd == "send_handshake": - protocol.send_handshake() - - elif cmd == "respond_ping": - if len(parts) != 3: - print(f"{RED}[ERROR]{RESET} Usage: respond_ping <index> <0|1>") - continue - try: - idx = int(parts[1]) - answer = int(parts[2]) - if answer not in (0, 1): - print(f"{RED}[ERROR]{RESET} Answer must be 0 (reject) or 1 (accept).") - continue - protocol.respond_to_ping(idx, answer) - except ValueError: - print(f"{RED}[ERROR]{RESET} Index and answer must be integers.") - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to respond to ping: {e}") - - elif cmd == "generate_ecdhe": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: generate_ecdhe <index>") - continue - try: - idx = int(parts[1]) - protocol.generate_ecdhe(idx) - except ValueError: - print(f"{RED}[ERROR]{RESET} Index must be an integer.") - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to process handshake: {e}") - - elif cmd == "derive_hkdf": - try: - protocol.derive_hkdf() - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to derive HKDF key: {e}") - - elif cmd == "send_encrypted": - if len(parts) < 2: - print(f"{RED}[ERROR]{RESET} Usage: send_encrypted <plaintext>") - continue - plaintext = " ".join(parts[1:]) - try: - protocol.send_encrypted_message(plaintext) - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to send encrypted message: {e}") - - elif cmd == "decrypt": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: decrypt <index>") - continue - try: - idx = int(parts[1]) - protocol.decrypt_received_message(idx) - except ValueError: - print(f"{RED}[ERROR]{RESET} Index must be an integer.") - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to decrypt message: {e}") - - # Debugging commands - elif cmd == "debug_message": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: debug_message <index>") - continue - try: - idx = int(parts[1]) - protocol.debug_message(idx) - except ValueError: - print(f"{RED}[ERROR]{RESET} Index must be an integer.") - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to debug message: {e}") - - # Automatic mode commands - elif cmd == "auto": - if len(parts) < 2: - print(f"{RED}[ERROR]{RESET} Usage: auto <command> [options]") - print("Available commands: start, stop, status, config, message, passive, active") - continue - - subcmd = parts[1].lower() - - if subcmd == "start": - protocol.start_auto_mode() - print(f"{GREEN}[AUTO]{RESET} Automatic mode started") - - elif subcmd == "stop": - protocol.stop_auto_mode() - print(f"{GREEN}[AUTO]{RESET} Automatic mode stopped") - - elif subcmd == "status": - config = protocol.get_auto_mode_config() - print(f"{YELLOW}=== Auto Mode Status ==={RESET}") - print(f"Active: {protocol.auto_mode.active}") - print(f"State: {protocol.auto_mode.state}") - print(f"\n{YELLOW}--- Configuration ---{RESET}") - for key, value in vars(config).items(): - print(f" {key}: {value}") - - elif subcmd == "config": - if len(parts) < 3: - print(f"{RED}[ERROR]{RESET} Usage: auto config <param> <value> or auto config list") - continue - - if parts[2].lower() == "list": - config = protocol.get_auto_mode_config() - print(f"{YELLOW}=== Auto Mode Configuration Parameters ==={RESET}") - for key, value in vars(config).items(): - print(f" {key} ({type(value).__name__}): {value}") - continue - - if len(parts) != 4: - print(f"{RED}[ERROR]{RESET} Usage: auto config <param> <value>") - continue - - param = parts[2] - value_str = parts[3] - - # Convert the string value to the appropriate type - config = protocol.get_auto_mode_config() - if not hasattr(config, param): - print(f"{RED}[ERROR]{RESET} Unknown parameter: {param}") - print("Use 'auto config list' to see all available parameters") - continue - - current_value = getattr(config, param) - try: - if isinstance(current_value, bool): - if value_str.lower() in ("true", "yes", "on", "1"): - value = True - elif value_str.lower() in ("false", "no", "off", "0"): - value = False - else: - raise ValueError(f"Boolean value must be true/false/yes/no/on/off/1/0") - elif isinstance(current_value, int): - value = int(value_str) - elif isinstance(current_value, float): - value = float(value_str) - elif isinstance(current_value, str): - value = value_str - else: - value = value_str # Default to string - - protocol.configure_auto_mode(**{param: value}) - print(f"{GREEN}[AUTO]{RESET} Set {param} = {value}") - - except ValueError as e: - print(f"{RED}[ERROR]{RESET} Invalid value for {param}: {e}") - - elif subcmd == "message": - if len(parts) < 3: - print(f"{RED}[ERROR]{RESET} Usage: auto message <text>") - continue - - message = " ".join(parts[2:]) - protocol.queue_auto_message(message) - print(f"{GREEN}[AUTO]{RESET} Message queued for sending: {message}") - - elif subcmd == "passive": - # Configure as passive peer (responds but doesn't initiate) - protocol.configure_auto_mode( - ping_response_accept=True, - ping_auto_initiate=False, - active_mode=False - ) - print(f"{GREEN}[AUTO]{RESET} Configured as passive peer") - - elif subcmd == "active": - # Configure as active peer (initiates protocol) - protocol.configure_auto_mode( - ping_response_accept=True, - ping_auto_initiate=True, - active_mode=True - ) - print(f"{GREEN}[AUTO]{RESET} Configured as active peer") - - else: - print(f"{RED}[ERROR]{RESET} Unknown auto mode command: {subcmd}") - print("Available commands: start, stop, status, config, message, passive, active") - - # Legacy commands - elif cmd == "auto_responder": - if len(parts) != 2: - print(f"{RED}[ERROR]{RESET} Usage: auto_responder <on|off>") - continue - val = parts[1].lower() - if val not in ("on", "off"): - print(f"{RED}[ERROR]{RESET} Value must be 'on' or 'off'.") - continue - protocol.enable_auto_responder(val == "on") - print(f"{YELLOW}[WARNING]{RESET} Using legacy auto responder. Consider using 'auto' commands instead.") - - else: - print(f"{RED}[ERROR]{RESET} Unknown command: {cmd}") - print("Type 'help' for a list of available commands.") - - except Exception as e: - print(f"{RED}[ERROR]{RESET} Command failed: {e}") - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - print("\nExiting...") - except Exception as e: - print(f"{RED}[FATAL ERROR]{RESET} {e}") - sys.exit(1) diff --git a/protocol_prototype/crypto_utils.py b/protocol_prototype/crypto_utils.py deleted file mode 100644 index 8c2e110..0000000 --- a/protocol_prototype/crypto_utils.py +++ /dev/null @@ -1,165 +0,0 @@ -import os -from typing import Tuple -from cryptography.exceptions import InvalidSignature -from cryptography.hazmat.primitives import hashes, serialization -from cryptography.hazmat.primitives.asymmetric import ec, utils -from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature - -def generate_identity_keys() -> Tuple[ec.EllipticCurvePrivateKey, bytes]: - """ - Generate an ECDSA (P-256) identity key pair. - - Returns: - Tuple containing: - - private_key: EllipticCurvePrivateKey object - - public_key_bytes: Raw x||y format (64 bytes, 512 bits) - """ - private_key = ec.generate_private_key(ec.SECP256R1()) - public_numbers = private_key.public_key().public_numbers() - - x_bytes = public_numbers.x.to_bytes(32, byteorder='big') - y_bytes = public_numbers.y.to_bytes(32, byteorder='big') - pubkey_bytes = x_bytes + y_bytes # 64 bytes total - - return private_key, pubkey_bytes - - -def load_peer_identity_key(pubkey_bytes: bytes) -> ec.EllipticCurvePublicKey: - """ - Convert a raw public key (64 bytes, x||y format) to a cryptography public key object. - - Args: - pubkey_bytes: Raw 64-byte public key (x||y format) - - Returns: - EllipticCurvePublicKey object - - Raises: - ValueError: If the pubkey_bytes is not exactly 64 bytes - """ - if len(pubkey_bytes) != 64: - raise ValueError("Peer identity pubkey must be exactly 64 bytes (x||y).") - - x_int = int.from_bytes(pubkey_bytes[:32], byteorder='big') - y_int = int.from_bytes(pubkey_bytes[32:], byteorder='big') - - public_numbers = ec.EllipticCurvePublicNumbers(x_int, y_int, ec.SECP256R1()) - return public_numbers.public_key() - - -def sign_data(private_key: ec.EllipticCurvePrivateKey, data: bytes) -> bytes: - """ - Sign data with ECDSA using a P-256 private key. - - Args: - private_key: EllipticCurvePrivateKey for signing - data: Bytes to sign - - Returns: - DER-encoded signature (variable length, up to ~70-72 bytes) - """ - signature = private_key.sign(data, ec.ECDSA(hashes.SHA256())) - return signature - - -def verify_signature(public_key: ec.EllipticCurvePublicKey, signature: bytes, data: bytes) -> bool: - """ - Verify a DER-encoded ECDSA signature. - - Args: - public_key: EllipticCurvePublicKey for verification - signature: DER-encoded signature - data: Original signed data - - Returns: - True if signature is valid, False otherwise - """ - try: - public_key.verify(signature, data, ec.ECDSA(hashes.SHA256())) - return True - except InvalidSignature: - return False - - -def get_ephemeral_keypair() -> Tuple[ec.EllipticCurvePrivateKey, bytes]: - """ - Generate an ephemeral ECDH key pair (P-256). - - Returns: - Tuple containing: - - private_key: EllipticCurvePrivateKey object - - pubkey_bytes: Raw x||y format (64 bytes, 512 bits) - """ - private_key = ec.generate_private_key(ec.SECP256R1()) - numbers = private_key.public_key().public_numbers() - - x_bytes = numbers.x.to_bytes(32, 'big') - y_bytes = numbers.y.to_bytes(32, 'big') - - return private_key, x_bytes + y_bytes # 64 bytes total - - -def compute_ecdh_shared_key(private_key: ec.EllipticCurvePrivateKey, peer_pubkey_bytes: bytes) -> bytes: - """ - Compute a shared secret using ECDH. - - Args: - private_key: Local ECDH private key - peer_pubkey_bytes: Peer's ephemeral public key (64 bytes, raw x||y format) - - Returns: - Shared secret bytes - - Raises: - ValueError: If peer_pubkey_bytes is not 64 bytes - """ - if len(peer_pubkey_bytes) != 64: - raise ValueError("Peer public key must be 64 bytes (x||y format)") - - x_int = int.from_bytes(peer_pubkey_bytes[:32], 'big') - y_int = int.from_bytes(peer_pubkey_bytes[32:], 'big') - - # Create public key object from raw components - peer_public_numbers = ec.EllipticCurvePublicNumbers(x_int, y_int, ec.SECP256R1()) - peer_public_key = peer_public_numbers.public_key() - - # Perform key exchange - shared_key = private_key.exchange(ec.ECDH(), peer_public_key) - return shared_key - - -def der_to_raw(der_sig: bytes) -> bytes: - """ - Convert a DER-encoded ECDSA signature to a raw 64-byte signature (r||s). - - Args: - der_sig: DER-encoded signature - - Returns: - Raw 64-byte signature (r||s format), with each component padded to 32 bytes - """ - r, s = decode_dss_signature(der_sig) - r_bytes = r.to_bytes(32, byteorder='big') - s_bytes = s.to_bytes(32, byteorder='big') - return r_bytes + s_bytes - - -def raw_signature_to_der(raw_sig: bytes) -> bytes: - """ - Convert a raw signature (64 bytes, concatenated r||s) to DER-encoded signature. - - Args: - raw_sig: Raw 64-byte signature (r||s format) - - Returns: - DER-encoded signature - - Raises: - ValueError: If raw_sig is not 64 bytes - """ - if len(raw_sig) != 64: - raise ValueError("Raw signature must be 64 bytes (r||s).") - - r = int.from_bytes(raw_sig[:32], 'big') - s = int.from_bytes(raw_sig[32:], 'big') - return encode_dss_signature(r, s) diff --git a/protocol_prototype/encryption.py b/protocol_prototype/encryption.py deleted file mode 100644 index 9aa3730..0000000 --- a/protocol_prototype/encryption.py +++ /dev/null @@ -1,263 +0,0 @@ -import os -import struct -from typing import Optional, Tuple -from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305 - -class MessageHeader: - """ - Header of an encrypted message (18 bytes total): - - Clear Text Section (4 bytes): - - flag: 16 bits (0xBEEF by default) - - data_len: 16 bits (length of encrypted payload excluding tag) - - Associated Data (14 bytes): - - retry: 8 bits (retry counter) - - connection_status: 4 bits (e.g., CRC required) + 4 bits padding - - iv/messageID: 96 bits (12 bytes) - """ - def __init__(self, flag: int, data_len: int, retry: int, connection_status: int, iv: bytes): - if not (0 <= flag < 65536): - raise ValueError("Flag must fit in 16 bits (0..65535)") - if not (0 <= data_len < 65536): - raise ValueError("Data length must fit in 16 bits (0..65535)") - if not (0 <= retry < 256): - raise ValueError("Retry must fit in 8 bits (0..255)") - if not (0 <= connection_status < 16): - raise ValueError("Connection status must fit in 4 bits (0..15)") - if len(iv) != 12: - raise ValueError("IV must be 12 bytes (96 bits)") - - self.flag = flag # 16 bits - self.data_len = data_len # 16 bits - self.retry = retry # 8 bits - self.connection_status = connection_status # 4 bits - self.iv = iv # 96 bits (12 bytes) - - def pack(self) -> bytes: - """Pack header into 18 bytes.""" - # Pack flag and data_len (4 bytes) - header = struct.pack('>H H', self.flag, self.data_len) - - # Pack retry and connection_status (2 bytes) - # connection_status in high 4 bits of second byte, 4 bits padding as zero - ad_byte = (self.connection_status & 0x0F) << 4 - ad_packed = struct.pack('>B B', self.retry, ad_byte) - - # Append IV (12 bytes) - return header + ad_packed + self.iv - - def get_associated_data(self) -> bytes: - """Get the associated data for AEAD encryption (retry, conn_status, iv).""" - # Pack retry and connection_status - ad_byte = (self.connection_status & 0x0F) << 4 - ad_packed = struct.pack('>B B', self.retry, ad_byte) - - # Append IV - return ad_packed + self.iv - - @classmethod - def unpack(cls, data: bytes) -> 'MessageHeader': - """Unpack 18 bytes into a MessageHeader object.""" - if len(data) < 18: - raise ValueError(f"Header data too short: {len(data)} bytes, expected 18") - - flag, data_len = struct.unpack('>H H', data[:4]) - retry, ad_byte = struct.unpack('>B B', data[4:6]) - connection_status = (ad_byte >> 4) & 0x0F - iv = data[6:18] - - return cls(flag, data_len, retry, connection_status, iv) - -class EncryptedMessage: - """ - Encrypted message packet format: - - - Header (18 bytes): - * flag: 16 bits - * data_len: 16 bits - * retry: 8 bits - * connection_status: 4 bits (+ 4 bits padding) - * iv/messageID: 96 bits (12 bytes) - - - Payload: variable length encrypted data - - - Footer: - * Authentication tag: 128 bits (16 bytes) - * CRC32: 32 bits (4 bytes) - optional, based on connection_status - """ - def __init__(self, plaintext: bytes, key: bytes, flag: int = 0xBEEF, - retry: int = 0, connection_status: int = 0, iv: bytes = None, - cipher_type: int = 0): - self.plaintext = plaintext - self.key = key - self.flag = flag - self.retry = retry - self.connection_status = connection_status - self.iv = iv or generate_iv(initial=True) - self.cipher_type = cipher_type # 0 = AES-256-GCM, 1 = ChaCha20-Poly1305 - - # Will be set after encryption - self.ciphertext = None - self.tag = None - self.header = None - - def encrypt(self) -> bytes: - """Encrypt the plaintext and return the full encrypted message.""" - # Create header with correct data_len (which will be set after encryption) - self.header = MessageHeader( - flag=self.flag, - data_len=0, # Will be updated after encryption - retry=self.retry, - connection_status=self.connection_status, - iv=self.iv - ) - - # Get associated data for AEAD - aad = self.header.get_associated_data() - - # Encrypt using the appropriate cipher - if self.cipher_type == 0: # AES-256-GCM - cipher = AESGCM(self.key) - ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad) - elif self.cipher_type == 1: # ChaCha20-Poly1305 - cipher = ChaCha20Poly1305(self.key) - ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad) - else: - raise ValueError(f"Unsupported cipher type: {self.cipher_type}") - - # Extract ciphertext and tag - self.tag = ciphertext_with_tag[-16:] - self.ciphertext = ciphertext_with_tag[:-16] - - # Update header with actual data length - self.header.data_len = len(self.ciphertext) - - # Pack everything together - packed_header = self.header.pack() - - # Check if CRC is required (based on connection_status) - if self.connection_status & 0x01: # Lowest bit indicates CRC required - import zlib - # Compute CRC32 of header + ciphertext + tag - crc = zlib.crc32(packed_header + self.ciphertext + self.tag) & 0xffffffff - crc_bytes = struct.pack('>I', crc) - return packed_header + self.ciphertext + self.tag + crc_bytes - else: - return packed_header + self.ciphertext + self.tag - - @classmethod - def decrypt(cls, data: bytes, key: bytes, cipher_type: int = 0) -> Tuple[bytes, MessageHeader]: - """ - Decrypt an encrypted message and return the plaintext and header. - - Args: - data: The full encrypted message - key: The encryption key - cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 - - Returns: - Tuple of (plaintext, header) - """ - if len(data) < 18 + 16: # Header + minimum tag size - raise ValueError("Message too short") - - # Extract header - header_bytes = data[:18] - header = MessageHeader.unpack(header_bytes) - - # Get ciphertext and tag - data_len = header.data_len - ciphertext_start = 18 - ciphertext_end = ciphertext_start + data_len - - if ciphertext_end + 16 > len(data): - raise ValueError("Message length does not match header's data_len") - - ciphertext = data[ciphertext_start:ciphertext_end] - tag = data[ciphertext_end:ciphertext_end + 16] - - # Get associated data for AEAD - aad = header.get_associated_data() - - # Combine ciphertext and tag for decryption - ciphertext_with_tag = ciphertext + tag - - # Decrypt using the appropriate cipher - try: - if cipher_type == 0: # AES-256-GCM - cipher = AESGCM(key) - plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad) - elif cipher_type == 1: # ChaCha20-Poly1305 - cipher = ChaCha20Poly1305(key) - plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad) - else: - raise ValueError(f"Unsupported cipher type: {cipher_type}") - - return plaintext, header - except Exception as e: - raise ValueError(f"Decryption failed: {e}") - -def generate_iv(initial: bool = False, previous_iv: bytes = None) -> bytes: - """ - Generate a 96-bit IV (12 bytes). - - Args: - initial: If True, return a random IV - previous_iv: The previous IV to increment - - Returns: - A new IV - """ - if initial or previous_iv is None: - return os.urandom(12) # 96 bits - else: - # Increment the previous IV by 1 modulo 2^96 - iv_int = int.from_bytes(previous_iv, 'big') - iv_int = (iv_int + 1) % (1 << 96) - return iv_int.to_bytes(12, 'big') - -# Convenience functions to match original API -def encrypt_message(plaintext: bytes, key: bytes, flag: int = 0xBEEF, - retry: int = 0, connection_status: int = 0, - iv: bytes = None, cipher_type: int = 0) -> bytes: - """ - Encrypt a message using the specified parameters. - - Args: - plaintext: The data to encrypt - key: The encryption key (32 bytes for AES-256-GCM, 32 bytes for ChaCha20-Poly1305) - flag: 16-bit flag value (default: 0xBEEF) - retry: 8-bit retry counter - connection_status: 4-bit connection status - iv: Optional 96-bit IV (if None, a random one will be generated) - cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 - - Returns: - The full encrypted message - """ - message = EncryptedMessage( - plaintext=plaintext, - key=key, - flag=flag, - retry=retry, - connection_status=connection_status, - iv=iv, - cipher_type=cipher_type - ) - return message.encrypt() - -def decrypt_message(message: bytes, key: bytes, cipher_type: int = 0) -> bytes: - """ - Decrypt a message. - - Args: - message: The full encrypted message - key: The encryption key - cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305 - - Returns: - The decrypted plaintext - """ - plaintext, _ = EncryptedMessage.decrypt(message, key, cipher_type) - return plaintext diff --git a/protocol_prototype/messages.py b/protocol_prototype/messages.py deleted file mode 100644 index 98151ab..0000000 --- a/protocol_prototype/messages.py +++ /dev/null @@ -1,262 +0,0 @@ -import os -import struct -import time -import zlib -import hashlib -from typing import Tuple, Optional - -def crc32_of(data: bytes) -> int: - """ - Compute CRC-32 of 'data'. - """ - return zlib.crc32(data) & 0xffffffff - - -# --------------------------------------------------------------------------- -# PING REQUEST (new format) -# Fields (in order): -# - session_nonce: 129 bits (from the top 129 bits of 17 random bytes) -# - version: 7 bits -# - cipher: 4 bits (0 = AES-256-GCM, 1 = ChaCha20-poly1305; for now only 0 is used) -# - CRC: 32 bits -# -# Total bits: 129 + 7 + 4 + 32 = 172 bits. We pack into 22 bytes (176 bits) with 4 spare bits. -# --------------------------------------------------------------------------- -class PingRequest: - """ - PING REQUEST format (172 bits / 22 bytes): - - session_nonce: 129 bits (from top 129 bits of 17 random bytes) - - version: 7 bits - - cipher: 4 bits (0 = AES-256-GCM, 1 = ChaCha20-poly1305) - - CRC: 32 bits - """ - def __init__(self, version: int, cipher: int, session_nonce: bytes = None): - if not (0 <= version < 128): - raise ValueError("Version must fit in 7 bits (0..127)") - if not (0 <= cipher < 16): - raise ValueError("Cipher must fit in 4 bits (0..15)") - - self.version = version - self.cipher = cipher - - # Generate session nonce if not provided - if session_nonce is None: - # Generate 17 random bytes - nonce_full = os.urandom(17) - # Use top 129 bits - nonce_int_full = int.from_bytes(nonce_full, 'big') - nonce_129_int = nonce_int_full >> 7 # drop lowest 7 bits - self.session_nonce = nonce_129_int.to_bytes(17, 'big') - else: - if len(session_nonce) != 17: - raise ValueError("Session nonce must be 17 bytes (136 bits)") - self.session_nonce = session_nonce - - def serialize(self) -> bytes: - """Serialize the ping request into a 22-byte packet.""" - # Convert session_nonce to integer (129 bits) - nonce_int = int.from_bytes(self.session_nonce, 'big') - - # Pack fields: shift nonce left by 11 bits, add version and cipher - partial_int = (nonce_int << 11) | (self.version << 4) | (self.cipher & 0x0F) - # This creates 129+7+4 = 140 bits; pack into 18 bytes - partial_bytes = partial_int.to_bytes(18, 'big') - - # Compute CRC over these 18 bytes - cval = crc32_of(partial_bytes) - - # Combine partial data with 32-bit CRC - final_int = (int.from_bytes(partial_bytes, 'big') << 32) | cval - return final_int.to_bytes(22, 'big') - - @classmethod - def deserialize(cls, data: bytes) -> Optional['PingRequest']: - """Deserialize a 22-byte packet into a PingRequest object.""" - if len(data) != 22: - return None - - # Extract 176-bit integer - final_int = int.from_bytes(data, 'big') - - # Extract CRC and verify - crc_in = final_int & 0xffffffff - partial_int = final_int >> 32 # 140 bits - partial_bytes = partial_int.to_bytes(18, 'big') - crc_calc = crc32_of(partial_bytes) - - if crc_calc != crc_in: - return None - - # Extract fields - cipher = partial_int & 0x0F - version = (partial_int >> 4) & 0x7F - nonce_129_int = partial_int >> 11 # 129 bits - session_nonce = nonce_129_int.to_bytes(17, 'big') - - return cls(version, cipher, session_nonce) - - - -# --------------------------------------------------------------------------- -# PING RESPONSE (new format) -# Fields: -# - timestamp: 32 bits (we take the lower 32 bits of the time in ms) -# - version: 7 bits -# - cipher: 4 bits -# - answer: 1 bit -# - CRC: 32 bits -# -# Total bits: 32 + 7 + 4 + 1 + 32 = 76 bits; pack into 10 bytes (80 bits) with 4 spare bits. -# --------------------------------------------------------------------------- -class PingResponse: - """ - PING RESPONSE format (76 bits / 10 bytes): - - timestamp: 32 bits (milliseconds since epoch, lower 32 bits) - - version: 7 bits - - cipher: 4 bits - - answer: 1 bit (0 = no, 1 = yes) - - CRC: 32 bits - """ - def __init__(self, version: int, cipher: int, answer: int, timestamp: int = None): - if not (0 <= version < 128): - raise ValueError("Version must fit in 7 bits") - if not (0 <= cipher < 16): - raise ValueError("Cipher must fit in 4 bits") - if answer not in (0, 1): - raise ValueError("Answer must be 0 or 1") - - self.version = version - self.cipher = cipher - self.answer = answer - self.timestamp = timestamp or (int(time.time() * 1000) & 0xffffffff) - - def serialize(self) -> bytes: - """Serialize the ping response into a 10-byte packet.""" - # Pack timestamp, version, cipher, answer: 32+7+4+1 = 44 bits - partial_val = (self.timestamp << (7+4+1)) | (self.version << (4+1)) | (self.cipher << 1) | self.answer - partial_bytes = partial_val.to_bytes(6, 'big') # 6 bytes = 48 bits, 4 spare bits - - # Compute CRC - cval = crc32_of(partial_bytes) - - # Combine with CRC - final_val = (int.from_bytes(partial_bytes, 'big') << 32) | cval - return final_val.to_bytes(10, 'big') - - @classmethod - def deserialize(cls, data: bytes) -> Optional['PingResponse']: - """Deserialize a 10-byte packet into a PingResponse object.""" - if len(data) != 10: - return None - - # Extract 80-bit integer - final_int = int.from_bytes(data, 'big') - - # Extract CRC and verify - crc_in = final_int & 0xffffffff - partial_int = final_int >> 32 # 48 bits - partial_bytes = partial_int.to_bytes(6, 'big') - crc_calc = crc32_of(partial_bytes) - - if crc_calc != crc_in: - return None - - # Extract fields (discard 4 spare bits) - partial_int >>= 4 # now 44 bits - answer = partial_int & 0x01 - cipher = (partial_int >> 1) & 0x0F - version = (partial_int >> (1+4)) & 0x7F - timestamp = partial_int >> (1+4+7) - - return cls(version, cipher, answer, timestamp) - - -# ============================================================================= -# 3) Handshake -# - 32-bit timestamp -# - 64-byte ephemeral pubkey (raw x||y = 512 bits) -# - 64-byte ephemeral signature (raw r||s = 512 bits) -# - 32-byte PFS hash (256 bits) -# - 32-bit CRC -# => total 4 + 64 + 64 + 32 + 4 = 168 bytes = 1344 bits -# ============================================================================= - -class Handshake: - """ - HANDSHAKE format (1344 bits / 168 bytes): - - timestamp: 32 bits - - ephemeral_pubkey: 512 bits (64 bytes, raw x||y format) - - ephemeral_signature: 512 bits (64 bytes, raw r||s format) - - pfs_hash: 256 bits (32 bytes) - - CRC: 32 bits - """ - def __init__(self, ephemeral_pubkey: bytes, ephemeral_signature: bytes, pfs_hash: bytes, timestamp: int = None): - if len(ephemeral_pubkey) != 64: - raise ValueError("ephemeral_pubkey must be 64 bytes (raw x||y)") - if len(ephemeral_signature) != 64: - raise ValueError("ephemeral_signature must be 64 bytes (raw r||s)") - if len(pfs_hash) != 32: - raise ValueError("pfs_hash must be 32 bytes") - - self.ephemeral_pubkey = ephemeral_pubkey - self.ephemeral_signature = ephemeral_signature - self.pfs_hash = pfs_hash - self.timestamp = timestamp or (int(time.time() * 1000) & 0xffffffff) - - def serialize(self) -> bytes: - """Serialize the handshake into a 168-byte packet.""" - # Pack timestamp and other fields - partial = struct.pack("!I", self.timestamp) + self.ephemeral_pubkey + self.ephemeral_signature + self.pfs_hash - - # Compute CRC - cval = crc32_of(partial) - - # Append CRC - return partial + struct.pack("!I", cval) - - @classmethod - def deserialize(cls, data: bytes) -> Optional['Handshake']: - """Deserialize a 168-byte packet into a Handshake object.""" - if len(data) != 168: - return None - - # Extract and verify CRC - partial = data[:-4] - crc_in = struct.unpack("!I", data[-4:])[0] - crc_calc = crc32_of(partial) - - if crc_calc != crc_in: - return None - - # Extract fields - timestamp = struct.unpack("!I", partial[:4])[0] - ephemeral_pubkey = partial[4:4+64] - ephemeral_signature = partial[68:68+64] - pfs_hash = partial[132:132+32] - - return cls(ephemeral_pubkey, ephemeral_signature, pfs_hash, timestamp) - - -# ============================================================================= -# 4) PFS Hash Helper -# If no previous session, return 32 zero bytes -# Otherwise, compute sha256(session_number || last_shared_secret). -# ============================================================================= - -def compute_pfs_hash(session_number: int, shared_secret_hex: str) -> bytes: - """ - Compute the PFS hash field for handshake messages: - - If no previous session (session_number < 0), return 32 zero bytes - - Otherwise, compute sha256(session_number || shared_secret) - """ - if session_number < 0: - return b"\x00" * 32 - - # Convert shared_secret_hex to raw bytes - secret_bytes = bytes.fromhex(shared_secret_hex) - - # Pack session_number as 4 bytes - sn_bytes = struct.pack("!I", session_number) - - # Compute hash - return hashlib.sha256(sn_bytes + secret_bytes).digest() diff --git a/protocol_prototype/protocol.py b/protocol_prototype/protocol.py deleted file mode 100644 index 1887476..0000000 --- a/protocol_prototype/protocol.py +++ /dev/null @@ -1,815 +0,0 @@ -import random -import os -import time -import threading -from typing import List, Dict, Any, Optional, Tuple - -from crypto_utils import ( - generate_identity_keys, - load_peer_identity_key, - sign_data, - verify_signature, - get_ephemeral_keypair, - compute_ecdh_shared_key, - der_to_raw, - raw_signature_to_der -) -from messages import ( - PingRequest, PingResponse, Handshake, - compute_pfs_hash -) -import transmission -from encryption import ( - EncryptedMessage, MessageHeader, - generate_iv, encrypt_message, decrypt_message -) -from auto_mode import AutoMode, AutoModeConfig - -# ANSI colors -RED = "\033[91m" -GREEN = "\033[92m" -YELLOW = "\033[93m" -BLUE = "\033[94m" -RESET = "\033[0m" - - -class IcingProtocol: - def __init__(self): - # Identity keys (each 512 bits when printed as hex of 64 bytes) - self.identity_privkey, self.identity_pubkey = generate_identity_keys() - - # Peer identity for verifying ephemeral signatures - self.peer_identity_pubkey_obj = None - self.peer_identity_pubkey_bytes = None - - # Ephemeral keys (our side) - self.ephemeral_privkey = None - self.ephemeral_pubkey = None - - # Last computed shared secret (hex string) - self.shared_secret = None - - # Derived HKDF key (hex string, 256 bits) - self.hkdf_key = None - - # Negotiated cipher (0 = AES-256-GCM, 1 = ChaCha20-Poly1305) - self.cipher_type = 0 - - # For PFS: track per-peer session info (session number and last shared secret) - self.pfs_history: Dict[bytes, Tuple[int, str]] = {} - - # Protocol flags - self.state = { - "ping_sent": False, - "ping_received": False, - "handshake_sent": False, - "handshake_received": False, - "key_exchange_complete": False - } - - # Auto mode for automated protocol operation - self.auto_mode = AutoMode(self) - - # Legacy auto-responder toggle (kept for backward compatibility) - self.auto_responder = False - - # Active connections list - self.connections = [] - - # Inbound messages (each message is a dict with keys: type, raw, parsed, connection) - self.inbound_messages: List[Dict[str, Any]] = [] - - # Store the session nonce (17 bytes but only 129 bits are valid) from first sent or received PING - self.session_nonce: bytes = None - - # Last used IV for encrypted messages - self.last_iv: bytes = None - - self.local_port = random.randint(30000, 40000) - self.server_listener = transmission.ServerListener( - host="127.0.0.1", - port=self.local_port, - on_new_connection=self.on_new_connection, - on_data_received=self.on_data_received - ) - self.server_listener.start() - - # ------------------------------------------------------------------------- - # Transport callbacks - # ------------------------------------------------------------------------- - - def on_new_connection(self, conn: transmission.PeerConnection): - print(f"{GREEN}[IcingProtocol]{RESET} New incoming connection.") - self.connections.append(conn) - - # Notify auto mode - self.auto_mode.handle_connection_established() - - def on_data_received(self, conn: transmission.PeerConnection, data: bytes): - bits_count = len(data) * 8 - print( - f"{GREEN}[RECV]{RESET} {bits_count} bits from peer: {data.hex()[:60]}{'...' if len(data.hex()) > 60 else ''}") - - # PING REQUEST (22 bytes) - if len(data) == 22: - ping_request = PingRequest.deserialize(data) - if ping_request: - self.state["ping_received"] = True - - # If received cipher is not supported, force to 0 (AES-256-GCM) - if ping_request.cipher != 0 and ping_request.cipher != 1: - print(f"{YELLOW}[NOTICE]{RESET} Received PING with unsupported cipher ({ping_request.cipher}); forcing cipher to 0 in response.") - ping_request.cipher = 0 - - # Store cipher type for future encrypted messages - self.cipher_type = ping_request.cipher - - # Store session nonce if not already set - if self.session_nonce is None: - self.session_nonce = ping_request.session_nonce - print(f"{YELLOW}[NOTICE]{RESET} Stored session nonce from received PING.") - - index = len(self.inbound_messages) - msg = { - "type": "PING_REQUEST", - "raw": data, - "parsed": ping_request, - "connection": conn - } - self.inbound_messages.append(msg) - - # Handle in auto mode (if active) - self.auto_mode.handle_ping_received(index) - - # Legacy auto-responder (for backward compatibility) - if self.auto_responder and not self.auto_mode.active: - timer = threading.Timer(2.0, self._auto_respond_ping, args=[index]) - timer.daemon = True - timer.start() - return - - # PING RESPONSE (10 bytes) - elif len(data) == 10: - ping_response = PingResponse.deserialize(data) - if ping_response: - # Store negotiated cipher type - self.cipher_type = ping_response.cipher - - index = len(self.inbound_messages) - msg = { - "type": "PING_RESPONSE", - "raw": data, - "parsed": ping_response, - "connection": conn - } - self.inbound_messages.append(msg) - - # Notify auto mode (if active) - self.auto_mode.handle_ping_response_received(ping_response.answer == 1) - return - - # HANDSHAKE message (168 bytes) - elif len(data) == 168: - handshake = Handshake.deserialize(data) - if handshake: - self.state["handshake_received"] = True - index = len(self.inbound_messages) - msg = { - "type": "HANDSHAKE", - "raw": data, - "parsed": handshake, - "connection": conn - } - self.inbound_messages.append(msg) - - # Notify auto mode (if active) - self.auto_mode.handle_handshake_received(index) - - # Legacy auto-responder - if self.auto_responder and not self.auto_mode.active: - timer = threading.Timer(2.0, self._auto_respond_handshake, args=[index]) - timer.daemon = True - timer.start() - return - - # Check if the message might be an encrypted message (e.g. header of 18 bytes at start) - elif len(data) >= 18: - # Try to parse header - try: - header = MessageHeader.unpack(data[:18]) - # If header unpacking is successful and data length matches header expectations - expected_len = 18 + header.data_len + 16 # Header + payload + tag - - # Check if CRC is included - has_crc = (header.connection_status & 0x01) != 0 - if has_crc: - expected_len += 4 # Add CRC32 length - - if len(data) >= expected_len: - index = len(self.inbound_messages) - msg = { - "type": "ENCRYPTED_MESSAGE", - "raw": data, - "parsed": header, - "connection": conn - } - self.inbound_messages.append(msg) - print(f"{YELLOW}[NOTICE]{RESET} Stored inbound ENCRYPTED_MESSAGE at index={index}.") - - # Notify auto mode - self.auto_mode.handle_encrypted_received(index) - return - except Exception as e: - print(f"{RED}[ERROR]{RESET} Failed to parse message header: {e}") - - # Otherwise, unrecognized/malformed message. - index = len(self.inbound_messages) - msg = { - "type": "UNKNOWN", - "raw": data, - "parsed": None, - "connection": conn - } - self.inbound_messages.append(msg) - print(f"{RED}[WARNING]{RESET} Unrecognized or malformed message stored at index={index}.") - - - # ------------------------------------------------------------------------- - # HKDF Derivation - # ------------------------------------------------------------------------- - - def derive_hkdf(self): - """ - Derives a 256-bit key using HKDF. - Uses as input keying material (IKM) the shared secret from ECDH. - The salt is computed as SHA256(session_nonce || pfs_param), where: - - session_nonce is taken from self.session_nonce (17 bytes, 129 bits) or defaults to zeros. - - pfs_param is taken from the first inbound HANDSHAKE's pfs_hash field (32 bytes) or zeros. - """ - if not self.shared_secret: - print(f"{RED}[ERROR]{RESET} No shared secret available; cannot derive HKDF key.") - return - - # IKM: shared secret converted from hex to bytes. - ikm = bytes.fromhex(self.shared_secret) - # Use stored session_nonce if available; otherwise default to zeros. - session_nonce = self.session_nonce if self.session_nonce is not None else (b"\x00" * 17) - - # Determine pfs_param from first HANDSHAKE message (if any) - pfs_param = None - for msg in self.inbound_messages: - if msg["type"] == "HANDSHAKE": - try: - handshake = msg["parsed"] - pfs_param = handshake.pfs_hash - except Exception: - pfs_param = None - break - if pfs_param is None: - print(f"{RED}[WARNING]{RESET} No HANDSHAKE found; using 32 zero bytes for pfs_param.") - pfs_param = b"\x00" * 32 # 256-bit zeros - - # Ensure both are bytes - if isinstance(session_nonce, str): - session_nonce = session_nonce.encode() - if isinstance(pfs_param, str): - pfs_param = pfs_param.encode() - - from cryptography.hazmat.primitives import hashes - from cryptography.hazmat.primitives.kdf.hkdf import HKDF - hasher = hashes.Hash(hashes.SHA256()) - hasher.update(session_nonce + pfs_param) - salt_value = hasher.finalize() - - hkdf = HKDF( - algorithm=hashes.SHA256(), - length=32, # 256 bits - salt=salt_value, - info=b"", - ) - derived_key = hkdf.derive(ikm) - self.hkdf_key = derived_key.hex() - self.state["key_exchange_complete"] = True - print(f"{GREEN}[HKDF]{RESET} Derived HKDF key: {self.hkdf_key}") - return True - - # ------------------------------------------------------------------------- - # Legacy Auto-responder helpers (kept for backward compatibility) - # ------------------------------------------------------------------------- - - def _auto_respond_ping(self, index: int): - """ - Called by a Timer to respond automatically to a PING_REQUEST after 2s. - """ - print(f"{BLUE}[AUTO]{RESET} Delayed responding to PING at index={index}") - self.respond_to_ping(index, answer=1) # Accept by default - self.show_state() - - def _auto_respond_handshake(self, index: int): - """ - Called by a Timer to handle inbound HANDSHAKE automatically. - 1) Generate ephemeral keys if not already set - 2) Compute ECDH with the inbound ephemeral pub (generate_ecdhe) - 3) Send our handshake back - 4) Show state - """ - print(f"{BLUE}[AUTO]{RESET} Delayed ECDH process for HANDSHAKE at index={index}") - - # 1) Generate ephemeral keys if we don't have them - if not self.ephemeral_privkey or not self.ephemeral_pubkey: - self.generate_ephemeral_keys() - - # 2) Compute ECDH from inbound ephemeral pub - self.generate_ecdhe(index) - - # 3) Send our handshake to the peer - self.send_handshake() - - # 4) Show final state - self.show_state() - - # ------------------------------------------------------------------------- - # Public Methods for Auto Mode Management - # ------------------------------------------------------------------------- - - def start_auto_mode(self): - """Start the automatic protocol operation mode.""" - self.auto_mode.start() - - def stop_auto_mode(self): - """Stop the automatic protocol operation mode.""" - self.auto_mode.stop() - - def configure_auto_mode(self, **kwargs): - """ - Configure the automatic mode parameters. - - Args: - **kwargs: Configuration parameters to set. Supported parameters: - - ping_response_accept: bool, whether to accept incoming pings - - ping_auto_initiate: bool, whether to initiate pings on connection - - ping_retry_count: int, number of ping retries - - ping_retry_delay: float, seconds between ping retries - - ping_timeout: float, seconds to wait for ping response - - preferred_cipher: int, preferred cipher (0=AES-GCM, 1=ChaCha20-Poly1305) - - handshake_retry_count: int, number of handshake retries - - handshake_retry_delay: float, seconds between handshake retries - - handshake_timeout: float, seconds to wait for handshake - - auto_message_enabled: bool, whether to auto-send messages - - message_interval: float, seconds between auto messages - - message_content: str, default message content - - active_mode: bool, whether to actively initiate protocol - """ - for key, value in kwargs.items(): - if hasattr(self.auto_mode.config, key): - setattr(self.auto_mode.config, key, value) - print(f"{BLUE}[CONFIG]{RESET} Set auto mode {key} = {value}") - else: - print(f"{RED}[ERROR]{RESET} Unknown auto mode configuration parameter: {key}") - - def get_auto_mode_config(self): - """Return the current auto mode configuration.""" - return self.auto_mode.config - - def queue_auto_message(self, message: str): - """ - Add a message to the auto-send queue. - - Args: - message: Message text to send - """ - self.auto_mode.queue_message(message) - - def toggle_auto_mode_logging(self): - """ - Toggle detailed logging for auto mode. - When enabled, will show more information about state transitions and decision making. - """ - if not hasattr(self.auto_mode, 'verbose_logging'): - self.auto_mode.verbose_logging = True - else: - self.auto_mode.verbose_logging = not self.auto_mode.verbose_logging - - status = "enabled" if self.auto_mode.verbose_logging else "disabled" - print(f"{BLUE}[AUTO-LOG]{RESET} Detailed logging {status}") - - def debug_message(self, index: int): - """ - Debug a message in the inbound message queue. - Prints detailed information about the message. - - Args: - index: The index of the message in the inbound_messages queue - """ - if index < 0 or index >= len(self.inbound_messages): - print(f"{RED}[ERROR]{RESET} Invalid message index {index}") - return - - msg = self.inbound_messages[index] - print(f"\n{YELLOW}=== Message Debug [{index}] ==={RESET}") - print(f"Type: {msg['type']}") - print(f"Length: {len(msg['raw'])} bytes = {len(msg['raw'])*8} bits") - print(f"Raw data: {msg['raw'].hex()}") - - if msg['parsed'] is not None: - print(f"\n{YELLOW}--- Parsed Data ---{RESET}") - if msg['type'] == 'PING_REQUEST': - ping = msg['parsed'] - print(f"Version: {ping.version}") - print(f"Cipher: {ping.cipher} ({'AES-256-GCM' if ping.cipher == 0 else 'ChaCha20-Poly1305' if ping.cipher == 1 else 'Unknown'})") - print(f"Session nonce: {ping.session_nonce.hex()}") - print(f"CRC32: {ping.crc32:08x}") - - elif msg['type'] == 'PING_RESPONSE': - resp = msg['parsed'] - print(f"Version: {resp.version}") - print(f"Cipher: {resp.cipher} ({'AES-256-GCM' if resp.cipher == 0 else 'ChaCha20-Poly1305' if resp.cipher == 1 else 'Unknown'})") - print(f"Answer: {resp.answer} ({'Accept' if resp.answer == 1 else 'Reject'})") - print(f"CRC32: {resp.crc32:08x}") - - elif msg['type'] == 'HANDSHAKE': - hs = msg['parsed'] - print(f"Ephemeral pubkey: {hs.ephemeral_pubkey.hex()[:16]}...") - print(f"Ephemeral signature: {hs.ephemeral_signature.hex()[:16]}...") - print(f"PFS hash: {hs.pfs_hash.hex()[:16]}...") - print(f"Timestamp: {hs.timestamp}") - print(f"CRC32: {hs.crc32:08x}") - - elif msg['type'] == 'ENCRYPTED_MESSAGE': - header = msg['parsed'] - print(f"Flag: 0x{header.flag:04x}") - print(f"Data length: {header.data_len} bytes") - print(f"Retry: {header.retry}") - print(f"Connection status: {header.connection_status} ({'CRC included' if header.connection_status & 0x01 else 'No CRC'})") - print(f"IV: {header.iv.hex()}") - - # Calculate expected message size - expected_len = 18 + header.data_len + 16 # Header + payload + tag - if header.connection_status & 0x01: - expected_len += 4 # Add CRC - - print(f"Expected total length: {expected_len} bytes") - print(f"Actual length: {len(msg['raw'])} bytes") - - # If we have a key, try to decrypt - if self.hkdf_key: - print("\nAttempting decryption...") - try: - key = bytes.fromhex(self.hkdf_key) - plaintext = decrypt_message(msg['raw'], key, self.cipher_type) - print(f"Decrypted: {plaintext.decode('utf-8')}") - except Exception as e: - print(f"Decryption failed: {e}") - print() - - # ------------------------------------------------------------------------- - # Public Methods - # ------------------------------------------------------------------------- - - def connect_to_peer(self, port: int): - conn = transmission.connect_to_peer("127.0.0.1", port, self.on_data_received) - self.connections.append(conn) - print(f"{GREEN}[IcingProtocol]{RESET} Outgoing connection to port {port} established.") - - # Notify auto mode - self.auto_mode.handle_connection_established() - - def set_peer_identity(self, peer_pubkey_hex: str): - pubkey_bytes = bytes.fromhex(peer_pubkey_hex) - self.peer_identity_pubkey_obj = load_peer_identity_key(pubkey_bytes) - self.peer_identity_pubkey_bytes = pubkey_bytes - print(f"{GREEN}[IcingProtocol]{RESET} Stored peer identity pubkey (hex={peer_pubkey_hex[:16]}...).") - - def generate_ephemeral_keys(self): - self.ephemeral_privkey, self.ephemeral_pubkey = get_ephemeral_keypair() - print(f"{GREEN}[IcingProtocol]{RESET} Generated ephemeral key pair: pubkey={self.ephemeral_pubkey.hex()[:16]}...") - - # Send PING (session discovery and cipher negotiation) - def send_ping_request(self, cipher_type=0): - """ - Send a ping request to the first connected peer. - - Args: - cipher_type: Preferred cipher type (0 = AES-256-GCM, 1 = ChaCha20-Poly1305) - """ - if not self.connections: - print(f"{RED}[ERROR]{RESET} No active connections.") - return False - - # Validate cipher type - if cipher_type not in (0, 1): - print(f"{YELLOW}[WARNING]{RESET} Invalid cipher type {cipher_type}, defaulting to AES-256-GCM (0)") - cipher_type = 0 - - # Create ping request with specified cipher - ping_request = PingRequest(version=0, cipher=cipher_type) - - # Store session nonce if not already set - if self.session_nonce is None: - self.session_nonce = ping_request.session_nonce - print(f"{YELLOW}[NOTICE]{RESET} Stored session nonce from sent PING.") - - # Serialize and send - pkt = ping_request.serialize() - self._send_packet(self.connections[0], pkt, "PING_REQUEST") - self.state["ping_sent"] = True - return True - - def send_handshake(self): - """ - Build and send handshake: - - ephemeral_pubkey (64 bytes, raw x||y) - - ephemeral_signature (64 bytes, raw r||s) - - pfs_hash (32 bytes) - - timestamp (32 bits) - - CRC (32 bits) - """ - if not self.connections: - print(f"{RED}[ERROR]{RESET} No active connections.") - return False - if not self.ephemeral_privkey or not self.ephemeral_pubkey: - print(f"{RED}[ERROR]{RESET} Ephemeral keys not generated.") - return False - if self.peer_identity_pubkey_bytes is None: - print(f"{RED}[ERROR]{RESET} Peer identity not set; needed for PFS tracking.") - return False - - # 1) Sign ephemeral_pubkey using identity key - sig_der = sign_data(self.identity_privkey, self.ephemeral_pubkey) - # Convert DER signature to raw r||s format (64 bytes) - raw_signature = der_to_raw(sig_der) - - # 2) Compute PFS hash - session_number, last_secret_hex = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, "")) - pfs = compute_pfs_hash(session_number, last_secret_hex) - - # 3) Create handshake object - handshake = Handshake( - ephemeral_pubkey=self.ephemeral_pubkey, - ephemeral_signature=raw_signature, - pfs_hash=pfs - ) - - # 4) Serialize and send - pkt = handshake.serialize() - self._send_packet(self.connections[0], pkt, "HANDSHAKE") - self.state["handshake_sent"] = True - return True - - def enable_auto_responder(self, enable: bool): - """ - Legacy method for enabling/disabling auto responder. - For new code, use start_auto_mode() and stop_auto_mode() instead. - """ - self.auto_responder = enable - print(f"{YELLOW}[LEGACY]{RESET} Auto responder set to {enable}. Consider using auto_mode instead.") - - # ------------------------------------------------------------------------- - # Manual Responses - # ------------------------------------------------------------------------- - - def respond_to_ping(self, index: int, answer: int): - """ - Respond to a ping request with the specified answer (0 = no, 1 = yes). - If answer is 1, we accept the connection and use the cipher specified in the request. - """ - if index < 0 or index >= len(self.inbound_messages): - print(f"{RED}[ERROR]{RESET} Invalid index {index}.") - return False - msg = self.inbound_messages[index] - if msg["type"] != "PING_REQUEST": - print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a PING_REQUEST.") - return False - - ping_request = msg["parsed"] - version = ping_request.version - cipher = ping_request.cipher - - # Force cipher to 0 or 1 (only AES-256-GCM and ChaCha20-Poly1305 are supported) - if cipher != 0 and cipher != 1: - print(f"{YELLOW}[NOTICE]{RESET} Received PING with unsupported cipher ({cipher}); forcing cipher to 0 in response.") - cipher = 0 - - # Store the negotiated cipher type if we're accepting - if answer == 1: - self.cipher_type = cipher - - conn = msg["connection"] - # Create ping response - ping_response = PingResponse(version, cipher, answer) - resp = ping_response.serialize() - self._send_packet(conn, resp, "PING_RESPONSE") - print(f"{BLUE}[MANUAL]{RESET} Responded to ping with answer={answer}.") - return True - - def generate_ecdhe(self, index: int): - """ - Process a handshake message: - 1. Verify the ephemeral signature - 2. Compute the ECDH shared secret - 3. Update PFS history - """ - if index < 0 or index >= len(self.inbound_messages): - print(f"{RED}[ERROR]{RESET} Invalid index {index}.") - return False - msg = self.inbound_messages[index] - if msg["type"] != "HANDSHAKE": - print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a HANDSHAKE.") - return False - - handshake = msg["parsed"] - - # Convert raw signature to DER for verification - raw_sig = handshake.ephemeral_signature - sig_der = raw_signature_to_der(raw_sig) - - # Verify signature - ok = verify_signature(self.peer_identity_pubkey_obj, sig_der, handshake.ephemeral_pubkey) - if not ok: - print(f"{RED}[ERROR]{RESET} Ephemeral signature invalid.") - return False - print(f"{GREEN}[OK]{RESET} Ephemeral signature verified.") - - # Check if we have ephemeral keys - if not self.ephemeral_privkey: - print(f"{YELLOW}[WARN]{RESET} No ephemeral_privkey available, cannot compute shared secret.") - return False - - # Compute ECDH shared secret - shared = compute_ecdh_shared_key(self.ephemeral_privkey, handshake.ephemeral_pubkey) - self.shared_secret = shared.hex() - print(f"{GREEN}[OK]{RESET} Computed ECDH shared key = {self.shared_secret}") - - # Update PFS history - old_session, _ = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, "")) - new_session = 1 if old_session < 0 else old_session + 1 - self.pfs_history[self.peer_identity_pubkey_bytes] = (new_session, self.shared_secret) - return True - - # ------------------------------------------------------------------------- - # Utility - # ------------------------------------------------------------------------- - - def _send_packet(self, conn: transmission.PeerConnection, data: bytes, label: str): - bits_count = len(data) * 8 - print(f"{BLUE}[SEND]{RESET} {label} -> {bits_count} bits: {data.hex()[:60]}{'...' if len(data.hex())>60 else ''}") - conn.send(data) - - def show_state(self): - print(f"\n{YELLOW}=== Global State ==={RESET}") - print(f"Listening Port: {self.local_port}") - print(f"Identity PubKey: 512 bits => {self.identity_pubkey.hex()[:16]}...") - - if self.peer_identity_pubkey_bytes: - print(f"Peer Identity PubKey: 512 bits => {self.peer_identity_pubkey_bytes.hex()[:16]}...") - else: - print("Peer Identity PubKey: [None]") - - print("\nEphemeral Keys:") - if self.ephemeral_pubkey: - print(f" ephemeral_pubkey: 512 bits => {self.ephemeral_pubkey.hex()[:16]}...") - else: - print(" ephemeral_pubkey: [None]") - - print(f"\nShared Secret: {self.shared_secret if self.shared_secret else '[None]'}") - - if self.hkdf_key: - print(f"HKDF Derived Key: {self.hkdf_key} (size: {len(self.hkdf_key)*8} bits)") - else: - print("HKDF Derived Key: [None]") - - print(f"Negotiated Cipher: {'AES-256-GCM' if self.cipher_type == 0 else 'ChaCha20-Poly1305'} (code: {self.cipher_type})") - - if self.session_nonce: - print(f"Session Nonce: {self.session_nonce.hex()} (129 bits)") - else: - print("Session Nonce: [None]") - - if self.last_iv: - print(f"Last IV: {self.last_iv.hex()} (96 bits)") - else: - print("Last IV: [None]") - - print("\nProtocol Flags:") - for k, v in self.state.items(): - print(f" {k}: {v}") - - print("\nAuto Mode Active:", self.auto_mode.active) - print("Auto Mode State:", self.auto_mode.state) - print("Legacy Auto Responder:", self.auto_responder) - - print("\nActive Connections:") - for i, c in enumerate(self.connections): - print(f" [{i}] Alive={c.alive}") - - print("\nInbound Message Queue:") - for i, m in enumerate(self.inbound_messages): - print(f" [{i}] type={m['type']} len={len(m['raw'])} bytes => {len(m['raw']) * 8} bits") - print() - - def stop(self): - """Stop the protocol and clean up resources.""" - # Stop auto mode first - self.auto_mode.stop() - - # Stop server listener - self.server_listener.stop() - - # Close all connections - for c in self.connections: - c.close() - self.connections.clear() - self.inbound_messages.clear() - print(f"{RED}[STOP]{RESET} Protocol stopped.") - - # ------------------------------------------------------------------------- - # Encrypted Messaging - # ------------------------------------------------------------------------- - - def send_encrypted_message(self, plaintext: str): - """ - Encrypts and sends a message using the derived HKDF key and negotiated cipher. - The message format is: - - Header (18 bytes): flag, data_len, retry, connection_status, IV - - Payload: variable length encrypted data - - Footer: Authentication tag (16 bytes) + optional CRC32 (4 bytes) - """ - if not self.connections: - print(f"{RED}[ERROR]{RESET} No active connections.") - return False - if not self.hkdf_key: - print(f"{RED}[ERROR]{RESET} No HKDF key derived. Cannot encrypt message.") - return False - - # Get the encryption key - key = bytes.fromhex(self.hkdf_key) - - # Convert plaintext to bytes - plaintext_bytes = plaintext.encode('utf-8') - - # Generate or increment the IV - if self.last_iv is None: - # First message, generate random IV - iv = generate_iv(initial=True) - else: - # Subsequent message, increment previous IV - iv = generate_iv(initial=False, previous_iv=self.last_iv) - - # Store the new IV - self.last_iv = iv - - # Create encrypted message (connection_status 0 = no CRC) - encrypted = encrypt_message( - plaintext=plaintext_bytes, - key=key, - flag=0xBEEF, # Default flag - retry=0, - connection_status=0, # No CRC - iv=iv, - cipher_type=self.cipher_type - ) - - # Send the encrypted message - self._send_packet(self.connections[0], encrypted, "ENCRYPTED_MESSAGE") - print(f"{GREEN}[SEND_ENCRYPTED]{RESET} Encrypted message sent.") - return True - - def decrypt_received_message(self, index: int): - """ - Decrypt a received encrypted message using the HKDF key and negotiated cipher. - """ - if index < 0 or index >= len(self.inbound_messages): - print(f"{RED}[ERROR]{RESET} Invalid message index.") - return None - - msg = self.inbound_messages[index] - if msg["type"] != "ENCRYPTED_MESSAGE": - print(f"{RED}[ERROR]{RESET} Message at index {index} is not an ENCRYPTED_MESSAGE.") - return None - - # Get the encrypted message - encrypted = msg["raw"] - - if not self.hkdf_key: - print(f"{RED}[ERROR]{RESET} No HKDF key derived. Cannot decrypt message.") - return None - - # Get the encryption key - key = bytes.fromhex(self.hkdf_key) - - try: - # Decrypt the message - plaintext = decrypt_message(encrypted, key, self.cipher_type) - - # Convert to string - plaintext_str = plaintext.decode('utf-8') - - # Update last IV from the header - header = MessageHeader.unpack(encrypted[:18]) - self.last_iv = header.iv - - print(f"{GREEN}[DECRYPTED]{RESET} Decrypted message: {plaintext_str}") - return plaintext_str - except Exception as e: - print(f"{RED}[ERROR]{RESET} Decryption failed: {e}") - return None diff --git a/protocol_prototype/transmission.py b/protocol_prototype/transmission.py deleted file mode 100644 index 35f3a21..0000000 --- a/protocol_prototype/transmission.py +++ /dev/null @@ -1,100 +0,0 @@ -import socket -import threading -from typing import Callable - -class PeerConnection: - """ - Represents a live, two-way connection to a peer. - We keep a socket open, read data in a background thread, - and can send data from the main thread at any time. - """ - def __init__(self, sock: socket.socket, on_data_received: Callable[['PeerConnection', bytes], None]): - self.sock = sock - self.on_data_received = on_data_received - self.alive = True - - self.read_thread = threading.Thread(target=self.read_loop, daemon=True) - self.read_thread.start() - - def read_loop(self): - while self.alive: - try: - data = self.sock.recv(4096) - if not data: - break - self.on_data_received(self, data) - except OSError: - break - self.alive = False - self.sock.close() - print("[PeerConnection] Connection closed.") - - def send(self, data: bytes): - if not self.alive: - print("[PeerConnection.send] Cannot send, connection not alive.") - return - try: - self.sock.sendall(data) - except OSError: - print("[PeerConnection.send] Send failed, connection might be closed.") - self.alive = False - - def close(self): - self.alive = False - try: - self.sock.shutdown(socket.SHUT_RDWR) - except OSError: - pass - self.sock.close() - - -class ServerListener(threading.Thread): - """ - A thread that listens on a given port. When a new client connects, - it creates a PeerConnection for that client. - """ - def __init__(self, host: str, port: int, - on_new_connection: Callable[[PeerConnection], None], - on_data_received: Callable[[PeerConnection, bytes], None]): - super().__init__(daemon=True) - self.host = host - self.port = port - self.on_new_connection = on_new_connection - self.on_data_received = on_data_received - self.server_socket = None - self.stop_event = threading.Event() - - def run(self): - self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.server_socket.bind((self.host, self.port)) - self.server_socket.listen(5) - self.server_socket.settimeout(1.0) - print(f"[ServerListener] Listening on {self.host}:{self.port}") - - while not self.stop_event.is_set(): - try: - client_sock, addr = self.server_socket.accept() - print(f"[ServerListener] Accepted connection from {addr}") - conn = PeerConnection(client_sock, self.on_data_received) - self.on_new_connection(conn) - except socket.timeout: - pass - except OSError: - break - - if self.server_socket: - self.server_socket.close() - - def stop(self): - self.stop_event.set() - if self.server_socket: - self.server_socket.close() - - -def connect_to_peer(host: str, port: int, - on_data_received: Callable[[PeerConnection, bytes], None]) -> PeerConnection: - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((host, port)) - print(f"[connect_to_peer] Connected to {host}:{port}") - conn = PeerConnection(sock, on_data_received) - return conn