import os
import random
import string
import time
import wave

import pyaudio
import soundfile as sf
from flask import Flask, request, jsonify
from kokoro import KPipeline  # Assuming you have this library available

app = Flask(__name__)
selected_voice = "am_fenrir"

# Sample rate (Hz) passed to soundfile when writing each generated segment.
SAMPLE_RATE = 24000
# Number of frames read from the .wav file per write to the output stream.
PLAYBACK_CHUNK_FRAMES = 1024

# Initialize the pipeline at app startup so it doesn't re-initialize for every request
try:
    print("[DEBUG] Initializing Kokoro pipeline.")
    pipeline = KPipeline(lang_code='a')
except Exception as e:
    # Keep the server importable; /tts reports the failure with a 500.
    print("[DEBUG] Failed to create pipeline:", e)
    pipeline = None


def play_audio_pyaudio(filename):
    """Play a .wav file on the default audio output using PyAudio.

    Blocks until the whole file has been written to the output stream.

    Args:
        filename: Path of the .wav file to play.

    Raises:
        Whatever ``wave.open`` or PyAudio raise when the file cannot be read
        or the output stream cannot be opened or written. The wave file, the
        stream and the PyAudio instance are released even in that case.
    """
    with wave.open(filename, 'rb') as wf:
        p = pyaudio.PyAudio()
        try:
            # Open a stream with the settings stored in the .wav header
            stream = p.open(
                format=p.get_format_from_width(wf.getsampwidth()),
                channels=wf.getnchannels(),
                rate=wf.getframerate(),
                output=True,
            )
            try:
                data = wf.readframes(PLAYBACK_CHUNK_FRAMES)
                while data:
                    stream.write(data)
                    data = wf.readframes(PLAYBACK_CHUNK_FRAMES)
                stream.stop_stream()
            finally:
                stream.close()
        finally:
            p.terminate()
    print("[DEBUG] Done playing via PyAudio.")


def _remove_segment_file(path):
    """Delete a segment file on a best-effort basis, retrying once if it is locked."""
    if not os.path.exists(path):
        print(f"[DEBUG] File {path} not found. Possibly removed externally.")
        return
    try:
        os.remove(path)
        print(f"[DEBUG] Deleted file: {path}")
    except PermissionError:
        print(f"[DEBUG] Could not delete {path} (PermissionError). Retrying in 0.5s...")
        time.sleep(0.5)
        try:
            os.remove(path)
            print(f"[DEBUG] Deleted file on retry: {path}")
        except Exception as remove_err:
            print(f"[DEBUG] Still could not delete {path}: {remove_err}")
    except OSError as remove_err:
        # Cleanup is best effort: a leftover file must not fail the request.
        print(f"[DEBUG] Still could not delete {path}: {remove_err}")


@app.route('/tts', methods=['POST'])
def tts():
    """Synthesize the posted text and play it on the server's audio output.

    POST JSON:
    {
        "text": "Hello world!"
    }

    Each segment yielded by the pipeline is written to a uniquely named .wav
    file next to this script, played synchronously, and then deleted.

    Returns:
        A ``(json, status)`` pair:
        * 200 with ``status: OK`` once every segment has been played,
        * 200 with a ``warning`` when the pipeline yielded no audio,
        * 400 when the body is not a JSON object with a non-empty string
          ``text``,
        * 500 when the pipeline is unavailable or synthesis/playback fails.
    """
    if pipeline is None:
        return jsonify({"error": "Pipeline not initialized."}), 500

    # silent=True makes a missing/invalid JSON body yield None, so the client
    # gets this route's JSON error instead of Flask's default error page.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'text' not in data:
        return jsonify({"error": "JSON body must include 'text' key."}), 400

    text = data['text']
    if not isinstance(text, str):
        return jsonify({"error": "'text' must be a string."}), 400

    user_input = text.strip()
    if not user_input:
        return jsonify({"error": "Empty text provided."}), 400

    print(f"[DEBUG] Received text: {user_input}")

    # Generate a random string for filename uniqueness
    random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
    print(f"[DEBUG] random_str is {random_str}")

    # Attempt generating speech audio
    try:
        generator = pipeline(user_input, voice=selected_voice, speed=1)
        print("[DEBUG] Pipeline generator created.")
    except Exception as e:
        print("[DEBUG] Pipeline generation failed:", e)
        return jsonify({"error": str(e)}), 500

    script_directory = os.path.dirname(os.path.abspath(__file__))
    segment_count = 0

    # The generator is lazy, so synthesis errors surface while iterating; the
    # loop header is therefore inside the try as well as the loop body.
    try:
        for i, (_gs, _ps, audio) in enumerate(generator):
            print(f"[DEBUG] Got segment {i} from pipeline.")
            part_filename = os.path.join(script_directory, f"{random_str}_{i}.wav")
            try:
                sf.write(part_filename, audio, SAMPLE_RATE)
                print(f"[DEBUG] Wrote file: {part_filename}")

                # Play the generated audio via PyAudio
                play_audio_pyaudio(part_filename)
            finally:
                # Remove the file even when writing or playback failed, so
                # errors do not leave stray .wav files next to the script.
                _remove_segment_file(part_filename)
            segment_count += 1
    except Exception as seg_err:
        # segment_count equals the index of the segment being processed.
        print(f"[DEBUG] Error handling segment {segment_count}: {seg_err}")
        return jsonify({"error": str(seg_err)}), 500

    if segment_count == 0:
        print("[DEBUG] No audio was generated.")
        return jsonify({"warning": "No audio generated."}), 200

    print("[DEBUG] Finished generating and playing audio.")
    return jsonify({"status": "OK", "message": "Audio played successfully."}), 200


if __name__ == '__main__':
    # Run the Flask server. Debug mode is opt-in (FLASK_DEBUG=1): the server
    # listens on all interfaces and the interactive debugger lets a client
    # execute code, and the debug reloader would initialize the pipeline twice.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)