我们已经了解了如何将数据输入到进程中; 现在我们将了解如何从进程中输出文件和值。
output 声明块允许我们定义进程用于发送生成的文件和值的通道。
与输入类似,输出数据的类型也是使用类型限定符定义的。
val 限定符允许我们输出脚本中定义的值。
因为 Nextflow 进程只能通过通道进行通信,所以如果我们希望将一个值输入共享到一个进程中,作为另一个进程的输入,我们需要在输出声明块中定义该值,如下例所示:
//process_output_value.nf
nextflow.enable.dsl=2
process METHOD {
input:
val x
output:
val x
script:
"""
echo $x > method.txt
"""
}
// Both 'Channel' and 'channel' keywords work to generate channels.
// However, it is a good practice to be consistent through the whole pipeline development
methods_ch = channel.of('salmon', 'kallisto')
workflow {
METHOD(methods_ch)
// use the view operator to display contents of the channel
METHOD.out.view({ "Received: $it" })
}
如果我们想要捕获一个文件而不是一个值作为输出,我们可以使用path限定符,它可以通过指定的通道捕获进程生成的一个或多个文件。
//process_output_file.nf
nextflow.enable.dsl=2
methods_ch = channel.of('salmon', 'kallisto')
process METHOD {
input:
val x
output:
path 'method.txt'
"""
echo $x > method.txt
"""
}
workflow {
METHOD(methods_ch)
// use the view operator to display contents of the channel
METHOD.out.view({ "Received: $it" })
}
输出如下:
executor > local (2)
[88/f82057] process > METHOD (2) [100%] 2 of 2 ✔
Received: /mnt/dev/likai/nextflow/work/ce/9716cf14d9d1e975619f9c2abe13c0/method.txt
Received: /mnt/dev/likai/nextflow/work/88/f82057da2faf4ed364b65ad3eee9ba/method.txt
在上面的例子中,在包含方法名的工作目录中创建了一个名为 method.txt 的文件。
当任务完成时,该文件将通过输出通道发送。下游的operator,例如声明与输入相同通道的.view 或process 将会接收它。
当输出文件名包含 * 或?,它被解释为模式匹配。这允许我们将多个文件捕获到一个列表中,并将它们作为一个项目通道输出。
//process_output_multiple.nf
nextflow.enable.dsl=2
process FASTQC {
input:
path read
output:
path "fqc_res/*"
script:
"""
mkdir fqc_res
fastqc $read -o fqc_res
"""
}
read_ch = channel.fromPath("data/yeast/reads/ref1*.fq.gz")
workflow {
FASTQC(read_ch)
FASTQC.out.view()
}
对于匹配模式有一些注意事项:
到目前为止,我们已经了解了如何声明多个输入和输出通道,但是每个通道一次只处理一个值。 Nextflow 可以使用tuple限定符处理组。
在元组中,第一项是分组键,第二项是列表。[group_key,[file1,file2,...]].
当使用包含元组的通道时,例如.filesFromPairs工厂方法,必须使用tuple限定符声明相应的输入声明,然后定义元组中的每个项。
//process_tuple_input.nf
nextflow.enable.dsl=2
process TUPLEINPUT{
input:
tuple val(sample_id), path(reads)
script:
"""
echo $sample_id
echo $reads
"""
}
reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
workflow {
TUPLEINPUT(reads_ch)
}
以同样的方式,可以使用tuple限定符声明包含元组值的输出通道,然后使用元组中每个元组元素的定义。
//process_tuple_io_fastp.nf
nextflow.enable.dsl=2
process FASTP {
input:
tuple val(sample_id), path(reads)
output:
tuple val(sample_id), path("*FP*.fq.gz")
script:
"""
fastp \
-i ${reads[0]} \
-I ${reads[1]} \
-o ${sample_id}_FP_R1.fq.gz \
-O ${sample_id}_FP_R2.fq.gz
"""
}
reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
workflow {
FASTP(reads_ch)
FASTP.out.view()
}
输出如下:
[ref1, [work/9b/fcca75db83718a15f7a95caabfbc15/ref1_FP_R1.fq.gz, work/9b/fcca75db83718a15f7a95caabfbc15/ref1_FP_R2.fq.gz]]
when声明根据各种输入和参数的状态启用/禁用流程执行。例如:
//process_when.nf
nextflow.enable.dsl=2
process CONDITIONAL {
input:
val chr
when:
chr <= 5
script:
"""
echo $chr
"""
}
chr_ch = channel.of(1..22)
workflow {
CONDITIONAL(chr_ch)
}
指令声明允许定义可选设置,如 CPU 数量和内存量,这些设置影响当前进程的执行,而不影响任务本身。
指令必须在进程体的最上方,在任何其他的声明块之前(如:input,output等)。
注意: 为指令赋值时不使用 = 。
指令通常用于定义要使用的计算资源量或用于配置日志记录。例如:
//process_directive.nf
nextflow.enable.dsl=2
process PRINTCHR {
tag "tagging with chr$chr"
cpus 1
echo true
input:
val chr
script:
"""
echo processing chromosome: $chr
echo number of cpus $task.cpus
"""
}
chr_ch = channel.of(1..22, 'X', 'Y')
workflow {
PRINTCHR(chr_ch)
}
上面的进程使用了3个指令:tag, cpus, echo。
tag指令允许为每个进程执行提供自定义标记。此标记使得在日志文件或执行报告中标识特定任务(进程的执行实例)变得更加容易。
第二个指令 cpus 允许您定义每个任务所需的 CPU 数量。
第三个指令 echo true 将标准输出打印到终端。
我们可以使用 task.cpus 变量来捕获分配给任务的 CPU 数量。
还有一个常用的指令是memory。
完整的指令集在这。
Nextflow 独立地管理中间结果。
进程创建的文件存储在特定任务的临时工作目录中。通常这是在work目录下,可以在完成后删除。
如果我们想要在 result/quant输出目录中捕获 QUANT 进程的结果,我们需要在output中定义文件,并在 PublishDir 指令中指定结果目录的位置:
//process_publishDir.nf
nextflow.enable.dsl=2
process QUANT {
publishDir "results/quant"
input:
tuple val(sample_id), path(reads)
each index
output:
tuple val(sample_id), path("${sample_id}_salmon_output")
script:
"""
salmon quant -i $index \
-l A \
-1 ${reads[0]} \
-2 ${reads[1]} \
-o ${sample_id}_salmon_output
"""
}
reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
index_ch = Channel.fromPath('data/yeast/salmon_index')
workflow {
QUANT(reads_ch, index_ch)
QUANT.out.view()
}
publishDir指令可以使用可选参数,例如 mode 参数可以使用值“copy”来指定希望将文件复制到输出目录,而不仅仅是一个指向工作目录中文件的符号链接。由于工作目录通常在管道完成时删除,因此最安全的方式是使用结果文件的“copy”模式。
可以使用多个 publishDir 在不同的目录中保存不同的输出。若要指定在哪个输出目录中放哪些文件,使用参数pattern和通配符模式,后者从整个输出文件集中选择要发布的文件。
例如:
//process_publishDir_semantic.nf
nextflow.enable.dsl=2
process QUANT {
publishDir "results/bams", pattern: "*.bam", mode: "copy"
publishDir "results/quant", pattern: "${sample_id}_salmon_output", mode: "copy"
input:
tuple val(sample_id), path(reads)
path index
output:
tuple val(sample_id), path("${sample_id}.bam")
path "${sample_id}_salmon_output"
script:
"""
salmon quant -i $index \
-l A \
-1 ${reads[0]} \
-2 ${reads[1]} \
-o ${sample_id}_salmon_output \
--writeMappings | samtools sort | samtools view -bS -o ${sample_id}.bam
"""
}
reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
index_ch = Channel.fromPath('data/yeast/salmon_index')
workflow {
QUANT(reads_ch, index_ch)
}