44import yaml
55
66from pathlib import Path
7+ from typing import Optional
78from sigma .plugins import InstalledSigmaPlugins
89from sigma .conversion .base import Backend , SigmaCollection
910from sigma .exceptions import SigmaTransformationError
@@ -28,6 +29,7 @@ def __init__(self, parameters: dict, base_config, platform_name, logger_param) -
2829 self ._parameters = parameters ["pipelines" ]
2930 self ._filters_directory = base_config .get ("sigma_filters_directory" , None )
3031 self ._platform_name = platform_name
32+ self ._logger_param = logger_param
3133
3234 def get_pipeline_config_group (self , rule_content ):
3335 """Retrieve the logsource config group name
@@ -62,36 +64,75 @@ def ms_cloud_kusto(self) -> str | None:
6264 """
6365 return "kusto" if self ._platform_name in ["microsoft_sentinel" , "microsoft_xdr" ] else None
6466
65- def init_sigma_filters (self , rule_file ) -> None :
67+ def init_sigma_filters (self , rule_file , customer_filter_directory : Optional [ str ] = None ) -> None :
6668 """Function to load Sigma filters
69+
70+ Loads filters from both the default filters directory AND customer-specific directory.
71+ This allows base filters to be applied to all rules while customer-specific filters
72+ can add or override for specific customers.
73+
6774 Args:
68- filter_path
75+ rule_file: Path to the sigma rule file
76+ customer_filter_directory: Optional additional filter directory for customer-specific filters
77+
78+ Returns:
79+ SigmaCollection with rule and all applicable filters loaded
6980 """
70- filters = SigmaCollection .load_ruleset (
71- [
72- Path (self ._filters_directory ),
73- Path (rule_file )
74- ]
75- )
81+ paths_to_load = []
82+
83+ # Always load default filters first (if configured)
84+ if self ._filters_directory :
85+ default_filter_path = Path (self ._filters_directory )
86+ if default_filter_path .exists ():
87+ paths_to_load .append (default_filter_path )
88+ self .logger .debug (f"Loading default filters from: { self ._filters_directory } " )
89+
90+ # Then load customer-specific filters (if provided)
91+ if customer_filter_directory :
92+ customer_filter_path = Path (customer_filter_directory )
93+ if customer_filter_path .exists ():
94+ paths_to_load .append (customer_filter_path )
95+ self .logger .debug (f"Loading customer-specific filters from: { customer_filter_directory } " )
96+ else :
97+ self .logger .warning (f"Customer filter directory does not exist: { customer_filter_directory } " )
98+
99+ # Always include the rule file
100+ paths_to_load .append (Path (rule_file ))
101+
102+ filters = SigmaCollection .load_ruleset (paths_to_load )
76103
77104 return filters
78105
79- def init_sigma_rule (self , rule_file ) -> None :
106+ def init_sigma_rule (self , rule_file , customer_filter_directory : Optional [ str ] = None ) -> None :
80107 """Function to load a sigma rule
81108
82109 Args:
83- rule
110+ rule_file: Path to the sigma rule file
111+ customer_filter_directory: Optional customer-specific filter directory for MSSP mode
112+ (will be combined with default filters, not replace them)
84113 """
85114 with open (rule_file , "r" , encoding = "utf-8" ) as file :
86- if self ._filters_directory :
87- sigma_rule = self .init_sigma_filters (rule_file )
115+ # Load with filters if any filter directory is configured
116+ if self ._filters_directory or customer_filter_directory :
117+ if customer_filter_directory :
118+ self .logger .debug (f"Loading rule with default + customer-specific filters from: { customer_filter_directory } " )
119+ else :
120+ self .logger .debug ("Loading rule with default filters only" )
121+ sigma_rule = self .init_sigma_filters (rule_file , customer_filter_directory )
88122 else :
89123 sigma_rule = SigmaCollection .from_yaml (file )
90124
91125 return sigma_rule
92126
93- def convert_rule (self , rule_content , rule_file , platform ):
127+ def convert_rule (self , rule_content , rule_file , platform , customer_filter_directory : Optional [str ] = None ):
128+ """Convert a Sigma rule to the target platform query language
94129
130+ Args:
131+ rule_content: The parsed rule content dictionary
132+ rule_file: Path to the rule file
133+ platform: The target platform instance
134+ customer_filter_directory: Optional customer-specific filter directory for MSSP mode
135+ """
95136 plugins = InstalledSigmaPlugins .autodiscover ()
96137 backends = plugins .backends
97138 pipeline_resolver = plugins .get_pipeline_resolver ()
@@ -122,7 +163,16 @@ def convert_rule(self, rule_content, rule_file, platform):
122163 else :
123164 pipeline = None
124165 backend : Backend = backend_class (processing_pipeline = pipeline )
125- sigma_rule = self .init_sigma_rule (rule_file )
166+
167+ # Log filter application details
168+ if customer_filter_directory and self ._filters_directory :
169+ self .logger .info (f"Applying default filters + customer-specific filters from { customer_filter_directory } " )
170+ elif customer_filter_directory :
171+ self .logger .info (f"Applying customer-specific filters from { customer_filter_directory } " )
172+ elif self ._filters_directory :
173+ self .logger .debug (f"Applying default filters from { self ._filters_directory } " )
174+
175+ sigma_rule = self .init_sigma_rule (rule_file , customer_filter_directory )
126176 rule_converted = backend .convert (sigma_rule , self ._format )[0 ]
127177 # For esql and eql backend only
128178 if isinstance (platform , ElasticPlatform ):
@@ -175,6 +225,79 @@ def convert_rules(parameters, droid_config, base_config, logger_param):
175225 platform_name = parameters .platform
176226 target = Conversion (droid_config , base_config , platform_name , logger_param )
177227 platform = None
228+
229+ # Handle MSSP convert mode - show conversions for each customer with their filters
230+ if parameters .mssp :
231+ export_list_mssp = droid_config .get ("export_list_mssp" , None )
232+ if not export_list_mssp :
233+ logger .error (f"No export_list_mssp found in configuration for platform { platform_name } " )
234+ error = True
235+ return error , search_warning
236+
237+ logger .info ("MSSP Convert Mode: Converting rules for each customer with their filters" )
238+
239+ # Process rules for each customer
240+ if path .is_dir ():
241+ for rule_file in path .rglob ("*.y*ml" ):
242+ rule_content = load_rule (rule_file )
243+ logger .info (f"Converting rule: { rule_file } " )
244+
245+ # First show default conversion (no customer filters)
246+ try :
247+ default_converted = target .convert_rule (rule_content , rule_file , platform )
248+ logger .info (f" [DEFAULT] { default_converted } " )
249+ except Exception as e :
250+ logger .error (f" [DEFAULT] Conversion failed: { e } " )
251+ continue
252+
253+ # Then convert for each customer with their filters
254+ for group , info in export_list_mssp .items ():
255+ customer_name = info .get ('customer_name' , group )
256+ customer_filter_dir = info .get ('customer_filters_directory' )
257+
258+ if customer_filter_dir :
259+ try :
260+ customer_converted = target .convert_rule (
261+ rule_content , rule_file , platform , customer_filter_dir
262+ )
263+ logger .info (f" [{ customer_name } ] { customer_converted } " )
264+ except Exception as e :
265+ logger .warning (f" [{ customer_name } ] Conversion failed: { e } " )
266+ else :
267+ logger .info (f" [{ customer_name } ] No filters configured - same as default" )
268+
269+ print () # Empty line between rules for readability
270+
271+ elif path .is_file ():
272+ rule_file = path
273+ rule_content = load_rule (rule_file )
274+ logger .info (f"Converting rule: { rule_file } " )
275+
276+ # First show default conversion (no customer filters)
277+ try :
278+ default_converted = target .convert_rule (rule_content , rule_file , platform )
279+ logger .info (f" [DEFAULT] { default_converted } " )
280+ except Exception as e :
281+ logger .error (f" [DEFAULT] Conversion failed: { e } " )
282+ return True , search_warning
283+
284+ # Then convert for each customer with their filters
285+ for group , info in export_list_mssp .items ():
286+ customer_name = info .get ('customer_name' , group )
287+ customer_filter_dir = info .get ('customer_filters_directory' )
288+
289+ if customer_filter_dir :
290+ try :
291+ customer_converted = target .convert_rule (
292+ rule_content , rule_file , platform , customer_filter_dir
293+ )
294+ logger .info (f" [{ customer_name } ] { customer_converted } " )
295+ except Exception as e :
296+ logger .warning (f" [{ customer_name } ] Conversion failed: { e } " )
297+ else :
298+ logger .info (f" [{ customer_name } ] No filters configured - same as default" )
299+
300+ return error , search_warning
178301
179302 if parameters .platform and (parameters .search or parameters .export or parameters .integrity ):
180303 platform_name = parameters .platform
@@ -187,14 +310,33 @@ def convert_rules(parameters, droid_config, base_config, logger_param):
187310 platform = ElasticPlatform (droid_config , logger_param , "eql" , raw = False )
188311 elif "microsoft_sentinel" in platform_name and parameters .mssp :
189312 platform = SentinelPlatform (droid_config , logger_param , export_mssp = True )
313+ # Set up callback for customer-specific filter conversion
314+ # The callback receives customer_filter_directory directly from export_list_mssp
315+ platform .set_convert_rule_callback (
316+ lambda rule_content , rule_file , plat , customer_filter_dir : target .convert_rule (
317+ rule_content , rule_file , plat , customer_filter_dir
318+ )
319+ )
190320 elif "microsoft_sentinel" in platform_name :
191321 platform = SentinelPlatform (droid_config , logger_param , export_mssp = False )
192322 elif "microsoft_xdr" in platform_name and parameters .sentinel_xdr and parameters .mssp :
193323 platform = SentinelPlatform (droid_config , logger_param , export_mssp = True )
324+ # Set up callback for customer-specific filter conversion
325+ platform .set_convert_rule_callback (
326+ lambda rule_content , rule_file , plat , customer_filter_dir : target .convert_rule (
327+ rule_content , rule_file , plat , customer_filter_dir
328+ )
329+ )
194330 elif "microsoft_xdr" in platform_name and parameters .sentinel_xdr :
195331 platform = SentinelPlatform (droid_config , logger_param , export_mssp = False )
196332 elif "microsoft_xdr" in platform_name and parameters .mssp :
197333 platform = MicrosoftXDRPlatform (droid_config , logger_param , export_mssp = True )
334+ # Set up callback for customer-specific filter conversion
335+ platform .set_convert_rule_callback (
336+ lambda rule_content , rule_file , plat , customer_filter_dir : target .convert_rule (
337+ rule_content , rule_file , plat , customer_filter_dir
338+ )
339+ )
198340 elif "microsoft_xdr" in platform_name :
199341 platform = MicrosoftXDRPlatform (droid_config , logger_param , export_mssp = False )
200342
0 commit comments