Skip to content

Commit

Permalink
Improve watching resources (PR #38)
Browse files Browse the repository at this point in the history
* extracted resource watching logic into a separate class; implemented logic for observer in a RecourceWatcher class; added method to stop a thread gracefully

* finish the observer logic extraction; merge changes from main; add resource watcher instance to enable_joblogs to subscribe to the event watcher if the log feature is configured; delete logic about event watcher from main; pass container for list objects function instead of container name; remove start methon from log handler class; modify joblogs init to subscribe to event watcher

* add number of retry attempts and exponential backoff time to the reconnect loop for event watcher; make number of reconnect attempts, backoff time and a coefficient for exponential growth configurable via config; add backoff_time, reconnection_attempts and backoff_coefficient as attributes to the resource watcher init; add resource_version as a param to w.stream so a failed stream can read from the last resource it was able to catch; add urllib3.exceptions.ProtocolError and handle reconnection after some exponential backoff time to avoid api flooding; add config as a param for init for resource watcher; modify config in kubernetes.yaml and k8s config to contain add backoff_time, reconnection_attempts and backoff_coefficient

* add logic to reset number of reconnect attempts and backoff time when connection to the k8s was achieved so only sequential failures detected; add exception handling to watch_pods to handle failure in urllib3, when source version is old and not available anymore, and when stream is ended; remove k8s resource watcher initialization from run function in api.py and move it to k8s.py launcher as _init_resource_watcher; refactor existing logic from joblogs/__init__.py to keep it in _init_resource_watcher and enable_joblogs in k8s launcher

* added a CONFIG.md file with detailed explanations about parameters used to re-establish connection to the Kubernetes wather

* move section about config file from README.md to CONFIG.md; add a link to the CONFIG.md in the README.md; remove variables for reconnection_attempts, backoff_time and backoff_coefficient fron the sample config since default values are provided in the code.
  • Loading branch information
vlerkin authored Nov 19, 2024
1 parent 33bae3e commit f36de79
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 96 deletions.
38 changes: 38 additions & 0 deletions CONFIG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
## About
This file provides you with the detailed description of parameters listed in the config file, and explaining why they are used
and when you are expected to provide or change them.

## Configuration file

* `http_port` - defaults to `6800` ([](https://scrapyd.readthedocs.io/en/latest/config.html#http-port))
* `bind_address` - defaults to `127.0.0.1` ([](https://scrapyd.readthedocs.io/en/latest/config.html#bind-address))
* `max_proc` - _(implementation pending)_, if unset or `0` it will use the number of nodes in the cluster, defaults to `0` ([](https://scrapyd.readthedocs.io/en/latest/config.html#max-proc))
* `repository` - Python class for accessing the image repository, defaults to `scrapyd_k8s.repository.Remote`
* `launcher` - Python class for managing jobs on the cluster, defaults to `scrapyd_k8s.launcher.K8s`
* `username` - Set this and `password` to enable basic authentication ([](https://scrapyd.readthedocs.io/en/latest/config.html#username))
* `password` - Set this and `username` to enable basic authentication ([](https://scrapyd.readthedocs.io/en/latest/config.html#password))

The Docker and Kubernetes launchers have their own additional options.

## [scrapyd] section, reconnection_attempts, backoff_time, backoff_coefficient

### Context
The Kubernetes event watcher is used in the code as part of the joblogs feature and is also utilized for limiting the
number of jobs running in parallel on the cluster. Both features are not enabled by default and can be activated if you
choose to use them.

The event watcher establishes a connection to the Kubernetes API and receives a stream of events from it. However, the
nature of this long-lived connection is unstable; it can be interrupted by network issues, proxies configured to terminate
long-lived connections, and other factors. For this reason, a mechanism was implemented to re-establish the long-lived
connection to the Kubernetes API. To achieve this, three parameters were introduced: `reconnection_attempts`,
`backoff_time` and `backoff_coefficient`.

### What are these parameters about?
- `reconnection_attempts` - defines how many consecutive attempts will be made to reconnect if the connection fails;
- `backoff_time` and `backoff_coefficient` - are used to gradually slow down each subsequent attempt to establish a
connection with the Kubernetes API, preventing the API from becoming overloaded with requests. The `backoff_time` increases
exponentially and is calculated as `backoff_time *= self.backoff_coefficient`.

### When do I need to change it in the config file?
Default values for these parameters are provided in the code and are tuned to an "average" cluster setting. If your network
requirements or other conditions are unusual, you may need to adjust these values to better suit your specific setup.
11 changes: 1 addition & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,16 +241,7 @@ Not supported, by design.
If you want to delete a project, remove it from the configuration file.
## Configuration file
* `http_port` - defaults to `6800` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#http-port))
* `bind_address` - defaults to `127.0.0.1` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#bind-address))
* `max_proc` - _(implementation pending)_, if unset or `0` it will use the number of nodes in the cluster, defaults to `0` ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#max-proc))
* `repository` - Python class for accessing the image repository, defaults to `scrapyd_k8s.repository.Remote`
* `launcher` - Python class for managing jobs on the cluster, defaults to `scrapyd_k8s.launcher.K8s`
* `username` - Set this and `password` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#username))
* `password` - Set this and `username` to enable basic authentication ([➽](https://scrapyd.readthedocs.io/en/latest/config.html#password))
The Docker and Kubernetes launchers have their own additional options.
To read in detail about the config file, please, navigate to the [Configuration Guide](CONFIG.md)
## License
Expand Down
2 changes: 2 additions & 0 deletions kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ data:
launcher = scrapyd_k8s.launcher.K8s
namespace = default
max_proc = 2
# This is an example spider that should work out of the box.
# Adapt the spider config to your use-case.
Expand Down
3 changes: 3 additions & 0 deletions scrapyd_k8s.sample-k8s.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ namespace = default
# Optional pull secret, in case you have private spiders.
#pull_secret = ghcr-registry

# Maximum number of jobs running in parallel
max_proc = 10

# For each project, define a project section.
# This contains a repository that points to the remote container repository.
# An optional env_secret is the name of a secret with additional environment
Expand Down
4 changes: 1 addition & 3 deletions scrapyd_k8s/__main__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import sys
from .api import run, config
from .joblogs import joblogs_init
from .api import run

def setup_logging():
logging.basicConfig(
Expand All @@ -14,5 +13,4 @@ def setup_logging():

if __name__ == "__main__":
setup_logging()
joblogs_init(config)
run()
6 changes: 0 additions & 6 deletions scrapyd_k8s/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,5 @@ def run():
if config_username is not None and config_password is not None:
enable_authentication(app, config_username, config_password)

if config.joblogs() is not None:
launcher.enable_joblogs(config)
logger.info("Job logs handling enabled.")
else:
logger.debug("Job logs handling not enabled; 'joblogs' configuration section is missing.")

# run server
app.run(host=host, port=port)
26 changes: 1 addition & 25 deletions scrapyd_k8s/joblogs/__init__.py
Original file line number Diff line number Diff line change
@@ -1,25 +1 @@
import logging
from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler

logger = logging.getLogger(__name__)

def joblogs_init(config):
"""
Initializes job logs handling by starting the Kubernetes job log handler.
Parameters
----------
config : Config
Configuration object containing settings for job logs and storage.
Returns
-------
None
"""
joblogs_config = config.joblogs()
if joblogs_config and joblogs_config.get('storage_provider') is not None:
log_handler = KubernetesJobLogHandler(config)
log_handler.start()
logger.info("Job logs handler started.")
else:
logger.warning("No storage provider configured; job logs will not be uploaded.")
from scrapyd_k8s.joblogs.log_handler_k8s import KubernetesJobLogHandler
78 changes: 29 additions & 49 deletions scrapyd_k8s/joblogs/log_handler_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,7 @@ def __init__(self, config):
self.pod_tmp_mapping = {}
self.namespace = config.namespace()
self.num_lines_to_check = int(config.joblogs().get('num_lines_to_check', 0))
self.object_storage_provider = None

def start(self):
"""
Starts the pod watcher thread for job logs.
Returns
-------
None
"""
if self.config.joblogs() and self.config.joblogs().get('storage_provider') is not None:
pod_watcher_thread = threading.Thread(
target=self.watch_pods
)
pod_watcher_thread.daemon = True
pod_watcher_thread.start()
logger.info("Started pod watcher thread for job logs.")
else:
logger.warning("No storage provider configured; job logs will not be uploaded.")
self.object_storage_provider = LibcloudObjectStorage(self.config)

def get_last_n_lines(self, file_path, num_lines):
"""
Expand Down Expand Up @@ -236,7 +218,7 @@ def stream_logs(self, job_name):
except Exception as e:
logger.exception(f"Error streaming logs for job '{job_name}': {e}")

def watch_pods(self):
def handle_events(self, event):
"""
Watches Kubernetes pods and handles events such as starting log streaming or uploading logs.
Expand All @@ -245,36 +227,34 @@ def watch_pods(self):
None
"""
self.object_storage_provider = LibcloudObjectStorage(self.config)
w = watch.Watch()
v1 = client.CoreV1Api()
try:
for event in w.stream(v1.list_namespaced_pod, namespace=self.namespace):
pod = event['object']
if pod.metadata.labels.get("org.scrapy.job_id"):
job_id = pod.metadata.labels.get("org.scrapy.job_id")
pod_name = pod.metadata.name
thread_name = f"{self.namespace}_{pod_name}"
if pod.status.phase == 'Running':
if (thread_name in self.watcher_threads
and self.watcher_threads[thread_name] is not None
and self.watcher_threads[thread_name].is_alive()):
pass
else:
self.watcher_threads[thread_name] = threading.Thread(
target=self.stream_logs,
args=(pod_name,)
)
self.watcher_threads[thread_name].start()
elif pod.status.phase in ['Succeeded', 'Failed']:
log_filename = self.pod_tmp_mapping.get(pod_name)
if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
if self.object_storage_provider.object_exists(job_id):
logger.info(f"Log file for job '{job_id}' already exists in storage.")
else:
self.object_storage_provider.upload_file(log_filename)

pod = event['object']
if pod.metadata.labels.get("org.scrapy.job_id"):
job_id = pod.metadata.labels.get("org.scrapy.job_id")
pod_name = pod.metadata.name
thread_name = f"{self.namespace}_{pod_name}"
if pod.status.phase == 'Running':
if (thread_name in self.watcher_threads
and self.watcher_threads[thread_name] is not None
and self.watcher_threads[thread_name].is_alive()):
pass
else:
self.watcher_threads[thread_name] = threading.Thread(
target=self.stream_logs,
args=(pod_name,)
)
self.watcher_threads[thread_name].start()
elif pod.status.phase in ['Succeeded', 'Failed']:
log_filename = self.pod_tmp_mapping.get(pod_name)
if log_filename is not None and os.path.isfile(log_filename) and os.path.getsize(log_filename) > 0:
if self.object_storage_provider.object_exists(job_id):
logger.info(f"Log file for job '{job_id}' already exists in storage.")
else:
logger.info(f"Logfile not found for job '{job_id}'")
else:
logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'")
self.object_storage_provider.upload_file(log_filename)
else:
logger.info(f"Logfile not found for job '{job_id}'")
else:
logger.debug(f"Other pod event type '{event['type']}' for pod '{pod.metadata.name}' - Phase: '{pod.status.phase}'")
except Exception as e:
logger.exception(f"Error watching pods in namespace '{self.namespace}': {e}")
151 changes: 151 additions & 0 deletions scrapyd_k8s/k8s_resource_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import threading
import logging
import time
from kubernetes import client, watch
from typing import Callable, List
import urllib3

logger = logging.getLogger(__name__)

class ResourceWatcher:
"""
Watches Kubernetes pod events and notifies subscribers about relevant events.
Attributes
----------
namespace : str
Kubernetes namespace to watch pods in.
subscribers : List[Callable]
List of subscriber callback functions to notify on events.
"""

def __init__(self, namespace, config):
"""
Initializes the ResourceWatcher.
Parameters
----------
namespace : str
Kubernetes namespace to watch pods in.
"""
self.namespace = namespace
self.reconnection_attempts = int(config.scrapyd().get('reconnection_attempts', 5))
self.backoff_time = int(config.scrapyd().get('backoff_time', 5))
self.backoff_coefficient = int(config.scrapyd().get('backoff_coefficient', 2))
self.subscribers: List[Callable] = []
self._stop_event = threading.Event()
self.watcher_thread = threading.Thread(target=self.watch_pods, daemon=True)
self.watcher_thread.start()
logger.info(f"ResourceWatcher thread started for namespace '{self.namespace}'.")

def subscribe(self, callback: Callable):
"""
Adds a subscriber callback to be notified on events.
Parameters
----------
callback : Callable
A function to call when an event is received.
"""
if callback not in self.subscribers:
self.subscribers.append(callback)
logger.debug(f"Subscriber {callback.__name__} added.")

def unsubscribe(self, callback: Callable):
"""
Removes a subscriber callback.
Parameters
----------
callback : Callable
The subscriber function to remove.
"""
if callback in self.subscribers:
self.subscribers.remove(callback)
logger.debug(f"Subscriber {callback.__name__} removed.")

def notify_subscribers(self, event: dict):
"""
Notifies all subscribers about an event.
Parameters
----------
event : dict
The Kubernetes event data.
"""
for subscriber in self.subscribers:
try:
subscriber(event)
except Exception as e:
logger.exception(f"Error notifying subscriber {subscriber.__name__}: {e}")

def watch_pods(self):
"""
Watches Kubernetes pod events and notifies subscribers.
Runs in a separate thread.
"""
v1 = client.CoreV1Api()
w = watch.Watch()
resource_version = None

logger.info(f"Started watching pods in namespace '{self.namespace}'.")
backoff_time = self.backoff_time
reconnection_attempts = self.reconnection_attempts
while not self._stop_event.is_set() and reconnection_attempts > 0:
try:
kwargs = {
'namespace': self.namespace,
'timeout_seconds': 0,
}
if resource_version:
kwargs['resource_version'] = resource_version
first_event = True
for event in w.stream(v1.list_namespaced_pod, **kwargs):
if first_event:
# Reset reconnection attempts and backoff time upon successful reconnection
reconnection_attempts = self.reconnection_attempts
backoff_time = self.backoff_time
first_event = False # Ensure this only happens once per connection
pod_name = event['object'].metadata.name
resource_version = event['object'].metadata.resource_version
event_type = event['type']
logger.debug(f"Received event: {event_type} for pod: {pod_name}")
self.notify_subscribers(event)
except (urllib3.exceptions.ProtocolError,
urllib3.exceptions.ReadTimeoutError,
urllib3.exceptions.ConnectionError) as e:
reconnection_attempts -= 1
logger.exception(f"Encountered network error: {e}")
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
time.sleep(backoff_time)
backoff_time *= self.backoff_coefficient
except client.ApiException as e:
# Resource version is too old and cannot be accessed anymore
if e.status == 410:
logger.error("Received 410 Gone error, resetting resource_version and restarting watch.")
resource_version = None
continue
else:
reconnection_attempts -= 1
logger.exception(f"Encountered ApiException: {e}")
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
time.sleep(backoff_time)
backoff_time *= self.backoff_coefficient
except StopIteration:
logger.info("Watch stream ended, restarting watch.")
continue
except Exception as e:
reconnection_attempts -= 1
logger.exception(f"Watcher encountered exception: {e}")
logger.info(f"Retrying to watch pods after {backoff_time} seconds...")
time.sleep(backoff_time)
backoff_time *= self.backoff_coefficient


def stop(self):
"""
Stops the watcher thread gracefully.
"""
self._stop_event.set()
self.watcher_thread.join()
logger.info(f"ResourceWatcher thread stopped for namespace '{self.namespace}'.")
Loading

0 comments on commit f36de79

Please sign in to comment.