
Commit 2d6c6f4

Eduardo95BhaaVast authored and committed
For branch next, add an expression function named FirstDifference, wh… (OpenTSDB#1458)
This commit squashes the following changes for branch next:

- For branch next, add an expression function named FirstDifference, which calculates the first difference of a time series (OpenTSDB#1458). There is already a MovingAverage calculation, so this enriches the available mathematical functions. Also adds unit tests for FirstDifference. (A rough sketch of the transform appears after this commit list.)
- Bump version to 2.5.0-SNAPSHOT.
- Fix a compilation error about the missing FirstDifference class (OpenTSDB#1471). Signed-off-by: Chris Larsen <[email protected]>
- Bugfix of FsckOptions (OpenTSDB#1464). Signed-off-by: Chris Larsen <[email protected]>
- CORE (OpenTSDB#1472): add RpcResponder for handling callbacks asynchronously. UTILS: add two convenience methods to Config. Signed-off-by: Chris Larsen <[email protected]>
- Fix OpenTSDB#1581 by correcting an edge case in TsdbQuery.getScanEndTimeSeconds() (OpenTSDB#1582).
- Dockerfile that works without a script (OpenTSDB#1739).
- Replace FOREVER with a valid value in table creation (OpenTSDB#1967). Co-authored-by: Ion DULGHERU <[email protected]>
- Upgrade Jackson: version 2.9.5 has a serious security problem that can lead to RCE (OpenTSDB#2034, FasterXML/jackson-databind#2295). Co-authored-by: chi-chi weng <[email protected]>
- PR 1663 (OpenTSDB#1966): make UniqueIdRpc aware of the mode; update the javadoc on the new method and rename test methods to be more descriptive. Co-authored-by: Simon Matic Langford <[email protected]>
- Re-introduce query timeouts (OpenTSDB#2035). Co-authored-by: Itamar Turner-Trauring <[email protected]>
- Update Maven Central URLs and versions to match what is available now (OpenTSDB#2039). Fixes OpenTSDB#1899. Fixes OpenTSDB#1941.
- Always write CLI tool output to stdout (OpenTSDB#1488). Signed-off-by: Chris Larsen <[email protected]>
- Fix OpenTSDB#1632 (OpenTSDB#1634).
- Add "check_tsd_v2" script (OpenTSDB#1567): the enhanced check_tsd script evaluates each individual metric group separately when given a filter.
- Collect stats from the meta cache plugin if configured (OpenTSDB#1649).
- Fix SaltScanner race condition on the spans maps (OpenTSDB#1651); also fix 1.6 compatibility.
- Synchronise the KVs list for scanner results: synchronises the list that holds the KeyValues produced by the scanner callbacks. The list is accessed from multiple threads at a time and wasn't thread-safe, causing inconsistent results and partial loss of data in the response. Relates to: OpenTSDB#1753. Resolves: OpenTSDB#1760.
- Allow the rollup downsample and series aggregator to be different.
- Fix TestSaltScannerHistogram; it looks like the method was renamed and the unit tests were not adjusted.
- ExplicitTags filtering with FuzzyFilters: fix PR 1896 so that the fuzzy filter list honors the regex filter and properly ignores rows that don't match the explicit filter. Also sort the fuzzy filter list in ascending order and use a static comparator instead of instantiating one on each call.
- Test the rollup filter fix for OpenTSDB#1083.
- Fix concurrent result reporting from scanners: fixes a concurrency bug where scanners report their results into a map and would overwrite each other's results. Resolves: OpenTSDB#1753.
- Update Maven jar URLs to use HTTPS.
- Remove an excess param from the javadoc for RpcHandler.
- Fix check_tsd_v2 (OpenTSDB#1937):
  * Renamed the logger instance; the previous name was copied from another script (cosmetic change only).
  * Change the behaviour of the --ignore-recent option. The previous version fetched data from OpenTSDB from --duration seconds ago to time.now() and then tried to remove timestamps inside the --ignore-recent window, but the logic was flawed and actually only included those seconds. Furthermore, OpenTSDB supports setting an "end" parameter, so we use it to fetch only the data we want; for example, -d 180 -I 80 renders a query parameter that looks like `?start=180s-ago&end=80s-ago`. Keeps it simple. Also added debug logging to output the actual query sent to OpenTSDB when the --debug option is enabled.
  * Fixed the logic of the --percent-over parameter. The previous behaviour didn't work due to wrong logic: it would set "crit" or "warn" to True regardless. This change fixes that.
  * Better output from logging: added log messages to be consistent across alerting scenarios and changed the format of some floats. Fixed a log message that displayed the "crit" value where it should have shown the "warn" value.
  * Fixed a bug in the result-parsing logic: removed an if statement that `continue`d the for-loop if a result was neither `crit` nor `warn` already; that check also made the logic skip the test for the case where OpenTSDB returned no values and the -A flag was specified to alert in such scenarios.
  * Changed the check for the timestamp type. The previous behaviour checked whether a timestamp could be cast to a float, which is a bit odd because OpenTSDB returns integers. The check is probably redundant, since OpenTSDB should never return a non-integer timestamp, but it is left in for now, as per the discussion in the PR.
- Rename maxScannerUidtoStringTime to maxScannerUidToStringTime (OpenTSDB#1875).
- Fix the missing index from OpenTSDB#1754 in the salt scanner.
- Force Sunday as the first day of the week.
- Tweak TestTsdbQueryQueries to pass on older Java versions.
- Fix the min case for doubles in AggregationIterator.
- Fix the Screwdriver config.
- Fix the unit tests for JDK8 in the Screwdriver config PR.
- Fix: rollup queries with the count aggregator produce unexpected results (OpenTSDB#1895). Co-authored-by: Tony Di Nucci <[email protected]>
- Fixed function description. Fixes OpenTSDB#841 (OpenTSDB#2040).
- Added tracking of metrics which are null due to auto_metric being disabled. Fixes OpenTSDB#786 (OpenTSDB#2042).
- Add support for splitting rollup queries (OpenTSDB#1853):
  * Add an SLA config flag for rollup intervals. Adds a configuration option for rollup intervals to specify their maximum acceptable delay. Queries that cover the time between now and that maximum delay will need to query other tables for that interval.
  * Add a global config flag to enable splitting queries. Adds a global config flag to enable splitting queries that would hit a rollup table that has a delay SLA configured. In that case, this feature allows splitting a query into two: one that gets the data from the rollup table up to the time where it is guaranteed to be available, and one that gets the rest from the raw table.
  * Add a new SplitRollupQuery. Adds a SplitRollupQuery class that supports splitting a rollup query into two separate queries. This is useful when a rollup table is filled by, e.g., a batch job that processes the previous day's data on a daily basis; rollup data for yesterday will then only be available some time today. This delay SLA can be configured on a per-table basis and specifies by how much time the table may lag behind real time. If a query comes in that would read data from that blackout period, where data is only available in the raw table and not yet guaranteed to be in the rollup table, the incoming query can be split in two using the SplitRollupQuery class: it wraps a query that reads the rollup table up to the last timestamp guaranteed to be available based on the SLA, and one that gets the remaining data from the raw table.
  * Extract an AbstractQuery. Extracts an AbstractQuery from the TsdbQuery implementation since we'd like to reuse parts of it in other Query classes (in this case SplitRollupQuery).
  * Extract an AbstractSpanGroup.
  * Avoid a NullPointerException when setting the start time. Avoids a NullPointerException that happened when setting the start time on a query that was eligible to split but, due to the SLA config, only hit the raw table anyway.
  * Scale timestamps to milliseconds for split queries. It is important to maintain consistent units between all the partial queries that make up the bigger one.
  * Fix a start-time error for split queries. Fixes a bug that occurred when the start time of a query aligned exactly with the delay configured in the rollup table's SLA. For a defined SLA of, e.g., 1 day, if the start time of the query was exactly 1 day ago, the end time of the rollup part of the query would be updated to equal its start time, which isn't allowed and causes a query exception.
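The committed implementation of the new expression lives in src/query/expression/FirstDifference.java, which is not shown in this commit view. Purely as a hedged illustration of the transform described above, here is a minimal standalone sketch in which each output point is the current value minus the previous one; how the real class treats the first point (emitting 0 here) is an assumption.

public final class FirstDifferenceSketch {
  /** Returns the first difference of a series of values: out[i] = in[i] - in[i - 1]. */
  static double[] firstDifference(final double[] values) {
    if (values.length == 0) {
      return new double[0];
    }
    final double[] diffs = new double[values.length];
    diffs[0] = 0;  // assumption: the first point has no predecessor, so emit 0
    for (int i = 1; i < values.length; i++) {
      diffs[i] = values[i] - values[i - 1];
    }
    return diffs;
  }

  public static void main(final String[] args) {
    // {1, 4, 9, 16} -> {0, 3, 5, 7}
    System.out.println(java.util.Arrays.toString(
        firstDifference(new double[] {1, 4, 9, 16})));
  }
}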
1 parent 877cdfa commit 2d6c6f4

32 files changed: +1146 / -303 lines

tools/docker/Dockerfile renamed to Dockerfile

Lines changed: 4 additions & 4 deletions

@@ -2,7 +2,7 @@ FROM java:openjdk-8-alpine
-ENV VERSION 2.3.0-RC1
+ENV VERSION 2.5.0-SNAPSHOT
 ENV WORKDIR /usr/share/opentsdb
 ENV LOGDIR /var/log/opentsdb
 ENV DATADIR /data/opentsdb
@@ -32,10 +32,10 @@ ENV TSDB_PORT 4244

 WORKDIR $WORKDIR

-ADD libs $WORKDIR/libs
-ADD logback.xml $WORKDIR
+ADD third_party/*/*.jar $WORKDIR/libs/
+ADD src/logback.xml $WORKDIR
 ADD tsdb-$VERSION.jar $WORKDIR
-ADD opentsdb.conf $ETCDIR/opentsdb.conf
+ADD src/opentsdb.conf $ETCDIR/opentsdb.conf

 VOLUME ["/etc/openstsdb"]
 VOLUME ["/data/opentsdb"]

Makefile.am

Lines changed: 3 additions & 0 deletions

@@ -82,6 +82,7 @@ tsdb_SRC := \
 	src/core/RequestBuilder.java \
 	src/core/RowKey.java \
 	src/core/RowSeq.java \
+	src/core/RpcResponder.java \
 	src/core/iRowSeq.java \
 	src/core/SaltScanner.java \
 	src/core/SeekableView.java \
@@ -127,6 +128,7 @@ tsdb_SRC := \
 	src/query/expression/ExpressionReader.java \
 	src/query/expression/Expressions.java \
 	src/query/expression/ExpressionTree.java \
+	src/query/expression/FirstDifference.java \
 	src/query/expression/HighestCurrent.java \
 	src/query/expression/HighestMax.java \
 	src/query/expression/IntersectionIterator.java \
@@ -323,6 +325,7 @@ test_SRC := \
 	test/core/TestRateSpan.java \
 	test/core/TestRowKey.java \
 	test/core/TestRowSeq.java \
+	test/core/TestRpcResponsder.java \
 	test/core/TestSaltScanner.java \
 	test/core/TestSeekableViewChain.java \
 	test/core/TestSpan.java \

configure.ac

Lines changed: 2 additions & 1 deletion

@@ -14,7 +14,8 @@
 # along with this library. If not, see <http://www.gnu.org/licenses/>.

 # Semantic Versioning (see http://semver.org/).
-AC_INIT([opentsdb], [2.4.1], [[email protected]])
+AC_INIT([opentsdb], [2.5.0-SNAPSHOT], [[email protected]])
+
 AC_CONFIG_AUX_DIR([build-aux])
 AM_INIT_AUTOMAKE([foreign])

src/core/IncomingDataPoints.java

Lines changed: 14 additions & 57 deletions

@@ -18,6 +18,7 @@
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;

 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
@@ -45,6 +46,11 @@ final class IncomingDataPoints implements WritableDataPoints {
    */
  static final Histogram putlatency = new Histogram(16000, (short) 2, 100);

+ /**
+  * Keep track of the number of UIDs that came back null with auto_metric disabled.
+  */
+ static final AtomicLong auto_metric_rejection_count = new AtomicLong();
+
  /** The {@code TSDB} instance we belong to. */
  private final TSDB tsdb;

@@ -137,9 +143,14 @@ static byte[] rowKeyTemplate(final TSDB tsdb, final String metric,

    short pos = (short) Const.SALT_WIDTH();

-   copyInRowKey(row, pos,
-       (tsdb.config.auto_metric() ? tsdb.metrics.getOrCreateId(metric)
-           : tsdb.metrics.getId(metric)));
+   byte[] metric_id = (tsdb.config.auto_metric() ? tsdb.metrics.getOrCreateId(metric)
+       : tsdb.metrics.getId(metric));
+
+   if(!tsdb.config.auto_metric() && metric_id == null) {
+     auto_metric_rejection_count.incrementAndGet();
+   }
+
+   copyInRowKey(row, pos, metric_id);
    pos += metric_width;

    pos += Const.TIMESTAMP_BYTES;
@@ -151,60 +162,6 @@ static byte[] rowKeyTemplate(final TSDB tsdb, final String metric,
    return row;
  }

- /**
-  * Returns a partially initialized row key for this metric and these tags. The
-  * only thing left to fill in is the base timestamp.
-  *
-  * @since 2.0
-  */
- static Deferred<byte[]> rowKeyTemplateAsync(final TSDB tsdb,
-     final String metric, final Map<String, String> tags) {
-   final short metric_width = tsdb.metrics.width();
-   final short tag_name_width = tsdb.tag_names.width();
-   final short tag_value_width = tsdb.tag_values.width();
-   final short num_tags = (short) tags.size();
-
-   int row_size = (Const.SALT_WIDTH() + metric_width + Const.TIMESTAMP_BYTES
-       + tag_name_width * num_tags + tag_value_width * num_tags);
-   final byte[] row = new byte[row_size];
-
-   // Lookup or create the metric ID.
-   final Deferred<byte[]> metric_id;
-   if (tsdb.config.auto_metric()) {
-     metric_id = tsdb.metrics.getOrCreateIdAsync(metric, metric, tags);
-   } else {
-     metric_id = tsdb.metrics.getIdAsync(metric);
-   }
-
-   // Copy the metric ID at the beginning of the row key.
-   class CopyMetricInRowKeyCB implements Callback<byte[], byte[]> {
-     public byte[] call(final byte[] metricid) {
-       copyInRowKey(row, (short) Const.SALT_WIDTH(), metricid);
-       return row;
-     }
-   }
-
-   // Copy the tag IDs in the row key.
-   class CopyTagsInRowKeyCB implements
-       Callback<Deferred<byte[]>, ArrayList<byte[]>> {
-     public Deferred<byte[]> call(final ArrayList<byte[]> tags) {
-       short pos = (short) (Const.SALT_WIDTH() + metric_width);
-       pos += Const.TIMESTAMP_BYTES;
-       for (final byte[] tag : tags) {
-         copyInRowKey(row, pos, tag);
-         pos += tag.length;
-       }
-       // Once we've resolved all the tags, schedule the copy of the metric
-       // ID and return the row key we produced.
-       return metric_id.addCallback(new CopyMetricInRowKeyCB());
-     }
-   }
-
-   // Kick off the resolution of all tags.
-   return Tags.resolveOrCreateAllAsync(tsdb, metric, tags)
-       .addCallbackDeferring(new CopyTagsInRowKeyCB());
- }
-
  public void setSeries(final String metric, final Map<String, String> tags) {
    checkMetricAndTags(metric, tags);
    try {
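The hunk above only adds and increments the counter; how it is reported is not part of this diff. A hypothetical sketch of how it could be surfaced as a stat follows: the class, method, and stat name are made up for illustration, the field is package-private so the sketch assumes code living in net.opentsdb.core, and it presumes StatsCollector's record(name, value) overload.

package net.opentsdb.core;

import net.opentsdb.stats.StatsCollector;

final class AutoMetricStatsSketch {
  /** Hypothetical: report how many UIDs came back null with auto_metric disabled. */
  static void collect(final StatsCollector collector) {
    collector.record("uid.autometric.rejections",
        IncomingDataPoints.auto_metric_rejection_count.get());
  }
}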

src/core/Query.java

Lines changed: 1 addition & 1 deletion

@@ -197,7 +197,7 @@ public Deferred<Object> configureFromQuery(final TSQuery query,
  * way we get this one data point is by aggregating all the data points of
  * that interval together using an {@link Aggregator}. This enables you
  * to compute things like the 5-minute average or 10 minute 99th percentile.
- * @param interval Number of seconds wanted between each data point.
+ * @param interval Number of milliseconds wanted between each data point.
  * @param downsampler Aggregation function to use to group data points
  *     within an interval.
  */
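With the corrected unit, a caller asking for 5-minute averages passes the interval in milliseconds. A minimal, hypothetical call-site sketch (the tsdb instance and the aggregator choice are assumptions, not part of this hunk):

// "query" is a net.opentsdb.core.Query obtained from an existing TSDB instance.
final Query query = tsdb.newQuery();
query.downsample(300000L, Aggregators.AVG);  // 300,000 ms = 5-minute buckets, not 300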

src/core/RpcResponder.java

Lines changed: 110 additions & 0 deletions (new file)

@@ -0,0 +1,110 @@
// This file is part of OpenTSDB.
// Copyright (C) 2010-2017 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import net.opentsdb.utils.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * This class is responsible for building result of requests and
 * respond to clients asynchronously.
 *
 * It can reduce requests that stacking in AsyncHBase, especially put requests.
 * When a HBase's RPC has completed, the "AsyncHBase I/O worker" just decodes
 * the response, and then do callback by this class asynchronously. We should
 * take up workers as short as possible time so that workers can remove RPCs
 * from in-flight state more quickly.
 *
 */
public class RpcResponder {

  private static final Logger LOG = LoggerFactory.getLogger(RpcResponder.class);

  public static final String TSD_RESPONSE_ASYNC_KEY = "tsd.core.response.async";
  public static final boolean TSD_RESPONSE_ASYNC_DEFAULT = true;

  public static final String TSD_RESPONSE_WORKER_NUM_KEY =
      "tsd.core.response.worker.num";
  public static final int TSD_RESPONSE_WORKER_NUM_DEFAULT = 10;

  private final boolean async;
  private ExecutorService responders;
  private volatile boolean running = true;

  RpcResponder(final Config config) {
    async = config.getBoolean(TSD_RESPONSE_ASYNC_KEY,
        TSD_RESPONSE_ASYNC_DEFAULT);

    if (async) {
      int threads = config.getInt(TSD_RESPONSE_WORKER_NUM_KEY,
          TSD_RESPONSE_WORKER_NUM_DEFAULT);
      responders = Executors.newFixedThreadPool(threads,
          new ThreadFactoryBuilder()
              .setNameFormat("OpenTSDB Responder #%d")
              .setDaemon(true)
              .setUncaughtExceptionHandler(new ExceptionHandler())
              .build());
    }

    LOG.info("RpcResponder mode: {}", async ? "async" : "sync");
  }

  public void response(Runnable run) {
    if (async) {
      if (running) {
        responders.execute(run);
      } else {
        throw new IllegalStateException("RpcResponder is closing or closed.");
      }
    } else {
      run.run();
    }
  }

  public void close() {
    if (running) {
      running = false;
      responders.shutdown();
    }

    boolean completed;
    try {
      completed = responders.awaitTermination(5, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      completed = false;
    }

    if (!completed) {
      LOG.warn(
          "There are still some results that are not returned to the clients.");
    }
  }

  public boolean isAsync() {
    return async;
  }

  private class ExceptionHandler implements Thread.UncaughtExceptionHandler {
    @Override
    public void uncaughtException(Thread t, Throwable e) {
      LOG.error("Run into an uncaught exception in thread: " + t.getName(), e);
    }
  }
}
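The wiring of this class into the TSD (and the new Config convenience methods it relies on) is not shown in this view. Purely as a hedged sketch of the API surface above, a hypothetical caller might look like the following; the constructor is package-private, so the sketch assumes code in net.opentsdb.core, and the class name and config handling are made up.

package net.opentsdb.core;

import net.opentsdb.utils.Config;

final class RpcResponderUsageSketch {
  public static void main(final String[] args) throws Exception {
    final Config config = new Config(false);  // default config, no file loaded
    config.overrideConfig(RpcResponder.TSD_RESPONSE_WORKER_NUM_KEY, "4");

    final RpcResponder responder = new RpcResponder(config);
    responder.response(new Runnable() {
      public void run() {
        // Build and write the client response here. With tsd.core.response.async
        // left at its default (true), this runs on an "OpenTSDB Responder #N"
        // thread instead of the AsyncHBase I/O worker.
      }
    });
    responder.close();  // drains queued responses, waiting up to 5 minutes
  }
}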

src/core/SaltScanner.java

Lines changed: 20 additions & 2 deletions

@@ -491,7 +491,8 @@ final class ScannerCB implements Callback<Object,
     private long rows_pre_filter = 0;
     private long dps_post_filter = 0;
     private long rows_post_filter = 0;
-
+    private long query_timeout = tsdb.getConfig().getLong("tsd.query.timeout");
+
     public ScannerCB(final Scanner scanner, final int index) {
       this.scanner = scanner;
       this.index = index;
@@ -554,7 +555,24 @@ public Object call(final ArrayList<ArrayList<KeyValue>> rows)
       final List<Deferred<Object>> lookups =
           filters != null && !filters.isEmpty() ?
               new ArrayList<Deferred<Object>>(rows.size()) : null;
-
+
+      // fail the query when the timeout exceeded
+      if (this.query_timeout > 0 && fetch_time > (this.query_timeout * 1000000)) {
+        try {
+          close(false);
+          handleException(
+              new QueryException(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE,
+                  "Sorry, your query timed out. Time limit: "
+                  + this.query_timeout + " ms, fetch time: "
+                  + (double)(fetch_time)/1000000 + " ms. Please try filtering "
+                  + "using more tags or decrease your time range."));
+          return false;
+        } catch (Exception e) {
+          LOG.error("Sorry, Scanner is closed: " + scanner, e);
+          return false;
+        }
+      }
+
       // validation checking before processing the next set of results. It's
       // kinda funky but we want to allow queries to sneak through that were
       // just a *tad* over the limits so that's why we don't check at the
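As a unit sanity check on the comparison above: tsd.query.timeout is configured in milliseconds while fetch_time is accumulated in nanoseconds, hence the factor of 1,000,000 both in the comparison and when printing the fetch time. A standalone sketch of the arithmetic (not committed code):

public final class QueryTimeoutSketch {
  public static void main(final String[] args) {
    final long query_timeout_ms = 30000L;          // tsd.query.timeout, in ms
    final long fetch_time_ns = 45000L * 1000000L;  // 45 s spent fetching, in ns

    // Mirrors the check in ScannerCB.call(): scale the limit to ns, then compare.
    final boolean timed_out = query_timeout_ms > 0
        && fetch_time_ns > query_timeout_ms * 1000000L;
    System.out.println("timed out: " + timed_out                      // true
        + ", fetch time: " + (double) fetch_time_ns / 1000000 + " ms");
  }
}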
