Source code for isek.team.isek_team

from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, List, Optional, Union, Any, Dict, Sequence, Literal
from uuid import uuid4

from isek.agent.isek_agent import IsekAgent
from isek.memory.memory import Memory, UserMemory
from isek.models.base import Model, SimpleMessage
from isek.tools.toolkit import Toolkit
from isek.utils.log import log
from isek.utils.print_utils import print_response


[docs] @dataclass class IsekTeam: """Ultra-simplified Team class that coordinates multiple agents.""" # List of team members (agents or other teams) - required field members: List[Union[IsekAgent, "IsekTeam"]] # Team name name: Optional[str] = None # Team UUID (autogenerated if not set) team_id: Optional[str] = None # Model for this Team (used for coordination) model: Optional[Model] = None # Team memory memory: Optional[Memory] = None # Tools provided to the Team tools: Optional[List[Toolkit]] = None # A description of the Team description: Optional[str] = None # Success criteria for the task success_criteria: Optional[str] = None # List of instructions for the team instructions: Optional[Union[str, List[str], Callable]] = None # Enable debug logs debug_mode: bool = False # Coordination mode: 'coordinate' (default), 'route', 'collaborate', or 'sequential' mode: Literal["coordinate", "route", "collaborate", "sequential"] = "coordinate" def __post_init__(self): """Initialize the team after creation.""" # Set team ID if not provided if self.team_id is None: self.team_id = str(uuid4()) # Set debug mode if self.debug_mode: log.debug( f"Team initialized: {self.name or 'Unnamed'} (ID: {self.team_id})" )
[docs] def run( self, message: str, user_id: str = "default", session_id: Optional[str] = None, messages: Optional[List[Union[Dict, Any]]] = None, audio: Optional[Sequence[Any]] = None, images: Optional[Sequence[Any]] = None, videos: Optional[Sequence[Any]] = None, files: Optional[Sequence[Any]] = None, stream: Optional[bool] = None, stream_intermediate_steps: bool = False, knowledge_filters: Optional[Dict[str, Any]] = None, **kwargs: Any, ) -> str: """Execute the team's main functionality with the given message.""" return self.run_with_context(message, user_id, session_id)
[docs] def print_response(self, *args, **kwargs): """ Proxy to the shared print_response utility, passing self.run as run_func. """ return print_response(self.run, *args, **kwargs)
[docs] def run_with_context( self, message: str, user_id: str = "default", session_id: Optional[str] = None ) -> str: """Run the team with a message and return the response.""" if not self.members: raise ValueError("Team must have at least one member") # Generate session ID if not provided if session_id is None: session_id = str(uuid4()) # Handle single agent case if len(self.members) == 1 and isinstance(self.members[0], IsekAgent): return self.members[0].run(message, user_id, session_id) # Handle team coordination if self.mode == "coordinate": return self._coordinate_response(message, user_id, session_id) elif self.mode == "route": return self._route_response(message, user_id, session_id) elif self.mode == "collaborate": return self._collaborate_response(message, user_id, session_id) elif self.mode == "sequential": return self._sequential_response(message, user_id, session_id) else: raise ValueError(f"Unknown coordination mode: {self.mode}")
def _coordinate_response(self, message: str, user_id: str, session_id: str) -> str: """Coordinate multiple agents to generate a response.""" if self.model is None: raise ValueError("Model is required for team coordination") # Create system message for coordination system_message = self._build_coordination_message(message) # Get relevant memories if memory is available memory_context = self._get_memory_context(user_id) # Conversation history messages = [] if system_message: messages.append(SimpleMessage(role="system", content=system_message)) if memory_context: messages.append( SimpleMessage( role="system", content=f"Previous context:\n{memory_context}" ) ) messages.append(SimpleMessage(role="user", content=message)) # Call the model for coordination response = self.model.response(messages=messages) response_content = response.content or "No response generated" # Store in memory if available if self.memory: self._store_conversation(user_id, session_id, message, response_content) if self.debug_mode: log.debug(f"Team coordination response: {response_content}") return response_content def _route_response(self, message: str, user_id: str, session_id: str) -> str: """Route the task to the most appropriate team member.""" if self.model is None: raise ValueError("Model is required for team routing") # Create system message for routing system_message = self._build_routing_message(message) # Get relevant memories if memory is available memory_context = self._get_memory_context(user_id) # Conversation history messages = [] if system_message: messages.append(SimpleMessage(role="system", content=system_message)) if memory_context: messages.append( SimpleMessage( role="system", content=f"Previous context:\n{memory_context}" ) ) messages.append(SimpleMessage(role="user", content=message)) # Call the model for routing response = self.model.response(messages=messages) response_content = response.content or "No response generated" # Store in memory if available if self.memory: self._store_conversation(user_id, session_id, message, response_content) if self.debug_mode: log.debug(f"Team routing response: {response_content}") return response_content def _collaborate_response(self, message: str, user_id: str, session_id: str) -> str: """Collaborate with all team members to generate a response.""" if self.model is None: raise ValueError("Model is required for team collaboration") # Create system message for collaboration system_message = self._build_collaboration_message(message) # Get relevant memories if memory is available memory_context = self._get_memory_context(user_id) # Conversation history messages = [] if system_message: messages.append(SimpleMessage(role="system", content=system_message)) if memory_context: messages.append( SimpleMessage( role="system", content=f"Previous context:\n{memory_context}" ) ) messages.append(SimpleMessage(role="user", content=message)) # Call the model for collaboration response = self.model.response(messages=messages) response_content = response.content or "No response generated" # Store in memory if available if self.memory: self._store_conversation(user_id, session_id, message, response_content) if self.debug_mode: log.debug(f"Team collaboration response: {response_content}") return response_content def _sequential_response(self, message: str, user_id: str, session_id: str) -> str: """Run agents sequentially, passing output to next agent.""" current_message = message for member in self.members: if isinstance(member, IsekAgent): current_message = member.run(current_message, user_id, session_id) elif isinstance(member, IsekTeam): current_message = member.run_with_context( current_message, user_id, session_id ) return current_message def _build_coordination_message(self, user_message: str) -> str: """Build the coordination message for the team.""" parts = [] # Team description if self.description: parts.append(f"Team Description: {self.description}") else: parts.append( "You are coordinating a team of agents to respond to user requests." ) # Team members info parts.append("\nTeam Members:") for i, member in enumerate(self.members, 1): if isinstance(member, IsekAgent): member_desc = member.description or f"Agent {member.name or i}" parts.append(f"{i}. {member.name or f'Agent {i}'}: {member_desc}") elif isinstance(member, IsekTeam): member_desc = member.description or f"Team {member.name or i}" parts.append(f"{i}. {member.name or f'Team {i}'}: {member_desc}") # Success criteria if self.success_criteria: parts.append(f"\nSuccess Criteria: {self.success_criteria}") # Instructions if self.instructions: if isinstance(self.instructions, str): parts.append(f"\nInstructions: {self.instructions}") elif isinstance(self.instructions, list): parts.append("\nInstructions:") for instruction in self.instructions: parts.append(f"- {instruction}") elif callable(self.instructions): parts.append(f"\nInstructions: {self.instructions()}") # Coordination instructions parts.append( "\nCoordination Instructions:" "\n- Analyze the user's request" "\n- Determine which team members should be involved" "\n- Provide a comprehensive response that leverages the team's capabilities" "\n- If specific tools or calculations are needed, mention which agent would handle them" ) return "\n".join(parts) def _build_routing_message(self, user_message: str) -> str: """Build the routing message for the team.""" parts = [] # Team description if self.description: parts.append(f"Team Description: {self.description}") else: parts.append("You are routing tasks to the most appropriate team members.") # Team members info parts.append("\nTeam Members:") for i, member in enumerate(self.members, 1): if isinstance(member, IsekAgent): member_desc = member.description or f"Agent {member.name or i}" parts.append(f"{i}. {member.name or f'Agent {i}'}: {member_desc}") elif isinstance(member, IsekTeam): member_desc = member.description or f"Team {member.name or i}" parts.append(f"{i}. {member.name or f'Team {i}'}: {member_desc}") # Success criteria if self.success_criteria: parts.append(f"\nSuccess Criteria: {self.success_criteria}") # Instructions if self.instructions: if isinstance(self.instructions, str): parts.append(f"\nInstructions: {self.instructions}") elif isinstance(self.instructions, list): parts.append("\nInstructions:") for instruction in self.instructions: parts.append(f"- {instruction}") elif callable(self.instructions): parts.append(f"\nInstructions: {self.instructions()}") # Routing instructions parts.append( "\nRouting Instructions:" "\n- Analyze the user's request" "\n- Determine which team member is most suitable for the task" "\n- Route the task to the appropriate member" "\n- Provide a response that leverages the team's capabilities" "\n- If specific tools or calculations are needed, mention which agent would handle them" ) return "\n".join(parts) def _build_collaboration_message(self, user_message: str) -> str: """Build the collaboration message for the team.""" parts = [] # Team description if self.description: parts.append(f"Team Description: {self.description}") else: parts.append( "You are collaborating with all team members to respond to user requests." ) # Team members info parts.append("\nTeam Members:") for i, member in enumerate(self.members, 1): if isinstance(member, IsekAgent): member_desc = member.description or f"Agent {member.name or i}" parts.append(f"{i}. {member.name or f'Agent {i}'}: {member_desc}") elif isinstance(member, IsekTeam): member_desc = member.description or f"Team {member.name or i}" parts.append(f"{i}. {member.name or f'Team {i}'}: {member_desc}") # Success criteria if self.success_criteria: parts.append(f"\nSuccess Criteria: {self.success_criteria}") # Instructions if self.instructions: if isinstance(self.instructions, str): parts.append(f"\nInstructions: {self.instructions}") elif isinstance(self.instructions, list): parts.append("\nInstructions:") for instruction in self.instructions: parts.append(f"- {instruction}") elif callable(self.instructions): parts.append(f"\nInstructions: {self.instructions()}") # Collaboration instructions parts.append( "\nCollaboration Instructions:" "\n- Work together with all team members" "\n- Coordinate efforts to provide comprehensive responses" "\n- Leverage each member's unique capabilities" "\n- Provide a unified response that combines all members' expertise" "\n- If specific tools or calculations are needed, coordinate with the appropriate agents" ) return "\n".join(parts) def _get_memory_context(self, user_id: str) -> Optional[str]: """Get relevant memory context for the user.""" if not self.memory: return None memories = self.memory.get_user_memories(user_id) if not memories: return None memory_texts = [] for memory in memories: memory_texts.append(f"- {memory.memory}") return "Previous interactions:\n" + "\n".join(memory_texts) def _store_conversation( self, user_id: str, session_id: str, user_message: str, team_response: str ) -> None: """Store the conversation in memory.""" if not self.memory: return # Store user memory user_memory = UserMemory( memory=f"User: {user_message}\nTeam: {team_response}", topics=["conversation"], ) self.memory.add_user_memory(user_memory, user_id) # Store run run_data = { "user_message": user_message, "team_response": team_response, "timestamp": str(uuid4()), } self.memory.add_run(session_id, run_data) if self.debug_mode: log.debug( f"Stored conversation in memory for user {user_id}, session {session_id}" )
[docs] def get_agent_config(self) -> dict: """Get agent configuration for A2A protocol.""" return { "name": self.name or "ISEK Team", "description": self.description or "A team of ISEK agents", "instructions": self.instructions or "Coordinate team members effectively", "success_criteria": self.success_criteria or "Complete the task successfully", "lore": "This is a team of agents.", "knowledge": "", # Knowledge is specific to agents, so leave blank for team. }
[docs] def get_available_tools(self) -> List[dict]: """Get all available tools from team members.""" tools = [] for member in self.members: if isinstance(member, IsekAgent): if member.tools: for toolkit in member.tools: for func in toolkit.functions.values(): tools.append( {"type": "function", "function": func.to_dict()} ) elif isinstance(member, IsekTeam): tools.extend(member.get_available_tools()) return tools
[docs] def get_member_names(self) -> List[str]: """Get names of all team members.""" names = [] for member in self.members: if isinstance(member, IsekAgent): names.append(member.name or "Unnamed Agent") elif isinstance(member, IsekTeam): names.append(member.name or "Unnamed Team") return names
[docs] def get_member_by_name(self, name: str) -> Optional[Union[IsekAgent, "IsekTeam"]]: """Get a team member by name.""" for member in self.members: if isinstance(member, IsekAgent): if member.name == name: return member elif isinstance(member, IsekTeam): if member.name == name: return member return None
[docs] def add_member(self, member: Union[IsekAgent, "IsekTeam"]) -> None: """Add a new member to the team.""" self.members.append(member) if self.debug_mode: member_name = member.name or "Unnamed" log.debug(f"Added member '{member_name}' to team '{self.name}'")
[docs] def remove_member(self, member: Union[IsekAgent, "IsekTeam"]) -> bool: """Remove a member from the team.""" try: self.members.remove(member) if self.debug_mode: member_name = member.name or "Unnamed" log.debug(f"Removed member '{member_name}' from team '{self.name}'") return True except ValueError: return False
def __repr__(self) -> str: return f"Team(name='{self.name}', id='{self.team_id}', members={len(self.members)})"