#' FCS header scraping and FlowRepository project metadata collection script
#'
#' This script takes roughly 4-5 hours to run in its entirety and, as of this writing
#' (2019-01-27), collects information for about 700 datasets.
library(FlowRepositoryR)
library(tidyverse)
library(dbplyr)
library(logging)
basicConfig()
# Number of bytes to read for each FCS file (this should be large enough to capture
# the header and text segments without slowing down processing too much -- 32K seems good)
FCS_BYTE_OFFSET <- 32768
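# Note (assumption based on the FCS 3.x standard): the HEADER segment occupies roughly the
# first 58 bytes and declares the byte offsets of the TEXT segment, which usually begins
# immediately after it, so a 32K prefix is typically ample for header + text parsing.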
# Maximum number of files to collect metadata for within a single dataset -- if a dataset
# has more than this many, they will be randomly sampled down to this number, which
# is generally more than enough to get a sense of what parameters are present across all files
FCS_MAX_FILES <- 30
get_metadata <- function(dataset_id){
data_set <- suppressWarnings(flowRep.get(dataset_id))
res <- list()
res$exp_id <- data_set@id[[1]]
res$exp_name <- data_set@name[[1]]
# Function to pull and concatenate string attributes and ensure
# they are not empty vectors
get_str <- function(prop) {
v <- attr(data_set, prop) %>% str_trim %>% str_c(collapse='|')
if (length(v) > 0) v else NA_character_
}
# Extract information about researchers and related attachments, all
# of which could be empty, one, or multiple values
res$investigators <- get_str('primary.investigator')
res$researchers <- get_str('primary.researcher')
res$dates <- get_str('experiment.dates')
res$keywords <- unlist(data_set@keywords)
# Avoid failing whole experiment parsing due to any potential attachment processing errors
tryCatch({
res$attachments <- data_set@attachments %>% map_chr(~.@name) %>% str_trim %>% str_c(collapse='|')
res$attachments <- if (length(res$attachments) > 0) res$attachments else NA_character_
}, error = function(e) {
# Use <<- so the assignment reaches `res` in the enclosing frame rather than a handler-local copy
res$attachments <<- '[Parser Error]'
})
# If necessary, randomly sample the fcs files down to the processing limit,
# and record the count before and after applying this restriction
fcs_files <- data_set@fcs.files
if (length(fcs_files) > FCS_MAX_FILES)
fcs_files <- fcs_files %>% base::sample(FCS_MAX_FILES)
res$n_fcs_files_total <- length(data_set@fcs.files)
res$n_fcs_files_parse <- length(fcs_files)
# Process each fcs file entry separately where the focus is on extracting
# channel names (but other file-global data is also collected)
res$fcs <- map(fcs_files, function(fcs_file){
# Read the first FCS_BYTE_OFFSET bytes into an array and then pass this array
# to flowCore internal functions as if it was a standard file connection
url_con <- url(fcs_file@url)
open(url_con, "rb")
con <- rawConnection(readBin(url_con, 'raw', FCS_BYTE_OFFSET))
# Parse header and text segments (note that emptyValue corresponds to how
# ambiguous double delimiters are treated and per the flowCore recommendations,
# both are tried in the event of a failure)
header <- flowCore:::readFCSheader(con)
txt <- tryCatch(
flowCore:::readFCStext(con, header, emptyValue=T),
error = function(e) flowCore:::readFCStext(con, header, emptyValue=F)
)
# Extract file-level metadata
r <- list()
r$exp_id <- data_set@id[[1]]
r$version <- fcs_file@fcs.version
r$filename <- fcs_file@name
r$size <- fcs_file@size
r$creator <- flowCore:::readFCSgetPar(txt, 'CREATOR', strict=F) %>% unname
r$n_params <- as.integer(flowCore:::readFCSgetPar(txt, "$PAR"))
# Extract non-scalar per-channel data
r$param_channels <- flowCore:::readFCSgetPar(txt, paste("$P", 1:r$n_params, "N", sep="")) %>% str_trim
r$param_names <- flowCore:::readFCSgetPar(txt, paste("$P", 1:r$n_params, "S", sep=""), strict=F) %>% str_trim
close(url_con)
close(con)
r
})
res
}
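# Example usage (not run; the accession id below is a placeholder -- substitute any id
# returned by flowRep.ls()). The result is a nested list with experiment-level scalars,
# keywords, and a list of per-file parses under $fcs:
#   md <- get_metadata('FR-FCM-ZZZZ')
#   str(md[c('exp_id', 'exp_name', 'n_fcs_files_total', 'n_fcs_files_parse')])
#   md$fcs[[1]]$param_channels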
##############
# Processing #
##############
process_datasets <- function(dataset_ids, max_failures=50){
data <- list()
failures <- 0
n <- length(dataset_ids)
for (i in 1:n) {
dataset_id <- dataset_ids[i]
loginfo(str_glue('Processing dataset {dataset_id} ({i} of {n} | #failures = {failures})'))
data[[dataset_id]] <- tryCatch(
get_metadata(dataset_id),
error = function(e){
failures <<- failures + 1
if (failures > max_failures)
stop(str_glue('Number of dataset processing failures ({failures}) exceeds maximum ({max_failures})'))
tryCatch({
# logwarn (and loginfo) can fail if the message string is too long, so truncate it first
msg <- str_sub(e$message, end=1000)
logwarn(str_glue('A failure occurred processing dataset {dataset_id}. Error message: {msg}'))
}, error=function(e) {
logwarn(str_glue('A failure occurred processing dataset {dataset_id}'))
})
list()
}
)
}
loginfo(str_glue('Processing complete. Number of failed datasets = {failures} (of {n})', n=length(dataset_ids)))
data
}
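# Example usage (not run): a small smoke test on a handful of datasets before committing
# to the full multi-hour run, e.g.:
#   test_ids <- head(flowRep.ls(), 3)
#   test_data <- process_datasets(test_ids, max_failures=1)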
# Load list of all current dataset ids
data_sets <- flowRep.ls()
# Process all and return results as nested lists, where top level is named by dataset id
data_raw <- process_datasets(data_sets)
# Save parsed results to file immediately in case of downstream crash
saveRDS(data_raw, file = "data/build-data.rds")
# Remove list entries for failed datasets (returned as empty lists)
failed_data_set_ids <- data_raw %>% map(length) %>% unlist %>% keep(~. == 0) %>% names
loginfo(str_glue(
'Removing the following {n} failed data set ids from results: {ids}',
n=length(failed_data_set_ids), ids=str_c(failed_data_set_ids, collapse=', ')
))
data <- data_raw[!(names(data_raw) %in% failed_data_set_ids)]
stopifnot(length(data) == length(data_raw) - length(failed_data_set_ids))
##############
# Extraction #
##############
## Extract experiment metadata
cs_exp <- # Build vector of scalar fields specific to experiments
data %>% map(names) %>% unlist %>% unname %>% unique %>%
discard(~. %in% c('fcs', 'keywords'))
df_exp <- # Extract the above fields for each experiment
data %>% map(`[`, cs_exp) %>% bind_rows
## Extract experiment keywords
df_kwd <-
data %>% map('keywords') %>% enframe %>% filter(!map_lgl(value, is.null)) %>% unnest %>%
set_names('exp_id', 'keyword')
## Extract FCS file metadata
cs_fcs <- # Build vector of scalar fields specific to fcs files (but not channels)
data %>% map('fcs') %>% map(~map(., names)) %>%
unlist %>% unname %>% unique %>%
discard(~. %in% c('param_channels', 'param_names'))
df_fcs <- # Extract the fields above for each fcs file
data %>% map('fcs') %>% unlist(recursive=F) %>% map(`[`, cs_fcs) %>% bind_rows
## Extract FCS file channels
df_chl <- # Build single data frame mapping experiments to fcs files and their corresponding channels
data %>% map('fcs') %>% unlist(recursive=F) %>%
map(`[`, c('param_channels', 'param_names', 'filename', 'exp_id')) %>%
map(as_tibble) %>% bind_rows %>%
rename(param_name=param_names, param_channel=param_channels)
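# Example (not run): the extracted frames share 'exp_id' (and 'filename' for file-level
# frames), so channels can be linked back to experiment metadata with a join, e.g.:
#   df_chl %>% left_join(df_exp, by='exp_id')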
loginfo('Extraction complete')
##########
# Export #
##########
write_csv(df_exp, 'data/experiments.csv')
write_csv(df_kwd, 'data/keywords.csv')
write_csv(df_fcs, 'data/fcs_files.csv')
write_csv(df_chl, 'data/fcs_channels.csv')
loginfo('All data collection complete')