"""
Main entry point for the AGVis CLI.
"""
import glob
import os
import platform
import pprint
import logging
import tempfile
from ._version import get_versions
from andes.utils.misc import is_interactive
import agvis
from agvis.web import AgvisWeb
agvis_web = AgvisWeb()
app = agvis_web.app
logger = logging.getLogger(__name__)
[docs]def config_logger(stream_level=logging.INFO, *,
stream=True,
file=True,
log_file='agvis.log',
log_path=None,
file_level=logging.DEBUG,
):
"""
Configure an AGVis logger with a `FileHandler` and a `StreamHandler`.
This function is called at the beginning of ``agvis.main.main()``.
Updating ``stream_level`` and ``file_level`` is now supported.
Parameters
----------
stream : bool, optional
Create a `StreamHandler` for `stdout` if ``True``.
If ``False``, the handler will not be created.
file : bool, optionsl
True if logging to ``log_file``.
log_file : str, optional
Logg file name for `FileHandler`, ``'agvis.log'`` by default.
If ``None``, the `FileHandler` will not be created.
log_path : str, optional
Path to store the log file. By default, the path is generated by
get_log_dir() in utils.misc.
stream_level : {10, 20, 30, 40, 50}, optional
`StreamHandler` verbosity level.
file_level : {10, 20, 30, 40, 50}, optional
`FileHandler` verbosity level.
Returns
-------
None
"""
lg = logging.getLogger('agvis')
lg.setLevel(logging.DEBUG)
if log_path is None:
log_path = get_log_dir()
sh_formatter_str = '%(message)s'
if stream_level == 1:
sh_formatter_str = '%(name)s:%(lineno)d - %(levelname)s - %(message)s'
stream_level = 10
sh_formatter = logging.Formatter(sh_formatter_str)
if len(lg.handlers) == 0:
# create a StreamHandler
if stream is True:
sh = logging.StreamHandler()
sh.setFormatter(sh_formatter)
sh.setLevel(stream_level)
lg.addHandler(sh)
# file handler for level DEBUG and up
if file is True and (log_file is not None):
log_full_path = os.path.join(log_path, log_file)
fh_formatter = logging.Formatter('%(process)d: %(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh = logging.FileHandler(log_full_path)
fh.setLevel(file_level)
fh.setFormatter(fh_formatter)
lg.addHandler(fh)
globals()['logger'] = lg
else:
# update the handlers
set_logger_level(logger, logging.StreamHandler, stream_level)
set_logger_level(logger, logging.FileHandler, file_level)
[docs]def set_logger_level(lg, type_to_set, level):
"""
Set logging level for the given type of handler.
"""
for h in lg.handlers:
if isinstance(h, type_to_set):
h.setLevel(level)
[docs]def get_log_dir():
"""
Get the directory for log file.
The default is ``<tempdir>/agvis``, where ``<tempdir>`` is provided by ``tempfile.gettempdir()``.
Returns
-------
str
The path to the temporary logging directory
"""
tempdir = tempfile.gettempdir()
path = tempfile.mkdtemp(prefix='agvis-', dir=tempdir)
return path
[docs]def find_log_path(lg):
"""
Find the file paths of the FileHandlers.
"""
out = []
for h in lg.handlers:
if isinstance(h, logging.FileHandler):
out.append(h.baseFilename)
return out
[docs]def remove_output(recursive=False):
"""
TODO: Remove the outputs generated by AGVis, and ANDES.
Parameters
----------
recursive : bool
Recursively clean all subfolders
Returns
-------
bool
``True`` is the function body executes with success. ``False``
otherwise.
"""
found = False
cwd = os.getcwd()
if recursive:
dirs = [x[0] for x in os.walk(cwd)]
else:
dirs = (cwd,)
for d in dirs:
for file in os.listdir(d):
if file.endswith('_eig.txt') or \
file.endswith('_out.txt') or \
file.endswith('_out.lst') or \
file.endswith('_out.npy') or \
file.endswith('_out.npz') or \
file.endswith('_prof.prof') or \
file.endswith('_prof.txt'):
found = True
try:
os.remove(os.path.join(d, file))
logger.info('"%s" removed.', os.path.join(d, file))
except IOError:
logger.error('Error removing file "%s".',
os.path.join(d, file))
if not found:
logger.info('No output file found in the working directory.')
return True
[docs]def run(filename='', input_path='', verbose=20,
host='localhost', port=8810, dev=False,
socket_path=None, static=None,
**kwargs):
"""
Entry point to run AGVis.
Parameters
----------
filename : str, optional
file name (or pattern)
input_path : str, optional
input search path
verbose : int, 10 (DEBUG), 20 (INFO), 30 (WARNING), 40 (ERROR), 50 (CRITICAL)
Verbosity level. If ``config_logger`` is called prior to ``run``,
this option will be ignored.
kwargs
Other supported keyword arguments
cli : bool, optional
If is running from command-line. If True, returns exit code instead of System
Returns
-------
System or exit_code
An instance of system (if `cli == False`) or an exit code otherwise..
"""
if is_interactive() and len(logger.handlers) == 0:
config_logger(verbose, file=False)
# put some args back to `kwargs`
kwargs['input_path'] = input_path
kwargs['verbose'] = verbose
# TODO: visualize a case or given inpu file
cases = _find_cases(filename, input_path) #NOQA
# Run the flask web app
agvis_web.run(host=host, port=port, dev=dev)
return True
def _find_cases(filename, path):
"""
Find valid cases using the provided names and path
Parameters
----------
filename : str
Test case file name
Returns
-------
list
A list of valid cases.
"""
logger.info('Working directory: "%s"', os.getcwd())
if len(filename) == 0:
return None
if isinstance(filename, str):
filename = [filename]
cases = []
for file in filename:
full_paths = os.path.join(path, file)
found = glob.glob(full_paths)
if len(found) == 0:
logger.error('error: file "%s" does not exist.', full_paths)
else:
cases += found
# remove folders and make cases unique
unique_cases = list(set(cases))
valid_cases = []
for case in unique_cases:
if os.path.isfile(case):
valid_cases.append(case)
if len(valid_cases) > 0:
valid_cases = sorted(valid_cases)
logger.debug('Found files: %s', pprint.pformat(valid_cases))
return valid_cases
[docs]def print_license():
"""
Print out AGVis license to stdout.
"""
print(f"""
AGVis version {agvis.__version__}
Copyright (c) 2020-2024 Nick West, Nicholas Parsly, Jinning Wang
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
A copy of the GNU General Public License is included below.
For further information, see <http://www.gnu.org/licenses/>.
""")
return True
[docs]def misc(show_license=False, clean=True, recursive=False,
version=False, **kwargs):
"""
Miscellaneous commands.
"""
if show_license:
print_license()
return
if clean is True:
remove_output(recursive)
return
if demo is True:
demo(**kwargs)
return
if version is True:
versioninfo()
return
logger.info("info: no option specified. Use 'andes misc -h' for help.")
[docs]def selftest(**kwargs):
"""
TODO: Run unit tests.
"""
logger.warning("Tests have not been implemented")
[docs]def demo(**kwargs):
"""
TODO: show some demonstrations from CLI.
"""
logger.warning("Demos have not been implemented")
[docs]def versioninfo():
"""
Print version info for ANDES and dependencies.
"""
import numpy as np
import andes
versions = {'Python': platform.python_version(),
'agvis': get_versions()['version'],
'andes': andes.__version__,
'numpy': np.__version__,
}
maxwidth = max([len(k) for k in versions.keys()])
for key, val in versions.items():
print(f"{key: <{maxwidth}} {val}")