|
1 | 1 | package org.jetbrains.kotlinx.dataframe.impl.api |
2 | 2 |
|
3 | | -import kotlinx.coroutines.async |
4 | | -import kotlinx.coroutines.awaitAll |
5 | | -import kotlinx.coroutines.coroutineScope |
6 | | -import kotlinx.coroutines.runBlocking |
7 | 3 | import kotlinx.datetime.Instant |
8 | 4 | import kotlinx.datetime.LocalDate |
9 | 5 | import kotlinx.datetime.LocalDateTime |
@@ -542,44 +538,32 @@ internal fun <T> DataColumn<String?>.parse(parser: StringParser<T>, options: Par |
542 | 538 | return DataColumn.createValueColumn(name(), parsedValues, parser.type.withNullability(hasNulls)) as DataColumn<T?> |
543 | 539 | } |
544 | 540 |
|
545 | | -internal fun <T> DataFrame<T>.parseImpl(options: ParserOptions?, columns: ColumnsSelector<T, Any?>): DataFrame<T> = |
546 | | - runBlocking { parseParallel(options, columns) } |
547 | | - |
548 | | -private suspend fun <T> DataFrame<T>.parseParallel( |
549 | | - options: ParserOptions?, |
550 | | - columns: ColumnsSelector<T, Any?>, |
551 | | -): DataFrame<T> = |
552 | | - coroutineScope { |
553 | | - val convertedCols = getColumnsWithPaths(columns).map { col -> |
554 | | - async { |
555 | | - when { |
556 | | - // when a frame column is requested to be parsed, |
557 | | - // parse each value/frame column at any depth inside each DataFrame in the frame column |
558 | | - col.isFrameColumn() -> |
559 | | - col.values.map { |
560 | | - async { |
561 | | - it.parseParallel(options) { |
562 | | - colsAtAnyDepth { !it.isColumnGroup() } |
563 | | - } |
564 | | - } |
565 | | - }.awaitAll() |
566 | | - .toColumn(col.name) |
567 | | - |
568 | | - // when a column group is requested to be parsed, |
569 | | - // parse each column in the group |
570 | | - col.isColumnGroup() -> |
571 | | - col.parseParallel(options) { all() } |
572 | | - .asColumnGroup(col.name()) |
573 | | - .asDataColumn() |
574 | | - |
575 | | - // Base case, parse the column if it's a `String?` column |
576 | | - col.isSubtypeOf<String?>() -> |
577 | | - col.cast<String?>().tryParse(options) |
578 | | - |
579 | | - else -> col |
580 | | - }.let { ColumnToInsert(col.path, it) } |
581 | | - } |
582 | | - }.awaitAll() |
583 | | - |
584 | | - emptyDataFrame<T>().insertImpl(convertedCols) |
| 541 | +internal fun <T> DataFrame<T>.parseImpl(options: ParserOptions?, columns: ColumnsSelector<T, Any?>): DataFrame<T> { |
| 542 | + val convertedCols = getColumnsWithPaths(columns).map { col -> |
| 543 | + when { |
| 544 | + // when a frame column is requested to be parsed, |
| 545 | + // parse each value/frame column at any depth inside each DataFrame in the frame column |
| 546 | + col.isFrameColumn() -> |
| 547 | + col.values.map { |
| 548 | + it.parseImpl(options) { |
| 549 | + colsAtAnyDepth { !it.isColumnGroup() } |
| 550 | + } |
| 551 | + }.toColumn(col.name) |
| 552 | + |
| 553 | + // when a column group is requested to be parsed, |
| 554 | + // parse each column in the group |
| 555 | + col.isColumnGroup() -> |
| 556 | + col.parseImpl(options) { all() } |
| 557 | + .asColumnGroup(col.name()) |
| 558 | + .asDataColumn() |
| 559 | + |
| 560 | + // Base case, parse the column if it's a `String?` column |
| 561 | + col.isSubtypeOf<String?>() -> |
| 562 | + col.cast<String?>().tryParse(options) |
| 563 | + |
| 564 | + else -> col |
| 565 | + }.let { ColumnToInsert(col.path, it) } |
585 | 566 | } |
| 567 | + |
| 568 | + return emptyDataFrame<T>().insertImpl(convertedCols) |
| 569 | +} |
0 commit comments