commit 79ee3aa8cc6a5d64453ab88566aa69ddc22104be
Author: yoginawaka
Date:   Tue Feb 11 23:14:01 2025 +0530

    Some changes

diff --git a/.env.example b/.env.example
new file mode 100644
index 0000000..4e3fadc
--- /dev/null
+++ b/.env.example
@@ -0,0 +1,5 @@
+OPENAI_API_KEY=your_openai_api_key_here
+SERPER_API_KEY=your_serper_api_key_here
+LOG_LEVEL=INFO
+HOST=127.0.0.1
+PORT=8000
\ No newline at end of file
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..ead652b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,13 @@
+.env
+__pycache__/
+*.pyc
+.pytest_cache/
+.coverage
+htmlcov/
+dist/
+build/
+*.egg-info/
+.venv/
+venv/
+.idea/
+.vscode/
\ No newline at end of file
diff --git a/CSS b/CSS
new file mode 100644
index 0000000..d27d818
--- /dev/null
+++ b/CSS
@@ -0,0 +1,29 @@
+/* Basic responsive design */
+body {
+    font-family: Arial, sans-serif;
+    padding: 20px;
+    margin: 0;
+    box-sizing: border-box;
+}
+
+header {
+    text-align: center;
+}
+
+#projects {
+    margin-top: 20px;
+}
+
+.project {
+    border: 1px solid #ddd;
+    padding: 10px;
+    margin-bottom: 20px;
+}
+
+@media (min-width: 600px) {
+    .project {
+        display: flex;
+        justify-content: space-between;
+        align-items: center;
+    }
+}
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..90cc41b
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,38 @@
+# Use an official Python runtime as a parent image; 3.11 matches runtime.txt
+# and the `str | None` syntax used in agents/base.py (which fails on 3.9)
+FROM python:3.11-slim
+
+# Set environment variables
+ENV PYTHONDONTWRITEBYTECODE=1 \
+    PYTHONUNBUFFERED=1 \
+    WORKER_TIMEOUT=300 \
+    GRACEFUL_TIMEOUT=120 \
+    KEEP_ALIVE=120
+
+# Set work directory
+WORKDIR /app
+
+# Install system dependencies
+RUN apt-get update \
+    && apt-get install -y --no-install-recommends \
+    gcc \
+    python3-dev \
+    curl \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install Python dependencies first so the pip layer is cached between builds
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy project
+COPY . .
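+
+# Note: because requirements.txt was installed in its own layer above, this
+# full-source COPY is the only layer rebuilt on routine code changes.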
+
+# Create a non-root user
+RUN adduser --disabled-password --gecos "" appuser
+RUN chown -R appuser:appuser /app
+USER appuser
+
+# Expose the port the app runs on
+EXPOSE 8000
+
+# Command to run the application using Gunicorn
+CMD ["gunicorn", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--timeout", "300", "--keep-alive", "120", "--graceful-timeout", "120", "--bind", "0.0.0.0:8000", "app:app"]
diff --git a/Procfile b/Procfile
new file mode 100644
index 0000000..fdce7ba
--- /dev/null
+++ b/Procfile
@@ -0,0 +1 @@
+web: gunicorn -w 2 -k uvicorn.workers.UvicornWorker --timeout 300 --graceful-timeout 120 --keep-alive 120 --max-requests 1000 --max-requests-jitter 50 --worker-tmp-dir /dev/shm --log-level info app:app
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d607c43
--- /dev/null
+++ b/README.md
@@ -0,0 +1 @@
+# almaze-api
diff --git a/agents/__init__.py b/agents/__init__.py
new file mode 100644
index 0000000..9e1a490
--- /dev/null
+++ b/agents/__init__.py
@@ -0,0 +1,13 @@
+from .compass import compass
+from .tool_smith import tool_smith
+from .architect import architect
+from .scout import scout
+from .techsage import techsage
+
+__all__ = [
+    'compass',
+    'tool_smith',
+    'architect',
+    'scout',
+    'techsage'
+]
\ No newline at end of file
diff --git a/agents/architect.py b/agents/architect.py
new file mode 100644
index 0000000..407b421
--- /dev/null
+++ b/agents/architect.py
@@ -0,0 +1,34 @@
+from langchain_core.messages import SystemMessage, HumanMessage
+from agents.base import BaseAgent
+from utils import all_tool_functions
+
+class ArchitectAgent(BaseAgent):
+    def __init__(self):
+        system_prompt = """You are architect, a ReAct agent that develops LangChain tools for other agents.
+
+You approach your given task this way:
+1. Write the tool implementation and tests to disk.
+2. Verify the tests pass.
+3. Confirm the tool is complete with its name and a succinct description of its purpose.
+
+Tools MUST:
+- Go in the `tools` directory
+- Use the `@tool` decorator
+- Include a docstring that succinctly describes what the tool does
+- Have a corresponding test file that verifies the intended behavior
+"""
+        super().__init__("architect", system_prompt, all_tool_functions())
+
+    def process(self, input_text: str) -> str:
+        """Process a request to create a new tool."""
+        # Delegate to BaseAgent.process, which seeds the full initial state
+        # (messages, iterations, final_answer) and returns a string rather
+        # than the raw graph state dict
+        return super().process(input_text)
+
+def architect(task: str) -> str:
+    """Creates new tools for agents to use."""
+    agent = ArchitectAgent()
+    return agent.process(task)
\ No newline at end of file
diff --git a/agents/base.py b/agents/base.py
new file mode 100644
index 0000000..9bfaf1f
--- /dev/null
+++ b/agents/base.py
@@ -0,0 +1,196 @@
+# agents/base.py
+import json
+from typing import List, Any, Dict, TypedDict, Union, Literal
+from abc import ABC, abstractmethod
+
+from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
+from langchain_core.utils.function_calling import convert_to_openai_function
+from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
+from langchain_openai import ChatOpenAI
+from langgraph.graph import StateGraph, END
+
+class AgentState(TypedDict):
+    """Type definition for agent state."""
+    messages: List[Any]
+    iterations: int
+    final_answer: str | None
+
+class BaseAgent(ABC):
+    """Base class for all agents in the system."""
+
+    def __init__(self, name: str, system_prompt: str, tools: List[Any], max_iterations: int = 5):
+        self.name = name
+        self.system_prompt = system_prompt
+        self.tools = tools if tools is not None else []
+        self.max_iterations = max_iterations
+        self.llm = ChatOpenAI(
+            model="gpt-4-turbo-preview",
+            temperature=0,
+            timeout=30  # 30 second timeout
+        )
+        self.prompt = self._create_prompt()
+        self.tool_schemas = [convert_to_openai_function(t) for t in self.tools]
+        self.graph = self._build_graph()
+
+    def _create_prompt(self) -> ChatPromptTemplate:
+        """Create the prompt template."""
+        return ChatPromptTemplate.from_messages([
+            ("system", f"""{self.system_prompt}
+
+Available tools:
+{self._format_tools_description()}
+
+Instructions:
+1. If you can answer directly, do so without using tools.
+2. If you need to use tools, use them and then provide a clear final answer.
+3. Always ensure you provide a clear, complete response."""),
+            MessagesPlaceholder(variable_name="chat_history"),
+            ("human", "{input}"),
+        ])
+
+    def _format_tools_description(self) -> str:
+        """Format tools description for the prompt."""
+        if not self.tools:
+            return "No tools available"
+        return "\n".join(f"- {t.name}: {t.description}" for t in self.tools)
+
+    def _build_graph(self):
+        """Build the agent's processing graph."""
+        workflow = StateGraph(AgentState)
+
+        # Add processing steps
+        workflow.add_node("process", self._process_step)
+
+        # Set entry point
+        workflow.set_entry_point("process")
+
+        # Add conditional ending
+        workflow.add_conditional_edges(
+            "process",
+            self._should_continue,
+            {
+                "continue": "process",
+                END: END
+            }
+        )
+
+        return workflow.compile()
+
+    def _process_step(self, state: AgentState) -> AgentState:
+        """Process a single step."""
+        messages = state.get('messages', [])
+        iterations = state.get('iterations', 0)
+        final_answer = state.get('final_answer')
+
+        print(f"\n{self.name} is thinking... (iteration {iterations + 1}/{self.max_iterations})")
+
+        try:
+            # If we've reached max iterations, generate final answer
+            if iterations >= self.max_iterations:
+                final_response = self._generate_final_answer(messages)
+                return {
+                    "messages": messages,
+                    "iterations": iterations + 1,
+                    "final_answer": final_response
+                }
+
+            # Get the last message
+            last_message = messages[-1] if messages else None
+            if not last_message:
+                return state
+
+            # Get chat history
+            chat_history = messages[:-1] if len(messages) > 1 else []
+
+            # Get model response
+            response = self.llm.invoke(
+                self.prompt.format_messages(
+                    input=last_message.content if hasattr(last_message, 'content') else str(last_message),
+                    chat_history=chat_history
+                ),
+                functions=self.tool_schemas if self.tools else None
+            )
+
+            # OpenAI function calls arrive in additional_kwargs rather than as
+            # an attribute on the message, so read them from there
+            function_call = response.additional_kwargs.get('function_call')
+            if function_call:
+                tool_result = self._execute_tool(function_call)
+                new_messages = messages + [
+                    AIMessage(content="", additional_kwargs={"function_call": function_call}),
+                    SystemMessage(content=str(tool_result))
+                ]
+            else:
+                new_messages = messages + [AIMessage(content=response.content)]
+                final_answer = response.content
+
+            return {
+                "messages": new_messages,
+                "iterations": iterations + 1,
+                "final_answer": final_answer
+            }
+
+        except Exception as e:
+            print(f"Error in processing: {str(e)}")
+            error_msg = f"Error occurred: {str(e)}"
+            return {
+                "messages": messages + [SystemMessage(content=error_msg)],
+                "iterations": iterations + 1,
+                "final_answer": error_msg
+            }
+
+    def _execute_tool(self, function_call: Dict[str, Any]) -> str:
+        """Execute a tool call."""
+        try:
+            name = function_call.get("name", "")
+            tool = next((t for t in self.tools if t.name == name), None)
+            if tool:
+                # "arguments" is a JSON-encoded string in the OpenAI
+                # function-calling format; decode it before invoking
+                args = json.loads(function_call.get("arguments") or "{}")
+                return tool.invoke(args)
+            return f"Tool {name} not found"
+        except Exception as e:
+            return f"Error executing tool: {str(e)}"
+
+    def _generate_final_answer(self, messages: List[Any]) -> str:
+        """Generate a final answer from the conversation history."""
+        try:
+            # Create a prompt to summarize the conversation
+            summary_prompt = f"""Based on the conversation history, provide a clear final answer.
+            If no clear answer was reached, provide the best possible response based on available information.
+ + History: {[m.content for m in messages if hasattr(m, 'content')]}""" + + response = self.llm.invoke(summary_prompt) + return response.content + except Exception as e: + return f"Failed to generate final answer: {str(e)}" + + def _should_continue(self, state: AgentState) -> Literal["continue", END]: + """Determine whether to continue processing.""" + iterations = state.get('iterations', 0) + final_answer = state.get('final_answer') + + if final_answer is not None or iterations >= self.max_iterations: + return END + return "continue" + + def process(self, input_text: str) -> str: + """Process input and return response.""" + initial_state = { + "messages": [ + SystemMessage(content=self.system_prompt), + HumanMessage(content=input_text) + ], + "iterations": 0, + "final_answer": None + } + + try: + final_state = self.graph.invoke(initial_state) + + if final_state.get('final_answer'): + return final_state['final_answer'] + + if final_state and "messages" in final_state: + messages = final_state["messages"] + return messages[-1].content if messages else "No response generated" + + return "No response generated" + except Exception as e: + print(f"Processing error: {str(e)}") + return f"Error processing request: {str(e)}" \ No newline at end of file diff --git a/agents/compass.py b/agents/compass.py new file mode 100644 index 0000000..40aebb1 --- /dev/null +++ b/agents/compass.py @@ -0,0 +1,255 @@ +from typing import List, Any, Dict, Literal +from langchain_core.messages import HumanMessage, SystemMessage, AIMessage +from agents.base import BaseAgent, AgentState +from tools.agent.list_available_agents import list_available_agents +from tools.agent.assign_agent_to_task import assign_agent_to_task +from langgraph.graph import END +import json +import re + +AGENT_DESCRIPTIONS = { + "tool_smith": "Creates new specialized agents for specific tasks", + "architect": "Creates and manages tools that other agents can use", + "scout": "Performs internet research and gathers information", + "techsage": "Handles code-related tasks and software development" +} + +class CompassAgent(BaseAgent): + def __init__(self): + system_prompt = """You are Compass, the orchestrator agent of ALMAZE. +Your role is to: +1. Understand the user's request +2. Determine which agent(s) would be best suited for the task +3. Coordinate between agents to accomplish the goal + +Available specialized agents: +- tool_smith: Creates new specialized agents for specific tasks +- architect: Creates and manages tools that other agents can use +- scout: Performs internet research and gathers information +- techsage: Handles code-related tasks and software development + +Follow these steps: +1. Analyze the user's request +2. If the task requires specialized capabilities: + - Delegate to the appropriate agent using the assign_agent_to_task tool + - Wait for their response and coordinate any follow-up tasks +3. 
If no specialized agent is needed: + - Respond directly to simple queries + - For complex tasks, break them down and coordinate multiple agents""" + + super().__init__("compass", system_prompt, [list_available_agents, assign_agent_to_task]) + + def _clean_agent_name(self, name: str) -> str: + cleaned_name = re.sub(r'^[\d\s\.]+', '', name).strip().lower() + + # Ensure the cleaned name matches one of the available agents + for valid_agent in AGENT_DESCRIPTIONS.keys(): + if valid_agent in cleaned_name: + return valid_agent + + return 'direct' + + def _analyze_task(self, task: str) -> Dict[str, Any]: + """Analyze the task to determine required agents with stricter agent selection.""" + analysis_prompt = f"""Carefully analyze this task and determine the MOST APPROPRIATE single agent to handle it. +Do NOT suggest multiple agents unless absolutely necessary. + +Task: {task} + +Available agents: +{AGENT_DESCRIPTIONS} + +Return your analysis in this format: +1. Primary agent needed (or 'direct' if compass can handle it) +2. Precise reason for choosing this agent +3. Additional agents (ONLY if truly required, otherwise 'None') +4. Brief task breakdown (if needed)""" + + response = self.llm.invoke(analysis_prompt) + + # Analyze the response to ensure a more focused agent selection + analysis = self._parse_analysis(response.content) + + # Additional filtering to prevent unnecessary agent chaining + if analysis['primary_agent'] == 'scout': + # For web research tasks, prevent automatic additional agents + analysis['additional_agents'] = [] + + return analysis + + def _parse_analysis(self, analysis: str) -> Dict[str, Any]: + """Parse the analysis response into a structured format.""" + try: + lines = analysis.split('\n') + return { + 'primary_agent': self._clean_agent_name(lines[0].split(':')[1].strip()), + 'reason': lines[1].split(':')[1].strip() if len(lines) > 1 else '', + 'additional_agents': [ + self._clean_agent_name(a.strip()) + for a in (lines[2].split(':')[1].split(',') if len(lines) > 2 and ':' in lines[2] + else lines[2].split(',') if len(lines) > 2 else []) + if a.strip() and a.strip().lower() != 'none' + ], + 'task_breakdown': lines[3:] if len(lines) > 3 else [] + } + except Exception as e: + print(f"Error parsing analysis: {str(e)}") + return { + 'primary_agent': 'direct', + 'reason': 'Error in analysis', + 'additional_agents': [], + 'task_breakdown': [] + } + + def _process_step(self, state: AgentState) -> AgentState: + """Process a single step with proper agent coordination.""" + print(f"\n{self.name} is thinking...") + messages = state.get('messages', []) + iterations = state.get('iterations', 0) + + try: + # Get the last message + last_message = messages[-1] if messages else None + if not last_message: + return state + + # Prepare response data + response_data = { + "task": last_message.content, + "analysis": None, + "response": None, + "agent_responses": [], + "error": None + } + + # Analyze task and coordinate agents + analysis = self._analyze_task(last_message.content) + response_data["analysis"] = analysis + + if analysis['primary_agent'] == 'direct': + # Direct response from Compass + chat_history = messages[:-1] if len(messages) > 1 else [] + response = self.llm.invoke( + self.prompt.format_messages( + input=last_message.content, + chat_history=chat_history + ) + ).content + response_data["response"] = response + response_data["agent_responses"] = [ + { + "agent": "compass", + "response": response + } + ] + else: + # Delegate to appropriate agents + 
response_data["agent_responses"] = self._delegate_to_agents(analysis, last_message.content) + response = self._format_responses( + [resp['response'] for resp in response_data["agent_responses"]], + analysis + ) + response_data["response"] = response + + # Convert response to JSON + json_response = json.dumps(response_data) + + # Update state + return { + "messages": messages + [AIMessage(content=json_response)], + "iterations": iterations + 1 + } + + except Exception as e: + error_msg = f"Error in processing: {str(e)}" + print(error_msg) + + error_response = { + "task": last_message.content if last_message else "No task", + "error": error_msg, + "suggestions": [ + "Try rephrasing your request", + "Break down the task into smaller steps", + "Check the task requirements" + ] + } + + return { + "messages": messages + [SystemMessage(content=json.dumps(error_response))], + "iterations": iterations + 1 + } + + def _delegate_to_agents(self, analysis: Dict[str, Any], task: str) -> List[Dict[str, str]]: + """Delegate task to appropriate agents and return their responses.""" + agent_responses = [] + + # Handle primary agent + primary_response = assign_agent_to_task.invoke({ + "agent_name": analysis['primary_agent'], + "task": task + }) + agent_responses.append({ + "agent": analysis['primary_agent'], + "response": primary_response + }) + + # Handle additional agents if needed + for agent in analysis['additional_agents']: + if agent != analysis['primary_agent']: + subtask = self._create_subtask(task, agent, agent_responses) + response = assign_agent_to_task.invoke({ + "agent_name": agent, + "task": subtask + }) + agent_responses.append({ + "agent": agent, + "response": response + }) + + return agent_responses + + def _create_subtask(self, original_task: str, agent: str, previous_responses: List[Dict[str, str]]) -> str: + """Create a subtask for an agent based on context.""" + previous_responses_str = "\n".join([ + f"[{resp['agent']}]: {resp['response']}" for resp in previous_responses + ]) + + return f"""Original task: {original_task} + +Previous responses: +{previous_responses_str} + +Based on the above, complete your part of the task as the {agent} agent. +Focus on your specialization: {AGENT_DESCRIPTIONS.get(agent, 'Complete the task')}""" + + def _format_responses(self, responses: List[str], analysis: Dict[str, Any]) -> str: + """Format responses into a coherent reply.""" + return f"""Task Analysis: +Primary Agent: {analysis['primary_agent']} +Reason: {analysis['reason']} + +Agent Responses: +{chr(10).join(responses)} + +Summary: +I've coordinated the appropriate agents to address your request. Above are their combined responses. 
+Let me know if you need any clarification or have additional questions.""" + + def _should_continue(self, state: AgentState) -> Literal["continue", END]: + """Determine whether to continue processing.""" + iterations = state.get('iterations', 0) + messages = state.get('messages', []) + + if iterations >= self.max_iterations: + return END + + last_message = messages[-1] if messages else None + if last_message and not hasattr(last_message, 'function_call'): + return END + + return "continue" + +def compass(session_id: str, task: str) -> str: + """The orchestrator that interacts with users and coordinates other agents.""" + agent = CompassAgent() + return agent.process(task) \ No newline at end of file diff --git a/agents/scout.py b/agents/scout.py new file mode 100644 index 0000000..01875e7 --- /dev/null +++ b/agents/scout.py @@ -0,0 +1,240 @@ +from typing import List, Any, Dict, Literal, Optional +from langchain_core.messages import HumanMessage, SystemMessage, AIMessage +from langgraph.graph import END +from agents.base import BaseAgent, AgentState +from tools.web.duck_duck_go_web_search import duck_duck_go_web_search +import json +import re +import logging +from datetime import datetime + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class ScoutAgent(BaseAgent): + def __init__(self): + system_prompt = """# Scout Agent + +## Role & Objective +You are a professional web researcher focused on delivering accurate, comprehensive, and well-structured information for any query. + +## Core Principles +1. Research Quality + - Thorough search across credible sources + - Fact verification and cross-referencing + - Focus on recent, reliable information + +2. Response Structure + - Clear, logical organization + - Key points with bullet points + - Supporting evidence and examples + - Source citations when relevant + +3. Content Balance + - Accuracy over speculation + - Clarity over complexity + - Concise yet comprehensive + - Neutral and objective tone + +## Process Flow +1. Analyze query intent +2. Gather relevant information +3. Synthesize findings +4. 
Present structured response with: + - Main concept explanation + - Key facts and details + - Practical implications + - Related context + +Your responses should be informative, clear, and well-organized, focusing on providing maximum value with optimal efficiency.""" + + super().__init__( + name="scout", + system_prompt=system_prompt, + tools=[duck_duck_go_web_search], + max_iterations=1 + ) + + def _clean_text(self, text: str) -> str: + try: + # Remove HTML tags + text = re.sub(r'<[^>]+>', '', text) + + # Normalize whitespace + text = re.sub(r'\s+', ' ', text).strip() + + # Remove special characters and normalize + text = re.sub(r'[^\w\s.,!?-]', '', text) + + return text + except Exception as e: + logger.warning(f"Text cleaning error: {e}") + return text + + def _format_query(self, query: str) -> str: + query = query.lower().strip() + + # Common query transformations + patterns = [ + (r'^what\s+is\s+', ''), + (r'^who\s+is\s+', ''), + (r'^how\s+does\s+', ''), + (r'^why\s+', '') + ] + + for pattern, repl in patterns: + query = re.sub(pattern, repl, query).strip() + + # Enhance query with descriptive terms + enhance_terms = [ + "definition", "explanation", "overview", + "key concepts", "main features", "important aspects" + ] + + return f"{query} {' '.join(enhance_terms)}" + + def _process_search_results(self, search_results: List[Dict[str, Any]], query: str) -> Dict[str, Any]: + try: + compiled_info = [] + sources = [] + + for result in search_results: + # Only add non-empty, unique snippets + snippet = self._clean_text(result.get('snippet', '')) + if snippet and snippet not in compiled_info: + compiled_info.append(snippet) + + # Collect unique sources + link = result.get('link', '') + if link and link not in sources: + sources.append(link) + + # Limit sources and info + sources = sources[:3] + compiled_info = compiled_info[:5] + + if not compiled_info: + return { + "status": "no_results", + "query": query, + "timestamp": datetime.now().isoformat(), + "error": "No information found", + "suggestions": [ + f"Ask about specific aspects of {query}", + "Use more specific terms", + "Rephrase your question" + ] + } + + return { + "status": "success", + "query": query, + "timestamp": datetime.now().isoformat(), + "compiled_info": compiled_info, + "sources": sources + } + + except Exception as e: + logger.error(f"Search result processing error: {e}") + return { + "status": "error", + "query": query, + "timestamp": datetime.now().isoformat(), + "error": str(e), + "suggestions": [ + "Try a different search approach", + "Check your internet connection", + "Simplify your query" + ] + } + + def _process_step(self, state: AgentState) -> AgentState: + logger.info(f"{self.name} is researching...") + messages = state.get('messages', []) + + try: + # Extract and format query + query = messages[-1].content if messages and hasattr(messages[-1], 'content') else "No query provided" + formatted_query = self._format_query(query) + + # Perform web search + search_results = duck_duck_go_web_search.invoke({ + "query": formatted_query, + "max_results": 3 + }) + + # Process search results + processed_results = self._process_search_results(search_results, query) + + # Handle no results scenario + if processed_results["status"] == "no_results": + return { + "messages": messages + [AIMessage(content=json.dumps(processed_results))], + "iterations": 1 + } + + # Generate comprehensive response + response_prompt = f"""Based on the following information, provide a comprehensive response about: {query} + +Information: 
+{chr(10).join(processed_results['compiled_info'])}
+
+Please structure your response as:
+1. Direct, concise explanation and example (2-3 sentences)
+2. Key characteristics or facts (3-4 bullet points)
+3. Additional contextual information
+4. Practical applications or implications (if relevant)
+
+Do not render these numbered points as headings; present the content directly in their place.
+
+Focus on clarity, accuracy, and providing meaningful insights."""
+
+            # Generate LLM response
+            llm_response = self.llm.invoke(response_prompt)
+
+            # Prepare final response
+            response_data = {
+                "status": "success",
+                "query": query,
+                "timestamp": datetime.now().isoformat(),
+                "message": llm_response.content,
+                "sources": processed_results.get("sources", []),
+                "key_points": [
+                    point.strip() for point in llm_response.content.split('\n')
+                    if point.strip() and not point.strip().startswith('1.') and not point.strip().startswith('2.')
+                ]
+            }
+
+            return {
+                "messages": messages + [AIMessage(content=json.dumps(response_data))],
+                "iterations": 1
+            }
+
+        except Exception as e:
+            logger.error(f"Research process error: {e}")
+            error_response = {
+                "status": "error",
+                "query": query,
+                "timestamp": datetime.now().isoformat(),
+                "error": str(e),
+                "message": "Error occurred during research",
+                "suggestions": [
+                    "Try being more specific",
+                    "Rephrase your question",
+                    "Check your internet connection"
+                ]
+            }
+            return {
+                "messages": messages + [SystemMessage(content=json.dumps(error_response))],
+                "iterations": 1
+            }
+
+    def _should_continue(self, state: AgentState) -> Literal["continue", END]:
+        """Always end after one iteration."""
+        return END
+
+def scout(task: str) -> str:
+    """Execute research task and return findings."""
+    agent = ScoutAgent()
+    return agent.process(task)
\ No newline at end of file
diff --git a/agents/techsage.py b/agents/techsage.py
new file mode 100644
index 0000000..3f0fd26
--- /dev/null
+++ b/agents/techsage.py
@@ -0,0 +1,285 @@
+from typing import List, Any, Dict, Literal, Optional
+from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage
+from langgraph.graph import END
+from agents.base import BaseAgent, AgentState
+from tools.file.write_to_file import write_to_file
+from tools.agent.assign_agent_to_task import assign_agent_to_task
+from tools.file.delete_file import delete_file
+import json
+import re
+import logging
+from datetime import datetime
+
+# Configure logging
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+class TechSageAgent(BaseAgent):
+    def __init__(self, max_iterations: int = 1):
+        system_prompt = """You are techsage, a specialized development agent.
+
+Your primary task is to generate well-structured, production-ready code based on user requirements.
+
+Response Format:
+Always structure your response with:
+1. Code implementation in clear, distinct sections with proper file markers
+2. Each file should be marked with ```language filename.ext
+3. Include setup instructions and usage examples
+4. Provide clear API documentation if applicable
+
+Follow these guidelines:
+1. Use modern best practices
+2. Include error handling
+3. Add proper comments and documentation
+4. Optimize for readability and maintainability
+5. 
Consider scalability and performance + +Code Structure: +- Organize code logically +- Use consistent formatting +- Include necessary imports +- Add type hints where applicable +- Implement error handling +- Add proper validation + +Documentation Guidelines: +- Clear setup instructions +- Usage examples +- API documentation +- Configuration details +- Dependencies list +- Error handling guide""" + + super().__init__( + name="techsage", + system_prompt=system_prompt, + tools=[write_to_file, assign_agent_to_task], + max_iterations=max_iterations + ) + + def _analyze_task(self, task: str) -> Dict[str, Any]: + """Analyze the task to determine type and requirements with improved error handling.""" + analysis_prompt = f"""Analyze this development task and provide structured output: +Task: {task} + +Format response as a valid JSON with these keys: +- task_type: web/script/config/documentation +- language: programming language name +- files_required: list of filenames +- technologies: relevant technologies +- implementation_approach: brief strategy +- primary_features: key features""" + + try: + response = self.llm.invoke(analysis_prompt) + # Enhanced parsing to handle various JSON formats + content = response.content.strip() + + # Remove code block markers if present + content = re.sub(r'^```(json)?|```$', '', content, flags=re.MULTILINE).strip() + + # Attempt to parse JSON with fallback + try: + analysis = json.loads(content) + except json.JSONDecodeError: + # Attempt to fix common JSON formatting issues + content = re.sub(r'(?<=\w)\'', '"', content) # Replace single quotes with double quotes + analysis = json.loads(content) + + return analysis + except Exception as e: + logger.error(f"Task analysis error: {e}") + return { + "task_type": "script", + "language": "python", + "files_required": ["main.py"], + "technologies": ["python"], + "implementation_approach": "Basic implementation", + "primary_features": ["core functionality"] + } + + def _get_implementation_prompt(self, task: str, analysis: Dict[str, Any]) -> str: + """Generate a more comprehensive implementation prompt.""" + return f"""Comprehensive Code Generation Task + +Detailed Requirements: +- Primary Task: {task} +- Language: {analysis['language']} +- Project Type: {analysis['task_type']} +- Key Features: {', '.join(analysis['primary_features'])} + +Comprehensive Implementation Guidelines: +1. Create full implementation for each required file +2. Use modern {analysis['language']} best practices +3. Include robust error handling +4. Implement input validation +5. Add comprehensive type hints +6. Write clear, explanatory comments + +Structural Requirements: +- Each file must be marked with: ```{analysis['language']} filename.ext +- Include complete implementation +- Add section headers for: + a. Setup Instructions + b. Usage Examples + c. API Documentation (if applicable) + d. Configuration Guide + e. 
Error Handling Guide + +Provide a production-ready solution that emphasizes: +- Code quality +- Maintainability +- Scalability +- Performance considerations""" + + def _extract_code_blocks(self, content: str) -> Dict[str, Dict[str, Any]]: + """Enhanced code block extraction with robust parsing.""" + code_blocks = {} + code_block_pattern = re.compile(r'```(\w+)?\s*(\S+)\n(.*?)```', re.DOTALL) + + for match in code_block_pattern.finditer(content): + language = match.group(1) or 'text' + filename = match.group(2) + code = match.group(3).strip() + + code_blocks[filename] = { + "language": language, + "content": code, + "filename": filename + } + + return code_blocks + + def _process_step(self, state: AgentState) -> AgentState: + """Process a development task with enhanced error handling and logging.""" + logger.info(f"{self.name} is processing development task...") + messages: List[BaseMessage] = state.get('messages', []) + + try: + task = messages[-1].content if messages and hasattr(messages[-1], 'content') else "No task provided" + + # Task analysis + analysis = self._analyze_task(task) + + # Implementation generation + implementation_prompt = self._get_implementation_prompt(task, analysis) + implementation = self.llm.invoke(implementation_prompt) + + # Process code blocks + code_blocks = self._extract_code_blocks(implementation.content) + + # Write files + files_created = [] + for filename, file_data in code_blocks.items(): + try: + write_to_file.invoke({ + "filepath": filename, + "content": file_data["content"] + }) + files_created.append(filename) + except Exception as write_error: + logger.error(f"File write error for {filename}: {write_error}") + + # Comprehensive response generation + response_data = { + "status": "success", + "query": task, + "timestamp": datetime.now().isoformat(), + "analysis": { + "task_type": analysis.get("task_type", "undefined"), + "language": analysis.get("language", "undefined"), + "technologies": analysis.get("technologies", []) + }, + "implementation": { + "files": [ + { + "filename": file_data["filename"], + "language": file_data["language"], + "content": file_data["content"] + } + for file_data in code_blocks.values() + ], + "setup": self._extract_section(implementation.content, "Setup Instructions"), + "usage": self._extract_section(implementation.content, "Usage Examples"), + "api_docs": self._extract_section(implementation.content, "API Documentation"), + "configuration": self._extract_section(implementation.content, "Configuration Guide") + }, + "files_created": files_created + } + + for filename in files_created: + try: + delete_file.invoke({"filepath": filename}) + except Exception as delete_error: + logger.error(f"File deletion error for {filename}: {delete_error}") + + return { + "messages": messages + [AIMessage(content=json.dumps(response_data, indent=2))], + "iterations": 1 + } + + except Exception as e: + logger.error(f"Development task processing error: {e}") + error_response = { + "status": "error", + "query": task, + "timestamp": datetime.now().isoformat(), + "error": str(e), + "message": "Comprehensive error in code generation", + "suggestions": [ + "Provide more specific and granular requirements", + "Clearly specify the programming language and framework", + "Break down complex requirements into smaller, manageable tasks", + "Verify the input task description" + ] + } + return { + "messages": messages + [AIMessage(content=json.dumps(error_response, indent=2))], + "iterations": 1 + } + + def _extract_section(self, content: str, 
section_name: str) -> str: + """Enhanced section extraction with regex and multiple parsing strategies.""" + try: + # Regex pattern to find section content + section_pattern = re.compile( + rf'{section_name}:\n(.*?)(?=\n\n|\Z)', + re.DOTALL | re.IGNORECASE + ) + match = section_pattern.search(content) + + if match: + return match.group(1).strip() + + # Fallback parsing strategy + if section_name in content: + parts = content.split(section_name) + if len(parts) > 1: + section = parts[1].split('\n\n')[0].strip() + return section + except Exception as e: + logger.warning(f"Section extraction error for {section_name}: {e}") + + return "" + + def _should_continue(self, state: AgentState) -> Literal["continue", END]: + """Always terminate after one iteration.""" + return END + +def techsage(task: str) -> str: + """Execute development task and return comprehensive results.""" + try: + agent = TechSageAgent() + return agent.process(task) + except Exception as e: + logger.error(f"Tech Sage agent execution failed: {e}") + return json.dumps({ + "status": "critical_error", + "message": "Failed to execute engineering task", + "error": str(e), + "suggestions": [ + "Retry the task", + "Verify input requirements", + "Contact system administrator" + ] + }) \ No newline at end of file diff --git a/agents/tool_smith.py b/agents/tool_smith.py new file mode 100644 index 0000000..bc351ac --- /dev/null +++ b/agents/tool_smith.py @@ -0,0 +1,37 @@ +from typing import List, Any +from langchain_core.messages import SystemMessage, HumanMessage +from agents.base import BaseAgent, AgentState +from tools.file.write_to_file import write_to_file +from tools.file.read_file import read_file +from tools.file.delete_file import delete_file +from tools.file.overwrite_file import overwrite_file +from tools.agent.assign_agent_to_task import assign_agent_to_task + +class ToolSmithAgent(BaseAgent): + def __init__(self): + system_prompt = """You are tool_smith, a ReAct agent that develops other ReAct agents. + + You develop agents in python using LangGraph to define their flow. + You design agents with the tools they potentially need to complete their tasks. + + You approach your given task this way: + 1. Create a detailed plan for how to design an agent to achieve the task. + 2. If new tools are required, assign tasks to the architect agent. + 3. Write the agent implementation and a smoke test to disk. + 4. Verify the smoke test doesn't error. + 5. 
Confirm the agent is complete with its name and a succinct description of its purpose."""
+
+        tools = [
+            write_to_file,
+            read_file,
+            delete_file,
+            overwrite_file,
+            assign_agent_to_task
+        ]
+
+        super().__init__("tool_smith", system_prompt, tools)
+
+def tool_smith(task: str) -> str:
+    """Creates new agents for specific purposes."""
+    agent = ToolSmithAgent()
+    return agent.process(task)
\ No newline at end of file
diff --git a/app.py b/app.py
new file mode 100644
index 0000000..7a76538
--- /dev/null
+++ b/app.py
@@ -0,0 +1,94 @@
+# api_server.py
+import os
+from fastapi import FastAPI, HTTPException
+from fastapi.middleware.cors import CORSMiddleware
+from fastapi.responses import JSONResponse
+from pydantic import BaseModel
+import uvicorn
+import uuid
+from typing import Dict, Optional
+import gc
+import psutil
+
+from agents.compass import compass
+
+app = FastAPI()
+
+# Add CORS middleware
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],  # This allows all origins
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+@app.middleware("http")
+async def check_memory_usage(request, call_next):
+    # Force garbage collection before processing request
+    gc.collect()
+
+    # Get current memory usage
+    process = psutil.Process(os.getpid())
+    mem_before = process.memory_info().rss / 1024 / 1024  # Memory in MB
+
+    # If memory usage is too high, refuse new requests. Return the response
+    # directly: an HTTPException raised inside middleware bypasses FastAPI's
+    # exception handlers and would surface as a 500 instead of a 503.
+    if mem_before > 900:  # 900MB threshold
+        return JSONResponse(
+            status_code=503,
+            content={"detail": "Server is currently overloaded. Please try again later."}
+        )
+
+    response = await call_next(request)
+    return response
+
+class ChatRequest(BaseModel):
+    message: str
+    session_id: Optional[str] = None
+
+class ChatResponse(BaseModel):
+    response: str
+    session_id: str
+
+# Store active sessions
+active_sessions: Dict[str, dict] = {}
+
+@app.post("/api/chat", response_model=ChatResponse)
+async def chat_endpoint(request: ChatRequest):
+    try:
+        # Generate session_id if not provided
+        session_id = request.session_id or str(uuid.uuid4())
+
+        # Initialize session if it doesn't exist
+        if session_id not in active_sessions:
+            active_sessions[session_id] = {
+                "history": []
+            }
+
+        # Process message through Compass agent
+        response = compass(session_id, request.message)
+
+        # Update session history
+        active_sessions[session_id]["history"].append({
+            "role": "user",
+            "content": request.message
+        })
+        active_sessions[session_id]["history"].append({
+            "role": "assistant",
+            "content": response
+        })
+
+        return ChatResponse(
+            response=response,
+            session_id=session_id
+        )
+
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
+
+@app.get("/api/health")
+async def health_check():
+    return {"status": "healthy"}
+
+if __name__ == "__main__":
+    uvicorn.run(app, host=os.getenv("HOST", "127.0.0.1"), port=int(os.getenv("PORT", 8000)))
\ No newline at end of file
diff --git a/config.py b/config.py
new file mode 100644
index 0000000..479f480
--- /dev/null
+++ b/config.py
@@ -0,0 +1,29 @@
+import os
+from pathlib import Path
+from dotenv import load_dotenv
+from langchain_openai import ChatOpenAI
+
+# Load environment variables
+env_path = Path(__file__).parent / '.env'
+load_dotenv(env_path)
+
+# Server configuration
+HOST = os.getenv('HOST', '127.0.0.1')
+PORT = int(os.getenv('PORT', 8000))
+
+# OpenAI configuration
+OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
+if not OPENAI_API_KEY:
+    raise ValueError("OPENAI_API_KEY must be set in .env file")
+SERPER_API_KEY = os.getenv('SERPER_API_KEY')
+if not SERPER_API_KEY:
+    raise ValueError("SERPER_API_KEY must be set in .env file")
+
+# 
Configure default language model +default_langchain_model = ChatOpenAI( + model="gpt-4-turbo-preview", + temperature=0 +) + +# Logging configuration +LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO') \ No newline at end of file diff --git a/nixpacks.toml b/nixpacks.toml new file mode 100644 index 0000000..b7340be --- /dev/null +++ b/nixpacks.toml @@ -0,0 +1,18 @@ + +[phases.setup] +cmds = [ + "curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y", + 'source $HOME/.cargo/env' +] +aptPkgs = [ + "build-essential", + "curl", + "pkg-config" +] +[phases.install] +cmds = [ + "python -m venv --copies /opt/venv", + ". /opt/venv/bin/activate", + "pip install --upgrade pip", + "pip install -r requirements.txt" +] diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..d8b4d6b --- /dev/null +++ b/pytest.ini @@ -0,0 +1,5 @@ +[pytest] +markers = + webtest: marks tests that require internet connection +addopts = -v --tb=short +asyncio_mode = auto \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..059f73e --- /dev/null +++ b/requirements.txt @@ -0,0 +1,32 @@ +# langchain==0.1.0 +# langchain-openai==0.0.2 +# langgraph>=0.0.15 +# duckduckgo-search==4.1.1 +# beautifulsoup4==4.12.2 +# requests==2.31.0 +# pytest==8.0.0 +# python-dotenv==1.0.0 +# aiohttp==3.9.1 +# black==24.1.1 +# isort==5.13.2 +# mypy==1.8.0 +langchain +langchain-openai +langgraph +duckduckgo-search +beautifulsoup4 +requests +pytest +python-dotenv +aiohttp +black +isort +mypy +fastapi +uvicorn +pydantic +python-multipart +gunicorn +setuptools +wheel +psutil \ No newline at end of file diff --git a/runtime.txt b/runtime.txt new file mode 100644 index 0000000..dfe813b --- /dev/null +++ b/runtime.txt @@ -0,0 +1 @@ +python-3.11.6 \ No newline at end of file diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tools/agent/__init__.py b/tools/agent/__init__.py new file mode 100644 index 0000000..75ed740 --- /dev/null +++ b/tools/agent/__init__.py @@ -0,0 +1,7 @@ +from .list_available_agents import list_available_agents +from .assign_agent_to_task import assign_agent_to_task + +__all__ = [ + 'list_available_agents', + 'assign_agent_to_task' +] \ No newline at end of file diff --git a/tools/agent/assign_agent_to_task.py b/tools/agent/assign_agent_to_task.py new file mode 100644 index 0000000..70825ba --- /dev/null +++ b/tools/agent/assign_agent_to_task.py @@ -0,0 +1,24 @@ +from typing import Dict +from langchain_core.tools import tool + +@tool +def assign_agent_to_task(agent_name: str, task: str) -> str: + """Assigns a task to a specific agent.""" + try: + # Import agent module + agent_module = __import__(f"agents.{agent_name}", fromlist=[agent_name]) + + # Get agent function + agent_func = getattr(agent_module, agent_name) + + # Execute task with session ID (if required) + if agent_name == 'compass': + return agent_func('internal_session', task) + else: + return agent_func(task) + except ImportError: + return f"Error: Agent '{agent_name}' not found" + except AttributeError: + return f"Error: Agent function '{agent_name}' not found in module" + except Exception as e: + return f"Error assigning task to agent {agent_name}: {str(e)}" \ No newline at end of file diff --git a/tools/agent/list_available_agents.py b/tools/agent/list_available_agents.py new file mode 100644 index 0000000..76c5b5c --- /dev/null +++ b/tools/agent/list_available_agents.py @@ -0,0 +1,8 @@ +from typing import List +from 
langchain_core.tools import tool +from utils import all_agents + +@tool +def list_available_agents() -> List[str]: + """List all available agents in the system.""" + return all_agents() \ No newline at end of file diff --git a/tools/file/__init__.py b/tools/file/__init__.py new file mode 100644 index 0000000..c53a381 --- /dev/null +++ b/tools/file/__init__.py @@ -0,0 +1,11 @@ +from .write_to_file import write_to_file +from .read_file import read_file +from .delete_file import delete_file +from .overwrite_file import overwrite_file + +__all__ = [ + 'write_to_file', + 'read_file', + 'delete_file', + 'overwrite_file' +] \ No newline at end of file diff --git a/tools/file/delete_file.py b/tools/file/delete_file.py new file mode 100644 index 0000000..72729f9 --- /dev/null +++ b/tools/file/delete_file.py @@ -0,0 +1,15 @@ +from pathlib import Path +from langchain_core.tools import tool + +@tool +def delete_file(filepath: str) -> str: + """Delete a file.""" + try: + path = Path(filepath) + if not path.exists(): + return f"Error: File {filepath} does not exist" + + path.unlink() + return f"Successfully deleted {filepath}" + except Exception as e: + return f"Error deleting file: {str(e)}" \ No newline at end of file diff --git a/tools/file/overwrite_file.py b/tools/file/overwrite_file.py new file mode 100644 index 0000000..b7b0fc3 --- /dev/null +++ b/tools/file/overwrite_file.py @@ -0,0 +1,16 @@ +from pathlib import Path +from langchain_core.tools import tool + +@tool +def overwrite_file(filepath: str, content: str) -> str: + """Overwrite content in an existing file.""" + try: + path = Path(filepath) + if not path.exists(): + return f"Error: File {filepath} does not exist" + + with open(filepath, 'w', encoding='utf-8') as f: + f.write(content) + return f"Successfully overwrote {filepath}" + except Exception as e: + return f"Error overwriting file: {str(e)}" \ No newline at end of file diff --git a/tools/file/read_file.py b/tools/file/read_file.py new file mode 100644 index 0000000..2cd9672 --- /dev/null +++ b/tools/file/read_file.py @@ -0,0 +1,15 @@ +from pathlib import Path +from langchain_core.tools import tool + +@tool +def read_file(filepath: str) -> str: + """Read content from a file.""" + try: + path = Path(filepath) + if not path.exists(): + return f"Error: File {filepath} does not exist" + + with open(filepath, 'r', encoding='utf-8') as f: + return f.read() + except Exception as e: + return f"Error reading file: {str(e)}" \ No newline at end of file diff --git a/tools/file/write_to_file.py b/tools/file/write_to_file.py new file mode 100644 index 0000000..361a908 --- /dev/null +++ b/tools/file/write_to_file.py @@ -0,0 +1,17 @@ +import os +from pathlib import Path +from langchain_core.tools import tool + +@tool +def write_to_file(filepath: str, content: str) -> str: + """Write content to a file, creating directories if they don't exist.""" + try: + path = Path(filepath) + path.parent.mkdir(parents=True, exist_ok=True) + + with open(filepath, 'w', encoding='utf-8') as f: + f.write(content) + + return f"Successfully wrote content to {filepath}" + except Exception as e: + return f"Error writing to file: {str(e)}" \ No newline at end of file diff --git a/tools/web/__init__.py b/tools/web/__init__.py new file mode 100644 index 0000000..545c2ac --- /dev/null +++ b/tools/web/__init__.py @@ -0,0 +1,7 @@ +from .duck_duck_go_web_search import duck_duck_go_web_search +from .fetch_web_page_content import fetch_web_page_content + +__all__ = [ + 'duck_duck_go_web_search', + 'fetch_web_page_content' +] \ No 
newline at end of file diff --git a/tools/web/duck_duck_go_web_search.py b/tools/web/duck_duck_go_web_search.py new file mode 100644 index 0000000..19f139a --- /dev/null +++ b/tools/web/duck_duck_go_web_search.py @@ -0,0 +1,125 @@ +import os +from typing import List, Dict, Optional, Union +from langchain_core.tools import tool +import http.client +import json +import logging +from urllib.parse import quote_plus + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +class WebSearchTool: + + def __init__(self, max_results: int = 5): + self.max_results = max_results + # Serper API key + self.serper_api_key = os.getenv('SERPER_API_KEY') + + def _clean_text(self, text: str) -> str: + return ' '.join(text.split()) + + def search_duckduckgo(self, query: str) -> List[Dict[str, str]]: + try: + import requests + + encoded_query = quote_plus(query) + url = f"https://api.duckduckgo.com/?q={encoded_query}&format=json" + + response = requests.get(url, timeout=10) + response.raise_for_status() + + data = response.json() + results = [] + + # Add abstract if available + if data.get('Abstract'): + results.append({ + 'title': data.get('AbstractSource', 'DuckDuckGo Abstract'), + 'link': data.get('AbstractURL', ''), + 'snippet': self._clean_text(data.get('Abstract', '')) + }) + + # Add related topics + for topic in data.get('RelatedTopics', [])[:self.max_results]: + if isinstance(topic, dict) and 'Text' in topic: + results.append({ + 'title': topic.get('FirstURL', '').split('/')[-1].replace('_', ' '), + 'link': topic.get('FirstURL', ''), + 'snippet': self._clean_text(topic.get('Text', '')) + }) + + return results + except Exception as e: + logger.error(f"DuckDuckGo search error: {e}") + return [] + + def search_serper(self, query: str) -> List[Dict[str, str]]: + if not self.serper_api_key: + logger.warning("Serper API key not found") + return [] + + try: + conn = http.client.HTTPSConnection("google.serper.dev") + payload = json.dumps({ + "q": query, + "num": self.max_results + }) + headers = { + 'X-API-KEY': self.serper_api_key, + 'Content-Type': 'application/json' + } + + conn.request("POST", "/search", payload, headers) + res = conn.getresponse() + data = res.read().decode("utf-8") + + # Parse the JSON response + search_results = json.loads(data) + + results = [] + for result in search_results.get('organic', []): + results.append({ + 'title': result.get('title', ''), + 'link': result.get('link', ''), + 'snippet': self._clean_text(result.get('snippet', '')) + }) + + return results + except Exception as e: + logger.error(f"Serper search error: {e}") + return [] + + def search(self, query: str) -> List[Dict[str, str]]: + + search_methods = [ + self.search_serper, # Changed order to try Serper first + self.search_duckduckgo, + ] + + for method in search_methods: + results = method(query) + if results: + return results + + # Fallback if all methods fail + logger.warning(f"No results found for query: {query}") + return [{ + 'title': 'Search Unavailable', + 'link': '', + 'snippet': f"Unable to find information about {query}. Please try a different query." + }] + +# Create a tool wrapper +@tool +def duck_duck_go_web_search(query: str, max_results: int = 5) -> List[Dict[str, str]]: + """ + Web search tool with multiple search strategies. 
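+    Despite the name, the Serper (Google) API is tried first when
+    SERPER_API_KEY is set, then the DuckDuckGo Instant Answer API; a
+    placeholder result is returned if both come back empty.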
+
+    :param query: Search query
+    :param max_results: Maximum number of results to return
+    :return: List of search results
+    """
+    search_tool = WebSearchTool(max_results=max_results)
+    return search_tool.search(query)
\ No newline at end of file
diff --git a/tools/web/fetch_web_page_content.py b/tools/web/fetch_web_page_content.py
new file mode 100644
index 0000000..c7547ea
--- /dev/null
+++ b/tools/web/fetch_web_page_content.py
@@ -0,0 +1,33 @@
+import requests
+from bs4 import BeautifulSoup
+from langchain_core.tools import tool
+
+@tool
+def fetch_web_page_content(url: str) -> str:
+    """Fetch and process the content of a web page."""
+    try:
+        # Add user agent to avoid blocks
+        headers = {
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
+        }
+
+        response = requests.get(url, headers=headers, timeout=10)
+        response.raise_for_status()
+
+        soup = BeautifulSoup(response.text, 'html.parser')
+
+        # Remove unwanted elements
+        for element in soup(['script', 'style', 'header', 'footer', 'nav']):
+            element.decompose()
+
+        # Extract text content
+        text = soup.get_text(separator='\n', strip=True)
+
+        # Clean up text
+        lines = (line.strip() for line in text.splitlines())
+        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
+        text = '\n'.join(chunk for chunk in chunks if chunk)
+
+        return text
+    except Exception as e:
+        return f"Error fetching web page: {str(e)}"
\ No newline at end of file
diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..430a664
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,72 @@
+import os
+import importlib
+import inspect
+import logging
+from typing import List, Optional, Dict, Any
+from pathlib import Path
+from langchain_core.tools import BaseTool
+
+# Configure logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+def setup_logging(log_level: str = 'INFO') -> None:
+    """Configure logging for the application."""
+    numeric_level = getattr(logging, log_level.upper(), None)
+    if not isinstance(numeric_level, int):
+        raise ValueError(f'Invalid log level: {log_level}')
+    logging.getLogger().setLevel(numeric_level)
+
+def get_project_root() -> Path:
+    """Get the project root directory."""
+    return Path(__file__).parent
+
+def load_module_functions(directory: str, module_type: str) -> List[Any]:
+    """Load all LangChain tool objects from modules in a directory."""
+    functions = []
+    dir_path = get_project_root() / directory
+
+    for item in os.listdir(dir_path):
+        if os.path.isdir(dir_path / item) and not item.startswith('__'):
+            # Handle subdirectories
+            for file in os.listdir(dir_path / item):
+                if file.endswith('.py') and not file.startswith('__'):
+                    module_name = f"{directory}.{item}.{file[:-3]}"
+                    module = importlib.import_module(module_name)
+                    for name, obj in inspect.getmembers(module):
+                        # @tool-decorated callables are BaseTool instances, not
+                        # plain functions, so inspect.isfunction would miss them
+                        if isinstance(obj, BaseTool):
+                            functions.append(obj)
+        elif item.endswith('.py') and not item.startswith('__'):
+            # Handle files in the root of the directory
+            module_name = f"{directory}.{item[:-3]}"
+            module = importlib.import_module(module_name)
+            for name, obj in inspect.getmembers(module):
+                if isinstance(obj, BaseTool):
+                    functions.append(obj)
+
+    return functions
+
+def all_tool_functions(exclude: Optional[List[str]] = None) -> List[Any]:
+    """Get all available tool functions."""
+    tools = load_module_functions('tools', 'tool')
+    if exclude:
+        tools = [t for t in tools if t.name not in exclude]
+    return tools
+
+def all_agents(exclude: Optional[List[str]] = None) -> List[str]:
+    """Get all available agents."""
+    agents = []
+    agents_dir = get_project_root() / 'agents'
+
+    for file in os.listdir(agents_dir):
+        # Skip dunder modules and base.py, which holds the shared BaseAgent
+        # class rather than a callable agent
+        if file.endswith('.py') and not file.startswith('__') and file != 'base.py':
+            agent_name = file[:-3]
+            if exclude and agent_name in exclude:
+                continue
+            agents.append(agent_name)
+
+    return agents
+
+def checkpointer(state: Dict[str, Any]) -> Dict[str, Any]:
+    """Checkpoint the current state."""
+    logger.debug(f"Current state: {state}")
+    return state
\ No newline at end of file
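
For reference, a minimal client sketch against the /api/chat endpoint defined in app.py above. It assumes the server is running locally on port 8000 and shows how the returned session_id is reused to continue a conversation:

import requests

BASE_URL = "http://127.0.0.1:8000"  # assumed local deployment

# First turn: no session_id, so the server generates one
first = requests.post(
    f"{BASE_URL}/api/chat",
    json={"message": "What agents are available?"},
    timeout=300,  # compass may fan the task out to several agents
)
first.raise_for_status()
reply = first.json()  # matches ChatResponse: {"response": ..., "session_id": ...}
print(reply["response"])

# Follow-up turn: pass the session_id back to keep the same history
second = requests.post(
    f"{BASE_URL}/api/chat",
    json={"message": "Which one writes code?", "session_id": reply["session_id"]},
    timeout=300,
)
print(second.json()["response"])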
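
Similarly, a small smoke check of the agent-discovery helper in utils.py, assuming it is run from the project root so the agents package is importable:

from utils import all_agents

# os.listdir order is not guaranteed, so sort for a stable view
print(sorted(all_agents()))
# e.g. ['architect', 'compass', 'scout', 'techsage', 'tool_smith']
print(sorted(all_agents(exclude=['compass'])))  # same list without 'compass'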
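
And since pytest.ini registers a webtest marker, a sketch of how a marked integration test for the search tool might look (hypothetical tests/test_web_search.py, run with `pytest -m webtest`):

import pytest
from tools.web.duck_duck_go_web_search import duck_duck_go_web_search

@pytest.mark.webtest
def test_search_returns_results():
    results = duck_duck_go_web_search.invoke({"query": "langchain", "max_results": 3})
    # The tool always returns at least a fallback entry with these keys
    assert isinstance(results, list) and results
    assert {"title", "link", "snippet"} <= set(results[0])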