@@ -343,14 +343,19 @@ def area(s):
 # region: Quality Control
 def process_cluster(cluster: List[Any], enabled: bool = False) -> List[Any]:
     if not enabled:
-        np.random.shuffle(cluster)
+        RNG.shuffle(cluster)
         return cluster[:1]
 
     cluster.sort(
         key=lambda x: (
-            -x[-1] if x[-1] is not None else 0.0,  # star_events_count
-            -x[-2] if x[-2] is not None else 0.0,  # fork_events_count
-            -np.datetime64(x[-3]).astype(np.uint64) if x[-3] is not None else 0.0,  # visit_date
+            # license_type, the more permissive the better
+            ["permissive", "no_license", "non_permissive"].index(x[-1]) if x[-1] is not None else float("inf"),
+            # star_events_count, the more the better
+            -x[-2] if x[-2] is not None else 0.0,
+            # fork_events_count, the more the better
+            -x[-3] if x[-3] is not None else 0.0,
+            # visit_date, the earliest the better, tie breaker
+            np.datetime64(x[-4]).astype(np.uint64) if x[-4] is not None else float("inf"),
         )
     )
     return cluster[:1]
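The new key selects one representative per duplicate cluster by license permissiveness first, then star count, then fork count, with the earliest visit date as a tie breaker. A minimal sketch of that ordering on made-up rows (the trailing-field layout and the values are illustrative assumptions, not taken from the diff):

import numpy as np

# Illustrative rows ending in (visit_date, fork_events_count, star_events_count, license_type).
rows = [
    ("repo_a", "2021-03-01", 4, 10, "non_permissive"),
    ("repo_b", "2021-01-15", 2, 50, "permissive"),
    ("repo_c", "2021-01-15", 9, 50, "permissive"),
]
rows.sort(
    key=lambda x: (
        ["permissive", "no_license", "non_permissive"].index(x[-1]) if x[-1] is not None else float("inf"),
        -x[-2] if x[-2] is not None else 0.0,
        -x[-3] if x[-3] is not None else 0.0,
        np.datetime64(x[-4]).astype(np.uint64) if x[-4] is not None else float("inf"),
    )
)
print(rows[0])  # repo_c: permissive license, most stars, then most forks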
@@ -382,7 +387,7 @@ def partitioned_save(df: DataFrame, chunk_size: int, max_partitions: int, output
     """
 
     total_rows = df.count()
-    partitions = max(1, min(math.ceil(total_rows / chunk_size), max_partitions))
+    partitions = max(256, min(math.ceil(total_rows / chunk_size), max_partitions))
 
     def save_partition(df: pd.DataFrame) -> pd.DataFrame:  # type: ignore
         pid = df["__pid__"].iloc[0]
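With the raised floor, partitioned_save never writes fewer than 256 output partitions regardless of row count. A quick sketch of the arithmetic with illustrative numbers (not taken from the diff):

import math

total_rows, chunk_size, max_partitions = 1_000_000, 80_000, 2048
partitions = max(256, min(math.ceil(total_rows / chunk_size), max_partitions))
print(partitions)  # 256: the floor wins over ceil(1_000_000 / 80_000) == 13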
@@ -392,15 +397,17 @@ def save_partition(df: pd.DataFrame) -> pd.DataFrame: # type: ignore
         )
         return pd.DataFrame([{"__status__": True, "__pid__": pid}])
 
+    log.debug(f"Saving {total_rows} rows to {partitions} partitions.")
+
     results = (
         df.repartition(partitions)  # random and uniform hash partitioning
         .withColumn("__pid__", F.spark_partition_id())
         .groupBy("__pid__")
         .applyInPandas(save_partition, schema="__status__ boolean, __pid__ int")
-        .toPandas()
+        .cache()
     )
 
-    if results["__status__"].all():
+    if results.filter(~F.col("__status__")).count() == 0:
         pd.DataFrame([]).to_csv(os.path.join(output, "_SUCCESS"), index=False, header=False)
         return
 
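Keeping results as a cached Spark DataFrame turns the success check into a distributed count instead of a toPandas() collect on the driver. A short sketch of the semantics, reusing the names from the hunk above:

# The count() below is the first action on results, so it is what actually triggers the
# per-partition writes; because results is cache()d, any later inspection of it (for
# example, listing the ids of failed partitions) does not re-run save_partition.
n_failed = results.filter(~F.col("__status__")).count()
all_partitions_succeeded = n_failed == 0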
@@ -433,6 +440,8 @@ def save_partition(df: pd.DataFrame) -> pd.DataFrame: # type: ignore
     conf = SparkConf()
     conf.set("spark.app.name", "MinHashLSH")
     conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
+    conf.set("spark.storage.memoryFraction", "1")
+    conf.set("spark.default.parallelism", "100")
     spark = SparkSession.builder.config(conf=conf).getOrCreate()  # type: ignore
     log: Logger = spark.sparkContext._jvm.org.apache.log4j.LogManager.getLogger(__name__)  # type: ignore
 
@@ -445,8 +454,8 @@ def save_partition(df: pd.DataFrame) -> pd.DataFrame: # type: ignore
     if B is None or R is None:
         B, R = optimal_param(args.threshold, args.num_perm)
 
-    MAX_WRITE_CHUNK_SIZE: int = 1_000_000
-    MAX_WRITE_PARTITIONS: int = 256
+    MAX_WRITE_CHUNK_SIZE: int = 80_000
+    MAX_WRITE_PARTITIONS: int = 2048
     HASH_RANGES: List[Tuple[int, int]] = [(i * R, (i + 1) * R) for i in range(B)]
     PERMUTATIONS: Tuple[np.ndarray, np.ndarray] = (
         RNG.randint(1, MOD_PRIME, size=(args.num_perm,), dtype=DTYPE),
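optimal_param itself is outside this hunk; in MinHash LSH scripts of this kind it is typically the datasketch-style search for the band/row split (B, R) that minimizes a weighted sum of false-positive and false-negative areas at the given threshold. A sketch of that idea (the weights and exact implementation are assumptions, not read from this diff):

from typing import Tuple

from scipy.integrate import quad


def optimal_param(
    threshold: float,
    num_perm: int,
    false_positive_weight: float = 0.5,
    false_negative_weight: float = 0.5,
) -> Tuple[int, int]:
    def false_positive_area(th: float, b: int, r: int) -> float:
        # Probability mass of pairs below the threshold that still become candidates.
        return quad(lambda s: 1 - (1 - s ** float(r)) ** float(b), 0.0, th)[0]

    def false_negative_area(th: float, b: int, r: int) -> float:
        # Probability mass of pairs above the threshold that are missed.
        return quad(lambda s: 1 - (1 - (1 - s ** float(r)) ** float(b)), th, 1.0)[0]

    min_error = float("inf")
    opt = (0, 0)
    for b in range(1, num_perm + 1):
        for r in range(1, num_perm // b + 1):
            error = (
                false_positive_weight * false_positive_area(threshold, b, r)
                + false_negative_weight * false_negative_area(threshold, b, r)
            )
            if error < min_error:
                min_error, opt = error, (b, r)
    return opt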
@@ -455,6 +464,7 @@ def save_partition(df: pd.DataFrame) -> pd.DataFrame: # type: ignore
 
     # region: Data Loading
     df: DataFrame = spark.read.option("mergeSchema", "true").parquet(args.input)
+    # df = df.filter(F.col("license_type") == "permissive").cache()
     if args.index_column is None:
         df = df.withColumn("__id__", F.monotonically_increasing_id()).cache()
     else:
@@ -600,11 +610,9 @@ def save_partition(df: pd.DataFrame) -> pd.DataFrame: # type: ignore
                 "__component__",
                 args.repo_column,
                 "visit_date",
+                "fork_events_count",
                 "star_events_count",
-                "fork_events_count"
-                # "max_stars_repo_stars_event_min_datetime",
-                # "max_stars_count",
-                # "max_forks_count",
+                "license_type",
             ]
             if args.rank
             else [
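The reordered rank columns line up with the negative indices used in process_cluster: a selected row now ends in (..., visit_date, fork_events_count, star_events_count, license_type), so x[-1] is the license, x[-2] the stars, x[-3] the forks, and x[-4] the visit date. A tiny check with an illustrative row (the leading fields are placeholders, not the script's actual schema):

row = ("some_id", "some_component", "org/repo", "2021-01-15", 7, 42, "permissive")
assert row[-1] == "permissive"    # license_type
assert row[-2] == 42              # star_events_count
assert row[-3] == 7               # fork_events_count
assert row[-4] == "2021-01-15"    # visit_date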