Source code for isek.node.node_v2

import threading
import uuid
from abc import ABC
from typing import Dict, Any, Optional
from isek.exceptions import NodeUnavailableError
from isek.node.default_registry import DefaultRegistry
from isek.node.registry import Registry
from isek.protocol.a2a_protocol import A2AProtocol
from isek.protocol.protocol import Protocol
from isek.adapter.base import Adapter
from isek.adapter.simple_adapter import SimpleAdapter
from isek.utils.log import log

NodeDetails = Dict[str, Any]


[docs] class Node(ABC): def __init__( self, host: str = "localhost", port: int = 8080, p2p: bool = False, p2p_server_port: int = 9000, node_id: Optional[str] = None, protocol: Optional[Protocol] = None, registry: Optional[Registry] = None, adapter: Optional[Adapter] = None, **kwargs: Any, # To absorb any extra arguments ): if not host: raise ValueError("Node host cannot be empty.") if not isinstance(port, int) or not (0 < port < 65536): raise ValueError(f"Invalid port number for Node: {port}") if not node_id: node_id = uuid.uuid4().hex self.host: str = host self.port: int = port self.p2p: bool = p2p self.p2p_server_port: int = p2p_server_port self.node_id: str = node_id self.all_nodes: Dict[str, NodeDetails] = {} self.registry = registry or DefaultRegistry() self.adapter = adapter or SimpleAdapter() self.protocol = protocol or A2AProtocol( host=self.host, port=self.port, adapter=self.adapter, p2p=self.p2p, p2p_server_port=self.p2p_server_port, )
[docs] def send_message(self, receiver_node_id: str, message: str, retry_count: int = 3): current_retry = 0 while current_retry < retry_count: try: receiver_node_details = self.all_nodes.get(receiver_node_id) if not receiver_node_details: # Refresh nodes once if not found, in case cache is stale. log.warning( f"Receiver node '{receiver_node_id}' not found in local cache. Refreshing node list once." ) self.__refresh_nodes() receiver_node_details = self.all_nodes.get(receiver_node_id) if not receiver_node_details: raise NodeUnavailableError( receiver_node_id, "Node not found in registry after refresh.", ) if self.p2p: return self.protocol.send_p2p_message( self.node_id, receiver_node_details["metadata"]["p2p_address"], message, ) return self.protocol.send_message( self.node_id, receiver_node_details["metadata"]["url"], message ) except Exception as e: log.exception( f"Attempt {current_retry + 1}/{retry_count}: Unexpected error sending message to " f"node '{receiver_node_id}'. Message: '{message}'. Error: {e}", exc_info=True, ) current_retry += 1 log.error( f"Failed to send message to node '{receiver_node_id}' after {retry_count} retries." ) return f"Error: Message delivery to '{receiver_node_id}' failed after {retry_count} attempts."
[docs] def build_server(self, daemon: bool = False) -> None: if self.p2p: self.protocol.bootstrap_p2p_extension() log.info("The p2p service has been launched.") if self.registry and self.adapter: node_metadata = self.adapter.get_adapter_card().__dict__ node_metadata["url"] = f"http://{self.host}:{self.port}" node_metadata["peer_id"] = self.protocol.peer_id node_metadata["p2p_address"] = self.protocol.p2p_address self.registry.register_node( node_id=self.node_id, host=self.host, port=self.port, metadata=node_metadata, ) self.__bootstrap_heartbeat() # Starts the recurring heartbeat if self.p2p: if not self.protocol.peer_id or not self.protocol.p2p_address: raise RuntimeError("p2p server not started, please check.") if not daemon: self.protocol.bootstrap_server() else: server_thread = threading.Thread( target=self.protocol.bootstrap_server, daemon=True ) server_thread.start()
def __bootstrap_heartbeat(self) -> None: """ Manages the node's heartbeat to the registry. This method performs two main actions: 1. Refreshes the node's lease with the registry to keep it active. 2. Refreshes the local cache of available nodes (`self.all_nodes`). It then schedules itself to run again after a fixed interval (5 seconds). """ try: self.registry.lease_refresh(self.node_id) log.debug(f"Node '{self.node_id}' lease refreshed successfully.") except Exception as e: log.warning( f"Failed to refresh lease for node '{self.node_id}': {e}. " "Node might be deregistered if this persists.", exc_info=True, ) # Depending on severity, could attempt re-registration or shutdown. try: self.__refresh_nodes() except Exception as e: log.warning( f"Failed to refresh node list for '{self.node_id}': {e}", exc_info=True ) # Schedule next heartbeat # Ensure timer is managed properly if the node is shut down timer = threading.Timer(5, self.__bootstrap_heartbeat) timer.daemon = True # Allows main program to exit even if timer is active timer.start() log.debug(f"Node '{self.node_id}' heartbeat scheduled.") def __refresh_nodes(self) -> None: """ Refreshes the local cache of available nodes (`self.all_nodes`) by querying the service registry. The commented-out section suggests plans for integrating a vector-based node index (e.g., using Faiss) for more advanced node discovery. """ try: current_available_nodes = self.registry.get_available_nodes() # Simple update: replace the old list. # More sophisticated updates might involve diffing for logging or specific actions. if self.all_nodes != current_available_nodes: # Basic check for changes # logger.info(f"Node list updated for '{self.node_id}'. " # f"Previous count: {len(self.all_nodes)}, New count: {len(current_available_nodes)}.") self.all_nodes = current_available_nodes else: log.debug( f"Node list for '{self.node_id}' remains unchanged. Count: {len(self.all_nodes)}." ) except Exception as e: # This exception is from self.registry.get_available_nodes() log.error( f"Failed to retrieve available nodes from registry: {e}", exc_info=True ) # self.all_nodes might become stale if this fails repeatedly.
[docs] def stop_server(self) -> None: self.protocol.stop_server()