Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,7 @@ public class DorisOptions {

public static final ConfigOption<Integer> DORIS_SINK_NET_BUFFER_SIZE = ConfigOptions.name("doris.sink.net.buffer.size").intType().defaultValue(1024 * 1024).withDescription("");

public static final ConfigOption<Boolean> DORIS_SINK_HTTP_UTF8_CHARSET = ConfigOptions.name("doris.sink.http-utf8-charset").booleanType().defaultValue(false).withDescription("");


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,31 @@ package org.apache.doris.spark.util
import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
import org.apache.http.HttpHeaders
import org.apache.http.client.methods.HttpRequestBase
import org.apache.http.config.ConnectionConfig
import org.apache.http.conn.ssl.{SSLConnectionSocketFactory, TrustAllStrategy}
import org.apache.http.impl.client.{CloseableHttpClient, DefaultRedirectStrategy, HttpClients}
import org.apache.http.protocol.HttpRequestExecutor
import org.apache.http.ssl.SSLContexts

import java.io.{File, FileInputStream}
import java.nio.charset.StandardCharsets
import java.nio.charset.{CodingErrorAction, StandardCharsets}
import java.security.KeyStore
import java.util.Base64
import scala.util.{Failure, Success, Try}

object HttpUtils {

def getHttpClient(config: DorisConfig): CloseableHttpClient = {

var connectionConfig = ConnectionConfig.DEFAULT;
if (config.getValue(DorisOptions.DORIS_SINK_HTTP_UTF8_CHARSET)) {
connectionConfig = ConnectionConfig.custom()
.setCharset(StandardCharsets.UTF_8)
.setMalformedInputAction(CodingErrorAction.REPLACE)
.setUnmappableInputAction(CodingErrorAction.REPLACE).build()
}
val builder = HttpClients.custom()
.setDefaultConnectionConfig(connectionConfig)
.setRequestExecutor(new HttpRequestExecutor(60000))
.setRedirectStrategy(new DefaultRedirectStrategy {
override def isRedirectable(method: String): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class DorisWriterITCase extends AbstractContainerTestBase {
val TABLE_JSON_TBL_OVERWRITE: String = "tbl_json_tbl_overwrite"
val TABLE_JSON_TBL_ARROW: String = "tbl_json_tbl_arrow"
val TABLE_BITMAP_TBL: String = "tbl_write_tbl_bitmap"
val TABLE_UNICODE_COL: String = "tbl_unicode_col"

@Test
@throws[Exception]
Expand Down Expand Up @@ -413,4 +414,51 @@ class DorisWriterITCase extends AbstractContainerTestBase {
LOG.info("Checking DorisWriterITCase result. testName={}, actual={}, expected={}", testName, actual, expected)
assertEqualsInAnyOrder(expected.toList.asJava, actual.toList.asJava)
}

@Test
def testWriteUnicodeColumn(): Unit = {
val createDb = String.format("CREATE DATABASE IF NOT EXISTS %s", DATABASE)
val targetInitSql: Array[String] = ContainerUtils.parseFileContentSQL("container/ddl/write_unicode_col.sql")
ContainerUtils.executeSQLStatement(getDorisQueryConnection, LOG, Array(createDb): _*)
val connection = getDorisQueryConnection(DATABASE)
ContainerUtils.executeSQLStatement(connection, LOG, targetInitSql: _*)

val session = SparkSession.builder().master("local[*]").getOrCreate()
try {
val df = session.createDataFrame(Seq(
(1, "243"),
(2, "1"),
(3, "287667876573")
)).toDF("序号", "内容")
df.createTempView("mock_source")
session.sql(
s"""
|CREATE TEMPORARY VIEW test_sink
|USING doris
|OPTIONS(
| "table.identifier"="${DATABASE + "." + TABLE_UNICODE_COL}",
| "fenodes"="${getFenodes}",
| "user"="${getDorisUsername}",
| "password"="${getDorisPassword}",
| "doris.sink.http-utf8-charset"="true"
|)
|""".stripMargin)
session.sql(
"""
|insert into test_sink select `序号`,`内容` from mock_source
|""".stripMargin)

Thread.sleep(10000)
val actual = ContainerUtils.executeSQLStatement(
connection,
LOG,
String.format("select `序号`,`内容` from %s.%s", DATABASE, TABLE_UNICODE_COL),
2)
val expected = util.Arrays.asList("1,243", "2,1", "3,287667876573");
checkResultInAnyOrder("testWriteUnicodeColumn", expected.toArray, actual.toArray)
} finally {
session.stop()
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
SET enable_unicode_name_support = true;
DROP TABLE IF EXISTS `tbl_unicode_col`;
CREATE TABLE `tbl_unicode_col` (
`序号` int NULL,
`内容` text NULL
) ENGINE=OLAP
DUPLICATE KEY(`序号`)
DISTRIBUTED BY HASH(`序号`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);