From e3b327bc5f8ff5144dfa6d27cb537106ff858498 Mon Sep 17 00:00:00 2001
From: Grace Murphy
Date: Wed, 25 Oct 2023 18:12:24 -0600
Subject: [PATCH] WIP: fixed creds issue

---
 .../cloud_connectors/common/connector.py |   9 +-
 .../gcp_connector/connector.py           | 193 +++++++++++-------
 2 files changed, 126 insertions(+), 76 deletions(-)

diff --git a/src/censys/cloud_connectors/common/connector.py b/src/censys/cloud_connectors/common/connector.py
index 27b5803..51d85cc 100644
--- a/src/censys/cloud_connectors/common/connector.py
+++ b/src/censys/cloud_connectors/common/connector.py
@@ -49,7 +49,7 @@ def __init__(self, settings: Settings):
         """
         if not self.provider:
             raise ValueError("The provider must be set.")
-        self.label_prefix = self.provider.label() + ": "
+        self.label_prefix = self.get_provider_label_prefix()
         self.settings = settings
         self.logger = get_logger(
             log_name=f"{self.provider.lower()}_cloud_connector",
@@ -79,6 +79,13 @@ def __init__(self, settings: Settings):
         self.cloud_assets = defaultdict(set)
         self.current_service = None
 
+    def get_provider_label_prefix(self):
+        """Get the provider label prefix.
+
+        Returns:
+            str: Provider label prefix.
+        """
+        return self.provider.label() + ": "
+
     def delete_seeds_by_label(self, label: str):
         """Replace seeds for [label] with an empty list.
 
diff --git a/src/censys/cloud_connectors/gcp_connector/connector.py b/src/censys/cloud_connectors/gcp_connector/connector.py
index 7d4d355..5430163 100644
--- a/src/censys/cloud_connectors/gcp_connector/connector.py
+++ b/src/censys/cloud_connectors/gcp_connector/connector.py
@@ -29,6 +29,7 @@ class GcpScanContext:
     """Required configuration context for scan()."""
 
+    label_prefix: str
     provider_settings: GcpSpecificSettings
     organization_id: int
     credentials: service_account.Credentials
@@ -37,6 +38,7 @@ class GcpScanContext:
     all_projects: dict[str, dict[str, str]]
     logger: Logger
 
 
+# TODO: question: seed labels and cloud asset uids are the same for GCP. Is that confusing for keeping stale seeds out? Or does the fact that we submit seeds and cloud assets separately make it ok?
 class GcpCloudConnector(CloudConnector):
     """Gcp Cloud Connector."""
 
@@ -63,37 +65,95 @@ def __init__(self, settings: Settings):
 
     def scan_seeds(self, **kwargs):
-        """Scan AWS for seeds."""
-        scan_context: GcpScanContext = kwargs["scan_context"]
+        """Scan GCP for seeds."""
+        ctx: GcpScanContext = kwargs["scan_context"]
 
         logger = get_logger(
             log_name=f"{self.provider.lower()}_connector",
             level=self.settings.logging_level,
-            provider=f"{self.provider}_{scan_context.organization_id}",
+            provider=f"{self.provider}_{ctx.organization_id}",
+        )
+        ctx.logger = logger
+
+        key_file_path = (
+            Path(self.settings.secrets_dir)
+            / ctx.provider_settings.service_account_json_file
         )
-        scan_context.logger = logger
+        try:
+            ctx.credentials = (
+                service_account.Credentials.from_service_account_file(
+                    str(key_file_path)
+                )
+            )
+        except ValueError as e:
+            ctx.logger.error(
+                "Failed to load service account credentials from"
+                f" {key_file_path}: {e}"
+            )
+            raise
+
+        ctx.cloud_asset_client = AssetServiceClient(
+            credentials=ctx.credentials
+        )
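+        # NOTE: the credentials and Cloud Asset client are built here, inside
+        # the worker process, rather than in scan_all(); client objects hold
+        # live network resources (gRPC channels) that generally can't be
+        # pickled across the multiprocessing.Pool boundary.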
+
+        # TODO: potentially add this to provider_settings. once populated it doesn't change, just accessed
+        all_projects = self.list_projects(ctx)
+        ctx.all_projects = all_projects
 
-        self.logger.info(
-            f"Scanning GCP organization {scan_context.organization_id} for seeds."
+        ctx.logger.info(
+            f"Scanning GCP organization {ctx.organization_id} for seeds."
         )
+        # TODO: make sure events are working (not broken by worker pool change)
+        # TODO: specify that this is a scan of seeds
+        self.dispatch_event(EventTypeEnum.SCAN_STARTED)
         super().scan_seeds(**kwargs)
+        self.dispatch_event(EventTypeEnum.SCAN_FINISHED)
+        # TODO: maybe we can add some additional info here (like project id) for the healthcheck ui?
 
     def scan_cloud_assets(self, **kwargs):
-        """Scan AWS for cloud assets."""
-        scan_context: GcpScanContext = kwargs["scan_context"]
+        """Scan GCP for cloud assets."""
+        ctx: GcpScanContext = kwargs["scan_context"]
 
         logger = get_logger(
             log_name=f"{self.provider.lower()}_connector",
             level=self.settings.logging_level,
-            provider=f"{self.provider}_{scan_context.organization_id}",
+            provider=f"{self.provider}_{ctx.organization_id}",
+        )
+        ctx.logger = logger
+
+        key_file_path = (
+            Path(self.settings.secrets_dir)
+            / ctx.provider_settings.service_account_json_file
         )
-        scan_context.logger = logger
+        try:
+            ctx.credentials = (
+                service_account.Credentials.from_service_account_file(
+                    str(key_file_path)
+                )
+            )
+        except ValueError as e:
+            ctx.logger.error(
+                "Failed to load service account credentials from"
+                f" {key_file_path}: {e}"
+            )
+            raise
+
+        ctx.cloud_asset_client = AssetServiceClient(
+            credentials=ctx.credentials
+        )
+
+        all_projects = self.list_projects(ctx)
+        ctx.all_projects = all_projects
 
-        self.logger.info(
-            f"Scanning GCP organization {scan_context.organization_id} for cloud assets."
+        ctx.logger.info(
+            f"Scanning GCP organization {ctx.organization_id} for cloud assets."
         )
+        # TODO: make sure events are working (not broken by worker pool change)
+        # TODO: specify that this is a scan of cloud assets
+        self.dispatch_event(EventTypeEnum.SCAN_STARTED)
         super().scan_cloud_assets(**kwargs)
+        self.dispatch_event(EventTypeEnum.SCAN_FINISHED)
 
     def scan_all(self):
         """Scan all Gcp Organizations."""
@@ -105,8 +165,12 @@ def scan_all(self):
             f"Scanning GCP using {self.settings.scan_concurrency} processes."
         )
 
+        label_prefix = self.get_provider_label_prefix()
+
         pool = Pool(processes=self.settings.scan_concurrency)
 
+        self.logger.debug("after pool, before for loop")
+
         for provider_setting in provider_settings.values():
             # `provider_setting` represents a specific top-level GcpOrganization entry in providers.yml
             #
@@ -115,32 +179,13 @@ def scan_all(self):
             self.provider_settings = provider_setting
             organization_id = provider_setting.organization_id
 
-            key_file_path = (
-                Path(self.settings.secrets_dir)
-                / provider_setting.service_account_json_file
-            )
-            try:
-                credentials = (
-                    service_account.Credentials.from_service_account_file(
-                        str(key_file_path)
-                    )
-                )
-            except ValueError as e:
-                self.logger.error(
-                    "Failed to load service account credentials from"
-                    f" {key_file_path}: {e}"
-                )
-                raise
-            cloud_asset_client = AssetServiceClient(
-                credentials=credentials
-            )
-
             try:
                 # scan seeds
                 with Healthcheck(
                     self.settings,
                     provider_setting,
-                    provider={
+                    provider={"organization_id": organization_id},
+                    exception_map={
                         exceptions.Unauthenticated: "PERMISSIONS",
                         exceptions.PermissionDenied: "PERMISSIONS",
                     },
@@ -152,19 +197,17 @@ def scan_all(self):
                     scan_context = GcpScanContext(
                         provider_settings=provider_setting,
                         organization_id=organization_id,
-                        credentials=credentials,
-                        cloud_asset_client=cloud_asset_client,
-                        all_projects=all_projects,
+                        credentials=None,
+                        cloud_asset_client=None,
+                        all_projects=None,
                         logger=None,
+                        label_prefix=label_prefix,
                     )
-
-                    # TODO: potentially add this to provider_settings. once populated it doesn't change, just accessed
-                    all_projects = self.list_projects(scan_context)
-                    scan_context.all_projects = all_projects
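+                    # NOTE: credentials, cloud_asset_client, and all_projects
+                    # start out as None and are filled in by the worker (see
+                    # scan_seeds), so only small, picklable values cross the
+                    # process boundary.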
 
                     pool.apply_async(
                         self.scan_seeds,
                         kwds={"scan_context": scan_context},
+                        error_callback=lambda e: self.logger.error(f"Async Error: {e}"),
                     )
 
             except Exception as e:
@@ -178,7 +221,8 @@ def scan_all(self):
                 with Healthcheck(
                     self.settings,
                     provider_setting,
-                    provider={
+                    provider={"organization_id": organization_id},
+                    exception_map={
                         exceptions.Unauthenticated: "PERMISSIONS",
                         exceptions.PermissionDenied: "PERMISSIONS",
                     },
@@ -190,18 +234,18 @@ def scan_all(self):
                     scan_context = GcpScanContext(
                         provider_settings=provider_setting,
                         organization_id=organization_id,
-                        credentials=credentials,
-                        cloud_asset_client=cloud_asset_client,
-                        all_projects=all_projects,
+                        credentials=None,
+                        cloud_asset_client=None,
+                        all_projects=None,
                         logger=None,
+                        label_prefix=label_prefix,
                     )
 
-                    all_projects = self.list_projects(scan_context)
-                    scan_context.all_projects = all_projects
 
                     pool.apply_async(
                         self.scan_cloud_assets,
                         kwds={"scan_context": scan_context},
+                        error_callback=lambda e: self.logger.error(f"Async Error: {e}"),
                     )
 
             except Exception as e:
@@ -230,7 +274,7 @@ def list_projects(self, ctx: GcpScanContext) -> dict[str, dict]:
 
             try:
                 if versioned_resource := self.check_asset_version(
-                    GcpCloudAssetInventoryTypes.PROJECT, project
+                    ctx, GcpCloudAssetInventoryTypes.PROJECT, project
                 ):
                     resource = versioned_resource["resource"]
                     version = versioned_resource["version"]
@@ -270,19 +314,6 @@ def parse_project_number(self, path: str) -> str:
         project_number = project["project"]
         return project_number
 
-    def return_if_str(self, val: Any) -> str:
-        """Return the value if it is a string.
-
-        Args:
-            val (Any): The value to check.
-
-        Returns:
-            str: The value if it is a string.
-        """
-        if isinstance(val, str):
-            return val
-        return ""
-
     def check_asset_version(
         self, ctx: GcpScanContext, asset_type: GcpCloudAssetInventoryTypes, asset: dict
     ) -> Optional[dict]:
@@ -335,6 +366,7 @@ def check_asset_version(
 
         return None
 
+    # TODO: do we need this still?
     # def clean_up(self):
     #     """Remove seeds and cloud assets for GCP projects where no assets were found."""
     #     possible_projects = set(self.all_projects.keys())
@@ -379,7 +411,7 @@ def search_all_resources(
             SearchAllResourcesPager: Gcp assets.
         """
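+        # SearchAllResources walks every project under `scope` (presumably the
+        # parent organization here) for the requested asset types; read_mask="*"
+        # requests the full resource payload, including versioned_resources,
+        # rather than the default summary fields.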
"asset_types": [filter], "read_mask": "*", } @@ -388,13 +420,12 @@ def search_all_resources( def get_compute_instances(self, **kwargs): """Get Gcp compute instances assets.""" ctx: GcpScanContext = kwargs["scan_context"] - try: results = self.search_all_resources( ctx, filter=GcpCloudAssetInventoryTypes.COMPUTE_INSTANCE ) except ValueError as e: - self.logger.error(f"Failed to get compute instances: {e}") + ctx.logger.error(f"Failed to get compute instances: {e}") return for result in results: @@ -443,7 +474,7 @@ def get_compute_instances(self, **kwargs): ValueError, KeyError, ) as e: # pragma: no cover - self.logger.debug(f"Failed to parse asset: {asset}: {e}") + ctx.logger.debug(f"Failed to parse asset: {asset}: {e}") continue self.submit_seed_payload(label, seeds) @@ -457,7 +488,7 @@ def get_compute_addresses(self, **kwargs): ctx, filter=GcpCloudAssetInventoryTypes.COMPUTE_ADDRESS ) except ValueError as e: - self.logger.error(f"Failed to get compute addresses: {e}") + ctx.logger.error(f"Failed to get compute addresses: {e}") return for result in results: @@ -494,7 +525,7 @@ def get_compute_addresses(self, **kwargs): ValueError, KeyError, ) as e: # pragma: no cover - self.logger.debug(f"Failed to parse asset: {asset}: {e}") + ctx.logger.debug(f"Failed to parse asset: {asset}: {e}") continue self.submit_seed_payload(label, seeds) @@ -508,7 +539,7 @@ def get_container_clusters(self, **kwargs): ctx, filter=GcpCloudAssetInventoryTypes.CONTAINER_CLUSTER ) except ValueError as e: - self.logger.error(f"Failed to get container clusters: {e}") + ctx.logger.error(f"Failed to get container clusters: {e}") return for result in results: @@ -549,7 +580,7 @@ def get_container_clusters(self, **kwargs): ValueError, KeyError, ) as e: # pragma: no cover - self.logger.debug(f"Failed to parse asset: {asset}: {e}") + ctx.logger.debug(f"Failed to parse asset: {asset}: {e}") continue self.submit_seed_payload(label, seeds) @@ -563,7 +594,7 @@ def get_cloud_sql_instances(self, **kwargs): ctx, filter=GcpCloudAssetInventoryTypes.CLOUD_SQL_INSTANCE ) except ValueError as e: - self.logger.error(f"Failed to get cloud sql instances: {e}") + ctx.logger.error(f"Failed to get cloud sql instances: {e}") return for result in results: @@ -604,7 +635,7 @@ def get_cloud_sql_instances(self, **kwargs): ValueError, KeyError, ) as e: # pragma: no cover - self.logger.debug(f"Failed to parse asset: {asset}: {e}") + ctx.logger.debug(f"Failed to parse asset: {asset}: {e}") continue self.submit_seed_payload(label, seeds) @@ -616,7 +647,7 @@ def get_dns_records(self, **kwargs): try: results = self.search_all_resources(ctx, filter=GcpCloudAssetInventoryTypes.DNS_ZONE) except ValueError as e: - self.logger.error(f"Failed to get dns records: {e}") + ctx.logger.error(f"Failed to get dns records: {e}") return for result in results: @@ -648,7 +679,7 @@ def get_dns_records(self, **kwargs): ValueError, KeyError, ) as e: # pragma: no cover - self.logger.debug(f"Failed to parse asset: {asset}: {e}") + ctx.logger.debug(f"Failed to parse asset: {asset}: {e}") continue self.submit_seed_payload(label, seeds) @@ -662,12 +693,17 @@ def get_storage_buckets(self, **kwargs): ctx, filter=GcpCloudAssetInventoryTypes.STORAGE_BUCKET ) except ValueError as e: - self.logger.error(f"Failed to get storage buckets: {e}") + ctx.logger.error(f"Failed to get storage buckets: {e}") return + for result in results: try: asset = ResourceSearchResult.to_dict(result) + # We want to submit a payload per uid, so we need to keep track of the uids we've seen + # TODO: should 
                 with SuppressValidationError():
                     bucket_asset = GcpStorageBucketAsset(
                         # TODO: Update when API can accept other urls
                         value=f"https://storage.googleapis.com/{bucket_name}",
                         uid=uid,
                         # Cast project_number to int from float
-                        scan_data=scan_data,
+                        scan_data=scan_data,  # TODO: is the scan_data the same?
                     )
                     # self.add_cloud_asset(bucket_asset)
-                    # TODO: this is not complete
+                    # TODO: do we need this? does it need other args like bucket_name?
                     bucket_asset = self.process_cloud_asset(bucket_asset)
+
+                    if uid not in findings:
+                        findings[uid] = []
+                    findings[uid].append(bucket_asset)
             except (
                 json.decoder.JSONDecodeError,
                 ValueError,
                 KeyError,
             ) as e:  # pragma: no cover
-                self.logger.debug(f"Failed to parse asset: {asset}: {e}")
+                ctx.logger.debug(f"Failed to parse asset: {asset}: {e}")
                 continue
+
+        for uid, assets in findings.items():
+            self.submit_cloud_asset_payload(uid, assets)
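
-- 
Sketch (not part of the patch): the division of labor this WIP moves toward.
Identifiers come from the diff above; the bodies are illustrative, not the
final implementation.

    # parent process, scan_all(): build a small, picklable context per org
    # and leave the heavy construction to the pool worker
    scan_context = GcpScanContext(
        label_prefix=label_prefix,
        provider_settings=provider_setting,
        organization_id=organization_id,
        credentials=None,         # hydrated in the worker
        cloud_asset_client=None,  # hydrated in the worker
        all_projects=None,        # hydrated in the worker
        logger=None,              # hydrated in the worker
    )
    pool.apply_async(
        self.scan_seeds,
        kwds={"scan_context": scan_context},
        error_callback=lambda e: self.logger.error(f"Async Error: {e}"),
    )

    # worker process, scan_seeds(): fill in the unpicklable pieces before use
    ctx.credentials = service_account.Credentials.from_service_account_file(
        str(key_file_path)
    )
    ctx.cloud_asset_client = AssetServiceClient(credentials=ctx.credentials)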