import argparse
import json
import re
from datetime import datetime
from os import makedirs, remove
from os.path import exists
from time import sleep

import requests
from bs4 import BeautifulSoup
from git import Repo

BUNDESTAG_URL = "https://www.bundestag.de/ajax/filterlist/de/abgeordnete/biografien/1040594-1040594?limit=9999&view=BTBiographyList"
BUNDESTAG_BASE_URL = "https://www.bundestag.de"


class Biography:
    def __init__(
        self,
        name,
        party,
        job,
        cv,
        speeches,
        votes,
        functions,
        additional_functions,
        mandate,
        disclosures,
    ):
        self.name = name
        self.party = party
        self.job = job
        self.cv = cv
        self.speeches = speeches
        self.votes = votes
        self.functions = functions
        self.additional_functions = additional_functions
        self.mandate = mandate
        self.disclosures = disclosures

    def __repr__(self):
        return f"""
name: {self.name}
party: {self.party}
job: {self.job}
cv: {self.cv}
speeches: {self.speeches}
votes: {self.votes}
functions: {self.functions}
additional_functions: {self.additional_functions}
mandate: {self.mandate}
disclosures: {self.disclosures}
"""

    def __str__(self):
        if self.speeches:
            speeches_str = "".join(
                f"\n- {speech[0]}: {speech[1]}" for speech in self.speeches
            )
        else:
            speeches_str = ""
        if self.votes:
            votes_str = "".join(
                f"\n- {vote[0]}, {vote[1]}: {vote[2]}" for vote in self.votes
            )
        else:
            votes_str = ""
        job_str = self.job or ""
        txt = f"""
# Persönliche Angaben

Name: {self.name[1]} {self.name[0]}
Partei: {self.party}
Beruf: {job_str}
Biographie: {self.cv}

# Reden
{speeches_str}

# Abstimmungen
{votes_str}

# Funktionen

## Ämter im Bundestag
{funcs_to_str(self.functions)}

## Sonstige Gremien
{funcs_to_str(self.additional_functions)}

# Mandat

{self.mandate[0]}, {self.mandate[1]}

# Veröffentlichungspflichtige Angaben
{funcs_to_str(self.disclosures)}
"""
        return txt

    def to_dict(self):
        return {
            "name": self.name,
            "party": self.party,
            "job": self.job,
            "cv": self.cv,
            "speeches": self.speeches,
            "votes": self.votes,
            "functions": self.functions,
            "additional_functions": self.additional_functions,
            "mandate": self.mandate,
            "disclosures": self.disclosures,
        }


def funcs_to_str(funcs):
    if not funcs:
        return ""
    out = ""
    for func in funcs:
        out += f"\n- {func[0]}"
        for loc in sorted(func[1]):
            out += f"\n  - {loc}"
    return out


def main():
    parser = argparse.ArgumentParser(
        prog="Bundescrawler",
        description="Crawls the pages of German representatives and saves the information in a git repository",
    )
    parser.add_argument("-o", "--out")
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--no-git", action="store_true")
    args = parser.parse_args()
    if not args.out:
        raise ValueError("must supply out directory")

    links, names = get_links_and_names()
    if args.debug:
        links = links[:5]
        names = names[:5]
        sleep_for = 0
    else:
        sleep_for = 10
    bios = [get_bio(link, name, sleep_for) for link, name in zip(links, names)]

    # Load the previous snapshot before overwriting raw.json, so the commit
    # message can report what changed.
    old_bios = load_old_bios(args.out)
    if not args.debug:
        save_raw(bios, args.out)
        save_individuals(bios, args.out)
        save_votes(bios, args.out)
        save_disclosures(bios, args.out)
        save_readme(bios, args.out)
        save_party_index(bios, args.out)
        save_letter_indexes(bios, args.out)
    if args.no_git:
        return

    repo = Repo(args.out)
    repo.git.add("*")
    if repo.git.diff("--cached", name_only=True) == "":
        return
    message = generate_commit_message(old_bios, bios)
    repo.index.commit(message)
    origin = repo.remote(name="origin")
    origin.push()


def load_old_bios(out):
    try:
        with open(f"{out}/raw.json", "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return []


def generate_commit_message(old_bios, new_bios):
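    """Build a German commit message summarizing changes since the last crawl.

    Compares the previous raw.json snapshot (a list of dicts) against the
    freshly crawled Biography objects and lists new and departed
    representatives, party changes, and newly published disclosures.
    """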
    old_names = {tuple(b["name"]): b for b in old_bios}
    new_names = {tuple(b.name): b for b in new_bios}
    added = [new_names[n] for n in new_names if n not in old_names]
    removed = [old_names[n] for n in old_names if n not in new_names]

    new_disclosures = []
    for bio in new_bios:
        key = tuple(bio.name)
        if key not in old_names:
            continue
        old_discl = old_names[key].get("disclosures") or []
        new_discl = bio.disclosures or []
        old_items = {item for d in old_discl for item in d[1]}
        new_items = {item for d in new_discl for item in d[1]}
        for item in new_items - old_items:
            new_disclosures.append((bio, item))

    party_changes = []
    for bio in new_bios:
        key = tuple(bio.name)
        if key not in old_names:
            continue
        old_party = old_names[key].get("party", "")
        if old_party != bio.party:
            party_changes.append((bio, old_party))

    date = datetime.now().strftime("%Y-%m-%d")
    sections = []
    if added:
        sections.append(
            "Neue Abgeordnete:\n"
            + "\n".join(f"- {b.name[1]} {b.name[0]} ({b.party})" for b in added)
        )
    if removed:
        sections.append(
            "Ausgeschieden:\n"
            + "\n".join(
                f"- {b['name'][1]} {b['name'][0]} ({b.get('party', '')})"
                for b in removed
            )
        )
    if party_changes:
        sections.append(
            "Parteiwechsel:\n"
            + "\n".join(
                f"- {b.name[1]} {b.name[0]}: {old} -> {b.party}"
                for b, old in party_changes
            )
        )
    if new_disclosures:
        sections.append(
            "Neue Veröffentlichungen:\n"
            + "\n".join(
                f"- {b.name[1]} {b.name[0]} ({b.party}): {item}"
                for b, item in new_disclosures
            )
        )
    sections.append(f"{len(new_bios)} Profile aktualisiert")

    title = f"Aktualisierung {date}"
    body = "\n\n".join(sections)
    return f"{title}\n\n{body}"


def save_individuals(bios, out):
    for rep in bios:
        first_letter = rep.name[0][0].upper()
        name_str = f"{rep.name[0]} {rep.name[1]}".replace(" ", "_")
        directory = f"{out}/Abgeordnete/{first_letter}"
        makedirs(directory, exist_ok=True)
        with open(f"{directory}/{name_str}.md", "w", encoding="utf-8") as rep_file:
            rep_file.write(str(rep))


def save_raw(bios, out):
    with open(f"{out}/raw.json", "w", encoding="utf-8") as raw_file:
        json.dump(
            [bio.to_dict() for bio in bios], raw_file, indent=2, ensure_ascii=False
        )


def save_disclosures(bios, out):
    directory = f"{out}/Voep_Angaben"
    makedirs(directory, exist_ok=True)
    bios_with_discl = [bio for bio in bios if bio.disclosures]

    alpha_str = ""
    for bio in sorted(bios_with_discl, key=lambda b: b.name):
        alpha_str += f"# {bio.name[1]} {bio.name[0]} ({bio.party})"
        alpha_str += funcs_to_str(bio.disclosures)
        alpha_str += "\n"
    with open(f"{directory}/Alphabetisch.md", "w", encoding="utf-8") as alpha_file:
        alpha_file.write(alpha_str.strip())

    party_str = ""
    for party, bio_list in group_by_party(bios_with_discl):
        party_str += f"# {party}\n"
        for bio in bio_list:
            party_str += f"## {bio.name[1]} {bio.name[0]}"
            party_str += funcs_to_str(bio.disclosures)
            party_str += "\n"
    with open(f"{directory}/Nach_Partei.md", "w", encoding="utf-8") as party_file:
        party_file.write(party_str.strip())


def save_votes(bios, out):
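    """Write one markdown file per roll-call vote, grouped by party and result.

    New votes are merged into a votes.json backing store first, so votes that
    disappear from the Bundestag website remain in the repository.
    """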
    directory = f"{out}/Abstimmungen"
    makedirs(directory, exist_ok=True)

    # Load existing votes to preserve those deleted from the website
    json_path = f"{directory}/votes.json"
    try:
        with open(json_path, "r", encoding="utf-8") as f:
            all_votes = json.load(f)
    except FileNotFoundError:
        all_votes = {}

    # Merge new votes
    for bio in bios:
        if not bio.votes:
            continue
        rep_name = f"{bio.name[1]} {bio.name[0]}"
        for vote in bio.votes:
            topic, date, result = vote[0], vote[1], vote[2]
            key = f"{topic} ({date})"
            if key not in all_votes:
                all_votes[key] = {}
            all_votes[key][rep_name] = {"party": bio.party, "vote": result}

    # Save JSON backing store
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(all_votes, f, indent=2, ensure_ascii=False)

    # Generate one markdown file per vote topic
    for key, votes in sorted(all_votes.items()):
        md = f"# {key}\n\n"
        # Group by party, then by vote result
        by_party = {}
        for name, info in votes.items():
            by_party.setdefault(info["party"], {}).setdefault(info["vote"], []).append(
                name
            )
        for party in sorted(by_party):
            md += f"## {party}\n\n"
            for result in sorted(by_party[party]):
                md += f"### {result}\n\n"
                for name in sorted(by_party[party][result]):
                    md += f"- {name}\n"
                md += "\n"
        # Vote topics can contain characters that are not valid in file names
        safe_name = re.sub(r'[/<>:"|?*]', "_", key)[:200]
        with open(f"{directory}/{safe_name}.md", "w", encoding="utf-8") as f:
            f.write(md)


def save_readme(bios, out):
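    """Render the German top-level README.md with party counts and navigation."""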
    date = datetime.now().strftime("%Y-%m-%d %H:%M")
    total = len(bios)
    parties = {}
    for bio in bios:
        parties[bio.party] = parties.get(bio.party, 0) + 1

    md = "# Bundestag\n\n"
    md += "Hier werden die Informationen, welche auf der Seite der Abgeordneten auf der "
    md += "Homepage des Bundestags verfügbar sind, getrackt. Dies passiert automatisch "
    md += "mithilfe des [Bundescrawlers](https://gitlab.com/lentsmarco/bundescrawler).\n\n"
    md += f"**Abgeordnete:** {total}  \n"
    md += f"**Letzte Aktualisierung:** {date}\n\n"
    md += "## Parteien\n\n"
    md += "| Partei | Abgeordnete |\n"
    md += "|--------|------------:|\n"
    for party, count in sorted(parties.items()):
        safe_party = re.sub(r'[/<>:"|?*]', "_", party)
        url_party = safe_party.replace(" ", "%20")
        md += f"| [{party}](Parteien/{url_party}.md) | {count} |\n"
    md += "\n## Struktur\n\n"
    md += "- [Abgeordnete](Abgeordnete/) — Einzelprofile, sortiert nach Anfangsbuchstabe des Nachnamens\n"
    md += "- [Abstimmungen](Abstimmungen/) — Abstimmungen nach Thema, gruppiert nach Partei\n"
    md += "- [Veröffentlichungspflichtige Angaben](Voep_Angaben/) — Nach Alphabet und Partei sortiert\n"
    md += "- [Parteien](Parteien/) — Abgeordnete nach Partei\n"
    md += "\nDie Datei `raw.json` enthält alle Informationen zur weiteren Verarbeitung in maschinenlesbarem Format.\n"
    md += "\n## Anwendung\n\n"
    md += "Dieses Repository verwendet das Versionskontrollprogramm `git`. "
    md += "Das erlaubt es, die Veränderungen auf den Seiten in sehr praktischem Format anzuzeigen.\n\n"
    md += "Sieh dir im linken Reiter unter `Code -> Commits` die Vergangenheit der "
    md += "Veränderungen an. Durch einen Klick auf einen bestimmten Commit kann man die "
    md += "Veränderungen sehen, die dabei passiert sind.\n"
    md += "\n## Kontakt\n\n"
    md += "Für Fragen stehe ich gerne unter `bundescrawler@pm.me` oder unter "
    md += "`Plan -> Issues` zur Verfügung.\n"

    # Remove old Readme.md if it exists alongside the new README.md
    old_readme = f"{out}/Readme.md"
    if exists(old_readme):
        remove(old_readme)
    with open(f"{out}/README.md", "w", encoding="utf-8") as f:
        f.write(md)


def save_party_index(bios, out):
    directory = f"{out}/Parteien"
    makedirs(directory, exist_ok=True)
    for party, bio_list in group_by_party(bios):
        md = f"# {party}\n\n"
        md += f"{len(bio_list)} Abgeordnete\n\n"
        for bio in sorted(bio_list, key=lambda b: b.name):
            first_letter = bio.name[0][0].upper()
            name_str = f"{bio.name[0]} {bio.name[1]}".replace(" ", "_")
            md += f"- [{bio.name[1]} {bio.name[0]}](../Abgeordnete/{first_letter}/{name_str}.md)"
            if bio.job:
                md += f" — {bio.job}"
            md += "\n"
        safe_party = re.sub(r'[/<>:"|?*]', "_", party)
        with open(f"{directory}/{safe_party}.md", "w", encoding="utf-8") as f:
            f.write(md)


def save_letter_indexes(bios, out):
    by_letter = {}
    for bio in bios:
        letter = bio.name[0][0].upper()
        by_letter.setdefault(letter, []).append(bio)
    for letter, bio_list in sorted(by_letter.items()):
        md = f"# {letter}\n\n"
        for bio in sorted(bio_list, key=lambda b: b.name):
            name_str = f"{bio.name[0]} {bio.name[1]}".replace(" ", "_")
            md += f"- [{bio.name[1]} {bio.name[0]}]({name_str}.md) ({bio.party})\n"
        makedirs(f"{out}/Abgeordnete/{letter}", exist_ok=True)
        with open(f"{out}/Abgeordnete/{letter}/index.md", "w", encoding="utf-8") as f:
            f.write(md)


def group_by_party(bios):
    grouped = {}
    for bio in bios:
        grouped.setdefault(bio.party, []).append(bio)
    # Sorted list of (party, bios) tuples, alphabetical by party name
    return sorted(grouped.items())


def get_links_and_names():
    response = requests.get(BUNDESTAG_URL)
    soup = BeautifulSoup(response.text, features="html.parser")
    links = [a.get("href") for a in soup.find_all("a")]
    names = [a.text.strip().split("\n\n\n") for a in soup.find_all("a")]
    return links, names


def get_bio(url, name, sleep_for):
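    """Fetch and parse a single representative's biography page.

    `name` is the raw ("Lastname, Firstname", party) pair scraped from the
    overview list; the function sleeps `sleep_for` seconds after the request
    to avoid hammering the server.
    """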
    name, party = name
    name = name.split(", ")
    party = party.replace("\n\xa0*", " (ausgeschieden)")
    print(f"Getting {url} for {name[1]} {name[0]}")
    response = request_handle_rate_limit(url)
    soup = BeautifulSoup(response.text, features="html.parser")

    intro_info = soup.find(class_="m-biography__introInfo")
    job_elem = intro_info.find("span") if intro_info else None
    job = job_elem.text if job_elem else None

    cv_elem = soup.find(class_="m-biography__biography")
    cv = cv_elem.text.strip() if cv_elem else ""

    # Speeches and votes are loaded via AJAX; the endpoints are embedded in
    # the x-data attributes of the placeholder divs.
    speech_div = None
    vote_div = None
    for div in soup.find_all(class_="m-ajaxLoadedContent"):
        if "abstimmung" in (div.get("x-data") or ""):
            vote_div = div
        else:
            speech_div = div
    speeches = parse_speech(get_ajax(speech_div))
    votes = parse_vote(get_ajax(vote_div))

    function_divs = soup.find_all(class_="m-biography__memberships")
    functions = get_functions(function_divs[0]) if len(function_divs) > 0 else None
    additional_functions = (
        get_functions(function_divs[1]) if len(function_divs) > 1 else None
    )

    mandate_elem = soup.find(class_="m-biography__subHeading --mandate")
    mandate = (
        mandate_elem.text if mandate_elem else "",
        soup.find(string=re.compile(r"^Wahlkreis \d*:")) or "",
    )
    disclosures = get_disclosures(soup.find(class_="m-biography__infos"))

    bio = Biography(
        name,
        party,
        job,
        cv,
        speeches,
        votes,
        functions,
        additional_functions,
        mandate,
        disclosures,
    )
    sleep(sleep_for)
    return bio


def request_handle_rate_limit(url):
    # Retry up to five times with a long pause, then make one final attempt
    for _ in range(5):
        try:
            response = requests.get(url)
            response.raise_for_status()
            return response
        except requests.RequestException:
            print("Request failed! waiting 5min")
            sleep(300)
    return requests.get(url)


def get_disclosures(elem):
    if not elem:
        return None
    divs = elem.find_all("div", class_=lambda cls: cls != "m-biography__infoDisclaimer")
    out = []
    for div in divs:
        current_heading = ""
        current_body = []
        for child in div.children:
            if child.name == "h3":
                # An h3 starts a new disclosure category; flush the previous one
                if current_body:
                    out.append((current_heading, current_body))
                current_heading = child.text.strip()
                current_body = []
                continue
            if not child.name:
                continue
            text = child.text.strip()
            if text in ("", "Keine veröffentlichungspflichtigen Angaben."):
                continue
            current_body.append(text)
        if current_heading == "" and current_body == []:
            continue
        out.append((current_heading, current_body))
    return out


def get_functions(elem):
    out = []
    current_heading = None
    current_body = []
    for child in elem.children:
        if child.name == "h3":
            # An h3 starts a new group of functions; flush the previous one
            if current_body:
                out.append((current_heading, sorted(current_body)))
            current_heading = child.text.strip()
            current_body = []
            continue
        if not child.name:
            continue
        current_body.extend(
            grandchild.text.strip().replace("\n\n\n(Interner Link)", "")
            for grandchild in child.children
            if grandchild.text.strip() != ""
        )
    out.append((current_heading, sorted(current_body)))
    return sorted(out)


def parse_speech(page):
    if not page:
        return None
    soup = BeautifulSoup(page.text, features="html.parser")
    infos = [s.text.strip() for s in soup.find_all(class_="m-biography__speechTitle")]
    # The visible link labels come in pairs per speech; keep the first of each
    titles = [
        title.text.strip()
        for title in soup.find_all(class_="a-link__label")
        if "--hidden" not in title.get("class")
    ][::2]
    return list(zip(titles, infos))


def parse_vote(page):
    if not page:
        return None
    soup = BeautifulSoup(page.text, features="html.parser")
    rows = soup.find_all("tr")[1:]  # skip the table header row
    return [[col.text.strip() for col in row.find_all("td")] for row in rows]


def get_ajax(elem):
    if not elem:
        return None
    # x-data looks like dynamicTemplateOutput({...}); extract the JSON payload
    inner = elem.get("x-data").removeprefix("dynamicTemplateOutput(").removesuffix(")")
    data = json.loads(inner)
    url = BUNDESTAG_BASE_URL + data["endpoint"]
    sanitized_filters = [
        (key, value.replace(" ", "+").replace("#", "%23"))
        for key, value in data["filters"].items()
    ]
    url = url + "?" + "&".join(f"{key}={val}" for key, val in sanitized_filters)
    return request_handle_rate_limit(url)


if __name__ == "__main__":
    main()