63 lines
2.6 KiB
Python
63 lines
2.6 KiB
Python
import csv
|
|
from pathlib import Path
|
|
from typing import List, Optional, Dict, Any
|
|
|
|
class CSVDatabaseManager:
    """Track files and their processing results in a single CSV file.

    Every row describes one file and is keyed by its ``filename`` column.
    Each operation re-reads the whole CSV and, if anything changed, rewrites
    it in full.
    """

    def __init__(self, db_path: Path):
        """Bind the manager to *db_path*, creating the CSV if it is missing.

        Args:
            db_path: Location of the CSV file. Missing parent directories
                are created.
        """
        self.db_path = db_path
        self.fieldnames = ['filename', 'filepath', 'status', 'ocr_text', 'detected_category', 'confidence', 'created_at']
        self._initialize_csv()

    def _initialize_csv(self):
        """Create a header-only CSV (and its parent directories) if absent."""
        if not self.db_path.exists():
            self.db_path.parent.mkdir(parents=True, exist_ok=True)
            self._write_all([])

    def _read_all(self) -> List[Dict[str, Any]]:
        """Return every row of the CSV as a list of dicts keyed by its header."""
        with open(self.db_path, 'r', newline='', encoding='utf-8') as f:
            return list(csv.DictReader(f))

    def _write_all(self, data: List[Dict[str, Any]]):
        """Replace the CSV contents with a header followed by *data*.

        The rows are first written to a sibling ``<name>.tmp`` file which is
        then moved over the real file. If serialisation fails (for example
        ``csv.DictWriter`` raises ``ValueError`` for a row containing a key
        that is not in ``self.fieldnames``) the existing CSV is left
        untouched instead of being truncated.

        Raises:
            ValueError: If a row contains a key outside ``self.fieldnames``.
        """
        tmp_path = self.db_path.with_name(self.db_path.name + '.tmp')
        try:
            with open(tmp_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=self.fieldnames)
                writer.writeheader()
                writer.writerows(data)
            tmp_path.replace(self.db_path)
        finally:
            # Only still present when the write or the swap failed.
            if tmp_path.exists():
                tmp_path.unlink()

    def add_files(self, file_paths: List[Path]):
        """Append a ``pending`` row for each path whose name is not yet tracked.

        Rows are keyed by ``path.name``. A path is skipped when a row with
        the same name already exists in the CSV or was already added earlier
        in this same call.

        Args:
            file_paths: Paths of the files to register.
        """
        data = self._read_all()
        known_filenames = {row['filename'] for row in data}

        new_entries = []
        for path in file_paths:
            if path.name in known_filenames:
                continue
            # Remember the name immediately so a repeat later in this batch
            # is not inserted a second time.
            known_filenames.add(path.name)
            new_entries.append({
                'filename': path.name,
                'filepath': str(path.absolute()),
                'status': 'pending',
                'ocr_text': '',
                'detected_category': '',
                'confidence': '',
                'created_at': '',  # Could add timestamp here
            })

        if new_entries:
            data.extend(new_entries)
            self._write_all(data)

    def get_pending_files(self) -> List[Dict[str, Any]]:
        """Return the rows whose ``status`` column equals ``'pending'``."""
        return [row for row in self._read_all() if row['status'] == 'pending']

    def update_file_status(self, filename: str, status: str, ocr_text: Optional[str] = None, category: Optional[str] = None, confidence: Optional[float] = None, new_filepath: Optional[str] = None):
        """Update the first row whose ``filename`` matches.

        ``status`` is always overwritten. The optional arguments overwrite
        their columns only when they are not ``None``. If no row matches,
        the CSV is not rewritten.

        Args:
            filename: Value of the ``filename`` column identifying the row.
            status: New value for the ``status`` column.
            ocr_text: New value for ``ocr_text``, if given.
            category: New value for ``detected_category``, if given.
            confidence: New value for ``confidence`` (stored via ``str()``),
                if given.
            new_filepath: New value for ``filepath``, if given.
        """
        data = self._read_all()
        for row in data:
            if row['filename'] != filename:
                continue

            row['status'] = status
            if ocr_text is not None:
                row['ocr_text'] = ocr_text
            if category is not None:
                row['detected_category'] = category
            if confidence is not None:
                row['confidence'] = str(confidence)
            if new_filepath is not None:
                row['filepath'] = new_filepath

            # Only the first matching row is updated; persist and stop.
            self._write_all(data)
            return
|