diff --git a/bitsail-connectors/connector-oss/pom.xml b/bitsail-connectors/connector-oss/pom.xml new file mode 100644 index 000000000..211a54169 --- /dev/null +++ b/bitsail-connectors/connector-oss/pom.xml @@ -0,0 +1,61 @@ + + + + + bitsail-connectors + com.bytedance.bitsail + ${revision} + + 4.0.0 + + connector-oss + + + 3.1.1 + 8 + 8 + + + + org.apache.hadoop + hadoop-aliyun + ${hadoop-aliyun.version} + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + + + com.bytedance.bitsail + bitsail-component-format-json + ${revision} + + + com.bytedance.bitsail + bitsail-component-format-csv + ${revision} + + + \ No newline at end of file diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java new file mode 100644 index 000000000..38357258d --- /dev/null +++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConf.java @@ -0,0 +1,72 @@ +/* + * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.bytedance.bitsail.connector.oss.config;
+
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.connector.oss.option.OssReaderOptions;
+
+import lombok.Data;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.aliyun.oss.Constants;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Serializable holder of the settings that are copied into a Hadoop {@link Configuration}
+ * in order to access Aliyun OSS.
+ */
+@Data
+public class OssConf implements Serializable {
+  private static final String HDFS_IMPL = "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem";
+  private static final String SCHEMA = "oss";
+  // Hadoop property name -> value; every entry is copied verbatim by setExtraOptionsForConfiguration.
+  protected Map<String, String> extraOptions = new HashMap<>();
+  // Filled from the reader's bucket option in buildWithConfig.
+  protected String hdfsNameKey;
+  // Optional extra Hadoop resource file; only added to the Configuration when non-null.
+  protected String hdfsSitePath;
+  protected String kerberosPrincipal;
+  protected String kerberosKeytabPath;
+
+  /**
+   * @return the class name of the Hadoop file system implementation used for OSS
+   */
+  public String getHdfsImpl() {
+    return HDFS_IMPL;
+  }
+
+  /**
+   * @return the URI scheme that the file system implementation is registered under
+   */
+  public String getSchema() {
+    return SCHEMA;
+  }
+
+  public OssConf(String hdfsNameKey) {
+    this.hdfsNameKey = hdfsNameKey;
+  }
+
+  /**
+   * Builds an {@link OssConf} from the reader options of a job.
+   *
+   * @param config job configuration holding the bucket, access key, access secret and endpoint
+   * @return a configuration object whose extra options carry the OSS credentials and endpoint
+   */
+  public static OssConf buildWithConfig(BitSailConfiguration config) {
+    OssConf hadoopConf = new OssConf(config.get(OssReaderOptions.BUCKET));
+    Map<String, String> ossOptions = new HashMap<>();
+    ossOptions.put(Constants.ACCESS_KEY_ID, config.get(OssReaderOptions.ACCESS_KEY));
+    ossOptions.put(Constants.ACCESS_KEY_SECRET, config.get(OssReaderOptions.ACCESS_SECRET));
+    ossOptions.put(Constants.ENDPOINT_KEY, config.get(OssReaderOptions.ENDPOINT));
+    hadoopConf.setExtraOptions(ossOptions);
+    return hadoopConf;
+  }
+
+  /**
+   * Copies all extra options into {@code configuration} and, when a site file path is set,
+   * registers it as an additional resource.
+   *
+   * @param configuration the Hadoop configuration to enrich in place
+   */
+  public void setExtraOptionsForConfiguration(Configuration configuration) {
+    // forEach on an empty map is a no-op, so no emptiness check is needed.
+    extraOptions.forEach(configuration::set);
+    if (hdfsSitePath != null) {
+      configuration.addResource(new Path(hdfsSitePath));
+    }
+  }
+}
\ No newline at end of file
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConfig.java
b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConfig.java
new file mode 100644
index 000000000..b246e91ff
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/config/OssConfig.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.config;
+
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode;
+import com.bytedance.bitsail.connector.oss.option.OssReaderOptions;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.Locale;
+
+/**
+ * Reader-side view of the OSS job options: connection settings, file location and content format.
+ */
+@Getter
+@Setter
+public class OssConfig implements Serializable {
+  private String bucket;
+  private String accessKey;
+  private String accessSecret;
+  private String endpoint;
+  private ContentType contentType;
+  private Boolean skipFirstLine;
+  private String filePath;
+
+  public OssConfig() {
+  }
+
+  /**
+   * File formats the reader can deserialize.
+   */
+  public enum ContentType {
+    CSV,
+    JSON
+  }
+
+  /**
+   * Reads all reader options from the job configuration.
+   *
+   * @param jobConf job configuration; bucket, access key, access secret and endpoint are mandatory
+   * @throws BitSailException if a mandatory option is missing or the content type is unknown
+   */
+  public OssConfig(BitSailConfiguration jobConf) {
+    this.bucket = jobConf.getNecessaryOption(OssReaderOptions.BUCKET, OssConnectorErrorCode.REQUIRED_VALUE);
+    this.accessKey = jobConf.getNecessaryOption(OssReaderOptions.ACCESS_KEY, OssConnectorErrorCode.REQUIRED_VALUE);
+    this.accessSecret = jobConf.getNecessaryOption(OssReaderOptions.ACCESS_SECRET, OssConnectorErrorCode.REQUIRED_VALUE);
+    this.endpoint = jobConf.getNecessaryOption(OssReaderOptions.ENDPOINT, OssConnectorErrorCode.REQUIRED_VALUE);
+    this.contentType = parseContentType(
+        jobConf.getNecessaryOption(OssReaderOptions.CONTENT_TYPE, OssConnectorErrorCode.UNSUPPORTED_TYPE));
+    this.skipFirstLine = jobConf.get(OssReaderOptions.SKIP_FIRST_LINE);
+    this.filePath = jobConf.get(OssReaderOptions.FILE_PATH);
+  }
+
+  // Maps the configured name to a ContentType, reporting unknown names with the connector's own error code
+  // instead of the bare IllegalArgumentException thrown by Enum.valueOf.
+  private static ContentType parseContentType(String configuredType) {
+    try {
+      // Locale.ROOT keeps the upper-casing independent of the JVM's default locale.
+      return ContentType.valueOf(configuredType.toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      throw BitSailException.asBitSailException(OssConnectorErrorCode.UNSUPPORTED_TYPE,
+          String.format("Content type [%s] is not supported, only CSV and JSON are supported.", configuredType), e);
+    }
+  }
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java
new file mode 100644
index 000000000..5bc2e6e10
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/constant/OssConstants.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.constant;
+
+/**
+ * Constants shared by the OSS connector classes.
+ */
+public class OssConstants {
+  // Declared final so the connector name cannot be reassigned at runtime.
+  public static final String OSS_CONNECTOR_NAME = "oss";
+  // Pause, in milliseconds, used by the reader when it is polled without any queued split.
+  public static final long OSS_SOURCE_SLEEP_MILL_SECS = 1000L;
+  // Files with this exact name are skipped when listing the input path.
+  public static final String OSS_SOURCE_IGNORE_FILENAME = "_SUCCESS";
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/exception/OssConnectorErrorCode.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/exception/OssConnectorErrorCode.java
new file mode 100644
index 000000000..68064a1a0
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/exception/OssConnectorErrorCode.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.exception;
+
+import com.bytedance.bitsail.common.exception.ErrorCode;
+
+/**
+ * Error codes raised by the OSS connector; each constant pairs a short code with a description.
+ */
+public enum OssConnectorErrorCode implements ErrorCode {
+  REQUIRED_VALUE("Oss-01", "You missed parameter which is required, please check your configuration."),
+  CONFIG_ERROR("Oss-02", "Config parameter is error."),
+  // NOTE(review): "Oss-07" breaks the otherwise sequential numbering (no "Oss-03") - confirm it is intended.
+  UNSUPPORTED_TYPE("Oss-07", "Content Type is not supported"),
+  FILE_OPERATION_FAILED("Oss-04", "File Operation Failed"),
+  SPLIT_ERROR("Oss-05", "Something wrong with creating splits.");
+
+  private final String code;
+  private final String description;
+
+  OssConnectorErrorCode(String errorCode, String errorDescription) {
+    this.code = errorCode;
+    this.description = errorDescription;
+  }
+
+  @Override
+  public String getCode() {
+    return this.code;
+  }
+
+  @Override
+  public String getDescription() {
+    return this.description;
+  }
+
+  // toString() is inherited unchanged from Enum: it yields the constant's name.
+}
\ No newline at end of file
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/option/OssReaderOptions.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/option/OssReaderOptions.java
new file mode 100644
index 000000000..2a09eeb87
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/option/OssReaderOptions.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.option;
+
+import com.bytedance.bitsail.common.option.ConfigOption;
+import com.bytedance.bitsail.common.option.ReaderOptions;
+
+import static com.bytedance.bitsail.common.option.ConfigOptions.key;
+import static com.bytedance.bitsail.common.option.ReaderOptions.READER_PREFIX;
+
+/**
+ * Reader options of the OSS connector. All keys are prefixed with {@code READER_PREFIX}.
+ * Interface fields are implicitly {@code public static final}, so the modifiers are omitted.
+ */
+public interface OssReaderOptions extends ReaderOptions.BaseReaderOptions {
+
+  ConfigOption<String> ACCESS_KEY =
+      key(READER_PREFIX + "access_key")
+          .noDefaultValue(String.class);
+
+  ConfigOption<String> ACCESS_SECRET =
+      key(READER_PREFIX + "access_secret")
+          .noDefaultValue(String.class);
+
+  ConfigOption<String> ENDPOINT =
+      key(READER_PREFIX + "endpoint")
+          .noDefaultValue(String.class);
+
+  ConfigOption<String> BUCKET =
+      key(READER_PREFIX + "bucket")
+          .noDefaultValue(String.class);
+
+  ConfigOption<String> FILE_PATH =
+      key(READER_PREFIX + "file_path")
+          .noDefaultValue(String.class);
+
+  ConfigOption<String> CONTENT_TYPE =
+      key(READER_PREFIX + "content_type")
+          .defaultValue("csv");
+
+  /**
+   * CSV Format Options
+   */
+  // whether to treat error column as null when parsing
+  ConfigOption<Boolean> CONVERT_ERROR_COLUMN_AS_NULL =
+      key(READER_PREFIX + "convert_error_column_as_null")
+          .defaultValue(false);
+
+  ConfigOption<String> CSV_DELIMITER =
+      key(READER_PREFIX + "csv_delimiter")
+          .defaultValue(",");
+
+  ConfigOption<Character> CSV_ESCAPE =
+      key(READER_PREFIX + "csv_escape")
+          .noDefaultValue(Character.class);
+
+  ConfigOption<Character> CSV_QUOTE =
+      key(READER_PREFIX + "csv_quote")
+          .noDefaultValue(Character.class);
+
+  ConfigOption<String> CSV_WITH_NULL_STRING =
+      key(READER_PREFIX + "csv_with_null_string")
+          .noDefaultValue(String.class);
+
+  // Default is the section sign; written as a unicode escape so that a wrong
+  // source-file encoding cannot corrupt the single-character literal.
+  ConfigOption<Character> CSV_MULTI_DELIMITER_REPLACER =
+      key(READER_PREFIX + "csv_multi_delimiter_replace_char")
+          .defaultValue('\u00A7');
+
+  ConfigOption<Boolean> SKIP_FIRST_LINE =
+      key(READER_PREFIX + "skip_first_line")
+          .defaultValue(false);
+
+  /**
+   * JSON Options
+   * <p>
+   * Tips:
+   * CONVERT_ERROR_COLUMN_AS_NULL is set above
+   */
+  // whether to be insensitive to upper or lower case
+  ConfigOption<Boolean> CASE_INSENSITIVE =
+      key(READER_PREFIX + "case_insensitive")
+          .defaultValue(false);
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/OssSource.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/OssSource.java
new file mode 100644
index 000000000..a06f975ac
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/OssSource.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.source;
+
+import com.bytedance.bitsail.base.connector.reader.v1.Boundedness;
+import com.bytedance.bitsail.base.connector.reader.v1.Source;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
+import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState;
+import com.bytedance.bitsail.base.execution.ExecutionEnviron;
+import com.bytedance.bitsail.base.extension.ParallelismComputable;
+import com.bytedance.bitsail.base.parallelism.ParallelismAdvice;
+import com.bytedance.bitsail.base.serializer.BinarySerializer;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.common.type.TypeInfoConverter;
+import com.bytedance.bitsail.common.type.filemapping.FileMappingTypeInfoConverter;
+import com.bytedance.bitsail.connector.oss.constant.OssConstants;
+import com.bytedance.bitsail.connector.oss.option.OssReaderOptions;
+import com.bytedance.bitsail.connector.oss.source.reader.OssSourceReader;
+import com.bytedance.bitsail.connector.oss.source.split.OssSourceSplit;
+import com.bytedance.bitsail.connector.oss.source.split.OssSourceSplitCoordinator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Bounded source that reads files from OSS: emits {@link Row} records, is split into
+ * {@link OssSourceSplit}s and keeps no coordinator state ({@link EmptyState}).
+ */
+public class OssSource implements Source<Row, OssSourceSplit, EmptyState>, ParallelismComputable {
+  private static final Logger LOG = LoggerFactory.getLogger(OssSource.class);
+  private BitSailConfiguration jobConf;
+
+  @Override
+  public void configure(ExecutionEnviron execution, BitSailConfiguration readerConfiguration) {
+    this.jobConf = readerConfiguration;
+  }
+
+  @Override
+  public Boundedness getSourceBoundedness() {
+    return Boundedness.BOUNDEDNESS;
+  }
+
+  @Override
+  public SourceReader<Row, OssSourceSplit> createReader(SourceReader.Context readerContext) {
+    return new OssSourceReader(jobConf, readerContext);
+  }
+
+  @Override
+  public SourceSplitCoordinator<OssSourceSplit, EmptyState> createSplitCoordinator(
+      SourceSplitCoordinator.Context<OssSourceSplit, EmptyState> coordinatorContext) {
+    return new OssSourceSplitCoordinator(coordinatorContext, jobConf);
+  }
+
+  @Override
+  public BinarySerializer<OssSourceSplit> getSplitSerializer() {
+    return Source.super.getSplitSerializer();
+  }
+
+  @Override
+  public BinarySerializer<EmptyState> getSplitCoordinatorCheckpointSerializer() {
+    return Source.super.getSplitCoordinatorCheckpointSerializer();
+  }
+
+  @Override
+  public TypeInfoConverter createTypeInfoConverter() {
+    return new FileMappingTypeInfoConverter(getReaderName());
+  }
+
+  @Override
+  public String getReaderName() {
+    return OssConstants.OSS_CONNECTOR_NAME;
+  }
+
+  /**
+   * Advises the configured reader parallelism, or 1 when the option is not set.
+   *
+   * @param commonConf common job configuration (not read here)
+   * @param selfConf reader configuration that may carry READER_PARALLELISM_NUM
+   * @param upstreamAdvice advice from upstream (not read here)
+   * @return advice with the chosen parallelism and no enforced downstream chaining
+   */
+  @Override
+  public ParallelismAdvice getParallelismAdvice(BitSailConfiguration commonConf, BitSailConfiguration selfConf, ParallelismAdvice upstreamAdvice) throws Exception {
+    int parallelism = 1;
+    if (selfConf.fieldExists(OssReaderOptions.READER_PARALLELISM_NUM)) {
+      parallelism = selfConf.get(OssReaderOptions.READER_PARALLELISM_NUM);
+    }
+    return ParallelismAdvice.builder()
+        .adviceParallelism(parallelism)
+        .enforceDownStreamChain(false)
+        .build();
+  }
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/DeserializationSchemaFactory.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/DeserializationSchemaFactory.java
new file mode 100644
index 000000000..7a0748747
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/DeserializationSchemaFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.source.reader;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
+import com.bytedance.bitsail.base.format.DeserializationSchema;
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.component.format.csv.CsvDeserializationSchema;
+import com.bytedance.bitsail.component.format.json.JsonRowDeserializationSchema;
+import com.bytedance.bitsail.connector.oss.config.OssConfig;
+import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode;
+
+/**
+ * Chooses the row deserializer that matches the configured content type.
+ */
+public class DeserializationSchemaFactory {
+  /**
+   * Creates the deserializer for the content type held by {@code ossConfig}.
+   *
+   * @param jobConf job configuration handed to the deserializer
+   * @param context reader context supplying the row type information
+   * @param ossConfig reader configuration carrying the content type
+   * @return a CSV or JSON deserialization schema
+   * @throws BitSailException with UNSUPPORTED_TYPE when the content type is neither CSV nor JSON
+   */
+  public static DeserializationSchema<byte[], Row> createDeserializationSchema(BitSailConfiguration jobConf, SourceReader.Context context,
+                                                                               OssConfig ossConfig) {
+    OssConfig.ContentType contentType = ossConfig.getContentType();
+    if (contentType == OssConfig.ContentType.CSV) {
+      return new CsvDeserializationSchema(jobConf, context.getRowTypeInfo());
+    }
+    if (contentType == OssConfig.ContentType.JSON) {
+      return new JsonRowDeserializationSchema(jobConf, context.getRowTypeInfo());
+    }
+    // Reached for a null content type as well.
+    throw BitSailException.asBitSailException(OssConnectorErrorCode.UNSUPPORTED_TYPE,
+        "Content type only supports CSV and JSON");
+  }
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java
b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java
new file mode 100644
index 000000000..a380a3eac
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/reader/OssSourceReader.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.source.reader;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourcePipeline;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceReader;
+import com.bytedance.bitsail.base.format.DeserializationSchema;
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.common.row.Row;
+import com.bytedance.bitsail.connector.oss.config.OssConf;
+import com.bytedance.bitsail.connector.oss.config.OssConfig;
+import com.bytedance.bitsail.connector.oss.constant.OssConstants;
+import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode;
+import com.bytedance.bitsail.connector.oss.source.split.OssSourceSplit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Reads the files named by its assigned splits line by line, deserializes every line into a
+ * {@link Row} and emits it. One call to {@link #pollNext} consumes one whole file.
+ */
+public class OssSourceReader implements SourceReader<Row, OssSourceSplit> {
+  private static final Logger LOG = LoggerFactory.getLogger(OssSourceReader.class);
+  protected OssConf ossConf;
+  private final transient DeserializationSchema<byte[], Row> deserializationSchema;
+  private final OssConfig ossConfig;
+  private final transient Context context;
+  private final Deque<OssSourceSplit> splits;
+  private boolean skipFirstLine = false;
+  private boolean hasNoMoreSplits = false;
+  private int totalSplitNum = 0;
+  // Number of leading lines dropped from every file (0 or 1).
+  private int skipFirstLineNums = 0;
+  private OssSourceSplit currentSplit;
+  // Resolved lazily on the first poll that has work to do; closed in close().
+  FileSystem fs;
+
+  public OssSourceReader(BitSailConfiguration jobConf, Context context) {
+    this.ossConfig = new OssConfig(jobConf);
+    this.context = context;
+    this.deserializationSchema = DeserializationSchemaFactory.createDeserializationSchema(jobConf, context, ossConfig);
+    this.splits = new LinkedList<>();
+    this.ossConf = OssConf.buildWithConfig(jobConf);
+    LOG.info("OssSourceReader is initialized.");
+  }
+
+  @Override
+  public void start() {
+    // Boolean.TRUE.equals tolerates a null option value instead of failing on unboxing.
+    if (Boolean.TRUE.equals(this.ossConfig.getSkipFirstLine())) {
+      skipFirstLine = true;
+      this.skipFirstLineNums = 1;
+    }
+  }
+
+  /**
+   * Takes the next queued split and emits every line of its file; sleeps briefly when nothing is queued.
+   *
+   * @param pipeline sink that receives the deserialized rows
+   * @throws BitSailException with FILE_OPERATION_FAILED when a line cannot be processed
+   */
+  @Override
+  public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
+    // Only the queue decides whether there is work: a previously finished split must not
+    // let the method fall through and dereference a null result of poll().
+    if (splits.isEmpty()) {
+      LOG.info("pollnext no splits");
+      Thread.sleep(OssConstants.OSS_SOURCE_SLEEP_MILL_SECS);
+      return;
+    }
+    LOG.info("pollnext split size {}", this.splits.size());
+    if (fs == null) {
+      fs = FileSystem.get(getConfiguration());
+    }
+    this.currentSplit = this.splits.poll();
+    LOG.info("split {} path {}", currentSplit, currentSplit.getPath());
+    Path filePath = new Path(currentSplit.getPath());
+    try (BufferedReader reader =
+             new BufferedReader(
+                 new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
+      reader.lines()
+          .skip(skipFirstLineNums)
+          .forEach(line -> emitLine(line, filePath, pipeline));
+    }
+  }
+
+  // Deserializes one line and hands the row to the pipeline, wrapping I/O failures with the file name.
+  private void emitLine(String line, Path filePath, SourcePipeline<Row> pipeline) {
+    try {
+      // Encode with the same charset the file was decoded with, not the platform default.
+      Row row = deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8));
+      pipeline.output(row);
+    } catch (IOException e) {
+      String errorMsg =
+          String.format(
+              "Read data from this file [%s] failed",
+              filePath);
+      throw BitSailException.asBitSailException(
+          OssConnectorErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
+    }
+  }
+
+  Configuration getConfiguration() {
+    return getConfiguration(this.ossConf);
+  }
+
+  /**
+   * Builds the Hadoop configuration used to open the OSS file system.
+   *
+   * @param ossConf connection settings to copy into the configuration
+   * @return a new configuration with the default file system name, the file system implementation
+   *     for the connector's scheme and all extra options set
+   */
+  public Configuration getConfiguration(OssConf ossConf) {
+    Configuration configuration = new Configuration();
+    configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, ossConf.getHdfsNameKey());
+    configuration.set(
+        String.format("fs.%s.impl", ossConf.getSchema()), ossConf.getHdfsImpl());
+    ossConf.setExtraOptionsForConfiguration(configuration);
+    return configuration;
+  }
+
+  @Override
+  public void addSplits(List<OssSourceSplit> splitList) {
+    totalSplitNum += splitList.size();
+    this.splits.addAll(splitList);
+  }
+
+  @Override
+  public boolean hasMoreElements() {
+    if (hasNoMoreSplits && splits.isEmpty()) {
+      LOG.info("Finish reading all {} splits.", totalSplitNum);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public void notifyNoMoreSplits() {
+    this.hasNoMoreSplits = true;
+    LOG.info("No more splits will be assigned.");
+  }
+
+  @Override
+  public List<OssSourceSplit> snapshotState(long checkpointId) {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    SourceReader.super.notifyCheckpointComplete(checkpointId);
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+  }
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplit.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplit.java
new file mode 100644
index
000000000..c106e894e
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplit.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.source.split;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplit;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * One unit of work for a reader. The constructor argument is stored unchanged as the path
+ * and, with a fixed prefix, as the unique split id.
+ */
+@Getter
+public class OssSourceSplit implements SourceSplit {
+  public static final String OSS_SOURCE_SPLIT_PREFIX = "Oss_source_split_";
+
+  private final String splitId;
+
+  @Setter
+  private String path;
+
+  /**
+   * @param splitId value used both as the split's path and, prefixed, as its unique id
+   */
+  public OssSourceSplit(String splitId) {
+    this.path = splitId;
+    this.splitId = OSS_SOURCE_SPLIT_PREFIX + splitId;
+  }
+
+  @Override
+  public String uniqSplitId() {
+    return this.splitId;
+  }
+
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java
new file mode 100644
index 000000000..b5c8a7690
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/source/split/OssSourceSplitCoordinator.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.source.split;
+
+import com.bytedance.bitsail.base.connector.reader.v1.SourceEvent;
+import com.bytedance.bitsail.base.connector.reader.v1.SourceSplitCoordinator;
+import com.bytedance.bitsail.base.connector.writer.v1.state.EmptyState;
+import com.bytedance.bitsail.common.BitSailException;
+import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
+import com.bytedance.bitsail.connector.oss.config.OssConf;
+import com.bytedance.bitsail.connector.oss.config.OssConfig;
+import com.bytedance.bitsail.connector.oss.exception.OssConnectorErrorCode;
+import com.bytedance.bitsail.connector.oss.util.OssUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Lists the files under the configured path, turns each file into one split and distributes
+ * the splits round-robin over the readers as they register.
+ */
+public class OssSourceSplitCoordinator implements SourceSplitCoordinator<OssSourceSplit, EmptyState> {
+  private static final Logger LOG = LoggerFactory.getLogger(OssSourceSplitCoordinator.class);
+  private final SourceSplitCoordinator.Context<OssSourceSplit, EmptyState> context;
+  private final BitSailConfiguration jobConf;
+  private final OssConfig ossConfig;
+  // reader index -> splits planned for that reader but not yet handed over
+  private final Map<Integer, Set<OssSourceSplit>> splitAssignmentPlan;
+
+  public OssSourceSplitCoordinator(SourceSplitCoordinator.Context<OssSourceSplit, EmptyState> context, BitSailConfiguration jobConf) {
+    this.context = context;
+    this.jobConf = jobConf;
+    this.ossConfig = new OssConfig(jobConf);
+    this.splitAssignmentPlan = Maps.newConcurrentMap();
+  }
+
+  // Creates one split per file found under the configured file path.
+  private List<OssSourceSplit> constructSplit() throws IOException {
+    OssConf conf = OssConf.buildWithConfig(this.jobConf);
+    String path = ossConfig.getFilePath();
+    List<String> fileList = OssUtil.getFileNamesByPath(conf, path);
+    LOG.info("OssSourceSplitCoordinator fileList: {}", fileList);
+    List<OssSourceSplit> fileSourceSplits = new ArrayList<>(fileList.size());
+    for (String file : fileList) {
+      fileSourceSplits.add(new OssSourceSplit(file));
+    }
+    return fileSourceSplits;
+  }
+
+  /**
+   * Builds all splits and plans their reader assignment.
+   *
+   * @throws BitSailException with SPLIT_ERROR when listing the files fails
+   */
+  @Override
+  public void start() {
+    List<OssSourceSplit> splits;
+    try {
+      splits = constructSplit();
+    } catch (IOException e) {
+      // Keep the I/O failure as the cause so the root problem stays visible in the stack trace.
+      throw BitSailException.asBitSailException(OssConnectorErrorCode.SPLIT_ERROR, "Failed to create splits.", e);
+    }
+    int readerNum = context.totalParallelism();
+    LOG.info("Found {} readers and {} splits.", readerNum, splits.size());
+    if (readerNum > splits.size()) {
+      LOG.warn("Reader number {} is larger than split number {}.", readerNum, splits.size());
+    }
+    for (OssSourceSplit split : splits) {
+      int readerIndex = ReaderSelector.getReaderIndex(readerNum);
+      splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
+      LOG.info("Will assign split {} to the {}-th reader", split.uniqSplitId(), readerIndex);
+    }
+  }
+
+  @Override
+  public void addReader(int subtaskId) {
+    LOG.info("Found reader {}", subtaskId);
+    tryAssignSplitsToReader();
+  }
+
+  // Hands every planned, non-empty split set to its reader if that reader has registered,
+  // then tells the reader that no more splits will follow.
+  private void tryAssignSplitsToReader() {
+    Map<Integer, List<OssSourceSplit>> splitsToAssign = new HashMap<>();
+    for (Integer readerIndex : splitAssignmentPlan.keySet()) {
+      if (CollectionUtils.isNotEmpty(splitAssignmentPlan.get(readerIndex)) && context.registeredReaders().contains(readerIndex)) {
+        splitsToAssign.put(readerIndex, Lists.newArrayList(splitAssignmentPlan.get(readerIndex)));
+      }
+    }
+    for (Map.Entry<Integer, List<OssSourceSplit>> assignment : splitsToAssign.entrySet()) {
+      Integer readerIndex = assignment.getKey();
+      List<OssSourceSplit> readerSplits = assignment.getValue();
+      LOG.info("Try assigning splits reader {}, splits are: [{}]", readerIndex,
+          readerSplits.stream().map(OssSourceSplit::uniqSplitId).collect(Collectors.toList()));
+      splitAssignmentPlan.remove(readerIndex);
+      context.assignSplit(readerIndex, readerSplits);
+      context.signalNoMoreSplits(readerIndex);
+      LOG.info("Finish assigning splits reader {}", readerIndex);
+    }
+  }
+
+  @Override
+  public void addSplitsBack(List<OssSourceSplit> splits, int subtaskId) {
+    LOG.info("Source reader {} return splits {}.", subtaskId, splits);
+    int readerNum = context.totalParallelism();
+    for (OssSourceSplit split : splits) {
+      int readerIndex = ReaderSelector.getReaderIndex(readerNum);
+      splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
+      LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex);
+    }
+    tryAssignSplitsToReader();
+  }
+
+  @Override
+  public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+
+  }
+
+  @Override
+  public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+    SourceSplitCoordinator.super.handleSourceEvent(subtaskId, sourceEvent);
+  }
+
+  @Override
+  public EmptyState snapshotState(long checkpoint) throws Exception {
+    return new EmptyState();
+  }
+
+  @Override
+  public void notifyCheckpointComplete(long checkpointId) throws Exception {
+    SourceSplitCoordinator.super.notifyCheckpointComplete(checkpointId);
+  }
+
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Round-robin chooser of reader indexes backed by a static counter.
+   */
+  @NoArgsConstructor
+  static class ReaderSelector {
+    private static long readerIndex = 0;
+
+    public static int getReaderIndex(int totalReaderNum) {
+      // Take the remainder on the long counter first and narrow afterwards; casting the counter
+      // itself to int before the modulo would produce negative indexes once it exceeds Integer.MAX_VALUE.
+      return (int) (readerIndex++ % totalReaderNum);
+    }
+  }
+}
diff --git a/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java
b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java
new file mode 100644
index 000000000..d8c78b742
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/java/com/bytedance/bitsail/connector/oss/util/OssUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.bytedance.bitsail.connector.oss.util;
+
+import com.bytedance.bitsail.connector.oss.config.OssConf;
+import com.bytedance.bitsail.connector.oss.constant.OssConstants;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Static helpers that build a Hadoop {@link Configuration} for OSS and list the files below a path.
+ */
+public class OssUtil {
+
+  /**
+   * Creates a Hadoop configuration populated from {@code ossConf}.
+   *
+   * <p>The default file system name is taken from {@link OssConf#getHdfsNameKey()}, the
+   * {@code fs.<schema>.impl} key is bound to {@link OssConf#getHdfsImpl()}, and the extra options
+   * held by {@code ossConf} are applied last.
+   *
+   * @param ossConf OSS connection settings
+   * @return a new configuration instance
+   */
+  public static Configuration getConfiguration(OssConf ossConf) {
+    Configuration configuration = new Configuration();
+    configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, ossConf.getHdfsNameKey());
+    configuration.set(String.format("fs.%s.impl", ossConf.getSchema()), ossConf.getHdfsImpl());
+    ossConf.setExtraOptionsForConfiguration(configuration);
+    return configuration;
+  }
+
+  /**
+   * Recursively lists every regular file below {@code path}.
+   *
+   * <p>Files named {@link OssConstants#OSS_SOURCE_IGNORE_FILENAME} are skipped. The configuration
+   * and file system are resolved once and reused for every sub-directory.
+   *
+   * @param hadoopConf OSS connection settings used to resolve the file system
+   * @param path       root path to list
+   * @return full path strings of the files found, in depth-first listing order
+   * @throws IOException if the file system cannot be obtained or a listing fails
+   */
+  public static List<String> getFileNamesByPath(OssConf hadoopConf, String path) throws IOException {
+    FileSystem fs = FileSystem.get(getConfiguration(hadoopConf));
+    List<String> fileNames = new ArrayList<>();
+    collectFileNames(fs, new Path(path), fileNames);
+    return fileNames;
+  }
+
+  /**
+   * Appends the files found below {@code root} to {@code fileNames}, descending into directories.
+   */
+  private static void collectFileNames(FileSystem fs, Path root, List<String> fileNames) throws IOException {
+    for (FileStatus fileStatus : fs.listStatus(root)) {
+      if (fileStatus.isDirectory()) {
+        collectFileNames(fs, fileStatus.getPath(), fileNames);
+        continue;
+      }
+      if (fileStatus.isFile()
+          && !OssConstants.OSS_SOURCE_IGNORE_FILENAME.equals(fileStatus.getPath().getName())) {
+        fileNames.add(fileStatus.getPath().toString());
+      }
+    }
+  }
+}
diff --git a/bitsail-connectors/connector-oss/src/main/resources/bitsail-connector-unified-oss.json b/bitsail-connectors/connector-oss/src/main/resources/bitsail-connector-unified-oss.json
new file mode 100644
index 000000000..a8be6a9df
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/resources/bitsail-connector-unified-oss.json
@@ -0,0 +1,9 @@
+{
+  "name": "bitsail-connector-unified-oss",
+  "classes": [
+    "com.bytedance.bitsail.connector.oss.source.OssSource"
+  ],
+  "libs": [
+    "connector-oss-${version}.jar"
+  ]
+}
diff --git a/bitsail-connectors/connector-oss/src/main/resources/oss-type-converter.yaml b/bitsail-connectors/connector-oss/src/main/resources/oss-type-converter.yaml
new file mode 100644
index 000000000..503ee65b4
--- /dev/null
+++ b/bitsail-connectors/connector-oss/src/main/resources/oss-type-converter.yaml
@@ -0,0 +1,88 @@
+engine.type.to.bitsail.type.converter:
+  - source.type: tinyint
+    target.type: int
+
+  - source.type: smallint
+    target.type: int
+
+  - source.type: int
+    target.type: int
+
+  - source.type: long
+    target.type: bigint
+
+  - source.type: bigint
+    target.type: bigint
+
+  - source.type: float
+    target.type: float
+
+  - source.type: double
+    target.type: double
+
+  - source.type: decimal
+    target.type: bigdecimal
+
+  - source.type: timestamp
+    target.type: timestamp
+
+  - source.type: date
+    target.type: date
+
+  - source.type: string
+
target.type: string + + - source.type: varchar + target.type: string + + - source.type: char + target.type: string + + - source.type: boolean + target.type: boolean + + - source.type: binary + target.type: bytes + +bitsail.type.to.engine.type.converter: + - source.type: byte + target.type: tinyint + + - source.type: short + target.type: smallint + + - source.type: int + target.type: int + + - source.type: long + target.type: bigint + + - source.type: bigint + target.type: bigint + + - source.type: double + target.type: double + + - source.type: float + target.type: float + + - source.type: bigdecimal + target.type: decimal + + - source.type: string + target.type: string + + - source.type: boolean + target.type: boolean + + - source.type: date.date + target.type: string + + - source.type: date.time + target.type: string + + - source.type: date.datetime + target.type: bigint + + - source.type: bytes + target.type: binary \ No newline at end of file diff --git a/bitsail-connectors/pom.xml b/bitsail-connectors/pom.xml index 7f423d273..445acc1dd 100644 --- a/bitsail-connectors/pom.xml +++ b/bitsail-connectors/pom.xml @@ -52,6 +52,7 @@ connector-cdc connector-kafka connector-mongodb + connector-oss diff --git a/bitsail-dist/pom.xml b/bitsail-dist/pom.xml index 276586b39..d8f6c415f 100644 --- a/bitsail-dist/pom.xml +++ b/bitsail-dist/pom.xml @@ -273,6 +273,13 @@ ${revision} provided + + + com.bytedance.bitsail + connector-oss + ${revision} + provided +