• 6 Processes 下


    输出(Outputs)

    我们已经了解了如何将数据输入到进程中; 现在我们将了解如何从进程中输出文件和值。
    output 声明块允许我们定义进程用于发送生成的文件和值的通道。

    输出值

    与输入类似,输出数据的类型也是使用类型限定符定义的。

    val 限定符允许我们输出脚本中定义的值。

    因为 Nextflow 进程只能通过通道进行通信,所以如果我们希望将一个值输入共享到一个进程中,作为另一个进程的输入,我们需要在输出声明块中定义该值,如下例所示:

    //process_output_value.nf
    nextflow.enable.dsl=2
    
    process METHOD {
      input:
      val x
    
      output:
      val x
    
      script:
      """
      echo $x > method.txt
      """
    }
    // Both 'Channel' and 'channel' keywords work to generate channels.
    // However, it is a good practice to be consistent through the whole pipeline development
    methods_ch = channel.of('salmon', 'kallisto')
    
    workflow {
      METHOD(methods_ch)
      // use the view operator to display contents of the channel
      METHOD.out.view({ "Received: $it" })
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    输出文件

    如果我们想要捕获一个文件而不是一个值作为输出,我们可以使用path限定符,它可以通过指定的通道捕获进程生成的一个或多个文件。

    //process_output_file.nf
    nextflow.enable.dsl=2
    
    methods_ch = channel.of('salmon', 'kallisto')
    
    process METHOD {
      input:
      val x
    
      output:
      path 'method.txt'
    
      """
      echo $x > method.txt
      """
    }
    
    workflow {
      METHOD(methods_ch)
      // use the view operator to display contents of the channel
      METHOD.out.view({ "Received: $it" })
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    输出如下:

    executor >  local (2)
    [88/f82057] process > METHOD (2) [100%] 2 of 2 ✔
    Received: /mnt/dev/likai/nextflow/work/ce/9716cf14d9d1e975619f9c2abe13c0/method.txt
    Received: /mnt/dev/likai/nextflow/work/88/f82057da2faf4ed364b65ad3eee9ba/method.txt
    
    • 1
    • 2
    • 3
    • 4

    在上面的例子中,在包含方法名的工作目录中创建了一个名为 method.txt 的文件。
    当任务完成时,该文件将通过输出通道发送。下游的operator,例如声明与输入相同通道的.viewprocess 将会接收它。

    多个输出文件

    当输出文件名包含 * 或?,它被解释为模式匹配。这允许我们将多个文件捕获到一个列表中,并将它们作为一个项目通道输出。

    //process_output_multiple.nf
    nextflow.enable.dsl=2
    
    process FASTQC {
      input:
      path read
    
      output:
      path "fqc_res/*"
    
      script:
      """
      mkdir fqc_res
      fastqc $read -o fqc_res
      """
    }
    
    read_ch = channel.fromPath("data/yeast/reads/ref1*.fq.gz")
    
    workflow {
      FASTQC(read_ch)
      FASTQC.out.view()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    对于匹配模式有一些注意事项:

    • 输入文件不包括在可能匹配的列表中。
    • Glob模式匹配文件和目录路径。
    • 当使用双星模式 * * 递归遍历子目录时,只匹配文件路径,即结果列表中不包含目录。

    到目前为止,我们已经了解了如何声明多个输入和输出通道,但是每个通道一次只处理一个值。 Nextflow 可以使用tuple限定符处理组。
    在元组中,第一项是分组键,第二项是列表。[group_key,[file1,file2,...]].
    当使用包含元组的通道时,例如.filesFromPairs工厂方法,必须使用tuple限定符声明相应的输入声明,然后定义元组中的每个项。

    //process_tuple_input.nf
    nextflow.enable.dsl=2
    
    process TUPLEINPUT{
      input:
      tuple val(sample_id), path(reads)
      
      script:
      """
      echo $sample_id
      echo $reads
      """
    }
    
    reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
    
    workflow {
      TUPLEINPUT(reads_ch)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    以同样的方式,可以使用tuple限定符声明包含元组值的输出通道,然后使用元组中每个元组元素的定义。

    //process_tuple_io_fastp.nf
    nextflow.enable.dsl=2
    
    process FASTP {
      input:
      tuple val(sample_id), path(reads)
      
      output:
      tuple val(sample_id), path("*FP*.fq.gz")
      
      script:
      """
      fastp \
       -i ${reads[0]} \
       -I ${reads[1]} \
       -o ${sample_id}_FP_R1.fq.gz \
       -O ${sample_id}_FP_R2.fq.gz
      """
    }
    
    reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
    
    workflow {
      FASTP(reads_ch)
      FASTP.out.view()
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    输出如下:

    [ref1, [work/9b/fcca75db83718a15f7a95caabfbc15/ref1_FP_R1.fq.gz, work/9b/fcca75db83718a15f7a95caabfbc15/ref1_FP_R2.fq.gz]]

    条件语句

    when声明根据各种输入和参数的状态启用/禁用流程执行。例如:

    //process_when.nf
    nextflow.enable.dsl=2
    
    process CONDITIONAL {
      input:
      val chr
    
      when:
      chr <= 5
    
      script:
      """
      echo $chr
      """
    }
    
    chr_ch = channel.of(1..22)
    
    workflow {
      CONDITIONAL(chr_ch)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    指令(Directives)

    指令声明允许定义可选设置,如 CPU 数量和内存量,这些设置影响当前进程的执行,而不影响任务本身。

    指令必须在进程体的最上方,在任何其他的声明块之前(如:input,output等)。

    注意: 为指令赋值时不使用 = 。

    指令通常用于定义要使用的计算资源量或用于配置日志记录。例如:

    //process_directive.nf
    nextflow.enable.dsl=2
    
    process PRINTCHR {
      tag "tagging with chr$chr"
      cpus 1
      echo true
    
      input:
      val chr
    
      script:
      """
      echo processing chromosome: $chr
      echo number of cpus $task.cpus
      """
    }
    
    chr_ch = channel.of(1..22, 'X', 'Y')
    
    workflow {
      PRINTCHR(chr_ch)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    上面的进程使用了3个指令:tag, cpus, echo

    tag指令允许为每个进程执行提供自定义标记。此标记使得在日志文件或执行报告中标识特定任务(进程的执行实例)变得更加容易。
    第二个指令 cpus 允许您定义每个任务所需的 CPU 数量。
    第三个指令 echo true 将标准输出打印到终端。

    我们可以使用 task.cpus 变量来捕获分配给任务的 CPU 数量。

    还有一个常用的指令是memory

    完整的指令集在这

    组织输出

    PublishDir 指令

    Nextflow 独立地管理中间结果。
    进程创建的文件存储在特定任务的临时工作目录中。通常这是在work目录下,可以在完成后删除。

    如果我们想要在 result/quant输出目录中捕获 QUANT 进程的结果,我们需要在output中定义文件,并在 PublishDir 指令中指定结果目录的位置:

    //process_publishDir.nf
    nextflow.enable.dsl=2
    
    process QUANT {
      publishDir "results/quant"
      
      input:
      tuple val(sample_id), path(reads)
      each index
      
      output:
      tuple val(sample_id), path("${sample_id}_salmon_output")
      
      script:
      """
      salmon quant -i $index \
       -l A \
       -1 ${reads[0]} \
       -2 ${reads[1]} \
       -o ${sample_id}_salmon_output
      """
    }
    
    reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
    index_ch = Channel.fromPath('data/yeast/salmon_index')
    
    workflow {
      QUANT(reads_ch, index_ch)
      QUANT.out.view()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    publishDir指令可以使用可选参数,例如 mode 参数可以使用值“copy”来指定希望将文件复制到输出目录,而不仅仅是一个指向工作目录中文件的符号链接。由于工作目录通常在管道完成时删除,因此最安全的方式是使用结果文件的“copy”模式。

    可以使用多个 publishDir 在不同的目录中保存不同的输出。若要指定在哪个输出目录中放哪些文件,使用参数pattern和通配符模式,后者从整个输出文件集中选择要发布的文件。

    例如:

    //process_publishDir_semantic.nf
    nextflow.enable.dsl=2
    
    process QUANT {
      publishDir "results/bams", pattern: "*.bam", mode: "copy"
      publishDir "results/quant", pattern: "${sample_id}_salmon_output", mode: "copy"
    
      input:
      tuple val(sample_id), path(reads)
      path index
      
      output:
      tuple val(sample_id), path("${sample_id}.bam")
      path "${sample_id}_salmon_output"
      
      script:
      """
      salmon quant -i $index \
       -l A \
       -1 ${reads[0]} \
       -2 ${reads[1]} \
       -o ${sample_id}_salmon_output \
       --writeMappings | samtools sort | samtools view -bS -o ${sample_id}.bam
      """
    }
    
    reads_ch = Channel.fromFilePairs('data/yeast/reads/ref1_{1,2}.fq.gz')
    index_ch = Channel.fromPath('data/yeast/salmon_index')
    
    workflow {
      QUANT(reads_ch, index_ch)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
  • 相关阅读:
    聊一聊前端图片懒加载背后的故事
    git及dbc的学习
    【DKN: Deep Knowledge-Aware Network for News Recommendation】
    通过FXmlFile构建xml时,注意xml规范
    以太坊合并 你需知道的10个问题
    SSM框架整合+配置
    Ajax学习:设置CROS响应头实现跨域(跨域资源共享)
    并发容器介绍(一)
    【洛谷题解】P1036 [NOIP2002 普及组] 选数
    Serverless Devs 进入 CNCF 沙箱,成首个入选的 Serverless 工具项目
  • 原文地址:https://blog.csdn.net/sinat_20791575/article/details/125393644