From e80538d13213a7b98df3c42a8840693c02e4130f Mon Sep 17 00:00:00 2001 From: Jochen Groppe Date: Fri, 13 Dec 2024 16:38:31 +0100 Subject: [PATCH] continue refactoring up to end --- Polestar_2_MQTT.py | 71 ++++++++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 31 deletions(-) diff --git a/Polestar_2_MQTT.py b/Polestar_2_MQTT.py index 487b456..9b88c47 100755 --- a/Polestar_2_MQTT.py +++ b/Polestar_2_MQTT.py @@ -54,7 +54,7 @@ ##################################### # global init -# constants +# internal constants SLEEP_INTERVAL = 0.1 # API config @@ -228,6 +228,7 @@ def get_api_token(tokenRequestCode): return access_token, expiry_time, refresh_token +# login step by step def get_token(email, password): print(" get_path_token()") path_token, cookie = get_path_token() @@ -244,14 +245,14 @@ def get_token(email, password): ##################################### # read data from Polestar API -# Funktion zum Abrufen von Fahrzeugdaten +# get mostly static car data def get_car_data(vin, access_token): url = POLESTAR_API_URL_V2 headers = { "Content-Type": "application/json", "Authorization": f"Bearer {access_token}" } - # Quelle für GQL-String: https://github.com/evcc-io/evcc/blob/master/vehicle/polestar/query.gql + # source for GQL string: https://github.com/evcc-io/evcc/blob/master/vehicle/polestar/query.gql payload = { "query": "query GetConsumerCarsV2 {" "getConsumerCarsV2 {" @@ -279,28 +280,28 @@ def get_car_data(vin, access_token): "variables": "{}" } response = requests.post(url, headers=headers, json=payload) + if response.status_code != 200: - print(" response.status_code = " + str(response.status_code)) - print(response) - raise Exception("get_car_data() Datenabfrage fehlgeschlagen") + wait_and_die(" response.status_code = {response.status_code}\n" + response, + "get_car_data() no data received") car_data = response.json()['data'] - # Filtern der Daten nach der gegebenen VIN + # get the car with given VIN filtered_car_data = next((car for car in car_data['getConsumerCarsV2'] if car['vin'] == vin), None) if filtered_car_data is None: - raise ValueError(f"get_car_data(): Keine Daten für VIN {vin} gefunden.") + raise ValueError(f"get_car_data(): no data for car with VIN {vin}") return filtered_car_data - -# Funktion zum Abrufen von Batteriedaten +# get battery data def get_battery_data(vin, access_token): url = POLESTAR_API_URL_V2 headers = { "Content-Type": "application/json", "Authorization": f"Bearer {access_token}" } + # source for GQL string: https://github.com/evcc-io/evcc/blob/master/vehicle/polestar/query.gql payload = { "query": "query GetBatteryData($vin:String!) {" "getBatteryData(vin:$vin) {" @@ -323,20 +324,23 @@ def get_battery_data(vin, access_token): "variables": {"vin": vin} } response = requests.post(url, headers=headers, json=payload) + if response.status_code != 200: - print(" response.status_code = " + str(response.status_code)) - raise Exception("get_battery_data() Datenabfrage fehlgeschlagen") + wait_and_die(" response.status_code = {response.status_code}\n" + response, + "get_battery_data() no data received") + battery_data = response.json()['data'] + return battery_data -# Funktion zum Abrufen von Odometerdaten +# get odometer data def get_odometer_data(vin, access_token): url = POLESTAR_API_URL_V2 headers = { "Content-Type": "application/json", "Authorization": f"Bearer {access_token}" } - # Quelle für GQL-String: https://github.com/evcc-io/evcc/blob/master/vehicle/polestar/query.gql + # source for GQL string: https://github.com/evcc-io/evcc/blob/master/vehicle/polestar/query.gql payload = { "query": "query GetOdometerData($vin:String!) {" "getOdometerData(vin:$vin) {" @@ -353,30 +357,33 @@ def get_odometer_data(vin, access_token): "variables": {"vin": vin} } response = requests.post(url, headers=headers, json=payload) + if response.status_code != 200: - print(" response.status_code = " + str(response.status_code)) - raise Exception("get_odometer_data() Datenabfrage fehlgeschlagen") + wait_and_die(" response.status_code = {response.status_code}\n" + response, + "get_odometer_data() no data received") + odometer_data = response.json()['data'] + return odometer_data -# Rekursives Durchlaufen des JSON-Objekts und Senden der Daten als MQTT-Nachrichten +# recursive parsing of the JSON object to build corresponding MQTT topics and send def publish_json_as_mqtt(topic, json_obj): if isinstance(json_obj, dict): for key, value in json_obj.items(): sub_topic = f"{topic}/{key}" - publish_json_as_mqtt(sub_topic, value) + publish_json_as_mqtt(sub_topic, value) # call self to resolve next json level else: json_payload = json.dumps(json_obj) client.publish(topic, json_payload, qos=1, retain=True) +# extract SoC from battery data JSON and send to openWB via MQTT def publish_soc_to_openwb(battery_data): if isinstance(battery_data, dict): soc = battery_data['getBatteryData']['batteryChargeLevelPercentage'] - print(f'publishing SoC {soc} to OpenWB {OPENWB_TOPIC}') - client_openwb.publish(OPENWB_TOPIC,soc,qos=1, retain=True) - + print(f' publish SoC {soc} to OpenWB {OPENWB_TOPIC}') + client_openwb.publish(OPENWB_TOPIC, soc, qos=1, retain=True) -# Hauptprogramm +# main program: init and loop forever def main(): print("Polestar_2_MQTT.py startet") print("==========================") @@ -386,15 +393,16 @@ def main(): last_battery_data = None last_odometer_data = None - # init sinal handler + # init sinal handler to catch SIGTERM signal.signal(signal.SIGTERM, signal_handler) - # Starten der MQTT-Verbindung + # start MQTT clients connect_mqtt() if (OPENWB_PUBLISH): connect_mqtt_openwb() while True: + # get new token if expiry_time == None or datetime.now() >= expiry_time: # TODO WORKAROUND: neu einloggen, statt refresh_token nutzen print("get_token()") @@ -432,7 +440,7 @@ def main(): for _ in range(int(POLESTAR_CYCLE/SLEEP_INTERVAL)): time.sleep(SLEEP_INTERVAL) -# Signal-Handler für SIGTERM +# signal handler for SIGTERM def signal_handler(sig, frame): print("SIGTERM received: stop run") server.shutdown() @@ -441,17 +449,18 @@ def signal_handler(sig, frame): client_openwb.disconnect() sys.exit(0) +# catch all exeptions in main to get tracheback output try: main() except Exception as e: - print(f"Fehler: {str(e)}") - exc_type, exc_value, exc_traceback = sys.exc_info() - # extrahiere die letzte Zeile des Tracebacks + # extract last line of traceback fpr line_number = traceback.extract_tb(exc_traceback)[-1][1] - print("Fehler in Zeile:", line_number) - print("Fehlertyp und Nachricht:", exc_type.__name__, exc_value) - + print(f"Error : {str(e)}") + print(f"in line: {line_number}") + print(f"type : {exc_type.__name__}") + print(f"message: {exc_value}") + print("************** Traceback ***************") print(traceback.format_exc()) # ***** EOF *****