1 Commits

Author SHA1 Message Date
skylanix
45347be85b Améliore le healthcheck Docker pour détecter les erreurs en temps réel 2025-10-07 02:22:50 +02:00
25 changed files with 194 additions and 3367 deletions

2
.gitignore vendored
View File

@@ -3,5 +3,5 @@
**/.venv
__pycache__
instance
logs
.tio.tokens.json
**/logs

View File

@@ -5,6 +5,7 @@ WORKDIR /app
ENV DEBIAN_FRONTEND=noninteractive
ENV LANG=fr_FR.UTF-8
ENV LC_ALL=fr_FR.UTF-8
ENV PYTHONUNBUFFERED=1
RUN apt-get update && apt-get install -y --no-install-recommends \
apt-utils \
@@ -34,7 +35,7 @@ RUN python3 -m venv /app/venv && \
chmod +x /start.sh && \
mkdir -p /app/logs
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
HEALTHCHECK --interval=1s --timeout=10s --start-period=5s --retries=3 \
CMD pgrep python > /dev/null && ! (tail -n 1000 $(ls -t /app/logs/*.log 2>/dev/null | head -1) 2>/dev/null | grep -iE "(ERROR|CRITICAL|Exception|sqlite3\.OperationalError)")
CMD ["/start.sh"]

196
README.md
View File

@@ -14,8 +14,6 @@
- [Prérequis](#prérequis)
- [Création du bot Discord](#création-du-bot-discord)
- [Démarrage rapide](#démarrage-rapide)
- [Build local (développement)](#build-local-développement)
- [Déploiement avec Portainer](#déploiement-avec-portainer)
- [Volumes persistants](#volumes-persistants)
- [Commandes Docker utiles](#commandes-docker-utiles)
- [Mise à jour](#mise-à-jour)
@@ -52,49 +50,12 @@ Mamie Henriette est un bot intelligent open-source développé spécifiquement p
- **Statuts dynamiques** : Rotation automatique des humeurs (10 min)
- **Notifications Humble Bundle** : Surveillance et alertes automatiques (30 min)
- **Commandes personnalisées** : Gestion via interface web
- **Recherche ProtonDB** :
- Commande `!protondb nom_du_jeu` ou `!pdb nom_du_jeu` pour vérifier la compatibilité Linux/Steam Deck
- Recherche intelligente avec support des alias de jeux
- Affichage du score de compatibilité, nombre de rapports et lien direct
- **Intégration anti-cheat** : Affiche automatiquement les systèmes anti-cheat et leur statut (supporté, cassé, refusé)
- Cache mis à jour automatiquement depuis AreWeAntiCheatYet
- **Modération** : Système complet de modération avec historique
- **Avertissements** : `!averto`, `!warn`, `!av`, `!avertissement`
- Envoi automatique de DM à l'utilisateur averti
- Support des timeouts combinés : `!warn @user raison --to durée`
- **Timeout** : `!timeout`, `!to` - Exclusion temporaire d'un utilisateur
- Syntaxe : `!to @user durée raison` (ex: `!to @User 10m Spam`)
- Durées supportées : secondes (s), minutes (m), heures (h), jours (j/days)
- **Gestion des avertissements** : `!delaverto`, `!removewarn`, `!delwarn`
- **Liste des événements** : `!warnings`, `!listevent`, `!listwarn`
- **Inspection utilisateur** : `!inspect @user`
- Historique complet des sanctions
- Date d'arrivée et durée sur le serveur
- Détection des comptes suspects (< 7 jours)
- Affichage du code d'invitation utilisé et de l'inviteur
- **Bannissement** : `!ban @user raison`, `!banlist`
- `!unban @user raison` ou `!unban #ID raison` (débannir par ID de sanction)
- Invitation automatique par DM lors du débannissement
- **Expulsion** : `!kick @user raison`
- **Annonces** : `!say #canal message` - Envoi de messages en tant que bot (staff uniquement)
- **Aide** : `!aide`, `!help` - Liste complète des commandes disponibles
- **Configuration avancée** :
- Support de multiples rôles staff
- Canal de logs dédié pour toutes les actions
- Suppression automatique des messages de modération (délai configurable)
- Activation/désactivation individuelle des fonctionnalités
- Panneau d'administration web pour consulter, éditer et supprimer l'historique
- **Messages de bienvenue et départ** :
- Messages personnalisables avec variables : `{member.mention}`, `{member.name}`, `{server.name}`, `{server.member_count}`
- **Système de tracking d'invitations** : Affiche qui a invité le nouveau membre
- **Messages de départ intelligents** : Détection automatique de la raison (volontaire, kick, ban)
- Affichage de la durée passée sur le serveur
- Embeds enrichis avec avatar et informations détaillées
- **Recherche ProtonDB** : Commande `!protondb <nom_du_jeu>` pour vérifier la compatibilité Linux/Steam Deck
- **Modération** : Outils intégrés
### Twitch
- **Chat bot** : Commandes et interactions automatiques
- **Alertes Live** :
- Surveillance automatique des streamers
- **Alertes Live** : Surveillance automatique des streamers (vérification toutes les 5 minutes)
- Support jusqu'à 100 chaînes simultanément
- Notifications Discord avec aperçu du stream
- Gestion via interface d'administration
@@ -106,23 +67,10 @@ Mamie Henriette est un bot intelligent open-source développé spécifiquement p
### Interface d'administration
- **Dashboard** : Vue d'ensemble et statistiques
- **Configuration** :
- Tokens Discord/Twitch et paramètres des plateformes
- Configuration ProtonDB (API Algolia)
- Gestion des rôles staff (support de multiples rôles)
- Activation/désactivation individuelle des fonctionnalités (modération, ban, kick, welcome, leave)
- Configuration du délai de suppression automatique des messages de modération
- **Gestion des humeurs** : Création et modification des statuts Discord rotatifs
- **Commandes** : Édition des commandes personnalisées multi-plateformes
- **Modération** :
- Consultation de l'historique complet des sanctions
- Édition des raisons des événements de modération
- Suppression d'événements de modération
- Filtrage et recherche dans l'historique
- **Messages de bienvenue/départ** :
- Personnalisation des messages avec variables dynamiques
- Configuration des canaux de bienvenue et départ
- Activation/désactivation indépendante
- **Configuration** : Tokens, paramètres des plateformes, configuration ProtonDB
- **Gestion des humeurs** : Création et modification des statuts
- **Commandes** : Édition des commandes personnalisées
- **Modération** : Outils de gestion communautaire
## Installation
@@ -167,122 +115,21 @@ Avant d'installer MamieHenriette, vous devez créer un bot Discord et obtenir so
```bash
# 1. Cloner le projet
git clone https://github.com/skylanix/MamieHenriette.git
```
```bash
cd MamieHenriette
```
```bash
# 2. Récupérer l'image depuis GitHub Container Registry et lancer
docker compose pull
docker compose up -d
```
> 📝 L'interface web sera accessible sur http://localhost:5000
>
> ⚠️ **Important** : Après configuration via l'interface web, **redémarrez le conteneur** pour que les changements soient pris en compte :
> ```bash
> docker compose restart MamieHenriette
> ```
### Build local (développement)
Si vous souhaitez modifier le code et builder l'image localement :
```bash
# 1. Cloner et accéder au projet
git clone https://github.com/skylanix/MamieHenriette.git
cd MamieHenriette
```
```bash
# 2. Modifier le docker-compose.yml
# Commentez la ligne 'image:' et décommentez la section 'build:' :
```
```yaml
services:
mamiehenriette:
container_name: MamieHenriette
restart: unless-stopped
build: . # ← Décommentez cette ligne
image: mamiehenriette # ← Décommentez cette ligne
# image: ghcr.io/skylanix/mamiehenriette:latest # ← Commentez cette ligne
# ... reste de la configuration
```
```bash
# 3. Builder et lancer
# 2. Lancer avec Docker
docker compose up --build -d
```
### Déploiement avec Portainer
Si vous utilisez Portainer pour gérer vos conteneurs Docker, voici la configuration Docker Compose à utiliser :
```yaml
services:
mamiehenriette:
container_name: MamieHenriette
image: ghcr.io/skylanix/mamiehenriette:latest
restart: unless-stopped
environment:
TZ: Europe/Paris
volumes:
# Adaptez ces chemins selon votre configuration
- ./instance:/app/instance
- ./logs:/app/logs
ports:
- 5000:5000
watchtower: # Mise à jour automatique de l'image
image: containrrr/watchtower:latest
container_name: watchtower
restart: unless-stopped
environment:
TZ: Europe/Paris
WATCHTOWER_INCLUDE: "MamieHenriette"
WATCHTOWER_SCHEDULE: "0 */30 * * * *" # Vérification toutes les 30 min
WATCHTOWER_MONITOR_ONLY: "false"
WATCHTOWER_CLEANUP: "true"
WATCHTOWER_INCLUDE_RESTARTING: "true"
# Décommentez pour activer les notifications Discord :
# WATCHTOWER_NOTIFICATION_URL: "discord://token@id"
# WATCHTOWER_NOTIFICATIONS: shoutrrr
volumes:
- /var/run/docker.sock:/var/run/docker.sock
# Décommentez pour accéder à la base de données via interface web (localhost:5001)
# sqlite-web:
# image: ghcr.io/coleifer/sqlite-web:latest
# container_name: sqlite_web
# ports:
# - "5001:8080"
# volumes:
# - ./instance/database.db:/data/database.db
# environment:
# - SQLITE_DATABASE=/data/database.db
```
**Étapes dans Portainer :**
1. **Accéder à Portainer** : Ouvrez votre interface Portainer (généralement http://votre-serveur:9000)
2. **Créer une Stack** :
- Allez dans "Stacks" → "Add stack"
- Donnez un nom : `MamieHenriette`
- Collez la configuration ci-dessus dans l'éditeur
3. **Adapter les chemins des volumes** :
- Modifiez `./instance` et `./logs` selon votre configuration
- Exemple : `/opt/containers/MamieHenriette/instance` et `/opt/containers/MamieHenriette/logs`
4. **Déployer** :
- Cliquez sur "Deploy the stack"
- Attendez que le conteneur démarre
5. **Accéder à l'interface** :
- Ouvrez http://votre-serveur:5000
- Configurez le bot via l'interface web
- Redémarrez le conteneur depuis Portainer après configuration
> ⚠️ **Important** : Après configuration via l'interface web http://localhost:5000, **redémarrez le conteneur** pour que les changements soient pris en compte :
> ```bash
> docker compose restart MamieHenriette
> ```
### Volumes persistants
- `./instance/` : Base de données SQLite et configuration
@@ -318,12 +165,10 @@ git pull origin main
# 3. Mettre à jour l'image Docker
docker compose pull
# 4. Relancer
docker compose up -d
# 4. Reconstruire et relancer
docker compose up --build -d
```
> 💡 **Note** : Si vous utilisez Watchtower, les mises à jour de l'image sont automatiques (vérification toutes les 30 minutes).
#### Sans Docker (installation locale)
```bash
# 1. Arrêter l'application
@@ -391,16 +236,13 @@ python run-web.py
## Spécifications techniques
### Base de données (SQLite)
- **Configuration** : Paramètres et tokens des plateformes, configuration des fonctionnalités
- **Configuration** : Paramètres et tokens des plateformes
- **Humeur** : Statuts Discord rotatifs avec gestion automatique
- **Commande** : Commandes personnalisées multi-plateformes (Discord/Twitch)
- **LiveAlert** : Configuration surveillance streamers Twitch (nom, canal Discord, statut)
- **GameAlias** : Alias pour améliorer les recherches ProtonDB
- **GameBundle** : Historique et notifications Humble Bundle
- **AntiCheatCache** : Cache des informations anti-cheat pour ProtonDB (mise à jour automatique hebdomadaire)
- **Message** : Messages automatiques périodiques
- **Moderation** : Historique complet des actions de modération (avertissements, timeouts, bans, kicks, unbans) avec raison, staff, timestamp et durée
- **MemberInvites** : Tracking des invitations (code d'invitation, inviteur, date de join)
- **Message** : Messages automatiques périodiques (implémenté)
### Architecture multi-thread
- **Thread 1** : Interface web Flask (port 5000) avec logging rotatif

View File

@@ -1,8 +1,6 @@
import logging
import json
import os
from sqlalchemy import event
from sqlalchemy.engine import Engine
from flask_sqlalchemy import SQLAlchemy
from sqlite3 import Cursor, Connection
@@ -11,35 +9,9 @@ from webapp import webapp
basedir = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
webapp.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{os.path.join(basedir, "instance", "database.db")}'
# Options moteur pour améliorer la concurrence SQLite
webapp.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'connect_args': {
'check_same_thread': False,
'timeout': 30
},
}
webapp.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(webapp)
# PRAGMA pour SQLite (WAL, busy timeout)
@event.listens_for(Engine, "connect")
def _set_sqlite_pragma(dbapi_connection, connection_record):
try:
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA journal_mode=WAL;")
cursor.execute("PRAGMA synchronous=NORMAL;")
cursor.execute("PRAGMA busy_timeout=30000;")
cursor.close()
except Exception:
pass
def _tableExists(table_name:str, cursor:Cursor) -> bool:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,))
return cursor.fetchone() is not None
def _tableHaveColumn(table_name:str, column_name:str, cursor:Cursor) -> bool:
if not _tableExists(table_name, cursor):
return False
cursor.execute(f'PRAGMA table_info({table_name})')
columns = cursor.fetchall()
return any(col[1] == column_name for col in columns)
@@ -70,27 +42,6 @@ def _doPostImportMigration(cursor:Cursor):
cursor.execute('INSERT INTO game_bundle(url, name, json) VALUES (?, ?, ?)', (url, name, json.dumps(json_data)))
logging.info("suppression de la table temporaire game_bundle_old")
_dropTable('game_bundle_old', cursor)
if _tableExists('youtube_notification', cursor):
logging.info("Migration de la table youtube_notification: ajout des colonnes d'embed")
embed_columns = [
('embed_title', 'VARCHAR(256)'),
('embed_description', 'VARCHAR(2000)'),
('embed_color', 'VARCHAR(8) DEFAULT "FF0000"'),
('embed_footer', 'VARCHAR(2048)'),
('embed_author_name', 'VARCHAR(256)'),
('embed_author_icon', 'VARCHAR(512)'),
('embed_thumbnail', 'BOOLEAN DEFAULT 1'),
('embed_image', 'BOOLEAN DEFAULT 1')
]
for col_name, col_type in embed_columns:
if not _tableHaveColumn('youtube_notification', col_name, cursor):
try:
cursor.execute(f'ALTER TABLE youtube_notification ADD COLUMN {col_name} {col_type}')
logging.info(f"Colonne {col_name} ajoutée à youtube_notification")
except Exception as e:
logging.error(f"Impossible d'ajouter la colonne {col_name}: {e}")
raise
with webapp.app_context():
with open('database/schema.sql', 'r') as f:

View File

@@ -40,42 +40,3 @@ class Commande(db.Model):
trigger = db.Column(db.String(32), unique=True)
response = db.Column(db.String(2000))
class ModerationEvent(db.Model):
id = db.Column(db.Integer, primary_key=True)
type = db.Column(db.String(32))
username = db.Column(db.String(256))
discord_id = db.Column(db.String(64))
created_at = db.Column(db.DateTime)
reason = db.Column(db.String(1024))
staff_id = db.Column(db.String(64))
staff_name = db.Column(db.String(256))
duration = db.Column(db.Integer)
class AntiCheatCache(db.Model):
__tablename__ = 'anticheat_cache'
steam_id = db.Column(db.String(32), primary_key=True)
game_name = db.Column(db.String(256))
status = db.Column(db.String(32))
anticheats = db.Column(db.String(512))
reference = db.Column(db.String(512))
notes = db.Column(db.String(1024))
updated_at = db.Column(db.DateTime)
class YouTubeNotification(db.Model):
__tablename__ = 'youtube_notification'
id = db.Column(db.Integer, primary_key=True)
enable = db.Column(db.Boolean, default=True)
channel_id = db.Column(db.String(128))
notify_channel = db.Column(db.Integer)
message = db.Column(db.String(2000))
video_type = db.Column(db.String(16), default='all')
last_video_id = db.Column(db.String(128))
embed_title = db.Column(db.String(256))
embed_description = db.Column(db.String(2000))
embed_color = db.Column(db.String(8), default='FF0000')
embed_footer = db.Column(db.String(2048))
embed_author_name = db.Column(db.String(256))
embed_author_icon = db.Column(db.String(512))
embed_thumbnail = db.Column(db.Boolean, default=True)
embed_image = db.Column(db.Boolean, default=True)

View File

@@ -45,52 +45,3 @@ CREATE TABLE IF NOT EXISTS `commande` (
`trigger` VARCHAR(16) UNIQUE NOT NULL,
`response` VARCHAR(2000) NOT NULL
);
CREATE TABLE IF NOT EXISTS `moderation_event` (
id INTEGER PRIMARY KEY AUTOINCREMENT,
`type` VARCHAR(32) NOT NULL,
`username` VARCHAR(256) NOT NULL,
`discord_id` VARCHAR(64) NOT NULL,
`created_at` DATETIME NOT NULL,
`reason` VARCHAR(1024) NOT NULL,
`staff_id` VARCHAR(64) NOT NULL,
`staff_name` VARCHAR(256) NOT NULL,
`duration` INTEGER NULL
);
CREATE TABLE IF NOT EXISTS `anticheat_cache` (
steam_id VARCHAR(32) PRIMARY KEY,
game_name VARCHAR(256) NOT NULL,
status VARCHAR(32) NOT NULL,
anticheats VARCHAR(512),
reference VARCHAR(512),
notes VARCHAR(1024),
updated_at DATETIME NOT NULL
);
CREATE TABLE IF NOT EXISTS `member_invites` (
id INTEGER PRIMARY KEY AUTOINCREMENT,
`user_id` VARCHAR(64) NOT NULL,
`guild_id` VARCHAR(64) NOT NULL,
`invite_code` VARCHAR(256),
`inviter_name` VARCHAR(256),
`join_date` DATETIME NOT NULL
);
CREATE TABLE IF NOT EXISTS `youtube_notification` (
id INTEGER PRIMARY KEY AUTOINCREMENT,
`enable` BOOLEAN NOT NULL DEFAULT TRUE,
`channel_id` VARCHAR(128) NOT NULL,
`notify_channel` INTEGER NOT NULL,
`message` VARCHAR(2000) NOT NULL,
`video_type` VARCHAR(16) NOT NULL DEFAULT 'all',
`last_video_id` VARCHAR(128),
`embed_title` VARCHAR(256),
`embed_description` VARCHAR(2000),
`embed_color` VARCHAR(8) NOT NULL DEFAULT 'FF0000',
`embed_footer` VARCHAR(2048),
`embed_author_name` VARCHAR(256),
`embed_author_icon` VARCHAR(512),
`embed_thumbnail` BOOLEAN NOT NULL DEFAULT TRUE,
`embed_image` BOOLEAN NOT NULL DEFAULT TRUE
);

View File

@@ -6,23 +6,8 @@ import random
from database import db
from database.helpers import ConfigurationHelper
from database.models import Configuration, Humeur, Commande
from discord import Message, TextChannel, Member
from discord import Message, TextChannel
from discordbot.humblebundle import checkHumbleBundleAndNotify
from discordbot.moderation import (
handle_warning_command,
handle_remove_warning_command,
handle_list_warnings_command,
handle_ban_command,
handle_kick_command,
handle_unban_command,
handle_inspect_command,
handle_ban_list_command,
handle_staff_help_command,
handle_timeout_command,
handle_say_command
)
from discordbot.welcome import sendWelcomeMessage, sendLeaveMessage, updateInviteCache
from discordbot.youtube import checkYouTubeVideos
from protondb import searhProtonDb
class DiscordBot(discord.Client):
@@ -31,12 +16,8 @@ class DiscordBot(discord.Client):
for c in self.get_all_channels() :
logging.info(f'{c.id} {c.name}')
for guild in self.guilds:
await updateInviteCache(guild)
self.loop.create_task(self.updateStatus())
self.loop.create_task(self.updateHumbleBundle())
self.loop.create_task(self.updateYouTube())
async def updateStatus(self):
while not self.is_closed():
@@ -46,18 +27,14 @@ class DiscordBot(discord.Client):
if humeur != None:
logging.info(f'Changement de statut : {humeur.text}')
await self.change_presence(status = discord.Status.online, activity = discord.CustomActivity(humeur.text))
# 10 minutes TODO à rendre configurable
await asyncio.sleep(10*60)
async def updateHumbleBundle(self):
while not self.is_closed():
await checkHumbleBundleAndNotify(self)
# toutes les 30 minutes
await asyncio.sleep(30*60)
async def updateYouTube(self):
while not self.is_closed():
await checkYouTubeVideos()
# Vérification toutes les 5 minutes (comme pour Twitch)
await asyncio.sleep(5*60)
def getAllTextChannel(self) -> list[TextChannel]:
channels = []
@@ -65,34 +42,20 @@ class DiscordBot(discord.Client):
if isinstance(channel, TextChannel):
channels.append(channel)
return channels
def getAllRoles(self):
guilds_roles = []
for guild in self.guilds:
roles = []
for role in guild.roles:
if role.name != "@everyone":
roles.append(role)
if roles:
guilds_roles.append({
'guild_name': guild.name,
'guild_id': guild.id,
'roles': roles
})
return guilds_roles
def begin(self) :
def begin(self) :
token = Configuration.query.filter_by(key='discord_token').first()
if token and token.value and token.value.strip():
self.run(token.value)
if token :
try:
self.run(token.value)
except Exception as e:
logging.error(f'Erreur fatale lors du démarrage du bot Discord : {e}')
else :
logging.error('Aucun token Discord configuré. Le bot ne peut pas être démarré')
intents = discord.Intents.default()
intents.message_content = True
intents.members = True
intents.invites = True
bot = DiscordBot(intents=intents)
# https://discordpy.readthedocs.io/en/stable/quickstart.html
@@ -103,172 +66,37 @@ async def on_message(message: Message):
if not message.content.startswith('!'):
return
command_name = message.content.split()[0]
if ConfigurationHelper().getValue('moderation_enable'):
if command_name in ['!averto', '!av', '!avertissement', '!warn']:
await handle_warning_command(message, bot)
return
if command_name in ['!to', '!timeout']:
await handle_timeout_command(message, bot)
return
if command_name in ['!delaverto', '!removewarn', '!unwarn']:
await handle_remove_warning_command(message, bot)
return
if command_name in ['!listevent', '!listwarn', '!warnings']:
await handle_list_warnings_command(message, bot)
return
if ConfigurationHelper().getValue('moderation_ban_enable'):
if command_name == '!ban':
await handle_ban_command(message, bot)
return
if command_name == '!unban':
await handle_unban_command(message, bot)
return
if command_name == '!banlist':
await handle_ban_list_command(message, bot)
return
if ConfigurationHelper().getValue('moderation_kick_enable'):
if command_name == '!kick':
await handle_kick_command(message, bot)
return
if ConfigurationHelper().getValue('moderation_enable'):
if command_name == '!inspect':
await handle_inspect_command(message, bot)
return
if command_name == '!say':
await handle_say_command(message, bot)
return
if command_name in ['!aide', '!help']:
await handle_staff_help_command(message, bot)
return
commande = Commande.query.filter_by(discord_enable=True, trigger=command_name).first()
if commande:
try:
await message.channel.send(commande.response, suppress_embeds=True)
await asyncio.wait_for(message.channel.send(commande.response, suppress_embeds=True), timeout=30.0)
return
except asyncio.TimeoutError:
logging.error(f'Timeout lors de l\'envoi de la commande Discord : {command_name}')
except Exception as e:
logging.error(f'Échec de l\'exécution de la commande Discord : {e}')
if (ConfigurationHelper().getValue('proton_db_enable_enable') and (message.content.startswith('!protondb') or message.content.startswith('!pdb'))):
if(ConfigurationHelper().getValue('proton_db_enable_enable') and message.content.find('!protondb')==0) :
if (message.content.find('<@')>0) :
mention = message.content[message.content.find('<@'):]
else :
mention = message.author.mention
name = message.content
if name.startswith('!protondb'):
name = name.replace('!protondb', '', 1)
elif name.startswith('!pdb'):
name = name.replace('!pdb', '', 1)
name = name.replace(f'{mention}', '').strip();
if not name or len(name) == 0:
try:
await message.delete()
delete_time = ConfigurationHelper().getIntValue('proton_db_delete_time') or 10
help_msg = await message.channel.send(
f"{mention} ⚠️ Utilisation: `!pdb nom du jeu` ou `!protondb nom du jeu`\n"
f"Exemple: `!pdb Elden Ring`",
suppress_embeds=True
)
await asyncio.sleep(delete_time)
await help_msg.delete()
except Exception as e:
logging.error(f"Échec de la gestion du message d'aide ProtonDB : {e}")
return
try:
searching_msg = await message.channel.send(f"🔍 Recherche en cours pour **{name}**...")
games = searhProtonDb(name)
await searching_msg.delete()
except:
games = searhProtonDb(name)
name = message.content.replace('!protondb', '').replace(f'{mention}', '').strip();
games = searhProtonDb(name)
if (len(games)==0) :
msg = f'{mention} Je n\'ai pas trouvé de jeux correspondant à **{name}**. Es-tu sûr que le jeu est disponible sur Steam ?'
try:
await message.channel.send(msg, suppress_embeds=True)
except Exception as e:
logging.error(f"Échec de l'envoi du message ProtonDB : {e}")
return
total_games = len(games)
tier_colors = {'platinum': '🟣', 'gold': '🟡', 'silver': '', 'bronze': '🟤', 'borked': '🔴'}
content = ""
max_games = 15
for count, game in enumerate(games[:max_games]):
g_name = str(game.get('name'))
g_id = str(game.get('id'))
tier = str(game.get('tier') or 'N/A').lower()
tier_icon = tier_colors.get(tier, '')
new_entry = f"**[{g_name}](<https://www.protondb.com/app/{g_id}>)**\n{tier_icon} Classé **{tier.capitalize()}**"
ac_status = game.get('anticheat_status')
if ac_status:
status_lower = str(ac_status).lower()
ac_map = {
'supported': ('', 'Supporté'),
'running': ('⚠️', 'Fonctionne'),
'broken': ('', 'Cassé'),
'denied': ('🚫', 'Refusé'),
'planned': ('📅', 'Planifié')
}
ac_emoji, ac_label = ac_map.get(status_lower, ('', str(ac_status)))
acs = game.get('anticheats') or []
ac_list = ', '.join([str(ac) for ac in acs if ac])
new_entry += f" • [Anti-cheat {ac_emoji} {ac_label}"
if ac_list:
new_entry += f" ({ac_list})"
new_entry += f"](<https://areweanticheatyet.com/game/{g_id}>)"
new_entry += "\n\n"
# Vérifier la limite avant d'ajouter
if len(content) + len(new_entry) > 3900:
rest = len(games) - count
content += f"*... et {rest} autre{'s' if rest > 1 else ''} jeu{'x' if rest > 1 else ''}*"
break
content += new_entry
else:
rest = max(0, len(games) - max_games)
if rest > 0:
content += f"*... et {rest} autre{'s' if rest > 1 else ''} jeu{'x' if rest > 1 else ''}*"
embed = discord.Embed(
title=f"🎮 Résultats ProtonDB - **{total_games} jeu{'x' if total_games > 1 else ''} trouvé{'s' if total_games > 1 else ''}**",
description=content,
color=0x5865F2
)
else :
msg = f'{mention} J\'ai trouvé {len(games)} jeux :\n'
ite = iter(games)
while (game := next(ite, None)) is not None and len(msg) < 1850 :
msg += f'- [{game.get('name')}](https://www.protondb.com/app/{game.get('id')}) classé **{game.get('tier')}**\n'
rest = sum(1 for _ in ite)
if (rest > 0):
msg += f'- et encore {rest} autres jeux'
try :
await message.channel.send(embed=embed)
await asyncio.wait_for(message.channel.send(msg, suppress_embeds=True), timeout=30.0)
except asyncio.TimeoutError:
logging.error(f'Timeout lors de l\'envoi du message ProtonDB')
except Exception as e:
logging.error(f"Échec de l'envoi de l'embed ProtonDB : {e}")
@bot.event
async def on_member_join(member: Member):
await sendWelcomeMessage(bot, member)
@bot.event
async def on_member_remove(member: Member):
await sendLeaveMessage(bot, member)
@bot.event
async def on_invite_create(invite):
await updateInviteCache(invite.guild)
@bot.event
async def on_invite_delete(invite):
await updateInviteCache(invite.guild)
logging.error(f'Échec de l\'envoi du message ProtonDB : {e}')

View File

@@ -1,3 +1,4 @@
import asyncio
import datetime
import logging
import json
@@ -14,11 +15,18 @@ def _isEnable():
return helper.getValue('humble_bundle_enable') and helper.getIntValue('humble_bundle_channel') != 0
def _callGithub():
response = requests.get("https://raw.githubusercontent.com/shionn/HumbleBundleGamePack/refs/heads/master/data/game-bundles.json")
if response.status_code == 200:
return response.json()
logging.error(f"Échec de la connexion à la ressource Humble Bundle. Code de statut HTTP : {response.status_code}")
return None
try:
response = requests.get("https://raw.githubusercontent.com/shionn/HumbleBundleGamePack/refs/heads/master/data/game-bundles.json", timeout=30)
if response.status_code == 200:
return response.json()
logging.error(f"Échec de la connexion à la ressource Humble Bundle. Code de statut HTTP : {response.status_code}")
return None
except (requests.exceptions.SSLError, requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
logging.error(f"Erreur de connexion à la ressource Humble Bundle : {e}")
return None
except Exception as e:
logging.error(f"Erreur inattendue lors de la récupération des bundles : {e}")
return None
def _isNotAlreadyNotified(bundle):
return GameBundle.query.filter_by(url=bundle['url']).first() == None
@@ -46,9 +54,14 @@ async def checkHumbleBundleAndNotify(bot: Client):
bundle = _findFirstNotNotified(bundles)
if bundle != None :
message = _formatMessage(bundle)
await bot.get_channel(ConfigurationHelper().getIntValue('humble_bundle_channel')).send(message)
db.session.add(GameBundle(url=bundle['url'], name=bundle['name'], json = json.dumps(bundle)))
db.session.commit()
try:
await asyncio.wait_for(bot.get_channel(ConfigurationHelper().getIntValue('humble_bundle_channel')).send(message), timeout=30.0)
db.session.add(GameBundle(url=bundle['url'], name=bundle['name'], json = json.dumps(bundle)))
db.session.commit()
except asyncio.TimeoutError:
logging.error(f'Timeout lors de l\'envoi du message Humble Bundle')
except Exception as send_error:
logging.error(f'Erreur lors de l\'envoi du message Humble Bundle : {send_error}')
except Exception as e:
logging.error(f"Échec de la vérification des offres Humble Bundle : {e}")
else:

File diff suppressed because it is too large Load Diff

View File

@@ -1,198 +0,0 @@
import discord
import logging
from database.helpers import ConfigurationHelper
from discord import Member, TextChannel
from datetime import datetime, timezone
invite_cache = {}
def replaceMessageVariables(message: str, member: Member) -> str:
replacements = {
'{member.mention}': member.mention,
'{member.name}': member.name,
'{member.display_name}': member.display_name,
'{member.id}': str(member.id),
'{server.name}': member.guild.name,
'{server.member_count}': str(member.guild.member_count)
}
for variable, value in replacements.items():
message = message.replace(variable, value)
return message
async def updateInviteCache(guild):
try:
invites = await guild.invites()
invite_cache[guild.id] = {invite.code: invite.uses for invite in invites}
except:
pass
async def getUsedInvite(guild):
try:
new_invites = await guild.invites()
for invite in new_invites:
old_uses = invite_cache.get(guild.id, {}).get(invite.code, 0)
if invite.uses > old_uses:
await updateInviteCache(guild)
invite_code = invite.code
inviter_name = invite.inviter.name if invite.inviter else None
display_text = f'`{invite_code}`'
if inviter_name:
display_text += f' (créée par {inviter_name})'
return (invite_code, inviter_name, display_text)
await updateInviteCache(guild)
except:
pass
return (None, None, 'Inconnue')
async def sendWelcomeMessage(bot: discord.Client, member: Member):
config = ConfigurationHelper()
if not config.getValue('welcome_enable'):
return
channel_id = config.getIntValue('welcome_channel_id')
if not channel_id:
logging.warning('Canal de bienvenue non configuré')
return
channel = bot.get_channel(channel_id)
if not channel or not isinstance(channel, TextChannel):
logging.error(f'Canal de bienvenue {channel_id} introuvable')
return
welcome_message = config.getValue('welcome_message')
if not welcome_message:
welcome_message = 'Bienvenue sur le serveur !'
welcome_message = replaceMessageVariables(welcome_message, member)
invite_code, inviter_name, invite_display = await getUsedInvite(member.guild)
try:
from database import db
from sqlalchemy import text
db.session.execute(
text("INSERT INTO member_invites (user_id, guild_id, invite_code, inviter_name, join_date) VALUES (:user_id, :guild_id, :invite_code, :inviter_name, :join_date)"),
{
'user_id': str(member.id),
'guild_id': str(member.guild.id),
'invite_code': invite_code,
'inviter_name': inviter_name,
'join_date': datetime.now(timezone.utc)
}
)
db.session.commit()
except Exception as e:
logging.error(f'Échec de la sauvegarde de l\'invitation : {e}')
embed = discord.Embed(
title='🎉 Nouveau membre !',
description=welcome_message,
color=discord.Color.green()
)
embed.set_thumbnail(url=member.display_avatar.url)
embed.add_field(name='Membre', value=member.name, inline=True)
embed.add_field(name='Nombre de membres', value=str(member.guild.member_count), inline=True)
embed.add_field(name='Invitation utilisée', value=invite_display, inline=False)
embed.set_footer(text=f'ID: {member.id}')
try:
message = await channel.send(embed=embed)
logging.info(f'Message de bienvenue envoyé pour {member.name}')
now = datetime.now(timezone.utc)
account_age = (now - member.created_at).days
if account_age < 7:
await message.add_reaction('⚠️')
logging.info(f'Réaction warning ajoutée pour {member.name} (compte créé il y a {account_age} jours)')
except Exception as e:
logging.error(f'Échec de l\'envoi du message de bienvenue : {e}')
def formatDuration(seconds: int) -> str:
days = seconds // 86400
hours = (seconds % 86400) // 3600
minutes = (seconds % 3600) // 60
parts = []
if days > 0:
parts.append(f'{days} jour{"s" if days > 1 else ""}')
if hours > 0:
parts.append(f'{hours} heure{"s" if hours > 1 else ""}')
if minutes > 0:
parts.append(f'{minutes} minute{"s" if minutes > 1 else ""}')
if not parts:
return 'moins d\'une minute'
return ' et '.join(parts)
async def sendLeaveMessage(bot: discord.Client, member: Member):
config = ConfigurationHelper()
if not config.getValue('leave_enable'):
return
channel_id = config.getIntValue('leave_channel_id')
if not channel_id:
logging.warning('Canal de départ non configuré')
return
channel = bot.get_channel(channel_id)
if not channel or not isinstance(channel, TextChannel):
logging.error(f'Canal de départ {channel_id} introuvable')
return
leave_message = config.getValue('leave_message')
if not leave_message:
leave_message = 'Un membre a quitté le serveur.'
leave_message = replaceMessageVariables(leave_message, member)
now = datetime.now(timezone.utc)
duration_seconds = int((now - member.joined_at).total_seconds()) if member.joined_at else 0
duration_text = formatDuration(duration_seconds)
reason = 'Départ volontaire'
try:
async for entry in member.guild.audit_logs(limit=5):
if not (entry.target and entry.target.id == member.id):
continue
time_diff = (now - entry.created_at).total_seconds()
if time_diff > 3:
continue
if entry.action == discord.AuditLogAction.kick:
reason = f'Expulsé par {entry.user.mention}'
if entry.reason:
reason += f' - Raison: {entry.reason}'
break
elif entry.action == discord.AuditLogAction.ban:
reason = f'Banni par {entry.user.mention}'
if entry.reason:
reason += f' - Raison: {entry.reason}'
break
except:
pass
embed = discord.Embed(
title='👋 Membre parti',
description=leave_message,
color=discord.Color.red()
)
embed.set_thumbnail(url=member.display_avatar.url)
embed.add_field(name='Membre', value=f'**{member.name}**', inline=True)
embed.add_field(name='Nombre de membres', value=str(member.guild.member_count), inline=True)
embed.add_field(name='Temps sur le serveur', value=duration_text, inline=False)
embed.set_footer(text=f'ID: {member.id}')
try:
await channel.send(embed=embed)
logging.info(f'Message de départ envoyé pour {member.name}')
except Exception as e:
logging.error(f'Échec de l\'envoi du message de départ : {e}')

View File

@@ -1,225 +0,0 @@
import logging
import asyncio
import xml.etree.ElementTree as ET
import requests
from database import db
from database.models import YouTubeNotification
from webapp import webapp
logger = logging.getLogger('youtube-notification')
logger.setLevel(logging.INFO)
async def checkYouTubeVideos():
with webapp.app_context():
try:
notifications: list[YouTubeNotification] = YouTubeNotification.query.filter_by(enable=True).all()
for notification in notifications:
try:
await _checkChannelVideos(notification)
except Exception as e:
logger.error(f"Erreur lors de la vérification de la chaîne {notification.channel_id}: {e}")
continue
except Exception as e:
logger.error(f"Erreur lors de la vérification YouTube: {e}")
async def _checkChannelVideos(notification: YouTubeNotification):
try:
channel_id = notification.channel_id
rss_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}"
response = await asyncio.to_thread(requests.get, rss_url, timeout=10)
if response.status_code != 200:
logger.error(f"Erreur HTTP {response.status_code} lors de la récupération du RSS pour {channel_id}")
return
root = ET.fromstring(response.content)
ns = {'atom': 'http://www.w3.org/2005/Atom', 'yt': 'http://www.youtube.com/xml/schemas/2015', 'media': 'http://search.yahoo.com/mrss/'}
entries = root.findall('atom:entry', ns)
if not entries:
logger.warning(f"Aucune vidéo trouvée dans le RSS pour {channel_id}")
return
videos = []
for entry in entries:
video_id = entry.find('yt:videoId', ns)
if video_id is None:
continue
video_id = video_id.text
title_elem = entry.find('atom:title', ns)
video_title = title_elem.text if title_elem is not None else 'Sans titre'
link_elem = entry.find('atom:link', ns)
video_url = link_elem.get('href') if link_elem is not None else f"https://www.youtube.com/watch?v={video_id}"
published_elem = entry.find('atom:published', ns)
published_at = published_elem.text if published_elem is not None else ''
author_elem = entry.find('atom:author/atom:name', ns)
channel_name = author_elem.text if author_elem is not None else 'Inconnu'
thumbnail = None
media_thumbnail = entry.find('media:group/media:thumbnail', ns)
if media_thumbnail is not None:
thumbnail = media_thumbnail.get('url')
is_short = False
if video_title and ('#shorts' in video_title.lower() or '#short' in video_title.lower()):
is_short = True
if notification.video_type == 'all':
videos.append((video_id, {
'title': video_title,
'url': video_url,
'published': published_at,
'channel_name': channel_name,
'thumbnail': thumbnail,
'is_short': is_short
}))
elif notification.video_type == 'short' and is_short:
videos.append((video_id, {
'title': video_title,
'url': video_url,
'published': published_at,
'channel_name': channel_name,
'thumbnail': thumbnail,
'is_short': is_short
}))
elif notification.video_type == 'video' and not is_short:
videos.append((video_id, {
'title': video_title,
'url': video_url,
'published': published_at,
'channel_name': channel_name,
'thumbnail': thumbnail,
'is_short': is_short
}))
videos.sort(key=lambda x: x[1]['published'], reverse=True)
if videos:
latest_video_id, latest_video = videos[0]
if not notification.last_video_id:
notification.last_video_id = latest_video_id
db.session.commit()
return
if latest_video_id != notification.last_video_id:
logger.info(f"Nouvelle vidéo détectée: {latest_video_id} pour la chaîne {notification.channel_id}")
await _notifyVideo(notification, latest_video, latest_video_id)
notification.last_video_id = latest_video_id
db.session.commit()
except Exception as e:
logger.error(f"Erreur lors de la vérification des vidéos: {e}")
async def _notifyVideo(notification: YouTubeNotification, video_data: dict, video_id: str):
from discordbot import bot
try:
channel_name = video_data.get('channel_name', 'Inconnu')
video_title = video_data.get('title', 'Sans titre')
video_url = video_data.get('url', f"https://www.youtube.com/watch?v={video_id}")
thumbnail = video_data.get('thumbnail', '')
published_at = video_data.get('published', '')
is_short = video_data.get('is_short', False)
try:
message = notification.message.format(
channel_name=channel_name or 'Inconnu',
video_title=video_title or 'Sans titre',
video_url=video_url,
video_id=video_id,
thumbnail=thumbnail or '',
published_at=published_at or '',
is_short=is_short
)
except KeyError as e:
logger.error(f"Variable manquante dans le message de notification: {e}")
message = f"🎥 Nouvelle vidéo de {channel_name}: [{video_title}]({video_url})"
logger.info(f"Envoi de notification YouTube: {message}")
bot.loop.create_task(_sendMessage(notification, message, video_url, thumbnail, video_title, channel_name, video_id, published_at, is_short))
except Exception as e:
logger.error(f"Erreur lors de la notification: {e}")
def _format_embed_text(text: str, channel_name: str, video_title: str, video_url: str, video_id: str, thumbnail: str, published_at: str, is_short: bool) -> str:
"""Formate un texte d'embed avec les variables disponibles"""
if not text:
return None
try:
return text.format(
channel_name=channel_name or 'Inconnu',
video_title=video_title or 'Sans titre',
video_url=video_url,
video_id=video_id,
thumbnail=thumbnail or '',
published_at=published_at or '',
is_short=is_short
)
except KeyError:
return text
async def _sendMessage(notification: YouTubeNotification, message: str, video_url: str, thumbnail: str, video_title: str, channel_name: str, video_id: str, published_at: str, is_short: bool):
from discordbot import bot
try:
discord_channel = bot.get_channel(notification.notify_channel)
if not discord_channel:
logger.error(f"Canal Discord {notification.notify_channel} introuvable")
return
import discord
embed_title = _format_embed_text(notification.embed_title, channel_name, video_title, video_url, video_id, thumbnail, published_at, is_short) if notification.embed_title else video_title
embed_description = _format_embed_text(notification.embed_description, channel_name, video_title, video_url, video_id, thumbnail, published_at, is_short) if notification.embed_description else None
try:
embed_color = int(notification.embed_color or 'FF0000', 16)
except ValueError:
embed_color = 0xFF0000
embed = discord.Embed(
title=embed_title,
url=video_url,
color=embed_color
)
if embed_description:
embed.description = embed_description
author_name = _format_embed_text(notification.embed_author_name, channel_name, video_title, video_url, video_id, thumbnail, published_at, is_short) if notification.embed_author_name else channel_name
author_icon = notification.embed_author_icon if notification.embed_author_icon else "https://www.youtube.com/img/desktop/yt_1200.png"
embed.set_author(name=author_name, icon_url=author_icon)
if notification.embed_thumbnail and thumbnail:
embed.set_thumbnail(url=thumbnail)
if notification.embed_image and thumbnail:
embed.set_image(url=thumbnail)
if notification.embed_footer:
footer_text = _format_embed_text(notification.embed_footer, channel_name, video_title, video_url, video_id, thumbnail, published_at, is_short)
if footer_text:
embed.set_footer(text=footer_text)
if message and message.strip():
await discord_channel.send(message, embed=embed)
else:
await discord_channel.send(embed=embed)
logger.info(f"Notification YouTube envoyée avec succès")
except Exception as e:
logger.error(f"Erreur lors de l'envoi du message Discord: {e}")

View File

@@ -1,11 +1,11 @@
services:
mamiehenriette:
container_name: MamieHenriette # Nom du conteneur
image: ghcr.io/skylanix/mamiehenriette:latest # Image hébergée sur GitHub Container Registry
restart: unless-stopped # Redémarre automatiquement sauf si arrêté manuellement
# build: . # Build du conteneur à partir d'un Dockerfile local (décommentez si nécessaire)
# image: mamiehenriette # Build du conteneur à partir d'un Dockerfile local (décommentez si nécessaire)
image: ghcr.io/skylanix/mamiehenriette:latest # Image hébergée sur GitHub Container Registry (commentez si nécessaire)
environment:
TZ: Europe/Paris # Fuseau horaire
@@ -45,4 +45,4 @@ services:
# volumes:
# - ./instance/database.db:/data/database.db # Monte la base de données locale dans le conteneur
# environment:
# - SQLITE_DATABASE=/data/database.db # Chemin vers la base de données dans le conteneur
# - SQLITE_DATABASE=/data/database.db # Chemin vers la base de données dans le conteneur

View File

@@ -1,33 +1,45 @@
import logging
import requests
import re
import json
from datetime import datetime, timedelta
from algoliasearch.search.client import SearchClientSync, SearchConfig
from database import db
from database.helpers import ConfigurationHelper
from database.models import GameAlias, AntiCheatCache, Configuration
from sqlalchemy import desc, func
from database.models import GameAlias
from sqlalchemy import desc,func
def _call_algoliasearch(search_name:str):
config = SearchConfig(ConfigurationHelper().getValue('proton_db_api_id'),
ConfigurationHelper().getValue('proton_db_api_key'))
config.set_default_hosts()
client = SearchClientSync(config=config)
return client.search_single_index(index_name="steamdb",
search_params={
"query":search_name,
"facetFilters":[["appType:Game"]],
"hitsPerPage":50},
request_options= {'headers':{'Referer':'https://www.protondb.com/'}})
def _call_algoliasearch(search_name:str):
try:
config = SearchConfig(ConfigurationHelper().getValue('proton_db_api_id'),
ConfigurationHelper().getValue('proton_db_api_key'))
config.set_default_hosts()
client = SearchClientSync(config=config)
return client.search_single_index(index_name="steamdb",
search_params={
"query":search_name,
"facetFilters":[["appType:Game"]],
"hitsPerPage":50},
request_options= {
'headers':{'Referer':'https://www.protondb.com/'},
'timeout': 30
})
except Exception as e:
logging.error(f'Erreur lors de la recherche Algolia pour "{search_name}" : {e}')
return None
def _call_summary(id):
response = requests.get(f'http://jazzy-starlight-aeea19.netlify.app/api/v1/reports/summaries/{id}.json')
if (response.status_code == 200) :
return response.json()
logging.error(f'Échec de la récupération des données ProtonDB pour le jeu {id}. Code de statut HTTP : {response.status_code}')
return None
try:
response = requests.get(f'http://jazzy-starlight-aeea19.netlify.app/api/v1/reports/summaries/{id}.json', timeout=30)
if (response.status_code == 200) :
return response.json()
logging.error(f'Échec de la récupération des données ProtonDB pour le jeu {id}. Code de statut HTTP : {response.status_code}')
return None
except (requests.exceptions.SSLError, requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
logging.error(f'Erreur de connexion ProtonDB pour le jeu {id} : {e}')
return None
except Exception as e:
logging.error(f'Erreur inattendue lors de la récupération ProtonDB pour le jeu {id} : {e}')
return None
def _is_name_match(name:str, search_name:str) -> bool:
normalized_game_name = re.sub("[^a-z0-9]", "", name.lower())
@@ -39,131 +51,12 @@ def _apply_game_aliases(search_name:str) -> str:
search_name = re.sub(re.escape(alias.alias), alias.name, search_name, flags=re.IGNORECASE)
return search_name
def _should_update_anticheat_cache() -> bool:
try:
last_update_conf = Configuration.query.filter_by(key='anticheat_last_update').first()
if not last_update_conf:
return True
try:
last_update = datetime.fromisoformat(last_update_conf.value)
return datetime.now() - last_update > timedelta(days=7)
except:
return True
except Exception as e:
logging.error(f'Erreur lors de la vérification du cache anti-cheat: {e}')
return False
def _fetch_anticheat_data():
try:
url = 'https://raw.githubusercontent.com/AreWeAntiCheatYet/AreWeAntiCheatYet/master/games.json'
response = requests.get(url, timeout=10)
if response.status_code == 200:
return response.json()
else:
logging.error(f'Échec de la récupération des données anti-cheat. Code HTTP: {response.status_code}')
return None
except Exception as e:
logging.error(f'Erreur lors de la récupération des données anti-cheat: {e}')
return None
def _update_anticheat_cache_if_needed():
try:
if not _should_update_anticheat_cache():
return
logging.info('Mise à jour du cache anti-cheat...')
anticheat_data = _fetch_anticheat_data()
if not anticheat_data:
return
for game in anticheat_data:
try:
steam_id = str(game.get('storeIds', {}).get('steam', ''))
if not steam_id or steam_id == '0':
continue
cache_entry = AntiCheatCache.query.filter_by(steam_id=steam_id).first()
status = game.get('status', 'Unknown')
anticheats_list = game.get('anticheats', [])
anticheats_str = json.dumps(anticheats_list) if anticheats_list else None
reference = game.get('reference', '')
notes_data = game.get('notes', '')
if isinstance(notes_data, list):
notes = json.dumps(notes_data)
else:
notes = str(notes_data) if notes_data else ''
game_name = game.get('name', '')
if cache_entry:
cache_entry.game_name = game_name
cache_entry.status = status
cache_entry.anticheats = anticheats_str
cache_entry.reference = reference
cache_entry.notes = notes
cache_entry.updated_at = datetime.now()
else:
cache_entry = AntiCheatCache(
steam_id=steam_id,
game_name=game_name,
status=status,
anticheats=anticheats_str,
reference=reference,
notes=notes,
updated_at=datetime.now()
)
db.session.add(cache_entry)
except Exception as e:
logging.error(f'Erreur lors de la mise à jour du jeu {game.get("name")}: {e}')
continue
last_update_conf = Configuration.query.filter_by(key='anticheat_last_update').first()
if last_update_conf:
last_update_conf.value = datetime.now().isoformat()
else:
last_update_conf = Configuration(key='anticheat_last_update', value=datetime.now().isoformat())
db.session.add(last_update_conf)
db.session.commit()
logging.info('Cache anti-cheat mis à jour avec succès')
except Exception as e:
try:
db.session.rollback()
except:
pass
logging.error(f'Erreur lors de la mise à jour du cache anti-cheat: {e}')
def _get_anticheat_info(steam_id: str) -> dict:
try:
cache_entry = AntiCheatCache.query.filter_by(steam_id=steam_id).first()
if not cache_entry:
return None
try:
anticheats = json.loads(cache_entry.anticheats) if cache_entry.anticheats else []
except:
anticheats = []
return {
'status': cache_entry.status,
'anticheats': anticheats,
'reference': cache_entry.reference,
'notes': cache_entry.notes
}
except Exception as e:
logging.error(f'Erreur lors de la récupération des infos anti-cheat pour {steam_id}: {e}')
return None
def searhProtonDb(search_name:str):
def searhProtonDb(search_name:str):
results = []
search_name = _apply_game_aliases(search_name)
try:
_update_anticheat_cache_if_needed()
except Exception as e:
logging.error(f'Erreur lors de la mise à jour du cache anti-cheat: {e}')
responses = _call_algoliasearch(search_name)
if responses is None:
return results
for hit in responses.model_dump().get('hits'):
id = hit.get('object_id')
name:str = hit.get('name')
@@ -172,27 +65,12 @@ def searhProtonDb(search_name:str):
summmary = _call_summary(id)
if (summmary != None) :
tier = summmary.get('tier')
anticheat_info = None
try:
anticheat_info = _get_anticheat_info(str(id))
except Exception as e:
logging.error(f'Erreur lors de la récupération anti-cheat pour {name}: {e}')
result = {
results.append({
'id':id,
'name' : name,
'tier' : tier
}
if anticheat_info:
result['anticheat_status'] = anticheat_info.get('status')
result['anticheats'] = anticheat_info.get('anticheats', [])
result['anticheat_reference'] = anticheat_info.get('reference')
result['anticheat_notes'] = anticheat_info.get('notes')
results.append(result)
logging.info(f'Trouvé {name}({id}) : {tier}' + (f' [Anti-cheat: {anticheat_info.get("status")}]' if anticheat_info else ''))
})
logging.info(f'Trouvé {name}({id}) : {tier}')
except Exception as e:
logging.error(f'Erreur lors du traitement du jeu {name} (ID: {id}) : {e}')
else:

View File

@@ -1,8 +1,6 @@
import locale
import logging
import threading
import os
from logging.handlers import RotatingFileHandler
from webapp import webapp
from discordbot import bot
@@ -25,40 +23,12 @@ def start_twitch_bot():
twitchBot.begin()
if __name__ == '__main__':
# Config logs (console + fichier avec rotation)
os.makedirs('logs', exist_ok=True)
log_formatter = logging.Formatter('%(asctime)s %(levelname)s [%(threadName)s] %(name)s: %(message)s')
handlers = []
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(log_formatter)
handlers.append(stream_handler)
file_handler = RotatingFileHandler('logs/app.log', maxBytes=5*1024*1024, backupCount=5, encoding='utf-8')
file_handler.setFormatter(log_formatter)
handlers.append(file_handler)
logging.basicConfig(level=logging.INFO, handlers=handlers)
# Calmer les logs verbeux de certaines libs si besoin
logging.getLogger('werkzeug').setLevel(logging.WARNING)
logging.getLogger('discord').setLevel(logging.WARNING)
# Hook exceptions non-capturées (threads inclus)
def _log_uncaught(exc_type, exc, tb):
logging.exception('Exception non capturée', exc_info=(exc_type, exc, tb))
import sys
sys.excepthook = _log_uncaught
if hasattr(threading, 'excepthook'):
def _thread_excepthook(args):
logging.exception(f"Exception dans le thread {args.thread.name}", exc_info=(args.exc_type, args.exc_value, args.exc_traceback))
threading.excepthook = _thread_excepthook
locale.setlocale(locale.LC_TIME, 'fr_FR.UTF-8')
jobs = []
jobs.append(threading.Thread(target=start_discord_bot, name='discord-bot'))
jobs.append(threading.Thread(target=start_server, name='web-server'))
jobs.append(threading.Thread(target=start_twitch_bot, name='twitch-bot'))
jobs.append(threading.Thread(target=start_discord_bot))
jobs.append(threading.Thread(target=start_server))
jobs.append(threading.Thread(target=start_twitch_bot))
for job in jobs:
job.start()
for job in jobs:
job.join()
for job in jobs: job.start()
for job in jobs: job.join()

View File

@@ -37,14 +37,16 @@ class TwitchBot() :
if _isConfigured() :
try :
helper = ConfigurationHelper()
self.twitch = await Twitch(helper.getValue('twitch_client_id'), helper.getValue('twitch_client_secret'))
await self.twitch.set_user_authentication(helper.getValue('twitch_access_token'), USER_SCOPE, helper.getValue('twitch_refresh_token'))
self.chat = await Chat(self.twitch)
self.twitch = await asyncio.wait_for(Twitch(helper.getValue('twitch_client_id'), helper.getValue('twitch_client_secret')), timeout=30.0)
await asyncio.wait_for(self.twitch.set_user_authentication(helper.getValue('twitch_access_token'), USER_SCOPE, helper.getValue('twitch_refresh_token')), timeout=30.0)
self.chat = await asyncio.wait_for(Chat(self.twitch), timeout=30.0)
self.chat.register_event(ChatEvent.READY, _onReady)
self.chat.register_event(ChatEvent.MESSAGE, _onMessage)
# chat.register_event(ChatEvent.SUB, on_sub)
self.chat.register_command('hello', _helloCommand)
self.chat.start()
except asyncio.TimeoutError:
logging.error('Timeout lors de la connexion à Twitch. Vérifiez votre connexion réseau.')
except Exception as e:
logging.error(f'Échec de l\'authentification Twitch. Vérifiez vos identifiants et redémarrez après correction : {e}')
else:

View File

@@ -1,3 +1,4 @@
import asyncio
import logging
from twitchAPI.twitch import Twitch
@@ -36,14 +37,24 @@ async def _notifyAlert(alert : LiveAlert, stream : Stream):
async def _sendMessage(channel : int, message : str) :
logger.info(f'Envoi de notification : {message}')
await bot.get_channel(channel).send(message)
logger.info(f'Notification envoyé')
try:
await asyncio.wait_for(bot.get_channel(channel).send(message), timeout=30.0)
logger.info(f'Notification envoyée')
except asyncio.TimeoutError:
logger.error(f'Timeout lors de l\'envoi de notification live alert')
except Exception as e:
logger.error(f'Erreur lors de l\'envoi de notification live alert : {e}')
async def _retreiveStreams(twitch: Twitch, alerts : list[LiveAlert]) -> list[Stream] :
streams : list[Stream] = []
logger.info(f'Recherche de streams pour : {alerts}')
async for stream in twitch.get_streams(user_login = [alert.login for alert in alerts]):
streams.append(stream)
logger.info(f'Ces streams sont en ligne : {streams}')
try:
async for stream in asyncio.wait_for(twitch.get_streams(user_login = [alert.login for alert in alerts]), timeout=30.0):
streams.append(stream)
logger.info(f'Ces streams sont en ligne : {streams}')
except asyncio.TimeoutError:
logger.error('Timeout lors de la récupération des streams Twitch')
except Exception as e:
logger.error(f'Erreur lors de la récupération des streams Twitch : {e}')
return streams

View File

@@ -2,4 +2,4 @@ from flask import Flask
webapp = Flask(__name__)
from webapp import commandes, configurations, index, humeurs, protondb, live_alert, twitch_auth, moderation, youtube
from webapp import commandes, configurations, index, humeurs, protondb, live_alert, twitch_auth

View File

@@ -6,37 +6,17 @@ from discordbot import bot
@webapp.route("/configurations")
def openConfigurations():
return render_template("configurations.html", configuration = ConfigurationHelper(), channels = bot.getAllTextChannel(), roles = bot.getAllRoles())
return render_template("configurations.html", configuration = ConfigurationHelper(), channels = bot.getAllTextChannel())
@webapp.route("/configurations/update", methods=['POST'])
def updateConfiguration():
checkboxes = {
'humble_bundle_enable': 'humble_bundle_channel',
'proton_db_enable_enable': 'proton_db_api_id',
'moderation_enable': 'moderation_staff_role_ids',
'moderation_ban_enable': 'moderation_staff_role_ids',
'moderation_kick_enable': 'moderation_staff_role_ids',
'welcome_enable': 'welcome_channel_id',
'leave_enable': 'leave_channel_id'
}
staff_roles = request.form.getlist('moderation_staff_role_ids')
if staff_roles:
ConfigurationHelper().createOrUpdate('moderation_staff_role_ids', ','.join(staff_roles))
else:
ConfigurationHelper().createOrUpdate('moderation_staff_role_ids', '')
for key in request.form:
if key == 'moderation_staff_role_ids':
continue
value = request.form.get(key)
if value and value.strip():
ConfigurationHelper().createOrUpdate(key, value)
for checkbox, reference_field in checkboxes.items():
if request.form.get(reference_field) is not None and request.form.get(checkbox) is None:
ConfigurationHelper().createOrUpdate(checkbox, False)
for key in request.form :
ConfigurationHelper().createOrUpdate(key, request.form.get(key))
# Je fais ça car HTML n'envoie pas le paramètre de checkbox quand il est décoché
if (request.form.get("humble_bundle_channel") != None and request.form.get("humble_bundle_enable") == None) :
ConfigurationHelper().createOrUpdate('humble_bundle_enable', False)
if (request.form.get("proton_db_api_id") != None and request.form.get("proton_db_enable_enable") == None) :
ConfigurationHelper().createOrUpdate('proton_db_enable_enable', False)
db.session.commit()
return redirect(request.referrer)

View File

@@ -1,30 +0,0 @@
from flask import render_template, request, redirect, url_for
from webapp import webapp
from database import db
from database.models import ModerationEvent
@webapp.route("/moderation")
def moderation():
events = ModerationEvent.query.order_by(ModerationEvent.created_at.desc()).all()
return render_template("moderation.html", events=events, event=None)
@webapp.route("/moderation/edit/<int:event_id>")
def open_edit_moderation_event(event_id):
event = ModerationEvent.query.get_or_404(event_id)
events = ModerationEvent.query.order_by(ModerationEvent.created_at.desc()).all()
return render_template("moderation.html", events=events, event=event)
@webapp.route("/moderation/update/<int:event_id>", methods=['POST'])
def update_moderation_event(event_id):
event = ModerationEvent.query.get_or_404(event_id)
event.reason = request.form.get('reason')
db.session.commit()
return redirect(url_for('moderation'))
@webapp.route("/moderation/delete/<int:event_id>")
def delete_moderation_event(event_id):
event = ModerationEvent.query.get_or_404(event_id)
db.session.delete(event)
db.session.commit()
return redirect(url_for('moderation'))

View File

@@ -2,192 +2,14 @@
{% block content %}
<h1>Configuration de Mamie</h1>
<p>Configurez les tokens Discord, les notifications Humble Bundle et l'API Twitch.</p>
<p>Configurez les tokens Discord, les notifications Humble Bundle et l'API ProtonDB pour la commande !protondb.</p>
<h2>Discord</h2>
<h2>API Discord</h2>
<form action="{{ url_for('updateConfiguration') }}" method="POST">
<fieldset>
<legend>API Discord</legend>
<label for="discord_token">Token Discord (caché)</label>
<input name="discord_token" type="password" placeholder="Votre token Discord" />
<small>Nécessite un redémarrage après modification</small>
</fieldset>
<fieldset>
<legend>Messages de bienvenue</legend>
<label for="welcome_enable">
<input type="checkbox" name="welcome_enable" {% if configuration.getValue('welcome_enable') %}checked="checked"{% endif %}>
Activer le message de bienvenue pour les nouveaux membres
</label>
<label for="welcome_channel_id">Canal de bienvenue</label>
<select name="welcome_channel_id">
{% for channel in channels %}
<option value="{{channel.id}}" {% if configuration.getIntValue('welcome_channel_id')==channel.id %}selected="selected"{% endif %}>
{{channel.name}}
</option>
{% endfor %}
</select>
<label for="welcome_message">Message personnalisé de bienvenue</label>
<textarea name="welcome_message" rows="3" placeholder="Bienvenue {member.mention} sur le serveur !">{{ configuration.getValue('welcome_message') }}</textarea>
<small>
<strong>Syntaxes disponibles :</strong><br>
<code>{member.mention}</code> - Mentionne l'utilisateur (@NomUtilisateur)<br>
<code>{member.name}</code> - Nom d'utilisateur (sans mention)<br>
<code>{member.display_name}</code> - Surnom sur le serveur<br>
<code>{member.id}</code> - ID de l'utilisateur<br>
<code>{server.name}</code> - Nom du serveur<br>
<code>{server.member_count}</code> - Nombre total de membres<br>
<code>&lt;#ID_DU_CHANNEL&gt;</code> - Mentionne un salon (ex: &lt;#123456789012345678&gt;)
</small>
</fieldset>
<fieldset>
<legend>Messages de départ</legend>
<label for="leave_enable">
<input type="checkbox" name="leave_enable" {% if configuration.getValue('leave_enable') %}checked="checked"{% endif %}>
Activer le message de départ quand un membre quitte le serveur
</label>
<label for="leave_channel_id">Canal de départ</label>
<select name="leave_channel_id">
{% for channel in channels %}
<option value="{{channel.id}}" {% if configuration.getIntValue('leave_channel_id')==channel.id %}selected="selected"{% endif %}>
{{channel.name}}
</option>
{% endfor %}
</select>
<label for="leave_message">Message personnalisé de départ</label>
<textarea name="leave_message" rows="3" placeholder="{member.mention} a quitté le serveur.">{{ configuration.getValue('leave_message') }}</textarea>
<small>
<strong>Syntaxes disponibles :</strong><br>
<code>{member.mention}</code> - Mentionne l'utilisateur (@NomUtilisateur)<br>
<code>{member.name}</code> - Nom d'utilisateur (sans mention)<br>
<code>{member.display_name}</code> - Surnom sur le serveur<br>
<code>{member.id}</code> - ID de l'utilisateur<br>
<code>{server.name}</code> - Nom du serveur<br>
<code>{server.member_count}</code> - Nombre total de membres<br>
<code>&lt;#ID_DU_CHANNEL&gt;</code> - Mentionne un salon (ex: &lt;#123456789012345678&gt;)
</small>
</fieldset>
<fieldset>
<legend>Modération</legend>
<label for="moderation_enable">
<input type="checkbox" name="moderation_enable" {% if configuration.getValue('moderation_enable') %}checked="checked"{% endif %}>
Activer les commandes d'avertissement (!warn, !unwarn, !inspect)
</label>
<label for="moderation_ban_enable">
<input type="checkbox" name="moderation_ban_enable" {% if configuration.getValue('moderation_ban_enable') %}checked="checked"{% endif %}>
Activer les commandes de bannissement (!ban, !unban)
</label>
<label for="moderation_kick_enable">
<input type="checkbox" name="moderation_kick_enable" {% if configuration.getValue('moderation_kick_enable') %}checked="checked"{% endif %}>
Activer la commande d'expulsion (!kick)
</label>
<label for="moderation_log_channel_id">Canal de logs de modération</label>
<select name="moderation_log_channel_id">
{% for channel in channels %}
<option value="{{channel.id}}" {% if configuration.getIntValue('moderation_log_channel_id')==channel.id %}selected="selected"{% endif %}>
{{channel.name}}
</option>
{% endfor %}
</select>
<small>Toutes les actions de modération seront notifiées dans ce canal</small>
<label>Rôles Staff autorisés</label>
{% set selected_roles = (configuration.getValue('moderation_staff_role_ids') or '').split(',') %}
{% if roles|length > 1 %}
<div class="tabs">
{% for guild_data in roles %}
<button type="button" class="tab-button" onclick="openTab(event, 'guild-{{guild_data.guild_id}}')" {% if loop.first %}id="defaultOpen"{% endif %}>
{{ guild_data.guild_name }}
</button>
{% endfor %}
</div>
{% endif %}
{% for guild_data in roles %}
<div id="guild-{{guild_data.guild_id}}" class="tab-content" {% if not loop.first %}style="display: none;"{% endif %}>
<div style="max-height: 300px; overflow-y: auto; border: 1px solid #ccc; padding: 10px; border-radius: 5px;">
{% for role in guild_data.roles %}
<label style="display: block; margin: 5px 0;">
<input type="checkbox" name="moderation_staff_role_ids" value="{{role.id}}" {% if role.id|string in selected_roles %}checked="checked"{% endif %}>
{% if role.color.value != 0 %}
<span style="color:#{{ '%06x' % role.color.value }}"></span>
{% else %}
<span></span>
{% endif %}
{{role.name}}
</label>
{% endfor %}
</div>
</div>
{% endfor %}
<small>Sélectionnez un ou plusieurs rôles qui peuvent utiliser les commandes de modération</small>
<script>
function openTab(evt, tabName) {
var i, tabcontent, tabbuttons;
tabcontent = document.getElementsByClassName("tab-content");
for (i = 0; i < tabcontent.length; i++) {
tabcontent[i].style.display = "none";
}
tabbuttons = document.getElementsByClassName("tab-button");
for (i = 0; i < tabbuttons.length; i++) {
tabbuttons[i].className = tabbuttons[i].className.replace(" active", "");
}
document.getElementById(tabName).style.display = "block";
evt.currentTarget.className += " active";
}
document.getElementById("defaultOpen")?.click();
</script>
<style>
.tabs {
overflow: hidden;
border-bottom: 2px solid #ccc;
margin-bottom: 10px;
}
.tab-button {
background-color: #f1f1f1;
border: none;
outline: none;
cursor: pointer;
padding: 10px 20px;
transition: 0.3s;
font-size: 14px;
margin-right: 2px;
}
.tab-button:hover {
background-color: #ddd;
}
.tab-button.active {
background-color: #ccc;
font-weight: bold;
}
.tab-content {
animation: fadeEffect 0.3s;
}
@keyframes fadeEffect {
from {opacity: 0;}
to {opacity: 1;}
}
</style>
<label for="moderation_embed_delete_delay">Délai de suppression des embeds (en secondes)</label>
<input name="moderation_embed_delete_delay" type="number" value="{{ configuration.getValue('moderation_embed_delete_delay') or '0' }}" placeholder="0" min="0" />
<small>Mettre 0 pour ne pas supprimer automatiquement</small>
</fieldset>
<input type="Submit" value="Enregistrer la configuration Discord">
<label for="discord_token">API Discord (cachée)</label>
<input name="discord_token" type="password" />
<input type="Submit" value="Définir">
<p>Nécessite un redémarrage</p>
</form>
<h2>API Twitch</h2>
@@ -199,7 +21,7 @@
<label for="twitch_channel">Chaîne à rejoindre</label>
<input name="twitch_channel" type="text" value="{{ configuration.getValue('twitch_channel') }}"
placeholder="#machinTruc" />
<input type="Submit" value="Enregistrer la configuration Twitch">
<input type="Submit" value="Définir">
<p>
<a href="{{ url_for('twitchConfigurationHelp') }}">Aide</a>
</p>
@@ -219,22 +41,18 @@
<h2>Humble Bundle</h2>
<form action="{{ url_for('updateConfiguration') }}" method="POST">
<p>Humble Bundle propose régulièrement des bundles de jeux vidéo à des prix réduits. Activez les notifications pour recevoir automatiquement les nouveaux packs disponibles sur votre serveur Discord.</p>
<label for="humble_bundle_enable">
<input type="checkbox" name="humble_bundle_enable" {% if configuration.getValue('humble_bundle_enable') %}checked="checked"{% endif %}>
Activer les notifications Humble Bundle
</label>
<label for="humble_bundle_channel">Canal de notification</label>
<label for="humble_bundle_enable">Activer</label>
<input type="checkbox" name="humble_bundle_enable" {% if configuration.getValue('humble_bundle_enable') %}
checked="checked" {% endif %}>
<label>Activer les notifications Humble Bundle</label>
<label for="humble_bundle_channel">Canal de notification des packs Humble Bundle</label>
<select name="humble_bundle_channel">
{% for channel in channels %}
<option value="{{channel.id}}" {% if configuration.getIntValue('humble_bundle_channel')==channel.id %}selected="selected"{% endif %}>
{{channel.name}}
</option>
<option value="{{channel.id}}" {% if configuration.getIntValue('humble_bundle_channel')==channel.id %}
selected="selected" {% endif %}>
{{channel.name}}</option>
{% endfor %}
</select>
<input type="Submit" value="Enregistrer la configuration Humble Bundle">
<input type="Submit" value="Définir">
</form>
{% endblock %}

View File

@@ -1,110 +0,0 @@
{% extends "template.html" %}
{% block content %}
<h1>Modération Discord</h1>
<p>
Historique des actions de modération effectuées sur le serveur Discord.
Le bot enregistre automatiquement les avertissements, exclusions et bannissements.
<table>
<thead>
<tr>
<th>Commande</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>!averto @utilisateur raison</strong><br><small>Alias : !warn, !av, !avertissement</small></td>
<td>Avertit un utilisateur et enregistre l'avertissement dans la base de données</td>
</tr>
<tr>
<td><strong>!delaverto id</strong><br><small>Alias : !removewarn, !delwarn</small></td>
<td>Retire un avertissement en utilisant son numéro d'ID</td>
</tr>
<tr>
<td><strong>!warnings</strong> ou <strong>!warnings @utilisateur</strong><br><small>Alias : !listevent, !listwarn</small></td>
<td>Affiche la liste des événements de modération (tous ou pour un utilisateur spécifique)</td>
</tr>
<tr>
<td><strong>!inspect @utilisateur</strong> ou <strong>!inspect id</strong></td>
<td>Affiche des informations détaillées sur un utilisateur : création du compte, date d'arrivée, historique de modération</td>
</tr>
<tr>
<td><strong>!kick @utilisateur raison</strong></td>
<td>Expulse un utilisateur du serveur</td>
</tr>
<tr>
<td><strong>!ban @utilisateur raison</strong></td>
<td>Bannit définitivement un utilisateur du serveur</td>
</tr>
<tr>
<td><strong>!unban discord_id</strong> ou <strong>!unban #sanction_id raison</strong></td>
<td>Révoque le bannissement d'un utilisateur et lui envoie une invitation</td>
</tr>
<tr>
<td><strong>!banlist</strong></td>
<td>Affiche la liste des utilisateurs actuellement bannis du serveur</td>
</tr>
<tr>
<td><strong>!aide</strong><br><small>Alias : !help</small></td>
<td>Affiche l'aide avec toutes les commandes disponibles</td>
</tr>
</tbody>
</table>
</p>
{% if not event %}
<h2>Événements de modération</h2>
<table class="moderation">
<thead>
<tr>
<th>Type</th>
<th>Utilisateur</th>
<th>Discord ID</th>
<th>Date & Heure</th>
<th>Raison</th>
<th>Staff</th>
<th>#</th>
</tr>
</thead>
<tbody>
{% for mod_event in events %}
<tr>
<td>{{ mod_event.type }}</td>
<td>{{ mod_event.username }}</td>
<td>{{ mod_event.discord_id }}</td>
<td>{{ mod_event.created_at.strftime('%d/%m/%Y %H:%M') if mod_event.created_at else 'N/A' }}</td>
<td>{{ mod_event.reason }}</td>
<td>{{ mod_event.staff_name }}</td>
<td>
<a href="{{ url_for('open_edit_moderation_event', event_id = mod_event.id) }}" class="icon"></a>
<a href="{{ url_for('delete_moderation_event', event_id = mod_event.id) }}" onclick="return confirm('Êtes-vous sûr de vouloir supprimer cet événement ?')" class="icon">🗑</a>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
{% if event %}
<h2>Editer un événement</h2>
<form action="{{ url_for('update_moderation_event', event_id = event.id) }}" method="POST">
<label for="type">Type</label>
<input name="type" type="text" value="{{ event.type }}" disabled />
<label for="username">Utilisateur</label>
<input name="username" type="text" value="{{ event.username }}" disabled />
<label for="discord_id">Discord ID</label>
<input name="discord_id" type="text" value="{{ event.discord_id }}" disabled />
<label for="reason">Raison</label>
<input name="reason" type="text" value="{{ event.reason }}" required="required" />
<label for="staff_name">Staff</label>
<input name="staff_name" type="text" value="{{ event.staff_name }}" disabled />
<input type="Submit" value="Modifier">
<a href="{{ url_for('moderation') }}">Annuler</a>
</form>
{% endif %}
{% endblock %}

View File

@@ -19,10 +19,8 @@
<a href="/"><img src="/static/ico/favicon.ico"></a>
<ul>
<li><a href="/live-alert">Alerte live</a></li>
<li><a href="/youtube">YouTube</a></li>
<li><a href="/commandes">Commandes</a></li>
<li><a href="/humeurs">Humeurs</a></li>
<li><a href="/moderation">Modération</a></li>
<li><a href="/protondb">ProtonDB</a></li>
<li><a href="/configurations">Configurations</a></li>
</ul>

View File

@@ -1,245 +0,0 @@
{% extends "template.html" %}
{% block content %}
<h1>Notifications YouTube</h1>
{% if msg %}
<div id="alert-msg" class="alert alert-{{ msg_type }}" style="padding: 10px; margin: 10px 0; border: 1px solid {{ '#f00' if msg_type == 'error' else '#0f0' }}; background-color: {{ '#ffe0e0' if msg_type == 'error' else '#e0ffe0' }};">
{{ msg }}
</div>
<script>
setTimeout(function() {
var el = document.getElementById('alert-msg');
if (el) el.style.display = 'none';
}, 5000);
</script>
{% endif %}
<p>
Liste des chaînes YouTube surveillées pour les notifications de nouvelles vidéos.
Le bot vérifie toutes les 5 minutes les nouvelles vidéos des chaînes en dessous.
Quand une nouvelle vidéo est détectée, le bot enverra une notification sur Discord.
</p>
{% if not notification %}
<h2>Notifications</h2>
<table class="live-alert">
<thead>
<tr>
<th>Chaîne YouTube</th>
<th>Canal Discord</th>
<th>Type</th>
<th>Message</th>
<th>#</th>
</tr>
</thead>
<tbody>
{% for notification in notifications %}
<tr>
<td>{{notification.channel_id}}</td>
<td>{{notification.notify_channel_name}}</td>
<td>
{% if notification.video_type == 'all' %}
Toutes
{% elif notification.video_type == 'video' %}
Vidéos uniquement
{% elif notification.video_type == 'short' %}
Shorts uniquement
{% endif %}
</td>
<td>{{notification.message}}</td>
<td>
<a href="{{ url_for('toggleYouTube', id = notification.id) }}" class="icon">{{ '✅' if notification.enable else '❌' }}</a>
<a href="{{ url_for('openEditYouTube', id = notification.id) }}" class="icon"></a>
<a href="{{ url_for('delYouTube', id = notification.id) }}"
onclick="return confirm('Êtes-vous sûr de vouloir supprimer cette notification ?')" class="icon">🗑</a>
</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endif %}
<h2>{{ 'Editer une notification' if notification else 'Ajouter une notification YouTube' }}</h2>
<div style="display: grid; grid-template-columns: 1fr 1fr; gap: 20px;">
<div>
<form id="youtube-form" action="{{ url_for('submitEditYouTube', id = notification.id) if notification else url_for('addYouTube') }}" method="POST">
<fieldset>
<legend>Configuration de base</legend>
<label for="channel_id">Lien ou ID de la chaîne YouTube</label>
<input name="channel_id" id="channel_id" type="text" maxlength="256" required="required" value="{{notification.channel_id if notification}}" placeholder="https://www.youtube.com/@513v3 ou https://www.youtube.com/channel/UC... ou UC..."/>
<label for="notify_channel">Canal de Notification Discord</label>
<select name="notify_channel" id="notify_channel">
{% for channel in channels %}
<option value="{{channel.id}}"{% if notification and notification.notify_channel == channel.id %}
selected="selected" {% endif %}>{{channel.name}}</option>
{% endfor %}
</select>
<label for="video_type">Type de vidéo à notifier</label>
<select name="video_type" id="video_type">
<option value="all"{% if notification and notification.video_type == 'all' %} selected="selected" {% endif %}>Toutes (vidéos + shorts)</option>
<option value="video"{% if notification and notification.video_type == 'video' %} selected="selected" {% endif %}>Vidéos uniquement</option>
<option value="short"{% if notification and notification.video_type == 'short' %} selected="selected" {% endif %}>Shorts uniquement</option>
</select>
<label for="message">Message (optionnel, envoyé avant l'embed)</label>
<textarea name="message" id="message" rows="3" cols="50">{{notification.message if notification}}</textarea>
</fieldset>
<fieldset>
<legend>Personnalisation de l'embed Discord</legend>
<label for="embed_title">Titre de l'embed</label>
<input name="embed_title" id="embed_title" type="text" maxlength="256" value="{{notification.embed_title if notification}}" placeholder="{video_title} (par défaut: titre de la vidéo)"/>
<small>Variables: {video_title}, {channel_name}, {video_url}, {video_id}</small>
<label for="embed_description">Description de l'embed</label>
<textarea name="embed_description" id="embed_description" rows="4" cols="50" placeholder="Description optionnelle de l'embed">{{notification.embed_description if notification}}</textarea>
<small>Variables: {video_title}, {channel_name}, {video_url}, {published_at}, {is_short}</small>
<label for="embed_color">Couleur de l'embed (hexadécimal)</label>
<input name="embed_color" id="embed_color" type="color" value="#{{notification.embed_color if notification else 'FF0000'}}" style="width: 100px; height: 40px;"/>
<input type="text" id="embed_color_text" value="{{notification.embed_color if notification else 'FF0000'}}" placeholder="FF0000" style="width: 100px; margin-left: 10px;" maxlength="6"/>
<small>Format: FF0000 (rouge YouTube par défaut)</small>
<label for="embed_author_name">Nom de l'auteur</label>
<input name="embed_author_name" id="embed_author_name" type="text" maxlength="256" value="{{notification.embed_author_name if notification}}" placeholder="{channel_name} (par défaut: nom de la chaîne)"/>
<label for="embed_author_icon">Icône de l'auteur (URL)</label>
<input name="embed_author_icon" id="embed_author_icon" type="text" maxlength="512" value="{{notification.embed_author_icon if notification}}" placeholder="https://www.youtube.com/img/desktop/yt_1200.png"/>
<label for="embed_footer">Pied de page</label>
<input name="embed_footer" id="embed_footer" type="text" maxlength="2048" value="{{notification.embed_footer if notification}}" placeholder="Texte optionnel en bas de l'embed"/>
<label>
<input type="checkbox" name="embed_thumbnail" id="embed_thumbnail" {% if not notification or notification.embed_thumbnail %}checked="checked"{% endif %}>
Afficher la miniature (thumbnail) en haut à droite
</label>
<label>
<input type="checkbox" name="embed_image" id="embed_image" {% if not notification or notification.embed_image %}checked="checked"{% endif %}>
Afficher l'image principale (image de la vidéo)
</label>
</fieldset>
<input type="Submit" value="{{ 'Modifier' if notification else 'Ajouter' }}">
</form>
</div>
<div>
<h3>Prévisualisation de l'embed Discord</h3>
<div id="embed-preview" style="background-color: #2f3136; border-radius: 4px; padding: 16px; font-family: 'Whitney', 'Helvetica Neue', Helvetica, Arial, sans-serif; color: #dcddde; max-width: 520px; border-left: 4px solid #FF0000;">
<div id="embed-author" style="display: flex; align-items: center; margin-bottom: 8px; font-size: 14px;">
<img id="embed-author-icon" src="https://www.youtube.com/img/desktop/yt_1200.png" style="width: 20px; height: 20px; border-radius: 50%; margin-right: 8px;" onerror="this.style.display='none'"/>
<span id="embed-author-name" style="font-weight: 600;">Nom de la chaîne</span>
</div>
<a id="embed-title" href="#" style="color: #00aff4; text-decoration: none; font-size: 16px; font-weight: 600; display: block; margin-bottom: 8px;">Titre de la vidéo</a>
<div id="embed-description" style="font-size: 14px; line-height: 1.375; margin-bottom: 8px; color: #dcddde;"></div>
<div id="embed-thumbnail-container" style="margin: 8px 0;">
<img id="embed-thumbnail" src="" style="max-width: 80px; max-height: 80px; border-radius: 4px; float: right; margin-left: 16px; display: none;"/>
</div>
<div id="embed-image-container" style="margin-top: 16px;">
<img id="embed-image" src="https://i.ytimg.com/vi/dQw4w9WgXcQ/maxresdefault.jpg" style="max-width: 100%; border-radius: 4px; display: none;"/>
</div>
<div id="embed-footer" style="margin-top: 8px; font-size: 12px; color: #72767d;"></div>
</div>
<small style="color: #666;">Cette prévisualisation est approximative. L'apparence réelle sur Discord peut varier légèrement.</small>
</div>
</div>
<script>
function formatText(text, vars) {
if (!text) return '';
return text.replace(/\{(\w+)\}/g, function(match, key) {
return vars[key] || match;
});
}
function updatePreview() {
const embedTitle = document.getElementById('embed_title').value || '{video_title}';
const embedDescription = document.getElementById('embed_description').value || '';
const embedColor = document.getElementById('embed_color_text').value || 'FF0000';
const embedAuthorName = document.getElementById('embed_author_name').value || '{channel_name}';
const embedAuthorIcon = document.getElementById('embed_author_icon').value || 'https://www.youtube.com/img/desktop/yt_1200.png';
const embedFooter = document.getElementById('embed_footer').value || '';
const embedThumbnail = document.getElementById('embed_thumbnail').checked;
const embedImage = document.getElementById('embed_image').checked;
const vars = {
video_title: 'Nouvelle vidéo de test',
channel_name: 'Ma Chaîne YouTube',
video_url: 'https://www.youtube.com/watch?v=dQw4w9WgXcQ',
video_id: 'dQw4w9WgXcQ',
thumbnail: 'https://i.ytimg.com/vi/dQw4w9WgXcQ/maxresdefault.jpg',
published_at: '2026-01-25T12:00:00Z',
is_short: false
};
document.getElementById('embed-title').textContent = formatText(embedTitle, vars);
document.getElementById('embed-title').href = vars.video_url;
document.getElementById('embed-description').textContent = formatText(embedDescription, vars);
document.getElementById('embed-author-name').textContent = formatText(embedAuthorName, vars);
document.getElementById('embed-author-icon').src = embedAuthorIcon;
document.getElementById('embed-footer').textContent = formatText(embedFooter, vars);
document.getElementById('embed-preview').style.borderLeftColor = '#' + embedColor;
if (embedThumbnail) {
document.getElementById('embed-thumbnail').src = vars.thumbnail;
document.getElementById('embed-thumbnail').style.display = 'block';
} else {
document.getElementById('embed-thumbnail').style.display = 'none';
}
if (embedImage) {
document.getElementById('embed-image').src = vars.thumbnail;
document.getElementById('embed-image').style.display = 'block';
} else {
document.getElementById('embed-image').style.display = 'none';
}
}
document.getElementById('embed_color').addEventListener('input', function(e) {
document.getElementById('embed_color_text').value = e.target.value.substring(1).toUpperCase();
updatePreview();
});
document.getElementById('embed_color_text').addEventListener('input', function(e) {
const val = e.target.value.replace(/[^0-9A-Fa-f]/g, '').substring(0, 6);
e.target.value = val;
if (val.length === 6) {
document.getElementById('embed_color').value = '#' + val;
updatePreview();
}
});
const formFields = ['embed_title', 'embed_description', 'embed_author_name', 'embed_author_icon', 'embed_footer', 'embed_thumbnail', 'embed_image'];
formFields.forEach(field => {
const el = document.getElementById(field);
if (el) {
el.addEventListener('input', updatePreview);
el.addEventListener('change', updatePreview);
}
});
updatePreview();
</script>
<p>
<strong>Variables disponibles pour l'embed :</strong>
<ul>
<li><code>{channel_name}</code> : nom de la chaîne YouTube</li>
<li><code>{video_title}</code> : titre de la vidéo</li>
<li><code>{video_url}</code> : lien vers la vidéo</li>
<li><code>{video_id}</code> : ID de la vidéo</li>
<li><code>{thumbnail}</code> : URL de la miniature</li>
<li><code>{published_at}</code> : date de publication</li>
<li><code>{is_short}</code> : True si c'est un short, False sinon</li>
</ul>
</p>
{% endblock %}

View File

@@ -17,34 +17,53 @@ auth: UserAuthenticator
def twitchConfigurationHelp():
return render_template("twitch-aide.html", token_redirect_url = _buildUrl())
@webapp.route("/configurations/twitch/request-token")
async def twitchRequestToken():
@webapp.route("/configurations/twitch/request-token")
async def twitchRequestToken():
global auth
helper = ConfigurationHelper()
twitch = await Twitch(helper.getValue('twitch_client_id'), helper.getValue('twitch_client_secret'))
auth = UserAuthenticator(twitch, USER_SCOPE, url=_buildUrl())
return redirect(auth.return_auth_url())
try:
helper = ConfigurationHelper()
import asyncio
twitch = await asyncio.wait_for(
Twitch(helper.getValue('twitch_client_id'), helper.getValue('twitch_client_secret')),
timeout=30.0
)
auth = UserAuthenticator(twitch, USER_SCOPE, url=_buildUrl())
return redirect(auth.return_auth_url())
except asyncio.TimeoutError:
logging.error('Timeout lors de la connexion à Twitch API pour la demande de token')
return redirect(url_for('openConfigurations'))
except TwitchAPIException as e:
logging.error(f'Erreur API Twitch lors de la demande de token : {e}')
return redirect(url_for('openConfigurations'))
except Exception as e:
logging.error(f'Erreur inattendue lors de la demande de token Twitch : {e}')
return redirect(url_for('openConfigurations'))
@webapp.route("/configurations/twitch/receive-token")
@webapp.route("/configurations/twitch/receive-token")
async def twitchReceiveToken():
global auth
state = request.args.get('state')
code = request.args.get('code')
if state != auth.state :
logging('bad returned state')
logging.error('bad returned state')
return redirect(url_for('openConfigurations'))
if code == None :
logging('no returned state')
logging.error('no returned code')
return redirect(url_for('openConfigurations'))
try:
token, refresh = await auth.authenticate(user_token=code)
import asyncio
token, refresh = await asyncio.wait_for(auth.authenticate(user_token=code), timeout=30.0)
helper = ConfigurationHelper()
helper.createOrUpdate('twitch_access_token', token)
helper.createOrUpdate('twitch_refresh_token', refresh)
db.session.commit()
except asyncio.TimeoutError:
logging.error('Timeout lors de l\'authentification Twitch')
except TwitchAPIException as e:
logging(e)
logging.error(f'Erreur API Twitch lors de l\'authentification : {e}')
except Exception as e:
logging.error(f'Erreur inattendue lors de l\'authentification Twitch : {e}')
return redirect(url_for('openConfigurations'))
# hack pas fou mais on estime qu'on sera toujours en ssl en connecté

View File

@@ -1,181 +0,0 @@
import re
import requests
from urllib.parse import urlencode
from flask import render_template, request, redirect, url_for
from webapp import webapp
from database import db
from database.models import YouTubeNotification
from discordbot import bot
def extract_channel_id(channel_input: str) -> str:
"""Extrait l'ID de la chaîne YouTube depuis différents formats"""
if not channel_input:
return None
channel_input = channel_input.strip()
if channel_input.startswith('UC') and len(channel_input) == 24:
return channel_input
if '/channel/' in channel_input:
match = re.search(r'/channel/([a-zA-Z0-9_-]{24})', channel_input)
if match:
return match.group(1)
if '/c/' in channel_input or '/user/' in channel_input:
parts = channel_input.split('/')
for i, part in enumerate(parts):
if part in ['c', 'user'] and i + 1 < len(parts):
handle = parts[i + 1].split('?')[0].split('&')[0]
channel_id = _get_channel_id_from_handle(handle)
if channel_id:
return channel_id
if '@' in channel_input:
handle = re.search(r'@([a-zA-Z0-9_-]+)', channel_input)
if handle:
channel_id = _get_channel_id_from_handle(handle.group(1))
if channel_id:
return channel_id
return None
def _get_channel_id_from_handle(handle: str) -> str:
"""Récupère l'ID de la chaîne depuis un handle en utilisant le flux RSS"""
try:
url = f"https://www.youtube.com/@{handle}"
response = requests.get(url, timeout=10, allow_redirects=True)
if response.status_code == 200:
channel_id_match = re.search(r'"channelId":"([^"]{24})"', response.text)
if channel_id_match:
return channel_id_match.group(1)
canonical_match = re.search(r'<link rel="canonical" href="https://www\.youtube\.com/channel/([^"]{24})"', response.text)
if canonical_match:
return canonical_match.group(1)
return None
except Exception:
return None
@webapp.route("/youtube")
def openYouTube():
notifications: list[YouTubeNotification] = YouTubeNotification.query.all()
channels = bot.getAllTextChannel()
for notification in notifications:
for channel in channels:
if notification.notify_channel == channel.id:
notification.notify_channel_name = channel.name
msg = request.args.get('msg')
msg_type = request.args.get('type', 'info')
return render_template("youtube.html", notifications=notifications, channels=channels, msg=msg, msg_type=msg_type)
@webapp.route("/youtube/add", methods=['POST'])
def addYouTube():
channel_input = request.form.get('channel_id', '').strip()
channel_id = extract_channel_id(channel_input)
if not channel_id:
return redirect(url_for("openYouTube") + "?" + urlencode({'msg': f"Impossible d'extraire l'ID de la chaîne depuis : {channel_input}. Veuillez vérifier le lien.", 'type': 'error'}))
notify_channel_str = request.form.get('notify_channel')
if not notify_channel_str:
return redirect(url_for("openYouTube") + "?" + urlencode({'msg': "Veuillez sélectionner un canal Discord. Assurez-vous que le bot Discord est connecté.", 'type': 'error'}))
try:
notify_channel = int(notify_channel_str)
except ValueError:
return redirect(url_for("openYouTube") + "?" + urlencode({'msg': "Canal Discord invalide.", 'type': 'error'}))
embed_color = request.form.get('embed_color', 'FF0000').strip().lstrip('#')
if len(embed_color) != 6:
embed_color = 'FF0000'
notification = YouTubeNotification(
enable=True,
channel_id=channel_id,
notify_channel=notify_channel,
message=request.form.get('message'),
video_type=request.form.get('video_type', 'all'),
embed_title=request.form.get('embed_title') or None,
embed_description=request.form.get('embed_description') or None,
embed_color=embed_color,
embed_footer=request.form.get('embed_footer') or None,
embed_author_name=request.form.get('embed_author_name') or None,
embed_author_icon=request.form.get('embed_author_icon') or None,
embed_thumbnail=request.form.get('embed_thumbnail') == 'on',
embed_image=request.form.get('embed_image') == 'on'
)
db.session.add(notification)
db.session.commit()
return redirect(url_for("openYouTube") + "?" + urlencode({'msg': f"Notification ajoutée avec succès pour la chaîne {channel_id}", 'type': 'success'}))
@webapp.route("/youtube/toggle/<int:id>")
def toggleYouTube(id):
notification: YouTubeNotification = YouTubeNotification.query.get_or_404(id)
notification.enable = not notification.enable
db.session.commit()
return redirect(url_for("openYouTube"))
@webapp.route("/youtube/edit/<int:id>")
def openEditYouTube(id):
notification = YouTubeNotification.query.get_or_404(id)
channels = bot.getAllTextChannel()
msg = request.args.get('msg')
msg_type = request.args.get('type', 'info')
return render_template("youtube.html", notification=notification, channels=channels, notifications=YouTubeNotification.query.all(), msg=msg, msg_type=msg_type)
@webapp.route("/youtube/edit/<int:id>", methods=['POST'])
def submitEditYouTube(id):
notification: YouTubeNotification = YouTubeNotification.query.get_or_404(id)
channel_input = request.form.get('channel_id', '').strip()
channel_id = extract_channel_id(channel_input)
if not channel_id:
return redirect(url_for("openEditYouTube", id=id) + "?" + urlencode({'msg': f"Impossible d'extraire l'ID de la chaîne depuis : {channel_input}. Veuillez vérifier le lien.", 'type': 'error'}))
notify_channel_str = request.form.get('notify_channel')
if not notify_channel_str:
return redirect(url_for("openEditYouTube", id=id) + "?" + urlencode({'msg': "Veuillez sélectionner un canal Discord. Assurez-vous que le bot Discord est connecté.", 'type': 'error'}))
try:
notify_channel = int(notify_channel_str)
except ValueError:
return redirect(url_for("openEditYouTube", id=id) + "?" + urlencode({'msg': "Canal Discord invalide.", 'type': 'error'}))
embed_color = request.form.get('embed_color', 'FF0000').strip().lstrip('#')
if len(embed_color) != 6:
embed_color = 'FF0000'
notification.channel_id = channel_id
notification.notify_channel = notify_channel
notification.message = request.form.get('message')
notification.video_type = request.form.get('video_type', 'all')
notification.embed_title = request.form.get('embed_title') or None
notification.embed_description = request.form.get('embed_description') or None
notification.embed_color = embed_color
notification.embed_footer = request.form.get('embed_footer') or None
notification.embed_author_name = request.form.get('embed_author_name') or None
notification.embed_author_icon = request.form.get('embed_author_icon') or None
notification.embed_thumbnail = request.form.get('embed_thumbnail') == 'on'
notification.embed_image = request.form.get('embed_image') == 'on'
db.session.commit()
return redirect(url_for("openYouTube") + "?" + urlencode({'msg': "Notification modifiée avec succès", 'type': 'success'}))
@webapp.route("/youtube/del/<int:id>")
def delYouTube(id):
notification = YouTubeNotification.query.get_or_404(id)
db.session.delete(notification)
db.session.commit()
return redirect(url_for("openYouTube"))