"""
The core execution functions for running on a node.

The fundamental design of the core is to run a task composed of trays and
modules.  The general hierarchy looks like::

    task
    |
    |- tray1
    |  |
    |  |- module1
    |  |
    |  |- module2
    |
    |- tray2
       |
       |- module3
       |
       |- module4

Parameters can be defined at every level, and each level is treated as a
scope (such that inner scopes inherit from outer scopes).  This is
accomplished via an internal environment for each scope.

Data movement should be defined at the task level.
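
For example, a sketch of parameter scoping (values are illustrative)::

    task parameters:  {"nevents": 1000}
    tray1 parameters: {}                 # inherits nevents=1000
    tray2 parameters: {"nevents": 10}    # overrides for module3/module4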
"""

from contextlib import contextmanager
import copy
from dataclasses import dataclass
from enum import StrEnum
import logging
import os
from pathlib import Path
from typing import Any, Iterator, Optional


from iceprod.core import config
from iceprod.core.defaults import add_default_options
from iceprod.core import util
from iceprod.core import functions
import iceprod.core.parser
from iceprod.core.jsonUtil import json_encode, json_decode


class ConfigError(Exception):
    pass


class ConfigParser:
    """
    Parse things using a config and the tray/task/module environment.

    Note: dataset config must be valid!

    Args:
        dataset: a dataset object with config
        logger: a logger object, for localized logging
    """
    def __init__(self, dataset: config.Dataset, logger: Optional[logging.Logger] = None):
        dataset.validate()
        self.config = dataset.config
        self.logger = logger if logger else logging.getLogger()
        self.parser = iceprod.core.parser.ExpParser()

    def parseValue(self, value: Any, env: Optional[dict] = None) -> Any:
        """
        Parse a value from the available env and global config.

        If the value is a string:
        1. Use the :class:`Meta Parser <iceprod.core.parser>` to parse the string.
        2. Expand any env variables in the result.

        If the value is not a string, pass through the value.

        Args:
            value: the value to parse
            env: tray/task/module env

        Returns:
            the parsed value
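
        Example (a sketch; assumes the ``$(name)`` lookup form handled by
        :class:`iceprod.core.parser`)::

            env = {'parameters': {'nevents': 1000}}
            parser.parseValue('$(nevents)', env)  # resolves to the nevents value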
        """
        if env is None:
            env = {}
        if isinstance(value, str):
            self.logger.debug('parse before:%r| env=%r| options=%r', value, env, self.config.get('options'))
            while value != (ret := self.parser.parse(value, self.config, env)):
                value = ret
            if isinstance(value, str):
                value = os.path.expandvars(value)
            self.logger.debug('parse after:%r', value)
        return value

    def parseObject(self, obj: Any, env: dict) -> Any:
        """
        Recursively parse a dict or list.

        Do not modify original object.

        Args:
            obj: object to parse
            env: tray/task/module env

        Returns:
            the parsed object
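
        Example (a sketch)::

            obj = {'outfile': '$(prefix).i3', 'count': 10}
            newobj = parser.parseObject(obj, env)
            # '$(prefix)' is parsed; 10 passes through; obj is unchanged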
        """
        if isinstance(obj, str):
            return self.parseValue(obj, env)
        elif isinstance(obj, (list, tuple)):
            return [self.parseObject(v, env) for v in obj]
        elif isinstance(obj, dict):
            ret = copy.copy(obj)  # use copy.copy in case it's a subclass of dict
            for k in obj:
                ret[k] = self.parseObject(obj[k], env)
            return ret
        else:
            return obj


Env = dict[str, Any]


@contextmanager
def scope_env(cfg: ConfigParser, obj: dict, upperenv: Optional[Env] = None, logger: Optional[logging.Logger] = None) -> Iterator[Env]:
    """
    A context manager for parsing scoped config, such as parameters.

    The returned environment is a dictionary composed of several objects:

    * parameters
        Parameters are defined directly as an object, or as a string pointing
        to another object.  They can use the IceProd meta-language to be
        defined in relation to other parameters specified in inherited
        scopes, or as eval or sprintf functions.

    * input_files
        A set of Data objects (urls and local paths), for files to download before
        the task starts.

    * output_files
        A set of Data objects (urls and local paths), for files to upload after the
        task successfully completes.

    `input_files` and `output_files` are global, while `parameters` is inherited
    at each scope level.

    Args:
        cfg: ConfigParser object
        obj: A partial dataset config section to operate on. The local scope.
        upperenv: previous scope's env output
        logger: a logger object, for localized logging
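
    Example (a sketch of nested scopes, as in task conversion)::

        with scope_env(cfg, steering_cfg) as globalenv:
            with scope_env(cfg, task_cfg, globalenv) as taskenv:
                ...  # taskenv['parameters'] inherits steering parameters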
    """
    env: Env = {
        'parameters': {},
        'input_files': [],
        'output_files': [],
        'upper_input_files': [],
        'upper_output_files': [],
        'environment': {
            'OS_ARCH': '$OS_ARCH',
        }
    }
    if upperenv:
        env['parameters'].update(upperenv['parameters'])
        env['upper_input_files'] = upperenv['input_files'] + upperenv['upper_input_files']
        env['upper_output_files'] = upperenv['output_files'] + upperenv['upper_output_files']

    logger = logger if logger else logging.getLogger()

    try:
        # copy parameters
        if 'parameters' in obj:
            # copy new parameters to env first so local referrals work
            env['parameters'].update(obj['parameters'])
            # parse parameter values and update if necessary
            for p in obj['parameters']:
                newval = cfg.parseValue(obj['parameters'][p], env)
                if newval != obj['parameters'][p]:
                    env['parameters'][p] = newval

        if 'data' in obj:
            # download data
            for data in obj['data']:
                d = cfg.parseObject(data, env)
                if d['movement'] in ('input','both'):
                    ret = downloadData(d, cfg=cfg, logger=logger)
                    if ret:
                        env['input_files'].append(ret)
                if d['movement'] in ('output','both'):
                    ret = uploadData(d, cfg=cfg, logger=logger)
                    if ret:
                        env['output_files'].append(ret)

        # add input and output to parseable options
        cfg.config['options']['input'] = ' '.join(d.local for d in (env['input_files'] + env['upper_input_files']))
        cfg.config['options']['output'] = ' '.join(d.local for d in (env['output_files'] + env['upper_output_files']))
        logger.info('input: %r', cfg.config['options']['input'])
        logger.info('output: %r', cfg.config['options']['output'])

    except util.NoncriticalError:
        logger.warning('Noncritical error when setting up environment', exc_info=True)
    except Exception:
        logger.critical('Serious error when setting up environment', exc_info=True)
        raise

    yield env


class Transfer(StrEnum):
    TRUE = 'true'
    MAYBE = 'maybe'
    FALSE = 'false'


@dataclass(frozen=True, slots=True)
class Data:
    """
    IceProd Data instance

    Args:
        url: url location
        local: local filename
        transfer: whether to transfer file (true | maybe | false)
    """
    url: str
    local: str
    transfer: Transfer

    def __str__(self):
        return f"Data(url='{self.url}', local='{self.local}', transfer='{self.transfer}')"


def storage_location(data: dict, parser: ConfigParser) -> str:
    """
    Get data storage location from the config.

    Args:
        data: data config object
        parser: config parser

    Returns:
        storage location
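
    Example (a sketch; the option value is illustrative)::

        # config['options'] = {'data_url': 'https://data.example.org'}
        storage_location({'type': 'permanent'}, parser)  # -> 'https://data.example.org'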
    """
    config = parser.config
    type_ = data['type'].lower()
    if type_ not in ['permanent', 'job_temp', 'dataset_temp', 'site_temp']:
        raise ConfigError('data movement "type" is unknown')
    if 'options' in config and type_ in config['options']:
        return parser.parseValue(config['options'][type_])
    elif type_ == 'permanent':
        if 'options' in config and 'data_url' in config['options']:
            return parser.parseValue(config['options']['data_url'])
        else:
            raise ConfigError('"data_url" not defined in config["options"]')
    else:
        raise ConfigError(f'{type_} not defined in config["options"]')


def do_transfer(data: dict) -> Transfer:
    """
    Test if we should actually transfer the file.

    Args:
        data: data config object
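
    Example::

        do_transfer({'transfer': True})      # Transfer.TRUE
        do_transfer({'transfer': 'no'})      # Transfer.FALSE
        do_transfer({'transfer': 'exists'})  # Transfer.MAYBE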
    """
    ret = Transfer.TRUE
    if isinstance(data['transfer'], bool):
        ret = Transfer.TRUE if data['transfer'] else Transfer.FALSE
    elif isinstance(data['transfer'], str):
        t = data['transfer'].lower()
        if t in ('n', 'no', 'not', 'f', 'false'):
            ret = Transfer.FALSE
        elif t in ('y', 'yes', 't', 'true'):
            ret = Transfer.TRUE
        elif t in ('maybe', 'exists'):
            ret = Transfer.MAYBE
        else:
            raise ConfigError('unknown transfer type')
    elif isinstance(data['transfer'], (int, float)):
        ret = Transfer.FALSE if data['transfer'] == 0 else Transfer.TRUE
    return ret


def downloadData(data: dict, cfg: ConfigParser, logger=None) -> Optional[Data]:
    """
    Parse download url and local filename.

    Args:
        data: data config object
        cfg: config parser
        logger: a logger object, for localized logging

    Returns:
        either None or a Data object
    """
    if not logger:
        logger = logging.getLogger()
    remote_base = storage_location(data, cfg)
    logger.debug('downloadData(): remote_base: %r', remote_base)
    remote = str(data['remote']) if data['remote'] is not None else ''
    local = str(data['local']) if data['local'] is not None else ''
    if not remote and not local:
        raise ConfigError('need either "remote" or "local" defined for data')
    if not remote:
        url = os.path.join(remote_base, local)
    elif functions.isurl(remote):
        url = remote
    else:
        url = os.path.join(remote_base, remote)
    transfer = do_transfer(data)
    if transfer == Transfer.FALSE:
        logger.info('not transferring file %s', url)
        return None
    if not local:
        local = os.path.basename(remote)
    return Data(url, local, transfer)
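

# Example for downloadData (a sketch with hypothetical values): given
#   {'type': 'permanent', 'remote': 'https://example.org/f.txt', 'local': '',
#    'movement': 'input', 'transfer': True}
# it returns Data(url='https://example.org/f.txt', local='f.txt',
# transfer=Transfer.TRUE), provided 'data_url' is set in config['options'].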


def uploadData(data: dict, cfg: ConfigParser, logger=None) -> Optional[Data]:
    """
    Parse upload url and local filename.

    Args:
        data: data config object
        cfg: config parser
        logger: a logger object, for localized logging

    Returns:
        either None or a Data object
    """
    if not logger:
        logger = logging.getLogger()
    remote_base = storage_location(data, cfg)
    logger.debug('uploadData(): remote_base: %r', remote_base)
    remote = str(data['remote']) if data['remote'] is not None else ''
    local = str(data['local']) if data['local'] is not None else ''
    if not remote and not local:
        raise ConfigError('need either "remote" or "local" defined for data')
    if not remote:
        url = os.path.join(remote_base, local)
    elif not functions.isurl(remote):
        url = os.path.join(remote_base, remote)
    else:
        url = remote
    if not local:
        local = os.path.basename(remote)
    transfer = do_transfer(data)
    if transfer == Transfer.FALSE:
        logger.info('not transferring file %s', local)
        return None
    return Data(url, local, transfer)


# Run Functions #

class WriteToScript:
    """
    Write a task to a Bash script, to execute manually.

    Args:
        task: a task object, with dataset config
        workdir: a directory to write the task and any related files
        options: extra dataset config options
        logger: a logger object, for localized logging
    """
    def __init__(self, task: config.Task, workdir: Path, options: Optional[dict] = None, logger: Optional[logging.Logger] = None):
        self.task = task
        self.workdir = workdir
        self.logger = logger if logger else logging.getLogger()

        # default config setup
        self.options = self.task.dataset.config['options']
        self._fill_options()
        if options:
            self.options.update(options)
        self.cfgparser = ConfigParser(self.task.dataset, logger=self.logger)

        # set up script
        self.infiles: set[Data] = set()
        self.outfiles: set[Data] = set()

    def _fill_options(self):
        self.options['dataset_id'] = self.task.dataset.dataset_id
        self.options['dataset'] = self.task.dataset.dataset_num
        self.options['job'] = self.task.job.job_index
        self.options['jobs_submitted'] = self.task.dataset.jobs_submitted
        self.options['task_id'] = self.task.task_id
        self.options['task'] = self.task.name
        self.options['debug'] = self.task.dataset.debug

    def _add_input_files(self, files, f=None):
        if f:
            for data in files:
                if data.transfer is Transfer.FALSE:
                    continue
                if data.url.startswith('gsiftp://'):
                    # gsiftp urls are fetched in-script via GridFTP
                    python_cmd = f'from iceprod.core.gridftp import GridFTP\nGridFTP.get("{data.url}", filename="$PWD/{data.local}")'
                    cmd = [
                        '/cvmfs/icecube.opensciencegrid.org/iceprod/v2.7.1/env-shell.sh',
                        'python', '-',
                        "<<____HERE\n" + python_cmd + '\n____HERE\n',
                    ]
                    print(f'# Input: {data}', file=f)
                    print(' '.join(cmd), file=f)
                    print('', file=f)
                else:
                    self.infiles.add(data)
        else:
            self.infiles.update(files)

    def _add_output_files(self, files, f=None):
        if f:
            for data in files:
                if data.transfer is Transfer.FALSE:
                    continue
                if data.url.startswith('gsiftp://'):
                    python_cmd = f'from iceprod.core.gridftp import GridFTP\nGridFTP.put("{data.url}", filename="$PWD/{data.local}")'
                    cmd_core = [
                        '/cvmfs/icecube.opensciencegrid.org/iceprod/v2.7.1/env-shell.sh',
                        'python', '-',
                        "<<____HERE\n" + python_cmd + '\n____HERE\n',
                    ]
                    if data.transfer is Transfer.MAYBE:
                        # only upload the file if it exists
                        cmd = [f'if [ -f {data.local} ]; then\n']
                        cmd.extend(cmd_core)
                        cmd += ['fi']
                    else:
                        cmd = cmd_core
                    print(f'# Output: {data}', file=f)
                    print(' '.join(cmd), file=f)
                    print('', file=f)
                else:
                    self.outfiles.add(data)
        else:
            self.outfiles.update(files)

    async def convert(self, transfer=False):
        """
        Convert to bash script.

        Args:
            transfer: embed the file transfer into the script (default False)
        """
        scriptname = self.workdir / 'task_runner.sh'
        with open(scriptname, 'w') as f:
            print('#!/bin/sh', file=f)
            print('set -e', file=f)
            add_default_options(self.options)
            print('# Options:', file=f)
            for field in self.options:
                print(f'# {field}={self.options[field]}', file=f)
            print('', file=f)
            print('# set some env vars for expansion', file=f)
            print('OS_ARCH=$(/cvmfs/icecube.opensciencegrid.org/py3-v4.3.0/os_arch.sh)', file=f)
            print('', file=f)
            with scope_env(self.cfgparser, self.task.dataset.config['steering'], logger=self.logger) as globalenv:
                task = self.task.get_task_config()
                if self.task.task_files:
                    task['data'].extend(self.task.task_files)
                self._add_input_files(globalenv['input_files'], f=(f if transfer else None))
                self.logger.debug('converting task %s', self.task.name)
                with scope_env(self.cfgparser, task, globalenv, logger=self.logger) as taskenv:
                    self._add_input_files(taskenv['input_files'], f=(f if transfer else None))
                    for i, tray in enumerate(task['trays']):
                        trayname = tray['name'] if tray.get('name', '') else i
                        for iteration in range(tray['iterations']):
                            self.options['iter'] = iteration
                            self.logger.debug('converting tray %r iter %d', trayname, iteration)
                            print(f'# running tray {trayname}, iter {iteration}', file=f)
                            with scope_env(self.cfgparser, tray, taskenv, logger=self.logger) as trayenv:
                                self._add_input_files(trayenv['input_files'], f=(f if transfer else None))
                                for j, module in enumerate(tray['modules']):
                                    modulename = module['name'] if module.get('name', '') else j
                                    self.logger.debug('converting module %r', modulename)
                                    print(f'# running module {modulename}', file=f)
                                    with scope_env(self.cfgparser, module, trayenv, logger=self.logger) as moduleenv:
                                        self._add_input_files(moduleenv['input_files'], f=(f if transfer else None))
                                        await self._write_module(module, moduleenv, file=f)
                                        self._add_output_files(moduleenv['output_files'], f=(f if transfer else None))
                                    print('', file=f)
                                self._add_output_files(trayenv['output_files'], f=(f if transfer else None))
                    self._add_output_files(taskenv['output_files'], f=(f if transfer else None))
                self._add_output_files(globalenv['output_files'], f=(f if transfer else None))

        scriptname.chmod(scriptname.stat().st_mode | 0o700)
        return scriptname

    async def _write_module(self, module, env, file):
        module = module.copy()
        if module['src']:
            module_src = self.cfgparser.parseValue(module['src'], env)
            if functions.isurl(module_src):
                # strip any query string or fragment from the filename
                path = os.path.basename(module_src).split('?')[0].split('#')[0]
                env['input_files'].append(Data(
                    url=module_src,
                    local=path,
                    transfer=Transfer.TRUE,
                ))
                module_src = path
            self.logger.info('running module %r with src %s', module['name'], module_src)
        elif module['running_class']:
            module_src = None
            module_class = self.cfgparser.parseValue(module['running_class'], env)
            self.logger.info('running module %r with class %s', module['name'], module_class)
        else:
            self.logger.error('module is missing src')
            raise ConfigError('error running module - need "src"')

        if module['args']:
            module['args'] = self.cfgparser.parseObject(module['args'], env)
        if module['env_shell']:
            module['env_shell'] = self.cfgparser.parseValue(module['env_shell'], env)
        if module['configs']:
            # parse twice to make sure it's parsed, even if it starts as a string
            module['configs'] = self.cfgparser.parseObject(module['configs'], env)
            module['configs'] = self.cfgparser.parseObject(module['configs'], env)

        # set up env_shell
        env_shell = []
        if module['env_shell']:
            env_shell = module['env_shell'].split()
            if functions.isurl(env_shell[0]):
                path = os.path.basename(env_shell[0]).split('?')[0].split('#')[0]
                env['input_files'].append(Data(
                    url=env_shell[0],
                    local=path,
                    transfer=Transfer.TRUE,
                ))
                env_shell[0] = f'./{path}'

        # set up the args
        args = module['args']
        if module_src:
            if args is not None and args != '':
                self.logger.warning('args=%s', args)
                if args and isinstance(args, str) and args[0] in ('{', '['):
                    args = json_decode(args)
                if args and isinstance(args, dict) and set(args) == {'args', 'kwargs'}:
                    args = self.cfgparser.parseObject(args, env)
                elif isinstance(args, str):
                    args = {"args": [self.cfgparser.parseValue(x, env) for x in args.split()], "kwargs": {}}
                elif isinstance(args, list):
                    args = {"args": [self.cfgparser.parseValue(x, env) for x in args], "kwargs": {}}
                elif isinstance(args, dict):
                    args = {"args": [], "kwargs": self.cfgparser.parseObject(args, env)}
                else:
                    args = {"args": [str(args)], "kwargs": {}}

                # convert to cmdline args
                def splitter(a, b):
                    ret = ('-%s' if len(str(a)) <= 1 else '--%s') % str(a)
                    if b is None:
                        return ret
                    return ret + '=' + str(b)
                args = args['args'] + [splitter(a, args['kwargs'][a]) for a in args['kwargs']]

                # force args to string
                def toStr(a):
                    if isinstance(a, (bytes, str)):
                        return a
                    return str(a)
                args = [toStr(a) for a in args]
            else:
                args = []
        else:
            # construct a python file to call the class
            parsed_args = self.cfgparser.parseObject(args, env)
            pymodule, class_ = module_class.rsplit('.', 1)
            args = f"""import json
from {pymodule} import {class_}
args = json.loads('''{json_encode(parsed_args)}''')
obj = {class_}()
for k,v in args.items():
    obj.SetParameter(k, v)
obj.Execute({{}})"""

        # set up the environment
        cmd = []
        if env_shell:
            cmd.extend(env_shell)

        # set up configs
        if module['configs']:
            for filename in module['configs']:
                self.logger.info('creating config %r', filename)
                with open(self.workdir / filename, 'w') as f:
                    f.write(json_encode(module['configs'][filename]))
                data = Data(
                    url=str(self.workdir / filename),
                    local=filename,
                    transfer=Transfer.TRUE,
                )
                env['input_files'].append(data)
                self.infiles.add(data)

        # run the module
        if not module_src:
            # run the generated class-runner script via a heredoc
            cmd.extend(['python', '-', "<<____HERE\n" + args + '\n____HERE\n'])
        elif module_src[-3:] == '.py':
            # call as python script
            cmd.extend(['python', module_src] + args)
        elif module_src[-3:] == '.sh':
            # call as shell script
            cmd.extend(['/bin/sh', module_src] + args)
        else:
            # call as regular executable
            if module_src[0] != '/':
                module_src = f'./{module_src}'
            cmd.extend([module_src] + args)

        if module['env_clear']:
            # must be on cvmfs-like environ for this to apply
            envstr = 'env -i PYTHONNOUSERSITE=1 '
            for k in ('OPENCL_VENDOR_PATH', 'http_proxy', 'TMP', 'TMPDIR', '_CONDOR_SCRATCH_DIR',
                      'CUDA_VISIBLE_DEVICES', 'COMPUTE', 'GPU_DEVICE_ORDINAL'):
                envstr += f'{k}=${k} '
            cmd = envstr.split() + cmd

        self.logger.info('cmd=%r', cmd)
        print(' '.join(cmd), file=file)
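

# Example usage of WriteToScript (a sketch, assuming a populated
# ``config.Task`` instance named ``task``):
#
#     import asyncio
#     from pathlib import Path
#
#     writer = WriteToScript(task, Path('/tmp/work'))
#     script = asyncio.run(writer.convert(transfer=True))
#     # ``script`` is the path to an executable task_runner.sh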