Rservice #106

Merged
28 commits merged on Mar 6, 2025
Changes from 25 commits

Commits (28)
ac4548d  Merge pull request #17 from databio/dev (khoroshevskyi, Oct 4, 2023)
f76790c  Merge pull request #21 from databio/dev (khoroshevskyi, Feb 29, 2024)
c71d03d  Update README.md (khoroshevskyi, Feb 29, 2024)
1d528d1  Merge pull request #45 from databio/khoroshevskyi-patch-1 (khoroshevskyi, Feb 29, 2024)
efa915c  Merge pull request #57 from databio/dev (khoroshevskyi, Apr 8, 2024)
4298e4b  Merge pull request #61 from databio/dev (khoroshevskyi, Apr 11, 2024)
813a61a  Merge pull request #63 from databio/dev (khoroshevskyi, Aug 21, 2024)
5a21300  Merge pull request #73 from databio/dev (khoroshevskyi, Aug 26, 2024)
6770050  add proof of concept for external R service (nsheff, Oct 5, 2024)
180f14a  write RServiceManager class (nsheff, Oct 7, 2024)
12ea557  lint (nsheff, Oct 7, 2024)
622b943  add comment (nsheff, Oct 7, 2024)
c5d5077  allow python to recieve msg from R process (nsheff, Feb 21, 2025)
353ad56  Merge branch 'dev' into rservice (nsheff, Feb 21, 2025)
32ee47e  clean up R service (nsheff, Feb 21, 2025)
a5bd989  some minor updates (nsheff, Feb 21, 2025)
d48afdd  Merge branch 'dev' into rservice (khoroshevskyi, Feb 27, 2025)
803afcf  Merge remote-tracking branch 'origin/dev' into rservice (khoroshevskyi, Feb 27, 2025)
5689c3f  Working version of Rscript as separate service (khoroshevskyi, Feb 28, 2025)
dfa61ea  Added R service to bedboss main pipelines (khoroshevskyi, Feb 28, 2025)
3204e93  Fixed R deps (khoroshevskyi, Feb 28, 2025)
5b9c3e0  Fixed #108 (khoroshevskyi, Mar 5, 2025)
fb47f41  updated docker file (khoroshevskyi, Mar 5, 2025)
8c5fc21  lint (khoroshevskyi, Mar 5, 2025)
37faad2  lint and github actions (khoroshevskyi, Mar 5, 2025)
e962080  Update bedboss/bedstat/bedstat.py (khoroshevskyi, Mar 6, 2025)
be283d3  Fixed few comment on PR (khoroshevskyi, Mar 6, 2025)
fd177bf  Merge remote-tracking branch 'origin/rservice' into rservice (khoroshevskyi, Mar 6, 2025)
4 changes: 2 additions & 2 deletions .github/workflows/black.yml
@@ -6,6 +6,6 @@ jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/setup-python@v2
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
- uses: psf/black@stable
3 changes: 1 addition & 2 deletions Dockerfile
@@ -40,6 +40,5 @@ COPY ./production/config.yaml /workdir/config.yaml
#RUN refgenie pull mm10/fasta --skip-read-lock -c /workdir/refgenie/genome_config.yaml
#RUN refgenie pull hg19/fasta --skip-read-lock -c /workdir/refgenie/genome_config.yaml

CMD ["sh", "-c", "bedboss reprocess-all --bedbase-config /workdir/config.yaml --outfolder /workdir/output --limit ${UPLOAD_LIMIT:-1}"]

#CMD ["bash"]
CMD ["sh", "-c", "bedboss reprocess-all --bedbase-config /workdir/config.yaml --outfolder /workdir/output --limit ${UPLOAD_LIMIT:-1}"]
6 changes: 5 additions & 1 deletion README.md
@@ -1,12 +1,16 @@
# bedboss
<h1 align="center">bedboss</h1>

<div align="center">

[![PEP compatible](https://pepkit.github.io/img/PEP-compatible-green.svg)](https://pep.databio.org/)
![Run pytests](https://github.com/bedbase/bedboss/workflows/Run%20instalation%20test/badge.svg)
[![pypi-badge](https://img.shields.io/pypi/v/bedboss?color=%2334D058)](https://pypi.org/project/bedboss)
[![pypi-version](https://img.shields.io/pypi/pyversions/bedboss.svg?color=%2334D058)](https://pypi.org/project/bedboss)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Github badge](https://img.shields.io/badge/source-github-354a75?logo=github)](https://github.com/databio/bedboss)

</div>

---

**Documentation**: <a href="https://docs.bedbase.org/bedboss" target="_blank">https://docs.bedbase.org/bedboss</a>
2 changes: 1 addition & 1 deletion bedboss/__init__.py
@@ -1,4 +1,4 @@
""" Package-level data """
"""Package-level data"""

import logging

2 changes: 1 addition & 1 deletion bedboss/bbuploader/__init__.py
@@ -1,4 +1,4 @@
""" Package-level data """
"""Package-level data"""

import coloredlogs
import logmuse
52 changes: 44 additions & 8 deletions bedboss/bbuploader/main.py
@@ -26,10 +26,16 @@
from bedboss.bbuploader.utils import create_gsm_sub_name
from bedboss.bedboss import run_all
from bedboss.bedbuncher.bedbuncher import run_bedbuncher
from bedboss.exceptions import BedBossException
from bedboss.exceptions import BedBossException, QualityException
from bedboss.skipper import Skipper
from bedboss.utils import calculate_time, download_file, standardize_genome_name
from bedboss.utils import (
calculate_time,
download_file,
standardize_genome_name,
run_initial_qc,
)
from bedboss.utils import standardize_pep as pep_standardizer
from bedboss.bedstat.r_service import RServiceManager

_LOGGER = logging.getLogger(PKG_NAME)
_LOGGER.setLevel(logging.DEBUG)
@@ -101,6 +107,12 @@ def upload_all(

count = 0
total_projects = len(pep_annotation_list.results)

if not lite:
r_service = RServiceManager()
else:
r_service = None

for gse_pep in pep_annotation_list.results:
count += 1
with Session(bbagent.config.db_engine.engine) as session:
@@ -158,6 +170,7 @@
overwrite=overwrite,
overwrite_bedset=overwrite_bedset,
lite=lite,
r_service=r_service,
)
except Exception as err:
_LOGGER.error(
@@ -414,6 +427,8 @@ def _upload_gse(
reinit_skipper: bool = False,
preload: bool = True,
lite=False,
max_file_size: int = 20 * 1000000,
r_service: RServiceManager = None,
) -> ProjectProcessingStatus:
"""
Upload bed files from GEO series to BedBase
@@ -433,6 +448,8 @@
:param reinit_skipper: reinitialize skipper, if set to True, skipper will be reinitialized and all logs will be
:param preload: pre - download files to the local folder (used for faster reproducibility)
:param lite: lite mode, where skipping statistic processing for memory optimization and time saving
:param max_file_size: maximum file size in bytes. Default: 20MB
:param r_service: RServiceManager object
:return: None
"""
if isinstance(bedbase_config, str):
@@ -463,16 +480,15 @@
else:
skipper_obj = None

if not lite and not r_service:
r_service = RServiceManager()
else:
r_service = None

for counter, project_sample in enumerate(project.samples):
_LOGGER.info(f">> Processing {counter+1} / {total_sample_number}")
sample_gsm = project_sample.get("sample_geo_accession", "").lower()

# if int(project_sample.get("file_size") or 0) > 10000000:
# _LOGGER.info(f"Skipping: '{sample_gsm}' - file size is too big")
# project_status.number_of_skipped += 1
# skipper_obj.add_failed(sample_gsm, f"File size is too big. {int(project_sample.get('file_size'))/1000000} MB")
# continue

if skipper_obj:
is_processed = skipper_obj.is_processed(sample_gsm)
if is_processed:
@@ -531,6 +547,25 @@
sample_status.status = STATUS.PROCESSING
sa_session.commit()

try:
if int(project_sample.get("file_size") or 0) > max_file_size:
raise QualityException(
f"File size is too big. {int(project_sample.get('file_size', 0)) / 1000000} MB"
)

# to speed up the process, we can run initial QC on the file
run_initial_qc(project_sample.file_url)
except QualityException as err:
_LOGGER.error(f"Processing of '{sample_gsm}' failed with error: {str(err)}")
sample_status.status = STATUS.FAIL
sample_status.error = str(err)
project_status.number_of_failed += 1

if skipper_obj:
skipper_obj.add_failed(sample_gsm, f"Error: {str(err)}")
sa_session.commit()
continue

if preload:
gsm_folder = create_gsm_sub_name(sample_gsm)
files_path = os.path.join(outfolder, FILE_FOLDER_NAME, gsm_folder)
@@ -561,6 +596,7 @@
upload_qdrant=True,
force_overwrite=overwrite,
lite=lite,
r_service=r_service,
)
uploaded_files.append(file_digest)
if skipper_obj:
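
The hunk above also adds an early quality gate to `_upload_gse`: before a sample is downloaded and processed, its reported `file_size` is checked against `max_file_size` (default 20 MB) and `run_initial_qc` is run on the file URL; a `QualityException` marks the sample as failed, records it in the skipper, and the loop moves on to the next sample. A minimal sketch of that gate, reusing the imports shown in the diff (the helper name `check_sample` and its standalone form are illustrative, not part of the PR):

```python
from bedboss.exceptions import QualityException
from bedboss.utils import run_initial_qc


def check_sample(file_size, file_url: str, max_file_size: int = 20 * 1000000) -> None:
    """Illustrative pre-download gate mirroring the logic added to _upload_gse."""
    # file_size comes from GEO sample metadata and may be None or a string.
    if int(file_size or 0) > max_file_size:
        raise QualityException(
            f"File size is too big. {int(file_size or 0) / 1000000} MB"
        )
    # Cheap early QC on the file URL; assumed to raise QualityException on
    # failure, based on how the caller handles it in the diff above.
    run_initial_qc(file_url)
```

The caller wraps this logic in `try/except QualityException`, increments `number_of_failed`, and continues, so an oversized or malformed file never reaches bedmaker or the R service.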
13 changes: 13 additions & 0 deletions bedboss/bedboss.py
@@ -34,6 +34,7 @@
from bedboss.skipper import Skipper
from bedboss.utils import calculate_time, get_genome_digest, standardize_genome_name
from bedboss.utils import standardize_pep as pep_standardizer
from bedboss.bedstat.r_service import RServiceManager

_LOGGER = logging.getLogger(PKG_NAME)

@@ -82,6 +83,7 @@ def run_all(
universe_method: str = None,
universe_bedset: str = None,
pm: pypiper.PipelineManager = None,
r_service: RServiceManager = None,
) -> str:
"""
Run bedboss: bedmaker -> bedqc -> bedclassifier -> bedstat -> upload to s3, qdrant, pephub, and bedbase.
@@ -115,6 +117,7 @@
:param str universe_method: method used to create the universe [Default: None]
:param str universe_bedset: bedset identifier for the universe [Default: None]
:param pypiper.PipelineManager pm: pypiper object
:param RServiceManager r_service: RServiceManager object that will run R services
:return str bed_digest: bed digest
"""
if isinstance(bedbase_config, str):
@@ -171,6 +174,7 @@
just_db_commit=just_db_commit,
rfg_config=rfg_config,
pm=pm,
r_service=r_service,
)
statistics_dict["bed_type"] = bed_metadata.bed_type
statistics_dict["bed_format"] = bed_metadata.bed_format.value
@@ -363,6 +367,11 @@ def insert_pep(
if rerun:
skipper.reinitialize()

if not lite:
r_service = RServiceManager()
else:
r_service = None

for i, pep_sample in enumerate(pep.samples):
is_processed = skipper.is_processed(pep_sample.sample_name)
if is_processed:
@@ -408,6 +417,7 @@
universe_bedset=pep_sample.get("universe_bedset"),
lite=lite,
pm=pm,
r_service=r_service,
)

processed_ids.append(bed_id)
@@ -482,6 +492,8 @@ def reprocess_all(
else:
stop_pipeline = False

r_service = RServiceManager()

if isinstance(bedbase_config, str):
bbagent = BedBaseAgent(config=bedbase_config)
elif isinstance(bedbase_config, bbconf.BedBaseAgent):
@@ -525,6 +537,7 @@
universe_method=None,
universe_bedset=None,
pm=pm,
r_service=r_service,
)
except Exception as e:
_LOGGER.error(f"Failed to process {bed_annot.name}. See {e}")
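
Taken together, the bedboss.py changes thread one `RServiceManager` through the batch entry points: `insert_pep` creates it once (unless `lite` is set), `reprocess_all` always creates one, and the same object is handed to `run_all` for every sample, so the R session is started once per batch instead of once per BED file. A usage sketch of that pattern; the sample fields, config path, and `run_all` parameters other than `lite` and `r_service` are assumptions mirroring the CLI flags and docstrings:

```python
from bedboss.bedboss import run_all
from bedboss.bedstat.r_service import RServiceManager

# Placeholder sample records; in insert_pep these come from a PEP project.
samples = [
    {"input_file": "sample1.bed.gz", "input_type": "bed", "genome": "hg38"},
    {"input_file": "sample2.bed.gz", "input_type": "bed", "genome": "hg38"},
]

# One R process for the whole batch, mirroring insert_pep / reprocess_all.
r_service = RServiceManager()

for sample in samples:
    run_all(
        bedbase_config="bedbase_config.yaml",  # placeholder path
        input_file=sample["input_file"],       # assumed parameter names
        input_type=sample["input_type"],
        genome=sample["genome"],
        outfolder="output",                    # placeholder path
        lite=False,
        r_service=r_service,                   # reuse the running R service
    )
```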
37 changes: 26 additions & 11 deletions bedboss/bedstat/bedstat.py
@@ -21,6 +21,7 @@
)
from bedboss.exceptions import BedBossException, OpenSignalMatrixException
from bedboss.utils import download_file
from bedboss.bedstat.r_service import RServiceManager

_LOGGER = logging.getLogger("bedboss")

@@ -76,6 +77,7 @@ def bedstat(
just_db_commit: bool = False,
rfg_config: Union[str, Path] = None,
pm: pypiper.PipelineManager = None,
r_service: RServiceManager = None,
) -> dict:
"""
Run bedstat pipeline - pipeline for obtaining statistics about bed files
@@ -92,6 +94,7 @@
:param str ensdb: a full path to the ensdb gtf file required for genomes
not in GDdata
:param pm: pypiper object
:param r_service: RServiceManager object

:return: dict with statistics and plots metadata
"""
@@ -156,18 +159,30 @@
assert os.path.exists(rscript_path), FileNotFoundError(
f"'{rscript_path}' script not found"
)
command = (
f"Rscript {rscript_path} --bedfilePath={bedfile} "
f"--openSignalMatrix={open_signal_matrix} "
f"--outputFolder={outfolder_stats_results} --genome={genome} "
f"--ensdb={ensdb} --digest={bed_digest}"
)

try:
pm.run(cmd=command, target=json_file_path)
except Exception as e:
_LOGGER.error(f"Pipeline failed: {e}")
raise BedBossException(f"Pipeline failed: {e}")
if not r_service:
try:
_LOGGER.info("#=>>> Running usual way!")
command = (
f"Rscript {rscript_path} --bedfilePath={bedfile} "
f"--openSignalMatrix={open_signal_matrix} "
f"--outputFolder={outfolder_stats_results} --genome={genome} "
f"--ensdb={ensdb} --digest={bed_digest}"
)
pm.run(cmd=command, target=json_file_path)
except Exception as e:
_LOGGER.error(f"Pipeline failed: {e}")
raise BedBossException(f"Pipeline failed: {e}")
else:
_LOGGER.info("#=>>> Running R service ")
r_service.run_file(
file_path=bedfile,
digest=bed_digest,
outpath=outfolder_stats_results,
genome=genome,
openSignalMatrix=open_signal_matrix,
gtffile=ensdb,
)

data = {}
if os.path.exists(json_file_path):
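
The bedstat change is where the service is actually used: without an `r_service`, the pipeline still builds the `Rscript {rscript_path} ...` command and runs it through pypiper once per BED file; with one, it calls `r_service.run_file(...)` against an already-running R session, avoiding the per-file interpreter and library startup cost. The `bedboss/bedstat/r_service.py` module itself is not among the files shown here, so the following is only a sketch of how such a manager could be built on a long-lived subprocess; the class name and the `run_file` keyword arguments follow the call site above, while the worker script, message format, and completion handshake are assumptions:

```python
import subprocess
from typing import Optional


class RServiceManager:
    """Sketch of a persistent R worker; not the actual bedboss implementation."""

    def __init__(self, r_script: str = "r_service.R") -> None:
        # Assumed: an R script that reads one tab-separated request per line
        # from stdin, computes the statistics/plots, and prints "DONE".
        self._proc = subprocess.Popen(
            ["Rscript", r_script],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            text=True,
            bufsize=1,
        )

    def run_file(
        self,
        file_path: str,
        digest: str,
        outpath: str,
        genome: str,
        openSignalMatrix: Optional[str] = None,
        gtffile: Optional[str] = None,
    ) -> None:
        # Keyword names mirror the bedstat call site; the wire format is invented.
        request = "\t".join(
            [file_path, digest, outpath, genome, str(openSignalMatrix), str(gtffile)]
        )
        self._proc.stdin.write(request + "\n")
        self._proc.stdin.flush()
        # Block until the R side reports completion for this file.
        for line in self._proc.stdout:
            if line.strip() == "DONE":
                break

    def terminate(self) -> None:
        # Shut the R worker down when the batch is finished.
        if self._proc.poll() is None:
            self._proc.terminate()
```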