feat: add background processing jobs (#5432)

# Description

This PR adds the following changes:

- [x] Add `rq` to help us execute background jobs (see the queue sketch after this list).
- [x] Add a background job to update all records for a dataset when the
dataset distribution strategy is updated.
- [x] Change the HuggingFace Dockerfile to install Redis and run `rq`
workers via the honcho `Procfile`.
- [x] Add documentation about new `ARGILLA_REDIS_URL` environment
variable.
- [x] Add a ping to Redis so the Argilla server is not started if Redis is not
ready.
- [x] Change Argilla docker compose file to include a container with
Redis and rq workers.
- [x] Update the Argilla server `README.md` file, adding Redis as a dependency
to install.
- [x] Add documentation about Redis being a new Argilla server
dependency.
- [x] Add `BACKGROUND_NUM_WORKERS` environment variable to specify the
number of workers in the HF Space container.
- [ ] ~~Modify `Dockerfile` template on HF to include the environment variable~~ #5443
```
# (since: v2.2.0) Uncomment the next line to specify the number of background job workers to run (default: 2).
# ENV BACKGROUND_NUM_WORKERS=2
```
- [ ] Remove some `TODO` sections before merging.
- [ ] Review K8s documentation (maybe delete it?).
- [ ] If we want to persist Redis data on HF Spaces, we can change our
`Procfile` Redis process to the following:
```
redis: /usr/bin/redis-server --dbfilename argilla-redis.rdb --dir ${ARGILLA_HOME_PATH}
```
- [ ] ~~Allow testing job workers synchronously (with pytest)~~
It's not working due to an asyncio limitation (running an asynchronous event
loop inside another one; more info here: rq/rq#1986).
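
For context, the new `argilla_server.jobs.queues` module imported throughout this diff (providing `REDIS_CONNECTION` and `DEFAULT_QUEUE`) is not among the files shown below. A minimal sketch of what it plausibly contains, assuming `redis-py`'s `Redis.from_url` and an rq queue named `default`:

```python
# Hypothetical reconstruction of argilla_server/jobs/queues.py; the actual
# module is not shown in this diff and may differ.
from redis import Redis
from rq import Queue

from argilla_server.settings import settings

# settings.redis_url is backed by the new ARGILLA_REDIS_URL environment variable.
REDIS_CONNECTION = Redis.from_url(settings.redis_url)

DEFAULT_QUEUE = Queue("default", connection=REDIS_CONNECTION)
```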

Closes #5431

# Benchmarks

The following timings were obtained by updating the distribution strategy
of a dataset with 100 and 10,000 records, using a basic and an upgraded
CPU on HF Spaces, with and without persistent storage, and measuring how
long the background job takes to complete:

CPU basic: 2 vCPU, 16 GB RAM
CPU upgrade: 8 vCPU, 32 GB RAM

* CPU basic (with persistent storage):
  * 100 records dataset: ~8 seconds.
  * 10,000 records dataset: ~9 minutes.
* CPU upgrade (with persistent storage):
  * 100 records dataset: ~5 seconds.
  * 10,000 records dataset: ~6 minutes.
* CPU basic (no persistent storage):
  * 10,000 records dataset: ~101 seconds.
* CPU upgrade (no persistent storage):
  * 10,000 records dataset: ~62 seconds.

**Type of change**

- New feature (non-breaking change which adds functionality)

**How Has This Been Tested**

- [x] Tested it on HF Spaces.

**Checklist**

- I added relevant documentation
- I followed the style guidelines of this project
- I did a self-review of my code
- I made corresponding changes to the documentation
- I confirm my changes generate no new warnings
- I have added tests that prove my fix is effective or that my feature
works
- I have added relevant notes to the CHANGELOG.md file (See
https://keepachangelog.com/)

---------

Co-authored-by: Damián Pumar <[email protected]>
jfcalvo and damianpumar authored Sep 9, 2024
1 parent fee1f5a commit 84c8aa7
Showing 24 changed files with 308 additions and 107 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/argilla-server.yml
@@ -51,6 +51,16 @@ jobs:
ports:
- 5432:5432

redis:
image: redis
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379

env:
HF_HUB_DISABLE_TELEMETRY: 1

@@ -100,6 +100,7 @@ export class DatasetRepository implements IDatasetRepository {
);

revalidateCache(`/v1/datasets/${id}`);
revalidateCache(`/v1/datasets/${id}/progress`);

return {
when: data.updated_at,
5 changes: 5 additions & 0 deletions argilla-server/CHANGELOG.md
@@ -16,6 +16,11 @@ These are the section headers that we use:

## [Unreleased]()

### Added

- Added [`rq`](https://python-rq.org) library to process background jobs using [Redis](https://redis.io) as a dependency. ([#5432](https://github.com/argilla-io/argilla/pull/5432))
- Added a new background job to update records status when a dataset distribution strategy is updated. ([#5432](https://github.com/argilla-io/argilla/pull/5432))

## [2.1.0](https://github.com/argilla-io/argilla/compare/v2.0.0...v2.1.0)

### Added
16 changes: 16 additions & 0 deletions argilla-server/README.md
@@ -115,6 +115,12 @@ pdm migrate
pdm server
```

### Run RQ background workers

```sh
pdm worker
```

## CLI commands

This section lists and describes the commands offered by the `argilla_server` Python package. If you need more information about the available
@@ -271,6 +277,16 @@ The `argilla_server search-engine` group of commands offers functionality to wor

- `python -m argilla_server search-engine reindex`: reindex all Argilla entities into search engine.

### Background Jobs

Argilla uses [RQ](https://python-rq.org) as its background job manager. RQ depends on [Redis](https://redis.io) to store and retrieve information about the jobs to be processed.

Once you have correctly installed Redis on your system, you can start an RQ worker by running the following CLI command:

```sh
python -m argilla_server worker
```
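
To point the server and workers at a non-default Redis instance, the new `ARGILLA_REDIS_URL` environment variable can be set first (a sketch — assuming the standard `redis://` URL scheme; host, port, and database depend on your setup):

```sh
# Example values only; adjust to your Redis deployment.
export ARGILLA_REDIS_URL=redis://localhost:6379/0
python -m argilla_server worker
```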

## 🫱🏾‍🫲🏼 Contribute

To help our community with the creation of contributions, we have created our [community](https://docs.argilla.io/latest/community/) docs. Additionally, you can always [schedule a meeting](https://calendly.com/david-berenstein-huggingface/30min) with our Developer Advocacy team so they can get you up to speed.
19 changes: 13 additions & 6 deletions argilla-server/docker/argilla-hf-spaces/Dockerfile
@@ -10,24 +10,30 @@ COPY scripts/start.sh /home/argilla
COPY Procfile /home/argilla
COPY requirements.txt /packages/requirements.txt

RUN apt-get update && apt-get install -y \
apt-transport-https \
gnupg \
wget
RUN apt-get update && \
apt-get install -y apt-transport-https gnupg wget

# Install Elasticsearch signing key
RUN wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | gpg --dearmor -o /usr/share/keyrings/elasticsearch-keyring.gpg

# Add Elasticsearch repository
RUN echo "deb [signed-by=/usr/share/keyrings/elasticsearch-keyring.gpg] https://artifacts.elastic.co/packages/8.x/apt stable main" | tee /etc/apt/sources.list.d/elastic-8.x.list

# Install Redis signing key
RUN wget -qO - https://packages.redis.io/gpg | gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
# Add Redis repository
RUN apt-get install -y lsb-release
RUN echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | tee /etc/apt/sources.list.d/redis.list

RUN \
# Create a directory where Argilla will store the data
mkdir /data && \
apt-get update && \
# Install Elasticsearch and configure it
apt-get update && apt-get install -y elasticsearch=8.8.2 && \
apt-get install -y elasticsearch=8.8.2 && \
chown -R argilla:argilla /usr/share/elasticsearch /etc/elasticsearch /var/lib/elasticsearch /var/log/elasticsearch && \
chown argilla:argilla /etc/default/elasticsearch && \
# Install Redis
apt-get install -y redis && \
# Install image dependencies
pip install -r /packages/requirements.txt && \
chmod +x /home/argilla/start.sh && \
@@ -52,6 +58,7 @@ ENV ELASTIC_CONTAINER=true
ENV ES_JAVA_OPTS="-Xms1g -Xmx1g"

ENV ARGILLA_HOME_PATH=/data/argilla
ENV BACKGROUND_NUM_WORKERS=2
ENV REINDEX_DATASETS=1

CMD ["/bin/bash", "start.sh"]
2 changes: 2 additions & 0 deletions argilla-server/docker/argilla-hf-spaces/Procfile
@@ -1,2 +1,4 @@
elastic: /usr/share/elasticsearch/bin/elasticsearch
redis: /usr/bin/redis-server
worker: sleep 30; rq worker-pool --num-workers ${BACKGROUND_NUM_WORKERS}
argilla: sleep 30; /bin/bash start_argilla_server.sh
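
For reference, honcho (added to `requirements.txt` just below) runs every process declared in this `Procfile`; presumably the container's `start.sh` launches it with something like the following, though that script is not shown in this diff:

```sh
honcho start
```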
1 change: 1 addition & 0 deletions argilla-server/docker/argilla-hf-spaces/requirements.txt
@@ -1 +1,2 @@
honcho
rq ~= 1.16.2
1 change: 0 additions & 1 deletion argilla-server/docker/server/README.md
@@ -25,4 +25,3 @@ Besides the common environment variables defined in docs, this Docker image prov
- `API_KEY`: If provided, the owner api key. When `USERNAME` and `PASSWORD` are provided and `API_KEY` is empty, a new random value will be generated (Default: `""`).

- `REINDEX_DATASET`: If `true` or `1`, the datasets will be reindexed in the search engine. This is needed when some search configuration changed or data must be refreshed (Default: `0`).

33 changes: 32 additions & 1 deletion argilla-server/pdm.lock

Some generated files are not rendered by default.

7 changes: 5 additions & 2 deletions argilla-server/pyproject.toml
@@ -47,13 +47,15 @@ dependencies = [
"httpx~=0.26.0",
"oauthlib ~= 3.2.0",
"social-auth-core ~= 4.5.0",
# Background processing
"rq ~= 1.16.2",
# Info status
"psutil >= 5.8, <5.10",
# Telemetry
"segment-analytics-python == 2.2.0",
# For logging, tracebacks, printing, progressbars
"rich != 13.1.0",
# for CLI
# For CLI
"typer >= 0.6.0, < 0.10.0", # spaCy only supports typer<0.10.0
"packaging>=23.2",
"psycopg2-binary>=2.9.9",
@@ -169,10 +171,11 @@ _.env_file = ".env.dev"
cli = { cmd = "python -m argilla_server.cli" }
server = { cmd = "uvicorn argilla_server:app --port 6900 --reload" }
migrate = { cmd = "alembic upgrade head" }
worker = { cmd = "python -m argilla_server worker" }
server-dev.composite = [
"migrate",
"cli database users create_default",
"server"
"server",
]
test = { cmd = "pytest", env_file = ".env.test" }

20 changes: 20 additions & 0 deletions argilla-server/src/argilla_server/_app.py
@@ -19,6 +19,7 @@
import os
import shutil
import tempfile
import redis
from datetime import datetime
from pathlib import Path

@@ -40,6 +41,7 @@
from argilla_server.search_engine import get_search_engine
from argilla_server.settings import settings
from argilla_server.static_rewrite import RewriteStaticFiles
from argilla_server.jobs.queues import REDIS_CONNECTION
from argilla_server.telemetry import get_telemetry_client

_LOGGER = logging.getLogger("argilla")
@@ -50,7 +52,9 @@ async def app_lifespan(app: FastAPI):
# See https://fastapi.tiangolo.com/advanced/events/#lifespan
await configure_database()
await configure_search_engine()
configure_redis()
track_server_startup()

yield


@@ -265,4 +269,20 @@ async def ping_search_engine():
await ping_search_engine()


def configure_redis():
@backoff.on_exception(backoff.expo, ConnectionError, max_time=60)
def ping_redis():
try:
REDIS_CONNECTION.ping()
except redis.exceptions.ConnectionError:
raise ConnectionError(
f"Your redis instance at {settings.redis_url} is not available or not responding.\n"
"Please make sure your redis instance is launched and correctly running and\n"
"you have the necessary access permissions. Once you have verified this, restart "
"the argilla server.\n"
)

ping_redis()


app = create_server_app()
2 changes: 2 additions & 0 deletions argilla-server/src/argilla_server/cli/__main__.py
@@ -17,12 +17,14 @@
from .database import app as database_app
from .search_engine import app as search_engine_app
from .start import start
from .worker import worker

app = typer.Typer(help="Commands for Argilla server management", no_args_is_help=True)


app.add_typer(database_app, name="database")
app.add_typer(search_engine_app, name="search-engine")
app.command(name="worker", help="Starts rq workers")(worker)
app.command(name="start", help="Starts the Argilla server")(start)


37 changes: 37 additions & 0 deletions argilla-server/src/argilla_server/cli/worker.py
@@ -0,0 +1,37 @@
# Copyright 2021-present, the Recognai S.L. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import typer

from typing import List

from argilla_server.jobs.queues import DEFAULT_QUEUE

DEFAULT_NUM_WORKERS = 2


def worker(
queues: List[str] = typer.Option([DEFAULT_QUEUE.name], help="Name of queues to listen"),
num_workers: int = typer.Option(DEFAULT_NUM_WORKERS, help="Number of workers to start"),
) -> None:
from rq.worker_pool import WorkerPool
from argilla_server.jobs.queues import REDIS_CONNECTION

worker_pool = WorkerPool(
connection=REDIS_CONNECTION,
queues=queues,
num_workers=num_workers,
)

worker_pool.start()
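
Given the typer options above, starting a pool of four workers on the default queue looks like this:

```sh
python -m argilla_server worker --num-workers 4
```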
7 changes: 6 additions & 1 deletion argilla-server/src/argilla_server/contexts/datasets.py
@@ -64,6 +64,7 @@
from argilla_server.database import get_async_db
from argilla_server.enums import DatasetStatus, UserRole, RecordStatus
from argilla_server.errors.future import NotUniqueError, UnprocessableEntityError
from argilla_server.jobs import dataset_jobs
from argilla_server.models import (
Dataset,
Field,
@@ -170,7 +171,11 @@ async def publish_dataset(db: AsyncSession, search_engine: SearchEngine, dataset
async def update_dataset(db: AsyncSession, dataset: Dataset, dataset_attrs: dict) -> Dataset:
await DatasetUpdateValidator.validate(db, dataset, dataset_attrs)

return await dataset.update(db, **dataset_attrs)
dataset = await dataset.update(db, **dataset_attrs)

dataset_jobs.update_dataset_records_status_job.delay(dataset.id)

return dataset


async def delete_dataset(db: AsyncSession, search_engine: SearchEngine, dataset: Dataset) -> Dataset:
14 changes: 14 additions & 0 deletions argilla-server/src/argilla_server/jobs/__init__.py
@@ -0,0 +1,14 @@
# Copyright 2021-present, the Recognai S.L. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

54 changes: 54 additions & 0 deletions argilla-server/src/argilla_server/jobs/dataset_jobs.py
@@ -0,0 +1,54 @@
# Copyright 2021-present, the Recognai S.L. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from uuid import UUID

from rq import Retry
from rq.decorators import job

from sqlalchemy import func, select

from argilla_server.models import Record, Response
from argilla_server.database import AsyncSessionLocal
from argilla_server.jobs.queues import DEFAULT_QUEUE
from argilla_server.search_engine.base import SearchEngine
from argilla_server.settings import settings
from argilla_server.contexts import distribution

JOB_TIMEOUT_DISABLED = -1
JOB_RECORDS_YIELD_PER = 100


@job(DEFAULT_QUEUE, timeout=JOB_TIMEOUT_DISABLED, retry=Retry(max=3))
async def update_dataset_records_status_job(dataset_id: UUID):
"""This Job updates the status of all the records in the dataset when the distribution strategy changes."""

record_ids = []

async with AsyncSessionLocal() as db:
stream = await db.stream(
select(Record.id)
.join(Response)
.where(Record.dataset_id == dataset_id)
.order_by(Record.inserted_at.asc())
.execution_options(yield_per=JOB_RECORDS_YIELD_PER)
)

async for record_id in stream.scalars():
record_ids.append(record_id)

# NOTE: We are updating the records status outside the database transaction to avoid database locks with SQLite.
async with SearchEngine.get_by_name(settings.search_engine) as search_engine:
for record_id in record_ids:
await distribution.update_record_status(search_engine, record_id)
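
As the `contexts/datasets.py` change above shows, callers enqueue this job through the `.delay()` helper that rq's `@job` decorator attaches:

```python
# From update_dataset() earlier in this diff: run the status update in the
# background instead of blocking the API request.
dataset_jobs.update_dataset_records_status_job.delay(dataset.id)
```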
