Files
lara-schach-live/server.py

281 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Lokal Proxy-Server für Lara's Schachturnier
Lädt die PGN-Datei und stellt sie mit CORS-Headern bereit.
"""
import http.server
import json
import os
import re
import socketserver
import sys
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime
# --- Configuration (values read from the environment fall back to defaults) --

# Upstream PGN file mirrored by this proxy.
PGN_URL = os.environ.get(
    "PGN_URL",
    "https://www.deutsche-schachjugend.de/2026/odjm-d/partien/gesamt-utf8.pgn",
)
# Upstream HTML page holding the tournament table. Overridable like PGN_URL so
# both upstream sources can be redirected the same way.
STANDINGS_URL = os.environ.get(
    "STANDINGS_URL",
    "https://www.deutsche-schachjugend.de/2026/odjm-d/tabelle/",
)
# TCP port the local server listens on.
PORT = int(os.environ.get("PORT", 8111))

# On-disk cache, kept in a "cache" folder next to this script.
CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cache")
CACHE_FILE = os.path.join(CACHE_DIR, "gesamt-utf8.pgn")
STANDINGS_CACHE_FILE = os.path.join(CACHE_DIR, "standings.json")
# Seconds after which the cached PGN is refreshed from upstream.
CACHE_TTL = int(os.environ.get("CACHE_TTL", 31))

# The cache directory must exist before any reader or writer touches it.
os.makedirs(CACHE_DIR, exist_ok=True)

# time.time() of the last successful upstream download (None until the first).
last_fetch_time = None
last_standings_fetch_time = None
# Last-Modified value of the most recent non-empty PGN download. It is echoed
# back as If-Modified-Since so an unchanged file is not transferred again.
_pgn_last_modified = None


def fetch_pgn():
    """Download the PGN file from PGN_URL and return it as bytes.

    The request is conditional once a previous download reported a
    Last-Modified header. When the server answers 304 (Not Modified), the
    bytes already stored in CACHE_FILE are returned instead, so callers keep
    receiving the current PGN content exactly as they would after a full
    download.

    Returns:
        The PGN content as bytes, or None if the download failed (the error
        is printed, not raised).
    """
    global last_fetch_time, _pgn_last_modified
    try:
        headers = {"User-Agent": "Mozilla/5.0"}
        if _pgn_last_modified:
            # The former "modified_since" header is not an HTTP header and had
            # no effect; If-Modified-Since with the server's own timestamp is
            # the standard conditional request and avoids clock-skew issues.
            headers["If-Modified-Since"] = _pgn_last_modified
        req = urllib.request.Request(PGN_URL, headers=headers)
        with urllib.request.urlopen(req, timeout=31) as response:
            data = response.read()
            # Remember the validator only for usable content, so an empty
            # body is never "confirmed" by a later 304.
            _pgn_last_modified = (
                response.headers.get("Last-Modified") if data else None
            )
        last_fetch_time = time.time()
        return data
    except urllib.error.HTTPError as e:
        # urllib reports every non-2xx status, including 304, as HTTPError.
        if e.code == 304:
            try:
                with open(CACHE_FILE, "rb") as f:
                    cached = f.read()
            except OSError:
                cached = None
            if cached:
                last_fetch_time = time.time()
                return cached
            # Cache is missing or empty: force a full download next time.
            _pgn_last_modified = None
            return None
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Fehler beim Laden: {e}")
        return None
    except Exception as e:
        # Best-effort boundary: any network or protocol failure means "no data".
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Fehler beim Laden: {e}")
        return None
def _read_pgn_cache():
    """Return (content, mtime) of the PGN cache file, or None if unreadable.

    The mtime is taken from the already opened file, so it always belongs to
    the bytes that were read even if the cache is replaced concurrently.
    """
    try:
        with open(CACHE_FILE, "rb") as f:
            data = f.read()
            mtime = os.fstat(f.fileno()).st_mtime
    except OSError:
        return None
    return data, mtime


def _store_pgn_if_changed(content):
    """Replace the PGN cache with `content` unless it already holds these bytes.

    Skipping identical content keeps the file's mtime as the time of the last
    real change. The write goes to a temporary file that is then moved into
    place, so concurrent readers never see a half-written cache.

    Returns:
        True if the cache file was (re)written, False if it was unchanged.
    """
    cached = _read_pgn_cache()
    if cached is not None and cached[0] == content:
        return False
    tmp_path = f"{CACHE_FILE}.{threading.get_ident()}.tmp"
    with open(tmp_path, "wb") as f:
        f.write(content)
    os.replace(tmp_path, CACHE_FILE)
    return True


def get_pgn_content_longpoll(since=0):
    """Long-poll for the PGN content.

    Returns immediately when the cache file is newer than `since`; otherwise
    waits up to 31 seconds for a change, refreshing from upstream whenever
    the last successful download is at least CACHE_TTL seconds old.

    Args:
        since: mtime (seconds since the epoch) of the version the caller
            already has; 0 means "send whatever is available".

    Returns:
        A (content, mtime) tuple. After the timeout the current cache state
        is returned even if unchanged; (None, 0) if no PGN is available.
    """
    deadline = time.time() + 31
    while time.time() < deadline:
        cached = _read_pgn_cache()
        if cached is None:
            # Cold cache: anything we can download is news to the caller.
            content = fetch_pgn()
            if content:
                _store_pgn_if_changed(content)
                return content, os.path.getmtime(CACHE_FILE)
        else:
            data, mtime = cached
            if mtime > since:
                return data, mtime
            # Freshness is judged by the last successful download, not only by
            # the file's mtime, because unchanged content no longer touches
            # the file.
            last_checked = max(mtime, last_fetch_time or 0)
            if time.time() - last_checked >= CACHE_TTL:
                content = fetch_pgn()
                if content and _store_pgn_if_changed(content):
                    new_mtime = os.path.getmtime(CACHE_FILE)
                    if new_mtime > since:
                        print(f"[{datetime.now().strftime('%H:%M:%S')}] PGN via Long-Poll aktualisiert ({len(content)} Bytes)")
                        return content, new_mtime
        time.sleep(2)
    # Timed out: hand back the current state.
    cached = _read_pgn_cache()
    if cached is not None:
        return cached
    return None, 0
# Last-Modified value of the most recent standings page that parsed
# successfully; echoed back as If-Modified-Since on the next request.
_standings_last_modified = None


def _parse_standings(page, player="Lara Kiesewetter"):
    """Extract the table row of `player` from the standings HTML.

    Returns a dict with rank, player, wins, draws, losses, points and
    round_info, or None if no row naming the player has at least 9 cells.
    """
    from html import unescape

    # Round information, e.g. "Tabellenstand nach der 3. Runde".
    round_info = ""
    m = re.search(r"Tabellenstand\s+nach\s+der\s+(\d+)\.\s*Runde", page)
    if m:
        round_info = f"nach der {m.group(1)}. Runde"

    rows = re.findall(r"<tr[^>]*>(.*?)</tr>", page, re.DOTALL | re.IGNORECASE)
    for row in rows:
        if player not in row:
            continue
        # Same case-insensitivity as for <tr>, so <TD> cells are found too.
        cells = re.findall(r"<td[^>]*>(.*?)</td>", row, re.DOTALL | re.IGNORECASE)
        # Drop nested tags, then decode entities such as &nbsp; or &frac12;.
        clean = [unescape(re.sub(r"<[^>]+>", "", c)).strip() for c in cells]
        if len(clean) >= 9:
            return {
                "rank": clean[0],
                "player": player,
                "wins": clean[5],
                "draws": clean[6],
                "losses": clean[7],
                "points": clean[8],
                "round_info": round_info,
            }
    return None


def fetch_standings():
    """Download the standings page and parse Lara's placement.

    The request is conditional once an earlier page was parsed successfully.
    On a 304 (Not Modified) answer the dict stored in STANDINGS_CACHE_FILE is
    returned, because it was produced from that same page.

    Returns:
        The standings dict (see _parse_standings), or None if the download
        failed or the player's row was not found. Errors are printed, not
        raised.
    """
    global last_standings_fetch_time, _standings_last_modified
    try:
        headers = {"User-Agent": "Mozilla/5.0"}
        if _standings_last_modified:
            # Replaces the former "modified_since" header, which is not an
            # HTTP header and was therefore ignored by the server.
            headers["If-Modified-Since"] = _standings_last_modified
        req = urllib.request.Request(STANDINGS_URL, headers=headers)
        with urllib.request.urlopen(req, timeout=31) as response:
            page = response.read().decode("utf-8", errors="replace")
            modified = response.headers.get("Last-Modified")
        last_standings_fetch_time = time.time()
        result = _parse_standings(page)
        # Only a page that yielded a result may later be confirmed by a 304;
        # otherwise an outdated cache entry could be served for a page that
        # no longer contains the player.
        _standings_last_modified = modified if result else None
        return result
    except urllib.error.HTTPError as e:
        # urllib reports every non-2xx status, including 304, as HTTPError.
        if e.code == 304:
            try:
                with open(STANDINGS_CACHE_FILE, "r") as f:
                    cached = json.load(f)
            except (OSError, ValueError):
                cached = None
            if cached:
                last_standings_fetch_time = time.time()
                return cached
            # Cache is unusable: force a full download next time.
            _standings_last_modified = None
            return None
        print(f"Fehler beim Laden der Tabelle: {e}")
        return None
    except Exception as e:
        # Best-effort boundary: any failure means "no standings available".
        print(f"Fehler beim Laden der Tabelle: {e}")
        return None
def get_standings_content():
    """Return the standings as a dict, using a 5-minute file cache.

    A cached result younger than 300 seconds is returned directly. Otherwise
    the standings are fetched again and the cache is rewritten. If that fetch
    fails, the last cached result is returned (even though it is older than
    5 minutes) so a temporary upstream outage does not blank the display.

    Returns:
        The standings dict, or None if neither a fetch nor a cache entry is
        available.
    """
    cached = None
    cache_age = None
    try:
        cache_age = time.time() - os.path.getmtime(STANDINGS_CACHE_FILE)
        with open(STANDINGS_CACHE_FILE, "r") as f:
            cached = json.load(f)
    except (OSError, ValueError):
        # Missing or corrupt cache is treated like "no cache" instead of
        # failing the request.
        cached = None

    if cached and cache_age < 300:  # 5-minute cache
        return cached

    data = fetch_standings()
    if data:
        # Write to a temporary file and move it into place so a concurrent
        # reader never sees a partially written JSON document.
        tmp_path = f"{STANDINGS_CACHE_FILE}.{threading.get_ident()}.tmp"
        with open(tmp_path, "w") as f:
            json.dump(data, f)
        os.replace(tmp_path, STANDINGS_CACHE_FILE)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Tabellenstand aktualisiert")
        return data

    if cached:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Tabelle nicht erreichbar, verwende älteren Cache")
        return cached
    return None
class PGNHandler(http.server.BaseHTTPRequestHandler):
    """Request handler for the PGN proxy.

    GET routes:
        /pgn?since=<mtime>  PGN content via long-poll (text/plain)
        /status             JSON with last fetch time, cache TTL, server time
        /standings          JSON with the parsed standings
        anything else       static file from the directory of this script

    Every response carries "Access-Control-Allow-Origin: *".
    """

    # Static files are served from the folder that contains this script.
    _STATIC_ROOT = os.path.dirname(os.path.abspath(__file__))

    _CONTENT_TYPES = {
        ".html": "text/html",
        ".css": "text/css",
        ".js": "application/javascript",
        ".json": "application/json",
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".svg": "image/svg+xml",
    }
    # A charset parameter is only meaningful for textual content.
    _TEXT_EXTENSIONS = frozenset({".html", ".css", ".js", ".json", ".svg"})

    def do_GET(self):
        """Dispatch a GET request to the matching route handler."""
        parsed_path = urllib.parse.urlparse(self.path)
        route = parsed_path.path
        if route == "/pgn":
            self._handle_pgn(parsed_path.query)
        elif route == "/status":
            self._handle_status()
        elif route == "/standings":
            self._handle_standings()
        else:
            self._handle_static(route)

    def _respond(self, status, body=b"", content_type=None, extra_headers=()):
        """Send a complete response: status line, headers and body bytes."""
        self.send_response(status)
        if content_type is not None:
            self.send_header("Content-Type", content_type)
        self.send_header("Access-Control-Allow-Origin", "*")
        for name, value in extra_headers:
            self.send_header(name, value)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _respond_json(self, status, payload, extra_headers=()):
        """Send `payload` serialized as JSON."""
        body = json.dumps(payload).encode()
        self._respond(status, body, "application/json", extra_headers)

    def _handle_pgn(self, query):
        """Serve the PGN content, waiting for a change newer than `since`."""
        params = urllib.parse.parse_qs(query)
        try:
            since = float(params.get("since", ["0"])[0])
        except ValueError:
            # A malformed timestamp is a client error, not a server crash.
            self._respond_json(400, {"error": "Ungültiger Parameter 'since'"})
            return
        content, mtime = get_pgn_content_longpoll(since)
        if content:
            self._respond(
                200,
                content,
                "text/plain; charset=utf-8",
                [("Cache-Control", "no-cache"), ("X-Cache-Mtime", str(mtime))],
            )
        else:
            self._respond_json(502, {"error": "Konnte PGN nicht laden"})

    def _handle_status(self):
        """Serve a small JSON health/status document."""
        status = {
            "status": "ok",
            "last_fetch": last_fetch_time,
            "cache_ttl": CACHE_TTL,
            "server_time": time.time(),
        }
        self._respond_json(200, status, [("Cache-Control", "no-cache")])

    def _handle_standings(self):
        """Serve the standings dict as JSON, or 502 if none is available."""
        data = get_standings_content()
        if data:
            self._respond_json(200, data, [("Cache-Control", "max-age=300")])
        else:
            err = {"error": "Tabellendaten nicht verf\u00fcgbar"}
            self._respond_json(502, err)

    def _resolve_static(self, route):
        """Map a URL path to a file inside _STATIC_ROOT.

        Returns the absolute file path, or None if the path leaves the static
        root (e.g. through ".." segments) or does not name a regular file.
        """
        root = self._STATIC_ROOT
        relative = urllib.parse.unquote(route).lstrip("/")
        candidate = os.path.normpath(os.path.join(root, relative))
        try:
            inside = os.path.commonpath([root, candidate]) == root
        except ValueError:
            # Raised e.g. for paths on different drives: certainly not inside.
            inside = False
        if inside and os.path.isfile(candidate):
            return candidate
        return None

    def _handle_static(self, route):
        """Serve a static file; "/" maps to index.html, unknown paths give 404."""
        if route == "/":
            route = "/index.html"
        filepath = self._resolve_static(route)
        if filepath is None:
            self._respond(404)
            return
        ext = os.path.splitext(filepath)[1]
        content_type = self._CONTENT_TYPES.get(ext, "application/octet-stream")
        if ext in self._TEXT_EXTENSIONS:
            content_type = f"{content_type}; charset=utf-8"
        with open(filepath, "rb") as f:
            content = f.read()
        self._respond(200, content, content_type)

    def log_message(self, format, *args):
        """Print a timestamped one-line log entry.

        Only the first argument is printed (the quoted request line for
        ordinary requests); the raw format string is used if there are no
        arguments.
        """
        summary = args[0] if args else format
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {summary}")
def background_refresh():
    """Keep the PGN cache up to date; runs forever (meant for a daemon thread).

    Every CACHE_TTL seconds the PGN is fetched. The cache file is only
    replaced when the content actually differs, so its mtime marks the last
    real change, and the replacement is done by moving a temporary file into
    place so readers never see a partially written file.
    """
    while True:
        time.sleep(CACHE_TTL)
        try:
            content = fetch_pgn()
            if not content:
                continue
            try:
                with open(CACHE_FILE, "rb") as f:
                    unchanged = f.read() == content
            except OSError:
                # No readable cache yet: always write.
                unchanged = False
            if unchanged:
                continue
            tmp_path = f"{CACHE_FILE}.{threading.get_ident()}.tmp"
            with open(tmp_path, "wb") as f:
                f.write(content)
            os.replace(tmp_path, CACHE_FILE)
        except Exception as e:
            # The refresh loop must survive any single failed round.
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Hintergrund-Refresh Fehler: {e}")
class _ThreadedServer(socketserver.ThreadingTCPServer):
    """TCP server that handles each request in its own thread.

    Needed because /pgn requests may block for up to 31 seconds; on a
    single-threaded server every other request would wait behind them.
    """

    # Allow an immediate restart without "Address already in use".
    allow_reuse_address = True
    # Do not let open long-poll requests keep the process alive on shutdown.
    daemon_threads = True


def main():
    """Print the banner, prime the PGN cache, and run the HTTP server.

    Starts the background refresh thread and then serves requests on PORT
    until interrupted with Ctrl+C.
    """
    print("=" * 50)
    print(" [TROPHY] Lara Kiesewetter Live Schachturnier")
    print("=" * 50)
    print(f" Server läuft auf: http://localhost:{PORT}")
    print(" Drücke Ctrl+C zum Beenden")
    print("=" * 50)

    # Initial download so the first client does not hit a cold cache.
    content = fetch_pgn()
    if content:
        with open(CACHE_FILE, "wb") as f:
            f.write(content)
        print(f"[OK] PGN geladen ({len(content)} Bytes)")
    else:
        print("[WARN] Initialer Ladeversuch fehlgeschlagen, wird wiederholt...")

    # Start the background refresh thread.
    refresh_thread = threading.Thread(target=background_refresh, daemon=True)
    refresh_thread.start()

    # Start the server.
    with _ThreadedServer(("", PORT), PGNHandler) as httpd:
        print(f"\n[SERVER] Server gestartet: http://localhost:{PORT}\n")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\n[BYE] Server gestoppt.")


if __name__ == "__main__":
    main()