527 lines
18 KiB
Python
Raw Normal View History

2025-02-12 17:38:06 +05:30
"""
API Server Module
This module implements the Flask REST API server that handles agent creation,
chat interactions, content generation and other core functionality.
Key Features:
- Agent creation and management
- Chat interactions with agents
- Season and episode content generation
- Chat history tracking
"""
from flask import Flask, jsonify, request, make_response
from flask_cors import CORS
from dotenv import load_dotenv
import os
import json
import glob
from pprint import pprint
from prompt_chaining.step_1_create_agent import create_agent as generateAgent
from models.openai_model import OpenAIModel
from prompt_chaining.step_5_agent_chat import agent_chat
from prompt_chaining.step_2_create_content import create_seasons_and_episodes
from prompt_chaining.step_3_create_posts import create_episode_posts
from utils.post_manager import PostManager
from utils.scheduler import AgentScheduler
# Load environment variables
load_dotenv()
#Global Post Manager - Will be instantiated when a user logs in to Twitter
global post_manager_twitter
# Initialize Flask app with 20MB max content size
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 20 * 1024 * 1024 # 20 MB
# Configure CORS for frontend origin
CORS(app, resources={r"/api/*": {
2025-02-16 17:58:30 +05:30
"origins": ["https://g0c0848ggco0cgw4gss8gws0.dev3vds1.link", "http://localhost:5173","https://goo0wsoskgg4w8sswsc80c0w.dev3vds1.link","*"],
2025-02-12 17:38:06 +05:30
"methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
"allow_headers": ["Content-Type"]
}})
# Create global AI model instance
ai_model = OpenAIModel()
@app.route('/')
def show():
return "Hello APi works"
# Post reques to create a random agent with no prompt
@app.route('/api/agents/random', methods=['POST'])
def create_random_agent():
"""
Creates a new random agent with optional concept.
Request Body:
concept (str, optional): Initial concept for the agent
Returns:
JSON: Generated agent data
int: HTTP status code
"""
# Get concept from request body if provided
data = request.get_json() if request.is_json else {}
concept = data.get('concept', '') # Default to empty string if no concept provided
print("[create_random_agent] - Creating a random agent", f"concept: {concept}" if concept else "")
# Remove the local instantiation and use global ai_model
print("Using global Gemini Model instance", ai_model)
# Create RandomAgents directory if it doesn't exist
random_agents_dir = os.path.join('configs', 'RandomAgents')
os.makedirs(random_agents_dir, exist_ok=True)
# Call the generateAgent function with the concept
generated_master_file_path = generateAgent(ai_model, concept)
print(f"[create_random_agent] - generatedMasterFilePath for generatedAgent: {generated_master_file_path}")
try:
if not generated_master_file_path:
return jsonify({"error": "No file path generated"}), 500
# Get just the filename without path and extension
base_filename = os.path.basename(generated_master_file_path)
name_without_ext = os.path.splitext(base_filename)[0]
# Replace _master with _random in the filename
name_without_ext = name_without_ext.replace('_master', '_random')
# Generate unique filename in the RandomAgents directory
counter = 1
final_path = os.path.join(random_agents_dir, f"{name_without_ext}.json")
while os.path.exists(final_path):
filename = f"{name_without_ext}_{counter}.json"
final_path = os.path.join(random_agents_dir, filename)
counter += 1
# Move the generated file to the RandomAgents directory
if os.path.exists(generated_master_file_path):
os.rename(generated_master_file_path, final_path)
print(f"[create_random_agent] - Moved file to: {final_path}")
# Clean up any empty character directory that might have been created
char_dir = os.path.dirname(generated_master_file_path)
if os.path.exists(char_dir) and not os.listdir(char_dir):
os.rmdir(char_dir)
# Read the JSON data from the file
with open(final_path, 'r', encoding='utf-8') as f:
data = json.load(f)
print(f"[create_random_agent] - Loaded data from file: {data}")
return jsonify(data), 200
except Exception as e:
print(f"[create_random_agent] - Error: {str(e)}")
return jsonify({"error": "Failed to load agent data"}), 500
@app.before_request
def handle_preflight():
"""
Handles CORS preflight requests by adding required headers.
Returns:
Response: Flask response with CORS headers
"""
if request.method == "OPTIONS":
response = make_response()
response.headers.add("Access-Control-Allow-Headers", "*")
response.headers.add("Access-Control-Allow-Methods", "*")
return response
# Your existing routes...
@app.route('/api/agents', methods=['GET'])
def get_agents():
"""
Retrieves list of all agents.
Returns:
JSON: List of agent data
"""
return jsonify(agents)
@app.route('/api/agents/', methods=['POST'])
def create_agent():
"""
Creates a new agent from provided configuration. If an agent with the same name exists,
it will be replaced.
Request Body:
agent_details (dict): Agent configuration including name, personality, etc.
concept (str): Initial concept for the agent
Returns:
JSON: Created agent data
int: HTTP status code
"""
data = request.get_json()
print(f"[create_agent] - Received data: {data}")
# Extract character name from the agent_details
character_name = data.get('agent_details', {}).get('name')
if not character_name:
return jsonify({"error": "Character name is required"}), 400
# Replace spaces with underscores in the character name
character_name = character_name.replace(' ', '_')
# Create a directory for the character
character_dir = os.path.join('configs', character_name)
os.makedirs(character_dir, exist_ok=True)
# Generate filename - always use _master.json
filename = f"{character_name}_master.json"
file_path = os.path.normpath(os.path.join(character_dir, filename))
# Create the new character structure based on the incoming data structure
new_character_data = {
"concept": data.get('concept', ''),
"agent": {
"agent_details": {
"name": character_name.replace('_', ' '),
"personality": data.get('agent_details', {}).get('personality', []),
"communication_style": data.get('agent_details', {}).get('communication_style', []),
"backstory": data.get('agent_details', {}).get('backstory', ''),
"universe": data.get('agent_details', {}).get('universe', ''),
"topic_expertise": data.get('agent_details', {}).get('topic_expertise', []),
"hashtags": data.get('agent_details', {}).get('hashtags', []),
"emojis": data.get('agent_details', {}).get('emojis', []),
"concept": data.get('concept', '')
},
"ai_model": {
"model_type": "",
"model_name": "",
"memory_store": ""
},
"connectors": {
"twitter": False,
"telegram": False,
"discord": False
},
"profile_image": data.get('profile_image', []),
"profile_image_options": data.get('profile_image_options', []),
"tracker": {
"current_season_number": 0,
"current_episode_number": 0,
"current_post_number": 0,
"post_every_x_minutes": 0
},
"seasons": data.get('seasons', [])
}
}
# Write data to the JSON file, overwriting if it exists
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(new_character_data, f, ensure_ascii=False, indent=4)
return jsonify(new_character_data), 201
@app.route('/api/characters', methods=['GET'])
def get_characters():
"""
Retrieves all character configurations from the configs directory.
Returns:
JSON: List of character configurations
int: HTTP status code
"""
try:
# Use os.path.join for cross-platform path handling
config_dir = 'configs'
pattern = os.path.join(config_dir, '**', '*master*.json')
# Use os.path.normpath to normalize path separators
files = [os.path.normpath(f) for f in glob.glob(pattern, recursive=True)]
print(f"[get_characters] - Found {len(files)} files: {files}")
if not files:
print("[get_characters] - No files found in configs directory or subdirectories")
return jsonify({"error": "No character files found"}), 404
characters = []
for file in files:
try:
with open(file, 'r', encoding='utf-8') as f:
characters.append(json.load(f))
except json.JSONDecodeError as e:
print(f"[get_characters] - Error parsing JSON from {file}: {str(e)}")
continue
except Exception as e:
print(f"[get_characters] - Error reading file {file}: {str(e)}")
continue
return jsonify(characters)
except Exception as e:
print(f"[get_characters] - Unexpected error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route('/api/agents/chat', methods=['POST'])
def chat_with_agent():
"""
Handles chat interactions with an agent.
Request Body:
prompt (str): User message to the agent
master_file_path (str): Path to agent's master configuration file
chat_history (dict): Previous chat history
Returns:
JSON: Agent response and updated chat history
int: HTTP status code
"""
data = request.get_json()
prompt = data.get('prompt')
master_file_path = data.get('master_file_path')
print(f"[chat_with_agent] - master_file_path: {master_file_path}")
print("\n\n\n")
chat_history = data.get('chat_history', {'chat_history': []})
if not master_file_path:
return jsonify({"error": "Master file path is required"}), 400
if not os.path.exists(master_file_path):
return jsonify({"error": "Agent master file not found"}), 404
try:
# Initialize AI model
ai_model = OpenAIModel()
# Call the agent_chat function from step_5
agent_response, updated_chat_history = agent_chat(
ai_model=ai_model,
master_file_path=master_file_path,
prompt=prompt,
chat_history=chat_history
)
return jsonify({
"response": agent_response,
"chat_history": updated_chat_history
})
except Exception as e:
print(f"Error in chat_with_agent: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route('/api/agents/chat-history', methods=['GET'])
def get_chat_history():
"""
Retrieves chat history for a specific agent.
Query Parameters:
master_file_path (str): Path to agent's master configuration file
Returns:
JSON: Agent's chat history
int: HTTP status code
"""
master_file_path = request.args.get('master_file_path')
print(f"[get_chat_history] - master_file_path: {master_file_path}")
if not master_file_path:
return jsonify({"error": "Master file path is required"}), 400
# Extract agent name from master file path
agent_name = os.path.basename(master_file_path).replace('_master.json', '')
# Create the chat history file path using the same format as in step_5_agent_chat.py
chat_file_path = os.path.join(os.path.dirname(master_file_path), f"{agent_name}_chat_log.json")
print(f"[get_chat_history] - chat_file_path: {chat_file_path}")
print("\n\n\n")
# If chat history doesn't exist, return empty history with agent name
if not os.path.exists(chat_file_path):
return jsonify({
"agent_name": agent_name,
"chat_history": []
})
# Load and return the chat history
with open(chat_file_path, 'r', encoding='utf-8') as file:
chat_history = json.load(file)
return jsonify(chat_history)
@app.route('/api/agents/seasons', methods=['POST'])
def create_season():
"""
Generates a new season of content for an agent.
Request Body:
master_file_path (str): Path to agent's master configuration file
number_of_episodes (int): Number of episodes to generate
Returns:
JSON: Updated agent data with new season
int: HTTP status code
"""
try:
data = request.get_json()
master_file_path = data.get('master_file_path')
number_of_episodes = data.get('number_of_episodes', 3) # Default to 3 episodes
if not master_file_path:
return jsonify({"error": "Master file path is required"}), 400
# Create a new season using the global AI model
result = create_seasons_and_episodes(
ai_model=ai_model,
master_file_path=master_file_path,
number_of_episodes=number_of_episodes
)
# Load and return the updated agent data
with open(master_file_path, 'r', encoding='utf-8') as f:
updated_agent = json.load(f)
return jsonify(updated_agent), 200
except Exception as e:
print(f"Error creating season: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route('/api/agents/episodes/posts', methods=['POST'])
def create_episode_content():
"""
Generates posts for an agent's episodes.
Request Body:
master_file_path (str): Path to agent's master configuration file
number_of_posts (int): Number of posts to generate per episode
Returns:
JSON: Updated agent data with new posts
int: HTTP status code
"""
try:
data = request.get_json()
master_file_path = data.get('master_file_path')
number_of_posts = data.get('number_of_posts', 6) # Default to 6 posts
if not master_file_path:
return jsonify({"error": "Master file path is required"}), 400
# Create posts for the episodes using the global AI model
result = create_episode_posts(
ai_model=ai_model,
master_file_path=master_file_path,
number_of_posts=number_of_posts
)
# Load and return the updated agent data
with open(master_file_path, 'r', encoding='utf-8') as f:
updated_agent = json.load(f)
return jsonify(updated_agent), 200
except Exception as e:
print(f"Error creating episode posts: {str(e)}")
return jsonify({"error": str(e)}), 500
# Twitter Posting
@app.route('/api/start-post-manager/twitter', methods=['POST'])
def start_post_manager_twitter():
global post_manager_twitter
data = request.json
agent_name = data.get('agent_name')
print("\n")
if not agent_name:
print("[start_post_manager_twitter] - Agent name is required")
return jsonify({'error': 'Agent name is required'}), 400
try:
# Create PostManager instance with the agent name
post_manager_twitter = PostManager(agent_name=agent_name)
print(f"[start_post_manager_twitter] - post_manager created: {post_manager_twitter}")
# Check if the PostManager is logged in
if post_manager_twitter and post_manager_twitter.is_logged_in:
return jsonify({'success': True, 'message': f'Post manager started for {agent_name}'}), 200
else:
return jsonify({'error': 'Failed to start post manager or login to Twitter'}), 500
except Exception as e:
print(f"[start_post_manager] - Error: {str(e)}")
return jsonify({'error': str(e)}), 500
@app.route('/api/post-to-twitter', methods=['POST'])
def post_to_twitter():
global post_manager_twitter
print("\n")
try:
data = request.json
master_data = data.get('master_data')
post_content = data.get('content')
print(f"[post_to_twitter api ] - post_content: {post_content}")
if not master_data or not post_content:
return jsonify({'error': 'Master data or post content is required'}), 400
if post_manager_twitter:
post_success = post_manager_twitter.post_single_tweet(post_content)
if post_success:
return jsonify({'success': True}), 200
else:
return jsonify({'error': 'Failed to post to Twitter'}), 500
else:
return jsonify({'error': 'Post manager not initialized'}), 500
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/agents/update-seasons', methods=['PUT'])
def update_seasons():
"""
Updates the seasons array for a specific agent using the agent's name.
Request Body:
agent_name (str): Name of the agent
seasons (list): New seasons data to update
Returns:
JSON: Updated agent data
int: HTTP status code
"""
try:
data = request.get_json()
agent_name = data.get('agent_name')
new_seasons = data.get('seasons')
if not agent_name or not new_seasons:
return jsonify({"error": "Agent name and seasons data are required"}), 400
# Construct the master file path using the agent's name
master_file_path = os.path.join('configs', agent_name, f"{agent_name}_master.json")
if not os.path.exists(master_file_path):
return jsonify({"error": "Agent master file not found"}), 404
# Load the existing agent data
with open(master_file_path, 'r', encoding='utf-8') as f:
agent_data = json.load(f)
# Update the seasons array
agent_data['agent']['seasons'] = new_seasons
# Save the updated agent data back to the file
with open(master_file_path, 'w', encoding='utf-8') as f:
json.dump(agent_data, f, ensure_ascii=False, indent=4)
return jsonify(agent_data), 200
except Exception as e:
print(f"Error updating seasons: {str(e)}")
return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
print("API Server starting on port 8080...")
app.run(host='0.0.0.0',debug=True, port=8080)