From 5b0ca8d7faefa7a6fb7d8ab1d9bcdd7c22e67ef3 Mon Sep 17 00:00:00 2001 From: "Brandon N. Benton" Date: Thu, 19 Sep 2024 14:31:23 -0600 Subject: [PATCH] better node distribution for solar module --- sup3r/solar/solar.py | 86 +++++++++++++++++++++++++------------ sup3r/solar/solar_cli.py | 92 +++++++++++++++++++++++++++++----------- 2 files changed, 126 insertions(+), 52 deletions(-) diff --git a/sup3r/solar/solar.py b/sup3r/solar/solar.py index 57b9ae052b..742598764b 100644 --- a/sup3r/solar/solar.py +++ b/sup3r/solar/solar.py @@ -35,7 +35,6 @@ def __init__( nsrdb_fp, t_slice=slice(None), tz=-6, - time_shift=-12, agg_factor=1, nn_threshold=0.5, cloud_threshold=0.99, @@ -67,12 +66,6 @@ def __init__( the GAN is trained on data in local time and therefore the output in sup3r_fps should be treated as local time. For example, -6 is CST which is default for CONUS training data. - time_shift : int | None - Number of hours to shift time axis. This can be used, for - example, to shift the time index for daily data so that the time - stamp for a given day starts at hour zero instead of at - noon, as is the case for most GCM data. In this case ``time_shift`` - would be -12 agg_factor : int Spatial aggregation factor for nsrdb-to-GAN-meta e.g. the number of NSRDB spatial pixels to average for a single sup3r GAN output site. 
@@ -91,7 +84,6 @@ def __init__( self.nn_threshold = nn_threshold self.cloud_threshold = cloud_threshold self.tz = tz - self.time_shift = time_shift self._nsrdb_fp = nsrdb_fp self._sup3r_fps = sup3r_fps if isinstance(self._sup3r_fps, str): @@ -203,8 +195,7 @@ def time_index(self): ------- pd.DatetimeIndex """ - ti = self.gan_data.time_index[self.t_slice] - return ti.shift(self.time_shift, freq='h') + return self.gan_data.time_index[self.t_slice] @property def out_of_bounds(self): @@ -521,7 +512,7 @@ def get_node_cmd(cls, config): import_str += 'from rex import init_logger;\n' import_str += f'from sup3r.solar import {cls.__name__}' - fun_str = get_fun_call_str(cls.run_temporal_chunk, config) + fun_str = get_fun_call_str(cls.run_temporal_chunks, config) log_file = config.get('log_file', None) log_level = config.get('log_level', 'INFO') @@ -590,22 +581,21 @@ def write(self, fp_out, features=('ghi', 'dni', 'dhi')): logger.info(f'Finished writing file: {fp_out}') @classmethod - def run_temporal_chunk( + def run_temporal_chunks( cls, fp_pattern, nsrdb_fp, fp_out_suffix='irradiance', tz=-6, - time_shift=-12, agg_factor=1, nn_threshold=0.5, cloud_threshold=0.99, features=('ghi', 'dni', 'dhi'), - temporal_id=None, + temporal_ids=None, ): - """Run the solar module on all spatial chunks for a single temporal - chunk corresponding to the fp_pattern. This typically gets run from the - CLI. + """Run the solar module on all spatial chunks for each temporal + chunk corresponding to the fp_pattern and the given list of + temporal_ids. This typically gets run from the CLI. Parameters ---------- @@ -623,12 +613,6 @@ def run_temporal_chunk( the GAN is trained on data in local time and therefore the output in sup3r_fps should be treated as local time. For example, -6 is CST which is default for CONUS training data. - time_shift : int | None - Number of hours to shift time axis. 
This can be used, for - example, to shift the time index for daily data so that the time - stamp for a given day starts at hour zero instead of at - noon, as is the case for most GCM data. In this case ``time_shift`` - would be -12 agg_factor : int Spatial aggregation factor for nsrdb-to-GAN-meta e.g. the number of NSRDB spatial pixels to average for a single sup3r GAN output site. @@ -643,10 +627,57 @@ def run_temporal_chunk( features : list | tuple List of features to write to disk. These have to be attributes of the Solar class (ghi, dni, dhi). - temporal_id : str | None - One of the unique zero-padded temporal id's from the file chunks - that match fp_pattern. This input typically gets set from the CLI. - If None, this will run all temporal indices. + temporal_ids : list | None + List of zero-padded temporal ids from the file chunks that match + fp_pattern. This input typically gets set from the CLI. If None, + this will run all temporal indices. + """ + if temporal_ids is None: + cls._run_temporal_chunk( + fp_pattern=fp_pattern, + nsrdb_fp=nsrdb_fp, + fp_out_suffix=fp_out_suffix, + tz=tz, + agg_factor=agg_factor, + nn_threshold=nn_threshold, + cloud_threshold=cloud_threshold, + features=features, + temporal_id=temporal_ids, + ) + else: + for temporal_id in temporal_ids: + cls._run_temporal_chunk( + fp_pattern=fp_pattern, + nsrdb_fp=nsrdb_fp, + fp_out_suffix=fp_out_suffix, + tz=tz, + agg_factor=agg_factor, + nn_threshold=nn_threshold, + cloud_threshold=cloud_threshold, + features=features, + temporal_id=temporal_id, + ) + + @classmethod + def _run_temporal_chunk( + cls, + fp_pattern, + nsrdb_fp, + fp_out_suffix='irradiance', + tz=-6, + agg_factor=1, + nn_threshold=0.5, + cloud_threshold=0.99, + features=('ghi', 'dni', 'dhi'), + temporal_id=None, + ): + """Run the solar module on all spatial chunks for a single temporal + chunk corresponding to the fp_pattern. This typically gets run from the + CLI. 
+ + See Also + -------- + :meth:`run_temporal_chunks` """ temp = cls.get_sup3r_fps(fp_pattern, ignore=f'_{fp_out_suffix}.h5') @@ -685,7 +716,6 @@ def run_temporal_chunk( kwargs = { 't_slice': t_slice, 'tz': tz, - 'time_shift': time_shift, 'agg_factor': agg_factor, 'nn_threshold': nn_threshold, 'cloud_threshold': cloud_threshold, diff --git a/sup3r/solar/solar_cli.py b/sup3r/solar/solar_cli.py index 797445078b..7ca0aa3e4d 100644 --- a/sup3r/solar/solar_cli.py +++ b/sup3r/solar/solar_cli.py @@ -3,11 +3,13 @@ TODO: This should be modified to enable distribution of file groups across nodes instead of requesting a node for a single file """ + import copy import logging import os import click +import numpy as np from sup3r import __version__ from sup3r.solar import Solar @@ -19,8 +21,12 @@ @click.group() @click.version_option(version=__version__) -@click.option('-v', '--verbose', is_flag=True, - help='Flag to turn on debug logging. Default is not verbose.') +@click.option( + '-v', + '--verbose', + is_flag=True, + help='Flag to turn on debug logging. Default is not verbose.', +) @click.pass_context def main(ctx, verbose): """Sup3r Solar Command Line Interface""" @@ -29,37 +35,59 @@ def main(ctx, verbose): @main.command() -@click.option('--config_file', '-c', required=True, - type=click.Path(exists=True), - help='sup3r solar configuration .json file.') -@click.option('-v', '--verbose', is_flag=True, - help='Flag to turn on debug logging. Default is not verbose.') +@click.option( + '--config_file', + '-c', + required=True, + type=click.Path(exists=True), + help='sup3r solar configuration .json file.', +) +@click.option( + '-v', + '--verbose', + is_flag=True, + help='Flag to turn on debug logging. 
Default is not verbose.', +) @click.pass_context def from_config(ctx, config_file, verbose=False, pipeline_step=None): """Run sup3r solar from a config file.""" - config = BaseCLI.from_config_preflight(ModuleName.SOLAR, ctx, config_file, - verbose) + config = BaseCLI.from_config_preflight( + ModuleName.SOLAR, ctx, config_file, verbose + ) exec_kwargs = config.get('execution_control', {}) hardware_option = exec_kwargs.pop('option', 'local') log_pattern = config.get('log_pattern', None) fp_pattern = config['fp_pattern'] basename = config['job_name'] fp_sets, _, temporal_ids, _, _ = Solar.get_sup3r_fps(fp_pattern) - logger.info('Solar module found {} sets of chunked source files to run ' - 'on. Submitting to {} nodes based on the number of temporal ' - 'chunks'.format(len(fp_sets), len(set(temporal_ids)))) - - for i_node, temporal_id in enumerate(sorted(set(temporal_ids))): + temporal_ids = sorted(set(temporal_ids)) + max_nodes = config.get('max_nodes', len(temporal_ids)) + max_nodes = min((max_nodes, len(temporal_ids))) + logger.info( + 'Solar module found {} sets of chunked source files to run ' + 'on. 
Submitting to {} nodes based on the number of temporal ' + 'chunks {} and the requested number of nodes {}'.format( + len(fp_sets), + max_nodes, + len(temporal_ids), + config.get('max_nodes', None), + ) + ) + + temporal_id_chunks = np.array_split(temporal_ids, max_nodes) + for i_node, temporal_ids in enumerate(temporal_id_chunks): node_config = copy.deepcopy(config) node_config['log_file'] = ( - log_pattern if log_pattern is None - else os.path.normpath(log_pattern.format(node_index=i_node))) - name = ('{}_{}'.format(basename, str(i_node).zfill(6))) + log_pattern + if log_pattern is None + else os.path.normpath(log_pattern.format(node_index=i_node)) + ) + name = '{}_{}'.format(basename, str(i_node).zfill(6)) ctx.obj['NAME'] = name node_config['job_name'] = name - node_config["pipeline_step"] = pipeline_step + node_config['pipeline_step'] = pipeline_step - node_config['temporal_id'] = temporal_id + node_config['temporal_ids'] = temporal_ids cmd = Solar.get_node_cmd(node_config) if hardware_option.lower() in AVAILABLE_HARDWARE_OPTIONS: @@ -68,9 +96,16 @@ def from_config(ctx, config_file, verbose=False, pipeline_step=None): kickoff_local_job(ctx, cmd, pipeline_step) -def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r', - memory=None, walltime=4, feature=None, - stdout_path='./stdout/'): +def kickoff_slurm_job( + ctx, + cmd, + pipeline_step=None, + alloc='sup3r', + memory=None, + walltime=4, + feature=None, + stdout_path='./stdout/', +): """Run sup3r on HPC via SLURM job submission. Parameters @@ -96,8 +131,17 @@ def kickoff_slurm_job(ctx, cmd, pipeline_step=None, alloc='sup3r', stdout_path : str Path to print .stdout and .stderr files. """ - BaseCLI.kickoff_slurm_job(ModuleName.SOLAR, ctx, cmd, alloc, memory, - walltime, feature, stdout_path, pipeline_step) + BaseCLI.kickoff_slurm_job( + ModuleName.SOLAR, + ctx, + cmd, + alloc, + memory, + walltime, + feature, + stdout_path, + pipeline_step, + ) def kickoff_local_job(ctx, cmd, pipeline_step=None):