diff --git a/cluster/src/main/scala/org/apache/spark/memory/SnappyStorageEvictor.scala b/cluster/src/main/scala/org/apache/spark/memory/SnappyStorageEvictor.scala index fb77cde5d5..083bf84b4b 100644 --- a/cluster/src/main/scala/org/apache/spark/memory/SnappyStorageEvictor.scala +++ b/cluster/src/main/scala/org/apache/spark/memory/SnappyStorageEvictor.scala @@ -27,10 +27,8 @@ import com.gemstone.gemfire.internal.cache._ import com.gemstone.gemfire.internal.cache.control.InternalResourceManager import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType import com.gemstone.gemfire.internal.i18n.LocalizedStrings -import com.pivotal.gemfirexd.internal.engine.Misc import org.apache.spark.Logging -import org.apache.spark.sql.execution.columnar.impl.ColumnFormatRelation class SnappyStorageEvictor extends Logging { @@ -126,12 +124,11 @@ class SnappyStorageEvictor extends Logging { offHeap: Boolean, hasOffHeap: Boolean): Boolean = { val hasLRU = (region.getEvictionAttributes.getAlgorithm.isLRUHeap && (region.getDataStore != null) - && !region.getAttributes.getEnableOffHeapMemory && !region.isRowBuffer()) + && !region.getAttributes.getEnableOffHeapMemory && !region.isRowBuffer) if (hasOffHeap) { // when off-heap is enabled then all column tables use off-heap - val regionPath = Misc.getFullTableNameFromRegionPath(region.getFullPath) - if (offHeap) hasLRU && ColumnFormatRelation.isColumnTable(regionPath) - else hasLRU && !ColumnFormatRelation.isColumnTable(regionPath) + if (offHeap) hasLRU && region.isInternalColumnTable + else hasLRU && !region.isInternalColumnTable } else { assert(!offHeap, "unexpected invocation for hasOffHeap=false and offHeap=true") diff --git a/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala b/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala index 15c4a342d6..a8b5ba2854 100644 --- a/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala +++ b/core/src/main/scala/io/snappydata/SnappyTableStatsProviderService.scala @@ -20,7 +20,6 @@ package io.snappydata import java.util.concurrent.TimeUnit -import java.util.function.BiFunction import scala.collection.JavaConverters._ import scala.collection.mutable @@ -32,12 +31,11 @@ import com.gemstone.gemfire.CancelException import com.gemstone.gemfire.cache.execute.FunctionService import com.gemstone.gemfire.i18n.LogWriterI18n import com.gemstone.gemfire.internal.SystemTimer -import com.gemstone.gemfire.internal.cache.{AbstractRegionEntry, PartitionedRegion, RegionEntry} +import com.gemstone.gemfire.internal.cache.PartitionedRegion import com.pivotal.gemfirexd.internal.engine.Misc import com.pivotal.gemfirexd.internal.engine.distributed.GfxdListResultCollector.ListResultCollectorValue import com.pivotal.gemfirexd.internal.engine.distributed.{GfxdListResultCollector, GfxdMessage} import com.pivotal.gemfirexd.internal.engine.sql.execute.MemberStatisticsMessage -import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer import com.pivotal.gemfirexd.internal.engine.ui._ import io.snappydata.Constant._ import io.snappydata.sql.catalog.CatalogObjectType @@ -45,7 +43,6 @@ import org.eclipse.collections.impl.map.mutable.UnifiedMap import org.apache.spark.SparkContext import org.apache.spark.sql.collection.Utils -import org.apache.spark.sql.execution.columnar.impl.{ColumnFormatKey, ColumnFormatRelation, ColumnFormatValue, RemoteEntriesIterator} import org.apache.spark.sql.{SnappyContext, ThinClientConnectorMode} /* @@ -275,56 +272,22 @@ object SnappyEmbeddedTableStatsProviderService extends TableStatsProviderService } } - type PRIterator = PartitionedRegion#PRLocalScanIterator - - /** - * Allows pulling stats rows efficiently if required. For the corner case - * of bucket moving away while iterating other buckets. - */ - private val createRemoteIterator = new BiFunction[java.lang.Integer, PRIterator, - java.util.Iterator[RegionEntry]] { - override def apply(bucketId: Integer, - iter: PRIterator): java.util.Iterator[RegionEntry] = { - new RemoteEntriesIterator(bucketId, Array.emptyIntArray, - iter.getPartitionedRegion, null) - } - } - def publishColumnTableRowCountStats(): Unit = { val regions = Misc.getGemFireCache.getApplicationRegions.asScala for (region <- regions) { if (region.getDataPolicy.withPartitioning()) { - val table = Misc.getFullTableNameFromRegionPath(region.getFullPath) val pr = region.asInstanceOf[PartitionedRegion] - val container = pr.getUserAttribute.asInstanceOf[GemFireContainer] - if (ColumnFormatRelation.isColumnTable(table) && - pr.getLocalMaxMemory > 0) { - var numColumnsInTable = -1 + if (pr.isInternalColumnTable && pr.getLocalMaxMemory > 0) { // Resetting PR numRows in cached batch as this will be calculated every time. var rowsInColumnBatch = 0L var offHeapSize = 0L - if (container ne null) { - // TODO: SW: this should avoid iteration and use BucketRegion to get the sizes - val itr = new pr.PRLocalScanIterator(false /* primaryOnly */ , null /* no TX */ , - null /* not required since includeValues is false */ , - createRemoteIterator, false /* forUpdate */ , false /* includeValues */) - // using direct region operations - while (itr.hasNext) { - val re = itr.next().asInstanceOf[AbstractRegionEntry] - val key = re.getRawKey.asInstanceOf[ColumnFormatKey] - val bucketRegion = itr.getHostedBucketRegion - if (bucketRegion.getBucketAdvisor.isPrimary) { - if (numColumnsInTable < 0) { - numColumnsInTable = key.getNumColumnsInTable(table) - } - rowsInColumnBatch += key.getColumnBatchRowCount(bucketRegion, re, - numColumnsInTable) - } - re._getValue() match { - case v: ColumnFormatValue => offHeapSize += v.getOffHeapSizeInBytes - case _ => - } + val buckets = pr.getDataStore.getAllLocalBucketRegions.iterator() + while (buckets.hasNext) { + val bucket = buckets.next() + if (bucket.getBucketAdvisor.isPrimary) { + rowsInColumnBatch += bucket.getNumRowsInColumnTable } + offHeapSize += bucket.getDirectBytesSizeInMemory } val stats = pr.getPrStats stats.setPRNumRowsInColumnBatches(rowsInColumnBatch) diff --git a/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala b/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala index c9ada3fe2c..1459ece694 100644 --- a/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala +++ b/core/src/main/scala/io/snappydata/sql/catalog/impl/StoreHiveCatalog.scala @@ -284,7 +284,7 @@ class StoreHiveCatalog extends ExternalCatalog with Logging { } // exclude policies also from the list of hive tables val metaData = new ExternalTableMetaData(table.identifier.table, - table.database, tableType.toString, null, -1, + table.database, tableType.toString, null, table.schema.length, -1, -1, null, null, null, null, tblDataSourcePath, driverClass) metaData.provider = table.provider match { @@ -387,8 +387,8 @@ class StoreHiveCatalog extends ExternalCatalog with Logging { } new ExternalTableMetaData(qualifiedName, schema, tableType.toString, ExternalStoreUtils.getExternalStoreOnExecutor(parameters, partitions, qualifiedName, - schema), columnBatchSize, columnMaxDeltaRows, compressionCodec, baseTable, dmls, - dependentRelations, tblDataSourcePath, driverClass).asInstanceOf[R] + schema), schema.length, columnBatchSize, columnMaxDeltaRows, compressionCodec, + baseTable, dmls, dependentRelations, tblDataSourcePath, driverClass).asInstanceOf[R] } case GET_METADATA => diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala index 125f586f1a..d056988f0b 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatRelation.scala @@ -18,13 +18,14 @@ package org.apache.spark.sql.execution.columnar.impl import java.sql.{Connection, PreparedStatement} -import com.gemstone.gemfire.internal.cache.PartitionedRegion.RegionLock - import scala.util.control.NonFatal -import com.gemstone.gemfire.internal.cache.{ExternalTableMetaData, GemFireCacheImpl, LocalRegion, PartitionedRegion} + +import com.gemstone.gemfire.internal.cache.PartitionedRegion.RegionLock +import com.gemstone.gemfire.internal.cache.{ExternalTableMetaData, LocalRegion, PartitionedRegion} import com.pivotal.gemfirexd.internal.engine.Misc import io.snappydata.sql.catalog.{RelationInfo, SnappyExternalCatalog} import io.snappydata.{Constant, Property} + import org.apache.spark.rdd.RDD import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Descending, Expression, SortDirection} @@ -236,7 +237,7 @@ abstract class BaseColumnFormatRelation( override def getUpdatePlan(relation: LogicalRelation, child: SparkPlan, updateColumns: Seq[Attribute], updateExpressions: Seq[Expression], keyColumns: Seq[Attribute]): SparkPlan = { - withTableWriteLock() {() => + withTableWriteLock() { () => ColumnUpdateExec(child, externalColumnTableName, partitionColumns, partitionExpressions(relation), numBuckets, isPartitioned, schema, externalStore, this, updateColumns, updateExpressions, keyColumns, connProperties, onExecutor = false) @@ -276,7 +277,7 @@ abstract class BaseColumnFormatRelation( val lock = snc.getContextObject[(Option[TableIdentifier], PartitionedRegion.RegionLock)]( SnappySession.PUTINTO_LOCK) match { case None => snc.grabLock(table, schemaName, connProperties) - case Some(a) => null // Do nothing as putInto will release lock + case _ => null // Do nothing as putInto will release lock } if ((lock != null) && lock.isInstanceOf[RegionLock]) lock.asInstanceOf[RegionLock].lock() try { @@ -301,7 +302,7 @@ abstract class BaseColumnFormatRelation( } } finally { - logDebug(s"Added the ${lock} object to the context. in InsertRows") + logDebug(s"Added the $lock object to the context. in InsertRows") if (lock != null) { snc.releaseLock(lock) } @@ -320,7 +321,7 @@ abstract class BaseColumnFormatRelation( f() } finally { - logDebug(s"Added the ${lock} object to the context.") + logDebug(s"Added the $lock object to the context.") if (lock != null) { snc.addContextObject( SnappySession.BULKWRITE_LOCK, lock) @@ -712,11 +713,6 @@ object ColumnFormatRelation extends Logging with StoreCallback { tableName + Constant.SHADOW_TABLE_SUFFIX } - final def isColumnTable(tableName: String): Boolean = { - tableName.contains(Constant.SHADOW_SCHEMA_NAME_WITH_PREFIX) && - tableName.endsWith(Constant.SHADOW_TABLE_SUFFIX) - } - def getIndexUpdateStruct(indexEntry: ExternalTableMetaData, connectedExternalStore: ConnectedExternalStore): ColumnFormatRelation.IndexUpdateStruct = { diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala index 834c366df3..cf658fbf02 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/JDBCSourceAsColumnarStore.scala @@ -271,7 +271,7 @@ class JDBCSourceAsColumnarStore(private var _connProperties: ConnectionPropertie connectionType match { case ConnectionType.Embedded => val region = Misc.getRegionForTable[ColumnFormatKey, ColumnFormatValue]( - columnTableName, true) + columnTableName, true).asInstanceOf[PartitionedRegion] val key = new ColumnFormatKey(batchId, partitionId, ColumnFormatEntry.DELETE_MASK_COL_INDEX) diff --git a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala index 02d2d371e4..b5e7344c2b 100644 --- a/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala +++ b/core/src/main/scala/org/apache/spark/sql/execution/columnar/impl/StoreCallbacksImpl.scala @@ -170,9 +170,6 @@ object StoreCallbacksImpl extends StoreCallbacks with Logging with Serializable schemas } - override def isColumnTable(qualifiedName: String): Boolean = - ColumnFormatRelation.isColumnTable(qualifiedName) - override def skipEvictionForEntry(entry: LRUEntry): Boolean = { // skip eviction of stats rows (SNAP-2102) entry.getRawKey match { diff --git a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnDelta.scala b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnDelta.scala index c648f0bac2..a8d5775eb2 100644 --- a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnDelta.scala +++ b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnDelta.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import com.gemstone.gemfire.cache.{EntryEvent, EntryNotFoundException, Region} import com.gemstone.gemfire.internal.cache.delta.Delta import com.gemstone.gemfire.internal.cache.versions.{VersionSource, VersionTag} -import com.gemstone.gemfire.internal.cache.{DiskEntry, EntryEventImpl, GemFireCacheImpl} +import com.gemstone.gemfire.internal.cache.{DiskEntry, EntryEventImpl, GemFireCacheImpl, PartitionedRegion} import com.gemstone.gemfire.internal.shared.FetchRequest import com.pivotal.gemfirexd.internal.engine.GfxdSerializable import com.pivotal.gemfirexd.internal.engine.store.GemFireContainer @@ -300,7 +300,7 @@ object ColumnDelta { * Delete entire batch from column store for the batchId and partitionId * matching those of given key. */ - private[columnar] def deleteBatch(key: ColumnFormatKey, columnRegion: Region[_, _], + private[columnar] def deleteBatch(key: ColumnFormatKey, columnRegion: PartitionedRegion, columnTableName: String): Unit = { // delete all the rows with matching batchId @@ -312,7 +312,7 @@ object ColumnDelta { } } - val numColumns = key.getNumColumnsInTable(columnTableName) + val numColumns = columnRegion.getNumColumns // delete the stats rows first destroyKey(key.withColumnIndex(ColumnFormatEntry.STATROW_COL_INDEX)) destroyKey(key.withColumnIndex(ColumnFormatEntry.DELTA_STATROW_COL_INDEX)) diff --git a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatEntry.scala b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatEntry.scala index 28c2990235..c900ec2f1f 100644 --- a/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatEntry.scala +++ b/encoders/src/main/scala/org/apache/spark/sql/execution/columnar/impl/ColumnFormatEntry.scala @@ -27,17 +27,15 @@ import com.gemstone.gemfire.internal.DSFIDFactory.GfxdDSFID import com.gemstone.gemfire.internal.cache._ import com.gemstone.gemfire.internal.cache.lru.Sizeable import com.gemstone.gemfire.internal.cache.persistence.DiskRegionView -import com.gemstone.gemfire.internal.cache.store.SerializedDiskBuffer +import com.gemstone.gemfire.internal.cache.store.{ColumnBatchKey, SerializedDiskBuffer} import com.gemstone.gemfire.internal.shared._ import com.gemstone.gemfire.internal.shared.unsafe.DirectBufferAllocator import com.gemstone.gemfire.internal.size.ReflectionSingleObjectSizer.REFERENCE_SIZE import com.gemstone.gemfire.internal.{ByteBufferDataInput, DSCODE, DSFIDFactory, DataSerializableFixedID, HeapDataOutputStream} -import com.pivotal.gemfirexd.internal.engine.distributed.utils.GemFireXDUtils -import com.pivotal.gemfirexd.internal.engine.store.{GemFireContainer, RegionKey} -import com.pivotal.gemfirexd.internal.engine.{GfxdDataSerializable, GfxdSerializable, Misc} +import com.pivotal.gemfirexd.internal.engine.store.RegionKey +import com.pivotal.gemfirexd.internal.engine.{GfxdSerializable, Misc} import com.pivotal.gemfirexd.internal.iapi.types.{DataValueDescriptor, SQLInteger, SQLLongint} import com.pivotal.gemfirexd.internal.impl.sql.compile.TableName -import com.pivotal.gemfirexd.internal.snappy.ColumnBatchKey import org.apache.spark.memory.MemoryManagerCallback.{allocateExecutionMemory, memoryManager, releaseExecutionMemory} import org.apache.spark.sql.collection.SharedUtils @@ -99,46 +97,37 @@ object ColumnFormatEntry { final class ColumnFormatKey(private[columnar] var uuid: Long, private[columnar] var partitionId: Int, private[columnar] var columnIndex: Int) - extends GfxdDataSerializable with ColumnBatchKey with RegionKey with Serializable { + extends ColumnBatchKey with GfxdSerializable with RegionKey with Serializable { // to be used only by deserialization def this() = this(-1L, -1, -1) - override def getNumColumnsInTable(columnTableName: String): Int = { - val bufferTable = GemFireContainer.getRowBufferTableName(columnTableName) - GemFireXDUtils.getGemFireContainer(bufferTable, true).getNumColumns - 1 - } - override def getColumnBatchRowCount(bucketRegion: BucketRegion, - re: AbstractRegionEntry, numColumnsInTable: Int): Int = { - val currentBucketRegion = bucketRegion.getHostedBucketRegion - if ((columnIndex == ColumnFormatEntry.STATROW_COL_INDEX || + value: SerializedDiskBuffer): Int = { + if (columnIndex == ColumnFormatEntry.STATROW_COL_INDEX || columnIndex == ColumnFormatEntry.DELTA_STATROW_COL_INDEX || - columnIndex == ColumnFormatEntry.DELETE_MASK_COL_INDEX) && - !re.isDestroyedOrRemoved) { - val statsOrDeleteVal = re.getValue(currentBucketRegion) - if (statsOrDeleteVal ne null) { - val statsOrDelete = statsOrDeleteVal.asInstanceOf[ColumnFormatValue] - .getValueRetain(FetchRequest.DECOMPRESS) - val buffer = statsOrDelete.getBuffer - try { - if (buffer.remaining() > 0) { - if (columnIndex == ColumnFormatEntry.STATROW_COL_INDEX || - columnIndex == ColumnFormatEntry.DELTA_STATROW_COL_INDEX) { - val numColumns = ColumnStatsSchema.numStatsColumns(numColumnsInTable) - val unsafeRow = SharedUtils.toUnsafeRow(buffer, numColumns) - unsafeRow.getInt(ColumnStatsSchema.COUNT_INDEX_IN_SCHEMA) - } else { - val allocator = ColumnEncoding.getAllocator(buffer) - // decrement by deleted row count - -ColumnEncoding.readInt(allocator.baseObject(buffer), - allocator.baseOffset(buffer) + buffer.position() + 8) - } - } else 0 - } finally { - statsOrDelete.release() - } - } else 0 + columnIndex == ColumnFormatEntry.DELETE_MASK_COL_INDEX) { + val statsOrDelete = value.asInstanceOf[ColumnFormatValue] + .getValueRetain(FetchRequest.DECOMPRESS) + val buffer = statsOrDelete.getBuffer + try { + if (buffer.remaining() > 0) { + if (columnIndex == ColumnFormatEntry.STATROW_COL_INDEX || + columnIndex == ColumnFormatEntry.DELTA_STATROW_COL_INDEX) { + val numColumns = ColumnStatsSchema.numStatsColumns( + bucketRegion.getPartitionedRegion.getNumColumns) + val unsafeRow = SharedUtils.toUnsafeRow(buffer, numColumns) + unsafeRow.getInt(ColumnStatsSchema.COUNT_INDEX_IN_SCHEMA) + } else { + val allocator = ColumnEncoding.getAllocator(buffer) + // decrement by deleted row count + -ColumnEncoding.readInt(allocator.baseObject(buffer), + allocator.baseOffset(buffer) + buffer.position() + 8) + } + } else 0 + } finally { + statsOrDelete.release() + } } else 0 } @@ -159,6 +148,8 @@ final class ColumnFormatKey(private[columnar] var uuid: Long, case _ => false } + override def getDSFID: Int = DataSerializableFixedID.GFXD_TYPE + override def getGfxdID: Byte = GfxdSerializable.COLUMN_FORMAT_KEY override def toData(out: DataOutput): Unit = { @@ -173,6 +164,8 @@ final class ColumnFormatKey(private[columnar] var uuid: Long, columnIndex = in.readInt() } + override def getSerializationVersions: Array[Version] = null + override def getSizeInBytes: Int = { alignedSize(Sizeable.PER_OBJECT_OVERHEAD + 8 /* uuid */ + 4 /* columnIndex */ + 4 /* partitionId */) @@ -483,7 +476,7 @@ class ColumnFormatValue extends SerializedDiskBuffer if (this.refCount > 1 && isInRegion(context)) { // update the statistics before changing self val newVal = copy(newBuffer, isCompressed, changeOwnerToStorage = false) - context.updateMemoryStats(this, newVal) + context.updateMemoryStats(this, newVal, this.entry) } this.columnBuffer = newBuffer this.decompressionState = state diff --git a/store b/store index 26b2ad2721..e2410eb274 160000 --- a/store +++ b/store @@ -1 +1 @@ -Subproject commit 26b2ad272128e8815a3d9cd7236d54aaac711c13 +Subproject commit e2410eb27481ad2482fd128a8458ffc0dfbab7d2