Source code for isek.node.etcd_registry

import json
from typing import Optional, Dict

import etcd3gw
import base64
from ecdsa.keys import SigningKey, VerifyingKey
from ecdsa.curves import NIST256p

from isek.utils.log import log
from isek.node.registry import Registry


[docs] class EtcdRegistry(Registry): def __init__( self, host: Optional[str] = None, port: Optional[int] = None, parent_node_id: Optional[str] = "root", etcd_client: Optional[etcd3gw.Etcd3Client] = None, ttl: int = 30, ): if host and port and etcd_client: log.warning( "Both 'host/port' and 'etcd_client' provided. Using 'etcd_client'." ) if etcd_client: self.etcd_client = etcd_client elif host and port: self.etcd_client = etcd3gw.client(host=host, port=port) else: raise TypeError( "Either 'host' and 'port' or 'etcd_client' must be provided." ) self.parent_node_id = parent_node_id or "root" if not self.etcd_client.status(): raise ConnectionError("Failed to connect to the etcd server.") self.sk = SigningKey.generate(curve=NIST256p) self.ttl = ttl self.leases: Dict[str, etcd3gw.Lease] = {}
[docs] def register_node( self, node_id: str, host: str, port: int, metadata: Optional[Dict[str, str]] = None, ): vk = self.sk.verifying_key if vk is None: raise ValueError("Verifying key could not be generated.") vk_bytes = vk.to_string() vk_base64 = base64.b64encode(vk_bytes).decode("utf-8") node_info = { "node_id": node_id, "host": host, "port": port, "public_key": vk_base64, "metadata": metadata or {}, } node_info_json = json.dumps(node_info, sort_keys=True).encode("utf-8") signature = base64.b64encode(self.sk.sign_deterministic(node_info_json)).decode( "utf-8" ) node_entry = {"node_info": node_info, "signature": signature} lease = self.etcd_client.lease(self.ttl) if lease: self.leases[node_id] = lease key = f"/{self.parent_node_id}/{node_id}" self.etcd_client.put(key, json.dumps(node_entry), lease=lease) log.info(f"Node {node_id} has been registered to etcd.")
[docs] def lease_refresh(self, node_id: str): lease_refresh_response = None try: self.__verify_signature(node_id) if node_id in self.leases: self.leases[node_id].refresh() # log.debug(f"Lease renewed for node: {node_id}, response: {lease_refresh_response}") except Exception as e: log.exception( f"Lease renewal failed for node {node_id}, response: {lease_refresh_response}: {e}" )
[docs] def get_available_nodes(self) -> Dict[str, dict]: nodes = {} key_prefix = f"/{self.parent_node_id}/" for value, metadata in self.etcd_client.get_prefix(key_prefix): if not isinstance(metadata, dict) or "key" not in metadata: continue key_bytes = metadata.get("key") if not isinstance(key_bytes, bytes): continue node_id = key_bytes.decode("utf-8").split(key_prefix)[-1] try: if isinstance(value, bytes): nodes[node_id] = json.loads(value.decode("utf-8"))["node_info"] except Exception as e: log.exception(f"Error decoding node {node_id}: {e}") return nodes
[docs] def deregister_node(self, node_id: str): key = f"/{self.parent_node_id}/{node_id}" self.__verify_signature(node_id) self.etcd_client.delete(key) if node_id in self.leases: self.leases[node_id].revoke() del self.leases[node_id] log.info(f"Node {node_id} deregistered.")
def __verify_signature(self, node_id): key = f"/{self.parent_node_id}/{node_id}" result = self.etcd_client.get(key) if not result or not result[0]: raise ValueError(f"Node {node_id} not found") node_entry_json = result[0] if not isinstance(node_entry_json, (bytes, str)): raise TypeError(f"Expected bytes or str, but got {type(node_entry_json)}") node_entry = json.loads(node_entry_json) node_info = node_entry["node_info"] node_base64_signature = node_entry["signature"] vk_bytes = base64.b64decode(node_info["public_key"]) vk = VerifyingKey.from_string(vk_bytes, curve=NIST256p) node_info_json = json.dumps(node_info, sort_keys=True).encode("utf-8") signature_bytes = base64.b64decode(node_base64_signature) try: vk.verify(signature_bytes, node_info_json) except Exception as e: raise ValueError( f"Signature verification failed for node {node_id}! Reason: {e}" )