TIL How to add a progress wheel to a blocking Python function
Threads can be used to provide a progress wheel for long-running python functions
import concurrent.futures as cf
import sys
import threading
import time
def slow_function(x: int | float) -> str:
time.sleep(x)
return f"Operation took {x} seconds"
def print_progress(stop_event):
# Create a spinning progress indicator
spin_chars = ["⣾", "⣽", "⣻", "⢿", "⡿", "⣟", "⣯", "⣷"]
while not stop_event.is_set():
for char in spin_chars:
if stop_event.is_set(): # Check if we need to stop the thread
return
sys.stdout.write(f"\r{char} Waiting...")
sys.stdout.flush()
time.sleep(0.1)
def main():
stop_event = threading.Event() # Create an event to signal thread to exit
with cf.ThreadPoolExecutor() as executor:
future_spinner = executor.submit(print_progress, stop_event)
try:
result = slow_function(2)
except KeyboardInterrupt as err:
raise err
finally:
stop_event.set()
cf.wait([future_spinner])
sys.stdout.write("\r\033[K") # Clear the current line
return result
if __name__ == "__main__":
try:
print(main())
except KeyboardInterrupt:
print("Operation cancelled")
This pattern seems like a good candidate for a decorator:
import concurrent.futures as cf
import sys
import threading
import time
def progress_wheel(func):
def print_progress(stop_event):
# Create a spinning progress indicator
spin_chars = ["⣾", "⣽", "⣻", "⢿", "⡿", "⣟", "⣯", "⣷"]
while not stop_event.is_set():
for char in spin_chars:
if stop_event.is_set(): # Check if we need to stop the thread
return
sys.stdout.write(f"\r{char} Waiting...")
sys.stdout.flush()
time.sleep(0.1)
def wrapped_func(*args, **kwargs):
stop_event = threading.Event() # Create an event to signal thread to exit
with cf.ThreadPoolExecutor() as executor:
future_spinner = executor.submit(print_progress, stop_event)
try:
result = func(*args, **kwargs)
except KeyboardInterrupt as err:
raise err
finally:
stop_event.set()
cf.wait([future_spinner])
sys.stdout.write("\r\033[K") # Clear the current line
return result
return wrapped_func
@progress_wheel
def slow_function(x: int | float) -> str:
time.sleep(x)
return f"Operation took {x} seconds"
def main():
return slow_function(2)
if __name__ == "__main__":
try:
print(main())
except KeyboardInterrupt:
print("Operation cancelled")
Experiment with other progress displays, I found a few great ideas using unicode from this stack overflow thread