
Commit

Run spotless:apply
grzegorz8 committed Jul 31, 2023
1 parent a399bd0 commit 9d951cf
Showing 11 changed files with 83 additions and 84 deletions.
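All of the hunks below are whitespace and line-wrapping changes only. Assuming the standard Spotless setup of the Flink connector repositories (google-java-format in AOSP style), a commit like this is typically produced by running mvn spotless:apply from the project root.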
ElasticsearchDialectFactory.java
@@ -21,9 +21,7 @@
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;

-/**
- * Factory for {@link ElasticsearchDialect}.
- */
+/** Factory for {@link ElasticsearchDialect}. */
public class ElasticsearchDialectFactory implements JdbcDialectFactory {

@Override
@@ -48,7 +48,8 @@ protected JdbcDeserializationConverter createInternalConverter(LogicalType type)
case FLOAT:
return val -> val;
case DATE:
-                return val -> (int) (((Timestamp) val).toLocalDateTime().toLocalDate().toEpochDay());
+                return val ->
+                        (int) (((Timestamp) val).toLocalDateTime().toLocalDate().toEpochDay());
default:
return super.createInternalConverter(type);
}
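As an aside, a minimal, self-contained sketch (not part of this commit) of what the wrapped DATE converter above computes, namely the day count between the epoch and the timestamp's date:

    import java.sql.Timestamp;

    class DateConverterSketch {
        public static void main(String[] args) {
            // Same expression as in the converter above, applied to a sample value.
            Timestamp val = Timestamp.valueOf("2020-01-01 15:35:00");
            int epochDay = (int) val.toLocalDateTime().toLocalDate().toEpochDay();
            System.out.println(epochDay); // 18262 days since 1970-01-01
        }
    }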
ElasticsearchTestBase.java
@@ -24,9 +24,7 @@

import org.junit.jupiter.api.extension.ExtendWith;

-/**
- * Base class for Elasticsearch testing.
- */
+/** Base class for Elasticsearch testing. */
@ExtendWith(ElasticsearchDatabase.class)
public interface ElasticsearchTestBase extends DatabaseTest {

@@ -31,7 +31,8 @@ protected List<TestItem> testData() {

// Not valid data
createTestItem("CHAR", "The Elasticsearch dialect doesn't support type: CHAR(1)."),
-                createTestItem("BINARY", "The Elasticsearch dialect doesn't support type: BINARY(1)."),
+                createTestItem(
+                        "BINARY", "The Elasticsearch dialect doesn't support type: BINARY(1)."),
createTestItem("TIME", "The Elasticsearch dialect doesn't support type: TIME(0)."),
createTestItem(
"VARBINARY(10)",
@@ -40,5 +41,4 @@
"DECIMAL(10, 4)",
"The Elasticsearch dialect doesn't support type: DECIMAL(10, 4)."));
    }
-
}
ElasticsearchPreparedStatementTest.java
@@ -25,25 +25,24 @@

import static org.assertj.core.api.Assertions.assertThat;

-/**
- * Tests for {@link ElasticsearchPreparedStatementTest}.
- */
+/** Tests for {@link ElasticsearchPreparedStatementTest}. */
public class ElasticsearchPreparedStatementTest {

private final JdbcDialect dialect =
JdbcDialectLoader.load(
"jdbc:elasticsearch://localhost:9200/test", getClass().getClassLoader());

private final String[] fieldNames =
-            new String[]{"id", "name", "email", "ts", "field1", "field_2", "__field_3__"};
-    private final String[] keyFields = new String[]{"id", "__field_3__"};
+            new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"};
+    private final String[] keyFields = new String[] {"id", "__field_3__"};
private final String tableName = "tbl";

@Test
void testRowExistsStatement() {
String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields);
assertThat(rowExistStmt)
-                .isEqualTo("SELECT 1 FROM \"tbl\" WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__");
+                .isEqualTo(
+                        "SELECT 1 FROM \"tbl\" WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__");
}

@Test
@@ -55,5 +54,4 @@ void testSelectStatement() {
+ "FROM \"tbl\" "
+ "WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__");
    }
-
}
ElasticsearchDynamicTableSourceITCase.java
@@ -60,10 +60,9 @@
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;
import static org.assertj.core.api.Assertions.assertThat;

-/**
- * The Table Source ITCase for {@link ElasticsearchDialect}.
- */
-public class ElasticsearchDynamicTableSourceITCase extends AbstractTestBase implements ElasticsearchTestBase {
+/** The Table Source ITCase for {@link ElasticsearchDialect}. */
+public class ElasticsearchDynamicTableSourceITCase extends AbstractTestBase
+        implements ElasticsearchTestBase {

private ElasticsearchRestClient client;
private TableEnvironment tEnv;
@@ -188,52 +187,52 @@ public void testFilter() {

// test TIMESTAMP filter
assertThat(
-                executeQuery(
-                        "SELECT * FROM FAKE_TABLE WHERE timestamp6_col = TIMESTAMP '2020-01-01 15:35:00.123456'"))
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE WHERE timestamp6_col = TIMESTAMP '2020-01-01 15:35:00.123456'"))
.containsExactly(onlyRow1);

// test the IN operator
assertThat(
-                executeQuery(
-                        "SELECT * FROM FAKE_TABLE WHERE 1 = idx AND double_col IN (100.1234, 101.1234)"))
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE WHERE 1 = idx AND double_col IN (100.1234, 101.1234)"))
.containsExactly(onlyRow1);

// test mixing AND and OR operator
assertThat(
-                executeQuery(
-                        "SELECT * FROM FAKE_TABLE WHERE idx = 1 AND double_col = 100.1234 OR double_col = 101.1234"))
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE WHERE idx = 1 AND double_col = 100.1234 OR double_col = 101.1234"))
.containsExactlyInAnyOrderElementsOf(twoRows);

// test mixing AND/OR with parenthesis, and the swapping the operand of equal expression
assertThat(
-                executeQuery(
-                        "SELECT * FROM FAKE_TABLE WHERE (2 = idx AND double_col = 100.1234) OR double_col = 101.1234"))
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE WHERE (2 = idx AND double_col = 100.1234) OR double_col = 101.1234"))
.containsExactly(onlyRow2);

// test Greater than, just to make sure we didnt break anything that we cannot pushdown
assertThat(
-                executeQuery(
-                        "SELECT * FROM FAKE_TABLE WHERE idx = 2 AND double_col > 100 OR double_col = 101.123"))
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE WHERE idx = 2 AND double_col > 100 OR double_col = 101.123"))
.containsExactly(onlyRow2);

// One more test of parenthesis
assertThat(
-                executeQuery(
-                        "SELECT * FROM FAKE_TABLE WHERE 2 = idx AND (double_col = 100.1234 OR double_col = 102.1234)"))
+                        executeQuery(
+                                "SELECT * FROM FAKE_TABLE WHERE 2 = idx AND (double_col = 100.1234 OR double_col = 102.1234)"))
.isEmpty();

assertThat(
-                executeQuery(
-                        "SELECT * FROM "
-                                + partitionedTable
-                                + " WHERE id = 2 AND double_col > 100 OR double_col = 101.123"))
+                        executeQuery(
+                                "SELECT * FROM "
+                                        + partitionedTable
+                                        + " WHERE id = 2 AND double_col > 100 OR double_col = 101.123"))
.isEmpty();

assertThat(
-                executeQuery(
-                        "SELECT * FROM "
-                                + partitionedTable
-                                + " WHERE 1 = id AND double_col IN (100.1234, 101.1234)"))
+                        executeQuery(
+                                "SELECT * FROM "
+                                        + partitionedTable
+                                        + " WHERE 1 = id AND double_col IN (100.1234, 101.1234)"))
.containsExactly(onlyRow1);
}

@@ -403,8 +402,6 @@ protected List<Row> getTestData() {
-1_223_372_036_854_775_807L,
-123.5f,
"other-text",
-                        LocalDate.parse("2020-01-02"))
-        );
+                        LocalDate.parse("2020-01-02")));
    }
-
}
ElasticsearchBulkBuilder.java
@@ -34,19 +34,22 @@

import static java.lang.String.format;

-/**
- * Creates content for Elastic Bulk API call.
- */
+/** Creates content for Elastic Bulk API call. */
public class ElasticsearchBulkBuilder {

-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-            .registerModule(new JavaTimeModule())
-            .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+    private static final ObjectMapper OBJECT_MAPPER =
+            new ObjectMapper()
+                    .registerModule(new JavaTimeModule())
+                    .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);

-    public static String createBulkContent(TableRow schema, List<Row> data) throws JsonProcessingException {
+    public static String createBulkContent(TableRow schema, List<Row> data)
+            throws JsonProcessingException {
StringBuilder builder = new StringBuilder();
for (int i = 0; i < data.size(); ++i) {
-            builder.append(format("{\"create\":{\"_index\":\"%s\",\"_id\":\"%d\"}}\n", schema.getTableName(), i + 1));
+            builder.append(
+                    format(
+                            "{\"create\":{\"_index\":\"%s\",\"_id\":\"%d\"}}\n",
+                            schema.getTableName(), i + 1));
builder.append(rowToJson(schema, data.get(i))).append('\n');
}
return builder.toString();
@@ -71,5 +74,4 @@ private static Object adjustValueIfNeeded(Object object) {
return object;
}
    }
-
}
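For reference, the Bulk API body assembled by createBulkContent above is NDJSON: one create-action line per row, each followed by the document itself. With a hypothetical index named tbl and two made-up one-field rows (the real documents come from the elided rowToJson), the payload would look roughly like:

    {"create":{"_index":"tbl","_id":"1"}}
    {"name": "alice"}
    {"create":{"_index":"tbl","_id":"2"}}
    {"name": "bob"}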
ElasticsearchDatabase.java
@@ -30,16 +30,15 @@
import static org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchMetadata.PASSWORD;
import static org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchMetadata.USERNAME;

-/**
- * Elasticsearch database for testing.
- */
+/** Elasticsearch database for testing. */
public class ElasticsearchDatabase extends DatabaseExtension implements ElasticsearchImages {

-    private static final ElasticsearchContainer CONTAINER = new ElasticsearchContainer(ELASTICSEARCH_8)
-            .waitingFor(
-                    Wait.forHttp("/")
-                            .withBasicCredentials(USERNAME, PASSWORD)
-                            .withReadTimeout(Duration.ofMinutes(2)));
+    private static final ElasticsearchContainer CONTAINER =
+            new ElasticsearchContainer(ELASTICSEARCH_8)
+                    .waitingFor(
+                            Wait.forHttp("/")
+                                    .withBasicCredentials(USERNAME, PASSWORD)
+                                    .withReadTimeout(Duration.ofMinutes(2)));

private static ElasticsearchMetadata metadata;
private static ElasticsearchRestClient client;
@@ -85,5 +84,4 @@ protected void stopDatabase() throws Exception {
metadata = null;
client = null;
    }
-
}
ElasticsearchImages.java
@@ -21,5 +21,4 @@
public interface ElasticsearchImages {

    String ELASTICSEARCH_8 = "docker.elastic.co/elasticsearch/elasticsearch:8.8.1";
-
}
ElasticsearchIndexSchemaBuilder.java
@@ -59,23 +59,27 @@
import static java.util.Arrays.stream;
import static java.util.stream.Collectors.joining;

-/**
- * Creates content for Elastic Index API call.
- */
+/** Creates content for Elastic Index API call. */
public class ElasticsearchIndexSchemaBuilder {

private static final ElasticsearchDataTypeMapper MAPPER = new ElasticsearchDataTypeMapper();

public static String buildIndexSchema(TableRow tableRow) {
-        String fields = stream(tableRow.getTableDataFields())
-                .map(field -> format("\"%s\": %s", field.getName(), field.getDataType().accept(MAPPER)))
-                .collect(joining(", "));
-        return "{\"settings\": {\"number_of_shards\": 1}, \"mappings\": {\"properties\": {" + fields + "}}}";
+        String fields =
+                stream(tableRow.getTableDataFields())
+                        .map(
+                                field ->
+                                        format(
+                                                "\"%s\": %s",
+                                                field.getName(),
+                                                field.getDataType().accept(MAPPER)))
+                        .collect(joining(", "));
+        return "{\"settings\": {\"number_of_shards\": 1}, \"mappings\": {\"properties\": {"
+                + fields
+                + "}}}";
}

-    /**
-     * Maps Flink types to Elasticsearch types.
-     */
+    /** Maps Flink types to Elasticsearch types. */
private static class ElasticsearchDataTypeMapper
implements DataTypeVisitor<String>, LogicalTypeVisitor<String> {

@@ -246,5 +250,4 @@ public String visit(LogicalType other) {
return other.accept(this);
}
    }
-
}
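For reference, buildIndexSchema above produces the JSON body of a Create Index call. For a hypothetical single-field table the result would look roughly as follows; the per-field mapping ("type": "keyword" here is an assumption) comes from the elided ElasticsearchDataTypeMapper:

    {"settings": {"number_of_shards": 1}, "mappings": {"properties": {"name": {"type": "keyword"}}}}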
ElasticsearchRestClient.java
@@ -34,35 +34,40 @@

import static java.lang.String.format;

-/**
- * Elasticsearch REST API client.
- */
+/** Elasticsearch REST API client. */
public class ElasticsearchRestClient {

-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
-            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    private static final ObjectMapper OBJECT_MAPPER =
+            new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

private final RestClient restClient;

public ElasticsearchRestClient(ElasticsearchMetadata metadata) {
-        this(metadata.getContainerHost(),
+        this(
+                metadata.getContainerHost(),
metadata.getContainerPort(),
metadata.getUsername(),
metadata.getPassword());
}

public ElasticsearchRestClient(String host, int port, String username, String password) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
-        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
-        this.restClient = RestClient.builder(new HttpHost(host, port, "http"))
-                .setHttpClientConfigCallback(builder -> builder.setDefaultCredentialsProvider(credentialsProvider))
-                .build();
+        credentialsProvider.setCredentials(
+                AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+        this.restClient =
+                RestClient.builder(new HttpHost(host, port, "http"))
+                        .setHttpClientConfigCallback(
+                                builder ->
+                                        builder.setDefaultCredentialsProvider(credentialsProvider))
+                        .build();
}

public boolean trialEnabled() throws Exception {
Request request = new Request("GET", "/_license");
ElasticLicenseResponse response = executeRequest(request, ElasticLicenseResponse.class);
-        return response != null && response.license.status.equals("active") && response.license.type.equals("trial");
+        return response != null
+                && response.license.status.equals("active")
+                && response.license.type.equals("trial");
}

public void enableTrial() throws Exception {
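A hypothetical usage sketch of this test client; the host, port, and credentials are placeholders rather than values from the commit, and enabling the trial license is presumably what unlocks Elasticsearch's SQL/JDBC endpoint for the tests:

    ElasticsearchRestClient client =
            new ElasticsearchRestClient("localhost", 9200, "elastic", "password");
    if (!client.trialEnabled()) { // both methods declare throws Exception
        client.enableTrial();
    }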
