.calva | |||||
target | |||||
pom.xml |
# Merge RNA-Seq Expression
Merge RNA-Seq expression tables and generate a MultiQC report.
## Arguments
| Arguments      | Description                                                            | Required |
| -------------- | ---------------------------------------------------------------------- | -------- |
| filepath       | A directory which contains a series of files from the RNA-Seq Pipeline | true     |
| enable_multiqc | Whether to generate a MultiQC report                                   | false    |
;; Leiningen build definition for the merge-rnaseq-expression tservice plugin.
(defproject tservice/merge-rnaseq-expression "v0.1.0"
  :min-lein-version "2.5.0"
  :deployable true

  ;; Libraries declared directly by the plugin.
  :dependencies
  [[org.clojure/data.csv "1.0.0"]
   [org.clojure/tools.logging "1.1.0"]
   [org.clojure/core.async "0.4.500" :exclusions [org.clojure/tools.reader]]
   [clj-commons/clj-yaml "0.7.0"]]

  :profiles
  {;; Clojure itself and the tservice host are declared under :provided.
   :provided
   {:dependencies [[org.clojure/clojure "1.10.1"]
                   [org.clojars.yjcyxky/tservice "0.5.6"]]}

   ;; Settings used when building the plugin jar.
   :uberjar
   {:uberjar-name   "merge-rnaseq-expression.tservice-plugin.jar"
    :target-path    "target/%s"
    :resource-paths ["resources"]
    :javac-options  ["-target" "1.8", "-source" "1.8"]
    :aot            :all
    :omit-source    true
    :auto-clean     true}})
multiqc==1.11 |
info: | |||||
name: Merge Expression Table for RNA-Seq | |||||
version: v0.1.0 | |||||
description: Merge expression table for rna-seq. | |||||
category: Tool | |||||
home: https://github.com/clinico-omics/tservice-plugins | |||||
source: PGx | |||||
short_name: merge-rnaseq-exp | |||||
icons: | |||||
- src: "" | |||||
type: image/png | |||||
sizes: 192x192 | |||||
author: Jingcheng Yang | |||||
plugin: | |||||
name: merge-rnaseq-expression | |||||
display-name: Merge RNASeq Expression | |||||
lazy-load: false | |||||
init: | |||||
# Unpack environment file to the directory, repository/envs/merge-rnaseq-expression | |||||
# - step: unpack-env | |||||
# envname: merge-rnaseq-expression | |||||
# postunpack: clone-env /opt/local/merge-rnaseq-expression {{ ENV_DEST_DIR }} | |||||
- step: load-namespace | |||||
namespace: tservice.plugins.merge-rnaseq-expression | |||||
- step: register-plugin | |||||
entrypoint: tservice.plugins.merge-rnaseq-expression/metadata | |||||
- step: init-event | |||||
entrypoint: tservice.plugins.merge-rnaseq-expression/events-init |
(ns tservice.plugins.merge-rnaseq-expression | |||||
(:require [clojure.data.json :as json] | |||||
[clojure.spec.alpha :as s] | |||||
[clojure.tools.logging :as log] | |||||
[spec-tools.core :as st] | |||||
[clj-yaml.core :as yaml] | |||||
[tservice.lib.files :as ff] | |||||
[tservice.api.storage.fs :as fs-api] | |||||
[tservice.lib.fs :as fs-lib] | |||||
[tservice.vendor.multiqc :as mq] | |||||
[tservice.util :as u] | |||||
[clojure.java.shell :as shell :refer [sh]] | |||||
[tservice.api.task :refer [publish-event! make-plugin-metadata make-events-init create-task! update-process!]] | |||||
[tservice.plugins.merge-rnaseq-expression.merge-exp :as me] | |||||
[tservice.plugins.merge-rnaseq-expression.commons :refer [add-env-to-path correct-filepath]])) | |||||
(set! *warn-on-reflection* true) | |||||
;;; ------------------------------------------------ Event Specs ------------------------------------------------ | |||||
;; Optional boolean flag: when truthy, the event handler also builds a MultiQC report.
(s/def ::enable_multiqc
  (st/spec
   {:spec            boolean?
    :type            :boolean
    :description     "Whether to generate a MultiQC report."
    :swagger/default true
    ;; Shown to the caller when validation fails.
    :reason          "Only boolean values are supported."}))
;; Optional vector of files to exclude; validated only as "is a vector".
(s/def ::excludes
  (st/spec
   {:type            :vector
    :spec            vector?
    :swagger/default []
    :description     "A collection of exclude files."
    :reason          "The excludes must be an array."}))
;; Required location of the RNA-Seq pipeline output.
;; The value must be a string of the form <scheme>:// followed by either an
;; absolute path ("/...") or a relative path ("./..."), as enforced by the regex.
(s/def ::filepath
  (st/spec
   {:spec            (s/and string? #(some? (re-matches #"^[a-zA-Z0-9]+:\/\/(\/|\.\/)[a-zA-Z0-9_]+.*" %)))
    :type            :string
    :description     "Path of the directory that contains the output files of the RNA-Seq pipeline."
    :swagger/default nil
    ;; Spell out the accepted shape, since a plain string is not sufficient.
    :reason          "The filepath must be a string such as file:///absolute/dir or file://./relative/dir."}))
(def merge-exp-params-body
  "Spec for the request body: `filepath` is required, while `excludes` and
  `enable_multiqc` are optional."
  (s/keys
   :req-un [::filepath]
   :opt-un [::excludes ::enable_multiqc]))
;;; ------------------------------------------------ Event Metadata ------------------------------------------------- | |||||
;; Plugin descriptor built with `make-plugin-metadata`.
;; The handler does not merge anything itself: it creates a task record, writes
;; an initial "Running" log, publishes a "merge_rnaseq_expression" event and
;; immediately returns the locations where the results will appear.
(def metadata
  (make-plugin-metadata
   {:name          "merge-rnaseq-exp"
    :summary       "Merge expression table for rna-seq."
    :params-schema merge-exp-params-body
    :plugin-type   :ToolPlugin
    :response-type :data2files
    :handler
    (fn [{:keys [filepath excludes enable_multiqc] :as payload}]
      (let [workdir  (ff/get-workdir)
            src-dir  (correct-filepath filepath)
            in-work  (fn [filename] (fs-lib/join-paths workdir filename))
            log-path (in-work "log")
            response {:files         (mapv in-work ["multiqc_report.html"
                                                    "fpkm.csv"
                                                    "count.csv"])
                      :log           log-path
                      :response-type :data2files}
            ;; NOTE(review): :plugin-name is "syncdata" although this plugin is
            ;; registered as merge-rnaseq-expression -- looks like a copy-paste;
            ;; confirm the intended value before changing it.
            task-id  (create-task! {:name           (str "merge-rnaseq-expression" (u/datetime))
                                    :description    "Merge Expression Table for RNA-Seq."
                                    :payload        payload
                                    :plugin-name    "syncdata"
                                    :plugin-type    "ToolPlugin"
                                    :plugin-version "v0.1.0"
                                    :response       response})]
        (fs-lib/create-directories! workdir)
        ;; Seed the log so readers see a "Running" state before the event is processed.
        (spit log-path (json/write-str {:status "Running" :msg ""}))
        (update-process! task-id 0)
        ;; Hand the real work over to the event listener.
        (publish-event! "merge_rnaseq_expression"
                        {:data-dir       src-dir
                         :excludes       excludes
                         :enable-multiqc enable_multiqc
                         :dest-dir       workdir
                         :task-id        task-id})
        response))}))
;;; ------------------------------------------------ Event Processing ------------------------------------------------ | |||||
(defn- decompression-tar
  "Extract the tar archive `filepath` into the directory that contains it.

  The command runs through bash with the plugin environment's bin directory
  prepended to PATH. Returns a map with :status (\"Success\" when the exit code
  is 0, otherwise \"Error\") and :msg (stdout and stderr joined by a newline)."
  [filepath]
  (shell/with-sh-env {:PATH   (add-env-to-path "merge-rnaseq-expression")
                      :LC_ALL "en_US.utf-8"
                      :LANG   "en_US.utf-8"}
    ;; The paths are passed as positional parameters ($1, $2) instead of being
    ;; spliced into the script text, so whitespace or shell metacharacters in a
    ;; file name can neither break the command nor inject another one.
    ;; `str` keeps every argument a string, which `sh` requires for command args.
    (let [archive (str filepath)
          target  (str (fs-lib/parent-path filepath))
          {:keys [exit out err]} (sh "bash" "-c" "tar -xvf \"$1\" -C \"$2\""
                                     "bash" archive target)]
      {:status (if (= exit 0) "Success" "Error")
       :msg    (str out "\n" err)})))
(defn- merge-exp!
  "Collect the files under `data-dir` that match `patterns`, copy them into
  `dest-dir` (created if needed, existing files replaced) and merge every file
  found in `dest-dir` into `output-file`."
  [data-dir patterns dest-dir output-file]
  (let [matched (fs-api/batch-filter-files data-dir patterns)]
    (log/info "Merge these files: " matched)
    (fs-lib/create-directories! dest-dir)
    (log/info "Merge gene experiment files from ballgown directory to a experiment table: " dest-dir output-file)
    (fs-api/copy-files! matched dest-dir {:replace-existing true})
    ;; Merge whatever now lives in dest-dir, not just the files copied above.
    (-> dest-dir
        (fs-api/list-files {:mode "file"})
        (me/merge-exp-files! output-file))))
(defn gen-multiqc-config
  "Write a MultiQC configuration to `config-file` as block-style YAML. The
  configuration lists the fastqc, fastq_screen and qualimap modules under
  `run_modules`."
  [config-file]
  (->> (yaml/generate-string
        {:run_modules ["fastqc" "fastq_screen" "qualimap"]}
        :dumper-options {:flow-style :block})
       (spit config-file)))
(defn- merge-exp-event!
  "Event handler that performs the actual merge.

  Steps (progress reported through `update-process!`):
    1. merge the ballgown tables into <dest-dir>/fpkm.csv   (30)
    2. merge the count matrices into <dest-dir>/count.csv   (60)
    3. optionally collect the QC files and run MultiQC      (100, or -1 when
       MultiQC reports status \"Error\")
  The final status is written as JSON to <dest-dir>/log. Any exception marks
  the task as failed (-1) and is recorded in the same log file.

  `excludes` is accepted but not applied yet."
  [{:keys [data-dir dest-dir excludes enable-multiqc task-id]}]
  ;; TODO: filter excludes?
  (let [log-path (fs-lib/join-paths dest-dir "log")]
    (try
      ;; Merge ballgown files
      (merge-exp! data-dir
                  [".*call-ballgown/.*.txt"]
                  (fs-lib/join-paths dest-dir "ballgown")
                  (fs-lib/join-paths dest-dir "fpkm.csv"))
      (update-process! task-id 30)
      ;; Merge count files
      (merge-exp! data-dir
                  [".*call-count/.*_gene_count_matrix.csv"]
                  (fs-lib/join-paths dest-dir "count")
                  (fs-lib/join-paths dest-dir "count.csv"))
      (update-process! task-id 60)
      (if enable-multiqc
        (let [files       (fs-api/batch-filter-files data-dir [".*call-fastqc/.*.zip"
                                                               ".*call-fastqscreen/.*screen.txt"
                                                               ".*call-qualimap/.*bamqc"
                                                               ".*call-qualimap/.*bamqc_qualimap.tar.gz"
                                                               ".*call-qualimap/.*RNAseq"
                                                               ".*call-qualimap/.*RNAseq_qualimap.tar.gz"])
              multiqc-dir (fs-lib/join-paths dest-dir "multiqc")
              config      (fs-lib/join-paths dest-dir "multiqc_report.yaml")]
          (fs-lib/create-directories! multiqc-dir)
          (fs-api/copy-files! files multiqc-dir {:replace-existing true})
          (gen-multiqc-config config)
          ;; Unpack the qualimap archives next to where they were copied.
          (doseq [file (fs-api/batch-filter-files multiqc-dir [".*bamqc_qualimap.tar.gz" ".*RNAseq_qualimap.tar.gz"])]
            (let [{:keys [status msg]} (decompression-tar file)]
              ;; A broken archive should not abort the run, but it must not be
              ;; dropped silently either.
              (when (= status "Error")
                (log/warn "Failed to decompress" file ":" msg))))
          (let [multiqc-result (mq/multiqc multiqc-dir dest-dir {:title "MultiQC Report" :template "default" :config config})
                result         {:status (:status multiqc-result)
                                :msg    (:msg multiqc-result)}
                log            (json/write-str result)]
            (log/info "Status: " result)
            (spit log-path log)
            ;; Keep the task progress consistent with the status written to the log.
            (update-process! task-id (if (= "Error" (:status result)) -1 100))))
        (do
          (spit log-path (json/write-str {:status "Success" :msg ""}))
          (update-process! task-id 100)))
      (catch Exception e
        (update-process! task-id -1)
        (let [log (json/write-str {:status "Error" :msg (.toString e)})]
          ;; Log at error level and include the throwable so the stack trace is kept.
          (log/error e "Status: " log)
          (spit log-path log))))))
;;; --------------------------------------------------- Lifecycle ---------------------------------------------------- | |||||
(def events-init
  "Startup hook: wires `merge-exp-event!` to the \"merge_rnaseq_expression\"
  event via `make-events-init`."
  (->> merge-exp-event!
       (make-events-init "merge_rnaseq_expression")))
(ns tservice.plugins.merge-rnaseq-expression.commons | |||||
(:require [clojure.data.csv :as csv] | |||||
[clojure.string :as clj-str] | |||||
[tservice.lib.files :refer [get-tservice-workdir get-plugin-jar-dir get-path-variable]] | |||||
[tservice.lib.fs :as fs-lib] | |||||
[clojure.java.io :as io] | |||||
[clojure.java.shell :as shell :refer [sh]]) | |||||
(:import [org.apache.commons.io.input BOMInputStream])) | |||||
(defn add-env-to-path
  "Return a PATH-style string in which <plugin-jar-dir>/envs/<plugin-name>/bin
  comes first, followed by \":\" and the value of `get-path-variable`."
  [plugin-name]
  (let [env-bin (fs-lib/join-paths (get-plugin-jar-dir) "envs" plugin-name "bin")]
    (str env-bin ":" (get-path-variable))))
(defn hashmap->parameters
  "Flatten a map of option -> value into one space-separated string, e.g.
  {\"-d\" \"true\", \"-o\" \"output\"} becomes \"-d true -o output\"."
  [coll]
  (->> (into [] coll)
       (map (fn [pair] (clj-str/join " " pair)))
       (clj-str/join " ")))
(defn call-command!
  "Run `cmd` through `bash -c`, with `parameters-coll` rendered by
  `hashmap->parameters` and the plugin environment's bin directory first on
  PATH. Returns {:status \"Success\"|\"Error\", :msg \"<stdout>\\n<stderr>\"}, where
  the status reflects whether the exit code was 0."
  [cmd parameters-coll]
  (shell/with-sh-env {:PATH   (add-env-to-path "merge-rnaseq-expression")
                      :LC_ALL "en_US.utf-8"
                      :LANG   "en_US.utf-8"}
    (let [script (format "%s %s" cmd (hashmap->parameters parameters-coll))
          {:keys [exit out err]} (sh "bash" "-c" script)]
      {:status (if (= exit 0) "Success" "Error")
       :msg    (str out "\n" err)})))
(defn csv-data->maps
  "Turn parsed CSV rows into a lazy seq of maps. The first row supplies the
  keys (converted to keywords); every following row supplies the values."
  [csv-data]
  (let [header (map keyword (first csv-data))]
    (map (fn [row] (zipmap header row))
         (rest csv-data))))
(defn bom-reader
  "Open `filepath` as a reader whose underlying stream is wrapped in a
  `BOMInputStream`, so a leading byte order mark is not part of the text."
  [filepath]
  (let [raw-stream (io/input-stream filepath)]
    (io/reader (BOMInputStream. raw-stream))))
(defn guess-separator
  "Guess the column separator of `filepath` from its first line. Each
  candidate (tab, comma, semicolon, space) is scored by the number of fields
  it splits the header into, and the best-scoring character is returned."
  [filepath]
  (with-open [rdr (bom-reader filepath)]
    (let [header      (first (line-seq rdr))
          field-count (fn [sep]
                        (count (clj-str/split header (re-pattern (str sep)))))
          ;; Candidate order matters: `max-key` resolves ties in favour of the
          ;; later candidate.
          scored      (for [sep [\tab \, \; \space]]
                        [sep (field-count sep)])]
      (first (apply max-key second scored)))))
(defn read-csv
  "Read the delimited file `file` into a fully realized seq of maps keyed by
  the header row (see `csv-data->maps`). The separator is detected with
  `guess-separator`. Returns nil when `file` is not an existing regular file."
  [^String file]
  (when (.isFile (io/file file))
    ;; Use the same BOM-stripping reader as `guess-separator`; with a plain
    ;; reader a UTF-8 byte order mark would become part of the first column
    ;; name and that column would no longer match the same column in files
    ;; written without a BOM.
    (with-open [reader (bom-reader file)]
      ;; `doall` forces the lazy rows while the reader is still open.
      (doall
       (->> (csv/read-csv reader :separator (guess-separator file))
            csv-data->maps)))))
(defn vec-remove
  "Return a new vector equal to the vector `coll` without the element at
  index `pos`."
  [pos coll]
  (let [before (subvec coll 0 pos)
        after  (subvec coll (inc pos))]
    (into [] cat [before after])))
(defn write-csv!
  "Write `row-data` (a sequence of maps) to `path` as tab-separated text. The
  keys of the first map define the columns and, via `name`, the header line."
  [path row-data]
  (let [columns (keys (first row-data))
        header  (map name columns)
        body    (mapv (fn [row] (mapv row columns)) row-data)]
    (with-open [out (io/writer path)]
      (csv/write-csv out (cons header body) :separator \tab))))
(defn write-csv-by-cols!
  "Write `row-data` (a sequence of maps) to `path` with the default separator
  of `csv/write-csv`, emitting exactly the given `columns` in the given order.
  The header line holds the `name` of each column."
  [path row-data columns]
  (let [header (map name columns)
        body   (mapv (fn [row] (mapv row columns)) row-data)
        table  (cons header body)]
    (with-open [out (io/writer path)]
      (csv/write-csv out table))))
(defn is-localpath?
  "Return `filepath` itself (truthy) when it starts with \"file://\" and matches
  the pattern as a whole, otherwise nil."
  [filepath]
  (->> filepath
       (re-matches #"^file:\/\/.*")))
(defn correct-filepath
  "Translate a \"file://\" URI into a filesystem path.

  - file:///abs/path -> /abs/path
  - file://rel/path  -> <tservice workdir>/rel/path
  - anything else is returned unchanged."
  [filepath]
  (if-not (is-localpath? filepath)
    filepath
    (let [stripped (clj-str/replace filepath #"^file:\/\/" "")]
      (if (re-matches #"^file:\/\/\/.*" filepath)
        ;; Three slashes: what remains after the scheme is already absolute.
        stripped
        ;; Otherwise resolve against the tservice working directory.
        (fs-lib/join-paths (get-tservice-workdir) stripped)))))
(ns tservice.plugins.merge-rnaseq-expression.merge-exp | |||||
(:require [tservice.plugins.merge-rnaseq-expression.commons :refer | |||||
[read-csv write-csv! vec-remove write-csv-by-cols!]] | |||||
[clojure.java.io :as io])) | |||||
(defn read-header
  "Return the column keys of `file`, taken from the first data row produced by
  `read-csv`."
  [^String file]
  (-> (read-csv file)
      first
      keys))
(defn- sort-by-val
  "Sort map entries in ascending order of their values."
  [entries]
  (sort-by val compare entries))

(defn- first-elements
  "Take the element at index 0 of every pair."
  [pairs]
  (for [pair pairs]
    (get pair 0)))

(defn most-frequent-n
  "Return the `n` most common items, most frequent first, e.g.
  (most-frequent-n 2 [:a :b :a :d :x :b :c :d :d :b :d :b])
  => (:d :b)"
  [n items]
  (->> (frequencies items) ; {:a 2, :b 4, :d 4, :x 1, :c 1}
       seq                 ; ([:a 2] [:b 4] [:d 4] [:x 1] [:c 1])
       sort-by-val         ; ([:x 1] [:c 1] [:a 2] [:b 4] [:d 4])
       reverse             ; ([:d 4] [:b 4] [:a 2] [:c 1] [:x 1])
       first-elements      ; (:d :b :a :c :x)
       (take n)))
(defn guess-id
  "Guess the id column shared by `files`: the column key that occurs most
  often across all of their headers."
  [files]
  (let [headers  (map read-header files)   ; ((:GENE_ID :SAMPLE1) (:GENE_ID :SAMPLE2))
        all-keys (flatten headers)]        ; (:GENE_ID :SAMPLE1 :GENE_ID :SAMPLE2)
    (first (most-frequent-n 1 all-keys)))) ; :GENE_ID
;; Name of the id column used to align rows from different files.
;; The metadata must sit on the symbol: placed before the init form it is
;; attached to that form instead and the var is left public.
;; NOTE(review): this is namespace-wide mutable state, so concurrent merges
;; share (and may overwrite) the same id.
(def ^:private id (atom :GENE_ID))
(defn setup-id!
  "Store keyword `v` as the current id column and return it."
  [^clojure.lang.Keyword v]
  (reset! id v))
(defn sort-exp-data
  "Sort the rows (maps) in `coll` by the value of the current id column."
  [coll]
  (let [id-col @id]
    (sort-by id-col coll)))
(defn read-csvs
  "Lazily read every file in `files` with `read-csv` and sort its rows with
  `sort-exp-data`."
  [files]
  (map (comp sort-exp-data read-csv) files))
(defn indices-of
  "Indexes of all elements of `coll` for which `(f element)` is truthy."
  [f coll]
  (keep-indexed (fn [idx item] (when (f item) idx)) coll))

(defn first-index-of
  "Index of the first element of `coll` satisfying `f`, or nil if there is none."
  [f coll]
  (->> coll
       (indices-of f)
       first))

(defn find-index
  "Index of the first element of `coll` equal to `id`, or nil if absent."
  [coll id]
  (first-index-of (fn [item] (= item id)) coll))
(defn reorder
  "Return the column keys of `data` (taken from its first row) with the
  current id column first and the remaining columns in sorted order."
  [data]
  (let [id-col      @id
        sorted-cols (vec (sort (keys (first data))))
        id-pos      (find-index sorted-cols id-col)
        others      (vec-remove id-pos sorted-cols)]
    (cons id-col others)))
(defn merge-exp
  "Merge several tables row by row: the n-th rows of all tables are combined
  with `merge` into one map. Rows are paired purely by position.

  Example input:
  [[{:GENE_ID 'XXX0' :YYY0 1.2} {:GENE_ID 'XXX1' :YYY1 1.3}]
   [{:GENE_ID 'XXX0' :YYY2 1.2} {:GENE_ID 'XXX1' :YYY3 1.3}]]"
  [all-exp-data]
  (apply map
         (fn [& rows] (apply merge rows))
         all-exp-data))
(defn write-csv-by-ordered-cols!
  "Write `row-data` to `path` using the column order computed by `reorder`
  (id column first)."
  [path row-data]
  (->> (reorder row-data)
       (write-csv-by-cols! path row-data)))
(defn copy-file
  "Copy the file at `source-path` to `dest-path`."
  [^String source-path ^String dest-path]
  (let [src (io/file source-path)
        dst (io/file dest-path)]
    (io/copy src dst)))
(defn merge-exp-files!
  "Merge the expression tables in `files` into one table written to `path`.

  - no files: throws an `ex-info`, because there is nothing to merge
  - one file: the file is copied to `path` unchanged
  - several files: the id column is guessed from the first two files, stored
    with `setup-id!`, and all tables are merged and written with the id column
    first.

  Assumption: all files have the same id column, no matter what order."
  [files path]
  (cond
    ;; Without this guard an empty input reaches the merge step and fails there
    ;; with an error that does not mention the real cause.
    (empty? files)
    (throw (ex-info "No expression files found to merge." {:output path}))

    (= (count files) 1)
    (copy-file (first files) path)

    :else
    (let [id-col (guess-id (take 2 files))]
      (setup-id! id-col)
      (->> (read-csvs files)
           (merge-exp)
           (write-csv-by-ordered-cols! path)))))