From 43bf686ef4e317bb9fbc335889292485f927acce Mon Sep 17 00:00:00 2001 From: s4luorth Date: Sat, 7 Feb 2026 13:00:27 +0100 Subject: [PATCH] Initial commit --- .claude/settings.local.json | 8 + .gitignore | Bin 0 -> 12 bytes website-checker/app.py | 619 ++++++++++++++++++++++++++++++++++++ 3 files changed, 627 insertions(+) create mode 100644 .claude/settings.local.json create mode 100644 .gitignore create mode 100644 website-checker/app.py diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..7d208ae --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,8 @@ +{ + "permissions": { + "allow": [ + "Bash(pip install:*)", + "Bash(python:*)" + ] + } +} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000000000000000000000000000000000000..d255ddcaf775667bb0d261339b59157a22004a46 GIT binary patch literal 12 TcmezWFOQ*=A%}sNfr|kEA+7^t literal 0 HcmV?d00001 diff --git a/website-checker/app.py b/website-checker/app.py new file mode 100644 index 0000000..c88e863 --- /dev/null +++ b/website-checker/app.py @@ -0,0 +1,619 @@ +""" +LanguageTool Website Checker - Backend +Crawlt eine Website und prüft alle Seiten auf Rechtschreibung, Grammatik und Stil. +""" + +import asyncio +import json +import re +import threading +import time +import uuid +from pathlib import Path +from urllib.parse import urljoin, urlparse + +import requests +import uvicorn +from bs4 import BeautifulSoup +from fastapi import FastAPI, Request +from fastapi.responses import HTMLResponse, StreamingResponse +from fastapi.templating import Jinja2Templates + +app = FastAPI() +templates = Jinja2Templates(directory=str(Path(__file__).parent / "templates")) + +# In-memory storage for active check sessions +sessions: dict[str, dict] = {} + +IGNORE_SELECTORS = [ + "nav", "footer", "header nav", ".cookie", "#cookie", ".cookie-banner", + "#cookie-banner", ".cookie-consent", "#cookie-consent", ".nav", ".navbar", + ".footer", "#footer", "#nav", "#navbar", "script", "style", "noscript", + "iframe", ".sidebar", "#sidebar", "[role='navigation']", "[role='banner']", + ".menu", "#menu", ".breadcrumb", ".pagination", +] + +CHUNK_SIZE = 9000 # stay under 10k limit + + +def normalize_url(url: str) -> str: + """Remove fragment and trailing slash for dedup.""" + parsed = urlparse(url) + path = parsed.path.rstrip("/") or "/" + return f"{parsed.scheme}://{parsed.netloc}{path}" + + +def check_domain_reachable(domain: str) -> str: + """Check if a domain is reachable. Returns the working base URL or raises ValueError.""" + for protocol in ["https", "http"]: + try: + resp = requests.get( + f"{protocol}://{domain}", + timeout=10, + headers={"User-Agent": "LanguageToolChecker/1.0"}, + allow_redirects=True, + ) + if resp.status_code < 500: + return f"{protocol}://{domain}" + except requests.exceptions.ConnectionError: + continue + except requests.exceptions.Timeout: + raise ValueError(f"Timeout: {domain} antwortet nicht innerhalb von 10 Sekunden") + except Exception: + continue + raise ValueError( + f"Domain '{domain}' ist nicht erreichbar. " + f"Bitte prüfe die Schreibweise und ob die Website online ist." 
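+        # Reached only when neither the https nor the http attempt produced a usable
+        # response (a timeout aborts earlier with its own error message).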
+    )
+
+
+def extract_sitemap_urls(domain: str) -> list[str]:
+    """Try to load sitemap.xml and extract URLs."""
+    urls = []
+    for protocol in ["https", "http"]:
+        try:
+            resp = requests.get(
+                f"{protocol}://{domain}/sitemap.xml", timeout=10,
+                headers={"User-Agent": "LanguageToolChecker/1.0"},
+            )
+            if resp.status_code == 200 and "<loc>" in resp.text:
+                # Pull every <loc> entry out of the sitemap XML
+                urls = re.findall(r"<loc>\s*(.*?)\s*</loc>", resp.text)
+                if urls:
+                    return urls
+        except Exception:
+            continue
+    return urls
+
+
+def crawl_links(domain: str, max_pages: int, progress_cb=None) -> list[str]:
+    """Crawl internal links starting from the homepage."""
+    base_url = f"https://{domain}"
+    visited = set()
+    to_visit = [base_url]
+    found_urls = []
+
+    while to_visit and len(found_urls) < max_pages:
+        url = to_visit.pop(0)
+        norm = normalize_url(url)
+        if norm in visited:
+            continue
+        visited.add(norm)
+
+        try:
+            resp = requests.get(url, timeout=10, headers={"User-Agent": "LanguageToolChecker/1.0"})
+            if resp.status_code != 200:
+                continue
+            content_type = resp.headers.get("content-type", "")
+            if "text/html" not in content_type:
+                continue
+        except Exception:
+            continue
+
+        found_urls.append(url)
+        if progress_cb:
+            progress_cb(f"Crawling: {len(found_urls)} Seiten gefunden...")
+
+        soup = BeautifulSoup(resp.text, "lxml")
+        for a_tag in soup.find_all("a", href=True):
+            href = a_tag["href"]
+            full_url = urljoin(url, href)
+            parsed = urlparse(full_url)
+
+            # Only same domain, no fragments, no file extensions
+            if parsed.netloc and parsed.netloc != domain:
+                continue
+            if not parsed.netloc:
+                full_url = urljoin(base_url, href)
+                parsed = urlparse(full_url)
+
+            skip_ext = (".pdf", ".jpg", ".jpeg", ".png", ".gif", ".svg",
+                        ".zip", ".mp3", ".mp4", ".doc", ".docx", ".xls", ".xlsx")
+            if any(parsed.path.lower().endswith(ext) for ext in skip_ext):
+                continue
+
+            clean = normalize_url(full_url)
+            if clean not in visited and len(to_visit) < max_pages * 3:
+                to_visit.append(full_url)
+
+    return found_urls
+
+
+def extract_page_content(html: str, url: str) -> dict:
+    """Extract visible text from a page, split by semantic sections."""
+    soup = BeautifulSoup(html, "lxml")
+
+    # Remove ignored elements
+    for selector in IGNORE_SELECTORS:
+        try:
+            for el in soup.select(selector):
+                el.decompose()
+        except Exception:
+            continue
+
+    sections = []
+
+    # Title tag
+    title = soup.find("title")
+    if title and title.string and title.string.strip():
+        sections.append({"type": "Title", "text": title.string.strip()})
+
+    # Meta description
+    meta_desc = soup.find("meta", attrs={"name": "description"})
+    if meta_desc and meta_desc.get("content", "").strip():
+        sections.append({"type": "Meta Description", "text": meta_desc["content"].strip()})
+
+    # Body content
+    body = soup.find("body")
+    if not body:
+        return {"url": url, "sections": sections}
+
+    # Walk the DOM in document order, skipping children of already-captured elements.
+    # "Block" tags are content containers (headings, paragraphs, list items, etc.)
+    # "Inline" tags (a, button) are only captured when they are NOT inside a block tag.
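+    # Example: a paragraph that contains a link is captured once as a whole ("Absatz");
+    # the nested link is skipped and is not reported again as "Link-Text".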
+ block_tags = { + "h1", "h2", "h3", "h4", "h5", "h6", + "p", "li", "blockquote", "figcaption", "td", "th", + } + inline_tags = {"a", "button"} + tag_labels = { + "h1": "Überschrift (H1)", "h2": "Überschrift (H2)", "h3": "Überschrift (H3)", + "h4": "Überschrift (H4)", "h5": "Überschrift (H5)", "h6": "Überschrift (H6)", + "p": "Absatz", "li": "Listeneintrag", "blockquote": "Zitat", + "figcaption": "Bildunterschrift", "td": "Tabellenzelle", "th": "Tabellenkopf", + "button": "Button", "a": "Link-Text", + } + all_relevant = block_tags | inline_tags + + seen_texts = set() + captured_elements = set() # track element ids so children are skipped + + for el in body.find_all(all_relevant): + # Skip if this element is nested inside an already-captured block element + skip = False + for parent in el.parents: + if id(parent) in captured_elements: + skip = True + break + if parent is body: + break + if skip: + continue + + tag_name = el.name + text = el.get_text(separator=" ", strip=True) + text = re.sub(r"\s+", " ", text).strip() + + if not text or len(text) <= 2 or text in seen_texts: + continue + if tag_name in inline_tags and (len(text) <= 3 or len(text) >= 200): + continue + + seen_texts.add(text) + label = tag_labels.get(tag_name, tag_name) + sections.append({"type": label, "text": text}) + + # Mark block elements as captured so their children are skipped + if tag_name in block_tags: + captured_elements.add(id(el)) + + return {"url": url, "sections": sections} + + +def check_text_with_languagetool( + text: str, language: str, username: str, api_key: str +) -> list[dict]: + """Send text to LanguageTool API and return matches.""" + if not text.strip(): + return [] + + all_matches = [] + chunks = [] + + if len(text) > CHUNK_SIZE: + # Split at sentence boundaries + sentences = re.split(r"(?<=[.!?])\s+", text) + current_chunk = "" + current_offset = 0 + for sentence in sentences: + if len(current_chunk) + len(sentence) + 1 > CHUNK_SIZE: + if current_chunk: + chunks.append((current_chunk, current_offset)) + current_offset += len(current_chunk) + 1 + current_chunk = sentence + else: + if current_chunk: + current_chunk += " " + sentence + else: + current_chunk = sentence + if current_chunk: + chunks.append((current_chunk, current_offset)) + else: + chunks = [(text, 0)] + + for chunk_text, offset in chunks: + data = { + "text": chunk_text, + "language": language, + "username": username, + "apiKey": api_key, + "enabledOnly": "false", + } + + try: + resp = requests.post( + "https://api.languagetoolplus.com/v2/check", + data=data, + timeout=30, + ) + if resp.status_code == 200: + result = resp.json() + for match in result.get("matches", []): + match["offset"] += offset + all_matches.append(match) + elif resp.status_code in (401, 403): + raise ValueError(f"API-Authentifizierung fehlgeschlagen: {resp.text}") + else: + raise ValueError(f"LanguageTool API Fehler ({resp.status_code}): {resp.text}") + except requests.exceptions.RequestException as e: + raise ValueError(f"Netzwerkfehler bei LanguageTool API: {e}") + + # Rate limiting: max 2 requests/sec + time.sleep(0.5) + + return all_matches + + +def categorize_match(match: dict) -> str: + """Categorize a LanguageTool match into spelling/grammar/style.""" + rule = match.get("rule", {}) + category_id = rule.get("category", {}).get("id", "") + issue_type = rule.get("issueType", "") + + if category_id in ("TYPOS", "SPELLING") or issue_type == "misspelling": + return "spelling" + elif category_id in ("GRAMMAR", "PUNCTUATION", "SYNTAX") or issue_type == "grammar": + 
return "grammar" + else: + return "style" + + +def process_matches_for_section(section_text: str, matches: list[dict], section_offset: int) -> list[dict]: + """Filter matches that belong to this section and adjust offsets.""" + section_matches = [] + section_end = section_offset + len(section_text) + + for match in matches: + m_offset = match.get("offset", 0) + m_length = match.get("length", 0) + + if m_offset >= section_offset and m_offset + m_length <= section_end: + section_matches.append({ + "offset": m_offset - section_offset, + "length": m_length, + "message": match.get("message", ""), + "shortMessage": match.get("shortMessage", ""), + "replacements": [r.get("value", "") for r in match.get("replacements", [])[:5]], + "category": categorize_match(match), + "rule": match.get("rule", {}).get("description", ""), + "context_text": match.get("context", {}).get("text", ""), + }) + + return section_matches + + +def crawl_domain(domain: str, max_pages: int) -> dict: + """Crawl a domain and return the list of found URLs (synchronous).""" + check_domain_reachable(domain) + + urls = extract_sitemap_urls(domain) + sitemap_used = bool(urls) + + if not urls: + urls = crawl_links(domain, max_pages) + + urls = urls[:max_pages] + return {"urls": urls, "sitemap_used": sitemap_used} + + +def run_check(session_id: str, domain: str, language: str, + username: str, api_key: str, urls: list[str]): + """Run the check pipeline for a given list of URLs (synchronous, runs in a thread).""" + session = sessions[session_id] + + try: + session["status"] = "checking" + session["progress"] = {"current": 0, "total": len(urls), "page": ""} + session["message"] = f"Prüfe {len(urls)} Seiten..." + + results = [] + total_errors = {"spelling": 0, "grammar": 0, "style": 0} + + for i, url in enumerate(urls): + session["progress"]["current"] = i + 1 + session["progress"]["page"] = url + session["message"] = f"Prüfe Seite {i + 1}/{len(urls)}: {url}" + + page_result = { + "url": url, + "sections": [], + "error_count": {"spelling": 0, "grammar": 0, "style": 0}, + "total_errors": 0, + "skipped": False, + "error_message": None, + } + + try: + resp = requests.get( + url, timeout=10, + headers={"User-Agent": "LanguageToolChecker/1.0"} + ) + if resp.status_code != 200: + page_result["skipped"] = True + page_result["error_message"] = f"HTTP {resp.status_code}" + results.append(page_result) + continue + + content_type = resp.headers.get("content-type", "") + if "text/html" not in content_type: + page_result["skipped"] = True + page_result["error_message"] = f"Kein HTML: {content_type}" + results.append(page_result) + continue + + page_data = extract_page_content(resp.text, url) + + # Build full text for API check + full_text_parts = [] + section_offsets = [] + current_offset = 0 + + for section in page_data["sections"]: + section_offsets.append(current_offset) + full_text_parts.append(section["text"]) + current_offset += len(section["text"]) + 1 # +1 for newline + + full_text = "\n".join(full_text_parts) + + if not full_text.strip(): + page_result["sections"] = [ + {"type": s["type"], "text": s["text"], "matches": []} + for s in page_data["sections"] + ] + results.append(page_result) + continue + + # Check with LanguageTool + try: + matches = check_text_with_languagetool( + full_text, language, username, api_key + ) + except ValueError as e: + error_msg = str(e) + if "Authentifizierung" in error_msg: + session["status"] = "error" + session["message"] = error_msg + return + page_result["skipped"] = True + page_result["error_message"] = 
error_msg + results.append(page_result) + continue + + # Distribute matches to sections + for j, section in enumerate(page_data["sections"]): + sec_offset = section_offsets[j] if j < len(section_offsets) else 0 + sec_matches = process_matches_for_section( + section["text"], matches, sec_offset + ) + page_result["sections"].append({ + "type": section["type"], + "text": section["text"], + "matches": sec_matches, + }) + + for m in sec_matches: + cat = m["category"] + page_result["error_count"][cat] += 1 + total_errors[cat] += 1 + + page_result["total_errors"] = sum(page_result["error_count"].values()) + + except requests.exceptions.Timeout: + page_result["skipped"] = True + page_result["error_message"] = "Timeout (>10s)" + except Exception as e: + page_result["skipped"] = True + page_result["error_message"] = str(e)[:200] + + results.append(page_result) + + # Sort by error count descending + results.sort(key=lambda r: r["total_errors"], reverse=True) + + session["status"] = "done" + session["message"] = "Prüfung abgeschlossen" + session["results"] = { + "domain": domain, + "language": language, + "pages_checked": len(results), + "pages_skipped": sum(1 for r in results if r["skipped"]), + "total_errors": total_errors, + "pages": results, + } + + except Exception as e: + session["status"] = "error" + session["message"] = f"Unerwarteter Fehler: {str(e)[:300]}" + + +@app.get("/", response_class=HTMLResponse) +async def index(request: Request): + return templates.TemplateResponse("index.html", {"request": request}) + + +@app.post("/api/crawl") +async def crawl(request: Request): + """Crawl a domain and return the list of URLs for preview.""" + body = await request.json() + domain = body.get("domain", "").strip().lower() + max_pages = int(body.get("maxPages", 50)) + + domain = re.sub(r"^https?://", "", domain) + domain = domain.rstrip("/") + + if not domain: + return {"error": "Bitte eine Domain eingeben"} + + try: + result = crawl_domain(domain, max_pages) + return { + "urls": result["urls"], + "sitemap_used": result["sitemap_used"], + "domain": domain, + } + except ValueError as e: + return {"error": str(e)} + + +@app.post("/api/check") +async def start_check(request: Request): + body = await request.json() + domain = body.get("domain", "").strip().lower() + language = body.get("language", "de-DE") + username = body.get("username", "").strip() + api_key = body.get("apiKey", "").strip() + urls = body.get("urls", []) + + domain = re.sub(r"^https?://", "", domain) + domain = domain.rstrip("/") + + if not domain: + return {"error": "Bitte eine Domain eingeben"} + if not username or not api_key: + return {"error": "Bitte LanguageTool Credentials eingeben"} + if not urls: + return {"error": "Keine URLs zum Prüfen ausgewählt"} + + session_id = str(uuid.uuid4()) + sessions[session_id] = { + "status": "starting", + "message": "Starte Prüfung...", + "progress": {"current": 0, "total": 0, "page": ""}, + "results": None, + } + + thread = threading.Thread( + target=run_check, + args=(session_id, domain, language, username, api_key, urls), + daemon=True, + ) + thread.start() + + return {"sessionId": session_id} + + +@app.get("/api/status/{session_id}") +async def get_status(session_id: str): + session = sessions.get(session_id) + if not session: + return {"error": "Session nicht gefunden"} + + return { + "status": session["status"], + "message": session["message"], + "progress": session["progress"], + } + + +@app.get("/api/results/{session_id}") +async def get_results(session_id: str): + session = 
sessions.get(session_id) + if not session: + return {"error": "Session nicht gefunden"} + if session["status"] != "done": + return {"error": "Prüfung noch nicht abgeschlossen", "status": session["status"]} + + results = session["results"] + # Clean up session after delivering results + # (keep it around for a bit in case of re-requests) + return results + + +@app.get("/api/stream/{session_id}") +async def stream_status(session_id: str): + """SSE endpoint for live progress updates.""" + async def event_generator(): + while True: + session = sessions.get(session_id) + if not session: + yield f"data: {json.dumps({'status': 'error', 'message': 'Session nicht gefunden'})}\n\n" + break + + payload = { + "status": session["status"], + "message": session["message"], + "progress": session["progress"], + } + yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n" + + if session["status"] in ("done", "error"): + break + + await asyncio.sleep(0.5) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + +if __name__ == "__main__": + print("\n LanguageTool Website Checker") + print(" ============================") + print(" Öffne http://localhost:8000 im Browser\n") + uvicorn.run(app, host="0.0.0.0", port=8000)
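
The API above can be exercised without the HTML frontend. Below is a minimal client sketch against an instance started with "python app.py" (port 8000, as in the __main__ block); "example.com", the username and the apiKey are placeholders, and the 1-second polling interval is an arbitrary choice rather than something the backend requires. An SSE client against /api/stream/{session_id} would work just as well as the polling loop.

import time
import requests

BASE = "http://localhost:8000"

# 1. Crawl the domain first; the backend answers with the URL list (sitemap or link crawl).
crawl = requests.post(f"{BASE}/api/crawl", json={"domain": "example.com", "maxPages": 20}).json()

# 2. Start the actual check; the backend spawns a worker thread and returns a session id.
check = requests.post(f"{BASE}/api/check", json={
    "domain": "example.com",
    "language": "de-DE",
    "username": "you@example.com",  # placeholder LanguageTool account
    "apiKey": "YOUR_API_KEY",       # placeholder API key
    "urls": crawl["urls"],
}).json()
session_id = check["sessionId"]

# 3. Poll /api/status until the worker reports "done" or "error".
while True:
    status = requests.get(f"{BASE}/api/status/{session_id}").json()
    print(status["message"])
    if status["status"] in ("done", "error"):
        break
    time.sleep(1)

# 4. Fetch the aggregated results (pages sorted by error count, totals per category).
if status["status"] == "done":
    results = requests.get(f"{BASE}/api/results/{session_id}").json()
    print(results["total_errors"])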