Source code for isek.isek_center

import json
import threading
import time
from typing import Dict, Any, Optional, Tuple, Union # Added Optional, Tuple, Union

from flask import Flask, request, jsonify, Blueprint
from werkzeug.exceptions import HTTPException # For type hinting Flask error responses

# --- Global State & Configuration ---
isek_center_blueprint = Blueprint('isek_center_blueprint', __name__, url_prefix='/isek_center')

# In-memory storage for registered nodes.
# Structure: { "node_id": {"node_id": str, "host": str, "port": int, "metadata": dict, "expires_at": float} }
nodes: Dict[str, Dict[str, Any]] = {}
NODE_LOCK = threading.Lock() # To protect access to the 'nodes' dictionary

LEASE_DURATION: int = 30  # Seconds for node lease


# --- Response Helper Class ---
[docs] class CommonResponse: """ A helper class to standardize JSON API responses. """ def __init__(self, data: Optional[Any], code: int, message: str): """ Initializes a CommonResponse object. :param data: The payload data of the response. Can be None. :type data: typing.Optional[typing.Any] :param code: The HTTP-like status code for the response (e.g., 200 for success, 500 for fail). :type code: int :param message: A descriptive message accompanying the response. :type message: str """ self.data: Optional[Any] = data self.code: int = code self.message: str = message
[docs] @classmethod def success(cls, data: Optional[Any] = None, code: int = 200, message: str = 'success') -> Tuple[Dict[str, Any], int]: """ Creates a standardized success response. :param data: Optional data payload for the success response. :type data: typing.Optional[typing.Any] :param code: The success code. Defaults to 200. :type code: int :param message: The success message. Defaults to 'success'. :type message: str :return: A tuple containing the response dictionary and the HTTP status code. :rtype: typing.Tuple[typing.Dict[str, typing.Any], int] """ return jsonify(cls(data, code, message).to_dict()), code
[docs] @classmethod def fail(cls, message: str, code: int = 400, data: Optional[Any] = None) -> Tuple[Dict[str, Any], int]: """ Creates a standardized failure/error response. Commonly used for client errors (e.g., bad request, not found). :param message: The failure message. :type message: str :param code: The error code. Defaults to 400 (Bad Request). :type code: int :param data: Optional data payload accompanying the failure. :type data: typing.Optional[typing.Any] :return: A tuple containing the response dictionary and the HTTP status code. :rtype: typing.Tuple[typing.Dict[str, typing.Any], int] """ return jsonify(cls(data, code, message).to_dict()), code
[docs] def to_dict(self) -> Dict[str, Any]: """ Converts the CommonResponse instance to a dictionary. :return: A dictionary representation of the response. :rtype: typing.Dict[str, typing.Any] """ return { "code": self.code, "message": self.message, "data": self.data }
# --- Flask Route Definitions --- FlaskResponse = Tuple[Dict[str, Any], int] # Type alias for Flask view return
[docs] @isek_center_blueprint.route('/register', methods=['POST']) def register_node_route() -> FlaskResponse: """ Registers a new node or updates an existing node's registration. Expects a JSON payload with `node_id`, `host`, `port`, and optional `metadata`. Assigns an expiration time to the node's registration (lease). **Request JSON Body:** .. code-block:: json { "node_id": "unique_node_identifier", "host": "node_hostname_or_ip", "port": 8080, "metadata": {"key": "value"} } **Responses:** - **200 OK:** Node registered successfully. - **400 Bad Request:** Missing required fields (`node_id`, `host`, `port`). :return: A Flask JSON response. :rtype: FlaskResponse """ data = request.json if not data: return CommonResponse.fail(message="Request body must be JSON.", code=400) node_id = data.get('node_id') host = data.get('host') port = data.get('port') metadata = data.get('metadata') # Optional if not all([node_id, host, port]): # Port can be 0, but typically not for services. return CommonResponse.fail(message="'node_id', 'host', and 'port' are required fields.", code=400) if not isinstance(port, int): return CommonResponse.fail(message="'port' must be an integer.", code=400) with NODE_LOCK: nodes[node_id] = { "node_id": node_id, "host": host, "port": port, "metadata": metadata or {}, # Ensure metadata is a dict "expires_at": time.time() + LEASE_DURATION } # print(f"Node registered/updated: {node_id}, Expires at: {nodes[node_id]['expires_at']}") # Debug return CommonResponse.success(message=f"Node '{node_id}' registered successfully.")
[docs] @isek_center_blueprint.route('/deregister', methods=['POST']) def deregister_node_route() -> FlaskResponse: """ Deregisters a node from the service. Expects a JSON payload with `node_id`. **Request JSON Body:** .. code-block:: json { "node_id": "unique_node_identifier" } **Responses:** - **200 OK:** Node deregistered successfully. - **400 Bad Request:** Missing or invalid `node_id`. - **404 Not Found:** If `node_id` does not exist (alternative to 400 for invalid). :return: A Flask JSON response. :rtype: FlaskResponse """ data = request.json if not data: return CommonResponse.fail(message="Request body must be JSON.", code=400) node_id = data.get('node_id') if not node_id: return CommonResponse.fail(message="'node_id' is required.", code=400) with NODE_LOCK: if node_id not in nodes: return CommonResponse.fail(message=f"Node '{node_id}' not found for deregistration.", code=404) removed_node = nodes.pop(node_id) # print(f"Node deregistered: {node_id}, Details: {removed_node}") # Debug return CommonResponse.success(message=f"Node '{node_id}' deregistered successfully.")
[docs] @isek_center_blueprint.route('/available_nodes', methods=['GET']) def get_available_nodes_route() -> FlaskResponse: """ Retrieves a list of all currently active (non-expired) registered nodes. **Responses:** - **200 OK:** Returns a list of available nodes. The response data will be structured as follows: .. code-block:: json { "available_nodes": { "node_id_1": { "node_id": "node_id_1", "host": "host1.example.com", "port": 8080, "metadata": {"key1": "value1"}, "expires_at": 1678886400.0 }, "node_id_2": { "node_id": "node_id_2", "host": "host2.example.com", "port": 8081, "metadata": {}, "expires_at": 1678886500.0 } } } :return: A Flask JSON response containing the available nodes. :rtype: FlaskResponse """ current_time = time.time() with NODE_LOCK: # Create a copy to iterate over, in case cleanup_expired_nodes runs concurrently # (though cleanup also uses the lock, this is safer for reads if not). # More importantly, this filters based on expiration. active_nodes = { k: v for k, v in nodes.items() if v.get('expires_at', 0) > current_time } response_payload = {"available_nodes": active_nodes} # print(f"Returning {len(active_nodes)} available nodes.") # Debug return CommonResponse.success(data=response_payload)
[docs] @isek_center_blueprint.route('/renew', methods=['POST']) def renew_lease_route() -> FlaskResponse: """ Renews the lease for an existing registered node. Expects a JSON payload with `node_id`. Updates the node's `expires_at` timestamp. **Request JSON Body:** .. code-block:: json { "node_id": "unique_node_identifier" } **Responses:** - **200 OK:** Lease renewed successfully. - **400 Bad Request:** Missing or invalid `node_id`. - **404 Not Found:** If `node_id` does not exist. :return: A Flask JSON response. :rtype: FlaskResponse """ data = request.json if not data: return CommonResponse.fail(message="Request body must be JSON.", code=400) node_id = data.get('node_id') if not node_id: return CommonResponse.fail(message="'node_id' is required.", code=400) with NODE_LOCK: if node_id not in nodes: return CommonResponse.fail(message=f"Node '{node_id}' not found for lease renewal.", code=404) # Check if it's already expired before renewing (optional, but good for consistency) # if nodes[node_id].get('expires_at', 0) <= time.time(): # return CommonResponse.fail(message=f"Node '{node_id}' lease has already expired. Please re-register.", code=410) # 410 Gone nodes[node_id]['expires_at'] = time.time() + LEASE_DURATION # print(f"Lease renewed for node: {node_id}, New expires_at: {nodes[node_id]['expires_at']}") # Debug return CommonResponse.success(message=f"Lease for node '{node_id}' renewed successfully.")
# --- Background Task for Node Cleanup ---
[docs] def cleanup_expired_nodes() -> None: """ Periodically removes expired nodes from the `nodes` dictionary. This function runs in a separate daemon thread. It checks every 5 seconds for nodes whose lease (`expires_at`) has passed. """ # print("Cleanup thread started.") # Debug while True: time.sleep(5) # Check interval current_time = time.time() expired_node_ids: List[str] = [] with NODE_LOCK: # Identify expired nodes without modifying dict during iteration for node_id, node_data in nodes.items(): if node_data.get('expires_at', 0) <= current_time: expired_node_ids.append(node_id) # Remove expired nodes for node_id in expired_node_ids: removed_node = nodes.pop(node_id, None) # Safely remove if removed_node: # print(f"Cleanup: Removed expired node '{node_id}'. Expired at: {removed_node.get('expires_at')}") # Debug pass # Optionally log to Flask/app logger if configured
# --- Main Application Setup ---
[docs] def main() -> None: """ Sets up and runs the Flask application for the Isek Center. Initializes a background thread for cleaning up expired node registrations and starts the Flask development server. """ # Start the cleanup thread cleanup_thread = threading.Thread(target=cleanup_expired_nodes, daemon=True) cleanup_thread.start() # Create and configure the Flask app app = Flask(__name__) app.register_blueprint(isek_center_blueprint) # Optional: Add a root/health check endpoint @app.route('/') def health_check() -> str: """Basic health check endpoint.""" return "Isek Center is running!" # Run the Flask app # For production, use a proper WSGI server like Gunicorn or uWSGI # print("Starting Isek Center Flask application on http://0.0.0.0:8088") # Debug app.run(host='0.0.0.0', port=8088, debug=False) # debug=False for production-like behavior
if __name__ == '__main__': main()