Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions distribution/src/config/opensearch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,11 @@ ${path.logs}
#
# Limits the memory pool for datafusion which it uses for query execution.
#datafusion.search.memory_pool: 1GB
#
# Remote store (example): point segment, translog and cluster-state data at a
# registered repository, and define that repository. Uncomment and adjust to
# enable; the filesystem repository below is suitable for local testing only.
#node.attr.remote_store.segment.repository: my-repo-1
#node.attr.remote_store.translog.repository: my-repo-1
#node.attr.remote_store.state.repository: my-repo-1
#cluster.remote_store.state.enabled: true
#node.attr.remote_store.repository.my-repo-1.type: fs
#path.repo: /tmp/remote-store-repo
#node.attr.remote_store.repository.my-repo-1.settings.location: /tmp/remote-store-repo

21 changes: 21 additions & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,27 @@ testClusters {
testDistribution = 'archive'
if (numZones > 1) numberOfZones = numZones
if (numNodes > 1) numberOfNodes = numNodes

// S3 repository + remote store configuration (enable with -PenableS3=true).
// Endpoint, region, credentials, bucket and base path are all overridable via
// -P properties so no developer-specific values are baked into the build.
if (findProperty("enableS3")) {
plugin(':plugins:repository-s3')
if (findProperty("s3Endpoint")) {
setting 's3.client.default.endpoint', findProperty("s3Endpoint")
}
setting 's3.client.default.region', findProperty("s3Region") ?: 'us-east-1'
// Credentials: -P property, then environment, then a harmless local default.
keystore 's3.client.default.access_key', findProperty("s3AccessKey") ?: System.getenv("AWS_ACCESS_KEY_ID") ?: 'test'
keystore 's3.client.default.secret_key', findProperty("s3SecretKey") ?: System.getenv("AWS_SECRET_ACCESS_KEY") ?: 'test'

// Remote store configuration backed by the S3 repository registered above.
def remoteRepo = 'my-s3-repo'
setting 'node.attr.remote_store.segment.repository', remoteRepo
setting 'node.attr.remote_store.translog.repository', remoteRepo
setting 'node.attr.remote_store.state.repository', remoteRepo
setting 'cluster.remote_store.state.enabled', 'true'
setting "node.attr.remote_store.repository.${remoteRepo}.type", 's3'
// Bucket/base path were previously hard-coded (including a developer-local
// base path); make them configurable with neutral defaults.
setting "node.attr.remote_store.repository.${remoteRepo}.settings.bucket", findProperty("s3Bucket") ?: 'local-opensearch-bucket'
setting "node.attr.remote_store.repository.${remoteRepo}.settings.base_path", findProperty("s3BasePath") ?: 'remote-store'
}

if (findProperty("installedPlugins")) {
installedPlugins = Eval.me(installedPlugins)

Expand Down
76 changes: 74 additions & 2 deletions plugins/engine-datafusion/jni/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::num::NonZeroUsize;
* compatible open source license.
*/
use std::ptr::addr_of_mut;
use jni::objects::{JByteArray, JClass, JObject};
use jni::objects::{JByteArray, JClass, JMap, JObject};
use jni::objects::JLongArray;
use jni::sys::{jboolean, jbyteArray, jint, jlong, jstring};
use jni::{JNIEnv, JavaVM};
Expand Down Expand Up @@ -51,7 +51,7 @@ pub mod logger;
use vectorized_exec_spi::{log_info, log_error, log_debug};

use crate::custom_cache_manager::CustomCacheManager;
use crate::util::{create_file_meta_from_filenames, parse_string_arr, set_action_listener_error, set_action_listener_error_global, set_action_listener_ok, set_action_listener_ok_global};
use crate::util::{create_file_meta_from_filenames, parse_string_arr, set_action_listener_error, set_action_listener_error_global, set_action_listener_ok, set_action_listener_ok_global, set_action_listener_ok_global_with_map};
use datafusion::execution::memory_pool::{GreedyMemoryPool, TrackConsumersPool};

use crate::statistics_cache::CustomStatisticsCache;
Expand Down Expand Up @@ -482,6 +482,29 @@ impl CustomFileMeta {
}
}

/// Lightweight, copyable statistics for a single segment file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct FileStats {
    /// Total file size in bytes.
    pub size: u64,

    /// Total number of rows in the file.
    pub num_rows: i64,
}

impl FileStats {
    /// Builds a `FileStats` from a byte size and a row count.
    pub fn new(size: u64, num_rows: i64) -> Self {
        FileStats { size, num_rows }
    }

    /// Size of the file in bytes.
    pub fn size(&self) -> u64 {
        self.size
    }

    /// Number of rows recorded for the file.
    pub fn num_rows(&self) -> i64 {
        self.num_rows
    }
}

#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQueryPhaseAsync(
mut env: JNIEnv,
Expand Down Expand Up @@ -574,6 +597,55 @@ pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_executeQu
});
}

/// JNI entry point: collects per-file statistics for a shard and delivers them
/// to a Java `ActionListener<Map<String, FileStats>>`.
///
/// On success the listener's `onResponse` receives the filename -> stats map;
/// on any failure `onFailure` receives the error.
#[no_mangle]
pub extern "system" fn Java_org_opensearch_datafusion_jni_NativeBridge_fetchSegmentStats(
mut env: JNIEnv,
_class: JClass,
shard_view_ptr: jlong,
listener: JObject,
) {
let manager = match TOKIO_RUNTIME_MANAGER.get() {
Some(m) => m,
None => {
// Fixed: this is a failure path delivered to the listener, so log at
// error level (was log_info!), matching the other failure branches.
log_error!("Runtime manager not initialized");
set_action_listener_error(&mut env, listener,
&DataFusionError::Execution("Runtime manager not initialized".to_string()));
return;
}
};

// Convert listener to GlobalRef (thread-safe) so it can be completed from
// a runtime thread via with_jni_env.
let listener_ref = match env.new_global_ref(&listener) {
Ok(r) => r,
Err(e) => {
log_error!("Failed to create global ref: {}", e);
set_action_listener_error(&mut env, listener,
&DataFusionError::Execution(format!("Failed to create global ref: {}", e)));
return;
}
};
let io_runtime = manager.io_runtime.clone();

// SAFETY: assumes shard_view_ptr is a valid pointer to a live ShardView
// previously handed to the Java side and not yet closed — upheld by the
// Java caller's lifecycle management; TODO confirm.
let shard_view = unsafe { &*(shard_view_ptr as *const ShardView) };
let files_meta = shard_view.files_metadata();

// NOTE(review): block_on runs the collection synchronously on the calling
// JNI thread despite the async listener API — confirm this is intended
// (other entry points in this file appear to follow the same pattern).
io_runtime.block_on(async move {
let file_stats = util::fetch_segment_statistics(files_meta).await;
match file_stats {
Ok(map) => {
with_jni_env(|env| {
set_action_listener_ok_global_with_map(env, &listener_ref, &map);
});
}
Err(e) => {
with_jni_env(|env| {
log_error!("Collecting file stats failed: {}", e);
set_action_listener_error_global(env, &listener_ref, &e);
});
}
}
});
}


#[no_mangle]
Expand Down
3 changes: 2 additions & 1 deletion plugins/engine-datafusion/jni/src/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use std::sync::Arc;
use std::collections::{BTreeSet, HashMap, HashSet};
use datafusion::common::stats::Precision;
use jni::sys::jlong;
use datafusion::{
common::DataFusionError,
Expand Down Expand Up @@ -51,7 +52,7 @@ use crate::listing_table::{ListingOptions, ListingTable, ListingTableConfig};
use crate::partial_agg_optimizer::PartialAggregationOptimizer;
use crate::executor::DedicatedExecutor;
use crate::cross_rt_stream::CrossRtStream;
use crate::CustomFileMeta;
use crate::{CustomFileMeta, FileStats};
use crate::DataFusionRuntime;
use crate::project_row_id_analyzer::ProjectRowIdAnalyzer;
use crate::absolute_row_id_optimizer::{AbsoluteRowIdOptimizer, ROW_BASE_FIELD_NAME, ROW_ID_FIELD_NAME};
Expand Down
69 changes: 63 additions & 6 deletions plugins/engine-datafusion/jni/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use datafusion::arrow::array::RecordBatch;
use jni::objects::{GlobalRef, JObject, JObjectArray, JString};
use jni::objects::{GlobalRef, JMap, JObject, JObjectArray, JString, JValue};
use jni::sys::jlong;
use jni::JNIEnv;
use object_store::{path::Path as ObjectPath, ObjectMeta};
use std::collections::HashMap;
use std::error::Error;
use std::fs;
use std::sync::Arc;
use datafusion::error::DataFusionError;
use datafusion::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use crate::CustomFileMeta;
use crate::{CustomFileMeta, FileStats};


/// Set error message from a result using a Consumer<String> Java callback
Expand Down Expand Up @@ -206,11 +207,38 @@ pub fn create_object_meta_from_file(file_path: &str) -> Result<Vec<ObjectMeta>,
Ok(vec![object_meta])
}

/// Builds a map from segment filename to [`FileStats`] (byte size and total
/// row count) for every file in the shard's metadata.
///
/// Returns an execution error if any object path lacks a filename component.
pub async fn fetch_segment_statistics(
files_meta: Arc<Vec<CustomFileMeta>>,
) -> Result<HashMap<String, FileStats>, DataFusionError> {
files_meta
.iter()
.map(|meta| {
let object_meta = &meta.object_meta;
// Row count is the sum over all row groups in the file.
let total_rows: i64 = meta.row_group_row_counts.iter().sum();
let name = object_meta
.location
.filename()
.map(str::to_string)
.ok_or_else(|| {
DataFusionError::Execution(format!(
"Object path has no filename: {}",
object_meta.location
))
})?;
Ok((name, FileStats::new(object_meta.size, total_rows)))
})
.collect()
}

/// Set success result by calling an ActionListener
pub fn set_action_listener_ok(env: &mut JNIEnv, listener: JObject, value: jlong) {
let long_obj = env.new_object("java/lang/Long", "(J)V", &[value.into()])
.expect("Failed to create Long object");

env.call_method(
listener,
"onResponse",
Expand All @@ -229,7 +257,7 @@ pub fn set_action_listener_error<T: Error>(env: &mut JNIEnv, listener: JObject,
"(Ljava/lang/String;)V",
&[(&error_msg).into()],
).expect("Failed to create exception");

env.call_method(
listener,
"onFailure",
Expand All @@ -239,11 +267,40 @@ pub fn set_action_listener_error<T: Error>(env: &mut JNIEnv, listener: JObject,
.expect("Failed to call ActionListener onFailure");
}

/// Complete a Java `ActionListener` (held as a `GlobalRef`) successfully with a
/// `java.util.HashMap<String, FileStats>` built from the given Rust map.
///
/// Panics (via `expect`) if any JNI allocation or call fails; this mirrors the
/// other listener helpers in this module.
pub fn set_action_listener_ok_global_with_map(env: &mut JNIEnv, listener: &GlobalRef, map: &HashMap<String, FileStats>) {
let hash_map_obj = env.new_object("java/util/HashMap", "()V", &[])
.expect("Failed to create HashMap");
let jmap = JMap::from_env(env, &hash_map_obj)
.expect("Failed to create JMap");

// NOTE(review): each iteration creates JNI local references that are not
// explicitly freed until this native frame returns; for very large maps
// consider env.with_local_frame — confirm expected map sizes.
for (key, value) in map {
let j_key = env.new_string(key)
.expect("Failed to create String object");
// Construct the Java-side FileStats(size, numRows) mirror object.
// size is u64 on the Rust side; the as-cast to jlong (i64) assumes file
// sizes stay below i64::MAX, which holds for real files.
let j_value = env.new_object(
"org/opensearch/index/engine/exec/FileStats",
"(JJ)V",
&[JValue::Long(value.size() as jlong), JValue::Long(value.num_rows() as jlong)],
).expect("Failed to create FileStats object"); // fixed: message previously said "Long object"

jmap.put(env, &JObject::from(j_key), &j_value)
.expect("Failed to populate JMap");
}

env.call_method(
listener.as_obj(),
"onResponse",
"(Ljava/lang/Object;)V",
&[(&hash_map_obj).into()],
)
.expect("Failed to call ActionListener onResponse");
}

/// Set success result by calling an ActionListener with GlobalRef
pub fn set_action_listener_ok_global(env: &mut JNIEnv, listener: &GlobalRef, value: jlong) {
let long_obj = env.new_object("java/lang/Long", "(J)V", &[value.into()])
.expect("Failed to create Long object");

env.call_method(
listener.as_obj(),
"onResponse",
Expand All @@ -262,7 +319,7 @@ pub fn set_action_listener_error_global<T: Error>(env: &mut JNIEnv, listener: &G
"(Ljava/lang/String;)V",
&[(&error_msg).into()],
).expect("Failed to create exception");

env.call_method(
listener.as_obj(),
"onFailure",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.datafusion.search.cache.CacheManager;
import org.opensearch.index.engine.*;
import org.opensearch.index.engine.exec.FileMetadata;
import org.opensearch.index.engine.exec.FileStats;
import org.opensearch.index.engine.exec.composite.CompositeDataFormatWriter;
import org.opensearch.index.mapper.*;
import org.opensearch.index.shard.ShardPath;
Expand Down Expand Up @@ -80,7 +81,7 @@ public class DatafusionEngine extends SearchExecEngine<DatafusionContext, Datafu
public DatafusionEngine(DataFormat dataFormat, Collection<FileMetadata> formatCatalogSnapshot, DataFusionService dataFusionService, ShardPath shardPath) throws IOException {
this.dataFormat = dataFormat;
this.datafusionReaderManager = new DatafusionReaderManager(
shardPath.getDataPath().resolve(dataFormat.getName()).toString(), formatCatalogSnapshot, dataFormat.getName()
shardPath.getDataPath().resolve(dataFormat.getName()).toString(), formatCatalogSnapshot, dataFormat.getName(), dataFusionService
);
this.datafusionService = dataFusionService;
this.cacheManager = datafusionService.getCacheManager();
Expand Down Expand Up @@ -490,4 +491,17 @@ public void executeFetchPhase(DatafusionContext context) throws IOException {
}
}
}

/**
 * Collects per-segment file statistics from the underlying native reader.
 * Acquires a reader from the manager for the duration of the call and always
 * releases it afterwards, even when the native call throws.
 *
 * @return map from segment filename to its {@link FileStats}
 * @throws IOException if acquiring the reader or fetching stats fails
 */
@Override
public Map<String, FileStats> fetchSegmentStats() throws IOException {
    DatafusionReader reader = null;
    try {
        reader = datafusionReaderManager.acquire();
        return reader.fetchSegmentStats();
    } finally {
        if (reader != null) {
            datafusionReaderManager.release(reader);
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
package org.opensearch.datafusion.jni;

import org.opensearch.core.action.ActionListener;
import org.opensearch.datafusion.ObjectResultCallback;
import org.opensearch.index.engine.exec.FileStats;

import java.util.Map;

/**
* Core JNI bridge to native DataFusion library.
Expand Down Expand Up @@ -39,6 +41,9 @@ private NativeBridge() {}
public static native void executeQueryPhaseAsync(long readerPtr, String tableName, byte[] plan, boolean isQueryPlanExplainEnabled, long runtimePtr, ActionListener<Long> listener);
public static native long executeFetchPhase(long readerPtr, long[] rowIds, String[] includeFields, String[] excludeFields, long runtimePtr);

// File Stats
public static native void fetchSegmentStats(long readerPtr, ActionListener<Map<String, FileStats>> listener);

// Stream operations
public static native void streamNext(long runtime, long stream, ActionListener<Long> listener);
public static native void streamGetSchema(long stream, ActionListener<Long> listener);
Expand Down
Loading
Loading