Noise prototype
All checks were successful
/ mirror (push) Successful in 4s

This commit is contained in:
STCB 2025-04-25 22:46:48 +02:00
parent 9e2daa7f53
commit d6d1c8ceba
11 changed files with 536 additions and 2363 deletions

View File

@ -0,0 +1,253 @@
#!/usr/bin/env python3
import os
import sys
import cmd
import json
import logging
from typing import Optional
from keys import generate_static_key, load_static_key, save_contact, list_contacts, CONTACTS_FILE
from transport import Peer
from noise_handshake import NoiseXK
from messaging import Messenger
# Configure default logging
logger = logging.getLogger("noise_xk_cli")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter("[%(levelname)s] %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
class NoiseCLI(cmd.Cmd):
intro = "Welcome to the Noise_XK CLI. Type help or ? to list commands."
prompt = "(noise) "
def __init__(self):
super().__init__()
# Static key pair for this instance
self.static_priv = None
self.static_pub = None
# Peer static key (bytes)
self.peer_static: Optional[bytes] = None
# Transport peer connection
self.peer: Optional[Peer] = None
# Handshake and messenger
self.handshake: Optional[NoiseXK] = None
self.messenger: Optional[Messenger] = None
# Incoming ciphertext buffer
self.inbox = []
# Ensure contacts directory
os.makedirs(os.path.dirname(CONTACTS_FILE), exist_ok=True)
def preloop(self):
# Load existing static key or generate new
priv, pub = generate_static_key()
self.static_priv, self.static_pub = priv, pub
logger.info(f"Generated static X25519 key, public: {pub.hex()}")
def do_generate_key(self, arg):
"Regenerate a new static key pair"
priv, pub = generate_static_key()
self.static_priv, self.static_pub = priv, pub
logger.info(f"New static public key: {pub.hex()}")
def do_import_key(self, arg):
"import_key <alias> <hex_pub> Import peer static public key"
parts = arg.split()
if len(parts) != 2:
logger.error("Usage: import_key <alias> <hex_pub>")
return
alias, hex_pub = parts
try:
pub = load_static_key(hex_pub)
save_contact(alias, hex_pub)
logger.info(f"Imported contact '{alias}' = {hex_pub}")
except Exception as e:
logger.error(f"Failed to import key: {e}")
def do_list_keys(self, arg):
"List saved contacts"
contacts = list_contacts()
if not contacts:
logger.info("No contacts saved.")
return
for alias, hex_pub in contacts:
print(f" {alias}: {hex_pub}")
def do_listen(self, arg):
"listen <port> Start listening for incoming P2P connections"
if not arg:
logger.error("Usage: listen <port>")
return
try:
port = int(arg.strip())
except ValueError:
logger.error("Port must be an integer.")
return
self.peer = Peer(port, self.handle_data)
self.peer.start_listen()
logger.info(f"Listening on port {port}")
def do_connect(self, arg):
"connect [host] <port> <alias> Connect to remote peer and set peer static key by alias"
parts = arg.split()
if len(parts) == 2:
# Host not specified, use localhost as default
host = "localhost"
port_str, alias = parts
elif len(parts) == 3:
# Host explicitly specified
logger.debug(parts)
host, port_str, alias = parts
else:
logger.error("Usage: connect [host] <port> <alias>")
return
try:
port = int(port_str)
except ValueError:
logger.error("Port must be an integer. " + port_str)
return
contacts = dict(list_contacts())
if alias not in contacts.keys():
logger.error(f"No such alias '{alias}'. Use import_key first.")
return
self.peer_static = load_static_key(contacts[alias])
# Establish connection
self.peer = Peer(port, self.handle_data)
try:
self.peer.connect(host, port)
logger.info(f"Connected to {host}:{port}")
except Exception as e:
logger.error(f"Connection failed: {e}")
def do_handshake(self, arg):
"handshake Initiate XK handshake with the peer (initiator role)"
if not self.peer or not self.peer_static:
logger.error("Need to connect first and import peer key.")
return
# Create handshake as initiator
self.handshake = NoiseXK(initiator=True, peer_static=self.peer_static)
# Send first handshake message
msg = self.handshake.write()
self.peer.send(msg)
logger.info("Sent handshake message 1")
def handle_data(self, data: bytes):
"Callback for incoming raw data"
if self.handshake and not self.handshake.is_finished():
# In the middle of handshake
try:
_ = self.handshake.read(data)
if not self.handshake.is_finished():
# Send next handshake message
msg = self.handshake.write()
self.peer.send(msg)
logger.info("Exchanged handshake message")
else:
cs_s, cs_r = self.handshake.get_cipher_states()
self.messenger = Messenger(cs_s, cs_r)
logger.info("Handshake complete, cipher states ready")
return
except Exception as e:
logger.error(f"Handshake error: {e}")
return
# If no handshake object, data might be initial handshake for responder
if not self.handshake and self.peer_static:
# Passive responder: start handshake on first data
try:
self.handshake = NoiseXK(initiator=False, peer_static=self.peer_static)
_ = self.handshake.read(data)
# Send response
msg = self.handshake.write()
self.peer.send(msg)
# Continue handshake if needed
if not self.handshake.is_finished():
# Await next, then finalize
logger.info("Responder sent handshake message 2")
else:
cs_s, cs_r = self.handshake.get_cipher_states()
self.messenger = Messenger(cs_s, cs_r)
logger.info("Handshake complete (responder), cipher states ready")
return
except Exception as e:
logger.error(f"Responder handshake error: {e}")
return
# After handshake, treat as encrypted application data
if self.messenger:
try:
plaintext = self.messenger.decrypt(data)
logger.info(f"Received plaintext: {plaintext.decode(errors='ignore')}")
except Exception as e:
# If decryption fails, queue raw ciphertext
self.inbox.append(data)
idx = len(self.inbox) - 1
logger.warning(f"Failed decrypt, queued ciphertext at index {idx}")
else:
logger.warning("Received data but handshake not established.")
def do_send(self, arg):
"send <text> Encrypt and send a message to the peer"
if not self.messenger:
logger.error("Handshake not complete.")
return
payload = arg.encode()
ct = self.messenger.encrypt(payload)
self.peer.send(ct)
logger.info(f"Sent encrypted message ({len(ct)} bytes)")
def do_decrypt(self, arg):
"decrypt <index> Attempt to decrypt a queued ciphertext by index"
if not self.messenger:
logger.error("Handshake not complete.")
return
try:
idx = int(arg.strip())
ct = self.inbox[idx]
pt = self.messenger.decrypt(ct)
logger.info(f"Decrypted [#{idx}]: {pt.decode(errors='ignore')}")
except Exception as e:
logger.error(f"Decrypt error: {e}")
def do_show_state(self, arg):
"show_state Display current internal state"
state = {
"static_pub": self.static_pub.hex() if self.static_pub else None,
"peer_static": self.peer_static.hex() if self.peer_static else None,
"handshake": type(self.handshake).__name__ if self.handshake else None,
"handshake_finished": self.handshake.is_finished() if self.handshake else False,
"inbox_size": len(self.inbox),
}
print(json.dumps(state, indent=2))
def do_set_log_level(self, arg):
"set_log_level <DEBUG|INFO|WARNING|ERROR> Adjust CLI verbosity"
level = arg.strip().upper()
if level not in ("DEBUG","INFO","WARNING","ERROR"):
logger.error("Invalid level. Choose DEBUG, INFO, WARNING, ERROR.")
return
logger.setLevel(getattr(logging, level))
logger.info(f"Log level set to {level}")
def do_exit(self, arg):
"Exit the CLI"
logger.info("Closing connection and exiting.")
if self.peer:
self.peer.close()
return True
def do_EOF(self, arg):
return self.do_exit(arg)
if __name__ == "__main__":
# Adjust module search path
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
from transport import Peer # ensure imports
cli = NoiseCLI()
cli.cmdloop()

View File

@ -0,0 +1,100 @@
"""
Module keys.py
Génération, import, export et gestion de clés statiques X25519 pour Noise_XK CLI.
Stocke les contacts dans ~/.noise_xk_cli/contacts.json
"""
import os
import json
from typing import Tuple, Dict, List
from cryptography.hazmat.primitives.asymmetric.x25519 import (
X25519PrivateKey,
X25519PublicKey,
)
from cryptography.hazmat.primitives import serialization
# Chemin vers le fichier de contacts
CONTACTS_FILE = os.path.expanduser("~/.noise_xk_cli/contacts.json")
def _ensure_contacts_file() -> None:
"""
Crée le répertoire et le fichier JSON si nécessaire.
"""
directory = os.path.dirname(CONTACTS_FILE)
os.makedirs(directory, exist_ok=True)
if not os.path.exists(CONTACTS_FILE):
with open(CONTACTS_FILE, 'w') as f:
json.dump({}, f)
def generate_static_key() -> Tuple[X25519PrivateKey, bytes]:
"""
Génère une paire de clés X25519 statique.
Returns:
(private_key, public_key_bytes)
"""
private_key = X25519PrivateKey.generate()
public_key = private_key.public_key()
public_bytes = public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
return private_key, public_bytes
def public_key_to_hex(public_bytes: bytes) -> str:
"""
Convertit bytes de clé publique en hex.
"""
return public_bytes.hex()
def load_static_key(hex_str: str) -> X25519PublicKey:
"""
Charge une clé publique X25519 depuis sa représentation hex.
"""
data = bytes.fromhex(hex_str)
if len(data) != 32:
raise ValueError("Clé publique X25519 doit faire 32 octets (raw)")
return X25519PublicKey.from_public_bytes(data)
def save_contact(alias: str, pub_hex: str) -> None:
"""
Sauvegarde un contact dans le fichier JSON sous l'alias donné.
"""
_ensure_contacts_file()
with open(CONTACTS_FILE, 'r+') as f:
data: Dict[str, str] = json.load(f)
data[alias] = pub_hex
f.seek(0)
json.dump(data, f, indent=2)
f.truncate()
def list_contacts() -> List[Tuple[str, str]]:
"""
Retourne la liste des contacts (alias, clé publique hex).
"""
_ensure_contacts_file()
with open(CONTACTS_FILE, 'r') as f:
data: Dict[str, str] = json.load(f)
return list(data.items())
def get_contact(alias: str) -> str:
"""
Récupère la clé publique hex pour un alias donné.
Raises:
KeyError: si l'alias n'existe pas
"""
_ensure_contacts_file()
with open(CONTACTS_FILE, 'r') as f:
data: Dict[str, str] = json.load(f)
if alias not in data:
raise KeyError(f"Alias '{alias}' introuvable dans les contacts")
return data[alias]

View File

@ -0,0 +1,69 @@
# noise_handshake.py
from dissononce.extras.meta.protocol.factory import NoiseProtocolFactory
# from dissononce.exceptions import HandshakeError
class NoiseXK:
"""
Wrapper around the Noise_XK_25519_ChaChaPoly_SHA256 handshake pattern,
using dissononce rev.34.
"""
def __init__(self, initiator: bool, peer_static: bytes):
"""
:param initiator: True if this party initiates the handshake.
:param peer_static: The peer's static public key (32 bytes X25519).
"""
# Instantiate the factory for XK over 25519/ChaChaPoly/SHA256
factory = NoiseProtocolFactory.from_name(
"Noise_XK_25519_ChaChaPoly_SHA256"
)
# Create the HandshakeState:
# - if initiator: supply 's' = peer_static
# - if responder: supply 'rs' = peer_static
self._hs = factory.create_handshakestate(
initiator=initiator,
s=peer_static if initiator else None,
rs=None if initiator else peer_static
)
def write(self, payload: bytes = b"") -> bytes:
"""
Generate the next handshake message to send.
:param payload: optional application data to embed (usually b"").
:return: the raw bytes of the handshake message.
"""
try:
msg = self._hs.write_message(payload)
return msg
except Exception as e:
raise Exception(f"Error writing handshake message: {e}") from e
def read(self, message: bytes) -> bytes:
"""
Process a received handshake message.
:param message: raw bytes received from the peer.
:return: any payload decrypted from the message (usually b"").
"""
try:
payload = self._hs.read_message(message)
return payload
except Exception as e:
raise Exception(f"Error reading handshake message: {e}") from e
def is_finished(self) -> bool:
"""
:return: True if the handshake is complete and cipher-states are ready.
"""
return self._hs.is_finished()
def get_cipher_states(self):
"""
Split the HandshakeState into two CipherState objects once the handshake is done.
:return: (cs_send, cs_recv)
:raises: HandshakeError if called before handshake completion.
"""
if not self._hs.is_finished():
raise Exception("Cannot split cipher-states before handshake is finished")
return self._hs.split()

View File

@ -0,0 +1,114 @@
"""
Module transport.py
Abstraction pair-à-pair TCP basique pour CLI Noise_XK.
Chaque instance peut écouter sur un port et/ou se connecter à un pair.
"""
import socket
import threading
from typing import Callable, Optional
class Peer:
"""
Peer TCP abstrait : écoute et connexion à un seul pair.
on_data(data: bytes) est appelé à la réception de données.
"""
def __init__(self, port: int, on_data: Callable[[bytes], None], listen: bool = True):
self.port = port
self.on_data = on_data
self.server_socket: Optional[socket.socket] = None
self.conn_socket: Optional[socket.socket] = None
self.alive = False
if listen:
self.start_listen()
def start_listen(self) -> None:
"""
Démarre le socket serveur et la boucle d'accept.
"""
if self.server_socket:
return
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('0.0.0.0', self.port))
sock.listen(1)
self.server_socket = sock
self.alive = True
threading.Thread(target=self._accept_loop, daemon=True).start()
def _accept_loop(self) -> None:
while self.alive:
try:
client_sock, addr = self.server_socket.accept()
# Fermer connexion précédente si existante
if self.conn_socket:
self.conn_socket.close()
self.conn_socket = client_sock
threading.Thread(
target=self._read_loop,
args=(client_sock,),
daemon=True
).start()
except OSError:
break
def connect(self, host: str, port: int) -> None:
"""
Se connecte à un peer à l'adresse spécifiée.
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
# Fermer connexion précédente si existante
if self.conn_socket:
self.conn_socket.close()
self.conn_socket = sock
self.alive = True
threading.Thread(
target=self._read_loop,
args=(sock,),
daemon=True
).start()
def _read_loop(self, sock: socket.socket) -> None:
"""
Boucle de lecture asynchrone sur la socket.
"""
while self.alive:
try:
data = sock.recv(4096)
if not data:
break
self.on_data(data)
except OSError:
break
try:
sock.close()
except OSError:
pass
def send(self, data: bytes) -> None:
"""
Envoie de données au peer (connexion active).
"""
if not self.conn_socket:
raise ConnectionError("Aucune connexion active pour envoyer")
self.conn_socket.sendall(data)
def close(self) -> None:
"""
Ferme sockets d'écoute et de connexion.
"""
self.alive = False
if self.server_socket:
try:
self.server_socket.close()
except OSError:
pass
self.server_socket = None
if self.conn_socket:
try:
self.conn_socket.close()
except OSError:
pass
self.conn_socket = None

View File

@ -1,430 +0,0 @@
import time
import threading
import queue
from typing import Optional, Dict, Any, List, Callable, Tuple
# ANSI colors for logging
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
RESET = "\033[0m"
class AutoModeConfig:
"""Configuration parameters for the automatic mode behavior."""
def __init__(self):
# Ping behavior
self.ping_response_accept = True # Whether to accept incoming pings
self.ping_auto_initiate = False # Whether to initiate pings when connected
self.ping_retry_count = 3 # Number of ping retries
self.ping_retry_delay = 5.0 # Seconds between ping retries
self.ping_timeout = 10.0 # Seconds to wait for ping response
self.preferred_cipher = 0 # 0=AES-GCM, 1=ChaCha20-Poly1305
# Handshake behavior
self.handshake_retry_count = 3 # Number of handshake retries
self.handshake_retry_delay = 5.0 # Seconds between handshake retries
self.handshake_timeout = 10.0 # Seconds to wait for handshake
# Messaging behavior
self.auto_message_enabled = False # Whether to auto-send messages
self.message_interval = 10.0 # Seconds between auto messages
self.message_content = "Hello, secure world!" # Default message
# General behavior
self.active_mode = False # If true, initiates protocol instead of waiting
class AutoMode:
"""
Manages automated behavior for the Icing protocol.
Handles automatic progression through the protocol stages:
1. Connection setup
2. Ping/discovery
3. Key exchange
4. Encrypted communication
"""
def __init__(self, protocol_interface):
"""
Initialize the AutoMode manager.
Args:
protocol_interface: An object implementing the required protocol methods
"""
self.protocol = protocol_interface
self.config = AutoModeConfig()
self.active = False
self.state = "idle"
# Message queue for automated sending
self.message_queue = queue.Queue()
# Tracking variables
self.ping_attempts = 0
self.handshake_attempts = 0
self.last_action_time = 0
self.timer_tasks = [] # List of active timer tasks (for cleanup)
def start(self):
"""Start the automatic mode."""
if self.active:
return
self.active = True
self.state = "idle"
self.ping_attempts = 0
self.handshake_attempts = 0
self.last_action_time = time.time()
self._log_info("Automatic mode started")
# Start in active mode if configured
if self.config.active_mode and self.protocol.connections:
self._start_ping_sequence()
def stop(self):
"""Stop the automatic mode and clean up any pending tasks."""
if not self.active:
return
# Cancel any pending timers
for timer in self.timer_tasks:
if timer.is_alive():
timer.cancel()
self.timer_tasks = []
self.active = False
self.state = "idle"
self._log_info("Automatic mode stopped")
def handle_connection_established(self):
"""Called when a new connection is established."""
if not self.active:
return
self._log_info("Connection established")
# If in active mode, start pinging
if self.config.active_mode:
self._start_ping_sequence()
def handle_ping_received(self, index: int):
"""
Handle a received ping request.
Args:
index: Index of the ping request in the protocol's inbound message queue
"""
if not self.active or not self._is_valid_message_index(index):
return
self._log_info(f"Ping request received (index={index})")
# Automatically respond to ping if configured to accept
if self.config.ping_response_accept:
self._log_info(f"Auto-responding to ping with accept={self.config.ping_response_accept}")
try:
# Schedule the response with a small delay to simulate real behavior
timer = threading.Timer(0.5, self._respond_to_ping, args=[index])
timer.daemon = True
timer.start()
self.timer_tasks.append(timer)
except Exception as e:
self._log_error(f"Failed to auto-respond to ping: {e}")
def handle_ping_response_received(self, accepted: bool):
"""
Handle a received ping response.
Args:
accepted: Whether the ping was accepted
"""
if not self.active:
return
self.ping_attempts = 0 # Reset ping attempts counter
if accepted:
self._log_info("Ping accepted! Proceeding with handshake")
# Send handshake if not already done
if self.state != "handshake_sent":
self._ensure_ephemeral_keys()
self._start_handshake_sequence()
else:
self._log_info("Ping rejected by peer. Stopping auto-protocol sequence.")
self.state = "idle"
def handle_handshake_received(self, index: int):
"""
Handle a received handshake.
Args:
index: Index of the handshake in the protocol's inbound message queue
"""
if not self.active or not self._is_valid_message_index(index):
return
self._log_info(f"Handshake received (index={index})")
try:
# Ensure we have ephemeral keys
self._ensure_ephemeral_keys()
# Process the handshake (compute ECDH)
self.protocol.generate_ecdhe(index)
# Derive HKDF key
self.protocol.derive_hkdf()
# If we haven't sent our handshake yet, send it
if self.state != "handshake_sent":
timer = threading.Timer(0.5, self.protocol.send_handshake)
timer.daemon = True
timer.start()
self.timer_tasks.append(timer)
self.state = "handshake_sent"
else:
self.state = "key_exchange_complete"
# Start sending queued messages if auto messaging is enabled
if self.config.auto_message_enabled:
self._start_message_sequence()
except Exception as e:
self._log_error(f"Failed to process handshake: {e}")
def handle_encrypted_received(self, index: int):
"""
Handle a received encrypted message.
Args:
index: Index of the encrypted message in the protocol's inbound message queue
"""
if not self.active or not self._is_valid_message_index(index):
return
# Try to decrypt automatically
try:
plaintext = self.protocol.decrypt_received_message(index)
self._log_info(f"Auto-decrypted message: {plaintext}")
except Exception as e:
self._log_error(f"Failed to auto-decrypt message: {e}")
def queue_message(self, message: str):
"""
Add a message to the auto-send queue.
Args:
message: Message text to send
"""
self.message_queue.put(message)
self._log_info(f"Message queued for sending: {message}")
# If we're in the right state, start sending messages
if self.active and self.state == "key_exchange_complete" and self.config.auto_message_enabled:
self._process_message_queue()
def _start_ping_sequence(self):
"""Start the ping sequence to discover the peer."""
if self.ping_attempts >= self.config.ping_retry_count:
self._log_warning(f"Maximum ping attempts ({self.config.ping_retry_count}) reached")
self.state = "idle"
return
self.state = "pinging"
self.ping_attempts += 1
self._log_info(f"Sending ping request (attempt {self.ping_attempts}/{self.config.ping_retry_count})")
try:
self.protocol.send_ping_request(self.config.preferred_cipher)
self.last_action_time = time.time()
# Schedule next ping attempt if needed
timer = threading.Timer(
self.config.ping_retry_delay,
self._check_ping_response
)
timer.daemon = True
timer.start()
self.timer_tasks.append(timer)
except Exception as e:
self._log_error(f"Failed to send ping: {e}")
def _check_ping_response(self):
"""Check if we got a ping response, retry if not."""
if not self.active or self.state != "pinging":
return
# If we've waited long enough for a response, retry
if time.time() - self.last_action_time >= self.config.ping_timeout:
self._log_warning("No ping response received, retrying")
self._start_ping_sequence()
def _respond_to_ping(self, index: int):
"""
Respond to a ping request.
Args:
index: Index of the ping request in the inbound messages
"""
if not self.active or not self._is_valid_message_index(index):
return
try:
answer = 1 if self.config.ping_response_accept else 0
self.protocol.respond_to_ping(index, answer)
if answer == 1:
# If we accepted, we should expect a handshake
self.state = "accepted_ping"
self._ensure_ephemeral_keys()
# Set a timer to send our handshake if we don't receive one
timer = threading.Timer(
self.config.handshake_timeout,
self._check_handshake_received
)
timer.daemon = True
timer.start()
self.timer_tasks.append(timer)
self.last_action_time = time.time()
except Exception as e:
self._log_error(f"Failed to respond to ping: {e}")
def _check_handshake_received(self):
"""Check if we've received a handshake after accepting a ping."""
if not self.active or self.state != "accepted_ping":
return
# If we've waited long enough and haven't received a handshake, initiate one
if time.time() - self.last_action_time >= self.config.handshake_timeout:
self._log_warning("No handshake received after accepting ping, initiating handshake")
self._start_handshake_sequence()
def _start_handshake_sequence(self):
"""Start the handshake sequence."""
if self.handshake_attempts >= self.config.handshake_retry_count:
self._log_warning(f"Maximum handshake attempts ({self.config.handshake_retry_count}) reached")
self.state = "idle"
return
self.state = "handshake_sent"
self.handshake_attempts += 1
self._log_info(f"Sending handshake (attempt {self.handshake_attempts}/{self.config.handshake_retry_count})")
try:
self.protocol.send_handshake()
self.last_action_time = time.time()
# Schedule handshake retry check
timer = threading.Timer(
self.config.handshake_retry_delay,
self._check_handshake_response
)
timer.daemon = True
timer.start()
self.timer_tasks.append(timer)
except Exception as e:
self._log_error(f"Failed to send handshake: {e}")
def _check_handshake_response(self):
"""Check if we've completed the key exchange, retry handshake if not."""
if not self.active or self.state != "handshake_sent":
return
# If we've waited long enough for a response, retry
if time.time() - self.last_action_time >= self.config.handshake_timeout:
self._log_warning("No handshake response received, retrying")
self._start_handshake_sequence()
def _start_message_sequence(self):
"""Start the automated message sending sequence."""
if not self.config.auto_message_enabled:
return
self._log_info("Starting automated message sequence")
# Add the default message if queue is empty
if self.message_queue.empty():
self.message_queue.put(self.config.message_content)
# Start processing the queue
self._process_message_queue()
def _process_message_queue(self):
"""Process messages in the queue and send them."""
if not self.active or self.state != "key_exchange_complete" or not self.config.auto_message_enabled:
return
if not self.message_queue.empty():
message = self.message_queue.get()
self._log_info(f"Sending queued message: {message}")
try:
self.protocol.send_encrypted_message(message)
# Schedule next message send
timer = threading.Timer(
self.config.message_interval,
self._process_message_queue
)
timer.daemon = True
timer.start()
self.timer_tasks.append(timer)
except Exception as e:
self._log_error(f"Failed to send queued message: {e}")
# Put the message back in the queue
self.message_queue.put(message)
def _ensure_ephemeral_keys(self):
"""Ensure ephemeral keys are generated if needed."""
if not hasattr(self.protocol, 'ephemeral_pubkey') or self.protocol.ephemeral_pubkey is None:
self._log_info("Generating ephemeral keys")
self.protocol.generate_ephemeral_keys()
def _is_valid_message_index(self, index: int) -> bool:
"""
Check if a message index is valid in the protocol's inbound_messages queue.
Args:
index: The index to check
Returns:
bool: True if the index is valid, False otherwise
"""
if not hasattr(self.protocol, 'inbound_messages'):
self._log_error("Protocol has no inbound_messages attribute")
return False
if index < 0 or index >= len(self.protocol.inbound_messages):
self._log_error(f"Invalid message index: {index}")
return False
return True
# Helper methods for logging
def _log_info(self, message: str):
print(f"{BLUE}[AUTO]{RESET} {message}")
if hasattr(self, 'verbose_logging') and self.verbose_logging:
state_info = f"(state={self.state})"
if 'pinging' in self.state and hasattr(self, 'ping_attempts'):
state_info += f", attempts={self.ping_attempts}/{self.config.ping_retry_count}"
elif 'handshake' in self.state and hasattr(self, 'handshake_attempts'):
state_info += f", attempts={self.handshake_attempts}/{self.config.handshake_retry_count}"
print(f"{BLUE}[AUTO-DETAIL]{RESET} {state_info}")
def _log_warning(self, message: str):
print(f"{YELLOW}[AUTO-WARN]{RESET} {message}")
if hasattr(self, 'verbose_logging') and self.verbose_logging:
timer_info = f"Active timers: {len(self.timer_tasks)}"
print(f"{YELLOW}[AUTO-WARN-DETAIL]{RESET} {timer_info}")
def _log_error(self, message: str):
print(f"{RED}[AUTO-ERROR]{RESET} {message}")
if hasattr(self, 'verbose_logging') and self.verbose_logging:
print(f"{RED}[AUTO-ERROR-DETAIL]{RESET} Current state: {self.state}, Active: {self.active}")

View File

@ -1,328 +0,0 @@
import sys
import argparse
import shlex
from protocol import IcingProtocol
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
RESET = "\033[0m"
def print_help():
"""Display all available commands."""
print(f"\n{YELLOW}=== Available Commands ==={RESET}")
print(f"\n{CYAN}Basic Protocol Commands:{RESET}")
print(" help - Show this help message")
print(" peer_id <hex_pubkey> - Set peer identity public key")
print(" connect <port> - Connect to a peer at the specified port")
print(" show_state - Display current protocol state")
print(" exit - Exit the program")
print(f"\n{CYAN}Manual Protocol Operation:{RESET}")
print(" generate_ephemeral_keys - Generate ephemeral ECDH keys")
print(" send_ping [cipher] - Send PING request (cipher: 0=AES-GCM, 1=ChaCha20-Poly1305, default: 0)")
print(" respond_ping <index> <0|1> - Respond to a PING (0=reject, 1=accept)")
print(" send_handshake - Send handshake with ephemeral keys")
print(" generate_ecdhe <index> - Process handshake at specified index")
print(" derive_hkdf - Derive encryption key using HKDF")
print(" send_encrypted <plaintext> - Encrypt and send a message")
print(" decrypt <index> - Decrypt received message at index")
print(f"\n{CYAN}Automatic Mode Commands:{RESET}")
print(" auto start - Start automatic mode")
print(" auto stop - Stop automatic mode")
print(" auto status - Show current auto mode status and configuration")
print(" auto config <param> <value> - Configure auto mode parameters")
print(" auto config list - Show all configurable parameters")
print(" auto message <text> - Queue message for automatic sending")
print(" auto passive - Configure as passive peer (responds to pings but doesn't initiate)")
print(" auto active - Configure as active peer (initiates protocol)")
print(" auto log - Toggle detailed logging for auto mode")
print(f"\n{CYAN}Debugging Commands:{RESET}")
print(" debug_message <index> - Display detailed information about a message in the queue")
print(f"\n{CYAN}Legacy Commands:{RESET}")
print(" auto_responder <on|off> - Enable/disable legacy auto responder (deprecated)")
def main():
protocol = IcingProtocol()
print(f"{YELLOW}\n======================================")
print(" Icing Protocol - Secure Communication ")
print("======================================\n" + RESET)
print(f"Listening on port: {protocol.local_port}")
print(f"Your identity public key (hex): {protocol.identity_pubkey.hex()}")
print_help()
while True:
try:
line = input(f"{MAGENTA}Cmd>{RESET} ").strip()
except EOFError:
break
if not line:
continue
parts = shlex.split(line) # Handle quoted arguments properly
cmd = parts[0].lower()
try:
# Basic commands
if cmd == "exit":
protocol.stop()
break
elif cmd == "help":
print_help()
elif cmd == "show_state":
protocol.show_state()
elif cmd == "peer_id":
if len(parts) != 2:
print(f"{RED}[ERROR]{RESET} Usage: peer_id <hex_pubkey>")
continue
try:
protocol.set_peer_identity(parts[1])
except ValueError as e:
print(f"{RED}[ERROR]{RESET} Invalid public key: {e}")
elif cmd == "connect":
if len(parts) != 2:
print(f"{RED}[ERROR]{RESET} Usage: connect <port>")
continue
try:
port = int(parts[1])
protocol.connect_to_peer(port)
except ValueError:
print(f"{RED}[ERROR]{RESET} Invalid port number.")
except Exception as e:
print(f"{RED}[ERROR]{RESET} Connection failed: {e}")
# Manual protocol operation
elif cmd == "generate_ephemeral_keys":
protocol.generate_ephemeral_keys()
elif cmd == "send_ping":
# Optional cipher parameter (0 = AES-GCM, 1 = ChaCha20-Poly1305)
cipher = 0 # Default to AES-GCM
if len(parts) >= 2:
try:
cipher = int(parts[1])
if cipher not in (0, 1):
print(f"{YELLOW}[WARNING]{RESET} Unsupported cipher code {cipher}. Using AES-GCM (0).")
cipher = 0
except ValueError:
print(f"{YELLOW}[WARNING]{RESET} Invalid cipher code. Using AES-GCM (0).")
protocol.send_ping_request(cipher)
elif cmd == "send_handshake":
protocol.send_handshake()
elif cmd == "respond_ping":
if len(parts) != 3:
print(f"{RED}[ERROR]{RESET} Usage: respond_ping <index> <0|1>")
continue
try:
idx = int(parts[1])
answer = int(parts[2])
if answer not in (0, 1):
print(f"{RED}[ERROR]{RESET} Answer must be 0 (reject) or 1 (accept).")
continue
protocol.respond_to_ping(idx, answer)
except ValueError:
print(f"{RED}[ERROR]{RESET} Index and answer must be integers.")
except Exception as e:
print(f"{RED}[ERROR]{RESET} Failed to respond to ping: {e}")
elif cmd == "generate_ecdhe":
if len(parts) != 2:
print(f"{RED}[ERROR]{RESET} Usage: generate_ecdhe <index>")
continue
try:
idx = int(parts[1])
protocol.generate_ecdhe(idx)
except ValueError:
print(f"{RED}[ERROR]{RESET} Index must be an integer.")
except Exception as e:
print(f"{RED}[ERROR]{RESET} Failed to process handshake: {e}")
elif cmd == "derive_hkdf":
try:
protocol.derive_hkdf()
except Exception as e:
print(f"{RED}[ERROR]{RESET} Failed to derive HKDF key: {e}")
elif cmd == "send_encrypted":
if len(parts) < 2:
print(f"{RED}[ERROR]{RESET} Usage: send_encrypted <plaintext>")
continue
plaintext = " ".join(parts[1:])
try:
protocol.send_encrypted_message(plaintext)
except Exception as e:
print(f"{RED}[ERROR]{RESET} Failed to send encrypted message: {e}")
elif cmd == "decrypt":
if len(parts) != 2:
print(f"{RED}[ERROR]{RESET} Usage: decrypt <index>")
continue
try:
idx = int(parts[1])
protocol.decrypt_received_message(idx)
except ValueError:
print(f"{RED}[ERROR]{RESET} Index must be an integer.")
except Exception as e:
print(f"{RED}[ERROR]{RESET} Failed to decrypt message: {e}")
# Debugging commands
elif cmd == "debug_message":
if len(parts) != 2:
print(f"{RED}[ERROR]{RESET} Usage: debug_message <index>")
continue
try:
idx = int(parts[1])
protocol.debug_message(idx)
except ValueError:
print(f"{RED}[ERROR]{RESET} Index must be an integer.")
except Exception as e:
print(f"{RED}[ERROR]{RESET} Failed to debug message: {e}")
# Automatic mode commands
elif cmd == "auto":
if len(parts) < 2:
print(f"{RED}[ERROR]{RESET} Usage: auto <command> [options]")
print("Available commands: start, stop, status, config, message, passive, active")
continue
subcmd = parts[1].lower()
if subcmd == "start":
protocol.start_auto_mode()
print(f"{GREEN}[AUTO]{RESET} Automatic mode started")
elif subcmd == "stop":
protocol.stop_auto_mode()
print(f"{GREEN}[AUTO]{RESET} Automatic mode stopped")
elif subcmd == "status":
config = protocol.get_auto_mode_config()
print(f"{YELLOW}=== Auto Mode Status ==={RESET}")
print(f"Active: {protocol.auto_mode.active}")
print(f"State: {protocol.auto_mode.state}")
print(f"\n{YELLOW}--- Configuration ---{RESET}")
for key, value in vars(config).items():
print(f" {key}: {value}")
elif subcmd == "config":
if len(parts) < 3:
print(f"{RED}[ERROR]{RESET} Usage: auto config <param> <value> or auto config list")
continue
if parts[2].lower() == "list":
config = protocol.get_auto_mode_config()
print(f"{YELLOW}=== Auto Mode Configuration Parameters ==={RESET}")
for key, value in vars(config).items():
print(f" {key} ({type(value).__name__}): {value}")
continue
if len(parts) != 4:
print(f"{RED}[ERROR]{RESET} Usage: auto config <param> <value>")
continue
param = parts[2]
value_str = parts[3]
# Convert the string value to the appropriate type
config = protocol.get_auto_mode_config()
if not hasattr(config, param):
print(f"{RED}[ERROR]{RESET} Unknown parameter: {param}")
print("Use 'auto config list' to see all available parameters")
continue
current_value = getattr(config, param)
try:
if isinstance(current_value, bool):
if value_str.lower() in ("true", "yes", "on", "1"):
value = True
elif value_str.lower() in ("false", "no", "off", "0"):
value = False
else:
raise ValueError(f"Boolean value must be true/false/yes/no/on/off/1/0")
elif isinstance(current_value, int):
value = int(value_str)
elif isinstance(current_value, float):
value = float(value_str)
elif isinstance(current_value, str):
value = value_str
else:
value = value_str # Default to string
protocol.configure_auto_mode(**{param: value})
print(f"{GREEN}[AUTO]{RESET} Set {param} = {value}")
except ValueError as e:
print(f"{RED}[ERROR]{RESET} Invalid value for {param}: {e}")
elif subcmd == "message":
if len(parts) < 3:
print(f"{RED}[ERROR]{RESET} Usage: auto message <text>")
continue
message = " ".join(parts[2:])
protocol.queue_auto_message(message)
print(f"{GREEN}[AUTO]{RESET} Message queued for sending: {message}")
elif subcmd == "passive":
# Configure as passive peer (responds but doesn't initiate)
protocol.configure_auto_mode(
ping_response_accept=True,
ping_auto_initiate=False,
active_mode=False
)
print(f"{GREEN}[AUTO]{RESET} Configured as passive peer")
elif subcmd == "active":
# Configure as active peer (initiates protocol)
protocol.configure_auto_mode(
ping_response_accept=True,
ping_auto_initiate=True,
active_mode=True
)
print(f"{GREEN}[AUTO]{RESET} Configured as active peer")
else:
print(f"{RED}[ERROR]{RESET} Unknown auto mode command: {subcmd}")
print("Available commands: start, stop, status, config, message, passive, active")
# Legacy commands
elif cmd == "auto_responder":
if len(parts) != 2:
print(f"{RED}[ERROR]{RESET} Usage: auto_responder <on|off>")
continue
val = parts[1].lower()
if val not in ("on", "off"):
print(f"{RED}[ERROR]{RESET} Value must be 'on' or 'off'.")
continue
protocol.enable_auto_responder(val == "on")
print(f"{YELLOW}[WARNING]{RESET} Using legacy auto responder. Consider using 'auto' commands instead.")
else:
print(f"{RED}[ERROR]{RESET} Unknown command: {cmd}")
print("Type 'help' for a list of available commands.")
except Exception as e:
print(f"{RED}[ERROR]{RESET} Command failed: {e}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\nExiting...")
except Exception as e:
print(f"{RED}[FATAL ERROR]{RESET} {e}")
sys.exit(1)

View File

@ -1,165 +0,0 @@
import os
from typing import Tuple
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, utils
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature
def generate_identity_keys() -> Tuple[ec.EllipticCurvePrivateKey, bytes]:
"""
Generate an ECDSA (P-256) identity key pair.
Returns:
Tuple containing:
- private_key: EllipticCurvePrivateKey object
- public_key_bytes: Raw x||y format (64 bytes, 512 bits)
"""
private_key = ec.generate_private_key(ec.SECP256R1())
public_numbers = private_key.public_key().public_numbers()
x_bytes = public_numbers.x.to_bytes(32, byteorder='big')
y_bytes = public_numbers.y.to_bytes(32, byteorder='big')
pubkey_bytes = x_bytes + y_bytes # 64 bytes total
return private_key, pubkey_bytes
def load_peer_identity_key(pubkey_bytes: bytes) -> ec.EllipticCurvePublicKey:
"""
Convert a raw public key (64 bytes, x||y format) to a cryptography public key object.
Args:
pubkey_bytes: Raw 64-byte public key (x||y format)
Returns:
EllipticCurvePublicKey object
Raises:
ValueError: If the pubkey_bytes is not exactly 64 bytes
"""
if len(pubkey_bytes) != 64:
raise ValueError("Peer identity pubkey must be exactly 64 bytes (x||y).")
x_int = int.from_bytes(pubkey_bytes[:32], byteorder='big')
y_int = int.from_bytes(pubkey_bytes[32:], byteorder='big')
public_numbers = ec.EllipticCurvePublicNumbers(x_int, y_int, ec.SECP256R1())
return public_numbers.public_key()
def sign_data(private_key: ec.EllipticCurvePrivateKey, data: bytes) -> bytes:
"""
Sign data with ECDSA using a P-256 private key.
Args:
private_key: EllipticCurvePrivateKey for signing
data: Bytes to sign
Returns:
DER-encoded signature (variable length, up to ~70-72 bytes)
"""
signature = private_key.sign(data, ec.ECDSA(hashes.SHA256()))
return signature
def verify_signature(public_key: ec.EllipticCurvePublicKey, signature: bytes, data: bytes) -> bool:
"""
Verify a DER-encoded ECDSA signature.
Args:
public_key: EllipticCurvePublicKey for verification
signature: DER-encoded signature
data: Original signed data
Returns:
True if signature is valid, False otherwise
"""
try:
public_key.verify(signature, data, ec.ECDSA(hashes.SHA256()))
return True
except InvalidSignature:
return False
def get_ephemeral_keypair() -> Tuple[ec.EllipticCurvePrivateKey, bytes]:
"""
Generate an ephemeral ECDH key pair (P-256).
Returns:
Tuple containing:
- private_key: EllipticCurvePrivateKey object
- pubkey_bytes: Raw x||y format (64 bytes, 512 bits)
"""
private_key = ec.generate_private_key(ec.SECP256R1())
numbers = private_key.public_key().public_numbers()
x_bytes = numbers.x.to_bytes(32, 'big')
y_bytes = numbers.y.to_bytes(32, 'big')
return private_key, x_bytes + y_bytes # 64 bytes total
def compute_ecdh_shared_key(private_key: ec.EllipticCurvePrivateKey, peer_pubkey_bytes: bytes) -> bytes:
"""
Compute a shared secret using ECDH.
Args:
private_key: Local ECDH private key
peer_pubkey_bytes: Peer's ephemeral public key (64 bytes, raw x||y format)
Returns:
Shared secret bytes
Raises:
ValueError: If peer_pubkey_bytes is not 64 bytes
"""
if len(peer_pubkey_bytes) != 64:
raise ValueError("Peer public key must be 64 bytes (x||y format)")
x_int = int.from_bytes(peer_pubkey_bytes[:32], 'big')
y_int = int.from_bytes(peer_pubkey_bytes[32:], 'big')
# Create public key object from raw components
peer_public_numbers = ec.EllipticCurvePublicNumbers(x_int, y_int, ec.SECP256R1())
peer_public_key = peer_public_numbers.public_key()
# Perform key exchange
shared_key = private_key.exchange(ec.ECDH(), peer_public_key)
return shared_key
def der_to_raw(der_sig: bytes) -> bytes:
"""
Convert a DER-encoded ECDSA signature to a raw 64-byte signature (r||s).
Args:
der_sig: DER-encoded signature
Returns:
Raw 64-byte signature (r||s format), with each component padded to 32 bytes
"""
r, s = decode_dss_signature(der_sig)
r_bytes = r.to_bytes(32, byteorder='big')
s_bytes = s.to_bytes(32, byteorder='big')
return r_bytes + s_bytes
def raw_signature_to_der(raw_sig: bytes) -> bytes:
"""
Convert a raw signature (64 bytes, concatenated r||s) to DER-encoded signature.
Args:
raw_sig: Raw 64-byte signature (r||s format)
Returns:
DER-encoded signature
Raises:
ValueError: If raw_sig is not 64 bytes
"""
if len(raw_sig) != 64:
raise ValueError("Raw signature must be 64 bytes (r||s).")
r = int.from_bytes(raw_sig[:32], 'big')
s = int.from_bytes(raw_sig[32:], 'big')
return encode_dss_signature(r, s)

View File

@ -1,263 +0,0 @@
import os
import struct
from typing import Optional, Tuple
from cryptography.hazmat.primitives.ciphers.aead import AESGCM, ChaCha20Poly1305
class MessageHeader:
"""
Header of an encrypted message (18 bytes total):
Clear Text Section (4 bytes):
- flag: 16 bits (0xBEEF by default)
- data_len: 16 bits (length of encrypted payload excluding tag)
Associated Data (14 bytes):
- retry: 8 bits (retry counter)
- connection_status: 4 bits (e.g., CRC required) + 4 bits padding
- iv/messageID: 96 bits (12 bytes)
"""
def __init__(self, flag: int, data_len: int, retry: int, connection_status: int, iv: bytes):
if not (0 <= flag < 65536):
raise ValueError("Flag must fit in 16 bits (0..65535)")
if not (0 <= data_len < 65536):
raise ValueError("Data length must fit in 16 bits (0..65535)")
if not (0 <= retry < 256):
raise ValueError("Retry must fit in 8 bits (0..255)")
if not (0 <= connection_status < 16):
raise ValueError("Connection status must fit in 4 bits (0..15)")
if len(iv) != 12:
raise ValueError("IV must be 12 bytes (96 bits)")
self.flag = flag # 16 bits
self.data_len = data_len # 16 bits
self.retry = retry # 8 bits
self.connection_status = connection_status # 4 bits
self.iv = iv # 96 bits (12 bytes)
def pack(self) -> bytes:
"""Pack header into 18 bytes."""
# Pack flag and data_len (4 bytes)
header = struct.pack('>H H', self.flag, self.data_len)
# Pack retry and connection_status (2 bytes)
# connection_status in high 4 bits of second byte, 4 bits padding as zero
ad_byte = (self.connection_status & 0x0F) << 4
ad_packed = struct.pack('>B B', self.retry, ad_byte)
# Append IV (12 bytes)
return header + ad_packed + self.iv
def get_associated_data(self) -> bytes:
"""Get the associated data for AEAD encryption (retry, conn_status, iv)."""
# Pack retry and connection_status
ad_byte = (self.connection_status & 0x0F) << 4
ad_packed = struct.pack('>B B', self.retry, ad_byte)
# Append IV
return ad_packed + self.iv
@classmethod
def unpack(cls, data: bytes) -> 'MessageHeader':
"""Unpack 18 bytes into a MessageHeader object."""
if len(data) < 18:
raise ValueError(f"Header data too short: {len(data)} bytes, expected 18")
flag, data_len = struct.unpack('>H H', data[:4])
retry, ad_byte = struct.unpack('>B B', data[4:6])
connection_status = (ad_byte >> 4) & 0x0F
iv = data[6:18]
return cls(flag, data_len, retry, connection_status, iv)
class EncryptedMessage:
"""
Encrypted message packet format:
- Header (18 bytes):
* flag: 16 bits
* data_len: 16 bits
* retry: 8 bits
* connection_status: 4 bits (+ 4 bits padding)
* iv/messageID: 96 bits (12 bytes)
- Payload: variable length encrypted data
- Footer:
* Authentication tag: 128 bits (16 bytes)
* CRC32: 32 bits (4 bytes) - optional, based on connection_status
"""
def __init__(self, plaintext: bytes, key: bytes, flag: int = 0xBEEF,
retry: int = 0, connection_status: int = 0, iv: bytes = None,
cipher_type: int = 0):
self.plaintext = plaintext
self.key = key
self.flag = flag
self.retry = retry
self.connection_status = connection_status
self.iv = iv or generate_iv(initial=True)
self.cipher_type = cipher_type # 0 = AES-256-GCM, 1 = ChaCha20-Poly1305
# Will be set after encryption
self.ciphertext = None
self.tag = None
self.header = None
def encrypt(self) -> bytes:
"""Encrypt the plaintext and return the full encrypted message."""
# Create header with correct data_len (which will be set after encryption)
self.header = MessageHeader(
flag=self.flag,
data_len=0, # Will be updated after encryption
retry=self.retry,
connection_status=self.connection_status,
iv=self.iv
)
# Get associated data for AEAD
aad = self.header.get_associated_data()
# Encrypt using the appropriate cipher
if self.cipher_type == 0: # AES-256-GCM
cipher = AESGCM(self.key)
ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad)
elif self.cipher_type == 1: # ChaCha20-Poly1305
cipher = ChaCha20Poly1305(self.key)
ciphertext_with_tag = cipher.encrypt(self.iv, self.plaintext, aad)
else:
raise ValueError(f"Unsupported cipher type: {self.cipher_type}")
# Extract ciphertext and tag
self.tag = ciphertext_with_tag[-16:]
self.ciphertext = ciphertext_with_tag[:-16]
# Update header with actual data length
self.header.data_len = len(self.ciphertext)
# Pack everything together
packed_header = self.header.pack()
# Check if CRC is required (based on connection_status)
if self.connection_status & 0x01: # Lowest bit indicates CRC required
import zlib
# Compute CRC32 of header + ciphertext + tag
crc = zlib.crc32(packed_header + self.ciphertext + self.tag) & 0xffffffff
crc_bytes = struct.pack('>I', crc)
return packed_header + self.ciphertext + self.tag + crc_bytes
else:
return packed_header + self.ciphertext + self.tag
@classmethod
def decrypt(cls, data: bytes, key: bytes, cipher_type: int = 0) -> Tuple[bytes, MessageHeader]:
"""
Decrypt an encrypted message and return the plaintext and header.
Args:
data: The full encrypted message
key: The encryption key
cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305
Returns:
Tuple of (plaintext, header)
"""
if len(data) < 18 + 16: # Header + minimum tag size
raise ValueError("Message too short")
# Extract header
header_bytes = data[:18]
header = MessageHeader.unpack(header_bytes)
# Get ciphertext and tag
data_len = header.data_len
ciphertext_start = 18
ciphertext_end = ciphertext_start + data_len
if ciphertext_end + 16 > len(data):
raise ValueError("Message length does not match header's data_len")
ciphertext = data[ciphertext_start:ciphertext_end]
tag = data[ciphertext_end:ciphertext_end + 16]
# Get associated data for AEAD
aad = header.get_associated_data()
# Combine ciphertext and tag for decryption
ciphertext_with_tag = ciphertext + tag
# Decrypt using the appropriate cipher
try:
if cipher_type == 0: # AES-256-GCM
cipher = AESGCM(key)
plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad)
elif cipher_type == 1: # ChaCha20-Poly1305
cipher = ChaCha20Poly1305(key)
plaintext = cipher.decrypt(header.iv, ciphertext_with_tag, aad)
else:
raise ValueError(f"Unsupported cipher type: {cipher_type}")
return plaintext, header
except Exception as e:
raise ValueError(f"Decryption failed: {e}")
def generate_iv(initial: bool = False, previous_iv: bytes = None) -> bytes:
"""
Generate a 96-bit IV (12 bytes).
Args:
initial: If True, return a random IV
previous_iv: The previous IV to increment
Returns:
A new IV
"""
if initial or previous_iv is None:
return os.urandom(12) # 96 bits
else:
# Increment the previous IV by 1 modulo 2^96
iv_int = int.from_bytes(previous_iv, 'big')
iv_int = (iv_int + 1) % (1 << 96)
return iv_int.to_bytes(12, 'big')
# Convenience functions to match original API
def encrypt_message(plaintext: bytes, key: bytes, flag: int = 0xBEEF,
retry: int = 0, connection_status: int = 0,
iv: bytes = None, cipher_type: int = 0) -> bytes:
"""
Encrypt a message using the specified parameters.
Args:
plaintext: The data to encrypt
key: The encryption key (32 bytes for AES-256-GCM, 32 bytes for ChaCha20-Poly1305)
flag: 16-bit flag value (default: 0xBEEF)
retry: 8-bit retry counter
connection_status: 4-bit connection status
iv: Optional 96-bit IV (if None, a random one will be generated)
cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305
Returns:
The full encrypted message
"""
message = EncryptedMessage(
plaintext=plaintext,
key=key,
flag=flag,
retry=retry,
connection_status=connection_status,
iv=iv,
cipher_type=cipher_type
)
return message.encrypt()
def decrypt_message(message: bytes, key: bytes, cipher_type: int = 0) -> bytes:
"""
Decrypt a message.
Args:
message: The full encrypted message
key: The encryption key
cipher_type: 0 for AES-256-GCM, 1 for ChaCha20-Poly1305
Returns:
The decrypted plaintext
"""
plaintext, _ = EncryptedMessage.decrypt(message, key, cipher_type)
return plaintext

View File

@ -1,262 +0,0 @@
import os
import struct
import time
import zlib
import hashlib
from typing import Tuple, Optional
def crc32_of(data: bytes) -> int:
"""
Compute CRC-32 of 'data'.
"""
return zlib.crc32(data) & 0xffffffff
# ---------------------------------------------------------------------------
# PING REQUEST (new format)
# Fields (in order):
# - session_nonce: 129 bits (from the top 129 bits of 17 random bytes)
# - version: 7 bits
# - cipher: 4 bits (0 = AES-256-GCM, 1 = ChaCha20-poly1305; for now only 0 is used)
# - CRC: 32 bits
#
# Total bits: 129 + 7 + 4 + 32 = 172 bits. We pack into 22 bytes (176 bits) with 4 spare bits.
# ---------------------------------------------------------------------------
class PingRequest:
"""
PING REQUEST format (172 bits / 22 bytes):
- session_nonce: 129 bits (from top 129 bits of 17 random bytes)
- version: 7 bits
- cipher: 4 bits (0 = AES-256-GCM, 1 = ChaCha20-poly1305)
- CRC: 32 bits
"""
def __init__(self, version: int, cipher: int, session_nonce: bytes = None):
if not (0 <= version < 128):
raise ValueError("Version must fit in 7 bits (0..127)")
if not (0 <= cipher < 16):
raise ValueError("Cipher must fit in 4 bits (0..15)")
self.version = version
self.cipher = cipher
# Generate session nonce if not provided
if session_nonce is None:
# Generate 17 random bytes
nonce_full = os.urandom(17)
# Use top 129 bits
nonce_int_full = int.from_bytes(nonce_full, 'big')
nonce_129_int = nonce_int_full >> 7 # drop lowest 7 bits
self.session_nonce = nonce_129_int.to_bytes(17, 'big')
else:
if len(session_nonce) != 17:
raise ValueError("Session nonce must be 17 bytes (136 bits)")
self.session_nonce = session_nonce
def serialize(self) -> bytes:
"""Serialize the ping request into a 22-byte packet."""
# Convert session_nonce to integer (129 bits)
nonce_int = int.from_bytes(self.session_nonce, 'big')
# Pack fields: shift nonce left by 11 bits, add version and cipher
partial_int = (nonce_int << 11) | (self.version << 4) | (self.cipher & 0x0F)
# This creates 129+7+4 = 140 bits; pack into 18 bytes
partial_bytes = partial_int.to_bytes(18, 'big')
# Compute CRC over these 18 bytes
cval = crc32_of(partial_bytes)
# Combine partial data with 32-bit CRC
final_int = (int.from_bytes(partial_bytes, 'big') << 32) | cval
return final_int.to_bytes(22, 'big')
@classmethod
def deserialize(cls, data: bytes) -> Optional['PingRequest']:
"""Deserialize a 22-byte packet into a PingRequest object."""
if len(data) != 22:
return None
# Extract 176-bit integer
final_int = int.from_bytes(data, 'big')
# Extract CRC and verify
crc_in = final_int & 0xffffffff
partial_int = final_int >> 32 # 140 bits
partial_bytes = partial_int.to_bytes(18, 'big')
crc_calc = crc32_of(partial_bytes)
if crc_calc != crc_in:
return None
# Extract fields
cipher = partial_int & 0x0F
version = (partial_int >> 4) & 0x7F
nonce_129_int = partial_int >> 11 # 129 bits
session_nonce = nonce_129_int.to_bytes(17, 'big')
return cls(version, cipher, session_nonce)
# ---------------------------------------------------------------------------
# PING RESPONSE (new format)
# Fields:
# - timestamp: 32 bits (we take the lower 32 bits of the time in ms)
# - version: 7 bits
# - cipher: 4 bits
# - answer: 1 bit
# - CRC: 32 bits
#
# Total bits: 32 + 7 + 4 + 1 + 32 = 76 bits; pack into 10 bytes (80 bits) with 4 spare bits.
# ---------------------------------------------------------------------------
class PingResponse:
"""
PING RESPONSE format (76 bits / 10 bytes):
- timestamp: 32 bits (milliseconds since epoch, lower 32 bits)
- version: 7 bits
- cipher: 4 bits
- answer: 1 bit (0 = no, 1 = yes)
- CRC: 32 bits
"""
def __init__(self, version: int, cipher: int, answer: int, timestamp: int = None):
if not (0 <= version < 128):
raise ValueError("Version must fit in 7 bits")
if not (0 <= cipher < 16):
raise ValueError("Cipher must fit in 4 bits")
if answer not in (0, 1):
raise ValueError("Answer must be 0 or 1")
self.version = version
self.cipher = cipher
self.answer = answer
self.timestamp = timestamp or (int(time.time() * 1000) & 0xffffffff)
def serialize(self) -> bytes:
"""Serialize the ping response into a 10-byte packet."""
# Pack timestamp, version, cipher, answer: 32+7+4+1 = 44 bits
partial_val = (self.timestamp << (7+4+1)) | (self.version << (4+1)) | (self.cipher << 1) | self.answer
partial_bytes = partial_val.to_bytes(6, 'big') # 6 bytes = 48 bits, 4 spare bits
# Compute CRC
cval = crc32_of(partial_bytes)
# Combine with CRC
final_val = (int.from_bytes(partial_bytes, 'big') << 32) | cval
return final_val.to_bytes(10, 'big')
@classmethod
def deserialize(cls, data: bytes) -> Optional['PingResponse']:
"""Deserialize a 10-byte packet into a PingResponse object."""
if len(data) != 10:
return None
# Extract 80-bit integer
final_int = int.from_bytes(data, 'big')
# Extract CRC and verify
crc_in = final_int & 0xffffffff
partial_int = final_int >> 32 # 48 bits
partial_bytes = partial_int.to_bytes(6, 'big')
crc_calc = crc32_of(partial_bytes)
if crc_calc != crc_in:
return None
# Extract fields (discard 4 spare bits)
partial_int >>= 4 # now 44 bits
answer = partial_int & 0x01
cipher = (partial_int >> 1) & 0x0F
version = (partial_int >> (1+4)) & 0x7F
timestamp = partial_int >> (1+4+7)
return cls(version, cipher, answer, timestamp)
# =============================================================================
# 3) Handshake
# - 32-bit timestamp
# - 64-byte ephemeral pubkey (raw x||y = 512 bits)
# - 64-byte ephemeral signature (raw r||s = 512 bits)
# - 32-byte PFS hash (256 bits)
# - 32-bit CRC
# => total 4 + 64 + 64 + 32 + 4 = 168 bytes = 1344 bits
# =============================================================================
class Handshake:
"""
HANDSHAKE format (1344 bits / 168 bytes):
- timestamp: 32 bits
- ephemeral_pubkey: 512 bits (64 bytes, raw x||y format)
- ephemeral_signature: 512 bits (64 bytes, raw r||s format)
- pfs_hash: 256 bits (32 bytes)
- CRC: 32 bits
"""
def __init__(self, ephemeral_pubkey: bytes, ephemeral_signature: bytes, pfs_hash: bytes, timestamp: int = None):
if len(ephemeral_pubkey) != 64:
raise ValueError("ephemeral_pubkey must be 64 bytes (raw x||y)")
if len(ephemeral_signature) != 64:
raise ValueError("ephemeral_signature must be 64 bytes (raw r||s)")
if len(pfs_hash) != 32:
raise ValueError("pfs_hash must be 32 bytes")
self.ephemeral_pubkey = ephemeral_pubkey
self.ephemeral_signature = ephemeral_signature
self.pfs_hash = pfs_hash
self.timestamp = timestamp or (int(time.time() * 1000) & 0xffffffff)
def serialize(self) -> bytes:
"""Serialize the handshake into a 168-byte packet."""
# Pack timestamp and other fields
partial = struct.pack("!I", self.timestamp) + self.ephemeral_pubkey + self.ephemeral_signature + self.pfs_hash
# Compute CRC
cval = crc32_of(partial)
# Append CRC
return partial + struct.pack("!I", cval)
@classmethod
def deserialize(cls, data: bytes) -> Optional['Handshake']:
"""Deserialize a 168-byte packet into a Handshake object."""
if len(data) != 168:
return None
# Extract and verify CRC
partial = data[:-4]
crc_in = struct.unpack("!I", data[-4:])[0]
crc_calc = crc32_of(partial)
if crc_calc != crc_in:
return None
# Extract fields
timestamp = struct.unpack("!I", partial[:4])[0]
ephemeral_pubkey = partial[4:4+64]
ephemeral_signature = partial[68:68+64]
pfs_hash = partial[132:132+32]
return cls(ephemeral_pubkey, ephemeral_signature, pfs_hash, timestamp)
# =============================================================================
# 4) PFS Hash Helper
# If no previous session, return 32 zero bytes
# Otherwise, compute sha256(session_number || last_shared_secret).
# =============================================================================
def compute_pfs_hash(session_number: int, shared_secret_hex: str) -> bytes:
"""
Compute the PFS hash field for handshake messages:
- If no previous session (session_number < 0), return 32 zero bytes
- Otherwise, compute sha256(session_number || shared_secret)
"""
if session_number < 0:
return b"\x00" * 32
# Convert shared_secret_hex to raw bytes
secret_bytes = bytes.fromhex(shared_secret_hex)
# Pack session_number as 4 bytes
sn_bytes = struct.pack("!I", session_number)
# Compute hash
return hashlib.sha256(sn_bytes + secret_bytes).digest()

View File

@ -1,815 +0,0 @@
import random
import os
import time
import threading
from typing import List, Dict, Any, Optional, Tuple
from crypto_utils import (
generate_identity_keys,
load_peer_identity_key,
sign_data,
verify_signature,
get_ephemeral_keypair,
compute_ecdh_shared_key,
der_to_raw,
raw_signature_to_der
)
from messages import (
PingRequest, PingResponse, Handshake,
compute_pfs_hash
)
import transmission
from encryption import (
EncryptedMessage, MessageHeader,
generate_iv, encrypt_message, decrypt_message
)
from auto_mode import AutoMode, AutoModeConfig
# ANSI colors
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
BLUE = "\033[94m"
RESET = "\033[0m"
class IcingProtocol:
def __init__(self):
# Identity keys (each 512 bits when printed as hex of 64 bytes)
self.identity_privkey, self.identity_pubkey = generate_identity_keys()
# Peer identity for verifying ephemeral signatures
self.peer_identity_pubkey_obj = None
self.peer_identity_pubkey_bytes = None
# Ephemeral keys (our side)
self.ephemeral_privkey = None
self.ephemeral_pubkey = None
# Last computed shared secret (hex string)
self.shared_secret = None
# Derived HKDF key (hex string, 256 bits)
self.hkdf_key = None
# Negotiated cipher (0 = AES-256-GCM, 1 = ChaCha20-Poly1305)
self.cipher_type = 0
# For PFS: track per-peer session info (session number and last shared secret)
self.pfs_history: Dict[bytes, Tuple[int, str]] = {}
# Protocol flags
self.state = {
"ping_sent": False,
"ping_received": False,
"handshake_sent": False,
"handshake_received": False,
"key_exchange_complete": False
}
# Auto mode for automated protocol operation
self.auto_mode = AutoMode(self)
# Legacy auto-responder toggle (kept for backward compatibility)
self.auto_responder = False
# Active connections list
self.connections = []
# Inbound messages (each message is a dict with keys: type, raw, parsed, connection)
self.inbound_messages: List[Dict[str, Any]] = []
# Store the session nonce (17 bytes but only 129 bits are valid) from first sent or received PING
self.session_nonce: bytes = None
# Last used IV for encrypted messages
self.last_iv: bytes = None
self.local_port = random.randint(30000, 40000)
self.server_listener = transmission.ServerListener(
host="127.0.0.1",
port=self.local_port,
on_new_connection=self.on_new_connection,
on_data_received=self.on_data_received
)
self.server_listener.start()
# -------------------------------------------------------------------------
# Transport callbacks
# -------------------------------------------------------------------------
def on_new_connection(self, conn: transmission.PeerConnection):
print(f"{GREEN}[IcingProtocol]{RESET} New incoming connection.")
self.connections.append(conn)
# Notify auto mode
self.auto_mode.handle_connection_established()
def on_data_received(self, conn: transmission.PeerConnection, data: bytes):
bits_count = len(data) * 8
print(
f"{GREEN}[RECV]{RESET} {bits_count} bits from peer: {data.hex()[:60]}{'...' if len(data.hex()) > 60 else ''}")
# PING REQUEST (22 bytes)
if len(data) == 22:
ping_request = PingRequest.deserialize(data)
if ping_request:
self.state["ping_received"] = True
# If received cipher is not supported, force to 0 (AES-256-GCM)
if ping_request.cipher != 0 and ping_request.cipher != 1:
print(f"{YELLOW}[NOTICE]{RESET} Received PING with unsupported cipher ({ping_request.cipher}); forcing cipher to 0 in response.")
ping_request.cipher = 0
# Store cipher type for future encrypted messages
self.cipher_type = ping_request.cipher
# Store session nonce if not already set
if self.session_nonce is None:
self.session_nonce = ping_request.session_nonce
print(f"{YELLOW}[NOTICE]{RESET} Stored session nonce from received PING.")
index = len(self.inbound_messages)
msg = {
"type": "PING_REQUEST",
"raw": data,
"parsed": ping_request,
"connection": conn
}
self.inbound_messages.append(msg)
# Handle in auto mode (if active)
self.auto_mode.handle_ping_received(index)
# Legacy auto-responder (for backward compatibility)
if self.auto_responder and not self.auto_mode.active:
timer = threading.Timer(2.0, self._auto_respond_ping, args=[index])
timer.daemon = True
timer.start()
return
# PING RESPONSE (10 bytes)
elif len(data) == 10:
ping_response = PingResponse.deserialize(data)
if ping_response:
# Store negotiated cipher type
self.cipher_type = ping_response.cipher
index = len(self.inbound_messages)
msg = {
"type": "PING_RESPONSE",
"raw": data,
"parsed": ping_response,
"connection": conn
}
self.inbound_messages.append(msg)
# Notify auto mode (if active)
self.auto_mode.handle_ping_response_received(ping_response.answer == 1)
return
# HANDSHAKE message (168 bytes)
elif len(data) == 168:
handshake = Handshake.deserialize(data)
if handshake:
self.state["handshake_received"] = True
index = len(self.inbound_messages)
msg = {
"type": "HANDSHAKE",
"raw": data,
"parsed": handshake,
"connection": conn
}
self.inbound_messages.append(msg)
# Notify auto mode (if active)
self.auto_mode.handle_handshake_received(index)
# Legacy auto-responder
if self.auto_responder and not self.auto_mode.active:
timer = threading.Timer(2.0, self._auto_respond_handshake, args=[index])
timer.daemon = True
timer.start()
return
# Check if the message might be an encrypted message (e.g. header of 18 bytes at start)
elif len(data) >= 18:
# Try to parse header
try:
header = MessageHeader.unpack(data[:18])
# If header unpacking is successful and data length matches header expectations
expected_len = 18 + header.data_len + 16 # Header + payload + tag
# Check if CRC is included
has_crc = (header.connection_status & 0x01) != 0
if has_crc:
expected_len += 4 # Add CRC32 length
if len(data) >= expected_len:
index = len(self.inbound_messages)
msg = {
"type": "ENCRYPTED_MESSAGE",
"raw": data,
"parsed": header,
"connection": conn
}
self.inbound_messages.append(msg)
print(f"{YELLOW}[NOTICE]{RESET} Stored inbound ENCRYPTED_MESSAGE at index={index}.")
# Notify auto mode
self.auto_mode.handle_encrypted_received(index)
return
except Exception as e:
print(f"{RED}[ERROR]{RESET} Failed to parse message header: {e}")
# Otherwise, unrecognized/malformed message.
index = len(self.inbound_messages)
msg = {
"type": "UNKNOWN",
"raw": data,
"parsed": None,
"connection": conn
}
self.inbound_messages.append(msg)
print(f"{RED}[WARNING]{RESET} Unrecognized or malformed message stored at index={index}.")
# -------------------------------------------------------------------------
# HKDF Derivation
# -------------------------------------------------------------------------
def derive_hkdf(self):
"""
Derives a 256-bit key using HKDF.
Uses as input keying material (IKM) the shared secret from ECDH.
The salt is computed as SHA256(session_nonce || pfs_param), where:
- session_nonce is taken from self.session_nonce (17 bytes, 129 bits) or defaults to zeros.
- pfs_param is taken from the first inbound HANDSHAKE's pfs_hash field (32 bytes) or zeros.
"""
if not self.shared_secret:
print(f"{RED}[ERROR]{RESET} No shared secret available; cannot derive HKDF key.")
return
# IKM: shared secret converted from hex to bytes.
ikm = bytes.fromhex(self.shared_secret)
# Use stored session_nonce if available; otherwise default to zeros.
session_nonce = self.session_nonce if self.session_nonce is not None else (b"\x00" * 17)
# Determine pfs_param from first HANDSHAKE message (if any)
pfs_param = None
for msg in self.inbound_messages:
if msg["type"] == "HANDSHAKE":
try:
handshake = msg["parsed"]
pfs_param = handshake.pfs_hash
except Exception:
pfs_param = None
break
if pfs_param is None:
print(f"{RED}[WARNING]{RESET} No HANDSHAKE found; using 32 zero bytes for pfs_param.")
pfs_param = b"\x00" * 32 # 256-bit zeros
# Ensure both are bytes
if isinstance(session_nonce, str):
session_nonce = session_nonce.encode()
if isinstance(pfs_param, str):
pfs_param = pfs_param.encode()
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
hasher = hashes.Hash(hashes.SHA256())
hasher.update(session_nonce + pfs_param)
salt_value = hasher.finalize()
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32, # 256 bits
salt=salt_value,
info=b"",
)
derived_key = hkdf.derive(ikm)
self.hkdf_key = derived_key.hex()
self.state["key_exchange_complete"] = True
print(f"{GREEN}[HKDF]{RESET} Derived HKDF key: {self.hkdf_key}")
return True
# -------------------------------------------------------------------------
# Legacy Auto-responder helpers (kept for backward compatibility)
# -------------------------------------------------------------------------
def _auto_respond_ping(self, index: int):
"""
Called by a Timer to respond automatically to a PING_REQUEST after 2s.
"""
print(f"{BLUE}[AUTO]{RESET} Delayed responding to PING at index={index}")
self.respond_to_ping(index, answer=1) # Accept by default
self.show_state()
def _auto_respond_handshake(self, index: int):
"""
Called by a Timer to handle inbound HANDSHAKE automatically.
1) Generate ephemeral keys if not already set
2) Compute ECDH with the inbound ephemeral pub (generate_ecdhe)
3) Send our handshake back
4) Show state
"""
print(f"{BLUE}[AUTO]{RESET} Delayed ECDH process for HANDSHAKE at index={index}")
# 1) Generate ephemeral keys if we don't have them
if not self.ephemeral_privkey or not self.ephemeral_pubkey:
self.generate_ephemeral_keys()
# 2) Compute ECDH from inbound ephemeral pub
self.generate_ecdhe(index)
# 3) Send our handshake to the peer
self.send_handshake()
# 4) Show final state
self.show_state()
# -------------------------------------------------------------------------
# Public Methods for Auto Mode Management
# -------------------------------------------------------------------------
def start_auto_mode(self):
"""Start the automatic protocol operation mode."""
self.auto_mode.start()
def stop_auto_mode(self):
"""Stop the automatic protocol operation mode."""
self.auto_mode.stop()
def configure_auto_mode(self, **kwargs):
"""
Configure the automatic mode parameters.
Args:
**kwargs: Configuration parameters to set. Supported parameters:
- ping_response_accept: bool, whether to accept incoming pings
- ping_auto_initiate: bool, whether to initiate pings on connection
- ping_retry_count: int, number of ping retries
- ping_retry_delay: float, seconds between ping retries
- ping_timeout: float, seconds to wait for ping response
- preferred_cipher: int, preferred cipher (0=AES-GCM, 1=ChaCha20-Poly1305)
- handshake_retry_count: int, number of handshake retries
- handshake_retry_delay: float, seconds between handshake retries
- handshake_timeout: float, seconds to wait for handshake
- auto_message_enabled: bool, whether to auto-send messages
- message_interval: float, seconds between auto messages
- message_content: str, default message content
- active_mode: bool, whether to actively initiate protocol
"""
for key, value in kwargs.items():
if hasattr(self.auto_mode.config, key):
setattr(self.auto_mode.config, key, value)
print(f"{BLUE}[CONFIG]{RESET} Set auto mode {key} = {value}")
else:
print(f"{RED}[ERROR]{RESET} Unknown auto mode configuration parameter: {key}")
def get_auto_mode_config(self):
"""Return the current auto mode configuration."""
return self.auto_mode.config
def queue_auto_message(self, message: str):
"""
Add a message to the auto-send queue.
Args:
message: Message text to send
"""
self.auto_mode.queue_message(message)
def toggle_auto_mode_logging(self):
"""
Toggle detailed logging for auto mode.
When enabled, will show more information about state transitions and decision making.
"""
if not hasattr(self.auto_mode, 'verbose_logging'):
self.auto_mode.verbose_logging = True
else:
self.auto_mode.verbose_logging = not self.auto_mode.verbose_logging
status = "enabled" if self.auto_mode.verbose_logging else "disabled"
print(f"{BLUE}[AUTO-LOG]{RESET} Detailed logging {status}")
def debug_message(self, index: int):
"""
Debug a message in the inbound message queue.
Prints detailed information about the message.
Args:
index: The index of the message in the inbound_messages queue
"""
if index < 0 or index >= len(self.inbound_messages):
print(f"{RED}[ERROR]{RESET} Invalid message index {index}")
return
msg = self.inbound_messages[index]
print(f"\n{YELLOW}=== Message Debug [{index}] ==={RESET}")
print(f"Type: {msg['type']}")
print(f"Length: {len(msg['raw'])} bytes = {len(msg['raw'])*8} bits")
print(f"Raw data: {msg['raw'].hex()}")
if msg['parsed'] is not None:
print(f"\n{YELLOW}--- Parsed Data ---{RESET}")
if msg['type'] == 'PING_REQUEST':
ping = msg['parsed']
print(f"Version: {ping.version}")
print(f"Cipher: {ping.cipher} ({'AES-256-GCM' if ping.cipher == 0 else 'ChaCha20-Poly1305' if ping.cipher == 1 else 'Unknown'})")
print(f"Session nonce: {ping.session_nonce.hex()}")
print(f"CRC32: {ping.crc32:08x}")
elif msg['type'] == 'PING_RESPONSE':
resp = msg['parsed']
print(f"Version: {resp.version}")
print(f"Cipher: {resp.cipher} ({'AES-256-GCM' if resp.cipher == 0 else 'ChaCha20-Poly1305' if resp.cipher == 1 else 'Unknown'})")
print(f"Answer: {resp.answer} ({'Accept' if resp.answer == 1 else 'Reject'})")
print(f"CRC32: {resp.crc32:08x}")
elif msg['type'] == 'HANDSHAKE':
hs = msg['parsed']
print(f"Ephemeral pubkey: {hs.ephemeral_pubkey.hex()[:16]}...")
print(f"Ephemeral signature: {hs.ephemeral_signature.hex()[:16]}...")
print(f"PFS hash: {hs.pfs_hash.hex()[:16]}...")
print(f"Timestamp: {hs.timestamp}")
print(f"CRC32: {hs.crc32:08x}")
elif msg['type'] == 'ENCRYPTED_MESSAGE':
header = msg['parsed']
print(f"Flag: 0x{header.flag:04x}")
print(f"Data length: {header.data_len} bytes")
print(f"Retry: {header.retry}")
print(f"Connection status: {header.connection_status} ({'CRC included' if header.connection_status & 0x01 else 'No CRC'})")
print(f"IV: {header.iv.hex()}")
# Calculate expected message size
expected_len = 18 + header.data_len + 16 # Header + payload + tag
if header.connection_status & 0x01:
expected_len += 4 # Add CRC
print(f"Expected total length: {expected_len} bytes")
print(f"Actual length: {len(msg['raw'])} bytes")
# If we have a key, try to decrypt
if self.hkdf_key:
print("\nAttempting decryption...")
try:
key = bytes.fromhex(self.hkdf_key)
plaintext = decrypt_message(msg['raw'], key, self.cipher_type)
print(f"Decrypted: {plaintext.decode('utf-8')}")
except Exception as e:
print(f"Decryption failed: {e}")
print()
# -------------------------------------------------------------------------
# Public Methods
# -------------------------------------------------------------------------
def connect_to_peer(self, port: int):
conn = transmission.connect_to_peer("127.0.0.1", port, self.on_data_received)
self.connections.append(conn)
print(f"{GREEN}[IcingProtocol]{RESET} Outgoing connection to port {port} established.")
# Notify auto mode
self.auto_mode.handle_connection_established()
def set_peer_identity(self, peer_pubkey_hex: str):
pubkey_bytes = bytes.fromhex(peer_pubkey_hex)
self.peer_identity_pubkey_obj = load_peer_identity_key(pubkey_bytes)
self.peer_identity_pubkey_bytes = pubkey_bytes
print(f"{GREEN}[IcingProtocol]{RESET} Stored peer identity pubkey (hex={peer_pubkey_hex[:16]}...).")
def generate_ephemeral_keys(self):
self.ephemeral_privkey, self.ephemeral_pubkey = get_ephemeral_keypair()
print(f"{GREEN}[IcingProtocol]{RESET} Generated ephemeral key pair: pubkey={self.ephemeral_pubkey.hex()[:16]}...")
# Send PING (session discovery and cipher negotiation)
def send_ping_request(self, cipher_type=0):
"""
Send a ping request to the first connected peer.
Args:
cipher_type: Preferred cipher type (0 = AES-256-GCM, 1 = ChaCha20-Poly1305)
"""
if not self.connections:
print(f"{RED}[ERROR]{RESET} No active connections.")
return False
# Validate cipher type
if cipher_type not in (0, 1):
print(f"{YELLOW}[WARNING]{RESET} Invalid cipher type {cipher_type}, defaulting to AES-256-GCM (0)")
cipher_type = 0
# Create ping request with specified cipher
ping_request = PingRequest(version=0, cipher=cipher_type)
# Store session nonce if not already set
if self.session_nonce is None:
self.session_nonce = ping_request.session_nonce
print(f"{YELLOW}[NOTICE]{RESET} Stored session nonce from sent PING.")
# Serialize and send
pkt = ping_request.serialize()
self._send_packet(self.connections[0], pkt, "PING_REQUEST")
self.state["ping_sent"] = True
return True
def send_handshake(self):
"""
Build and send handshake:
- ephemeral_pubkey (64 bytes, raw x||y)
- ephemeral_signature (64 bytes, raw r||s)
- pfs_hash (32 bytes)
- timestamp (32 bits)
- CRC (32 bits)
"""
if not self.connections:
print(f"{RED}[ERROR]{RESET} No active connections.")
return False
if not self.ephemeral_privkey or not self.ephemeral_pubkey:
print(f"{RED}[ERROR]{RESET} Ephemeral keys not generated.")
return False
if self.peer_identity_pubkey_bytes is None:
print(f"{RED}[ERROR]{RESET} Peer identity not set; needed for PFS tracking.")
return False
# 1) Sign ephemeral_pubkey using identity key
sig_der = sign_data(self.identity_privkey, self.ephemeral_pubkey)
# Convert DER signature to raw r||s format (64 bytes)
raw_signature = der_to_raw(sig_der)
# 2) Compute PFS hash
session_number, last_secret_hex = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, ""))
pfs = compute_pfs_hash(session_number, last_secret_hex)
# 3) Create handshake object
handshake = Handshake(
ephemeral_pubkey=self.ephemeral_pubkey,
ephemeral_signature=raw_signature,
pfs_hash=pfs
)
# 4) Serialize and send
pkt = handshake.serialize()
self._send_packet(self.connections[0], pkt, "HANDSHAKE")
self.state["handshake_sent"] = True
return True
def enable_auto_responder(self, enable: bool):
"""
Legacy method for enabling/disabling auto responder.
For new code, use start_auto_mode() and stop_auto_mode() instead.
"""
self.auto_responder = enable
print(f"{YELLOW}[LEGACY]{RESET} Auto responder set to {enable}. Consider using auto_mode instead.")
# -------------------------------------------------------------------------
# Manual Responses
# -------------------------------------------------------------------------
def respond_to_ping(self, index: int, answer: int):
"""
Respond to a ping request with the specified answer (0 = no, 1 = yes).
If answer is 1, we accept the connection and use the cipher specified in the request.
"""
if index < 0 or index >= len(self.inbound_messages):
print(f"{RED}[ERROR]{RESET} Invalid index {index}.")
return False
msg = self.inbound_messages[index]
if msg["type"] != "PING_REQUEST":
print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a PING_REQUEST.")
return False
ping_request = msg["parsed"]
version = ping_request.version
cipher = ping_request.cipher
# Force cipher to 0 or 1 (only AES-256-GCM and ChaCha20-Poly1305 are supported)
if cipher != 0 and cipher != 1:
print(f"{YELLOW}[NOTICE]{RESET} Received PING with unsupported cipher ({cipher}); forcing cipher to 0 in response.")
cipher = 0
# Store the negotiated cipher type if we're accepting
if answer == 1:
self.cipher_type = cipher
conn = msg["connection"]
# Create ping response
ping_response = PingResponse(version, cipher, answer)
resp = ping_response.serialize()
self._send_packet(conn, resp, "PING_RESPONSE")
print(f"{BLUE}[MANUAL]{RESET} Responded to ping with answer={answer}.")
return True
def generate_ecdhe(self, index: int):
"""
Process a handshake message:
1. Verify the ephemeral signature
2. Compute the ECDH shared secret
3. Update PFS history
"""
if index < 0 or index >= len(self.inbound_messages):
print(f"{RED}[ERROR]{RESET} Invalid index {index}.")
return False
msg = self.inbound_messages[index]
if msg["type"] != "HANDSHAKE":
print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a HANDSHAKE.")
return False
handshake = msg["parsed"]
# Convert raw signature to DER for verification
raw_sig = handshake.ephemeral_signature
sig_der = raw_signature_to_der(raw_sig)
# Verify signature
ok = verify_signature(self.peer_identity_pubkey_obj, sig_der, handshake.ephemeral_pubkey)
if not ok:
print(f"{RED}[ERROR]{RESET} Ephemeral signature invalid.")
return False
print(f"{GREEN}[OK]{RESET} Ephemeral signature verified.")
# Check if we have ephemeral keys
if not self.ephemeral_privkey:
print(f"{YELLOW}[WARN]{RESET} No ephemeral_privkey available, cannot compute shared secret.")
return False
# Compute ECDH shared secret
shared = compute_ecdh_shared_key(self.ephemeral_privkey, handshake.ephemeral_pubkey)
self.shared_secret = shared.hex()
print(f"{GREEN}[OK]{RESET} Computed ECDH shared key = {self.shared_secret}")
# Update PFS history
old_session, _ = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, ""))
new_session = 1 if old_session < 0 else old_session + 1
self.pfs_history[self.peer_identity_pubkey_bytes] = (new_session, self.shared_secret)
return True
# -------------------------------------------------------------------------
# Utility
# -------------------------------------------------------------------------
def _send_packet(self, conn: transmission.PeerConnection, data: bytes, label: str):
bits_count = len(data) * 8
print(f"{BLUE}[SEND]{RESET} {label} -> {bits_count} bits: {data.hex()[:60]}{'...' if len(data.hex())>60 else ''}")
conn.send(data)
def show_state(self):
print(f"\n{YELLOW}=== Global State ==={RESET}")
print(f"Listening Port: {self.local_port}")
print(f"Identity PubKey: 512 bits => {self.identity_pubkey.hex()[:16]}...")
if self.peer_identity_pubkey_bytes:
print(f"Peer Identity PubKey: 512 bits => {self.peer_identity_pubkey_bytes.hex()[:16]}...")
else:
print("Peer Identity PubKey: [None]")
print("\nEphemeral Keys:")
if self.ephemeral_pubkey:
print(f" ephemeral_pubkey: 512 bits => {self.ephemeral_pubkey.hex()[:16]}...")
else:
print(" ephemeral_pubkey: [None]")
print(f"\nShared Secret: {self.shared_secret if self.shared_secret else '[None]'}")
if self.hkdf_key:
print(f"HKDF Derived Key: {self.hkdf_key} (size: {len(self.hkdf_key)*8} bits)")
else:
print("HKDF Derived Key: [None]")
print(f"Negotiated Cipher: {'AES-256-GCM' if self.cipher_type == 0 else 'ChaCha20-Poly1305'} (code: {self.cipher_type})")
if self.session_nonce:
print(f"Session Nonce: {self.session_nonce.hex()} (129 bits)")
else:
print("Session Nonce: [None]")
if self.last_iv:
print(f"Last IV: {self.last_iv.hex()} (96 bits)")
else:
print("Last IV: [None]")
print("\nProtocol Flags:")
for k, v in self.state.items():
print(f" {k}: {v}")
print("\nAuto Mode Active:", self.auto_mode.active)
print("Auto Mode State:", self.auto_mode.state)
print("Legacy Auto Responder:", self.auto_responder)
print("\nActive Connections:")
for i, c in enumerate(self.connections):
print(f" [{i}] Alive={c.alive}")
print("\nInbound Message Queue:")
for i, m in enumerate(self.inbound_messages):
print(f" [{i}] type={m['type']} len={len(m['raw'])} bytes => {len(m['raw']) * 8} bits")
print()
def stop(self):
"""Stop the protocol and clean up resources."""
# Stop auto mode first
self.auto_mode.stop()
# Stop server listener
self.server_listener.stop()
# Close all connections
for c in self.connections:
c.close()
self.connections.clear()
self.inbound_messages.clear()
print(f"{RED}[STOP]{RESET} Protocol stopped.")
# -------------------------------------------------------------------------
# Encrypted Messaging
# -------------------------------------------------------------------------
def send_encrypted_message(self, plaintext: str):
"""
Encrypts and sends a message using the derived HKDF key and negotiated cipher.
The message format is:
- Header (18 bytes): flag, data_len, retry, connection_status, IV
- Payload: variable length encrypted data
- Footer: Authentication tag (16 bytes) + optional CRC32 (4 bytes)
"""
if not self.connections:
print(f"{RED}[ERROR]{RESET} No active connections.")
return False
if not self.hkdf_key:
print(f"{RED}[ERROR]{RESET} No HKDF key derived. Cannot encrypt message.")
return False
# Get the encryption key
key = bytes.fromhex(self.hkdf_key)
# Convert plaintext to bytes
plaintext_bytes = plaintext.encode('utf-8')
# Generate or increment the IV
if self.last_iv is None:
# First message, generate random IV
iv = generate_iv(initial=True)
else:
# Subsequent message, increment previous IV
iv = generate_iv(initial=False, previous_iv=self.last_iv)
# Store the new IV
self.last_iv = iv
# Create encrypted message (connection_status 0 = no CRC)
encrypted = encrypt_message(
plaintext=plaintext_bytes,
key=key,
flag=0xBEEF, # Default flag
retry=0,
connection_status=0, # No CRC
iv=iv,
cipher_type=self.cipher_type
)
# Send the encrypted message
self._send_packet(self.connections[0], encrypted, "ENCRYPTED_MESSAGE")
print(f"{GREEN}[SEND_ENCRYPTED]{RESET} Encrypted message sent.")
return True
def decrypt_received_message(self, index: int):
"""
Decrypt a received encrypted message using the HKDF key and negotiated cipher.
"""
if index < 0 or index >= len(self.inbound_messages):
print(f"{RED}[ERROR]{RESET} Invalid message index.")
return None
msg = self.inbound_messages[index]
if msg["type"] != "ENCRYPTED_MESSAGE":
print(f"{RED}[ERROR]{RESET} Message at index {index} is not an ENCRYPTED_MESSAGE.")
return None
# Get the encrypted message
encrypted = msg["raw"]
if not self.hkdf_key:
print(f"{RED}[ERROR]{RESET} No HKDF key derived. Cannot decrypt message.")
return None
# Get the encryption key
key = bytes.fromhex(self.hkdf_key)
try:
# Decrypt the message
plaintext = decrypt_message(encrypted, key, self.cipher_type)
# Convert to string
plaintext_str = plaintext.decode('utf-8')
# Update last IV from the header
header = MessageHeader.unpack(encrypted[:18])
self.last_iv = header.iv
print(f"{GREEN}[DECRYPTED]{RESET} Decrypted message: {plaintext_str}")
return plaintext_str
except Exception as e:
print(f"{RED}[ERROR]{RESET} Decryption failed: {e}")
return None

View File

@ -1,100 +0,0 @@
import socket
import threading
from typing import Callable
class PeerConnection:
"""
Represents a live, two-way connection to a peer.
We keep a socket open, read data in a background thread,
and can send data from the main thread at any time.
"""
def __init__(self, sock: socket.socket, on_data_received: Callable[['PeerConnection', bytes], None]):
self.sock = sock
self.on_data_received = on_data_received
self.alive = True
self.read_thread = threading.Thread(target=self.read_loop, daemon=True)
self.read_thread.start()
def read_loop(self):
while self.alive:
try:
data = self.sock.recv(4096)
if not data:
break
self.on_data_received(self, data)
except OSError:
break
self.alive = False
self.sock.close()
print("[PeerConnection] Connection closed.")
def send(self, data: bytes):
if not self.alive:
print("[PeerConnection.send] Cannot send, connection not alive.")
return
try:
self.sock.sendall(data)
except OSError:
print("[PeerConnection.send] Send failed, connection might be closed.")
self.alive = False
def close(self):
self.alive = False
try:
self.sock.shutdown(socket.SHUT_RDWR)
except OSError:
pass
self.sock.close()
class ServerListener(threading.Thread):
"""
A thread that listens on a given port. When a new client connects,
it creates a PeerConnection for that client.
"""
def __init__(self, host: str, port: int,
on_new_connection: Callable[[PeerConnection], None],
on_data_received: Callable[[PeerConnection, bytes], None]):
super().__init__(daemon=True)
self.host = host
self.port = port
self.on_new_connection = on_new_connection
self.on_data_received = on_data_received
self.server_socket = None
self.stop_event = threading.Event()
def run(self):
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.server_socket.settimeout(1.0)
print(f"[ServerListener] Listening on {self.host}:{self.port}")
while not self.stop_event.is_set():
try:
client_sock, addr = self.server_socket.accept()
print(f"[ServerListener] Accepted connection from {addr}")
conn = PeerConnection(client_sock, self.on_data_received)
self.on_new_connection(conn)
except socket.timeout:
pass
except OSError:
break
if self.server_socket:
self.server_socket.close()
def stop(self):
self.stop_event.set()
if self.server_socket:
self.server_socket.close()
def connect_to_peer(host: str, port: int,
on_data_received: Callable[[PeerConnection, bytes], None]) -> PeerConnection:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
print(f"[connect_to_peer] Connected to {host}:{port}")
conn = PeerConnection(sock, on_data_received)
return conn