first commit
This commit is contained in:
@@ -0,0 +1,62 @@
|
||||
import csv
|
||||
from pathlib import Path
|
||||
from typing import List, Optional, Dict, Any
|
||||
|
||||
class CSVDatabaseManager:
    """CSV-backed table that tracks files and their OCR/classification state.

    Each row describes one file and is keyed by the file's base name
    (``filename``), so two files with the same name in different directories
    are treated as the same entry. Every operation reads the whole CSV,
    changes the rows in memory and rewrites the file.
    """

    def __init__(self, db_path: Path):
        """Bind the manager to ``db_path`` and create the CSV if it is missing.

        Args:
            db_path: Location of the CSV file. A plain string is accepted too
                and converted to ``Path``.
        """
        self.db_path = Path(db_path)
        # Column order of the CSV; also the only keys DictWriter will accept.
        self.fieldnames = ['filename', 'filepath', 'status', 'ocr_text', 'detected_category', 'confidence', 'created_at']
        self._initialize_csv()

    def _initialize_csv(self):
        """Create a header-only CSV (and its parent directories) if none exists."""
        if not self.db_path.exists():
            self._write_all([])

    def _read_all(self) -> List[Dict[str, Any]]:
        """Return every row of the CSV as a list of dicts.

        Returns:
            One dict per data row, as produced by ``csv.DictReader``. An empty
            list is returned when the file does not exist, mirroring how the
            constructor treats a missing file as an empty table.
        """
        try:
            with open(self.db_path, 'r', newline='', encoding='utf-8') as f:
                return list(csv.DictReader(f))
        except FileNotFoundError:
            return []

    def _write_all(self, data: List[Dict[str, Any]]):
        """Replace the CSV contents with a header plus ``data``.

        The rows are first written to a sibling ``<name>.tmp`` file which is
        then moved over the real file, so a failure while writing leaves the
        previous contents untouched instead of a truncated file.

        Args:
            data: Rows to persist; keys must be a subset of ``self.fieldnames``.

        Raises:
            ValueError: If a row contains a key that is not in
                ``self.fieldnames`` (raised by ``csv.DictWriter``).
        """
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self.db_path.with_name(self.db_path.name + '.tmp')
        try:
            with open(tmp_path, 'w', newline='', encoding='utf-8') as f:
                writer = csv.DictWriter(f, fieldnames=self.fieldnames)
                writer.writeheader()
                writer.writerows(data)
            tmp_path.replace(self.db_path)
        finally:
            # Only present here if writing or the swap failed; never leave it behind.
            if tmp_path.exists():
                tmp_path.unlink()

    def add_files(self, file_paths: List[Path]):
        """Register new files with status ``'pending'``.

        Paths whose base name is already present in the CSV, or that repeat a
        base name seen earlier in the same call, are skipped. The CSV is only
        rewritten when at least one new row was added.

        Args:
            file_paths: Paths of the files to register.

        Returns:
            The number of rows that were added.
        """
        from datetime import datetime, timezone

        data = self._read_all()
        known_filenames = {row['filename'] for row in data}
        # One timestamp per batch, stored as UTC ISO-8601.
        created_at = datetime.now(timezone.utc).isoformat(timespec='seconds')

        new_entries = []
        for path in file_paths:
            path = Path(path)
            if path.name in known_filenames:
                continue
            # Remember the name so duplicates inside this batch are skipped too.
            known_filenames.add(path.name)
            new_entries.append({
                'filename': path.name,
                'filepath': str(path.absolute()),
                'status': 'pending',
                'ocr_text': '',
                'detected_category': '',
                'confidence': '',
                'created_at': created_at,
            })

        if new_entries:
            data.extend(new_entries)
            self._write_all(data)
        return len(new_entries)

    def get_pending_files(self) -> List[Dict[str, Any]]:
        """Return all rows whose ``status`` column equals ``'pending'``."""
        return [row for row in self._read_all() if row['status'] == 'pending']

    def update_file_status(self, filename: str, status: str, ocr_text: Optional[str] = None, category: Optional[str] = None, confidence: Optional[float] = None, new_filepath: Optional[str] = None) -> bool:
        """Update the first row whose ``filename`` matches.

        ``status`` is always overwritten; the optional fields are only written
        when a non-``None`` value is supplied. The CSV is left untouched when
        no row matches.

        Args:
            filename: Base name identifying the row to update.
            status: New value for the ``status`` column.
            ocr_text: New value for ``ocr_text``, if given.
            category: New value for ``detected_category``, if given.
            confidence: New value for ``confidence`` (stored via ``str()``), if given.
            new_filepath: New value for ``filepath``, if given.

        Returns:
            ``True`` if a row was found and updated, ``False`` otherwise.
        """
        data = self._read_all()
        for row in data:
            if row['filename'] != filename:
                continue
            row['status'] = status
            if ocr_text is not None:
                row['ocr_text'] = ocr_text
            if category is not None:
                row['detected_category'] = category
            if confidence is not None:
                row['confidence'] = str(confidence)
            if new_filepath is not None:
                row['filepath'] = new_filepath
            self._write_all(data)
            return True
        return False
|
||||
Reference in New Issue
Block a user