ProbableOdyssey

TIL How to compose functions and coroutines in Python

A list of functions can be composed using this pattern:


# Some example functions
def foo(x):
    """Return ``x`` multiplied by three."""
    tripled = 3 * x
    return tripled

def bar(x):
    """Return ``x`` increased by three."""
    shifted = x + 3
    return shifted

def baz(x):
    """Return ``x`` raised to the third power."""
    return pow(x, 3)

# Function to compose functions
def compose(*funcs):
    """Chain ``funcs`` into a single one-argument callable.

    The returned callable feeds its argument to the first function, passes
    that result to the second, and so on, returning the last result. With no
    functions it returns its argument unchanged.
    """
    def composition(x):
        value = x
        for step in funcs:
            # Each step consumes the previous step's output.
            value = step(value)
        return value

    return composition

# Example usage
def main():
    """Build a pipeline from ``foo``, ``bar`` and ``baz`` and print its output for 5."""
    steps = (foo, bar, baz)
    run_pipeline = compose(*steps)

    start = 5
    print(run_pipeline(start))  # 5832

if __name__ == "__main__":
    main()

But what if one of your functions is a coroutine? And what if another one is an instance of a class with an asynchronous __call__ method?

import asyncio

async def foo_async(x):
    """Wait one second, then return ``x`` multiplied by three."""
    await asyncio.sleep(1)
    tripled = 3 * x
    return tripled

class BarAsync:
    """Asynchronous, callable-object counterpart of ``bar``.

    An instance is called like a coroutine function: ``await instance(x)``
    sleeps for the configured delay and then returns ``x + 3``.
    """

    def __init__(self, floob):
        """Store ``floob``, the delay in seconds slept on every call."""
        self.floob = floob

    async def __call__(self, x):
        """Sleep for ``self.floob`` seconds, then return ``x + 3``."""
        await asyncio.sleep(self.floob)
        # Mirrors `bar` (x + 3). The previous `3*x` duplicated foo_async and
        # made the example pipeline yield 91125 instead of the documented 5832.
        return x + 3

def baz(x):
    """Cube ``x``; this is the plain synchronous step of the pipeline."""
    cubed = x ** 3
    return cubed

Well, we can modify our pipeline to handle these cases by awaiting a step's result whenever it needs to be awaited:

def compose_async(*funcs):
    """Compose synchronous and asynchronous callables into one coroutine function.

    Each callable in ``funcs`` is applied in order to the running value. When
    a call returns an awaitable (a coroutine, a Task or Future, or any object
    implementing ``__await__``), it is awaited and the awaited value is passed
    on; any other return value is passed on as-is.

    Args:
        *funcs: One-argument callables, sync or async, applied left to right.

    Returns:
        A coroutine function ``composition_async(x)`` that resolves to the
        output of the last callable (or to ``x`` itself when ``funcs`` is
        empty).
    """
    # Imported here so the function carries its own dependency.
    import inspect

    async def composition_async(x):
        for func in funcs:
            result = func(x)
            # isawaitable() recognises every awaitable, not only coroutine
            # objects, so a step that returns a Task or Future is awaited
            # instead of being handed un-awaited to the next step.
            if inspect.isawaitable(result):
                x = await result
            else:
                x = result
        return x
    return composition_async

def main():
    """Run the mixed sync/async pipeline on 5 and print the result."""
    stages = (foo_async, BarAsync(2), baz)
    run_pipeline = compose_async(*stages)

    start = 5
    # asyncio.run drives the coroutine from synchronous code; inside another
    # coroutine, `await run_pipeline(start)` instead.
    outcome = asyncio.run(run_pipeline(start))
    print(outcome)  # 5832

if __name__ == "__main__":
    main()

Bonus tip: here’s how to type-hint the example to keep mypy happy:

import asyncio
import inspect
from typing import Any, Awaitable, Callable, Coroutine, Union, cast

# Pipeline functions
async def foo_async(x: int) -> int:
    """Wait one second, then return ``x`` multiplied by three."""
    await asyncio.sleep(1)
    tripled = 3 * x
    return tripled

class BarAsync:
    """Asynchronous, callable-object pipeline step that adds 3.

    An instance is called like a coroutine function: ``await instance(x)``
    sleeps for the configured delay and then returns ``x + 3``.
    """

    def __init__(self, floob: int):
        """Store ``floob``, the delay in seconds slept on every call."""
        self.floob = floob

    async def __call__(self, x: int) -> int:
        """Sleep for ``self.floob`` seconds, then return ``x + 3``."""
        await asyncio.sleep(self.floob)
        # Adds 3 rather than tripling: the previous `3*x` duplicated foo_async
        # and made the example pipeline yield 91125 instead of the documented
        # 5832, which is ((3 * 5) + 3) ** 3.
        return x + 3

def baz(x: int) -> int:
    """Return the cube of ``x``."""
    cube: int = x ** 3
    return cube


# Type alias for a single pipeline step: a callable that takes an int and
# returns either an int directly (sync step) or an awaitable that resolves to
# an int (async step).
PipelineFunc = Union[
    Callable[[int], int],
    Callable[[int], Awaitable[int]]
]

# Define pipeline
def compose_async(*funcs: PipelineFunc) -> Callable[[int], Coroutine[Any, Any, int]]:
    """Compose synchronous and asynchronous int steps into one coroutine function.

    Each callable in ``funcs`` is applied in order to the running value. When
    a call returns an awaitable (a coroutine, a Task or Future, or any object
    implementing ``__await__``), it is awaited and the awaited value is passed
    on; any other return value is passed on as-is.

    Args:
        *funcs: Pipeline steps, sync or async, applied left to right.

    Returns:
        A coroutine function taking the initial int. The return type is
        ``Coroutine`` rather than the wider ``Awaitable`` because
        ``asyncio.run`` only accepts coroutines; ``Coroutine`` is still an
        ``Awaitable``, so callers that ``await`` the result are unaffected.
    """
    async def composition_async(x: int) -> int:
        for func in funcs:
            result = func(x)
            # isawaitable() recognises every awaitable, not only coroutine
            # objects, matching the Awaitable[int] branch of PipelineFunc.
            if inspect.isawaitable(result):
                x = await result
            else:
                x = cast(int, result)  # Tell mypy explicitly about type
        return x
    return composition_async


# Example of pipeline usage
def main() -> None:
    """Run the typed sync/async pipeline on 5 and print the result."""
    run_pipeline = compose_async(foo_async, BarAsync(2), baz)

    start = 5
    # asyncio.run drives the coroutine from synchronous code; inside another
    # coroutine, `await run_pipeline(start)` instead.
    outcome = asyncio.run(run_pipeline(start))
    print(outcome)  # 5832

if __name__ == "__main__":
    main()

Reply to this post by email ↪