Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/bdbag.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12", "3.13", "3.14"]

steps:
- uses: actions/checkout@v3
Expand Down
29 changes: 29 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
# CHANGE LOG

## 1.9.0

#### New feature: Fetch Parallelism
* Added support for concurrent/parallel file fetching via `ThreadPoolExecutor`. Opt-in via CLI `--fetch-concurrency N`
argument or API `fetch_concurrency=N` parameter on `resolve_fetch()` and `materialize()`. Default remains serial
(concurrency=1) for full backward compatibility.
* New configuration keys in `bdbag.json`:
* `max_concurrent_fetches` (default `8`): ceiling for the effective concurrency. The requested concurrency is clamped
to this value.
* `concurrent_fetch_exclude_schemes` (default `["globus"]`): transport schemes that are always fetched serially, even
when concurrent fetching is enabled.
* Thread-safety improvements:
* `os.makedirs` in `fetch/__init__.py` now uses `exist_ok=True` to avoid race conditions.
* HTTP session creation in `fetch_http.py` is now protected by a `threading.Lock`.
* HTTP fetch sessions are now isolated per worker thread, preventing concurrent fetches from corrupting shared auth state.
* Fetcher instance creation uses double-checked locking to prevent duplicate transport instantiation.
* Ctrl+C (SIGINT) handling during concurrent fetches: a custom signal handler sets a cancellation event that causes
in-flight workers to abort promptly, with clean shutdown and `KeyboardInterrupt` re-raised to the caller.
* CLI now catches `KeyboardInterrupt` and exits cleanly with an "Interrupted by user." message instead of a traceback.

#### Significant changes
* Dropped support for Python < 3.9.
* Removed obsolete Python version checks throughout the codebase.

#### Bugfixes
* Fixed SSL certificate verification bypass whitelist matching to use origin-based comparison instead of substring matching, preventing unintended domain matches.
* Fixed keychain authentication entry matching to use URL prefix comparison instead of substring matching, preventing credential leakage to unintended hosts.
* Replaced use of `eval()` for numeric filter comparisons with `operator` module functions.

## 1.8.0

* Dropped support for `Python<3.8`, including Python 2.
Expand Down
870 changes: 481 additions & 389 deletions Pipfile.lock

Large diffs are not rendered by default.

13 changes: 4 additions & 9 deletions bdbag/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import logging
import mimetypes
import shutil
import operator as op
from datetime import datetime
from urllib.parse import quote as urlquote, unquote as urlunquote, urlsplit, urlunsplit, urlparse
from urllib.request import urlretrieve, urlopen, urlcleanup
from importlib.metadata import distribution, PackageNotFoundError

logger = logging.getLogger(__name__)

__version__ = "1.8.0"
__version__ = "1.9.0-dev"
__bagit_version__ = "1.9.0"
__bagit_profile_version__ = "1.3.1"

Expand Down Expand Up @@ -127,12 +128,6 @@ def parse_content_disposition(value): # pragma: no cover
except Exception as e:
raise ValueError('Invalid URL encoding of content-disposition filename component. %s.' % e)

try:
if sys.version_info < (3,):
n = n.decode('utf8')
except Exception as e:
raise ValueError('Invalid UTF-8 encoding of content-disposition filename component. %s.' % e)

return n


Expand Down Expand Up @@ -203,8 +198,8 @@ def filter_dict(expr, entry):
result = str(value).endswith(filter_val)
elif filter_relation:
try:
statement = "%d%s%d" % (int(value), operator, int(filter_val))
result = eval(statement)
_ops = {">": op.gt, ">=": op.ge, "<": op.lt, "<=": op.le}
result = _ops[operator](int(value), int(filter_val))
except Exception as e:
logger.warning("Unable to evaluate filter expression [%s]: %s" %
(expr, get_typed_exception(e)))
Expand Down
78 changes: 36 additions & 42 deletions bdbag/bdbag_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,9 @@
def configure_logging(level=logging.INFO, logpath=None, filemode='a', log_format=DEFAULT_LOG_FORMAT, force=False):
logging.captureWarnings(True)
if logpath:
if sys.version_info > (3, 8):
logging.basicConfig(filename=logpath, filemode=filemode, level=level, format=log_format, force=force)
else:
logging.basicConfig(filename=logpath, filemode=filemode, level=level, format=log_format)
logging.basicConfig(filename=logpath, filemode=filemode, level=level, format=log_format, force=force)
else:
if sys.version_info > (3, 8):
logging.basicConfig(level=level, format=log_format, force=force)
else:
logging.basicConfig(level=level, format=log_format)
logging.basicConfig(level=level, format=log_format, force=force)


def read_metadata(metadata_file):
Expand Down Expand Up @@ -377,15 +371,15 @@ def archive_bag(bag_path, bag_archiver, config_file=None, idempotent=None):
tarmode = 'w:gz'
elif bag_archiver == 'bz2':
tarmode = 'w:bz2'
elif bag_archiver == 'xz' and sys.version_info >= (3, 3):
elif bag_archiver == 'xz':
tarmode = 'w:xz'
elif bag_archiver == 'zip':
zfp = os.path.join(os.path.dirname(bag_path), fn)
archive = zip_bag_dir(bag_path, zfp, idempotent)
else:
raise RuntimeError("Archive format not supported for bag file: %s \n "
"Supported archive formats are ZIP or TAR/GZ/BZ2%s" %
(bag_path, ("/XZ" if sys.version_info >= (3, 3) else "")))
(bag_path, "/XZ"))

if tarmode:
archive = tar_bag_dir(bag_path, fn, tarmode, idempotent)
Expand Down Expand Up @@ -447,36 +441,33 @@ def zip_bag_dir(bag_path, zip_file_path, idempotent=False):
entries.sort()
for e in entries:
filepath = os.path.join(os.path.dirname(bag_path), e)
if sys.version_info < (3,):
zipfile.write(filepath, e)
if idempotent:
# a fixed mtime is a core requirement for a reproducible archive
date_time = (1980, 1, 1, 0, 0, 0)
else:
if idempotent:
# a fixed mtime is a core requirement for a reproducible archive
date_time = (1980, 1, 1, 0, 0, 0)
else:
st = os.stat(filepath)
mtime = time.localtime(st.st_mtime)
date_time = mtime[0:6]
info = ZipInfo(
filename=e,
date_time=date_time
)
info.create_system = 3 # unix
if e.endswith(os.path.sep):
info.external_attr = 0o40755 << 16 | 0x010
info.compress_type = ZIP_STORED
info.CRC = 0 # unclear why necessary, maybe a bug?
zipfile.writestr(info, b'')
else:
info.external_attr = 0o100644 << 16
info.compress_type = ZIP_DEFLATED
with io.open(filepath, 'rb') as data, zipfile.open(info, 'w') as out:
while True:
chunk = data.read(io.DEFAULT_BUFFER_SIZE)
if not chunk:
break
out.write(chunk)
out.flush()
st = os.stat(filepath)
mtime = time.localtime(st.st_mtime)
date_time = mtime[0:6]
info = ZipInfo(
filename=e,
date_time=date_time
)
info.create_system = 3 # unix
if e.endswith(os.path.sep):
info.external_attr = 0o40755 << 16 | 0x010
info.compress_type = ZIP_STORED
info.CRC = 0 # unclear why necessary, maybe a bug?
zipfile.writestr(info, b'')
else:
info.external_attr = 0o100644 << 16
info.compress_type = ZIP_DEFLATED
with io.open(filepath, 'rb') as data, zipfile.open(info, 'w') as out:
while True:
chunk = data.read(io.DEFAULT_BUFFER_SIZE)
if not chunk:
break
out.write(chunk)
out.flush()
zipfile.close()
return zipfile.filename

Expand Down Expand Up @@ -506,14 +497,13 @@ def extract_bag(bag_path, output_path=None, temp=False, config_file=None):
archive = ZipFile(bag_path)
files = archive.namelist()
elif tarfile.is_tarfile(bag_path):
logger.info("Extracting TAR/GZ/BZ2%s archived file: %s" %
(("/XZ" if sys.version_info >= (3, 3) else ""), bag_path))
logger.info("Extracting TAR/GZ/BZ2/XZ archived file: %s" % bag_path)
archive = tarfile.open(bag_path)
files = archive.getnames()
else:
raise RuntimeError("Archive format not supported for file: %s\n"
"Supported archive formats are ZIP or TAR/GZ/BZ2%s" %
(bag_path, ("/XZ" if sys.version_info >= (3, 3) else "")))
(bag_path, "/XZ"))
archived_bag_dir = bag_parent_dir_from_archive(files)
extracted_path = os.path.join(base_path, archived_bag_dir or bag_dir)
output_path = os.path.join(output_path, extracted_path or bag_dir) if output_path else None
Expand Down Expand Up @@ -728,6 +718,7 @@ def resolve_fetch(bag_path,
keychain_file=DEFAULT_KEYCHAIN_FILE,
config_file=None,
filter_expr=None,
fetch_concurrency=None,
**kwargs):
bag = bdbagit.BDBag(bag_path)
if force or not check_payload_consistency(bag, skip_remote=False, quiet=kwargs.get("quiet", True)):
Expand All @@ -741,6 +732,7 @@ def resolve_fetch(bag_path,
config_file=config_file,
callback=callback,
filter_expr=filter_expr,
fetch_concurrency=fetch_concurrency,
**kwargs)
else:
return True
Expand All @@ -754,6 +746,7 @@ def materialize(input_path,
config_file=None,
filter_expr=None,
force=False,
fetch_concurrency=None,
**kwargs):

bag_file = bag_path = None
Expand Down Expand Up @@ -789,6 +782,7 @@ def materialize(input_path,
keychain_file=keychain_file,
config_file=config_file,
filter_expr=filter_expr,
fetch_concurrency=fetch_concurrency,
**kwargs):
logger.warning("One or more bag files were not fetched successfully.")

Expand Down
20 changes: 15 additions & 5 deletions bdbag/bdbag_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ def parse_cli():
"directory will be deleted.")

archiver_arg = "--archiver"
choices = ['zip', 'tar', 'tgz', 'bz2']
if sys.version_info >= (3, 3):
choices.append("xz")
choices = ['zip', 'tar', 'tgz', 'bz2', 'xz']
standard_args.add_argument(
archiver_arg, choices=choices, help="Archive a bag using the specified format.")

Expand Down Expand Up @@ -152,6 +150,12 @@ def parse_cli():
"The \"all\" option causes all fetch files to be re-acquired,"
" even if they already exist in the bag payload directory.")

fetch_concurrency_arg = "--fetch-concurrency"
standard_args.add_argument(
fetch_concurrency_arg, type=int, default=1, metavar='<int>',
help="Number of concurrent fetch operations to perform. Defaults to 1 (serial). "
"Clamped to the max_concurrent_fetches value in the configuration file.")

fetch_filter_arg = "--fetch-filter"
standard_args.add_argument(
fetch_filter_arg, metavar="<column><operator><value>",
Expand Down Expand Up @@ -350,7 +354,8 @@ def main():
validation_callback=None,
keychain_file=args.keychain_file,
config_file=args.config_file,
filter_expr=args.fetch_filter)
filter_expr=args.fetch_filter,
fetch_concurrency=args.fetch_concurrency)
return result

if is_uri:
Expand Down Expand Up @@ -400,7 +405,8 @@ def main():
force=True if args.resolve_fetch == 'all' else False,
keychain_file=args.keychain_file,
config_file=args.config_file,
filter_expr=args.fetch_filter)
filter_expr=args.fetch_filter,
fetch_concurrency=args.fetch_concurrency)

if args.validate_profile:
if not is_file:
Expand Down Expand Up @@ -435,6 +441,10 @@ def main():
if args.revert:
bdb.revert_bag(path)

except KeyboardInterrupt:
result = 1
error = "Interrupted by user."

except Exception as e:
result = 1
error = "Error: %s" % get_typed_exception(e)
Expand Down
7 changes: 7 additions & 0 deletions bdbag/bdbag_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@

logger = logging.getLogger(__name__)

FETCH_CONCURRENCY_TAG = "max_concurrent_fetches"
DEFAULT_FETCH_CONCURRENCY = 8
FETCH_CONCURRENCY_EXCLUDE_TAG = "concurrent_fetch_exclude_schemes"
DEFAULT_FETCH_CONCURRENCY_EXCLUDE = ["globus"]

BAG_CONFIG_TAG = "bag_config"
BAG_SPEC_VERSION_TAG = "bagit_spec_version"
BAG_ALGORITHMS_TAG = "bag_algorithms"
Expand Down Expand Up @@ -148,6 +153,8 @@
}
},
FETCH_CONFIG_TAG: DEFAULT_FETCH_CONFIG,
FETCH_CONCURRENCY_TAG: DEFAULT_FETCH_CONCURRENCY,
FETCH_CONCURRENCY_EXCLUDE_TAG: DEFAULT_FETCH_CONCURRENCY_EXCLUDE,
ID_RESOLVER_TAG: DEFAULT_ID_RESOLVERS,
RESOLVER_CONFIG_TAG: DEFAULT_RESOLVER_CONFIG
}
Expand Down
2 changes: 1 addition & 1 deletion bdbag/fetch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ def ensure_valid_output_path(url, output_path=None):
output_path = os.path.abspath(output_path)
output_dir = os.path.dirname(output_path)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
os.makedirs(output_dir, exist_ok=True)

return output_path
3 changes: 2 additions & 1 deletion bdbag/fetch/auth/keychain.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,10 @@ def has_auth_attr(auth, attr, quiet=False):

def get_auth_entries(url, auth):
entries = list()
url_lower = url.lower()
for entry in auth:
uri = entry.get("uri", "").lower().strip()
if uri in url.lower():
if url_lower.startswith(uri):
entries.append(entry)
return entries

Expand Down
Loading
Loading