""" LanguageTool Website Checker - Backend Crawlt eine Website und prüft alle Seiten auf Rechtschreibung, Grammatik und Stil. """ import asyncio import json import re import threading import time import uuid from pathlib import Path from urllib.parse import urljoin, urlparse import requests import uvicorn from bs4 import BeautifulSoup from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, StreamingResponse from fastapi.templating import Jinja2Templates app = FastAPI() templates = Jinja2Templates(directory=str(Path(__file__).parent / "templates")) # In-memory storage for active check sessions sessions: dict[str, dict] = {} IGNORE_SELECTORS = [ "nav", "footer", "header nav", ".cookie", "#cookie", ".cookie-banner", "#cookie-banner", ".cookie-consent", "#cookie-consent", ".nav", ".navbar", ".footer", "#footer", "#nav", "#navbar", "script", "style", "noscript", "iframe", ".sidebar", "#sidebar", "[role='navigation']", "[role='banner']", ".menu", "#menu", ".breadcrumb", ".pagination", ] CHUNK_SIZE = 9000 # stay under 10k limit # LanguageTool rule IDs to silently ignore IGNORED_RULES = { "LEERZEICHEN_VOR_SATZZEICHEN", "WHITESPACE_RULE", "WHITESPACE_BEFORE_PUNCTUATION", "LEERZEICHEN_VOR_DOPPELPUNKT", "LEERZEICHEN_VOR_SEMIKOLON", "LEERZEICHEN_VOR_AUSRUFEZEICHEN", "LEERZEICHEN_VOR_FRAGEZEICHEN", } # Regex: space(s) directly before punctuation — catch any remaining cases SPACE_BEFORE_PUNCT_RE = re.compile(r"\s+[,.:;!?\"\u201C\u201D\u201E\u201F]") def normalize_url(url: str) -> str: """Remove fragment and trailing slash for dedup.""" parsed = urlparse(url) path = parsed.path.rstrip("/") or "/" return f"{parsed.scheme}://{parsed.netloc}{path}" def check_domain_reachable(domain: str) -> str: """Check if a domain is reachable. Returns the working base URL or raises ValueError.""" for protocol in ["https", "http"]: try: resp = requests.get( f"{protocol}://{domain}", timeout=10, headers={"User-Agent": "LanguageToolChecker/1.0"}, allow_redirects=True, ) if resp.status_code < 500: return f"{protocol}://{domain}" except requests.exceptions.ConnectionError: continue except requests.exceptions.Timeout: raise ValueError(f"Timeout: {domain} antwortet nicht innerhalb von 10 Sekunden") except Exception: continue raise ValueError( f"Domain '{domain}' ist nicht erreichbar. " f"Bitte prüfe die Schreibweise und ob die Website online ist." 
def extract_sitemap_urls(domain: str) -> list[str]:
    """Try to load sitemap.xml and extract URLs."""
    urls = []
    for protocol in ["https", "http"]:
        try:
            resp = requests.get(
                f"{protocol}://{domain}/sitemap.xml",
                timeout=10,
                headers={"User-Agent": "LanguageToolChecker/1.0"},
            )
            if resp.status_code == 200 and "<loc>" in resp.text:
                # Collect every <loc> entry (nested sitemap indexes are not followed)
                for loc_url in re.findall(r"<loc>(.*?)</loc>", resp.text, re.S):
                    loc_url = loc_url.strip()
                    if loc_url:
                        urls.append(loc_url)
                if urls:
                    return urls
        except Exception:
            continue
    return urls


def crawl_links(domain: str, max_pages: int, progress_cb=None) -> list[str]:
    """Crawl internal links starting from the homepage."""
    base_url = f"https://{domain}"
    visited = set()
    to_visit = [base_url]
    found_urls = []

    while to_visit and len(found_urls) < max_pages:
        url = to_visit.pop(0)
        norm = normalize_url(url)
        if norm in visited:
            continue
        visited.add(norm)

        try:
            resp = requests.get(
                url, timeout=10, headers={"User-Agent": "LanguageToolChecker/1.0"}
            )
            if resp.status_code != 200:
                continue
            content_type = resp.headers.get("content-type", "")
            if "text/html" not in content_type:
                continue
        except Exception:
            continue

        found_urls.append(url)
        if progress_cb:
            progress_cb(f"Crawling: {len(found_urls)} Seiten gefunden...")

        soup = BeautifulSoup(resp.text, "lxml")
        for a_tag in soup.find_all("a", href=True):
            href = a_tag["href"]
            full_url = urljoin(url, href)
            parsed = urlparse(full_url)

            # Only same domain, no fragments, no file extensions
            if parsed.netloc and parsed.netloc != domain:
                continue
            if not parsed.netloc:
                full_url = urljoin(base_url, href)
                parsed = urlparse(full_url)

            skip_ext = (".pdf", ".jpg", ".jpeg", ".png", ".gif", ".svg", ".zip",
                        ".mp3", ".mp4", ".doc", ".docx", ".xls", ".xlsx")
            if any(parsed.path.lower().endswith(ext) for ext in skip_ext):
                continue

            clean = normalize_url(full_url)
            if clean not in visited and len(to_visit) < max_pages * 3:
                to_visit.append(full_url)

    return found_urls


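# Text extraction strips boilerplate elements (IGNORE_SELECTORS) before
# collecting text, so navigation menus, cookie banners and footers are never
# sent to LanguageTool.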
def extract_page_content(html: str, url: str) -> dict:
    """Extract visible text from a page, split by semantic sections."""
    soup = BeautifulSoup(html, "lxml")

    # Remove ignored elements
    for selector in IGNORE_SELECTORS:
        try:
            for el in soup.select(selector):
                el.decompose()
        except Exception:
            continue

    sections = []

    # Title tag
    title = soup.find("title")
    if title and title.string and title.string.strip():
        sections.append({"type": "Title", "text": title.string.strip()})

    # Meta description
    meta_desc = soup.find("meta", attrs={"name": "description"})
    if meta_desc and meta_desc.get("content", "").strip():
        sections.append({"type": "Meta Description", "text": meta_desc["content"].strip()})

    # Body content
    body = soup.find("body")
    if not body:
        return {"url": url, "sections": sections}

    # Walk the DOM in document order, skipping children of already-captured elements.
    # "Block" tags are content containers (headings, paragraphs, list items, etc.)
    # "Inline" tags (a, button) are only captured when they are NOT inside a block tag.
    block_tags = {
        "h1", "h2", "h3", "h4", "h5", "h6",
        "p", "li", "blockquote", "figcaption", "td", "th",
    }
    inline_tags = {"a", "button"}
    tag_labels = {
        "h1": "Überschrift (H1)",
        "h2": "Überschrift (H2)",
        "h3": "Überschrift (H3)",
        "h4": "Überschrift (H4)",
        "h5": "Überschrift (H5)",
        "h6": "Überschrift (H6)",
        "p": "Absatz",
        "li": "Listeneintrag",
        "blockquote": "Zitat",
        "figcaption": "Bildunterschrift",
        "td": "Tabellenzelle",
        "th": "Tabellenkopf",
        "button": "Button",
        "a": "Link-Text",
    }
    all_relevant = block_tags | inline_tags

    seen_texts = set()
    captured_elements = set()  # track element ids so children are skipped

    for el in body.find_all(all_relevant):
        # Skip if this element is nested inside an already-captured block element
        skip = False
        for parent in el.parents:
            if id(parent) in captured_elements:
                skip = True
                break
            if parent is body:
                break
        if skip:
            continue

        tag_name = el.name
        text = el.get_text(separator=" ", strip=True)
        text = re.sub(r"\s+", " ", text).strip()

        if not text or len(text) <= 2 or text in seen_texts:
            continue
        if tag_name in inline_tags and (len(text) <= 3 or len(text) >= 200):
            continue

        seen_texts.add(text)
        label = tag_labels.get(tag_name, tag_name)
        sections.append({"type": label, "text": text})

        # Mark block elements as captured so their children are skipped
        if tag_name in block_tags:
            captured_elements.add(id(el))

    return {"url": url, "sections": sections}


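# Long pages are split into chunks (see CHUNK_SIZE) before being sent to the
# LanguageTool API; each chunk's match offsets are shifted back so they refer
# to positions in the page's full text.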
def check_text_with_languagetool(
    text: str, language: str, username: str, api_key: str
) -> list[dict]:
    """Send text to LanguageTool API and return matches."""
    if not text.strip():
        return []

    all_matches = []
    chunks = []

    if len(text) > CHUNK_SIZE:
        # Split at sentence boundaries
        sentences = re.split(r"(?<=[.!?])\s+", text)
        current_chunk = ""
        current_offset = 0
        for sentence in sentences:
            if len(current_chunk) + len(sentence) + 1 > CHUNK_SIZE:
                if current_chunk:
                    chunks.append((current_chunk, current_offset))
                    # +1 for the whitespace separator consumed by the sentence split
                    current_offset += len(current_chunk) + 1
                current_chunk = sentence
            else:
                if current_chunk:
                    current_chunk += " " + sentence
                else:
                    current_chunk = sentence
        if current_chunk:
            chunks.append((current_chunk, current_offset))
    else:
        chunks = [(text, 0)]

    for chunk_text, offset in chunks:
        data = {
            "text": chunk_text,
            "language": language,
            "username": username,
            "apiKey": api_key,
            "enabledOnly": "false",
        }
        try:
            resp = requests.post(
                "https://api.languagetoolplus.com/v2/check",
                data=data,
                timeout=30,
            )
            if resp.status_code == 200:
                result = resp.json()
                for match in result.get("matches", []):
                    rule_id = match.get("rule", {}).get("id", "")
                    if rule_id in IGNORED_RULES:
                        continue
                    # Extra filter: skip any match that is just whitespace before punctuation
                    m_off = match.get("offset", 0)
                    m_len = match.get("length", 0)
                    matched_text = chunk_text[m_off:m_off + m_len]
                    if SPACE_BEFORE_PUNCT_RE.fullmatch(matched_text):
                        continue
                    match["offset"] += offset
                    all_matches.append(match)
            elif resp.status_code in (401, 403):
                raise ValueError(f"API-Authentifizierung fehlgeschlagen: {resp.text}")
            else:
                raise ValueError(f"LanguageTool API Fehler ({resp.status_code}): {resp.text}")
        except requests.exceptions.RequestException as e:
            raise ValueError(f"Netzwerkfehler bei LanguageTool API: {e}")

        # Rate limiting: max 2 requests/sec
        time.sleep(0.5)

    return all_matches


def categorize_match(match: dict) -> str:
    """Categorize a LanguageTool match into spelling/grammar/style."""
    rule = match.get("rule", {})
    category_id = rule.get("category", {}).get("id", "")
    issue_type = rule.get("issueType", "")

    if category_id in ("TYPOS", "SPELLING") or issue_type == "misspelling":
        return "spelling"
    elif category_id in ("GRAMMAR", "PUNCTUATION", "SYNTAX") or issue_type == "grammar":
        return "grammar"
    else:
        return "style"


def process_matches_for_section(section_text: str, matches: list[dict], section_offset: int) -> list[dict]:
    """Filter matches that belong to this section and adjust offsets."""
    section_matches = []
    section_end = section_offset + len(section_text)

    for match in matches:
        m_offset = match.get("offset", 0)
        m_length = match.get("length", 0)
        if m_offset >= section_offset and m_offset + m_length <= section_end:
            section_matches.append({
                "offset": m_offset - section_offset,
                "length": m_length,
                "message": match.get("message", ""),
                "shortMessage": match.get("shortMessage", ""),
                "replacements": [r.get("value", "") for r in match.get("replacements", [])[:5]],
                "category": categorize_match(match),
                "rule": match.get("rule", {}).get("description", ""),
                "context_text": match.get("context", {}).get("text", ""),
            })
    return section_matches


def crawl_domain(domain: str, max_pages: int) -> dict:
    """Crawl a domain and return the list of found URLs (synchronous)."""
    check_domain_reachable(domain)
    urls = extract_sitemap_urls(domain)
    sitemap_used = bool(urls)
    if not urls:
        urls = crawl_links(domain, max_pages)
    urls = urls[:max_pages]
    return {"urls": urls, "sitemap_used": sitemap_used}


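# Session lifecycle: "starting" -> "checking" -> "done" | "error".
# run_check runs in a background thread and mutates sessions[session_id] in
# place; the status and stream endpoints only read from it.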
def run_check(session_id: str, domain: str, language: str, username: str,
              api_key: str, urls: list[str]):
    """Run the check pipeline for a given list of URLs (synchronous, runs in a thread)."""
    session = sessions[session_id]
    try:
        session["status"] = "checking"
        session["progress"] = {"current": 0, "total": len(urls), "page": ""}
        session["message"] = f"Prüfe {len(urls)} Seiten..."

        results = []
        total_errors = {"spelling": 0, "grammar": 0, "style": 0}

        for i, url in enumerate(urls):
            session["progress"]["current"] = i + 1
            session["progress"]["page"] = url
            session["message"] = f"Prüfe Seite {i + 1}/{len(urls)}: {url}"

            page_result = {
                "url": url,
                "sections": [],
                "error_count": {"spelling": 0, "grammar": 0, "style": 0},
                "total_errors": 0,
                "skipped": False,
                "error_message": None,
            }

            try:
                resp = requests.get(
                    url, timeout=10, headers={"User-Agent": "LanguageToolChecker/1.0"}
                )
                if resp.status_code != 200:
                    page_result["skipped"] = True
                    page_result["error_message"] = f"HTTP {resp.status_code}"
                    results.append(page_result)
                    continue
                content_type = resp.headers.get("content-type", "")
                if "text/html" not in content_type:
                    page_result["skipped"] = True
                    page_result["error_message"] = f"Kein HTML: {content_type}"
                    results.append(page_result)
                    continue

                page_data = extract_page_content(resp.text, url)

                # Build full text for API check
                full_text_parts = []
                section_offsets = []
                current_offset = 0
                for section in page_data["sections"]:
                    section_offsets.append(current_offset)
                    full_text_parts.append(section["text"])
                    current_offset += len(section["text"]) + 1  # +1 for newline
                full_text = "\n".join(full_text_parts)

                if not full_text.strip():
                    page_result["sections"] = [
                        {"type": s["type"], "text": s["text"], "matches": []}
                        for s in page_data["sections"]
                    ]
                    results.append(page_result)
                    continue

                # Check with LanguageTool
                try:
                    matches = check_text_with_languagetool(
                        full_text, language, username, api_key
                    )
                except ValueError as e:
                    error_msg = str(e)
                    if "Authentifizierung" in error_msg:
                        session["status"] = "error"
                        session["message"] = error_msg
                        return
                    page_result["skipped"] = True
                    page_result["error_message"] = error_msg
                    results.append(page_result)
                    continue

                # Distribute matches to sections
                for j, section in enumerate(page_data["sections"]):
                    sec_offset = section_offsets[j] if j < len(section_offsets) else 0
                    sec_matches = process_matches_for_section(
                        section["text"], matches, sec_offset
                    )
                    page_result["sections"].append({
                        "type": section["type"],
                        "text": section["text"],
                        "matches": sec_matches,
                    })
                    for m in sec_matches:
                        cat = m["category"]
                        page_result["error_count"][cat] += 1
                        total_errors[cat] += 1

                page_result["total_errors"] = sum(page_result["error_count"].values())

            except requests.exceptions.Timeout:
                page_result["skipped"] = True
                page_result["error_message"] = "Timeout (>10s)"
            except Exception as e:
                page_result["skipped"] = True
                page_result["error_message"] = str(e)[:200]

            results.append(page_result)

        # Sort by error count descending
        results.sort(key=lambda r: r["total_errors"], reverse=True)

        session["status"] = "done"
        session["message"] = "Prüfung abgeschlossen"
        session["results"] = {
            "domain": domain,
            "language": language,
            "pages_checked": len(results),
            "pages_skipped": sum(1 for r in results if r["skipped"]),
            "total_errors": total_errors,
            "pages": results,
        }
    except Exception as e:
        session["status"] = "error"
        session["message"] = f"Unerwarteter Fehler: {str(e)[:300]}"


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.post("/api/crawl")
async def crawl(request: Request):
    """Crawl a domain and return the list of URLs for preview."""
    body = await request.json()
    domain = body.get("domain", "").strip().lower()
    max_pages = int(body.get("maxPages", 50))

    domain = re.sub(r"^https?://", "", domain)
    domain = domain.rstrip("/")

    if not domain:
        return {"error": "Bitte eine Domain eingeben"}

    try:
        result = crawl_domain(domain, max_pages)
        return {
            "urls": result["urls"],
            "sitemap_used": result["sitemap_used"],
            "domain": domain,
        }
    except ValueError as e:
        return {"error": str(e)}


@app.post("/api/check")
async def start_check(request: Request):
    body = await request.json()
    domain = body.get("domain", "").strip().lower()
    language = body.get("language", "de-DE")
    username = body.get("username", "").strip()
    api_key = body.get("apiKey", "").strip()
    urls = body.get("urls", [])

    domain = re.sub(r"^https?://", "", domain)
    domain = domain.rstrip("/")

    if not domain:
        return {"error": "Bitte eine Domain eingeben"}
    if not username or not api_key:
        return {"error": "Bitte LanguageTool Credentials eingeben"}
    if not urls:
        return {"error": "Keine URLs zum Prüfen ausgewählt"}

    session_id = str(uuid.uuid4())
    sessions[session_id] = {
        "status": "starting",
        "message": "Starte Prüfung...",
        "progress": {"current": 0, "total": 0, "page": ""},
        "results": None,
    }

    thread = threading.Thread(
        target=run_check,
        args=(session_id, domain, language, username, api_key, urls),
        daemon=True,
    )
    thread.start()

    return {"sessionId": session_id}


@app.get("/api/status/{session_id}")
async def get_status(session_id: str):
    session = sessions.get(session_id)
    if not session:
        return {"error": "Session nicht gefunden"}
    return {
        "status": session["status"],
        "message": session["message"],
        "progress": session["progress"],
    }


@app.get("/api/results/{session_id}")
async def get_results(session_id: str):
    session = sessions.get(session_id)
    if not session:
        return {"error": "Session nicht gefunden"}
    if session["status"] != "done":
        return {"error": "Prüfung noch nicht abgeschlossen", "status": session["status"]}
    results = session["results"]
    # The session is intentionally kept in memory after delivering results
    # so they can be re-requested.
    return results


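# Live progress is available both by polling /api/status/{session_id} and via
# the SSE stream below; both report the same session state.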
@app.get("/api/stream/{session_id}")
async def stream_status(session_id: str):
    """SSE endpoint for live progress updates."""

    async def event_generator():
        while True:
            session = sessions.get(session_id)
            if not session:
                yield f"data: {json.dumps({'status': 'error', 'message': 'Session nicht gefunden'})}\n\n"
                break
            payload = {
                "status": session["status"],
                "message": session["message"],
                "progress": session["progress"],
            }
            yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
            if session["status"] in ("done", "error"):
                break
            await asyncio.sleep(0.5)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


if __name__ == "__main__":
    print("\n LanguageTool Website Checker")
    print(" ============================")
    print(" Öffne http://localhost:8000 im Browser\n")
    uvicorn.run(app, host="0.0.0.0", port=8000)