Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
445799f
Initial checkpoint - following calcite way and commented legacy way
Oct 10, 2025
427af0f
Removed the build.gradle dependency opensearch-common
Oct 22, 2025
83d7786
Ready to submit this PR
Oct 22, 2025
7f6e127
Ready to submit this PR
Oct 22, 2025
dcbf56b
Ready to submit this PR
Oct 22, 2025
a3c1384
Add mvexpand.rst
Oct 22, 2025
148ccc5
Add Tests
Oct 22, 2025
f5a9e82
Add the mvexpand.rst to the index.rst
Oct 23, 2025
2cc60ad
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 27, 2025
18cbba4
Remove the unwanted code
Oct 27, 2025
60fa2ad
Fix the failing test
Oct 27, 2025
d248cb0
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Oct 30, 2025
a28894a
Address the PR comments and fix the tests accordingly
Oct 30, 2025
825c52e
Address the PR comments and fix the tests accordingly
Oct 30, 2025
dc76a55
Address the PR comments and fix the tests accordingly
Oct 30, 2025
8319583
Add comment lines for buildUnnestForLeft
Oct 30, 2025
6d87133
Fix the mvexpand.rst
Oct 31, 2025
b83ab21
Merge branch 'main' into main
srikanthpadakanti Nov 3, 2025
c84703d
Fix the failing test
Nov 3, 2025
de82b65
Fix the failing test
Nov 3, 2025
6c6e0ec
Fix the failing test
Nov 3, 2025
069d52e
Fix the failing test
Nov 3, 2025
2f3aeb6
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Nov 5, 2025
e584368
Merge branch 'opensearch-project:main' into main
srikanthpadakanti Nov 6, 2025
a41b081
Address the PR comments
Nov 6, 2025
bf018b7
Address the PR comments
Nov 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -700,6 +701,11 @@ public LogicalPlan visitExpand(Expand expand, AnalysisContext context) {
throw getOnlyForCalciteException("Expand");
}

@Override
public LogicalPlan visitMvExpand(MvExpand node, AnalysisContext context) {
throw getOnlyForCalciteException("MvExpand");
}

/** Build {@link LogicalTrendline} for Trendline command. */
@Override
public LogicalPlan visitTrendline(Trendline node, AnalysisContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.opensearch.sql.ast.tree.Lookup;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -441,4 +442,8 @@ public T visitAppend(Append node, C context) {
public T visitMultisearch(Multisearch node, C context) {
return visitChildren(node, context);
}

public T visitMvExpand(MvExpand node, C context) {
return visitChildren(node, context);
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.sql.ast.tree.Head;
import org.opensearch.sql.ast.tree.Limit;
import org.opensearch.sql.ast.tree.MinSpanBin;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
import org.opensearch.sql.ast.tree.Project;
Expand Down Expand Up @@ -135,6 +136,10 @@ public Expand expand(UnresolvedPlan input, Field field, String alias) {
return new Expand(field, alias).attach(input);
}

public static UnresolvedPlan mvexpand(UnresolvedPlan input, Field field, Integer limit) {
return new MvExpand(field, limit);
}

public static UnresolvedPlan projectWithArg(
UnresolvedPlan input, List<Argument> argList, UnresolvedExpression... projectList) {
return new Project(Arrays.asList(projectList), argList).attach(input);
Expand Down
55 changes: 55 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/MvExpand.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import javax.annotation.Nullable;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;
import org.opensearch.sql.ast.expression.Field;

/** AST node representing an {@code mvexpand <field> [limit N]} operation. */
@ToString
@EqualsAndHashCode(callSuper = false)
public class MvExpand extends UnresolvedPlan {

private UnresolvedPlan child;
@Getter private final Field field;
@Getter @Nullable private final Integer limit;

public MvExpand(Field field, @Nullable Integer limit) {
this.field = field;
this.limit = limit;
}

@Override
public MvExpand attach(UnresolvedPlan child) {
this.child = child;
return this;
}

public Field getField() {
return field;
}

@Nullable
public Integer getLimit() {
return limit;
}

@Override
public List<UnresolvedPlan> getChild() {
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitMvExpand(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
import org.opensearch.sql.ast.tree.Lookup.OutputStrategy;
import org.opensearch.sql.ast.tree.ML;
import org.opensearch.sql.ast.tree.Multisearch;
import org.opensearch.sql.ast.tree.MvExpand;
import org.opensearch.sql.ast.tree.Paginate;
import org.opensearch.sql.ast.tree.Parse;
import org.opensearch.sql.ast.tree.Patterns;
Expand Down Expand Up @@ -1553,6 +1554,92 @@ private static void buildDedupNotNull(
context.relBuilder.projectExcept(_row_number_dedup_);
}

private void buildMvExpandRelNode(
RexInputRef arrayFieldRex, String arrayFieldName, String alias, CalcitePlanContext context) {

// 1. Capture left node and its schema BEFORE calling build()
RelNode leftNode = context.relBuilder.peek();
RelDataType leftSchema = leftNode.getRowType();

// 2. Create correlation variable
Holder<RexCorrelVariable> correlVariable = Holder.empty();
context.relBuilder.variable(correlVariable::set);

// 3. Find the array field index in the left schema by name (robust)
RelDataTypeField leftField = leftSchema.getField(arrayFieldName, false, false);
int arrayFieldIndexInLeft;
if (leftField != null) {
arrayFieldIndexInLeft = leftField.getIndex();
} else {
// fallback (best effort)
arrayFieldIndexInLeft = arrayFieldRex.getIndex();
}

// 4. Build correlated field access for the right-side projection
RexNode correlArrayFieldAccess =
context.relBuilder.field(
context.rexBuilder.makeCorrel(leftSchema, correlVariable.get().id),
arrayFieldIndexInLeft);

// 5. Build left and right nodes (leftBuilt is the original left, rightNode uncollects the
// array)
RelNode leftBuilt = context.relBuilder.build();
RelNode rightNode =
context
.relBuilder
.push(LogicalValues.createOneRow(context.relBuilder.getCluster()))
.project(List.of(correlArrayFieldAccess), List.of(arrayFieldName))
.uncollect(List.of(), false)
.build();

// 6. Compute a proper RexInputRef that refers to leftBuilt's rowType at the
// arrayFieldIndexInLeft.
// Use rexBuilder.makeInputRef with leftBuilt.getRowType()
RexNode requiredColumnRef =
context.rexBuilder.makeInputRef(leftBuilt.getRowType(), arrayFieldIndexInLeft);

// 7. Correlate left and right using the proper required column ref
context
.relBuilder
.push(leftBuilt)
.push(rightNode)
.correlate(JoinRelType.INNER, correlVariable.get().id, List.of(requiredColumnRef));

// 8. Remove the original array field from the output by name using the builder's field()
// (this ensures we remove the correct column instance)
RexNode toRemove;
try {
toRemove = context.relBuilder.field(arrayFieldName);
} catch (Exception e) {
// Fallback in case name lookup fails
toRemove = arrayFieldRex;
}
context.relBuilder.projectExcept(toRemove);

// 9. Optional rename into alias (same as your prior logic)
if (alias != null) {
tryToRemoveNestedFields(context);
RexInputRef expandedField = context.relBuilder.field(arrayFieldName);
List<String> names = new ArrayList<>(context.relBuilder.peek().getRowType().getFieldNames());
names.set(expandedField.getIndex(), alias);
context.relBuilder.rename(names);
}
}

@Override
public RelNode visitMvExpand(MvExpand node, CalcitePlanContext context) {
visitChildren(node, context);
Field arrayField = node.getField();
RexInputRef arrayFieldRex = (RexInputRef) rexVisitor.analyze(arrayField, context);

buildMvExpandRelNode(arrayFieldRex, arrayField.getField().toString(), null, context);

if (node.getLimit() != null) {
context.relBuilder.limit(0, node.getLimit());
}
return context.relBuilder.peek();
}

@Override
public RelNode visitWindow(Window node, CalcitePlanContext context) {
visitChildren(node, context);
Expand Down
180 changes: 180 additions & 0 deletions docs/user/ppl/cmd/mvexpand.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
=============
mvexpand
=============

.. rubric:: Table of contents

.. contents::
:local:
:depth: 2


Description
============
| The ``mvexpand`` command expands each value in a multivalue (array) field into a separate row, similar to Splunk's `mvexpand` command.
| For each document, every value in the specified field is returned as a new row. This is especially useful for log analytics and data exploration involving array fields.

| Key features of ``mvexpand``:
- Expands array fields into multiple rows, one per value.
- Supports an optional ``limit`` parameter to restrict the number of expanded values per document.
- Handles empty, null, and non-array fields gracefully.
- Works as a streaming/distributable command for performance and scalability.

Version
=======
3.3.0

Syntax
======
mvexpand <field> [limit=<int>]

* **field**: The multivalue (array) field to expand. (Required)
* **limit**: Maximum number of values per document to expand. (Optional)

Usage
=====
Basic expansion::

os> source=logs | mvexpand tags

Expansion with limit::

os> source=docs | mvexpand ids limit=3

Limitations
===========
- Only one field can be expanded per mvexpand command.
- For non-array fields, the value is returned as-is.
- For empty or null arrays, no rows are returned.
- Large arrays may be subject to resource/memory limits; exceeding them results in an error or warning.

Examples and Edge Cases
=======================

Example 1: Basic Expansion
--------------------------
Expand all values from an array field.

Input document::

{ "tags": ["error", "warning", "info"] }

PPL query::

os> source=logs | mvexpand tags
fetched rows / total rows = 3/3
+--------+
| tags |
+--------+
| error |
| warning|
| info |
+--------+

Example 2: Expansion with Limit
-------------------------------
Limit the number of expanded values per document.

Input document::

{ "ids": [1, 2, 3, 4, 5] }

PPL query::

os> source=docs | mvexpand ids limit=3
fetched rows / total rows = 3/3
+-----+
| ids |
+-----+
| 1 |
| 2 |
| 3 |
+-----+

Example 3: Empty or Null Arrays
------------------------------
Handles documents with empty or null array fields.

Input document::

{ "tags": [] }

PPL query::

os> source=logs | mvexpand tags
fetched rows / total rows = 0/0
+------+
| tags |
+------+
+------+

Input document::

{ "tags": null }

PPL query::

os> source=logs | mvexpand tags
fetched rows / total rows = 0/0
+------+
| tags |
+------+
+------+

Example 4: Non-array Field
--------------------------
If the field is a single value (not an array), mvexpand returns the value as-is.

Input document::

{ "tags": "error" }

PPL query::

os> source=logs | mvexpand tags
fetched rows / total rows = 1/1
+-------+
| tags |
+-------+
| error |
+-------+

Example 5: Large Arrays and Memory Limits
----------------------------------------
If an array exceeds configured memory/resource limits, mvexpand returns an error.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this config already supported?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no mvexpand-specific config. Resource limits are enforced by the engine/cluster (e.g. circuit breakers and SQL/PPL query size/timeouts). Use mvexpand's limit data to avoid hitting those limits; if you want I can add references to the exact cluster/SQL settings for our release.

Let me add the same to the documentation

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean SQL/PPL's circuit breaker or OpenSearch DSL's? I think our circuit breaker today only protect scan operator.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I clarified in the mvexpand docs that resource limits are enforced by OpenSearch node/SQL-PPL limits (no mvexpand-specific config) and documented using limit + prefiltering as the recommended mitigation;

https://docs.opensearch.org/1.0/search-plugins/ppl/settings/

Referring to this:
plugins.query.memory_limit | Set heap memory usage limit. If a query crosses this limit, it’s terminated.


Input document::

{ "ids": [1, 2, ..., 100000] }

PPL query::

os> source=docs | mvexpand ids
Error: Memory/resource limit exceeded while expanding field 'ids'. Please reduce the array size or specify a limit.

Example 6: Multiple Fields (Limitation)
---------------------------------------
mvexpand only supports expanding one field per command. To expand multiple fields, use multiple mvexpand commands or document the limitation.

PPL query::

os> source=docs | mvexpand a | mvexpand b

Example 7: Edge Case - Field Missing
------------------------------------
If the field does not exist in a document, no row is produced for that document.

Input document::

{ "other": [1,2] }

PPL query::

os> source=docs | mvexpand tags
fetched rows / total rows = 0/0
+------+
| tags |
+------+
+------+

---
Loading
Loading