Source code for atmoswing_vigicrues.preactions.transfer_sftp_in

import fnmatch
import os
import tarfile
from pathlib import Path

import paramiko

import atmoswing_vigicrues as asv

from .preaction import PreAction


[docs] class TransferSftpIn(PreAction): """ Récupération des prévisions des modèles météo par SFTP. Parameters ---------- name: str Le nom de l'action options: dict Un dictionnaire contenant les options de l'action. Les champs possibles sont: * local_dir : str Répertoire cible pour l'enregistrement des fichiers. * prefix : str Prefix des fichiers à importer. * variables : list (optionnel) Liste des variables météorologiques à importer. * hostname : str Adresse du serveur distant. * port : int Port du serveur distant. * username : str Utilisateur ayant un accès au serveur. * password : str Mot de passe de l'utilisateur sur le serveur. * proxy_host : str Adresse du proxy, si nécessaire. * proxy_port : int Port du proxy si nécessaire (par défaut: 1080). * remote_dir : str Chemin sur le serveur distant où se trouvent les fichiers. * attempts_max_hours : int Décalage temporel autorisé pour rechercher d'anciens fichiers * attempts_step_hours : int Pas de temps auquel décrémenter la date pour rechercher d'anciens fichiers Attributes ---------- type_name : str Le nom du type de l'action name : str Le nom de l'action local_dir : str Répertoire cible pour l'enregistrement des fichiers. prefix : str Prefix des fichiers à importer. hostname : str Adresse du serveur distant. port : int Port du serveur distant. username : str Utilisateur ayant un accès au serveur. password : str Mot de passe de l'utilisateur sur le serveur. remote_dir : str Chemin sur le serveur distant où se trouvent les fichiers. variables : list Liste des variables météorologiques à importer. proxy_host : str Adresse du proxy, si nécessaire. proxy_port : int Port du proxy si nécessaire (par défaut: 1080). """ def __init__(self, name, options): """ Initialisation de l'instance TransferSftp """ self.type_name = "Transfert SFTP" self.name = name self.local_dir = options['local_dir'] self.prefix = options['prefix'] self.hostname = options['hostname'] self.port = int(options['port']) self.username = options['username'] self.password = options['password'] self.remote_dir = options['remote_dir'] self._set_attempts_attributes(options) if 'variables' in options and len(options['variables']) > 0: self.variables = options['variables'] else: self.variables = None if 'proxy_host' in options and len(options['proxy_host']) > 0: self.proxy_host = options['proxy_host'] if 'proxy_port' in options: if isinstance(options['proxy_port'], str) and len( options['proxy_port']) > 0: self.proxy_port = int(options['proxy_port']) elif isinstance(options['proxy_port'], int): self.proxy_port = options['proxy_port'] else: raise asv.Error("Le port du proxy doit être une chaîne de " "caractères ou un entier.") else: self.proxy_port = 1080 else: self.proxy_host = None super().__init__()
[docs] def run(self, date) -> bool: """ Exécution de la récupération par SFTP. Parameters ---------- date : datetime.datetime Date de la prévision. Returns ------- bool Vrai (True) en cas de succès, faux (False) autrement. """ try: # Check if files already in the local folder (only with defined variables) if self.variables is not None: if self._files_already_present(date): print(" -> Fichiers déjà présents localement.") return True # Create a transport object for the SFTP connection transport = paramiko.Transport((self.hostname, self.port)) if self.proxy_host: transport.start_client() transport.open_channel('direct-tcpip', (self.hostname, self.port), (self.proxy_host, self.proxy_port)) # Authenticate with the SFTP server transport.connect(username=self.username, password=self.password) # Create an SFTP client object sftp = transport.open_sftp_client() # Change the directory to the desired remote directory sftp.chdir(self.remote_dir) # Download files local_path = Path(self._get_local_path(date)) forecast_datetime = date.strftime("%Y%m%d%H") f_exist_dt, f_new_dt = self._get_files(sftp, forecast_datetime, local_path) if f_exist_dt + f_new_dt == 0: print(f" -> Pas de fichier disponible pour {forecast_datetime}.") return False forecast_date = date.strftime("%Y%m%d") f_exist_d, f_new_d = self._get_files(sftp, forecast_date, local_path) print(f" -> Nombre de fichiers existants : {f_exist_d - f_new_dt}.") print(f" -> Nombre de fichiers récupérés : {f_new_dt + f_new_d}.") sftp.close() transport.close() return True except paramiko.ssh_exception.PasswordRequiredException as e: print(f"SFTP PasswordRequiredException {e}") except paramiko.ssh_exception.BadAuthenticationType as e: print(f"SFTP BadAuthenticationType {e}") except paramiko.ssh_exception.AuthenticationException as e: print(f"SFTP AuthenticationException {e}") except paramiko.ssh_exception.ChannelException as e: print(f"SFTP ChannelException {e}") except paramiko.ssh_exception.ProxyCommandFailure as e: print(f"SFTP ProxyCommandFailure {e}") except paramiko.ssh_exception.SSHException as e: print(f"SFTP SSHException {e}") except FileNotFoundError as e: print(f"SFTP FileNotFoundError {e}") except Exception as e: print(f"Le rapatriement des données par SFTP a échoué ({e}).") if 'sftp' in locals(): sftp.close() if 'transport' in locals(): transport.close() return False
def _get_files(self, sftp, forecast_date, local_path): files_count_existing = 0 files_count_new = 0 for remote_file in sftp.listdir('.'): pattern = f'{self.prefix.lower()}*_{forecast_date}*.*' if self.variables is not None: for variable in self.variables: pattern = f'{self.prefix.lower()}_{variable.lower()}' \ f'_{forecast_date}*.*' if fnmatch.fnmatch(remote_file.lower(), pattern): break if fnmatch.fnmatch(remote_file.lower(), pattern): local_file = local_path / remote_file if local_file.exists(): files_count_existing += 1 continue sftp.get(remote_file, str(local_file), prefetch=False) self._unpack_if_needed(local_file, local_path) files_count_new += 1 return files_count_existing, files_count_new @staticmethod def _chdir_or_mkdir(dir_path, sftp): try: sftp.chdir(dir_path) except OSError: sftp.mkdir(dir_path) sftp.chdir(dir_path) def _get_local_path(self, date): local_path = asv.build_date_dir_structure(self.local_dir, date) local_path.mkdir(parents=True, exist_ok=True) return local_path def _files_already_present(self, date): local_path = Path(self._get_local_path(date)) forecast_datetime = date.strftime("%Y%m%d%H") for variable in self.variables: pattern = f'{self.prefix.lower()}_{variable.lower()}' \ f'_{forecast_datetime}*.*' local_files = local_path.glob(pattern) file_found = False for local_file in local_files: if fnmatch.fnmatch(str(local_file.name).lower(), pattern): file_found = True break if not file_found: return False return True @staticmethod def _unpack_if_needed(local_file, local_path): if local_file.suffix in ['.gz', '.tgz', '.xz', '.txz', '.bz2', '.tbz', '.tbz2', '.tb2']: file = tarfile.open(local_file) for member in file.getmembers(): if member.isreg(): member.name = os.path.basename(member.name) file.extract(member, local_path) file.close()