# Source code for atmoswing_vigicrues.postactions.export_bdapbp

import datetime
import json
from pathlib import Path

import numpy as np

import atmoswing_vigicrues as asv

from .postaction import PostAction


class ExportBdApBp(PostAction):
    """
    Export forecasts to the Json format of the BdApBp.

    Parameters
    ----------
    name: str
        The name of the action.
    options: dict
        A dictionary holding the options of the action. Supported keys:

        * output_dir : str
            Target directory where the files are written (required).
        * number_analogs : int
            Maximum number of analogs to keep (the highest values).
            -1 to keep all analogs (default).
        * only_relevant_stations : bool
            Export only the stations for which the method was calibrated
            (default: True).
        * use_indentation : bool
            Indent the produced files (default: False).

    Attributes
    ----------
    type_name : str
        The name of the post-action type.
    name : str
        The name of the action.
    status : int
        The status of the action.
    message : str
        An optional error message of the action.
    output_dir : str
        Target directory where the files are written.
    number_analogs : int
        Maximum number of analogs to keep (the highest values).
        -1 to keep all analogs.
    only_relevant_stations : bool
        Export only the stations for which the method was calibrated.
    use_indentation : bool
        Indent the produced files.
    """

    def __init__(self, name, options):
        if not asv.has_netcdf:
            raise ImportError("Le paquet netCDF4 est requis pour cette action.")
        self.type_name = "Export BdApBp"
        self.name = name
        self.status = 100
        self.message = ""
        self.output_dir = options['output_dir']

        asv.check_dir_exists(self.output_dir, True)

        self.number_analogs = options.get('number_analogs', -1)
        self.only_relevant_stations = options.get(
            'only_relevant_stations', True)
        self.use_indentation = options.get('use_indentation', False)

        self._reset_status()

        super().__init__()

    def run(self) -> bool:
        """
        Run the post-action.

        One Json file is written per input file, including for inputs that
        could not be read: in that case the ``metadata``, ``data`` and
        ``statistics`` entries are null and ``status`` holds the error code.
        Inputs whose output file already exists are skipped.

        Possible status codes written to the output:

        * 0 : Success.
        * 100 : The netcdf file is missing.
        * 110 : The netcdf file is corrupted (cannot be opened).
        * 200 : Error while processing the netcdf file.

        Returns
        -------
        bool
            Always True; per-file failures are reported in the Json output.
        """
        if not self._file_paths:
            print(" -> Aucun fichier à traiter")
            return True

        files_count = 0
        for file in self._file_paths:
            file = Path(file)

            # Output file name; never overwrite an existing export.
            file_path = self._build_file_path(file)
            if file_path.exists():
                continue

            self._reset_status()
            nc_file = None

            if not asv.file_exists(file):
                self.status = 100
                self.message = "Absence du fichier netcdf."
            else:
                try:
                    nc_file = asv.Dataset(file, 'r', format='NETCDF4')
                except Exception:
                    self.status = 110
                    self.message = "Fichier netcdf corrompu."

            try:
                metadata = None
                data_block = None
                statistics = None

                # Only build the blocks when the file could be opened, so
                # that the 100/110 statuses set above are not overwritten
                # by a spurious 200.
                if nc_file is not None:
                    try:
                        metadata = self._create_metadata_block(nc_file)
                        data_block = self._create_data_block(nc_file)
                        statistics = self._create_statistics_block(nc_file)
                    except Exception:
                        metadata = None
                        data_block = None
                        statistics = None
                        self.status = 200
                        self.message = \
                            "Erreur lors du traitement fichier netcdf."

                exported_analogs = "full"
                if self.number_analogs > 0:
                    exported_analogs = f"{self.number_analogs} best"

                content = {
                    'status': self.status,
                    'report': {
                        'file': file.name,
                        'date': self._get_now_formatted(),
                        'message': self.message,
                        'exported_analogs': exported_analogs,
                        'only_relevant_stations': self.only_relevant_stations
                    },
                    'metadata': metadata,
                    'data': data_block,
                    'statistics': statistics,
                }

                indent = 4 if self.use_indentation else None
                with open(file_path, "w", encoding="utf-8",
                          newline='\r\n') as outfile:
                    json.dump(content, outfile, indent=indent,
                              ensure_ascii=False)
            finally:
                # Release the dataset even if writing the Json file fails.
                if nc_file is not None:
                    nc_file.close()

            files_count += 1

        print(f" -> Nombre de fichiers exportés : {files_count}.")

        return True

    def _create_metadata_block(self, nc_file):
        """
        Build the ``metadata`` entry from the global attributes of the file.

        Parameters
        ----------
        nc_file
            The opened netcdf dataset.

        Returns
        -------
        dict
            The AtmoSwing, predictand and method descriptions, plus the
            entities (stations) block.
        """
        block = {
            'atmoswing': {
                'creation_date': nc_file.creation_date,
                'origin': nc_file.origin
            },
            'predictand': {
                'temporal_resolution': nc_file.predictand_temporal_resolution,
                'dataset_id': nc_file.predictand_dataset_id,
                'database': nc_file.predictand_database,
                'station_ids': nc_file.predictand_station_ids
            },
            'description': {
                'method_id': nc_file.method_id,
                'method_id_display': nc_file.method_id_display,
                'specific_tag': nc_file.specific_tag,
                'specific_tag_display': nc_file.specific_tag_display,
            },
            'entities': self._create_entities_block(nc_file),
        }

        return block

    @staticmethod
    def _create_entities_block(nc_file):
        """
        Map every station id (as a string) to ``[name, official id]``.

        Parameters
        ----------
        nc_file
            The opened netcdf dataset.

        Returns
        -------
        dict
            ``{station_id: [station_name, station_official_id]}``.
        """
        ids = [str(x) for x in nc_file['station_ids'][:]]
        names = [str(x) for x in nc_file['station_names'][:]]
        oids = [str(x) for x in nc_file['station_official_ids'][:]]

        return {
            station_id: [name, oid]
            for station_id, name, oid in zip(ids, names, oids)
        }

    def _create_data_block(self, nc_file):
        """
        Build the ``data`` entry: the analogs per station and target date.

        For every selected station and every target date, the analogs are
        sorted by decreasing value and optionally truncated to the
        ``number_analogs`` highest ones.

        Parameters
        ----------
        nc_file
            The opened netcdf dataset.

        Returns
        -------
        dict
            ``{station_id: {target_date: [[frequency, analog_date,
            criteria, value], ...]}}``.
        """
        # Extracting variables
        station_ids = nc_file['station_ids'][:]
        target_dates = nc_file['target_dates'][:]
        target_dates = asv.utils.mjd_to_datetime(target_dates)
        analog_dates = nc_file['analog_dates'][:]
        analog_dates = asv.utils.mjd_to_datetime(analog_dates)
        analogs_nb = nc_file['analogs_nb'][:]
        analog_criteria = nc_file['analog_criteria'][:]
        analog_values = nc_file['analog_values_raw'][:]

        assert analog_values.shape[0] == len(station_ids)

        time_format_analogs, time_format_target = \
            self._get_time_format(target_dates)

        if self.only_relevant_stations:
            station_ids_slct = self._extract_station_ids(nc_file)
        else:
            station_ids_slct = station_ids

        block = {}
        for station_id in station_ids_slct:
            i_station = np.where(station_ids == station_id)

            block_target_date = {}
            for i_target, target_date in enumerate(target_dates):
                block_analogs = []

                # The analogs of all target dates are stored one after the
                # other; get the start/end of those of this target date.
                start = np.sum(analogs_nb[0:i_target])
                n_analogs = analogs_nb[i_target]
                end = start + n_analogs

                # Extract relevant values
                analog_dates_sub = analog_dates[start:end]
                analog_criteria_sub = analog_criteria[start:end]
                analog_values_sub = \
                    analog_values[i_station, start:end].flatten()

                # Sort by decreasing precipitation values
                permutation = (-analog_values_sub).argsort()
                analog_dates_sub = analog_dates_sub[permutation]
                analog_criteria_sub = analog_criteria_sub[permutation]
                analog_values_sub = analog_values_sub[permutation]
                frequency = asv.utils.build_cumulative_frequency(n_analogs)
                frequency = np.flip(frequency)

                # Keep only the highest values when a limit was requested.
                if 0 < self.number_analogs < n_analogs:
                    analog_dates_sub = \
                        analog_dates_sub[0:self.number_analogs]
                    analog_criteria_sub = \
                        analog_criteria_sub[0:self.number_analogs]
                    analog_values_sub = \
                        analog_values_sub[0:self.number_analogs]
                    frequency = frequency[0:self.number_analogs]

                for i_analog, analog_date in enumerate(analog_dates_sub):
                    block_analogs.append([
                        round(frequency[i_analog], 3),
                        analog_date.item().strftime(time_format_analogs),
                        round(float(analog_criteria_sub[i_analog]), 2),
                        round(float(analog_values_sub[i_analog]), 2)
                    ])

                target_date_str = \
                    target_date.item().strftime(time_format_target)
                block_target_date[target_date_str] = block_analogs

            block[str(station_id)] = block_target_date

        return block

    def _create_statistics_block(self, nc_file):
        """
        Build the ``statistics`` entry: the full empirical distribution.

        Unlike the data block, all analogs are always kept (the
        ``number_analogs`` option is ignored here).

        Parameters
        ----------
        nc_file
            The opened netcdf dataset.

        Returns
        -------
        dict
            ``{station_id: {target_date: [[frequency, value], ...]}}`` with
            the values sorted in decreasing order.
        """
        # Extracting variables
        station_ids = nc_file['station_ids'][:]
        target_dates = nc_file['target_dates'][:]
        target_dates = asv.utils.mjd_to_datetime(target_dates)
        analogs_nb = nc_file['analogs_nb'][:]
        analog_values = nc_file['analog_values_raw'][:]

        _, time_format_target = self._get_time_format(target_dates)

        if self.only_relevant_stations:
            station_ids_slct = self._extract_station_ids(nc_file)
        else:
            station_ids_slct = station_ids

        block = {}
        for station_id in station_ids_slct:
            i_station = np.where(station_ids == station_id)

            block_target_date = {}
            for i_target, target_date in enumerate(target_dates):
                block_analogs = []

                # Get start/end of the analogs
                start = np.sum(analogs_nb[0:i_target])
                n_analogs = analogs_nb[i_target]
                end = start + n_analogs

                # Extract relevant values
                analog_values_sub = \
                    analog_values[i_station, start:end].flatten()

                # Sort by decreasing precipitation values
                analog_values_sub = np.sort(analog_values_sub)[::-1]
                frequency = asv.utils.build_cumulative_frequency(n_analogs)
                frequency = np.flip(frequency)

                for i_analog, analog_value in enumerate(analog_values_sub):
                    block_analogs.append([
                        round(frequency[i_analog], 3),
                        round(float(analog_value), 2)
                    ])

                target_date_str = \
                    target_date.item().strftime(time_format_target)
                block_target_date[target_date_str] = block_analogs

            block[str(station_id)] = block_target_date

        return block

    @staticmethod
    def _get_time_format(target_dates):
        """
        Choose the date formats according to the forecast time step.

        The hour is included only when the step between the first two
        target dates is shorter than one day.

        Parameters
        ----------
        target_dates
            The target dates; at least two are required.

        Returns
        -------
        tuple of str
            ``(time_format_analogs, time_format_target)``.
        """
        assert len(target_dates) > 1
        time_step = target_dates[1].astype(datetime.datetime) - \
            target_dates[0].astype(datetime.datetime)
        time_step = time_step.total_seconds()
        show_hour = time_step < 24 * 3600

        time_format_target = "%Y%m%d"
        time_format_analogs = "%Y-%m-%d"
        if show_hour:
            time_format_target = "%Y%m%d%H"
            time_format_analogs = "%Y-%m-%d %H"

        return time_format_analogs, time_format_target

    @staticmethod
    def _to_str_dict(var):
        """Flatten an array and return ``{index: str(value)}``."""
        var = var.flatten()
        new_list = [str(x) for x in var]
        return dict(enumerate(new_list))

    @staticmethod
    def _to_int_dict(var):
        """Flatten an array and return ``{index: int(value)}``."""
        var = var.flatten()
        new_list = [int(x) for x in var]
        return dict(enumerate(new_list))

    def _reset_status(self):
        """Reset the status and message to the success values."""
        self.status = 0
        self.message = "Exécution correcte"

    @staticmethod
    def _get_now_formatted():
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        now = datetime.datetime.now()
        return now.strftime("%Y-%m-%d %H:%M:%S")

    def _get_output_path(self, date):
        """
        Return the dated output directory, creating it when needed.

        Parameters
        ----------
        date
            The date passed to ``asv.build_date_dir_structure``.

        Returns
        -------
        Path
            The directory below ``output_dir`` for that date.
        """
        local_path = asv.build_date_dir_structure(self.output_dir, date)
        local_path.mkdir(parents=True, exist_ok=True)
        return local_path

    def _build_file_path(self, file):
        """
        Build the path of the Json file to write for a given input file.

        The input name is reused with its trailing ``.nc`` extension
        replaced by ``.json``; any other name simply gets ``.json``
        appended. An empty name is replaced by a timestamped placeholder.

        Parameters
        ----------
        file
            The path of the input (netcdf) file.

        Returns
        -------
        Path
            The output path, located in the directory of the forecast date.
        """
        original_file_name = Path(file).name
        if not original_file_name:
            now = datetime.datetime.now()
            original_file_name = now.strftime("%Y-%m-%d_%H%M%S") + '_missing'

        # Swap only the trailing extension: a blanket str.replace would
        # also rewrite a '.nc' occurring in the middle of the name.
        if original_file_name.endswith('.nc'):
            file_name = original_file_name[:-len('.nc')] + '.json'
        else:
            file_name = f'{original_file_name}.json'

        output_dir = self._get_output_path(
            self._get_metadata('forecast_date'))
        file_path = output_dir / file_name

        return file_path