2025-02-16 17:58:30 +05:30

527 lines
18 KiB
Python

"""
API Server Module
This module implements the Flask REST API server that handles agent creation,
chat interactions, content generation and other core functionality.
Key Features:
- Agent creation and management
- Chat interactions with agents
- Season and episode content generation
- Chat history tracking
"""
from flask import Flask, jsonify, request, make_response
from flask_cors import CORS
from dotenv import load_dotenv
import os
import json
import glob
from pprint import pprint
from prompt_chaining.step_1_create_agent import create_agent as generateAgent
from models.openai_model import OpenAIModel
from prompt_chaining.step_5_agent_chat import agent_chat
from prompt_chaining.step_2_create_content import create_seasons_and_episodes
from prompt_chaining.step_3_create_posts import create_episode_posts
from utils.post_manager import PostManager
from utils.scheduler import AgentScheduler
# Load environment variables
load_dotenv()

# Global Post Manager - replaced by a PostManager instance when a user logs in
# to Twitter. It starts out as None so that it can be tested safely before any
# login has happened (a bare module-level `global` statement binds nothing and
# would leave the name undefined).
post_manager_twitter = None

# Initialize Flask app with 20MB max content size
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 20 * 1024 * 1024  # 20 MB

# Configure CORS for frontend origin
# NOTE(review): the trailing "*" entry presumably makes the explicit origins
# redundant (any origin would be accepted) - confirm that this is intended.
CORS(app, resources={r"/api/*": {
    "origins": [
        "https://g0c0848ggco0cgw4gss8gws0.dev3vds1.link",
        "http://localhost:5173",
        "https://goo0wsoskgg4w8sswsc80c0w.dev3vds1.link",
        "*",
    ],
    "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
    "allow_headers": ["Content-Type"],
}})

# Create global AI model instance shared by the request handlers
ai_model = OpenAIModel()
@app.route('/')
def show():
    """Return a fixed plain-text greeting showing that the API is reachable."""
    greeting = "Hello APi works"
    return greeting
# POST request to create a random agent, optionally seeded with a concept
@app.route('/api/agents/random', methods=['POST'])
def create_random_agent():
    """
    Creates a new random agent with optional concept.

    The agent is produced by ``generateAgent``; the master file it reports is
    moved into ``configs/RandomAgents`` under a unique ``*_random`` file name
    and the JSON content of that file is returned.

    Request Body:
        concept (str, optional): Initial concept for the agent

    Returns:
        JSON: Generated agent data
        int: HTTP status code (200 on success, 500 on any failure)
    """
    # Get concept from request body if provided. A JSON body that is not an
    # object (for example `null`) is treated like a missing body.
    payload = request.get_json() if request.is_json else {}
    if not isinstance(payload, dict):
        payload = {}
    concept = payload.get('concept') or ''  # Default to empty string if no concept provided
    print("[create_random_agent] - Creating a random agent", f"concept: {concept}" if concept else "")
    print("Using global AI model instance", ai_model)

    try:
        # Create RandomAgents directory if it doesn't exist
        random_agents_dir = os.path.join('configs', 'RandomAgents')
        os.makedirs(random_agents_dir, exist_ok=True)

        # Generation runs inside the try block so that a failing model call is
        # reported as a JSON error like every other failure of this endpoint.
        generated_master_file_path = generateAgent(ai_model, concept)
        print(f"[create_random_agent] - generatedMasterFilePath for generatedAgent: {generated_master_file_path}")
        if not generated_master_file_path:
            return jsonify({"error": "No file path generated"}), 500

        # Derive the target name: file name without extension, with
        # "_master" replaced by "_random"
        base_filename = os.path.basename(generated_master_file_path)
        name_without_ext = os.path.splitext(base_filename)[0]
        name_without_ext = name_without_ext.replace('_master', '_random')

        # Generate unique filename in the RandomAgents directory by appending
        # _1, _2, ... until no existing file is hit
        counter = 1
        final_path = os.path.join(random_agents_dir, f"{name_without_ext}.json")
        while os.path.exists(final_path):
            final_path = os.path.join(random_agents_dir, f"{name_without_ext}_{counter}.json")
            counter += 1

        # Move the generated file to the RandomAgents directory
        if os.path.exists(generated_master_file_path):
            os.rename(generated_master_file_path, final_path)
            print(f"[create_random_agent] - Moved file to: {final_path}")
            # Clean up any empty character directory that might have been created
            char_dir = os.path.dirname(generated_master_file_path)
            if os.path.exists(char_dir) and not os.listdir(char_dir):
                os.rmdir(char_dir)

        # Read the JSON data from the file; if nothing was moved this raises
        # and is reported through the handler below
        with open(final_path, 'r', encoding='utf-8') as f:
            agent_data = json.load(f)
        print(f"[create_random_agent] - Loaded data from file: {agent_data}")
        return jsonify(agent_data), 200
    except Exception as e:
        print(f"[create_random_agent] - Error: {str(e)}")
        return jsonify({"error": "Failed to load agent data"}), 500
@app.before_request
def handle_preflight():
    """
    Answers CORS preflight (OPTIONS) requests before normal routing.

    Returns:
        Response: Empty Flask response carrying wildcard
            Access-Control-Allow-Headers / Access-Control-Allow-Methods
            headers for OPTIONS requests; None for every other method.
    """
    if request.method != "OPTIONS":
        # Returning None lets the request continue to its regular handler.
        return None

    preflight = make_response()
    for header_name in ("Access-Control-Allow-Headers", "Access-Control-Allow-Methods"):
        preflight.headers.add(header_name, "*")
    return preflight
# Your existing routes...
@app.route('/api/agents', methods=['GET'])
def get_agents():
    """
    Retrieves list of all agents.

    Returns:
        JSON: The module-level ``agents`` collection, or an empty list when
            no such collection has been defined.
    """
    # NOTE(review): no module-level `agents` is visible in this file, so a
    # direct reference would raise NameError on every request. Look the name
    # up defensively; confirm where this list is supposed to come from.
    agent_list = globals().get('agents', [])
    return jsonify(agent_list)
@app.route('/api/agents/', methods=['POST'])
def create_agent():
    """
    Creates a new agent from provided configuration. If an agent with the same name exists,
    it will be replaced.

    The configuration is written to ``configs/<name>/<name>_master.json``,
    where ``<name>`` is the submitted name with spaces replaced by underscores.

    Request Body:
        agent_details (dict): Agent configuration including name, personality, etc.
        concept (str): Initial concept for the agent

    Returns:
        JSON: Created agent data
        int: HTTP status code (201 on success, 400 on invalid input)
    """
    data = request.get_json(silent=True)
    print(f"[create_agent] - Received data: {data}")
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    # Look agent_details up once; a missing or non-object value is treated as empty
    details = data.get('agent_details')
    if not isinstance(details, dict):
        details = {}

    # Extract character name from the agent_details
    character_name = details.get('name')
    if not character_name or not isinstance(character_name, str):
        return jsonify({"error": "Character name is required"}), 400

    # Replace spaces with underscores in the character name
    character_name = character_name.replace(' ', '_')

    # The name becomes a directory and file name, so refuse anything that
    # could point outside the configs directory.
    if character_name in ('.', '..') or any(ch in character_name for ch in ('/', '\\', '\0')):
        return jsonify({"error": "Invalid character name"}), 400

    # Create a directory for the character
    character_dir = os.path.join('configs', character_name)
    os.makedirs(character_dir, exist_ok=True)

    # Generate filename - always use _master.json
    filename = f"{character_name}_master.json"
    file_path = os.path.normpath(os.path.join(character_dir, filename))

    concept = data.get('concept', '')

    # Create the new character structure based on the incoming data structure
    new_character_data = {
        "concept": concept,
        "agent": {
            "agent_details": {
                # NOTE(review): this also turns underscores that were part of
                # the submitted name into spaces - confirm that is intended.
                "name": character_name.replace('_', ' '),
                "personality": details.get('personality', []),
                "communication_style": details.get('communication_style', []),
                "backstory": details.get('backstory', ''),
                "universe": details.get('universe', ''),
                "topic_expertise": details.get('topic_expertise', []),
                "hashtags": details.get('hashtags', []),
                "emojis": details.get('emojis', []),
                "concept": concept,
            },
            "ai_model": {
                "model_type": "",
                "model_name": "",
                "memory_store": "",
            },
            "connectors": {
                "twitter": False,
                "telegram": False,
                "discord": False,
            },
            "profile_image": data.get('profile_image', []),
            "profile_image_options": data.get('profile_image_options', []),
            "tracker": {
                "current_season_number": 0,
                "current_episode_number": 0,
                "current_post_number": 0,
                "post_every_x_minutes": 0,
            },
            "seasons": data.get('seasons', []),
        },
    }

    # Write data to the JSON file, overwriting if it exists
    with open(file_path, 'w', encoding='utf-8') as f:
        json.dump(new_character_data, f, ensure_ascii=False, indent=4)

    return jsonify(new_character_data), 201
@app.route('/api/characters', methods=['GET'])
def get_characters():
    """
    Collects every ``*master*.json`` file found under the configs directory
    (searched recursively) and returns their parsed contents.

    Files that cannot be read or parsed are logged and skipped.

    Returns:
        JSON: List of character configurations, or an error object
        int: HTTP status code (404 when no files match, 500 on unexpected errors)
    """
    try:
        # Build the recursive search pattern with os.path.join for portability
        search_pattern = os.path.join('configs', '**', '*master*.json')
        # Normalize path separators of every match
        master_files = list(map(os.path.normpath, glob.glob(search_pattern, recursive=True)))
        print(f"[get_characters] - Found {len(master_files)} files: {master_files}")

        if len(master_files) == 0:
            print("[get_characters] - No files found in configs directory or subdirectories")
            return jsonify({"error": "No character files found"}), 404

        loaded = []
        for path in master_files:
            try:
                with open(path, 'r', encoding='utf-8') as handle:
                    character = json.load(handle)
            except json.JSONDecodeError as parse_error:
                print(f"[get_characters] - Error parsing JSON from {path}: {str(parse_error)}")
            except Exception as read_error:
                print(f"[get_characters] - Error reading file {path}: {str(read_error)}")
            else:
                loaded.append(character)

        return jsonify(loaded)
    except Exception as unexpected:
        print(f"[get_characters] - Unexpected error: {str(unexpected)}")
        return jsonify({"error": str(unexpected)}), 500
@app.route('/api/agents/chat', methods=['POST'])
def chat_with_agent():
    """
    Handles chat interactions with an agent.

    Request Body:
        prompt (str): User message to the agent
        master_file_path (str): Path to agent's master configuration file
        chat_history (dict): Previous chat history; defaults to
            ``{'chat_history': []}`` when absent or null

    Returns:
        JSON: Agent response and updated chat history
        int: HTTP status code (400 on invalid input, 404 when the master file
            does not exist, 500 when the chat call fails)
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Without this guard a missing or non-object body would fail on .get()
        return jsonify({"error": "Request body must be a JSON object"}), 400

    prompt = data.get('prompt')
    master_file_path = data.get('master_file_path')
    print(f"[chat_with_agent] - master_file_path: {master_file_path}")
    print("\n\n\n")

    # An explicit null is treated like a missing history
    chat_history = data.get('chat_history')
    if chat_history is None:
        chat_history = {'chat_history': []}

    if not master_file_path:
        return jsonify({"error": "Master file path is required"}), 400
    if not os.path.exists(master_file_path):
        return jsonify({"error": "Agent master file not found"}), 404

    try:
        # A fresh model instance is created per request; it is given its own
        # local name so it does not shadow the module-level ai_model.
        # NOTE(review): other endpoints reuse the global instance - confirm
        # whether a per-request instance is really needed here.
        chat_model = OpenAIModel()

        # Call the agent_chat function from step_5
        agent_response, updated_chat_history = agent_chat(
            ai_model=chat_model,
            master_file_path=master_file_path,
            prompt=prompt,
            chat_history=chat_history
        )
        return jsonify({
            "response": agent_response,
            "chat_history": updated_chat_history
        })
    except Exception as e:
        print(f"Error in chat_with_agent: {str(e)}")
        return jsonify({"error": str(e)}), 500
@app.route('/api/agents/chat-history', methods=['GET'])
def get_chat_history():
    """
    Retrieves chat history for a specific agent.

    The chat log is expected next to the master file, named
    ``<agent_name>_chat_log.json``, where ``<agent_name>`` is the master file
    name with ``_master.json`` removed.

    Query Parameters:
        master_file_path (str): Path to agent's master configuration file

    Returns:
        JSON: Agent's chat history (an empty history when no log exists yet)
        int: HTTP status code (400 when the path is missing, 500 when the log
            cannot be read or parsed)
    """
    master_file_path = request.args.get('master_file_path')
    print(f"[get_chat_history] - master_file_path: {master_file_path}")
    if not master_file_path:
        return jsonify({"error": "Master file path is required"}), 400

    # Extract agent name from master file path
    agent_name = os.path.basename(master_file_path).replace('_master.json', '')

    # Build the chat log path (the original comment states this mirrors the
    # format used in step_5_agent_chat.py)
    chat_file_path = os.path.join(os.path.dirname(master_file_path), f"{agent_name}_chat_log.json")
    print(f"[get_chat_history] - chat_file_path: {chat_file_path}")
    print("\n\n\n")

    try:
        # Open directly instead of checking existence first: this avoids a
        # race between the check and the read
        with open(chat_file_path, 'r', encoding='utf-8') as file:
            chat_history = json.load(file)
    except FileNotFoundError:
        # If chat history doesn't exist, return empty history with agent name
        return jsonify({
            "agent_name": agent_name,
            "chat_history": []
        })
    except (OSError, json.JSONDecodeError) as e:
        # An unreadable or corrupt log is reported as JSON, like the other endpoints
        print(f"[get_chat_history] - Error reading chat history: {str(e)}")
        return jsonify({"error": str(e)}), 500

    return jsonify(chat_history)
@app.route('/api/agents/seasons', methods=['POST'])
def create_season():
    """
    Generates a new season of content for an agent.

    Calls ``create_seasons_and_episodes`` with the global AI model and then
    returns the content of the master file re-read from disk.

    Request Body:
        master_file_path (str): Path to agent's master configuration file
        number_of_episodes (int): Number of episodes to generate (default 3)

    Returns:
        JSON: Updated agent data with new season
        int: HTTP status code (400 on invalid input, 404 when the master file
            does not exist, 500 when generation fails)
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        master_file_path = data.get('master_file_path')
        number_of_episodes = data.get('number_of_episodes', 3)  # Default to 3 episodes
        if not master_file_path:
            return jsonify({"error": "Master file path is required"}), 400
        # Report a missing agent as 404, consistent with the chat endpoint,
        # instead of letting generation fail with a generic 500
        if not os.path.exists(master_file_path):
            return jsonify({"error": "Agent master file not found"}), 404

        # Create a new season using the global AI model; the return value is
        # not needed because the result is re-read from the master file below
        create_seasons_and_episodes(
            ai_model=ai_model,
            master_file_path=master_file_path,
            number_of_episodes=number_of_episodes
        )

        # Load and return the updated agent data
        with open(master_file_path, 'r', encoding='utf-8') as f:
            updated_agent = json.load(f)
        return jsonify(updated_agent), 200
    except Exception as e:
        print(f"Error creating season: {str(e)}")
        return jsonify({"error": str(e)}), 500
@app.route('/api/agents/episodes/posts', methods=['POST'])
def create_episode_content():
    """
    Generates posts for an agent's episodes.

    Calls ``create_episode_posts`` with the global AI model and then returns
    the content of the master file re-read from disk.

    Request Body:
        master_file_path (str): Path to agent's master configuration file
        number_of_posts (int): Number of posts to generate per episode (default 6)

    Returns:
        JSON: Updated agent data with new posts
        int: HTTP status code (400 on invalid input, 404 when the master file
            does not exist, 500 when generation fails)
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        master_file_path = data.get('master_file_path')
        number_of_posts = data.get('number_of_posts', 6)  # Default to 6 posts
        if not master_file_path:
            return jsonify({"error": "Master file path is required"}), 400
        # Report a missing agent as 404, consistent with the chat endpoint,
        # instead of letting generation fail with a generic 500
        if not os.path.exists(master_file_path):
            return jsonify({"error": "Agent master file not found"}), 404

        # Create posts for the episodes using the global AI model; the return
        # value is not needed because the result is re-read from the file below
        create_episode_posts(
            ai_model=ai_model,
            master_file_path=master_file_path,
            number_of_posts=number_of_posts
        )

        # Load and return the updated agent data
        with open(master_file_path, 'r', encoding='utf-8') as f:
            updated_agent = json.load(f)
        return jsonify(updated_agent), 200
    except Exception as e:
        print(f"Error creating episode posts: {str(e)}")
        return jsonify({"error": str(e)}), 500
# Twitter Posting
@app.route('/api/start-post-manager/twitter', methods=['POST'])
def start_post_manager_twitter():
    """
    Creates the global Twitter PostManager for an agent.

    Request Body:
        agent_name (str): Name of the agent to start the post manager for

    Returns:
        JSON: Success message, or an error object
        int: HTTP status code (200 when the manager reports being logged in,
            400 when agent_name is missing, 500 otherwise)
    """
    global post_manager_twitter

    # A missing or non-object body is treated as empty so that it falls into
    # the regular "Agent name is required" response instead of crashing
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    agent_name = data.get('agent_name')
    print("\n")
    if not agent_name:
        print("[start_post_manager_twitter] - Agent name is required")
        return jsonify({'error': 'Agent name is required'}), 400

    try:
        # Create PostManager instance with the agent name
        post_manager_twitter = PostManager(agent_name=agent_name)
        print(f"[start_post_manager_twitter] - post_manager created: {post_manager_twitter}")

        # Check if the PostManager is logged in
        if post_manager_twitter and post_manager_twitter.is_logged_in:
            return jsonify({'success': True, 'message': f'Post manager started for {agent_name}'}), 200
        return jsonify({'error': 'Failed to start post manager or login to Twitter'}), 500
    except Exception as e:
        print(f"[start_post_manager_twitter] - Error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/post-to-twitter', methods=['POST'])
def post_to_twitter():
    """
    Posts a single tweet through the global Twitter PostManager.

    Request Body:
        master_data: Agent master data; only checked for presence here
        content (str): Text of the tweet to post

    Returns:
        JSON: ``{'success': True}`` or an error object
        int: HTTP status code (200 on success, 400 on missing input, 500 when
            no post manager exists or posting fails)
    """
    print("\n")
    try:
        # A missing or non-object body is treated as empty so that it falls
        # into the regular 400 response below
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}
        master_data = data.get('master_data')
        post_content = data.get('content')
        print(f"[post_to_twitter api ] - post_content: {post_content}")
        if not master_data or not post_content:
            return jsonify({'error': 'Master data or post content is required'}), 400

        # Look the manager up defensively: the global may not exist at all
        # before the start-post-manager endpoint has been called
        manager = globals().get('post_manager_twitter')
        if not manager:
            return jsonify({'error': 'Post manager not initialized'}), 500

        if manager.post_single_tweet(post_content):
            return jsonify({'success': True}), 200
        return jsonify({'error': 'Failed to post to Twitter'}), 500
    except Exception as e:
        print(f"[post_to_twitter] - Error: {str(e)}")
        return jsonify({'error': str(e)}), 500
@app.route('/api/agents/update-seasons', methods=['PUT'])
def update_seasons():
    """
    Updates the seasons array for a specific agent using the agent's name.

    The master file is looked up at ``configs/<agent_name>/<agent_name>_master.json``;
    if that does not exist, the name with spaces replaced by underscores (the
    directory form used by agent creation) is tried as well.

    Request Body:
        agent_name (str): Name of the agent
        seasons (list): New seasons data to update; an empty list is allowed

    Returns:
        JSON: Updated agent data
        int: HTTP status code (400 on invalid input, 404 when the master file
            does not exist, 500 on any other failure)
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        agent_name = data.get('agent_name')
        new_seasons = data.get('seasons')
        # Only a missing value is rejected: an empty list is a legitimate
        # request to clear all seasons
        if not agent_name or not isinstance(agent_name, str) or new_seasons is None:
            return jsonify({"error": "Agent name and seasons data are required"}), 400
        if not isinstance(new_seasons, list):
            return jsonify({"error": "Seasons must be a list"}), 400

        # The name is used to build a file path, so refuse anything that
        # could point outside the configs directory
        if agent_name in ('.', '..') or any(ch in agent_name for ch in ('/', '\\', '\0')):
            return jsonify({"error": "Invalid agent name"}), 400

        # Construct the master file path using the agent's name, trying the
        # name as given first and then its underscore form
        candidate_names = [agent_name]
        underscored = agent_name.replace(' ', '_')
        if underscored != agent_name:
            candidate_names.append(underscored)

        master_file_path = None
        for name in candidate_names:
            candidate_path = os.path.join('configs', name, f"{name}_master.json")
            if os.path.exists(candidate_path):
                master_file_path = candidate_path
                break
        if master_file_path is None:
            return jsonify({"error": "Agent master file not found"}), 404

        # Load the existing agent data
        with open(master_file_path, 'r', encoding='utf-8') as f:
            agent_data = json.load(f)

        # Update the seasons array
        agent_data['agent']['seasons'] = new_seasons

        # Save the updated agent data back to the file
        with open(master_file_path, 'w', encoding='utf-8') as f:
            json.dump(agent_data, f, ensure_ascii=False, indent=4)

        return jsonify(agent_data), 200
    except Exception as e:
        print(f"Error updating seasons: {str(e)}")
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    # The server listens on all interfaces. The interactive debugger allows
    # arbitrary code execution, so debug mode is off unless it is explicitly
    # requested through the FLASK_DEBUG environment variable.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    print("API Server starting on port 8080...")
    app.run(host='0.0.0.0', debug=debug_enabled, port=8080)