Propose pattern for pipelines with multiple/optional correction steps #211

Draft
wants to merge 2 commits into
base: main
157 changes: 157 additions & 0 deletions docs/recipes/optional-pipeline-steps.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "0",
"metadata": {},
"source": [
"# Optional Pipeline Steps\n",
"\n",
"## Background\n",
"\n",
"A recurring requirement, e.g., in data analysis workflows, is to have optional steps in a pipeline.\n",
"This could be a chain of multiple corrections to be applied, where a user can choose to apply or skip each step.\n",
"This brings two challenges.\n",
"Firstly, Sciline does not provide a way to skip steps in a pipeline, since it is in conflict with the idea that a pipeline is a directed acyclic graph (DAG).\n",
"Secondly, attempts to work around this limitation, e.g., by using providers that perform no operation instead of a correction, depending on a flag are hampered by cumbersome and misleading domain type naming.\n",
"Adding additional corrections furthermore require access to the source code of the pipeline, which is not always practical when the pipeline is part of a library.\n",
"\n",
"## Solution\n",
"\n",
"The solution is to move away from domain types that reflect, e.g., which corrections were applied to the data.\n",
"That is, instead of having `DataWithCorrection1` and `DataWithCorrection1And2`, we have use a single `CorrectedData` and a single pipeline step that applies all corrections."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1",
"metadata": {},
"outputs": [],
"source": [
"from typing import NewType\n",
"import sciline\n",
"\n",
"RawData = NewType('RawData', float)\n",
"CorrectedData = NewType('CorrectedData', float)\n",
"CorrectionA = NewType('CorrectionA', float)\n",
"CorrectionB = NewType('CorrectionB', float)\n",
"\n",
"\n",
"def compute_correction_a() -> CorrectionA:\n",
" # Placeholder for actual computation logic\n",
" return 1.0\n",
"\n",
"\n",
"def compute_correction_b() -> CorrectionB:\n",
" # Placeholder for actual computation logic\n",
" return 2.0\n",
"\n",
"\n",
"def _do_correction_a(raw_data: float, correction_a: CorrectionA) -> float:\n",
" return raw_data * correction_a\n",
"\n",
"\n",
"def _do_correction_b(raw_data: float, correction_b: CorrectionB) -> float:\n",
" return raw_data - correction_b\n",
"\n",
"\n",
"def apply_a(raw_data: RawData, correction_a: CorrectionA) -> CorrectedData:\n",
" corrected_data = _do_correction_a(raw_data, correction_a)\n",
" return CorrectedData(corrected_data)\n",
"\n",
"\n",
"def apply_b(raw_data: RawData, correction_b: CorrectionB) -> CorrectedData:\n",
" corrected_data = _do_correction_b(raw_data, correction_b)\n",
" return CorrectedData(corrected_data)\n",
"\n",
"\n",
"def apply_a_and_b(\n",
" raw_data: RawData, correction_a: CorrectionA, correction_b: CorrectionB\n",
") -> CorrectedData:\n",
" corrected_data = _do_correction_a(raw_data, correction_a)\n",
" corrected_data = _do_correction_b(corrected_data, correction_b)\n",
" return CorrectedData(corrected_data)\n",
"\n",
"\n",
"pl = sciline.Pipeline((compute_correction_a, compute_correction_b, apply_a_and_b))\n",
"pl.visualize(mode='both')"
]
},
{
"cell_type": "markdown",
"id": "2",
"metadata": {},
"source": [
"Above, we used `apply_a_and_b` to apply both corrections `a` and `b`.\n",
"To control which corrections to apply, we can insert the desired `apply` function into the pipeline:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "3",
"metadata": {},
"outputs": [],
"source": [
"pl.insert(apply_a)\n",
"pl.visualize(mode='both')"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4",
"metadata": {},
"outputs": [],
"source": [
"pl.insert(apply_b)\n",
"pl.visualize(mode='both')"
]
},
{
"cell_type": "markdown",
"id": "5",
"metadata": {},
"source": [
"While this does not solve all problems, it allows us to create a pipeline that can be easily modified to include or exclude corrections.\n",
"The pipeline author will need to foresee this in the pipeline design, and thin wrapper functions need to be maintained for each combination of corrections.\n",
"While there is an upfront cost to this, it will allow pipeline users to not only select between the pre-defined corrections, but also add new corrections or combinations of corrections to the pipeline.\n",
"The limitation is that these corrections will all be applied at the same point in the pipeline, unless the pipeline author has foreseen this by allowing for different correction stages in the pipeline."
]
},
{
"cell_type": "markdown",
"id": "6",
"metadata": {},
"source": [
"## Variants\n",
"\n",
"A variant of the above approach is to use a single function that can apply all given corrections.\n",
"The selection of corrections is then done by inserting one of a set of providers that collect the desired corrections.\n",
"This is similar to the approach above, but allows for better control and less duplication of the code that applies the corrections."
]
}
],
"metadata": {
"kernelspec": {
"display_name": "dev310",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
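
The notebook's "Variants" section describes a single apply function fed by interchangeable "collector" providers, but does not yet show code for it. Below is a minimal sketch of how that might look. It is not part of the notebook. `Corrections`, `collect_a`, `collect_a_and_b`, and `apply_corrections` are hypothetical names introduced here for illustration. The functions are called directly so that the sketch runs without Sciline; in a pipeline they would presumably be passed as providers, with the chosen collector swapped in via `pl.insert(...)` as in the notebook.

```python
from typing import NewType

RawData = NewType('RawData', float)
CorrectedData = NewType('CorrectedData', float)
CorrectionA = NewType('CorrectionA', float)
CorrectionB = NewType('CorrectionB', float)

# Hypothetical domain type: the ordered corrections selected by the user,
# stored as a tuple of callables that each map float -> float.
Corrections = NewType('Corrections', tuple)


def collect_a(correction_a: CorrectionA) -> Corrections:
    # Selects only correction A (multiplicative, as in the notebook).
    return Corrections((lambda data: data * correction_a,))


def collect_a_and_b(
    correction_a: CorrectionA, correction_b: CorrectionB
) -> Corrections:
    # Selects A followed by B (B is subtractive, as in the notebook).
    return Corrections(
        (
            lambda data: data * correction_a,
            lambda data: data - correction_b,
        )
    )


def apply_corrections(raw_data: RawData, corrections: Corrections) -> CorrectedData:
    # The single step that applies whatever was collected, in order.
    data = float(raw_data)
    for correct in corrections:
        data = correct(data)
    return CorrectedData(data)


# Direct calls stand in for what a pipeline would compute.
only_a = apply_corrections(RawData(3.0), collect_a(CorrectionA(1.0)))
both = apply_corrections(
    RawData(3.0), collect_a_and_b(CorrectionA(1.0), CorrectionB(2.0))
)
```

With this layout the code that applies corrections exists once, and each collector only states which corrections to use and in what order, which is the reduced duplication the "Variants" text refers to.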