Commit

Merge pull request #4589 from codalab/sanity_check
Sanity check
dma1dma1 authored Jan 28, 2024
2 parents f213477 + c7bc2ab commit 51bbd25
Showing 2 changed files with 159 additions and 60 deletions.
202 changes: 142 additions & 60 deletions codalab/migration.py
@@ -33,6 +33,7 @@
import os
import signal
import tarfile
import random
from apache_beam.io.filesystems import FileSystems
from codalab.common import (
StorageType,
@@ -52,6 +53,7 @@
from codalab.worker.file_util import (
OpenFile,
read_file_section,
read_file_section_gzip,
tar_gzip_directory,
)

@@ -148,12 +150,13 @@ class Migration:
ATTENTION: this class will modify real bundle database.
"""

def __init__(self, target_store_name, change_db, delete, proc_id) -> None:
def __init__(self, target_store_name, change_db, delete, proc_id, sanity_check_number) -> None:
self.target_store_name = target_store_name
self.change_db = change_db
self.delete = delete
self.times = defaultdict(list)
self.proc_id = proc_id
self.sanity_check_count = sanity_check_number

self.setUp()

@@ -278,7 +281,7 @@ def upload_to_azure_blob(self, bundle_uuid, bundle_location, is_dir=False):
)

if is_dir:
source_fileobj = tar_gzip_directory(bundle_location, exclude_patterns=None)
source_fileobj = tar_gzip_directory(bundle_location)
source_ext = ".tar.gz"
unpack = True
else:
@@ -334,42 +337,88 @@ def modify_bundle_data(self, bundle, bundle_uuid, is_dir):
def sanity_check(self, bundle_uuid, bundle_location, bundle_info, is_dir, new_location=None):
if new_location is None:
new_location = self.get_bundle_location(bundle_uuid)

success, reason = None, None

if is_dir:
# For dirs, check the folder contains same files
with OpenFile(new_location, gzipped=True, exclude_patterns=None) as f:
with OpenFile(new_location, gzipped=True) as f:
new_file_list = tarfile.open(fileobj=f, mode='r:gz').getnames()
new_file_list.sort()

(files, dirs) = path_util.recursive_ls(bundle_location)
(dirs, files) = path_util.recursive_ls(bundle_location)
files = [n.replace(bundle_location, '.') for n in files]
dirs = [n.replace(bundle_location, '.') for n in dirs]
old_file_list = files + dirs
old_file_list = [n.replace(bundle_location, '.') for n in old_file_list]
old_file_list.sort()
if old_file_list != new_file_list:
return False, "Directory file lists differ."

# Sanity check files
if self.sanity_check_count == 0 or self.sanity_check_count > len(files):
# Check all files
success, reason = self.dir_file_sanity_check(bundle_location, new_location, files)
else:
# Check sanity_check_count files, excluding directories
file_sample = random.sample(files, self.sanity_check_count)
success, reason = self.dir_file_sanity_check(bundle_location, new_location, file_sample)

else:
# For files, check the file has same contents
old_content = read_file_section(bundle_location, 5, 10)
new_content = read_file_section(new_location, 5, 10)
success, reason = self.file_sanity_check(bundle_location, new_location)

return success, reason

def file_sanity_check(self, bundle_location, new_location):
# For files, check the file has same contents
old_content = read_file_section(bundle_location, 5, 10)
new_content = read_file_section(new_location, 5, 10)
if old_content != new_content:
return False, "First 5 bytes differ."

old_file_size = path_util.get_path_size(bundle_location)
new_file_size = path_util.get_path_size(new_location)
if old_file_size != new_file_size:
return False, "File sizes differ"

# check file contents of last 10 bytes
if old_file_size < 10:
if read_file_section(bundle_location, 0, 10) != read_file_section(
new_location, 0, 10
):
return False, "First 10 bytes differ."
else:
if read_file_section(bundle_location, old_file_size - 10, 10) != read_file_section(
new_location, old_file_size - 10, 10
):
return False, "Last 10 bytes differ."

return True, ""
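
file_sanity_check is a cheap spot check rather than a full content hash: it compares a 10-byte section starting at offset 5, the total size, and the last 10 bytes of the two files. A self-contained sketch of the same idea, standard library only (the helper name and the offset choice here are illustrative, not part of this diff):

    import os

    def files_probably_equal(path_a, path_b, probe=10):
        # Cheap spot check: total size, a small head section, a small tail section.
        size = os.path.getsize(path_a)
        if size != os.path.getsize(path_b):
            return False
        with open(path_a, "rb") as fa, open(path_b, "rb") as fb:
            if fa.read(probe) != fb.read(probe):  # head bytes
                return False
            if size > probe:
                fa.seek(size - probe)  # tail bytes
                fb.seek(size - probe)
                if fa.read(probe) != fb.read(probe):
                    return False
        return True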

def dir_file_sanity_check(self, bundle_location, new_location, file_list):
for file in file_list:
file_location = bundle_location + "/" + file

# For files, check the file has same contents
old_content = read_file_section(file_location, 5, 10)
new_content = read_file_section_gzip(new_location, file, 5, 10)
if old_content != new_content:
return False, "First 5 bytes differ."

old_file_size = path_util.get_path_size(bundle_location)
new_file_size = path_util.get_path_size(new_location)
if old_file_size != new_file_size:
return False, "File sizes differ"
old_file_size = path_util.get_path_size(file_location)

# check file contents of last 10 bytes
if old_file_size < 10:
if read_file_section(bundle_location, 0, 10) != read_file_section(
new_location, 0, 10
if read_file_section(file_location, 0, 10) != read_file_section_gzip(
new_location, file, 0, 10
):
return False, "First 10 bytes differ."
else:
if read_file_section(bundle_location, old_file_size - 10, 10) != read_file_section(
new_location, old_file_size - 10, 10
if read_file_section(file_location, old_file_size - 10, 10) != read_file_section_gzip(
new_location, file, old_file_size - 10, 10
):
return False, "Last 10 bytes differ."

return True, ""
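
The branch in sanity_check above bounds the cost of verifying large directories: a sanity_check_count of 0, or one larger than the directory's file count, verifies every file; anything else verifies a random subset. A minimal sketch of just that selection rule (it assumes, as the caller does, that files already excludes directories):

    import random

    def pick_files_to_check(files, sanity_check_count):
        # 0 means "no cap": verify every file in the directory.
        if sanity_check_count == 0 or sanity_check_count > len(files):
            return files
        # Otherwise verify a bounded random sample.
        return random.sample(files, sanity_check_count)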

def delete_original_bundle(self, uuid):
@@ -428,57 +477,84 @@ def migrate_bundle(self, bundle_uuid):
self.logger.info("Getting Bundle info")
bundle = self.get_bundle(bundle_uuid)
bundle_location = self.get_bundle_location(bundle_uuid)
bundle_info = self.get_bundle_info(bundle_uuid, bundle_location)
is_dir = bundle_info['type'] == 'directory'
target_location = self.blob_target_location(bundle_uuid, is_dir)
disk_location = self.get_bundle_disk_location(bundle_uuid)

# Don't migrate currently running bundles
if bundle.state not in State.FINAL_STATES:
bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
return

# Don't migrate linked bundles
if self.is_linked_bundle(bundle_uuid):
bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
return

# if db already changed
# TODO: Check if bundle_location is azure (see other places in code base.)
if bundle_migration_status.status == MigrationStatus.FINISHED:
return
elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
bundle_migration_status.status = MigrationStatus.CHANGED_DB
elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)[0]):
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
bundle_migration_status.error_message = None

# Upload to Azure.
if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
self.logger.info("Uploading to Azure")
start_time = time.time()
self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
success, reason = self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)
if not success:
raise ValueError(f"SanityCheck failed with {reason}")
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
bundle_migration_status.error_message = None

# This is for handling cases where rm -d was run on the bundle
is_bundle_rm = False
bundle_info = None
try:
bundle_info = self.get_bundle_info(bundle_uuid, bundle_location)
except Exception as e:
if "Path ''" in str(e):
for i in range(0, 10):
try:
bundle_info = self.get_bundle_info(bundle_uuid, f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}')
bundle_location = f'/home/azureuser/codalab-worksheets/var/codalab/home/partitions/codalab{i}/bundles/{bundle_uuid}'
bundle_migration_status.status = MigrationStatus.NOT_STARTED
except:
pass

if not bundle_info:
self.logger.info(f"{bundle_uuid} will have database metadata changed to blob without migration")
is_bundle_rm = True
is_dir = False
bundle_migration_status.status = MigrationStatus.NOT_STARTED
bundle_migration_status.error_message = None
else:
raise e

# Normal Migration
if not is_bundle_rm:
is_dir = bundle_info['type'] == 'directory'
target_location = self.blob_target_location(bundle_uuid, is_dir)
disk_location = self.get_bundle_disk_location(bundle_uuid)

# Don't migrate currently running bundles
if bundle.state not in State.FINAL_STATES:
bundle_migration_status.status = MigrationStatus.SKIPPED_NOT_FINAL
return

# Don't migrate linked bundles
if self.is_linked_bundle(bundle_uuid):
bundle_migration_status.status = MigrationStatus.SKIPPED_LINKED
return

# if db already changed
# TODO: Check if bundle_location is azure (see other places in code base.)
if bundle_migration_status.status == MigrationStatus.FINISHED and bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
return
elif bundle_migration_status.changed_db() or bundle_location.startswith(StorageURLScheme.AZURE_BLOB_STORAGE.value):
bundle_migration_status.status = MigrationStatus.CHANGED_DB
elif bundle_migration_status.uploaded_to_azure() or (FileSystems.exists(target_location) and self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)[0]):
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
bundle_migration_status.error_message = None

# Upload to Azure.
# print(bundle_migration_status.uploaded_to_azure(), os.path.lexists(disk_location))
if not bundle_migration_status.uploaded_to_azure() and os.path.lexists(disk_location):
self.logger.info("Uploading to Azure")
start_time = time.time()
self.adjust_quota_and_upload_to_blob(bundle_uuid, bundle_location, is_dir)
self.times["adjust_quota_and_upload_to_blob"].append(time.time() - start_time)
success, reason = self.sanity_check(
bundle_uuid, bundle_location, bundle_info, is_dir, target_location
)
if not success:
raise ValueError(f"SanityCheck failed with {reason}")
bundle_migration_status.status = MigrationStatus.UPLOADED_TO_AZURE
bundle_migration_status.error_message = None

# Change bundle metadata in database to point to the Azure Blob location (not disk)
if self.change_db and not bundle_migration_status.changed_db():
if (self.change_db or is_bundle_rm) and not bundle_migration_status.changed_db() and not bundle_migration_status.error_message:
self.logger.info("Changing DB")
start_time = time.time()
self.modify_bundle_data(bundle, bundle_uuid, is_dir)
self.times["modify_bundle_data"].append(time.time() - start_time)
bundle_migration_status.status = MigrationStatus.CHANGED_DB

# Delete the bundle from disk.
if self.delete:
if self.delete and not is_bundle_rm:
self.logger.info("Deleting from disk")
start_time = time.time()
if os.path.lexists(disk_location):
@@ -495,6 +571,8 @@ def migrate_bundle(self, bundle_uuid):
bundle_migration_status.status = MigrationStatus.ERROR

finally:
if bundle_migration_status.error_message:
bundle_migration_status.status = MigrationStatus.ERROR
self.bundle_migration_statuses.append(bundle_migration_status)

def log_times(self):
@@ -531,7 +609,7 @@ def migrate_bundles(self, bundle_uuids, log_interval=100):
self.write_bundle_statuses()


def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_result, num_processes, proc_id):
def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_result, num_processes, sanity_check_number, proc_id):
"""A function for running the migration in parallel.
NOTE: I know this is bad styling since we re-create the Migration object and the
@@ -541,7 +619,7 @@ def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_resul
(BundleManager, CodalabManager, etc.), and so this is the compromise we came up with.
"""
# Setup Migration.
migration = Migration(target_store_name, change_db, delete, proc_id)
migration = Migration(target_store_name, change_db, delete, proc_id, sanity_check_number)

# Get bundle uuids (if not already provided)
if not bundle_uuids:
@@ -597,6 +675,9 @@ def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_resul
default=multiprocessing.cpu_count(),
)
parser.add_argument('-d', '--delete', help='Delete the original database', action='store_true')
parser.add_argument(
'-s', '--sanity_check_number', type=int, help='Sanity check directories on N files; default is 10, set to 0 to check all files', default=10
)
args = parser.parse_args()
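
Assuming the script's remaining arguments are unchanged (they are collapsed out of this hunk, so their spellings are not visible here), an invocation exercising the new option might look like this; any other required arguments are omitted:

    # Spot-check 20 randomly chosen files per migrated directory:
    python codalab/migration.py -s 20

    # Verify every file of every directory, then delete the originals from disk:
    python codalab/migration.py -s 0 -d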

# Run the program with multiprocessing
@@ -609,6 +690,7 @@ def job(target_store_name, change_db, delete, worksheet, bundle_uuids, max_resul
args.bundle_uuids,
args.max_result,
args.num_processes,
args.sanity_check_number,
)
with multiprocessing.Pool(processes=args.num_processes) as pool:
pool.map(f, list(range(args.num_processes)))
17 changes: 17 additions & 0 deletions codalab/worker/file_util.py
@@ -7,6 +7,7 @@
import subprocess
import bz2
import hashlib
import tarfile
import stat

from codalab.common import BINARY_PLACEHOLDER, UsageError
@@ -466,6 +467,22 @@ def read_file_section(file_path, offset, length):
return fileobj.read(length)


def read_file_section_gzip(bundle_path, file_name, offset, length):
"""
TODO: UNSAFE
Given a tar.gz archive, reads length bytes of the member file_name
starting at the given offset.
Returns bytes.
"""
with OpenFile(bundle_path, 'rb', gzipped=True) as bundle:
tf = tarfile.open(fileobj=bundle, mode='r:gz')
member = tf.getmember(file_name)
fileobj = tf.extractfile(member)
fileobj.seek(offset, os.SEEK_SET)
return fileobj.read(length)
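
A usage sketch pairing the new helper with read_file_section, mirroring how dir_file_sanity_check compares a file on disk against the same member of a migrated bundle archive (the paths, store URL, and member name below are illustrative only; within this diff, members are addressed by the './<path>' names produced when the directory was tarred):

    # Compare bytes 5..15 of a file on disk with the matching member of a
    # migrated .tar.gz bundle.
    on_disk = read_file_section('/bundles/0x1234/output.txt', 5, 10)
    in_blob = read_file_section_gzip(
        'azfs://store/bundles/0x1234/contents.tar.gz', './output.txt', 5, 10
    )
    assert on_disk == in_blob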


def summarize_file(file_path, num_head_lines, num_tail_lines, max_line_length, truncation_text):
"""
Summarizes the file at the given path, returning a string containing the
