Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,20 @@ class AzBatchExecutor extends Executor implements ExtensionPoint {
azConfig = AzConfig.getConfig(session)
batchService = new AzBatchService(this)

// Generate an account SAS token using either activeDirectory configs or storage account keys
// When using account key, generate an account-level SAS (covers all containers).
// When using AD/MI, SAS tokens are generated lazily per-container on first access
// via AzFileSystemProvider.newFileSystem0() — see generateContainerSasIfNeeded().
if (!azConfig.storage().sasToken) {
azConfig.storage().sasToken = azConfig.activeDirectory().isConfigured() || azConfig.managedIdentity().isConfigured()
? AzHelper.generateContainerSasWithActiveDirectory(workDir, azConfig.storage().tokenDuration)
: AzHelper.generateAccountSasWithAccountKey(workDir, azConfig.storage().tokenDuration)
if( !azConfig.activeDirectory().isConfigured() && !azConfig.managedIdentity().isConfigured() ) {
azConfig.storage().sasToken = AzHelper.generateAccountSasWithAccountKey(workDir, azConfig.storage().tokenDuration)
}
// For AD/MI: generate a SAS for the workDir container eagerly so it is available
// immediately, then additional containers are handled lazily on first access.
else {
final container = (workDir as AzPath).getContainerName() as String
final sas = AzHelper.generateContainerSasWithActiveDirectory(workDir, azConfig.storage().tokenDuration)
azConfig.storage().setSasToken(container, sas)
}
}

Global.onCleanup((it) -> batchService.close())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,12 +518,18 @@ class AzBatchService implements Closeable {
return key.size()>MAX_LEN ? key.substring(0,MAX_LEN) : key
}

/**
 * Look up the SAS token associated with the container of the given path.
 *
 * @param path The path whose container token is requested
 * @return The value returned by the storage configuration for the path's
 *         container, or {@code null} when the path is not an {@link AzPath}
 */
protected String getSasForPath(Path path) {
    // Only Azure paths carry a container name to look a token up by
    if( !(path instanceof AzPath) )
        return null
    final containerName = ((AzPath) path).getContainerName() as String
    return config.storage().getSasToken(containerName)
}

protected BatchTaskCreateContent createTask(String poolId, String jobId, TaskRun task) {
assert poolId, 'Missing Azure Batch poolId argument'
assert jobId, 'Missing Azure Batch jobId argument'
assert task, 'Missing Azure Batch task argument'

final sas = config.storage().sasToken
final sas = config.storage().sasToken ?: getSasForPath(task.workDir)
if( !sas )
throw new IllegalArgumentException("Missing Azure Blob storage SAS token")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.cloud.azure.config.AzConfig
import nextflow.cloud.azure.file.AzBashLib
import nextflow.cloud.azure.nio.AzPath
import nextflow.executor.SimpleFileCopyStrategy
import nextflow.processor.TaskBean
import nextflow.processor.TaskRun
Expand Down Expand Up @@ -52,6 +53,13 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy {
this.maxParallelTransfers = config.batch().maxParallelTransfers
this.maxTransferAttempts = config.batch().maxTransferAttempts
this.delayBetweenAttempts = config.batch().delayBetweenAttempts
if( !sasToken ) {
final List<Path> paths = []
if( remoteBinDir ) paths << remoteBinDir
if( bean.workDir ) paths << bean.workDir
paths.addAll( bean.inputFiles.values() )
for( Path p : paths ) getSasForPath(p)
}
}

@Override
Expand All @@ -64,18 +72,55 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy {
copy.remove('PATH')
copy.put('PATH', '$PWD/.nextflow-bin:$AZ_BATCH_NODE_SHARED_DIR/bin/:$PATH')
copy.put('AZCOPY_LOG_LOCATION', '$PWD/.azcopy_log')
copy.put('AZ_SAS', sasToken)
if( sasToken ) {
copy.put('AZ_SAS', sasToken)
}
else {
for( Map.Entry<String,String> entry : config.storage().containerSasTokens.entrySet() ) {
final varName = 'AZ_SAS_' + entry.key.toUpperCase().replaceAll('[^A-Z0-9]', '_')
copy.put(varName, entry.value)
}
}

// finally render the environment
final envSnippet = super.getEnvScript(copy,false)
if( envSnippet )
result << envSnippet
return result.toString()

}

static String uploadCmd(String source, Path targetDir) {
"nxf_az_upload ${Escape.path(source)} '${AzHelper.toHttpUrl(targetDir)}'"
/**
 * Resolve the SAS token to use for the given path.
 *
 * Resolution order: the strategy-wide {@code sasToken} when set, then the
 * token returned by the storage configuration for the path's container, and
 * finally, when Active Directory or Managed Identity is configured, a newly
 * generated container token that is registered in the storage configuration
 * before being returned.
 *
 * @param path The path to resolve a token for
 * @return The resolved token; {@code null} when the path is not an
 *         {@link AzPath} or has no container name; otherwise whatever the
 *         storage configuration lookup returned when no token can be generated
 */
protected String getSasForPath(Path path) {
    // A strategy-wide token takes precedence over any per-container token
    if( sasToken )
        return sasToken

    final container = path instanceof AzPath
        ? ((path as AzPath).getContainerName() as String)
        : null
    if( !container )
        return null

    String sas = config.storage().getSasToken(container)
    if( sas )
        return sas

    // No SAS registered yet for this container. This covers input files living in
    // containers that were opened before the executor started (e.g. az://igenomes
    // accessed during task graph build).
    final canGenerate = config.activeDirectory().isConfigured() || config.managedIdentity().isConfigured()
    if( !canGenerate )
        return sas

    log.debug "Generating SAS token for Azure container: $container"
    sas = AzHelper.generateContainerSasWithActiveDirectory(path, config.storage().tokenDuration)
    config.storage().setSasToken(container, sas)
    return sas
}

/**
 * Convert the given path to its HTTP URL form, passing along the SAS token
 * resolved for that path by {@code getSasForPath}.
 *
 * @param path The path to convert
 * @return The result of {@code AzHelper.toHttpUrl} for the path and its token
 */
protected String httpUrl(Path path) {
    final String sas = getSasForPath(path)
    return AzHelper.toHttpUrl(path, sas)
}

/**
 * Build the {@code nxf_az_upload} command line that uploads {@code source}
 * to {@code targetDir}, using the URL produced by {@code httpUrl}.
 *
 * @param source The local path to upload
 * @param targetDir The remote target directory
 * @return The upload command string
 */
String uploadCmd(String source, Path targetDir) {
    final escapedSource = Escape.path(source)
    final targetUrl = httpUrl(targetDir)
    return "nxf_az_upload ${escapedSource} '${targetUrl}'"
}

/**
 * Build the {@code nxf_az_upload} command line that uploads {@code source}
 * to {@code targetDir}, using the caller-supplied SAS token for the URL.
 *
 * @param source The local path to upload
 * @param targetDir The remote target directory
 * @param sas The SAS token handed to {@code AzHelper.toHttpUrl}
 * @return The upload command string
 */
static String uploadCmd(String source, Path targetDir, String sas) {
    final escapedSource = Escape.path(source)
    final targetUrl = AzHelper.toHttpUrl(targetDir, sas)
    return "nxf_az_upload ${escapedSource} '${targetUrl}'"
}

@Override
Expand All @@ -86,7 +131,7 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy {
@Override
String getStageInputFilesScript(Map<String, Path> inputFiles) {
String result = ( remoteBinDir ? """\
nxf_az_download '${AzHelper.toHttpUrl(remoteBinDir)}' \$PWD/.nextflow-bin
nxf_az_download '${httpUrl(remoteBinDir)}' \$PWD/.nextflow-bin
chmod +x \$PWD/.nextflow-bin/* || true
""".stripIndent(true) : '' )

Expand All @@ -103,8 +148,8 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy {
String stageInputFile( Path path, String targetName ) {
// third param should not be escaped, because it's used in the grep match rule
def stage_cmd = maxTransferAttempts > 1
? "downloads+=(\"nxf_cp_retry nxf_az_download '${AzHelper.toHttpUrl(path)}' ${Escape.path(targetName)}\")"
: "downloads+=(\"nxf_az_download '${AzHelper.toHttpUrl(path)}' ${Escape.path(targetName)}\")"
? "downloads+=(\"nxf_cp_retry nxf_az_download '${httpUrl(path)}' ${Escape.path(targetName)}\")"
: "downloads+=(\"nxf_az_download '${httpUrl(path)}' ${Escape.path(targetName)}\")"
return stage_cmd
}

Expand All @@ -128,7 +173,7 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy {
uploads=()
IFS=\$'\\n'
for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do
uploads+=("nxf_az_upload '\$name' '${AzHelper.toHttpUrl(targetDir)}'")
uploads+=("nxf_az_upload '\$name' '${httpUrl(targetDir)}'")
done
unset IFS
nxf_parallel "\${uploads[@]}"
Expand Down Expand Up @@ -156,7 +201,7 @@ class AzFileCopyStrategy extends SimpleFileCopyStrategy {
*/
@Override
String copyFile( String name, Path target ) {
"nxf_az_upload ${Escape.path(name)} '${AzHelper.toHttpUrl(target.parent)}'"
"nxf_az_upload ${Escape.path(name)} '${httpUrl(target.parent)}'"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,34 @@ class AzHelper {


static String generateContainerSasWithActiveDirectory(Path path, Duration duration) {
final key = generateUserDelegationKey(az0(path), duration)
final azPath = az0(path)
final key = generateUserDelegationKey(azPath, duration)
return generateContainerUserDelegationSas(azPath.containerClient(), duration, key)
}

return generateContainerUserDelegationSas(az0(path).containerClient(), duration, key)
/**
 * Generate a container-scoped SAS token for the given {@link BlobContainerClient}
 * from a user delegation key the caller has already obtained.
 *
 * This overload delegates directly to {@code generateContainerUserDelegationSas},
 * so callers holding a key can reuse it instead of requesting a new one.
 *
 * @param client The container client to generate the SAS for
 * @param duration The requested token lifetime
 * @param key The user delegation key to sign the token with
 * @return The SAS token string
 */
static String generateContainerSasWithActiveDirectory(BlobContainerClient client, Duration duration, UserDelegationKey key) {
    final String containerSas = generateContainerUserDelegationSas(client, duration, key)
    return containerSas
}

/**
 * Generate an account SAS token through the blob service client of the
 * file system that owns the given path.
 *
 * @param path A path whose file system supplies the blob service client
 * @param duration The requested token lifetime
 * @return The value returned by {@code generateAccountSas}
 */
static String generateAccountSasWithAccountKey(Path path, Duration duration) {
    final serviceClient = az0(path).getFileSystem().getBlobServiceClient()
    return generateAccountSas(serviceClient, duration)
}

/**
 * Obtain a user delegation key through the blob service client of the
 * file system that owns the given path.
 *
 * @param path A path whose file system supplies the blob service client
 * @param duration The requested key lifetime
 * @return The key returned by the client-based {@code generateUserDelegationKey} overload
 */
static UserDelegationKey generateUserDelegationKey(Path path, Duration duration) {
    final serviceClient = az0(path).getFileSystem().getBlobServiceClient()
    return generateUserDelegationKey(serviceClient, duration)
}

final client = az0(path).getFileSystem().getBlobServiceClient()

static UserDelegationKey generateUserDelegationKey(BlobServiceClient client, Duration duration) {
final startTime = OffsetDateTime.now()
final indicatedExpiryTime = startTime.plusHours(duration.toHours())

Expand All @@ -131,9 +146,7 @@ class AzHelper {

final expiryTime = (indicatedExpiryTime.toEpochSecond() <= maxExpiryTime.toEpochSecond()) ? indicatedExpiryTime : maxExpiryTime

final delegationKey = client.getUserDelegationKey(startTime, expiryTime)

return delegationKey
return client.getUserDelegationKey(startTime, expiryTime)
}

static String generateContainerUserDelegationSas(BlobContainerClient client, Duration duration, UserDelegationKey key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow.cloud.azure.config

import java.util.concurrent.ConcurrentHashMap

import groovy.transform.CompileStatic
import nextflow.SysEnv
import nextflow.cloud.azure.batch.AzHelper
Expand Down Expand Up @@ -60,6 +62,12 @@ class AzStorageOpts implements ConfigScope {
@PlaceholderName("<name>")
final Map<String,AzFileShareOpts> fileShares

/**
* Per-container SAS tokens, keyed by container name.
* Used when accessing multiple blob containers with AD/MI authentication.
* ConcurrentHashMap ensures safe concurrent reads and writes from multiple task threads.
*/
private final Map<String,String> containerSasTokens = new ConcurrentHashMap<>()
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be better modeled as a dedicated type rather than a Map<String, String>, but I think this is sufficient to demonstrate the fix.


AzStorageOpts(Map config, Map<String,String> env=SysEnv.get()) {
assert config!=null
Expand All @@ -72,6 +80,34 @@ class AzStorageOpts implements ConfigScope {

}

/**
 * Return the SAS token for the given container name.
 * Falls back to the global {@link #sasToken} if no per-container token is registered,
 * or if no container name is given.
 *
 * @param containerName The blob container name; may be {@code null}
 * @return The SAS token, or {@code null} if none is available
 */
String getSasToken(String containerName) {
    // The backing ConcurrentHashMap rejects null keys with a NullPointerException,
    // so a path without a container must take the global fallback explicitly.
    if( containerName == null )
        return sasToken
    return containerSasTokens.getOrDefault(containerName, sasToken)
}

/**
 * Associate a SAS token with a specific container, replacing any token
 * previously registered under the same container name.
 *
 * @param containerName The blob container name used as the lookup key
 * @param token The SAS token to store for that container
 */
void setSasToken(String containerName, String token) {
    containerSasTokens[containerName] = token
}

/**
 * Expose the registered per-container SAS tokens, keyed by container name.
 *
 * @return An unmodifiable view over the per-container token map
 */
Map<String,String> getContainerSasTokens() {
    final Map<String,String> readOnlyView = Collections.unmodifiableMap(containerSasTokens)
    return readOnlyView
}

Map<String,Object> getEnv() {
Map<String, Object> props = new HashMap<>();
props.put(AzFileSystemProvider.AZURE_STORAGE_ACCOUNT_KEY, accountKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,18 @@ class AzBashLib extends BashFunLib<AzBashLib> {
local target=${2%/} ## remove ending slash
local base_name="$(basename "$name")"
local dir_name="$(dirname "$name")"
local target_base="${target%%\\?*}"
local target_qs="${target#*\\?}"
[[ "$target_base" == "$target" ]] && target_qs=""

if [[ -d $name ]]; then
if [[ "$base_name" == "$name" ]]; then
azcopy cp "$name" "$target?$AZ_SAS" --recursive --block-blob-tier $AZCOPY_BLOCK_BLOB_TIER --block-size-mb $AZCOPY_BLOCK_SIZE_MB
azcopy cp "$name" "$target_base${target_qs:+?$target_qs}" --recursive --block-blob-tier $AZCOPY_BLOCK_BLOB_TIER --block-size-mb $AZCOPY_BLOCK_SIZE_MB
else
azcopy cp "$name" "$target/$dir_name?$AZ_SAS" --recursive --block-blob-tier $AZCOPY_BLOCK_BLOB_TIER --block-size-mb $AZCOPY_BLOCK_SIZE_MB
azcopy cp "$name" "$target_base/$dir_name${target_qs:+?$target_qs}" --recursive --block-blob-tier $AZCOPY_BLOCK_BLOB_TIER --block-size-mb $AZCOPY_BLOCK_SIZE_MB
fi
else
azcopy cp "$name" "$target/$name?$AZ_SAS" --block-blob-tier $AZCOPY_BLOCK_BLOB_TIER --block-size-mb $AZCOPY_BLOCK_SIZE_MB
azcopy cp "$name" "$target_base/$name${target_qs:+?$target_qs}" --block-blob-tier $AZCOPY_BLOCK_BLOB_TIER --block-size-mb $AZCOPY_BLOCK_SIZE_MB
fi
}

Expand All @@ -80,10 +83,14 @@ class AzBashLib extends BashFunLib<AzBashLib> {
local ret
mkdir -p "$basedir"

ret=$(azcopy cp "$source?$AZ_SAS" "$target" 2>&1) || {
ret=$(azcopy cp "$source" "$target" 2>&1) || {
## if fails check if it was trying to download a directory
mkdir -p $target
azcopy cp "$source/*?$AZ_SAS" "$target" --recursive >/dev/null || {
local source_base="${source%%\\?*}"
local source_qs="${source#*\\?}"
[[ "$source_base" == "$source" ]] && source_qs=""
local source_dir="${source_base}/*${source_qs:+?$source_qs}"
azcopy cp "$source_dir" "$target" --recursive >/dev/null || {
rm -rf $target
>&2 echo "Unable to download path: $source"
exit 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,11 @@ class AzPathFactory extends FileSystemPathFactory {

@Override
protected String getUploadCmd(String source, Path target) {
return target instanceof AzPath ? AzFileCopyStrategy.uploadCmd(source, target) : null
if( !(target instanceof AzPath) )
return null
final container = ((AzPath)target).getContainerName() as String
final sas = AzConfig.getConfig().storage().getSasToken(container)
return AzFileCopyStrategy.uploadCmd(source, target, sas)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,36 +74,38 @@ class AzFusionEnv implements FusionEnv {
return result
}

// If no managed identity, use the standard environment with SAS token
final containerTokens = cfg.storage().containerSasTokens
for( Map.Entry<String,String> entry : containerTokens.entrySet() ) {
final envKey = "AZURE_STORAGE_SAS_TOKEN_${entry.key.toUpperCase().replaceAll('[^A-Z0-9]', '_')}"
result.put(envKey, entry.value)
}
result.AZURE_STORAGE_SAS_TOKEN = getOrCreateSasToken()

return result
}

/**
* Return the SAS token if it is defined in the configuration, otherwise generate one based on the requested
* authentication method.
*/
synchronized String getOrCreateSasToken() {
final cfg = AzConfig.config

// Check for incompatible configuration
if (cfg.storage().accountKey && cfg.storage().sasToken) {
throw new IllegalArgumentException("Azure Storage Access key and SAS token detected. Only one is allowed")
}

// If a SAS token is already defined in the configuration, just return it
if (cfg.storage().sasToken) {
return cfg.storage().sasToken
}

// For Active Directory and Managed Identity, we cannot generate an *account* SAS token, but we can generate
// a *container* SAS token for the work directory.
if (cfg.activeDirectory().isConfigured() || cfg.managedIdentity().isConfigured()) {
return AzHelper.generateContainerSasWithActiveDirectory(Global.session.workDir, cfg.storage().tokenDuration)
final workDir = Global.session.workDir
final container = (workDir as nextflow.cloud.azure.nio.AzPath).getContainerName()
final existing = cfg.storage().getSasToken(container as String)
if( existing )
return existing
final sas = AzHelper.generateContainerSasWithActiveDirectory(workDir, cfg.storage().tokenDuration)
cfg.storage().setSasToken(container as String, sas)
return sas
}

// Shared Key authentication can use an account SAS token
return AzHelper.generateAccountSasWithAccountKey(Global.session.workDir, cfg.storage().tokenDuration)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ class AzFileSystem extends FileSystem {

@PackageScope
void copy(AzPath source, AzPath target) {
final sasToken = provider.getSasToken()
final sasToken = provider.getSasToken(source.getContainerName())
String sourceUrl = source.blobClient().getBlobUrl()

if (sasToken != null) {
Expand Down
Loading
Loading