add of DryBox

This commit is contained in:
Bartosz 2025-03-28 18:06:07 +00:00
parent f9e64a73d9
commit 0e619309ea
5 changed files with 225 additions and 0 deletions

View File

@ -0,0 +1,4 @@
FROM python:3.9-slim
WORKDIR /app
COPY gsm_simulator.py /app
CMD ["python", "gsm_simulator.py"]

View File

@ -0,0 +1,25 @@
import socket
import socket
import time
def connect():
caller_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
caller_socket.connect(('localhost', 5555))
caller_socket.send("CALLER".encode())
print("Connected to GSM simulator as CALLER")
time.sleep(2) # Wait 2 seconds for receiver to connect
for i in range(5):
message = f"Audio packet {i + 1}"
caller_socket.send(message.encode())
print(f"Sent: {message}")
time.sleep(1)
caller_socket.send("CALL_END".encode())
print("Call ended.")
caller_socket.close()
if __name__ == "__main__":
connect()

View File

@ -0,0 +1,36 @@
import socket
def connect():
receiver_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
receiver_socket.settimeout(15) # Increase timeout to 15 seconds
receiver_socket.connect(('localhost', 5555))
receiver_socket.send("RECEIVER".encode())
print("Connected to GSM simulator as RECEIVER")
while True:
try:
data = receiver_socket.recv(1024).decode().strip()
if not data:
print("No data received. Connection closed.")
break
if data == "RINGING":
print("Incoming call... ringing")
elif data == "CALL_END":
print("Call ended by caller.")
break
elif data == "CALL_DROPPED":
print("Call dropped by network.")
break
else:
print(f"Received: {data}")
except socket.timeout:
print("Timed out waiting for data.")
break
except Exception as e:
print(f"Receiver error: {e}")
break
receiver_socket.close()
if __name__ == "__main__":
connect()

View File

@ -0,0 +1,94 @@
import socket
import threading
import random
import time
class GSMSimulator:
def __init__(self, host='0.0.0.0', port=5555):
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind((host, port))
self.server_socket.listen(2)
self.clients = {}
print("GSM Simulator started. Waiting for connections...")
def simulate_signal_strength(self):
return random.uniform(0, 1)
def handle_client(self, client_socket, address):
try:
client_socket.settimeout(5)
role = client_socket.recv(1024).decode().strip()
if role not in ["CALLER", "RECEIVER"]:
print(f"Invalid role received from {address}: {role}")
client_socket.close()
return
self.clients[role] = client_socket
print(f"{role} connected from {address}")
if role == "CALLER":
self.handle_caller(client_socket)
elif role == "RECEIVER":
self.handle_receiver(client_socket)
except Exception as e:
print(f"Error handling client {address}: {e}")
finally:
if role in self.clients:
del self.clients[role]
client_socket.close()
def handle_caller(self, caller_socket):
print("Caller connected, waiting for receiver...")
while "RECEIVER" not in self.clients:
time.sleep(1)
receiver_socket = self.clients["RECEIVER"]
print("Receiver found, sending RINGING...")
receiver_socket.send("RINGING".encode())
while True:
try:
data = caller_socket.recv(1024).decode().strip()
if not data:
print("Caller disconnected unexpectedly.")
receiver_socket.send("CALL_END".encode())
break
if data == "CALL_END":
print("Call terminated by caller.")
receiver_socket.send("CALL_END".encode())
break
signal = self.simulate_signal_strength()
if signal < 0.2:
print("Call dropped due to low signal strength.")
caller_socket.send("CALL_DROPPED".encode())
receiver_socket.send("CALL_DROPPED".encode())
break
print(f"Relaying to receiver: {data} (Signal: {signal:.2f})")
receiver_socket.send(data.encode())
time.sleep(0.1) # Small delay to ensure data is sent properly
except Exception as e:
print(f"Error in caller loop: {e}")
receiver_socket.send("CALL_END".encode())
break
def handle_receiver(self, receiver_socket):
print("Receiver connected, waiting for data...")
try:
while True:
data = receiver_socket.recv(1024)
if not data:
print("Receiver disconnected.")
break
except Exception as e:
print(f"Error in receiver loop: {e}")
def run(self):
while True:
client_socket, address = self.server_socket.accept()
print(f"New connection from {address}")
thread = threading.Thread(target=self.handle_client, args=(client_socket, address))
thread.start()
if __name__ == "__main__":
gsm = GSMSimulator()
gsm.run()

View File

@ -0,0 +1,66 @@
#!/bin/bash
# Script to launch the GSM Simulator in Docker
# Variables
IMAGE_NAME="gsm-simulator"
CONTAINER_NAME="gsm-sim"
PORT="5555"
# Check if Docker is installed
if ! command -v docker &> /dev/null; then
echo "Error: Docker is not installed. Please install Docker and try again."
exit 1
fi
# Check if the gsm_simulator.py file exists in the current directory
if [ ! -f "gsm_simulator.py" ]; then
echo "Error: gsm_simulator.py not found in the current directory."
echo "Please ensure gsm_simulator.py is present and try again."
exit 1
fi
# Create Dockerfile if it doesn't exist
if [ ! -f "Dockerfile" ]; then
echo "Creating Dockerfile..."
cat <<EOF > Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY gsm_simulator.py /app
CMD ["python", "gsm_simulator.py"]
EOF
fi
# Build the Docker image
echo "Building Docker image: $IMAGE_NAME..."
docker build -t $IMAGE_NAME .
# Check if the build was successful
if [ $? -ne 0 ]; then
echo "Error: Failed to build Docker image."
exit 1
fi
# Stop and remove any existing container with the same name
if [ "$(docker ps -q -f name=$CONTAINER_NAME)" ]; then
echo "Stopping existing container: $CONTAINER_NAME..."
docker stop $CONTAINER_NAME
fi
if [ "$(docker ps -aq -f name=$CONTAINER_NAME)" ]; then
echo "Removing existing container: $CONTAINER_NAME..."
docker rm $CONTAINER_NAME
fi
# Run the Docker container
echo "Launching GSM Simulator in Docker container: $CONTAINER_NAME..."
docker run -d -p $PORT:$PORT --name $CONTAINER_NAME $IMAGE_NAME
# Check if the container is running
if [ $? -eq 0 ]; then
echo "GSM Simulator is running on port $PORT."
echo "Container ID: $(docker ps -q -f name=$CONTAINER_NAME)"
echo "You can now connect your external Python programs to localhost:$PORT."
else
echo "Error: Failed to launch the container."
exit 1
fi