Skip to content

Commit 218d87a

Browse files
committed
Add ODG controller
Reconciles ODG resources in mODG-root cluster and creates ODG-Extension resources (via gardener-resource-manager).
1 parent ae1dfe7 commit 218d87a

File tree

9 files changed

+648
-2
lines changed

9 files changed

+648
-2
lines changed

ocm_util.py

+57-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import collections.abc
22
import logging
3+
import urllib.parse
34

5+
import ci.util
46
import cnudie.iter
57
import cnudie.retrieve_async
68
import dso.model
@@ -64,7 +66,7 @@ def iter_local_blob_content(
6466
)
6567

6668

67-
async def find_artefact_node(
69+
async def find_artefact_node_async(
6870
component_descriptor_lookup: cnudie.retrieve_async.ComponentDescriptorLookupById,
6971
artefact: dso.model.ComponentArtefactId,
7072
absent_ok: bool=False,
@@ -119,3 +121,57 @@ async def find_artefact_node(
119121

120122
if not absent_ok:
121123
raise ValueError(f'could not find OCM node for {artefact=}')
124+
125+
126+
def to_absolute_access(
127+
access: ocm.OciAccess | ocm.RelativeOciAccess,
128+
ocm_repo: ocm.OciOcmRepository=None,
129+
) -> ocm.OciAccess:
130+
if access.type is ocm.AccessType.OCI_REGISTRY:
131+
return access
132+
133+
if access.type is ocm.AccessType.RELATIVE_OCI_REFERENCE:
134+
if not '://' in ocm_repo.baseUrl:
135+
base_url = urllib.parse.urlparse(f'x://{ocm_repo.baseUrl}').netloc
136+
else:
137+
base_url = urllib.parse.urlparse(ocm_repo.baseUrl).netloc
138+
139+
return ocm.OciAccess(
140+
imageReference=ci.util.urljoin(base_url, access.reference),
141+
)
142+
143+
raise NotImplementedError(access.type)
144+
145+
146+
def find_artefact_node(
147+
component: ocm.Component,
148+
component_descriptor_lookup,
149+
artefact_name: str,
150+
artefact_version: str,
151+
artefact_type: str,
152+
node_filter: collections.abc.Callable | None,
153+
absent_ok: bool=False,
154+
recursion_depth: int=-1,
155+
) -> cnudie.iter.ArtefactNode | None:
156+
for rnode in cnudie.iter.iter(
157+
component=component,
158+
lookup=component_descriptor_lookup,
159+
node_filter=node_filter,
160+
recursion_depth=recursion_depth,
161+
):
162+
rnode: cnudie.iter.ResourceNode
163+
if not (
164+
rnode.resource.name == artefact_name
165+
and rnode.resource.version == artefact_version
166+
and rnode.resource.type == artefact_type
167+
):
168+
continue
169+
170+
return rnode
171+
172+
else:
173+
if absent_ok:
174+
return None
175+
176+
raise ValueError(f'no ocm node found for {artefact_name=} {artefact_version=} \
177+
{artefact_type=}')

odg_operator/__init__.py

Whitespace-only changes.

odg_operator/odg_controller.py

+232
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
import argparse
2+
import collections
3+
import http
4+
import logging
5+
import os
6+
7+
import dacite
8+
import kubernetes.client
9+
import kubernetes.client.rest
10+
import kubernetes.watch
11+
import urllib3
12+
import yaml
13+
14+
import ci.log
15+
import cnudie.iter
16+
import oci.client
17+
18+
import k8s.util
19+
import lookups
20+
import odg_operator.odg_model as odgm
21+
22+
23+
ci.log.configure_default_logging()
24+
logger = logging.getLogger(__name__)
25+
own_dir = os.path.abspath(os.path.dirname(__file__))
26+
CUSTOMER_CLEANUP_FINALIZER = 'open-delivery-gear.ocm.software/customer-cluster-cleanup'
27+
ODG_COMPONENT_NAME = 'ocm.software/ocm-gear'
28+
29+
30+
def find_extension_definition(
31+
extension_definitions: list[odgm.ExtensionDefinition],
32+
extension_name: str,
33+
absent_ok: bool=False,
34+
) -> odgm.ExtensionDefinition | None:
35+
for extension_definition in extension_definitions:
36+
if extension_definition.name == extension_name:
37+
return extension_definition
38+
39+
if absent_ok:
40+
return None
41+
42+
raise ValueError(f'unknown extension-definition for {extension_name=}')
43+
44+
45+
def add_missing_dependencies(
46+
requested: list[odgm.ExtensionDefinition],
47+
known: list[odgm.ExtensionDefinition],
48+
) -> list[odgm.ExtensionDefinition]:
49+
'''
50+
recursively add known extensions until all dependencies are included.
51+
assumes extension-definitions are consistent.
52+
'''
53+
seen = set([e.name for e in requested])
54+
for extension_definition in requested:
55+
if not set(extension_definition.dependencies).issubset(seen):
56+
requested.extend([
57+
find_extension_definition(
58+
extension_definitions=known,
59+
extension_name=dependency,
60+
)
61+
for dependency in extension_definition.dependencies
62+
])
63+
return add_missing_dependencies(
64+
requested=requested,
65+
known=known,
66+
)
67+
68+
return requested
69+
70+
71+
def outputs_as_jsonpath(
72+
outputs_by_extension: dict,
73+
) -> dict:
74+
'''
75+
convert outputs as templated by extensions to lookup dict ready to use with `jsonpaths_ng`.
76+
'''
77+
output_lookup = collections.defaultdict(lambda: collections.defaultdict(dict))
78+
for name, outputs in outputs_by_extension.items():
79+
_outputs = {}
80+
for output in outputs:
81+
output: odgm.ExtensionOutput
82+
_outputs[output.name] = output.value
83+
output_lookup['dependencies'][name]['outputs'] = _outputs
84+
return dict(output_lookup)
85+
86+
87+
def reconcile(
88+
extension_definitions: list[odgm.ExtensionDefinition],
89+
component_descriptor_lookup,
90+
kubeconfig_path: str=None,
91+
):
92+
kubernetes_api = k8s.util.kubernetes_api(kubeconfig_path=kubeconfig_path)
93+
resource_version = ''
94+
95+
while True:
96+
group = odgm.ODGExtensionMeta.group
97+
plural = odgm.ODGMeta.plural
98+
logger.info(f'watching for events: {group=} {plural=}')
99+
try:
100+
for event in kubernetes.watch.Watch().stream(
101+
kubernetes_api.custom_kubernetes_api.list_cluster_custom_object,
102+
group=group,
103+
version='v1',
104+
plural=plural,
105+
resource_version=resource_version,
106+
timeout_seconds=0,
107+
):
108+
metadata = event['object'].get('metadata')
109+
odg_name = metadata['name']
110+
logger.info(f'{event["type"]} "{odg_name}" in "{metadata["namespace"]}"')
111+
112+
requested_extension_definitions = [
113+
find_extension_definition(
114+
extension_definitions=extension_definitions,
115+
extension_name=extension_name,
116+
)
117+
for extension_name in event['object']['spec']['extensions']
118+
]
119+
120+
requested_extension_definitions = add_missing_dependencies(
121+
requested_extension_definitions,
122+
extension_definitions,
123+
)
124+
125+
context = event['object']['spec']['context']
126+
127+
outputs_for_extension = dict([
128+
(
129+
extension_definition.name,
130+
extension_definition.templated_outputs(context),
131+
)
132+
for extension_definition in requested_extension_definitions
133+
])
134+
outputs_jsonpath = outputs_as_jsonpath(outputs_for_extension)
135+
136+
extension_instances = [
137+
odgm.ExtensionInstance.from_definition(
138+
extension_definition=extension_definition,
139+
outputs=outputs_jsonpath,
140+
component_descriptor_lookup=component_descriptor_lookup,
141+
)
142+
for extension_definition in requested_extension_definitions
143+
]
144+
145+
import pprint
146+
for extension_instance in extension_instances:
147+
pprint.pprint(extension_instance)
148+
149+
# TODO: create managed resources
150+
151+
except kubernetes.client.rest.ApiException as e:
152+
if e.status == http.HTTPStatus.GONE:
153+
resource_version = ''
154+
logger.info('API resource watching expired, will start new watch')
155+
else:
156+
raise e
157+
158+
except urllib3.exceptions.ProtocolError:
159+
# this is a known error which has no impact on the functionality, thus rather be
160+
# degregated to a warning or even info
161+
# [ref](https://github.com/kiwigrid/k8s-sidecar/issues/233#issuecomment-1332358459)
162+
resource_version = ''
163+
logger.info('API resource watching received protocol error, will start new watch')
164+
165+
166+
if __name__ == '__main__':
167+
parser = argparse.ArgumentParser()
168+
parser.add_argument('--kubeconfig')
169+
parser.add_argument('--extension-definition-file')
170+
parser.add_argument(
171+
'--extension',
172+
dest='extensions',
173+
action='append',
174+
default=[],
175+
help='can be specified multiple times, expected format: <component-name>:<component-version>'
176+
)
177+
parsed = parser.parse_args()
178+
179+
oci_client = oci.client.Client(
180+
credentials_lookup=lambda **kwargs: None, # consume public oci-images only
181+
)
182+
component_descriptor_lookup = lookups.init_component_descriptor_lookup(
183+
cache_dir='./cache/ocm',
184+
oci_client=oci_client,
185+
)
186+
187+
extension_definitions = []
188+
189+
if parsed.extension_definition_file:
190+
with open(parsed.extension_definition_file) as f:
191+
extensions_raw = yaml.safe_load_all(f)
192+
extension_definitions.extend([
193+
dacite.from_dict(
194+
data=extension_raw,
195+
data_class=odgm.ExtensionDefinition,
196+
)
197+
for extension_raw in extensions_raw
198+
])
199+
200+
for extension in parsed.extensions:
201+
component = component_descriptor_lookup(extension).component
202+
for resource_node in cnudie.iter.iter(
203+
component=component,
204+
recursion_depth=0,
205+
node_filter=cnudie.iter.Filter.resources,
206+
):
207+
if resource_node.resource.type == 'odg-extension':
208+
break
209+
else:
210+
raise ValueError(f'no odg-extension found in {extension}')
211+
212+
resource_node: cnudie.iter.ResourceNode
213+
odg_extension_raw = oci_client.blob(
214+
image_reference=resource_node.component.current_ocm_repo.component_version_oci_ref(
215+
name=resource_node.component.name,
216+
version=resource_node.component.version,
217+
),
218+
digest=resource_node.resource.access.localReference,
219+
stream=False,
220+
).json()
221+
extension_definitions.append(dacite.from_dict(
222+
data=odg_extension_raw,
223+
data_class=odgm.ExtensionDefinition,
224+
))
225+
226+
logger.info(f'known extension definitions: {[e.name for e in extension_definitions]}')
227+
228+
reconcile(
229+
kubeconfig_path=parsed.kubeconfig,
230+
extension_definitions=extension_definitions,
231+
component_descriptor_lookup=component_descriptor_lookup,
232+
)

0 commit comments

Comments
 (0)