diff --git a/docs/docs/concepts/views.md b/docs/docs/concepts/views.md index 8f0559fde9c4..b6c649ff5df9 100644 --- a/docs/docs/concepts/views.md +++ b/docs/docs/concepts/views.md @@ -36,6 +36,8 @@ View metadata is persisted only when the catalog implementation supports it: - **Hive metastore catalog** – view metadata is stored together with table metadata inside the metastore warehouse. - **REST catalog** – view metadata is kept in the REST backend and exposed through the catalog API. +- **JDBC catalog** – view metadata is stored in the catalog database in the `paimon_views` + metadata table. The table is created automatically when the JDBC catalog is initialized. File-system catalogs do not currently support views because they lack persistent metadata storage. @@ -57,6 +59,10 @@ view using their native dialect. Use `CREATE VIEW` or `CREATE OR REPLACE VIEW` to register a view. Paimon assigns a UUID, writes the first metadata file, and records version `1`. +The catalog stores the view definition. It does not resolve or validate referenced tables when the +view is created, so missing or cross-database table references are checked by the compute engine when +the view is queried. + ```sql CREATE VIEW sales_view AS SELECT region, SUM(amount) AS total_amount diff --git a/docs/docs/flink/sql-ddl.md b/docs/docs/flink/sql-ddl.md index 7708af840afc..835bc376e120 100644 --- a/docs/docs/flink/sql-ddl.md +++ b/docs/docs/flink/sql-ddl.md @@ -146,6 +146,18 @@ You can also perform logical isolation for databases under multiple catalogs by Additionally, when creating a JdbcCatalog, you can specify the maximum length for the lock key by configuring "lock-key-max-length," which defaults to 255. Since this value is a combination of {catalog-key}.{database-name}.{table-name}, please adjust accordingly. +JDBC catalog supports persistent Paimon views. View metadata is stored in the automatically created +`paimon_views` table in the catalog database. The view SQL is stored as catalog metadata and is not +resolved when the view is created. + +```sql +CREATE VIEW sales_view AS SELECT name, amount FROM sales WHERE amount > 100; + +SHOW VIEWS; + +DROP VIEW sales_view; +``` + You can define any default table options with the prefix `table-default.` for tables created in the catalog. ## Create Table diff --git a/docs/docs/spark/sql-ddl.md b/docs/docs/spark/sql-ddl.md index 66ac0bc8b1cb..4e6b9566b528 100644 --- a/docs/docs/spark/sql-ddl.md +++ b/docs/docs/spark/sql-ddl.md @@ -103,6 +103,9 @@ Paimon JDBC Catalog in Spark needs to correctly add the corresponding jar packag | mysql | mysql-connector-java | [Download](https://mvnrepository.com/artifact/mysql/mysql-connector-java) | | postgres | postgresql | [Download](https://mvnrepository.com/artifact/org.postgresql/postgresql) | +JDBC catalog supports persistent Paimon views. View metadata is stored in the automatically created +`paimon_views` table in the catalog database. + ```bash spark-sql ... \ --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \ @@ -375,7 +378,7 @@ CREATE TABLE target_tbl LIKE source_tbl; ## View Views are based on the result-set of an SQL query, when using `org.apache.paimon.spark.SparkCatalog`, views are managed by paimon itself. -And in this case, views are supported when the `metastore` type is `hive` or `rest`. +And in this case, views are supported when the `metastore` type is `hive`, `rest` or `jdbc`. ### Create Or Replace View diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java index afe4ed8ae6de..3a0a6c909a88 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CachingCatalog.java @@ -32,6 +32,8 @@ import org.apache.paimon.table.system.SystemTableLoader; import org.apache.paimon.utils.DVMetaCache; import org.apache.paimon.utils.SegmentsCache; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewChange; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; @@ -368,6 +370,40 @@ public void invalidateTable(Identifier identifier) { } } + @Override + public void createView(Identifier identifier, View view, boolean ignoreIfExists) + throws ViewAlreadyExistException, DatabaseNotExistException { + wrapped.createView(identifier, view, ignoreIfExists); + } + + @Override + public View getView(Identifier identifier) throws ViewNotExistException { + return wrapped.getView(identifier); + } + + @Override + public List listViews(String databaseName) throws DatabaseNotExistException { + return wrapped.listViews(databaseName); + } + + @Override + public void dropView(Identifier identifier, boolean ignoreIfNotExists) + throws ViewNotExistException { + wrapped.dropView(identifier, ignoreIfNotExists); + } + + @Override + public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists) + throws ViewNotExistException, ViewAlreadyExistException { + wrapped.renameView(fromView, toView, ignoreIfNotExists); + } + + @Override + public void alterView(Identifier view, List viewChanges, boolean ignoreIfNotExists) + throws ViewNotExistException, DialectAlreadyExistException, DialectNotExistException { + wrapped.alterView(view, viewChanges, ignoreIfNotExists); + } + // ================================== Cache Public API // ================================================ diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index f765e5f88db5..c6f8828c5558 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -19,12 +19,15 @@ package org.apache.paimon.jdbc; import org.apache.paimon.CoreOptions; +import org.apache.paimon.PagedList; +import org.apache.paimon.TableType; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.AbstractCatalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogLoader; import org.apache.paimon.catalog.CatalogLockContext; import org.apache.paimon.catalog.CatalogLockFactory; +import org.apache.paimon.catalog.CatalogUtils; import org.apache.paimon.catalog.Database; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.catalog.PropertyChange; @@ -37,8 +40,13 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.utils.JsonSerdeUtil; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewChange; +import org.apache.paimon.view.ViewImpl; +import org.apache.paimon.view.ViewSchema; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; @@ -58,14 +66,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.stream.Collectors; +import static org.apache.paimon.CoreOptions.PATH; +import static org.apache.paimon.CoreOptions.TYPE; import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase; import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable; +import static org.apache.paimon.catalog.CatalogUtils.validateCreateTable; import static org.apache.paimon.jdbc.JdbcCatalogLock.acquireTimeout; import static org.apache.paimon.jdbc.JdbcCatalogLock.checkMaxSleep; import static org.apache.paimon.jdbc.JdbcUtils.deleteProperties; @@ -129,39 +141,66 @@ private void initializeCatalogTablesIfNeed() throws SQLException, InterruptedExc connections.run( conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = - dbMeta.getTables(null, null, JdbcUtils.CATALOG_TABLE_NAME, null); - if (tableExists.next()) { - return true; + try (ResultSet tableExists = + dbMeta.getTables(null, null, JdbcUtils.CATALOG_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE)) { + return statement.execute(); } - return conn.prepareStatement(JdbcUtils.CREATE_CATALOG_TABLE).execute(); }); // Check and create database properties table. connections.run( conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = + try (ResultSet tableExists = dbMeta.getTables( - null, null, JdbcUtils.DATABASE_PROPERTIES_TABLE_NAME, null); - if (tableExists.next()) { - return true; + null, null, JdbcUtils.DATABASE_PROPERTIES_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE)) { + return statement.execute(); } - return conn.prepareStatement(JdbcUtils.CREATE_DATABASE_PROPERTIES_TABLE) - .execute(); }); // Check and create table properties table. connections.run( conn -> { DatabaseMetaData dbMeta = conn.getMetaData(); - ResultSet tableExists = + try (ResultSet tableExists = dbMeta.getTables( - null, null, JdbcUtils.TABLE_PROPERTIES_TABLE_NAME, null); - if (tableExists.next()) { - return true; + null, null, JdbcUtils.TABLE_PROPERTIES_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_TABLE_PROPERTIES_TABLE)) { + return statement.execute(); + } + }); + + // Check and create view table. + connections.run( + conn -> { + DatabaseMetaData dbMeta = conn.getMetaData(); + try (ResultSet tableExists = + dbMeta.getTables(null, null, JdbcUtils.VIEW_TABLE_NAME, null)) { + if (tableExists.next()) { + return true; + } + } + try (PreparedStatement statement = + conn.prepareStatement(JdbcUtils.CREATE_VIEW_TABLE)) { + return statement.execute(); } - return conn.prepareStatement(JdbcUtils.CREATE_TABLE_PROPERTIES_TABLE).execute(); }); // if lock enabled, Check and create distributed lock table. @@ -226,6 +265,26 @@ protected void createDatabaseImpl(String name, Map properties) { insertProperties(connections, catalogKey, name, createProps); } + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException { + checkNotSystemDatabase(name); + try { + getDatabase(name); + } catch (DatabaseNotExistException e) { + if (ignoreIfNotExists) { + return; + } + throw new DatabaseNotExistException(name); + } + + if (!cascade && (!listTables(name).isEmpty() || !listViews(name).isEmpty())) { + throw new DatabaseNotEmptyException(name); + } + + dropDatabaseImpl(name); + } + @Override protected void dropDatabaseImpl(String name) { // Delete table from paimon_tables @@ -240,6 +299,8 @@ protected void dropDatabaseImpl(String name) { catalogKey, name); } + // Delete views from paimon_views. + execute(connections, JdbcUtils.DELETE_VIEWS_SQL, catalogKey, name); } @Override @@ -290,6 +351,55 @@ protected List listTablesImpl(String databaseName) { databaseName); } + @Override + public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException { + checkNotBranch(identifier, "createTable"); + checkNotSystemTable(identifier, "createTable"); + validateCreateTable(schema, false); + validateCustomTablePath(schema.options()); + + getDatabase(identifier.getDatabaseName()); + + try { + getTable(identifier); + if (ignoreIfExists) { + return; + } + throw new TableAlreadyExistException(identifier); + } catch (TableNotExistException ignored) { + } + + if (JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfExists) { + return; + } + throw new TableAlreadyExistException(identifier); + } + + copyTableDefaultOptions(schema.options()); + + TableType tableType = Options.fromMap(schema.options()).get(TYPE); + switch (tableType) { + case TABLE: + case MATERIALIZED_TABLE: + createTableImpl(identifier, schema); + break; + case FORMAT_TABLE: + createFormatTable(identifier, schema); + break; + case OBJECT_TABLE: + throw new UnsupportedOperationException( + String.format( + "Catalog %s cannot support object tables.", + this.getClass().getName())); + } + } + @Override protected void dropTableImpl(Identifier identifier, List externalPaths) { try { @@ -370,6 +480,37 @@ protected void createTableImpl(Identifier identifier, Schema schema) { } } + @Override + public void renameTable(Identifier fromTable, Identifier toTable, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException { + checkNotBranch(fromTable, "renameTable"); + checkNotBranch(toTable, "renameTable"); + checkNotSystemTable(fromTable, "renameTable"); + checkNotSystemTable(toTable, "renameTable"); + + try { + getTable(fromTable); + } catch (TableNotExistException e) { + if (ignoreIfNotExists) { + return; + } + throw new TableNotExistException(fromTable); + } + + try { + getTable(toTable); + throw new TableAlreadyExistException(toTable); + } catch (TableNotExistException ignored) { + } + + if (JdbcUtils.viewExists( + connections, catalogKey, toTable.getDatabaseName(), toTable.getObjectName())) { + throw new TableAlreadyExistException(toTable); + } + + renameTableImpl(fromTable, toTable); + } + @Override protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { @@ -576,6 +717,19 @@ private boolean syncTableProperties() { return options.get(CatalogOptions.SYNC_ALL_PROPERTIES); } + private void copyTableDefaultOptions(Map tableOptions) { + tableDefaultOptions.forEach(tableOptions::putIfAbsent); + } + + private void validateCustomTablePath(Map tableOptions) { + if (!allowCustomTablePath() && tableOptions.containsKey(PATH.key())) { + throw new UnsupportedOperationException( + String.format( + "The current catalog %s does not support specifying the table path when creating a table.", + this.getClass().getSimpleName())); + } + } + private Map convertToPropertiesTableKey(TableSchema tableSchema) { Map properties = new HashMap<>(); if (!tableSchema.primaryKeys().isEmpty()) { @@ -646,4 +800,287 @@ private List fetch(RowProducer toRow, String sql, String... args) { throw new RuntimeException("Interrupted in SQL query", e); } } + + // ======================= view methods =============================== + + @Override + public View getView(Identifier identifier) throws ViewNotExistException { + try { + String viewSchemaJson = + JdbcUtils.getViewSchema( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()); + if (viewSchemaJson == null) { + throw new ViewNotExistException(identifier); + } + + ViewSchema viewSchema = JsonSerdeUtil.fromJson(viewSchemaJson, ViewSchema.class); + return new ViewImpl( + identifier, + viewSchema.fields(), + viewSchema.query(), + viewSchema.dialects(), + viewSchema.comment(), + viewSchema.options()); + } catch (SQLException | InterruptedException e) { + throw new RuntimeException("Failed to get view " + identifier.getFullName(), e); + } + } + + @Override + public void createView(Identifier identifier, View view, boolean ignoreIfExists) + throws ViewAlreadyExistException, DatabaseNotExistException { + // Check if database exists + try { + getDatabase(identifier.getDatabaseName()); + } catch (DatabaseNotExistException e) { + throw e; + } + + if (JdbcUtils.tableExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfExists) { + return; + } + throw new ViewAlreadyExistException(identifier); + } + + // Check if view already exists + if (JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfExists) { + return; + } + throw new ViewAlreadyExistException(identifier); + } + + // Serialize view schema to JSON + ViewSchema viewSchema = + new ViewSchema( + view.rowType().getFields(), + view.query(), + view.dialects(), + view.comment().orElse(null), + view.options()); + String viewSchemaJson = JsonSerdeUtil.toJson(viewSchema); + + // Insert view + try { + JdbcUtils.insertView( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName(), + viewSchemaJson); + } catch (RuntimeException e) { + if (e.getMessage() != null && e.getMessage().contains("View already exists")) { + throw new ViewAlreadyExistException(identifier, e); + } + throw new RuntimeException("Failed to create view " + identifier.getFullName(), e); + } + } + + @Override + public void dropView(Identifier identifier, boolean ignoreIfNotExists) + throws ViewNotExistException { + // Check if view exists + if (!JdbcUtils.viewExists( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName())) { + if (ignoreIfNotExists) { + return; + } + throw new ViewNotExistException(identifier); + } + + // Delete view + int deletedRecords = + execute( + connections, + JdbcUtils.DROP_VIEW_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName()); + + if (deletedRecords != 1) { + throw new RuntimeException( + String.format( + "Failed to drop view %s: affected %d rows", + identifier.getFullName(), deletedRecords)); + } + } + + @Override + public List listViews(String databaseName) throws DatabaseNotExistException { + // Check if database exists + if (!JdbcUtils.databaseExists(connections, catalogKey, databaseName)) { + throw new DatabaseNotExistException(databaseName); + } + + return fetch( + row -> row.getString(JdbcUtils.VIEW_NAME), + JdbcUtils.LIST_VIEWS_SQL, + catalogKey, + databaseName); + } + + @Override + public PagedList listViewsPaged( + String databaseName, Integer maxResults, String pageToken, String viewNamePattern) + throws DatabaseNotExistException { + CatalogUtils.validateNamePattern(this, viewNamePattern); + return new PagedList<>(listViews(databaseName), null); + } + + @Override + public PagedList listViewDetailsPaged( + String databaseName, Integer maxResults, String pageToken, String viewNamePattern) + throws DatabaseNotExistException { + PagedList pagedViews = + listViewsPaged(databaseName, maxResults, pageToken, viewNamePattern); + return new PagedList<>( + pagedViews.getElements().stream() + .map( + viewName -> { + try { + return getView(Identifier.create(databaseName, viewName)); + } catch (ViewNotExistException ignored) { + LOG.warn( + "view {}.{} does not exist", + databaseName, + viewName); + return null; + } + }) + .filter(Objects::nonNull) + .collect(Collectors.toList()), + pagedViews.getNextPageToken()); + } + + @Override + public void renameView(Identifier fromView, Identifier toView, boolean ignoreIfNotExists) + throws ViewNotExistException, ViewAlreadyExistException { + // Check if source view exists + if (!JdbcUtils.viewExists( + connections, catalogKey, fromView.getDatabaseName(), fromView.getObjectName())) { + if (ignoreIfNotExists) { + return; + } + throw new ViewNotExistException(fromView); + } + + // Check if target view already exists + if (JdbcUtils.viewExists( + connections, catalogKey, toView.getDatabaseName(), toView.getObjectName())) { + throw new ViewAlreadyExistException(toView); + } + if (JdbcUtils.tableExists( + connections, catalogKey, toView.getDatabaseName(), toView.getObjectName())) { + throw new ViewAlreadyExistException(toView); + } + if (!JdbcUtils.databaseExists(connections, catalogKey, toView.getDatabaseName())) { + throw new IllegalArgumentException( + String.format("Database %s does not exist.", toView.getDatabaseName())); + } + + // Rename view + try { + JdbcUtils.renameView(connections, catalogKey, fromView, toView); + } catch (RuntimeException e) { + if (e.getMessage() != null && e.getMessage().contains("View already exists")) { + throw new ViewAlreadyExistException(toView, e); + } else if (e.getMessage() != null && e.getMessage().contains("View does not exist")) { + throw new ViewNotExistException(fromView, e); + } + throw new RuntimeException( + "Failed to rename view from " + + fromView.getFullName() + + " to " + + toView.getFullName(), + e); + } + } + + @Override + public void alterView( + Identifier identifier, List changes, boolean ignoreIfNotExists) + throws ViewNotExistException, DialectAlreadyExistException, DialectNotExistException { + // Get existing view + View existingView; + try { + existingView = getView(identifier); + } catch (ViewNotExistException e) { + if (ignoreIfNotExists) { + return; + } + throw e; + } + + // Apply changes + Map newOptions = new HashMap<>(existingView.options()); + String newComment = existingView.comment().orElse(null); + Map newDialects = new HashMap<>(existingView.dialects()); + for (ViewChange change : changes) { + if (change instanceof ViewChange.SetViewOption) { + ViewChange.SetViewOption setOption = (ViewChange.SetViewOption) change; + newOptions.put(setOption.key(), setOption.value()); + } else if (change instanceof ViewChange.RemoveViewOption) { + ViewChange.RemoveViewOption removeOption = (ViewChange.RemoveViewOption) change; + newOptions.remove(removeOption.key()); + } else if (change instanceof ViewChange.UpdateViewComment) { + ViewChange.UpdateViewComment updateComment = (ViewChange.UpdateViewComment) change; + newComment = updateComment.comment(); + } else if (change instanceof ViewChange.AddDialect) { + ViewChange.AddDialect addDialect = (ViewChange.AddDialect) change; + if (newDialects.containsKey(addDialect.dialect())) { + throw new DialectAlreadyExistException(identifier, addDialect.dialect()); + } + newDialects.put(addDialect.dialect(), addDialect.query()); + } else if (change instanceof ViewChange.UpdateDialect) { + ViewChange.UpdateDialect updateDialect = (ViewChange.UpdateDialect) change; + if (!newDialects.containsKey(updateDialect.dialect())) { + throw new DialectNotExistException(identifier, updateDialect.dialect()); + } + newDialects.put(updateDialect.dialect(), updateDialect.query()); + } else if (change instanceof ViewChange.DropDialect) { + ViewChange.DropDialect dropDialect = (ViewChange.DropDialect) change; + if (!newDialects.containsKey(dropDialect.dialect())) { + throw new DialectNotExistException(identifier, dropDialect.dialect()); + } + newDialects.remove(dropDialect.dialect()); + } + } + + // Create updated view schema + ViewSchema updatedSchema = + new ViewSchema( + existingView.rowType().getFields(), + existingView.query(), + newDialects, + newComment, + newOptions); + String viewSchemaJson = JsonSerdeUtil.toJson(updatedSchema); + + // Update view + try { + JdbcUtils.updateView( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getObjectName(), + viewSchemaJson); + } catch (RuntimeException e) { + throw new RuntimeException("Failed to alter view " + identifier.getFullName(), e); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 52cf4224f2f7..42867b5feba6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -18,6 +18,7 @@ package org.apache.paimon.jdbc; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.options.Options; @@ -40,6 +41,7 @@ /** Util for jdbc catalog. */ public class JdbcUtils { private static final Logger LOG = LoggerFactory.getLogger(JdbcUtils.class); + private static final String UNIQUE_CONSTRAINT_VIOLATION_STATE = "23505"; public static final String CATALOG_TABLE_NAME = "paimon_tables"; public static final String CATALOG_KEY = "catalog_key"; public static final String TABLE_DATABASE = "database_name"; @@ -331,6 +333,117 @@ public class JdbcUtils { static final String ACQUIRED_AT = "acquired_at"; static final String EXPIRE_TIME = "expire_time_seconds"; + // View table + public static final String VIEW_TABLE_NAME = "paimon_views"; + public static final String VIEW_DATABASE = "database_name"; + public static final String VIEW_NAME = "view_name"; + public static final String VIEW_SCHEMA = "view_schema"; + + static final String CREATE_VIEW_TABLE = + "CREATE TABLE " + + VIEW_TABLE_NAME + + "(" + + CATALOG_KEY + + " VARCHAR(255) NOT NULL," + + VIEW_DATABASE + + " VARCHAR(255) NOT NULL," + + VIEW_NAME + + " VARCHAR(255) NOT NULL," + + VIEW_SCHEMA + + " TEXT NOT NULL," + + " PRIMARY KEY (" + + CATALOG_KEY + + ", " + + VIEW_DATABASE + + ", " + + VIEW_NAME + + ")" + + ")"; + + static final String GET_VIEW_SQL = + "SELECT * FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + + static final String LIST_VIEWS_SQL = + "SELECT * FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ?"; + + static final String INSERT_VIEW_SQL = + "INSERT INTO " + + VIEW_TABLE_NAME + + " (" + + CATALOG_KEY + + ", " + + VIEW_DATABASE + + ", " + + VIEW_NAME + + ", " + + VIEW_SCHEMA + + ") " + + " VALUES (?,?,?,?)"; + + static final String DROP_VIEW_SQL = + "DELETE FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + + static final String DELETE_VIEWS_SQL = + "DELETE FROM " + + VIEW_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ?"; + + static final String RENAME_VIEW_SQL = + "UPDATE " + + VIEW_TABLE_NAME + + " SET " + + VIEW_DATABASE + + " = ? , " + + VIEW_NAME + + " = ? " + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + + static final String UPDATE_VIEW_SQL = + "UPDATE " + + VIEW_TABLE_NAME + + " SET " + + VIEW_SCHEMA + + " = ? " + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + VIEW_DATABASE + + " = ? AND " + + VIEW_NAME + + " = ? "; + public static Properties extractJdbcConfiguration( Map properties, String prefix) { Properties result = new Properties(); @@ -372,9 +485,7 @@ public static void updateTable( int updatedRecords = execute( err -> { - if (err instanceof SQLIntegrityConstraintViolationException - || (err.getMessage() != null - && err.getMessage().contains("constraint failed"))) { + if (isUniqueConstraintViolation(err)) { throw new RuntimeException( String.format("Table already exists: %s", toTable)); } @@ -678,4 +789,126 @@ private static String deletePropertiesStatement(Set properties) { return sqlStatement.toString(); } + + /** Check if view exists. */ + public static boolean viewExists( + JdbcClientPool connections, String storeKey, String databaseName, String viewName) { + return exists(connections, JdbcUtils.GET_VIEW_SQL, storeKey, databaseName, viewName); + } + + /** Get view schema JSON. */ + public static String getViewSchema( + JdbcClientPool connections, String storeKey, String databaseName, String viewName) + throws SQLException, InterruptedException { + return connections.run( + conn -> { + try (PreparedStatement sql = conn.prepareStatement(JdbcUtils.GET_VIEW_SQL)) { + sql.setString(1, storeKey); + sql.setString(2, databaseName); + sql.setString(3, viewName); + try (ResultSet rs = sql.executeQuery()) { + if (rs.next()) { + return rs.getString(VIEW_SCHEMA); + } + } + return null; + } + }); + } + + /** Insert view. */ + public static void insertView( + JdbcClientPool connections, + String storeKey, + String databaseName, + String viewName, + String viewSchemaJson) { + int insertedRecords = + execute( + err -> { + if (isUniqueConstraintViolation(err)) { + throw new RuntimeException( + String.format( + "View already exists: %s.%s", + databaseName, viewName)); + } + }, + connections, + JdbcUtils.INSERT_VIEW_SQL, + storeKey, + databaseName, + viewName, + viewSchemaJson); + + if (insertedRecords != 1) { + throw new RuntimeException( + String.format( + "Failed to insert view %s.%s: affected %d rows", + databaseName, viewName, insertedRecords)); + } + } + + /** Update view. */ + public static void updateView( + JdbcClientPool connections, + String storeKey, + String databaseName, + String viewName, + String viewSchemaJson) { + int updatedRecords = + execute( + connections, + JdbcUtils.UPDATE_VIEW_SQL, + viewSchemaJson, + storeKey, + databaseName, + viewName); + + if (updatedRecords == 0) { + throw new RuntimeException( + String.format("View does not exist: %s.%s", databaseName, viewName)); + } else if (updatedRecords != 1) { + LOG.warn( + "Update operation affected {} rows: the view table's primary key assumption has been violated", + updatedRecords); + } + } + + /** Rename view. */ + public static void renameView( + JdbcClientPool connections, String storeKey, Identifier fromView, Identifier toView) { + int updatedRecords = + execute( + err -> { + if (isUniqueConstraintViolation(err)) { + throw new RuntimeException( + String.format("View already exists: %s", toView)); + } + }, + connections, + JdbcUtils.RENAME_VIEW_SQL, + toView.getDatabaseName(), + toView.getObjectName(), + storeKey, + fromView.getDatabaseName(), + fromView.getObjectName()); + + if (updatedRecords == 1) { + LOG.info("Renamed view from {}, to {}", fromView, toView); + } else if (updatedRecords == 0) { + throw new RuntimeException(String.format("View does not exist: %s", fromView)); + } else { + LOG.warn( + "Rename operation affected {} rows: the view table's primary key assumption has been violated", + updatedRecords); + } + } + + @VisibleForTesting + static boolean isUniqueConstraintViolation(SQLException err) { + String message = err.getMessage(); + return err instanceof SQLIntegrityConstraintViolationException + || UNIQUE_CONSTRAINT_VIOLATION_STATE.equals(err.getSQLState()) + || (message != null && message.contains("constraint failed")); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index fd3c6fdc5950..3c1dac34b59f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -29,7 +29,11 @@ import org.apache.paimon.table.Table; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; +import org.apache.paimon.view.View; +import org.apache.paimon.view.ViewChange; +import org.apache.paimon.view.ViewImpl; +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; @@ -41,11 +45,17 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLIntegrityConstraintViolationException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -64,7 +74,9 @@ private JdbcCatalog initCatalog(Map props) { Map properties = Maps.newHashMap(); properties.put( CatalogOptions.URI.key(), - "jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", "")); + "jdbc:sqlite:file:" + + UUID.randomUUID().toString().replace("-", "") + + "?mode=memory&cache=shared"); properties.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); properties.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); @@ -146,6 +158,27 @@ protected boolean supportsReplaceTable() { return false; } + @Override + protected boolean supportsView() { + return true; + } + + @Test + public void testUniqueConstraintViolationDetection() { + assertThat( + JdbcUtils.isUniqueConstraintViolation( + new SQLIntegrityConstraintViolationException("duplicate entry"))) + .isTrue(); + assertThat( + JdbcUtils.isUniqueConstraintViolation( + new SQLException("duplicate key", "23505"))) + .isTrue(); + assertThat(JdbcUtils.isUniqueConstraintViolation(new SQLException("constraint failed"))) + .isTrue(); + assertThat(JdbcUtils.isUniqueConstraintViolation(new SQLException("syntax error"))) + .isFalse(); + } + @Test public void testRepairTableNotExist() throws Exception { String databaseName = "repair_db"; @@ -345,6 +378,34 @@ private Map fetchTableProperties( } } + private String fetchViewSchema(JdbcCatalog jdbcCatalog, String databaseName, String viewName) { + try { + return JdbcUtils.getViewSchema( + jdbcCatalog.getConnections(), + jdbcCatalog.getCatalogKey(), + databaseName, + viewName); + } catch (SQLException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private boolean tableExists(JdbcCatalog jdbcCatalog, String tableName) { + try { + return jdbcCatalog + .getConnections() + .run( + conn -> { + try (ResultSet rs = + conn.getMetaData().getTables(null, null, tableName, null)) { + return rs.next(); + } + }); + } catch (SQLException | InterruptedException e) { + throw new RuntimeException(e); + } + } + @Test public void testTablePropertiesSyncOnCreate() throws Exception { JdbcCatalog syncCatalog = initCatalogWithSync(true); @@ -625,4 +686,270 @@ public void testInsertTableUtility() throws Exception { .isInstanceOf(RuntimeException.class) .hasMessageContaining("Failed to insert table"); } + + @Test + public void testViewTableCreatedOnInitialize() { + assertThat(tableExists((JdbcCatalog) catalog, JdbcUtils.VIEW_TABLE_NAME)).isTrue(); + } + + @Test + public void testCreateViewStoresViewSchemaWithoutResolvingReferencedTable() throws Exception { + String databaseName = "view_schema_db"; + Identifier identifier = Identifier.create(databaseName, "view_on_missing_table"); + View view = createView(identifier); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, view, false); + + String viewSchemaJson = + fetchViewSchema((JdbcCatalog) catalog, databaseName, "view_on_missing_table"); + assertThat(viewSchemaJson).contains("\"query\"").contains("\"SELECT * FROM OTHER_TABLE\""); + assertThat(catalog.getView(identifier).query()).isEqualTo("SELECT * FROM OTHER_TABLE"); + } + + @Test + public void testCreateViewStoresCrossDatabaseQueryWithoutResolvingReferencedTable() + throws Exception { + String databaseName = "cross_database_view_db"; + Identifier identifier = Identifier.create(databaseName, "view_on_other_database"); + String query = "SELECT * FROM other_database.missing_table"; + View baseView = createView(identifier); + View view = + new ViewImpl( + identifier, + baseView.rowType().getFields(), + query, + baseView.dialects(), + baseView.comment().orElse(null), + baseView.options()); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, view, false); + + String viewSchemaJson = + fetchViewSchema((JdbcCatalog) catalog, databaseName, "view_on_other_database"); + assertThat(viewSchemaJson).contains("\"query\"").contains(query); + assertThat(catalog.getView(identifier).query()).isEqualTo(query); + } + + @Test + public void testDropDatabaseCleansViewMetadata() throws Exception { + String databaseName = "drop_view_db"; + Identifier identifier = Identifier.create(databaseName, "view_name"); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, createView(identifier), false); + catalog.dropDatabase(databaseName, false, true); + + assertThat( + fetchViewSchema( + (JdbcCatalog) catalog, + identifier.getDatabaseName(), + identifier.getObjectName())) + .isNull(); + } + + @Test + public void testDropDatabaseWithoutCascadeRejectsViewOnlyDatabase() throws Exception { + String databaseName = "view_only_db"; + Identifier identifier = Identifier.create(databaseName, "view_name"); + + catalog.createDatabase(databaseName, false); + catalog.createView(identifier, createView(identifier), false); + + assertThatThrownBy(() -> catalog.dropDatabase(databaseName, false, false)) + .isInstanceOf(Catalog.DatabaseNotEmptyException.class) + .hasMessage("Database " + databaseName + " is not empty."); + assertThat(catalog.getView(identifier).fullName()).isEqualTo(identifier.getFullName()); + + catalog.dropDatabase(databaseName, false, true); + assertThatThrownBy(() -> catalog.getDatabase(databaseName)) + .isInstanceOf(Catalog.DatabaseNotExistException.class); + } + + @Test + public void testRenameViewAcrossDatabases() throws Exception { + String sourceDatabase = "source_view_db"; + String targetDatabase = "target_view_db"; + Identifier source = Identifier.create(sourceDatabase, "view_name"); + Identifier target = Identifier.create(targetDatabase, "view_name"); + + catalog.createDatabase(sourceDatabase, false); + catalog.createDatabase(targetDatabase, false); + catalog.createView(source, createView(source), false); + + catalog.renameView(source, target, false); + + assertThatThrownBy(() -> catalog.getView(source)) + .isInstanceOf(Catalog.ViewNotExistException.class); + assertThat(catalog.getView(target).fullName()).isEqualTo(target.getFullName()); + assertThat(catalog.listViews(sourceDatabase)).isEmpty(); + assertThat(catalog.listViews(targetDatabase)).containsExactly("view_name"); + } + + @Test + public void testRenameViewRejectsMissingTargetDatabase() throws Exception { + String sourceDatabase = "source_view_db"; + Identifier source = Identifier.create(sourceDatabase, "view_name"); + Identifier target = Identifier.create("missing_view_db", "view_name"); + + catalog.createDatabase(sourceDatabase, false); + catalog.createView(source, createView(source), false); + + assertThatThrownBy(() -> catalog.renameView(source, target, false)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Database missing_view_db does not exist."); + assertThat(catalog.getView(source).fullName()).isEqualTo(source.getFullName()); + assertThatThrownBy(() -> catalog.getView(target)) + .isInstanceOf(Catalog.ViewNotExistException.class); + } + + @Test + public void testTableAndViewCannotShareName() throws Exception { + String databaseName = "shared_name_db"; + Identifier tableFirst = Identifier.create(databaseName, "table_first"); + Identifier viewFirst = Identifier.create(databaseName, "view_first"); + + catalog.createDatabase(databaseName, false); + + catalog.createTable(tableFirst, DEFAULT_TABLE_SCHEMA, false); + assertThatThrownBy(() -> catalog.createView(tableFirst, createView(tableFirst), false)) + .isInstanceOf(Catalog.ViewAlreadyExistException.class); + catalog.createView(tableFirst, createView(tableFirst), true); + assertThat(catalog.listViews(databaseName)).doesNotContain(tableFirst.getObjectName()); + + catalog.createView(viewFirst, createView(viewFirst), false); + assertThatThrownBy(() -> catalog.createTable(viewFirst, DEFAULT_TABLE_SCHEMA, false)) + .isInstanceOf(Catalog.TableAlreadyExistException.class); + catalog.createTable(viewFirst, DEFAULT_TABLE_SCHEMA, true); + assertThat(catalog.listTables(databaseName)).doesNotContain(viewFirst.getObjectName()); + } + + @Test + public void testRenameRejectsTableViewNameCollision() throws Exception { + String databaseName = "rename_collision_db"; + Identifier table = Identifier.create(databaseName, "table_name"); + Identifier view = Identifier.create(databaseName, "view_name"); + + catalog.createDatabase(databaseName, false); + catalog.createTable(table, DEFAULT_TABLE_SCHEMA, false); + catalog.createView(view, createView(view), false); + + assertThatThrownBy(() -> catalog.renameTable(table, view, false)) + .isInstanceOf(Catalog.TableAlreadyExistException.class); + assertThatThrownBy(() -> catalog.renameView(view, table, false)) + .isInstanceOf(Catalog.ViewAlreadyExistException.class); + assertThat(catalog.listTables(databaseName)).containsExactly(table.getObjectName()); + assertThat(catalog.listViews(databaseName)).containsExactly(view.getObjectName()); + } + + @Test + public void testConcurrentCreateViewOnlyCreatesOneView() throws Exception { + String databaseName = "concurrent_view_db"; + Identifier identifier = Identifier.create(databaseName, "same_view"); + View view = createView(identifier); + ExecutorService executor = Executors.newFixedThreadPool(2); + + catalog.createDatabase(databaseName, false); + Callable create = + () -> { + try { + catalog.createView(identifier, view, false); + return true; + } catch (Catalog.ViewAlreadyExistException e) { + return false; + } + }; + + try { + Future first = executor.submit(create); + Future second = executor.submit(create); + + assertThat(ImmutableList.of(first.get(), second.get())) + .containsExactlyInAnyOrder(true, false); + assertThat(catalog.listViews(databaseName)).containsExactly("same_view"); + } catch (ExecutionException e) { + throw new RuntimeException(e.getCause()); + } finally { + executor.shutdownNow(); + } + } + + @Test + public void testAlterView() throws Exception { + Identifier identifier = Identifier.create("alter_view_db", "my_view"); + View view = createView(identifier); + catalog.createDatabase(identifier.getDatabaseName(), false); + + assertDoesNotThrow( + () -> + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.setOption("k", "v")), + true)); + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.setOption("k", "v")), + false)) + .isInstanceOf(Catalog.ViewNotExistException.class); + + catalog.createView(identifier, view, false); + catalog.alterView(identifier, ImmutableList.of(ViewChange.setOption("k", "v")), false); + assertThat(catalog.getView(identifier).options()).containsEntry("k", "v"); + + catalog.alterView(identifier, ImmutableList.of(ViewChange.removeOption("k")), false); + assertThat(catalog.getView(identifier).options()).doesNotContainKey("k"); + + catalog.alterView( + identifier, ImmutableList.of(ViewChange.updateComment("new comment")), false); + assertThat(catalog.getView(identifier).comment()).hasValue("new comment"); + + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.addDialect("flink_1", "SELECT * FROM FLINK_TABLE_1")), + false); + assertThat(catalog.getView(identifier).query("flink_1")) + .isEqualTo("SELECT * FROM FLINK_TABLE_1"); + + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of( + ViewChange.addDialect( + "flink_1", "SELECT * FROM FLINK_TABLE_1")), + false)) + .isInstanceOf(Catalog.DialectAlreadyExistException.class); + + catalog.alterView( + identifier, + ImmutableList.of( + ViewChange.updateDialect("flink_1", "SELECT * FROM FLINK_TABLE_2")), + false); + assertThat(catalog.getView(identifier).query("flink_1")) + .isEqualTo("SELECT * FROM FLINK_TABLE_2"); + + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of( + ViewChange.updateDialect( + "missing", "SELECT * FROM FLINK_TABLE_2")), + false)) + .isInstanceOf(Catalog.DialectNotExistException.class); + + catalog.alterView(identifier, ImmutableList.of(ViewChange.dropDialect("flink_1")), false); + assertThat(catalog.getView(identifier).query("flink_1")).isEqualTo(view.query()); + + assertThatThrownBy( + () -> + catalog.alterView( + identifier, + ImmutableList.of(ViewChange.dropDialect("missing")), + false)) + .isInstanceOf(Catalog.DialectNotExistException.class); + } } diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index c0774b1c587a..8818ecc8b8a5 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -238,6 +238,13 @@ under the License. ${project.version} test + + + org.xerial + sqlite-jdbc + 3.44.0.0 + test + diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/JdbcCatalogViewITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/JdbcCatalogViewITCase.java new file mode 100644 index 000000000000..a3b1af7aeb5b --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/JdbcCatalogViewITCase.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.jdbc.JdbcCatalog; +import org.apache.paimon.options.CatalogOptions; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT Case for JDBC Catalog View support. */ +public class JdbcCatalogViewITCase extends CatalogITCaseBase { + + private static final String DATABASE_NAME = "test_db"; + private static final String TABLE_NAME = "test_table"; + + @TempDir java.nio.file.Path tempFile; + + @BeforeEach + @Override + public void before() throws IOException { + super.before(); + sql(String.format("CREATE DATABASE %s", DATABASE_NAME)); + sql(String.format("USE %s", DATABASE_NAME)); + sql( + String.format( + "CREATE TABLE %s.%s (id INT, name STRING, amount DOUBLE)", + DATABASE_NAME, TABLE_NAME)); + sql( + String.format( + "INSERT INTO %s.%s VALUES (1, 'Alice', 100.0), (2, 'Bob', 200.0), (3, 'Charlie', 150.0)", + DATABASE_NAME, TABLE_NAME)); + } + + @Override + protected Map catalogOptions() { + Map options = new HashMap<>(); + options.put("metastore", "jdbc"); + options.put( + CatalogOptions.URI.key(), + "jdbc:sqlite:file:" + + UUID.randomUUID().toString().replace("-", "") + + "?mode=memory&cache=shared"); + options.put(JdbcCatalog.PROPERTY_PREFIX + "username", "user"); + options.put(JdbcCatalog.PROPERTY_PREFIX + "password", "password"); + // Disable lock for simpler testing + options.put(CatalogOptions.LOCK_ENABLED.key(), "false"); + return options; + } + + @Override + protected String getTempDirPath() { + return tempFile.toUri().toString(); + } + + @Test + public void testCreateAndQueryView() { + // Create a view + String viewName = "sales_view"; + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT name, amount FROM %s.%s WHERE amount > 100", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Query the view + List result = sql(String.format("SELECT * FROM %s.%s", DATABASE_NAME, viewName)); + assertThat(result).hasSize(2); + assertThat(result.toString()).contains("Bob"); + assertThat(result.toString()).contains("Charlie"); + } + + @Test + public void testShowViews() { + // Create multiple views + sql( + String.format( + "CREATE VIEW %s.view1 AS SELECT * FROM %s.%s", + DATABASE_NAME, DATABASE_NAME, TABLE_NAME)); + sql( + String.format( + "CREATE VIEW %s.view2 AS SELECT id, name FROM %s.%s", + DATABASE_NAME, DATABASE_NAME, TABLE_NAME)); + + // List views + List result = sql("SHOW VIEWS"); + assertThat(result).hasSize(2); + assertThat(result.toString()).contains("view1"); + assertThat(result.toString()).contains("view2"); + } + + @Test + public void testDropView() { + // Create a view + String viewName = "temp_view"; + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT * FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Verify view exists + List views = sql("SHOW VIEWS"); + assertThat(views.toString()).contains(viewName); + + // Drop the view + sql(String.format("DROP VIEW %s.%s", DATABASE_NAME, viewName)); + + // Verify view is dropped + views = sql("SHOW VIEWS"); + assertThat(views.toString()).doesNotContain(viewName); + } + + @Test + public void testViewWithAggregation() { + // Create a view with aggregation + String viewName = "agg_view"; + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT COUNT(*) as cnt, SUM(amount) as total FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Query the view + List result = sql(String.format("SELECT * FROM %s.%s", DATABASE_NAME, viewName)); + assertThat(result).hasSize(1); + assertThat(result.get(0).getField(0)).isEqualTo(3L); // count + assertThat((Double) result.get(0).getField(1)).isEqualTo(450.0); // sum + } + + @Test + public void testCreateViewIfNotExists() { + String viewName = "idempotent_view"; + + // Create view first time + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT * FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Create view again with IF NOT EXISTS - should not throw error + sql( + String.format( + "CREATE VIEW IF NOT EXISTS %s.%s AS SELECT id FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + + // Original view should remain unchanged + List result = + sql(String.format("SELECT * FROM %s.%s LIMIT 1", DATABASE_NAME, viewName)); + assertThat(result.get(0).getArity()).isEqualTo(3); // original view has 3 columns + } + + @Test + public void testDropViewIfExists() { + String viewName = "maybe_exists_view"; + + // Drop non-existent view with IF EXISTS - should not throw error + sql(String.format("DROP VIEW IF EXISTS %s.%s", DATABASE_NAME, viewName)); + + // Create and drop + sql( + String.format( + "CREATE VIEW %s.%s AS SELECT * FROM %s.%s", + DATABASE_NAME, viewName, DATABASE_NAME, TABLE_NAME)); + sql(String.format("DROP VIEW IF EXISTS %s.%s", DATABASE_NAME, viewName)); + + // Verify it's gone + List views = sql("SHOW VIEWS"); + assertThat(views.toString()).doesNotContain(viewName); + } + + @Test + public void testShowCreateView() { + String viewName = "describable_view"; + String query = + String.format( + "SELECT `name`, `amount` FROM `%s`.`%s` WHERE `amount` > 100", + DATABASE_NAME, TABLE_NAME); + sql(String.format("CREATE VIEW %s.%s AS %s", DATABASE_NAME, viewName, query)); + + // Show create view + List result = sql(String.format("SHOW CREATE VIEW %s.%s", DATABASE_NAME, viewName)); + assertThat(result).hasSize(1); + String createStatement = result.get(0).toString(); + assertThat(createStatement).contains(viewName); + assertThat(createStatement).contains("amount"); + } +}