Files
2026-06-28 20:21:40 +02:00

153 lines
5.6 KiB
Python

import logging
import shutil
from pathlib import Path
from typing import List, Optional
from tqdm import tqdm
from .ocr import TweetOCRExtractor
from .classifier import ZeroShotClassifier
from .database_manager import CSVDatabaseManager
logger = logging.getLogger(__name__)
# Extensions d'images supportées
SUPPORTED_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.webp', '.bmp', '.tiff'}
class ImageThemeOrganizer:
"""
Orchestrateur principal du traitement : parcourt les images, extrait le texte,
classifie, et organise les fichiers dans des sous-dossiers via une base de données.
"""
def __init__(
self,
input_dir: Path,
output_dir: Optional[Path] = None,
ocr_languages: Optional[List[str]] = None,
model_name: str = "MoritzLaurer/mDeBERTa-v3-base-xnli-multilingual-nli-2mil7",
confidence_threshold: float = 0.35,
copy_only: bool = False,
dry_run: bool = False,
db_path: Path = Path("captures/ok/tweets.csv")
):
self.input_dir = Path(input_dir)
self.output_dir = Path(output_dir) if output_dir else self.input_dir / "ok"
self.confidence_threshold = confidence_threshold
self.copy_only = copy_only
self.dry_run = dry_run
self.db = CSVDatabaseManager(db_path)
# Initialisation des modules (lazy)
self.ocr_extractor = TweetOCRExtractor(languages=ocr_languages)
self.classifier = ZeroShotClassifier(model_name=model_name)
def scan_new_files(self):
"""
Trouve tous les fichiers images dans le dossier d'entrée et les ajoute à la base de données.
"""
images = []
for file in self.input_dir.iterdir():
if file.is_file() and file.suffix.lower() in SUPPORTED_EXTENSIONS:
images.append(file)
if images:
self.db.add_files(images)
logger.info(f"Ajout de {len(images)} fichiers à la base de données.")
def process_pending(self):
"""
Traite tous les fichiers en attente dans la base de données.
"""
pending_files = self.db.get_pending_files()
if not pending_files:
logger.info("Aucune image en attente.")
return
for record in tqdm(pending_files, desc="Traitement des tweets"):
self._process_record(record)
def _process_record(self, record: dict):
image_path = Path(record['filepath'])
logger.info(f"Traitement de l'image : {image_path.name}")
try:
# 1. Extraction du texte
text = self.ocr_extractor.extract_text(image_path)
dest_category = "Non-classifié"
if not text:
logger.info(f" Aucun texte extrait pour {image_path.name}.")
dest_category = "Sans_Texte"
confidence = 1.0
else:
# 2. Classification du texte (Harassment categories)
classification_result = self.classifier.classify(text)
# Récupérer la meilleure catégorie
if classification_result and classification_result.get("labels"):
best_label = classification_result["labels"][0]
best_score = classification_result["scores"][0]
logger.info(f" Classification : {best_label} (score: {best_score:.2f})")
if best_score >= self.confidence_threshold:
dest_category = best_label
confidence = best_score
else:
dest_category = "Inclassable"
confidence = best_score
else:
dest_category = "Inclassable"
confidence = 0.0
# 3. Organisation physique du fichier
dest_path = self._organize_file(image_path, dest_category)
# 4. Mise à jour DB (on enregistre le nouvel emplacement du fichier
# afin que le rapport HTML pointe vers l'image déplacée).
self.db.update_file_status(
record['filename'], 'processed', text, dest_category, confidence,
new_filepath=str(dest_path)
)
except Exception as e:
logger.error(f" Échec du traitement de {image_path.name} : {e}")
self.db.update_file_status(record['filename'], 'error')
def _organize_file(self, image_path: Path, category: str) -> Path:
"""
Crée le dossier de destination et y déplace ou copie le fichier image.
Retourne le chemin de destination du fichier (utilisé pour la base de
données et le rapport HTML).
"""
dest_dir = self.output_dir / category
dest_path = dest_dir / image_path.name
if self.dry_run:
logger.info(f"[DRY-RUN] Déplacer {image_path.name} -> {dest_dir.name}/")
return dest_path
# Créer le dossier de catégorie si nécessaire
dest_dir.mkdir(parents=True, exist_ok=True)
try:
if self.copy_only:
shutil.copy2(image_path, dest_path)
else:
shutil.move(image_path, dest_path)
logger.info(f" Organisé : {image_path.name} -> {dest_dir.name}/")
except Exception as e:
logger.error(f"Impossible d'organiser le fichier {image_path.name} vers {dest_dir} : {e}")
return dest_path
def run(self) -> dict:
"""
Exécute le processus complet.
"""
self.scan_new_files()
self.process_pending()
return {}