Skip to content

Commit e76030f

Browse files
authored
Add permissive mode setting (#4611)
1 parent d4a2d19 commit e76030f

File tree

15 files changed

+404
-43
lines changed

15 files changed

+404
-43
lines changed

common/src/main/java/org/opensearch/sql/common/setting/Settings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public enum Key {
3535
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),
3636
PPL_SUBSEARCH_MAXOUT("plugins.ppl.subsearch.maxout"),
3737
PPL_JOIN_SUBSEARCH_MAXOUT("plugins.ppl.join.subsearch_maxout"),
38+
PPL_QUERY_PERMISSIVE("plugins.ppl.query.permissive"),
3839

3940
/** Enable Calcite as execution engine */
4041
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,12 @@ private List<RexNode> expandProjectFields(
404404
}
405405
matchingFields.forEach(f -> expandedFields.add(context.relBuilder.field(f)));
406406
} else if (addedFields.add(fieldName)) {
407-
expandedFields.add(rexVisitor.analyze(field, context));
407+
RexNode analyzed = rexVisitor.analyze(field, context);
408+
if (analyzed instanceof RexInputRef) {
409+
expandedFields.add(analyzed);
410+
} else {
411+
expandedFields.add(context.relBuilder.alias(analyzed, fieldName));
412+
}
408413
}
409414
}
410415
case AllFields ignored -> {

core/src/main/java/org/opensearch/sql/calcite/OpenSearchSchema.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public void registerTable(QualifiedName qualifiedName) {
4141
dataSourceService
4242
.getDataSource(nameResolver.getDataSourceName())
4343
.getStorageEngine()
44-
.getTable(
44+
.getPermissiveAwareTable(
4545
new DataSourceSchemaName(
4646
nameResolver.getDataSourceName(), nameResolver.getSchemaName()),
4747
nameResolver.getIdentifierName());

core/src/main/java/org/opensearch/sql/calcite/QualifiedNameResolver.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.apache.logging.log4j.LogManager;
1717
import org.apache.logging.log4j.Logger;
1818
import org.opensearch.sql.ast.expression.QualifiedName;
19+
import org.opensearch.sql.calcite.plan.DynamicFieldsConstants;
1920
import org.opensearch.sql.expression.function.BuiltinFunctionName;
2021
import org.opensearch.sql.expression.function.PPLFuncImpTable;
2122

@@ -69,6 +70,7 @@ private static RexNode resolveInNonJoinCondition(
6970
.or(() -> resolveFieldWithoutAlias(nameNode, context, 1))
7071
.or(() -> resolveRenamedField(nameNode, context))
7172
.or(() -> resolveCorrelationField(nameNode, context))
73+
.or(() -> resolveDynamicFields(nameNode, context, 1))
7274
.or(() -> replaceWithNullLiteralInCoalesce(context))
7375
.orElseThrow(() -> getNotFoundException(nameNode));
7476
}
@@ -116,6 +118,29 @@ private static Optional<RexNode> resolveFieldWithAlias(
116118
return Optional.empty();
117119
}
118120

121+
private static Optional<RexNode> resolveDynamicFields(
122+
QualifiedName nameNode, CalcitePlanContext context, int inputCount) {
123+
List<String> parts = nameNode.getParts();
124+
log.debug(
125+
"resolveDynamicFields() called with nameNode={}, parts={}, inputCount={}",
126+
nameNode,
127+
parts,
128+
inputCount);
129+
130+
List<Set<String>> inputFieldNames = collectInputFieldNames(context, inputCount);
131+
132+
for (int i = 0; i < inputCount; i++) {
133+
if (inputFieldNames.get(i).contains(DynamicFieldsConstants.DYNAMIC_FIELDS_MAP)) {
134+
String fieldName = String.join(".", parts);
135+
RexNode dynamicField =
136+
context.relBuilder.field(inputCount, i, DynamicFieldsConstants.DYNAMIC_FIELDS_MAP);
137+
RexNode itemAccess = createItemAccess(dynamicField, fieldName, context);
138+
return Optional.of(itemAccess);
139+
}
140+
}
141+
return Optional.empty();
142+
}
143+
119144
private static Optional<RexNode> tryToResolveField(
120145
String alias, String fieldName, CalcitePlanContext context, int inputCount) {
121146
log.debug(
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.plan;
7+
8+
import lombok.experimental.UtilityClass;
9+
10+
@UtilityClass
11+
public class DynamicFieldsConstants {
12+
public static String DYNAMIC_FIELDS_MAP = "_MAP";
13+
}

core/src/main/java/org/opensearch/sql/calcite/utils/OpenSearchTypeFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public static RelDataType convertExprTypeToRelDataType(ExprType fieldType, boole
181181
// https://github.com/opensearch-project/sql/issues/3459
182182
final RelDataType relKey = TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR);
183183
return TYPE_FACTORY.createMapType(
184-
relKey, TYPE_FACTORY.createSqlType(SqlTypeName.BINARY), nullable);
184+
relKey, TYPE_FACTORY.createSqlType(SqlTypeName.ANY), nullable);
185185
case UNKNOWN:
186186
default:
187187
throw new IllegalArgumentException(
@@ -301,6 +301,9 @@ public static ExprValue getExprValueByExprType(ExprType type, Object value) {
301301
return ExprValueUtils.collectionValue((List<Object>) value);
302302
case STRUCT:
303303
return ExprValueUtils.tupleValue((Map<String, Object>) value);
304+
case UNKNOWN:
305+
// Handle UNKNOWN type by returning the value as-is wrapped in ExprValue
306+
return ExprValueUtils.fromObjectValue(value);
304307
default:
305308
throw new IllegalArgumentException(
306309
"Unsupported conversion for OpenSearch Data type: " + type.typeName());

core/src/main/java/org/opensearch/sql/storage/StorageEngine.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ public interface StorageEngine {
1616
/** Get {@link Table} from storage engine. */
1717
Table getTable(DataSourceSchemaName dataSourceSchemaName, String tableName);
1818

19+
/** Get {@link Table} which allows permissive mode or not. */
20+
default Table getPermissiveAwareTable(
21+
DataSourceSchemaName dataSourceSchemaName, String tableName) {
22+
return getTable(dataSourceSchemaName, tableName);
23+
}
24+
1925
/**
2026
* Get list of datasource related functions.
2127
*
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.standalone;
7+
8+
import static org.opensearch.sql.legacy.TestUtils.createIndexByRestClient;
9+
import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
10+
import static org.opensearch.sql.util.MatcherUtils.rows;
11+
import static org.opensearch.sql.util.MatcherUtils.schema;
12+
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
13+
import static org.opensearch.sql.util.MatcherUtils.verifySchema;
14+
15+
import java.io.IOException;
16+
import org.json.JSONObject;
17+
import org.junit.jupiter.api.Test;
18+
import org.opensearch.client.Request;
19+
20+
public class CalciteDynamicFieldsCommandIT extends CalcitePPLPermissiveIntegTestCase {
21+
22+
private static final String TEST_INDEX_DYNAMIC = "test_dynamic_fields";
23+
24+
@Override
25+
public void init() throws IOException {
26+
super.init();
27+
createTestIndexWithUnmappedFields();
28+
enableCalcite();
29+
}
30+
31+
@Test
32+
public void testBasicProjection() throws IOException {
33+
String query =
34+
source(TEST_INDEX_DYNAMIC, "fields firstname, lastname, department, salary | head 1");
35+
assertExplainYaml(
36+
query,
37+
"calcite:\n"
38+
+ " logical: |\n"
39+
+ " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n"
40+
+ " LogicalSort(fetch=[1])\n"
41+
+ " LogicalProject(firstname=[$0], lastname=[$2], department=[ITEM($9,"
42+
+ " 'department')], salary=[ITEM($9, 'salary')])\n"
43+
+ " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_fields]])\n"
44+
+ " physical: |\n"
45+
+ " EnumerableLimit(fetch=[200])\n"
46+
+ " EnumerableCalc(expr#0..9=[{inputs}], expr#10=['department'],"
47+
+ " expr#11=[ITEM($t9, $t10)], expr#12=['salary'], expr#13=[ITEM($t9, $t12)],"
48+
+ " firstname=[$t0], lastname=[$t2], department=[$t11], salary=[$t13])\n"
49+
+ " EnumerableLimit(fetch=[1])\n"
50+
+ " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_fields]])\n");
51+
52+
JSONObject result = executeQuery(query);
53+
verifySchema(
54+
result,
55+
schema("firstname", "string"),
56+
schema("lastname", "string"),
57+
schema("department", "string"),
58+
schema("salary", "int"));
59+
verifyDataRows(result, rows("John", "Doe", "Engineering", 75000));
60+
}
61+
62+
@Test
63+
public void testEval() throws IOException {
64+
String query =
65+
source(
66+
TEST_INDEX_DYNAMIC,
67+
"eval salary = cast(salary as int) * 2 | fields firstname,"
68+
+ " lastname, salary | head 1");
69+
70+
assertExplainYaml(
71+
query,
72+
"calcite:\n"
73+
+ " logical: |\n"
74+
+ " LogicalSystemLimit(fetch=[200], type=[QUERY_SIZE_LIMIT])\n"
75+
+ " LogicalSort(fetch=[1])\n"
76+
+ " LogicalProject(firstname=[$0], lastname=[$2], salary=[*(SAFE_CAST(ITEM($9,"
77+
+ " 'salary')), 2)])\n"
78+
+ " CalciteLogicalIndexScan(table=[[OpenSearch, test_dynamic_fields]])\n"
79+
+ " physical: |\n"
80+
+ " EnumerableLimit(fetch=[200])\n"
81+
+ " EnumerableCalc(expr#0..9=[{inputs}], expr#10=['salary'], expr#11=[ITEM($t9,"
82+
+ " $t10)], expr#12=[SAFE_CAST($t11)], expr#13=[2], expr#14=[*($t12, $t13)],"
83+
+ " firstname=[$t0], lastname=[$t2], salary=[$t14])\n"
84+
+ " EnumerableLimit(fetch=[1])\n"
85+
+ " CalciteEnumerableIndexScan(table=[[OpenSearch, test_dynamic_fields]])\n"
86+
+ "");
87+
88+
JSONObject result = executeQuery(query);
89+
90+
verifySchema(
91+
result,
92+
schema("firstname", "string"),
93+
schema("lastname", "string"),
94+
schema("salary", "int"));
95+
}
96+
97+
private void createTestIndexWithUnmappedFields() throws IOException {
98+
if (isIndexExist(client(), TEST_INDEX_DYNAMIC)) {
99+
return;
100+
}
101+
102+
String mapping =
103+
"{"
104+
+ "\"mappings\": {"
105+
// Disable dynamic mapping - extra fields won't be indexed but will be stored
106+
+ " \"dynamic\": false,"
107+
+ " \"properties\": {"
108+
+ " \"firstname\": {\"type\": \"text\"},"
109+
+ " \"lastname\": {\"type\": \"text\"},"
110+
+ " \"accountnumber\": {\"type\": \"long\"}"
111+
+ " }"
112+
+ "}"
113+
+ "}";
114+
115+
createIndexByRestClient(client(), TEST_INDEX_DYNAMIC, mapping);
116+
117+
String bulkData =
118+
"{\"index\":{\"_id\":\"1\"}}\n"
119+
+ "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"account_number\":1,\"city\":\"NYC\",\"department\":\"Engineering\",\"salary\":75000}\n"
120+
+ "{\"index\":{\"_id\":\"2\"}}\n"
121+
+ "{\"firstname\":\"Jane\",\"lastname\":\"Smith\",\"account_number\":2,\"city\":\"LA\",\"department\":\"Marketing\",\"salary\":65000}\n"
122+
+ "{\"index\":{\"_id\":\"3\"}}\n"
123+
+ "{\"firstname\":\"Bob\",\"lastname\":\"Johnson\",\"account_number\":3,\"city\":\"Chicago\",\"department\":\"Sales\",\"salary\":55000}\n"
124+
+ "{\"index\":{\"_id\":\"4\"}}\n"
125+
+ "{\"firstname\":\"Alice\",\"lastname\":\"Brown\",\"account_number\":4,\"city\":\"Seattle\",\"department\":\"Engineering\",\"salary\":80000}\n"
126+
+ "{\"index\":{\"_id\":\"5\"}}\n"
127+
+ "{\"firstname\":\"Charlie\",\"lastname\":\"Wilson\",\"account_number\":5,\"city\":\"Boston\",\"department\":\"HR\",\"salary\":60000}\n";
128+
129+
Request request = new Request("POST", "/" + TEST_INDEX_DYNAMIC + "/_bulk?refresh=true");
130+
request.setJsonEntity(bulkData);
131+
client().performRequest(request);
132+
}
133+
}

0 commit comments

Comments
 (0)