Source code for ams.main

"""
Main entry point for the AMS CLI and scripting interfaces.
"""

import cProfile
import io
import logging
import os
import platform
import pstats
import sys
from functools import partial
from subprocess import call
from time import sleep
from typing import Optional, Union
import textwrap

from ._version import get_versions

from andes.main import _find_cases
from andes.shared import Pool, Process, coloredlogs, unittest, NCPUS_PHYSICAL
from andes.utils.misc import elapsed, is_interactive

import ams
from ams.routines import routine_cli
from ams.system import System
from ams.utils.paths import get_config_path, get_log_dir, tests_root
from ams.shared import copyright_msg

logger = logging.getLogger(__name__)


def config_logger(stream_level=logging.INFO, *,
                  stream=True,
                  file=True,
                  log_file='ams.log',
                  log_path=None,
                  file_level=logging.DEBUG,
                  ):
    """
    Configure the ``'ams'`` logger with a `StreamHandler` and a `FileHandler`.

    On the first call (no handlers attached to the ``'ams'`` logger yet) the
    handlers are created and the module-level ``logger`` is rebound to the
    ``'ams'`` logger. On later calls only the levels of the existing handlers
    are updated.

    Parameters
    ----------
    stream_level : {1, 10, 20, 30, 40, 50}, optional
        `StreamHandler` verbosity level. The special value ``1`` selects a
        verbose message format (logger name, line number, level) and is then
        treated as level 10.
    stream : bool, optional
        Create a `StreamHandler` if ``True``. If ``False``, the handler
        will not be created.
    file : bool, optional
        True if logging to ``log_file``.
    log_file : str, optional
        Log file name for `FileHandler`, ``'ams.log'`` by default.
        If ``None``, the `FileHandler` will not be created.
    log_path : str, optional
        Path to store the log file. By default, the path is the one
        returned by ``get_log_dir()``.
    file_level : {10, 20, 30, 40, 50}, optional
        `FileHandler` verbosity level.

    Returns
    -------
    None

    Notes
    -----
    Copied from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    lg = logging.getLogger('ams')
    lg.setLevel(logging.DEBUG)

    if log_path is None:
        log_path = get_log_dir()

    sh_formatter_str = '%(message)s'
    if stream_level == 1:
        # pseudo-level 1: DEBUG verbosity with a more detailed message format
        sh_formatter_str = '%(name)s:%(lineno)d - %(levelname)s - %(message)s'
        stream_level = 10

    sh_formatter = logging.Formatter(sh_formatter_str)
    if len(lg.handlers) == 0:
        # create a StreamHandler
        if stream is True:
            sh = logging.StreamHandler()
            sh.setFormatter(sh_formatter)
            sh.setLevel(stream_level)
            lg.addHandler(sh)

        # file handler for level DEBUG and up
        if file is True and (log_file is not None):
            log_full_path = os.path.join(log_path, log_file)
            fh_formatter = logging.Formatter('%(process)d: %(asctime)s - %(name)s - %(levelname)s - %(message)s')
            fh = logging.FileHandler(log_full_path)
            fh.setLevel(file_level)
            fh.setFormatter(fh_formatter)
            lg.addHandler(fh)

        globals()['logger'] = lg

    else:
        # Update the handlers of `lg` itself: it is the logger whose handlers
        # were just inspected, whereas the module global `logger` is only
        # rebound to it by the branch above.
        # Order matters: `FileHandler` subclasses `StreamHandler`, so the
        # first call also touches file handlers and the second corrects them.
        set_logger_level(lg, logging.StreamHandler, stream_level)
        set_logger_level(lg, logging.FileHandler, file_level)

    if not is_interactive():
        coloredlogs.install(logger=lg, level=stream_level, fmt=sh_formatter_str)
def load(case, setup=True, use_input_path=True, **kwargs):
    """
    Load a case file into a ``System`` without running any routine.

    Extra keyword arguments are forwarded unchanged to the ``System``
    constructor.

    Parameters
    ----------
    case : str
        Path to the test case.
    setup : bool, optional
        Call ``System.setup`` after the case has been parsed.
    use_input_path : bool, optional
        If True, resolve ``case`` through ``_find_cases`` using the
        ``input_path`` keyword argument (empty string when absent).

    Returns
    -------
    System or None
        The loaded system, or ``None`` if no case or more than one case was
        found, or if parsing the case failed.

    Warnings
    --------
    To add devices on top of those in the case file, pass ``setup=False``,
    add the devices, and then call ``setup()`` manually.

    Notes
    -----
    Revised from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    if use_input_path:
        search_dir = kwargs.get('input_path', '')
        matches = _find_cases(case, search_dir)
        n_matches = len(matches)

        if n_matches > 1:
            logger.error("`ams.load` does not support mulitple cases.")
            return None
        if n_matches == 0:
            logger.error("No valid case found.")
            return None

        case = matches[0]

    system = System(case, **kwargs)

    parsed = ams.io.parse(system)
    if not parsed:
        return None

    if setup:
        system.setup()

    return system
def run_case(case, *, routine='pflow', profile=False,
             convert='', convert_all='', add_book=None,
             **kwargs):
    """
    Run a single case given by its full path.

    Use ``run`` instead of ``run_case`` whenever possible.

    Argument ``input_path`` will not be prepended to ``case``.

    Arguments recognizable by ``load`` can be passed to ``run_case``.

    Parameters
    ----------
    case : str
        Full path to the test case
    routine : str or list of str or None
        Name(s) of the routine(s) to run. Each name is lower-cased and
        looked up in ``routine_cli``. ``None`` skips the routine stage.
    profile : bool, optional
        True to enable profiler
    convert : str, optional
        Format name for case file conversion. When a conversion is
        requested, no routine is run.
    convert_all : str, optional
        If not empty, convert to ``xlsx`` and do not skip empty devices.
    add_book : str, optional
        Name of the device to be added to an excel case
        as a new sheet. Forces conversion to ``xlsx`` with overwrite.

    Returns
    -------
    System or None
        The system, or ``None`` if the case could not be loaded.

    Notes
    -----
    Revised from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    do_profile = bool(profile)

    pr = cProfile.Profile()
    # enable profiler if requested
    if do_profile:
        pr.enable()

    system = load(case, use_input_path=False, **kwargs)

    if system is None:
        # do not leave the profiler running after an early exit
        if do_profile:
            pr.disable()
        return None

    skip_empty = True
    overwrite = None
    # convert to xlsx and process `add-book` option
    if add_book is not None:
        convert = 'xlsx'
        overwrite = True
    if convert_all != '':
        convert = 'xlsx'
        skip_empty = False

    # convert to the requested format
    if convert != '':
        ams.io.dump(system, convert, overwrite=overwrite, skip_empty=skip_empty,
                    add_book=add_book)
        if do_profile:
            pr.disable()
        return system

    # run the requested routine
    if routine is not None:
        if isinstance(routine, str):
            routine = [routine]

        if system.is_setup:
            for r in routine:
                system.__dict__[routine_cli[r.lower()]].run(**kwargs)
        else:
            logger.error("System is not set up. Routines cannot continue.")

    # Disable profiler and output results
    if do_profile:
        pr.disable()

        if system.files.no_output:
            nlines = 40
            # Collect the report in a buffer so that it reaches the logger;
            # streaming to stdout would leave the logged buffer empty.
            with io.StringIO() as s:
                ps = pstats.Stats(pr, stream=s).sort_stats('cumtime')
                ps.print_stats(nlines)
                logger.info(s.getvalue())
        else:
            nlines = 999
            with open(system.files.prof, 'w') as s:
                ps = pstats.Stats(pr, stream=s).sort_stats('cumtime')
                ps.print_stats(nlines)
                ps.dump_stats(system.files.prof_raw)
            logger.info('cProfile text data written to "%s".', system.files.prof)
            logger.info('cProfile raw data written to "%s". View with tool `snakeviz`.', system.files.prof_raw)

    return system
def _run_mp_proc(cases, ncpu=NCPUS_PHYSICAL, **kwargs):
    """
    Run multiprocessing with `Process`.

    Cases are started one by one; after every ``ncpu`` started processes (and
    after the last case) all running processes are joined before continuing.

    Return values from `run_case` are not preserved. Always return `True` when done.

    Parameters
    ----------
    cases : list of str
        Case file paths, one process per case.
    ncpu : int, optional
        Number of processes to start before joining the batch.
    kwargs
        Forwarded to ``run_case``.

    Notes
    -----
    Copied from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    # start processes
    jobs = []
    last_idx = len(cases) - 1
    for idx, file in enumerate(cases):
        job = Process(name=f'Process {idx:d}', target=run_case, args=(file,), kwargs=kwargs)
        jobs.append(job)
        job.start()

        start_msg = f'Process {idx:d} for "{file:s}" started.'
        print(start_msg)
        logger.debug(start_msg)

        # batch is full, or this was the last case: wait for the batch
        if (idx % ncpu == ncpu - 1) or (idx == last_idx):
            sleep(0.1)
            for running in jobs:
                running.join()
            jobs = []

    return True


def _run_mp_pool(cases, ncpu=NCPUS_PHYSICAL, verbose=logging.INFO, **kwargs):
    """
    Run multiprocessing jobs using Pool.

    This function returns the results of ``run_case`` for all cases in a
    list, in the order of ``cases``.

    Parameters
    ----------
    cases : list of str
        Case file paths.
    ncpu : int, optional
        Number of worker processes in the pool.
    verbose : 10, 20, 30, 40, 50
        Verbosity level forwarded to ``run_case``.
    kwargs
        Other keyword arguments forwarded to ``run_case`` (``run`` passes
        ``mp_verbose`` this way).

    Returns
    -------
    list
        One entry per case, as returned by ``run_case``.

    Notes
    -----
    Copied from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    pool = Pool(ncpu)
    try:
        print("Cases are processed in the following order:")
        print('\n'.join([f'"{name}"' for name in cases]))
        ret = pool.map(partial(run_case, verbose=verbose, remove_pycapsule=True,
                               autogen_stale=False, **kwargs),
                       cases)
    finally:
        # release the worker processes instead of leaking them
        pool.close()
        pool.join()

    # FIXME: does following code work in AMS?
    # # fix address for in-place arrays
    # for ss in ret:
    #     fix_view_arrays(ss)

    return ret
def run(filename, input_path='', verbose=20, mp_verbose=30,
        ncpu=NCPUS_PHYSICAL, pool=False,
        cli=False, shell=False, **kwargs):
    """
    Entry point to run AMS routines.

    Parameters
    ----------
    filename : str
        file name (or pattern)
    input_path : str, optional
        input search path
    verbose : int, 10 (DEBUG), 20 (INFO), 30 (WARNING), 40 (ERROR), 50 (CRITICAL)
        Verbosity level. It is used to configure the logger only in an
        interactive session where the logger has no handlers yet, and to
        restore the stream handler level after multiprocessing.
    mp_verbose : int
        Verbosity level for multiprocessing tasks
    ncpu : int, optional
        Number of cpu cores to use in parallel
    pool: bool, optional
        Use Pool for multiprocessing to return a list of created Systems.
    cli : bool, optional
        If is running from command-line. If True, returns exit code instead of System
    shell : bool, optional
        If True, enter IPython shell after routine.
    kwargs
        Other supported keyword arguments

    Returns
    -------
    System or exit_code
        An instance of system (if `cli == False`) or an exit code otherwise.
        With multiple cases, the "system" is a list (``pool=True``) or
        ``True`` (``pool=False``).

    Notes
    -----
    Copied from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    if is_interactive() and len(logger.handlers) == 0:
        config_logger(verbose, file=False)

    # put some args back to `kwargs`
    kwargs['input_path'] = input_path
    kwargs['verbose'] = verbose

    cases = _find_cases(filename, input_path)

    system = None
    ex_code = 0

    if len(filename) > 0 and len(cases) == 0:
        ex_code = 1  # file specified but not found

    t0, _ = elapsed()

    if len(cases) == 1:
        system = run_case(cases[0], **kwargs)
    elif len(cases) > 1:
        # FIXME: after standardize code generation, enable following code
        # # import `pycode` to local namespace to avoid a picking issue
        # import_pycode()

        # suppress logging output during multiprocessing
        logger.info('-> Processing %s jobs on %s CPUs.', len(cases), ncpu)
        set_logger_level(logger, logging.StreamHandler, mp_verbose)
        set_logger_level(logger, logging.FileHandler, logging.DEBUG)

        if pool is True:
            system = _run_mp_pool(cases,
                                  ncpu=ncpu,
                                  mp_verbose=mp_verbose,
                                  **kwargs)
        else:
            system = _run_mp_proc(cases,
                                  ncpu=ncpu,
                                  mp_verbose=mp_verbose,
                                  **kwargs)

        # restore command line output when all jobs are done
        # NOTE(review): `logging.FileHandler` subclasses `StreamHandler`, so
        # this call also moves file handlers to `verbose` -- confirm intended.
        set_logger_level(logger, logging.StreamHandler, verbose)

    log_files = find_log_path(logger)
    if len(log_files) > 0:
        log_paths = '\n'.join(log_files)
        print(f'Log saved to "{log_paths}".')

    t0, s0 = elapsed(t0)

    if len(cases) == 1:
        if system is not None:
            ex_code += system.exit_code
        else:
            ex_code += 1
    elif len(cases) > 1:
        if isinstance(system, list):
            for s in system:
                # `run_case` returns None when a case fails to load; count it
                # as an error, consistent with the single-case branch above.
                ex_code += 1 if s is None else s.exit_code

    if len(cases) == 1:
        if ex_code == 0:
            print(f'-> Single process finished in {s0}.')
        else:
            print(f'-> Single process exit with an error in {s0}.')
    elif len(cases) > 1:
        if ex_code == 0:
            print(f'-> Multiprocessing finished in {s0}.')
        else:
            print(f'-> Multiprocessing exit with an error in {s0}.')

    # IPython interactive shell
    if shell is True:
        try:
            from IPython import embed

            # load plotter before entering IPython
            if system is None:
                logger.warning("IPython: The System object has not been created.")
            elif isinstance(system, System):
                logger.info("IPython: Access System object in variable `system`.")
                # NOTE(review): `TDS` is inherited from the ANDES version of
                # this function; guard so a System without it still reaches
                # the shell instead of raising AttributeError.
                tds = getattr(system, 'TDS', None)
                if tds is not None:
                    tds.load_plotter()
            elif isinstance(system, list):
                logger.warning("IPython: System objects stored in list `system`.\n"
                               "Call `TDS.load_plotter()` on each for plotter.")

            embed()
        except ImportError:
            logger.warning("IPython import error. Installed?")

    if cli is True:
        return ex_code

    return system
def set_logger_level(lg, type_to_set, level):
    """
    Apply ``level`` to every handler of ``lg`` that is an instance of
    ``type_to_set``.

    Matching uses ``isinstance``, so handlers of subclasses of
    ``type_to_set`` are updated as well.

    Parameters
    ----------
    lg : logging.Logger
        Logger whose handlers are updated.
    type_to_set : type
        Handler class (or tuple of classes) to match.
    level : int or str
        Level passed to ``Handler.setLevel``.
    """
    matching = (handler for handler in lg.handlers
                if isinstance(handler, type_to_set))
    for handler in matching:
        handler.setLevel(level)
def find_log_path(lg):
    """
    Collect the log file paths of all `FileHandler` instances attached to
    ``lg``.

    Parameters
    ----------
    lg : logging.Logger
        Logger whose handlers are inspected.

    Returns
    -------
    list of str
        ``baseFilename`` of each `FileHandler`, in handler order. Empty if
        the logger has no file handler.
    """
    return [handler.baseFilename
            for handler in lg.handlers
            if isinstance(handler, logging.FileHandler)]
def misc(edit_config='', save_config='', show_license=False, clean=True, recursive=False,
         overwrite=None, version=False, **kwargs):
    """
    Miscellaneous commands.

    The options are checked in a fixed order and only the first one that
    applies is executed: edit config, show license, save config, clean,
    demo, version. Note that ``clean`` defaults to ``True``, so a call with
    all defaults removes output files and returns.

    Parameters
    ----------
    edit_config : str or None, optional
        Passed to ``edit_conf``; the empty string means "do not edit".
    save_config : str or None, optional
        Passed to ``save_conf`` unless it is the empty string.
    show_license : bool, optional
        Print the license.
    clean : bool, optional
        Remove output files through ``remove_output``.
    recursive : bool, optional
        Passed to ``remove_output``.
    overwrite : bool or None, optional
        Passed to ``save_conf``.
    version : bool, optional
        Print version information through ``versioninfo``.
    kwargs
        Other keyword arguments. ``demo=True`` requests the demo; the
        remaining entries are forwarded to ``save_conf`` or ``demo``.

    Notes
    -----
    Copied from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    if edit_conf(edit_config):
        return
    if show_license:
        # NOTE(review): `print_license` is not defined in the visible part of
        # this module -- confirm it exists elsewhere.
        print_license()
        return
    if save_config != '':
        save_conf(save_config, overwrite=overwrite, **kwargs)
        return
    if clean is True:
        remove_output(recursive)
        return
    # The flag must be read from `kwargs`: the bare name `demo` refers to the
    # module-level function and can never be `True`.
    if kwargs.get('demo', False) is True:
        demo(**kwargs)
        return
    if version is True:
        versioninfo()
        return

    logger.info("info: no option specified. Use 'ams misc -h' for help.")
def doc(attribute=None, list_supported=False, config=False, **kwargs):
    """
    Log quick documentation requested from the command line.

    Parameters
    ----------
    attribute : str, optional
        Name of an attribute of a fresh ``System`` instance. If the
        attribute exists and has a ``doc`` method, its output is logged;
        otherwise an error is logged.
    list_supported : bool, optional
        When no ``attribute`` is given, log ``System.supported_routines()``.
    config : bool, optional
        Not used by this function.
    kwargs
        Not used by this function.

    Notes
    -----
    Revised from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    system = System()

    if attribute is None:
        if list_supported is True:
            logger.info(system.supported_routines())
        else:
            logger.info("info: no option specified. Use 'ams doc -h' for help.")
        return

    # A missing entry yields None, which has no `doc` attribute either.
    target = system.__dict__.get(attribute)
    if hasattr(target, 'doc'):
        logger.info(target.doc())
    else:
        logger.error('Model <%s> does not exist.', attribute)
def demo(**kwargs):
    """
    Placeholder for command-line demonstrations.

    TODO: show some demonstrations from CLI.

    Raises
    ------
    NotImplementedError
        Always, since no demo is available yet.
    """
    message = "Demos have not been implemented"
    raise NotImplementedError(message)
def versioninfo():
    """
    Print a table of versions for Python, AMS and its main dependencies,
    followed by the installed solvers.

    The solver list is wrapped at 30 characters; continuation lines are
    indented by the width of the longest key plus two.

    Notes
    -----
    Revised from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    import numpy as np
    import cvxpy
    import andes
    from ams.shared import installed_solvers

    versions = {
        'Python': platform.python_version(),
        'ams': get_versions()['version'],
        'andes': andes.__version__,
        'numpy': np.__version__,
        'cvxpy': cvxpy.__version__,
        'solvers': ', '.join(installed_solvers),
    }

    maxwidth = max(map(len, versions))
    continuation = '\n' + ' ' * (maxwidth + 2)

    for key, val in versions.items():
        if key == 'solvers':
            # Wrap solvers to a new line if too long
            val = textwrap.fill(val, width=30).replace('\n', continuation)
        print(f"{key: <{maxwidth}} {val}")
def edit_conf(edit_config: Optional[Union[str, bool]] = ''):
    """
    Edit the AMS config file returned by ``get_config_path()``.

    If no config file is found, a new one is saved first through
    ``System().save_config()``.

    Parameters
    ----------
    edit_config : str, bool or None
        - ``''`` or ``False``: do nothing.
        - non-blank ``str``: editor command to use (split on whitespace).
        - ``None``, ``True`` or a blank string: use the default editor,
          which is ``notepad.exe`` on Windows and ``$EDITOR`` (falling back
          to ``vim``) elsewhere.

    Returns
    -------
    bool
        ``True`` if an editor was invoked, ``False`` if ``edit_config`` is
        ``''`` or ``False``.

    Notes
    -----
    Copied from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    # no `edit-config` supplied
    if edit_config == '' or edit_config is False:
        return False

    conf_path = get_config_path()

    if conf_path is None:
        logger.info('Config file does not exist. Automatically saving.')
        system = System()
        conf_path = system.save_config()

    logger.info('Editing config file "%s"', conf_path)

    if isinstance(edit_config, str) and edit_config.strip():
        # use `edit_config` as the editor command
        editor = edit_config
    elif platform.system() == 'Windows':
        editor = 'notepad.exe'
    else:
        # Linux, macOS and any other platform: never leave the editor empty,
        # otherwise the config file itself would be executed.
        editor = os.environ.get('EDITOR', 'vim')

    editor_cmd = editor.split()
    editor_cmd.append(conf_path)
    call(editor_cmd)

    return True
def save_conf(config_path=None, overwrite=None, **kwargs):
    """
    Save the AMS config to a file through ``System.save_config``.

    Parameters
    ----------
    config_path : None or str, optional
        Destination of the config file. If it is an empty string, nothing
        is saved. If it is an existing directory, the file ``ams.rc`` inside
        that directory is used. ``None`` is passed on to
        ``System.save_config`` unchanged.
    overwrite : bool or None, optional
        Passed on to ``System.save_config``.
    kwargs
        Passed to the ``System`` constructor.

    Returns
    -------
    bool
        ``True`` if the save action is run. ``False`` otherwise.

    Notes
    -----
    Copied from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    # no ``--save-config ``
    if config_path == '':
        return False

    target = config_path
    if target is not None and os.path.isdir(target):
        target = os.path.join(target, 'ams.rc')

    System(**kwargs).save_config(target, overwrite=overwrite)
    return True
# TODO: change to AMS output types
def remove_output(recursive=False):
    """
    Remove output files from the current working directory.

    Files whose names end with ``_eig.txt`` or ``_out.txt`` are deleted.
    Each removal (or failure to remove) is logged.

    Parameters
    ----------
    recursive : bool
        Recursively clean all subfolders

    Returns
    -------
    bool
        Always ``True``.

    Notes
    -----
    Copied from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    suffixes = ('_eig.txt', '_out.txt')

    cwd = os.getcwd()
    if recursive:
        folders = [entry[0] for entry in os.walk(cwd)]
    else:
        folders = (cwd,)

    found = False
    for folder in folders:
        for name in os.listdir(folder):
            if not name.endswith(suffixes):
                continue

            found = True
            target = os.path.join(folder, name)
            try:
                os.remove(target)
                logger.info('"%s" removed.', target)
            except IOError:
                logger.error('Error removing file "%s".', target)

    if not found:
        logger.info('No output file found in the working directory.')

    return True
def selftest(quick=False, extra=False, **kwargs):
    """
    Run unit tests discovered under ``tests_root()``.

    Parameters
    ----------
    quick : bool, optional
        If True, skip the tests named ``test_1_docs`` and
        ``test_codegen_inc``.
    extra : bool, optional
        Unless True, skip tests whose method name contains ``extra_test``.
    kwargs
        Only ``verbose`` (a logging level, 20 by default) is read; it is
        mapped to a ``unittest`` verbosity. Unmapped levels fall back to
        the verbosity used for level 20.

    Notes
    -----
    Copied from the ANDES project (https://github.com/CURENT/andes).
    Original author: Hantao Cui
    License: GPL3
    """
    # map verbosity level from logging to unittest
    vmap = {1: 3, 10: 3, 20: 2, 30: 1, 40: 1, 50: 1}
    verbose = vmap.get(kwargs.get('verbose', 20), vmap[20])

    # skip if quick
    quick_skips = ('test_1_docs', 'test_codegen_inc')

    # extra test naming convention
    extra_test = 'extra_test'

    previous_stdout = sys.stdout
    devnull = None
    try:
        # Only silence output when the logger is set up, as before.
        if logger.handlers:
            logger.handlers[0].setLevel(logging.WARNING)
            devnull = open(os.devnull, 'w')
            sys.stdout = devnull  # suppress print statements

        # discover test cases
        test_directory = tests_root()
        suite = unittest.TestLoader().discover(test_directory)

        # remove codegen for quick mode
        for test_group in suite._tests:
            for test_class in test_group._tests:
                if not hasattr(test_class, '_tests'):
                    continue

                tests_keep = []
                for t in test_class._tests:
                    # skip the extra tests if `extra` is not True
                    if (extra is not True) and (extra_test in t._testMethodName):
                        continue

                    # skip the ones for `quick`
                    if quick is True and (t._testMethodName in quick_skips):
                        continue

                    tests_keep.append(t)

                test_class._tests = tests_keep

        unittest.TextTestRunner(verbosity=verbose).run(suite)
    finally:
        # Restore whatever stream was active before (it may not be
        # `sys.__stdout__`) and release the devnull handle, even on error.
        sys.stdout = previous_stdout
        if devnull is not None:
            devnull.close()