Skip to content

Commit 978c6bf

Browse files
Fix imports in cache clean up scripts (#54)
1 parent 9fec32b commit 978c6bf

File tree

2 files changed

+52
-13
lines changed

2 files changed

+52
-13
lines changed
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,22 @@
1-
from presto_utils import execute_cluster_call
1+
from presto_utils import execute_presto_query
22
import argparse
33
import sys
44

55
def clean_directory_list_cache(hostname, username, password, catalog_name):
    """Invalidate the directory list cache of one catalog.

    Builds the statement
    ``CALL <catalog_name>.system.invalidate_directory_list_cache()`` and runs
    it through ``execute_presto_query``.

    Args:
        hostname: Forwarded unchanged to ``execute_presto_query``.
        username: Forwarded unchanged to ``execute_presto_query``.
        password: Forwarded unchanged to ``execute_presto_query``.
        catalog_name: Catalog whose procedure is called. It is interpolated
            into the statement text as-is, so it must be a trusted identifier.

    Returns:
        Whatever ``execute_presto_query`` returns for the statement.
    """
    query = f"CALL {catalog_name}.system.invalidate_directory_list_cache()"
    return execute_presto_query(hostname, username, password, catalog_name, query)
88

99
def clean_metastore_cache(hostname, username, password, catalog_name):
    """Invalidate the metastore cache of one catalog.

    Builds the statement
    ``CALL <catalog_name>.system.invalidate_metastore_cache()`` and runs it
    through ``execute_presto_query``.

    Args:
        hostname: Forwarded unchanged to ``execute_presto_query``.
        username: Forwarded unchanged to ``execute_presto_query``.
        password: Forwarded unchanged to ``execute_presto_query``.
        catalog_name: Catalog whose procedure is called. It is interpolated
            into the statement text as-is, so it must be a trusted identifier.

    Returns:
        Whatever ``execute_presto_query`` returns for the statement.
    """
    query = f"CALL {catalog_name}.system.invalidate_metastore_cache()"
    return execute_presto_query(hostname, username, password, catalog_name, query)
12+
13+
def clean_statistics_file_cache(hostname, username, password, catalog_name):
    """Invalidate the statistics file cache of one catalog.

    Builds the statement
    ``CALL <catalog_name>.system.invalidate_statistics_file_cache()`` and runs
    it through ``execute_presto_query``.

    Args:
        hostname: Forwarded unchanged to ``execute_presto_query``.
        username: Forwarded unchanged to ``execute_presto_query``.
        password: Forwarded unchanged to ``execute_presto_query``.
        catalog_name: Catalog whose procedure is called. It is interpolated
            into the statement text as-is, so it must be a trusted identifier.

    Returns:
        Whatever ``execute_presto_query`` returns for the statement.
    """
    query = f"CALL {catalog_name}.system.invalidate_statistics_file_cache()"
    return execute_presto_query(hostname, username, password, catalog_name, query)
16+
17+
def clean_manifest_file_cache(hostname, username, password, catalog_name):
    """Invalidate the manifest file cache of one catalog.

    Builds the statement
    ``CALL <catalog_name>.system.invalidate_manifest_file_cache()`` and runs
    it through ``execute_presto_query``.

    Args:
        hostname: Forwarded unchanged to ``execute_presto_query``.
        username: Forwarded unchanged to ``execute_presto_query``.
        password: Forwarded unchanged to ``execute_presto_query``.
        catalog_name: Catalog whose procedure is called. It is interpolated
            into the statement text as-is, so it must be a trusted identifier.

    Returns:
        Whatever ``execute_presto_query`` returns for the statement.
    """
    query = f"CALL {catalog_name}.system.invalidate_manifest_file_cache()"
    return execute_presto_query(hostname, username, password, catalog_name, query)
1220

1321
if __name__ == "__main__":
1422
parser = argparse.ArgumentParser(description='Connect to PrestoDB')
@@ -18,13 +26,18 @@ def clean_metastore_cache(hostname, username, password, catalog_name):
1826

1927
args = parser.parse_args()
2028

21-
catalog_list = ["hive"]
22-
is_list_cache_cleanup_enabled = True
23-
is_metadata_cache_cleanup_enabled = True
29+
hive_catalog_list = ["hive"]
30+
iceberg_catalog_list = ["iceberg"]
31+
32+
# coordinator cache
33+
is_list_cache_cleanup_enabled = True #for hive
34+
is_metadata_cache_cleanup_enabled = True #for hive
35+
is_statistics_file_cache_cleanup_enabled = True #for iceberg
36+
is_manifest_file_cache_cleanup_enabled = True #for iceberg
2437

2538
# Directory list cache clean up
2639
if is_list_cache_cleanup_enabled:
27-
for catalog_name in catalog_list:
40+
for catalog_name in hive_catalog_list:
2841
print("Cleaning up directory list cache for ", catalog_name)
2942
rows = clean_directory_list_cache(args.host, args.username, args.password, catalog_name)
3043
print("directory_list_cache_cleanup_query Query Result: ", rows)
@@ -36,7 +49,7 @@ def clean_metastore_cache(hostname, username, password, catalog_name):
3649

3750
# Metadata cache clean up
3851
if is_metadata_cache_cleanup_enabled:
39-
for catalog_name in catalog_list:
52+
for catalog_name in hive_catalog_list:
4053
print("Cleaning up metadata cache for ", catalog_name)
4154
rows = clean_metastore_cache(args.host, args.username, args.password, catalog_name)
4255
print("metastore_cache_cleanup_query Query Result: ", rows)
@@ -45,3 +58,27 @@ def clean_metastore_cache(hostname, username, password, catalog_name):
4558
else:
4659
print("Metastore cache clean up is failed for ", catalog_name)
4760
sys.exit(1)
61+
62+
# Statistics file cache clean up
63+
if is_statistics_file_cache_cleanup_enabled:
    # Invalidate the statistics file cache of every catalog in
    # iceberg_catalog_list; stop the script with exit status 1 on the first
    # catalog that does not report success.
    for catalog_name in iceberg_catalog_list:
        print("Cleaning up statistics file cache for ", catalog_name)
        rows = clean_statistics_file_cache(args.host, args.username, args.password, catalog_name)
        print("statistics_file_cache_cleanup_query Query Result: ", rows)
        # An empty or None result must be reported as a failed clean-up rather
        # than crash with IndexError/TypeError while reading the flag.
        try:
            # Equality (not identity) keeps the original acceptance rule.
            succeeded = rows[0][0] == True  # noqa: E712
        except (IndexError, KeyError, TypeError):
            succeeded = False
        if succeeded:
            print("Statistics file cache clean up is successful for ", catalog_name)
        else:
            print("Statistics file cache clean up is failed for ", catalog_name)
            sys.exit(1)
73+
74+
# Manifest file cache clean up
75+
if is_manifest_file_cache_cleanup_enabled:
    # Invalidate the manifest file cache of every catalog in
    # iceberg_catalog_list; stop the script with exit status 1 on the first
    # catalog that does not report success.
    for catalog_name in iceberg_catalog_list:
        print("Cleaning up manifest file cache for ", catalog_name)
        rows = clean_manifest_file_cache(args.host, args.username, args.password, catalog_name)
        print("manifest_file_cache_cleanup_query Query Result: ", rows)
        # An empty or None result must be reported as a failed clean-up rather
        # than crash with IndexError/TypeError while reading the flag.
        try:
            # Equality (not identity) keeps the original acceptance rule.
            succeeded = rows[0][0] == True  # noqa: E712
        except (IndexError, KeyError, TypeError):
            succeeded = False
        if succeeded:
            print("Manifest file cache clean up is successful for ", catalog_name)
        else:
            print("Manifest file cache clean up is failed for ", catalog_name)
            sys.exit(1)

benchmarks/scripts/cache_cleaning_workers.py

+7-5
Original file line number | Diff line number | Diff line change
@@ -14,21 +14,23 @@ def get_workers_public_ips(data):
1414
return worker_ips
1515

1616
def cleanup_worker_disk_cache(worker_public_ips, login_user, ssh_key_path, worker_http_port):
    """Ask the worker on each host to clear its SSD cache.

    For every address in ``worker_public_ips`` one shell command is sent via
    ``execute_ssh_command``. The command runs ``curl`` inside the container
    selected by ``docker ps --filter 'name=^presto_workers'`` and requests
    ``/v1/operation/server/clearCache?type=ssd`` on ``localhost``.

    Args:
        worker_public_ips: Iterable of worker addresses to connect to.
        login_user: Forwarded unchanged to ``execute_ssh_command``.
        ssh_key_path: Forwarded unchanged to ``execute_ssh_command``.
        worker_http_port: Port substituted into the ``localhost`` URL.
    """
    # The URL is single-quoted because it contains `?`, which an unquoted
    # remote shell would treat as a glob character.
    ssd_cache_clean_command = (
        "docker exec $(docker ps -q --filter 'name=^presto_workers') "
        f"curl -sS -X GET 'http://localhost:{worker_http_port}/v1/operation/server/clearCache?type=ssd'"
    )
    for worker_ip in worker_public_ips:
        execute_ssh_command(worker_ip, login_user, ssh_key_path, ssd_cache_clean_command)
    # NOTE(review): the result of execute_ssh_command is not inspected here, so
    # this message is unconditional. The diff view lost indentation; the print
    # is assumed to run once after the loop -- confirm against the real file.
    print("cleanup_worker_disk_cache is successful!")
2021

2122
def cleanup_worker_os_cache(worker_public_ips, login_user, ssh_key_path):
    """Run the OS cache clean-up commands on every worker host.

    Each command is sent separately, in order, to each address in
    ``worker_public_ips`` via ``execute_ssh_command``.

    Args:
        worker_public_ips: Iterable of worker addresses to connect to.
        login_user: Forwarded unchanged to ``execute_ssh_command``.
        ssh_key_path: Forwarded unchanged to ``execute_ssh_command``.
    """
    # The commands do not depend on the worker, so build them once.
    os_cache_clean_commands = (
        # Sync, then write 3 to /proc/sys/vm/drop_caches.
        "sudo sync && echo 3 | sudo tee /proc/sys/vm/drop_caches",
        # Turn swap off and back on.
        "sudo swapoff -a; sudo swapon -a",
    )
    for worker_ip in worker_public_ips:
        for command in os_cache_clean_commands:
            execute_ssh_command(worker_ip, login_user, ssh_key_path, command)
    # NOTE(review): the result of execute_ssh_command is not inspected here, so
    # this message is unconditional. The diff view lost indentation; the print
    # is assumed to run once after the loops -- confirm against the real file.
    print("cleanup_worker_os_cache is successful!")
2628

2729
def cleanup_worker_memory_cache(worker_public_ips, login_user, ssh_key_path, worker_http_port):
    """Ask the worker on each host to clear its memory cache.

    For every address in ``worker_public_ips`` one shell command is sent via
    ``execute_ssh_command``. The command runs ``curl`` inside the container
    selected by ``docker ps --filter 'name=^presto_workers'`` and requests
    ``/v1/operation/server/clearCache?type=memory`` on ``localhost``.

    Args:
        worker_public_ips: Iterable of worker addresses to connect to.
        login_user: Forwarded unchanged to ``execute_ssh_command``.
        ssh_key_path: Forwarded unchanged to ``execute_ssh_command``.
        worker_http_port: Port substituted into the ``localhost`` URL.
    """
    # The URL is single-quoted because it contains `?`, which an unquoted
    # remote shell would treat as a glob character.
    memory_cache_clean_command = (
        "docker exec $(docker ps -q --filter 'name=^presto_workers') "
        f"curl -sS -X GET 'http://localhost:{worker_http_port}/v1/operation/server/clearCache?type=memory'"
    )
    for worker_ip in worker_public_ips:
        execute_ssh_command(worker_ip, login_user, ssh_key_path, memory_cache_clean_command)
    # NOTE(review): the result of execute_ssh_command is not inspected here, so
    # this message is unconditional. The diff view lost indentation; the print
    # is assumed to run once after the loop -- confirm against the real file.
    print("cleanup_worker_memory_cache is successful!")
3234

3335
# Main function to connect and run queries
3436
if __name__ == "__main__":

0 commit comments

Comments
 (0)