add full cycle with amr compression and voice recording

This commit is contained in:
Bartoszkk 2024-07-05 09:02:58 +02:00
parent c3dbdefc3f
commit 6567bd020e
3 changed files with 77 additions and 40 deletions

View File

@ -1 +0,0 @@
Ϩ -‡Ş™oŹÂh`hŃ´´dď*ă_6j>/Ä7zCqóŢąS´WK#ĐŇ\بŠ‰G>o,>çU§ t'Qΰńp2GXpć_ü#v2üŻŽ _}4zŘ)_Ý<5F>úŘ·Y>erĂ^®S”!µÇb˛˘‰[ßó

View File

@ -1,16 +1,14 @@
#!/usr/bin/env python3 import pyaudio
import wave
import sys import os
from ffmpy import FFmpeg
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.padding import PKCS7
from cryptography.hazmat.primitives import padding
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
import os
import subprocess import subprocess
import tempfile
private_key = ec.generate_private_key(ec.SECP256R1(), default_backend()) private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
public_key = private_key.public_key() public_key = private_key.public_key()
@ -19,8 +17,6 @@ public_key_bytes = public_key.public_bytes(
format=serialization.PublicFormat.SubjectPublicKeyInfo format=serialization.PublicFormat.SubjectPublicKeyInfo
) )
def encrypt_bitstream(bitstream, peer_public_key_bytes): def encrypt_bitstream(bitstream, peer_public_key_bytes):
peer_public_key = serialization.load_pem_public_key(peer_public_key_bytes, backend=default_backend()) peer_public_key = serialization.load_pem_public_key(peer_public_key_bytes, backend=default_backend())
shared_key = private_key.exchange(ec.ECDH(), peer_public_key) shared_key = private_key.exchange(ec.ECDH(), peer_public_key)
@ -34,13 +30,11 @@ def encrypt_bitstream(bitstream, peer_public_key_bytes):
iv = os.urandom(16) iv = os.urandom(16)
cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv), backend=default_backend()) cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor() encryptor = cipher.encryptor()
padder = padding.PKCS7(algorithms.AES.block_size).padder() padder = PKCS7(algorithms.AES.block_size).padder()
padded_data = padder.update(bitstream) + padder.finalize() padded_data = padder.update(bitstream) + padder.finalize()
ciphertext = encryptor.update(padded_data) + encryptor.finalize() ciphertext = encryptor.update(padded_data) + encryptor.finalize()
return iv + ciphertext return iv + ciphertext
def decrypt_bitstream(ciphertext, peer_public_key_bytes): def decrypt_bitstream(ciphertext, peer_public_key_bytes):
peer_public_key = serialization.load_pem_public_key(peer_public_key_bytes, backend=default_backend()) peer_public_key = serialization.load_pem_public_key(peer_public_key_bytes, backend=default_backend())
shared_key = private_key.exchange(ec.ECDH(), peer_public_key) shared_key = private_key.exchange(ec.ECDH(), peer_public_key)
@ -56,41 +50,85 @@ def decrypt_bitstream(ciphertext, peer_public_key_bytes):
cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv), backend=default_backend()) cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor() decryptor = cipher.decryptor()
padded_data = decryptor.update(actual_ciphertext) + decryptor.finalize() padded_data = decryptor.update(actual_ciphertext) + decryptor.finalize()
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder() unpadder = PKCS7(algorithms.AES.block_size).unpadder()
data = unpadder.update(padded_data) + unpadder.finalize() data = unpadder.update(padded_data) + unpadder.finalize()
return data return data
def process_audio(input_wav_data):
original_size = len(input_wav_data)
with tempfile.NamedTemporaryFile(delete=True) as tmp_file:
tmp_file.write(input_wav_data)
tmp_file.flush()
command = [
def process_audio(input_wav): 'ffmpeg',
'-i', tmp_file.name,
'-f', 'amr',
'-ac', '1',
'-ar', '8000',
'-b:a', '12.2k',
'-filter:a', 'highpass=f=200,lowpass=f=3400',
'pipe:1'
]
process = subprocess.Popen( process = subprocess.Popen(
['ffmpeg', '-i', input_wav, '-ar', '8000', '-b:a', '128k', '-f', 'mp3', 'pipe:1'], command, stdout=subprocess.PIPE, stderr=subprocess.PIPE
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
) )
return process.stdout.read() stdout, stderr = process.communicate()
if process.returncode != 0:
print(f"Error processing audio: {stderr.decode()}")
return None, original_size, 0
compressed_size = len(stdout)
return stdout, original_size, compressed_size
def record_and_process_audio():
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 1024
audio = pyaudio.PyAudio()
try:
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
except IOError as e:
print(f"Error opening audio stream: {e}")
return
print("Recording... Press Ctrl+C to stop.")
frames = []
try:
while True:
data = stream.read(CHUNK, exception_on_overflow=False)
frames.append(data)
except KeyboardInterrupt:
print("Recording stopped.")
finally:
stream.stop_stream()
stream.close()
audio.terminate()
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav:
wf = wave.open(tmp_wav, 'wb')
wf.setnchannels(CHANNELS)
wf.setsampwidth(audio.get_sample_size(FORMAT))
wf.setframerate(RATE)
wf.writeframes(b''.join(frames))
wf.close()
with open(tmp_wav.name, 'rb') as wav_file:
wav_data = wav_file.read()
processed_data, original_size, compressed_size = process_audio(wav_data)
if processed_data:
ciphertext = encrypt_bitstream(processed_data, public_key_bytes)
decrypted_chunk = decrypt_bitstream(ciphertext, public_key_bytes)
with open('output.amr', 'wb') as f:
f.write(decrypted_chunk)
print(f"Original size: {original_size} bytes")
print(f"Compressed size: {compressed_size} bytes")
print(f"Encrypted size: {len(ciphertext)} bytes")
os.remove(tmp_wav.name)
def main(): def main():
input_wav = 'CantinaBand60.wav' record_and_process_audio()
with open(input_wav, 'rb') as f:
original_bitstream = f.read()
processed_bitstream = process_audio(input_wav)
print("Original WAV bitstream length:", len(original_bitstream))
print("Processed MP3 bitstream length:", len(processed_bitstream))
chunk_size = 1024
for i in range(0, len(processed_bitstream), chunk_size):
chunk = processed_bitstream[i:i + chunk_size]
ciphertext = encrypt_bitstream(chunk, public_key_bytes)
decrypted_chunk = decrypt_bitstream(ciphertext, public_key_bytes)
with open('output.mp3', 'ab') as f:
f.write(decrypted_chunk)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

BIN
encryption/output.amr Normal file

Binary file not shown.