6 scheduling #36

Open · wants to merge 23 commits into main from 6-scheduling

Commits (23)
4c0a82d
extracted resource watching logic into a separate class; implemented …
vlerkin Nov 1, 2024
da8254f
add kubernetes scheduler class that subscribes to the event observer …
vlerkin Nov 6, 2024
9dae80d
Merge branch 'main' into 6-scheduling
vlerkin Nov 6, 2024
7f66895
finish the observer logic extraction; merge changes from main; add re…
vlerkin Nov 7, 2024
963f279
change enable_joblogs method signature in docker
vlerkin Nov 7, 2024
7dc24bc
refactor api.py and launcher/k8s.py to keep api.py launcher agnostic
vlerkin Nov 11, 2024
98f5559
add implementation for docker to check the number of running containe…
vlerkin Nov 12, 2024
01b120b
remove update from RBAC because we perform patching of the resource a…
vlerkin Nov 12, 2024
08f1785
add number of retry attempts and exponential backoff time to the reco…
vlerkin Nov 14, 2024
2d5e6bf
create a helper finction _filter_jobs that accepts a filter function …
vlerkin Nov 20, 2024
36217d9
change the signature of the filtering method to also accept a parse f…
vlerkin Nov 20, 2024
2d843c1
add number of retry attempts and exponential backoff time to the reco…
vlerkin Nov 14, 2024
923915f
add logic to reset number of reconnect attempts and backoff time when…
vlerkin Nov 15, 2024
6a64485
added a CONFIG.md file with detailed explanations about parameters us…
vlerkin Nov 19, 2024
3604267
move section about config file from README.md to CONFIG.md; add a lin…
vlerkin Nov 19, 2024
6394633
Merge branch 'main' into 6-scheduling
vlerkin Nov 21, 2024
e3197f2
make number of job limitation optional by separating k8s_scheduler in…
vlerkin Nov 22, 2024
e23bfce
implement the feature to limit running jobs to be optional for Docker…
vlerkin Nov 22, 2024
17d14b6
add exceptions to the KubernetesScheduler class, for the init method,…
vlerkin Nov 25, 2024
a30c531
Merge remote-tracking branch 'upstream/main' into 6-scheduling
vlerkin Nov 25, 2024
8d8032c
add custom exceptions to the class for log handling; add exceptions t…
vlerkin Nov 25, 2024
331090f
modify method that fetches nez job id to be unsuspended to add creati…
vlerkin Nov 27, 2024
16a8276
make the proper test structure according to the standards; add tests …
vlerkin Nov 27, 2024
38 changes: 37 additions & 1 deletion CONFIG.md
@@ -16,6 +16,43 @@ stick to [scrapyd's configuration](https://scrapyd.readthedocs.io/en/latest/conf

The Docker and Kubernetes launchers have their own additional options.

## [scrapyd] section

### reconnection_attempts, backoff_time, backoff_coefficient

#### Context
The Kubernetes event watcher is used as part of the joblogs feature and also for limiting the number of jobs running
in parallel on the cluster. Neither feature is enabled by default; both can be activated if you choose to use them.

The event watcher establishes a connection to the Kubernetes API and receives a stream of events from it. However, this
long-lived connection is inherently unstable: it can be interrupted by network issues, proxies configured to terminate
long-lived connections, and other factors. For this reason, a mechanism was implemented to re-establish the long-lived
connection to the Kubernetes API. To achieve this, three parameters were introduced: `reconnection_attempts`,
`backoff_time` and `backoff_coefficient`.

#### What are these parameters about?
- `reconnection_attempts` - defines how many consecutive attempts will be made to reconnect if the connection fails;
- `backoff_time` and `backoff_coefficient` - are used to gradually slow down each subsequent attempt to establish a
connection with the Kubernetes API, preventing the API from becoming overloaded with requests. The `backoff_time` increases
exponentially and is calculated as `backoff_time *= self.backoff_coefficient` (see the sketch below).
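
A minimal sketch (not the project's actual implementation) of how the three parameters interact; `connect_and_watch`
is a hypothetical callable that streams events until the connection drops:

```python
import time

def watch_with_reconnect(connect_and_watch, reconnection_attempts=5,
                         backoff_time=5, backoff_coefficient=2):
    attempts_left = reconnection_attempts
    delay = backoff_time
    while attempts_left > 0:
        try:
            connect_and_watch()                      # stream events until the connection drops
            attempts_left = reconnection_attempts    # a healthy connection resets the counters
            delay = backoff_time
        except ConnectionError:
            attempts_left -= 1
            time.sleep(delay)                        # wait before the next attempt
            delay *= backoff_coefficient             # exponential backoff: 5, 10, 20, ...
```

With the defaults, consecutive failed attempts wait roughly 5, 10, 20, 40 and 80 seconds before the watcher gives up.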

#### When do I need to change it in the config file?
Default values for these parameters are provided in the code and are tuned to an "average" cluster setting. If your network
requirements or other conditions are unusual, you may need to adjust these values to better suit your specific setup.

### max_proc

By default `max_proc` is not present in the config file, but you can add it under the `[scrapyd]` section to limit the number
of jobs that run in parallel. Other scheduled jobs wait for their turn and start whenever currently running jobs complete,
are cancelled, or fail. This feature is available for both Kubernetes and Docker.

For example, suppose a cluster has no running jobs, you schedule 20 jobs and set `max_proc = 5` in the `[scrapyd]` section.
Then 5 jobs start running immediately and the other 15 are suspended. Whenever one of the running jobs finishes, the next
suspended job is started. The order in which jobs were scheduled is preserved.

`max_proc` - a parameter you can set to limit the number of running jobs at a given moment
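
As an illustration only (a hypothetical helper, not the actual scheduler code), the rule described above boils down to:

```python
def next_action(running_jobs, suspended_jobs, max_proc):
    """Decide what the scheduler does next when a max_proc limit is set."""
    if len(running_jobs) < max_proc and suspended_jobs:
        # start the suspended job that was scheduled first, preserving order
        return "unsuspend", suspended_jobs[0]
    return "wait", None
```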

## project sections

Each project you want to be able to run, gets its own section, prefixed with `project.`. For example,
@@ -69,4 +106,3 @@ connection to the Kubernetes API. To achieve this, three parameters were introdu

Default values for these parameters are provided in the code and are tuned to an "average" cluster setting. If your network
requirements or other conditions are unusual, you may need to adjust these values to better suit your specific setup.

4 changes: 2 additions & 2 deletions README.md
@@ -240,10 +240,10 @@ If you want to delete a version, remove the corresponding Docker image from the
Not supported, by design.
If you want to delete a project, remove it from the configuration file.

## Configuration

## Configuration file
This is done in the file `scrapyd_k8s.conf`; the options are explained in the [Configuration Guide](CONFIG.md).


## License

This software is distributed under the [MIT license](LICENSE.md).
4 changes: 1 addition & 3 deletions kubernetes.yaml
@@ -87,8 +87,6 @@ data:
launcher = scrapyd_k8s.launcher.K8s

namespace = default

max_proc = 2

# This is an example spider that should work out of the box.
# Adapt the spider config to your use-case.
@@ -183,7 +181,7 @@ rules:
verbs: ["get"]
- apiGroups: ["batch"]
resources: ["jobs"]
verbs: ["get", "list", "create", "delete"]
verbs: ["get", "list", "create", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
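
The added `patch` verb is needed because, when job limiting is enabled, the scheduler suspends and unsuspends Kubernetes Jobs by patching them. A hedged sketch using the official Python client, assuming the Job `spec.suspend` field is used (the project's actual call may differ):

```python
from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() when running outside the cluster
batch = client.BatchV1Api()

def set_job_suspended(job_name, namespace, suspended):
    # requires the "patch" verb on batch/jobs granted by the Role above
    body = {"spec": {"suspend": suspended}}
    batch.patch_namespaced_job(name=job_name, namespace=namespace, body=body)
```
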
11 changes: 9 additions & 2 deletions scrapyd_k8s.sample-k8s.conf
@@ -19,8 +19,15 @@ namespace = default
# Optional pull secret, in case you have private spiders.
#pull_secret = ghcr-registry

# Maximum number of jobs running in parallel
max_proc = 10
# Number of attempts to reconnect with k8s API to watch events, default is 5
reconnection_attempts = 5

# Minimum time in seconds to wait before reconnecting to k8s API to watch events, default is 5
backoff_time = 5

# Coefficient by which backoff_time is multiplied after each attempt, providing exponential backoff so the k8s API is not overwhelmed
# default is 2, so the wait before each subsequent reconnection attempt is backoff_time*backoff_coefficient
backoff_coefficient = 2

# For each project, define a project section.
# This contains a repository that points to the remote container repository.
51 changes: 40 additions & 11 deletions scrapyd_k8s/joblogs/log_handler_k8s.py
@@ -8,12 +8,20 @@

logger = logging.getLogger(__name__)

# Custom Exceptions
Contributor


I'd move these custom exceptions to a separate file.

Collaborator Author


Good point! If I make a file with custom exceptions within the joblogs module, is that fine? I just feel like making a general file for custom exceptions in the root directory for this optional feature is not a good idea. What do you think?

Contributor


Fine for sure for now. If we add more exceptions later, we could reconsider making a more central one then.

class KubernetesJobLogHandlerError(Exception):
"""Base exception class for KubernetesJobLogHandler."""

class LogUploadError(KubernetesJobLogHandlerError):
"""Raised when uploading logs to object storage fails."""

class PodStreamingError(KubernetesJobLogHandlerError):
"""Raised when streaming logs from a pod fails."""

class KubernetesJobLogHandler:
"""
A class to handle Kubernetes job logs by watching pods, streaming logs, and uploading them to object storage.

...

Attributes
----------
DEFAULT_BLOCK_SIZE : int
@@ -33,8 +41,6 @@ class KubernetesJobLogHandler:

Methods
-------
start():
Starts the pod watcher thread for job logs.
get_last_n_lines(file_path, num_lines):
Efficiently retrieves the last `num_lines` lines from a file.
concatenate_and_delete_files(main_file_path, temp_file_path, block_size=6144):
@@ -43,8 +49,8 @@
Generates a unique temporary file path for storing logs of a job.
stream_logs(job_name):
Streams logs from a Kubernetes pod and writes them to a file.
watch_pods():
Watches Kubernetes pods and handles events such as starting log streaming or uploading logs.
handle_events(event):
Watches Kubernetes pod events and handles actions such as starting log streaming or uploading logs.
"""
# The value was chosen to provide a balance between memory usage and the number of I/O operations
DEFAULT_BLOCK_SIZE = 6144
@@ -140,6 +146,7 @@ def concatenate_and_delete_files(self, main_file_path, temp_file_path, block_siz
logger.debug(f"Concatenated '{temp_file_path}' into '{main_file_path}' and deleted temporary file.")
except (IOError, OSError) as e:
logger.error(f"Failed to concatenate and delete files for job: {e}")
raise KubernetesJobLogHandlerError(f"Failed to concatenate and delete files: {e}") from e

def make_log_filename_for_job(self, job_name):
"""
@@ -177,6 +184,11 @@ def stream_logs(self, job_name):
Returns
-------
None

Raises
------
PodStreamingError
If an I/O error occurs while streaming logs or processing them.
"""
log_lines_counter = 0
v1 = client.CoreV1Api()
@@ -215,16 +227,32 @@ def stream_logs(self, job_name):
else:
os.remove(temp_file_path)
logger.info(f"Removed temporary file '{temp_file_path}' after streaming logs for job '{job_name}'.")
except Exception as e:
logger.exception(f"Error streaming logs for job '{job_name}': {e}")
except (IOError, OSError) as e:
logger.error(f"I/O error while streaming logs for job '{job_name}': {e}")
raise PodStreamingError(f"I/O error while streaming logs for job '{job_name}': {e}") from e
except KubernetesJobLogHandlerError as e:
logger.error(f"Error processing logs for job '{job_name}': {e}")
raise PodStreamingError(f"Error processing logs for job '{job_name}': {e}") from e
finally:
w.stop()

def handle_events(self, event):
"""
Watches Kubernetes pods and handles events such as starting log streaming or uploading logs.
Watches Kubernetes pod events and handles actions such as starting log streaming or uploading logs.

Parameters
----------
event : dict
The event dictionary containing information about the pod event.

Returns
-------
None

Raises
------
KubernetesJobLogHandlerError
If an error occurs while handling pod events.
"""
try:

@@ -255,5 +283,6 @@ def handle_events(self, event):
logger.info(f"Logfile not found for job '{job_id}'")
else:
logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'")
except Exception as e:
logger.exception(f"Error watching pods in namespace '{self.namespace}': {e}")
except KubernetesJobLogHandlerError as e:
logger.error(f"Handled error in handle_events: {e}")
raise e
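
For reference, a hedged sketch of the event dictionary that `handle_events` receives; the shape follows the `kubernetes` watch API, and `handler` stands in for a `KubernetesJobLogHandler` instance:

```python
from kubernetes import client

# Events yielded by kubernetes.watch.Watch().stream(...) have this shape;
# handle_events() inspects event["type"] and the pod's status.phase.
pod = client.V1Pod(
    metadata=client.V1ObjectMeta(name="spider-job-abc123"),
    status=client.V1PodStatus(phase="Succeeded"),
)
event = {"type": "MODIFIED", "object": pod}
# handler.handle_events(event)
```
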
1 change: 1 addition & 0 deletions scrapyd_k8s/k8s_scheduler/__init__.py
@@ -0,0 +1 @@
from scrapyd_k8s.k8s_scheduler.k8s_scheduler import KubernetesScheduler