Skip to content

Commit

Permalink
Exposing getThreadNumber() function that can be used to determine w…
Browse files Browse the repository at this point in the history
…hich thread we're in (including the main thread).
  • Loading branch information
schuemie committed Oct 3, 2024
1 parent be3ac72 commit 6e30522
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 46 deletions.
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
Package: ParallelLogger
Type: Package
Title: Support for Parallel Computation, Logging, and Function Automation
Version: 3.3.1
Date: 2024-08-23
Version: 3.3.2
Date: 2024-10-03
Authors@R: c(
person("Martijn", "Schuemie", email = "[email protected]", role = c("aut", "cre")),
person("Marc", "Suchard", role = c("aut")),
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export(createFileAppender)
export(createLogger)
export(excludeFromList)
export(getLoggers)
export(getThreadNumber)
export(launchLogViewer)
export(layoutEmail)
export(layoutErrorReport)
Expand Down
8 changes: 8 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
ParallelLogger 3.3.2
====================

Changes

1. Exposing `getThreadNumber()` function that can be used to determine which thread we're in (including the main thread).


ParallelLogger 3.3.1
====================

Expand Down
4 changes: 2 additions & 2 deletions R/Appenders.R
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ createFileAppender <- function(layout = layoutParallel,
)
}
)
if (is.null(getOption("threadNumber")) && identical(layout, layoutErrorReport)) {
if (getThreadNumber() == 0 && identical(layout, layoutErrorReport)) {
writeLines(paste("An error report has been created at ", fileName))
}
}
Expand Down Expand Up @@ -181,7 +181,7 @@ createEmailAppender <- function(layout = layoutEmail,
}

# Only main thread gets to send e-mails:
if (!is.null(getOption("threadNumber"))) {
if (getThreadNumber() != 0) {
return()
}

Expand Down
25 changes: 20 additions & 5 deletions R/Cluster.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@
# See the License for the specific language governing permissions and
# limitations under the License.

#' Return the number of the current thread
#'
#' @return
#' Returns the number of the current thread. Returns 0 if this is the main thread.
#'
#' @export
getThreadNumber <- function() {
threadNumber <- getOption("PARALLELLOGGER_THREAD_NUMBER")
if (is.null(threadNumber)) {
return(0)
} else {
return(threadNumber)
}
}

doSetAndromedaTempFolder <- function(andromedaTempFolder) {
options(andromedaTempFolder = andromedaTempFolder)
ParallelLogger::logTrace("AndromedateTempFolder set to ", andromedaTempFolder)
Expand Down Expand Up @@ -48,7 +63,7 @@ makeCluster <- function(numberOfThreads, singleThreadToMain = TRUE, setAndromeda
for (logger in loggers) {
ParallelLogger::registerLogger(logger)
}
options(threadNumber = threadNumber)
options(PARALLELLOGGER_THREAD_NUMBER = threadNumber)
ParallelLogger::logTrace("Thread ", threadNumber, " initiated")
finalize <- function(env) {
ParallelLogger::logTrace("Thread ", threadNumber, " terminated")
Expand Down Expand Up @@ -182,15 +197,15 @@ clusterApply <- function(cluster, x, fun, ..., stopOnError = FALSE, progressBar
if (progressBar) {
pb <- txtProgressBar(style = 3)
}

for (i in 1:min(n, p)) {
snow::sendCall(cluster[[i]], functionWrapper, c(
list(x[[i]]),
list(...),
list(fun = fun)
), tag = i)
}

val <- vector("list", n)
hasError <- FALSE
for (i in 1:n) {
Expand All @@ -215,8 +230,8 @@ clusterApply <- function(cluster, x, fun, ..., stopOnError = FALSE, progressBar
list(...),
list(fun = fun)
), tag = j)


# snow::sendCall(cluster[[d$node]], fun, c(list(x[[j]]), list(...)), tag = j)
}
val[d$tag] <- list(d$value)
Expand Down
16 changes: 8 additions & 8 deletions R/Layouts.R
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ layoutTimestamp <- function(level, message) {
#' @export
layoutParallel <- function(level, message) {
time <- format(Sys.time(), "%Y-%m-%d %H:%M:%S")
threadNumber <- getOption("threadNumber")
if (is.null(threadNumber)) {
threadNumber <- getThreadNumber()
if (threadNumber == 0) {
threadLabel <- "Main thread"
} else {
threadLabel <- paste("Thread", threadNumber)
Expand Down Expand Up @@ -113,8 +113,8 @@ layoutParallel <- function(level, message) {
#' @export
layoutStackTrace <- function(level, message) {
time <- format(Sys.time(), "%Y-%m-%d %H:%M:%S")
threadNumber <- getOption("threadNumber")
if (is.null(threadNumber)) {
threadNumber <- getThreadNumber()
if (threadNumber == 0) {
threadLabel <- "Main thread"
} else {
threadLabel <- paste("Thread", threadNumber)
Expand Down Expand Up @@ -161,8 +161,8 @@ layoutEmail <- function(level, message) {
#' @export
layoutErrorReport <- function(level, message) {
lines <- c()
threadNumber <- getOption("threadNumber")
if (is.null(threadNumber)) {
threadNumber <- getThreadNumber()
if (threadNumber == 0) {
lines <- c(lines, "Thread: Main")
} else {
lines <- c(lines, paste("Thread: ", threadNumber))
Expand All @@ -175,7 +175,7 @@ layoutErrorReport <- function(level, message) {
lines <- c(lines, "Stack trace:")
lines <- c(lines, .tidyStackTrace(limitedLabels(sys.calls())))
lines <- c(lines, "")
if (is.null(threadNumber)) {
if (threadNumber == 0) {
lines <- c(lines, .systemInfo())
lines <- c(lines, "")
}
Expand Down Expand Up @@ -207,7 +207,7 @@ layoutErrorReport <- function(level, message) {

.tidyStackTrace <- function(trace) {
# saveRDS(trace, sprintf("s:/temp/trace_%d.rds", length(trace)))
if (is.null(getOption("threadNumber"))) {
if (getThreadNumber() == 0) {
if (length(trace) > 4 && grepl("echoToConsole = FALSE", trace[length(trace) - 4])) {
# Captured via globalCallingHandlers(): 2 more layers to discard
offset <- 2
Expand Down
2 changes: 1 addition & 1 deletion R/Logging.R
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ handlersRegistered <- function() {
}

registerDefaultHandlers <- function() {
if (!is.null(getOption("threadNumber")) || handlersRegistered()) {
if (getThreadNumber() != 0 || handlersRegistered()) {
return()
}
if (inTryCatchOrWithCallingHandlers()) {
Expand Down
3 changes: 2 additions & 1 deletion _pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ reference:
- makeCluster
- clusterRequire
- clusterApply
- stopCluster
- stopCluster
- getThreadNumber
- title: "Function automation"
desc: >
Functions for capturing arguments for function calls so these can be executed automatically later on.
Expand Down
14 changes: 14 additions & 0 deletions man/getThreadNumber.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 16 additions & 27 deletions tests/testthat/test-cluster.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,6 @@ test_that("Create a cluster of nodes for parallel computation", {
testthat::expect_equal(length(res), 1)
})

test_that("makeCluster", {
cluster <- snow::makeCluster(2, type = "SOCK")
logThreadStart <- function(loggers, threadNumber) {
ParallelLogger::clearLoggers()
for (logger in loggers) {
ParallelLogger::registerLogger(logger)
}
options(threadNumber = threadNumber)
ParallelLogger::logTrace("Thread ", threadNumber, " initiated")
finalize <- function(env) {
ParallelLogger::logTrace("Thread ", threadNumber, " terminated")
}
reg.finalizer(globalenv(), finalize, onexit = TRUE)
return(NULL)
}

loggers <- ParallelLogger::getLoggers()
testthat::expect_gt(length(cluster), expected = 1)

for (i in 1:length(cluster)) {
res <- snow::sendCall(cluster[[i]], logThreadStart, list(loggers = loggers, threadNumber = i))
testthat::expect_equal(res, NULL)
}

ParallelLogger::clearLoggers()
})

test_that("Test require package", {
tryCatch(
expr = {
Expand Down Expand Up @@ -87,3 +60,19 @@ test_that("Check andromedaTempFolder", {
testthat::expect_true(!is.null(getOption("andromedaTempFolder")))
testthat::expect_equal(getOption("andromedaTempFolder"), check)
})


test_that("Test getThreadNumber", {
expect_equal(getThreadNumber(), 0)

fun <- function(x) {
return(ParallelLogger::getThreadNumber())
}

cluster <- makeCluster(numberOfThreads = 3)
x <- clusterApply(cluster, 1:3, fun)
stopCluster(cluster)

expect_equal(sort(unlist(x)), 1:3)
})

0 comments on commit 6e30522

Please sign in to comment.