399 lines
11 KiB
Python
399 lines
11 KiB
Python
import requests
|
|
import re
|
|
import json
|
|
from bs4 import BeautifulSoup
|
|
from time import sleep
|
|
from os.path import commonprefix
|
|
from os import makedirs
|
|
from git import Repo
|
|
import argparse
|
|
from datetime import datetime
|
|
|
|
BUNDESTAG_URL = "https://www.bundestag.de/ajax/filterlist/de/abgeordnete/biografien/1040594-1040594?limit=9999&view=BTBiographyList"
|
|
BUNDESTAG_BASE_URL = "https://www.bundestag.de"
|
|
|
|
|
|
class Biography:
|
|
def __init__(
|
|
self,
|
|
name,
|
|
party,
|
|
job,
|
|
cv,
|
|
speeches,
|
|
votes,
|
|
functions,
|
|
additional_functions,
|
|
mandate,
|
|
disclosures,
|
|
):
|
|
self.name = name
|
|
self.party = party
|
|
self.job = job
|
|
self.cv = cv
|
|
self.speeches = speeches
|
|
self.votes = votes
|
|
self.functions = functions
|
|
self.additional_functions = additional_functions
|
|
self.mandate = mandate
|
|
self.disclosures = disclosures
|
|
|
|
def __repr__(self):
|
|
txt = f"""
|
|
name: {self.name}
|
|
party: {self.party}
|
|
cv: {self.cv}
|
|
speeches: {self.speeches}
|
|
votes: {self.votes}
|
|
functions: {self.functions}
|
|
additional_functions: {self.additional_functions}
|
|
mandate: {self.mandate}
|
|
disclosures: {self.disclosures}
|
|
"""
|
|
return txt
|
|
|
|
def __str__(self):
|
|
if self.speeches:
|
|
speeches_str = "".join(
|
|
[f"\n- {speech[0]}: {speech[1]}" for speech in self.speeches]
|
|
)
|
|
else:
|
|
speeches_str = ""
|
|
|
|
if self.votes:
|
|
votes_str = "".join(
|
|
[f"\n- {vote[0]}, {vote[1]}: {vote[2]}" for vote in self.votes]
|
|
)
|
|
else:
|
|
votes_str = ""
|
|
|
|
if self.job:
|
|
job_str = self.job
|
|
else:
|
|
job_str = ""
|
|
|
|
txt = f"""
|
|
# Persönliche Angaben
|
|
Name: {self.name[1]} {self.name[0]}
|
|
|
|
Partei: {self.party}
|
|
|
|
Beruf: {job_str}
|
|
|
|
Biographie: {self.cv}
|
|
|
|
# Reden {speeches_str}
|
|
|
|
# Abstimmungen {votes_str}
|
|
|
|
# Funktionen
|
|
## Ämter im Bundestag {funcs_to_str(self.functions)}
|
|
|
|
## Sonstige Gremien {funcs_to_str(self.additional_functions)}
|
|
|
|
# Mandat
|
|
{self.mandate[0]}, {self.mandate[1]}
|
|
|
|
# Veröffentlichungspflichtige Angaben {funcs_to_str(self.disclosures)}
|
|
"""
|
|
return txt
|
|
|
|
def to_dict(self):
|
|
return {
|
|
"name": self.name,
|
|
"party": self.party,
|
|
"cv": self.cv,
|
|
"speeches": self.speeches,
|
|
"votes": self.votes,
|
|
"functions": self.functions,
|
|
"additional_functions": self.additional_functions,
|
|
"mandate": self.mandate,
|
|
"disclosures": self.disclosures,
|
|
}
|
|
|
|
|
|
def funcs_to_str(funcs):
|
|
if not funcs:
|
|
return ""
|
|
out = ""
|
|
for func in funcs:
|
|
out += f"\n- {func[0]}"
|
|
for loc in sorted(func[1]):
|
|
out += f"\n - {loc}"
|
|
return out
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
prog="Bundescrawler",
|
|
description="Crawls the pages of german representatives and saves the information in a git repository",
|
|
)
|
|
parser.add_argument("-o", "--out")
|
|
parser.add_argument("--debug", action="store_true")
|
|
parser.add_argument("--no-git", action="store_true")
|
|
args = parser.parse_args()
|
|
if not args.out:
|
|
raise ValueError("must supply out directory")
|
|
repo = Repo(args.out)
|
|
links, names = get_links_and_names()
|
|
if args.debug:
|
|
links = links[:5]
|
|
names = names[:5]
|
|
sleep_for = 0
|
|
else:
|
|
sleep_for = 10
|
|
|
|
bios = [get_bio(link, name, sleep_for) for link, name in zip(links, names)]
|
|
|
|
if not args.debug:
|
|
save_raw(bios, args.out)
|
|
save_individuals(bios, args.out)
|
|
save_disclosures(bios, args.out)
|
|
|
|
if args.no_git:
|
|
return
|
|
|
|
if repo.git.diff(name_only=True) == "":
|
|
return
|
|
|
|
repo.git.add("*")
|
|
repo.index.commit(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
|
|
origin = repo.remote(name="origin")
|
|
origin.push()
|
|
|
|
|
|
def save_individuals(bios, out):
|
|
for rep in bios:
|
|
first_letter = rep.name[0][0].upper()
|
|
name_str = f"{rep.name[0]} {rep.name[1]}".replace(" ", "_")
|
|
dir = f"{out}/Abgeordnete/{first_letter}"
|
|
try:
|
|
makedirs(dir)
|
|
except FileExistsError:
|
|
pass
|
|
with open(f"{dir}/{name_str}.md", "w") as rep_file:
|
|
rep_file.write(str(rep))
|
|
|
|
|
|
def save_raw(bios, out):
|
|
with open(f"{out}/raw.json", "w") as raw_file:
|
|
json.dump(
|
|
[bio.to_dict() for bio in bios], raw_file, indent=2, ensure_ascii=False
|
|
)
|
|
|
|
|
|
def save_disclosures(bios, out):
|
|
dir = f"{out}/Voep_Angaben"
|
|
try:
|
|
makedirs(dir)
|
|
except FileExistsError:
|
|
pass
|
|
bios_with_discl = [bio for bio in bios if bio.disclosures]
|
|
alpha_str = ""
|
|
for bio in sorted(bios_with_discl, key=lambda b: b.name):
|
|
alpha_str += f"# {bio.name[1]} {bio.name[0]} ({bio.party})"
|
|
alpha_str += funcs_to_str(bio.disclosures)
|
|
alpha_str += "\n"
|
|
|
|
with open(f"{dir}/Alphabetisch.md", "w") as alpha_file:
|
|
alpha_file.write(alpha_str.strip())
|
|
|
|
party_str = ""
|
|
for party, bio_list in group_by_party(bios_with_discl):
|
|
party_str += f"# {party}\n"
|
|
for bio in bio_list:
|
|
party_str += f"## {bio.name[1]} {bio.name[0]}"
|
|
party_str += funcs_to_str(bio.disclosures)
|
|
party_str += "\n"
|
|
|
|
with open(f"{dir}/Nach_Partei.md", "w") as party_file:
|
|
party_file.write(party_str.strip())
|
|
|
|
|
|
def group_by_party(bios):
|
|
grouped = {}
|
|
for bio in bios:
|
|
if bio.party in grouped.keys():
|
|
grouped[bio.party].append(bio)
|
|
else:
|
|
grouped[bio.party] = [bio]
|
|
|
|
as_list = [(key, val) for key, val in grouped.items()]
|
|
as_list.sort(key=lambda party: party[0])
|
|
return as_list
|
|
|
|
|
|
def get_links_and_names():
|
|
response = requests.get(BUNDESTAG_URL)
|
|
soup = BeautifulSoup(response.content, features="html.parser")
|
|
links = [a.get("href") for a in soup.find_all("a")]
|
|
names = [a.text.strip().split("\n\n\n") for a in soup.find_all("a")]
|
|
|
|
return (links, names)
|
|
|
|
|
|
def get_bio(url, name, sleep_for):
|
|
name, party = name
|
|
name = name.split(", ")
|
|
party = party.replace("\n\xa0*", " (ausgeschieden)")
|
|
print(f"Getting {url} for {name[1]} {name[0]}")
|
|
response = request_handle_rate_limit(url)
|
|
soup = BeautifulSoup(response.content, features="html.parser")
|
|
job_elem = soup.find(class_="m-biography__introInfo").find("span")
|
|
if job_elem:
|
|
job = job_elem.text
|
|
else:
|
|
job = None
|
|
cv = soup.find(class_="m-biography__biography").text.strip()
|
|
ajax_divs = soup.find_all(class_="m-ajaxLoadedContent")
|
|
speech_div = None
|
|
vote_div = None
|
|
for div in ajax_divs:
|
|
if "abstimmung" in div.get("x-data"):
|
|
vote_div = div
|
|
else:
|
|
speech_div = div
|
|
speech = get_ajax(speech_div)
|
|
speeches = parse_speech(speech)
|
|
vote = get_ajax(vote_div)
|
|
votes = parse_vote(vote)
|
|
function_divs = soup.find_all(class_="m-biography__memberships")
|
|
if len(function_divs) > 0:
|
|
functions = get_functions(function_divs[0])
|
|
else:
|
|
functions = None
|
|
if len(function_divs) > 1:
|
|
additional_functions = get_functions(function_divs[1])
|
|
else:
|
|
additional_functions = None
|
|
mandate = (
|
|
soup.find(class_="m-biography__subHeading --mandate").text,
|
|
soup.find(string=re.compile(r"^Wahlkreis \d*:")),
|
|
)
|
|
disclosures = get_disclosures(soup.find(class_="m-biography__infos"))
|
|
|
|
bio = Biography(
|
|
name,
|
|
party,
|
|
job,
|
|
cv,
|
|
speeches,
|
|
votes,
|
|
functions,
|
|
additional_functions,
|
|
mandate,
|
|
disclosures,
|
|
)
|
|
|
|
sleep(sleep_for)
|
|
|
|
return bio
|
|
|
|
|
|
def request_handle_rate_limit(url):
|
|
for _ in range(5):
|
|
try:
|
|
return requests.get(url)
|
|
except:
|
|
print("Rate limit! waiting 5min")
|
|
sleep(300)
|
|
return requests.get(url)
|
|
|
|
|
|
def get_disclosures(elem):
|
|
if not elem:
|
|
return None
|
|
divs = elem.find_all("div", class_=lambda cls: cls != "m-biography__infoDisclaimer")
|
|
out = []
|
|
for div in divs:
|
|
current_heading = ""
|
|
current_body = []
|
|
for child in div.children:
|
|
if child.name == "h3":
|
|
if current_body != []:
|
|
out.append((current_heading, current_body))
|
|
current_heading = child.text.strip()
|
|
current_body = []
|
|
continue
|
|
if not child.name:
|
|
continue
|
|
if child.text.strip() == "":
|
|
continue
|
|
if child.text.strip() == "Keine veröffentlichungspflichtigen Angaben.":
|
|
continue
|
|
current_body.append(child.text.strip())
|
|
if current_heading == "" and current_body == []:
|
|
continue
|
|
out.append((current_heading, current_body))
|
|
return out
|
|
|
|
|
|
def get_functions(elem):
|
|
out = []
|
|
current_heading = None
|
|
current_body = []
|
|
for child in elem.children:
|
|
if child.name == "h3":
|
|
if current_body != []:
|
|
out.append((current_heading, sorted(current_body)))
|
|
current_heading = child.text.strip()
|
|
current_body = []
|
|
continue
|
|
if not child.name:
|
|
continue
|
|
current_body.extend(
|
|
grandchild.text.strip().replace("\n\n\n(Interner Link)", "")
|
|
for grandchild in child.children
|
|
if grandchild.text.strip() != ""
|
|
)
|
|
out.append((current_heading, sorted(current_body)))
|
|
return sorted(out)
|
|
|
|
|
|
def parse_speech(page):
|
|
if not page:
|
|
return None
|
|
soup = BeautifulSoup(page.content, features="html.parser")
|
|
infos = [s.text.strip() for s in soup.find_all(class_="m-biography__speechTitle")]
|
|
titles = [
|
|
title.text.strip()
|
|
for title in soup.find_all(class_="a-link__label")
|
|
if "--hidden" not in title.get("class")
|
|
][::2]
|
|
return list(zip(titles, infos))
|
|
|
|
|
|
def parse_vote(page):
|
|
if not page:
|
|
return None
|
|
soup = BeautifulSoup(page.content, features="html.parser")
|
|
rows = soup.find_all("tr")[1:]
|
|
parsed = []
|
|
for row in rows:
|
|
cols = row.find_all("td")
|
|
parsed.append([col.text.strip() for col in cols])
|
|
return parsed
|
|
|
|
|
|
def get_ajax(elem):
|
|
if not elem:
|
|
return None
|
|
inner = elem.get("x-data").lstrip("dynamicTemplateOutput(").rstrip(")")
|
|
data = json.loads(inner)
|
|
url = BUNDESTAG_BASE_URL + data["endpoint"]
|
|
filters = data["filters"]
|
|
sanitized_filters = [
|
|
(key, value.replace(" ", "+").replace("#", "%23"))
|
|
for key, value in filters.items()
|
|
]
|
|
url = url + "?" + "&".join(f"{key}={val}" for key, val in sanitized_filters)
|
|
response = request_handle_rate_limit(url)
|
|
return response
|
|
|
|
|
|
def common_suffix(strings):
|
|
return commonprefix([s[::-1] for s in strings])[::-1]
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|