Need fix signatures
All checks were successful
/ mirror (push) Successful in 4s

This commit is contained in:
stcb 2025-03-24 13:04:56 +02:00
parent 1b5bda2eb4
commit f5930eef82
3 changed files with 242 additions and 100 deletions

View File

@ -23,8 +23,8 @@ def main():
print(" generate_ephemeral_keys") print(" generate_ephemeral_keys")
print(" send_ping") print(" send_ping")
print(" send_handshake") print(" send_handshake")
print(" respond_ping <index> <answer_code>") print(" respond_ping <index> <0|1>")
print(" generate_ecdhe <index> # formerly respond_handshake") print(" generate_ecdhe <index>")
print(" auto_responder <on|off>") print(" auto_responder <on|off>")
print(" show_state") print(" show_state")
print(" exit\n") print(" exit\n")

View File

@ -2,7 +2,7 @@ import os
import struct import struct
import time import time
import zlib import zlib
import hashlib
def crc32_of(data: bytes) -> int: def crc32_of(data: bytes) -> int:
""" """
@ -11,116 +11,222 @@ def crc32_of(data: bytes) -> int:
return zlib.crc32(data) & 0xffffffff return zlib.crc32(data) & 0xffffffff
# ----------------------------------------------------------------------------- # =============================================================================
# Ping # 1) Ping Request (295 bits)
# ----------------------------------------------------------------------------- # - 256-bit nonce
# - 7-bit version
# - 32-bit CRC
# = 295 bits total
# In practice, we store 37 bytes (296 bits); 1 bit is unused.
# =============================================================================
def build_ping_request(version: int = 0) -> bytes: def build_ping_request(version: int) -> bytes:
""" """
Build a Ping request: Build a Ping request with:
- Nonce (32 bytes) - 256-bit nonce (32 bytes)
- Version (1 byte) - 7-bit version
- CRC-32 (4 bytes) - 32-bit CRC
Total = 37 bytes We do bit-packing. The final result is 37 bytes (296 bits), with 1 unused bit.
""" """
nonce = os.urandom(32) if not (0 <= version < 128):
partial = nonce + struct.pack("!B", version) raise ValueError("Version must fit in 7 bits (0..127)")
crc_val = crc32_of(partial)
return partial + struct.pack("!I", crc_val) # 1) Generate 256-bit nonce
nonce = os.urandom(32) # 32 bytes = 256 bits
# We'll build partial_data = [nonce (256 bits), version (7 bits)] as an integer
# Then compute CRC-32 over those bytes, then append 32 bits of CRC.
partial_int = int.from_bytes(nonce, 'big') << 7 # shift left 7 bits
partial_int |= version # put version in the low 7 bits
# Convert partial to bytes
# partial is 256+7=263 bits => needs 33 bytes to store
partial_bytes = partial_int.to_bytes(33, 'big')
# Compute CRC over partial_bytes
cval = crc32_of(partial_bytes)
# Now combine partial_data (263 bits) with 32 bits of CRC => 295 bits
# We'll store that in a single integer
final_int = (int.from_bytes(partial_bytes, 'big') << 32) | cval
# final_int is 263+32=295 bits, needs 37 bytes to store (the last bit is unused).
final_bytes = final_int.to_bytes(37, 'big')
return final_bytes
def parse_ping_request(data: bytes): def parse_ping_request(data: bytes):
""" """
Parse a Ping request (37 bytes). Parse a Ping request (37 bytes = 295 bits).
Return (nonce, version) or None if invalid. Returns (nonce, version) or None if invalid.
""" """
if len(data) != 37: if len(data) != 37:
return None return None
nonce = data[:32]
version = data[32] # Convert to int
crc_in = struct.unpack("!I", data[33:37])[0] val_295 = int.from_bytes(data, 'big') # 295 bits in a 37-byte integer
partial = data[:33] # Extract CRC (lowest 32 bits)
crc_calc = crc32_of(partial) crc_in = val_295 & 0xffffffff
# Then shift right 32 bits to get partial_data
partial_val = val_295 >> 32 # 263 bits
# Convert partial_val back to bytes
partial_bytes = partial_val.to_bytes(33, 'big')
# Recompute CRC
crc_calc = crc32_of(partial_bytes)
if crc_calc != crc_in: if crc_calc != crc_in:
return None return None
return (nonce, version)
# Now parse out nonce (256 bits) and version (7 bits)
# partial_val is 263 bits
version = partial_val & 0x7f # low 7 bits
nonce_val = partial_val >> 7 # high 256 bits
nonce_bytes = nonce_val.to_bytes(32, 'big')
return (nonce_bytes, version)
def build_ping_response(version: int, answer_code: int) -> bytes: # =============================================================================
# 2) Ping Response (72 bits)
# - 32-bit timestamp
# - 7-bit version + 1-bit answer => 8 bits
# - 32-bit CRC
# = 72 bits total => 9 bytes
# =============================================================================
def build_ping_response(version: int, answer: int) -> bytes:
""" """
Build a Ping response: Build a Ping response:
- Timestamp (8 bytes) - 32-bit timestamp (lowest 32 bits of current time in ms)
- Version (1 byte) - 7-bit version + 1-bit answer
- Answer code (1 byte) - 32-bit CRC
- CRC-32 (4 bytes) => 72 bits = 9 bytes
Total = 14 bytes
""" """
timestamp = struct.pack("!d", time.time()) if not (0 <= version < 128):
partial = timestamp + struct.pack("!B", version) + struct.pack("!B", answer_code) raise ValueError("Version must fit in 7 bits.")
crc_val = crc32_of(partial) if answer not in (0, 1):
return partial + struct.pack("!I", crc_val) raise ValueError("Answer must be 0 or 1.")
# 32-bit timestamp = current time in ms, truncated to 32 bits
t_ms = int(time.time() * 1000) & 0xffffffff
# partial = [timestamp (32 bits), version (7 bits), answer (1 bit)] => 40 bits
partial_val = (t_ms << 8) | ((version << 1) & 0xfe) | (answer & 0x01)
# partial_val is 40 bits => 5 bytes
partial_bytes = partial_val.to_bytes(5, 'big')
# CRC over these 5 bytes
cval = crc32_of(partial_bytes)
# Combine partial (40 bits) with 32 bits of CRC => 72 bits total
final_val = (int.from_bytes(partial_bytes, 'big') << 32) | cval
final_bytes = final_val.to_bytes(9, 'big')
return final_bytes
def parse_ping_response(data: bytes): def parse_ping_response(data: bytes):
""" """
Parse a Ping response (14 bytes). Parse a Ping response (72 bits = 9 bytes).
Return (timestamp, version, answer_code) or None if invalid. Return (timestamp_ms, version, answer) or None if invalid.
""" """
if len(data) != 14: if len(data) != 9:
return None return None
timestamp = struct.unpack("!d", data[:8])[0]
version = data[8] val_72 = int.from_bytes(data, 'big') # 72 bits
answer_code = data[9] crc_in = val_72 & 0xffffffff
crc_in = struct.unpack("!I", data[10:14])[0] partial_val = val_72 >> 32 # 40 bits
partial = data[:10]
crc_calc = crc32_of(partial) partial_bytes = partial_val.to_bytes(5, 'big')
crc_calc = crc32_of(partial_bytes)
if crc_calc != crc_in: if crc_calc != crc_in:
return None return None
return (timestamp, version, answer_code)
# Now parse partial_val
# partial_val = [timestamp(32 bits), version(7 bits), answer(1 bit)]
t_ms = (partial_val >> 8) & 0xffffffff
va = partial_val & 0xff # 8 bits = [7 bits version, 1 bit answer]
version = (va >> 1) & 0x7f
answer = va & 0x01
return (t_ms, version, answer)
# ----------------------------------------------------------------------------- # =============================================================================
# Handshake # 3) Handshake
# ----------------------------------------------------------------------------- # - 32-bit timestamp
# - 64-byte ephemeral pubkey (raw x||y = 512 bits)
# - 64-byte ephemeral signature (raw r||s = 512 bits)
# - 32-byte PFS hash (256 bits)
# - 32-bit CRC
# => total 4 + 64 + 64 + 32 + 4 = 168 bytes = 1344 bits
# =============================================================================
def build_handshake_message(ephemeral_pubkey: bytes, ephemeral_signature: bytes, timestamp: float) -> bytes: def build_handshake_message(timestamp: int,
ephemeral_pubkey: bytes,
ephemeral_signature: bytes,
pfs_hash: bytes) -> bytes:
""" """
Build a handshake message: Build handshake:
- ephemeral_pubkey (64 bytes) - 4 bytes: timestamp
- ephemeral_signature (72 bytes, DER + zero-pad) - 64 bytes: ephemeral_pubkey (x||y, raw)
- timestamp (8 bytes) - 64 bytes: ephemeral_signature (r||s, raw)
- CRC-32 (4 bytes) - 32 bytes: pfs_hash
Total = 148 bytes - 4 bytes: CRC-32
=> 168 bytes total
""" """
if len(ephemeral_pubkey) != 64: if len(ephemeral_pubkey) != 64:
raise ValueError("ephemeral_pubkey must be 64 bytes") raise ValueError("ephemeral_pubkey must be 64 bytes (raw x||y).")
if len(ephemeral_signature) > 72: if len(ephemeral_signature) != 64:
raise ValueError("ephemeral_signature too large") raise ValueError("ephemeral_signature must be 64 bytes (raw r||s).")
if len(pfs_hash) != 32:
raise ValueError("pfs_hash must be 32 bytes.")
sig_padded = ephemeral_signature.ljust(72, b'\x00') partial = struct.pack("!I", timestamp) \
ts_bytes = struct.pack("!d", timestamp) + ephemeral_pubkey \
partial = ephemeral_pubkey + sig_padded + ts_bytes + ephemeral_signature \
crc_val = crc32_of(partial) + pfs_hash
return partial + struct.pack("!I", crc_val) cval = crc32_of(partial)
return partial + struct.pack("!I", cval)
def parse_handshake_message(data: bytes): def parse_handshake_message(data: bytes):
""" """
Parse a handshake message (148 bytes). Parse handshake message (168 bytes).
Return (ephemeral_pubkey, ephemeral_signature, timestamp) or None if invalid. Return (timestamp, ephemeral_pub, ephemeral_sig, pfs_hash) or None if invalid.
""" """
if len(data) != 148: if len(data) != 168:
return None return None
ephemeral_pubkey = data[:64] partial = data[:-4] # first 164 bytes
sig_padded = data[64:136] crc_in = struct.unpack("!I", data[-4:])[0]
ts_bytes = data[136:144]
crc_in = struct.unpack("!I", data[144:148])[0]
partial = data[:144]
crc_calc = crc32_of(partial) crc_calc = crc32_of(partial)
if crc_calc != crc_in: if crc_calc != crc_in:
return None return None
ephemeral_signature = sig_padded.rstrip(b'\x00') # Now parse fields
timestamp = struct.unpack("!d", ts_bytes)[0] timestamp = struct.unpack("!I", partial[:4])[0]
return (ephemeral_pubkey, ephemeral_signature, timestamp) ephemeral_pub = partial[4:4+64]
ephemeral_sig = partial[68:68+64]
pfs_hash = partial[132:132+32]
return (timestamp, ephemeral_pub, ephemeral_sig, pfs_hash)
# =============================================================================
# 4) PFS Hash Helper
# If no previous session, return 32 zero bytes
# Otherwise, compute sha256(session_number || last_shared_secret).
# =============================================================================
def compute_pfs_hash(session_number: int, shared_secret_hex: str) -> bytes:
"""
Return 32 bytes (256 bits) for the PFS field.
If session_number < 0 => means no previous session => 32 zero bytes.
Otherwise => sha256( session_number (4 bytes) || shared_secret ).
"""
if session_number < 0:
return b"\x00" * 32
# Convert shared_secret_hex to raw bytes
secret_bytes = bytes.fromhex(shared_secret_hex)
# Pack session_number as 4 bytes
sn_bytes = struct.pack("!I", session_number)
return hashlib.sha256(sn_bytes + secret_bytes).digest()

View File

@ -14,11 +14,12 @@ from crypto_utils import (
from messages import ( from messages import (
build_ping_request, parse_ping_request, build_ping_request, parse_ping_request,
build_ping_response, parse_ping_response, build_ping_response, parse_ping_response,
build_handshake_message, parse_handshake_message build_handshake_message, parse_handshake_message,
compute_pfs_hash
) )
import transmission import transmission
# ANSI colors for pretty printing # ANSI colors
RED = "\033[91m" RED = "\033[91m"
GREEN = "\033[92m" GREEN = "\033[92m"
YELLOW = "\033[93m" YELLOW = "\033[93m"
@ -31,22 +32,21 @@ class IcingProtocol:
# Identity keys # Identity keys
self.identity_privkey, self.identity_pubkey = generate_identity_keys() self.identity_privkey, self.identity_pubkey = generate_identity_keys()
# Peer identity (public key) for verifying ephemeral signatures # Peer identity
self.peer_identity_pubkey_obj = None self.peer_identity_pubkey_obj = None
self.peer_identity_pubkey_bytes = None self.peer_identity_pubkey_bytes = None
# Ephemeral keys (our side) # Ephemeral keys
self.ephemeral_privkey = None self.ephemeral_privkey = None
self.ephemeral_pubkey = None self.ephemeral_pubkey = None
# Store the last computed shared secret (hex) from ECDH # Last computed shared secret (hex)
self.shared_secret = None self.shared_secret = None
# Track open connections # For PFS: track session_number + last_shared_secret per peer identity
self.connections = [] # Key: bytes(64) peer identity pubkey
# Value: (int session_number, str last_shared_secret_hex)
# A random listening port self.pfs_history: Dict[bytes, (int, str)] = {}
self.local_port = random.randint(30000, 40000)
# Inbound messages are stored for manual or auto handling # Inbound messages are stored for manual or auto handling
# Each entry: { 'type': str, 'raw': bytes, 'parsed': Any, 'connection': PeerConnection } # Each entry: { 'type': str, 'raw': bytes, 'parsed': Any, 'connection': PeerConnection }
@ -63,7 +63,11 @@ class IcingProtocol:
# Auto-responder toggle # Auto-responder toggle
self.auto_responder = False self.auto_responder = False
# Start the listener # Connections
self.connections = []
# Listening port
self.local_port = random.randint(30000, 40000)
self.server_listener = transmission.ServerListener( self.server_listener = transmission.ServerListener(
host="127.0.0.1", host="127.0.0.1",
port=self.local_port, port=self.local_port,
@ -86,7 +90,7 @@ class IcingProtocol:
We'll parse and store the message, then handle automatically if auto_responder is on. We'll parse and store the message, then handle automatically if auto_responder is on.
""" """
# Print data size in bits, not bytes # Print data size in bits, not bytes
bits_count = len(data) * 8 bits_count = len(data)*8
print(f"{GREEN}[RECV]{RESET} {bits_count} bits from peer: {data.hex()[:60]}{'...' if len(data.hex())>60 else ''}") print(f"{GREEN}[RECV]{RESET} {bits_count} bits from peer: {data.hex()[:60]}{'...' if len(data.hex())>60 else ''}")
# Attempt to parse Ping request # Attempt to parse Ping request
@ -112,7 +116,7 @@ class IcingProtocol:
return return
# Attempt to parse Ping response # Attempt to parse Ping response
if len(data) == 14: if len(data) == 9:
parsed = parse_ping_response(data) parsed = parse_ping_response(data)
if parsed: if parsed:
ts, version, answer_code = parsed ts, version, answer_code = parsed
@ -128,10 +132,10 @@ class IcingProtocol:
return return
# Attempt to parse handshake # Attempt to parse handshake
if len(data) == 148: if len(data) == 168:
parsed = parse_handshake_message(data) parsed = parse_handshake_message(data)
if parsed: if parsed:
ephemeral_pub, ephemeral_sig, ts = parsed ts, ephemeral_pub, ephemeral_sig, pfs_hash = parsed
self.state["handshake_received"] = True self.state["handshake_received"] = True
index = len(self.inbound_messages) index = len(self.inbound_messages)
msg = { msg = {
@ -140,7 +144,8 @@ class IcingProtocol:
"parsed": { "parsed": {
"ephemeral_pub": ephemeral_pub, "ephemeral_pub": ephemeral_pub,
"ephemeral_sig": ephemeral_sig, "ephemeral_sig": ephemeral_sig,
"timestamp": ts "timestamp": ts,
"pfs hash": pfs_hash
}, },
"connection": conn "connection": conn
} }
@ -228,18 +233,48 @@ class IcingProtocol:
def send_handshake(self): def send_handshake(self):
""" """
Build and send a handshake message with ephemeral keys. Build and send handshake:
- 32-bit timestamp
- ephemeral_pubkey (64 bytes, raw x||y)
- ephemeral_signature (64 bytes, raw r||s)
- pfs_hash (32 bytes)
- 32-bit CRC
""" """
if not self.connections: if not self.connections:
print(f"{RED}[ERROR]{RESET} No active connections.") print(f"{RED}[ERROR]{RESET} No active connections.")
return return
if not self.ephemeral_privkey or not self.ephemeral_pubkey: if not self.ephemeral_privkey or not self.ephemeral_pubkey:
print(f"{RED}[ERROR]{RESET} Ephemeral keys not generated. Call 'generate_ephemeral_keys' first.") print(f"{RED}[ERROR]{RESET} Ephemeral keys not generated.")
return
if self.peer_identity_pubkey_bytes is None:
print(f"{RED}[ERROR]{RESET} Peer identity not set; needed for PFS tracking.")
return return
ephemeral_signature = sign_data(self.identity_privkey, self.ephemeral_pubkey) # 1) Sign ephemeral_pubkey as r||s
ts_now = time.time() # Instead of DER, we do raw r||s each 32 bytes
pkt = build_handshake_message(self.ephemeral_pubkey, ephemeral_signature, ts_now) sig_der = sign_data(self.identity_privkey, self.ephemeral_pubkey)
# Convert DER -> (r, s) -> raw 64 bytes
# Quick approach to parse DER using cryptography, or do a custom parse
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature
r_int, s_int = decode_dss_signature(sig_der)
r_bytes = r_int.to_bytes(32, 'big')
s_bytes = s_int.to_bytes(32, 'big')
raw_signature = r_bytes + s_bytes # 64 bytes
# 2) PFS hash
session_number, last_secret_hex = self.pfs_history.get(self.peer_identity_pubkey_bytes, (-1, ""))
pfs = compute_pfs_hash(session_number, last_secret_hex)
# 3) Build handshake
timestamp_32 = int(time.time() * 1000) & 0xffffffff
pkt = build_handshake_message(
timestamp_32,
self.ephemeral_pubkey, # 64 bytes raw
raw_signature, # 64 bytes raw
pfs # 32 bytes
)
# 4) Send
self._send_packet(self.connections[0], pkt, "HANDSHAKE") self._send_packet(self.connections[0], pkt, "HANDSHAKE")
self.state["handshake_sent"] = True self.state["handshake_sent"] = True
@ -316,17 +351,18 @@ class IcingProtocol:
def show_state(self): def show_state(self):
print(f"\n{YELLOW}=== Global State ==={RESET}") print(f"\n{YELLOW}=== Global State ==={RESET}")
print(f"Listening Port: {self.local_port}") print(f"Listening Port: {self.local_port}")
print(f"Identity PubKey: {self.identity_pubkey.hex()[:16]}... (64 bytes)") print(f"Identity PubKey: 512 bits => {self.identity_pubkey.hex()[:16]}...")
if self.peer_identity_pubkey_bytes: if self.peer_identity_pubkey_bytes:
print(f"Peer Identity PubKey: {self.peer_identity_pubkey_bytes.hex()[:16]}... (64 bytes)") print(f"Peer Identity PubKey: 512 bits => {self.peer_identity_pubkey_bytes.hex()[:16]}...")
else: else:
print("Peer Identity PubKey: [None]") print("Peer Identity PubKey: [None]")
print("\nEphemeral Keys:") print("\nEphemeral Keys:")
if self.ephemeral_pubkey: if self.ephemeral_pubkey:
print(f" ephemeral_pubkey={self.ephemeral_pubkey.hex()[:16]}...") print(f" ephemeral_pubkey: 512 bits => {self.ephemeral_pubkey.hex()[:16]}...")
else: else:
print(" ephemeral_pubkey=[None]") print(" ephemeral_pubkey: [None]")
print(f"\nShared Secret: {self.shared_secret if self.shared_secret else '[None]'}") print(f"\nShared Secret: {self.shared_secret if self.shared_secret else '[None]'}")
@ -342,7 +378,7 @@ class IcingProtocol:
print("\nInbound Message Queue:") print("\nInbound Message Queue:")
for i, m in enumerate(self.inbound_messages): for i, m in enumerate(self.inbound_messages):
print(f" [{i}] type={m['type']} len={len(m['raw'])} bytes") print(f" [{i}] type={m['type']} len={len(m['raw'])} bytes => {len(m['raw']) * 8} bits")
print() print()
def stop(self): def stop(self):