diff --git a/kubernetes.yaml b/kubernetes.yaml
index 92bf89a..11853ba 100644
--- a/kubernetes.yaml
+++ b/kubernetes.yaml
@@ -51,6 +51,8 @@ spec:
           mountPath: /opt/app/scrapyd_k8s.conf
           readOnly: true
           subPath: scrapyd_k8s.conf
+        #- name: joblogs
+        #  mountPath: /data
         # Enable if your spider repository needs a pull secret
         # - name: scrapyd-k8s-pull-secret
         #   mountPath: /opt/app/.docker
@@ -59,6 +61,9 @@
       - configMap:
           name: scrapyd-k8s-config
         name: scrapyd-k8s-config
+      #- name: joblogs
+      #  persistentVolumeClaim:
+      #    claimName: pv-claim
       # Enable if your spider repository needs a pull secret
       # - secret:
       #     secretName: pull-secret
@@ -80,7 +85,7 @@ data:
     repository = scrapyd_k8s.repository.Remote
     launcher = scrapyd_k8s.launcher.K8s
-
+    namespace = default
 
     # This is an example spider that should work out of the box.
@@ -106,6 +111,29 @@ metadata:
     app.kubernetes.io/name: spider-example
 stringData:
   FOO_API_KEY: "1234567890abcdef"
+#---
+#apiVersion: v1
+#kind: PersistentVolume
+#metadata:
+#  name: pv-volume
+#spec:
+#  capacity:
+#    storage: 5Gi
+#  accessModes:
+#    - ReadWriteOnce
+#  hostPath:
+#    path: "/mnt/data"
+#---
+#apiVersion: v1
+#kind: PersistentVolumeClaim
+#metadata:
+#  name: pv-claim
+#spec:
+#  accessModes:
+#    - ReadWriteOnce
+#  resources:
+#    requests:
+#      storage: 5Gi
 ---
 apiVersion: v1
 kind: ConfigMap
@@ -144,10 +172,13 @@ metadata:
 rules:
 - apiGroups: [""]
   resources: ["pods"]
-  verbs: ["get", "list"]
+  verbs: ["get", "list", "watch"]
 - apiGroups: [""]
   resources: ["pods/exec"]
   verbs: ["get"]
+- apiGroups: [""]
+  resources: ["pods/log"]
+  verbs: ["get"]
 - apiGroups: ["batch"]
   resources: ["jobs"]
   verbs: ["get", "list", "create", "delete"]
diff --git a/requirements.txt b/requirements.txt
index 8885f50..d3d56dd 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,3 +3,5 @@ kubernetes>=27.2.0 # introduction of suspend in jobspec
 flask>=2.0.0
 natsort>=8.0.0
 Flask-BasicAuth>=0.2.0
+MarkupSafe>=2.1.5
+apache-libcloud>=3.8.0
\ No newline at end of file
diff --git a/scrapyd_k8s.sample-k8s.conf b/scrapyd_k8s.sample-k8s.conf
index fc411cb..44aa99e 100644
--- a/scrapyd_k8s.sample-k8s.conf
+++ b/scrapyd_k8s.sample-k8s.conf
@@ -57,3 +57,18 @@ requests_cpu = 80m
 requests_memory = 0.12G
 limits_cpu = 0.5
 limits_memory = 0.2G
+
+#[joblogs]
+# Object storage provider to upload finished job logs to
+#storage_provider = s3
+#container_name = scrapyd-k8s-example-bucket
+
+# Number of trailing log lines compared to detect overlap when a log stream is resumed; use at least 2
+#num_lines_to_check = 2
+
+#[joblogs.storage.s3]
+# Set your S3 access key as an environment variable or literally below
+#key = ${S3_KEY}
+# Set your S3 secret key as an environment variable or literally below
+#secret = ${S3_SECRET}
+#region = eu-north-1
diff --git a/scrapyd_k8s/__main__.py b/scrapyd_k8s/__main__.py
index 189977f..6620da8 100644
--- a/scrapyd_k8s/__main__.py
+++ b/scrapyd_k8s/__main__.py
@@ -1,3 +1,18 @@
-from .api import run
+import logging
+import sys
+from .api import run
 
-run()
+def setup_logging():
+    logging.basicConfig(
+        level=logging.INFO,
+        format='%(asctime)s %(name)s [%(levelname)s]: %(message)s',
+        handlers=[
+            logging.StreamHandler(sys.stdout)
+        ]
+    )
+
+if __name__ == "__main__":
+    setup_logging()
+    # Job log handling is enabled from api.run() via the launcher,
+    # so it is not initialized a second time here.
+    run()
diff --git a/scrapyd_k8s/api.py b/scrapyd_k8s/api.py
index 24c7dd7..dc45740 100644
--- a/scrapyd_k8s/api.py
+++ b/scrapyd_k8s/api.py
@@ -1,9 +1,9 @@
 #!/usr/bin/env python3
 import uuid
+import logging
 
 from flask import Flask, request, Response, jsonify
 from flask_basicauth import BasicAuth
-from markupsafe import escape
 from natsort import natsort_keygen, ns
 
 from .config import Config
@@ -13,7 +13,7 @@ repository = (config.repository_cls())(config)
 launcher = (config.launcher_cls())(config)
 scrapyd_config = config.scrapyd()
-
+logger = logging.getLogger(__name__)
 
 @app.get("/")
 def home():
@@ -155,5 +155,11 @@ def run():
     if config_username is not None and config_password is not None:
         enable_authentication(app, config_username, config_password)
 
+    if config.joblogs() is not None:
+        launcher.enable_joblogs(config)
+        logger.info("Job logs handling enabled.")
+    else:
+        logger.debug("Job logs handling not enabled; 'joblogs' configuration section is missing.")
+
     # run server
     app.run(host=host, port=port)
diff --git a/scrapyd_k8s/config.py b/scrapyd_k8s/config.py
index ce67382..92194e9 100644
--- a/scrapyd_k8s/config.py
+++ b/scrapyd_k8s/config.py
@@ -21,6 +21,17 @@ def launcher_cls(self):
         pkg, cls = repo.rsplit('.', 1)
         return getattr(import_module(pkg), cls)
 
+    def joblogs(self):
+        if self._config.has_section('joblogs'):
+            return self._config['joblogs']
+        else:
+            return None
+
+    def joblogs_storage(self, provider):
+        if not self._config.has_section('joblogs.storage.%s' % provider):
+            return None
+        return self._config['joblogs.storage.%s' % provider]
+
     def listprojects(self):
         return self._projects
 
@@ -28,6 +39,9 @@ def project(self, project):
         if project in self._projects:
             return ProjectConfig(self._config, project, self._config['project.' + project])
 
+    def namespace(self):
+        return self.scrapyd().get('namespace', 'default')
+
 class ProjectConfig:
     def __init__(self, config, projectid, projectconfig):
         self._id = projectid
diff --git a/scrapyd_k8s/joblogs/__init__.py b/scrapyd_k8s/joblogs/__init__.py
new file mode 100644
index 0000000..483fb7b
--- /dev/null
+++ b/scrapyd_k8s/joblogs/__init__.py
@@ -0,0 +1,25 @@
+import logging
+from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler
+
+logger = logging.getLogger(__name__)
+
+def joblogs_init(config):
+    """
+    Initializes job logs handling by starting the Kubernetes job log handler.
+
+    Parameters
+    ----------
+    config : Config
+        Configuration object containing settings for job logs and storage.
+
+    Returns
+    -------
+    None
+    """
+    joblogs_config = config.joblogs()
+    if joblogs_config and joblogs_config.get('storage_provider') is not None:
+        log_handler = KubernetesJobLogHandler(config)
+        log_handler.start()
+        logger.info("Job logs handler started.")
+    else:
+        logger.warning("No storage provider configured; job logs will not be uploaded.")
diff --git a/scrapyd_k8s/joblogs/log_handler_k8s.py b/scrapyd_k8s/joblogs/log_handler_k8s.py
new file mode 100644
index 0000000..374b597
--- /dev/null
+++ b/scrapyd_k8s/joblogs/log_handler_k8s.py
@@ -0,0 +1,280 @@
+import os
+import threading
+import tempfile
+import logging
+
+from kubernetes import client, watch
+from scrapyd_k8s.object_storage import LibcloudObjectStorage
+
+logger = logging.getLogger(__name__)
+
+class KubernetesJobLogHandler:
+    """
+    A class to handle Kubernetes job logs by watching pods, streaming logs, and uploading them to object storage.
+
+    ...
+
+    Attributes
+    ----------
+    DEFAULT_BLOCK_SIZE : int
+        Default size (in bytes) of blocks to read when retrieving the last N lines from a file.
+    config : object
+        Configuration object containing settings for job logs and storage.
+    watcher_threads : dict
+        Dictionary to keep track of watcher threads for each pod.
+    pod_tmp_mapping : dict
+        Mapping of pod names to their temporary log file paths.
+    namespace : str
+        Kubernetes namespace to watch pods in.
+    num_lines_to_check : int
+        Number of lines to check for matching logs when resuming log streaming.
+    object_storage_provider : LibcloudObjectStorage
+        Instance of the object storage provider for uploading logs.
+
+    Methods
+    -------
+    start():
+        Starts the pod watcher thread for job logs.
+    get_last_n_lines(file_path, num_lines):
+        Efficiently retrieves the last `num_lines` lines from a file.
+    concatenate_and_delete_files(main_file_path, temp_file_path, block_size=6144):
+        Concatenates a temporary file to the main log file and deletes the temporary file.
+    make_log_filename_for_job(job_name):
+        Generates a unique temporary file path for storing logs of a job.
+    stream_logs(job_name):
+        Streams logs from a Kubernetes pod and writes them to a file.
+    watch_pods():
+        Watches Kubernetes pods and handles events such as starting log streaming or uploading logs.
+    """
+    # The value was chosen to provide a balance between memory usage and the number of I/O operations
+    DEFAULT_BLOCK_SIZE = 6144
+
+    def __init__(self, config):
+        """
+        Constructs all the necessary attributes for the KubernetesJobLogHandler object.
+
+        Parameters
+        ----------
+        config : object
+            Configuration object containing settings for job logs and storage.
+        """
+        self.config = config
+        self.watcher_threads = {}
+        self.pod_tmp_mapping = {}
+        self.namespace = config.namespace()
+        self.num_lines_to_check = int(config.joblogs().get('num_lines_to_check', 0))
+        self.object_storage_provider = None
+
+    def start(self):
+        """
+        Starts the pod watcher thread for job logs.
+
+        Returns
+        -------
+        None
+        """
+        if self.config.joblogs() and self.config.joblogs().get('storage_provider') is not None:
+            pod_watcher_thread = threading.Thread(
+                target=self.watch_pods
+            )
+            pod_watcher_thread.daemon = True
+            pod_watcher_thread.start()
+            logger.info("Started pod watcher thread for job logs.")
+        else:
+            logger.warning("No storage provider configured; job logs will not be uploaded.")
+
+    def get_last_n_lines(self, file_path, num_lines):
+        """
+        Efficiently retrieves the last `num_lines` lines from a file.
+
+        Parameters
+        ----------
+        file_path : str
+            Path to the file from which to read the last lines.
+        num_lines : int
+            Number of lines to retrieve from the end of the file.
+
+        Returns
+        -------
+        list of str
+            A list containing the last `num_lines` lines from the file.
+        """
+        try:
+            with open(file_path, 'rb') as f:
+                # Move to the end of the file
+                f.seek(0, os.SEEK_END)
+                file_size = f.tell()
+                block_size = self.DEFAULT_BLOCK_SIZE
+                data = b''
+                remaining_lines = num_lines
+                while remaining_lines > 0 and file_size > 0:
+                    if file_size - block_size > 0:
+                        f.seek(file_size - block_size)
+                        block_data = f.read(block_size)
+                    else:
+                        f.seek(0)
+                        block_data = f.read(file_size)
+                    file_size -= block_size
+                    data = block_data + data
+                    lines_found = data.count(b'\n')
+                    # If we've found enough lines, we can stop reading more blocks
+                    if lines_found >= remaining_lines:
+                        break
+
+                # Decode the data and split into lines
+                lines = data.decode('utf-8', errors='replace').splitlines()
+                # Return the last `num_lines`
+                result = lines[-num_lines:]
+                return result
+        except FileNotFoundError:
+            logger.warning(f"File not found: {file_path}")
+            return []
+
+    def concatenate_and_delete_files(self, main_file_path, temp_file_path, block_size=DEFAULT_BLOCK_SIZE):
+        """
+        Concatenates a temporary file to the main log file and deletes the temporary file.
+
+        Parameters
+        ----------
+        main_file_path : str
+            Path to the main log file.
+        temp_file_path : str
+            Path to the temporary log file to be concatenated.
+        block_size : int, optional
+            Size of blocks to read at a time (default is 6144 bytes).
+
+        Returns
+        -------
+        None
+        """
+        try:
+            with open(main_file_path, 'ab') as main_file, open(temp_file_path, 'rb') as temp_file:
+                while True:
+                    block_data = temp_file.read(block_size)
+                    if not block_data:
+                        break
+                    main_file.write(block_data)
+            os.remove(temp_file_path)
+            logger.debug(f"Concatenated '{temp_file_path}' into '{main_file_path}' and deleted temporary file.")
+        except (IOError, OSError) as e:
+            logger.error(f"Failed to concatenate and delete files for job: {e}")
+
+    def make_log_filename_for_job(self, job_name):
+        """
+        Generates a unique temporary file path for storing logs of a job.
+
+        Parameters
+        ----------
+        job_name : str
+            Name of the Kubernetes job or pod.
+
+        Returns
+        -------
+        str
+            Path to the temporary log file for the given job.
+        """
+        if self.pod_tmp_mapping.get(job_name) is not None:
+            return self.pod_tmp_mapping[job_name]
+        temp_dir = tempfile.gettempdir()
+        app_temp_dir = os.path.join(temp_dir, 'job_logs')
+        os.makedirs(app_temp_dir, exist_ok=True)
+        fd, path = tempfile.mkstemp(prefix=f"{job_name}_logs_", suffix=".txt", dir=app_temp_dir)
+        os.close(fd)
+        self.pod_tmp_mapping[job_name] = path
+        return path
+
+    def stream_logs(self, job_name):
+        """
+        Streams logs from a Kubernetes pod and writes them to a file.
+
+        Parameters
+        ----------
+        job_name : str
+            Name of the Kubernetes pod to stream logs from.
+
+        Returns
+        -------
+        None
+        """
+        log_lines_counter = 0
+        v1 = client.CoreV1Api()
+        w = watch.Watch()
+        log_file_path = self.make_log_filename_for_job(job_name)
+        last_n_lines = self.get_last_n_lines(log_file_path, self.num_lines_to_check)
+        if len(last_n_lines) == 0:
+            logger.info(f"Log file '{log_file_path}' is empty or not found. Starting fresh logs for job '{job_name}'.")
+
+        try:
+            with open(log_file_path, 'a') as log_file:
+                temp_dir = os.path.dirname(log_file_path)
+                with tempfile.NamedTemporaryFile(mode='w+', delete=False, dir=temp_dir,
+                                                 prefix=f"{job_name}_logs_tmp_", suffix=".txt") as temp_logs:
+                    temp_file_path = temp_logs.name
+                    for line in w.stream(
+                            v1.read_namespaced_pod_log,
+                            name=job_name,
+                            namespace=self.namespace,
+                            follow=True,
+                            _preload_content=False
+                    ):
+                        temp_logs.write(line + "\n")
+                        temp_logs.flush()
+
+                        if log_lines_counter == len(last_n_lines):
+                            log_file.write(line + "\n")
+                            log_file.flush()
+                        elif line == last_n_lines[log_lines_counter]:
+                            log_lines_counter += 1
+                        else:
+                            log_lines_counter = 0
+
+            if len(last_n_lines) > log_lines_counter:
+                self.concatenate_and_delete_files(log_file_path, temp_file_path)
+            else:
+                os.remove(temp_file_path)
+                logger.info(f"Removed temporary file '{temp_file_path}' after streaming logs for job '{job_name}'.")
+        except Exception as e:
+            logger.exception(f"Error streaming logs for job '{job_name}': {e}")
+
+    def watch_pods(self):
+        """
+        Watches Kubernetes pods and handles events such as starting log streaming or uploading logs.
+
+        Returns
+        -------
+        None
+        """
+        self.object_storage_provider = LibcloudObjectStorage(self.config)
+        w = watch.Watch()
+        v1 = client.CoreV1Api()
+        try:
+            for event in w.stream(v1.list_namespaced_pod, namespace=self.namespace):
+                pod = event['object']
+                if pod.metadata.labels and pod.metadata.labels.get("org.scrapy.job_id"):
+                    job_id = pod.metadata.labels.get("org.scrapy.job_id")
+                    pod_name = pod.metadata.name
+                    thread_name = f"{self.namespace}_{pod_name}"
+                    if pod.status.phase == 'Running':
+                        if (thread_name in self.watcher_threads
+                                and self.watcher_threads[thread_name] is not None
+                                and self.watcher_threads[thread_name].is_alive()):
+                            pass
+                        else:
+                            self.watcher_threads[thread_name] = threading.Thread(
+                                target=self.stream_logs,
+                                args=(pod_name,)
+                            )
+                            self.watcher_threads[thread_name].start()
+                    elif pod.status.phase in ['Succeeded', 'Failed']:
+                        log_filename = self.pod_tmp_mapping.get(pod_name)
+                        if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
+                            if self.object_storage_provider.object_exists(log_filename):
+                                logger.info(f"Log file for job '{job_id}' already exists in storage.")
+                            else:
+                                self.object_storage_provider.upload_file(log_filename)
+                        else:
+                            logger.info(f"Log file not found for job '{job_id}'")
+                else:
+                    logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'")
+        except Exception as e:
+            logger.exception(f"Error watching pods in namespace '{self.namespace}': {e}")
diff --git a/scrapyd_k8s/launcher/docker.py b/scrapyd_k8s/launcher/docker.py
index 65a7375..5bc5805 100644
--- a/scrapyd_k8s/launcher/docker.py
+++ b/scrapyd_k8s/launcher/docker.py
@@ -1,10 +1,12 @@
 import re
-import os
 import socket
+import logging
 
 import docker
 
 from ..utils import native_stringify_dict
 
+logger = logging.getLogger(__name__)
+
 class Docker:
 
     LABEL_PROJECT = 'org.scrapy.project'
@@ -66,6 +68,9 @@ def cancel(self, project_id, job_id, signal):
         c.kill(signal='SIG' + signal)
         return prevstate
 
+    def enable_joblogs(self, config):
+        logger.warning("Job logs are not supported when using the Docker launcher.")
+
     def _parse_job(self, c):
         return {
             'id': c.labels.get(self.LABEL_JOB_ID),
diff --git a/scrapyd_k8s/launcher/k8s.py b/scrapyd_k8s/launcher/k8s.py
index 3c969b7..ba3451e 100644
--- a/scrapyd_k8s/launcher/k8s.py
+++ b/scrapyd_k8s/launcher/k8s.py
@@ -3,9 +3,9 @@
 import kubernetes
 import kubernetes.stream
 from signal import Signals
-from subprocess import check_output, CalledProcessError
 
 from ..utils import native_stringify_dict
+from scrapyd_k8s.joblogs import joblogs_init
 
 class K8s:
 
@@ -121,6 +121,9 @@ def cancel(self, project, job_id, signal):
         )
         return prevstate
 
+    def enable_joblogs(self, config):
+        joblogs_init(config)
+
     def _parse_job(self, job):
         return {
             'id': job.metadata.labels.get(self.LABEL_JOB_ID),
diff --git a/scrapyd_k8s/object_storage/__init__.py b/scrapyd_k8s/object_storage/__init__.py
new file mode 100644
index 0000000..e14aa7a
--- /dev/null
+++ b/scrapyd_k8s/object_storage/__init__.py
@@ -0,0 +1 @@
+from .libcloud_driver import LibcloudObjectStorage
diff --git a/scrapyd_k8s/object_storage/libcloud_driver.py b/scrapyd_k8s/object_storage/libcloud_driver.py
new file mode 100644
index 0000000..8cfd368
--- /dev/null
+++ b/scrapyd_k8s/object_storage/libcloud_driver.py
@@ -0,0 +1,187 @@
+import os
+import logging
+import re
+
+from libcloud.storage.types import (
+    ObjectError,
+    ContainerDoesNotExistError,
+    ObjectDoesNotExistError,
+    InvalidContainerNameError,
+)
+from libcloud.storage.providers import get_driver
+
+logger = logging.getLogger(__name__)
+
+class LibcloudObjectStorage:
+    """
+    A class to interact with cloud object storage using Apache Libcloud.
+
+    ...
+
+    Attributes
+    ----------
+    driver : libcloud.storage.base.StorageDriver
+        An instance of the storage driver for the specified provider.
+    _storage_provider : str
+        The storage provider name (e.g., 's3' for Amazon S3).
+    _container_name : str
+        The name of the container (bucket) in the storage provider.
+    VARIABLE_PATTERN : re.Pattern
+        A compiled regular expression pattern for variable substitution.
+
+    Methods
+    -------
+    upload_file(local_path: str):
+        Uploads a file to the object storage container.
+    object_exists(local_path: str) -> bool:
+        Checks if an object exists in the object storage container.
+    """
+
+    # Match ${VAR} placeholders, but not escaped \${VAR} ones.
+    VARIABLE_PATTERN = re.compile(r'(?<!\\)\$\{([^}]+)}')
+
+    def __init__(self, config):
+        """
+        Constructs all the necessary attributes for the LibcloudObjectStorage object.
+
+        Parameters
+        ----------
+        config : Config
+            Configuration object containing settings for job logs and storage.
+
+        Raises
+        ------
+        ValueError
+            If the storage provider or container name is not defined in the configuration.
+        """
+        self._storage_provider = config.joblogs().get('storage_provider')
+        if self._storage_provider is None:
+            logger.error("Storage provider is not defined in the configuration.")
+            raise ValueError("Storage provider is not defined")
+
+        self._container_name = config.joblogs().get('container_name')
+        if self._container_name is None:
+            logger.error("Container name is not set in the configuration.")
+            raise ValueError("Container name is not set")
+
+        args_envs = config.joblogs_storage(self._storage_provider)
+        args = {}
+        for arg, value in args_envs.items():
+            value_str = str(value)
+            substituted_value = self._substitute_variables(value_str, arg)
+            logger.debug(f"Resolved configuration value for '{arg}'.")
+            args[arg] = substituted_value
+
+        driver_class = get_driver(self._storage_provider)
+        try:
+            self.driver = driver_class(**args)
+            logger.info(f"Initialized driver for storage provider '{self._storage_provider}'.")
+        except Exception as e:
+            logger.exception(f"Failed to initialize driver for storage provider '{self._storage_provider}': {e}")
+            raise
+
+    def _substitute_variables(self, value, arg_name):
+        """
+        Replaces placeholders in the configuration value with environment variable values.
+
+        Parameters
+        ----------
+        value : str
+            The configuration value possibly containing placeholders.
+        arg_name : str
+            The name of the argument being processed (for logging purposes).
+
+        Returns
+        -------
+        str
+            The value with placeholders replaced by environment variable values.
+
+        Raises
+        ------
+        ValueError
+            If the required environment variable is not set.
+        """
+        def replace_var(match):
+            env_var = match.group(1)
+            env_value = os.getenv(env_var)
+            if env_value is not None:
+                env_value = env_value.strip().strip('"').strip("'")
+                return env_value
+            else:
+                logger.error(f"Environment variable '{env_var}' is not set for argument '{arg_name}'.")
+                raise ValueError(f"Environment variable '{env_var}' is not set for argument '{arg_name}'.")
+
+        result = self.VARIABLE_PATTERN.sub(replace_var, value)
+        # Unescape \${...} sequences that were deliberately left untouched above.
+        result = result.replace(r'\${', '${')
+        return result
+
+    def upload_file(self, local_path):
+        """
+        Uploads a file to the object storage container.
+
+        Parameters
+        ----------
+        local_path : str
+            The local file path of the file to be uploaded.
+
+        Returns
+        -------
+        None
+
+        Logs
+        ----
+        Logs information about the upload status or errors encountered.
+        """
+        object_name = os.path.basename(local_path)
+        try:
+            container = self.driver.get_container(container_name=self._container_name)
+            self.driver.upload_object(
+                local_path,
+                container,
+                object_name,
+                extra=None,
+                verify_hash=False,
+                headers=None
+            )
+            logger.info(f"Successfully uploaded '{object_name}' to container '{self._container_name}'.")
+        except (ObjectError, ContainerDoesNotExistError, InvalidContainerNameError) as e:
+            logger.exception(f"Error uploading the file '{object_name}': {e}")
+        except Exception as e:
+            logger.exception(f"An unexpected error occurred while uploading '{object_name}': {e}")
+
+    def object_exists(self, local_path):
+        """
+        Checks if an object exists in the object storage container.
+
+        Parameters
+        ----------
+        local_path : str
+            The local file path corresponding to the object name.
+
+        Returns
+        -------
+        bool
+            True if the object exists, False otherwise.
+
+        Logs
+        ----
+        Logs information about the existence check or errors encountered.
+        """
+        object_name = os.path.basename(local_path)
+        try:
+            self.driver.get_object(
+                container_name=self._container_name,
+                object_name=object_name
+            )
+            logger.debug(f"Object '{object_name}' exists in container '{self._container_name}'.")
+            return True
+        except ObjectDoesNotExistError:
+            logger.debug(f"Object '{object_name}' does not exist in container '{self._container_name}'.")
+        except ContainerDoesNotExistError:
+            logger.error(f"Container '{self._container_name}' does not exist in the cloud storage.")
+        except InvalidContainerNameError:
+            logger.error(f"Invalid container name '{self._container_name}'.")
+        except Exception as e:
+            logger.exception(f"An unexpected error occurred while checking for object '{object_name}': {e}")
+        return False
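
Usage note on the storage configuration in scrapyd_k8s.sample-k8s.conf: every key under [joblogs.storage.s3] is passed, after ${VAR} substitution, straight to the Apache Libcloud driver constructor. A minimal standalone sketch of what the sample values resolve to (assuming S3_KEY and S3_SECRET are exported in the environment; the bucket name and region are the sample placeholders, not real resources):

import os
from libcloud.storage.providers import get_driver

# 's3' mirrors storage_provider = s3 in the sample configuration.
driver_cls = get_driver('s3')
driver = driver_cls(
    key=os.environ['S3_KEY'],        # resolved from ${S3_KEY}
    secret=os.environ['S3_SECRET'],  # resolved from ${S3_SECRET}
    region='eu-north-1',
)
# The joblogs uploader looks up this container before each upload.
container = driver.get_container('scrapyd-k8s-example-bucket')
print(container.name)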
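The overlap-matching logic in stream_logs() is compact, so here is a self-contained sketch of the same idea (the helper name skip_already_written is made up for illustration): lines re-streamed by the pod are skipped until the stored tail, i.e. the last num_lines_to_check lines of the existing log file, has been matched in order, and only what follows is treated as new output.

def skip_already_written(stream_lines, last_n_lines):
    matched = 0
    for line in stream_lines:
        if matched == len(last_n_lines):
            yield line                # past the stored tail: new content
        elif line == last_n_lines[matched]:
            matched += 1              # still matching the stored tail
        else:
            matched = 0               # mismatch: restart tail matching

# The file already ends with "b", "c"; the pod re-sends everything.
print(list(skip_already_written(["a", "b", "c", "d", "e"], ["b", "c"])))
# -> ['d', 'e']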
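Finally, the placeholder rules implemented by _substitute_variables() in libcloud_driver.py: ${VAR} is replaced with the value of the environment variable VAR (raising ValueError when unset), while an escaped \${VAR} is left unexpanded and later unescaped to a literal ${VAR}. A small illustration of those two paths:

import os, re

VARIABLE_PATTERN = re.compile(r'(?<!\\)\$\{([^}]+)}')  # skip escaped \${...}

os.environ['S3_KEY'] = 'example-key'
value = 'id-${S3_KEY} literal-\\${S3_KEY}'
result = VARIABLE_PATTERN.sub(lambda m: os.environ[m.group(1)], value)
result = result.replace(r'\${', '${')  # unescape what was skipped above
print(result)  # id-example-key literal-${S3_KEY}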