Source code for agvis.main

"""
Main entry point for the AGVis CLI.
"""

import glob
import os
import platform
import pprint
import logging
import tempfile

from ._version import get_versions

from andes.utils.misc import is_interactive

import agvis
from agvis.web import AgvisWeb

agvis_web = AgvisWeb()

app = agvis_web.app

logger = logging.getLogger(__name__)

[docs]def config_logger(stream_level=logging.INFO, *, stream=True, file=True, log_file='agvis.log', log_path=None, file_level=logging.DEBUG, ): """ Configure an AGVis logger with a `FileHandler` and a `StreamHandler`. This function is called at the beginning of ``agvis.main.main()``. Updating ``stream_level`` and ``file_level`` is now supported. Parameters ---------- stream : bool, optional Create a `StreamHandler` for `stdout` if ``True``. If ``False``, the handler will not be created. file : bool, optionsl True if logging to ``log_file``. log_file : str, optional Logg file name for `FileHandler`, ``'agvis.log'`` by default. If ``None``, the `FileHandler` will not be created. log_path : str, optional Path to store the log file. By default, the path is generated by get_log_dir() in utils.misc. stream_level : {10, 20, 30, 40, 50}, optional `StreamHandler` verbosity level. file_level : {10, 20, 30, 40, 50}, optional `FileHandler` verbosity level. Returns ------- None """ lg = logging.getLogger('agvis') lg.setLevel(logging.DEBUG) if log_path is None: log_path = get_log_dir() sh_formatter_str = '%(message)s' if stream_level == 1: sh_formatter_str = '%(name)s:%(lineno)d - %(levelname)s - %(message)s' stream_level = 10 sh_formatter = logging.Formatter(sh_formatter_str) if len(lg.handlers) == 0: # create a StreamHandler if stream is True: sh = logging.StreamHandler() sh.setFormatter(sh_formatter) sh.setLevel(stream_level) lg.addHandler(sh) # file handler for level DEBUG and up if file is True and (log_file is not None): log_full_path = os.path.join(log_path, log_file) fh_formatter = logging.Formatter('%(process)d: %(asctime)s - %(name)s - %(levelname)s - %(message)s') fh = logging.FileHandler(log_full_path) fh.setLevel(file_level) fh.setFormatter(fh_formatter) lg.addHandler(fh) globals()['logger'] = lg else: # update the handlers set_logger_level(logger, logging.StreamHandler, stream_level) set_logger_level(logger, logging.FileHandler, file_level)
[docs]def set_logger_level(lg, type_to_set, level): """ Set logging level for the given type of handler. """ for h in lg.handlers: if isinstance(h, type_to_set): h.setLevel(level)
[docs]def get_log_dir(): """ Get the directory for log file. The default is ``<tempdir>/agvis``, where ``<tempdir>`` is provided by ``tempfile.gettempdir()``. Returns ------- str The path to the temporary logging directory """ tempdir = tempfile.gettempdir() path = tempfile.mkdtemp(prefix='agvis-', dir=tempdir) return path
[docs]def find_log_path(lg): """ Find the file paths of the FileHandlers. """ out = [] for h in lg.handlers: if isinstance(h, logging.FileHandler): out.append(h.baseFilename) return out
[docs]def remove_output(recursive=False): """ TODO: Remove the outputs generated by AGVis, and ANDES. Parameters ---------- recursive : bool Recursively clean all subfolders Returns ------- bool ``True`` is the function body executes with success. ``False`` otherwise. """ found = False cwd = os.getcwd() if recursive: dirs = [x[0] for x in os.walk(cwd)] else: dirs = (cwd,) for d in dirs: for file in os.listdir(d): if file.endswith('_eig.txt') or \ file.endswith('_out.txt') or \ file.endswith('_out.lst') or \ file.endswith('_out.npy') or \ file.endswith('_out.npz') or \ file.endswith('_prof.prof') or \ file.endswith('_prof.txt'): found = True try: os.remove(os.path.join(d, file)) logger.info('"%s" removed.', os.path.join(d, file)) except IOError: logger.error('Error removing file "%s".', os.path.join(d, file)) if not found: logger.info('No output file found in the working directory.') return True
[docs]def run(filename='', input_path='', verbose=20, host='localhost', port=8810, dev=False, socket_path=None, static=None, **kwargs): """ Entry point to run AGVis. Parameters ---------- filename : str, optional file name (or pattern) input_path : str, optional input search path verbose : int, 10 (DEBUG), 20 (INFO), 30 (WARNING), 40 (ERROR), 50 (CRITICAL) Verbosity level. If ``config_logger`` is called prior to ``run``, this option will be ignored. kwargs Other supported keyword arguments cli : bool, optional If is running from command-line. If True, returns exit code instead of System Returns ------- System or exit_code An instance of system (if `cli == False`) or an exit code otherwise.. """ if is_interactive() and len(logger.handlers) == 0: config_logger(verbose, file=False) # put some args back to `kwargs` kwargs['input_path'] = input_path kwargs['verbose'] = verbose # TODO: visualize a case or given inpu file cases = _find_cases(filename, input_path) #NOQA # Run the flask web app agvis_web.run(host=host, port=port, dev=dev) return True
def _find_cases(filename, path): """ Find valid cases using the provided names and path Parameters ---------- filename : str Test case file name Returns ------- list A list of valid cases. """ logger.info('Working directory: "%s"', os.getcwd()) if len(filename) == 0: return None if isinstance(filename, str): filename = [filename] cases = [] for file in filename: full_paths = os.path.join(path, file) found = glob.glob(full_paths) if len(found) == 0: logger.error('error: file "%s" does not exist.', full_paths) else: cases += found # remove folders and make cases unique unique_cases = list(set(cases)) valid_cases = [] for case in unique_cases: if os.path.isfile(case): valid_cases.append(case) if len(valid_cases) > 0: valid_cases = sorted(valid_cases) logger.debug('Found files: %s', pprint.pformat(valid_cases)) return valid_cases
[docs]def misc(show_license=False, clean=True, recursive=False, version=False, **kwargs): """ Miscellaneous commands. """ if show_license: print_license() return if clean is True: remove_output(recursive) return if demo is True: demo(**kwargs) return if version is True: versioninfo() return logger.info("info: no option specified. Use 'andes misc -h' for help.")
[docs]def selftest(**kwargs): """ TODO: Run unit tests. """ logger.warning("Tests have not been implemented")
[docs]def demo(**kwargs): """ TODO: show some demonstrations from CLI. """ logger.warning("Demos have not been implemented")
[docs]def versioninfo(): """ Print version info for ANDES and dependencies. """ import numpy as np import andes versions = {'Python': platform.python_version(), 'agvis': get_versions()['version'], 'andes': andes.__version__, 'numpy': np.__version__, } maxwidth = max([len(k) for k in versions.keys()]) for key, val in versions.items(): print(f"{key: <{maxwidth}} {val}")