diff --git a/deploy/k8s/config/templates/cm.common.yaml b/deploy/k8s/config/templates/cm.common.yaml index 604dbada..6759d69e 100644 --- a/deploy/k8s/config/templates/cm.common.yaml +++ b/deploy/k8s/config/templates/cm.common.yaml @@ -1,7 +1,7 @@ apiVersion: v1 data: F7T_DEBUG_MODE: {{ .Values.global.F7T_DEBUG_MODE | default "False" | quote }} - F7T_GUNICORN_WORKER: {{ .Values.global.F7T_GUNICORN_WORKER | default "--workers=3 --threads=3" | quote }} + F7T_GUNICORN_WORKER: {{ .Values.global.F7T_GUNICORN_WORKER | default "--workers=1 --threads=1" | quote }} F7T_AUTH_HEADER_NAME: {{ .Values.F7T_AUTH_HEADER_NAME | default "Authorization" | quote }} F7T_AUTH_REQUIRED_SCOPE: {{ .Values.F7T_AUTH_REQUIRED_SCOPE | default "" | quote }} F7T_AUTH_ROLE: {{ .Values.F7T_AUTH_ROLE | default "" | quote }} diff --git a/src/common/async_task.py b/src/common/async_task.py index dd22f8ce..b9a08d2d 100644 --- a/src/common/async_task.py +++ b/src/common/async_task.py @@ -83,6 +83,22 @@ def __init__(self,task_id,user,service=None,system=None,data=None,created_at=Non self.created_at = created_at self.updated_at = self.created_at + @classmethod + def deserialise(cls, value): + + status = value["status"] + user = value["user"] + data = value["data"] + service = value["service"] + system = value["system"] + created_at = value["created_at"] + task_id = value["task_id"] + + task = AsyncTask(task_id,user,service=service,system=system,created_at=created_at) + task.set_status(status,data) + + return task + # create hash_id as user-task_id MD5 encoded string # used for public access to info in Queue def get_hashid(self,task_id,user): diff --git a/src/common/tasks_persistence.py b/src/common/tasks_persistence.py index fbc0ca13..1387e5e4 100644 --- a/src/common/tasks_persistence.py +++ b/src/common/tasks_persistence.py @@ -241,6 +241,7 @@ def get_user_tasks(r,user,task_list=None, status_code=None) -> Union[dict,None]: # logging.info(json_task) # decode because redis stores it in Bytes not string task = json.loads(json_task.decode('latin-1')) + task["task_id"]=key_parts(task_id.decode('latin-1'))[2] try: _user = task["user"] diff --git a/src/tasks/tasks.py b/src/tasks/tasks.py index 829cb852..f811782f 100644 --- a/src/tasks/tasks.py +++ b/src/tasks/tasks.py @@ -41,8 +41,7 @@ DEBUG_MODE = get_boolean_var(os.environ.get("F7T_DEBUG_MODE", False)) -# task dict, key is the task_id -tasks = {} + app = Flask(__name__) @@ -81,24 +80,6 @@ def init_queue(): app.logger.error("Tasks microservice cannot be started") return - # dictionary: [task_id] = {hash_id,status_code,user,data} - task_list = persistence.get_all_tasks(r) - - # key = task_id ; values = {status_code,user,data} - for task_id, value in task_list.items(): - - - status = value["status"] - user = value["user"] - data = value["data"] - service = value["service"] - system = value["system"] - created_at = value["created_at"] - - t = async_task.AsyncTask(task_id,user,service=service,system=system,created_at=created_at) - t.set_status(status,data) - tasks[t.hash_id] = t - # init Redis connection init_queue() @@ -189,7 +170,6 @@ def create_task(): t = async_task.AsyncTask(task_id=str(task_id), user=username, service=service, system=system,data=init_data) - tasks[t.hash_id] = t if JAEGER_AGENT != "": try: span = tracing.get_span(request) @@ -237,20 +217,28 @@ def get_task(id): # for better knowledge of what this id is hash_id = id + + + + try: - if not tasks[hash_id].is_owner(username): - return jsonify(description="Operation not permitted. Invalid task owner."), 403 + global r + current_task = async_task.AsyncTask.deserialise( persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id]) + + #if not user_tasks[hash_id].is_owner(username): + # return jsonify(description="Operation not permitted. Invalid task owner."), 403 + #task_status=user_tasks[hash_id].get_status() + #task_status["task_url"] = f"{KONG_URL}/tasks/{hash_id}" + current_task = current_task.get_status() + current_task["task_url"] = f"/tasks/{hash_id}" + data = jsonify(task=current_task) - task_status=tasks[hash_id].get_status() - task_status["task_url"] = f"/tasks/{hash_id}" - data = jsonify(task=task_status) return data, 200 except KeyError: data = jsonify(error=f"Task {id} does not exist") return data, 404 - # update status of the task with task_id = id @app.route("/",methods=["PUT"]) def update_task(id): @@ -277,52 +265,66 @@ def update_task(id): # for better knowledge of what this id is hash_id = id + auth_header = request.headers[AUTH_HEADER_NAME] + # getting username from auth_header + is_username_ok = get_username(auth_header) + + if not is_username_ok["result"]: + app.logger.error(f"Couldn't extract username from JWT token: {is_username_ok['reason']}") + return jsonify(description=f"Couldn't retrieve task. Reason: {is_username_ok['reason']}"), 401 + + username = is_username_ok["username"] + + # check if task exist try: - current_task=tasks[hash_id] - except KeyError: - data = jsonify(error=f"Task {hash_id} does not exist") - return data, 404 + global r + current_task = async_task.AsyncTask.deserialise(persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id]) + + - if JAEGER_AGENT != "": - try: - span = tracing.get_span(request) - span.set_tag('f7t_task_id', hash_id) - except Exception as e: - app.logger.info(e) + if JAEGER_AGENT != "": + try: + span = tracing.get_span(request) + span.set_tag('f7t_task_id', hash_id) + except Exception as e: + app.logger.info(e) - # checks if status request is valid: - if status not in async_task.status_codes: - data = jsonify(error="Status code error",status=status) - app.logger.error(data) - return data, 400 + # checks if status request is valid: + if status not in async_task.status_codes: + data = jsonify(error="Status code error",status=status) + app.logger.error(data) + return data, 400 - # if no msg on request, default status msg: - if msg == None: - msg = async_task.status_codes[status] + # if no msg on request, default status msg: + if msg == None: + msg = async_task.status_codes[status] - # update task in memory - tasks[hash_id].set_status(status=status, data=msg) + # update task in memory + current_task.set_status(status=status, data=msg) - # getting service from task, to set exp_time according to the service - service = tasks[hash_id].get_internal_status()["service"] + # getting service from task, to set exp_time according to the service + service = current_task.get_internal_status()["service"] - global r - exp_time = STORAGE_TASK_EXP_TIME + + exp_time = STORAGE_TASK_EXP_TIME - if service == "compute": - exp_time = COMPUTE_TASK_EXP_TIME + if service == "compute": + exp_time = COMPUTE_TASK_EXP_TIME - #update task in persistence server - if not persistence.save_task(r,tasks[hash_id].task_id, task=tasks[hash_id].get_internal_status(), exp_time=exp_time): - app.logger.error("Error saving task") - app.logger.error(tasks[hash_id].get_internal_status()) - return jsonify(description="Couldn't update task"), 400 + #update task in persistence server + if not persistence.save_task(r,current_task.task_id, task=current_task.get_internal_status(), exp_time=exp_time): + app.logger.error("Error saving task") + app.logger.error(current_task.get_internal_status()) + return jsonify(description="Couldn't update task"), 400 - app.logger.info(f"New status for task {hash_id}: {status}") + app.logger.info(f"New status for task {hash_id}: {status}") + except KeyError: + data = jsonify(error=f"Task {hash_id} does not exist") + return data, 404 data = jsonify(success="task updated") return data, 200 @@ -347,23 +349,20 @@ def delete_task(id): hash_id = id # if username isn't taks owner, then deny access - try: - if not tasks[hash_id].is_owner(username): - return jsonify(description="Operation not permitted. Invalid task owner."), 403 - except KeyError: - data = jsonify(error=f"Task {id} does not exist") - return data, 404 - try: global r + current_task = async_task.AsyncTask.deserialise( persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id]) + - if not persistence.set_expire_task(r,tasks[hash_id].task_id,tasks[hash_id].get_internal_status(),secs=300): + if not persistence.set_expire_task(r,current_task.task_id,current_task.get_internal_status(),secs=300): return jsonify(error=f"Failed to delete task {hash_id} on persistence server"), 400 data = jsonify(success=f"Task {hash_id} deleted") - tasks[hash_id].set_status(status=async_task.INVALID, data="") return data, 204 + except KeyError: + data = jsonify(error=f"Task {id} does not exist") + return data, 404 except Exception as e: app.logger.error(f"Failed to delete task {hash_id} on persistence server") app.logger.error(f"Error: {type(e)}") @@ -403,30 +402,27 @@ def expire_task(id): # if username isn't taks owner, then deny access try: - if not tasks[hash_id].is_owner(username): - return jsonify(description="Operation not permitted. Invalid task owner."), 403 - except KeyError: - data = jsonify(error=f"Task {id} does not exist") - return data, 404 - + global r + current_task = async_task.AsyncTask.deserialise( persistence.get_user_tasks(r,username,task_list=[hash_id])[hash_id]) + - exp_time = STORAGE_TASK_EXP_TIME - if service == "compute": - exp_time = COMPUTE_TASK_EXP_TIME + exp_time = STORAGE_TASK_EXP_TIME - try: - global r + if service == "compute": + exp_time = COMPUTE_TASK_EXP_TIME - app.logger.info(f"Set expiration for task {tasks[hash_id].task_id} - {exp_time} secs") - if not persistence.set_expire_task(r,tasks[hash_id].task_id,tasks[hash_id].get_internal_status(),secs=exp_time): + app.logger.info(f"Set expiration for task {current_task.task_id} - {exp_time} secs") + if not persistence.set_expire_task(r,current_task.task_id,current_task.get_internal_status(),secs=exp_time): app.logger.warning(f"Task couldn't be marked as expired") return jsonify(error="Failed to set expiration time on task in persistence server"), 400 data = jsonify(success=f"Task expiration time set to {exp_time} secs.") return data, 200 - + except KeyError: + data = jsonify(error=f"Task {id} does not exist") + return data, 404 except Exception: data = jsonify(Error="Failed to set expiration time on task in persistence server") return data, 400