Timeout error when doing .to_hats with thousands of partitions #525

Open

hombit opened this issue Dec 6, 2024 · 1 comment

hombit commented Dec 6, 2024

Bug report

When writing many partitions with .to_hats, the job fails after all parquet files are generated.

Reproducible example (PSC path)

import lsdb
from dask.distributed import Client


# PS1 OTMO catalog on PSC shared storage
PS1_PATH = '/ocean/projects/phy210048p/shared/hats/catalogs/ps1/ps1_otmo'

# Read just a few columns and keep only two rows per partition,
# so the write itself is cheap
in_catalog = lsdb.read_hats(PS1_PATH, columns=['objID', 'raMean', 'decMean'])
out_catalog = in_catalog.map_partitions(lambda df: df.head(2))

with Client(n_workers=20, threads_per_worker=1) as client:
    display(client)
    # Write the first 10,000 partitions; the error below is raised
    # after all parquet files have been produced
    out_catalog.partitions[:10000].to_hats('tmp')
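
For reference, a hypothetical follow-up check (not part of the run above) would be to read the output catalog back and see whether it is usable despite the error; this assumes the catalog metadata was written before the failure, otherwise the read itself will fail:

# Hypothetical check, not executed in the original report
check_catalog = lsdb.read_hats('tmp')
print(check_catalog)
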
Traceback
CancelledError                            Traceback (most recent call last)
File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/utils.py:1910, in wait_for(fut, timeout)
   1909 async with asyncio.timeout(timeout):
-> 1910     return await fut

CancelledError: 

The above exception was the direct cause of the following exception:

TimeoutError                              Traceback (most recent call last)
File <timed exec>:21

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/client.py:1737, in Client.__exit__(self, exc_type, exc_value, traceback)
   1730             raise  # pragma: nocover
   1731         warnings.warn(
   1732             "It is deprecated to enter and exit the Client context "
   1733             "manager from different threads",
   1734             DeprecationWarning,
   1735             stacklevel=2,
   1736         )
-> 1737 self.close()

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/client.py:1998, in Client.close(self, timeout)
   1995         coro = wait_for(coro, timeout)
   1996     return coro
-> 1998 sync(self.loop, self._close, fast=True, callback_timeout=timeout)
   1999 assert self.status == "closed"
   2001 if not self._is_finalizing():

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/utils.py:439, in sync(loop, func, callback_timeout, *args, **kwargs)
    436         wait(10)
    438 if error is not None:
--> 439     raise error
    440 else:
    441     return result

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/utils.py:413, in sync.<locals>.f()
    411         awaitable = wait_for(awaitable, timeout)
    412     future = asyncio.ensure_future(awaitable)
--> 413     result = yield future
    414 except Exception as exception:
    415     error = exception

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/tornado/gen.py:766, in Runner.run(self)
    764 try:
    765     try:
--> 766         value = future.result()
    767     except Exception as e:
    768         # Save the exception for later. It's important that
    769         # gen.throw() not be called inside this try/except block
    770         # because that makes sys.exc_info behave unexpectedly.
    771         exc: Optional[Exception] = e

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/utils.py:1910, in wait_for(fut, timeout)
   1908 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
   1909     async with asyncio.timeout(timeout):
-> 1910         return await fut

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/utils.py:805, in _LogErrors.__call__.<locals>.wrapper(*args, **kwargs)
    803 async def wrapper(*args, **kwargs):
    804     with self:
--> 805         return await func(*args, **kwargs)

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/client.py:1945, in Client._close(self, fast)
   1943 if self._start_arg is None:
   1944     with suppress(AttributeError):
-> 1945         await self.cluster.close()
   1947 await self.rpc.close()
   1949 self.status = "closed"

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/deploy/spec.py:448, in SpecCluster._close(self)
    446 if isawaitable(f):
    447     await f
--> 448 await self._correct_state()
    449 await asyncio.gather(*self._futures)
    451 if self.scheduler_comm:

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/deploy/spec.py:359, in SpecCluster._correct_state_internal(self)
    353         await self.scheduler_comm.retire_workers(workers=list(to_close))
    354     tasks = [
    355         asyncio.create_task(self.workers[w].close())
    356         for w in to_close
    357         if w in self.workers
    358     ]
--> 359     await asyncio.gather(*tasks)
    360 for name in to_close:
    361     if name in self.workers:

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/nanny.py:619, in Nanny.close(self, timeout, reason)
    617 self.stop()
    618 if self.process is not None:
--> 619     await self.kill(timeout=timeout, reason=reason)
    621 self.process = None
    622 await self.rpc.close()

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/nanny.py:400, in Nanny.kill(self, timeout, reason)
    394 """Kill the local worker process
    395 
    396 Blocks until both the process is down and the scheduler is properly
    397 informed
    398 """
    399 if self.process is not None:
--> 400     await self.process.kill(reason=reason, timeout=timeout)

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/nanny.py:883, in WorkerProcess.kill(self, timeout, executor_wait, reason)
    879     logger.warning(
    880         f"Worker process still alive after {wait_timeout:.1f} seconds, killing"
    881     )
    882     await process.kill()
--> 883     await process.join(max(0, deadline - time()))
    884 except ValueError as e:
    885     if "invalid operation on closed AsyncProcess" in str(e):

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/process.py:330, in AsyncProcess.join(self, timeout)
    327     return
    328 # Shield otherwise the timeout cancels the future and our
    329 # on_exit callback will try to set a result on a canceled future
--> 330 await wait_for(asyncio.shield(self._exit_future), timeout)

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/site-packages/distributed/utils.py:1909, in wait_for(fut, timeout)
   1908 async def wait_for(fut: Awaitable[T], timeout: float) -> T:
-> 1909     async with asyncio.timeout(timeout):
   1910         return await fut

File /ocean/projects/phy210048p/malanche/lsdb-tests/cenv/lib/python3.11/asyncio/timeouts.py:115, in Timeout.__aexit__(self, exc_type, exc_val, exc_tb)
    110     self._state = _State.EXPIRED
    112     if self._task.uncancel() <= self._cancelling and exc_type is exceptions.CancelledError:
    113         # Since there are no new cancel requests, we're
    114         # handling this.
--> 115         raise TimeoutError from exc_val
    116 elif self._state is _State.ENTERED:
    117     self._state = _State.EXITED

TimeoutError:

Setting these timeouts doesn't help:

dask.config.set({
    "distributed.comm.timeouts.connect": "600s",
    "distributed.comm.timeouts.tcp": "600s",
    "distributed.nanny.shutdown-timeout": "600s",
    "distributed.deploy.lost-worker-timeout": "600s",
    "distributed.scheduler.idle-timeout": "600s",
    "distributed.scheduler.no-workers-timeout": "600s",
    "distributed.scheduler.locks.lease-timeout": "600s",
})
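
Judging from the traceback, the timeout seems to fire while the Client context manager shuts the cluster down (Client.__exit__ -> Client.close -> Nanny.kill), i.e. after the parquet files are written. A possible workaround sketch, not verified on PSC and assuming that a longer close timeout is all the 20 worker processes need to exit cleanly:

from dask.distributed import Client

client = Client(n_workers=20, threads_per_worker=1)
try:
    out_catalog.partitions[:10000].to_hats('tmp')
finally:
    # Client.close() accepts an explicit timeout (in seconds);
    # 600 is an arbitrary guess, not a value known to fix the issue
    client.close(timeout=600)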

Before submitting
Please check the following:

  • I have described the situation in which the bug arose, including what code was executed, information about my environment, and any applicable data others will need to reproduce the problem.
  • I have included available evidence of the unexpected behavior (including error messages, screenshots, and/or plots) as well as a description of what I expected instead.
  • If I have a solution in mind, I have provided an explanation and/or pseudocode and/or task list.
hombit added the bug label Dec 6, 2024
delucchi-cmu commented

Can you try to reproduce with PR #534?
