Source code for isek.agent.memory

import datetime
import uuid
from typing import Dict, List, Any, Optional, Callable, TypeVar, Union
from isek.util.logger import logger # Assuming logger has a standard logging interface
import json

T = TypeVar('T')

[docs] class AgentMemory: """ Manages the complete cognitive state of an agent. This includes working memory for recent interactions, a long-term knowledge store, a queue for scheduled tasks, and internal state variables that track the agent's status and goals. """ def __init__(self): """ Initializes the agent's memory systems. Sets up the core components: - ``state_variables``: A dictionary to store key-value pairs representing the agent's current state. - ``working_memory``: A list to store recent interactions or thoughts. - ``task_queue``: A dictionary to manage tasks, their status, and priority. - ``knowledge_store``: A dictionary to store learned information or facts. """ self.logger = logger # Assuming logger is an instance of a logging object # Core memory components self.state_variables: Dict[str, Any] = {"is_registered": False, "goal": None} self.working_memory: List[str] = [] self.task_queue: Dict[str, Dict[str, Any]] = {} # task_id -> task_details self.knowledge_store: Dict[str, Dict[str, str]] = {} # knowledge_id -> knowledge_details
[docs] def log_operation(self, operation_name: str, details: str) -> None: """ Logs a memory operation using the configured logger. :param operation_name: The name of the memory operation performed (e.g., "set_state_variable"). :type operation_name: str :param details: Specific details about the operation (e.g., variable name and value). :type details: str """ if self.logger: self.logger.debug(f"Memory: {operation_name} - {details}")
[docs] def generate_timestamp(self) -> str: """ Generates a current timestamp string. The format is "YYYY-MM-DD HH:MM:SS". :return: The current timestamp as a formatted string. :rtype: str """ return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
[docs] def generate_unique_id(self) -> str: """ Generates a short unique ID. The ID is the first 8 characters of a UUID version 4. :return: A unique 8-character string identifier. :rtype: str """ return str(uuid.uuid4())[:8]
# --- State variable methods ---
[docs] def set_state_variable(self, variable_name: str, value: Any) -> str: """ Sets or updates a state variable in the agent's memory. :param variable_name: The name of the state variable to set or update. :type variable_name: str :param value: The new value for the state variable. :type value: typing.Any :return: A confirmation message indicating the variable was set. :rtype: str """ self.state_variables[variable_name] = value self.log_operation("set_state_variable", f"{variable_name}: {value}") return f"State variable '{variable_name}' set to '{value}'"
[docs] def get_state_variable(self, variable_name: str, default: T = None) -> Union[Any, T]: """ Retrieves the value of a state variable. :param variable_name: The name of the state variable to retrieve. :type variable_name: str :param default: The default value to return if the variable_name is not found. Defaults to None. :type default: T :return: The value of the state variable, or the default value if not found. :rtype: typing.Union[typing.Any, T] """ return self.state_variables.get(variable_name, default)
[docs] def get_all_state_variables(self) -> Dict[str, Any]: """ Retrieves all current state variables and their values. :return: A dictionary containing all state variables. :rtype: typing.Dict[str, typing.Any] """ self.log_operation("get_all_state_variables", f"count: {len(self.state_variables)}") return self.state_variables.copy()
# --- Working memory methods ---
[docs] def store_memory_item(self, content: str) -> str: """ Adds an item to the agent's working memory. Working memory typically stores recent interactions or temporary thoughts. :param content: The string content to store in working memory. :type content: str :return: A confirmation message. :rtype: str """ self.working_memory.append(content) self.log_operation("store_memory_item", f"item: {content[:50]}...") # Log truncated content return f"Memory item stored"
[docs] def get_recent_memory_items(self, count: int = 4) -> List[str]: """ Retrieves the most recent items from working memory. :param count: The maximum number of recent items to retrieve. Defaults to 4. :type count: int :return: A list of the most recent memory items, up to `count`. :rtype: typing.List[str] """ if len(self.working_memory) > count: return self.working_memory[-count:] return self.working_memory.copy() # Return a copy to prevent external modification
# --- Task queue methods ---
[docs] def create_task(self, task_description: str, priority: int = 1) -> str: """ Creates a new task and adds it to the task queue. Each task is assigned a unique ID and includes its description, creation timestamp, completion status, and priority. :param task_description: A description of the task to be performed. :type task_description: str :param priority: The priority of the task (e.g., higher number for higher priority). Defaults to 1. :type priority: int :return: The unique ID of the created task. :rtype: str """ task_id = self.generate_unique_id() self.task_queue[task_id] = { "description": task_description, "created_at": self.generate_timestamp(), "completed": False, "priority": priority } self.log_operation("create_task", f"id: {task_id}, task: {task_description}") return task_id
[docs] def mark_task_completed(self, task_id: str) -> bool: """ Marks a specified task in the queue as completed. Sets the 'completed' flag to True and records the completion timestamp. :param task_id: The ID of the task to mark as completed. :type task_id: str :return: `True` if the task was found and marked completed, `False` otherwise. :rtype: bool """ if task_id in self.task_queue: self.task_queue[task_id]["completed"] = True self.task_queue[task_id]["completed_at"] = self.generate_timestamp() self.log_operation("mark_task_completed", f"id: {task_id}") return True self.log_operation("mark_task_completed", f"Task ID not found: {task_id}") return False
[docs] def get_pending_tasks(self) -> Dict[str, Dict[str, Any]]: """ Retrieves all tasks that are currently not marked as completed. :return: A dictionary of pending tasks, where keys are task IDs and values are dictionaries of task details. :rtype: typing.Dict[str, typing.Dict[str, typing.Any]] """ pending = { task_id: task_info for task_id, task_info in self.task_queue.items() if not task_info.get("completed", False) } self.log_operation("get_pending_tasks", f"count: {len(pending)}") return pending
[docs] def get_all_tasks(self) -> Dict[str, Dict[str, Any]]: """ Retrieves all tasks from the queue, both pending and completed. :return: A dictionary of all tasks, where keys are task IDs and values are dictionaries of task details. :rtype: typing.Dict[str, typing.Dict[str, typing.Any]] """ return self.task_queue.copy() # Return a copy
# --- Knowledge store methods ---
[docs] def store_knowledge(self, topic: str, content: str) -> str: """ Stores a piece of knowledge in the knowledge store. Each knowledge item is associated with a topic and includes the content and a creation timestamp. :param topic: The topic or title for the knowledge item. :type topic: str :param content: The actual content of the knowledge item. :type content: str :return: The unique ID of the stored knowledge item. :rtype: str """ knowledge_id = self.generate_unique_id() self.knowledge_store[knowledge_id] = { "topic": topic, "content": content, "created_at": self.generate_timestamp() } self.log_operation("store_knowledge", f"id: {knowledge_id}, topic: {topic}") return knowledge_id
[docs] def retrieve_knowledge(self, topic: str) -> Optional[str]: """ Retrieves knowledge content associated with a specific topic. Searches the existing knowledge store for an item matching the given topic. .. note:: The original docstring mentioned a `generator_func` parameter, which is not present in the method signature. This version reflects the current signature. :param topic: The topic to retrieve knowledge about. :type topic: str :return: The content of the knowledge item if found, otherwise `None`. :rtype: typing.Optional[str] """ # Search existing knowledge for k_id, item in self.knowledge_store.items(): # k_id is not used here, could be `_` if item["topic"] == topic: self.log_operation("retrieve_knowledge", f"found - topic: {topic}") return item["content"] self.log_operation("retrieve_knowledge", f"not found - topic: {topic}") return None
[docs] def get_all_knowledge_topics(self) -> List[str]: """ Retrieves a list of all topics currently in the knowledge store. :return: A list of all knowledge topics. :rtype: typing.List[str] """ return [item["topic"] for item in self.knowledge_store.values()]
# --- Interaction Methods ---
[docs] def state_interaction(self, action: str, title: str, value: Any) -> str: """ Manages interactions with the agent's state variables. This method allows for updating, inserting (which is equivalent to updating), or fetching the agent's state. :param action: The action to perform: "update", "insert", or "fetch". :type action: str :param title: The name (key) of the state variable. :type title: str :param value: The value to set for the state variable (used for "update" and "insert"). Ignored for "fetch". :type value: typing.Any :return: A string confirming the action taken or providing the fetched state. :rtype: str """ if action == "update": self.set_state_variable(title, value) return f"State updated with '{title}' as '{value}'" elif action == "insert": # Functionally same as update for a dict self.set_state_variable(title, value) return f"State inserted with '{title}' as '{value}'" elif action == "fetch": return f"Your state is {str(self.get_all_state_variables())}" else: self.log_operation("state_interaction", f"Unknown action: {action}") return "No action is taken for state interaction due to unknown action."
[docs] def knowledge_interaction(self, action: str, content: str) -> str: #TODO: # For "insert" action, the `topic` should ideally be extracted from the `content` # or provided as a separate parameter, rather than using `content` for both. """ Manages interactions with the agent's knowledge store. Allows querying for existing knowledge or inserting new knowledge. :param action: The action to perform: "query" or "insert". :type action: str :param content: For "query", this is the topic to search for. For "insert", this is the content to store (also used as topic currently). :type content: str :return: A string confirming the action or the result of the query. :rtype: str """ if action == "query": retrieved_knowledge = self.retrieve_knowledge(content) # `content` is treated as topic if retrieved_knowledge: return f"Knowledge found for '{content}': {retrieved_knowledge}" else: return f"No knowledge found for topic: '{content}'" elif action == "insert": # TODO: topic should be extracted from the content knowledge_id = self.store_knowledge(topic=content, content=content) # Using content as topic return f"Knowledge '{content[:50]}...' inserted with ID {knowledge_id} (topic: '{content[:30]}...')" else: self.log_operation("knowledge_interaction", f"Unknown action: {action}") return "No action is taken for knowledge interaction due to unknown action."
[docs] def scheduling(self, option: str, id: Optional[str] = None, task: Optional[str] = None, finished: bool = False) -> str: """ Manages the agent's task schedule. Allows adding new tasks, marking tasks as complete, or fetching all tasks. .. warning:: The `finished` parameter is present in the signature but not currently used. In the "complete" option, `task_id` is used in the return message but refers to the local `id` parameter. :param option: The scheduling action: "add", "complete", or "fetch". :type option: str :param id: The ID of the task (used for "complete"). Defaults to None. :type id: typing.Optional[str] :param task: The description of the task (used for "add"). Defaults to None. :type task: typing.Optional[str] :param finished: This parameter is currently unused. Defaults to False. :type finished: bool :return: A string confirming the action or providing the fetched schedule. :rtype: str """ if option == "add": if task is None: return "Task description cannot be None for 'add' option." task_id_created = self.create_task(task) return f"Task '{task}' added to your schedule. Task ID is {task_id_created}." elif option == "complete": if id is None: return "Task ID cannot be None for 'complete' option." if self.mark_task_completed(id): # The original code used 'task_id' here, which was not defined in this scope. # It should probably refer to 'id' or fetch the task description. # For now, let's use 'id' as the identifier. task_description = self.task_queue.get(id, {}).get("description", "Unknown task") return f"Task '{task_description}' (ID: {id}) marked as completed in your schedule." else: return f"Failed to mark task with ID '{id}' as completed (task not found)." elif option == "fetch": return f"Your schedule is {str(self.get_all_tasks())}" else: self.log_operation("scheduling", f"Unknown option: {option}") return "No action is taken for scheduling due to unknown option."