import datetime
import glob
import importlib
import subprocess
import tempfile
from pathlib import Path
import atmoswing_vigicrues as asv
[docs]
class Controller:
"""
Classe principale pour la gestion des prévisions AtmoSwing pour le réseau Vigicrues.
Parameters
----------
cli_options : retour de la fonction parse_args() de la classe
argparse.ArgumentParser
Options passées en lignes de commandes à la fonction main()
Attributes
----------
options : instance de la classe Options
Options de la prévision combinant les arguments passés lors de l'utilisation en
lignes de commandes et les options du fichier de configuration.
time_increment : int
Incrément de temps en heures pour l'émission de la prévision
(par défaut 6 heures).
date : datetime.datetime
Date de la prévision.
existing_files : list
Liste des fichiers de prévision d'AtmoSwing Forecaster déjà existants pour
l'échéance en cours.
pre_actions : list
Liste des actions préalables à la prévision.
post_actions : list
Liste des actions postérieures à la prévision.
disseminations : list
Liste des actions de dissémination.
"""
def __init__(self, cli_options):
"""
Initialisation de l'instance Controller
"""
self.options = asv.Options(cli_options)
self.time_increment = 6
if hasattr(cli_options, 'time_increment') and \
cli_options.time_increment is not None:
self.time_increment = cli_options.time_increment
self.date = datetime.datetime.now(datetime.timezone.utc)
self.existing_files = []
self.pre_actions = []
self.post_actions = []
self.disseminations = []
self._register_pre_actions()
self._register_post_actions()
self._register_disseminations()
[docs]
def run(self, date=None) -> int:
"""
Exécution du flux de la prévision et du postprocessing.
Parameters
----------
date : datetime.datetime
La date de la prévision (par défaut, la date actuelle est utilisée).
Returns
-------
int
Le code de retour (0 en cas de succès)
"""
if date:
self.date = date
self._fix_date()
try:
self._run_pre_actions()
self.existing_files = self._list_atmoswing_output_files()
self._run_atmoswing()
self._run_post_actions()
self._run_disseminations()
except asv.Error as e:
print("La prévision a échoué.")
print(f"Erreur: {e}")
return -1
except Exception as e:
print("La prévision a échoué.")
print(f"Erreur: {e}")
return -1
return 0
def _register_pre_actions(self):
"""
Enregistre les actions préalables à la prévision
"""
if self.options.has('pre_actions'):
for action in self.options.get('pre_actions'):
if 'active' in action and not action['active']:
continue
name = action['name']
module = action['uses']
print(f"Chargement de la pre-action '{name}'")
if not hasattr(importlib.import_module('atmoswing_vigicrues'), module):
raise asv.Error(f"L'action {module} est inconnue.")
fct = getattr(importlib.import_module('atmoswing_vigicrues'), module)
self.pre_actions.append(fct(name, action['with']))
def _register_post_actions(self):
"""
Enregistre les actions postérieures à la prévision
"""
if self.options.has('post_actions'):
for action in self.options.get('post_actions'):
if 'active' in action and not action['active']:
continue
name = action['name']
module = action['uses']
print(f"Chargement de la post-action '{name}'")
if not hasattr(importlib.import_module('atmoswing_vigicrues'), module):
raise asv.Error(f"L'action {module} est inconnue.")
fct = getattr(importlib.import_module('atmoswing_vigicrues'), module)
self.post_actions.append(fct(name, action['with']))
def _register_disseminations(self):
"""
Enregistre les actions préalables à la prévision
"""
if self.options.has('disseminations'):
for action in self.options.get('disseminations'):
if 'active' in action and not action['active']:
continue
name = action['name']
module = action['uses']
print(f"Chargement de la disseminations '{name}'")
if not hasattr(importlib.import_module('atmoswing_vigicrues'), module):
raise asv.Error(f"L'action {module} est inconnue.")
fct = getattr(importlib.import_module('atmoswing_vigicrues'), module)
self.disseminations.append(fct(name, action['with']))
def _run_pre_actions(self):
"""
Exécute les opérations préalables à la prévision par AtmoSwing.
"""
if not self.pre_actions or len(self.pre_actions) == 0:
return
attempts_max_hours = 7 * 24
attempts_step_hours = 6
for action in self.pre_actions:
attempts_max_hours = min(attempts_max_hours, action.attempts_max_hours)
attempts_step_hours = max(attempts_step_hours, action.attempts_step_hours)
attempts_hours = 0
while attempts_hours < attempts_max_hours:
success = True
for action in self.pre_actions:
print(f"Exécution de : '{action.type_name}' [{action.name}]")
if not action.run(self.date):
attempts_hours += attempts_step_hours
success = False
break
if success:
print(" -> Exécution correcte.")
break
else:
print(" -> Recul de l'heure de la prévision.")
self._back_in_time(attempts_step_hours)
else:
print(" -> Échec de l'exécution.")
print(" -> Nombre maximum de tentatives atteint pour la pré-action.")
def _run_atmoswing(self):
"""
Exécution d'AtmoSwing.
"""
run = self.options.get('atmoswing')
if 'active' in run and run['active'] is False:
print(" -> Prévision par AtmoSwing Forecaster désactivée.")
return True
name = run['name']
options = run['with']
cmd = self._build_atmoswing_cmd(options)
print(f"Exécution de : '{name}'")
print(f"Prévision pour la date : {self.date.strftime('%Y-%m-%d %H')}")
print("Commande: " + ' '.join(cmd))
try:
ret = subprocess.run(cmd, capture_output=True)
if ret.returncode != 0:
print(" -> Échec de l'exécution.")
self._parse_log_file()
raise asv.Error("Erreur de AtmoSwing Forecaster.")
else:
print(" -> Exécution correcte.")
except Exception as e:
print(" -> Échec de l'exécution.")
self._parse_log_file()
raise asv.Error(f"Exception de AtmoSwing Forecaster: {e}")
def _build_atmoswing_cmd(self, options):
now_str = self.date.strftime("%Y%m%d%H")
cmd = []
if 'atmoswing_path' not in options or not options['atmoswing_path']:
cmd.append("atmoswing-forecaster")
else:
cmd.append(options['atmoswing_path'])
if 'batch_file' not in options or not options['batch_file']:
raise asv.Error("Option 'batch_file' non fournie.")
cmd.append("-f")
cmd.append(options['batch_file'])
if 'target' in options:
if options['target'] == 'now':
cmd.append(f"--forecast-date={now_str}")
elif options['target'] == 'past':
if 'target_nb_days' not in options or not options['target_nb_days']:
raise asv.Error("Option 'target_nb_days' non fournie.")
nb_days = options['target_nb_days']
cmd.append(f"--forecast-past={nb_days}")
elif options['target'] == 'date':
if 'target_date' not in options or not options['target_date']:
raise asv.Error("Option 'target_date' non fournie.")
date = options['target_date']
cmd.append(f"--forecast-date={date}")
else:
cmd.append(f"--forecast-date={now_str}")
if 'proxy' in options and options['proxy']:
cmd.append(f"--proxy={options['proxy']}")
if 'proxy_user' in options and options['proxy_user']:
cmd.append(f"--proxy-user={options['proxy_user']}")
return cmd
def _run_post_actions(self):
"""
Exécute les opérations postérieures à la prévision par AtmoSwing.
"""
if not self.post_actions or len(self.post_actions) == 0:
return
files = self._list_atmoswing_output_files()
if len(files) == 0:
print(" -> Aucun nouveau fichier à traiter en post-action.")
return
for action in self.post_actions:
print(f"Exécution de : '{action.type_name}' [{action.name}]")
action.feed(files, {'forecast_date': self.date})
if action.run():
print(" -> Exécution correcte.")
else:
print(" -> Échec de l'exécution.")
def _run_disseminations(self):
"""
Exécute les opérations de diffusion.
"""
if not self.disseminations or len(self.disseminations) == 0:
return
for action in self.disseminations:
print(f"Exécution de : '{action.type_name}' [{action.name}]")
local_dir = action.local_dir
extension = action.extension
files = self._list_files(local_dir, extension)
action.feed(files)
if action.run(self.date):
print(" -> Exécution correcte.")
else:
print(" -> Échec de l'exécution.")
def _fix_date(self):
date = self.date
if isinstance(date, str):
date = datetime.datetime.strptime(date, "%Y-%m-%d %H")
hour = date.hour
hour = self.time_increment * (hour // self.time_increment)
self.date = datetime.datetime(date.year, date.month, date.day, hour)
def _back_in_time(self, time_increment):
self.date = self.date - datetime.timedelta(hours=time_increment)
def _list_atmoswing_output_files(self):
output_dir = self.options.get('atmoswing')['with']['output_dir']
return self._list_files(output_dir, '.nc', '%Y-%m-%d_%H')
def _get_files_for_post_actions(self):
files = self._list_atmoswing_output_files()
files = [x for x in files if x not in self.existing_files]
return files
def _list_files(self, local_dir, ext, pattern='%Y-%m-%d_%H'):
local_dir = asv.utils.build_date_dir_structure(local_dir, self.date)
pattern = f"{str(local_dir)}/{self.date.strftime(pattern)}{f'.*{ext}'}"
files = glob.glob(pattern)
return files
@staticmethod
def _parse_log_file():
tmp_dir = Path(tempfile.gettempdir())
log_file = tmp_dir / "AtmoSwingForecaster.log"
if not log_file.exists():
print(f" -> Le journal des logs n'a pas été trouvé ({str(log_file)}).")
with open(str(log_file)) as file:
for item in file:
content = item.replace("\r\n", "").replace("\n", "")
print(f" | {content}")