#!/usr/bin/env python
# coding=utf-8
# -------------------------------------------------------------------------------
# ____ _ _____ ____ _
# | _ \ _ _| | |_ _/ ___| _ __ (_) ___ ___
# | |_) | | | | | | | \___ \| '_ \| |/ __/ _ \
# | __/| |_| | |___| | ___) | |_) | | (_| __/
# |_| \__, |_____|_| |____/| .__/|_|\___\___|
# |___/ |_|
#
# Name: run_task.py
# Purpose: Class used for a spice tool using a process call
#
# Author: Nuno Brum (nuno.brum@gmail.com)
#
# Created: 23-12-2016
# Licence: refer to the LICENSE file
# -------------------------------------------------------------------------------
"""
Internal classes not to be used directly by the user
"""
__author__ = "Nuno Canto Brum <nuno.brum@gmail.com>"
__copyright__ = "Copyright 2023, Fribourg Switzerland"
from pathlib import Path
import sys
import threading
import time
import traceback
from time import sleep
from typing import Callable, Union, Any, Tuple, Type
import logging
_logger = logging.getLogger("PyLTSpice.RunTask")
from .process_callback import ProcessCallback
from .simulator import Simulator
END_LINE_TERM = '\n'
# Select the wall-clock source used to time simulations.
# The version is compared as a tuple: checking ``major`` and ``minor``
# independently would wrongly reject any future ``X.0``-``X.5`` release with
# X > 3 and fall back to ``time.clock``, which no longer exists on modern Python.
if sys.version_info >= (3, 6):
    clock_function = time.time
else:
    # Legacy fallback kept for very old interpreters only.
    clock_function = time.clock
def format_time_difference(time_diff):
    """Format an elapsed time in a human-readable form.

    Leading fields that are zero are stripped: the hours are omitted when they
    are zero, and the minutes are omitted as well when both are zero (in which
    case a `` secs`` suffix is appended).

    :param time_diff: elapsed time in seconds (may be fractional)
    :returns: ``"SS.mmm secs"``, ``"MM:SS.mmm"`` or ``"HH:MM:SS.mmm"``
    :rtype: str
    """
    # Work in whole milliseconds so that floating point noise (for instance
    # 1.001 s being stored as 1.000999...) cannot drop a millisecond, and so
    # that a rounded-up fraction correctly carries into the seconds.
    total_milliseconds = int(round(time_diff * 1000))
    total_seconds, milliseconds = divmod(total_milliseconds, 1000)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, seconds = divmod(remainder, 60)

    # Milliseconds are always shown with exactly three digits; a four digit
    # field would make 500 ms print as ".0500" and read as 50 ms.
    if hours:
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}.{milliseconds:03d}"
    if minutes:
        return f"{minutes:02d}:{seconds:02d}.{milliseconds:03d}"
    return f"{seconds:02d}.{milliseconds:03d} secs"
class RunTask(threading.Thread):
    """Thread that runs one simulation and then the optional callback.

    This is an internal Class and should not be used directly by the User.

    The thread calls ``simulator.run()`` on the netlist, records the return
    code and the resulting raw/log file paths and, when the simulation
    succeeds, hands both files to the callback. The outcome is collected
    with :meth:`get_results` or :meth:`wait_results`.
    """

    def __init__(self, simulator: Type[Simulator], runno, netlist_file: Path,
                 callback: Union[Type[ProcessCallback], Callable[[Path, Path], Any]],
                 switches, timeout: Union[float, None] = None, verbose=True):
        """
        :param simulator: simulator class; its ``run()`` method and
            ``raw_extension`` attribute are used
        :param runno: run number, used in the thread name and in the messages
        :param netlist_file: path of the netlist to simulate
        :param callback: callable (or ProcessCallback class) invoked with
            ``(raw_file, log_file)`` after a successful simulation; a falsy
            value disables the callback
        :param switches: passed unchanged to ``simulator.run()``
        :param timeout: passed unchanged to ``simulator.run()``
        :param verbose: when True, messages are also printed to stdout
        """
        super().__init__(name=f"RunTask#{runno}")
        self.start_time = None
        self.stop_time = None
        self.verbose = verbose
        self.switches = switches
        self.timeout = timeout  # Thanks to Daniel Phili for implementing this
        self.simulator = simulator
        self.runno = runno
        self.netlist_file = netlist_file
        self.callback = callback
        self.retcode = -1  # Signals an error by default
        self.raw_file = None
        self.log_file = None
        self.callback_return = None
        # Set once run() has returned, whether normally or through an
        # exception. wait_results() relies on this flag rather than on the
        # value of retcode, which may legitimately stay at -1.
        self._run_finished = threading.Event()

    def print_info(self, logger_fun, message):
        """Log ``message`` prefixed with the run number.

        :param logger_fun: logging method to call (e.g. ``_logger.info``)
        :param message: text to report
        """
        message = f"RunTask #{self.runno}:{message}"
        logger_fun(message)
        if self.verbose:
            print(f"{time.asctime()} {logger_fun.__name__}: {message}{END_LINE_TERM}")

    def run(self):
        """Thread body: run the simulation, then the callback if applicable."""
        try:
            self._run_simulation()
        finally:
            # Always release waiters, even when the simulator or the callback
            # raised; otherwise wait_results() would block forever.
            self._run_finished.set()

    def _run_simulation(self):
        """Run the simulator and dispatch on its return code."""
        self.start_time = clock_function()
        self.print_info(_logger.info, ": Starting simulation %d" % self.runno)

        # start execution
        self.retcode = self.simulator.run(self.netlist_file.absolute().as_posix(), self.switches, self.timeout)
        self.stop_time = clock_function()
        sim_time = format_time_difference(self.stop_time - self.start_time)
        self.log_file = self.netlist_file.with_suffix('.log')

        if self.retcode != 0:
            # simulation failed
            self.print_info(_logger.warning, ": Simulation Failed. Time elapsed: %s" % sim_time)
            if self.log_file.exists():
                # Keep the log of the failed run under a '.fail' suffix. The
                # target is assigned explicitly because Path.replace() only
                # returns the new path from Python 3.8 onwards.
                failed_log = self.log_file.with_suffix('.fail')
                self.log_file.replace(failed_log)
                self.log_file = failed_log
            return

        # simulation successful
        self.print_info(_logger.info, "Simulation Successful. Time elapsed: %s" % sim_time)
        self.raw_file = self.netlist_file.with_suffix(self.simulator.raw_extension)

        if not (self.raw_file.exists() and self.log_file.exists()):
            self.print_info(_logger.error, "Simulation Raw file or Log file were not found")
            return

        if not self.callback:
            self.print_info(_logger.info, 'Simulation Finished. No Callback function given')
            return

        self._run_callback()

    def _run_callback(self):
        """Invoke the callback with the raw and log files and store its result."""
        # Not every callable has a __name__ (functools.partial objects or
        # instances defining __call__ do not), so fall back to its repr.
        callback_name = getattr(self.callback, '__name__', repr(self.callback))
        self.print_info(_logger.info, "Simulation Finished. Calling...{}(rawfile, logfile)".format(
            callback_name))
        try:
            return_or_process = self.callback(self.raw_file, self.log_file)
        except Exception as err:
            # Report the complete exception (type, message and traceback);
            # the task itself keeps going and callback_return stays None.
            error = "".join(traceback.format_exception(type(err), err, err.__traceback__))
            self.print_info(_logger.error, error)
        else:
            if isinstance(return_or_process, ProcessCallback):
                # The callback was a ProcessCallback class: start the created
                # object, take its result from its queue and wait for it.
                proc = return_or_process
                proc.start()
                self.callback_return = proc.queue.get()
                proc.join()
            else:
                self.callback_return = return_or_process
        finally:
            callback_start_time = self.stop_time
            self.stop_time = clock_function()
            self.print_info(_logger.info, "Callback Finished. Time elapsed: %s" % format_time_difference(
                self.stop_time - callback_start_time))

    def get_results(self) -> Union[None, Any, Tuple[str, str]]:
        """
        Returns the simulation outputs if the simulation and callback function has already finished.
        If the simulation is not finished, it simply returns None. If no callback function is defined, then
        it returns a tuple with (raw_file, log_file). If a callback function is defined, it returns whatever
        the callback function is returning. If the simulation did not end with a return code of 0, a tuple
        with two empty strings is returned.
        """
        if self.is_alive():
            return None

        if self.retcode == 0:  # All finished OK
            if self.callback:
                return self.callback_return
            else:
                return self.raw_file, self.log_file
        else:
            return '', ''

    def wait_results(self) -> Union[Any, Tuple[str, str]]:
        """
        Waits for the completion of the task and returns its results, exactly as :meth:`get_results`
        does for a finished task.

        The call blocks until :meth:`run` has completed, which means it also blocks while the thread
        has not been started yet.

        :returns: the callback return value, or a tuple with the raw file and the log file when no
            callback is defined, or a tuple with two empty strings when the simulation failed
        """
        self._run_finished.wait()
        # The flag is set just before the thread terminates; join it so that
        # get_results() does not see a still-alive thread and return None.
        # The is_alive() guard keeps this safe when run() was called directly.
        if self.is_alive():
            self.join()
        return self.get_results()