Skip to content

Commit b6cfb0d

Browse files
authored
Try putting all test run info into clickhouse (#7347)
See how well ClickHouse handles having all test run info. Companion to https://github.com/pytorch/pytorch/pull/165484/files
1 parent 56ff274 commit b6cfb0d

File tree

1 file changed

+108
-0
lines changed

1 file changed

+108
-0
lines changed

aws/lambda/clickhouse-replicator-s3/lambda_function.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,112 @@ def encode_url_component(url):
5454
return urllib.parse.quote(url)
5555

5656

57+
def handle_test_run_s3_small(table, bucket, key) -> List[Dict[str, Any]]:
    """Insert the test-run rows of one S3 object into ``table``.

    Builds a single ``insert into {table} select ... from s3(...)`` statement
    and runs it through ``get_clickhouse_client().query``. The object is read
    from ``https://{bucket}.s3.amazonaws.com/<url-encoded key>`` as
    gzip-compressed ``JSONEachRow``.

    The ``error``, ``failure``, ``rerun`` and ``skipped`` columns are read as
    raw strings and normalized to arrays of tuples in the select list.
    ``time_inserted`` is filled with ``now()::DateTime64(9)``, and the last
    selected column is the ``(bucket, key)`` tuple identifying the source
    object. ``properties``, ``system-err``, ``system-out``, ``type_param`` and
    ``value_param`` are declared in the ``s3()`` structure but are not
    selected.

    Args:
        table: Destination table name, interpolated into the statement as-is.
        bucket: S3 bucket name; used in the object URL and in the trailing
            ``(bucket, key)`` tuple.
        key: S3 object key; URL-encoded for the object URL and escaped as a
            SQL string literal for the trailing tuple.

    Returns:
        Nothing. The body has no ``return`` statement, so callers receive
        ``None`` despite the annotation.
        NOTE(review): the annotation is kept unchanged for interface
        compatibility; confirm whether callers use the result.

    Raises:
        Nothing from the query itself: any ``Exception`` raised while running
        it is passed to ``log_failure_to_clickhouse(table, bucket, key, e)``
        and not re-raised.
    """

    def clean_up_query(query):
        # Collapse the multi-line template into one line: strip each line and
        # join the pieces with single spaces.
        return " ".join([line.strip() for line in query.split("\n")])

    def escape_sql_string(value):
        # Escape backslashes first, then single quotes, so the value can be
        # placed between single quotes without terminating the literal early.
        return str(value).replace("\\", "\\\\").replace("'", "\\'")

    def get_skipped_failure_parser_helper(
        name, tuple_type, field_to_check_for_existence
    ):
        # skipped and failure generally have either the format:
        # Tuple(stuff) or Array(Tuple(stuff)).
        # The stuff varies. tuple_type should be the string `Tuple(stuff)`.
        # field_to_check_for_existence is the key looked up in the single
        # tuple form to decide between a one-element array and an empty array.
        return f"""
        if(
            JSONArrayLength({name}) is null,
            if(
                JSONHas({name}, '{field_to_check_for_existence}'),
                array(
                    JSONExtract(
                        {name},
                        '{tuple_type}'
                    )
                ),
                [ ]
            ),
            JSONExtract(
                {name},
                'Array({tuple_type})'
            )
        ) as {name}
        """

    typed_tuple = "Tuple(type String, message String, text String)"
    rerun_tuple = "Tuple(message String, text String)"

    error_col = get_skipped_failure_parser_helper("error", typed_tuple, "message")
    failure_col = get_skipped_failure_parser_helper("failure", typed_tuple, "message")
    rerun_col = get_skipped_failure_parser_helper("rerun", rerun_tuple, "message")
    skipped_col = get_skipped_failure_parser_helper("skipped", typed_tuple, "message")

    # bucket/key end up inside single-quoted SQL literals, so escape them; the
    # URL below is already safe because the key goes through
    # encode_url_component.
    bucket_literal = escape_sql_string(bucket)
    key_literal = escape_sql_string(key)

    # Cannot use general_adapter due to custom field for now()::DateTime64(9)
    # time_inserted
    query = f"""
    insert into {table}
    select
        classname,
        duration,
        {error_col},
        {failure_col},
        file,
        invoking_file,
        job_id,
        line::Int64,
        name,
        {rerun_col},
        result,
        {skipped_col},
        status,
        time,
        now()::DateTime64(9) as time_inserted,
        workflow_id,
        workflow_run_attempt,
        ('{bucket_literal}', '{key_literal}')
    from
        s3(
            'https://{bucket}.s3.amazonaws.com/{encode_url_component(key)}',
            'JSONEachRow',
            '
            `classname` String,
            `duration` Float32,
            `error` String,
            `failure` String,
            `file` String,
            `invoking_file` String,
            `job_id` Int64,
            `line` Float32,
            `name` String,
            `properties` Tuple(property Tuple(name String, value String)),
            `rerun` String,
            `result` String,
            `skipped` String,
            `status` String,
            `system-err` String,
            `system-out` String,
            `time` Float32,
            `type_param` String,
            `value_param` String,
            `workflow_id` Int64,
            `workflow_run_attempt` Int32',
            'gzip'
        )
    """
    query = clean_up_query(query)
    try:
        get_clickhouse_client().query(query)
    except Exception as e:
        # Best-effort ingestion: record the failure instead of propagating it.
        log_failure_to_clickhouse(table, bucket, key, e)
161+
162+
57163
def handle_test_run_s3(table, bucket, key) -> List[Dict[str, Any]]:
58164
def clean_up_query(query):
59165
return " ".join([line.strip() for line in query.split("\n")])
@@ -585,12 +691,14 @@ def cloudwatch_metrics_adapter(table, bucket, key):
585691
"disabled_tests_historical": "misc.disabled_tests_historical",
586692
# fbossci-cloudwatch-metrics bucket
587693
"ghci-related": "infra_metrics.cloudwatch_metrics",
694+
"all_test_runs": "fortesting.all_test_runs",
588695
}
589696

590697
OBJECT_CONVERTER = {
591698
"default.merges": merges_adapter,
592699
"default.test_run_s3": handle_test_run_s3,
593700
"default.failed_test_runs": handle_test_run_s3,
701+
"fortesting.all_test_runs": handle_test_run_s3_small,
594702
"default.test_run_summary": handle_test_run_summary,
595703
"default.merge_bases": merge_bases_adapter,
596704
"default.rerun_disabled_tests": rerun_disabled_tests_adapter,

0 commit comments

Comments
 (0)