Signature fix
All checks were successful
/ mirror (push) Successful in 5s

This commit is contained in:
stcb 2025-03-28 19:41:55 +02:00
parent 045d9ad417
commit 8d45d2e745

View File

@ -307,8 +307,8 @@ class IcingProtocol:
def generate_ecdhe(self, index: int): def generate_ecdhe(self, index: int):
""" """
Formerly 'respond_to_handshake'. Verifies ephemeral signature, computes ECDH, Formerly 'respond_to_handshake'. Verifies the inbound ephemeral signature
updates pfs_history, and stores shared_secret. Does NOT send a handshake back. and computes the ECDH shared secret, updating PFS history.
""" """
if index < 0 or index >= len(self.inbound_messages): if index < 0 or index >= len(self.inbound_messages):
print(f"{RED}[ERROR]{RESET} Invalid index {index}.") print(f"{RED}[ERROR]{RESET} Invalid index {index}.")
@ -318,33 +318,30 @@ class IcingProtocol:
print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a HANDSHAKE.") print(f"{RED}[ERROR]{RESET} inbound_messages[{index}] is not a HANDSHAKE.")
return return
# Parse fields ephemeral_pub = msg["parsed"]["ephemeral_pub"]
# (timestamp, ephemeral_pub, ephemeral_sig, pfs_hash) = ... ephemeral_sig = msg["parsed"]["ephemeral_sig"]
(ts32, ephemeral_pub, ephemeral_sig, pfs_val) = msg["parsed"]
# Use our raw_signature_to_der wrapper only if signature is 64 bytes.
# Otherwise, assume the signature is already DER-encoded.
from crypto_utils import raw_signature_to_der
if len(ephemeral_sig) == 64:
sig_der = raw_signature_to_der(ephemeral_sig)
else:
sig_der = ephemeral_sig
# 1) Verify ephemeral signature
if not self.peer_identity_pubkey_obj:
print(f"{RED}[ERROR]{RESET} Peer identity not set.")
return
# ephemeral_sig is raw r||s
sig_der = raw_signature_to_der(ephemeral_sig)
ok = verify_signature(self.peer_identity_pubkey_obj, sig_der, ephemeral_pub) ok = verify_signature(self.peer_identity_pubkey_obj, sig_der, ephemeral_pub)
if not ok: if not ok:
print(f"{RED}[ERROR]{RESET} Ephemeral signature invalid.") print(f"{RED}[ERROR]{RESET} Ephemeral signature invalid.")
return return
print(f"{GREEN}[OK]{RESET} Ephemeral signature verified.") print(f"{GREEN}[OK]{RESET} Ephemeral signature verified.")
# 2) ECDH
if not self.ephemeral_privkey: if not self.ephemeral_privkey:
print(f"{YELLOW}[WARN]{RESET} No ephemeral_privkey. Cannot compute shared secret.") print(f"{YELLOW}[WARN]{RESET} No ephemeral_privkey available, cannot compute shared secret.")
return return
shared = compute_ecdh_shared_key(self.ephemeral_privkey, ephemeral_pub) shared = compute_ecdh_shared_key(self.ephemeral_privkey, ephemeral_pub)
self.shared_secret = shared.hex() self.shared_secret = shared.hex()
print(f"{GREEN}[OK]{RESET} Computed ECDH shared key = {self.shared_secret}") print(f"{GREEN}[OK]{RESET} Computed ECDH shared key = {self.shared_secret}")
# 3) Update pfs_history
# If we have an entry, increment session_number, store new secret
# If none, create session_number=1, store new secret
old_session, _ = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, "")) old_session, _ = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, ""))
new_session = 1 if old_session < 0 else old_session + 1 new_session = 1 if old_session < 0 else old_session + 1
self.pfs_history[self.peer_identity_pubkey_bytes] = (new_session, self.shared_secret) self.pfs_history[self.peer_identity_pubkey_bytes] = (new_session, self.shared_secret)