Initial commit

This commit is contained in:
s4luorth
2026-02-07 13:00:27 +01:00
commit 43bf686ef4
3 changed files with 627 additions and 0 deletions


@@ -0,0 +1,8 @@
{
"permissions": {
"allow": [
"Bash(pip install:*)",
"Bash(python:*)"
]
}
}

BIN
.gitignore vendored Normal file

Binary file not shown.

619
website-checker/app.py Normal file

@@ -0,0 +1,619 @@
"""
LanguageTool Website Checker - Backend
Crawls a website and checks every page for spelling, grammar, and style.
"""
import asyncio
import json
import re
import threading
import time
import uuid
from pathlib import Path
from urllib.parse import urljoin, urlparse
import requests
import uvicorn
from bs4 import BeautifulSoup
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastapi.templating import Jinja2Templates
app = FastAPI()
templates = Jinja2Templates(directory=str(Path(__file__).parent / "templates"))
# In-memory storage for active check sessions
sessions: dict[str, dict] = {}
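# Each entry maps a session UUID to a dict with "status", "message", "progress"
# and (once finished) "results"; entries are created by /api/check and read by
# the status, results and stream endpoints below.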
IGNORE_SELECTORS = [
"nav", "footer", "header nav", ".cookie", "#cookie", ".cookie-banner",
"#cookie-banner", ".cookie-consent", "#cookie-consent", ".nav", ".navbar",
".footer", "#footer", "#nav", "#navbar", "script", "style", "noscript",
"iframe", ".sidebar", "#sidebar", "[role='navigation']", "[role='banner']",
".menu", "#menu", ".breadcrumb", ".pagination",
]
CHUNK_SIZE = 9000 # stay under 10k limit
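# The 10k figure is the assumed per-request character limit of the LanguageTool
# API; longer texts are split into CHUNK_SIZE chunks at sentence boundaries in
# check_text_with_languagetool() below.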
def normalize_url(url: str) -> str:
"""Remove fragment and trailing slash for dedup."""
parsed = urlparse(url)
path = parsed.path.rstrip("/") or "/"
return f"{parsed.scheme}://{parsed.netloc}{path}"
def check_domain_reachable(domain: str) -> str:
"""Check if a domain is reachable. Returns the working base URL or raises ValueError."""
for protocol in ["https", "http"]:
try:
resp = requests.get(
f"{protocol}://{domain}",
timeout=10,
headers={"User-Agent": "LanguageToolChecker/1.0"},
allow_redirects=True,
)
if resp.status_code < 500:
return f"{protocol}://{domain}"
except requests.exceptions.ConnectionError:
continue
except requests.exceptions.Timeout:
raise ValueError(f"Timeout: {domain} antwortet nicht innerhalb von 10 Sekunden")
except Exception:
continue
raise ValueError(
f"Domain '{domain}' ist nicht erreichbar. "
f"Bitte prüfe die Schreibweise und ob die Website online ist."
)
def extract_sitemap_urls(domain: str) -> list[str]:
"""Try to load sitemap.xml and extract URLs."""
urls = []
for protocol in ["https", "http"]:
try:
resp = requests.get(
f"{protocol}://{domain}/sitemap.xml", timeout=10,
headers={"User-Agent": "LanguageToolChecker/1.0"},
)
if resp.status_code == 200 and "<url" in resp.text.lower():
soup = BeautifulSoup(resp.text, "lxml-xml")
# Handle sitemap index
sitemaps = soup.find_all("sitemap")
if sitemaps:
for sm in sitemaps:
loc = sm.find("loc")
if loc:
try:
sub_resp = requests.get(loc.text.strip(), timeout=10)
if sub_resp.status_code == 200:
sub_soup = BeautifulSoup(sub_resp.text, "lxml-xml")
for url_tag in sub_soup.find_all("url"):
loc_tag = url_tag.find("loc")
if loc_tag:
urls.append(loc_tag.text.strip())
except Exception:
continue
else:
for url_tag in soup.find_all("url"):
loc = url_tag.find("loc")
if loc:
urls.append(loc.text.strip())
if urls:
return urls
except Exception:
continue
return urls
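# Returns an empty list when no sitemap (or sitemap index) is found; crawl_domain()
# then falls back to following internal links via crawl_links().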
def crawl_links(domain: str, max_pages: int, progress_cb=None,
                base_url: str | None = None) -> list[str]:
    """Crawl internal links starting from the homepage."""
    # Prefer the base URL verified by check_domain_reachable(); fall back to https.
    base_url = base_url or f"https://{domain}"
visited = set()
to_visit = [base_url]
found_urls = []
while to_visit and len(found_urls) < max_pages:
url = to_visit.pop(0)
norm = normalize_url(url)
if norm in visited:
continue
visited.add(norm)
try:
resp = requests.get(url, timeout=10, headers={"User-Agent": "LanguageToolChecker/1.0"})
if resp.status_code != 200:
continue
content_type = resp.headers.get("content-type", "")
if "text/html" not in content_type:
continue
except Exception:
continue
found_urls.append(url)
if progress_cb:
progress_cb(f"Crawling: {len(found_urls)} Seiten gefunden...")
soup = BeautifulSoup(resp.text, "lxml")
for a_tag in soup.find_all("a", href=True):
href = a_tag["href"]
full_url = urljoin(url, href)
parsed = urlparse(full_url)
# Only same domain, no fragments, no file extensions
if parsed.netloc and parsed.netloc != domain:
continue
if not parsed.netloc:
full_url = urljoin(base_url, href)
parsed = urlparse(full_url)
skip_ext = (".pdf", ".jpg", ".jpeg", ".png", ".gif", ".svg",
".zip", ".mp3", ".mp4", ".doc", ".docx", ".xls", ".xlsx")
if any(parsed.path.lower().endswith(ext) for ext in skip_ext):
continue
clean = normalize_url(full_url)
if clean not in visited and len(to_visit) < max_pages * 3:
to_visit.append(full_url)
return found_urls
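# Illustrative call:
#   crawl_links("example.com", max_pages=50)
#   -> ["https://example.com", "https://example.com/ueber-uns", ...]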
def extract_page_content(html: str, url: str) -> dict:
"""Extract visible text from a page, split by semantic sections."""
soup = BeautifulSoup(html, "lxml")
# Remove ignored elements
for selector in IGNORE_SELECTORS:
try:
for el in soup.select(selector):
el.decompose()
except Exception:
continue
sections = []
# Title tag
title = soup.find("title")
if title and title.string and title.string.strip():
sections.append({"type": "Title", "text": title.string.strip()})
# Meta description
meta_desc = soup.find("meta", attrs={"name": "description"})
if meta_desc and meta_desc.get("content", "").strip():
sections.append({"type": "Meta Description", "text": meta_desc["content"].strip()})
# Body content
body = soup.find("body")
if not body:
return {"url": url, "sections": sections}
# Walk the DOM in document order, skipping children of already-captured elements.
# "Block" tags are content containers (headings, paragraphs, list items, etc.)
# "Inline" tags (a, button) are only captured when they are NOT inside a block tag.
block_tags = {
"h1", "h2", "h3", "h4", "h5", "h6",
"p", "li", "blockquote", "figcaption", "td", "th",
}
inline_tags = {"a", "button"}
tag_labels = {
"h1": "Überschrift (H1)", "h2": "Überschrift (H2)", "h3": "Überschrift (H3)",
"h4": "Überschrift (H4)", "h5": "Überschrift (H5)", "h6": "Überschrift (H6)",
"p": "Absatz", "li": "Listeneintrag", "blockquote": "Zitat",
"figcaption": "Bildunterschrift", "td": "Tabellenzelle", "th": "Tabellenkopf",
"button": "Button", "a": "Link-Text",
}
all_relevant = block_tags | inline_tags
seen_texts = set()
captured_elements = set() # track element ids so children are skipped
for el in body.find_all(all_relevant):
# Skip if this element is nested inside an already-captured block element
skip = False
for parent in el.parents:
if id(parent) in captured_elements:
skip = True
break
if parent is body:
break
if skip:
continue
tag_name = el.name
text = el.get_text(separator=" ", strip=True)
text = re.sub(r"\s+", " ", text).strip()
if not text or len(text) <= 2 or text in seen_texts:
continue
if tag_name in inline_tags and (len(text) <= 3 or len(text) >= 200):
continue
seen_texts.add(text)
label = tag_labels.get(tag_name, tag_name)
sections.append({"type": label, "text": text})
# Mark block elements as captured so their children are skipped
if tag_name in block_tags:
captured_elements.add(id(el))
return {"url": url, "sections": sections}
def check_text_with_languagetool(
text: str, language: str, username: str, api_key: str
) -> list[dict]:
"""Send text to LanguageTool API and return matches."""
if not text.strip():
return []
all_matches = []
chunks = []
if len(text) > CHUNK_SIZE:
# Split at sentence boundaries
sentences = re.split(r"(?<=[.!?])\s+", text)
current_chunk = ""
current_offset = 0
for sentence in sentences:
if len(current_chunk) + len(sentence) + 1 > CHUNK_SIZE:
if current_chunk:
chunks.append((current_chunk, current_offset))
current_offset += len(current_chunk) + 1
current_chunk = sentence
else:
if current_chunk:
current_chunk += " " + sentence
else:
current_chunk = sentence
if current_chunk:
chunks.append((current_chunk, current_offset))
else:
chunks = [(text, 0)]
for chunk_text, offset in chunks:
data = {
"text": chunk_text,
"language": language,
"username": username,
"apiKey": api_key,
"enabledOnly": "false",
}
try:
resp = requests.post(
"https://api.languagetoolplus.com/v2/check",
data=data,
timeout=30,
)
if resp.status_code == 200:
result = resp.json()
for match in result.get("matches", []):
match["offset"] += offset
all_matches.append(match)
elif resp.status_code in (401, 403):
raise ValueError(f"API-Authentifizierung fehlgeschlagen: {resp.text}")
else:
raise ValueError(f"LanguageTool API Fehler ({resp.status_code}): {resp.text}")
except requests.exceptions.RequestException as e:
raise ValueError(f"Netzwerkfehler bei LanguageTool API: {e}")
# Rate limiting: max 2 requests/sec
time.sleep(0.5)
return all_matches
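# Each element of the returned list is a raw match dict from the LanguageTool
# /v2/check response (offset, length, message, replacements, rule, context, ...),
# with offsets rebased onto the full input text across chunks.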
def categorize_match(match: dict) -> str:
"""Categorize a LanguageTool match into spelling/grammar/style."""
rule = match.get("rule", {})
category_id = rule.get("category", {}).get("id", "")
issue_type = rule.get("issueType", "")
if category_id in ("TYPOS", "SPELLING") or issue_type == "misspelling":
return "spelling"
elif category_id in ("GRAMMAR", "PUNCTUATION", "SYNTAX") or issue_type == "grammar":
return "grammar"
else:
return "style"
def process_matches_for_section(section_text: str, matches: list[dict], section_offset: int) -> list[dict]:
"""Filter matches that belong to this section and adjust offsets."""
section_matches = []
section_end = section_offset + len(section_text)
for match in matches:
m_offset = match.get("offset", 0)
m_length = match.get("length", 0)
if m_offset >= section_offset and m_offset + m_length <= section_end:
section_matches.append({
"offset": m_offset - section_offset,
"length": m_length,
"message": match.get("message", ""),
"shortMessage": match.get("shortMessage", ""),
"replacements": [r.get("value", "") for r in match.get("replacements", [])[:5]],
"category": categorize_match(match),
"rule": match.get("rule", {}).get("description", ""),
"context_text": match.get("context", {}).get("text", ""),
})
return section_matches
def crawl_domain(domain: str, max_pages: int) -> dict:
"""Crawl a domain and return the list of found URLs (synchronous)."""
    base_url = check_domain_reachable(domain)
urls = extract_sitemap_urls(domain)
sitemap_used = bool(urls)
if not urls:
        urls = crawl_links(domain, max_pages, base_url=base_url)
urls = urls[:max_pages]
return {"urls": urls, "sitemap_used": sitemap_used}
def run_check(session_id: str, domain: str, language: str,
username: str, api_key: str, urls: list[str]):
"""Run the check pipeline for a given list of URLs (synchronous, runs in a thread)."""
session = sessions[session_id]
try:
session["status"] = "checking"
session["progress"] = {"current": 0, "total": len(urls), "page": ""}
session["message"] = f"Prüfe {len(urls)} Seiten..."
results = []
total_errors = {"spelling": 0, "grammar": 0, "style": 0}
for i, url in enumerate(urls):
session["progress"]["current"] = i + 1
session["progress"]["page"] = url
session["message"] = f"Prüfe Seite {i + 1}/{len(urls)}: {url}"
page_result = {
"url": url,
"sections": [],
"error_count": {"spelling": 0, "grammar": 0, "style": 0},
"total_errors": 0,
"skipped": False,
"error_message": None,
}
try:
resp = requests.get(
url, timeout=10,
headers={"User-Agent": "LanguageToolChecker/1.0"}
)
if resp.status_code != 200:
page_result["skipped"] = True
page_result["error_message"] = f"HTTP {resp.status_code}"
results.append(page_result)
continue
content_type = resp.headers.get("content-type", "")
if "text/html" not in content_type:
page_result["skipped"] = True
page_result["error_message"] = f"Kein HTML: {content_type}"
results.append(page_result)
continue
page_data = extract_page_content(resp.text, url)
# Build full text for API check
full_text_parts = []
section_offsets = []
current_offset = 0
for section in page_data["sections"]:
section_offsets.append(current_offset)
full_text_parts.append(section["text"])
current_offset += len(section["text"]) + 1 # +1 for newline
full_text = "\n".join(full_text_parts)
if not full_text.strip():
page_result["sections"] = [
{"type": s["type"], "text": s["text"], "matches": []}
for s in page_data["sections"]
]
results.append(page_result)
continue
# Check with LanguageTool
try:
matches = check_text_with_languagetool(
full_text, language, username, api_key
)
except ValueError as e:
error_msg = str(e)
if "Authentifizierung" in error_msg:
session["status"] = "error"
session["message"] = error_msg
return
page_result["skipped"] = True
page_result["error_message"] = error_msg
results.append(page_result)
continue
# Distribute matches to sections
for j, section in enumerate(page_data["sections"]):
sec_offset = section_offsets[j] if j < len(section_offsets) else 0
sec_matches = process_matches_for_section(
section["text"], matches, sec_offset
)
page_result["sections"].append({
"type": section["type"],
"text": section["text"],
"matches": sec_matches,
})
for m in sec_matches:
cat = m["category"]
page_result["error_count"][cat] += 1
total_errors[cat] += 1
page_result["total_errors"] = sum(page_result["error_count"].values())
except requests.exceptions.Timeout:
page_result["skipped"] = True
page_result["error_message"] = "Timeout (>10s)"
except Exception as e:
page_result["skipped"] = True
page_result["error_message"] = str(e)[:200]
results.append(page_result)
# Sort by error count descending
results.sort(key=lambda r: r["total_errors"], reverse=True)
session["status"] = "done"
session["message"] = "Prüfung abgeschlossen"
session["results"] = {
"domain": domain,
"language": language,
"pages_checked": len(results),
"pages_skipped": sum(1 for r in results if r["skipped"]),
"total_errors": total_errors,
"pages": results,
}
except Exception as e:
session["status"] = "error"
session["message"] = f"Unerwarteter Fehler: {str(e)[:300]}"
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.post("/api/crawl")
async def crawl(request: Request):
"""Crawl a domain and return the list of URLs for preview."""
body = await request.json()
domain = body.get("domain", "").strip().lower()
max_pages = int(body.get("maxPages", 50))
domain = re.sub(r"^https?://", "", domain)
domain = domain.rstrip("/")
if not domain:
return {"error": "Bitte eine Domain eingeben"}
try:
result = crawl_domain(domain, max_pages)
return {
"urls": result["urls"],
"sitemap_used": result["sitemap_used"],
"domain": domain,
}
except ValueError as e:
return {"error": str(e)}
@app.post("/api/check")
async def start_check(request: Request):
body = await request.json()
domain = body.get("domain", "").strip().lower()
language = body.get("language", "de-DE")
username = body.get("username", "").strip()
api_key = body.get("apiKey", "").strip()
urls = body.get("urls", [])
domain = re.sub(r"^https?://", "", domain)
domain = domain.rstrip("/")
if not domain:
return {"error": "Bitte eine Domain eingeben"}
if not username or not api_key:
return {"error": "Bitte LanguageTool Credentials eingeben"}
if not urls:
return {"error": "Keine URLs zum Prüfen ausgewählt"}
session_id = str(uuid.uuid4())
sessions[session_id] = {
"status": "starting",
"message": "Starte Prüfung...",
"progress": {"current": 0, "total": 0, "page": ""},
"results": None,
}
thread = threading.Thread(
target=run_check,
args=(session_id, domain, language, username, api_key, urls),
daemon=True,
)
thread.start()
return {"sessionId": session_id}
@app.get("/api/status/{session_id}")
async def get_status(session_id: str):
session = sessions.get(session_id)
if not session:
return {"error": "Session nicht gefunden"}
return {
"status": session["status"],
"message": session["message"],
"progress": session["progress"],
}
@app.get("/api/results/{session_id}")
async def get_results(session_id: str):
session = sessions.get(session_id)
if not session:
return {"error": "Session nicht gefunden"}
if session["status"] != "done":
return {"error": "Prüfung noch nicht abgeschlossen", "status": session["status"]}
results = session["results"]
    # The session is deliberately kept in memory after delivering results so
    # that they can be re-requested; no cleanup happens here.
return results
@app.get("/api/stream/{session_id}")
async def stream_status(session_id: str):
"""SSE endpoint for live progress updates."""
async def event_generator():
while True:
session = sessions.get(session_id)
if not session:
yield f"data: {json.dumps({'status': 'error', 'message': 'Session nicht gefunden'})}\n\n"
break
payload = {
"status": session["status"],
"message": session["message"],
"progress": session["progress"],
}
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
if session["status"] in ("done", "error"):
break
await asyncio.sleep(0.5)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
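# Clients can consume this stream with the browser EventSource API or any SSE
# client; each event's data field carries the same JSON payload as /api/status.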
if __name__ == "__main__":
print("\n LanguageTool Website Checker")
print(" ============================")
print(" Öffne http://localhost:8000 im Browser\n")
uvicorn.run(app, host="0.0.0.0", port=8000)
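
For reference, a minimal client sketch against the endpoints defined above. It assumes the server has been started as in the __main__ block and is reachable at http://localhost:8000; the domain, username, and API key are placeholders, and error responses ({"error": ...}) are not handled here.

import time

import requests

BASE = "http://localhost:8000"

# 1. Crawl the domain to get a preview list of URLs (sitemap or link crawl).
crawl = requests.post(f"{BASE}/api/crawl",
                      json={"domain": "example.com", "maxPages": 10}).json()

# 2. Start the check with LanguageTool credentials and the crawled URLs.
check = requests.post(f"{BASE}/api/check", json={
    "domain": crawl["domain"],
    "language": "de-DE",
    "username": "user@example.com",  # placeholder
    "apiKey": "YOUR_API_KEY",        # placeholder
    "urls": crawl["urls"],
}).json()
session_id = check["sessionId"]

# 3. Poll /api/status until the background thread reports done or error
#    (alternatively, consume the SSE stream at /api/stream/{session_id}).
while True:
    status = requests.get(f"{BASE}/api/status/{session_id}").json()
    print(status["message"])
    if status["status"] in ("done", "error"):
        break
    time.sleep(1)

# 4. Fetch the aggregated results, sorted by error count per page.
if status["status"] == "done":
    results = requests.get(f"{BASE}/api/results/{session_id}").json()
    print(results["total_errors"], "errors across", results["pages_checked"], "pages")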