From f968cd4bc63767355907208d1f9c64ef2fd85f24 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Wed, 31 Jul 2019 15:47:33 +0530 Subject: [PATCH 1/5] disallow persistence=none for column tables untested and as of now causes StackOverflowError during eviction --- .../main/scala/org/apache/spark/sql/store/StoreUtils.scala | 5 ++++- store | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/store/StoreUtils.scala b/core/src/main/scala/org/apache/spark/sql/store/StoreUtils.scala index 6b43523e98..97bd23dd52 100644 --- a/core/src/main/scala/org/apache/spark/sql/store/StoreUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/store/StoreUtils.scala @@ -442,10 +442,13 @@ object StoreUtils { v.equalsIgnoreCase("synchronous")) { sb.append(s"$GEM_PERSISTENT SYNCHRONOUS ") } else if (v.equalsIgnoreCase("none")) { + if (isShadowTable) { + throw Utils.analysisException(s"Column tables do not support $PERSISTENCE = none") + } isPersistent = false sb } else { - throw Utils.analysisException(s"Invalid value for option " + + throw Utils.analysisException("Invalid value for option " + s"$PERSISTENCE = $v (expected one of: sync, async, none, " + s"synchronous, asynchronous)") } diff --git a/store b/store index 76f7701cf3..4ed7d6ff58 160000 --- a/store +++ b/store @@ -1 +1 @@ -Subproject commit 76f7701cf322fbead46f44223ea1128a5c1fba9f +Subproject commit 4ed7d6ff58850eef7eb1b8798848142de5e5abe1 From d4f7d4f42e0dee3fd22168f7f8ba677d258d2548 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Wed, 31 Jul 2019 21:02:02 +0530 Subject: [PATCH 2/5] update store link --- store | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store b/store index 4ed7d6ff58..219271a473 160000 --- a/store +++ b/store @@ -1 +1 @@ -Subproject commit 4ed7d6ff58850eef7eb1b8798848142de5e5abe1 +Subproject commit 219271a473733ff840c5d6bebf5431c6bcd8091e From 6cf9ca93f6e3a91b1ae0a25c99188e53f491b9b5 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Wed, 31 Jul 2019 22:51:05 +0530 Subject: [PATCH 3/5] corrected minimum bucket size determination to use off-heap size --- .../SnappyTableStatsProviderService.scala | 14 ++++------ .../impl/JDBCSourceAsColumnarStore.scala | 28 ++++++++++--------- store | 2 +- 3 files changed, 21 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala b/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala index 3b791aa014..15c4a342d6 100644 --- a/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala +++ b/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala @@ -25,14 +25,14 @@ import java.util.function.BiFunction import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.implicitConversions -import scala.util.control.NonFatal import scala.util.control.Breaks._ +import scala.util.control.NonFatal import com.gemstone.gemfire.CancelException import com.gemstone.gemfire.cache.execute.FunctionService import com.gemstone.gemfire.i18n.LogWriterI18n import com.gemstone.gemfire.internal.SystemTimer -import com.gemstone.gemfire.internal.cache.{AbstractRegionEntry, LocalRegion, PartitionedRegion, RegionEntry} +import com.gemstone.gemfire.internal.cache.{AbstractRegionEntry, PartitionedRegion, RegionEntry} import com.pivotal.gemfirexd.internal.engine.Misc import com.pivotal.gemfirexd.internal.engine.distributed.GfxdListResultCollector.ListResultCollectorValue import com.pivotal.gemfirexd.internal.engine.distributed.{GfxdListResultCollector, GfxdMessage} @@ -291,10 +291,8 @@ object SnappyEmbeddedTableStatsProviderService extends TableStatsProviderService } def publishColumnTableRowCountStats(): Unit = { - def asSerializable[C](c: C) = c.asInstanceOf[C with Serializable] - - val regions = asSerializable(Misc.getGemFireCache.getApplicationRegions.asScala) - for (region: LocalRegion <- regions) { + val regions = Misc.getGemFireCache.getApplicationRegions.asScala + for (region <- regions) { if (region.getDataPolicy.withPartitioning()) { val table = Misc.getFullTableNameFromRegionPath(region.getFullPath) val pr = region.asInstanceOf[PartitionedRegion] @@ -306,9 +304,7 @@ object SnappyEmbeddedTableStatsProviderService extends TableStatsProviderService var rowsInColumnBatch = 0L var offHeapSize = 0L if (container ne null) { - // TODO: this should use a transactional iterator to get a consistent - // snapshot (also pass the same transaction to getNumColumnsInTable - // for reading value and delete count) + // TODO: SW: this should avoid iteration and use BucketRegion to get the sizes val itr = new pr.PRLocalScanIterator(false /* primaryOnly */ , null /* no TX */ , null /* not required since includeValues is false */ , createRemoteIterator, false /* forUpdate */ , false /* includeValues */) diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala index 602f322689..834c366df3 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala @@ -21,6 +21,7 @@ import java.sql.{Connection, PreparedStatement, ResultSet, Statement} import java.util.Collections import scala.annotation.meta.param +import scala.collection.mutable.ArrayBuffer import scala.util.Random import scala.util.control.NonFatal @@ -634,8 +635,9 @@ class JDBCSourceAsColumnarStore(private var _connProperties: ConnectionPropertie } } - private def getInProgressBucketSize(br: BucketRegion, shift: Int): Long = - (br.getTotalBytes + br.getInProgressSize) >> shift + // round off to nearest 8k to avoid tiny size changes from effecting the minimum selection + private def getInProgressBucketSize(br: BucketRegion): Long = + (br.getTotalBytes + br.getInProgressSize) >> 13L // use the same saved connection for all operation private def getPartitionID(columnTableName: String, @@ -653,24 +655,24 @@ class JDBCSourceAsColumnarStore(private var _connProperties: ConnectionPropertie } else { // select the bucket with smallest size at this point val iterator = primaryBuckets.iterator() - // for heap buffer, round off to nearest 8k to avoid tiny - // size changes from effecting the minimum selection else - // round off to 32 for off-heap where memory bytes has only - // the entry+key overhead (but overflow bytes have data too) - val shift = if (GemFireCacheImpl.hasNewOffHeap) 5 else 13 assert(iterator.hasNext) - var smallestBucket = iterator.next() - var minBucketSize = getInProgressBucketSize(smallestBucket, shift) + val smallestBuckets = new ArrayBuffer[BucketRegion](4) + smallestBuckets += iterator.next() + var minBucketSize = getInProgressBucketSize(smallestBuckets(0)) while (iterator.hasNext) { val bucket = iterator.next() - val bucketSize = getInProgressBucketSize(bucket, shift) - if (bucketSize < minBucketSize || - (bucketSize == minBucketSize && Random.nextBoolean())) { - smallestBucket = bucket + val bucketSize = getInProgressBucketSize(bucket) + if (bucketSize < minBucketSize) { + smallestBuckets.clear() + smallestBuckets += bucket minBucketSize = bucketSize + } else if (bucketSize == minBucketSize) { + smallestBuckets += bucket } } val batchSize = getBatchSizeInBytes() + // choose a random bucket among all the smallest ones + val smallestBucket = smallestBuckets(Random.nextInt(smallestBuckets.length)) // update the in-progress size of the chosen bucket smallestBucket.updateInProgressSize(batchSize) (smallestBucket.getId, Some(smallestBucket), batchSize) diff --git a/store b/store index 219271a473..5c1b1fd9a2 160000 --- a/store +++ b/store @@ -1 +1 @@ -Subproject commit 219271a473733ff840c5d6bebf5431c6bcd8091e +Subproject commit 5c1b1fd9a281ab2b616169003213975dbe29a063 From de4bc0fcee99793e500c7a2c9f29be4fe1bdb0f1 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Thu, 1 Aug 2019 02:02:14 +0530 Subject: [PATCH 4/5] use pre-collected BucketRegion stats instead of region iteration --- .../spark/memory/SnappyStorageEvictor.scala | 9 +-- .../SnappyTableStatsProviderService.scala | 53 ++------------ .../sql/catalog/impl/StoreHiveCatalog.scala | 6 +- .../columnar/impl/ColumnFormatRelation.scala | 20 ++--- .../impl/JDBCSourceAsColumnarStore.scala | 2 +- .../columnar/impl/StoreCallbacksImpl.scala | 3 - .../execution/columnar/impl/ColumnDelta.scala | 6 +- .../columnar/impl/ColumnFormatEntry.scala | 73 +++++++++---------- store | 2 +- 9 files changed, 60 insertions(+), 114 deletions(-) diff --git a/cluster/src/main/scala/org/apache/spark/memory/SnappyStorageEvictor.scala b/cluster/src/main/scala/org/apache/spark/memory/SnappyStorageEvictor.scala index fb77cde5d5..083bf84b4b 100644 --- a/cluster/src/main/scala/org/apache/spark/memory/SnappyStorageEvictor.scala +++ b/cluster/src/main/scala/org/apache/spark/memory/SnappyStorageEvictor.scala @@ -27,10 +27,8 @@ import com.gemstone.gemfire.internal.cache._ import com.gemstone.gemfire.internal.cache.control.InternalResourceManager import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType import com.gemstone.gemfire.internal.i18n.LocalizedStrings -import com.pivotal.gemfirexd.internal.engine.Misc import org.apache.spark.Logging -import org.apache.spark.sql.execution.columnar.impl.ColumnFormatRelation class SnappyStorageEvictor extends Logging { @@ -126,12 +124,11 @@ class SnappyStorageEvictor extends Logging { offHeap: Boolean, hasOffHeap: Boolean): Boolean = { val hasLRU = (region.getEvictionAttributes.getAlgorithm.isLRUHeap && (region.getDataStore != null) - && !region.getAttributes.getEnableOffHeapMemory && !region.isRowBuffer()) + && !region.getAttributes.getEnableOffHeapMemory && !region.isRowBuffer) if (hasOffHeap) { // when off-heap is enabled then all column tables use off-heap - val regionPath = Misc.getFullTableNameFromRegionPath(region.getFullPath) - if (offHeap) hasLRU && ColumnFormatRelation.isColumnTable(regionPath) - else hasLRU && !ColumnFormatRelation.isColumnTable(regionPath) + if (offHeap) hasLRU && region.isInternalColumnTable + else hasLRU && !region.isInternalColumnTable } else { assert(!offHeap, "unexpected invocation for hasOffHeap=false and offHeap=true") diff --git a/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala b/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala index 15c4a342d6..a8b5ba2854 100644 --- a/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala +++ b/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala @@ -20,7 +20,6 @@ package io.snappydata import java.util.concurrent.TimeUnit -import java.util.function.BiFunction import scala.collection.JavaConverters._ import scala.collection.mutable @@ -32,12 +31,11 @@ import com.gemstone.gemfire.CancelException import com.gemstone.gemfire.cache.execute.FunctionService import com.gemstone.gemfire.i18n.LogWriterI18n import com.gemstone.gemfire.internal.SystemTimer -import com.gemstone.gemfire.internal.cache.{AbstractRegionEntry, PartitionedRegion, RegionEntry} +import com.gemstone.gemfire.internal.cache.PartitionedRegion import com.pivotal.gemfirexd.internal.engine.Misc import com.pivotal.gemfirexd.internal.engine.distributed.GfxdListResultCollector.ListResultCollectorValue import com.pivotal.gemfirexd.internal.engine.distributed.{GfxdListResultCollector, GfxdMessage} import com.pivotal.gemfirexd.internal.engine.sql.execute.MemberStatisticsMessage -import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer import com.pivotal.gemfirexd.internal.engine.ui._ import io.snappydata.Constant._ import io.snappydata.sql.catalog.CatalogObjectType @@ -45,7 +43,6 @@ import org.eclipse.collections.impl.map.mutable.UnifiedMap import org.apache.spark.SparkContext import org.apache.spark.sql.collection.Utils -import org.apache.spark.sql.execution.columnar.impl.{ColumnFormatKey, ColumnFormatRelation, ColumnFormatValue, RemoteEntriesIterator} import org.apache.spark.sql.{SnappyContext, ThinClientConnectorMode} /* @@ -275,56 +272,22 @@ object SnappyEmbeddedTableStatsProviderService extends TableStatsProviderService } } - type PRIterator = PartitionedRegion#PRLocalScanIterator - - /** - * Allows pulling stats rows efficiently if required. For the corner case - * of bucket moving away while iterating other buckets. - */ - private val createRemoteIterator = new BiFunction[java.lang.Integer, PRIterator, - java.util.Iterator[RegionEntry]] { - override def apply(bucketId: Integer, - iter: PRIterator): java.util.Iterator[RegionEntry] = { - new RemoteEntriesIterator(bucketId, Array.emptyIntArray, - iter.getPartitionedRegion, null) - } - } - def publishColumnTableRowCountStats(): Unit = { val regions = Misc.getGemFireCache.getApplicationRegions.asScala for (region <- regions) { if (region.getDataPolicy.withPartitioning()) { - val table = Misc.getFullTableNameFromRegionPath(region.getFullPath) val pr = region.asInstanceOf[PartitionedRegion] - val container = pr.getUserAttribute.asInstanceOf[GemFireContainer] - if (ColumnFormatRelation.isColumnTable(table) && - pr.getLocalMaxMemory > 0) { - var numColumnsInTable = -1 + if (pr.isInternalColumnTable && pr.getLocalMaxMemory > 0) { // Resetting PR numRows in cached batch as this will be calculated every time. var rowsInColumnBatch = 0L var offHeapSize = 0L - if (container ne null) { - // TODO: SW: this should avoid iteration and use BucketRegion to get the sizes - val itr = new pr.PRLocalScanIterator(false /* primaryOnly */ , null /* no TX */ , - null /* not required since includeValues is false */ , - createRemoteIterator, false /* forUpdate */ , false /* includeValues */) - // using direct region operations - while (itr.hasNext) { - val re = itr.next().asInstanceOf[AbstractRegionEntry] - val key = re.getRawKey.asInstanceOf[ColumnFormatKey] - val bucketRegion = itr.getHostedBucketRegion - if (bucketRegion.getBucketAdvisor.isPrimary) { - if (numColumnsInTable < 0) { - numColumnsInTable = key.getNumColumnsInTable(table) - } - rowsInColumnBatch += key.getColumnBatchRowCount(bucketRegion, re, - numColumnsInTable) - } - re._getValue() match { - case v: ColumnFormatValue => offHeapSize += v.getOffHeapSizeInBytes - case _ => - } + val buckets = pr.getDataStore.getAllLocalBucketRegions.iterator() + while (buckets.hasNext) { + val bucket = buckets.next() + if (bucket.getBucketAdvisor.isPrimary) { + rowsInColumnBatch += bucket.getNumRowsInColumnTable } + offHeapSize += bucket.getDirectBytesSizeInMemory } val stats = pr.getPrStats stats.setPRNumRowsInColumnBatches(rowsInColumnBatch) diff --git a/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala b/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala index c9ada3fe2c..1459ece694 100644 --- a/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala +++ b/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala @@ -284,7 +284,7 @@ class StoreHiveCatalog extends ExternalCatalog with Logging { } // exclude policies also from the list of hive tables val metaData = new ExternalTableMetaData(table.identifier.table, - table.database, tableType.toString, null, -1, + table.database, tableType.toString, null, table.schema.length, -1, -1, null, null, null, null, tblDataSourcePath, driverClass) metaData.provider = table.provider match { @@ -387,8 +387,8 @@ class StoreHiveCatalog extends ExternalCatalog with Logging { } new ExternalTableMetaData(qualifiedName, schema, tableType.toString, ExternalStoreUtils.getExternalStoreOnExecutor(parameters, partitions, qualifiedName, - schema), columnBatchSize, columnMaxDeltaRows, compressionCodec, baseTable, dmls, - dependentRelations, tblDataSourcePath, driverClass).asInstanceOf[R] + schema), schema.length, columnBatchSize, columnMaxDeltaRows, compressionCodec, + baseTable, dmls, dependentRelations, tblDataSourcePath, driverClass).asInstanceOf[R] } case GET_METADATA => diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala index 125f586f1a..d056988f0b 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala @@ -18,13 +18,14 @@ package org.apache.spark.sql.execution.columnar.impl import java.sql.{Connection, PreparedStatement} -import com.gemstone.gemfire.internal.cache.PartitionedRegion.RegionLock - import scala.util.control.NonFatal -import com.gemstone.gemfire.internal.cache.{ExternalTableMetaData, GemFireCacheImpl, LocalRegion, PartitionedRegion} + +import com.gemstone.gemfire.internal.cache.PartitionedRegion.RegionLock +import com.gemstone.gemfire.internal.cache.{ExternalTableMetaData, LocalRegion, PartitionedRegion} import com.pivotal.gemfirexd.internal.engine.Misc import io.snappydata.sql.catalog.{RelationInfo, SnappyExternalCatalog} import io.snappydata.{Constant, Property} + import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Descending, Expression, SortDirection} @@ -236,7 +237,7 @@ abstract class BaseColumnFormatRelation( override def getUpdatePlan(relation: LogicalRelation, child: SparkPlan, updateColumns: Seq[Attribute], updateExpressions: Seq[Expression], keyColumns: Seq[Attribute]): SparkPlan = { - withTableWriteLock() {() => + withTableWriteLock() { () => ColumnUpdateExec(child, externalColumnTableName, partitionColumns, partitionExpressions(relation), numBuckets, isPartitioned, schema, externalStore, this, updateColumns, updateExpressions, keyColumns, connProperties, onExecutor = false) @@ -276,7 +277,7 @@ abstract class BaseColumnFormatRelation( val lock = snc.getContextObject[(Option[TableIdentifier], PartitionedRegion.RegionLock)]( SnappySession.PUTINTO_LOCK) match { case None => snc.grabLock(table, schemaName, connProperties) - case Some(a) => null // Do nothing as putInto will release lock + case _ => null // Do nothing as putInto will release lock } if ((lock != null) && lock.isInstanceOf[RegionLock]) lock.asInstanceOf[RegionLock].lock() try { @@ -301,7 +302,7 @@ abstract class BaseColumnFormatRelation( } } finally { - logDebug(s"Added the ${lock} object to the context. in InsertRows") + logDebug(s"Added the $lock object to the context. in InsertRows") if (lock != null) { snc.releaseLock(lock) } @@ -320,7 +321,7 @@ abstract class BaseColumnFormatRelation( f() } finally { - logDebug(s"Added the ${lock} object to the context.") + logDebug(s"Added the $lock object to the context.") if (lock != null) { snc.addContextObject( SnappySession.BULKWRITE_LOCK, lock) @@ -712,11 +713,6 @@ object ColumnFormatRelation extends Logging with StoreCallback { tableName + Constant.SHADOW_TABLE_SUFFIX } - final def isColumnTable(tableName: String): Boolean = { - tableName.contains(Constant.SHADOW_SCHEMA_NAME_WITH_PREFIX) && - tableName.endsWith(Constant.SHADOW_TABLE_SUFFIX) - } - def getIndexUpdateStruct(indexEntry: ExternalTableMetaData, connectedExternalStore: ConnectedExternalStore): ColumnFormatRelation.IndexUpdateStruct = { diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala index 834c366df3..cf658fbf02 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala @@ -271,7 +271,7 @@ class JDBCSourceAsColumnarStore(private var _connProperties: ConnectionPropertie connectionType match { case ConnectionType.Embedded => val region = Misc.getRegionForTable[ColumnFormatKey, ColumnFormatValue]( - columnTableName, true) + columnTableName, true).asInstanceOf[PartitionedRegion] val key = new ColumnFormatKey(batchId, partitionId, ColumnFormatEntry.DELETE_MASK_COL_INDEX) diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala index 02d2d371e4..b5e7344c2b 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala @@ -170,9 +170,6 @@ object StoreCallbacksImpl extends StoreCallbacks with Logging with Serializable schemas } - override def isColumnTable(qualifiedName: String): Boolean = - ColumnFormatRelation.isColumnTable(qualifiedName) - override def skipEvictionForEntry(entry: LRUEntry): Boolean = { // skip eviction of stats rows (SNAP-2102) entry.getRawKey match { diff --git a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnDelta.scala b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnDelta.scala index c648f0bac2..a8d5775eb2 100644 --- a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnDelta.scala +++ b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnDelta.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import com.gemstone.gemfire.cache.{EntryEvent, EntryNotFoundException, Region} import com.gemstone.gemfire.internal.cache.delta.Delta import com.gemstone.gemfire.internal.cache.versions.{VersionSource, VersionTag} -import com.gemstone.gemfire.internal.cache.{DiskEntry, EntryEventImpl, GemFireCacheImpl} +import com.gemstone.gemfire.internal.cache.{DiskEntry, EntryEventImpl, GemFireCacheImpl, PartitionedRegion} import com.gemstone.gemfire.internal.shared.FetchRequest import com.pivotal.gemfirexd.internal.engine.GfxdSerializable import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer @@ -300,7 +300,7 @@ object ColumnDelta { * Delete entire batch from column store for the batchId and partitionId * matching those of given key. */ - private[columnar] def deleteBatch(key: ColumnFormatKey, columnRegion: Region[_, _], + private[columnar] def deleteBatch(key: ColumnFormatKey, columnRegion: PartitionedRegion, columnTableName: String): Unit = { // delete all the rows with matching batchId @@ -312,7 +312,7 @@ object ColumnDelta { } } - val numColumns = key.getNumColumnsInTable(columnTableName) + val numColumns = columnRegion.getNumColumns // delete the stats rows first destroyKey(key.withColumnIndex(ColumnFormatEntry.STATROW_COL_INDEX)) destroyKey(key.withColumnIndex(ColumnFormatEntry.DELTA_STATROW_COL_INDEX)) diff --git a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatEntry.scala b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatEntry.scala index 28c2990235..c900ec2f1f 100644 --- a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatEntry.scala +++ b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatEntry.scala @@ -27,17 +27,15 @@ import com.gemstone.gemfire.internal.DSFIDFactory.GfxdDSFID import com.gemstone.gemfire.internal.cache._ import com.gemstone.gemfire.internal.cache.lru.Sizeable import com.gemstone.gemfire.internal.cache.persistence.DiskRegionView -import com.gemstone.gemfire.internal.cache.store.SerializedDiskBuffer +import com.gemstone.gemfire.internal.cache.store.{ColumnBatchKey, SerializedDiskBuffer} import com.gemstone.gemfire.internal.shared._ import com.gemstone.gemfire.internal.shared.unsafe.DirectBufferAllocator import com.gemstone.gemfire.internal.size.ReflectionSingleObjectSizer.REFERENCE_SIZE import com.gemstone.gemfire.internal.{ByteBufferDataInput, DSCODE, DSFIDFactory, DataSerializableFixedID, HeapDataOutputStream} -import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils -import com.pivotal.gemfirexd.internal.engine.store.{GemFireContainer, RegionKey} -import com.pivotal.gemfirexd.internal.engine.{GfxdDataSerializable, GfxdSerializable, Misc} +import com.pivotal.gemfirexd.internal.engine.store.RegionKey +import com.pivotal.gemfirexd.internal.engine.{GfxdSerializable, Misc} import com.pivotal.gemfirexd.internal.iapi.types.{DataValueDescriptor, SQLInteger, SQLLongint} import com.pivotal.gemfirexd.internal.impl.sql.compile.TableName -import com.pivotal.gemfirexd.internal.snappy.ColumnBatchKey import org.apache.spark.memory.MemoryManagerCallback.{allocateExecutionMemory, memoryManager, releaseExecutionMemory} import org.apache.spark.sql.collection.SharedUtils @@ -99,46 +97,37 @@ object ColumnFormatEntry { final class ColumnFormatKey(private[columnar] var uuid: Long, private[columnar] var partitionId: Int, private[columnar] var columnIndex: Int) - extends GfxdDataSerializable with ColumnBatchKey with RegionKey with Serializable { + extends ColumnBatchKey with GfxdSerializable with RegionKey with Serializable { // to be used only by deserialization def this() = this(-1L, -1, -1) - override def getNumColumnsInTable(columnTableName: String): Int = { - val bufferTable = GemFireContainer.getRowBufferTableName(columnTableName) - GemFireXDUtils.getGemFireContainer(bufferTable, true).getNumColumns - 1 - } - override def getColumnBatchRowCount(bucketRegion: BucketRegion, - re: AbstractRegionEntry, numColumnsInTable: Int): Int = { - val currentBucketRegion = bucketRegion.getHostedBucketRegion - if ((columnIndex == ColumnFormatEntry.STATROW_COL_INDEX || + value: SerializedDiskBuffer): Int = { + if (columnIndex == ColumnFormatEntry.STATROW_COL_INDEX || columnIndex == ColumnFormatEntry.DELTA_STATROW_COL_INDEX || - columnIndex == ColumnFormatEntry.DELETE_MASK_COL_INDEX) && - !re.isDestroyedOrRemoved) { - val statsOrDeleteVal = re.getValue(currentBucketRegion) - if (statsOrDeleteVal ne null) { - val statsOrDelete = statsOrDeleteVal.asInstanceOf[ColumnFormatValue] - .getValueRetain(FetchRequest.DECOMPRESS) - val buffer = statsOrDelete.getBuffer - try { - if (buffer.remaining() > 0) { - if (columnIndex == ColumnFormatEntry.STATROW_COL_INDEX || - columnIndex == ColumnFormatEntry.DELTA_STATROW_COL_INDEX) { - val numColumns = ColumnStatsSchema.numStatsColumns(numColumnsInTable) - val unsafeRow = SharedUtils.toUnsafeRow(buffer, numColumns) - unsafeRow.getInt(ColumnStatsSchema.COUNT_INDEX_IN_SCHEMA) - } else { - val allocator = ColumnEncoding.getAllocator(buffer) - // decrement by deleted row count - -ColumnEncoding.readInt(allocator.baseObject(buffer), - allocator.baseOffset(buffer) + buffer.position() + 8) - } - } else 0 - } finally { - statsOrDelete.release() - } - } else 0 + columnIndex == ColumnFormatEntry.DELETE_MASK_COL_INDEX) { + val statsOrDelete = value.asInstanceOf[ColumnFormatValue] + .getValueRetain(FetchRequest.DECOMPRESS) + val buffer = statsOrDelete.getBuffer + try { + if (buffer.remaining() > 0) { + if (columnIndex == ColumnFormatEntry.STATROW_COL_INDEX || + columnIndex == ColumnFormatEntry.DELTA_STATROW_COL_INDEX) { + val numColumns = ColumnStatsSchema.numStatsColumns( + bucketRegion.getPartitionedRegion.getNumColumns) + val unsafeRow = SharedUtils.toUnsafeRow(buffer, numColumns) + unsafeRow.getInt(ColumnStatsSchema.COUNT_INDEX_IN_SCHEMA) + } else { + val allocator = ColumnEncoding.getAllocator(buffer) + // decrement by deleted row count + -ColumnEncoding.readInt(allocator.baseObject(buffer), + allocator.baseOffset(buffer) + buffer.position() + 8) + } + } else 0 + } finally { + statsOrDelete.release() + } } else 0 } @@ -159,6 +148,8 @@ final class ColumnFormatKey(private[columnar] var uuid: Long, case _ => false } + override def getDSFID: Int = DataSerializableFixedID.GFXD_TYPE + override def getGfxdID: Byte = GfxdSerializable.COLUMN_FORMAT_KEY override def toData(out: DataOutput): Unit = { @@ -173,6 +164,8 @@ final class ColumnFormatKey(private[columnar] var uuid: Long, columnIndex = in.readInt() } + override def getSerializationVersions: Array[Version] = null + override def getSizeInBytes: Int = { alignedSize(Sizeable.PER_OBJECT_OVERHEAD + 8 /* uuid */ + 4 /* columnIndex */ + 4 /* partitionId */) @@ -483,7 +476,7 @@ class ColumnFormatValue extends SerializedDiskBuffer if (this.refCount > 1 && isInRegion(context)) { // update the statistics before changing self val newVal = copy(newBuffer, isCompressed, changeOwnerToStorage = false) - context.updateMemoryStats(this, newVal) + context.updateMemoryStats(this, newVal, this.entry) } this.columnBuffer = newBuffer this.decompressionState = state diff --git a/store b/store index 5c1b1fd9a2..5f32b05eb5 160000 --- a/store +++ b/store @@ -1 +1 @@ -Subproject commit 5c1b1fd9a281ab2b616169003213975dbe29a063 +Subproject commit 5f32b05eb578ec9914273701587369090e9345b5 From a648ad21adbf70d398f985f0dad1600aa5a2bdf4 Mon Sep 17 00:00:00 2001 From: Sumedh Wale Date: Thu, 1 Aug 2019 11:11:36 +0530 Subject: [PATCH 5/5] update store link --- store | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store b/store index 466e7fb1d5..e2410eb274 160000 --- a/store +++ b/store @@ -1 +1 @@ -Subproject commit 466e7fb1d5f076f0f9075f5105e6cdda57f7eefd +Subproject commit e2410eb27481ad2482fd128a8458ffc0dfbab7d2