diff --git a/protocol_prototype/IcingProtocol.drawio b/protocol_prototype/IcingProtocol.drawio index 8f46988..a521a38 100644 --- a/protocol_prototype/IcingProtocol.drawio +++ b/protocol_prototype/IcingProtocol.drawio @@ -1,4 +1,4 @@ - + @@ -259,8 +259,16 @@ - - + + + + + + + + + + @@ -287,7 +295,7 @@ - + @@ -506,61 +514,292 @@ - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/protocol_prototype/Prototype/cli.py b/protocol_prototype/Prototype/cli.py new file mode 100644 index 0000000..13d314c --- /dev/null +++ b/protocol_prototype/Prototype/cli.py @@ -0,0 +1,91 @@ +import argparse +import threading +import sys + +from noise_xk.session import NoiseXKSession +from noise_xk.transport import P2PTransport +from dissononce.dh.x25519.public import PublicKey + +def main(): + parser = argparse.ArgumentParser(prog="noise_xk") + parser.add_argument( + "--listen-port", type=int, required=True, + help="Port on which to bind+listen" + ) + parser.add_argument( + "--peer-host", type=str, default="127.0.0.1", + help="Peer host to dial (default: 127.0.0.1)" + ) + parser.add_argument( + "--peer-port", type=int, required=True, + help="Peer port to dial" + ) + args = parser.parse_args() + + # 1. Generate static keypair and print our static public key + kp = NoiseXKSession.generate_keypair() + # kp.public is a PublicKey; .data holds raw bytes + local_priv = kp.private # carried implicitly in NoiseXKSession + local_pub = kp.public + print(f"[My static pubkey:] {local_pub.data.hex()}") + + # 2. Read peer pubkey from user input + peer_pubkey = None + while True: + line = input(">>> ").strip() + if line.startswith("peer_pubkey "): + hexstr = line.split(None, 1)[1] + try: + raw = bytes.fromhex(hexstr) + peer_pubkey = PublicKey(raw) # wrap raw bytes in PublicKey + break + except ValueError: + print("Invalid hex; please retry.") + else: + print("Use: peer_pubkey ") + + # 3. Establish P2P connection (race listen vs. dial) + transport = P2PTransport( + listen_port=args.listen_port, + peer_host=args.peer_host, + peer_port=args.peer_port + ) + print( + f"Racing connect/listen on ports " + f"{args.listen_port} ⇆ {args.peer_host}:{args.peer_port}…" + ) + sock, initiator = transport.connect() + print(f"Connected (initiator={initiator}); performing handshake…") + + # 4. Perform Noise XK handshake + session = NoiseXKSession(kp, peer_pubkey) + session.handshake(sock, initiator) + print("Handshake complete! You can now type messages.") + + # 5. Reader thread for incoming messages + def reader(): + while True: + try: + pt = session.receive(sock) + print(f"\n< {pt.decode()}") + except Exception as e: + print(f"\n[Receive error ({type(e).__name__}): {e!r}]") + break + + thread = threading.Thread(target=reader, daemon=True) + thread.start() + + # 6. Main loop: send user input + try: + for line in sys.stdin: + text = line.rstrip("\n") + if not text: + continue + session.send(sock, text.encode()) + except KeyboardInterrupt: + pass + finally: + sock.close() + +if __name__ == "__main__": + main() diff --git a/protocol_prototype/Prototype/noise_xk/session.py b/protocol_prototype/Prototype/noise_xk/session.py new file mode 100644 index 0000000..83b7ae4 --- /dev/null +++ b/protocol_prototype/Prototype/noise_xk/session.py @@ -0,0 +1,179 @@ +# noise_xk/session.py + +import socket +import logging +from dissononce.processing.impl.handshakestate import HandshakeState +from dissononce.processing.impl.symmetricstate import SymmetricState +from dissononce.processing.impl.cipherstate import CipherState +from dissononce.processing.handshakepatterns.interactive.XK import XKHandshakePattern +from dissononce.cipher.chachapoly import ChaChaPolyCipher +from dissononce.dh.x25519.x25519 import X25519DH +from dissononce.dh.keypair import KeyPair +from dissononce.dh.x25519.public import PublicKey +from dissononce.hash.sha256 import SHA256Hash + +# Configure root logger for debug output +logging.basicConfig(level=logging.DEBUG, format="%(message)s") + +class NoiseXKSession: + @staticmethod + def generate_keypair() -> KeyPair: + """ + Generate a static X25519 KeyPair. + Returns: + KeyPair object with .private and .public attributes. + """ + return X25519DH().generate_keypair() + + def __init__(self, local_kp: KeyPair, peer_pubkey: PublicKey): + """ + Initialize with our KeyPair and the peer's PublicKey. + """ + self.local_kp: KeyPair = local_kp + self.peer_pubkey: PublicKey = peer_pubkey + + # Build the Noise handshake state (X25519 DH, ChaChaPoly cipher, SHA256 hash) + cipher = ChaChaPolyCipher() + dh = X25519DH() + hshash = SHA256Hash() + symmetric = SymmetricState(CipherState(cipher), hshash) + self._hs = HandshakeState(symmetric, dh) + + self._send_cs = None # type: CipherState + self._recv_cs = None + + def handshake(self, sock: socket.socket, initiator: bool) -> None: + """ + Perform the XK handshake over the socket. Branches on initiator/responder + so that each side reads or writes in the correct message order. + On completion, self._send_cs and self._recv_cs hold the two CipherStates. + """ + + logging.debug(f"[handshake] start (initiator={initiator})") + # initialize with our KeyPair and their PublicKey + if initiator: + # initiator knows peer’s static out-of-band + self._hs.initialize( + XKHandshakePattern(), + True, + b'', + s=self.local_kp, + rs=self.peer_pubkey + ) + else: + logging.debug("[handshake] responder initializing without rs") + # responder must NOT supply rs here + self._hs.initialize( + XKHandshakePattern(), + False, + b'', + s=self.local_kp + ) + + cs_pair = None + if initiator: + # 1) -> e + buf1 = bytearray() + cs_pair = self._hs.write_message(b'', buf1) + logging.debug(f"[-> e] {buf1.hex()}") + self._send_all(sock, buf1) + + # 2) <- e, es, s, ss + msg2 = self._recv_all(sock) + logging.debug(f"[<- msg2] {msg2.hex()}") + self._hs.read_message(msg2, bytearray()) + + # 3) -> se (final) + buf3 = bytearray() + cs_pair = self._hs.write_message(b'', buf3) + logging.debug(f"[-> se] {buf3.hex()}") + self._send_all(sock, buf3) + else: + # 1) <- e + msg1 = self._recv_all(sock) + logging.debug(f"[<- e] {msg1.hex()}") + self._hs.read_message(msg1, bytearray()) + + # 2) -> e, es, s, ss + buf2 = bytearray() + cs_pair = self._hs.write_message(b'', buf2) + logging.debug(f"[-> msg2] {buf2.hex()}") + self._send_all(sock, buf2) + + # 3) <- se (final) + msg3 = self._recv_all(sock) + logging.debug(f"[<- se] {msg3.hex()}") + cs_pair = self._hs.read_message(msg3, bytearray()) + + # on the final step, we must get exactly two CipherStates + if not cs_pair or len(cs_pair) != 2: + raise RuntimeError("Handshake did not complete properly") + cs0, cs1 = cs_pair + # the library returns (cs_encrypt_for_initiator, cs_decrypt_for_initiator) + if initiator: + # initiator: cs0 encrypts, cs1 decrypts + self._send_cs, self._recv_cs = cs0, cs1 + else: + # responder must swap + self._send_cs, self._recv_cs = cs1, cs0 + + # dump the raw symmetric keys & nonces (if available) + self._dump_cipherstate("HANDSHAKE→ SEND", self._send_cs) + self._dump_cipherstate("HANDSHAKE→ RECV", self._recv_cs) + + def send(self, sock: socket.socket, plaintext: bytes) -> None: + """ + Encrypt and send a message. + """ + if self._send_cs is None: + raise RuntimeError("Handshake not complete") + ct = self._send_cs.encrypt_with_ad(b'', plaintext) + logging.debug(f"[ENCRYPT] {ct.hex()}") + self._dump_cipherstate("SEND→ after encrypt", self._send_cs) + self._send_all(sock, ct) + + def receive(self, sock: socket.socket) -> bytes: + """ + Receive and decrypt a message. + """ + if self._recv_cs is None: + raise RuntimeError("Handshake not complete") + ct = self._recv_all(sock) + logging.debug(f"[CIPHERTEXT] {ct.hex()}") + self._dump_cipherstate("RECV→ before decrypt", self._recv_cs) + pt = self._recv_cs.decrypt_with_ad(b'', ct) + logging.debug(f"[DECRYPT] {pt!r}") + return pt + + def _send_all(self, sock: socket.socket, data: bytes) -> None: + # Length-prefix (2 bytes big-endian) + data + length = len(data).to_bytes(2, 'big') + sock.sendall(length + data) + + def _recv_all(self, sock: socket.socket) -> bytes: + # Read 2-byte length prefix, then the payload + hdr = self._read_exact(sock, 2) + length = int.from_bytes(hdr, 'big') + return self._read_exact(sock, length) + + @staticmethod + def _read_exact(sock: socket.socket, n: int) -> bytes: + buf = bytearray() + while len(buf) < n: + chunk = sock.recv(n - len(buf)) + if not chunk: + raise ConnectionError("Socket closed during read") + buf.extend(chunk) + return bytes(buf) + + def _dump_cipherstate(self, label: str, cs: CipherState) -> None: + """ + Print the symmetric key (cs._k) and nonce counter (cs._n) for inspection. + """ + key = cs._key + nonce = getattr(cs, "_n", None) + if isinstance(key, (bytes, bytearray)): + key_hex = key.hex() + else: + key_hex = repr(key) + logging.debug(f"[{label}] key={key_hex}") diff --git a/protocol_prototype/Prototype/noise_xk/transport.py b/protocol_prototype/Prototype/noise_xk/transport.py new file mode 100644 index 0000000..d667340 --- /dev/null +++ b/protocol_prototype/Prototype/noise_xk/transport.py @@ -0,0 +1,99 @@ +import socket +import threading +import time + +class P2PTransport: + def __init__(self, listen_port: int, peer_host: str, peer_port: int): + """ + Args: + listen_port: port to bind() and accept() + peer_host: host to dial() + peer_port: port to dial() + """ + self.listen_port = listen_port + self.peer_host = peer_host + self.peer_port = peer_port + + def connect(self) -> (socket.socket, bool): + """ + Race bind+listen vs. dial: + - If dial succeeds first, return (sock, True) # we are initiator + - If accept succeeds first, return (sock, False) # we are responder + """ + server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + server.bind(('0.0.0.0', self.listen_port)) + server.listen(1) + + result = {} + event = threading.Event() + lock = threading.Lock() + + def accept_thread(): + try: + conn, _ = server.accept() + with lock: + if not event.is_set(): + result['sock'] = conn + result['initiator'] = False + event.set() + except Exception: + pass + + def dial_thread(): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(1.0) + while not event.is_set(): + try: + sock.connect((self.peer_host, self.peer_port)) + with lock: + if not event.is_set(): + result['sock'] = sock + result['initiator'] = True + event.set() + return + except (ConnectionRefusedError, socket.timeout): + time.sleep(0.1) + except Exception: + break + + t1 = threading.Thread(target=accept_thread, daemon=True) + t2 = threading.Thread(target=dial_thread, daemon=True) + t1.start() + t2.start() + + event.wait() + sock, initiator = result['sock'], result['initiator'] + # close the listening socket—we’ve got our P2P link + server.close() + # ensure this socket is in blocking mode (no lingering timeouts) + sock.settimeout(None) + return sock, initiator + + def send_packet(self, sock: socket.socket, data: bytes) -> None: + """ + Send a 2-byte big-endian length prefix followed by data. + """ + length = len(data).to_bytes(2, 'big') + sock.sendall(length + data) + + def recv_packet(self, sock: socket.socket) -> bytes: + """ + Receive a 2-byte length prefix, then that many payload bytes. + """ + hdr = self._read_exact(sock, 2) + length = int.from_bytes(hdr, 'big') + return self._read_exact(sock, length) + + @staticmethod + def _read_exact(sock: socket.socket, n: int) -> bytes: + buf = bytearray() + while len(buf) < n: + chunk = sock.recv(n - len(buf)) + if not chunk: + raise ConnectionError("Socket closed during read") + buf.extend(chunk) + return bytes(buf) + + def close(self, sock: socket.socket) -> None: + sock.close() diff --git a/protocol_prototype/Prototype/noise_xk_cli/cli.py b/protocol_prototype/Prototype/noise_xk_cli/cli.py deleted file mode 100644 index c67dafb..0000000 --- a/protocol_prototype/Prototype/noise_xk_cli/cli.py +++ /dev/null @@ -1,253 +0,0 @@ -#!/usr/bin/env python3 -import os -import sys -import cmd -import json -import logging -from typing import Optional - -from keys import generate_static_key, load_static_key, save_contact, list_contacts, CONTACTS_FILE -from transport import Peer -from noise_handshake import NoiseXK -from messaging import Messenger - -# Configure default logging -logger = logging.getLogger("noise_xk_cli") -logger.setLevel(logging.INFO) -handler = logging.StreamHandler() -formatter = logging.Formatter("[%(levelname)s] %(message)s") -handler.setFormatter(formatter) -logger.addHandler(handler) - - -class NoiseCLI(cmd.Cmd): - intro = "Welcome to the Noise_XK CLI. Type help or ? to list commands." - prompt = "(noise) " - - def __init__(self): - super().__init__() - # Static key pair for this instance - self.static_priv = None - self.static_pub = None - # Peer static key (bytes) - self.peer_static: Optional[bytes] = None - # Transport peer connection - self.peer: Optional[Peer] = None - # Handshake and messenger - self.handshake: Optional[NoiseXK] = None - self.messenger: Optional[Messenger] = None - # Incoming ciphertext buffer - self.inbox = [] - # Ensure contacts directory - os.makedirs(os.path.dirname(CONTACTS_FILE), exist_ok=True) - - def preloop(self): - # Load existing static key or generate new - priv, pub = generate_static_key() - self.static_priv, self.static_pub = priv, pub - logger.info(f"Generated static X25519 key, public: {pub.hex()}") - - def do_generate_key(self, arg): - "Regenerate a new static key pair" - priv, pub = generate_static_key() - self.static_priv, self.static_pub = priv, pub - logger.info(f"New static public key: {pub.hex()}") - - def do_import_key(self, arg): - "import_key Import peer static public key" - parts = arg.split() - if len(parts) != 2: - logger.error("Usage: import_key ") - return - alias, hex_pub = parts - try: - pub = load_static_key(hex_pub) - save_contact(alias, hex_pub) - logger.info(f"Imported contact '{alias}' = {hex_pub}") - except Exception as e: - logger.error(f"Failed to import key: {e}") - - def do_list_keys(self, arg): - "List saved contacts" - contacts = list_contacts() - if not contacts: - logger.info("No contacts saved.") - return - for alias, hex_pub in contacts: - print(f" {alias}: {hex_pub}") - - def do_listen(self, arg): - "listen Start listening for incoming P2P connections" - if not arg: - logger.error("Usage: listen ") - return - try: - port = int(arg.strip()) - except ValueError: - logger.error("Port must be an integer.") - return - self.peer = Peer(port, self.handle_data) - self.peer.start_listen() - logger.info(f"Listening on port {port}") - - def do_connect(self, arg): - "connect [host] Connect to remote peer and set peer static key by alias" - parts = arg.split() - if len(parts) == 2: - # Host not specified, use localhost as default - host = "localhost" - port_str, alias = parts - elif len(parts) == 3: - # Host explicitly specified - logger.debug(parts) - host, port_str, alias = parts - else: - logger.error("Usage: connect [host] ") - return - try: - port = int(port_str) - except ValueError: - logger.error("Port must be an integer. " + port_str) - return - - contacts = dict(list_contacts()) - if alias not in contacts.keys(): - logger.error(f"No such alias '{alias}'. Use import_key first.") - return - - self.peer_static = load_static_key(contacts[alias]) - # Establish connection - self.peer = Peer(port, self.handle_data) - try: - self.peer.connect(host, port) - logger.info(f"Connected to {host}:{port}") - except Exception as e: - logger.error(f"Connection failed: {e}") - - def do_handshake(self, arg): - "handshake Initiate XK handshake with the peer (initiator role)" - if not self.peer or not self.peer_static: - logger.error("Need to connect first and import peer key.") - return - # Create handshake as initiator - self.handshake = NoiseXK(initiator=True, peer_static=self.peer_static) - # Send first handshake message - msg = self.handshake.write() - self.peer.send(msg) - logger.info("Sent handshake message 1") - - def handle_data(self, data: bytes): - "Callback for incoming raw data" - if self.handshake and not self.handshake.is_finished(): - # In the middle of handshake - try: - _ = self.handshake.read(data) - if not self.handshake.is_finished(): - # Send next handshake message - msg = self.handshake.write() - self.peer.send(msg) - logger.info("Exchanged handshake message") - else: - cs_s, cs_r = self.handshake.get_cipher_states() - self.messenger = Messenger(cs_s, cs_r) - logger.info("Handshake complete, cipher states ready") - return - except Exception as e: - logger.error(f"Handshake error: {e}") - return - - # If no handshake object, data might be initial handshake for responder - if not self.handshake and self.peer_static: - # Passive responder: start handshake on first data - try: - self.handshake = NoiseXK(initiator=False, peer_static=self.peer_static) - _ = self.handshake.read(data) - # Send response - msg = self.handshake.write() - self.peer.send(msg) - # Continue handshake if needed - if not self.handshake.is_finished(): - # Await next, then finalize - logger.info("Responder sent handshake message 2") - else: - cs_s, cs_r = self.handshake.get_cipher_states() - self.messenger = Messenger(cs_s, cs_r) - logger.info("Handshake complete (responder), cipher states ready") - return - except Exception as e: - logger.error(f"Responder handshake error: {e}") - return - - # After handshake, treat as encrypted application data - if self.messenger: - try: - plaintext = self.messenger.decrypt(data) - logger.info(f"Received plaintext: {plaintext.decode(errors='ignore')}") - except Exception as e: - # If decryption fails, queue raw ciphertext - self.inbox.append(data) - idx = len(self.inbox) - 1 - logger.warning(f"Failed decrypt, queued ciphertext at index {idx}") - else: - logger.warning("Received data but handshake not established.") - - def do_send(self, arg): - "send Encrypt and send a message to the peer" - if not self.messenger: - logger.error("Handshake not complete.") - return - payload = arg.encode() - ct = self.messenger.encrypt(payload) - self.peer.send(ct) - logger.info(f"Sent encrypted message ({len(ct)} bytes)") - - def do_decrypt(self, arg): - "decrypt Attempt to decrypt a queued ciphertext by index" - if not self.messenger: - logger.error("Handshake not complete.") - return - try: - idx = int(arg.strip()) - ct = self.inbox[idx] - pt = self.messenger.decrypt(ct) - logger.info(f"Decrypted [#{idx}]: {pt.decode(errors='ignore')}") - except Exception as e: - logger.error(f"Decrypt error: {e}") - - def do_show_state(self, arg): - "show_state Display current internal state" - state = { - "static_pub": self.static_pub.hex() if self.static_pub else None, - "peer_static": self.peer_static.hex() if self.peer_static else None, - "handshake": type(self.handshake).__name__ if self.handshake else None, - "handshake_finished": self.handshake.is_finished() if self.handshake else False, - "inbox_size": len(self.inbox), - } - print(json.dumps(state, indent=2)) - - def do_set_log_level(self, arg): - "set_log_level Adjust CLI verbosity" - level = arg.strip().upper() - if level not in ("DEBUG","INFO","WARNING","ERROR"): - logger.error("Invalid level. Choose DEBUG, INFO, WARNING, ERROR.") - return - logger.setLevel(getattr(logging, level)) - logger.info(f"Log level set to {level}") - - def do_exit(self, arg): - "Exit the CLI" - logger.info("Closing connection and exiting.") - if self.peer: - self.peer.close() - return True - - def do_EOF(self, arg): - return self.do_exit(arg) - - -if __name__ == "__main__": - # Adjust module search path - sys.path.insert(0, os.path.abspath(os.path.dirname(__file__))) - from transport import Peer # ensure imports - cli = NoiseCLI() - cli.cmdloop() diff --git a/protocol_prototype/Prototype/noise_xk_cli/keys.py b/protocol_prototype/Prototype/noise_xk_cli/keys.py deleted file mode 100644 index fb016d5..0000000 --- a/protocol_prototype/Prototype/noise_xk_cli/keys.py +++ /dev/null @@ -1,100 +0,0 @@ -""" -Module keys.py - -Génération, import, export et gestion de clés statiques X25519 pour Noise_XK CLI. -Stocke les contacts dans ~/.noise_xk_cli/contacts.json -""" -import os -import json -from typing import Tuple, Dict, List - -from cryptography.hazmat.primitives.asymmetric.x25519 import ( - X25519PrivateKey, - X25519PublicKey, -) -from cryptography.hazmat.primitives import serialization - -# Chemin vers le fichier de contacts -CONTACTS_FILE = os.path.expanduser("~/.noise_xk_cli/contacts.json") - - -def _ensure_contacts_file() -> None: - """ - Crée le répertoire et le fichier JSON si nécessaire. - """ - directory = os.path.dirname(CONTACTS_FILE) - os.makedirs(directory, exist_ok=True) - if not os.path.exists(CONTACTS_FILE): - with open(CONTACTS_FILE, 'w') as f: - json.dump({}, f) - - -def generate_static_key() -> Tuple[X25519PrivateKey, bytes]: - """ - Génère une paire de clés X25519 statique. - - Returns: - (private_key, public_key_bytes) - """ - private_key = X25519PrivateKey.generate() - public_key = private_key.public_key() - public_bytes = public_key.public_bytes( - encoding=serialization.Encoding.Raw, - format=serialization.PublicFormat.Raw - ) - return private_key, public_bytes - - -def public_key_to_hex(public_bytes: bytes) -> str: - """ - Convertit bytes de clé publique en hex. - """ - return public_bytes.hex() - - -def load_static_key(hex_str: str) -> X25519PublicKey: - """ - Charge une clé publique X25519 depuis sa représentation hex. - """ - data = bytes.fromhex(hex_str) - if len(data) != 32: - raise ValueError("Clé publique X25519 doit faire 32 octets (raw)") - return X25519PublicKey.from_public_bytes(data) - - -def save_contact(alias: str, pub_hex: str) -> None: - """ - Sauvegarde un contact dans le fichier JSON sous l'alias donné. - """ - _ensure_contacts_file() - with open(CONTACTS_FILE, 'r+') as f: - data: Dict[str, str] = json.load(f) - data[alias] = pub_hex - f.seek(0) - json.dump(data, f, indent=2) - f.truncate() - - -def list_contacts() -> List[Tuple[str, str]]: - """ - Retourne la liste des contacts (alias, clé publique hex). - """ - _ensure_contacts_file() - with open(CONTACTS_FILE, 'r') as f: - data: Dict[str, str] = json.load(f) - return list(data.items()) - - -def get_contact(alias: str) -> str: - """ - Récupère la clé publique hex pour un alias donné. - - Raises: - KeyError: si l'alias n'existe pas - """ - _ensure_contacts_file() - with open(CONTACTS_FILE, 'r') as f: - data: Dict[str, str] = json.load(f) - if alias not in data: - raise KeyError(f"Alias '{alias}' introuvable dans les contacts") - return data[alias] diff --git a/protocol_prototype/Prototype/noise_xk_cli/noise_handshake.py b/protocol_prototype/Prototype/noise_xk_cli/noise_handshake.py deleted file mode 100644 index 33c0692..0000000 --- a/protocol_prototype/Prototype/noise_xk_cli/noise_handshake.py +++ /dev/null @@ -1,69 +0,0 @@ -# noise_handshake.py - -from dissononce.extras.meta.protocol.factory import NoiseProtocolFactory -# from dissononce.exceptions import HandshakeError - -class NoiseXK: - """ - Wrapper around the Noise_XK_25519_ChaChaPoly_SHA256 handshake pattern, - using dissononce rev.34. - """ - - def __init__(self, initiator: bool, peer_static: bytes): - """ - :param initiator: True if this party initiates the handshake. - :param peer_static: The peer's static public key (32 bytes X25519). - """ - # Instantiate the factory for XK over 25519/ChaChaPoly/SHA256 - factory = NoiseProtocolFactory.from_name( - "Noise_XK_25519_ChaChaPoly_SHA256" - ) - - # Create the HandshakeState: - # - if initiator: supply 's' = peer_static - # - if responder: supply 'rs' = peer_static - self._hs = factory.create_handshakestate( - initiator=initiator, - s=peer_static if initiator else None, - rs=None if initiator else peer_static - ) - - def write(self, payload: bytes = b"") -> bytes: - """ - Generate the next handshake message to send. - :param payload: optional application data to embed (usually b""). - :return: the raw bytes of the handshake message. - """ - try: - msg = self._hs.write_message(payload) - return msg - except Exception as e: - raise Exception(f"Error writing handshake message: {e}") from e - - def read(self, message: bytes) -> bytes: - """ - Process a received handshake message. - :param message: raw bytes received from the peer. - :return: any payload decrypted from the message (usually b""). - """ - try: - payload = self._hs.read_message(message) - return payload - except Exception as e: - raise Exception(f"Error reading handshake message: {e}") from e - - def is_finished(self) -> bool: - """ - :return: True if the handshake is complete and cipher-states are ready. - """ - return self._hs.is_finished() - - def get_cipher_states(self): - """ - Split the HandshakeState into two CipherState objects once the handshake is done. - :return: (cs_send, cs_recv) - :raises: HandshakeError if called before handshake completion. - """ - if not self._hs.is_finished(): - raise Exception("Cannot split cipher-states before handshake is finished") - return self._hs.split() diff --git a/protocol_prototype/Prototype/noise_xk_cli/transport.py b/protocol_prototype/Prototype/noise_xk_cli/transport.py deleted file mode 100644 index 077d70c..0000000 --- a/protocol_prototype/Prototype/noise_xk_cli/transport.py +++ /dev/null @@ -1,114 +0,0 @@ -""" -Module transport.py - -Abstraction pair-à-pair TCP basique pour CLI Noise_XK. -Chaque instance peut écouter sur un port et/ou se connecter à un pair. -""" -import socket -import threading -from typing import Callable, Optional - - -class Peer: - """ - Peer TCP abstrait : écoute et connexion à un seul pair. - on_data(data: bytes) est appelé à la réception de données. - """ - def __init__(self, port: int, on_data: Callable[[bytes], None], listen: bool = True): - self.port = port - self.on_data = on_data - self.server_socket: Optional[socket.socket] = None - self.conn_socket: Optional[socket.socket] = None - self.alive = False - if listen: - self.start_listen() - - def start_listen(self) -> None: - """ - Démarre le socket serveur et la boucle d'accept. - """ - if self.server_socket: - return - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind(('0.0.0.0', self.port)) - sock.listen(1) - self.server_socket = sock - self.alive = True - threading.Thread(target=self._accept_loop, daemon=True).start() - - def _accept_loop(self) -> None: - while self.alive: - try: - client_sock, addr = self.server_socket.accept() - # Fermer connexion précédente si existante - if self.conn_socket: - self.conn_socket.close() - self.conn_socket = client_sock - threading.Thread( - target=self._read_loop, - args=(client_sock,), - daemon=True - ).start() - except OSError: - break - - def connect(self, host: str, port: int) -> None: - """ - Se connecte à un peer à l'adresse spécifiée. - """ - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.connect((host, port)) - # Fermer connexion précédente si existante - if self.conn_socket: - self.conn_socket.close() - self.conn_socket = sock - self.alive = True - threading.Thread( - target=self._read_loop, - args=(sock,), - daemon=True - ).start() - - def _read_loop(self, sock: socket.socket) -> None: - """ - Boucle de lecture asynchrone sur la socket. - """ - while self.alive: - try: - data = sock.recv(4096) - if not data: - break - self.on_data(data) - except OSError: - break - try: - sock.close() - except OSError: - pass - - def send(self, data: bytes) -> None: - """ - Envoie de données au peer (connexion active). - """ - if not self.conn_socket: - raise ConnectionError("Aucune connexion active pour envoyer") - self.conn_socket.sendall(data) - - def close(self) -> None: - """ - Ferme sockets d'écoute et de connexion. - """ - self.alive = False - if self.server_socket: - try: - self.server_socket.close() - except OSError: - pass - self.server_socket = None - if self.conn_socket: - try: - self.conn_socket.close() - except OSError: - pass - self.conn_socket = None