From 016beb318c8ba423ac72ad6352dffe4226475985 Mon Sep 17 00:00:00 2001 From: Kepka Ludovic Date: Sun, 14 Sep 2025 10:46:57 +0200 Subject: [PATCH] =?UTF-8?q?Procedure=20de=20migration=20des=20donn=C3=A9e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- database/__init__.py | 72 +++++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 27 deletions(-) diff --git a/database/__init__.py b/database/__init__.py index 234698c..eb3a05b 100644 --- a/database/__init__.py +++ b/database/__init__.py @@ -1,42 +1,60 @@ -from flask_sqlalchemy import SQLAlchemy -from webapp import webapp +import logging +import json import os +from flask_sqlalchemy import SQLAlchemy +from sqlite3 import Cursor, Connection +from webapp import webapp + + basedir = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) webapp.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{os.path.join(basedir, "instance", "database.db")}' db = SQLAlchemy(webapp) -def migrate_game_bundle_if_needed(): - """Migre la table game_bundle si elle a l'ancienne structure (id comme clé primaire)""" - try: - connection = db.session.connection().connection - cursor = connection.cursor() - - cursor.execute("PRAGMA table_info(game_bundle)") - columns = cursor.fetchall() - - if columns: - has_id_primary = any(col[1] == 'id' and col[5] == 1 for col in columns) - - if has_id_primary: - print("Migration de game_bundle: ancienne structure détectée") - cursor.execute("DROP TABLE game_bundle") - connection.commit() - print("Ancienne table game_bundle supprimée") - - cursor.close() - except Exception as e: - print(f"Erreur lors de la vérification de migration: {e}") +def _tableHaveColumn(table_name:str, column_name:str, cursor:Cursor) -> bool: + cursor.execute(f'PRAGMA table_info({table_name})') + columns = cursor.fetchall() + return any(col[1] == column_name for col in columns) + +def _tableEmpty(table:str, cursor:Cursor) -> bool: + return cursor.execute(f'SELECT COUNT(*) FROM {table}').fetchone()[0] == 0 + +def _renameTable(old_name:str, new_name:str, cursor:Cursor) : + cursor.execute(f'ALTER TABLE {old_name} RENAME TO {new_name}') + +def _dropTable(table_name:str, cursor:Cursor) : + cursor.execute(f'DROP TABLE {table_name}') + +def _doPreImportMigration(cursor:Cursor): + if _tableHaveColumn('game_bundle', 'id', cursor) : + logging.info("Table game_bundle détécté, rennomage en game_bundle_old") + _renameTable('game_bundle', 'game_bundle_old', cursor) + +def _doPostImportMigration(cursor:Cursor): + if _tableEmpty('game_bundle', cursor) : + logging.info("remplir game_bundle avec game_bundle_old") + bundles = cursor.execute(f'SELECT * FROM game_bundle_old').fetchall() + for bundle in bundles : + name = bundle[1] + json_data = json.loads(bundle[2]) + url = json_data['url'] + logging.info(f'import du bundle {name}, {url}') + cursor.execute('INSERT INTO game_bundle(url, name, json) VALUES (?, ?, ?)', (url, name, json.dumps(json_data))) + logging.info("suppression de la table temporaire game_bundle_old") + _dropTable('game_bundle_old', cursor) with webapp.app_context(): - migrate_game_bundle_if_needed() - with open('database/schema.sql', 'r') as f: sql = f.read() - connection = db.session.connection().connection + connection : Connection = db.session.connection().connection try: - cursor = connection.cursor() + cursor : Cursor = connection.cursor() + _doPreImportMigration(cursor) cursor.executescript(sql) + _doPostImportMigration(cursor) + connection.commit() cursor.close() + except Exception as e: + logging.error(f"lors de l'import de la bdd : {e}") finally: connection.close()