import datetime
from pathlib import Path
import numpy as np
import atmoswing_vigicrues as asv
from .postaction import PostAction
[docs]
class ExportPrv(PostAction):
"""
Export des prévisions au format PRV du logiciel Scores.
Parameters
----------
name: str
Le nom de l'action
options: dict
Un dictionnaire contenant les options de l'action. Les champs possibles sont:
* output_dir : str
Chemin cible pour l'enregistrement des fichiers.
* date_format : str
Format pour l'écriture des dates cibles. Défaut: "%d-%m-%Y"
* frequencies : list
Les fréquences à extraire.
Par défaut : [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95]
* combine_stations_in_one_file : bool
Combinaison des différentes stations (entités) dans un seul fichier.
Attributes
----------
type_name : str
Le nom du type de l'action.
name : str
Le nom de l'action.
output_dir : str
Chemin cible pour l'enregistrement des fichiers.
date_format : str
Format pour l'écriture des dates cibles. Défaut: "%d-%m-%Y"
frequencies : list
Les fréquences à extraire.
Par défaut : [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95]
combine_stations_in_one_file : bool
Combinaison des différentes stations (entités) dans un seul fichier.
"""
def __init__(self, name, options):
if not asv.has_netcdf:
raise ImportError("Le paquet netCDF4 est requis pour cette action.")
self.type_name = "Export PRV"
self.name = name
self.output_dir = options['output_dir']
asv.check_dir_exists(self.output_dir, True)
if 'date_format' in options:
self.date_format = options['date_format']
else:
self.date_format = "%d-%m-%Y"
if 'frequencies' in options:
self.frequencies = options['frequencies']
else:
self.frequencies = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 0.95]
if 'combine_stations_in_one_file' in options:
self.combine_stations_in_one_file = options['combine_stations_in_one_file']
else:
self.combine_stations_in_one_file = True
super().__init__()
[docs]
def run(self) -> bool:
"""
Exécution de la post-action.
Returns
-------
bool
Vrai (True) en cas de succès, faux (False) autrement.
"""
if not self._file_paths:
print(" -> Aucun fichier à traiter")
return True
files_count = 0
for file in self._file_paths:
nc_file = asv.Dataset(file, 'r', format='NETCDF4')
station_ids = self._extract_station_ids(nc_file)
header_comments = self._create_header_comments(nc_file)
if self.combine_stations_in_one_file:
file_path = self._build_file_path(file)
if file_path.exists():
continue
header_data = self._create_header_data(nc_file, station_ids)
content = self._create_content(nc_file, station_ids)
full_content = f"{header_comments}{header_data}{content}"
with open(file_path, 'w', encoding="utf-8", newline='\r\n') as outfile:
outfile.write(full_content)
else:
for station_id in station_ids:
file_path = self._build_file_path(file, station_id)
if file_path.exists():
continue
header_data = self._create_header_data(nc_file, station_id)
content = self._create_content(nc_file, station_id)
full_content = f"{header_comments}{header_data}{content}"
with open(file_path, 'w', encoding="utf-8", newline='\r\n') \
as outfile:
outfile.write(full_content)
nc_file.close()
files_count += 1
print(f" -> Nombre de fichiers exportés : {files_count}.")
return True
def _create_header_comments(self, nc_file):
list_frequencies = [str(int(100 * i)) for i in self.frequencies]
header = \
f"# Sortie du module ExportPrv de AtmoSwing-Vigicrues\n" \
f"# origin;{nc_file.origin}\n" \
f"# creation_date;{nc_file.creation_date}\n" \
f"# method_id;{nc_file.method_id}\n" \
f"# specific_tag;{nc_file.specific_tag}\n" \
f"# dataset_id;{nc_file.predictand_dataset_id}\n" \
f"# freqs;{';'.join(list_frequencies)}\n"
return header
def _create_header_data(self, nc_file, station_ids):
n = len(self.frequencies)
if isinstance(station_ids, list):
stat_ids = [f";{id}" * n for id in station_ids]
stat_ids = "".join(stat_ids)
elements = ";RR" * (n * len(station_ids))
series_ids = self._build_id_series(nc_file) * len(station_ids)
else:
stat_ids = f";{station_ids}" * n
elements = ";RR" * n
series_ids = self._build_id_series(nc_file)
header = \
f"Stations{stat_ids}\n" \
f"Grandeur{elements}\n" \
f"IdSeries;{series_ids}\n"
return header
def _create_content(self, nc_file, station_ids):
# Extracting variables
ids = nc_file['station_ids'][:]
target_dates = nc_file['target_dates'][:]
target_dates = asv.utils.mjd_to_datetime(target_dates)
analogs_nb = nc_file['analogs_nb'][:]
analog_values = nc_file['analog_values_raw'][:]
if not self.combine_stations_in_one_file:
station_ids = [station_ids]
time_format_target = self._get_time_format(target_dates)
content = ""
for i_target, target_date in enumerate(target_dates):
# Get start/end of the analogs
start = np.sum(analogs_nb[0:i_target])
n_analogs = analogs_nb[i_target]
end = start + n_analogs
target_date_str = target_date.item().strftime(time_format_target)
new_line = target_date_str
for station_id in station_ids:
i_station = np.where(ids == station_id)
if len(i_station[0]) == 0:
raise RuntimeError("La station n'a pas été trouvée lors de "
"l'export PRV.")
if len(i_station[0]) > 1:
raise RuntimeError("Le nombre d'entités trouvées est supérieur à 1"
" lors de l'export PRV.")
# Extract relevant values and build frequencies
analog_values_sub = analog_values[i_station, start:end]
analog_values_sub = np.sort(analog_values_sub).flatten()
frequencies = asv.utils.build_cumulative_frequency(n_analogs)
for freq in self.frequencies:
if len(frequencies) != len(analog_values_sub):
raise RuntimeError("La taille des vecteurs dans l'export PRV "
"n'est pas cohérente.")
val = np.interp(freq, frequencies, analog_values_sub)
new_line += f";{round(val, 2)}"
content += f"{new_line}\n"
return content
def _get_output_path(self, date):
local_path = asv.build_date_dir_structure(self.output_dir, date)
local_path.mkdir(parents=True, exist_ok=True)
return local_path
def _build_file_path(self, file, station_id=None):
original_file_name = Path(file).name
if not original_file_name:
now = datetime.datetime.now()
original_file_name = now.strftime("%Y-%m-%d_%H%M%S") + '_missing'
if station_id:
file_name = f'{original_file_name}_{station_id}.csv'
else:
file_name = f'{original_file_name}.csv'
if '.nc' in original_file_name:
if station_id:
file_name = original_file_name.replace('.nc', f'_{station_id}.csv')
else:
file_name = original_file_name.replace('.nc', '.csv')
output_dir = self._get_output_path(self._get_metadata('forecast_date'))
file_path = output_dir / file_name
return file_path
def _build_id_series(self, nc_file):
ids = ""
for freq in self.frequencies:
ids += f"{nc_file.method_id}.{nc_file.specific_tag}.{int(100 * freq):03d};"
return ids
def _get_time_format(self, target_dates):
assert len(target_dates) > 1
time_step = target_dates[1].astype(datetime.datetime) - \
target_dates[0].astype(datetime.datetime)
time_step = time_step.total_seconds()
show_hour = time_step < 24 * 3600
time_format_target = self.date_format
if show_hour:
time_format_target = self.date_format + " %H:%M"
return time_format_target