使用snakemake构建分析流程
介绍使用snakemake分析数据流程。
最近又要对原始数据进行分析,以前都是一个步骤建一个文件夹或者把所有步骤放在整个脚本中分析数据,现在想试试snakemake。
1. 安装
conda install bioconda::snakemake-minimal
2. 示例测试
通过创建虚假的文件来熟悉一下该工具的使用,示例:对Tumor/Normal匹配的两个样本,经过DNA双端测序得到的fastq文件,比对到参考基因组的fasta文件上,输出bam文件(示例仅传达这个意思,并没有真实的进行比对)。
- 构建虚假的数据:
# Create a toy folder where we will run our commands:
mkdir toy-example
cd toy-example
# Make a fake reference genome
touch genome.fa
# Make fake fastq data
mkdir fastq
touch fastq/tumor.R1.fastq.gz fastq/tumor.R2.fastq.gz
touch fastq/normal.R1.fastq.gz fastq/normal.R2.fastq.gz
- 创建Snakefile文件
创建Snakefile文件,文件内容如下:
SAMPLES = ['tumor', 'normal']
rule all:
input:
expand('{sample}.bam', sample=SAMPLES)
rule bwa:
input:
genome = 'genome.fa',
r1 = 'fastq/{sample}.R1.fastq.gz',
r2 = 'fastq/{sample}.R2.fastq.gz'
output:
'{sample}.bam'
shell:
'echo {input.genome} {input.r1} {input.r2} > {output}'
- 逐层理解
SAMPLES = ['tumor', 'normal']
定义了字符串列表SAMPLES
,内容是样本名。
rule all:
input:
expand('{sample}.bam', sample=SAMPLES)
rule all
中的input代表了pipeline的最终输出文件,这里最终输出是两个文件tumor.bam
和normal.bam
,expand()
是特定的函数能够根据SAMPLES的内容把{sample}.bam
扩充为['tumor.txt','normal.txt']
。
rule bwa:
input:
genome = 'genome.fa',
r1 = 'fastq/{sample}.R1.fastq.gz',
r2 = 'fastq/{sample}.R2.fastq.gz'
output:
'{sample}.bam'
shell:
'echo {input.genome} {input.r1} {input.r2} > {output}'
因为设定了tumor.bam
和normal.bam
是最终的输出,需要规则来创建这些文件,当Snakemake识别{sample}.bam
时,会把sample替换为SAMPLES的内容来创建结果。
- 运行Snakemake
直接调用snakemake
来运行流程,它会自动寻找当前目录下名为Snakefile
的文件,也可以通过参数--snakefile
来指定文件:
snakemake --cores 1
Building DAG of jobs...
Using shell: /usr/bin/bash
Provided cores: 1 (use --cores to define parallelism)
Rules claiming more threads will be scaled down.
Job stats:
job count
----- -------
all 1
bwa 2
total 3
Select jobs to execute...
[Tue Jan 9 14:24:04 2024]
rule bwa:
input: genome.fa, fastq/normal.R1.fastq.gz, fastq/normal.R2.fastq.gz
output: normal.bam
jobid: 2
reason: Missing output files: normal.bam
wildcards: sample=normal
resources: tmpdir=/tmp
[Tue Jan 9 14:24:04 2024]
Finished job 2.
1 of 3 steps (33%) done
Select jobs to execute...
[Tue Jan 9 14:24:04 2024]
rule bwa:
input: genome.fa, fastq/tumor.R1.fastq.gz, fastq/tumor.R2.fastq.gz
output: tumor.bam
jobid: 1
reason: Missing output files: tumor.bam
wildcards: sample=tumor
resources: tmpdir=/tmp
[Tue Jan 9 14:24:04 2024]
Finished job 1.
2 of 3 steps (67%) done
Select jobs to execute...
[Tue Jan 9 14:24:04 2024]
localrule all:
input: tumor.bam, normal.bam
jobid: 0
reason: Input files updated by another job: normal.bam, tumor.bam
resources: tmpdir=/tmp
[Tue Jan 9 14:24:04 2024]
Finished job 0.
3 of 3 steps (100%) done
Complete log: .snakemake/log/2024-01-09T142402.506612.snakemake.log
- 检查结果
head Sample?.txt
genome.fa fastq/tumor.R1.fastq.gz fastq/tumor.R2.fastq.gz
genome.fa fastq/normal.R1.fastq.gz fastq/normal.R2.fastq.gz
还可以把流程图形化:
snakemake --forceall --dag | dot -Tpng > dag1.png
3. 功能
代码质量检查
snakemake --lint
在5.11版本之后可以对代码质量检查,强烈推荐在发布任何工作流之前使用linter检查质量。
多线程
通过在Snakefile中加入如下参数设置可以使用多线程,如果没有指定那么默认是单线程:
threads: 8
注意,在执行工作流程时,Snakemake调度器会考虑作业所需的线程数。特别是:确保同时运行的所有作业的线程总和不超过给定的可用CPU核心数。通过参数--cores
给出,这里rule需要8个线程,因此同一时间只能运行一个rule的作业,而 Snakemake 调度器将尝试用其他作业填满剩余的核心。当提供的core少于thread时,规则使用的thread将被减少到提供的core。
在没有超线程技术的情况下,一个CPU核心只能同时执行一个线程。这意味着每个物理核心只能处理一个指令流。在这种情况下,一个CPU核心实际上只有一个线程。
-
–force, -f 参数:不考虑已经创建的输出,强制执行所选目标或第一个rule。(默认值:False)
-
–forceall, -F 参数:不考虑已经创建的输出,强制执行所选(或第一个)rule以及其所有依赖的rule。(默认值:False)
config文件
指定样本时可以通过上述创建Python列表来制定,但为了适应使流程更方便使用新的数据,可以通过创建config文件,该文件可以是JSON
或者YAML
格式,可以通过在Snakefile
文件开头通过参数configfile
直接使用:
configfile: "config.yaml"
Snakemake会载入config文件并把把内容存储在全局的字典中config
,config.yaml
的内容如下:
samples:
A: data/samples/A.fastq
B: data/samples/B.fastq
可以把上述的SAMPLES去掉来优化Snakefile:
rule bcftools_call:
input:
fa="data/genome.fa",
bam=expand("sorted_reads/{sample}.bam", sample=config["samples"]),
bai=expand("sorted_reads/{sample}.bam.bai", sample=config["samples"])
output:
"calls/all.vcf"
shell:
"bcftools mpileup -f {input.fa} {input.bam} | "
"bcftools call -mv - > {output}"
输入函数
上面在config文件中存储了FASTQ文件的路径,还可以通过以下方法来使用路径,先介绍一下Snakemake工作流的三段:
-
initialization段:解析定义工作流的文件并实例化所有rule
-
DAG段:填充通配符并根据输入文件和输出文件匹配
-
scheduling段:根据可用资源启动作业,执行作业的DAG
def get_bwa_map_input_fastqs(wildcards):
return config["samples"][wildcards.sample]
rule bwa_map:
input:
"data/genome.fa",
get_bwa_map_input_fastqs
output:
"mapped_reads/{sample}.bam"
threads: 8
shell:
"bwa mem -t {threads} {input} | samtools view -Sb - > {output}"
参数
shell命令可能不止由输入和输出文件组成,存在其他的参数,可能这些参数依赖于通配符的值,Snakemake允许通过参数params
直接定义任何参数,举例:
rule bwa_map:
input:
"data/genome.fa",
get_bwa_map_input_fastqs
output:
"mapped_reads/{sample}.bam"
params:
rg=r"@RG\tID:{sample}\tSM:{sample}"
threads: 8
shell:
"bwa mem -R '{params.rg}' -t {threads} {input} | samtools view -Sb - > {output}"
保存log文件
存储每个job的log输出,而不是直接在终端输出,并行调用脚本的时候容易出现混乱。Snakemake允许直接通过参数log
指定log文件,举例:
rule bwa_map:
input:
"data/genome.fa",
get_bwa_map_input_fastqs
output:
"mapped_reads/{sample}.bam"
params:
rg=r"@RG\tID:{sample}\tSM:{sample}"
log:
"logs/bwa_mem/{sample}.log"
threads: 8
shell:
"(bwa mem -R '{params.rg}' -t {threads} {input} | "
"samtools view -Sb - > {output}) 2> {log}"
注意,为了整洁最好创建一个log文件夹存储所有的log文件。
临时文件和保护文件
在处理原始数据的过程中,从fastq到最后的变异的提取,中途会产生BAM文件,BAM文件通常很大,可以在output
中标记输出文件是临时文件,当执行完所有需要该临时文件作为输入的命令之后,snakemake将会删除标记的临时文件,示例:
rule bwa_map:
input:
"data/genome.fa",
get_bwa_map_input_fastqs
output:
temp("mapped_reads/{sample}.bam")
params:
rg=r"@RG\tID:{sample}\tSM:{sample}"
log:
"logs/bwa_mem/{sample}.log"
threads: 8
shell:
"(bwa mem -R '{params.rg}' -t {threads} {input} | "
"samtools view -Sb - > {output}) 2> {log}"
另外,由于创建BAM文件耗费了大量的资源,也可以对最终生成的BAM文件进行保护,以防意外的删除或修改,举例:
rule samtools_sort:
input:
"mapped_reads/{sample}.bam"
output:
protected("sorted_reads/{sample}.bam")
shell:
"samtools sort -T sorted_reads/{wildcards.sample} "
"-O bam {input} > {output}"
在成功执行任务之后,Snakemake会对输出文件进行写
保护。
资源消耗测试
使用benchmark指令来自动监控计算rule的资源消耗,示例:
rule bwa_map:
input:
"data/genome.fa",
lambda wildcards: config["samples"][wildcards.sample]
output:
temp("mapped_reads/{sample}.bam")
params:
rg="@RG\tID:{sample}\tSM:{sample}"
log:
"logs/bwa_mem/{sample}.log"
benchmark:
"benchmarks/{sample}.bwa.benchmark.txt"
threads: 8
shell:
"(bwa mem -R '{params.rg}' -t {threads} {input} | "
"samtools view -Sb - > {output}) 2> {log}"
另外可以重复多次进行测试,比如重复3次:repeat("benchmarks/{sample}.bwa.benchmark.txt", 3)
模块化
为了重新调用或者简化大型的工作流,可以把工作流拆分成多个模块(module),提供include
来直接把其他的Snakemake文件导入当前文件:
include: "path/to/other.smk"
主文件是Snakefile,其他的文件是.smk
。另外,Snakmake允许定义子工作流(sub-workflows),子工作流指的是包含完整Snakemake工作流的工作目录,子工作流的输出文件可以用于当前的Snakefile,当运行时,Snakemake确保了子工作流的输出文件是最新的,这种做法在想要重复之前的分析时特别有用。
自动部署软件依赖
为了数据分析的可重复性,可以指定每个rule的环境:
rule samtools_index:
input:
"sorted_reads/{sample}.bam"
output:
"sorted_reads/{sample}.bam.bai"
conda:
"envs/samtools.yaml"
shell:
"samtools index {input}"
定义envs/samtools.yaml
channels:
- bioconda
- conda-forge
dependencies:
- samtools =1.9
运行Snakemake:
snakemake --software-deployment-method conda --cores 1
#or the short form
snakemake --sdm conda -c 1
这样就可以在任务运行前自动地创建需要的环境并激活它们,这样有两个优点:
-
工作流会记录所有使用的工具版本
-
工作流可以在无需管理员权限的系统下重新运行,不需要额外安装除了Snakemake和Miniconda以外的条件
注意:
conda不能和run
结合使用,因为conda需要和周围的Snakemake共享同一个Python环境,
工具包
对于常用工具,可以简化对它们的使用。Snakemake提供了名叫wrapper的仓库,一个wrapper是一段包装命令行应用的代码可以直接在Snakemake里调用,示例:
rule bwa_mem:
input:
ref="data/genome.fa",
sample=lambda wildcards: config["samples"][wildcards.sample]
output:
temp("mapped_reads/{sample}.bam")
log:
"logs/bwa_mem/{sample}.log"
params:
"-R '@RG\tID:{sample}\tSM:{sample}'"
threads: 8
wrapper:
"0.15.3/bio/bwa/mem"
这里的URL第一部分是git版本,调用后,Snakemake会自动下载所需版本的wrapper。另外结合--software-deployment-method conda
执行前会自动部署所需要的工具。
在集群上执行
Snakemake可以在分布式环境中运行,比如集群。如果节点共享公共文件系统,Snakemake提供三种替代执行模式:
- 在集群环境中,任务通常通过shell脚本的方式
qsub
提交到节点上:
snakemake --cluster qsub --jobs 100
每个任务被编译成一个shell脚本,这里通过qsub提交。--jobs
限制了并行提交的作业数量,这个基本的模式假设提交命令在提交完作业后立刻返回。
- 有些集群允许以同步模式运行提交命令,即等待作业执行完毕。在这种情况下,我们可以调用例如:
snakemake --cluster-sync "qsub -sync yes" --jobs 100
指定的提交命令也可以使用从提交作业中获取的其他参数进行修饰。例如,可以使用大括号访问已使用线程的数量,类似于shell命令的格式化,例如。
snakemake --cluster "qsub -pe threaded {threads}" --jobs 100
- 另外,snake也可以使用分布式资源管理应用程序API (DRMAA)。该API提供了一个公共接口来控制各种资源管理系统。DRMAA支持可以通过如下调用snake来激活:
snakemake --drmaa --jobs 100
如果可用,DRMAA比一般集群模式更好,因为它提供了更好的控制和错误处理。其他用于特定集群的参数,可以通过工作流的profile(参考Profiles部分)来补充Snakefile。
-cluster-status
判断集群任务是否成功完成、失败还是仍在继续,--cluster
进行的错误检测可以改进边缘情况,比如超时和超过内存的作业被排队系统静默终止。这可以通过--cluster-status
选项实现。该选项的值应为可执行脚本,该脚本以作业ID作为第一个参数,并仅在标准输出中打印 [running|success|failed] 中的一个。Snakemake接收到的信息不单单是作业ID,想要单独获得作业ID,有三种潜在的解决方案:解析脚本接收的字符串并在脚本中提取作业ID,包装提交工具以拦截其标准输出并仅返回作业代码,或者最理想的情况是,集群可以提供一个选项,以在提交时仅返回作业ID,然后您可以指示Snakemake使用该选项。对于SGE,这看起来像是 snakemake –cluster “qsub -terse”。
以下(简化的)脚本可用于在给定的SLURM集群上检测作业状态(需要版本 >= 14.03.0rc1 以支持 –parsable)。
#!/usr/bin/env python
import subprocess
import sys
jobid = sys.argv[1]
output = str(subprocess.check_output("sacct -j %s --format State --noheader | head -1 | awk '{print $1}'" % jobid, shell=True).strip())
running_status=["PENDING", "CONFIGURING", "COMPLETING", "RUNNING", "SUSPENDED"]
if "COMPLETED" in output:
print("success")
elif any(r in output for r in running_status):
print("running")
else:
print("failed")
要使用此脚本,请调用snakemake类似于以下方式,其中status.py是上述提到的脚本:
snakemake all --jobs 100 --cluster "sbatch --cpus-per-task=1 --parsable" --cluster-status ./status.py
当通过按Ctrl-C终止snakemake时,使用 –drmaa 时,它将取消所有当前运行的节点。您可以通过添加 –cluster-cancel 并传递一个用于按作业ID取消作业的命令(例如,对于SLURM是 scancel,对于SGE是 qdel)来实现相同的行为,即在使用 –cluster 时也会发生这种情况。大多数作业调度程序都可以传递多个作业ID,您可以使用 –cluster-cancel-nargs 来限制参数的数量(默认值是1000,对于大多数调度程序来说是合理的)。
有趣的资源
使用snakemake规则和流程来快速的调用常用的工具,举个例子:
rule samtools_sort:
input:
"mapped/{sample}.bam"
output:
"mapped/{sample}.sorted.bam"
params:
"-m 4G"
threads: 8
wrapper:
"https://github.com/snakemake/snakemake-wrappers/raw/0.2.0/bio/samtools/sort"
目前已有的流程:
包含很多常用的流程
创建可重建的工作流程
我常用的功能
- 多个样本并行提交任务,举例,Snakefile如下:
SAMPLES = ['sample1', 'sample2']
rule all:
input:
expand('{sample}.bam', sample=SAMPLES)
rule bwa:
input:
genome = 'genome.fa',
r1 = 'fastq/{sample}.R1.fastq.gz',
r2 = 'fastq/{sample}.R2.fastq.gz'
output:
'{sample}.bam'
shell:
'echo {input.genome} {input.r1} {input.r2} > {output}'
对每个样本单独提交pbs,可以使用
snakemake --cluster "qsub -q slst_fat " --jobs 2
- 如果不想用snakemake的这个功能,想单独对每个样本生成Snakemake,可以使用如下方法:
-
file.txt:存储所有的样本名
-
run.py:对每个样本单独构建文件夹,每个文件夹内针对该样本生成Snakemake文件,之后可以批量提交
## bash run.sh
python3 run.py path_to_input path_to_output path_to_script
## run.py
import os
import argparse
# 创建 ArgumentParser 对象
parser = argparse.ArgumentParser(description='Description of your script.')
# 添加参数
parser.add_argument('path2sample', type=str, help='Path to input sample of fastq')
parser.add_argument('path2out', type=str, help='Path to output dir')
parser.add_argument('path2trim', type=str, help='Path to trim script')
# 解析命令行参数
args = parser.parse_args()
# 在脚本中使用参数
print(f"Path to sample: {args.path2sample}")
print(f"Path to file of output: {args.path2out}")
# 读取存储样本名的文件
with open('file.txt', 'r') as file:
samples = [line.strip() for line in file]
print(samples)
# 为每个样本生成文件夹和Snakefile
for sample in samples:
# 创建文件夹
print(sample)
os.makedirs(sample, exist_ok=True)
# 生成Snakefile内容
snakefile_content = f"""
from snakemake.utils import R
import glob
import os
from os import listdir
from os.path import isfile, isdir, join
import os.path as path
import sys
# 从配置文件中获取分析路径,并连接到 'data' 目录
data="{args.path2sample}"
os.chdir(data)
rule all:
input:
'{args.path2out}/{sample}_R1_trimmed.fastq',
'{args.path2out}/{sample}_R2_trimmed.fastq',
'{args.path2out}/{sample}_R1_unpaired.fastq',
'{args.path2out}/{sample}_R2_unpaired.fastq'
rule trimmo:
input:
R1 = '{sample}_1.fastq',
R2 = '{sample}_2.fastq'
output:
R1 = '{args.path2out}/{sample}_R1_trimmed.fastq',
R2 = '{args.path2out}/{sample}_R2_trimmed.fastq',
unpaired_R1 = '{args.path2out}/{sample}_R1_unpaired.fastq',
unpaired_R2 = '{args.path2out}/{sample}_R2_unpaired.fastq',
logname = '{args.path2out}/logs/{sample}_run_trimmomatic.err'
shell:
'trimmomatic PE -threads 50 -phred33 {{input.R1}} {{input.R2}} {{output.R1}} {{output.unpaired_R1}} {{output.R2}} {{output.unpaired_R2}} ILLUMINACLIP:test:2:36:10 LEADING:10 TRAILING:10 MAXINFO:50:0.97 MINLEN:20 2>{{output.logname}}'
"""
# 将Snakefile内容写入文件
with open(f'{sample}/Snakefile', 'w') as snakefile:
snakefile.write(snakefile_content)
## file.txt
# all sample
sample1
sample2
sample3