Added new media files (video, audio, subtitles) and two scripts for audio combination and captcha audio generation, and updated main.py and server.py with voice and speed settings. Script files were revised for clarity and given additional instructions to support the new workflow for proctor scheduling system documentation and audio generation.
135 lines
4.3 KiB
Python
135 lines
4.3 KiB
Python
import os
|
||
import time
|
||
import random
|
||
import string
|
||
import wave
|
||
import pyaudio
|
||
import soundfile as sf
|
||
|
||
from flask import Flask, request, jsonify
|
||
from kokoro import KPipeline # Assuming you have this library available
|
||
|
||
app = Flask(__name__)

# Voice identifier handed to the pipeline for every synthesis request.
selected_voice = "am_fenrir"


def _create_pipeline():
    """Build the Kokoro pipeline, logging the attempt first."""
    print("[DEBUG] Initializing Kokoro pipeline.")
    return KPipeline(lang_code='a')


# Build the pipeline once at import time so requests share a single instance.
# On failure the module still loads; `pipeline` is left as None so that code
# using it can detect the missing pipeline.
try:
    pipeline = _create_pipeline()
except Exception as init_err:
    print("[DEBUG] Failed to create pipeline:", init_err)
    pipeline = None
|
||
|
||
|
||
def play_audio_pyaudio(filename):
    """Play a .wav file on the default audio output using PyAudio.

    Blocks until the whole file has been written to the output stream.

    Args:
        filename: Path of the WAV file to play.

    Raises:
        Any exception from ``wave.open`` or PyAudio is propagated, but the
        wave file, the output stream and the PyAudio instance are always
        released first.
    """
    frames_per_chunk = 1024

    # The context manager guarantees the file handle is closed even when
    # playback fails; the caller deletes the file right after this returns,
    # and a leaked handle would make that removal fail on some platforms.
    with wave.open(filename, 'rb') as wf:
        p = pyaudio.PyAudio()
        try:
            # Open an output stream matching the file's sample format.
            stream = p.open(
                format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
            )
            try:
                data = wf.readframes(frames_per_chunk)
                while data:
                    stream.write(data)
                    data = wf.readframes(frames_per_chunk)
                stream.stop_stream()
            finally:
                stream.close()
        finally:
            p.terminate()

    print("[DEBUG] Done playing via PyAudio.")
|
||
|
||
|
||
def _remove_with_retry(path, retry_delay=0.5):
    """Best-effort delete of *path*, retrying once on PermissionError.

    Failures are logged and never raised, so cleanup cannot mask the result
    (or the original error) of the request that created the file.
    """
    if not os.path.exists(path):
        print(f"[DEBUG] File {path} not found. Possibly removed externally.")
        return

    try:
        os.remove(path)
        print(f"[DEBUG] Deleted file: {path}")
    except PermissionError:
        print(f"[DEBUG] Could not delete {path} (PermissionError). Retrying in {retry_delay}s...")
        time.sleep(retry_delay)
        try:
            os.remove(path)
            print(f"[DEBUG] Deleted file on retry: {path}")
        except Exception as remove_err:
            print(f"[DEBUG] Still could not delete {path}: {remove_err}")
    except OSError as remove_err:
        print(f"[DEBUG] Still could not delete {path}: {remove_err}")


@app.route('/tts', methods=['POST'])
def tts():
    """Synthesize the posted text and play it on the server's audio output.

    POST JSON:
    {
        "text": "Hello world!"
    }

    Each segment yielded by the pipeline is written to a temporary WAV file
    next to this script, played with ``play_audio_pyaudio`` and then deleted.

    Returns:
        A ``(json, status)`` pair:
        * 200 ``{"status": "OK", ...}`` when at least one segment was played,
        * 200 ``{"warning": ...}`` when the pipeline produced no segments,
        * 400 ``{"error": ...}`` for a missing, non-string or empty ``text``,
        * 500 ``{"error": ...}`` when the pipeline is unavailable or when
          synthesis, writing or playback fails.
    """
    if pipeline is None:
        return jsonify({"error": "Pipeline not initialized."}), 500

    data = request.get_json()
    # Reject non-object bodies (null, lists, strings) as well as a missing key.
    if not isinstance(data, dict) or 'text' not in data:
        return jsonify({"error": "JSON body must include 'text' key."}), 400

    raw_text = data['text']
    if not isinstance(raw_text, str):
        return jsonify({"error": "'text' must be a string."}), 400

    user_input = raw_text.strip()
    if not user_input:
        return jsonify({"error": "Empty text provided."}), 400

    print(f"[DEBUG] Received text: {user_input}")

    # Random prefix so the temporary files of different requests do not collide.
    random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
    print(f"[DEBUG] random_str is {random_str}")

    try:
        generator = pipeline(user_input, voice=selected_voice, speed=1)
        print("[DEBUG] Pipeline generator created.")
    except Exception as e:
        print("[DEBUG] Pipeline generation failed:", e)
        return jsonify({"error": str(e)}), 500

    script_directory = os.path.dirname(os.path.abspath(__file__))
    segment_count = 0

    # The loop itself is inside the try: errors raised while the generator
    # produces a segment surface here, during iteration, and must also be
    # reported as a JSON 500.
    try:
        for i, (_gs, _ps, audio) in enumerate(generator):
            print(f"[DEBUG] Got segment {i} from pipeline.")
            part_filename = os.path.join(script_directory, f"{random_str}_{i}.wav")
            try:
                # 24000 is the sample rate written to the WAV header;
                # presumably the pipeline's output rate -- TODO confirm.
                sf.write(part_filename, audio, 24000)
                print(f"[DEBUG] Wrote file: {part_filename}")

                # Play the generated audio via PyAudio
                play_audio_pyaudio(part_filename)

                # Delay to let the system finish releasing the file handle
                time.sleep(0.5)
            finally:
                # Always clean up, including after a failed write or playback.
                _remove_with_retry(part_filename)

            segment_count += 1
    except Exception as seg_err:
        # segment_count equals the index of the segment that failed, because
        # it is only incremented after a segment completes successfully.
        print(f"[DEBUG] Error handling segment {segment_count}: {seg_err}")
        return jsonify({"error": str(seg_err)}), 500

    if segment_count == 0:
        print("[DEBUG] No audio was generated.")
        return jsonify({"warning": "No audio generated."}), 200

    print("[DEBUG] Finished generating and playing audio.")
    return jsonify({"status": "OK", "message": "Audio played successfully."}), 200
|
||
|
||
|
||
if __name__ == '__main__':
    # Run the Flask server on all interfaces.
    #
    # Debug mode is opt-in (FLASK_DEBUG=1/true/yes): the server listens on
    # 0.0.0.0, and Flask's interactive debugger lets anyone who can reach it
    # execute arbitrary Python. Debug mode also enables the reloader, which
    # starts this module a second time and so initializes the pipeline twice.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)
|