This repository has been archived on 2024-11-18. You can view files and clone it, but cannot push or open issues or pull requests.
poc/encryption/main.py

135 lines
4.8 KiB
Python
Raw Permalink Normal View History

import pyaudio
import wave
import os
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.backends import default_backend
import subprocess
import tempfile
private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
public_key = private_key.public_key()
public_key_bytes = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
def encrypt_bitstream(bitstream, peer_public_key_bytes):
peer_public_key = serialization.load_pem_public_key(peer_public_key_bytes, backend=default_backend())
shared_key = private_key.exchange(ec.ECDH(), peer_public_key)
derived_key = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b'handshake data',
backend=default_backend()
).derive(shared_key)
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
padder = PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(bitstream) + padder.finalize()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
return iv + ciphertext
def decrypt_bitstream(ciphertext, peer_public_key_bytes):
peer_public_key = serialization.load_pem_public_key(peer_public_key_bytes, backend=default_backend())
shared_key = private_key.exchange(ec.ECDH(), peer_public_key)
derived_key = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b'handshake data',
backend=default_backend()
).derive(shared_key)
iv = ciphertext[:16]
actual_ciphertext = ciphertext[16:]
cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
padded_data = decryptor.update(actual_ciphertext) + decryptor.finalize()
unpadder = PKCS7(algorithms.AES.block_size).unpadder()
data = unpadder.update(padded_data) + unpadder.finalize()
return data
def process_audio(input_wav_data):
original_size = len(input_wav_data)
with tempfile.NamedTemporaryFile(delete=True) as tmp_file:
tmp_file.write(input_wav_data)
tmp_file.flush()
command = [
'ffmpeg',
'-i', tmp_file.name,
'-f', 'amr',
'-ac', '1',
'-ar', '8000',
'-b:a', '12.2k',
'-filter:a', 'highpass=f=200,lowpass=f=3400',
'pipe:1'
]
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = process.communicate()
if process.returncode != 0:
print(f"Error processing audio: {stderr.decode()}")
return None, original_size, 0
compressed_size = len(stdout)
return stdout, original_size, compressed_size
def record_and_process_audio():
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 1024
audio = pyaudio.PyAudio()
try:
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
except IOError as e:
print(f"Error opening audio stream: {e}")
return
print("Recording... Press Ctrl+C to stop.")
frames = []
try:
while True:
data = stream.read(CHUNK, exception_on_overflow=False)
frames.append(data)
except KeyboardInterrupt:
print("Recording stopped.")
finally:
stream.stop_stream()
stream.close()
audio.terminate()
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav:
wf = wave.open(tmp_wav, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(audio.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(frames))
wf.close()
with open(tmp_wav.name, 'rb') as wav_file:
wav_data = wav_file.read()
processed_data, original_size, compressed_size = process_audio(wav_data)
if processed_data:
ciphertext = encrypt_bitstream(processed_data, public_key_bytes)
decrypted_chunk = decrypt_bitstream(ciphertext, public_key_bytes)
with open('output.amr', 'wb') as f:
f.write(decrypted_chunk)
print(f"Original size: {original_size} bytes")
print(f"Compressed size: {compressed_size} bytes")
print(f"Encrypted size: {len(ciphertext)} bytes")
os.remove(tmp_wav.name)
def main():
record_and_process_audio()
if __name__ == "__main__":
main()