Files
lara-schach-live/server.py

311 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Lokaler Proxy-Server für Laras Schachturnier
Lädt die PGN-Datei und stellt sie mit CORS-Headern bereit.
"""
import http.server
import socketserver
import urllib.request
import urllib.parse
import urllib.error
import sys
import os
import threading
import time
import json
import re
from datetime import datetime
# Upstream page with the tournament standings table (downloaded by fetch_standings).
STANDINGS_URL = "https://www.deutsche-schachjugend.de/2026/odjm-d/tabelle/"
# TCP port the server listens on; overridable via the PORT environment variable.
PORT = int(os.environ.get("PORT", 8111))
# Directory next to this file in which downloaded data is cached.
CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cache")
# JSON file holding the most recently parsed standings entry.
STANDINGS_CACHE_FILE = os.path.join(CACHE_DIR, "standings.json")
# Seconds the background refresh sleeps between PGN downloads; overridable via CACHE_TTL.
CACHE_TTL = int(os.environ.get("CACHE_TTL", 31))
# Create the cache directory at import time so later cache writes find it.
os.makedirs(CACHE_DIR, exist_ok=True)
# time.time() of the last successful standings download; None before the first one.
last_standings_fetch_time = None
current_round = 0 # detected from the standings table; 0 means "not known yet"
def get_pgn_url(round_num=None):
    """Return the upstream PGN download URL for a round.

    When ``round_num`` is None the module-level ``current_round`` is used.
    """
    selected = current_round if round_num is None else round_num
    return f"https://www.deutsche-schachjugend.de/2026/odjm-d/partien/{selected}.pgn"
def get_cache_file(round_num=None):
    """Return the path of the cached PGN file for a round.

    When ``round_num`` is None the module-level ``current_round`` is used.
    The file lives in ``CACHE_DIR`` and is named ``runde-<round>.pgn``.
    """
    selected = current_round if round_num is None else round_num
    filename = f"runde-{selected}.pgn"
    return os.path.join(CACHE_DIR, filename)
def fetch_pgn(round_num=None):
    """Download the PGN file of a round as raw bytes.

    When ``round_num`` is None the module-level ``current_round`` is used.

    Returns:
        ``(data, True)`` on success, ``(None, False)`` when the round number
        is 0 or when the download fails for any reason (the error is printed).
    """
    target_round = current_round if round_num is None else round_num
    if target_round == 0:
        # Round 0 means no round is known yet, so there is nothing to download.
        return None, False

    pgn_url = get_pgn_url(target_round)
    try:
        request = urllib.request.Request(pgn_url, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(request, timeout=31) as upstream:
            payload = upstream.read()
    except urllib.error.HTTPError as e:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] HTTP-Fehler (Runde {target_round}): {e}")
        return None, False
    except Exception as e:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Fehler beim Laden (Runde {target_round}): {e}")
        return None, False
    return payload, True
def get_round_pgn(round_num):
    """Return ``(pgn_bytes, cache_mtime)`` for a round, or ``(None, 0)``.

    For the current round the upstream server is tried first and the cache
    file is only a fallback; for every other round the cache file is tried
    first and the upstream server is the fallback. A successful download is
    always written to the round's cache file.
    """
    cache_path = get_cache_file(round_num)

    def from_upstream():
        # Download, persist to the cache file, and report the new mtime.
        payload, ok = fetch_pgn(round_num)
        if not ok or payload is None:
            return None
        with open(cache_path, "wb") as out:
            out.write(payload)
        return payload, os.path.getmtime(cache_path)

    def from_cache():
        # Read the previously stored copy, if there is one.
        if not os.path.exists(cache_path):
            return None
        with open(cache_path, "rb") as src:
            return src.read(), os.path.getmtime(cache_path)

    if round_num == current_round:
        loaders = (from_upstream, from_cache)
    else:
        loaders = (from_cache, from_upstream)

    for load in loaders:
        result = load()
        if result is not None:
            return result
    return None, 0
def fetch_standings():
    """Download the standings page and extract Lara's table row.

    Side effects:
        Updates the module-level ``current_round`` when the page headline
        names a different round, and stores the time of the successful
        download in ``last_standings_fetch_time``.

    Returns:
        A dict with the keys ``rank``, ``player``, ``wins``, ``draws``,
        ``losses``, ``points``, ``round_info`` and ``round``; or ``None`` when
        the download fails or no row for Lara with at least nine cells exists.
    """
    global last_standings_fetch_time, current_round
    try:
        # No conditional-request header is sent on purpose: the page body is
        # always needed for parsing, and a "304 Not Modified" answer would
        # surface as an HTTPError without any content to parse.
        req = urllib.request.Request(STANDINGS_URL, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=31) as response:
            html = response.read().decode("utf-8", errors="replace")
        last_standings_fetch_time = time.time()

        # The headline "Tabellenstand nach der N. Runde" carries the round number.
        round_info = ""
        m = re.search(r"Tabellenstand\s+nach\s+der\s+(\d+)\.\s*Runde", html)
        if m:
            detected = int(m.group(1))
            if detected != current_round:
                old = current_round
                current_round = detected
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Runde erkannt: {current_round} (war {old})")
            round_info = f"nach der {m.group(1)}. Runde"

        rows = re.findall(r"<tr[^>]*>(.*?)</tr>", html, re.DOTALL | re.IGNORECASE)
        for row in rows:
            if "Lara Kiesewetter" not in row:
                continue
            # Match cells case-insensitively, consistent with the row pattern.
            cells = re.findall(r"<td[^>]*>(.*?)</td>", row, re.DOTALL | re.IGNORECASE)
            # Strip nested markup so only the visible cell text remains.
            clean = [re.sub(r"<[^>]+>", "", c).strip() for c in cells]
            if len(clean) >= 9:
                return {
                    "rank": clean[0],
                    "player": "Lara Kiesewetter",
                    "wins": clean[5],
                    "draws": clean[6],
                    "losses": clean[7],
                    "points": clean[8],
                    "round_info": round_info,
                    "round": current_round,
                }
        return None
    except Exception as e:
        # Top-level boundary for the scrape: log and report "no data".
        print(f"Fehler beim Laden der Tabelle: {e}")
        return None
def get_standings_content():
    """Return the standings dict, served from the JSON cache when possible.

    A cache file younger than 300 seconds is returned directly. Otherwise the
    standings are fetched again and the cache file is rewritten. If that
    fetch fails, the last cached result (however old) is returned instead of
    ``None`` so clients keep seeing data during upstream outages.

    Returns:
        The standings dict, or ``None`` when neither a fresh fetch nor any
        readable cache file is available.
    """
    cached = None
    age = None
    try:
        age = time.time() - os.path.getmtime(STANDINGS_CACHE_FILE)
        with open(STANDINGS_CACHE_FILE, "r", encoding="utf-8") as f:
            cached = json.load(f)
    except (OSError, ValueError):
        # Missing, unreadable or corrupt cache (JSONDecodeError is a
        # ValueError): treat it as "no cache" and fetch again.
        cached = None

    if cached is not None and age is not None and age < 300:  # 5 minute cache
        return cached

    data = fetch_standings()
    if data:
        with open(STANDINGS_CACHE_FILE, "w", encoding="utf-8") as f:
            json.dump(data, f)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Tabellenstand aktualisiert")
        return data

    # Upstream failed: a stale result is better than none (may still be None).
    return cached
class PGNHandler(http.server.BaseHTTPRequestHandler):
    """HTTP GET handler for PGN data, standings, a status probe and static files.

    Routes:
        /pgn/<n>     PGN bytes of round ``n`` (404 with a JSON error otherwise).
        /status      JSON ``{"status": "ok", "server_time": ...}``.
        /standings   JSON standings, or 502 with a JSON error.
        anything else: a static file from the directory of this script.

    Every response except the static-file 404 carries
    ``Access-Control-Allow-Origin: *``.
    """

    # Content-Type per static file extension. Only textual types carry a
    # charset parameter; it has no meaning for binary images.
    _STATIC_TYPES = {
        ".html": "text/html; charset=utf-8",
        ".css": "text/css; charset=utf-8",
        ".js": "application/javascript; charset=utf-8",
        ".json": "application/json; charset=utf-8",
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".svg": "image/svg+xml; charset=utf-8",
    }

    def _send(self, status, content_type, body, cache_control=None):
        """Write a complete response with CORS header and optional Cache-Control."""
        self.send_response(status)
        self.send_header("Content-Type", content_type)
        self.send_header("Access-Control-Allow-Origin", "*")
        if cache_control is not None:
            self.send_header("Cache-Control", cache_control)
        self.end_headers()
        self.wfile.write(body)

    def _send_json(self, status, payload, cache_control=None):
        """Serialize ``payload`` as JSON and send it with the given status."""
        self._send(status, "application/json", json.dumps(payload).encode(), cache_control)

    @staticmethod
    def _resolve_static_path(url_path):
        """Map a URL path to a file path inside the script directory.

        Returns ``None`` when the decoded path would leave that directory
        (e.g. via ``..`` segments) or contains a NUL byte.
        """
        root = os.path.dirname(os.path.abspath(__file__))
        relative = urllib.parse.unquote(url_path).lstrip("/")
        if "\x00" in relative:
            return None
        candidate = os.path.abspath(os.path.join(root, relative))
        # os.path.join(root, "") appends exactly one separator, so a sibling
        # directory sharing the same name prefix cannot pass the check.
        if candidate != root and not candidate.startswith(os.path.join(root, "")):
            return None
        return candidate

    def _serve_pgn(self, round_text):
        """Answer ``/pgn/<round_text>``."""
        try:
            round_num = int(round_text)
        except ValueError:
            # str.isdigit() accepts characters such as "²" that int() rejects,
            # and int() refuses extremely long digit strings; both are simply
            # rounds that do not exist.
            round_num = 0
        if round_num <= 0 or round_num > current_round + 1:
            self._send_json(404, {"error": "Runde nicht verf\u00fcgbar"})
            return
        content, mtime = get_round_pgn(round_num)
        if content:
            self._send(200, "text/plain; charset=utf-8", content, "no-cache")
        else:
            self._send_json(404, {"error": "Konnte PGN nicht laden"})

    def _serve_static(self, url_path):
        """Answer any other path with a file from the script directory."""
        if url_path == "/":
            url_path = "/index.html"
        filepath = self._resolve_static_path(url_path)
        if filepath is None or not os.path.isfile(filepath):
            self.send_response(404)
            self.end_headers()
            return
        ext = os.path.splitext(filepath)[1].lower()
        content_type = self._STATIC_TYPES.get(ext, "application/octet-stream")
        with open(filepath, "rb") as f:
            content = f.read()
        self._send(200, content_type, content)

    def do_GET(self):
        """Dispatch a GET request to the matching route."""
        parsed_path = urllib.parse.urlparse(self.path)
        parts = parsed_path.path.strip("/").split("/")
        if len(parts) == 2 and parts[0] == "pgn" and parts[1].isdigit():
            self._serve_pgn(parts[1])
        elif parsed_path.path == "/status":
            status = {
                "status": "ok",
                "server_time": time.time(),
            }
            self._send_json(200, status, "no-cache")
        elif parsed_path.path == "/standings":
            data = get_standings_content()
            if data:
                self._send_json(200, data, "max-age=300")
            else:
                self._send_json(502, {"error": "Tabellendaten nicht verf\u00fcgbar"})
        else:
            # Use the parsed path (without query string) for the file lookup.
            self._serve_static(parsed_path.path)

    def log_message(self, format, *args):
        """Print a timestamped line containing the first log argument."""
        # For request logs the first argument is the request line; fall back
        # to the format string when no arguments were supplied.
        detail = args[0] if args else format
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {detail}")
def background_refresh():
    """Keep the current round's PGN cache file up to date, forever.

    Each cycle sleeps ``CACHE_TTL`` seconds, downloads the current round's
    PGN and, on success, overwrites its cache file. Any exception is printed
    and the loop continues with the next cycle.
    """
    while True:
        time.sleep(CACHE_TTL)
        try:
            payload, ok = fetch_pgn()
            if not ok or payload is None:
                # Nothing downloaded this cycle; keep the existing cache file.
                continue
            target = get_cache_file()
            with open(target, "wb") as out:
                out.write(payload)
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Hintergrund-Refresh: PGN aktualisiert ({len(payload)} Bytes)")
        except Exception as e:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Hintergrund-Refresh Fehler: {e}")
def main():
    """Start the proxy: detect the round, warm the cache, serve until Ctrl+C.

    Steps:
        1. Print the startup banner.
        2. Fetch the standings once so ``current_round`` gets detected; fall
           back to round 1 only when no round could be detected at all.
        3. Download the current round's PGN into the cache.
        4. Start the daemon thread that refreshes the PGN periodically.
        5. Serve HTTP requests on ``PORT`` until interrupted.
    """
    global current_round
    print("=" * 50)
    print(" [TROPHY] Lara Kiesewetter Live Schachturnier")
    print("=" * 50)
    print(f" Server läuft auf: http://localhost:{PORT}")
    print(" Drücke Ctrl+C zum Beenden")
    print("=" * 50)

    # Detect the round from the standings table. fetch_standings() sets
    # current_round from the page headline even when Lara's row is missing,
    # so the decision is based on current_round alone; a detected round must
    # not be thrown away just because the row lookup returned None.
    fetch_standings()
    if current_round > 0:
        print(f"[OK] Aktuelle Runde: {current_round}")
    else:
        current_round = 1
        print("[INFO] Starte mit Runde 1 (Tabelle noch nicht verfügbar)")

    # Warm the cache so the first client request can be answered from disk.
    content, changed = fetch_pgn()
    if changed and content is not None:
        cache_file = get_cache_file()
        with open(cache_file, "wb") as f:
            f.write(content)
        print(f"[OK] PGN geladen ({len(content)} Bytes)")
    else:
        print("[WARN] Initialer Ladeversuch fehlgeschlagen, wird wiederholt...")

    # Daemon thread: it must not keep the process alive after the server stops.
    refresh_thread = threading.Thread(target=background_refresh, daemon=True)
    refresh_thread.start()

    class ReusableTCPServer(socketserver.TCPServer):
        # Allow rebinding the port right after a restart instead of failing
        # with "Address already in use" while old sockets are in TIME_WAIT.
        allow_reuse_address = True

    with ReusableTCPServer(("", PORT), PGNHandler) as httpd:
        print(f"\n[SERVER] Server gestartet: http://localhost:{PORT}\n")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\n[BYE] Server gestoppt.")
# Run the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()