- README.md with total count, party breakdown table, and directory links - Parteien/<party>.md listing all members with links to their profiles - Abgeordnete/<letter>/index.md listing all representatives per letter Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
564 lines
17 KiB
Python
564 lines
17 KiB
Python
# Standard library
import argparse
import json
import re
from datetime import datetime
from os import makedirs
from time import sleep
from urllib.parse import urlencode

# Third-party
import requests
from bs4 import BeautifulSoup
from git import Repo
|
|
|
# AJAX list endpoint returning all representatives; limit=9999 fetches the
# whole list in a single page instead of paging.
BUNDESTAG_URL = "https://www.bundestag.de/ajax/filterlist/de/abgeordnete/biografien/1040594-1040594?limit=9999&view=BTBiographyList"

# Prefix for the relative endpoint paths embedded in the pages' x-data attributes.
BUNDESTAG_BASE_URL = "https://www.bundestag.de"
|
|
|
|
|
class Biography:
    """All crawled information about one representative.

    ``name`` is a ``[last, first]`` pair. ``speeches``, ``votes``,
    ``functions``, ``additional_functions`` and ``disclosures`` may each be
    ``None`` when the corresponding section is absent from the profile page.
    """

    def __init__(
        self,
        name,
        party,
        job,
        cv,
        speeches,
        votes,
        functions,
        additional_functions,
        mandate,
        disclosures,
    ):
        self.name = name
        self.party = party
        self.job = job
        self.cv = cv
        self.speeches = speeches
        self.votes = votes
        self.functions = functions
        self.additional_functions = additional_functions
        self.mandate = mandate
        self.disclosures = disclosures

    def __repr__(self):
        # Debug dump of every field (``job`` was previously missing here).
        txt = f"""
name: {self.name}
party: {self.party}
job: {self.job}
cv: {self.cv}
speeches: {self.speeches}
votes: {self.votes}
functions: {self.functions}
additional_functions: {self.additional_functions}
mandate: {self.mandate}
disclosures: {self.disclosures}
"""
        return txt

    def __str__(self):
        """Render the biography as the markdown profile written to disk."""
        if self.speeches:
            speeches_str = "".join(
                [f"\n- {speech[0]}: {speech[1]}" for speech in self.speeches]
            )
        else:
            speeches_str = ""

        if self.votes:
            votes_str = "".join(
                [f"\n- {vote[0]}, {vote[1]}: {vote[2]}" for vote in self.votes]
            )
        else:
            votes_str = ""

        job_str = self.job if self.job else ""

        txt = f"""
# Persönliche Angaben
Name: {self.name[1]} {self.name[0]}

Partei: {self.party}

Beruf: {job_str}

Biographie: {self.cv}

# Reden {speeches_str}

# Abstimmungen {votes_str}

# Funktionen
## Ämter im Bundestag {funcs_to_str(self.functions)}

## Sonstige Gremien {funcs_to_str(self.additional_functions)}

# Mandat
{self.mandate[0]}, {self.mandate[1]}

# Veröffentlichungspflichtige Angaben {funcs_to_str(self.disclosures)}
"""
        return txt

    def to_dict(self):
        """Serializable form of all fields, as written to raw.json.

        Bug fix: ``job`` was previously omitted, silently losing that field
        in the raw JSON dump and in commit-message diffing.
        """
        return {
            "name": self.name,
            "party": self.party,
            "job": self.job,
            "cv": self.cv,
            "speeches": self.speeches,
            "votes": self.votes,
            "functions": self.functions,
            "additional_functions": self.additional_functions,
            "mandate": self.mandate,
            "disclosures": self.disclosures,
        }
|
|
|
|
|
|
def funcs_to_str(funcs):
    """Render (heading, entries) pairs as a nested markdown bullet list.

    Returns "" for None or an empty list; entries are sorted per heading.
    """
    if not funcs:
        return ""
    parts = []
    for heading, entries in funcs:
        parts.append(f"\n- {heading}")
        parts.extend(f"\n - {entry}" for entry in sorted(entries))
    return "".join(parts)
|
|
|
|
|
|
def main():
    """CLI entry point: crawl all representative pages, write the markdown
    and JSON outputs, then commit & push unless --no-git is given.

    Raises:
        ValueError: when no output directory is supplied.
    """
    parser = argparse.ArgumentParser(
        prog="Bundescrawler",
        description="Crawls the pages of german representatives and saves the information in a git repository",
    )
    parser.add_argument("-o", "--out")
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--no-git", action="store_true")
    args = parser.parse_args()
    if not args.out:
        raise ValueError("must supply out directory")

    # Bug fix: only open the repository when git is actually used, so that
    # --no-git also works on a plain (non-git) output directory.
    repo = Repo(args.out) if not args.no_git else None

    links, names = get_links_and_names()
    if args.debug:
        # Debug runs crawl only five profiles and skip rate limiting.
        links = links[:5]
        names = names[:5]
        sleep_for = 0
    else:
        sleep_for = 10

    bios = [get_bio(link, name, sleep_for) for link, name in zip(links, names)]

    # Snapshot of the previous crawl, used for commit-message diffing.
    old_bios = load_old_bios(args.out)

    if not args.debug:
        save_raw(bios, args.out)
        save_individuals(bios, args.out)
        save_votes(bios, args.out)
        save_disclosures(bios, args.out)
        save_readme(bios, args.out)
        save_party_index(bios, args.out)
        save_letter_indexes(bios, args.out)

    if args.no_git:
        return

    repo.git.add("*")
    # Nothing staged means nothing changed on the website; skip the commit.
    if repo.git.diff("--cached", name_only=True) == "":
        return

    message = generate_commit_message(old_bios, bios)
    repo.index.commit(message)
    origin = repo.remote(name="origin")
    origin.push()
|
|
|
|
|
|
def load_old_bios(out):
    """Return the previously saved raw biography dicts, or [] on first run."""
    path = f"{out}/raw.json"
    try:
        fh = open(path, "r", encoding="utf-8")
    except FileNotFoundError:
        # No earlier crawl exists yet.
        return []
    with fh:
        return json.load(fh)
|
|
|
|
|
|
def generate_commit_message(old_bios, new_bios):
    """Build a commit message summarising the diff between the previous crawl
    (list of dicts from raw.json) and the current crawl (Biography objects).

    Sections: new members, departed members, party changes, newly published
    disclosures, and the total profile count.
    """
    previous = {tuple(b["name"]): b for b in old_bios}
    current = {tuple(b.name): b for b in new_bios}

    added = [bio for key, bio in current.items() if key not in previous]
    removed = [bio for key, bio in previous.items() if key not in current]

    # Representatives present in both crawls, paired with their old record.
    carried_over = [
        (bio, previous[tuple(bio.name)])
        for bio in new_bios
        if tuple(bio.name) in previous
    ]

    new_disclosures = []
    for bio, old in carried_over:
        before = {entry for _, items in (old.get("disclosures") or []) for entry in items}
        after = {entry for _, items in (bio.disclosures or []) for entry in items}
        new_disclosures.extend((bio, entry) for entry in after - before)

    party_changes = [
        (bio, old.get("party", ""))
        for bio, old in carried_over
        if old.get("party", "") != bio.party
    ]

    today = datetime.now().strftime("%Y-%m-%d")
    sections = []

    if added:
        entries = "\n".join(f"- {b.name[1]} {b.name[0]} ({b.party})" for b in added)
        sections.append(f"Neue Abgeordnete:\n{entries}")

    if removed:
        entries = "\n".join(
            f"- {b['name'][1]} {b['name'][0]} ({b.get('party', '')})" for b in removed
        )
        sections.append(f"Ausgeschieden:\n{entries}")

    if party_changes:
        entries = "\n".join(
            f"- {b.name[1]} {b.name[0]}: {old} -> {b.party}" for b, old in party_changes
        )
        sections.append(f"Parteiwechsel:\n{entries}")

    if new_disclosures:
        entries = "\n".join(
            f"- {b.name[1]} {b.name[0]} ({b.party}): {item}"
            for b, item in new_disclosures
        )
        sections.append(f"Neue Veröffentlichungen:\n{entries}")

    sections.append(f"{len(new_bios)} Profile aktualisiert")

    return f"Aktualisierung {today}\n\n" + "\n\n".join(sections)
|
|
|
|
|
|
def save_individuals(bios, out):
    """Write one markdown profile per representative under Abgeordnete/<letter>/."""
    for bio in bios:
        letter = bio.name[0][0].upper()
        target_dir = f"{out}/Abgeordnete/{letter}"
        makedirs(target_dir, exist_ok=True)
        filename = f"{bio.name[0]} {bio.name[1]}".replace(" ", "_") + ".md"
        with open(f"{target_dir}/{filename}", "w", encoding="utf-8") as fh:
            fh.write(str(bio))
|
|
|
|
|
|
def save_raw(bios, out):
    """Dump every biography as pretty-printed JSON to <out>/raw.json."""
    payload = [bio.to_dict() for bio in bios]
    with open(f"{out}/raw.json", "w", encoding="utf-8") as fh:
        json.dump(payload, fh, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def save_disclosures(bios, out):
    """Write the disclosure listings: once alphabetically, once grouped by party."""
    target_dir = f"{out}/Voep_Angaben"
    makedirs(target_dir, exist_ok=True)
    disclosed = [b for b in bios if b.disclosures]

    # Alphabetical listing, sorted by [last, first] name.
    chunks = []
    for bio in sorted(disclosed, key=lambda b: b.name):
        chunks.append(f"# {bio.name[1]} {bio.name[0]} ({bio.party})")
        chunks.append(funcs_to_str(bio.disclosures))
        chunks.append("\n")
    with open(f"{target_dir}/Alphabetisch.md", "w", encoding="utf-8") as fh:
        fh.write("".join(chunks).strip())

    # Same content, grouped under one heading per party.
    chunks = []
    for party, members in group_by_party(disclosed):
        chunks.append(f"# {party}\n")
        for bio in members:
            chunks.append(f"## {bio.name[1]} {bio.name[0]}")
            chunks.append(funcs_to_str(bio.disclosures))
            chunks.append("\n")
    with open(f"{target_dir}/Nach_Partei.md", "w", encoding="utf-8") as fh:
        fh.write("".join(chunks).strip())
|
|
|
|
|
|
def save_votes(bios, out):
    """Merge newly crawled votes into <out>/Abstimmungen and render one
    markdown file per vote topic.

    Votes deleted from the website are preserved: the JSON backing store
    is loaded first and only ever extended.
    """
    target_dir = f"{out}/Abstimmungen"
    makedirs(target_dir, exist_ok=True)

    json_path = f"{target_dir}/votes.json"
    try:
        with open(json_path, "r", encoding="utf-8") as fh:
            all_votes = json.load(fh)
    except FileNotFoundError:
        all_votes = {}

    # Fold the freshly crawled votes into the store.
    for bio in bios:
        rep_name = f"{bio.name[1]} {bio.name[0]}"
        for vote in bio.votes or []:
            topic, date, result = vote[0], vote[1], vote[2]
            key = f"{topic} ({date})"
            all_votes.setdefault(key, {})[rep_name] = {
                "party": bio.party,
                "vote": result,
            }

    with open(json_path, "w", encoding="utf-8") as fh:
        json.dump(all_votes, fh, indent=2, ensure_ascii=False)

    # One markdown file per topic; rows grouped by party.
    for key, votes in sorted(all_votes.items()):
        body = "".join(
            f"- {name} ({votes[name]['party']}): {votes[name]['vote']}\n"
            for name in sorted(votes, key=lambda n: votes[n]["party"])
        )
        safe_name = re.sub(r'[/<>:"|?*]', "_", key)[:200]
        with open(f"{target_dir}/{safe_name}.md", "w", encoding="utf-8") as fh:
            fh.write(f"# {key}\n\n{body}")
|
|
|
|
|
|
def save_readme(bios, out):
    """Write the top-level README: total count, party table, directory links."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    party_counts = {}
    for bio in bios:
        party_counts[bio.party] = party_counts.get(bio.party, 0) + 1

    lines = [
        "# Bundestag",
        "",
        "Automatisch erfasste Daten der Abgeordneten des Deutschen Bundestages.",
        "",
        # Trailing space forces a markdown line break.
        f"**Abgeordnete:** {len(bios)} ",
        f"**Letzte Aktualisierung:** {timestamp}",
        "",
        "## Parteien",
        "",
        "| Partei | Abgeordnete |",
        "|--------|------------:|",
    ]
    for party, count in sorted(party_counts.items()):
        lines.append(f"| [{party}](Parteien/{party}.md) | {count} |")
    lines += [
        "",
        "## Verzeichnisse",
        "",
        "- [Abgeordnete](Abgeordnete/) — Einzelprofile nach Nachname",
        "- [Abstimmungen](Abstimmungen/) — Abstimmungen nach Thema",
        "- [Veröffentlichungspflichtige Angaben](Voep_Angaben/) — Nebentätigkeiten",
        "- [Parteien](Parteien/) — Abgeordnete nach Partei",
        "",
    ]

    with open(f"{out}/README.md", "w", encoding="utf-8") as fh:
        fh.write("\n".join(lines))
|
|
|
|
|
|
def save_party_index(bios, out):
    """Write Parteien/<party>.md listing every member of that party."""
    target_dir = f"{out}/Parteien"
    makedirs(target_dir, exist_ok=True)
    for party, members in group_by_party(bios):
        lines = [f"# {party}", "", f"{len(members)} Abgeordnete", ""]
        for bio in sorted(members, key=lambda b: b.name):
            letter = bio.name[0][0].upper()
            slug = f"{bio.name[0]} {bio.name[1]}".replace(" ", "_")
            entry = f"- [{bio.name[1]} {bio.name[0]}](../Abgeordnete/{letter}/{slug}.md)"
            if bio.job:
                entry += f" — {bio.job}"
            lines.append(entry)
        lines.append("")
        # NOTE(review): a party name containing "/" would break this path —
        # confirm upstream party strings never contain path separators.
        with open(f"{target_dir}/{party}.md", "w", encoding="utf-8") as fh:
            fh.write("\n".join(lines))
|
|
|
|
|
|
def save_letter_indexes(bios, out):
    """Write Abgeordnete/<letter>/index.md listing all representatives whose
    last name starts with that letter, with links to their profiles."""
    by_letter = {}
    for bio in bios:
        by_letter.setdefault(bio.name[0][0].upper(), []).append(bio)

    for letter, bio_list in sorted(by_letter.items()):
        md = f"# {letter}\n\n"
        for bio in sorted(bio_list, key=lambda b: b.name):
            name_str = f"{bio.name[0]} {bio.name[1]}".replace(" ", "_")
            md += f"- [{bio.name[1]} {bio.name[0]}]({name_str}.md) ({bio.party})\n"
        # Bug fix: create the directory explicitly so this function also works
        # when save_individuals has not run first.
        letter_dir = f"{out}/Abgeordnete/{letter}"
        makedirs(letter_dir, exist_ok=True)
        with open(f"{letter_dir}/index.md", "w", encoding="utf-8") as f:
            f.write(md)
|
|
|
|
|
|
def group_by_party(bios):
    """Group biographies by party.

    Returns a list of (party, members) pairs sorted by party name; members
    keep their input order.
    """
    grouped = {}
    for bio in bios:
        grouped.setdefault(bio.party, []).append(bio)
    return sorted(grouped.items(), key=lambda item: item[0])
|
|
|
|
|
|
def get_links_and_names():
    """Fetch the member list page and return parallel lists: one profile link
    and one [name, party] text pair per anchor tag."""
    page = requests.get(BUNDESTAG_URL)
    soup = BeautifulSoup(page.text, features="html.parser")
    anchors = soup.find_all("a")
    links = [anchor.get("href") for anchor in anchors]
    # Name and party are separated by a triple newline inside the anchor text.
    names = [anchor.text.strip().split("\n\n\n") for anchor in anchors]
    return (links, names)
|
|
|
|
|
|
def get_bio(url, name, sleep_for):
    """Fetch and parse one representative's profile page into a Biography.

    Args:
        url: absolute profile URL.
        name: ["Last, First", party] pair as scraped from the list page.
        sleep_for: seconds to sleep after the request (crawl rate limiting).
    """
    name, party = name
    name = name.split(", ")  # -> [last, first]; assumes "Last, First" format — TODO confirm
    # A trailing "\n\xa0*" marks members who have left the Bundestag.
    party = party.replace("\n\xa0*", " (ausgeschieden)")
    print(f"Getting {url} for {name[1]} {name[0]}")
    response = request_handle_rate_limit(url)
    soup = BeautifulSoup(response.text, features="html.parser")
    intro_info = soup.find(class_="m-biography__introInfo")
    job_elem = intro_info.find("span") if intro_info else None
    job = job_elem.text if job_elem else None
    cv_elem = soup.find(class_="m-biography__biography")
    cv = cv_elem.text.strip() if cv_elem else ""
    # Speeches and votes are loaded via AJAX; each container carries its
    # endpoint configuration in an x-data attribute.
    ajax_divs = soup.find_all(class_="m-ajaxLoadedContent")
    speech_div = None
    vote_div = None
    for div in ajax_divs:
        # NOTE(review): div.get("x-data") raises TypeError if the attribute
        # is missing — presumably always present on these divs; verify.
        if "abstimmung" in div.get("x-data"):
            vote_div = div
        else:
            speech_div = div
    speech = get_ajax(speech_div)
    speeches = parse_speech(speech)
    vote = get_ajax(vote_div)
    votes = parse_vote(vote)
    # First memberships block: offices in the Bundestag; second (if any): other bodies.
    function_divs = soup.find_all(class_="m-biography__memberships")
    if len(function_divs) > 0:
        functions = get_functions(function_divs[0])
    else:
        functions = None
    if len(function_divs) > 1:
        additional_functions = get_functions(function_divs[1])
    else:
        additional_functions = None
    mandate_elem = soup.find(class_="m-biography__subHeading --mandate")
    # (mandate heading text, constituency string) — each "" when absent.
    mandate = (
        mandate_elem.text if mandate_elem else "",
        soup.find(string=re.compile(r"^Wahlkreis \d*:")) or "",
    )
    disclosures = get_disclosures(soup.find(class_="m-biography__infos"))

    bio = Biography(
        name,
        party,
        job,
        cv,
        speeches,
        votes,
        functions,
        additional_functions,
        mandate,
        disclosures,
    )

    # Be polite to the server between profile fetches.
    sleep(sleep_for)

    return bio
|
|
|
|
|
|
def request_handle_rate_limit(url):
    """GET *url*, retrying up to five times with a 5-minute pause on failure.

    After five failed attempts a final, unchecked request is returned as-is.
    """
    attempts = 5
    while attempts:
        attempts -= 1
        try:
            response = requests.get(url)
            response.raise_for_status()
        except requests.RequestException:
            print("Request failed! waiting 5min")
            sleep(300)
        else:
            return response
    return requests.get(url)
|
|
|
|
|
|
def get_disclosures(elem):
    """Parse the "veröffentlichungspflichtige Angaben" block into a list of
    (heading, [entries]) pairs, or None when the block is absent.

    <h3> children start a new section; the tags between two headings form
    that section's body. Empty strings and the "nothing to disclose"
    placeholder are dropped.
    """
    if not elem:
        return None
    # Skip the legal disclaimer div; every other child div holds disclosures.
    divs = elem.find_all("div", class_=lambda cls: cls != "m-biography__infoDisclaimer")
    out = []
    for div in divs:
        current_heading = ""
        current_body = []
        for child in div.children:
            if child.name == "h3":
                # A new heading closes the previous section (only if it had content).
                if current_body != []:
                    out.append((current_heading, current_body))
                current_heading = child.text.strip()
                current_body = []
                continue
            if not child.name:
                # Bare NavigableString (whitespace between tags) — skip.
                continue
            if child.text.strip() == "":
                continue
            if child.text.strip() == "Keine veröffentlichungspflichtigen Angaben.":
                # Placeholder meaning "nothing to disclose" — skip.
                continue
            current_body.append(child.text.strip())
        if current_heading == "" and current_body == []:
            continue
        # Flush the final section; its body may be empty if only a heading remained.
        out.append((current_heading, current_body))
    return out
|
|
|
|
|
|
def get_functions(elem):
    """Parse a memberships block into sorted (heading, [entries]) pairs.

    <h3> children start a new section; every other element child contributes
    its grandchildren's texts (with the internal-link suffix stripped).
    """
    out = []
    current_heading = None
    current_body = []
    for child in elem.children:
        if child.name == "h3":
            # New heading: flush the previous section if it collected entries.
            if current_body != []:
                out.append((current_heading, sorted(current_body)))
            current_heading = child.text.strip()
            current_body = []
            continue
        if not child.name:
            # Bare NavigableString between tags — skip.
            continue
        current_body.extend(
            grandchild.text.strip().replace("\n\n\n(Interner Link)", "")
            for grandchild in child.children
            if grandchild.text.strip() != ""
        )
    # Flush the last section. NOTE(review): if a block had no <h3> at all this
    # appends a (None, ...) pair and sorted(out) could fail — presumably every
    # memberships block starts with a heading; verify.
    out.append((current_heading, sorted(current_body)))
    return sorted(out)
|
|
|
|
|
|
def parse_speech(page):
    """Parse the AJAX speech-list response into (title, info) pairs.

    Returns None when no response was fetched.
    """
    if not page:
        return None
    soup = BeautifulSoup(page.text, features="html.parser")
    infos = [tag.text.strip() for tag in soup.find_all(class_="m-biography__speechTitle")]
    visible = [
        tag.text.strip()
        for tag in soup.find_all(class_="a-link__label")
        if "--hidden" not in tag.get("class")
    ]
    # Each speech contributes two visible labels; the title is the first of each pair.
    titles = visible[::2]
    return list(zip(titles, infos))
|
|
|
|
|
|
def parse_vote(page):
    """Parse the AJAX vote-table response into a list of cell-text rows.

    The first table row (the header) is skipped; returns None when no
    response was fetched.
    """
    if not page:
        return None
    soup = BeautifulSoup(page.text, features="html.parser")
    return [
        [cell.text.strip() for cell in row.find_all("td")]
        for row in soup.find_all("tr")[1:]
    ]
|
|
|
|
|
|
def get_ajax(elem):
    """Resolve an AJAX container's x-data config and fetch its endpoint.

    Returns the HTTP response, or None when *elem* is None.
    """
    if not elem:
        return None
    # x-data looks like: dynamicTemplateOutput({"endpoint": ..., "filters": {...}})
    inner = elem.get("x-data").removeprefix("dynamicTemplateOutput(").removesuffix(")")
    data = json.loads(inner)
    # Bug fix: urlencode (quote_plus) reproduces the old manual " "->"+" and
    # "#"->"%23" replacements and additionally escapes every other reserved
    # character ("&", "=", "%", ...), which the hand-rolled version corrupted.
    query = urlencode(data["filters"])
    url = f"{BUNDESTAG_BASE_URL}{data['endpoint']}?{query}"
    return request_handle_rate_limit(url)
|
|
|
|
|
|
# Run the crawler when executed as a script.
if __name__ == "__main__":
    main()
|