Skip to content

Commit

Permalink
continue refactoring
Browse files Browse the repository at this point in the history
up to end
  • Loading branch information
CONSULitAS committed Dec 13, 2024
1 parent 70cd52a commit e80538d
Showing 1 changed file with 40 additions and 31 deletions.
71 changes: 40 additions & 31 deletions Polestar_2_MQTT.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
#####################################
# global init

# constants
# internal constants
SLEEP_INTERVAL = 0.1

# API config
Expand Down Expand Up @@ -228,6 +228,7 @@ def get_api_token(tokenRequestCode):

return access_token, expiry_time, refresh_token

# login step by step
def get_token(email, password):
print(" get_path_token()")
path_token, cookie = get_path_token()
Expand All @@ -244,14 +245,14 @@ def get_token(email, password):
#####################################
# read data from Polestar API

# Funktion zum Abrufen von Fahrzeugdaten
# get mostly static car data
def get_car_data(vin, access_token):
url = POLESTAR_API_URL_V2
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}"
}
# Quelle für GQL-String: https://github.com/evcc-io/evcc/blob/master/vehicle/polestar/query.gql
# source for GQL string: https://github.com/evcc-io/evcc/blob/master/vehicle/polestar/query.gql
payload = {
"query": "query GetConsumerCarsV2 {"
"getConsumerCarsV2 {"
Expand Down Expand Up @@ -279,28 +280,28 @@ def get_car_data(vin, access_token):
"variables": "{}"
}
response = requests.post(url, headers=headers, json=payload)

if response.status_code != 200:
print(" response.status_code = " + str(response.status_code))
print(response)
raise Exception("get_car_data() Datenabfrage fehlgeschlagen")
wait_and_die(" response.status_code = {response.status_code}\n" + response,
"get_car_data() no data received")
car_data = response.json()['data']

# Filtern der Daten nach der gegebenen VIN
# get the car with given VIN
filtered_car_data = next((car for car in car_data['getConsumerCarsV2'] if car['vin'] == vin), None)

if filtered_car_data is None:
raise ValueError(f"get_car_data(): Keine Daten für VIN {vin} gefunden.")
raise ValueError(f"get_car_data(): no data for car with VIN {vin}")

return filtered_car_data


# Funktion zum Abrufen von Batteriedaten
# get battery data
def get_battery_data(vin, access_token):
url = POLESTAR_API_URL_V2
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}"
}
# source for GQL string: https://github.com/evcc-io/evcc/blob/master/vehicle/polestar/query.gql
payload = {
"query": "query GetBatteryData($vin:String!) {"
"getBatteryData(vin:$vin) {"
Expand All @@ -323,20 +324,23 @@ def get_battery_data(vin, access_token):
"variables": {"vin": vin}
}
response = requests.post(url, headers=headers, json=payload)

if response.status_code != 200:
print(" response.status_code = " + str(response.status_code))
raise Exception("get_battery_data() Datenabfrage fehlgeschlagen")
wait_and_die(" response.status_code = {response.status_code}\n" + response,
"get_battery_data() no data received")

battery_data = response.json()['data']

return battery_data

# Funktion zum Abrufen von Odometerdaten
# get odometer data
def get_odometer_data(vin, access_token):
url = POLESTAR_API_URL_V2
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {access_token}"
}
# Quelle für GQL-String: https://github.com/evcc-io/evcc/blob/master/vehicle/polestar/query.gql
# source for GQL string: https://github.com/evcc-io/evcc/blob/master/vehicle/polestar/query.gql
payload = {
"query": "query GetOdometerData($vin:String!) {"
"getOdometerData(vin:$vin) {"
Expand All @@ -353,30 +357,33 @@ def get_odometer_data(vin, access_token):
"variables": {"vin": vin}
}
response = requests.post(url, headers=headers, json=payload)

if response.status_code != 200:
print(" response.status_code = " + str(response.status_code))
raise Exception("get_odometer_data() Datenabfrage fehlgeschlagen")
wait_and_die(" response.status_code = {response.status_code}\n" + response,
"get_odometer_data() no data received")

odometer_data = response.json()['data']

return odometer_data

# Rekursives Durchlaufen des JSON-Objekts und Senden der Daten als MQTT-Nachrichten
# recursive parsing of the JSON object to build corresponding MQTT topics and send
def publish_json_as_mqtt(topic, json_obj):
if isinstance(json_obj, dict):
for key, value in json_obj.items():
sub_topic = f"{topic}/{key}"
publish_json_as_mqtt(sub_topic, value)
publish_json_as_mqtt(sub_topic, value) # call self to resolve next json level
else:
json_payload = json.dumps(json_obj)
client.publish(topic, json_payload, qos=1, retain=True)

# extract SoC from battery data JSON and send to openWB via MQTT
def publish_soc_to_openwb(battery_data):
if isinstance(battery_data, dict):
soc = battery_data['getBatteryData']['batteryChargeLevelPercentage']
print(f'publishing SoC {soc} to OpenWB {OPENWB_TOPIC}')
client_openwb.publish(OPENWB_TOPIC,soc,qos=1, retain=True)

print(f' publish SoC {soc} to OpenWB {OPENWB_TOPIC}')
client_openwb.publish(OPENWB_TOPIC, soc, qos=1, retain=True)

# Hauptprogramm
# main program: init and loop forever
def main():
print("Polestar_2_MQTT.py startet")
print("==========================")
Expand All @@ -386,15 +393,16 @@ def main():
last_battery_data = None
last_odometer_data = None

# init sinal handler
# init sinal handler to catch SIGTERM
signal.signal(signal.SIGTERM, signal_handler)

# Starten der MQTT-Verbindung
# start MQTT clients
connect_mqtt()
if (OPENWB_PUBLISH):
connect_mqtt_openwb()

while True:
# get new token
if expiry_time == None or datetime.now() >= expiry_time:
# TODO WORKAROUND: neu einloggen, statt refresh_token nutzen
print("get_token()")
Expand Down Expand Up @@ -432,7 +440,7 @@ def main():
for _ in range(int(POLESTAR_CYCLE/SLEEP_INTERVAL)):
time.sleep(SLEEP_INTERVAL)

# Signal-Handler für SIGTERM
# signal handler for SIGTERM
def signal_handler(sig, frame):
print("SIGTERM received: stop run")
server.shutdown()
Expand All @@ -441,17 +449,18 @@ def signal_handler(sig, frame):
client_openwb.disconnect()
sys.exit(0)

# catch all exeptions in main to get tracheback output
try:
main()
except Exception as e:
print(f"Fehler: {str(e)}")

exc_type, exc_value, exc_traceback = sys.exc_info()
# extrahiere die letzte Zeile des Tracebacks
# extract last line of traceback fpr
line_number = traceback.extract_tb(exc_traceback)[-1][1]
print("Fehler in Zeile:", line_number)
print("Fehlertyp und Nachricht:", exc_type.__name__, exc_value)

print(f"Error : {str(e)}")
print(f"in line: {line_number}")
print(f"type : {exc_type.__name__}")
print(f"message: {exc_value}")
print("************** Traceback ***************")
print(traceback.format_exc())

# ***** EOF *****

0 comments on commit e80538d

Please sign in to comment.