Files
lara-schach-live/server.py

304 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Local proxy server for Lara's chess tournament.
Downloads the PGN file and serves it with CORS headers.
"""
import http.server
import socketserver
import urllib.request
import urllib.parse
import urllib.error
import sys
import os
import threading
import time
import json
import re
from datetime import datetime
# --- Upstream source ---------------------------------------------------------
STANDINGS_URL = "https://www.deutsche-schachjugend.de/2026/odjm-d/tabelle/"

# --- Runtime settings (each can be overridden through the environment) -------
PORT = int(os.environ.get("PORT", 8111))
CACHE_TTL = int(os.environ.get("CACHE_TTL", 31))

# --- On-disk cache, kept in a "cache" directory next to this script ----------
CACHE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "cache")
STANDINGS_CACHE_FILE = os.path.join(CACHE_DIR, "standings.json")
os.makedirs(CACHE_DIR, exist_ok=True)

# --- Mutable module state -----------------------------------------------------
# Timestamps (time.time()) of the last successful upstream downloads.
last_fetch_time = None
last_standings_fetch_time = None
# Round number currently tracked; 0 until it is determined from the standings.
current_round = 0
def get_pgn_url():
    """Return the upstream PGN download URL for the round in ``current_round``."""
    template = "https://www.deutsche-schachjugend.de/2026/odjm-d/partien/{}.pgn"
    return template.format(current_round)
def get_cache_file():
    """Return the path of the cached PGN file for the round in ``current_round``."""
    filename = "runde-{}.pgn".format(current_round)
    return os.path.join(CACHE_DIR, filename)
def fetch_pgn():
    """Download the PGN file of the current round as bytes.

    A conditional request (``If-Modified-Since``) is sent when the same URL
    was downloaded successfully before. The validator is remembered per URL,
    so a timestamp recorded for one round is never sent for another round's
    file. It is the server's own ``Last-Modified`` value when provided,
    otherwise the local download time formatted as an RFC 7231 HTTP date.

    Returns:
        ``(data, True)`` after a successful download, or ``(None, False)``
        when no round is known yet (``current_round == 0``), the server
        answered 304 Not Modified, or the request failed (failures are
        printed, not raised).

    Side effects:
        Updates the module-level ``last_fetch_time`` on success.
    """
    global last_fetch_time
    # Function-scope import: email.utils is not part of the file's import header.
    # formatdate() always emits English day/month names, unlike strftime("%a/%b"),
    # which follows the process locale and can produce an invalid HTTP date.
    from email.utils import formatdate

    if current_round == 0:
        return None, False

    url = get_pgn_url()
    headers = {"User-Agent": "Mozilla/5.0"}
    validated_url, validator = getattr(fetch_pgn, "_validator", (None, None))
    if validator and validated_url == url:
        headers["If-Modified-Since"] = validator

    try:
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=31) as response:
            data = response.read()
            last_modified = response.headers.get("Last-Modified")
    except urllib.error.HTTPError as e:
        # urllib reports 304 as an HTTPError; it just means "nothing new".
        if e.code != 304:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] HTTP-Fehler: {e}")
        return None, False
    except Exception as e:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Fehler beim Laden: {e}")
        return None, False

    fetched_at = time.time()
    last_fetch_time = fetched_at
    fetch_pgn._validator = (url, last_modified or formatdate(fetched_at, usegmt=True))
    return data, True
def get_pgn_content_longpoll(since=0):
    """Long-poll for PGN data newer than ``since``.

    Loops until there is something to return: either the cache file of the
    current round has a modification time greater than ``since``, or an
    upstream download via ``fetch_pgn()`` delivered new content (which is then
    written to the cache file). Between attempts the loop sleeps 5 seconds.

    Upstream is contacted when no cache file exists, or when the existing
    cache file is at least ``CACHE_TTL`` seconds old.

    Returns:
        ``(content_bytes, mtime)`` where ``mtime`` is the cache file's
        modification time.
    """
    while True:
        path = get_cache_file()
        have_cache = os.path.exists(path)

        if have_cache:
            stamp = os.path.getmtime(path)
            if stamp > since:
                # The cache already holds something the caller has not seen.
                with open(path, "rb") as fh:
                    payload = fh.read()
                return payload, stamp
            needs_fetch = (time.time() - stamp) >= CACHE_TTL
        else:
            needs_fetch = True

        if needs_fetch:
            payload, is_new = fetch_pgn()
            if is_new and payload is not None:
                with open(path, "wb") as fh:
                    fh.write(payload)
                written_at = os.path.getmtime(path)
                if have_cache:
                    # Only the refresh of an existing cache file is logged.
                    print(f"[{datetime.now().strftime('%H:%M:%S')}] PGN via Long-Poll aktualisiert ({len(payload)} Bytes)")
                return payload, written_at

        time.sleep(5)
def fetch_standings():
    """Download the standings page and extract Lara's table row.

    The page is also searched for "Tabellenstand nach der N. Runde"; when a
    round number is found and differs from ``current_round``, the module-level
    ``current_round`` is updated and the change is printed.

    Returns:
        A dict with the keys ``rank``, ``player``, ``wins``, ``draws``,
        ``losses``, ``points`` and ``round_info`` (all strings), taken from
        the first table row that contains "Lara Kiesewetter" and has at least
        nine ``<td>`` cells. ``None`` when no such row exists or when the
        download or parsing fails (the error is printed, not raised).

    Side effects:
        Sets ``last_standings_fetch_time`` after a successful download and may
        update ``current_round``.
    """
    global last_standings_fetch_time, current_round
    # Function-scope import: the html module is not part of the file's import
    # header. Only unescape is imported, so no local name shadows the module.
    from html import unescape

    try:
        # No conditional-request header is sent: the earlier "modified_since"
        # header with a raw epoch float was not a valid HTTP header.
        req = urllib.request.Request(STANDINGS_URL, headers={"User-Agent": "Mozilla/5.0"})
        with urllib.request.urlopen(req, timeout=31) as response:
            page = response.read().decode("utf-8", errors="replace")
        last_standings_fetch_time = time.time()

        round_info = ""
        m = re.search(r"Tabellenstand\s+nach\s+der\s+(\d+)\.\s*Runde", page)
        if m:
            detected = int(m.group(1))
            if detected != current_round:
                old = current_round
                current_round = detected
                print(f"[{datetime.now().strftime('%H:%M:%S')}] Runde erkannt: {current_round} (war {old})")
            round_info = f"nach der {m.group(1)}. Runde"

        flags = re.DOTALL | re.IGNORECASE
        for row in re.findall(r"<tr[^>]*>(.*?)</tr>", page, flags):
            if "Lara Kiesewetter" not in row:
                continue
            cells = re.findall(r"<td[^>]*>(.*?)</td>", row, flags)
            # Strip nested tags first, then decode entities such as &frac12;
            # or &nbsp; so cell values are plain text.
            clean = [unescape(re.sub(r"<[^>]+>", "", c)).strip() for c in cells]
            if len(clean) >= 9:
                return {
                    "rank": clean[0],
                    "player": "Lara Kiesewetter",
                    "wins": clean[5],
                    "draws": clean[6],
                    "losses": clean[7],
                    "points": clean[8],
                    "round_info": round_info,
                }
        return None
    except Exception as e:
        print(f"Fehler beim Laden der Tabelle: {e}")
        return None
def get_standings_content(max_age=300):
    """Return the standings dict, served from the JSON cache when it is fresh.

    Args:
        max_age: Maximum age of the cache file in seconds (default 300, i.e.
            five minutes) for it to be used without contacting upstream.

    Returns:
        The cached or freshly fetched standings dict, or ``None`` when the
        cache cannot be used and ``fetch_standings()`` delivers nothing.

    A missing, unreadable or corrupt cache file is treated as a cache miss
    instead of raising. New data is written to a temporary file and moved
    into place, so a reader never sees a half-written cache. A failed cache
    write is reported but does not discard the fetched data.
    """
    try:
        age = time.time() - os.path.getmtime(STANDINGS_CACHE_FILE)
        if age < max_age:
            with open(STANDINGS_CACHE_FILE, "r", encoding="utf-8") as f:
                return json.load(f)
    except FileNotFoundError:
        pass  # No cache yet: the normal case on first use.
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Tabellen-Cache unlesbar: {e}")

    data = fetch_standings()
    if not data:
        return None

    tmp_file = STANDINGS_CACHE_FILE + ".tmp"
    try:
        with open(tmp_file, "w", encoding="utf-8") as f:
            json.dump(data, f)
        os.replace(tmp_file, STANDINGS_CACHE_FILE)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Tabellenstand aktualisiert")
    except OSError as e:
        print(f"[{datetime.now().strftime('%H:%M:%S')}] Tabellen-Cache nicht schreibbar: {e}")
    return data
class PGNHandler(http.server.BaseHTTPRequestHandler):
    """HTTP GET handler for the live page.

    Routes:
        /pgn?since=<mtime>  long-poll for PGN data newer than ``since``
        /status             small JSON status document
        /standings          Lara's standings as JSON
        anything else       static file from the directory of this script

    Every response carries ``Access-Control-Allow-Origin: *`` except the bare
    404 for unknown static files.
    """

    # Content types of static files by (lower-cased) extension. Only text
    # formats get a charset parameter; it has no meaning for binary images.
    STATIC_CONTENT_TYPES = {
        ".html": "text/html; charset=utf-8",
        ".css": "text/css; charset=utf-8",
        ".js": "application/javascript; charset=utf-8",
        ".json": "application/json; charset=utf-8",
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".svg": "image/svg+xml; charset=utf-8",
    }

    def do_GET(self):
        """Dispatch a GET request by its path component (query string excluded)."""
        parsed_path = urllib.parse.urlparse(self.path)
        route = parsed_path.path
        if route == "/pgn":
            self._serve_pgn(parsed_path.query)
        elif route == "/status":
            self._serve_status()
        elif route == "/standings":
            self._serve_standings()
        else:
            self._serve_static(route)

    def _send(self, code, content_type, body, extra_headers=()):
        """Write a complete response: status line, headers (incl. CORS) and body."""
        self.send_response(code)
        self.send_header("Content-Type", content_type)
        self.send_header("Access-Control-Allow-Origin", "*")
        for name, value in extra_headers:
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(body)

    def _send_json(self, code, payload, extra_headers=()):
        """Serialize ``payload`` as JSON and send it with status ``code``."""
        self._send(code, "application/json", json.dumps(payload).encode(), extra_headers)

    def _send_not_found(self):
        """Send a bare 404 without a body."""
        self.send_response(404)
        self.end_headers()

    def _serve_pgn(self, query):
        """Answer /pgn; rejects a non-numeric ``since`` with 400 instead of raising."""
        params = urllib.parse.parse_qs(query)
        try:
            since = float(params.get("since", ["0"])[0])
        except ValueError:
            self._send_json(400, {"error": "Ung\u00fcltiger Parameter 'since'"})
            return

        content, mtime = get_pgn_content_longpoll(since)
        if content:
            self._send(
                200,
                "text/plain; charset=utf-8",
                content,
                [("Cache-Control", "no-cache"), ("X-Cache-Mtime", str(mtime))],
            )
        else:
            self._send_json(502, {"error": "Konnte PGN nicht laden"})

    def _serve_status(self):
        """Answer /status with fetch time, cache TTL and server time."""
        status = {
            "status": "ok",
            "last_fetch": last_fetch_time,
            "cache_ttl": CACHE_TTL,
            "server_time": time.time(),
        }
        self._send_json(200, status, [("Cache-Control", "no-cache")])

    def _serve_standings(self):
        """Answer /standings, or 502 when no standings data is available."""
        data = get_standings_content()
        if data:
            self._send_json(200, data, [("Cache-Control", "max-age=300")])
        else:
            self._send_json(502, {"error": "Tabellendaten nicht verf\u00fcgbar"})

    def _serve_static(self, route):
        """Serve a file from the script directory; never from outside of it."""
        if route == "/":
            route = "/index.html"
        base_dir = os.path.dirname(os.path.abspath(__file__))
        # Decode percent-escapes before normalizing so "%2e%2e" is treated
        # exactly like "..", then verify the result is still below base_dir.
        relative = urllib.parse.unquote(route).lstrip("/")
        target = os.path.abspath(os.path.join(base_dir, relative))
        try:
            inside = os.path.commonpath([base_dir, target]) == base_dir
        except ValueError:
            # commonpath raises e.g. for paths on different drives.
            inside = False
        if not inside or not os.path.isfile(target):
            self._send_not_found()
            return

        try:
            with open(target, "rb") as f:
                content = f.read()
        except OSError:
            self._send_not_found()
            return

        ext = os.path.splitext(target)[1].lower()
        content_type = self.STATIC_CONTENT_TYPES.get(ext, "application/octet-stream")
        self._send(200, content_type, content)

    def log_message(self, format, *args):
        """Print a timestamped log line with the first log argument only.

        For request logging that argument is the request line. When no
        arguments are passed, the format string itself is printed.
        """
        detail = args[0] if args else format
        print(f"[{datetime.now().strftime('%H:%M:%S')}] {detail}")
def background_refresh():
    """Keep the PGN cache file current; intended to run in a background thread.

    Runs forever: sleeps ``CACHE_TTL`` seconds, asks ``fetch_pgn()`` for new
    data and, when it delivers some, overwrites the cache file of the current
    round. Any exception in one cycle is printed and the loop continues.
    """
    while True:
        time.sleep(CACHE_TTL)
        try:
            payload, is_new = fetch_pgn()
            if not is_new or payload is None:
                continue  # Nothing new this cycle.
            target = get_cache_file()
            with open(target, "wb") as out:
                out.write(payload)
        except Exception as exc:
            print(f"[{datetime.now().strftime('%H:%M:%S')}] Hintergrund-Refresh Fehler: {exc}")
def main():
    """Start the proxy: detect the round, warm the cache, serve until Ctrl+C.

    Steps:
        1. Print the banner.
        2. Call ``fetch_standings()`` so that ``current_round`` is detected;
           fall back to round 1 when no round could be determined.
        3. Try one initial PGN download into the cache file.
        4. Start ``background_refresh`` in a daemon thread.
        5. Serve HTTP on ``PORT`` with one thread per request.
    """
    # Without this declaration the assignment below would make current_round a
    # local variable: reading it would raise UnboundLocalError and the round-1
    # fallback would never reach the module-level value used by fetch_pgn().
    global current_round

    print("=" * 50)
    print(" [TROPHY] Lara Kiesewetter Live Schachturnier")
    print("=" * 50)
    print(f" Server läuft auf: http://localhost:{PORT}")
    print(" Drücke Ctrl+C zum Beenden")
    print("=" * 50)

    # Determine the round from the standings page. The returned row itself is
    # not needed here; a detected round is kept even if Lara's row is missing.
    fetch_standings()
    if current_round > 0:
        print(f"[OK] Aktuelle Runde: {current_round}")
    else:
        current_round = 1
        print("[INFO] Starte mit Runde 1 (Tabelle noch nicht verfügbar)")

    content, changed = fetch_pgn()
    if changed and content is not None:
        cache_file = get_cache_file()
        with open(cache_file, "wb") as f:
            f.write(content)
        print(f"[OK] PGN geladen ({len(content)} Bytes)")
    else:
        print("[WARN] Initialer Ladeversuch fehlgeschlagen, wird wiederholt...")

    # Start the background refresh thread.
    refresh_thread = threading.Thread(target=background_refresh, daemon=True)
    refresh_thread.start()

    class _ThreadedServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
        """TCP server that handles each request in its own thread.

        /pgn is a long-poll that can block for a long time; on a
        single-threaded server one waiting client would stall every other
        request.
        """

        # Pending long-polls must not keep the process alive after Ctrl+C.
        daemon_threads = True
        # Allow an immediate restart on the same port.
        allow_reuse_address = True

    # Start the server.
    with _ThreadedServer(("", PORT), PGNHandler) as httpd:
        print(f"\n[SERVER] Server gestartet: http://localhost:{PORT}\n")
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            print("\n[BYE] Server gestoppt.")


if __name__ == "__main__":
    main()