|
| 1 | +import json |
| 2 | +from copy import deepcopy |
| 3 | +import requests |
| 4 | + |
| 5 | + |
| 6 | +# -------------------------------------------- |
| 7 | +# Resolve a $ref inside the components/schemas |
| 8 | +# -------------------------------------------- |
| 9 | + |
| 10 | + |
| 11 | +def resolve_ref(ref, components): |
| 12 | + ref_path = ref.replace("#/components/schemas/", "") |
| 13 | + if ref_path not in components: |
| 14 | + return {} |
| 15 | + schema = components[ref_path] |
| 16 | + |
| 17 | + # Deep copy to avoid mutating original schema |
| 18 | + return deepcopy(schema) |
| 19 | + |
| 20 | + |
| 21 | +# -------------------------------------------- |
| 22 | +# Recursively expand schemas and resolve $ref |
| 23 | +# -------------------------------------------- |
| 24 | +def expand_schema(schema, components): |
| 25 | + """Recursively expand all $ref inside a schema node.""" |
| 26 | + if not isinstance(schema, dict): |
| 27 | + return schema |
| 28 | + |
| 29 | + # If the schema is only a $ref, replace it fully |
| 30 | + if "$ref" in schema: |
| 31 | + target = resolve_ref(schema["$ref"], components) |
| 32 | + return expand_schema(target, components) |
| 33 | + |
| 34 | + expanded = {} |
| 35 | + for key, value in schema.items(): |
| 36 | + |
| 37 | + # Recurse into lists (e.g., 'allOf', 'oneOf') |
| 38 | + if isinstance(value, list): |
| 39 | + expanded[key] = [expand_schema(v, components) for v in value] |
| 40 | + continue |
| 41 | + |
| 42 | + # Recurse into dict children (e.g., items, properties, etc.) |
| 43 | + if isinstance(value, dict): |
| 44 | + expanded[key] = expand_schema(value, components) |
| 45 | + continue |
| 46 | + |
| 47 | + # Base case: primitive or unchanged field |
| 48 | + expanded[key] = value |
| 49 | + |
| 50 | + return expanded |
| 51 | + |
| 52 | + |
| 53 | +def extract_top_level_inputs(schema, components): |
| 54 | + """Return only top-level properties of requestBody schema.""" |
| 55 | + expanded = expand_schema(schema, components) |
| 56 | + |
| 57 | + inputs = {} |
| 58 | + if expanded.get("type") == "object": |
| 59 | + for prop_name, prop_schema in expanded.get("properties", {}).items(): |
| 60 | + # Fully expand each property |
| 61 | + inputs[prop_name] = expand_schema(prop_schema, components) |
| 62 | + return inputs |
| 63 | + |
| 64 | + |
| 65 | +# -------------------------------------------- |
| 66 | +# Convert schema to simple {field: type} |
| 67 | +# -------------------------------------------- |
| 68 | +def flatten_schema(schema): |
| 69 | + flat = {} |
| 70 | + |
| 71 | + def walk(name, node): |
| 72 | + if not isinstance(node, dict): |
| 73 | + return |
| 74 | + |
| 75 | + node_type = node.get("type") |
| 76 | + |
| 77 | + # --------------------------------- |
| 78 | + # Primitive types |
| 79 | + # --------------------------------- |
| 80 | + if node_type in ("string", "number", "integer", "boolean"): |
| 81 | + flat[name] = { |
| 82 | + "type": node_type |
| 83 | + } |
| 84 | + return |
| 85 | + |
| 86 | + # --------------------------------- |
| 87 | + # Object |
| 88 | + # --------------------------------- |
| 89 | + if node_type == "object": |
| 90 | + props = node.get("properties", {}) |
| 91 | + |
| 92 | + # Create schema entry for this object |
| 93 | + flat[name] = { |
| 94 | + "type": "object", |
| 95 | + "properties": {} |
| 96 | + } |
| 97 | + |
| 98 | + # Add detailed properties |
| 99 | + for p_name, p_schema in props.items(): |
| 100 | + flat[name]["properties"][p_name] = p_schema |
| 101 | + |
| 102 | + # Also flatten subproperties |
| 103 | + full_name = f"{name}.{p_name}" if name else p_name |
| 104 | + walk(full_name, p_schema) |
| 105 | + |
| 106 | + return |
| 107 | + |
| 108 | + # --------------------------------- |
| 109 | + # Array |
| 110 | + # --------------------------------- |
| 111 | + if node_type == "array": |
| 112 | + items = node.get("items", {}) |
| 113 | + |
| 114 | + # Create array schema entry |
| 115 | + flat[name] = { |
| 116 | + "type": "array", |
| 117 | + "items": items |
| 118 | + } |
| 119 | + |
| 120 | + # Flatten item structure using name[] |
| 121 | + walk(name + "[]", items) |
| 122 | + return |
| 123 | + |
| 124 | + # Default fallback |
| 125 | + flat[name] = node |
| 126 | + |
| 127 | + walk("", schema) |
| 128 | + return flat |
| 129 | + |
| 130 | + |
| 131 | +# -------------------------------------------- |
| 132 | +# Process entire OpenAPI document |
| 133 | +# -------------------------------------------- |
| 134 | +def extract_check_records(openapi): |
| 135 | + components = openapi.get("components", {}).get("schemas", {}) |
| 136 | + paths = openapi.get("paths", {}) |
| 137 | + |
| 138 | + output = [] |
| 139 | + |
| 140 | + for path, methods in paths.items(): |
| 141 | + # Only process check endpoints |
| 142 | + if "/checks" not in path: |
| 143 | + continue |
| 144 | + for method, details in methods.items(): |
| 145 | + method = method.upper() |
| 146 | + |
| 147 | + # Split the URL into parts |
| 148 | + segments = path.strip("/").split("/") |
| 149 | + |
| 150 | + # Extract version (always after 'api') |
| 151 | + version = segments[1] |
| 152 | + |
| 153 | + # Find index of 'checks' |
| 154 | + checks_index = segments.index("checks") |
| 155 | + |
| 156 | + name = segments[-1] |
| 157 | + |
| 158 | + module = "/".join(segments[checks_index + 1:-1]) |
| 159 | + |
| 160 | + id = 'L-' + module + '-' + name + '-' + version |
| 161 | + |
| 162 | + entry = { |
| 163 | + "id": id, |
| 164 | + "path": path, |
| 165 | + "method": method, |
| 166 | + "name": name, |
| 167 | + "module": module, |
| 168 | + "version": version, |
| 169 | + "inputs": {} |
| 170 | + } |
| 171 | + |
| 172 | + # ---------------------------------------- |
| 173 | + # 1. Path or query parameters |
| 174 | + # ---------------------------------------- |
| 175 | + parameters = details.get("parameters", []) |
| 176 | + for p in parameters: |
| 177 | + name = p["name"] |
| 178 | + dtype = p.get("schema", {}).get("type", "unknown") |
| 179 | + entry["inputs"][name] = dtype |
| 180 | + |
| 181 | + # ---------------------------------------- |
| 182 | + # 2. Request body parameters |
| 183 | + # ---------------------------------------- |
| 184 | + if "requestBody" in details: |
| 185 | + content = details["requestBody"]["content"] |
| 186 | + |
| 187 | + if "application/json" in content: |
| 188 | + |
| 189 | + schema = content["application/json"].get("schema", {}) |
| 190 | + # Only expand top-level 'parameters' and 'situation' |
| 191 | + entry["inputs"].update( |
| 192 | + extract_top_level_inputs(schema, components)) |
| 193 | + |
| 194 | + output.append(entry) |
| 195 | + |
| 196 | + return output |
| 197 | + |
| 198 | + |
| 199 | +# -------------------------------------------- |
| 200 | +# Load your OpenAPI JSON here |
| 201 | +# -------------------------------------------- |
| 202 | +if __name__ == "__main__": |
| 203 | + |
| 204 | + url = "https://library-api-cnsoqyluna-uc.a.run.app/q/openapi.json" |
| 205 | + |
| 206 | + # Send a GET request |
| 207 | + response = requests.get(url) |
| 208 | + |
| 209 | + # Raise an error if the request failed |
| 210 | + response.raise_for_status() # optional, but good practice |
| 211 | + |
| 212 | + # Parse JSON |
| 213 | + data = response.json() |
| 214 | + |
| 215 | + check_records = extract_check_records(data) |
| 216 | + |
| 217 | + # Write JSON file using UTF-8 to avoid errors |
| 218 | + with open("endpoint_inputs.json", "w", encoding="utf-8") as out: |
| 219 | + json.dump(check_records, out, indent=2, ensure_ascii=False) |
| 220 | + |
| 221 | + print("Output written to endpoint_inputs.json") |
0 commit comments