Let me sleep on it

Improving concurrency in unexpected ways


$ whoami
Taming the Vdsm beast since 2013
Tinkering with Python since 2003
Free software enthusiast
Father of two
          

Python threads suck

The GIL

The global interpreter lock lets only one thread run Python code at a time

What's going on here?


Py_BEGIN_ALLOW_THREADS
n = read(fd, buf, count);
Py_END_ALLOW_THREADS
            

Python releases the GIL when possible

Python threads are useful

You can do I/O or wait for other programs concurrently
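A minimal sketch of this point, assuming nothing beyond the standard library: each thread blocks in time.sleep(), which releases the GIL, so the threads wait concurrently. The helper name timed_sleepers is made up for this illustration.

```python
import threading
import time


def timed_sleepers(count, delay):
    """Run `count` threads that each block for `delay` seconds; return elapsed time."""
    threads = [threading.Thread(target=time.sleep, args=(delay,))
               for _ in range(count)]
    start = time.monotonic()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.monotonic() - start


if __name__ == "__main__":
    # If blocked threads held the GIL, ten 0.2 second sleeps would take
    # about 2 seconds. They overlap, so the total stays close to one sleep.
    print("elapsed: %.2f seconds" % timed_sleepers(10, 0.2))
```

The same holds for blocking reads, writes, and waiting for child processes, which is the kind of work Vdsm threads do most of the time.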

Sequential code is easy

When using threads, you can write simple and clear code, as if you are the only one running

Vdsm uses a lot of threads

What is Vdsm?

Vdsm manages virtual machines on a hypervisor

Virtual machines need storage

  • Vdsm provides storage for virtual machines
  • Typically shared storage

Shared storage is extremely fast

We have seen 750 MiB/s writes using direct I/O to an SSD disk array

Shared storage is horribly slow


$ ps -p 32729 -o stat -o cmd
STAT CMD
D+   dd if=/dev/zero of=mnt/test bs=8M count=1280 oflag=direct
            

Vdsm monitors storage

  • Every storage domain has a dedicated thread
  • We can have 50 storage domains

Monitor threads are isolated

If one thread gets stuck on unresponsive storage, other threads are not affected
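A rough sketch of the isolation idea, not Vdsm's actual monitoring code: every domain gets its own thread, and a check that never returns only delays its own result. The names monitor_domains, fake_check, and the domain names are hypothetical.

```python
import threading
import time


def monitor_domains(domains, check, timeout):
    """Check every domain in its own thread; return the results that arrived in time."""
    results = {}

    def monitor(name):
        results[name] = check(name)

    threads = [threading.Thread(target=monitor, args=(name,),
                                name="monitor-" + name)
               for name in domains]
    for t in threads:
        t.daemon = True  # a stuck monitor must not block interpreter exit
        t.start()
    deadline = time.monotonic() + timeout
    for t in threads:
        t.join(max(0.0, deadline - time.monotonic()))
    return dict(results)


def demo():
    unblock = threading.Event()

    def fake_check(name):
        # Hypothetical check: "stuck" blocks like a read from unresponsive storage.
        if name == "stuck":
            unblock.wait()
        return "ok"

    try:
        return monitor_domains(["fast-1", "stuck", "fast-2"], fake_check, 0.5)
    finally:
        unblock.set()  # let the stuck thread finish so the demo exits cleanly


if __name__ == "__main__":
    print(demo())
```

The two healthy domains report even though the third one is still blocked.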

Vdsm uses LVM

  • Vdsm manages block storage using LVM
  • Creates logical volumes for VM disks and snapshots

Accessing LVM metadata is slow

Vdsm caches LVM metadata

“There are only two hard things in Computer Science: cache invalidation and naming things.”
-- Phil Karlton

Monitor threads refresh LVM cache

Monitor threads invalidate the cache and run LVM commands to reload the cache

Operation Mutex

LVM Cache


class LVMCache(object):
    ...
    def _invalidate_lvs(self, vg_name, lv_names):
        with self._opmutex.locked(LVM_OP_INVALIDATE):
            for lv_name in lv_names:
                self._lvs[(vg_name, lv_name)] = Stub(lv_name)

    def _reload_lvs(self, vg_name, lv_names):
        with self._opmutex.locked(LVM_OP_RELOAD):
            lvm_output = self._run_lvs(vg_name, lv_names)
            for lv in self._parse_lvs(lvm_output):
                # Key by the parsed LV's own name (assumes lv has a name attribute).
                self._lvs[(vg_name, lv.name)] = lv
            

(Simplified)

LVM cache uses fancy locking

  • Multiple threads can invalidate the cache at the same time
  • Multiple threads can reload the cache at the same time
  • Invalidate and reload cannot run at the same time

How Operation Mutex works

  1. Thread-1 acquires the mutex for an invalidate operation
  2. Thread-2 tries to acquire the mutex for a reload operation, waiting...
  3. Thread-3 enters the mutex for an invalidate operation
  4. Thread-1 exits the mutex
  5. Thread-3 exits and releases the mutex
  6. Thread-2 wakes up and acquires the mutex for a reload operation

Operation Mutex [1/3]


import threading
from contextlib import contextmanager


class OperationMutex(object):

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._operation = None
        self._holders = 0

    @contextmanager
    def locked(self, operation):
        self._acquire(operation)
        try:
            yield self
        finally:
            self._release()
            

Operation Mutex [2/3]


    def _acquire(self, operation):
        with self._cond:
            while self._operation not in (operation, None):
                self._cond.wait()
            if self._operation is None:
                self._operation = operation
            self._holders += 1
            

(Logging removed)

Operation Mutex [3/3]


    def _release(self):
        with self._cond:
            self._holders -= 1
            if self._holders == 0:
                self._operation = None
                self._cond.notify_all()
            

(Logging removed)
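The three slides can be joined into one runnable class, and the six steps from "How Operation Mutex works" replayed against it. This is only a sketch: the replay helper, the thread names, and the 0.2 second waits used to observe that thread-2 is still outside are assumptions made for the illustration.

```python
import threading
from contextlib import contextmanager


class OperationMutex(object):

    def __init__(self):
        self._cond = threading.Condition(threading.Lock())
        self._operation = None
        self._holders = 0

    @contextmanager
    def locked(self, operation):
        self._acquire(operation)
        try:
            yield self
        finally:
            self._release()

    def _acquire(self, operation):
        with self._cond:
            while self._operation not in (operation, None):
                self._cond.wait()
            if self._operation is None:
                self._operation = operation
            self._holders += 1

    def _release(self):
        with self._cond:
            self._holders -= 1
            if self._holders == 0:
                self._operation = None
                self._cond.notify_all()


def replay():
    """Replay the six steps; return the event log and whether thread-2 had to wait."""
    m = OperationMutex()
    log = []
    reloaded = threading.Event()

    def invalidator(name, entered, leave):
        with m.locked("invalidate"):
            log.append(name + " entered for invalidate")
            entered.set()
            leave.wait()
            log.append(name + " exiting")

    def reloader():
        with m.locked("reload"):
            log.append("thread-2 entered for reload")
            reloaded.set()

    e1, l1 = threading.Event(), threading.Event()
    e3, l3 = threading.Event(), threading.Event()
    t1 = threading.Thread(target=invalidator, args=("thread-1", e1, l1))
    t2 = threading.Thread(target=reloader)
    t3 = threading.Thread(target=invalidator, args=("thread-3", e3, l3))

    t1.start()
    e1.wait()                                   # step 1
    t2.start()                                  # step 2
    waited = not reloaded.wait(0.2)
    t3.start()
    e3.wait()                                   # step 3
    l1.set()
    t1.join()                                   # step 4
    waited = waited and not reloaded.wait(0.2)
    l3.set()
    t3.join()                                   # step 5
    t2.join()                                   # step 6
    return log, waited


if __name__ == "__main__":
    for line in replay()[0]:
        print(line)
```

Note that thread-3 gets in ahead of thread-2 even though thread-2 asked first: the mutex favors whoever wants the operation that is currently running.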

Operation Mutex in practice

  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_reload_vgs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_reload_vgs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_reload_vgs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...
  • (_invalidate_lvs) 'lvm reload' is holding the mutex, waiting...

(Simplified, colored)

We have a problem

  • Too many threads waiting...
  • Most of the time only one thread is inside the operation mutex

Operation mutex is harmful

  • Lots of storage domains...
  • Storage is overloaded...
  • LVM commands become slow...
  • Monitor threads waiting...
  • Monitoring timeouts...
  • Hypervisor goes down

Improving concurrency

How are Operation Mutex tests passing?

There are no Operation Mutex tests

“Code without tests is broken.”
-- Nir Soffer

Adding tests

Let's start with the easy case, allowing multiple threads to perform the same operation

Testing same operation


import threading
import time

import pytest

import opmutex


def test_same_operation():
    m = opmutex.OperationMutex()

    def worker(n):
        with m.locked("operation"):
            time.sleep(1.0)

    elapsed = run_threads(worker, 50)

    assert elapsed < 2.0
            

We need some help


def run_threads(func, count):
    threads = []
    start = time.time()
    try:
        for i in range(count):
            t = threading.Thread(target=func,
                                 args=(i,),
                                 name="worker-%02d" % i)
            t.daemon = True
            t.start()
            threads.append(t)
    finally:
        for t in threads:
            t.join()
    return time.time() - start
            

Test passes!

  • opmutex_test.py::test_same_operation PASSED

Refresh LVM cache flow

  1. Invalidate cache
  2. Reload cache

Time matters

  1. Invalidate updates a dict - microseconds
  2. Reload goes to storage - seconds

Testing refresh flow


def test_refresh_flow():
    count = 50
    m = opmutex.OperationMutex()
    cache = ["old"] * count

    def worker(n):
        with m.locked("invalidate"):
            cache[n] = "invalid"
        with m.locked("reload"):
            time.sleep(1.0)
            cache[n] = "new"

    elapsed = run_threads(worker, count)
    assert elapsed < 2.0
    assert cache == ["new"] * count
            

Test fails randomly

  • Testing threading issues is tricky
  • How can we make it fail consistently?

Making it fail


@pytest.mark.parametrize("run", range(10))
def test_refresh_flow(run):
    count = 50
    m = opmutex.OperationMutex()
    cache = ["old"] * count

    def worker(n):
        with m.locked("invalidate"):
            cache[n] = "invalid"
        with m.locked("reload"):
            time.sleep(1.0)
            cache[n] = "new"

    elapsed = run_threads(worker, count)
    assert elapsed < 2.0
            

Failing tests are good

  • opmutex_test.py::test_same_operation PASSED
  • opmutex_test.py::test_refresh_flow[0] FAILED
  • opmutex_test.py::test_refresh_flow[1] PASSED
  • opmutex_test.py::test_refresh_flow[2] PASSED
  • opmutex_test.py::test_refresh_flow[3] PASSED
  • opmutex_test.py::test_refresh_flow[4] PASSED
  • opmutex_test.py::test_refresh_flow[5] FAILED
  • opmutex_test.py::test_refresh_flow[6] PASSED
  • opmutex_test.py::test_refresh_flow[7] FAILED
  • opmutex_test.py::test_refresh_flow[8] PASSED
  • opmutex_test.py::test_refresh_flow[9] FAILED

Test failure log

  • worker-00: Operation 'invalidate' acquired the mutex
  • worker-00: Operation 'invalidate' released the mutex
  • worker-00: Operation 'reload' acquired the mutex
  • worker-01: Operation 'reload' is holding the mutex, waiting...
  • worker-02: Operation 'reload' is holding the mutex, waiting...
  • worker-03: Operation 'reload' is holding the mutex, waiting...
  • worker-04: Operation 'reload' is holding the mutex, waiting...
  • worker-05: Operation 'reload' is holding the mutex, waiting...
  • worker-06: Operation 'reload' is holding the mutex, waiting...
  • worker-07: Operation 'reload' is holding the mutex, waiting...
  • worker-08: Operation 'reload' is holding the mutex, waiting...
  • worker-09: Operation 'reload' is holding the mutex, waiting...
  • worker-10: Operation 'reload' is holding the mutex, waiting...

Why does it fail?

  1. worker-00 acquires the GIL
  2. worker-00 acquires the operation mutex for an invalidate operation
  3. Other workers could enter the operation mutex, but worker-00 is holding the GIL...
  4. worker-00 releases the operation mutex
  5. worker-00 acquires the operation mutex again for a reload operation
  6. worker-00 releases the GIL during sleep
  7. Other workers cannot enter the operation mutex, waiting...

How should it work

  1. All workers enter the operation mutex for an invalidate operation
  2. All workers exit the operation mutex
  3. All workers enter the operation mutex for a reload operation
  4. All workers exit the operation mutex

Can we fix it?

Need to sleep on it...

Threads are not polite

When you enter a building, you hold the door so the next person can enter

How can we make threads polite?

When entering the operation mutex, take a little nap, letting other threads in

Take a little nap


    @contextmanager
    def locked(self, operation):
        self._acquire(operation)
        try:
            # Give other threads a chance to get in.
            time.sleep(0.01)
            yield self
        finally:
            self._release()
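To try the nap outside the test suite, here is a self-contained sketch that runs the refresh flow with and without it. The class name PoliteOperationMutex, the nap parameter, and the refresh_flow helper are assumptions for this illustration; the slide hard-codes the 0.01 second sleep. Timings depend on the machine and on thread scheduling, so no expected numbers are given.

```python
import threading
import time
from contextlib import contextmanager


class PoliteOperationMutex(object):
    """Operation mutex with the nap from the slide; nap=0 disables it."""

    def __init__(self, nap=0.01):
        self._cond = threading.Condition(threading.Lock())
        self._operation = None
        self._holders = 0
        self._nap = nap

    @contextmanager
    def locked(self, operation):
        self._acquire(operation)
        try:
            if self._nap:
                # Give other threads a chance to get in.
                time.sleep(self._nap)
            yield self
        finally:
            self._release()

    def _acquire(self, operation):
        with self._cond:
            while self._operation not in (operation, None):
                self._cond.wait()
            if self._operation is None:
                self._operation = operation
            self._holders += 1

    def _release(self):
        with self._cond:
            self._holders -= 1
            if self._holders == 0:
                self._operation = None
                self._cond.notify_all()


def refresh_flow(mutex, count, reload_time):
    """Run invalidate-then-reload in `count` threads; return (elapsed, cache)."""
    cache = ["old"] * count

    def worker(n):
        with mutex.locked("invalidate"):
            cache[n] = "invalid"
        with mutex.locked("reload"):
            time.sleep(reload_time)  # stands in for a slow LVM command
            cache[n] = "new"

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(count)]
    start = time.monotonic()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.monotonic() - start, cache


if __name__ == "__main__":
    for nap in (0, 0.01):
        elapsed, _ = refresh_flow(PoliteOperationMutex(nap), 20, 0.1)
        print("nap=%s elapsed=%.2f seconds" % (nap, elapsed))
```

The nap makes the good interleaving much more likely, but it does not guarantee it: a worker that starts late can still miss the invalidate round and wait for a whole reload round.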
            

Green again!

  • opmutex_test.py::test_same_operation PASSED
  • opmutex_test.py::test_refresh_flow[0] PASSED
  • opmutex_test.py::test_refresh_flow[1] PASSED
  • opmutex_test.py::test_refresh_flow[2] PASSED
  • opmutex_test.py::test_refresh_flow[3] PASSED
  • opmutex_test.py::test_refresh_flow[4] PASSED
  • opmutex_test.py::test_refresh_flow[5] PASSED
  • opmutex_test.py::test_refresh_flow[6] PASSED
  • opmutex_test.py::test_refresh_flow[7] PASSED
  • opmutex_test.py::test_refresh_flow[8] PASSED
  • opmutex_test.py::test_refresh_flow[9] PASSED

Fixed test log

  • worker-00: Operation 'invalidate' acquired the mutex
  • worker-01: Operation 'invalidate' entered the mutex
  • worker-02: Operation 'invalidate' entered the mutex
  • worker-03: Operation 'invalidate' entered the mutex
  • worker-04: Operation 'invalidate' entered the mutex
  • worker-05: Operation 'invalidate' entered the mutex
  • worker-06: Operation 'invalidate' entered the mutex
  • worker-00: Operation 'invalidate' exited the mutex
  • worker-07: Operation 'invalidate' entered the mutex
  • worker-01: Operation 'invalidate' exited the mutex
  • worker-00: Operation 'invalidate' is holding the mutex, waiting...
  • worker-02: Operation 'invalidate' exited the mutex
  • ...

Fix available in ovirt-3.6

Are we done?

Why do we need the operation mutex?

Need to sleep on it a little bit more...

We don't

  • No need to separate invalidate and reload operations
  • Multiple threads modifying the cache is not thread safe
  • The real problem is monitor thread isolation
  • Monitor threads should not refresh LVM cache

Operation Mutex was removed in ovirt-4.0

More on this in my PyCon Israel 2018 talk?

“The Best Code is No Code At All.”
-- http://quotes.cat-v.org/programming

More info

Fork this talk on GitHub
https://github.com/nirs/let-me-sleep-on-it

oVirt - open your virtual datacenter
https://github.com/ovirt

We are hiring
https://jobs.redhat.com

Thank you!

Questions?