Multi-step Workflows

Overview

  • Tutorial: 30 min

    Objectives:
    • Learn how to implement multi-step workflows.

Connecting multiple processes is essential for building real-world workflows. This section shows how to link processes into a multi-step pipeline.

// Step 1: write each greeting to a file named <message>-output.txt
process sayHello {

    publishDir 'results', mode: 'copy'

    input:
        val message

    output:
        path "${message}-output.txt"

    script:
    """
    echo '$message' > '$message-output.txt'
    """
}


// Step 2: convert the contents of the input file to uppercase
process convertToUpper {

    publishDir 'results', mode: 'copy'

    input:
        path input_file

    output:
        path "UPPER-${input_file}"

    script:
    """
    cat '$input_file' | tr '[a-z]' '[A-Z]' > 'UPPER-${input_file}'
    """
}

params.message = '../data/data_message.csv'

workflow {

    // Read the CSV file, split it into rows, and keep the first column of each row
    message_ch = Channel.fromPath(params.message)
                     .view { "Before splitCsv: $it" }
                     .splitCsv()
                     .view { "After splitCsv: $it" }
                     .map { item -> item[0] }
                     .view { "After map: $it" }

    sayHello(message_ch)

    // Feed the output of sayHello into convertToUpper
    convertToUpper(sayHello.out)
}

convertToUpper is the second process in the workflow. It uses the UNIX tr command to convert the contents of its input file to uppercase.

Nextflow follows a dataflow model in which the output of one process can feed directly into another. By default, a process’s output is available as <process>.out, so sayHello.out can be passed directly as the input of convertToUpper().
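
The chaining described above can be sketched in plain shell. This is only an illustration of the idea, not Nextflow code: the functions say_hello and convert_to_upper are hypothetical stand-ins for the two processes, and the temporary directory is an assumption made so the sketch can run anywhere.

```shell
# Hypothetical shell stand-ins for the two processes (not Nextflow code).
workdir=$(mktemp -d)
cd "$workdir" || exit 1

# Step 1: write the greeting to <greeting>-output.txt and print the file name.
say_hello() {
    echo "$1" > "$1-output.txt"
    echo "$1-output.txt"
}

# Step 2: upper-case the given file into UPPER-<file> and print the new name.
convert_to_upper() {
    cat "$1" | tr '[a-z]' '[A-Z]' > "UPPER-$1"
    echo "UPPER-$1"
}

# The output of step 1 becomes the input of step 2.
hello_file=$(say_hello 'Hello')
upper_file=$(convert_to_upper "$hello_file")

cat "$upper_file"
```

Running the sketch prints HELLO. In the real workflow, Nextflow does this hand-over for every item in the channel, so no file names need to be passed around by hand.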

Run the following workflow:

nextflow run 10_upper_case.nf

How can we process multiple output files from one process into a single file? Or combine different files from a process into a single summary file?

To illustrate this, we add a third process to the workflow. Together, the three steps should be equivalent to the following commands, where the final cat is the new collection step:

echo 'Hello' | tr '[a-z]' '[A-Z]' > UPPER-Hello-output.txt
echo 'Bonjour' | tr '[a-z]' '[A-Z]' > UPPER-Bonjour-output.txt
echo 'Holà' | tr '[a-z]' '[A-Z]' > UPPER-Holà-output.txt
cat UPPER-Hello-output.txt UPPER-Bonjour-output.txt UPPER-Holà-output.txt > COLLECTED-output.txt

// Step 3: concatenate all input files into COLLECTED-output.txt
process collectGreetings {

    publishDir 'results', mode: 'copy'

    input:
        path input_files

    output:
        path "COLLECTED-output.txt"

    script:
    """
    cat ${input_files} > 'COLLECTED-output.txt'
    """
}

.....
.....

workflow {

    message_ch = Channel.fromPath(params.message)
                     .view { "Before splitCsv: $it" }
                     .splitCsv()
                     .view { "After splitCsv: $it" }
                     .map { item -> item[0] }
                     .view { "After map: $it" }

    sayHello(message_ch)

    convertToUpper(sayHello.out)

    // Pass the uppercase files on to the collection step
    collectGreetings(convertToUpper.out)
}

Explanation

  1. The path qualifier accepts either a single file or a list of files, so the input block needs no special handling.

  2. The process must handle any number of input files dynamically.

  3. If the process receives [file1.txt, file2.txt, file3.txt] as a single input item, Nextflow generates the command cat file1.txt file2.txt file3.txt.

  4. Writing cat ${input_files} in the script is enough: Nextflow expands the variable to the space-separated file names.
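
As a rough shell sketch of points 3 and 4, the snippet below shows what the script looks like once the variable has been expanded to three file names. The file names and their contents are placeholders, and the temporary directory is an assumption so the example can run on its own.

```shell
workdir=$(mktemp -d)
cd "$workdir" || exit 1

# Three placeholder inputs, standing in for files staged by Nextflow.
echo 'HELLO'   > file1.txt
echo 'BONJOUR' > file2.txt
echo 'HOLA'    > file3.txt

# After expansion, the script line reads: cat file1.txt file2.txt file3.txt
input_files='file1.txt file2.txt file3.txt'

# Left unquoted on purpose so the shell splits it into three arguments.
cat $input_files > COLLECTED-output.txt

cat COLLECTED-output.txt
```

The output file contains the three greetings in the order the file names were listed.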

Run the following workflow and check whether you get the expected output:

nextflow run 11_collect.nf

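The collection step ran once for each greeting, which is not the intended behavior: every item emitted by convertToUpper.out triggers its own collectGreetings task, so each task writes a COLLECTED-output.txt containing a single greeting. To make the third step process all items from convertToUpper() together, we need to instruct Nextflow explicitly.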
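
The per-item behavior can be imitated in shell. This is a simplified sketch under stated assumptions: collect_greetings is a hypothetical stand-in for one collectGreetings task, the file names are placeholders, and everything runs in one temporary directory in a fixed order, whereas Nextflow runs each task in its own work directory with no guaranteed order.

```shell
workdir=$(mktemp -d)
cd "$workdir" || exit 1

# Placeholder outputs of the uppercase step.
echo 'HELLO'   > UPPER-Hello-output.txt
echo 'BONJOUR' > UPPER-Bonjour-output.txt
echo 'HOLA'    > UPPER-Hola-output.txt

# Hypothetical stand-in for one collectGreetings task.
collect_greetings() {
    cat "$@" > COLLECTED-output.txt
}

# One call per file, mirroring one task per channel item.
calls=0
for f in UPPER-Hello-output.txt UPPER-Bonjour-output.txt UPPER-Hola-output.txt; do
    collect_greetings "$f"
    calls=$((calls + 1))
done

echo "calls: $calls"
cat COLLECTED-output.txt
```

The function is called three times, and each call overwrites the previous result, so the final file holds a single greeting instead of three.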

The collect() Operator

The collect operator collects all items from a source channel into a list and emits it as a single item.

sayHello(message_ch)

convertToUpper(sayHello.out)

collectGreetings(convertToUpper.out.collect())
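
In shell terms, the effect of collect() is roughly that all files are gathered first and the collection step is then called once with the whole list. The sketch below illustrates this under the same assumptions as before: collect_greetings is a hypothetical stand-in for the process, and the file names and temporary directory are placeholders.

```shell
workdir=$(mktemp -d)
cd "$workdir" || exit 1

# Placeholder outputs of the uppercase step.
echo 'HELLO'   > UPPER-Hello-output.txt
echo 'BONJOUR' > UPPER-Bonjour-output.txt
echo 'HOLA'    > UPPER-Hola-output.txt

# Hypothetical stand-in for one collectGreetings task.
collect_greetings() {
    cat "$@" > COLLECTED-output.txt
}

# Gather every file into one list first (the role collect() plays),
# then run the collection step a single time on the whole list.
set -- UPPER-Hello-output.txt UPPER-Bonjour-output.txt UPPER-Hola-output.txt
file_count=$#
collect_greetings "$@"

echo "files in one call: $file_count"
cat COLLECTED-output.txt
```

A single call now receives all three files, and the output file contains all three greetings.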

Run the following workflow:

nextflow run 12_collect_corrected.nf

Key Points

  1. In Nextflow, multiple computational steps can be combined into a single workflow.

  2. The collect operator gathers all channel items into a list and emits that list as a single item.