A dataflow based workflow framework.

work in progress



  • Intuitive syntax: dataflow-like syntax for composing flows and tasks, similar to a function call.
    • Inspired by Nextflow's DSL2.
  • Pure Python: no DSL; import, compose, and modify Task/Flow Python objects at will.
    • Extensible and interactive thanks to the dynamic nature of Python.
      • Task cache.
      • ...
  • Concurrent: tasks run implicitly in parallel in an asyncio event loop.
  • Distributable: uses Dask distributed as the task executor, so it can be deployed locally, on a cluster, or in the cloud.
  • Hybrid execution model.
    • Build flows in local Python or in the web UI.
    • Schedule and monitor flow execution on a remote server through Python or the web UI.
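
To make the "implicitly parallel in an asyncio event loop" point concrete, here is a minimal standard-library sketch. It does not use flowsaber and is not how flowsaber is implemented internally; the `add` coroutine and the `run_all` helper are hypothetical names chosen for illustration. It only shows the general idea that independent tasks scheduled on one event loop overlap their waiting time.

```python
import asyncio


async def add(num):
    # Stand-in for a task body; the sleep simulates I/O-bound work.
    await asyncio.sleep(0.1)
    return num + 1


async def run_all(nums):
    # gather() schedules every coroutine on the event loop at once,
    # so the waits overlap instead of running back to back.
    return await asyncio.gather(*(add(n) for n in nums))


results = asyncio.run(run_all([1, 2, 3, 4]))
print(results)  # [2, 3, 4, 5]
```

`asyncio.gather` returns results in the order the coroutines were passed in, which is why the output order matches the input order even though the tasks ran concurrently.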

Web UI



pip install flowsaber


  • A minimal working example that covers most features and usages of flowsaber:
from flowsaber.api import *

@task  # wraps a plain Python function as a Task
def add(self, num):  # self is optional
    return num + 1

@task
def multiply(num1, num2):
    return num1 * num2

@shell  # the docstring is the shell command; the returned glob selects output files
def write(num):
    """echo {num} > {num}.txt"""
    return '*.txt'

@task
def read(f: File):
    return open(str(f)).readlines()

@flow  # composes tasks into a reusable sub-flow
def sub_flow(num):
    return add(num) | map_(lambda x: x ** 2) | add

@flow
def my_flow(num):
    [sub_flow(num), sub_flow(num)] | multiply \
    | write | read | flatten \
    | map_(lambda x: int(x.strip())) \
    | view

num_ch = Channel.values(1, 2, 3, 4, 5, 6, 7, 8)
# resolve dependencies
workflow = my_flow(num=num_ch)
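
To see which values the flow above should produce, here is a plain-Python trace of the arithmetic. It does not use flowsaber. It assumes that the two `sub_flow` outputs are paired item by item before being passed to `multiply`; that pairing is an assumption about the channel semantics, not something stated above.

```python
def add(num):
    return num + 1


def sub_flow(num):
    # add -> square -> add, mirroring add(num) | map_(lambda x: x ** 2) | add
    return add(add(num) ** 2)


def multiply(num1, num2):
    return num1 * num2


nums = [1, 2, 3, 4, 5, 6, 7, 8]
# Assumption: both sub-flow outputs are zipped item by item into multiply.
products = [multiply(sub_flow(n), sub_flow(n)) for n in nums]
print(products)  # [25, 100, 289, 676, 1369, 2500, 4225, 6724]
```

The remaining steps (`write`, `read`, `flatten`, `map_`) round-trip each number through a text file and parse it back with `int`, so under the same assumption `view` would display these values.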

Example 2

This is a bioinformatics workflow, a rewritten version of the Snakemake tutorial.

import flowsaber
from flowsaber.api import *

@shell
def bwa(self, fa: File, fastq: File):  # inputs are converted automatically when they have a type annotation
    """bwa mem -t {self.config.cpu} {fa} {fastq} | samtools view -Sb - > {fastq.stem}.bam"""
    return "*.bam"  # for ShellTask, a str in the return value is treated as a File and globbed

@shell
def sort(bam: File):  # self is optional in case you don't want to access the current task
    """samtools sort -o {sorted_bam} {bam}"""
    sorted_bam = f"{bam.stem}.sorted.bam"
    return sorted_bam

@shell
def call(fa: File, bams: list):  # in case you need to write some Python code
    """samtools mpileup -g -f {fa} {bam_files} | bcftools call -mv - > all.vcf"""
    bam_files = ' '.join(str(bam) for bam in bams)
    return "all.vcf"

@task
def stats(vcf: File):
    import matplotlib
    import matplotlib.pyplot as plt
    from pysam import VariantFile

    quals = [record.qual for record in VariantFile(str(vcf))]


@flow
def call_vcf_flow():
    def _call(bams):  # a task is a normal function, so use Python as you wish
        return call(fa, bams)

    context = flowsaber.context
    fa = Channel.value(context.fa)
    fastq = Channel.values(*context.fastq)

    bam1 = bwa(fa, fastq)  # channels are cloned automatically
    bam2 = bwa(fa, fastq)
    mix(bam1, bam2) | sort | collect | _call | stats

prefix = 'tests/test_flow/snamke-demo.nosync/data'
with flowsaber.context({
    "fa": f'{prefix}/genome.fa',
    "fastq": [f'{prefix}/samples/{sample}' for sample in ['A.fastq', 'B.fastq', 'C.fastq']]
}):
    # resolve dependency
    workflow = call_vcf_flow()
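
The shell tasks above use docstrings with placeholders such as `{fastq.stem}` and `{sorted_bam}`. The sketch below shows one plausible way such a template could be filled in, using only `str.format` from the standard library. It is not flowsaber's actual rendering code, and the `render` helper is a hypothetical name introduced here for illustration.

```python
from pathlib import Path


def render(template, **variables):
    # str.format resolves attribute access such as {fastq.stem} by itself.
    return template.format(**variables)


fastq = Path("samples/A.fastq")
bam = Path("A.bam")
sorted_bam = f"{bam.stem}.sorted.bam"

sort_cmd = render("samtools sort -o {sorted_bam} {bam}", sorted_bam=sorted_bam, bam=bam)
bam_name = render("{fastq.stem}.bam", fastq=fastq)

print(sort_cmd)  # samtools sort -o A.sorted.bam A.bam
print(bam_name)  # A.bam
```

This also suggests why `sorted_bam` and `bam_files` can be defined after the docstring in the tasks above: the template only needs the variables to exist at the moment it is rendered, not when the docstring is written.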

Example: running on a remote server

Both the server and the agent must be running in the background before you submit flow runs.

Start server (API endpoint)

In a bash shell:

flowsaber server

Start agent (flow dispatcher)

In a bash shell:

flowsaber agent --server "" --id test

Create flow and schedule for running

In a Python script or IPython console:

import sys

from flowsaber.api import *

@task
def add(num):
    print("This is a message sent by print to stdout in task")
    print("This is a message sent by print to stderr in task", file=sys.stderr)
    a = 1
    for i in range(10000000):  # busy loop to make the task take noticeable time
        a += 1
    return num + 1

@flow
def myflow(num):
    return num | add | add | view | add | view

num_ch = Channel.values(*list(range(10)))
f = myflow(num_ch)

run(f, server_address="", agent_id="test")


python -m pytest tests -s -o log_cli=True -vvvv


  • PBS/Torque executor.
  • More cache modes.
  • Support running on cloud platforms.
  • Run CWL scripts; convert between CWL and flowsaber flows.
