mis.pipeline.execution
[docs]
module
mis.pipeline.execution
| from __future__ import annotations
import abc
import asyncio
from enum import Enum
from typing import Callable, Generic, TypeVar
class Status(str, Enum):
IN_PROGRESS = "in-progress"
SUCCESS = "success"
FAILURE = "failure"
Result = TypeVar("Result")
Transformed = TypeVar("Transformed")
class Execution(abc.ABC, Generic[Result]):
"""
A task being sent to a quantum device and which
may need some time before it is completed.
"""
@abc.abstractmethod
def result(self) -> Result: ...
@abc.abstractmethod
async def wait(self) -> Result: ...
@abc.abstractmethod
def status(self) -> Status: ...
def map(self, transform: Callable[[Result], Transformed]) -> Execution[Transformed]:
"""
Apply a transformation to the result once it is
complete.
"""
return MappedExecution(self, transform)
@classmethod
def success(cls, result: Result) -> Execution[Result]:
"""
Shortcut to return a result that has already succeeded.
"""
return SuccessfulExecution(result)
class WaitingExecution(Execution[Result]):
"""
A task being sent to a quantum device and which
definitely needs some time before it is completed.
Unless you're implementing new executors, you're probably
not interested in this class.
"""
def __init__(self, sleep_sec: int):
self._sleep_sec = sleep_sec
async def wait(self) -> Result:
while self.status() is Status.IN_PROGRESS:
await asyncio.sleep(self._sleep_sec)
return self.result()
class MappedExecution(Execution[Transformed]):
"""
The result of calling `map` on an `Execution`.
Unless you're implementing new executors, you're probably
not interested in this class.
"""
def __init__(self, origin: Execution[Result], transform: Callable[[Result], Transformed]):
self._cache_filled = False
self._origin = origin
self._transform = transform
def result(self) -> Transformed:
original = self._origin.result()
return self._transform(original)
def status(self) -> Status:
return self._origin.status()
async def wait(self) -> Transformed:
original = await self._origin.wait()
return self._transform(original)
class SuccessfulExecution(Execution[Result]):
"""
An execution that is already completed.
Unless you're implementing new executors, you're probably
not interested in this class.
"""
def __init__(self, result: Result):
self._result = result
def status(self) -> Status:
return Status.SUCCESS
async def wait(self) -> Result:
# No need to wait.
return self.result()
def result(self) -> Result:
return self._result
def map(self, transform: Callable[[Result], Transformed]) -> Execution[Transformed]:
# Since we know that we're not waiting for anything,
# we can perform the `map` call immediately.
mapped = transform(self.result())
return SuccessfulExecution(result=mapped)
|