238 lines
8.2 KiB
Python
238 lines
8.2 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import os, time, random, subprocess, signal, sys, wave
|
|
import pyVoIP
|
|
from pyVoIP.VoIP import VoIPPhone, InvalidStateError, CallState
|
|
from datetime import datetime
|
|
|
|
# ========= FRITZ!BOX CREDENTIALS =========
# Every setting below can be overridden with an environment variable of the
# same name, so the SIP password does not have to be kept in the source file.
# The fallback values are the original hard-coded settings.
# FRITZBOX_PORT must be an integer string; int() raises ValueError otherwise.
FRITZBOX_IP = os.environ.get("FRITZBOX_IP", "192.168.188.1")  # Fritz!Box IP
FRITZBOX_PORT = int(os.environ.get("FRITZBOX_PORT", "5060"))  # Fritz!Box port
SIP_USER = os.environ.get("SIP_USER", "raspiphone")  # Fritz!Box SIP username (telephony device)
SIP_PASS = os.environ.get("SIP_PASS", "3000garagen")  # SIP password
LOCAL_IP = os.environ.get("LOCAL_IP", "192.168.188.23")  # IP of the Raspberry Pi (for RTP)
DIAL_TARGET = os.environ.get("DIAL_TARGET", "**1")  # internal target that drives your bells
|
|
|
|
# ========= TIMING / BEHAVIOR =========
# NOTE(review): the *_S suffix suggests seconds. Only RING_TIMEOUT_S is
# visibly used (compared against an elapsed time in calling()); the other
# values are not referenced in the visible part of the file.
DELAY_S = 5
RING_TIMEOUT_S = 10  # calling() cancels an unanswered call after this long
DOWNTIME_S = 90

WAIT_NAME_S = 5   # pause after "What's your name?"
WAIT_MONTH_S = 5  # pause after "Which month were you born?"

# Switch on pyVoIP's debug flag.
pyVoIP.DEBUG = True
|
|
|
|
# ========= AUDIO CONFIG =========
# Audio assets are looked up in an "audio" folder located next to this script.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
AUDIO_DIR = os.path.join(SCRIPT_DIR, "audio")

# No audio device is configured; nothing in the visible code reads this value.
AUDIO_DEV = None
|
|
|
|
# ========= AUDIO FILE DISCOVERY =========
|
|
def discover_wav_files(directory_path):
    """Return the paths of all ``.wav`` files directly inside a directory.

    The extension match is case-insensitive and the scan is not recursive.
    Each returned path is ``directory_path`` joined with the file name, so
    the result is absolute only when ``directory_path`` itself is absolute.

    Args:
        directory_path: Directory to scan.

    Returns:
        A list of file paths sorted by file name. The list is empty when the
        directory does not exist or contains no ``.wav`` files; a warning is
        printed in both cases.
    """
    if not os.path.isdir(directory_path):
        print(f"WARNING: Directory not found, cannot discover WAVs: {directory_path}")
        return []

    # os.listdir() yields entries in arbitrary order; sorting keeps the
    # discovered list stable from one run to the next.
    candidates = (
        os.path.join(directory_path, name)
        for name in sorted(os.listdir(directory_path))
        if name.lower().endswith(".wav")
    )
    # Keep regular files only: a sub-directory that happens to be called
    # "something.wav" cannot be opened for playback.
    wav_files = [path for path in candidates if os.path.isfile(path)]

    if not wav_files:
        print(f"WARNING: No .wav files found in {directory_path}")
    return wav_files
|
|
|
|
# ========= AUDIO SUBFOLDERS =========
# Three sub-folders of AUDIO_DIR: fixed prompts, device stories, points of interest.
SYSTEM_AUDIO_DIR = os.path.join(AUDIO_DIR, "system")
DEVICES_AUDIO_DIR = os.path.join(AUDIO_DIR, "devices")
POIS_AUDIO_DIR = os.path.join(AUDIO_DIR, "pois")

# ========= AUDIO SYSTEM PROMPTS =========
# Fixed prompt files, all expected inside SYSTEM_AUDIO_DIR. Their existence is
# not checked here.
WAV_ASK_NAME = os.path.join(SYSTEM_AUDIO_DIR, "ask_firstname.wav")
WAV_ASK_MONTH = os.path.join(SYSTEM_AUDIO_DIR, "ask_month.wav")
WAV_BYE = os.path.join(SYSTEM_AUDIO_DIR, "bye.wav")
WAV_GARAGE_INTRO = os.path.join(SYSTEM_AUDIO_DIR, "garage_intro.wav")
WAV_POI_INTRO = os.path.join(SYSTEM_AUDIO_DIR, "poi_intro.wav")  # assumes this file has been provided
|
|
|
|
# ========= AUDIO LISTS =========
# Both pools are filled once, while the module is being loaded.
DEVICE_STORIES = discover_wav_files(DEVICES_AUDIO_DIR)
POI_POOL = discover_wav_files(POIS_AUDIO_DIR)

# ========= AUDIO CHECKUP =========
# Refuse to start unless each pool holds at least one file.
if not (DEVICE_STORIES and POI_POOL):
    print("\nFATAL ERROR: Could not find required audio files in 'devices' or 'pois' directories.")
    print("Please check that the following directories exist and contain .wav files:")
    for required_dir in (DEVICES_AUDIO_DIR, POIS_AUDIO_DIR):
        print(f"- {required_dir}")
    sys.exit(1)  # Exit the script if essential audio is missing.
|
|
|
|
# ========= PICK RANDOM AUDIO =========
|
|
def get_random_device_story():
    """Return one randomly chosen entry of ``DEVICE_STORIES``."""
    story_path = random.choice(DEVICE_STORIES)
    return story_path
|
|
|
|
def get_random_poi():
    """Return one randomly chosen entry of ``POI_POOL``."""
    poi_path = random.choice(POI_POOL)
    return poi_path
|
|
|
|
# ========= AUDIO =========
|
|
def play_wav(call, file_path):
    """Queue a WAV file on an answered call and wait until it has played.

    The file is passed to ``call.write_audio`` in chunks of 160 frames. The
    function then blocks for the file's nominal duration (frame count divided
    by the frame rate from the WAV header), checking the call state every
    0.1 s so that a hang-up ends the wait early.

    NOTE(review): pyVoIP's documentation describes 8-bit, 8000 Hz, mono audio
    as the expected input of ``write_audio``. The file format is not checked
    here - confirm the audio assets are encoded accordingly.

    Args:
        call: Call object providing ``state`` and ``write_audio(data)``.
        file_path: Path of the WAV file to play.

    Returns:
        True when the file was played to the end, or when it holds no frames
        (nothing to play). False when the call left the ANSWERED state during
        playback, when the file does not exist, or when any other error
        occurred while reading or sending it.
    """
    try:
        with wave.open(file_path, "rb") as wav_file:
            total_frames = wav_file.getnframes()
            framerate = wav_file.getframerate()

            if total_frames == 0 or framerate == 0:
                print(f"INFO: Audio file is empty or has invalid headers: {file_path}")
                return True

            duration_seconds = total_frames / framerate
            print(f"INFO: Playing '{file_path}'. Duration: {duration_seconds:.2f} seconds.")

            # Use the monotonic clock for the playback deadline: it is not
            # affected by system clock adjustments (e.g. an NTP correction),
            # which would otherwise cut the wait short or prolong it.
            deadline = time.monotonic() + duration_seconds

            # readframes() returns b"" once the file is exhausted.
            for chunk in iter(lambda: wav_file.readframes(160), b""):
                if call.state is not CallState.ANSWERED:
                    print("INFO: Playback aborted by user hang-up during write.")
                    return False
                call.write_audio(chunk)

        # All audio has been handed over; wait out the remaining playback
        # time while watching for a hang-up.
        while time.monotonic() < deadline:
            if call.state is not CallState.ANSWERED:
                print("INFO: Playback interrupted by user hang-up during wait.")
                return False
            time.sleep(0.1)

        return True
    except FileNotFoundError:
        print(f"ERROR: Cannot find audio file: {file_path}")
        return False
    except Exception as e:
        # Deliberate catch-all: a broken audio file is reported and skipped
        # instead of raising into the caller.
        print(f"ERROR: An error occurred during audio playback: {e}")
        return False
|
|
|
|
|
|
|
|
def wait_interruptible(duration_seconds, call):
    """Pause for a given time, returning early if the call ends.

    The call state is checked every 0.1 s.

    Args:
        duration_seconds: How long to wait. Zero or a negative value returns
            immediately without looking at the call.
        call: Call object whose ``state`` is polled.

    Returns:
        True when the full duration elapsed, False when the call left the
        ANSWERED state before that.
    """
    # Monotonic clock: the wait must not be shortened or stretched by a
    # system clock adjustment happening while we sleep.
    deadline = time.monotonic() + duration_seconds
    while time.monotonic() < deadline:
        if call.state is not CallState.ANSWERED:
            print("INFO: Wait interrupted by user hang-up.")
            return False
        time.sleep(0.1)
    return True
|
|
|
|
'''
|
|
# ========= RINGING FUNCTION =========
|
|
def ring_with_timeout(phone, dial_target, timeout_seconds):
|
|
print(f"INFO: Attempting to ring {dial_target} for a maximum of {timeout_seconds}s...")
|
|
call = None
|
|
try:
|
|
call = phone.call(dial_target)
|
|
start_time = time.time()
|
|
|
|
while time.time() - start_time < timeout_seconds:
|
|
if call.state is CallState.ANSWERED:
|
|
print("SUCCESS: Receiver has been picked up.")
|
|
return call
|
|
|
|
# Condition 2: The call object enters an error state or is ended remotely.
|
|
if call.state not in [CallState.CALLING, CallState.RINGING]:
|
|
print(f"INFO: Call state changed to {call.state}. Stopping ring attempt.")
|
|
return None
|
|
time.sleep(0.2) # Check 5 times per second
|
|
# If the while loop finishes, it means we timed out
|
|
print("INFO: Ringing timed out. No one picked up.")
|
|
call.hangup()
|
|
return None
|
|
|
|
except Exception as e:
|
|
print(f"ERROR: An exception occurred during ringing: {e}")
|
|
if call:
|
|
call.hangup()
|
|
return None
|
|
|
|
'''
|
|
|
|
# ========= CALLING FUNCTION =========
|
|
def calling(call):
    """Wait for an outgoing call to be answered, then play the audio sequence.

    ``call.state`` is polled every 0.5 s and printed each time. If the call is
    not answered within ``RING_TIMEOUT_S`` seconds it is cancelled. Once
    answered, the ask-name prompt, one random device story, one random point
    of interest and the goodbye prompt are played in that order, and the call
    is hung up. Playback stops as soon as the call is no longer answered.

    Errors are printed rather than raised.

    Args:
        call: The call object returned by ``phone.call()``.

    Returns:
        False when the ring timeout expired and the call was cancelled,
        None in every other case.
    """
    try:
        # Monotonic clock so a system clock adjustment cannot distort the
        # ring timeout.
        ring_deadline = time.monotonic() + RING_TIMEOUT_S
        while call.state is not CallState.ANSWERED:
            print(call.state)
            time.sleep(0.5)
            if time.monotonic() > ring_deadline:
                call.cancel()
                # NOTE(review): presumably gives the cancel time to complete
                # before the caller stops the phone - confirm.
                time.sleep(2)
                return False

        # Short pause between pick-up and the first prompt.
        time.sleep(2)

        playlist = (
            WAV_ASK_NAME,
            get_random_device_story(),
            get_random_poi(),
            WAV_BYE,
        )
        for wav_path in playlist:
            play_wav(call, wav_path)
            if call.state is not CallState.ANSWERED:
                # The other side hung up: skip the remaining files and do not
                # hang up a call that has already ended.
                return None

        call.hangup()
        print("end of call")
    except InvalidStateError as e:
        print(e)
    except Exception as e:
        print(e)
        # Best-effort cleanup. hangup() may itself reject the request when the
        # call is not in a state that can be hung up; that must not escape
        # from the error handler.
        try:
            call.hangup()
        except InvalidStateError as hangup_error:
            print(hangup_error)
    return None
|
|
|
|
# ========= MAIN LOOP =========
|
|
def main():
    """Print the audio configuration, then place one call and run the dialogue.

    Lists the discovered device stories and points of interest, registers a
    ``VoIPPhone`` with the configured Fritz!Box credentials, dials
    ``DIAL_TARGET`` and hands the call to ``calling()``. The phone is stopped
    afterwards and its status is printed.
    """
    print("--- Audio File Configuration ---")
    print(f"Script Directory: {SCRIPT_DIR}")
    print(f"Audio Directory: {AUDIO_DIR}")

    print("\nDiscovered Device Stories:")
    for story in DEVICE_STORIES:
        print(f"- {os.path.basename(story)}")

    print("\nDiscovered Points of Interest:")
    for poi in POI_POOL:
        print(f"- {os.path.basename(poi)}")

    print("\nExample of selecting a random story:")
    print(f"-> {os.path.basename(get_random_device_story())}")

    phone = VoIPPhone(FRITZBOX_IP, FRITZBOX_PORT, SIP_USER, SIP_PASS, myIP=LOCAL_IP, callCallback=None)
    phone.start()
    try:
        call = phone.call(DIAL_TARGET)
        calling(call)
    finally:
        # Always stop the started phone, also when dialing or the dialogue
        # raises, or when the SIGTERM handler ends the program via sys.exit().
        phone.stop()
    print(phone.get_status())
|
|
|
|
|
|
'''
|
|
phone = None
|
|
try:
|
|
phone = VoIPPhone(FRITZBOX_IP, FRITZBOX_PORT, SIP_USER, SIP_PASS, myIP=LOCAL_IP, callCallback=None)
|
|
phone.start()
|
|
|
|
active_call = ring_with_timeout(phone, DIAL_TARGET, RING_TIMEOUT_S)
|
|
|
|
if active_call:
|
|
print("INFO: Call established. Starting dialogue...")
|
|
calling(active_call)
|
|
|
|
print("INFO: Dialogue finished. Hanging up.")
|
|
active_call.hangup()
|
|
else:
|
|
print("INFO: No call was established. Returning to idle state (or exiting script).")
|
|
|
|
except Exception as e:
|
|
print(f"FATAL ERROR in main loop: {e}")
|
|
finally:
|
|
if phone:
|
|
print("INFO: Shutting down VoIP connection.")
|
|
phone.stop()
|
|
'''
|
|
|
|
if __name__ == "__main__":
    def _exit_on_sigterm(signum, frame):
        """Turn SIGTERM (e.g. sent by systemd) into a normal interpreter exit."""
        sys.exit(0)

    # Ensure a clean exit on SIGTERM.
    signal.signal(signal.SIGTERM, _exit_on_sigterm)
    main()
|