Update BanyanDB storage module to adapt BanyanDB 0.2 (apache#9839)
* update to banyandb client 0.2

* simplify group check and creation

* support indexOnly

* partial apply configuration

* support CompleteableFuture

* fix BanyanDBProfileThreadSnapshotQueryDAO

* optimize BanyanDBProfileTaskQueryDAO

* optimize *LogQueryDAO

* support indexType

* add configuration

* use TTL from core config

Signed-off-by: Megrez Lu <[email protected]>
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
lujiajing1126 and wu-sheng authored Nov 4, 2022
1 parent cba0e61 commit d32a318
Showing 34 changed files with 726 additions and 410 deletions.
6 changes: 6 additions & 0 deletions .licenserc.yaml
@@ -119,3 +119,9 @@ dependency:
- name: com.google.flatbuffers:flatbuffers-java
version: 1.12.0
license: Apache-2.0
- name: build.buf.protoc-gen-validate:pgv-java-stub
version: 0.6.13
license: Apache-2.0
- name: build.buf.protoc-gen-validate:protoc-gen-validate
version: 0.6.13
license: Apache-2.0
11 changes: 11 additions & 0 deletions dist-material/release-docs/LICENSE
@@ -208,6 +208,8 @@ Apache-2.0 licenses
========================================================================
The following components are provided under the Apache-2.0 License. See project link for details.
The text of each license is the standard Apache 2.0 license.
https://mvnrepository.com/artifact/build.buf.protoc-gen-validate/pgv-java-stub/0.6.13 Apache-2.0
https://mvnrepository.com/artifact/build.buf.protoc-gen-validate/protoc-gen-validate/0.6.13 Apache-2.0
https://mvnrepository.com/artifact/com.aayushatharva.brotli4j/brotli4j/1.7.1 Apache-2.0
https://mvnrepository.com/artifact/com.alibaba.nacos/nacos-api/1.4.2 Apache-2.0
https://mvnrepository.com/artifact/com.alibaba.nacos/nacos-client/1.4.2 Apache-2.0
@@ -263,6 +265,7 @@ The text of each license is the standard Apache 2.0 license.
https://mvnrepository.com/artifact/commons-codec/commons-codec/1.15 Apache-2.0
https://mvnrepository.com/artifact/commons-io/commons-io/2.7 Apache-2.0
https://mvnrepository.com/artifact/commons-logging/commons-logging/1.2 Apache-2.0
https://mvnrepository.com/artifact/commons-validator/commons-validator/1.7 Apache-2.0
https://npmjs.com/package/d3-flame-graph/v/4.1.3 4.1.3 Apache-2.0
https://npmjs.com/package/echarts/v/5.2.2 5.2.2 Apache-2.0
https://mvnrepository.com/artifact/io.etcd/jetcd-common/0.5.3 Apache-2.0
@@ -618,6 +621,14 @@ The text of each license is also included in licenses/LICENSE-[project].txt.

https://npmjs.com/package/robust-predicates/v/3.0.1 3.0.1 Unlicense

========================================================================
https://golang.org/LICENSE licenses
========================================================================
The following components are provided under the https://golang.org/LICENSE License. See project link for details.
The text of each license is also included in licenses/LICENSE-[project].txt.

https://mvnrepository.com/artifact/com.google.re2j/re2j/1.5 https://golang.org/LICENSE

========================================================================
https://spdx.org/licenses/MIT-0.html licenses
========================================================================
13 changes: 13 additions & 0 deletions docs/en/changes/changes.md
@@ -83,6 +83,19 @@
* Support `sampledTrace` in LAL.
* Support multiple rules with different names under the same layer of LAL script.
* (Optimization) Reduce the buffer size(queue) of MAL(only) metric streams. Set L1 queue size as 1/20, L2 queue size as 1/2.
* [**Breaking Change**] Migrate to BanyanDB v0.2.0.
* Adopt the new OR logical operator for:
1. `MeasureIDs` query
2. `BanyanDBProfileThreadSnapshotQueryDAO` query
3. Multiple `Event` conditions query
4. Metrics query
* Simplify Group check and creation
* Partially apply `UITemplate` changes
* Support `index_only`
* Return `CompletableFuture<Void>` directly from BanyanDB client
* Optimize data binary parse methods in *LogQueryDAO
* Support different indexType
* Support configuration for TTL and (block|segment) intervals

#### UI

4 changes: 4 additions & 0 deletions docs/en/setup/backend/backend-storage.md
@@ -308,6 +308,10 @@ storage:
superDatasetShardsFactor: ${SW_STORAGE_BANYANDB_SUPERDATASET_SHARDS_FACTOR:2}
concurrentWriteThreads: ${SW_STORAGE_BANYANDB_CONCURRENT_WRITE_THREADS:15}
profileTaskQueryMaxSize: ${SW_STORAGE_BANYANDB_PROFILE_TASK_QUERY_MAX_SIZE:200} # the max number of fetch task in a request
streamBlockInterval: ${SW_STORAGE_BANYANDB_STREAM_BLOCK_INTERVAL:4} # Unit is hour
streamSegmentInterval: ${SW_STORAGE_BANYANDB_STREAM_SEGMENT_INTERVAL:24} # Unit is hour
measureBlockInterval: ${SW_STORAGE_BANYANDB_MEASURE_BLOCK_INTERVAL:4} # Unit is hour
measureSegmentInterval: ${SW_STORAGE_BANYANDB_MEASURE_SEGMENT_INTERVAL:24} # Unit is hour
```

For more details, please refer to the documents of [BanyanDB](https://skywalking.apache.org/docs/skywalking-banyandb/next/readme/)
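The new interval settings above follow the `${ENV_VAR:default}` placeholder convention used throughout the OAP configuration files. As a rough, hypothetical illustration of how that convention resolves (the `Placeholders` class below is a stand-in, not SkyWalking's actual configuration loader):

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of the `${ENV_VAR:default}` convention used in
// application.yml; NOT SkyWalking's actual configuration loader.
public class Placeholders {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+):([^}]*)\\}");

    // Replace each ${NAME:default} with env.get(NAME), falling back to the default.
    public static String resolve(String value, Map<String, String> env) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuilder sb = new StringBuilder();
        while (m.find()) {
            String replacement = env.getOrDefault(m.group(1), m.group(2));
            m.appendReplacement(sb, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(sb);
        return sb.toString();
    }
}
```

With no environment override, the documented defaults apply: 4-hour blocks and 24-hour segments for both streams and measures.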
3 changes: 2 additions & 1 deletion oap-server-bom/pom.xml
@@ -73,7 +73,8 @@
<awaitility.version>3.0.0</awaitility.version>
<httpcore.version>4.4.13</httpcore.version>
<commons-compress.version>1.21</commons-compress.version>
<banyandb-java-client.version>0.1.0</banyandb-java-client.version>
<!-- TODO: use release version of banyandb java client -->
<banyandb-java-client.version>0.2.0-SNAPSHOT</banyandb-java-client.version>
<kafka-clients.version>2.4.1</kafka-clients.version>
<spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
</properties>
@@ -31,12 +31,12 @@
public @interface BanyanDB {
/**
* GlobalIndex declares advanced global index, which are only available in BanyanDB.
*
* <p>
Global index should only be considered if a column has a huge number of candidate values but needs a direct
equality query without a timestamp.
The typical global index is designed for huge sets of candidate values, such as `trace ID` or `segment ID`.
*
* <p>
* Only work with {@link Column}
*/
@Target({ElementType.FIELD})
@@ -51,17 +51,17 @@
* ServiceA's traffic gauge, service call per minute, includes following timestamp values, then it should be sharded
* by service ID
* [ServiceA(encoded ID): 01-28 18:30 values-1, 01-28 18:31 values-2, 01-28 18:32 values-3, 01-28 18:32 values-4]
*
* <p>
* BanyanDB is the 1st storage implementation supporting this. It would make continuous time series metrics stored
* closely and compressed better.
*
* <p>
* 1. One entity could have multiple sharding keys
* 2. If no column is appointed for this, {@link org.apache.skywalking.oap.server.core.storage.StorageData#id}
* would be used by the storage implementation accordingly.
*
* <p>
* NOTICE, this sharding concept is NOT just for splitting data into different database instances or physical
* files.
*
* <p>
* Only work with {@link Column}
*
@return non-negative if this column is used for sharding; -1 means it is not a sharding key
@@ -91,4 +91,31 @@
@interface NoIndexing {

}

/**
* Additional information for constructing Index in BanyanDB.
*
* @since 9.3.0
*/
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@interface IndexRule {
/**
* IndexRule supports selecting two distinct kinds of index structures.
*/
IndexType indexType() default IndexType.INVERTED;

enum IndexType {
/**
* The `INVERTED` index is the primary option when users set up an index rule.
* It's suitable for most tag indexing due to a better memory usage ratio and query performance.
*/
INVERTED,
/**
* The `TREE` index could be better when there are high cardinalities, such as the `ID` tag and numeric duration tag.
* In these cases, it saves much memory space.
*/
TREE;
}
}
}
@@ -55,6 +55,14 @@ public class BanyanDBExtension {
*/
private final boolean shouldIndex;

/**
* indexType is the type of index built for a {@link ModelColumn} in BanyanDB.
*
* @since 9.3.0
*/
@Getter
private final BanyanDB.IndexRule.IndexType indexType;

/**
* @return true if this column is a part of sharding key
*/
@@ -203,10 +203,13 @@ private void retrieval(final Class<?> clazz,
BanyanDB.GlobalIndex.class);
final BanyanDB.NoIndexing banyanDBNoIndex = field.getAnnotation(
BanyanDB.NoIndexing.class);
final BanyanDB.IndexRule banyanDBIndexRule = field.getAnnotation(
BanyanDB.IndexRule.class);
BanyanDBExtension banyanDBExtension = new BanyanDBExtension(
banyanDBShardingKey == null ? -1 : banyanDBShardingKey.index(),
banyanDBGlobalIndex == null ? false : true,
banyanDBNoIndex != null ? false : column.storageOnly()
banyanDBGlobalIndex != null,
banyanDBNoIndex == null && column.storageOnly(),
banyanDBIndexRule == null ? BanyanDB.IndexRule.IndexType.INVERTED : banyanDBIndexRule.indexType()
);

final ModelColumn modelColumn = new ModelColumn(
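The new `retrieval` code above reads the optional `BanyanDB.IndexRule` annotation and falls back to `INVERTED` when it is absent. A self-contained sketch of that annotation-defaulting pattern (the types below are simplified stand-ins, not the real SkyWalking classes):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Simplified stand-ins for BanyanDB.IndexRule and the retrieval logic.
public class IndexRuleDemo {
    public enum IndexType { INVERTED, TREE }

    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface IndexRule {
        IndexType indexType() default IndexType.INVERTED;
    }

    public static class Entity {
        @IndexRule(indexType = IndexType.TREE)
        public long latency;       // high-cardinality numeric tag -> TREE

        public String serviceName; // unannotated -> defaults to INVERTED
    }

    // Mirrors: banyanDBIndexRule == null ? INVERTED : banyanDBIndexRule.indexType()
    public static IndexType indexTypeOf(Class<?> clazz, String fieldName) {
        try {
            Field field = clazz.getDeclaredField(fieldName);
            IndexRule rule = field.getAnnotation(IndexRule.class);
            return rule == null ? IndexType.INVERTED : rule.indexType();
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException(fieldName, e);
        }
    }
}
```

This keeps `INVERTED` as the safe default for most tags, while letting individual columns opt into `TREE` where cardinality is high.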
@@ -36,15 +36,18 @@ BrowserErrorLogs queryBrowserErrorLogs(String serviceId,
int limit,
int from) throws IOException;

default BrowserErrorLog parserDataBinary(String dataBinaryBase64) {
return parserDataBinary(Base64.getDecoder().decode(dataBinaryBase64));
}

/**
* Parser the raw error log.
*/
default BrowserErrorLog parserDataBinary(
String dataBinaryBase64) {
default BrowserErrorLog parserDataBinary(byte[] dataBinary) {
try {
BrowserErrorLog log = new BrowserErrorLog();
org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog browserErrorLog = org.apache.skywalking.apm.network.language.agent.v3.BrowserErrorLog
.parseFrom(Base64.getDecoder().decode(dataBinaryBase64));
.parseFrom(dataBinary);

log.setService(browserErrorLog.getService());
log.setServiceVersion(browserErrorLog.getServiceVersion());
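The refactoring above is a common Java compatibility pattern: the old `String` entry point survives as a default method that base64-decodes and delegates to a new `byte[]` overload, so storage implementations that already hold raw bytes can skip the base64 round trip. A minimal sketch (toy `LogParser` interface, not the real DAO):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch of the overload-delegation pattern in this commit. Toy types only.
public class DelegationDemo {
    public interface LogParser {
        // Old signature, kept for compatibility; now a thin wrapper.
        default String parse(String dataBinaryBase64) {
            return parse(Base64.getDecoder().decode(dataBinaryBase64));
        }

        // New primary entry point for callers that already hold raw bytes.
        String parse(byte[] dataBinary);
    }

    // A toy "parser" that just decodes UTF-8 text.
    public static final LogParser PARSER =
        bytes -> new String(bytes, StandardCharsets.UTF_8);
}
```

Existing callers that pass base64 strings keep working unchanged, while BanyanDB-backed DAOs can call the `byte[]` overload directly.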
@@ -19,9 +19,11 @@
package org.apache.skywalking.oap.server.core.storage.query;

import com.google.protobuf.InvalidProtocolBufferException;

import java.io.IOException;
import java.util.Base64;
import java.util.List;

import org.apache.skywalking.apm.network.logging.v3.LogTags;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import org.apache.skywalking.oap.server.core.query.enumeration.Order;
@@ -50,11 +52,15 @@ Logs queryLogs(String serviceId,
final List<String> excludingKeywordsOfContent) throws IOException;

/**
* Parser the raw tags.
* Parse the raw tags with base64 representation of data binary
*/
default void parserDataBinary(String dataBinaryBase64, List<KeyValue> tags) {
parserDataBinary(Base64.getDecoder().decode(dataBinaryBase64), tags);
}

default void parserDataBinary(byte[] dataBinary, List<KeyValue> tags) {
try {
LogTags logTags = LogTags.parseFrom(Base64.getDecoder().decode(dataBinaryBase64));
LogTags logTags = LogTags.parseFrom(dataBinary);
logTags.getDataList().forEach(pair -> tags.add(new KeyValue(pair.getKey(), pair.getValue())));
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.core.storage.model;

import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
import org.apache.skywalking.oap.server.core.storage.annotation.ElasticSearch;
import org.junit.Assert;
import org.junit.Test;
@@ -31,7 +32,7 @@ public void testColumnDefine() {
new SQLDatabaseExtension(),
new ElasticSearchExtension(
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
new BanyanDBExtension(-1, false, true)
new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED)
);
Assert.assertEquals(true, column.isStorageOnly());
Assert.assertEquals("abc", column.getColumnName().getName());
@@ -40,7 +41,7 @@
false, false, true, 200,
new SQLDatabaseExtension(),
new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
new BanyanDBExtension(-1, false, true)
new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED)
);
Assert.assertEquals(true, column.isStorageOnly());
Assert.assertEquals("abc", column.getColumnName().getName());
@@ -50,7 +51,7 @@
false, false, true, 200,
new SQLDatabaseExtension(),
new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
new BanyanDBExtension(-1, false, true)
new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED)
);
Assert.assertEquals(false, column.isStorageOnly());
Assert.assertEquals("abc", column.getColumnName().getName());
@@ -63,7 +64,7 @@ public void testConflictDefinition() {
new SQLDatabaseExtension(),
new ElasticSearchExtension(
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
new BanyanDBExtension(-1, false, true)
new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED)
);
}

@@ -74,7 +75,7 @@ public void testConflictDefinitionIndexOnly() {
new SQLDatabaseExtension(),
new ElasticSearchExtension(
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc"),
new BanyanDBExtension(-1, false, true)
new BanyanDBExtension(-1, false, true, BanyanDB.IndexRule.IndexType.INVERTED)
);
}
}
4 changes: 4 additions & 0 deletions oap-server/server-starter/src/main/resources/application.yml
@@ -236,6 +236,10 @@ storage:
superDatasetShardsFactor: ${SW_STORAGE_BANYANDB_SUPERDATASET_SHARDS_FACTOR:2}
concurrentWriteThreads: ${SW_STORAGE_BANYANDB_CONCURRENT_WRITE_THREADS:15}
profileTaskQueryMaxSize: ${SW_STORAGE_BANYANDB_PROFILE_TASK_QUERY_MAX_SIZE:200} # the max number of fetch task in a request
streamBlockInterval: ${SW_STORAGE_BANYANDB_STREAM_BLOCK_INTERVAL:4} # Unit is hour
streamSegmentInterval: ${SW_STORAGE_BANYANDB_STREAM_SEGMENT_INTERVAL:24} # Unit is hour
measureBlockInterval: ${SW_STORAGE_BANYANDB_MEASURE_BLOCK_INTERVAL:4} # Unit is hour
measureSegmentInterval: ${SW_STORAGE_BANYANDB_MEASURE_SEGMENT_INTERVAL:24} # Unit is hour

agent-analyzer:
selector: ${SW_AGENT_ANALYZER:default}
@@ -77,7 +77,7 @@ protected void apply(MeasureQuery query) {
return Collections.emptyList();
}

MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName);
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(modelName, duration.getStep());
if (schema == null) {
throw new IOException("schema is not registered");
}
@@ -31,6 +31,7 @@

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO {
private static final Object STREAM_SYNCHRONIZER = new Object();
@@ -64,16 +65,16 @@ public void insert(InsertRequest insertRequest) {
@Override
public CompletableFuture<Void> flush(List<PrepareRequest> prepareRequests) {
if (CollectionUtils.isNotEmpty(prepareRequests)) {
for (final PrepareRequest r : prepareRequests) {
return CompletableFuture.allOf(prepareRequests.stream().map((Function<PrepareRequest, CompletableFuture<Void>>) r -> {
if (r instanceof BanyanDBStreamInsertRequest) {
// TODO: return CompletableFuture<Void>
getStreamBulkWriteProcessor().add(((BanyanDBStreamInsertRequest) r).getStreamWrite());
return getStreamBulkWriteProcessor().add(((BanyanDBStreamInsertRequest) r).getStreamWrite());
} else if (r instanceof BanyanDBMeasureInsertRequest) {
getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite());
return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureInsertRequest) r).getMeasureWrite());
} else if (r instanceof BanyanDBMeasureUpdateRequest) {
getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite());
return getMeasureBulkWriteProcessor().add(((BanyanDBMeasureUpdateRequest) r).getMeasureWrite());
}
}
return CompletableFuture.completedFuture(null);
}).toArray(CompletableFuture[]::new));
}

return CompletableFuture.completedFuture(null);
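The rewritten `flush` above no longer discards the futures returned by the bulk-write processors; it maps every request to its `CompletableFuture<Void>` and joins them all with `CompletableFuture.allOf`. A simplified, runnable sketch of that shape (the request types and the async work here are stand-ins, not the real BanyanDB processors):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Simplified shape of the new flush(): each request maps to the future
// returned by its bulk processor, and allOf joins them.
public class FlushDemo {
    public interface PrepareRequest { }
    public static final class Insert implements PrepareRequest { }

    public static CompletableFuture<Void> flush(List<? extends PrepareRequest> requests) {
        if (requests.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.allOf(requests.stream()
            .map((Function<PrepareRequest, CompletableFuture<Void>>) r -> {
                if (r instanceof Insert) {
                    // stand-in for bulkWriteProcessor.add(...) returning a future
                    return CompletableFuture.runAsync(() -> { });
                }
                return CompletableFuture.completedFuture(null); // unknown type: no-op
            })
            .toArray(CompletableFuture[]::new));
    }
}
```

The returned future completes only when every individual write has completed, which is what lets callers of `IBatchDAO.flush` await the whole batch.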
@@ -41,8 +41,8 @@ public static class StorageToStream implements Convert2Entity {
private final MetadataRegistry.Schema schema;
private final RowEntity rowEntity;

public StorageToStream(String modelName, RowEntity rowEntity) {
this.schema = MetadataRegistry.INSTANCE.findMetadata(modelName);
public StorageToStream(String streamModelName, RowEntity rowEntity) {
this.schema = MetadataRegistry.INSTANCE.findRecordMetadata(streamModelName);
this.rowEntity = rowEntity;
}

@@ -215,8 +215,8 @@ public static class StorageToMeasure implements Convert2Entity {
private final MetadataRegistry.Schema schema;
private final DataPoint dataPoint;

public StorageToMeasure(String modelName, DataPoint dataPoint) {
this.schema = MetadataRegistry.INSTANCE.findMetadata(modelName);
public StorageToMeasure(MetadataRegistry.Schema schema, DataPoint dataPoint) {
this.schema = schema;
this.dataPoint = dataPoint;
}
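The constructor changes above reflect that measure metadata lookups are now keyed by model name plus downsampling step (`findMetadata(modelName, duration.getStep())`), with a separate `findRecordMetadata` for streams. A toy registry illustrating the compound-key idea (names and key format are illustrative only, not the real `MetadataRegistry`):

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for a registry keyed by (model name, downsampling step).
public class RegistryDemo {
    public enum Step { MINUTE, HOUR, DAY }

    private final Map<String, String> schemas = new HashMap<>();

    public void register(String modelName, Step step, String schemaName) {
        schemas.put(modelName + "/" + step, schemaName);
    }

    // Analogous in spirit to findMetadata(modelName, duration.getStep());
    // returns null when no schema is registered for that combination.
    public String findMetadata(String modelName, Step step) {
        return schemas.get(modelName + "/" + step);
    }
}
```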
