6 scheduling observer extraction #38
Merged: 6 commits, Nov 19, 2024
CONFIG.md: 38 additions, 0 deletions (new file)

@@ -0,0 +1,38 @@
## About
This file provides a detailed description of the parameters listed in the configuration file, explains why they are used,
and tells you when you are expected to provide or change them.

## Configuration file

* `http_port` - defaults to `6800` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#http-port))
* `bind_address` - defaults to `127.0.0.1` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#bind-address))
* `max_proc` - _(implementation pending)_, if unset or `0` it will use the number of nodes in the cluster, defaults to `0` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#max-proc))
* `repository` - Python class for accessing the image repository, defaults to `scrapyd_k8s.repository.Remote`
* `launcher` - Python class for managing jobs on the cluster, defaults to `scrapyd_k8s.launcher.K8s`
* `username` - Set this and `password` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#username))
* `password` - Set this and `username` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#password))

The Docker and Kubernetes launchers have their own additional options.
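
For orientation, here is a minimal sketch of these options, assuming they all live under the `[scrapyd]` section of the
configuration file and using the defaults listed above (authentication is omitted; set `username` and `password` together
to enable it):

```ini
[scrapyd]
http_port    = 6800
bind_address = 127.0.0.1
# unset or 0: use the number of nodes in the cluster (implementation pending)
max_proc     = 0
repository   = scrapyd_k8s.repository.Remote
launcher     = scrapyd_k8s.launcher.K8s
```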

## `[scrapyd]` section: `reconnection_attempts`, `backoff_time`, `backoff_coefficient`

### Context
The Kubernetes event watcher is used in the code as part of the joblogs feature and is also used to limit the
number of jobs running in parallel on the cluster. Neither feature is enabled by default; both can be activated if you
choose to use them.

The event watcher establishes a connection to the Kubernetes API and receives a stream of events from it. However, this
long-lived connection is inherently unstable: it can be interrupted by network issues, proxies configured to terminate
long-lived connections, and other factors. For this reason, a mechanism was implemented to re-establish the long-lived
connection to the Kubernetes API. To achieve this, three parameters were introduced: `reconnection_attempts`,
`backoff_time` and `backoff_coefficient`.

### What are these parameters about?
- `reconnection_attempts` - defines how many consecutive attempts are made to reconnect when the connection fails;
- `backoff_time` and `backoff_coefficient` - gradually slow down each subsequent attempt to establish a
connection with the Kubernetes API, preventing the API from becoming overloaded with requests. The `backoff_time` grows
exponentially: after each failed attempt it is multiplied by the coefficient (`backoff_time *= backoff_coefficient`), so
with a `backoff_time` of 5 and a `backoff_coefficient` of 2 the waits are 5, 10, 20, 40, ... seconds.

### When do I need to change it in the config file?
Default values for these parameters are provided in the code and are tuned to an "average" cluster setting. If your network
requirements or other conditions are unusual, you may need to adjust these values to better suit your specific setup.
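
As a reference, a minimal sketch of the retry settings in the `[scrapyd]` section, using the defaults from the code
(5 attempts, 5 seconds initial backoff, coefficient 2); tune the numbers to your own cluster:

```ini
[scrapyd]
# Retry the watch connection up to 5 times; wait 5 s before the first retry,
# then 10 s, 20 s, ... (each wait is multiplied by backoff_coefficient).
reconnection_attempts = 5
backoff_time          = 5
backoff_coefficient   = 2
```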
README.md: 1 addition, 10 deletions

@@ -241,16 +241,7 @@ Not supported, by design.
If you want to delete a project, remove it from the configuration file.

## Configuration file

-* `http_port` - defaults to `6800` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#http-port))
-* `bind_address` - defaults to `127.0.0.1` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#bind-address))
-* `max_proc` - _(implementation pending)_, if unset or `0` it will use the number of nodes in the cluster, defaults to `0` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#max-proc))
-* `repository` - Python class for accessing the image repository, defaults to `scrapyd_k8s.repository.Remote`
-* `launcher` - Python class for managing jobs on the cluster, defaults to `scrapyd_k8s.launcher.K8s`
-* `username` - Set this and `password` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#username))
-* `password` - Set this and `username` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#password))
-
-The Docker and Kubernetes launchers have their own additional options.
+To read about the configuration file in detail, please navigate to the [Configuration Guide](CONFIG.md).

## License

kubernetes.yaml: 2 additions, 0 deletions

@@ -87,6 +87,8 @@ data:
launcher = scrapyd_k8s.launcher.K8s

namespace = default

+max_proc = 2
Member commented: Why is `max_proc` here 2, and in `scrapyd_k8s.sample-k8s.conf` 10?


# This is an example spider that should work out of the box.
# Adapt the spider config to your use-case.
scrapyd_k8s.sample-k8s.conf: 3 additions, 0 deletions

@@ -19,6 +19,9 @@ namespace = default
# Optional pull secret, in case you have private spiders.
#pull_secret = ghcr-registry

+# Maximum number of jobs running in parallel
+max_proc = 10

# For each project, define a project section.
# This contains a repository that points to the remote container repository.
# An optional env_secret is the name of a secret with additional environment
scrapyd_k8s/__main__.py: 1 addition, 3 deletions

@@ -1,7 +1,6 @@
import logging
import sys
-from .api import run, config
-from .joblogs import joblogs_init
+from .api import run

def setup_logging():
logging.basicConfig(
@@ -14,5 +13,4 @@ def setup_logging():

if __name__ == "__main__":
setup_logging()
-    joblogs_init(config)
run()
scrapyd_k8s/api.py: 0 additions, 6 deletions

@@ -155,11 +155,5 @@ def run():
if config_username is not None and config_password is not None:
enable_authentication(app, config_username, config_password)

-    if config.joblogs() is not None:
-        launcher.enable_joblogs(config)
-        logger.info("Job logs handling enabled.")
-    else:
-        logger.debug("Job logs handling not enabled; 'joblogs' configuration section is missing.")

# run server
app.run(host=host, port=port)
scrapyd_k8s/joblogs/__init__.py: 1 addition, 25 deletions

@@ -1,25 +1 @@
-import logging
-from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler
-
-logger = logging.getLogger(__name__)
-
-def joblogs_init(config):
-    """
-    Initializes job logs handling by starting the Kubernetes job log handler.
-
-    Parameters
-    ----------
-    config : Config
-        Configuration object containing settings for job logs and storage.
-
-    Returns
-    -------
-    None
-    """
-    joblogs_config = config.joblogs()
-    if joblogs_config and joblogs_config.get('storage_provider') is not None:
-        log_handler = KubernetesJobLogHandler(config)
-        log_handler.start()
-        logger.info("Job logs handler started.")
-    else:
-        logger.warning("No storage provider configured; job logs will not be uploaded.")
+from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler
scrapyd_k8s/joblogs/log_handler_k8s.py: 29 additions, 49 deletions

@@ -63,25 +63,7 @@ def __init__(self, config):
self.pod_tmp_mapping = {}
self.namespace = config.namespace()
self.num_lines_to_check = int(config.joblogs().get('num_lines_to_check', 0))
-        self.object_storage_provider = None
-
-    def start(self):
-        """
-        Starts the pod watcher thread for job logs.
-
-        Returns
-        -------
-        None
-        """
-        if self.config.joblogs() and self.config.joblogs().get('storage_provider') is not None:
-            pod_watcher_thread = threading.Thread(
-                target=self.watch_pods
-            )
-            pod_watcher_thread.daemon = True
-            pod_watcher_thread.start()
-            logger.info("Started pod watcher thread for job logs.")
-        else:
-            logger.warning("No storage provider configured; job logs will not be uploaded.")
+        self.object_storage_provider = LibcloudObjectStorage(self.config)

def get_last_n_lines(self, file_path, num_lines):
"""
@@ -236,7 +218,7 @@ def stream_logs(self, job_name):
except Exception as e:
logger.exception(f"Error streaming logs for job '{job_name}': {e}")

-    def watch_pods(self):
+    def handle_events(self, event):
"""
Watches Kubernetes pods and handles events such as starting log streaming or uploading logs.

@@ -245,36 +227,34 @@ def watch_pods(self):
None
"""
-        self.object_storage_provider = LibcloudObjectStorage(self.config)
-        w = watch.Watch()
-        v1 = client.CoreV1Api()
-        try:
-            for event in w.stream(v1.list_namespaced_pod, namespace=self.namespace):
-                pod = event['object']
-                if pod.metadata.labels.get("org.scrapy.job_id"):
-                    job_id = pod.metadata.labels.get("org.scrapy.job_id")
-                    pod_name = pod.metadata.name
-                    thread_name = f"{self.namespace}_{pod_name}"
-                    if pod.status.phase == 'Running':
-                        if (thread_name in self.watcher_threads
-                                and self.watcher_threads[thread_name] is not None
-                                and self.watcher_threads[thread_name].is_alive()):
-                            pass
-                        else:
-                            self.watcher_threads[thread_name] = threading.Thread(
-                                target=self.stream_logs,
-                                args=(pod_name,)
-                            )
-                            self.watcher_threads[thread_name].start()
-                    elif pod.status.phase in ['Succeeded', 'Failed']:
-                        log_filename = self.pod_tmp_mapping.get(pod_name)
-                        if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
-                            if self.object_storage_provider.object_exists(job_id):
-                                logger.info(f"Log file for job '{job_id}' already exists in storage.")
-                            else:
-                                self.object_storage_provider.upload_file(log_filename)
-                        else:
-                            logger.info(f"Logfile not found for job '{job_id}'")
-                else:
-                    logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'")
-        except Exception as e:
-            logger.exception(f"Error watching pods in namespace '{self.namespace}': {e}")
+        pod = event['object']
+        if pod.metadata.labels.get("org.scrapy.job_id"):
+            job_id = pod.metadata.labels.get("org.scrapy.job_id")
+            pod_name = pod.metadata.name
+            thread_name = f"{self.namespace}_{pod_name}"
+            if pod.status.phase == 'Running':
+                if (thread_name in self.watcher_threads
+                        and self.watcher_threads[thread_name] is not None
+                        and self.watcher_threads[thread_name].is_alive()):
+                    pass
+                else:
+                    self.watcher_threads[thread_name] = threading.Thread(
+                        target=self.stream_logs,
+                        args=(pod_name,)
+                    )
+                    self.watcher_threads[thread_name].start()
+            elif pod.status.phase in ['Succeeded', 'Failed']:
+                log_filename = self.pod_tmp_mapping.get(pod_name)
+                if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
+                    if self.object_storage_provider.object_exists(job_id):
+                        logger.info(f"Log file for job '{job_id}' already exists in storage.")
+                    else:
+                        self.object_storage_provider.upload_file(log_filename)
+                else:
+                    logger.info(f"Logfile not found for job '{job_id}'")
+        else:
+            logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'")
scrapyd_k8s/k8s_resource_watcher.py: 151 additions, 0 deletions (new file)

@@ -0,0 +1,151 @@
import threading
import logging
import time
from kubernetes import client, watch
from typing import Callable, List
import urllib3

logger = logging.getLogger(__name__)

class ResourceWatcher:
"""
Watches Kubernetes pod events and notifies subscribers about relevant events.

Attributes
----------
namespace : str
Kubernetes namespace to watch pods in.
subscribers : List[Callable]
List of subscriber callback functions to notify on events.
"""

def __init__(self, namespace, config):
"""
Initializes the ResourceWatcher.

Parameters
----------
namespace : str
Kubernetes namespace to watch pods in.
"""
self.namespace = namespace
self.reconnection_attempts = int(config.scrapyd().get('reconnection_attempts', 5))
self.backoff_time = int(config.scrapyd().get('backoff_time', 5))
self.backoff_coefficient = int(config.scrapyd().get('backoff_coefficient', 2))
self.subscribers: List[Callable] = []
self._stop_event = threading.Event()
self.watcher_thread = threading.Thread(target=self.watch_pods, daemon=True)
self.watcher_thread.start()
logger.info(f"ResourceWatcher thread started for namespace '{self.namespace}'.")

def subscribe(self, callback: Callable):
"""
Adds a subscriber callback to be notified on events.

Parameters
----------
callback : Callable
A function to call when an event is received.
"""
if callback not in self.subscribers:
self.subscribers.append(callback)
logger.debug(f"Subscriber {callback.__name__} added.")

def unsubscribe(self, callback: Callable):
"""
Removes a subscriber callback.

Parameters
----------
callback : Callable
The subscriber function to remove.
"""
if callback in self.subscribers:
self.subscribers.remove(callback)
logger.debug(f"Subscriber {callback.__name__} removed.")

def notify_subscribers(self, event: dict):
"""
Notifies all subscribers about an event.

Parameters
----------
event : dict
The Kubernetes event data.
"""
for subscriber in self.subscribers:
try:
subscriber(event)
except Exception as e:
logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}")

def watch_pods(self):
"""
Watches Kubernetes pod events and notifies subscribers.
Runs in a separate thread.
"""
v1 = client.CoreV1Api()
w = watch.Watch()
resource_version = None

logger.info(f"Started watching pods in namespace '{self.namespace}'.")
backoff_time = self.backoff_time
reconnection_attempts = self.reconnection_attempts
while not self._stop_event.is_set() and reconnection_attempts > 0:
try:
kwargs = {
'namespace': self.namespace,
'timeout_seconds': 0,
}
if resource_version:
kwargs['resource_version'] = resource_version
first_event = True
for event in w.stream(v1.list_namespaced_pod, **kwargs):
if first_event:
# Reset reconnection attempts and backoff time upon successful reconnection
reconnection_attempts = self.reconnection_attempts
backoff_time = self.backoff_time
first_event = False # Ensure this only happens once per connection
pod_name = event['object'].metadata.name
resource_version = event['object'].metadata.resource_version
event_type = event['type']
logger.debug(f"Received event: {event_type} for pod: {pod_name}")
self.notify_subscribers(event)
except (urllib3.exceptions.ProtocolError,
urllib3.exceptions.ReadTimeoutError,
urllib3.exceptions.ConnectionError) as e:
reconnection_attempts -= 1
logger.exception(f"Encountered network error: {e}")
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
time.sleep(backoff_time)
backoff_time *= self.backoff_coefficient
except client.ApiException as e:
# Resource version is too old and cannot be accessed anymore
if e.status == 410:
logger.error("Received 410 Gone error, resetting resource_version and restarting watch.")
resource_version = None
continue
else:
reconnection_attempts -= 1
logger.exception(f"Encountered ApiException: {e}")
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
time.sleep(backoff_time)
backoff_time *= self.backoff_coefficient
except StopIteration:
logger.info("Watch stream ended, restarting watch.")
continue
except Exception as e:
reconnection_attempts -= 1
logger.exception(f"Watcher encountered exception: {e}")
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
time.sleep(backoff_time)
backoff_time *= self.backoff_coefficient


def stop(self):
"""
Stops the watcher thread gracefully.
"""
self._stop_event.set()
self.watcher_thread.join()
logger.info(f"ResourceWatcher thread stopped for namespace '{self.namespace}'.")