Mild memory leak in dask workers #8164

Open
rabernat opened this issue Sep 6, 2023 · 14 comments
Labels: bug, help wanted, memory

Comments

@rabernat

rabernat commented Sep 6, 2023

Describe the issue:

I have been observing a mild but consistent increase in memory when storing large arrays to Zarr. I have reproduced this with both s3fs (my real use case) and a dummy "Dev Null" store of my own devising.

I would expect to be able to store arrays of essentially infinite size in streaming fashion. Instead, this memory leak means that eventually I will run out of memory.

I'm aware that the root of this issue may be hard to diagnose. The ultimate cause may be upstream, in Zarr, where I am a maintainer. I'm very happy to work with the developers here to isolate and resolve the underlying issue. 🙏

I do not know if this issue occurs with other schedulers, as I don't know how to diagnose memory usage as conveniently as with distributed.

Minimal Complete Verifiable Example:

import dask
import dask.array as da
from dask.distributed import LocalCluster
from distributed.diagnostics import MemorySampler
import zarr


class DevNullStore(dict):
    """Dummy store in which data just vanishes."""

    def __setitem__(self, key, value):
        # only store attributes
        if key == '.zarray':
            super().__setitem__(key, value)
            
            
shape = 10_000_000
chunks = 1_000
data = da.zeros(shape, chunks=chunks)

cluster = LocalCluster(
    n_workers=8, threads_per_worker=1, host="*",
)

ms = MemorySampler()
store = zarr.storage.KVStore(DevNullStore())
with cluster.get_client() as client:
    with ms.sample("nullstore-zarr_v2-compute_true"):
        data.to_zarr(store, compressor=None)
        
ms.plot(align=True)

[MemorySampler plot: nullstore-zarr_v2-compute_true]

As you can see, memory usage increases steadily over the course of the computation. If you make the array larger, the trend continues.

Anything else we need to know?:

A likely objection to this issue might be "this is an artifact of your funky DevNullStore". However, the same behavior can be reproduced (albeit much more slowly) with s3fs. This example requires write access to S3:

import s3fs
import uuid

# replace with a bucket you can write to
s3_url = "s3://earthmover-rechunk-tmp/memory-leak-test"

def make_s3_store():
    url = f'{s3_url}/{uuid.uuid4().hex}'
    store = zarr.storage.FSStore(url=url)
    return store

with cluster.get_client() as client:
    client.restart()
    with ms.sample("s3fs-zarr_v2-compute_true"):
        data.to_zarr(make_s3_store(), compressor=None)
        
ms.plot(align=True)

[MemorySampler plot: s3fs-zarr_v2-compute_true]

As you can see, the overall magnitude of the memory leak is similar: about 200 MB. This suggests that it is independent of the underlying store.

Environment:

  • Dask version: 2023.7.0
  • Zarr version: 2.16.1
  • s3fs version: 2023.9.0
  • Python version: 3.10.12
  • Operating System: Linux
  • Install method (conda, pip, source): conda + pip

cc @crusaderky, who helped us with an earlier iteration of this problem.

@mrocklin
Member

mrocklin commented Sep 6, 2023

Thanks @rabernat for the well-crafted example. I suspect that folks will take a look tomorrow.

@rabernat
Author

rabernat commented Sep 6, 2023

This issue would have been so much more difficult to describe without MemorySampler. What a great tool! 🙌

@fjetter
Member

fjetter commented Sep 6, 2023

I strongly suspect this is due to internal state keeping. We keep logs of internal state transitions/events and even store some actual log messages. All of these logs are bounded to a maximum size, and most (unfortunately not all) of those limits are configurable.

In your example, 200 MB across 8 workers, i.e. 25 MB each, sounds about right for this kind of logging.

For instance, we have Worker.log which holds task transition logs. An entry of this log could look like

("zeroes_like-store-map-4567890", "executing", "memory", "memory", {}, "task-finished-34567898765", 1234567890.098765)

and this log deque is bound to 100_000 entries. (@crusaderky the worker transition log should likely also have a configurable transition-log-length)

In [6]: format_bytes(sizeof(("zeroes_like-store-map-4567890", "executing", "memory", "memory", {}, "task-finished-34567898765", 1234567890.098765)) * 100_000)
Out[6]: '48.07 MiB'

so that's about 50 MB. Once the deque reaches this size, it stops growing (the oldest entries are thrown out as new ones arrive). You probably haven't reached this limit yet.

This is a known problem for the scheduler (where it typically does not matter); see #5570 or #1795.

The only config option that currently affects the worker is distributed.admin.log-length, which you can set to zero so that ordinary Python logs are not kept in memory.

We'll look into this, and if it is indeed the transition counter we'll adjust the log length and make it configurable.
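
For reference, a minimal sketch of how that setting can be applied via the standard dask config mechanism (assuming it is set in the same process, before the LocalCluster from the reproducer above is created):

import dask
from dask.distributed import LocalCluster

# Drop retained worker logs entirely; must be set before the workers start.
dask.config.set({"distributed.admin.log-length": 0})

cluster = LocalCluster(n_workers=8, threads_per_worker=1)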

@crusaderky self-assigned this Sep 6, 2023
@mrocklin
Member

mrocklin commented Sep 6, 2023

@rabernat have you actually run into problems where workers run out of memory, or are you seeing the increase and afraid that it will continue forever?

@rabernat
Author

rabernat commented Sep 6, 2023

@rabernat have you actually run into problems where workers run out of memory

No, I have not. This emerged from a broader investigation into memory usage. By the time we made the graph big enough to really cause problems, we would likely first hit other known scaling limitations due to the number of tasks. It would be perfectly reasonable to say, "yes, there is a small memory leak, but it doesn't affect any real-world workflows in practice," and prioritize this issue accordingly.

@mrocklin
Member

mrocklin commented Sep 6, 2023

we would likely first hit other known scaling limitations due to the number of tasks

Which limitation did you run into? I'm curious (hearing about these again is useful for prioritizing them).

@rabernat
Author

rabernat commented Sep 6, 2023

This graph had 10_000 tasks and leaked 200 MB of memory. If it had 100_000 tasks, it would leak 2 GB of memory. However, at that point our graph would be getting quite large, in violation of the best practices. Symptoms of this would be very slow serialization times and high memory load on the scheduler.

@crusaderky
Collaborator

crusaderky commented Sep 6, 2023

Reproduced.

This is just the core behaviour of dask workers - nothing to do with zarr specifically.

import asyncio
import dask.bag as db
from distributed.diagnostics import MemorySampler
from distributed import Client, wait

WIDTH = 80_000
NITER = 10
data = db.from_sequence(range(WIDTH), npartitions=WIDTH)

client = Client(n_workers=16, threads_per_worker=2)

async def wait_cleanup(dask_worker):
    while dask_worker.state.tasks:
        await asyncio.sleep(0.1)

ms = MemorySampler()
with ms.sample():
    for _ in range(NITER):
        wait(data.persist())
        client.run(wait_cleanup)

ms.plot(grid=True, align=True)

[MemorySampler plot: memory grows across the 10 runs with default deque lengths]

After forcing all deques to maxlen=0 - both the configurable ones and the hardcoded ones - the plot changes as follows:
[MemorySampler plot: same test with all deques forced to maxlen=0]

As you can see, there's around 25~30 MiB worth of deques per worker, plus a genuine memory leak of roughly 400 bytes per task (2.3 GiB to 2.6 GiB over 80k tasks x 10 runs) - which is substantial in the long run.

Pending actions

  • ensure that all deques have configurable maxlen
  • simplify the deque length config. There are currently 8 different settings in the config, which feels like way too many. How would people feel if I reduced some deques (namely, transaction logs) from 100k entries to 10k?
    Ideally I would love to have a single setting that controls them all, and for it to be the first thing that we tell users to switch off when they walk in complaining about leaks.
  • investigate the remaining leak

@crusaderky changed the title from "Mild memory leak when writing to_zarr" to "Mild memory leak in dask workers" on Sep 6, 2023
@fjetter
Member

fjetter commented Sep 6, 2023

How would people feel if I reduced some deques (namely, transaction logs) from 100k entries to 10k?

Should be fine. I would even think zero is a good default, since most users don't care about this. In the rare case where we instruct folks to create dumps, we can also tell them to set these numbers.

@crusaderky
Collaborator

50 runs, no deques

[MemorySampler plot: 50 runs, no deques]

@crusaderky
Collaborator

crusaderky commented Sep 12, 2023

I've rerun the test, downstream of #8173 (log-length=0), this time without the MemoryMonitor.

import asyncio
import gc

import dask.bag as db
import pandas
from distributed import Client, wait

WIDTH = 80_000
data = db.from_sequence(range(WIDTH), npartitions=WIDTH)

client = Client(n_workers=16, threads_per_worker=2)

async def wait_cleanup(dask_worker):
    # Wait for the worker to forget all tasks, then measure its RSS after a
    # couple of garbage collection passes.
    while dask_worker.state.tasks:
        await asyncio.sleep(0.1)

    gc.collect()
    await asyncio.sleep(0.25)
    gc.collect()
    await asyncio.sleep(0.25)
    return dask_worker.monitor.get_process_memory()

measures = []
for _ in range(100):
    wait(data.persist())
    mems = client.run(wait_cleanup)
    total = sum(mems.values())
    measures.append(total)

df = pandas.DataFrame({"rss": measures})
df["delta"] = df.rss.diff()
df = df.iloc[1:]
df.index *= 80_000 // 16  # tasks per worker since the start of the test
df["delta"] = df["delta"].astype(int)
df["delta per task"] = df["delta"] // 80_000
df["delta per task"].rolling(10).mean().plot(grid=True)

On the X axis, you have the number of tasks per worker since the start of the test.
On the Y axis, you have how many bytes worth of leak each task caused (rolling average).

[plot: rolling mean of leaked bytes per task vs. tasks per worker]

>>> df["delta per task"].loc[200_000:].mean()
28.661290322580644

So while there is still a leak in the long term, it's less than 30 bytes per task, which I don't think we should spend time investigating, as the only users impacted are those who keep the same workers online for weeks at a time running flat out.

@crusaderky
Collaborator

There's a separate aspect to this, which may not be obvious to most:

to_zarr() will keep the run_specs of the final tasks and their output (a bunch of None's) on the workers until the whole computation is completed.
So if you call it on a collection with a gigantic number of chunks running on an undersized cluster, you will see the memory usage increasing - very very slowly - over time, until the computation is completed.

This is, theoretically, solvable by wrapping the operation in dask.graph_manipulation.checkpoint(split_every=64).
This would set an upper bound on the number of tasks in memory on the workers without causing too much overhead (I've experimented with the default split_every=4 and the time overhead was substantial). However, it would cause an order-of-magnitude increase in the problem described in #8182, which makes me very hesitant to implement it.
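
For illustration, a minimal sketch of that workaround (assuming the store and data objects from the original reproducer; this is not a recommended fix, for the #8182 reason above):

from dask.graph_manipulation import checkpoint

# Build the store graph lazily, then bound the number of completed tasks kept
# on the workers by checkpointing every 64 tasks before computing.
delayed_store = data.to_zarr(store, compressor=None, compute=False)
checkpoint(delayed_store, split_every=64).compute()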

However.

We're talking about 16 KiB per chunk (measured below). To put things in perspective, an array worth eight terabytes, chunked into 128 MiB chunks, will produce 65536 chunks.
I've run the initial reproducer on such an array, downstream of #8173, with log-length=1000:

[MemorySampler plot: 8 TiB array, 65536 chunks, 8 workers, log-length=1000]

Note that this is on 8 workers. In real life, most people will use far more than 8 workers if they have to push through 8 terabytes of data. If you ran it on 80 workers, the cluster-wide magnitude of the leak would remain the same; in other words, the leak per worker would go down from 128 MiB to about 13 MiB.
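
A quick back-of-the-envelope check of those numbers (assuming the ~16 KiB per chunk figure above):

n_chunks = (8 * 2**40) // (128 * 2**20)  # 8 TiB split into 128 MiB chunks -> 65536
held = n_chunks * 16 * 2**10             # ~16 KiB per chunk -> 1 GiB cluster-wide
print(held / 2**20 / 8)                  # 128.0 MiB per worker on 8 workers
print(held / 2**20 / 80)                 # 12.8 MiB per worker on 80 workers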

@fjetter
Member

fjetter commented Sep 13, 2023

and their output (a bunch of None's)

None is a singleton. If all tasks are returning None, this will not add to the memory blowup. The TaskState objects will, of course.


In other words:

There are two sources of increasing memory:

  • internal logs that are bounded to a fixed size. This limit will be reduced / will vanish after Review log-length configuration #8173
  • The actual graph (i.e. TaskState objects) and possibly result data (None won't add to this since it is a singleton), which builds up because we don't forget the tasks until the entire computation is done. This is relatively little data, and it is released once the computation is finished, i.e. it is not leaked.

Is this an accurate summary @crusaderky ?


@rabernat is this kind of growth (i.e. on the order of 16 KiB per chunk) negligible/acceptable/concerning for your application?

@crusaderky
Collaborator

None won't add to this since it is a singleton

Well actually, 🧐 storing a None in the SpillBuffer costs ~240 bytes.

import psutil
import dask.array as da
from distributed.spill import SpillBuffer

!rm -rf spill
!mkdir spill
buf = SpillBuffer("spill", 2**44, False)  # spill dir ./spill, with a limit so large nothing is actually spilled
data = da.zeros(2**44)                    # only used as a source of ~1M chunk keys
proc = psutil.Process()

m0 = proc.memory_info().rss

# store None under every key of the array
for k in data.__dask_keys__():
    buf[k] = None

m1 = proc.memory_info().rss
(m1 - m0) / len(data.__dask_keys__())
# output: 242.921875

Is this an accurate summary @crusaderky ?

  1. on top of the two items you listed, there's a genuine leak of 25~30 bytes per task; more for the first ~200k tasks on the worker.
