#!/usr/bin/env python3 import json import os import sys import threading import time import traceback import warnings from pathlib import Path from typing import Optional import psutil from ci.util.running_process_manager import RunningProcessManagerSingleton from ci.util.test_args import parse_args from ci.util.test_commands import run_command from ci.util.test_env import ( dump_thread_stacks, get_process_tree_info, setup_environment, setup_force_exit, setup_watchdog, ) from ci.util.test_runner import runner as test_runner from ci.util.test_types import ( FingerprintResult, TestArgs, calculate_fingerprint, process_test_flags, ) _CANCEL_WATCHDOG = threading.Event() _TIMEOUT_EVERYTHING = 600 if os.environ.get("_GITHUB"): _TIMEOUT_EVERYTHING = 1200 # Extended timeout for GitHub Linux builds print( f"GitHub Windows environment detected - using extended timeout: {_TIMEOUT_EVERYTHING} seconds" ) def make_watch_dog_thread( seconds: int, ) -> threading.Thread: # 60 seconds default timeout def watchdog_timer() -> None: time.sleep(seconds) if _CANCEL_WATCHDOG.is_set(): return warnings.warn(f"Watchdog timer expired after {seconds} seconds.") dump_thread_stacks() print(f"Watchdog timer expired after {seconds} seconds - forcing exit") # Dump outstanding running processes (if any) try: RunningProcessManagerSingleton.dump_active() except Exception as e: print(f"Failed to dump active processes: {e}") traceback.print_stack() time.sleep(0.5) os._exit(2) # Exit with error code 2 to indicate timeout (SIGTERM) thr = threading.Thread(target=watchdog_timer, daemon=True, name="WatchdogTimer") thr.start() return thr def run_qemu_tests(args: TestArgs) -> None: """Run examples in QEMU emulation using Docker.""" from pathlib import Path from ci.dockerfiles.qemu_esp32_docker import DockerQEMURunner from ci.util.running_process import RunningProcess if not args.qemu or len(args.qemu) < 1: print("Error: --qemu requires a platform (e.g., esp32s3)") sys.exit(1) platform = args.qemu[0].lower() supported_platforms = ["esp32dev", "esp32c3", "esp32s3"] if platform not in supported_platforms: print( f"Error: Unsupported QEMU platform: {platform}. Supported platforms: {', '.join(supported_platforms)}" ) sys.exit(1) print(f"Running {platform.upper()} QEMU tests using Docker...") # Determine which examples to test (skip the platform argument) examples_to_test = args.qemu[1:] if len(args.qemu) > 1 else ["BlinkParallel"] if not examples_to_test: # Empty list means test all available examples examples_to_test = ["BlinkParallel", "RMT5WorkerPool"] print(f"Testing examples: {examples_to_test}") # Quick test mode - just validate the setup if os.getenv("FASTLED_QEMU_QUICK_TEST") == "true": print("Quick test mode - validating Docker QEMU setup only") print("QEMU ESP32 Docker option is working correctly!") return # Initialize Docker QEMU runner docker_runner = DockerQEMURunner() # Check if Docker is available if not docker_runner.check_docker_available(): print("ERROR: Docker is not available or not running") print("Please install Docker and ensure it's running") sys.exit(1) success_count = 0 failure_count = 0 # Test each example for example in examples_to_test: print(f"\n--- Testing {example} ---") try: # Build the example for the specified platform print(f"Building {example} for {platform}...") build_proc = RunningProcess( ["uv", "run", "ci/ci-compile.py", platform, "--examples", example], timeout=600, auto_run=True, ) # Stream build output with build_proc.line_iter(timeout=None) as it: for line in it: print(line) build_returncode = build_proc.wait() if build_returncode != 0: print(f"Build failed for {example} with exit code: {build_returncode}") failure_count += 1 continue print(f"Build successful for {example}") # Check if build artifacts exist build_dir = Path(f".build/pio/{platform}") if not build_dir.exists(): print(f"Build directory not found: {build_dir}") failure_count += 1 continue print(f"Build artifacts found in {build_dir}") # Run in QEMU using Docker print(f"Running {example} in Docker QEMU...") # Use the nested PlatformIO build directory path pio_build_dir = build_dir / ".pio" / "build" / platform # Set up interrupt regex pattern interrupt_regex = "(FL_WARN.*test finished)|(Setup complete - starting blink animation)|(guru meditation)|(abort\\(\\))|(LoadProhibited)" # Set up output file for GitHub Actions output_file = "qemu_output.log" # Determine machine type based on platform if platform == "esp32c3": machine_type = "esp32c3" elif platform == "esp32s3": machine_type = "esp32s3" else: machine_type = "esp32" # Run QEMU in Docker qemu_returncode = docker_runner.run( firmware_path=pio_build_dir, timeout=30, flash_size=4, interrupt_regex=interrupt_regex, interactive=False, output_file=output_file, machine=machine_type, ) if qemu_returncode == 0: print(f"SUCCESS: {example} ran successfully in Docker QEMU") success_count += 1 else: print( f"FAILED: {example} failed in Docker QEMU with exit code: {qemu_returncode}" ) failure_count += 1 except Exception as e: print(f"ERROR: {example} failed with exception: {e}") failure_count += 1 # Summary print(f"\n=== QEMU {platform.upper()} Test Summary ===") print(f"Examples tested: {len(examples_to_test)}") print(f"Successful: {success_count}") print(f"Failed: {failure_count}") if failure_count > 0: print("Some tests failed. See output above for details.") sys.exit(1) else: print("All QEMU tests passed!") def main() -> None: try: # Record start time start_time = time.time() # Change to script directory first os.chdir(Path(__file__).parent) # Parse and process arguments args = parse_args() # Default to parallel execution for better performance # Users can disable parallel compilation by setting NO_PARALLEL=1 or using --no-parallel if os.environ.get("NO_PARALLEL", "0") == "1": args.no_parallel = True args = process_test_flags(args) timeout = _TIMEOUT_EVERYTHING # Adjust watchdog timeout based on test configuration # Sequential examples compilation can take up to 30 minutes if args.examples is not None and args.no_parallel: # 35 minutes for sequential examples compilation timeout = 2100 print( f"Adjusted watchdog timeout for sequential examples compilation: {timeout} seconds" ) # Set up watchdog timer watchdog = make_watch_dog_thread(seconds=timeout) # Handle --no-interactive flag if args.no_interactive: os.environ["FASTLED_CI_NO_INTERACTIVE"] = "true" os.environ["GITHUB_ACTIONS"] = ( "true" # This ensures all subprocess also run in non-interactive mode ) # Handle --interactive flag if args.interactive: os.environ.pop("FASTLED_CI_NO_INTERACTIVE", None) os.environ.pop("GITHUB_ACTIONS", None) # Set up remaining environment based on arguments setup_environment(args) # Handle stack trace control enable_stack_trace = not args.no_stack_trace if enable_stack_trace: print("Stack trace dumping enabled for test timeouts") else: print("Stack trace dumping disabled for test timeouts") # Validate conflicting arguments if args.no_interactive and args.interactive: print( "Error: --interactive and --no-interactive cannot be used together", file=sys.stderr, ) sys.exit(1) # Set up fingerprint caching cache_dir = Path(".cache") cache_dir.mkdir(exist_ok=True) fingerprint_file = cache_dir / "fingerprint.json" def write_fingerprint(fingerprint: FingerprintResult) -> None: fingerprint_dict = { "hash": fingerprint.hash, "elapsed_seconds": fingerprint.elapsed_seconds, "status": fingerprint.status, } with open(fingerprint_file, "w") as f: json.dump(fingerprint_dict, f, indent=2) def read_fingerprint() -> FingerprintResult | None: if fingerprint_file.exists(): with open(fingerprint_file, "r") as f: try: data = json.load(f) return FingerprintResult( hash=data.get("hash", ""), elapsed_seconds=data.get("elapsed_seconds"), status=data.get("status"), ) except json.JSONDecodeError: print("Invalid fingerprint file. Recalculating...") return None # Calculate and save fingerprint prev_fingerprint = read_fingerprint() fingerprint_data = calculate_fingerprint() src_code_change = ( True if prev_fingerprint is None else fingerprint_data.hash != prev_fingerprint.hash ) write_fingerprint(fingerprint_data) # Handle QEMU testing if args.qemu is not None: print("=== QEMU Testing ===") run_qemu_tests(args) return # Run tests using the test runner with sequential example compilation # Check if we need to use sequential execution to avoid resource conflicts if not args.unit and not args.examples and not args.py and args.full: # Full test mode - use RunningProcessGroup for dependency-based execution from ci.util.running_process import RunningProcess from ci.util.running_process_group import ( ExecutionMode, ProcessExecutionConfig, RunningProcessGroup, ) from ci.util.test_runner import ( create_examples_test_process, create_python_test_process, ) # Create Python test process (runs first) python_process = create_python_test_process( enable_stack_trace=False, full_tests=True ) python_process.auto_run = False # Create examples compilation process examples_process = create_examples_test_process( args, not args.no_stack_trace ) examples_process.auto_run = False # Configure sequential execution with dependencies config = ProcessExecutionConfig( execution_mode=ExecutionMode.SEQUENTIAL_WITH_DEPENDENCIES, verbose=args.verbose, timeout_seconds=2100, # 35 minutes for sequential examples compilation live_updates=True, # Enable real-time display display_type="auto", # Auto-detect best display format ) # Create process group and set up dependency group = RunningProcessGroup(config=config, name="FullTestSequence") group.add_process(python_process) group.add_dependency( examples_process, python_process ) # examples depends on python try: # Start real-time display for full test mode display_thread = None if not args.verbose and config.live_updates: try: from ci.util.process_status_display import ( display_process_status, ) display_thread = display_process_status( group, display_type=config.display_type, update_interval=config.update_interval, ) except ImportError: pass # Fall back to normal execution timings = group.run() # Stop display thread if it was started if display_thread: time.sleep(0.5) print("Sequential test execution completed successfully") # Print timing summary if timings: print(f"\nExecution Summary:") for timing in timings: print(f" {timing.name}: {timing.duration:.2f}s") except Exception as e: print(f"Sequential test execution failed: {e}") sys.exit(1) else: # Use normal test runner for other cases test_runner(args, src_code_change) # Set up force exit daemon and exit daemon_thread = setup_force_exit() _CANCEL_WATCHDOG.set() # Print total execution time elapsed_time = time.time() - start_time print(f"\nTotal execution time: {elapsed_time:.2f} seconds") sys.exit(0) except KeyboardInterrupt: sys.exit(130) # Standard Unix practice: 128 + SIGINT's signal number (2) if __name__ == "__main__": main()