feat: gsm_sim generates log | removed redundant code from python_ui and bugfixes
All checks were successful
/ mirror (push) Successful in 4s

This commit is contained in:
Florian Griffon 2025-05-23 20:33:37 +03:00
parent d42c41edf1
commit 225b40b34f
12 changed files with 137 additions and 172 deletions

View File

@ -1,8 +1,6 @@
import sys
import random
import socket
import threading
import time
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QHBoxLayout,
QPushButton, QLabel, QFrame, QSizePolicy, QStyle
@ -155,20 +153,6 @@ class PhoneUI(QMainWindow):
}
""")
# Phone states
self.phone1_state = PhoneState.IDLE
self.phone2_state = PhoneState.IDLE
# Phone clients
self.phone1_client = PhoneClient("localhost", 12345, 0)
self.phone2_client = PhoneClient("localhost", 12345, 1)
self.phone1_client.data_received.connect(lambda data, cid: self.update_waveform(cid, data))
self.phone2_client.data_received.connect(lambda data, cid: self.update_waveform(cid, data))
self.phone1_client.state_changed.connect(lambda state, num, cid: self.set_phone_state(cid, self.map_state(state), num))
self.phone2_client.state_changed.connect(lambda state, num, cid: self.set_phone_state(cid, self.map_state(state), num))
self.phone1_client.start()
self.phone2_client.start()
# Main widget and layout
main_widget = QWidget()
self.setCentralWidget(main_widget)
@ -190,13 +174,29 @@ class PhoneUI(QMainWindow):
phone_controls_layout.setAlignment(Qt.AlignCenter)
main_layout.addLayout(phone_controls_layout)
# Phone 1
phone1_widget_container, self.phone1_display, self.phone1_button, self.phone1_waveform = self._create_phone_ui("Phone 1", self.phone1_action)
phone_controls_layout.addWidget(phone1_widget_container)
# Initialize phones
self.phones = []
for i in range(2):
client = PhoneClient("localhost", 12345, i)
client.data_received.connect(lambda data, cid=i: self.update_waveform(cid, data))
client.state_changed.connect(lambda state, num, cid=i: self.set_phone_state(cid, self.map_state(state), num))
client.start()
# Phone 2
phone2_widget_container, self.phone2_display, self.phone2_button, self.phone2_waveform = self._create_phone_ui("Phone 2", self.phone2_action)
phone_controls_layout.addWidget(phone2_widget_container)
# Corrected lambda to handle 'checked' and capture phone_id
phone_widget_container, phone_display, phone_button, phone_waveform, phone_status_label = self._create_phone_ui(
f"Phone {i+1}", lambda checked, phone_id=i: self.phone_action(phone_id)
)
self.phones.append({
'id': i,
'client': client,
'state': PhoneState.IDLE,
'button': phone_button,
'waveform': phone_waveform,
'number': "123-4567" if i == 0 else "987-6543",
'audio_timer': None,
'status_label': phone_status_label
})
phone_controls_layout.addWidget(phone_widget_container)
# Spacer
main_layout.addStretch(1)
@ -215,8 +215,8 @@ class PhoneUI(QMainWindow):
main_layout.addLayout(settings_layout)
# Initialize button states
self._update_phone_button_ui(self.phone1_button, self.phone1_state)
self._update_phone_button_ui(self.phone2_button, self.phone2_state)
for phone in self.phones:
self._update_phone_button_ui(phone['button'], phone['status_label'], phone['state'])
def _create_phone_ui(self, title, action_slot):
phone_container_widget = QWidget()
@ -257,16 +257,9 @@ class PhoneUI(QMainWindow):
waveform_widget = WaveformWidget(dynamic=False)
phone_layout.addWidget(waveform_widget, alignment=Qt.AlignCenter)
phone_display_frame.setProperty("statusLabel", phone_status_label)
return phone_container_widget, phone_display_frame, phone_button, waveform_widget
return phone_container_widget, phone_display_frame, phone_button, waveform_widget, phone_status_label
def _update_phone_button_ui(self, button, state, phone_number=""):
parent_widget = button.parentWidget()
if parent_widget:
frame = parent_widget.findChild(QFrame, "phoneDisplay")
if frame:
status_label = frame.property("statusLabel")
if status_label:
def _update_phone_button_ui(self, button, status_label, state, phone_number=""):
if state == PhoneState.IDLE:
button.setText("Call")
button.setIcon(self.style().standardIcon(QStyle.SP_MediaPlay))
@ -287,16 +280,55 @@ class PhoneUI(QMainWindow):
button.setIcon(self.style().standardIcon(QStyle.SP_DialogApplyButton))
status_label.setText(f"Incoming Call from {phone_number}")
button.setStyleSheet("background-color: #107C10;")
else:
print("Warning: statusLabel property not found")
else:
print("Warning: QFrame not found")
else:
print("Warning: Parent widget not found")
def phone_action(self, phone_id):
phone = self.phones[phone_id]
other_phone = self.phones[1 - phone_id]
print(f"Phone {phone_id + 1} Action, current state: {phone['state']}")
if phone['state'] == PhoneState.IDLE:
# Initiate a call
phone['state'] = PhoneState.CALLING
other_phone['state'] = PhoneState.RINGING
self._update_phone_button_ui(phone['button'], phone['status_label'], phone['state'], other_phone['number'])
self._update_phone_button_ui(other_phone['button'], other_phone['status_label'], other_phone['state'], phone['number'])
phone['client'].send("RINGING")
elif phone['state'] == PhoneState.RINGING:
# Answer the call
phone['state'] = PhoneState.IN_CALL
other_phone['state'] = PhoneState.IN_CALL
self._update_phone_button_ui(phone['button'], phone['status_label'], phone['state'], other_phone['number'])
self._update_phone_button_ui(other_phone['button'], other_phone['status_label'], other_phone['state'], phone['number'])
phone['client'].send("IN_CALL")
# Start audio timers for both phones
for p in [phone, other_phone]:
if not p['audio_timer'] or not p['audio_timer'].isActive():
p['audio_timer'] = QTimer(self)
p['audio_timer'].timeout.connect(lambda pid=p['id']: self.send_audio(pid))
p['audio_timer'].start(1000)
elif phone['state'] == PhoneState.IN_CALL or phone['state'] == PhoneState.CALLING:
# Hang up or cancel
phone['state'] = PhoneState.IDLE
other_phone['state'] = PhoneState.IDLE
self._update_phone_button_ui(phone['button'], phone['status_label'], phone['state'], "")
self._update_phone_button_ui(other_phone['button'], other_phone['status_label'], other_phone['state'], "")
phone['client'].send("CALL_END")
# Stop audio timers for both phones
for p in [phone, other_phone]:
if p['audio_timer']:
p['audio_timer'].stop()
def send_audio(self, phone_id):
phone = self.phones[phone_id]
if phone['state'] == PhoneState.IN_CALL:
message = f"Audio packet {random.randint(1, 1000)}"
phone['client'].send(message)
def update_waveform(self, client_id, data):
print(f"Updating waveform for client_id {client_id}")
waveform = self.phone1_waveform if client_id == 0 else self.phone2_waveform
waveform = self.phones[client_id]['waveform']
waveform.set_data(data)
def map_state(self, state_str):
@ -306,106 +338,31 @@ class PhoneUI(QMainWindow):
return PhoneState.IDLE
elif state_str == "IN_CALL":
return PhoneState.IN_CALL
return PhoneState.IDLE # Default to IDLE
return PhoneState.IDLE
def set_phone_state(self, client_id, state, number=""):
if client_id == 0:
self.phone1_state = state
self._update_phone_button_ui(self.phone1_button, self.phone1_state, number if number else "123-4567")
if state == PhoneState.IDLE and hasattr(self, 'phone1_audio_timer'):
self.phone1_audio_timer.stop()
elif state == PhoneState.IN_CALL and (not hasattr(self, 'phone1_audio_timer') or not self.phone1_audio_timer.isActive()):
self.phone1_audio_timer = QTimer(self)
self.phone1_audio_timer.timeout.connect(self.send_phone1_audio)
self.phone1_audio_timer.start(1000)
phone = self.phones[client_id]
other_phone = self.phones[1 - client_id]
phone['state'] = state
if state == PhoneState.RINGING:
self._update_phone_button_ui(phone['button'], phone['status_label'], state, other_phone['number'])
elif state == PhoneState.IN_CALL:
self._update_phone_button_ui(phone['button'], phone['status_label'], state, other_phone['number'])
else:
self.phone2_state = state
self._update_phone_button_ui(self.phone2_button, self.phone2_state, number if number else "987-6543")
if state == PhoneState.IDLE and hasattr(self, 'phone2_audio_timer'):
self.phone2_audio_timer.stop()
elif state == PhoneState.IN_CALL and (not hasattr(self, 'phone2_audio_timer') or not self.phone2_audio_timer.isActive()):
self.phone2_audio_timer = QTimer(self)
self.phone2_audio_timer.timeout.connect(self.send_phone2_audio)
self.phone2_audio_timer.start(1000)
def phone1_action(self):
print("Phone 1 Action")
if self.phone1_state == PhoneState.IDLE:
self.phone1_state = PhoneState.CALLING
self.phone1_client.send("RINGING")
self._update_phone_button_ui(self.phone1_button, self.phone1_state, "123-4567")
elif self.phone1_state == PhoneState.CALLING:
self.phone1_state = PhoneState.IDLE
self.phone1_client.send("CALL_END")
self._update_phone_button_ui(self.phone1_button, self.phone1_state)
if hasattr(self, 'phone1_audio_timer'):
self.phone1_audio_timer.stop()
elif self.phone1_state == PhoneState.RINGING:
self.phone1_state = PhoneState.IN_CALL
self.phone2_state = PhoneState.IN_CALL # Sync both phones
self.phone1_client.send("IN_CALL")
self._update_phone_button_ui(self.phone1_button, self.phone1_state, "123-4567")
self._update_phone_button_ui(self.phone2_button, self.phone2_state, "987-6543")
# Start audio timer
self.phone1_audio_timer = QTimer(self)
self.phone1_audio_timer.timeout.connect(self.send_phone1_audio)
self.phone1_audio_timer.start(1000)
elif self.phone1_state == PhoneState.IN_CALL:
self.phone1_state = PhoneState.IDLE
self.phone2_state = PhoneState.IDLE # Sync both phones
self.phone1_client.send("CALL_END")
self._update_phone_button_ui(self.phone1_button, self.phone1_state)
self._update_phone_button_ui(self.phone2_button, self.phone2_state)
if hasattr(self, 'phone1_audio_timer'):
self.phone1_audio_timer.stop()
def send_phone1_audio(self):
if self.phone1_state == PhoneState.IN_CALL:
message = f"Audio packet {random.randint(1, 1000)}"
self.phone1_client.send(message)
def phone2_action(self):
print("Phone 2 Action")
if self.phone2_state == PhoneState.IDLE:
self.phone2_state = PhoneState.CALLING
self.phone2_client.send("RINGING")
self._update_phone_button_ui(self.phone2_button, self.phone2_state, "987-6543")
elif self.phone2_state == PhoneState.CALLING:
self.phone2_state = PhoneState.IDLE
self.phone2_client.send("CALL_END")
self._update_phone_button_ui(self.phone2_button, self.phone2_state)
if hasattr(self, 'phone2_audio_timer'):
self.phone2_audio_timer.stop()
elif self.phone2_state == PhoneState.RINGING:
self.phone2_state = PhoneState.IN_CALL
self.phone1_state = PhoneState.IN_CALL # Sync both phones
self.phone2_client.send("IN_CALL")
self._update_phone_button_ui(self.phone2_button, self.phone2_state, "987-6543")
self._update_phone_button_ui(self.phone1_button, self.phone1_state, "123-4567")
# Start audio timer
self.phone2_audio_timer = QTimer(self)
self.phone2_audio_timer.timeout.connect(self.send_phone2_audio)
self.phone2_audio_timer.start(1000)
elif self.phone2_state == PhoneState.IN_CALL:
self.phone2_state = PhoneState.IDLE
self.phone1_state = PhoneState.IDLE # Sync both phones
self.phone2_client.send("CALL_END")
self._update_phone_button_ui(self.phone2_button, self.phone2_state)
self._update_phone_button_ui(self.phone1_button, self.phone1_state)
if hasattr(self, 'phone2_audio_timer'):
self.phone2_audio_timer.stop()
def send_phone2_audio(self):
if self.phone2_state == PhoneState.IN_CALL:
message = f"Audio packet {random.randint(1, 1000)}"
self.phone2_client.send(message)
self._update_phone_button_ui(phone['button'], phone['status_label'], state, "")
if state == PhoneState.IDLE and phone['audio_timer']:
phone['audio_timer'].stop()
elif state == PhoneState.IN_CALL and (not phone['audio_timer'] or not phone['audio_timer'].isActive()):
phone['audio_timer'] = QTimer(self)
phone['audio_timer'].timeout.connect(lambda: self.send_audio(client_id))
phone['audio_timer'].start(1000)
def settings_action(self):
print("Settings clicked")
def closeEvent(self, event):
self.phone1_client.stop()
self.phone2_client.stop()
for phone in self.phones:
phone['client'].stop()
event.accept()
if __name__ == "__main__":

View File

@ -5,7 +5,8 @@
# Variables
IMAGE_NAME="gsm-simulator"
CONTAINER_NAME="gsm-sim"
PORT="5555"
PORT="12345"
LOG_FILE="gsm_simulator.log"
# Check if Docker is installed
if ! command -v docker &> /dev/null; then
@ -13,7 +14,7 @@ if ! command -v docker &> /dev/null; then
exit 1
fi
# Check if the gsm_simulator.py file exists in the current directory
# Check if gsm_simulator.py exists
if [ ! -f "gsm_simulator.py" ]; then
echo "Error: gsm_simulator.py not found in the current directory."
echo "Please ensure gsm_simulator.py is present and try again."
@ -26,11 +27,16 @@ if [ ! -f "Dockerfile" ]; then
cat <<EOF > Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY gsm_simulator.py /app
COPY gsm_simulator.py .
EXPOSE 12345
CMD ["python", "gsm_simulator.py"]
EOF
fi
# Ensure log file is writable
touch $LOG_FILE
chmod 666 $LOG_FILE
# Build the Docker image
echo "Building Docker image: $IMAGE_NAME..."
docker build -t $IMAGE_NAME .
@ -41,7 +47,7 @@ if [ $? -ne 0 ]; then
exit 1
fi
# Stop and remove any existing container with the same name
# Stop and remove any existing container
if [ "$(docker ps -q -f name=$CONTAINER_NAME)" ]; then
echo "Stopping existing container: $CONTAINER_NAME..."
docker stop $CONTAINER_NAME
@ -51,16 +57,12 @@ if [ "$(docker ps -aq -f name=$CONTAINER_NAME)" ]; then
docker rm $CONTAINER_NAME
fi
# Run the Docker container
echo "Launching GSM Simulator in Docker container: $CONTAINER_NAME..."
docker run -d -p $PORT:$PORT --name $CONTAINER_NAME $IMAGE_NAME
# Clean up dangling images
docker image prune -f
# Check if the container is running
if [ $? -eq 0 ]; then
echo "GSM Simulator is running on port $PORT."
echo "Container ID: $(docker ps -q -f name=$CONTAINER_NAME)"
echo "You can now connect your external Python programs to localhost:$PORT."
else
echo "Error: Failed to launch the container."
exit 1
fi
# Run the Docker container interactively
echo "Launching GSM Simulator in Docker container: $CONTAINER_NAME..."
docker run -it --rm -p $PORT:$PORT --name $CONTAINER_NAME $IMAGE_NAME | tee $LOG_FILE
# Note: Script will block here until container exits
echo "GSM Simulator stopped. Logs saved to $LOG_FILE."

View File

@ -5,7 +5,7 @@ import time
def connect():
caller_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
caller_socket.connect(('localhost', 5555))
caller_socket.connect(('localhost', 12345))
caller_socket.send("CALLER".encode())
print("Connected to GSM simulator as CALLER")
time.sleep(2) # Wait 2 seconds for receiver to connect

View File

@ -4,7 +4,7 @@ import socket
def connect():
receiver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
receiver_socket.settimeout(15) # Increase timeout to 15 seconds
receiver_socket.connect(('localhost', 5555))
receiver_socket.connect(('localhost', 12345))
receiver_socket.send("RECEIVER".encode())
print("Connected to GSM simulator as RECEIVER")

View File

@ -6,8 +6,8 @@ import subprocess
# Configuration
HOST = "localhost"
PORT = 12345
INPUT_FILE = "input.wav"
OUTPUT_FILE = "received.wav"
INPUT_FILE = "wav/input.wav"
OUTPUT_FILE = "wav/received.wav"
def encrypt_data(data):

View File

@ -0,0 +1,6 @@
# System install
Docker
Python3
# Venv install
PyQt5