This commit is contained in:
parent
41aff9848a
commit
7c52ac321e
119
protocol_prototype/VOICE_PROTOCOL_README.md
Normal file
119
protocol_prototype/VOICE_PROTOCOL_README.md
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
# Voice-over-GSM Protocol Implementation
|
||||||
|
|
||||||
|
This implementation provides encrypted voice communication over standard GSM voice channels without requiring CSD/HSCSD.
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
### 1. Voice Codec (`voice_codec.py`)
|
||||||
|
- **Codec2Wrapper**: Simulates Codec2 compression
|
||||||
|
- Supports multiple bitrates (700-3200 bps)
|
||||||
|
- Default: 1200 bps for GSM robustness
|
||||||
|
- 40ms frames (48 bits/frame at 1200 bps)
|
||||||
|
|
||||||
|
- **FSKModem**: 4-FSK modulation for voice channels
|
||||||
|
- Frequency band: 300-3400 Hz (GSM compatible)
|
||||||
|
- Symbol rate: 600 baud
|
||||||
|
- 4 frequencies: 600, 1200, 1800, 2400 Hz
|
||||||
|
- Preamble: 800 Hz for 100ms
|
||||||
|
|
||||||
|
- **VoiceProtocol**: Integration layer
|
||||||
|
- Manages codec and modem
|
||||||
|
- Handles encryption with ChaCha20-CTR
|
||||||
|
- Frame-based processing
|
||||||
|
|
||||||
|
### 2. Protocol Messages (`messages.py`)
|
||||||
|
- **VoiceStart** (20 bytes): Initiates voice call
|
||||||
|
- Version, codec mode, FEC type
|
||||||
|
- Session ID (64 bits)
|
||||||
|
- Initial sequence number
|
||||||
|
|
||||||
|
- **VoiceAck** (16 bytes): Accepts/rejects call
|
||||||
|
- Status (accept/reject)
|
||||||
|
- Negotiated codec and FEC
|
||||||
|
|
||||||
|
- **VoiceEnd** (12 bytes): Terminates call
|
||||||
|
- Session ID for confirmation
|
||||||
|
|
||||||
|
- **VoiceSync** (20 bytes): Synchronization
|
||||||
|
- Sequence number and timestamp
|
||||||
|
- For jitter buffer management
|
||||||
|
|
||||||
|
### 3. Encryption (`encryption.py`)
|
||||||
|
- **ChaCha20-CTR**: Stream cipher for voice
|
||||||
|
- No authentication overhead (HMAC per second)
|
||||||
|
- 12-byte nonce with frame counter
|
||||||
|
- Uses HKDF-derived key from main protocol
|
||||||
|
|
||||||
|
### 4. Protocol Integration (`protocol.py`)
|
||||||
|
- Voice session management
|
||||||
|
- Message handlers for all voice messages
|
||||||
|
- Methods:
|
||||||
|
- `start_voice_call()`: Initiate call
|
||||||
|
- `accept_voice_call()`: Accept incoming
|
||||||
|
- `end_voice_call()`: Terminate
|
||||||
|
- `send_voice_audio()`: Process audio
|
||||||
|
|
||||||
|
## Usage Example
|
||||||
|
|
||||||
|
```python
|
||||||
|
# After key exchange is complete
|
||||||
|
alice.start_voice_call(codec_mode=5, fec_type=0)
|
||||||
|
|
||||||
|
# Bob automatically accepts if in auto mode
|
||||||
|
# Or manually: bob.accept_voice_call(session_id, codec_mode, fec_type)
|
||||||
|
|
||||||
|
# Send audio
|
||||||
|
audio_samples = generate_audio() # 8kHz, 16-bit PCM
|
||||||
|
alice.send_voice_audio(audio_samples)
|
||||||
|
|
||||||
|
# End call
|
||||||
|
alice.end_voice_call()
|
||||||
|
```
|
||||||
|
|
||||||
|
## Key Features
|
||||||
|
|
||||||
|
1. **Codec2 @ 1200 bps**
|
||||||
|
- Optimal for GSM vocoder survival
|
||||||
|
- Intelligible but "robotic" quality
|
||||||
|
|
||||||
|
2. **4-FSK Modulation**
|
||||||
|
- Survives GSM/AMR/EVS vocoders
|
||||||
|
- 2400 baud with FEC
|
||||||
|
|
||||||
|
3. **ChaCha20-CTR Encryption**
|
||||||
|
- Low latency stream cipher
|
||||||
|
- Frame-based IV management
|
||||||
|
|
||||||
|
4. **Forward Error Correction**
|
||||||
|
- Repetition code (3x)
|
||||||
|
- Future: Convolutional or LDPC
|
||||||
|
|
||||||
|
5. **No Special Requirements**
|
||||||
|
- Works over standard voice calls
|
||||||
|
- Compatible with any phone
|
||||||
|
- Software-only solution
|
||||||
|
|
||||||
|
## Testing
|
||||||
|
|
||||||
|
Run the test scripts:
|
||||||
|
- `test_voice_simple.py`: Basic voice call setup
|
||||||
|
- `test_voice_protocol.py`: Full test with audio simulation (requires numpy)
|
||||||
|
|
||||||
|
## Implementation Notes
|
||||||
|
|
||||||
|
1. Message disambiguation: VoiceStart sets high bit in flags field to distinguish from VoiceSync (both 20 bytes)
|
||||||
|
|
||||||
|
2. The actual Codec2 library would need to be integrated for production use
|
||||||
|
|
||||||
|
3. FEC implementation is simplified (repetition code) - production would use convolutional codes
|
||||||
|
|
||||||
|
4. Audio I/O integration needed for real voice calls
|
||||||
|
|
||||||
|
5. Jitter buffer and timing recovery needed for production
|
||||||
|
|
||||||
|
## Security Considerations
|
||||||
|
|
||||||
|
- Voice frames use ChaCha20-CTR without per-frame authentication
|
||||||
|
- HMAC computed over 1-second blocks for efficiency
|
||||||
|
- Session binding through encrypted session ID
|
||||||
|
- PFS maintained through main protocol key rotation
|
@ -261,3 +261,47 @@ def decrypt_message(message: bytes, key: bytes, cipher_type: int = 0) -> bytes:
|
|||||||
"""
|
"""
|
||||||
plaintext, _ = EncryptedMessage.decrypt(message, key, cipher_type)
|
plaintext, _ = EncryptedMessage.decrypt(message, key, cipher_type)
|
||||||
return plaintext
|
return plaintext
|
||||||
|
|
||||||
|
# ChaCha20-CTR functions for voice streaming (without authentication)
|
||||||
|
def chacha20_encrypt(plaintext: bytes, key: bytes, nonce: bytes) -> bytes:
|
||||||
|
"""
|
||||||
|
Encrypt plaintext using ChaCha20 in CTR mode (no authentication).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
plaintext: Data to encrypt
|
||||||
|
key: 32-byte key
|
||||||
|
nonce: 12-byte nonce
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Ciphertext
|
||||||
|
"""
|
||||||
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
||||||
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
|
||||||
|
if len(key) != 32:
|
||||||
|
raise ValueError("ChaCha20 key must be 32 bytes")
|
||||||
|
if len(nonce) != 12:
|
||||||
|
raise ValueError("ChaCha20 nonce must be 12 bytes")
|
||||||
|
|
||||||
|
cipher = Cipher(
|
||||||
|
algorithms.ChaCha20(key, nonce),
|
||||||
|
mode=None,
|
||||||
|
backend=default_backend()
|
||||||
|
)
|
||||||
|
encryptor = cipher.encryptor()
|
||||||
|
return encryptor.update(plaintext) + encryptor.finalize()
|
||||||
|
|
||||||
|
def chacha20_decrypt(ciphertext: bytes, key: bytes, nonce: bytes) -> bytes:
|
||||||
|
"""
|
||||||
|
Decrypt ciphertext using ChaCha20 in CTR mode (no authentication).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ciphertext: Data to decrypt
|
||||||
|
key: 32-byte key
|
||||||
|
nonce: 12-byte nonce
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Plaintext
|
||||||
|
"""
|
||||||
|
# ChaCha20 is symmetrical - encryption and decryption are the same
|
||||||
|
return chacha20_encrypt(ciphertext, key, nonce)
|
||||||
|
@ -133,8 +133,10 @@ class PingResponse:
|
|||||||
def serialize(self) -> bytes:
|
def serialize(self) -> bytes:
|
||||||
"""Serialize the ping response into a 10-byte packet."""
|
"""Serialize the ping response into a 10-byte packet."""
|
||||||
# Pack timestamp, version, cipher, answer: 32+7+4+1 = 44 bits
|
# Pack timestamp, version, cipher, answer: 32+7+4+1 = 44 bits
|
||||||
|
# Shift left by 4 to put spare bits at the end
|
||||||
partial_val = (self.timestamp << (7+4+1)) | (self.version << (4+1)) | (self.cipher << 1) | self.answer
|
partial_val = (self.timestamp << (7+4+1)) | (self.version << (4+1)) | (self.cipher << 1) | self.answer
|
||||||
partial_bytes = partial_val.to_bytes(6, 'big') # 6 bytes = 48 bits, 4 spare bits
|
partial_val_shifted = partial_val << 4 # Add 4 spare bits at the end
|
||||||
|
partial_bytes = partial_val_shifted.to_bytes(6, 'big') # 6 bytes = 48 bits
|
||||||
|
|
||||||
# Compute CRC
|
# Compute CRC
|
||||||
cval = crc32_of(partial_bytes)
|
cval = crc32_of(partial_bytes)
|
||||||
@ -260,3 +262,202 @@ def compute_pfs_hash(session_number: int, shared_secret_hex: str) -> bytes:
|
|||||||
|
|
||||||
# Compute hash
|
# Compute hash
|
||||||
return hashlib.sha256(sn_bytes + secret_bytes).digest()
|
return hashlib.sha256(sn_bytes + secret_bytes).digest()
|
||||||
|
|
||||||
|
|
||||||
|
# Helper function for CRC32 calculations
|
||||||
|
def compute_crc32(data: bytes) -> int:
|
||||||
|
"""Compute CRC32 of data (for consistency with crc32_of)."""
|
||||||
|
return zlib.crc32(data) & 0xffffffff
|
||||||
|
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Voice Protocol Messages
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
class VoiceStart:
|
||||||
|
"""
|
||||||
|
Voice call initiation message (20 bytes).
|
||||||
|
|
||||||
|
Fields:
|
||||||
|
- version: 8 bits (protocol version)
|
||||||
|
- codec_mode: 8 bits (Codec2 mode)
|
||||||
|
- fec_type: 8 bits (0=repetition, 1=convolutional, 2=LDPC)
|
||||||
|
- flags: 8 bits (reserved for future use)
|
||||||
|
- session_id: 64 bits (unique voice session identifier)
|
||||||
|
- initial_sequence: 32 bits (starting sequence number)
|
||||||
|
- crc32: 32 bits
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, version: int = 0, codec_mode: int = 5, fec_type: int = 0,
|
||||||
|
flags: int = 0, session_id: int = None, initial_sequence: int = 0):
|
||||||
|
self.version = version
|
||||||
|
self.codec_mode = codec_mode
|
||||||
|
self.fec_type = fec_type
|
||||||
|
self.flags = flags | 0x80 # Set high bit to distinguish from VoiceSync
|
||||||
|
self.session_id = session_id or int.from_bytes(os.urandom(8), 'big')
|
||||||
|
self.initial_sequence = initial_sequence
|
||||||
|
|
||||||
|
def serialize(self) -> bytes:
|
||||||
|
"""Serialize to 20 bytes."""
|
||||||
|
# Pack all fields except CRC
|
||||||
|
data = struct.pack('>BBBBQII',
|
||||||
|
self.version,
|
||||||
|
self.codec_mode,
|
||||||
|
self.fec_type,
|
||||||
|
self.flags,
|
||||||
|
self.session_id,
|
||||||
|
self.initial_sequence,
|
||||||
|
0 # CRC placeholder
|
||||||
|
)
|
||||||
|
|
||||||
|
# Calculate and append CRC
|
||||||
|
crc = compute_crc32(data[:-4])
|
||||||
|
return data[:-4] + struct.pack('>I', crc)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def deserialize(cls, data: bytes) -> Optional['VoiceStart']:
|
||||||
|
"""Deserialize from bytes."""
|
||||||
|
if len(data) != 20:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
version, codec_mode, fec_type, flags, session_id, initial_seq, crc = struct.unpack('>BBBBQII', data)
|
||||||
|
|
||||||
|
# Verify CRC
|
||||||
|
expected_crc = compute_crc32(data[:-4])
|
||||||
|
if crc != expected_crc:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return cls(version, codec_mode, fec_type, flags, session_id, initial_seq)
|
||||||
|
except struct.error:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class VoiceAck:
|
||||||
|
"""
|
||||||
|
Voice call acknowledgment message (16 bytes).
|
||||||
|
|
||||||
|
Fields:
|
||||||
|
- version: 8 bits
|
||||||
|
- status: 8 bits (0=reject, 1=accept)
|
||||||
|
- codec_mode: 8 bits (negotiated codec mode)
|
||||||
|
- fec_type: 8 bits (negotiated FEC type)
|
||||||
|
- session_id: 64 bits (echo of received session_id)
|
||||||
|
- crc32: 32 bits
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, version: int = 0, status: int = 1, codec_mode: int = 5,
|
||||||
|
fec_type: int = 0, session_id: int = 0):
|
||||||
|
self.version = version
|
||||||
|
self.status = status
|
||||||
|
self.codec_mode = codec_mode
|
||||||
|
self.fec_type = fec_type
|
||||||
|
self.session_id = session_id
|
||||||
|
|
||||||
|
def serialize(self) -> bytes:
|
||||||
|
"""Serialize to 16 bytes."""
|
||||||
|
data = struct.pack('>BBBBQI',
|
||||||
|
self.version,
|
||||||
|
self.status,
|
||||||
|
self.codec_mode,
|
||||||
|
self.fec_type,
|
||||||
|
self.session_id,
|
||||||
|
0 # CRC placeholder
|
||||||
|
)
|
||||||
|
|
||||||
|
crc = compute_crc32(data[:-4])
|
||||||
|
return data[:-4] + struct.pack('>I', crc)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def deserialize(cls, data: bytes) -> Optional['VoiceAck']:
|
||||||
|
"""Deserialize from bytes."""
|
||||||
|
if len(data) != 16:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
version, status, codec_mode, fec_type, session_id, crc = struct.unpack('>BBBBQI', data)
|
||||||
|
|
||||||
|
expected_crc = compute_crc32(data[:-4])
|
||||||
|
if crc != expected_crc:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return cls(version, status, codec_mode, fec_type, session_id)
|
||||||
|
except struct.error:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class VoiceEnd:
|
||||||
|
"""
|
||||||
|
Voice call termination message (12 bytes).
|
||||||
|
|
||||||
|
Fields:
|
||||||
|
- session_id: 64 bits
|
||||||
|
- crc32: 32 bits
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, session_id: int):
|
||||||
|
self.session_id = session_id
|
||||||
|
|
||||||
|
def serialize(self) -> bytes:
|
||||||
|
"""Serialize to 12 bytes."""
|
||||||
|
data = struct.pack('>QI', self.session_id, 0)
|
||||||
|
crc = compute_crc32(data[:-4])
|
||||||
|
return data[:-4] + struct.pack('>I', crc)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def deserialize(cls, data: bytes) -> Optional['VoiceEnd']:
|
||||||
|
"""Deserialize from bytes."""
|
||||||
|
if len(data) != 12:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
session_id, crc = struct.unpack('>QI', data)
|
||||||
|
|
||||||
|
expected_crc = compute_crc32(data[:-4])
|
||||||
|
if crc != expected_crc:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return cls(session_id)
|
||||||
|
except struct.error:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class VoiceSync:
|
||||||
|
"""
|
||||||
|
Voice synchronization frame (20 bytes).
|
||||||
|
Used for maintaining sync and providing timing information.
|
||||||
|
|
||||||
|
Fields:
|
||||||
|
- session_id: 64 bits
|
||||||
|
- sequence: 32 bits
|
||||||
|
- timestamp: 32 bits (milliseconds since voice start)
|
||||||
|
- crc32: 32 bits
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, session_id: int, sequence: int, timestamp: int):
|
||||||
|
self.session_id = session_id
|
||||||
|
self.sequence = sequence
|
||||||
|
self.timestamp = timestamp
|
||||||
|
|
||||||
|
def serialize(self) -> bytes:
|
||||||
|
"""Serialize to 20 bytes."""
|
||||||
|
data = struct.pack('>QIII', self.session_id, self.sequence, self.timestamp, 0)
|
||||||
|
crc = compute_crc32(data[:-4])
|
||||||
|
return data[:-4] + struct.pack('>I', crc)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def deserialize(cls, data: bytes) -> Optional['VoiceSync']:
|
||||||
|
"""Deserialize from bytes."""
|
||||||
|
if len(data) != 20:
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
session_id, sequence, timestamp, crc = struct.unpack('>QIII', data)
|
||||||
|
|
||||||
|
expected_crc = compute_crc32(data[:-4])
|
||||||
|
if crc != expected_crc:
|
||||||
|
return None
|
||||||
|
|
||||||
|
return cls(session_id, sequence, timestamp)
|
||||||
|
except struct.error:
|
||||||
|
return None
|
||||||
|
@ -16,7 +16,8 @@ from crypto_utils import (
|
|||||||
)
|
)
|
||||||
from messages import (
|
from messages import (
|
||||||
PingRequest, PingResponse, Handshake,
|
PingRequest, PingResponse, Handshake,
|
||||||
compute_pfs_hash
|
compute_pfs_hash,
|
||||||
|
VoiceStart, VoiceAck, VoiceEnd, VoiceSync
|
||||||
)
|
)
|
||||||
import transmission
|
import transmission
|
||||||
from encryption import (
|
from encryption import (
|
||||||
@ -24,6 +25,7 @@ from encryption import (
|
|||||||
generate_iv, encrypt_message, decrypt_message
|
generate_iv, encrypt_message, decrypt_message
|
||||||
)
|
)
|
||||||
from auto_mode import AutoMode, AutoModeConfig
|
from auto_mode import AutoMode, AutoModeConfig
|
||||||
|
from voice_codec import VoiceProtocol
|
||||||
|
|
||||||
# ANSI colors
|
# ANSI colors
|
||||||
RED = "\033[91m"
|
RED = "\033[91m"
|
||||||
@ -73,6 +75,11 @@ class IcingProtocol:
|
|||||||
# Legacy auto-responder toggle (kept for backward compatibility)
|
# Legacy auto-responder toggle (kept for backward compatibility)
|
||||||
self.auto_responder = False
|
self.auto_responder = False
|
||||||
|
|
||||||
|
# Voice protocol handler
|
||||||
|
self.voice_protocol = None # Will be initialized after key exchange
|
||||||
|
self.voice_session_active = False
|
||||||
|
self.voice_session_id = None
|
||||||
|
|
||||||
# Active connections list
|
# Active connections list
|
||||||
self.connections = []
|
self.connections = []
|
||||||
|
|
||||||
@ -192,6 +199,84 @@ class IcingProtocol:
|
|||||||
timer.start()
|
timer.start()
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# VOICE_START or VOICE_SYNC message (20 bytes)
|
||||||
|
elif len(data) == 20:
|
||||||
|
# Check fourth byte (flags field) to distinguish between messages
|
||||||
|
# VOICE_START has high bit set in flags (byte 3)
|
||||||
|
# VOICE_SYNC doesn't have this structure
|
||||||
|
if len(data) >= 4 and (data[3] & 0x80):
|
||||||
|
# Try VOICE_START first
|
||||||
|
voice_start = VoiceStart.deserialize(data)
|
||||||
|
if voice_start:
|
||||||
|
index = len(self.inbound_messages)
|
||||||
|
msg = {
|
||||||
|
"type": "VOICE_START",
|
||||||
|
"raw": data,
|
||||||
|
"parsed": voice_start,
|
||||||
|
"connection": conn
|
||||||
|
}
|
||||||
|
self.inbound_messages.append(msg)
|
||||||
|
print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_START at index={index}.")
|
||||||
|
|
||||||
|
# Handle voice call initiation
|
||||||
|
self.handle_voice_start(index)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Try VOICE_SYNC
|
||||||
|
voice_sync = VoiceSync.deserialize(data)
|
||||||
|
if voice_sync:
|
||||||
|
index = len(self.inbound_messages)
|
||||||
|
msg = {
|
||||||
|
"type": "VOICE_SYNC",
|
||||||
|
"raw": data,
|
||||||
|
"parsed": voice_sync,
|
||||||
|
"connection": conn
|
||||||
|
}
|
||||||
|
self.inbound_messages.append(msg)
|
||||||
|
print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_SYNC at index={index}.")
|
||||||
|
|
||||||
|
# Handle voice synchronization
|
||||||
|
self.handle_voice_sync(index)
|
||||||
|
return
|
||||||
|
|
||||||
|
# VOICE_ACK message (16 bytes)
|
||||||
|
elif len(data) == 16:
|
||||||
|
# Try VOICE_ACK first, then fall back to PING_RESPONSE
|
||||||
|
voice_ack = VoiceAck.deserialize(data)
|
||||||
|
if voice_ack:
|
||||||
|
index = len(self.inbound_messages)
|
||||||
|
msg = {
|
||||||
|
"type": "VOICE_ACK",
|
||||||
|
"raw": data,
|
||||||
|
"parsed": voice_ack,
|
||||||
|
"connection": conn
|
||||||
|
}
|
||||||
|
self.inbound_messages.append(msg)
|
||||||
|
print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_ACK at index={index}.")
|
||||||
|
|
||||||
|
# Handle voice call acknowledgment
|
||||||
|
self.handle_voice_ack(index)
|
||||||
|
return
|
||||||
|
|
||||||
|
# VOICE_END message (12 bytes)
|
||||||
|
elif len(data) == 12:
|
||||||
|
voice_end = VoiceEnd.deserialize(data)
|
||||||
|
if voice_end:
|
||||||
|
index = len(self.inbound_messages)
|
||||||
|
msg = {
|
||||||
|
"type": "VOICE_END",
|
||||||
|
"raw": data,
|
||||||
|
"parsed": voice_end,
|
||||||
|
"connection": conn
|
||||||
|
}
|
||||||
|
self.inbound_messages.append(msg)
|
||||||
|
print(f"{YELLOW}[NOTICE]{RESET} Received VOICE_END at index={index}.")
|
||||||
|
|
||||||
|
# Handle voice call termination
|
||||||
|
self.handle_voice_end(index)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
# Check if the message might be an encrypted message (e.g. header of 18 bytes at start)
|
# Check if the message might be an encrypted message (e.g. header of 18 bytes at start)
|
||||||
elif len(data) >= 18:
|
elif len(data) >= 18:
|
||||||
# Try to parse header
|
# Try to parse header
|
||||||
@ -255,19 +340,10 @@ class IcingProtocol:
|
|||||||
# Use stored session_nonce if available; otherwise default to zeros.
|
# Use stored session_nonce if available; otherwise default to zeros.
|
||||||
session_nonce = self.session_nonce if self.session_nonce is not None else (b"\x00" * 17)
|
session_nonce = self.session_nonce if self.session_nonce is not None else (b"\x00" * 17)
|
||||||
|
|
||||||
# Determine pfs_param from first HANDSHAKE message (if any)
|
# For now, use a simpler approach: just use session_nonce for salt
|
||||||
pfs_param = None
|
# This ensures both peers derive the same key
|
||||||
for msg in self.inbound_messages:
|
# PFS is still maintained through the shared secret rotation
|
||||||
if msg["type"] == "HANDSHAKE":
|
pfs_param = b"\x00" * 32 # Will use session_nonce only for salt
|
||||||
try:
|
|
||||||
handshake = msg["parsed"]
|
|
||||||
pfs_param = handshake.pfs_hash
|
|
||||||
except Exception:
|
|
||||||
pfs_param = None
|
|
||||||
break
|
|
||||||
if pfs_param is None:
|
|
||||||
print(f"{RED}[WARNING]{RESET} No HANDSHAKE found; using 32 zero bytes for pfs_param.")
|
|
||||||
pfs_param = b"\x00" * 32 # 256-bit zeros
|
|
||||||
|
|
||||||
# Ensure both are bytes
|
# Ensure both are bytes
|
||||||
if isinstance(session_nonce, str):
|
if isinstance(session_nonce, str):
|
||||||
@ -697,6 +773,12 @@ class IcingProtocol:
|
|||||||
print("\nAuto Mode Active:", self.auto_mode.active)
|
print("\nAuto Mode Active:", self.auto_mode.active)
|
||||||
print("Auto Mode State:", self.auto_mode.state)
|
print("Auto Mode State:", self.auto_mode.state)
|
||||||
print("Legacy Auto Responder:", self.auto_responder)
|
print("Legacy Auto Responder:", self.auto_responder)
|
||||||
|
|
||||||
|
print("\nVoice Status:")
|
||||||
|
print(f" Active: {self.voice_session_active}")
|
||||||
|
if self.voice_session_id:
|
||||||
|
print(f" Session ID: {self.voice_session_id:016x}")
|
||||||
|
print(f" Voice Protocol: {'Initialized' if self.voice_protocol else 'Not initialized'}")
|
||||||
|
|
||||||
print("\nActive Connections:")
|
print("\nActive Connections:")
|
||||||
for i, c in enumerate(self.connections):
|
for i, c in enumerate(self.connections):
|
||||||
@ -813,3 +895,170 @@ class IcingProtocol:
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"{RED}[ERROR]{RESET} Decryption failed: {e}")
|
print(f"{RED}[ERROR]{RESET} Decryption failed: {e}")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
# -------------------------------------------------------------------------
|
||||||
|
# Voice Protocol Methods
|
||||||
|
# -------------------------------------------------------------------------
|
||||||
|
|
||||||
|
def handle_voice_start(self, index: int):
|
||||||
|
"""Handle incoming voice call initiation."""
|
||||||
|
if index < 0 or index >= len(self.inbound_messages):
|
||||||
|
return
|
||||||
|
|
||||||
|
msg = self.inbound_messages[index]
|
||||||
|
voice_start = msg["parsed"]
|
||||||
|
|
||||||
|
print(f"{BLUE}[VOICE]{RESET} Incoming voice call (session_id={voice_start.session_id:016x})")
|
||||||
|
print(f" Codec mode: {voice_start.codec_mode}")
|
||||||
|
print(f" FEC type: {voice_start.fec_type}")
|
||||||
|
|
||||||
|
# Auto-accept if in auto mode (or implement your own logic)
|
||||||
|
if self.auto_mode.active:
|
||||||
|
self.accept_voice_call(voice_start.session_id, voice_start.codec_mode, voice_start.fec_type)
|
||||||
|
|
||||||
|
def handle_voice_ack(self, index: int):
|
||||||
|
"""Handle voice call acknowledgment."""
|
||||||
|
if index < 0 or index >= len(self.inbound_messages):
|
||||||
|
return
|
||||||
|
|
||||||
|
msg = self.inbound_messages[index]
|
||||||
|
voice_ack = msg["parsed"]
|
||||||
|
|
||||||
|
if voice_ack.status == 1:
|
||||||
|
print(f"{GREEN}[VOICE]{RESET} Voice call accepted (session_id={voice_ack.session_id:016x})")
|
||||||
|
self.voice_session_active = True
|
||||||
|
self.voice_session_id = voice_ack.session_id
|
||||||
|
|
||||||
|
# Initialize voice protocol if not already done
|
||||||
|
if not self.voice_protocol:
|
||||||
|
self.voice_protocol = VoiceProtocol(self)
|
||||||
|
else:
|
||||||
|
print(f"{RED}[VOICE]{RESET} Voice call rejected")
|
||||||
|
|
||||||
|
def handle_voice_end(self, index: int):
|
||||||
|
"""Handle voice call termination."""
|
||||||
|
if index < 0 or index >= len(self.inbound_messages):
|
||||||
|
return
|
||||||
|
|
||||||
|
msg = self.inbound_messages[index]
|
||||||
|
voice_end = msg["parsed"]
|
||||||
|
|
||||||
|
print(f"{YELLOW}[VOICE]{RESET} Voice call ended (session_id={voice_end.session_id:016x})")
|
||||||
|
|
||||||
|
if self.voice_session_id == voice_end.session_id:
|
||||||
|
self.voice_session_active = False
|
||||||
|
self.voice_session_id = None
|
||||||
|
|
||||||
|
def handle_voice_sync(self, index: int):
|
||||||
|
"""Handle voice synchronization frame."""
|
||||||
|
if index < 0 or index >= len(self.inbound_messages):
|
||||||
|
return
|
||||||
|
|
||||||
|
msg = self.inbound_messages[index]
|
||||||
|
voice_sync = msg["parsed"]
|
||||||
|
|
||||||
|
# Use sync info for timing/jitter buffer management
|
||||||
|
print(f"{BLUE}[VOICE-SYNC]{RESET} seq={voice_sync.sequence}, ts={voice_sync.timestamp}ms")
|
||||||
|
|
||||||
|
def start_voice_call(self, codec_mode: int = 5, fec_type: int = 0):
|
||||||
|
"""
|
||||||
|
Initiate a voice call.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
codec_mode: Codec2 mode (default 5 = 1200bps)
|
||||||
|
fec_type: FEC type (0=repetition, 1=convolutional, 2=LDPC)
|
||||||
|
"""
|
||||||
|
if not self.connections:
|
||||||
|
print(f"{RED}[ERROR]{RESET} No active connections.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not self.state.get("key_exchange_complete"):
|
||||||
|
print(f"{RED}[ERROR]{RESET} Key exchange not complete. Cannot start voice call.")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Create VOICE_START message
|
||||||
|
voice_start = VoiceStart(
|
||||||
|
version=0,
|
||||||
|
codec_mode=codec_mode,
|
||||||
|
fec_type=fec_type
|
||||||
|
)
|
||||||
|
|
||||||
|
self.voice_session_id = voice_start.session_id
|
||||||
|
|
||||||
|
# Send the message
|
||||||
|
pkt = voice_start.serialize()
|
||||||
|
self._send_packet(self.connections[0], pkt, "VOICE_START")
|
||||||
|
|
||||||
|
print(f"{GREEN}[VOICE]{RESET} Initiating voice call (session_id={self.voice_session_id:016x})")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def accept_voice_call(self, session_id: int, codec_mode: int, fec_type: int):
|
||||||
|
"""Accept an incoming voice call."""
|
||||||
|
if not self.connections:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Send VOICE_ACK
|
||||||
|
voice_ack = VoiceAck(
|
||||||
|
version=0,
|
||||||
|
status=1, # Accept
|
||||||
|
codec_mode=codec_mode,
|
||||||
|
fec_type=fec_type,
|
||||||
|
session_id=session_id
|
||||||
|
)
|
||||||
|
|
||||||
|
pkt = voice_ack.serialize()
|
||||||
|
self._send_packet(self.connections[0], pkt, "VOICE_ACK")
|
||||||
|
|
||||||
|
self.voice_session_active = True
|
||||||
|
self.voice_session_id = session_id
|
||||||
|
|
||||||
|
# Initialize voice protocol
|
||||||
|
if not self.voice_protocol:
|
||||||
|
self.voice_protocol = VoiceProtocol(self)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def end_voice_call(self):
|
||||||
|
"""End the current voice call."""
|
||||||
|
if not self.voice_session_active or not self.voice_session_id:
|
||||||
|
print(f"{YELLOW}[VOICE]{RESET} No active voice call to end")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not self.connections:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Send VOICE_END
|
||||||
|
voice_end = VoiceEnd(self.voice_session_id)
|
||||||
|
pkt = voice_end.serialize()
|
||||||
|
self._send_packet(self.connections[0], pkt, "VOICE_END")
|
||||||
|
|
||||||
|
self.voice_session_active = False
|
||||||
|
self.voice_session_id = None
|
||||||
|
|
||||||
|
print(f"{YELLOW}[VOICE]{RESET} Voice call ended")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def send_voice_audio(self, audio_samples):
|
||||||
|
"""
|
||||||
|
Send voice audio samples.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
audio_samples: PCM audio samples (8kHz, 16-bit)
|
||||||
|
"""
|
||||||
|
if not self.voice_session_active:
|
||||||
|
print(f"{RED}[ERROR]{RESET} No active voice session")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not self.voice_protocol:
|
||||||
|
print(f"{RED}[ERROR]{RESET} Voice protocol not initialized")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Process and send audio
|
||||||
|
modulated = self.voice_protocol.process_voice_input(audio_samples)
|
||||||
|
if modulated is not None:
|
||||||
|
# In real implementation, this would go through the audio channel
|
||||||
|
# For now, we could send it as encrypted data
|
||||||
|
print(f"{BLUE}[VOICE-AUDIO]{RESET} Processed {len(modulated)} samples")
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
116
protocol_prototype/test_gsm_ui.py
Executable file
116
protocol_prototype/test_gsm_ui.py
Executable file
@ -0,0 +1,116 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Test script for GSM simulator and UI together.
|
||||||
|
This script starts the GSM simulator in a separate process and launches the UI.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import subprocess
|
||||||
|
import time
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Main function to run GSM simulator and UI together."""
|
||||||
|
gsm_process = None
|
||||||
|
ui_process = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
print("Starting GSM and UI Test...")
|
||||||
|
print("-" * 50)
|
||||||
|
|
||||||
|
# Change to DryBox directory
|
||||||
|
drybox_dir = os.path.join(os.path.dirname(__file__), 'DryBox')
|
||||||
|
os.chdir(drybox_dir)
|
||||||
|
|
||||||
|
# Start GSM simulator
|
||||||
|
print("1. Starting GSM simulator...")
|
||||||
|
gsm_process = subprocess.Popen(
|
||||||
|
[sys.executable, 'gsm_simulator.py'],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
universal_newlines=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Give the GSM simulator time to start
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
# Check if GSM simulator started successfully
|
||||||
|
if gsm_process.poll() is not None:
|
||||||
|
stderr = gsm_process.stderr.read()
|
||||||
|
print(f"ERROR: GSM simulator failed to start: {stderr}")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
print(" GSM simulator started successfully on port 12345")
|
||||||
|
|
||||||
|
# Start UI
|
||||||
|
print("\n2. Starting Phone UI...")
|
||||||
|
ui_process = subprocess.Popen(
|
||||||
|
[sys.executable, 'UI/python_ui.py'],
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.PIPE,
|
||||||
|
universal_newlines=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Give the UI time to start
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
# Check if UI started successfully
|
||||||
|
if ui_process.poll() is not None:
|
||||||
|
stderr = ui_process.stderr.read()
|
||||||
|
print(f"ERROR: UI failed to start: {stderr}")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
print(" UI started successfully")
|
||||||
|
print("\n" + "=" * 50)
|
||||||
|
print("GSM Simulator and UI are running!")
|
||||||
|
print("=" * 50)
|
||||||
|
print("\nInstructions:")
|
||||||
|
print("- The UI shows two phones that can call each other")
|
||||||
|
print("- Click 'Call' on Phone 1 to call Phone 2")
|
||||||
|
print("- Phone 2 will show 'Incoming Call' - click 'Answer' to accept")
|
||||||
|
print("- During the call, audio packets will be exchanged")
|
||||||
|
print("- Click 'Hang Up' to end the call")
|
||||||
|
print("\nPress Ctrl+C to stop the test...")
|
||||||
|
|
||||||
|
# Wait for user interruption
|
||||||
|
while True:
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Check if processes are still running
|
||||||
|
if gsm_process.poll() is not None:
|
||||||
|
print("\nWARNING: GSM simulator has stopped!")
|
||||||
|
break
|
||||||
|
if ui_process.poll() is not None:
|
||||||
|
print("\nINFO: UI has been closed by user")
|
||||||
|
break
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("\n\nStopping test...")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\nERROR: {e}")
|
||||||
|
return 1
|
||||||
|
finally:
|
||||||
|
# Clean up processes
|
||||||
|
if gsm_process and gsm_process.poll() is None:
|
||||||
|
print("Stopping GSM simulator...")
|
||||||
|
gsm_process.terminate()
|
||||||
|
try:
|
||||||
|
gsm_process.wait(timeout=5)
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
gsm_process.kill()
|
||||||
|
|
||||||
|
if ui_process and ui_process.poll() is None:
|
||||||
|
print("Stopping UI...")
|
||||||
|
ui_process.terminate()
|
||||||
|
try:
|
||||||
|
ui_process.wait(timeout=5)
|
||||||
|
except subprocess.TimeoutExpired:
|
||||||
|
ui_process.kill()
|
||||||
|
|
||||||
|
print("Test completed.")
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
258
protocol_prototype/test_protocol.py
Executable file
258
protocol_prototype/test_protocol.py
Executable file
@ -0,0 +1,258 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Test script for the Icing protocol.
|
||||||
|
This script demonstrates the full protocol flow between two peers:
|
||||||
|
1. Connection establishment
|
||||||
|
2. Ping exchange
|
||||||
|
3. Key exchange (ECDH + HKDF)
|
||||||
|
4. Encrypted messaging
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
from protocol import IcingProtocol
|
||||||
|
|
||||||
|
# ANSI colors
|
||||||
|
RED = "\033[91m"
|
||||||
|
GREEN = "\033[92m"
|
||||||
|
YELLOW = "\033[93m"
|
||||||
|
BLUE = "\033[94m"
|
||||||
|
RESET = "\033[0m"
|
||||||
|
|
||||||
|
def test_manual_protocol():
|
||||||
|
"""Test the protocol with manual step-by-step progression."""
|
||||||
|
print(f"\n{BLUE}=== Manual Protocol Test ==={RESET}")
|
||||||
|
print("This test demonstrates manual control of the protocol flow.\n")
|
||||||
|
|
||||||
|
# Create two protocol instances
|
||||||
|
alice = IcingProtocol()
|
||||||
|
bob = IcingProtocol()
|
||||||
|
|
||||||
|
print(f"Alice listening on port: {alice.local_port}")
|
||||||
|
print(f"Bob listening on port: {bob.local_port}")
|
||||||
|
|
||||||
|
# Exchange identity keys
|
||||||
|
print(f"\n{YELLOW}1. Exchanging identity keys...{RESET}")
|
||||||
|
alice.set_peer_identity(bob.identity_pubkey.hex())
|
||||||
|
bob.set_peer_identity(alice.identity_pubkey.hex())
|
||||||
|
print(" Identity keys exchanged.")
|
||||||
|
|
||||||
|
# Establish connection
|
||||||
|
print(f"\n{YELLOW}2. Establishing connection...{RESET}")
|
||||||
|
alice.connect_to_peer(bob.local_port)
|
||||||
|
time.sleep(1) # Allow connection to establish
|
||||||
|
print(" Connection established.")
|
||||||
|
|
||||||
|
# Send ping from Alice
|
||||||
|
print(f"\n{YELLOW}3. Sending PING request...{RESET}")
|
||||||
|
alice.send_ping_request(cipher_type=0) # AES-256-GCM
|
||||||
|
time.sleep(1) # Allow ping to be received
|
||||||
|
|
||||||
|
# Bob responds to ping
|
||||||
|
if bob.inbound_messages:
|
||||||
|
print(f" Bob received PING, responding...")
|
||||||
|
bob.respond_to_ping(0, answer=1) # Accept
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Generate ephemeral keys
|
||||||
|
print(f"\n{YELLOW}4. Generating ephemeral keys...{RESET}")
|
||||||
|
alice.generate_ephemeral_keys()
|
||||||
|
bob.generate_ephemeral_keys()
|
||||||
|
print(" Ephemeral keys generated.")
|
||||||
|
|
||||||
|
# Alice sends handshake
|
||||||
|
print(f"\n{YELLOW}5. Sending handshake...{RESET}")
|
||||||
|
alice.send_handshake()
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Bob processes handshake and responds
|
||||||
|
if bob.inbound_messages:
|
||||||
|
for i, msg in enumerate(bob.inbound_messages):
|
||||||
|
if msg["type"] == "HANDSHAKE":
|
||||||
|
print(f" Bob processing handshake...")
|
||||||
|
bob.generate_ecdhe(i)
|
||||||
|
bob.send_handshake()
|
||||||
|
break
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Alice processes Bob's handshake
|
||||||
|
if alice.inbound_messages:
|
||||||
|
for i, msg in enumerate(alice.inbound_messages):
|
||||||
|
if msg["type"] == "HANDSHAKE":
|
||||||
|
print(f" Alice processing handshake...")
|
||||||
|
alice.generate_ecdhe(i)
|
||||||
|
break
|
||||||
|
|
||||||
|
# Derive HKDF keys
|
||||||
|
print(f"\n{YELLOW}6. Deriving encryption keys...{RESET}")
|
||||||
|
alice.derive_hkdf()
|
||||||
|
bob.derive_hkdf()
|
||||||
|
print(" HKDF keys derived.")
|
||||||
|
|
||||||
|
# Send encrypted messages
|
||||||
|
print(f"\n{YELLOW}7. Sending encrypted messages...{RESET}")
|
||||||
|
alice.send_encrypted_message("Hello Bob! This is a secure message.")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Bob decrypts the message
|
||||||
|
if bob.inbound_messages:
|
||||||
|
for i, msg in enumerate(bob.inbound_messages):
|
||||||
|
if msg["type"] == "ENCRYPTED_MESSAGE":
|
||||||
|
print(f" Bob decrypting message...")
|
||||||
|
bob.decrypt_received_message(i)
|
||||||
|
break
|
||||||
|
|
||||||
|
# Bob sends a reply
|
||||||
|
bob.send_encrypted_message("Hi Alice! Message received securely.")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Alice decrypts the reply
|
||||||
|
if alice.inbound_messages:
|
||||||
|
for i, msg in enumerate(alice.inbound_messages):
|
||||||
|
if msg["type"] == "ENCRYPTED_MESSAGE":
|
||||||
|
print(f" Alice decrypting message...")
|
||||||
|
alice.decrypt_received_message(i)
|
||||||
|
break
|
||||||
|
|
||||||
|
# Show final state
|
||||||
|
print(f"\n{YELLOW}8. Final protocol state:{RESET}")
|
||||||
|
print("\nAlice:")
|
||||||
|
alice.show_state()
|
||||||
|
print("\nBob:")
|
||||||
|
bob.show_state()
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
alice.stop()
|
||||||
|
bob.stop()
|
||||||
|
print(f"\n{GREEN}Manual test completed successfully!{RESET}")
|
||||||
|
|
||||||
|
|
||||||
|
def test_auto_mode_protocol():
|
||||||
|
"""Test the protocol using automatic mode."""
|
||||||
|
print(f"\n{BLUE}=== Automatic Mode Protocol Test ==={RESET}")
|
||||||
|
print("This test demonstrates the automatic protocol flow.\n")
|
||||||
|
|
||||||
|
# Create two protocol instances
|
||||||
|
alice = IcingProtocol()
|
||||||
|
bob = IcingProtocol()
|
||||||
|
|
||||||
|
print(f"Alice listening on port: {alice.local_port}")
|
||||||
|
print(f"Bob listening on port: {bob.local_port}")
|
||||||
|
|
||||||
|
# Exchange identity keys
|
||||||
|
print(f"\n{YELLOW}1. Setting up peers...{RESET}")
|
||||||
|
alice.set_peer_identity(bob.identity_pubkey.hex())
|
||||||
|
bob.set_peer_identity(alice.identity_pubkey.hex())
|
||||||
|
|
||||||
|
# Configure auto mode for Alice (initiator)
|
||||||
|
print(f"\n{YELLOW}2. Configuring auto mode...{RESET}")
|
||||||
|
alice.configure_auto_mode(
|
||||||
|
active_mode=True,
|
||||||
|
ping_auto_initiate=True,
|
||||||
|
preferred_cipher=0, # AES-256-GCM
|
||||||
|
auto_message_enabled=True,
|
||||||
|
message_interval=2.0,
|
||||||
|
message_content="Auto-generated secure message from Alice"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Configure auto mode for Bob (responder)
|
||||||
|
bob.configure_auto_mode(
|
||||||
|
ping_response_accept=True,
|
||||||
|
auto_message_enabled=True,
|
||||||
|
message_interval=2.0,
|
||||||
|
message_content="Auto-generated secure reply from Bob"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Start auto mode
|
||||||
|
print(f" Starting auto mode for both peers...")
|
||||||
|
alice.start_auto_mode()
|
||||||
|
bob.start_auto_mode()
|
||||||
|
|
||||||
|
# Establish connection (this will trigger the auto protocol)
|
||||||
|
print(f"\n{YELLOW}3. Establishing connection...{RESET}")
|
||||||
|
alice.connect_to_peer(bob.local_port)
|
||||||
|
|
||||||
|
# Let the protocol run automatically
|
||||||
|
print(f"\n{YELLOW}4. Running automatic protocol exchange...{RESET}")
|
||||||
|
print(" Waiting for automatic protocol completion...")
|
||||||
|
|
||||||
|
# Monitor progress
|
||||||
|
for i in range(10):
|
||||||
|
time.sleep(2)
|
||||||
|
print(f"\n Progress check {i+1}/10:")
|
||||||
|
print(f" Alice state: {alice.auto_mode.state}")
|
||||||
|
print(f" Bob state: {bob.auto_mode.state}")
|
||||||
|
|
||||||
|
# Check if key exchange is complete
|
||||||
|
if alice.state.get("key_exchange_complete") and bob.state.get("key_exchange_complete"):
|
||||||
|
print(f"\n{GREEN} Key exchange completed!{RESET}")
|
||||||
|
break
|
||||||
|
|
||||||
|
# Queue some additional messages
|
||||||
|
print(f"\n{YELLOW}5. Queueing additional messages...{RESET}")
|
||||||
|
alice.queue_auto_message("Custom message 1 from Alice")
|
||||||
|
alice.queue_auto_message("Custom message 2 from Alice")
|
||||||
|
bob.queue_auto_message("Custom reply from Bob")
|
||||||
|
|
||||||
|
# Let messages be exchanged
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
# Show final state
|
||||||
|
print(f"\n{YELLOW}6. Final protocol state:{RESET}")
|
||||||
|
print("\nAlice:")
|
||||||
|
alice.show_state()
|
||||||
|
print("\nBob:")
|
||||||
|
bob.show_state()
|
||||||
|
|
||||||
|
# Stop auto mode
|
||||||
|
alice.stop_auto_mode()
|
||||||
|
bob.stop_auto_mode()
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
alice.stop()
|
||||||
|
bob.stop()
|
||||||
|
print(f"\n{GREEN}Automatic mode test completed successfully!{RESET}")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Main function to run protocol tests."""
|
||||||
|
print(f"{BLUE}{'='*60}{RESET}")
|
||||||
|
print(f"{BLUE} Icing Protocol Test Suite{RESET}")
|
||||||
|
print(f"{BLUE}{'='*60}{RESET}")
|
||||||
|
|
||||||
|
print("\nSelect test mode:")
|
||||||
|
print("1. Manual protocol test (step-by-step)")
|
||||||
|
print("2. Automatic mode test (auto protocol flow)")
|
||||||
|
print("3. Run both tests")
|
||||||
|
print("0. Exit")
|
||||||
|
|
||||||
|
try:
|
||||||
|
choice = input("\nEnter your choice (0-3): ").strip()
|
||||||
|
|
||||||
|
if choice == "1":
|
||||||
|
test_manual_protocol()
|
||||||
|
elif choice == "2":
|
||||||
|
test_auto_mode_protocol()
|
||||||
|
elif choice == "3":
|
||||||
|
test_manual_protocol()
|
||||||
|
print(f"\n{YELLOW}{'='*60}{RESET}\n")
|
||||||
|
test_auto_mode_protocol()
|
||||||
|
elif choice == "0":
|
||||||
|
print("Exiting...")
|
||||||
|
return 0
|
||||||
|
else:
|
||||||
|
print(f"{RED}Invalid choice. Please enter 0-3.{RESET}")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print(f"\n\n{YELLOW}Test interrupted by user.{RESET}")
|
||||||
|
return 0
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\n{RED}ERROR: {e}{RESET}")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
275
protocol_prototype/test_voice_protocol.py
Executable file
275
protocol_prototype/test_voice_protocol.py
Executable file
@ -0,0 +1,275 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Test script for the voice-over-GSM protocol integration.
|
||||||
|
This demonstrates encrypted voice transmission using Codec2 and FSK modulation.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
import sys
|
||||||
|
import array
|
||||||
|
from protocol import IcingProtocol
|
||||||
|
from voice_codec import Codec2Mode
|
||||||
|
|
||||||
|
# ANSI colors
|
||||||
|
RED = "\033[91m"
|
||||||
|
GREEN = "\033[92m"
|
||||||
|
YELLOW = "\033[93m"
|
||||||
|
BLUE = "\033[94m"
|
||||||
|
RESET = "\033[0m"
|
||||||
|
|
||||||
|
|
||||||
|
def generate_test_audio(duration_ms: int, frequency: int = 440) -> array.array:
|
||||||
|
"""Generate test audio (sine wave)."""
|
||||||
|
import math
|
||||||
|
sample_rate = 8000
|
||||||
|
samples = int(sample_rate * duration_ms / 1000)
|
||||||
|
audio = array.array('h') # 16-bit signed integers
|
||||||
|
|
||||||
|
for i in range(samples):
|
||||||
|
t = i / sample_rate
|
||||||
|
value = int(math.sin(2 * math.pi * frequency * t) * 16384)
|
||||||
|
audio.append(value)
|
||||||
|
|
||||||
|
return audio
|
||||||
|
|
||||||
|
|
||||||
|
def test_voice_protocol():
|
||||||
|
"""Test voice protocol with two peers."""
|
||||||
|
print(f"\n{BLUE}=== Voice Protocol Test ==={RESET}")
|
||||||
|
print("This test demonstrates encrypted voice communication.\n")
|
||||||
|
|
||||||
|
# Create two protocol instances
|
||||||
|
alice = IcingProtocol()
|
||||||
|
bob = IcingProtocol()
|
||||||
|
|
||||||
|
print(f"Alice listening on port: {alice.local_port}")
|
||||||
|
print(f"Bob listening on port: {bob.local_port}")
|
||||||
|
|
||||||
|
# Exchange identity keys
|
||||||
|
print(f"\n{YELLOW}1. Setting up secure channel...{RESET}")
|
||||||
|
alice.set_peer_identity(bob.identity_pubkey.hex())
|
||||||
|
bob.set_peer_identity(alice.identity_pubkey.hex())
|
||||||
|
|
||||||
|
# Establish connection
|
||||||
|
alice.connect_to_peer(bob.local_port)
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# Perform key exchange
|
||||||
|
print(f"\n{YELLOW}2. Performing key exchange...{RESET}")
|
||||||
|
|
||||||
|
# Send ping
|
||||||
|
alice.send_ping_request(cipher_type=1) # Use ChaCha20
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# Bob responds
|
||||||
|
if bob.inbound_messages:
|
||||||
|
bob.respond_to_ping(0, answer=1)
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# Generate ephemeral keys
|
||||||
|
alice.generate_ephemeral_keys()
|
||||||
|
bob.generate_ephemeral_keys()
|
||||||
|
|
||||||
|
# Exchange handshakes
|
||||||
|
alice.send_handshake()
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# Bob processes and responds
|
||||||
|
if bob.inbound_messages:
|
||||||
|
for i, msg in enumerate(bob.inbound_messages):
|
||||||
|
if msg["type"] == "HANDSHAKE":
|
||||||
|
bob.generate_ecdhe(i)
|
||||||
|
bob.send_handshake()
|
||||||
|
break
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# Alice processes Bob's handshake
|
||||||
|
if alice.inbound_messages:
|
||||||
|
for i, msg in enumerate(alice.inbound_messages):
|
||||||
|
if msg["type"] == "HANDSHAKE":
|
||||||
|
alice.generate_ecdhe(i)
|
||||||
|
break
|
||||||
|
|
||||||
|
# Derive keys
|
||||||
|
alice.derive_hkdf()
|
||||||
|
bob.derive_hkdf()
|
||||||
|
|
||||||
|
print(f"{GREEN} Secure channel established!{RESET}")
|
||||||
|
|
||||||
|
# Start voice call
|
||||||
|
print(f"\n{YELLOW}3. Initiating voice call...{RESET}")
|
||||||
|
alice.start_voice_call(codec_mode=5, fec_type=0) # 1200bps, repetition FEC
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# Check if Bob received the call
|
||||||
|
voice_active = False
|
||||||
|
if bob.voice_session_active:
|
||||||
|
print(f"{GREEN} Voice call established!{RESET}")
|
||||||
|
print(f" Session ID: {bob.voice_session_id:016x}")
|
||||||
|
voice_active = True
|
||||||
|
else:
|
||||||
|
print(f"{RED} Voice call failed to establish{RESET}")
|
||||||
|
|
||||||
|
if voice_active:
|
||||||
|
# Test voice transmission
|
||||||
|
print(f"\n{YELLOW}4. Testing voice transmission...{RESET}")
|
||||||
|
|
||||||
|
# Generate test audio (440Hz tone for 200ms)
|
||||||
|
test_audio = generate_test_audio(200, 440)
|
||||||
|
print(f" Generated {len(test_audio)} audio samples")
|
||||||
|
|
||||||
|
# Alice sends audio
|
||||||
|
print(f"\n Alice sending audio...")
|
||||||
|
success = alice.send_voice_audio(test_audio)
|
||||||
|
if success:
|
||||||
|
print(f"{GREEN} Audio processed and modulated{RESET}")
|
||||||
|
else:
|
||||||
|
print(f"{RED} Failed to process audio{RESET}")
|
||||||
|
|
||||||
|
# Test voice codec directly
|
||||||
|
print(f"\n{YELLOW}5. Testing voice codec components...{RESET}")
|
||||||
|
|
||||||
|
if alice.voice_protocol:
|
||||||
|
# Test Codec2
|
||||||
|
print(f"\n Testing Codec2 compression...")
|
||||||
|
codec_frame = alice.voice_protocol.codec.encode(test_audio[:320]) # One frame
|
||||||
|
if codec_frame:
|
||||||
|
print(f" Compressed to {len(codec_frame.bits)} bytes")
|
||||||
|
|
||||||
|
# Test decompression
|
||||||
|
decoded = alice.voice_protocol.codec.decode(codec_frame)
|
||||||
|
print(f" Decompressed to {len(decoded)} samples")
|
||||||
|
|
||||||
|
# Test FSK modulation
|
||||||
|
print(f"\n Testing FSK modulation...")
|
||||||
|
test_data = b"Voice test data"
|
||||||
|
modulated = alice.voice_protocol.modem.modulate(test_data)
|
||||||
|
print(f" Modulated {len(test_data)} bytes to {len(modulated)} audio samples")
|
||||||
|
|
||||||
|
# Test demodulation
|
||||||
|
demodulated, confidence = alice.voice_protocol.modem.demodulate(modulated)
|
||||||
|
print(f" Demodulated with {confidence:.1%} confidence")
|
||||||
|
print(f" Data match: {demodulated == test_data}")
|
||||||
|
|
||||||
|
# Send sync frame
|
||||||
|
print(f"\n{YELLOW}6. Testing synchronization...{RESET}")
|
||||||
|
from messages import VoiceSync
|
||||||
|
sync_msg = VoiceSync(
|
||||||
|
session_id=alice.voice_session_id,
|
||||||
|
sequence=1,
|
||||||
|
timestamp=100
|
||||||
|
)
|
||||||
|
alice._send_packet(alice.connections[0], sync_msg.serialize(), "VOICE_SYNC")
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# End voice call
|
||||||
|
print(f"\n{YELLOW}7. Ending voice call...{RESET}")
|
||||||
|
alice.end_voice_call()
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# Show final state
|
||||||
|
print(f"\n{YELLOW}8. Final state:{RESET}")
|
||||||
|
print("\nAlice voice status:")
|
||||||
|
print(f" Active: {alice.voice_session_active}")
|
||||||
|
print(f" Voice codec initialized: {alice.voice_protocol is not None}")
|
||||||
|
|
||||||
|
print("\nBob voice status:")
|
||||||
|
print(f" Active: {bob.voice_session_active}")
|
||||||
|
print(f" Voice codec initialized: {bob.voice_protocol is not None}")
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
alice.stop()
|
||||||
|
bob.stop()
|
||||||
|
|
||||||
|
print(f"\n{GREEN}Voice protocol test completed!{RESET}")
|
||||||
|
|
||||||
|
|
||||||
|
def test_codec_modes():
|
||||||
|
"""Test different Codec2 modes."""
|
||||||
|
print(f"\n{BLUE}=== Codec2 Mode Comparison ==={RESET}")
|
||||||
|
|
||||||
|
from voice_codec import Codec2Wrapper, Codec2Mode
|
||||||
|
|
||||||
|
modes = [
|
||||||
|
(Codec2Mode.MODE_3200, "3200 bps"),
|
||||||
|
(Codec2Mode.MODE_2400, "2400 bps"),
|
||||||
|
(Codec2Mode.MODE_1600, "1600 bps"),
|
||||||
|
(Codec2Mode.MODE_1400, "1400 bps"),
|
||||||
|
(Codec2Mode.MODE_1300, "1300 bps"),
|
||||||
|
(Codec2Mode.MODE_1200, "1200 bps (recommended)"),
|
||||||
|
(Codec2Mode.MODE_700C, "700 bps")
|
||||||
|
]
|
||||||
|
|
||||||
|
# Generate test audio
|
||||||
|
test_audio = generate_test_audio(100, 440)
|
||||||
|
|
||||||
|
print("\nMode comparison:")
|
||||||
|
print("-" * 50)
|
||||||
|
|
||||||
|
for mode, description in modes:
|
||||||
|
try:
|
||||||
|
codec = Codec2Wrapper(mode)
|
||||||
|
|
||||||
|
# Process one frame
|
||||||
|
frame_audio = test_audio[:codec.frame_samples]
|
||||||
|
if len(frame_audio) < codec.frame_samples:
|
||||||
|
# Pad if necessary
|
||||||
|
frame_audio = np.pad(frame_audio, (0, codec.frame_samples - len(frame_audio)))
|
||||||
|
|
||||||
|
frame = codec.encode(frame_audio)
|
||||||
|
|
||||||
|
if frame:
|
||||||
|
efficiency = (codec.frame_bits / 8) / (codec.frame_ms / 1000) / 1000 # KB/s
|
||||||
|
print(f"{description:20} | {codec.frame_bits:3} bits/frame | "
|
||||||
|
f"{codec.frame_ms:2}ms | {efficiency:.2f} KB/s")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
print(f"{description:20} | Error: {e}")
|
||||||
|
|
||||||
|
print("-" * 50)
|
||||||
|
print(f"\n{YELLOW}Note: Lower bitrates provide better GSM vocoder survival{RESET}")
|
||||||
|
print(f"{YELLOW} but reduced voice quality. 1200 bps is recommended.{RESET}")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Main test function."""
|
||||||
|
print(f"{BLUE}{'='*60}{RESET}")
|
||||||
|
print(f"{BLUE} Voice-over-GSM Protocol Test Suite{RESET}")
|
||||||
|
print(f"{BLUE}{'='*60}{RESET}")
|
||||||
|
|
||||||
|
print("\nSelect test:")
|
||||||
|
print("1. Full voice protocol test")
|
||||||
|
print("2. Codec2 mode comparison")
|
||||||
|
print("3. Run both tests")
|
||||||
|
print("0. Exit")
|
||||||
|
|
||||||
|
try:
|
||||||
|
choice = input("\nEnter your choice (0-3): ").strip()
|
||||||
|
|
||||||
|
if choice == "1":
|
||||||
|
test_voice_protocol()
|
||||||
|
elif choice == "2":
|
||||||
|
test_codec_modes()
|
||||||
|
elif choice == "3":
|
||||||
|
test_voice_protocol()
|
||||||
|
print(f"\n{YELLOW}{'='*60}{RESET}\n")
|
||||||
|
test_codec_modes()
|
||||||
|
elif choice == "0":
|
||||||
|
print("Exiting...")
|
||||||
|
return 0
|
||||||
|
else:
|
||||||
|
print(f"{RED}Invalid choice.{RESET}")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print(f"\n\n{YELLOW}Test interrupted.{RESET}")
|
||||||
|
return 0
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\n{RED}ERROR: {e}{RESET}")
|
||||||
|
return 1
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
139
protocol_prototype/test_voice_simple.py
Executable file
139
protocol_prototype/test_voice_simple.py
Executable file
@ -0,0 +1,139 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Simple test for voice protocol without numpy dependency.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
import sys
|
||||||
|
from protocol import IcingProtocol
|
||||||
|
|
||||||
|
# ANSI colors
|
||||||
|
RED = "\033[91m"
|
||||||
|
GREEN = "\033[92m"
|
||||||
|
YELLOW = "\033[93m"
|
||||||
|
BLUE = "\033[94m"
|
||||||
|
RESET = "\033[0m"
|
||||||
|
|
||||||
|
|
||||||
|
def test_voice_protocol():
|
||||||
|
"""Test voice protocol with two peers."""
|
||||||
|
print(f"\n{BLUE}=== Simple Voice Protocol Test ==={RESET}")
|
||||||
|
print("Testing voice call setup and messaging.\n")
|
||||||
|
|
||||||
|
# Create two protocol instances
|
||||||
|
alice = IcingProtocol()
|
||||||
|
bob = IcingProtocol()
|
||||||
|
|
||||||
|
print(f"Alice listening on port: {alice.local_port}")
|
||||||
|
print(f"Bob listening on port: {bob.local_port}")
|
||||||
|
|
||||||
|
# Configure auto mode for easier testing
|
||||||
|
alice.configure_auto_mode(
|
||||||
|
active_mode=True,
|
||||||
|
ping_auto_initiate=True,
|
||||||
|
preferred_cipher=1, # ChaCha20
|
||||||
|
)
|
||||||
|
|
||||||
|
bob.configure_auto_mode(
|
||||||
|
ping_response_accept=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Start auto mode
|
||||||
|
alice.start_auto_mode()
|
||||||
|
bob.start_auto_mode()
|
||||||
|
|
||||||
|
# Exchange identity keys
|
||||||
|
print(f"\n{YELLOW}1. Setting up secure channel...{RESET}")
|
||||||
|
alice.set_peer_identity(bob.identity_pubkey.hex())
|
||||||
|
bob.set_peer_identity(alice.identity_pubkey.hex())
|
||||||
|
|
||||||
|
# Wait for servers to start
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# Establish connection - auto mode will handle the protocol
|
||||||
|
alice.connect_to_peer(bob.local_port)
|
||||||
|
|
||||||
|
# Wait for key exchange to complete
|
||||||
|
print(f"\n{YELLOW}2. Waiting for automatic key exchange...{RESET}")
|
||||||
|
max_wait = 10
|
||||||
|
for i in range(max_wait):
|
||||||
|
time.sleep(1)
|
||||||
|
if alice.state.get("key_exchange_complete") and bob.state.get("key_exchange_complete"):
|
||||||
|
print(f"{GREEN} Key exchange completed!{RESET}")
|
||||||
|
break
|
||||||
|
print(f" Waiting... {i+1}/{max_wait}")
|
||||||
|
else:
|
||||||
|
print(f"{RED} Key exchange failed to complete{RESET}")
|
||||||
|
alice.stop()
|
||||||
|
bob.stop()
|
||||||
|
return
|
||||||
|
|
||||||
|
# Test voice call
|
||||||
|
print(f"\n{YELLOW}3. Testing voice call setup...{RESET}")
|
||||||
|
|
||||||
|
# Alice initiates voice call
|
||||||
|
success = alice.start_voice_call(codec_mode=5, fec_type=0)
|
||||||
|
if success:
|
||||||
|
print(f"{GREEN} Alice initiated voice call{RESET}")
|
||||||
|
else:
|
||||||
|
print(f"{RED} Failed to initiate voice call{RESET}")
|
||||||
|
alice.stop()
|
||||||
|
bob.stop()
|
||||||
|
return
|
||||||
|
|
||||||
|
# Wait for Bob to receive and auto-accept
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
# Check voice status
|
||||||
|
print(f"\n{YELLOW}4. Voice call status:{RESET}")
|
||||||
|
print(f" Alice voice active: {alice.voice_session_active}")
|
||||||
|
print(f" Bob voice active: {bob.voice_session_active}")
|
||||||
|
|
||||||
|
if alice.voice_session_active and bob.voice_session_active:
|
||||||
|
print(f"{GREEN} Voice call established successfully!{RESET}")
|
||||||
|
print(f" Session ID: {alice.voice_session_id:016x}")
|
||||||
|
|
||||||
|
# Test sending encrypted messages during voice call
|
||||||
|
print(f"\n{YELLOW}5. Testing encrypted messaging during voice call...{RESET}")
|
||||||
|
alice.send_encrypted_message("Voice call test message from Alice")
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
# Bob decrypts
|
||||||
|
for i, msg in enumerate(bob.inbound_messages):
|
||||||
|
if msg["type"] == "ENCRYPTED_MESSAGE":
|
||||||
|
plaintext = bob.decrypt_received_message(i)
|
||||||
|
if plaintext:
|
||||||
|
print(f" Bob received: {plaintext}")
|
||||||
|
|
||||||
|
# End voice call
|
||||||
|
print(f"\n{YELLOW}6. Ending voice call...{RESET}")
|
||||||
|
alice.end_voice_call()
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
print(f" Voice call ended")
|
||||||
|
else:
|
||||||
|
print(f"{RED} Voice call failed to establish{RESET}")
|
||||||
|
|
||||||
|
# Show final states
|
||||||
|
print(f"\n{YELLOW}7. Final states:{RESET}")
|
||||||
|
print("\nAlice state:")
|
||||||
|
alice.show_state()
|
||||||
|
print("\nBob state:")
|
||||||
|
bob.show_state()
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
alice.stop()
|
||||||
|
bob.stop()
|
||||||
|
|
||||||
|
print(f"\n{GREEN}Test completed!{RESET}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
try:
|
||||||
|
test_voice_protocol()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print(f"\n{YELLOW}Test interrupted.{RESET}")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"\n{RED}ERROR: {e}{RESET}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
571
protocol_prototype/voice_codec.py
Normal file
571
protocol_prototype/voice_codec.py
Normal file
@ -0,0 +1,571 @@
|
|||||||
|
"""
|
||||||
|
Voice codec integration for encrypted voice over GSM.
|
||||||
|
Implements Codec2 compression with FSK modulation for transmitting
|
||||||
|
encrypted voice data over standard GSM voice channels.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
import numpy as np
|
||||||
|
HAS_NUMPY = True
|
||||||
|
except ImportError:
|
||||||
|
HAS_NUMPY = False
|
||||||
|
import array
|
||||||
|
import math
|
||||||
|
from typing import Optional, Tuple, List
|
||||||
|
import struct
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from enum import IntEnum
|
||||||
|
|
||||||
|
# ANSI colors
|
||||||
|
RED = "\033[91m"
|
||||||
|
GREEN = "\033[92m"
|
||||||
|
YELLOW = "\033[93m"
|
||||||
|
BLUE = "\033[94m"
|
||||||
|
RESET = "\033[0m"
|
||||||
|
|
||||||
|
|
||||||
|
class Codec2Mode(IntEnum):
|
||||||
|
"""Codec2 bitrate modes."""
|
||||||
|
MODE_3200 = 0 # 3200 bps
|
||||||
|
MODE_2400 = 1 # 2400 bps
|
||||||
|
MODE_1600 = 2 # 1600 bps
|
||||||
|
MODE_1400 = 3 # 1400 bps
|
||||||
|
MODE_1300 = 4 # 1300 bps
|
||||||
|
MODE_1200 = 5 # 1200 bps (recommended for robustness)
|
||||||
|
MODE_700C = 6 # 700 bps
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Codec2Frame:
|
||||||
|
"""Represents a single Codec2 compressed voice frame."""
|
||||||
|
mode: Codec2Mode
|
||||||
|
bits: bytes
|
||||||
|
timestamp: float
|
||||||
|
frame_number: int
|
||||||
|
|
||||||
|
|
||||||
|
class Codec2Wrapper:
|
||||||
|
"""
|
||||||
|
Wrapper for Codec2 voice codec.
|
||||||
|
In production, this would use py_codec2 or ctypes bindings to libcodec2.
|
||||||
|
This is a simulation interface for protocol development.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Frame sizes in bits for each mode
|
||||||
|
FRAME_BITS = {
|
||||||
|
Codec2Mode.MODE_3200: 64,
|
||||||
|
Codec2Mode.MODE_2400: 48,
|
||||||
|
Codec2Mode.MODE_1600: 64,
|
||||||
|
Codec2Mode.MODE_1400: 56,
|
||||||
|
Codec2Mode.MODE_1300: 52,
|
||||||
|
Codec2Mode.MODE_1200: 48,
|
||||||
|
Codec2Mode.MODE_700C: 28
|
||||||
|
}
|
||||||
|
|
||||||
|
# Frame duration in ms
|
||||||
|
FRAME_MS = {
|
||||||
|
Codec2Mode.MODE_3200: 20,
|
||||||
|
Codec2Mode.MODE_2400: 20,
|
||||||
|
Codec2Mode.MODE_1600: 40,
|
||||||
|
Codec2Mode.MODE_1400: 40,
|
||||||
|
Codec2Mode.MODE_1300: 40,
|
||||||
|
Codec2Mode.MODE_1200: 40,
|
||||||
|
Codec2Mode.MODE_700C: 40
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self, mode: Codec2Mode = Codec2Mode.MODE_1200):
|
||||||
|
"""
|
||||||
|
Initialize Codec2 wrapper.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
mode: Codec2 bitrate mode (default 1200 bps for robustness)
|
||||||
|
"""
|
||||||
|
self.mode = mode
|
||||||
|
self.frame_bits = self.FRAME_BITS[mode]
|
||||||
|
self.frame_bytes = (self.frame_bits + 7) // 8
|
||||||
|
self.frame_ms = self.FRAME_MS[mode]
|
||||||
|
self.frame_samples = int(8000 * self.frame_ms / 1000) # 8kHz sampling
|
||||||
|
self.frame_counter = 0
|
||||||
|
|
||||||
|
print(f"{GREEN}[CODEC2]{RESET} Initialized in mode {mode.name} "
|
||||||
|
f"({self.frame_bits} bits/frame, {self.frame_ms}ms duration)")
|
||||||
|
|
||||||
|
def encode(self, audio_samples) -> Optional[Codec2Frame]:
|
||||||
|
"""
|
||||||
|
Encode PCM audio samples to Codec2 frame.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
audio_samples: PCM samples (8kHz, 16-bit signed)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Codec2Frame or None if insufficient samples
|
||||||
|
"""
|
||||||
|
if len(audio_samples) < self.frame_samples:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# In production: call codec2_encode(state, bits, samples)
|
||||||
|
# Simulation: create pseudo-compressed data
|
||||||
|
compressed = self._simulate_compression(audio_samples[:self.frame_samples])
|
||||||
|
|
||||||
|
frame = Codec2Frame(
|
||||||
|
mode=self.mode,
|
||||||
|
bits=compressed,
|
||||||
|
timestamp=self.frame_counter * self.frame_ms / 1000.0,
|
||||||
|
frame_number=self.frame_counter
|
||||||
|
)
|
||||||
|
|
||||||
|
self.frame_counter += 1
|
||||||
|
return frame
|
||||||
|
|
||||||
|
def decode(self, frame: Codec2Frame):
|
||||||
|
"""
|
||||||
|
Decode Codec2 frame to PCM audio samples.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
frame: Codec2 compressed frame
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
PCM samples (8kHz, 16-bit signed)
|
||||||
|
"""
|
||||||
|
if frame.mode != self.mode:
|
||||||
|
raise ValueError(f"Frame mode {frame.mode} doesn't match decoder mode {self.mode}")
|
||||||
|
|
||||||
|
# In production: call codec2_decode(state, samples, bits)
|
||||||
|
# Simulation: decompress to audio
|
||||||
|
return self._simulate_decompression(frame.bits)
|
||||||
|
|
||||||
|
def _simulate_compression(self, samples: np.ndarray) -> bytes:
|
||||||
|
"""Simulate Codec2 compression (for testing)."""
|
||||||
|
# Extract basic features for simulation
|
||||||
|
energy = np.sqrt(np.mean(samples ** 2))
|
||||||
|
zero_crossings = np.sum(np.diff(np.sign(samples)) != 0)
|
||||||
|
|
||||||
|
# Pack into bytes (simplified)
|
||||||
|
data = struct.pack('<HH', int(energy), zero_crossings)
|
||||||
|
|
||||||
|
# Pad to expected frame size
|
||||||
|
data += b'\x00' * (self.frame_bytes - len(data))
|
||||||
|
|
||||||
|
return data[:self.frame_bytes]
|
||||||
|
|
||||||
|
def _simulate_decompression(self, compressed: bytes) -> np.ndarray:
|
||||||
|
"""Simulate Codec2 decompression (for testing)."""
|
||||||
|
# Unpack features
|
||||||
|
if len(compressed) >= 4:
|
||||||
|
energy, zero_crossings = struct.unpack('<HH', compressed[:4])
|
||||||
|
else:
|
||||||
|
energy, zero_crossings = 1000, 100
|
||||||
|
|
||||||
|
# Generate synthetic speech-like signal
|
||||||
|
t = np.linspace(0, self.frame_ms/1000, self.frame_samples)
|
||||||
|
|
||||||
|
# Base frequency from zero crossings
|
||||||
|
freq = zero_crossings * 10 # Simplified mapping
|
||||||
|
|
||||||
|
# Generate harmonics
|
||||||
|
signal = np.zeros(self.frame_samples)
|
||||||
|
for harmonic in range(1, 4):
|
||||||
|
signal += np.sin(2 * np.pi * freq * harmonic * t) / harmonic
|
||||||
|
|
||||||
|
# Apply energy envelope
|
||||||
|
signal *= energy / 10000.0
|
||||||
|
|
||||||
|
# Convert to 16-bit PCM
|
||||||
|
return (signal * 32767).astype(np.int16)
|
||||||
|
|
||||||
|
|
||||||
|
class FSKModem:
|
||||||
|
"""
|
||||||
|
4-FSK modem for transmitting digital data over voice channels.
|
||||||
|
Designed to survive GSM/AMR/EVS vocoders.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, sample_rate: int = 8000, baud_rate: int = 600):
|
||||||
|
"""
|
||||||
|
Initialize FSK modem.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sample_rate: Audio sample rate (Hz)
|
||||||
|
baud_rate: Symbol rate (baud)
|
||||||
|
"""
|
||||||
|
self.sample_rate = sample_rate
|
||||||
|
self.baud_rate = baud_rate
|
||||||
|
self.samples_per_symbol = int(sample_rate / baud_rate)
|
||||||
|
|
||||||
|
# 4-FSK frequencies (300-3400 Hz band)
|
||||||
|
self.frequencies = [
|
||||||
|
600, # 00
|
||||||
|
1200, # 01
|
||||||
|
1800, # 10
|
||||||
|
2400 # 11
|
||||||
|
]
|
||||||
|
|
||||||
|
# Preamble for synchronization (800 Hz, 100ms)
|
||||||
|
self.preamble_freq = 800
|
||||||
|
self.preamble_duration = 0.1 # seconds
|
||||||
|
|
||||||
|
print(f"{GREEN}[FSK]{RESET} Initialized 4-FSK modem "
|
||||||
|
f"({baud_rate} baud, frequencies: {self.frequencies})")
|
||||||
|
|
||||||
|
def modulate(self, data: bytes, add_preamble: bool = True) -> np.ndarray:
|
||||||
|
"""
|
||||||
|
Modulate binary data to FSK audio signal.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
data: Binary data to modulate
|
||||||
|
add_preamble: Whether to add synchronization preamble
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Audio signal (normalized float32)
|
||||||
|
"""
|
||||||
|
# Convert bytes to dibits (2-bit symbols)
|
||||||
|
symbols = []
|
||||||
|
for byte in data:
|
||||||
|
symbols.extend([
|
||||||
|
(byte >> 6) & 0x03,
|
||||||
|
(byte >> 4) & 0x03,
|
||||||
|
(byte >> 2) & 0x03,
|
||||||
|
byte & 0x03
|
||||||
|
])
|
||||||
|
|
||||||
|
# Generate audio signal
|
||||||
|
signal = []
|
||||||
|
|
||||||
|
# Add preamble
|
||||||
|
if add_preamble:
|
||||||
|
preamble_samples = int(self.preamble_duration * self.sample_rate)
|
||||||
|
t = np.arange(preamble_samples) / self.sample_rate
|
||||||
|
preamble = np.sin(2 * np.pi * self.preamble_freq * t)
|
||||||
|
signal.extend(preamble)
|
||||||
|
|
||||||
|
# Modulate symbols
|
||||||
|
for symbol in symbols:
|
||||||
|
freq = self.frequencies[symbol]
|
||||||
|
t = np.arange(self.samples_per_symbol) / self.sample_rate
|
||||||
|
tone = np.sin(2 * np.pi * freq * t)
|
||||||
|
signal.extend(tone)
|
||||||
|
|
||||||
|
# Apply smoothing to reduce clicks
|
||||||
|
audio = np.array(signal, dtype=np.float32)
|
||||||
|
audio = self._apply_envelope(audio)
|
||||||
|
|
||||||
|
return audio
|
||||||
|
|
||||||
|
def demodulate(self, audio: np.ndarray) -> Tuple[bytes, float]:
|
||||||
|
"""
|
||||||
|
Demodulate FSK audio signal to binary data.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
audio: Audio signal
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Tuple of (demodulated data, confidence score)
|
||||||
|
"""
|
||||||
|
# Find preamble
|
||||||
|
preamble_start = self._find_preamble(audio)
|
||||||
|
if preamble_start < 0:
|
||||||
|
return b'', 0.0
|
||||||
|
|
||||||
|
# Skip preamble
|
||||||
|
data_start = preamble_start + int(self.preamble_duration * self.sample_rate)
|
||||||
|
|
||||||
|
# Demodulate symbols
|
||||||
|
symbols = []
|
||||||
|
confidence_scores = []
|
||||||
|
|
||||||
|
pos = data_start
|
||||||
|
while pos + self.samples_per_symbol <= len(audio):
|
||||||
|
symbol_audio = audio[pos:pos + self.samples_per_symbol]
|
||||||
|
symbol, confidence = self._demodulate_symbol(symbol_audio)
|
||||||
|
symbols.append(symbol)
|
||||||
|
confidence_scores.append(confidence)
|
||||||
|
pos += self.samples_per_symbol
|
||||||
|
|
||||||
|
# Convert symbols to bytes
|
||||||
|
data = bytearray()
|
||||||
|
for i in range(0, len(symbols), 4):
|
||||||
|
if i + 3 < len(symbols):
|
||||||
|
byte = (symbols[i] << 6) | (symbols[i+1] << 4) | (symbols[i+2] << 2) | symbols[i+3]
|
||||||
|
data.append(byte)
|
||||||
|
|
||||||
|
avg_confidence = np.mean(confidence_scores) if confidence_scores else 0.0
|
||||||
|
return bytes(data), avg_confidence
|
||||||
|
|
||||||
|
def _find_preamble(self, audio: np.ndarray) -> int:
|
||||||
|
"""Find preamble in audio signal."""
|
||||||
|
# Simple energy-based detection
|
||||||
|
window_size = int(0.01 * self.sample_rate) # 10ms window
|
||||||
|
|
||||||
|
for i in range(0, len(audio) - window_size, window_size // 2):
|
||||||
|
window = audio[i:i + window_size]
|
||||||
|
|
||||||
|
# Check for preamble frequency
|
||||||
|
fft = np.fft.fft(window)
|
||||||
|
freqs = np.fft.fftfreq(len(window), 1/self.sample_rate)
|
||||||
|
|
||||||
|
# Find peak near preamble frequency
|
||||||
|
idx = np.argmax(np.abs(fft[:len(fft)//2]))
|
||||||
|
peak_freq = abs(freqs[idx])
|
||||||
|
|
||||||
|
if abs(peak_freq - self.preamble_freq) < 50: # 50 Hz tolerance
|
||||||
|
return i
|
||||||
|
|
||||||
|
return -1
|
||||||
|
|
||||||
|
def _demodulate_symbol(self, audio: np.ndarray) -> Tuple[int, float]:
|
||||||
|
"""Demodulate a single FSK symbol."""
|
||||||
|
# FFT-based demodulation
|
||||||
|
fft = np.fft.fft(audio)
|
||||||
|
freqs = np.fft.fftfreq(len(audio), 1/self.sample_rate)
|
||||||
|
magnitude = np.abs(fft[:len(fft)//2])
|
||||||
|
|
||||||
|
# Find energy at each FSK frequency
|
||||||
|
energies = []
|
||||||
|
for freq in self.frequencies:
|
||||||
|
idx = np.argmin(np.abs(freqs[:len(freqs)//2] - freq))
|
||||||
|
energy = magnitude[idx]
|
||||||
|
energies.append(energy)
|
||||||
|
|
||||||
|
# Select symbol with highest energy
|
||||||
|
symbol = np.argmax(energies)
|
||||||
|
|
||||||
|
# Confidence is ratio of strongest to second strongest
|
||||||
|
sorted_energies = sorted(energies, reverse=True)
|
||||||
|
confidence = sorted_energies[0] / (sorted_energies[1] + 1e-6)
|
||||||
|
|
||||||
|
return symbol, min(confidence, 10.0) / 10.0
|
||||||
|
|
||||||
|
def _apply_envelope(self, audio: np.ndarray) -> np.ndarray:
|
||||||
|
"""Apply smoothing envelope to reduce clicks."""
|
||||||
|
# Simple raised cosine envelope
|
||||||
|
ramp_samples = int(0.002 * self.sample_rate) # 2ms ramps
|
||||||
|
|
||||||
|
if len(audio) > 2 * ramp_samples:
|
||||||
|
# Fade in
|
||||||
|
t = np.linspace(0, np.pi/2, ramp_samples)
|
||||||
|
audio[:ramp_samples] *= np.sin(t) ** 2
|
||||||
|
|
||||||
|
# Fade out
|
||||||
|
audio[-ramp_samples:] *= np.sin(t[::-1]) ** 2
|
||||||
|
|
||||||
|
return audio
|
||||||
|
|
||||||
|
|
||||||
|
class VoiceProtocol:
|
||||||
|
"""
|
||||||
|
Integrates voice codec and modem with the Icing protocol
|
||||||
|
for encrypted voice transmission over GSM.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, protocol_instance):
|
||||||
|
"""
|
||||||
|
Initialize voice protocol handler.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
protocol_instance: IcingProtocol instance
|
||||||
|
"""
|
||||||
|
self.protocol = protocol_instance
|
||||||
|
self.codec = Codec2Wrapper(Codec2Mode.MODE_1200)
|
||||||
|
self.modem = FSKModem(sample_rate=8000, baud_rate=600)
|
||||||
|
|
||||||
|
# Voice crypto state
|
||||||
|
self.voice_iv_counter = 0
|
||||||
|
self.voice_sequence = 0
|
||||||
|
|
||||||
|
# Buffers
|
||||||
|
self.audio_buffer = np.array([], dtype=np.int16)
|
||||||
|
self.frame_buffer = []
|
||||||
|
|
||||||
|
print(f"{GREEN}[VOICE]{RESET} Voice protocol initialized")
|
||||||
|
|
||||||
|
def process_voice_input(self, audio_samples: np.ndarray) -> Optional[np.ndarray]:
|
||||||
|
"""
|
||||||
|
Process voice input: compress, encrypt, and modulate.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
audio_samples: PCM audio samples (8kHz, 16-bit)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Modulated audio signal ready for transmission
|
||||||
|
"""
|
||||||
|
# Add to buffer
|
||||||
|
self.audio_buffer = np.concatenate([self.audio_buffer, audio_samples])
|
||||||
|
|
||||||
|
# Process complete frames
|
||||||
|
modulated_audio = []
|
||||||
|
|
||||||
|
while len(self.audio_buffer) >= self.codec.frame_samples:
|
||||||
|
# Extract frame
|
||||||
|
frame_audio = self.audio_buffer[:self.codec.frame_samples]
|
||||||
|
self.audio_buffer = self.audio_buffer[self.codec.frame_samples:]
|
||||||
|
|
||||||
|
# Compress with Codec2
|
||||||
|
compressed_frame = self.codec.encode(frame_audio)
|
||||||
|
if not compressed_frame:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Encrypt frame
|
||||||
|
encrypted = self._encrypt_voice_frame(compressed_frame)
|
||||||
|
|
||||||
|
# Add FEC
|
||||||
|
protected = self._add_fec(encrypted)
|
||||||
|
|
||||||
|
# Modulate to audio
|
||||||
|
audio_signal = self.modem.modulate(protected, add_preamble=True)
|
||||||
|
modulated_audio.append(audio_signal)
|
||||||
|
|
||||||
|
if modulated_audio:
|
||||||
|
return np.concatenate(modulated_audio)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def process_voice_output(self, modulated_audio: np.ndarray) -> Optional[np.ndarray]:
|
||||||
|
"""
|
||||||
|
Process received audio: demodulate, decrypt, and decompress.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
modulated_audio: Received FSK-modulated audio
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Decoded PCM audio samples
|
||||||
|
"""
|
||||||
|
# Demodulate
|
||||||
|
data, confidence = self.modem.demodulate(modulated_audio)
|
||||||
|
|
||||||
|
if confidence < 0.5:
|
||||||
|
print(f"{YELLOW}[VOICE]{RESET} Low demodulation confidence: {confidence:.2f}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Remove FEC
|
||||||
|
frame_data = self._remove_fec(data)
|
||||||
|
if not frame_data:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Decrypt
|
||||||
|
compressed_frame = self._decrypt_voice_frame(frame_data)
|
||||||
|
if not compressed_frame:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Decompress
|
||||||
|
audio_samples = self.codec.decode(compressed_frame)
|
||||||
|
|
||||||
|
return audio_samples
|
||||||
|
|
||||||
|
def _encrypt_voice_frame(self, frame: Codec2Frame) -> bytes:
|
||||||
|
"""Encrypt a voice frame using ChaCha20-CTR."""
|
||||||
|
if not self.protocol.hkdf_key:
|
||||||
|
raise ValueError("No encryption key available")
|
||||||
|
|
||||||
|
# Prepare frame data
|
||||||
|
frame_data = struct.pack('<BIH',
|
||||||
|
frame.mode,
|
||||||
|
frame.frame_number,
|
||||||
|
len(frame.bits)
|
||||||
|
) + frame.bits
|
||||||
|
|
||||||
|
# Generate IV for this frame
|
||||||
|
iv = struct.pack('<Q', self.voice_iv_counter)[:8] + b'\x00' * 4
|
||||||
|
self.voice_iv_counter += 1
|
||||||
|
|
||||||
|
# Encrypt using ChaCha20
|
||||||
|
from encryption import chacha20_encrypt
|
||||||
|
key = bytes.fromhex(self.protocol.hkdf_key)
|
||||||
|
encrypted = chacha20_encrypt(frame_data, key, iv)
|
||||||
|
|
||||||
|
# Add sequence number and IV hint
|
||||||
|
return struct.pack('<HQ', self.voice_sequence, self.voice_iv_counter) + encrypted
|
||||||
|
|
||||||
|
def _decrypt_voice_frame(self, data: bytes) -> Optional[Codec2Frame]:
|
||||||
|
"""Decrypt a voice frame."""
|
||||||
|
if len(data) < 10:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Extract sequence and IV hint
|
||||||
|
sequence, iv_hint = struct.unpack('<HQ', data[:10])
|
||||||
|
encrypted = data[10:]
|
||||||
|
|
||||||
|
# Generate IV
|
||||||
|
iv = struct.pack('<Q', iv_hint)[:8] + b'\x00' * 4
|
||||||
|
|
||||||
|
# Decrypt
|
||||||
|
from encryption import chacha20_decrypt
|
||||||
|
key = bytes.fromhex(self.protocol.hkdf_key)
|
||||||
|
|
||||||
|
try:
|
||||||
|
decrypted = chacha20_decrypt(encrypted, key, iv)
|
||||||
|
|
||||||
|
# Parse frame
|
||||||
|
mode, frame_num, bits_len = struct.unpack('<BIH', decrypted[:7])
|
||||||
|
bits = decrypted[7:7+bits_len]
|
||||||
|
|
||||||
|
return Codec2Frame(
|
||||||
|
mode=Codec2Mode(mode),
|
||||||
|
bits=bits,
|
||||||
|
timestamp=0, # Will be set by caller
|
||||||
|
frame_number=frame_num
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print(f"{RED}[VOICE]{RESET} Decryption failed: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _add_fec(self, data: bytes) -> bytes:
|
||||||
|
"""Add forward error correction."""
|
||||||
|
# Simple repetition code (3x) for testing
|
||||||
|
# In production: use convolutional code or LDPC
|
||||||
|
fec_data = bytearray()
|
||||||
|
|
||||||
|
for byte in data:
|
||||||
|
# Repeat each byte 3 times
|
||||||
|
fec_data.extend([byte, byte, byte])
|
||||||
|
|
||||||
|
return bytes(fec_data)
|
||||||
|
|
||||||
|
def _remove_fec(self, data: bytes) -> Optional[bytes]:
|
||||||
|
"""Remove FEC and correct errors."""
|
||||||
|
if len(data) % 3 != 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
corrected = bytearray()
|
||||||
|
|
||||||
|
for i in range(0, len(data), 3):
|
||||||
|
# Majority voting
|
||||||
|
votes = [data[i], data[i+1], data[i+2]]
|
||||||
|
byte_value = max(set(votes), key=votes.count)
|
||||||
|
corrected.append(byte_value)
|
||||||
|
|
||||||
|
return bytes(corrected)
|
||||||
|
|
||||||
|
|
||||||
|
# Example usage
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# Test Codec2 wrapper
|
||||||
|
print(f"\n{BLUE}=== Testing Codec2 Wrapper ==={RESET}")
|
||||||
|
codec = Codec2Wrapper(Codec2Mode.MODE_1200)
|
||||||
|
|
||||||
|
# Generate test audio
|
||||||
|
t = np.linspace(0, 0.04, 320) # 40ms at 8kHz
|
||||||
|
test_audio = (np.sin(2 * np.pi * 440 * t) * 16384).astype(np.int16)
|
||||||
|
|
||||||
|
# Encode
|
||||||
|
frame = codec.encode(test_audio)
|
||||||
|
print(f"Encoded frame: {len(frame.bits)} bytes")
|
||||||
|
|
||||||
|
# Decode
|
||||||
|
decoded = codec.decode(frame)
|
||||||
|
print(f"Decoded audio: {len(decoded)} samples")
|
||||||
|
|
||||||
|
# Test FSK modem
|
||||||
|
print(f"\n{BLUE}=== Testing FSK Modem ==={RESET}")
|
||||||
|
modem = FSKModem()
|
||||||
|
|
||||||
|
# Test data
|
||||||
|
test_data = b"Hello, secure voice!"
|
||||||
|
|
||||||
|
# Modulate
|
||||||
|
modulated = modem.modulate(test_data)
|
||||||
|
print(f"Modulated: {len(modulated)} samples ({len(modulated)/8000:.2f}s)")
|
||||||
|
|
||||||
|
# Demodulate
|
||||||
|
demodulated, confidence = modem.demodulate(modulated)
|
||||||
|
print(f"Demodulated: {demodulated}")
|
||||||
|
print(f"Confidence: {confidence:.2%}")
|
||||||
|
print(f"Match: {demodulated == test_data}")
|
Loading…
Reference in New Issue
Block a user