import csv
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Dict, Any


class CSVDatabaseManager:
    """Track files and their processing results in a single CSV file.

    Every public operation reads the whole CSV into memory and, when it
    changes anything, rewrites the whole file. Rows are keyed by
    ``filename`` (the final path component), not by full path.
    """

    def __init__(self, db_path: Path):
        """Remember the CSV location and create the file if it is missing.

        Args:
            db_path: Location of the CSV file backing this manager.
        """
        self.db_path = db_path
        # Column order used for the header row and for every row written.
        self.fieldnames = [
            'filename',
            'filepath',
            'status',
            'ocr_text',
            'detected_category',
            'confidence',
            'created_at',
        ]
        self._initialize_csv()

    def _initialize_csv(self):
        """Create the CSV (and its parent directories) with only a header row.

        Does nothing when the file already exists.
        """
        if not self.db_path.exists():
            self.db_path.parent.mkdir(parents=True, exist_ok=True)
            with open(self.db_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=self.fieldnames)
                writer.writeheader()

    def _read_all(self) -> List[Dict[str, Any]]:
        """Return every data row of the CSV as a list of dicts."""
        with open(self.db_path, 'r', newline='', encoding='utf-8') as f:
            return list(csv.DictReader(f))

    def _write_all(self, data: List[Dict[str, Any]]):
        """Overwrite the CSV with the header followed by ``data``.

        Args:
            data: Rows to persist, each keyed by the names in
                ``self.fieldnames``.
        """
        with open(self.db_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=self.fieldnames)
            writer.writeheader()
            writer.writerows(data)

    def add_files(self, file_paths: List[Path]):
        """Register new files with status ``'pending'``.

        A path is skipped when a row with the same filename is already
        stored or was already queued earlier in this same call. The CSV is
        only rewritten when at least one new row was added.

        Args:
            file_paths: Paths of the files to register.
        """
        data = self._read_all()
        known_filenames = {row['filename'] for row in data}
        # One timezone-aware UTC timestamp shared by the whole batch.
        created_at = datetime.now(timezone.utc).isoformat()

        new_entries = []
        for path in file_paths:
            if path.name in known_filenames:
                continue
            # Record the name immediately so that a second path with the
            # same filename inside this batch is not inserted twice.
            known_filenames.add(path.name)
            new_entries.append({
                'filename': path.name,
                'filepath': str(path.absolute()),
                'status': 'pending',
                'ocr_text': '',
                'detected_category': '',
                'confidence': '',
                'created_at': created_at,
            })

        if new_entries:
            data.extend(new_entries)
            self._write_all(data)

    def get_pending_files(self) -> List[Dict[str, Any]]:
        """Return all rows whose ``status`` column equals ``'pending'``."""
        return [row for row in self._read_all() if row['status'] == 'pending']

    def update_file_status(self, filename: str, status: str,
                           ocr_text: Optional[str] = None,
                           category: Optional[str] = None,
                           confidence: Optional[float] = None,
                           new_filepath: Optional[str] = None):
        """Update the first row whose filename matches.

        ``status`` is always overwritten on the matching row; each optional
        argument only overwrites its column when it is not ``None``. When no
        row matches, nothing is changed and the CSV is not rewritten.

        Args:
            filename: Value of the ``filename`` column identifying the row.
            status: New value for the ``status`` column.
            ocr_text: New value for ``ocr_text``, if given.
            category: New value for ``detected_category``, if given.
            confidence: New value for ``confidence``; stored as ``str(confidence)``.
            new_filepath: New value for ``filepath``, if given.
        """
        data = self._read_all()
        for row in data:
            if row['filename'] != filename:
                continue
            row['status'] = status
            if ocr_text is not None:
                row['ocr_text'] = ocr_text
            if category is not None:
                row['detected_category'] = category
            if confidence is not None:
                row['confidence'] = str(confidence)
            if new_filepath is not None:
                row['filepath'] = new_filepath
            break
        else:
            # No matching row: skip the pointless full-file rewrite.
            return
        self._write_all(data)