"""Bundescrawler.

Crawls the biography pages of the members of the German Bundestag and
stores the collected data (profiles, votes, disclosures, indexes) as
markdown files in a git repository, committing and pushing a summary of
the detected changes.
"""

import argparse
import json
import re
from datetime import datetime
from os import makedirs
from time import sleep

import requests
from bs4 import BeautifulSoup
from git import Repo

# Ajax endpoint listing all members; the limit is high enough to get every
# representative in a single response.
BUNDESTAG_URL = (
    "https://www.bundestag.de/ajax/filterlist/de/abgeordnete/biografien/"
    "1040594-1040594?limit=9999&view=BTBiographyList"
)
BUNDESTAG_BASE_URL = "https://www.bundestag.de"


class Biography:
    """All scraped information about a single representative.

    Attributes:
        name: [last_name, first_name] pair.
        party: party label (may carry an "(ausgeschieden)" suffix).
        job: profession string, or None when the page shows none.
        cv: biography text.
        speeches: list of (title, info) tuples, or None.
        votes: list of [topic, date, result] table rows, or None.
        functions: [(heading, [entries])] — offices in the Bundestag, or None.
        additional_functions: same shape, for other bodies, or None.
        mandate: (mandate_kind, constituency) pair.
        disclosures: [(heading, [entries])] of publishable interests, or None.
    """

    def __init__(
        self,
        name,
        party,
        job,
        cv,
        speeches,
        votes,
        functions,
        additional_functions,
        mandate,
        disclosures,
    ):
        self.name = name
        self.party = party
        self.job = job
        self.cv = cv
        self.speeches = speeches
        self.votes = votes
        self.functions = functions
        self.additional_functions = additional_functions
        self.mandate = mandate
        self.disclosures = disclosures

    def __repr__(self):
        # FIX: "job" was missing from the debug representation although it
        # is a stored attribute like all the others.
        return f"""
name: {self.name}
party: {self.party}
job: {self.job}
cv: {self.cv}
speeches: {self.speeches}
votes: {self.votes}
functions: {self.functions}
additional_functions: {self.additional_functions}
mandate: {self.mandate}
disclosures: {self.disclosures}
"""

    def __str__(self):
        """Render the biography as the markdown profile page."""
        if self.speeches:
            speeches_str = "".join(
                [f"\n- {speech[0]}: {speech[1]}" for speech in self.speeches]
            )
        else:
            speeches_str = ""
        if self.votes:
            votes_str = "".join(
                [f"\n- {vote[0]}, {vote[1]}: {vote[2]}" for vote in self.votes]
            )
        else:
            votes_str = ""
        job_str = self.job if self.job else ""
        return f"""
# Persönliche Angaben
Name: {self.name[1]} {self.name[0]}
Partei: {self.party}
Beruf: {job_str}
Biographie: {self.cv}

# Reden
{speeches_str}

# Abstimmungen
{votes_str}

# Funktionen

## Ämter im Bundestag
{funcs_to_str(self.functions)}

## Sonstige Gremien
{funcs_to_str(self.additional_functions)}

# Mandat
{self.mandate[0]}, {self.mandate[1]}

# Veröffentlichungspflichtige Angaben
{funcs_to_str(self.disclosures)}
"""

    def to_dict(self):
        """Return a JSON-serializable snapshot; backs raw.json."""
        return {
            "name": self.name,
            "party": self.party,
            # FIX: "job" was dropped on serialization and therefore lost
            # between runs even though it is scraped and stored.
            "job": self.job,
            "cv": self.cv,
            "speeches": self.speeches,
            "votes": self.votes,
            "functions": self.functions,
            "additional_functions": self.additional_functions,
            "mandate": self.mandate,
            "disclosures": self.disclosures,
        }


def funcs_to_str(funcs):
    """Render [(heading, [items])] as a nested markdown bullet list.

    Returns "" for None/empty input; items are emitted sorted.
    """
    if not funcs:
        return ""
    out = ""
    for func in funcs:
        out += f"\n- {func[0]}"
        for loc in sorted(func[1]):
            out += f"\n - {loc}"
    return out


def main():
    """Parse arguments, crawl all representatives, save, commit and push."""
    parser = argparse.ArgumentParser(
        prog="Bundescrawler",
        description=(
            "Crawls the pages of german representatives and saves the "
            "information in a git repository"
        ),
    )
    parser.add_argument("-o", "--out")
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--no-git", action="store_true")
    args = parser.parse_args()
    if not args.out:
        raise ValueError("must supply out directory")

    # FIX: only require a git repository when git is actually used
    # (previously --no-git still demanded a valid repo).  Opening it before
    # the slow crawl keeps the fail-fast behavior for the git case.
    repo = None if args.no_git else Repo(args.out)

    links, names = get_links_and_names()
    if args.debug:
        # Debug mode: crawl only a handful of pages, without throttling.
        links = links[:5]
        names = names[:5]
        sleep_for = 0
    else:
        sleep_for = 10
    bios = [get_bio(link, name, sleep_for) for link, name in zip(links, names)]

    old_bios = load_old_bios(args.out)
    if not args.debug:
        save_raw(bios, args.out)
        save_individuals(bios, args.out)
        save_votes(bios, args.out)
        save_disclosures(bios, args.out)
        save_readme(bios, args.out)
        save_party_index(bios, args.out)
        save_letter_indexes(bios, args.out)
    if args.no_git:
        return
    repo.git.add("*")
    # Nothing staged means nothing changed; skip the empty commit.
    if repo.git.diff("--cached", name_only=True) == "":
        return
    message = generate_commit_message(old_bios, bios)
    repo.index.commit(message)
    origin = repo.remote(name="origin")
    origin.push()


def load_old_bios(out):
    """Load the previous run's raw.json; [] when it does not exist yet."""
    try:
        with open(f"{out}/raw.json", "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return []


def generate_commit_message(old_bios, new_bios):
    """Build a commit message summarizing the diff between two crawls.

    old_bios are plain dicts (from raw.json); new_bios are Biography
    objects.  Representatives are keyed by their (last, first) name tuple.
    """
    old_names = {tuple(b["name"]): b for b in old_bios}
    new_names = {tuple(b.name): b for b in new_bios}
    added = [new_names[n] for n in new_names if n not in old_names]
    removed = [old_names[n] for n in old_names if n not in new_names]

    # Disclosure items that appear for a representative we already knew.
    new_disclosures = []
    for bio in new_bios:
        key = tuple(bio.name)
        if key not in old_names:
            continue
        old_discl = old_names[key].get("disclosures") or []
        new_discl = bio.disclosures or []
        old_items = {item for d in old_discl for item in d[1]}
        new_items = {item for d in new_discl for item in d[1]}
        for item in new_items - old_items:
            new_disclosures.append((bio, item))

    party_changes = []
    for bio in new_bios:
        key = tuple(bio.name)
        if key not in old_names:
            continue
        old_party = old_names[key].get("party", "")
        if old_party != bio.party:
            party_changes.append((bio, old_party))

    date = datetime.now().strftime("%Y-%m-%d")
    sections = []
    if added:
        sections.append("Neue Abgeordnete:\n" + "\n".join(
            f"- {b.name[1]} {b.name[0]} ({b.party})" for b in added
        ))
    if removed:
        sections.append("Ausgeschieden:\n" + "\n".join(
            f"- {b['name'][1]} {b['name'][0]} ({b.get('party', '')})"
            for b in removed
        ))
    if party_changes:
        sections.append("Parteiwechsel:\n" + "\n".join(
            f"- {b.name[1]} {b.name[0]}: {old} -> {b.party}"
            for b, old in party_changes
        ))
    if new_disclosures:
        sections.append("Neue Veröffentlichungen:\n" + "\n".join(
            f"- {b.name[1]} {b.name[0]} ({b.party}): {item}"
            for b, item in new_disclosures
        ))
    updated = len(new_bios)
    sections.append(f"{updated} Profile aktualisiert")
    title = f"Aktualisierung {date}"
    body = "\n\n".join(sections)
    return f"{title}\n\n{body}"


def save_individuals(bios, out):
    """Write one markdown profile per representative, bucketed by the
    first letter of the last name."""
    for rep in bios:
        first_letter = rep.name[0][0].upper()
        name_str = f"{rep.name[0]} {rep.name[1]}".replace(" ", "_")
        letter_dir = f"{out}/Abgeordnete/{first_letter}"
        makedirs(letter_dir, exist_ok=True)
        with open(f"{letter_dir}/{name_str}.md", "w", encoding="utf-8") as rep_file:
            rep_file.write(str(rep))


def save_raw(bios, out):
    """Dump all biographies as raw.json (machine-readable backing store)."""
    with open(f"{out}/raw.json", "w", encoding="utf-8") as raw_file:
        json.dump(
            [bio.to_dict() for bio in bios], raw_file, indent=2, ensure_ascii=False
        )


def save_disclosures(bios, out):
    """Write the publishable-interests lists, alphabetically and by party."""
    out_dir = f"{out}/Voep_Angaben"
    makedirs(out_dir, exist_ok=True)
    bios_with_discl = [bio for bio in bios if bio.disclosures]

    alpha_str = ""
    for bio in sorted(bios_with_discl, key=lambda b: b.name):
        alpha_str += f"# {bio.name[1]} {bio.name[0]} ({bio.party})"
        alpha_str += funcs_to_str(bio.disclosures)
        alpha_str += "\n"
    with open(f"{out_dir}/Alphabetisch.md", "w", encoding="utf-8") as alpha_file:
        alpha_file.write(alpha_str.strip())

    party_str = ""
    for party, bio_list in group_by_party(bios_with_discl):
        party_str += f"# {party}\n"
        for bio in bio_list:
            party_str += f"## {bio.name[1]} {bio.name[0]}"
            party_str += funcs_to_str(bio.disclosures)
            party_str += "\n"
    with open(f"{out_dir}/Nach_Partei.md", "w", encoding="utf-8") as party_file:
        party_file.write(party_str.strip())


def save_votes(bios, out):
    """Merge new votes into votes.json and emit one markdown file per topic.

    votes.json is kept as a backing store so votes deleted from the website
    are preserved across runs.
    """
    out_dir = f"{out}/Abstimmungen"
    makedirs(out_dir, exist_ok=True)

    # Load existing votes to preserve those deleted from the website.
    json_path = f"{out_dir}/votes.json"
    try:
        with open(json_path, "r", encoding="utf-8") as f:
            all_votes = json.load(f)
    except FileNotFoundError:
        all_votes = {}

    # Merge new votes.
    for bio in bios:
        if not bio.votes:
            continue
        rep_name = f"{bio.name[1]} {bio.name[0]}"
        for vote in bio.votes:
            # FIX: rows scraped from malformed tables can have fewer than
            # three cells; skip them instead of raising IndexError.
            if len(vote) < 3:
                continue
            topic, date, result = vote[0], vote[1], vote[2]
            key = f"{topic} ({date})"
            if key not in all_votes:
                all_votes[key] = {}
            all_votes[key][rep_name] = {"party": bio.party, "vote": result}

    # Save the JSON backing store.
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(all_votes, f, indent=2, ensure_ascii=False)

    # Generate one markdown file per vote topic.
    for key, votes in sorted(all_votes.items()):
        md = f"# {key}\n\n"
        # Group by party, then by vote result.
        by_party = {}
        for name, info in votes.items():
            party = info["party"]
            if party not in by_party:
                by_party[party] = {}
            result = info["vote"]
            if result not in by_party[party]:
                by_party[party][result] = []
            by_party[party][result].append(name)
        for party in sorted(by_party):
            md += f"## {party}\n\n"
            for result in sorted(by_party[party]):
                md += f"### {result}\n\n"
                for name in sorted(by_party[party][result]):
                    md += f"- {name}\n"
                md += "\n"
        # Strip characters that are illegal in filenames; cap the length.
        safe_name = re.sub(r'[/<>:"|?*]', "_", key)[:200]
        with open(f"{out_dir}/{safe_name}.md", "w", encoding="utf-8") as f:
            f.write(md)


def save_readme(bios, out):
    """Write the repository README with totals and directory links."""
    date = datetime.now().strftime("%Y-%m-%d %H:%M")
    total = len(bios)
    parties = {}
    for bio in bios:
        parties[bio.party] = parties.get(bio.party, 0) + 1
    md = "# Bundestag\n\n"
    md += "Automatisch erfasste Daten der Abgeordneten des Deutschen Bundestages.\n\n"
    md += f"**Abgeordnete:** {total} \n"
    md += f"**Letzte Aktualisierung:** {date}\n\n"
    md += "## Parteien\n\n"
    md += "| Partei | Abgeordnete |\n"
    md += "|--------|------------:|\n"
    for party, count in sorted(parties.items()):
        safe_party = re.sub(r'[/<>:"|?*]', "_", party)
        md += f"| [{party}](Parteien/{safe_party}.md) | {count} |\n"
    md += "\n## Verzeichnisse\n\n"
    md += "- [Abgeordnete](Abgeordnete/) — Einzelprofile nach Nachname\n"
    md += "- [Abstimmungen](Abstimmungen/) — Abstimmungen nach Thema\n"
    md += "- [Veröffentlichungspflichtige Angaben](Voep_Angaben/) — Nebentätigkeiten\n"
    md += "- [Parteien](Parteien/) — Abgeordnete nach Partei\n"
    with open(f"{out}/README.md", "w", encoding="utf-8") as f:
        f.write(md)


def save_party_index(bios, out):
    """Write one markdown index of representatives per party."""
    out_dir = f"{out}/Parteien"
    makedirs(out_dir, exist_ok=True)
    for party, bio_list in group_by_party(bios):
        md = f"# {party}\n\n"
        md += f"{len(bio_list)} Abgeordnete\n\n"
        for bio in sorted(bio_list, key=lambda b: b.name):
            first_letter = bio.name[0][0].upper()
            name_str = f"{bio.name[0]} {bio.name[1]}".replace(" ", "_")
            md += f"- [{bio.name[1]} {bio.name[0]}](../Abgeordnete/{first_letter}/{name_str}.md)"
            if bio.job:
                md += f" — {bio.job}"
            md += "\n"
        safe_party = re.sub(r'[/<>:"|?*]', "_", party)
        with open(f"{out_dir}/{safe_party}.md", "w", encoding="utf-8") as f:
            f.write(md)


def save_letter_indexes(bios, out):
    """Write an index.md per last-name letter directory."""
    by_letter = {}
    for bio in bios:
        letter = bio.name[0][0].upper()
        if letter not in by_letter:
            by_letter[letter] = []
        by_letter[letter].append(bio)
    for letter, bio_list in sorted(by_letter.items()):
        letter_dir = f"{out}/Abgeordnete/{letter}"
        # FIX: create the directory; previously this relied on
        # save_individuals() having run first in the same process.
        makedirs(letter_dir, exist_ok=True)
        md = f"# {letter}\n\n"
        for bio in sorted(bio_list, key=lambda b: b.name):
            name_str = f"{bio.name[0]} {bio.name[1]}".replace(" ", "_")
            md += f"- [{bio.name[1]} {bio.name[0]}]({name_str}.md) ({bio.party})\n"
        with open(f"{letter_dir}/index.md", "w", encoding="utf-8") as f:
            f.write(md)


def group_by_party(bios):
    """Group biographies by party; return [(party, [bios])] sorted by party."""
    grouped = {}
    for bio in bios:
        grouped.setdefault(bio.party, []).append(bio)
    as_list = list(grouped.items())
    as_list.sort(key=lambda party: party[0])
    return as_list


def get_links_and_names():
    """Fetch the member list page; return parallel lists of profile links
    and raw [name, party] text pairs."""
    response = requests.get(BUNDESTAG_URL)
    soup = BeautifulSoup(response.text, features="html.parser")
    anchors = soup.find_all("a")
    links = [a.get("href") for a in anchors]
    # The anchor text is "Last, First\n\n\nParty".
    names = [a.text.strip().split("\n\n\n") for a in anchors]
    return (links, names)


def get_bio(url, name, sleep_for):
    """Scrape one profile page and return a Biography.

    name is the raw [name, party] pair from get_links_and_names();
    sleep_for throttles between requests (seconds).
    """
    name, party = name
    name = name.split(", ")
    party = party.replace("\n\xa0*", " (ausgeschieden)")
    print(f"Getting {url} for {name[1]} {name[0]}")
    response = request_handle_rate_limit(url)
    soup = BeautifulSoup(response.text, features="html.parser")

    intro_info = soup.find(class_="m-biography__introInfo")
    job_elem = intro_info.find("span") if intro_info else None
    job = job_elem.text if job_elem else None
    cv_elem = soup.find(class_="m-biography__biography")
    cv = cv_elem.text.strip() if cv_elem else ""

    # The page embeds two lazily-loaded blocks (speeches and votes);
    # tell them apart by their x-data payload.
    ajax_divs = soup.find_all(class_="m-ajaxLoadedContent")
    speech_div = None
    vote_div = None
    for div in ajax_divs:
        # FIX: guard against a missing x-data attribute ("in None" raised
        # TypeError before).
        if "abstimmung" in (div.get("x-data") or ""):
            vote_div = div
        else:
            speech_div = div
    speech = get_ajax(speech_div)
    speeches = parse_speech(speech)
    vote = get_ajax(vote_div)
    votes = parse_vote(vote)

    # First memberships block: offices in the Bundestag; second: other bodies.
    function_divs = soup.find_all(class_="m-biography__memberships")
    functions = get_functions(function_divs[0]) if len(function_divs) > 0 else None
    if len(function_divs) > 1:
        additional_functions = get_functions(function_divs[1])
    else:
        additional_functions = None

    mandate_elem = soup.find(class_="m-biography__subHeading --mandate")
    mandate = (
        mandate_elem.text if mandate_elem else "",
        soup.find(string=re.compile(r"^Wahlkreis \d*:")) or "",
    )
    disclosures = get_disclosures(soup.find(class_="m-biography__infos"))
    bio = Biography(
        name,
        party,
        job,
        cv,
        speeches,
        votes,
        functions,
        additional_functions,
        mandate,
        disclosures,
    )
    sleep(sleep_for)
    return bio


def request_handle_rate_limit(url):
    """GET url, retrying up to 5 times with a 5-minute pause on failure.

    After the retries are exhausted a final unchecked request is issued;
    its result is returned as-is.
    """
    for _ in range(5):
        try:
            response = requests.get(url)
            response.raise_for_status()
            return response
        except requests.RequestException:
            print("Request failed! waiting 5min")
            sleep(300)
    return requests.get(url)


def get_disclosures(elem):
    """Parse the publishable-interests section into [(heading, [items])].

    Returns None when the section is absent.  Placeholder entries
    ("Keine veröffentlichungspflichtigen Angaben.") are skipped.
    """
    if not elem:
        return None
    divs = elem.find_all("div", class_=lambda cls: cls != "m-biography__infoDisclaimer")
    out = []
    for div in divs:
        current_heading = ""
        current_body = []
        for child in div.children:
            if child.name == "h3":
                # A new heading closes the previous group (if non-empty).
                if current_body != []:
                    out.append((current_heading, current_body))
                current_heading = child.text.strip()
                current_body = []
                continue
            if not child.name:
                continue  # bare NavigableString between tags
            if child.text.strip() == "":
                continue
            if child.text.strip() == "Keine veröffentlichungspflichtigen Angaben.":
                continue
            current_body.append(child.text.strip())
        if current_heading == "" and current_body == []:
            continue
        out.append((current_heading, current_body))
    return out


def get_functions(elem):
    """Parse a memberships block into sorted [(heading, [entries])]."""
    out = []
    current_heading = None
    current_body = []
    for child in elem.children:
        if child.name == "h3":
            # A new heading closes the previous group (if non-empty).
            if current_body != []:
                out.append((current_heading, sorted(current_body)))
            current_heading = child.text.strip()
            current_body = []
            continue
        if not child.name:
            continue  # bare NavigableString between tags
        current_body.extend(
            grandchild.text.strip().replace("\n\n\n(Interner Link)", "")
            for grandchild in child.children
            if grandchild.text.strip() != ""
        )
    out.append((current_heading, sorted(current_body)))
    return sorted(out)


def parse_speech(page):
    """Extract (title, info) pairs from the speeches ajax response."""
    if not page:
        return None
    soup = BeautifulSoup(page.text, features="html.parser")
    infos = [s.text.strip() for s in soup.find_all(class_="m-biography__speechTitle")]
    # Every visible speech link appears twice; take every other label.
    # FIX: elements without a class attribute return None from get("class").
    titles = [
        title.text.strip()
        for title in soup.find_all(class_="a-link__label")
        if "--hidden" not in (title.get("class") or [])
    ][::2]
    return list(zip(titles, infos))


def parse_vote(page):
    """Extract the vote table rows (skipping the header) as text lists."""
    if not page:
        return None
    soup = BeautifulSoup(page.text, features="html.parser")
    rows = soup.find_all("tr")[1:]
    parsed = []
    for row in rows:
        cols = row.find_all("td")
        parsed.append([col.text.strip() for col in cols])
    return parsed


def get_ajax(elem):
    """Fetch the lazily-loaded content described by elem's x-data payload.

    The payload is 'dynamicTemplateOutput({...json...})' carrying the
    endpoint and its filter parameters; returns the HTTP response or None.
    """
    if not elem:
        return None
    inner = elem.get("x-data").removeprefix("dynamicTemplateOutput(").removesuffix(")")
    data = json.loads(inner)
    url = BUNDESTAG_BASE_URL + data["endpoint"]
    filters = data["filters"]
    # Minimal query-string escaping for the characters the site uses.
    sanitized_filters = [
        (key, value.replace(" ", "+").replace("#", "%23"))
        for key, value in filters.items()
    ]
    url = url + "?" + "&".join(f"{key}={val}" for key, val in sanitized_filters)
    response = request_handle_rate_limit(url)
    return response


if __name__ == "__main__":
    main()