Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[socradar] Add SOCRadar external import connector #3072

Open
wants to merge 4 commits into
base: master
Choose a base branch
from

Conversation

Radargoger
Copy link

Proposed changes

  • Add SOCRadar external import connector with support for:
    • IP addresses (IPv4/IPv6)
    • Domains
    • URLs
    • File hashes
  • Implement STIX2 mapping with metadata:
    • Source attribution
    • First/last seen dates
    • Confidence levels
    • TLP marking
  • Add rate limiting and pagination handling
  • Include error handling and logging

Related issues

  • None

Checklist

  • I consider the submitted work as finished
  • I tested the code for its functionality using different use cases
  • I added/update the relevant documentation (either on github or on notion)
  • Where necessary I refactored code to improve the overall quality

Further comments

This connector integrates SOCRadar's threat intelligence feeds into OpenCTI. Testing has been completed with:

  • OpenCTI 6.4.2
  • Python 3.11
  • Various indicator types and formats
  • STIX2 bundle creation and import verification

The implementation follows OpenCTI connector best practices and provides a robust solution for importing SOCRadar threat intelligence into the platform.

@helene-nguyen helene-nguyen added the partner used to identify PR from patner label Dec 5, 2024
@romain-filigran romain-filigran added feature use for describing a new feature to develop and removed feature use for describing a new feature to develop labels Dec 6, 2024
Copy link
Contributor

@flavienSindou flavienSindou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your contribution!

Could you please rework the code and documentation a bit? Once that’s done, I’ll be happy to approve your pull request.

)

indicator = Indicator(
id=f"indicator--{str(uuid.uuid4())}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would create stochastic id and will prevent de-duplication performed by the platform.

You should rather use pycti.Indicator.generate_id method.


# Create relationship between indicator and maintainer identity
relationship = Relationship(
id=f"relationship--{str(uuid.uuid4())}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would create stochastic id and will prevent de-duplication performed by the platform.

You should rather use pycti.StixCoreRelationship.generate_id method.

try:
if feed_type == "url" or self._matches_pattern(value, "url"):
return URL(
id=f"url--{str(uuid.uuid4())}",
Copy link
Contributor

@flavienSindou flavienSindou Dec 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stix2.Observable IDs are deterministic if not explicitly modified.
This allows the platform to de-duplicate these entities.

)
return None

def run(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be easier to use Opencti Scheduler

Here is an example of usage :

Comment on lines +28 to +41
| Parameter | Docker envvar | Mandatory | Description |
| --- | --- | --- | --- |
| `opencti_url` | `OPENCTI_URL` | Yes | The URL of the OpenCTI platform |
| `opencti_token` | `OPENCTI_TOKEN` | Yes | The default admin token configured in the OpenCTI platform |
| `connector_id` | `CONNECTOR_ID` | Yes | A valid arbitrary UUIDv4 for this connector |
| `connector_type` | `CONNECTOR_TYPE` | Yes | Must be 'EXTERNAL_IMPORT' |
| `connector_name` | `CONNECTOR_NAME` | Yes | Name of the connector |
| `connector_scope` | `CONNECTOR_SCOPE` | Yes | Scope of the connector (socradar) |
| `connector_confidence_level` | `CONNECTOR_CONFIDENCE_LEVEL` | Yes | Default confidence level for created data |
| `connector_log_level` | `CONNECTOR_LOG_LEVEL` | Yes | Logging level (debug, info, warn, error) |
| `radar_base_feed_url` | `RADAR_BASE_FEED_URL` | Yes | SocRadar API base URL |
| `radar_format_type` | `RADAR_FORMAT_TYPE` | Yes | Response format (.json) |
| `radar_socradar_key` | `RADAR_SOCRADAR_KEY` | Yes | Your SocRadar API key |
| `radar_interval` | `RADAR_INTERVAL` | Yes | Interval between runs in seconds |
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you update this configuration description to match your code please ?

Comment on lines +14 to +25
radar:
base_feed_url: "https://platform.socradar.com/api/threat/intelligence/feed_list/"
format_type: ".json?key="
socradar_key: "SOCRADAR_KEY"
run_interval: 600
collections_uuid:
collection_1:
id: ["COLLECTION_UUID"]
name: ["COLLECTION_NAME"]
default_marking: 'TLP:WHITE'
create_observables: true
create_indicators: true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please rework this example and remove unused variables please ?

@Radargoger
Copy link
Author

Radargoger commented Dec 25, 2024 via email

Copy link
Contributor

@flavienSindou flavienSindou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the improvements you’ve made.

I have a few additional remarks to ensure that your connector performs optimally.

Additionally, I still encourage you to use the OpenCTI dedicated Scheduler instead of implementing your own.

Let me know if you need any support with this—I’ll be happy to assist. I look forward to reviewing your next changes!

https://filigran.io/auto-backpressue-control-octi-connectors/

- SOCRadar API key
- Python 3.11+

### Configuration
Copy link
Contributor

@flavienSindou flavienSindou Dec 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

radar.format_type seems missing from the documentation

| `opencti.token` | `OPENCTI_TOKEN` | Yes | Your OpenCTI admin token |
| `radar.base_feed_url` | `RADAR_BASE_FEED_URL` | Yes | SOCRadar API base URL |
| `radar.socradar_key` | `RADAR_SOCRADAR_KEY` | Yes | Your SOCRadar API key |
| `radar.interval` | `RADAR_INTERVAL` | Yes | Time between runs (in seconds, default: 600) |
Copy link
Contributor

@flavienSindou flavienSindou Dec 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The expected parameter seems to be run_interval

current_time = datetime.utcnow()

identity = Identity(
id=f"identity--{str(uuid.uuid4())}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would create stochastic id and will prevent de-duplication performed by the platform.

You should rather use pycti.Identity.generate_id method.

self.socradar_key = get_config_variable(
"RADAR_SOCRADAR_KEY", ["radar", "socradar_key"], config
)
self.collections = get_config_variable(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seems possible to use self.collections as expected in

    def _process_feed(self, work_id: str) -> None:
        """Step 5.0: Process feed collection"""
        try:
            self.helper.log_info("Starting feed collection...")

            # Step 5.1: Process each collection
            for collection_name, collection_data in self.collections.items():
                try:
                    collection_id = collection_data["id"][0]

A Mapping of Sequence is used in this code whereas a string would be loaded by

        self.collections = get_config_variable(
            "RADAR_COLLECTIONS_UUID", ["radar", "collections_uuid"], config
        )

when using RADAR_COLLECTIONS_UUID env var.

To resolve this, you could try casting the variable as follows:

if isinstance(my_raw_var, str):
    try: 
        my_var = json.loads(my_raw_var)
    except json.JSONDecodeError: 
        raise ValueError("Expecting a valid json string Mapping for my_raw_var")
        

@flavienSindou flavienSindou linked an issue Dec 26, 2024 that may be closed by this pull request
@Radargoger Radargoger force-pushed the feature/socradar-connector branch 4 times, most recently from 512707a to c73d838 Compare December 27, 2024 13:44
@Radargoger
Copy link
Author

Radargoger commented Dec 27, 2024 via email

@Radargoger
Copy link
Author

Radargoger commented Jan 5, 2025 via email

@romain-filigran romain-filigran added this to the PRs backlog milestone Jan 7, 2025
@helene-nguyen
Copy link
Member

@Radargoger Thank you and sorry for the delay of the answer.
To allow us to move forward, could you please sign all of your commits and resolve the conflicts ?

@helene-nguyen helene-nguyen self-assigned this Jan 14, 2025
@elendx

This comment was marked as outdated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
partner used to identify PR from patner
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[SocRADAR] Create the connector
6 participants