from blacksheep.client import ClientSession from blacksheep.contents import Content from blacksheep.messages import Response from blacksheep.server import Application from blacksheep.server.responses import not_found from json import loads as from_json from ticketfrei3.models import ( Fahrt, Fahrt_Pydantic, Fahrt_Pydantic_List, Halt, Halt_Pydantic, Halt_Pydantic_List, Haltestelle, Haltestelle_Pydantic, Haltestelle_Pydantic_List, ) from tortoise import Tortoise from tortoise.transactions import in_transaction from tortoise.contrib.pydantic import PydanticModel app = Application() async def init(app: Application) -> None: await Tortoise.init( db_url="sqlite://db.sqlite3", modules={"models": ["ticketfrei3.models"]} ) await Tortoise.generate_schemas() app.services.add_instance(ClientSession(connection_timeout=30)) app.on_start += init async def finalize(app: Application) -> None: client = app.service_provider.get(ClientSession) await client.close() await Tortoise.close_connections() app.on_stop += finalize @app.route("/api/fetch_haltestellen") async def fetch_haltestellen(client: ClientSession) -> None: response = await client.get("https://start.vag.de/dm/api/v1/haltestellen.json/vag") haltestellen = from_json(await response.text()) for haltestelle in haltestellen["Haltestellen"]: if await Haltestelle.exists(id=haltestelle["VGNKennung"]): continue await Haltestelle.create( id=haltestelle["VGNKennung"], name=haltestelle["Haltestellenname"], longitude=haltestelle.get("Longitude"), latitude=haltestelle.get("Latitude"), ) @app.route("/api/fetch_fahrten") async def fetch_fahrten(client: ClientSession) -> None: for zweig in ("bus", "tram", "ubahn"): response = await client.get( "https://start.vag.de/dm/api/v1/fahrten.json/%s?timespan=70" % zweig ) fahrten = from_json(await response.text()) for fahrt in fahrten["Fahrten"]: if await Fahrt.exists(id=fahrt["Fahrtnummer"]): continue async with in_transaction() as connection: fahrt = Fahrt(id=fahrt["Fahrtnummer"], linie=fahrt["Linienname"]) await fahrt.save(using_db=connection) response = await client.get( "https://start.vag.de/dm/api/v1/fahrten.json/%s/%d" % (zweig, fahrt.id) ) details = from_json(await response.text()) for halt in details["Fahrtverlauf"]: haltestelle = ( await Haltestelle.filter(id=halt["VGNKennung"]) .using_db(connection) .first() ) if haltestelle is None: haltestelle = Haltestelle( id=halt["VGNKennung"], name=halt["Haltestellenname"], longitude=halt.get("Longitude"), latitude=halt.get("Latitude"), ) await haltestelle.save(using_db=connection) await Halt( fahrt=fahrt, haltestelle=haltestelle, ankunft=halt.get("AnkunftszeitIst"), abfahrt=halt.get("AbfahrtszeitIst"), ).save(using_db=connection) def to_json(model: PydanticModel) -> Response: return Response( 200, content=Content(b"application/json", model.json(ensure_ascii=False).encode()), ) @app.router.get("/api/haltestellen") async def haltestellen() -> Response: return to_json(await Haltestelle_Pydantic_List.from_queryset(Haltestelle.all())) @app.router.get("/api/haltestelle/{id}") async def haltestelle(id: int) -> Response: haltestelle = await Haltestelle.filter(id=id).first() if haltestelle is None: return not_found() return to_json(await Haltestelle_Pydantic.from_tortoise_orm(haltestelle)) @app.router.get("/api/fahrten") async def fahrten() -> Response: return to_json(await Fahrt_Pydantic_List.from_queryset(Fahrt.all())) @app.router.get("/api/fahrt/{id}") async def fahrt(id: int) -> Response: fahrt = await Fahrt.filter(id=id).first() if fahrt is None: return not_found() return to_json(await Fahrt_Pydantic.from_tortoise_orm(fahrt)) @app.router.get("/api/halte") async def halte() -> Response: return to_json(await Halt_Pydantic_List.from_queryset(Halt.all())) @app.router.get("/api/halt/{id}") async def halt(id: int) -> Response: halt = await Halt.filter(id=id).first() if halt is None: return not_found() return to_json(await Halt_Pydantic.from_tortoise_orm(halt))