""" API Server Module This module implements the Flask REST API server that handles agent creation, chat interactions, content generation and other core functionality. Key Features: - Agent creation and management - Chat interactions with agents - Season and episode content generation - Chat history tracking """ from flask import Flask, jsonify, request, make_response from flask_cors import CORS from dotenv import load_dotenv import os import json import glob from pprint import pprint from prompt_chaining.step_1_create_agent import create_agent as generateAgent from models.openai_model import OpenAIModel from prompt_chaining.step_5_agent_chat import agent_chat from prompt_chaining.step_2_create_content import create_seasons_and_episodes from prompt_chaining.step_3_create_posts import create_episode_posts from utils.post_manager import PostManager from utils.scheduler import AgentScheduler # Load environment variables load_dotenv() #Global Post Manager - Will be instantiated when a user logs in to Twitter global post_manager_twitter # Initialize Flask app with 20MB max content size app = Flask(__name__) app.config['MAX_CONTENT_LENGTH'] = 20 * 1024 * 1024 # 20 MB # Configure CORS for frontend origin CORS(app, resources={r"/api/*": { "origins": ["https://g0c0848ggco0cgw4gss8gws0.dev3vds1.link", "http://localhost:5173","https://goo0wsoskgg4w8sswsc80c0w.dev3vds1.link","*"], "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"], "allow_headers": ["Content-Type"] }}) # Create global AI model instance ai_model = OpenAIModel() @app.route('/') def show(): return "Hello APi works" # Post reques to create a random agent with no prompt @app.route('/api/agents/random', methods=['POST']) def create_random_agent(): """ Creates a new random agent with optional concept. Request Body: concept (str, optional): Initial concept for the agent Returns: JSON: Generated agent data int: HTTP status code """ # Get concept from request body if provided data = request.get_json() if request.is_json else {} concept = data.get('concept', '') # Default to empty string if no concept provided print("[create_random_agent] - Creating a random agent", f"concept: {concept}" if concept else "") # Remove the local instantiation and use global ai_model print("Using global Gemini Model instance", ai_model) # Create RandomAgents directory if it doesn't exist random_agents_dir = os.path.join('configs', 'RandomAgents') os.makedirs(random_agents_dir, exist_ok=True) # Call the generateAgent function with the concept generated_master_file_path = generateAgent(ai_model, concept) print(f"[create_random_agent] - generatedMasterFilePath for generatedAgent: {generated_master_file_path}") try: if not generated_master_file_path: return jsonify({"error": "No file path generated"}), 500 # Get just the filename without path and extension base_filename = os.path.basename(generated_master_file_path) name_without_ext = os.path.splitext(base_filename)[0] # Replace _master with _random in the filename name_without_ext = name_without_ext.replace('_master', '_random') # Generate unique filename in the RandomAgents directory counter = 1 final_path = os.path.join(random_agents_dir, f"{name_without_ext}.json") while os.path.exists(final_path): filename = f"{name_without_ext}_{counter}.json" final_path = os.path.join(random_agents_dir, filename) counter += 1 # Move the generated file to the RandomAgents directory if os.path.exists(generated_master_file_path): os.rename(generated_master_file_path, final_path) print(f"[create_random_agent] - Moved file to: {final_path}") # Clean up any empty character directory that might have been created char_dir = os.path.dirname(generated_master_file_path) if os.path.exists(char_dir) and not os.listdir(char_dir): os.rmdir(char_dir) # Read the JSON data from the file with open(final_path, 'r', encoding='utf-8') as f: data = json.load(f) print(f"[create_random_agent] - Loaded data from file: {data}") return jsonify(data), 200 except Exception as e: print(f"[create_random_agent] - Error: {str(e)}") return jsonify({"error": "Failed to load agent data"}), 500 @app.before_request def handle_preflight(): """ Handles CORS preflight requests by adding required headers. Returns: Response: Flask response with CORS headers """ if request.method == "OPTIONS": response = make_response() response.headers.add("Access-Control-Allow-Headers", "*") response.headers.add("Access-Control-Allow-Methods", "*") return response # Your existing routes... @app.route('/api/agents', methods=['GET']) def get_agents(): """ Retrieves list of all agents. Returns: JSON: List of agent data """ return jsonify(agents) @app.route('/api/agents/', methods=['POST']) def create_agent(): """ Creates a new agent from provided configuration. If an agent with the same name exists, it will be replaced. Request Body: agent_details (dict): Agent configuration including name, personality, etc. concept (str): Initial concept for the agent Returns: JSON: Created agent data int: HTTP status code """ data = request.get_json() print(f"[create_agent] - Received data: {data}") # Extract character name from the agent_details character_name = data.get('agent_details', {}).get('name') if not character_name: return jsonify({"error": "Character name is required"}), 400 # Replace spaces with underscores in the character name character_name = character_name.replace(' ', '_') # Create a directory for the character character_dir = os.path.join('configs', character_name) os.makedirs(character_dir, exist_ok=True) # Generate filename - always use _master.json filename = f"{character_name}_master.json" file_path = os.path.normpath(os.path.join(character_dir, filename)) # Create the new character structure based on the incoming data structure new_character_data = { "concept": data.get('concept', ''), "agent": { "agent_details": { "name": character_name.replace('_', ' '), "personality": data.get('agent_details', {}).get('personality', []), "communication_style": data.get('agent_details', {}).get('communication_style', []), "backstory": data.get('agent_details', {}).get('backstory', ''), "universe": data.get('agent_details', {}).get('universe', ''), "topic_expertise": data.get('agent_details', {}).get('topic_expertise', []), "hashtags": data.get('agent_details', {}).get('hashtags', []), "emojis": data.get('agent_details', {}).get('emojis', []), "concept": data.get('concept', '') }, "ai_model": { "model_type": "", "model_name": "", "memory_store": "" }, "connectors": { "twitter": False, "telegram": False, "discord": False }, "profile_image": data.get('profile_image', []), "profile_image_options": data.get('profile_image_options', []), "tracker": { "current_season_number": 0, "current_episode_number": 0, "current_post_number": 0, "post_every_x_minutes": 0 }, "seasons": data.get('seasons', []) } } # Write data to the JSON file, overwriting if it exists with open(file_path, 'w', encoding='utf-8') as f: json.dump(new_character_data, f, ensure_ascii=False, indent=4) return jsonify(new_character_data), 201 @app.route('/api/characters', methods=['GET']) def get_characters(): """ Retrieves all character configurations from the configs directory. Returns: JSON: List of character configurations int: HTTP status code """ try: # Use os.path.join for cross-platform path handling config_dir = 'configs' pattern = os.path.join(config_dir, '**', '*master*.json') # Use os.path.normpath to normalize path separators files = [os.path.normpath(f) for f in glob.glob(pattern, recursive=True)] print(f"[get_characters] - Found {len(files)} files: {files}") if not files: print("[get_characters] - No files found in configs directory or subdirectories") return jsonify({"error": "No character files found"}), 404 characters = [] for file in files: try: with open(file, 'r', encoding='utf-8') as f: characters.append(json.load(f)) except json.JSONDecodeError as e: print(f"[get_characters] - Error parsing JSON from {file}: {str(e)}") continue except Exception as e: print(f"[get_characters] - Error reading file {file}: {str(e)}") continue return jsonify(characters) except Exception as e: print(f"[get_characters] - Unexpected error: {str(e)}") return jsonify({"error": str(e)}), 500 @app.route('/api/agents/chat', methods=['POST']) def chat_with_agent(): """ Handles chat interactions with an agent. Request Body: prompt (str): User message to the agent master_file_path (str): Path to agent's master configuration file chat_history (dict): Previous chat history Returns: JSON: Agent response and updated chat history int: HTTP status code """ data = request.get_json() prompt = data.get('prompt') master_file_path = data.get('master_file_path') print(f"[chat_with_agent] - master_file_path: {master_file_path}") print("\n\n\n") chat_history = data.get('chat_history', {'chat_history': []}) if not master_file_path: return jsonify({"error": "Master file path is required"}), 400 if not os.path.exists(master_file_path): return jsonify({"error": "Agent master file not found"}), 404 try: # Initialize AI model ai_model = OpenAIModel() # Call the agent_chat function from step_5 agent_response, updated_chat_history = agent_chat( ai_model=ai_model, master_file_path=master_file_path, prompt=prompt, chat_history=chat_history ) return jsonify({ "response": agent_response, "chat_history": updated_chat_history }) except Exception as e: print(f"Error in chat_with_agent: {str(e)}") return jsonify({"error": str(e)}), 500 @app.route('/api/agents/chat-history', methods=['GET']) def get_chat_history(): """ Retrieves chat history for a specific agent. Query Parameters: master_file_path (str): Path to agent's master configuration file Returns: JSON: Agent's chat history int: HTTP status code """ master_file_path = request.args.get('master_file_path') print(f"[get_chat_history] - master_file_path: {master_file_path}") if not master_file_path: return jsonify({"error": "Master file path is required"}), 400 # Extract agent name from master file path agent_name = os.path.basename(master_file_path).replace('_master.json', '') # Create the chat history file path using the same format as in step_5_agent_chat.py chat_file_path = os.path.join(os.path.dirname(master_file_path), f"{agent_name}_chat_log.json") print(f"[get_chat_history] - chat_file_path: {chat_file_path}") print("\n\n\n") # If chat history doesn't exist, return empty history with agent name if not os.path.exists(chat_file_path): return jsonify({ "agent_name": agent_name, "chat_history": [] }) # Load and return the chat history with open(chat_file_path, 'r', encoding='utf-8') as file: chat_history = json.load(file) return jsonify(chat_history) @app.route('/api/agents/seasons', methods=['POST']) def create_season(): """ Generates a new season of content for an agent. Request Body: master_file_path (str): Path to agent's master configuration file number_of_episodes (int): Number of episodes to generate Returns: JSON: Updated agent data with new season int: HTTP status code """ try: data = request.get_json() master_file_path = data.get('master_file_path') number_of_episodes = data.get('number_of_episodes', 3) # Default to 3 episodes if not master_file_path: return jsonify({"error": "Master file path is required"}), 400 # Create a new season using the global AI model result = create_seasons_and_episodes( ai_model=ai_model, master_file_path=master_file_path, number_of_episodes=number_of_episodes ) # Load and return the updated agent data with open(master_file_path, 'r', encoding='utf-8') as f: updated_agent = json.load(f) return jsonify(updated_agent), 200 except Exception as e: print(f"Error creating season: {str(e)}") return jsonify({"error": str(e)}), 500 @app.route('/api/agents/episodes/posts', methods=['POST']) def create_episode_content(): """ Generates posts for an agent's episodes. Request Body: master_file_path (str): Path to agent's master configuration file number_of_posts (int): Number of posts to generate per episode Returns: JSON: Updated agent data with new posts int: HTTP status code """ try: data = request.get_json() master_file_path = data.get('master_file_path') number_of_posts = data.get('number_of_posts', 6) # Default to 6 posts if not master_file_path: return jsonify({"error": "Master file path is required"}), 400 # Create posts for the episodes using the global AI model result = create_episode_posts( ai_model=ai_model, master_file_path=master_file_path, number_of_posts=number_of_posts ) # Load and return the updated agent data with open(master_file_path, 'r', encoding='utf-8') as f: updated_agent = json.load(f) return jsonify(updated_agent), 200 except Exception as e: print(f"Error creating episode posts: {str(e)}") return jsonify({"error": str(e)}), 500 # Twitter Posting @app.route('/api/start-post-manager/twitter', methods=['POST']) def start_post_manager_twitter(): global post_manager_twitter data = request.json agent_name = data.get('agent_name') print("\n") if not agent_name: print("[start_post_manager_twitter] - Agent name is required") return jsonify({'error': 'Agent name is required'}), 400 try: # Create PostManager instance with the agent name post_manager_twitter = PostManager(agent_name=agent_name) print(f"[start_post_manager_twitter] - post_manager created: {post_manager_twitter}") # Check if the PostManager is logged in if post_manager_twitter and post_manager_twitter.is_logged_in: return jsonify({'success': True, 'message': f'Post manager started for {agent_name}'}), 200 else: return jsonify({'error': 'Failed to start post manager or login to Twitter'}), 500 except Exception as e: print(f"[start_post_manager] - Error: {str(e)}") return jsonify({'error': str(e)}), 500 @app.route('/api/post-to-twitter', methods=['POST']) def post_to_twitter(): global post_manager_twitter print("\n") try: data = request.json master_data = data.get('master_data') post_content = data.get('content') print(f"[post_to_twitter api ] - post_content: {post_content}") if not master_data or not post_content: return jsonify({'error': 'Master data or post content is required'}), 400 if post_manager_twitter: post_success = post_manager_twitter.post_single_tweet(post_content) if post_success: return jsonify({'success': True}), 200 else: return jsonify({'error': 'Failed to post to Twitter'}), 500 else: return jsonify({'error': 'Post manager not initialized'}), 500 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/agents/update-seasons', methods=['PUT']) def update_seasons(): """ Updates the seasons array for a specific agent using the agent's name. Request Body: agent_name (str): Name of the agent seasons (list): New seasons data to update Returns: JSON: Updated agent data int: HTTP status code """ try: data = request.get_json() agent_name = data.get('agent_name') new_seasons = data.get('seasons') if not agent_name or not new_seasons: return jsonify({"error": "Agent name and seasons data are required"}), 400 # Construct the master file path using the agent's name master_file_path = os.path.join('configs', agent_name, f"{agent_name}_master.json") if not os.path.exists(master_file_path): return jsonify({"error": "Agent master file not found"}), 404 # Load the existing agent data with open(master_file_path, 'r', encoding='utf-8') as f: agent_data = json.load(f) # Update the seasons array agent_data['agent']['seasons'] = new_seasons # Save the updated agent data back to the file with open(master_file_path, 'w', encoding='utf-8') as f: json.dump(agent_data, f, ensure_ascii=False, indent=4) return jsonify(agent_data), 200 except Exception as e: print(f"Error updating seasons: {str(e)}") return jsonify({"error": str(e)}), 500 if __name__ == '__main__': print("API Server starting on port 8080...") app.run(host='0.0.0.0',debug=True, port=8080)