#!/usr/bin/env python3 import sys import os import signal import logging from typing import Optional sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from common.utils import setup_logging from config.settings import ( ALGORITHM_SERVER_HOST, ALGORITHM_SERVER_PORT, LOG_LEVEL, LOG_FILE, RCS_SERVER_HOST, RCS_SERVER_PORT, MONITOR_POLLING_INTERVAL ) from algorithm_system.algorithm_server import AlgorithmServer class AlgorithmApplication: def __init__(self, host: str = None, port: int = None, log_level: str = None, enable_path_monitor: bool = True, monitor_interval: float = None, rcs_host: str = None, rcs_port: int = None): # Initialize algorithm system application self.host = host or ALGORITHM_SERVER_HOST self.port = port or ALGORITHM_SERVER_PORT self.log_level = log_level or LOG_LEVEL self.enable_path_monitor = enable_path_monitor self.monitor_interval = monitor_interval or MONITOR_POLLING_INTERVAL self.rcs_host = rcs_host or RCS_SERVER_HOST self.rcs_port = rcs_port or RCS_SERVER_PORT setup_logging(self.log_level, LOG_FILE) self.logger = logging.getLogger(__name__) self.server: Optional[AlgorithmServer] = None signal.signal(signal.SIGINT, self.signal_handler) signal.signal(signal.SIGTERM, self.signal_handler) def signal_handler(self, signum, frame): # Handle system signals, shutdown program self.logger.info(f"Received signal {signum}, shutting down algorithm system...") self.shutdown() sys.exit(0) def start_server(self): # Start algorithm server try: self.logger.info("Initializing algorithm system server...") # Set RCS connection configuration os.environ['RCS_SERVER_HOST'] = self.rcs_host os.environ['RCS_SERVER_PORT'] = str(self.rcs_port) self.server = AlgorithmServer(self.host, self.port, self.enable_path_monitor, self.monitor_interval) status = self.server.get_server_status() self.logger.info("Algorithm system server status:") self.logger.info(f" - Host: {status['host']}") self.logger.info(f" - Port: {status['port']}") self.logger.info(f" - Path mapping loaded: {status['path_mapping_loaded']}") self.logger.info(f" - AGV count: {status['agv_count']}") self.logger.info(f" - Task allocation algorithm: {status['algorithms']['task_allocation']}") self.logger.info(f" - Path planning algorithm: {status['algorithms']['path_planning']}") self.logger.info("Algorithm system server initialization completed") self.server.start_server() except KeyboardInterrupt: self.logger.info("Received interrupt signal, shutting down algorithm server...") except Exception as e: self.logger.error(f"Algorithm server startup failed: {e}") raise finally: self.shutdown() def shutdown(self): # Shutdown algorithm system components self.logger.info("Shutting down algorithm system...") if self.server: try: self.server.stop_server() except Exception as e: self.logger.error(f"Failed to shutdown server: {e}") self.logger.info("Algorithm system shutdown completed") def main(): # Main function entry point import argparse parser = argparse.ArgumentParser(description="Algorithm System - Provides task allocation and path planning services") parser.add_argument( "--host", default=ALGORITHM_SERVER_HOST, help=f"Algorithm server host address (default: {ALGORITHM_SERVER_HOST})" ) parser.add_argument( "--port", type=int, default=ALGORITHM_SERVER_PORT, help=f"Algorithm server port (default: {ALGORITHM_SERVER_PORT})" ) parser.add_argument( "--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR"], default="DEBUG", help=f"Log level (default: DEBUG)" ) parser.add_argument( "--task-algorithm", choices=["NEAREST_FIRST", "LOAD_BALANCED", "PRIORITY_FIRST", "MULTI_OBJECTIVE"], default="LOAD_BALANCED", help="Task allocation algorithm (default: LOAD_BALANCED)" ) parser.add_argument( "--path-algorithm", choices=["A_STAR", "DIJKSTRA", "GREEDY"], default="DIJKSTRA", help="Path planning algorithm (default: DIJKSTRA)" ) parser.add_argument( "--enable-path-monitor", action="store_true", default=True, help="Enable path monitoring service (default: True)" ) parser.add_argument( "--disable-path-monitor", action="store_true", help="Disable path monitoring service" ) parser.add_argument( "--monitor-interval", type=float, default=MONITOR_POLLING_INTERVAL, help=f"Path monitoring polling interval in seconds (default: {MONITOR_POLLING_INTERVAL})" ) parser.add_argument( "--rcs-host", default=RCS_SERVER_HOST, help=f"RCS server host address (default: {RCS_SERVER_HOST})" ) parser.add_argument( "--rcs-port", type=int, default=RCS_SERVER_PORT, help=f"RCS server port (default: {RCS_SERVER_PORT})" ) args = parser.parse_args() # Handle path monitoring switch enable_path_monitor = args.enable_path_monitor and not args.disable_path_monitor app = AlgorithmApplication( host=args.host, port=args.port, log_level=args.log_level, enable_path_monitor=enable_path_monitor, monitor_interval=args.monitor_interval, rcs_host=args.rcs_host, rcs_port=args.rcs_port ) try: print("=" * 60) print("CTU Warehouse Management System - Algorithm System") print("=" * 60) print(f"Server Address: http://{args.host}:{args.port}") print(f"Task Allocation Algorithm: {args.task_algorithm}") print(f"Path Planning Algorithm: {args.path_algorithm}") print(f"Log Level: {args.log_level}") print(f"Path Monitoring Service: {'Enabled' if enable_path_monitor else 'Disabled'}") if enable_path_monitor: print(f"Monitor Polling Interval: {args.monitor_interval}s") print(f"RCS Server Address: {args.rcs_host}:{args.rcs_port}") print("=" * 60) print("Available APIs:") print(f" POST http://{args.host}:{args.port}/open/task/send/v1") print(f" - Task allocation API") print(f" - Supports new format: {{\"tasks\": [...], \"agvStatus\": [...]}}") print(f" - Compatible with old format: [task1, task2, ...]") print(f" POST http://{args.host}:{args.port}/open/path/plan/v1") print(f" - Path planning API") print(f" POST http://{args.host}:{args.port}/open/path/batch/plan/v1") print(f" - Batch path planning API") print(f" POST http://{args.host}:{args.port}/open/algorithm/config/v1") print(f" - Algorithm configuration API") if enable_path_monitor: print(f" POST http://{args.host}:{args.port}/monitor/path/start/v1") print(f" - Start path monitoring service") print(f" POST http://{args.host}:{args.port}/monitor/path/stop/v1") print(f" - Stop path monitoring service") print(f" GET http://{args.host}:{args.port}/monitor/path/status/v1") print(f" - Path monitoring service status query") print(f" GET http://{args.host}:{args.port}/health") print(f" - Health check API") print("=" * 60) print("Press Ctrl+C to stop server") print() app.start_server() except KeyboardInterrupt: print("\nReceived interrupt signal, shutting down...") except Exception as e: print(f"Startup failed: {e}") sys.exit(1) if __name__ == "__main__": main()