From 15ad92c7e31e1029d02c06646e3c6f9a56b0b4f9 Mon Sep 17 00:00:00 2001 From: Franziska Kunsmann Date: Sat, 4 Jan 2025 09:24:16 +0100 Subject: [PATCH] fix code style --- voctopublish/api_client/mastodon_client.py | 6 +- voctopublish/api_client/voctoweb_client.py | 24 ++- voctopublish/api_client/youtube_client.py | 15 +- voctopublish/create-event-by-ticket.py | 201 +++++++++++++-------- 4 files changed, 158 insertions(+), 88 deletions(-) diff --git a/voctopublish/api_client/mastodon_client.py b/voctopublish/api_client/mastodon_client.py index 5900be0..a0a91a5 100644 --- a/voctopublish/api_client/mastodon_client.py +++ b/voctopublish/api_client/mastodon_client.py @@ -66,12 +66,12 @@ def send_toot(ticket, config): ) toot = mastodon.status_post( message, - language="en", # announcements are always in english + language="en", # announcements are always in english ) LOG.debug(toot) return { - 'id': toot['id'], - 'uri': toot['uri'], + "id": toot["id"], + "uri": toot["uri"], } except Exception as e_: # we don't care if tooting fails here. diff --git a/voctopublish/api_client/voctoweb_client.py b/voctopublish/api_client/voctoweb_client.py index d459e75..90a7ab2 100644 --- a/voctopublish/api_client/voctoweb_client.py +++ b/voctopublish/api_client/voctoweb_client.py @@ -437,10 +437,26 @@ def create_or_update_event(self): "subtitle": self.t.subtitle, "link": event_url, "original_language": self.t.languages[0], - "thumb_filename": (self.t.voctoweb_filename_base + ".jpg") if self.t.voctoweb_filename_base else "", - "poster_filename": (self.t.voctoweb_filename_base + "_preview.jpg") if self.t.voctoweb_filename_base else "", - "timeline_filename": (self.t.voctoweb_filename_base + ".timeline.jpg") if self.t.voctoweb_filename_base else "", - "thumbnails_filename": (self.t.voctoweb_filename_base + ".thumbnails.vtt") if self.t.voctoweb_filename_base else "", + "thumb_filename": ( + (self.t.voctoweb_filename_base + ".jpg") + if self.t.voctoweb_filename_base + else "" + ), + "poster_filename": ( + (self.t.voctoweb_filename_base + "_preview.jpg") + if self.t.voctoweb_filename_base + else "" + ), + "timeline_filename": ( + (self.t.voctoweb_filename_base + ".timeline.jpg") + if self.t.voctoweb_filename_base + else "" + ), + "thumbnails_filename": ( + (self.t.voctoweb_filename_base + ".thumbnails.vtt") + if self.t.voctoweb_filename_base + else "" + ), "description": "\n\n".join(description), "date": self.t.date, "persons": self.t.people, diff --git a/voctopublish/api_client/youtube_client.py b/voctopublish/api_client/youtube_client.py index 6916f48..cadd1c4 100644 --- a/voctopublish/api_client/youtube_client.py +++ b/voctopublish/api_client/youtube_client.py @@ -320,7 +320,9 @@ def upload(self, file, lang): if error_from_youtube: exception_message.append(error_from_youtube) else: - exception_message.append(f"Video creation failed with http status code {r.status_code}") + exception_message.append( + f"Video creation failed with http status code {r.status_code}" + ) exception_message.append(r.text) exception_message.append(json.dumps(metadata, indent=2)) @@ -458,7 +460,9 @@ def _replace_language_placeholders(self, string, lang): translation = self.translation_strings[lang] language_name = self.lang_map[lang] else: - raise YouTubeException("language not defined in translation strings, got") + raise YouTubeException( + "language not defined in translation strings, got" + ) return ( string.replace("${translation}", translation) @@ -563,7 +567,9 @@ def update_metadata(self, video_id, metadata): if error_from_youtube: exception_message.append(error_from_youtube) else: - exception_message.append(f"Video update failed with http status code {r.status_code}") + exception_message.append( + f"Video update failed with http status code {r.status_code}" + ) exception_message.append(r.text) exception_message.append(json.dumps(metadata, indent=2)) @@ -748,7 +754,8 @@ def get_fresh_token(refresh_token: str, client_id: str, client_secret: str): data = r.json() if "access_token" not in data: raise YouTubeException( - "fetching a fresh authToken did not return a access_token\n\n%s" % r.text + "fetching a fresh authToken did not return a access_token\n\n%s" + % r.text ) LOG.info("successfully fetched Access-Token %s" % data["access_token"]) diff --git a/voctopublish/create-event-by-ticket.py b/voctopublish/create-event-by-ticket.py index 0d7cddf..15639ca 100755 --- a/voctopublish/create-event-by-ticket.py +++ b/voctopublish/create-event-by-ticket.py @@ -15,24 +15,25 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import argparse import configparser -import socket -import sys import logging import os +import socket import subprocess -import requests -import argparse +import sys import time +import requests + try: # python 3.11 from tomllib import loads as toml_load except ImportError: from rtoml import load as toml_load -from c3tt_rpc_client import C3TTClient from api_client.voctoweb_client import VoctowebClient +from c3tt_rpc_client import C3TTClient from model.ticket_module import Ticket @@ -41,9 +42,10 @@ class RelivePublisher: This is the main class for the Voctopublish application It is meant to be used with the c3tt ticket tracker """ - def __init__(self, args = {}): + + def __init__(self, args={}): # load config - config_path = '../client.conf' + config_path = "../client.conf" if not os.path.exists(config_path): raise IOError("Error: config file not found") @@ -53,55 +55,73 @@ def __init__(self, args = {}): self.notfail = args.notfail # set up logging - logging.addLevelName(logging.WARNING, "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING)) - logging.addLevelName(logging.ERROR, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.ERROR)) - logging.addLevelName(logging.INFO, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.INFO)) - logging.addLevelName(logging.DEBUG, "\033[1;85m%s\033[1;0m" % logging.getLevelName(logging.DEBUG)) + logging.addLevelName( + logging.WARNING, + "\033[1;33m%s\033[1;0m" % logging.getLevelName(logging.WARNING), + ) + logging.addLevelName( + logging.ERROR, "\033[1;41m%s\033[1;0m" % logging.getLevelName(logging.ERROR) + ) + logging.addLevelName( + logging.INFO, "\033[1;32m%s\033[1;0m" % logging.getLevelName(logging.INFO) + ) + logging.addLevelName( + logging.DEBUG, "\033[1;85m%s\033[1;0m" % logging.getLevelName(logging.DEBUG) + ) self.logger = logging.getLogger() sh = logging.StreamHandler(sys.stdout) - if self.config['general']['debug']: - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s {%(filename)s:%(lineno)d} %(message)s') + if self.config["general"]["debug"]: + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s {%(filename)s:%(lineno)d} %(message)s" + ) else: - formatter = logging.Formatter('%(asctime)s - %(message)s') + formatter = logging.Formatter("%(asctime)s - %(message)s") sh.setFormatter(formatter) self.logger.addHandler(sh) self.logger.setLevel(logging.DEBUG) - level = self.config['general']['debug'] - if level == 'info': + level = self.config["general"]["debug"] + if level == "info": self.logger.setLevel(logging.INFO if not args.verbose else logging.DEBUG) - elif level == 'warning': + elif level == "warning": self.logger.setLevel(logging.WARNING) - elif level == 'error': + elif level == "error": self.logger.setLevel(logging.ERROR) - elif level == 'debug': + elif level == "debug": self.logger.setLevel(logging.DEBUG) - if self.config['C3Tracker']['host'] == "None": + if self.config["C3Tracker"]["host"] == "None": self.host = socket.getfqdn() else: - self.host = 'relive-releasing' #self.config['C3Tracker']['host'] + self.host = "relive-releasing" # self.config['C3Tracker']['host'] - self.ticket_type = 'encoding' - self.to_state = 'releasing' + self.ticket_type = "encoding" + self.to_state = "releasing" - logging.debug('creating C3TTClient') + logging.debug("creating C3TTClient") try: + def get_var(key, env_key): if env_key in os.environ: return os.environ[env_key] else: - return self.config['C3Tracker'][key] - - self.c3tt = C3TTClient( self.config['C3Tracker']['url'], #get_var('url', 'CRS_TRACKER'), # CRS_TRACKER is https://tracker.c3voc.de/rpc but we need it without the rpc at the end, so do not use it for now... --Andi - get_var('group', 'CRS_TOKEN'), - self.host, - get_var('secret', 'CRS_SECRET')) + return self.config["C3Tracker"][key] + + self.c3tt = C3TTClient( + self.config["C3Tracker"][ + "url" + ], # get_var('url', 'CRS_TRACKER'), # CRS_TRACKER is https://tracker.c3voc.de/rpc but we need it without the rpc at the end, so do not use it for now... --Andi + get_var("group", "CRS_TOKEN"), + self.host, + get_var("secret", "CRS_SECRET"), + ) except Exception as e_: - raise PublisherException('Config parameter missing or empty, please check config') from e_ + raise PublisherException( + "Config parameter missing or empty, please check config" + ) from e_ def create_event(self, ticket_id): """ @@ -114,23 +134,29 @@ def create_event(self, ticket_id): # voctoweb if ticket._get_bool("Publishing.Voctoweb.EnableProfile"): - ticket.voctoweb_event_id = ticket._get_str("Voctoweb.EventId", optional=True) + ticket.voctoweb_event_id = ticket._get_str( + "Voctoweb.EventId", optional=True + ) ticket.voctoweb_slug = ticket._get_str("Publishing.Voctoweb.Slug") ticket.url = ticket._get_str("Fahrplan.URL") ticket.abstract = ticket._get_str("Fahrplan.Abstract", optional=True) ticket.description = ticket._get_str("Fahrplan.Description", optional=True) if ticket.abstract == ticket.description: ticket.abstract = None - ticket.links = ticket._get_list("Fahrplan.Links", optional=True, split_by=" ") + ticket.links = ticket._get_list( + "Fahrplan.Links", optional=True, split_by=" " + ) ticket.people = ticket._get_list("Fahrplan.Person_list") - ticket.license = ticket._get_str("Meta.License", optional=True, try_default=True) + ticket.license = ticket._get_str( + "Meta.License", optional=True, try_default=True + ) ticket.guid = ticket._get_str("Fahrplan.GUID") ticket.title = ticket._get_str("Fahrplan.Title") ticket.track = ticket._get_str("Fahrplan.Track", optional=True) ticket.room = ticket._get_str("Fahrplan.Room") ticket.subtitle = ticket._get_str("Fahrplan.Subtitle", optional=True) - lang_map = {'en': 'eng', 'de': 'deu'} # WORKAROUND - ticket.language = lang_map[ticket._get_str('Fahrplan.Language')] + lang_map = {"en": "eng", "de": "deu"} # WORKAROUND + ticket.language = lang_map[ticket._get_str("Fahrplan.Language")] ticket.languages = {0: ticket.language} ticket.fahrplan_id = ticket._get_str("Fahrplan.ID") ticket.slug = ticket._get_str("Fahrplan.Slug") @@ -163,7 +189,7 @@ def _get_ticket_by_id(self, ticket_id): Request a ticket from tracker :return: a ticket object or None in case no ticket is available """ - logging.info('requesting ticket from tracker') + logging.info("requesting ticket from tracker") t = None logging.info("Ticket ID:" + str(ticket_id)) @@ -172,32 +198,34 @@ def _get_ticket_by_id(self, ticket_id): logging.debug("Ticket Properties: " + str(ticket_properties)) except Exception as e_: raise e_ - ticket_properties['EncodingProfile.Slug'] = 'relive' - ticket_properties['Publishing.Voctoweb.EnableProfile'] = 'yes' + ticket_properties["EncodingProfile.Slug"] = "relive" + ticket_properties["Publishing.Voctoweb.EnableProfile"] = "yes" t = Ticket(ticket_properties, ticket_id, self.config) return t - - def _get_ticket_from_tracker(self): """ Request the next unassigned ticket for the configured states :return: a ticket object or None in case no ticket is available """ - logging.info('requesting ticket from tracker') + logging.info("requesting ticket from tracker") t = None ticket_meta = None # when we are in debug mode, we first check if we are already assigned to a ticket from previous run if self.notfail: - ticket_meta = self.c3tt.get_assigned_for_state(self.ticket_type, self.to_state, {'EncodingProfile.Slug': 'relive'}) + ticket_meta = self.c3tt.get_assigned_for_state( + self.ticket_type, self.to_state, {"EncodingProfile.Slug": "relive"} + ) # otherwhise, or if that was not successful get the next unassigned one if not ticket_meta: - ticket_meta = self.c3tt.assign_next_unassigned_for_state(self.ticket_type, self.to_state, {'EncodingProfile.Slug': 'relive'}) - + ticket_meta = self.c3tt.assign_next_unassigned_for_state( + self.ticket_type, self.to_state, {"EncodingProfile.Slug": "relive"} + ) + if ticket_meta: - ticket_id = ticket_meta['id'] + ticket_id = ticket_meta["id"] logging.info("Ticket ID:" + str(ticket_id)) try: ticket_properties = self.c3tt.get_ticket_properties(ticket_id) @@ -208,7 +236,9 @@ def _get_ticket_from_tracker(self): raise e_ t = Ticket(ticket_meta, ticket_properties) else: - logging.info('No ticket of type ' + self.ticket_type + ' for state ' + self.to_state) + logging.info( + "No ticket of type " + self.ticket_type + " for state " + self.to_state + ) return t @@ -217,44 +247,51 @@ def _publish_event_to_voctoweb(self, ticket): Create a event on an voctomix instance. This includes creating a recording for each media file. """ try: - vw = VoctowebClient(ticket, - None, - self.config['voctoweb']['api_key'], - self.config['voctoweb']['api_url'], - self.config['voctoweb']['ssh_host'], - self.config['voctoweb']['ssh_port'], - self.config['voctoweb']['ssh_user'], - self.config['voctoweb']['frontend_url']) + vw = VoctowebClient( + ticket, + None, + self.config["voctoweb"]["api_key"], + self.config["voctoweb"]["api_url"], + self.config["voctoweb"]["ssh_host"], + self.config["voctoweb"]["ssh_port"], + self.config["voctoweb"]["ssh_user"], + self.config["voctoweb"]["frontend_url"], + ) except Exception as e_: - raise PublisherException('Error initializing voctoweb client. Config parameter missing') from e_ + raise PublisherException( + "Error initializing voctoweb client. Config parameter missing" + ) from e_ - if not(ticket.voctoweb_event_id): + if not (ticket.voctoweb_event_id): # if this is master ticket we need to check if we need to create an event on voctoweb # check if event exists on voctoweb instance, and abort if this is already the case - r = requests.get(f"{self.config['voctoweb']['frontend_url']}/public/events/{ticket.voctoweb_event_id}") + r = requests.get( + f"{self.config['voctoweb']['frontend_url']}/public/events/{ticket.voctoweb_event_id}" + ) if r.status_code == 204: print(r.content) print(r.status_code) - #self.c3tt.set_ticket_done(ticket) - print('event already exists, abort!') + # self.c3tt.set_ticket_done(ticket) + print("event already exists, abort!") return - - logging.debug('this is a master ticket') + logging.debug("this is a master ticket") r = vw.create_or_update_event() if r.status_code in [200, 201]: logging.info("new event created or existing updated") - self.c3tt.set_ticket_properties(ticket.id, {'Voctoweb.EventId': r.json()['id']}) - - ''' + self.c3tt.set_ticket_properties( + ticket.id, {"Voctoweb.EventId": r.json()["id"]} + ) + + """ try: # we need to write the Event ID onto the parent ticket, so the other (master) encoding tickets # also have acccess to the Voctoweb Event ID self.c3tt.set_ticket_properties(ticket.parent_id, {'Voctoweb.EventId': r.json()['id']}) except Exception as e_: raise PublisherException('failed to Voctoweb EventID to parent ticket') from e_ - ''' + """ elif r.status_code == 422: # If this happens tracker and voctoweb are out of sync regarding the event id @@ -262,26 +299,36 @@ def _publish_event_to_voctoweb(self, ticket): # todo: write voctoweb event_id to ticket properties --Andi logging.warning("event already exists => please sync event manually") else: - raise PublisherException('Voctoweb returned an error while creating an event: ' + str(r.status_code) + ' - ' + str(r.content)) + raise PublisherException( + "Voctoweb returned an error while creating an event: " + + str(r.status_code) + + " - " + + str(r.content) + ) else: logging.info("nothing to to, is already pubilshed") - - - #self.c3tt.set_ticket_done(ticket) + # self.c3tt.set_ticket_done(ticket) class PublisherException(Exception): pass -if __name__ == '__main__': - parser = argparse.ArgumentParser(description='generate events on voctoweb for relive ') - parser.add_argument('ticket', action="store", help="CRS ID of a tracker ticket") - parser.add_argument('--verbose', '-v', action='store_true', default=False) - parser.add_argument('--notfail', action='store_true', default=False, help='do not mark ticket as failed in tracker when something goes wrong') - - args = parser.parse_args() +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="generate events on voctoweb for relive " + ) + parser.add_argument("ticket", action="store", help="CRS ID of a tracker ticket") + parser.add_argument("--verbose", "-v", action="store_true", default=False) + parser.add_argument( + "--notfail", + action="store_true", + default=False, + help="do not mark ticket as failed in tracker when something goes wrong", + ) + + args = parser.parse_args() try: publisher = RelivePublisher(args)