diff --git a/README.md b/README.md index d970d6bf6d..025804f675 100644 --- a/README.md +++ b/README.md @@ -100,7 +100,7 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N - 整库迁移:https://help.aliyun.com/document_detail/137809.html - 批量上云:https://help.aliyun.com/document_detail/146671.html - 更新更多能力请访问:https://help.aliyun.com/document_detail/137663.html - - + # 我要开发新的插件 @@ -108,18 +108,8 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N # 重要版本更新说明 -DataX 后续计划月度迭代更新,也欢迎感兴趣的同学提交 Pull requests,月度更新内容如下。 - -- [datax_v202309](https://github.com/alibaba/DataX/releases/tag/datax_v202309) - - 支持Phoenix 同步数据添加 where条件 - - 支持华为 GuassDB读写插件 - - 修复ClickReader 插件运行报错 Can't find bundle for base name - - 增加 DataX调试模块 - - 修复 orc空文件报错问题 - - 优化obwriter性能 - - txtfilewriter 增加导出为insert语句功能支持 - - HdfsReader/HdfsWriter 支持parquet读写能力 - +DataX 后续计划月度迭代更新,也欢迎感兴趣的同学提交 Pull requests,月度更新内容会介绍介绍如下。 + - [datax_v202308](https://github.com/alibaba/DataX/releases/tag/datax_v202308) - OTS 插件更新 - databend 插件更新 diff --git a/common/src/main/java/com/alibaba/datax/common/element/DateColumn.java b/common/src/main/java/com/alibaba/datax/common/element/DateColumn.java index df5e1e4a17..f688d1639f 100755 --- a/common/src/main/java/com/alibaba/datax/common/element/DateColumn.java +++ b/common/src/main/java/com/alibaba/datax/common/element/DateColumn.java @@ -5,7 +5,6 @@ import java.math.BigDecimal; import java.math.BigInteger; -import java.sql.Time; import java.util.Date; /** @@ -13,54 +12,18 @@ */ public class DateColumn extends Column { - private DateType subType = DateType.DATETIME; - - private int nanos = 0; - - private int precision = -1; - - public static enum DateType { - DATE, TIME, DATETIME - } - - /** - * 构建值为time(java.sql.Time)的DateColumn,使用Date子类型为TIME,只有时间,没有日期 - */ - public DateColumn(Time time, int nanos, int jdbcPrecision) { - this(time); - if (time != null) { - setNanos(nanos); - } - if (jdbcPrecision == 10) { - setPrecision(0); - } - if (jdbcPrecision >= 12 && jdbcPrecision <= 17) { - setPrecision(jdbcPrecision - 11); - } - } - - public long getNanos() { - return nanos; - } - - public void setNanos(int nanos) { - this.nanos = nanos; - } - - public int getPrecision() { - return precision; - } - - public void setPrecision(int precision) { - this.precision = precision; - } - - /** - * 构建值为null的DateColumn,使用Date子类型为DATETIME - */ - public DateColumn() { - this((Long) null); - } + private DateType subType = DateType.DATETIME; + + public static enum DateType { + DATE, TIME, DATETIME + } + + /** + * 构建值为null的DateColumn,使用Date子类型为DATETIME + * */ + public DateColumn() { + this((Long)null); + } /** * 构建值为stamp(Unix时间戳)的DateColumn,使用Date子类型为DATETIME diff --git a/common/src/main/java/com/alibaba/datax/common/util/Configuration.java b/common/src/main/java/com/alibaba/datax/common/util/Configuration.java index ef29320dd2..c1194532a7 100755 --- a/common/src/main/java/com/alibaba/datax/common/util/Configuration.java +++ b/common/src/main/java/com/alibaba/datax/common/util/Configuration.java @@ -1047,7 +1047,7 @@ private void checkPath(final String path) { "系统编程错误, 该异常代表系统编程错误, 请联系DataX开发团队!."); } - for (final String each : StringUtils.split(path, ".")) { + for (final String each : StringUtils.split(".")) { if (StringUtils.isBlank(each)) { throw new IllegalArgumentException(String.format( "系统编程错误, 路径[%s]不合法, 路径层次之间不能出现空白字符 .", path)); diff --git a/core/src/main/java/com/alibaba/datax/core/util/ConfigParser.java b/core/src/main/java/com/alibaba/datax/core/util/ConfigParser.java index 24f43d55d5..20039864b8 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/ConfigParser.java +++ b/core/src/main/java/com/alibaba/datax/core/util/ConfigParser.java @@ -168,7 +168,6 @@ public static Configuration parseOnePluginConfig(final String path, boolean isDefaultPath = StringUtils.isBlank(pluginPath); if (isDefaultPath) { configuration.set("path", path); - configuration.set("loadType","jarLoader"); } Configuration result = Configuration.newDefault(); diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/JarLoader.java b/core/src/main/java/com/alibaba/datax/core/util/container/JarLoader.java index ddf22baef6..9fc113dc6a 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/container/JarLoader.java +++ b/core/src/main/java/com/alibaba/datax/core/util/container/JarLoader.java @@ -15,7 +15,7 @@ /** * 提供Jar隔离的加载机制,会把传入的路径、及其子路径、以及路径中的jar文件加入到class path。 */ -public class JarLoader extends URLClassLoader{ +public class JarLoader extends URLClassLoader { public JarLoader(String[] paths) { this(paths, JarLoader.class.getClassLoader()); } diff --git a/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java b/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java index 9a6a830291..30e926c385 100755 --- a/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java +++ b/core/src/main/java/com/alibaba/datax/core/util/container/LoadUtil.java @@ -49,7 +49,7 @@ public String value() { /** * jarLoader的缓冲 */ - private static Map jarLoaderCenter = new HashMap(); + private static Map jarLoaderCenter = new HashMap(); /** * 设置pluginConfigs,方便后面插件来获取 diff --git a/core/src/main/job/job.json b/core/src/main/job/job.json index ad5d4a85c5..cc35387778 100755 --- a/core/src/main/job/job.json +++ b/core/src/main/job/job.json @@ -2,10 +2,11 @@ "job": { "setting": { "speed": { - "channel": 2 + "channel":1 }, "errorLimit": { - "record": 0 + "record": 0, + "percentage": 0.02 } }, "content": [ @@ -13,17 +14,17 @@ "reader": { "name": "streamreader", "parameter": { - "column": [ + "column" : [ { "value": "DataX", "type": "string" }, { - "value": 1724154616370, + "value": 19890604, "type": "long" }, { - "value": "2024-01-01 00:00:00", + "value": "1989-06-04 00:00:00", "type": "date" }, { @@ -31,11 +32,11 @@ "type": "bool" }, { - "value": "TestRawData", + "value": "test", "type": "bytes" } ], - "sliceRecordCount": 100 + "sliceRecordCount": 100000 } }, "writer": { @@ -48,4 +49,4 @@ } ] } -} \ No newline at end of file +} diff --git a/dataxPluginDev.md b/dataxPluginDev.md index 8c7241bf5a..4483f2708c 100644 --- a/dataxPluginDev.md +++ b/dataxPluginDev.md @@ -447,9 +447,6 @@ DataX的内部类型在实现上会选用不同的java类型: 3. 用户在插件中在`reader`/`writer`配置的`name`字段指定插件名字。框架根据插件的类型(`reader`/`writer`)和插件名称去插件的路径下扫描所有的jar,加入`classpath`。 4. 根据插件配置中定义的入口类,框架通过反射实例化对应的`Job`和`Task`对象。 -### 编写测试用例 -1. 在datax-example工程下新建新的插件测试模块,调用`ExampleContainer.start(jobPath)`方法来检测你的代码逻辑是否正确。[datax-example使用](https://github.com/alibaba/DataX/blob/master/datax-example/doc/README.md) - ## 三、Last but not Least diff --git a/package.xml b/package.xml index 624109f799..786584f2af 100644 --- a/package.xml +++ b/package.xml @@ -39,13 +39,6 @@ datax - - obhbasereader/target/datax/ - - **/*.* - - datax - drdsreader/target/datax/ @@ -117,13 +110,13 @@ datax - - ossreader/target/datax/ - - **/*.* - - datax - + + + + + + + mongodbreader/target/datax/ @@ -159,13 +152,13 @@ datax - - hdfsreader/target/datax/ - - **/*.* - - datax - + + + + + + + hbase11xreader/target/datax/ @@ -244,14 +237,7 @@ datax - dorisreader/target/datax/ - - **/*.* - - datax - - - sybasereader/target/datax/ + qcubicreader/target/datax/ **/*.* @@ -315,13 +301,13 @@ datax - - osswriter/target/datax/ - - **/*.* - - datax - + + + + + + + adswriter/target/datax/ @@ -392,13 +378,13 @@ datax - - hdfswriter/target/datax/ - - **/*.* - - datax - + + + + + + + hbase11xwriter/target/datax/ @@ -483,13 +469,6 @@ datax - - obhbasewriter/target/datax/ - - **/*.* - - datax - gdbwriter/target/datax/ @@ -539,12 +518,12 @@ datax - - sybasewriter/target/datax/ - - **/*.* - - datax - + + + + + + + diff --git a/plugin-rdbms-util/pom.xml b/plugin-rdbms-util/pom.xml index 6dc69e06c6..c49f64af16 100755 --- a/plugin-rdbms-util/pom.xml +++ b/plugin-rdbms-util/pom.xml @@ -33,17 +33,6 @@ ${mysql.driver.version} test - - com.oceanbase - oceanbase-client - 2.4.11 - - - com.google.guava - guava - - - org.slf4j slf4j-api diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java index f31804025e..aa58a77301 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/CommonRdbmsReader.java @@ -244,6 +244,7 @@ protected Record buildRecord(RecordSender recordSender,ResultSet rs, ResultSetMe try { for (int i = 1; i <= columnNumber; i++) { + switch (metaData.getColumnType(i)) { case Types.CHAR: diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ObVersion.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ObVersion.java index da078df924..0eb34feb0d 100644 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ObVersion.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ObVersion.java @@ -16,8 +16,6 @@ public class ObVersion implements Comparable { private int patchNumber; public static final ObVersion V2276 = valueOf("2.2.76"); - public static final ObVersion V2252 = valueOf("2.2.52"); - public static final ObVersion V3 = valueOf("3.0.0.0"); public static final ObVersion V4000 = valueOf("4.0.0.0"); private static final ObVersion DEFAULT_VERSION = diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java index dec2353d9e..70d793f086 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/DataBaseType.java @@ -25,10 +25,9 @@ public enum DataBaseType { Oscar("oscar", "com.oscar.Driver"), OceanBase("oceanbase", "com.alipay.oceanbase.jdbc.Driver"), StarRocks("starrocks", "com.mysql.jdbc.Driver"), - Sybase("sybase", "com.sybase.jdbc4.jdbc.SybDriver"), - GaussDB("gaussdb", "org.opengauss.Driver"), Databend("databend", "com.databend.jdbc.DatabendDriver"), - Doris("doris","com.mysql.jdbc.Driver"); + Qcubic("qcubic","Qcubic.jdbc.driver.QcubicDriver"), + QcubicShard("qcubicShard","QcubicShard.jdbc.driver.QcubicDriver"); private String typeName; private String driverClassName; @@ -74,9 +73,9 @@ public String appendJDBCSuffixForReader(String jdbc) { break; case StarRocks: break; - case GaussDB: + case Qcubic: break; - case Doris: + case QcubicShard: break; default: throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type."); @@ -139,9 +138,9 @@ public String appendJDBCSuffixForWriter(String jdbc) { result = jdbc + "?" + suffix; } break; - case Sybase: + case Qcubic: break; - case GaussDB: + case QcubicShard: break; default: throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type."); @@ -170,7 +169,9 @@ public String formatPk(String splitPk) { case KingbaseES: case Oscar: break; - case GaussDB: + case Qcubic: + break; + case QcubicShard: break; default: throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type."); @@ -197,7 +198,9 @@ public String quoteColumnName(String columnName) { case KingbaseES: case Oscar: break; - case GaussDB: + case Qcubic: + break; + case QcubicShard: break; default: throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type"); @@ -225,7 +228,9 @@ public String quoteTableName(String tableName) { break; case Oscar: break; - case GaussDB: + case Qcubic: + break; + case QcubicShard: break; default: throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE, "unsupported database type"); diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java index 7b84c32088..91fe61c600 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/CommonRdbmsWriter.java @@ -12,7 +12,6 @@ import com.alibaba.datax.plugin.rdbms.util.RdbmsException; import com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil; import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.slf4j.Logger; @@ -200,9 +199,6 @@ public static class Task { protected boolean emptyAsNull; protected Triple, List, List> resultSetMetaData; - private int dumpRecordLimit = Constant.DEFAULT_DUMP_RECORD_LIMIT; - private AtomicLong dumpRecordCount = new AtomicLong(0); - public Task(DataBaseType dataBaseType) { this.dataBaseType = dataBaseType; } @@ -213,7 +209,7 @@ public void init(Configuration writerSliceConfig) { this.jdbcUrl = writerSliceConfig.getString(Key.JDBC_URL); //ob10的处理 - if (this.jdbcUrl.startsWith(Constant.OB10_SPLIT_STRING)) { + if (this.jdbcUrl.startsWith(Constant.OB10_SPLIT_STRING) && this.dataBaseType == DataBaseType.MySql) { String[] ss = this.jdbcUrl.split(Constant.OB10_SPLIT_STRING_PATTERN); if (ss.length != 3) { throw DataXException @@ -372,11 +368,7 @@ protected void doBatchInsert(Connection connection, List buffer) } } - public boolean needToDumpRecord() { - return dumpRecordCount.incrementAndGet() <= dumpRecordLimit; - } - - public void doOneInsert(Connection connection, List buffer) { + protected void doOneInsert(Connection connection, List buffer) { PreparedStatement preparedStatement = null; try { connection.setAutoCommit(true); @@ -389,10 +381,7 @@ public void doOneInsert(Connection connection, List buffer) { preparedStatement, record); preparedStatement.execute(); } catch (SQLException e) { - if (needToDumpRecord()) { - LOG.warn("ERROR : record {}", record); - LOG.warn("Insert fatal error SqlState ={}, errorCode = {}, {}", e.getSQLState(), e.getErrorCode(), e); - } + LOG.debug(e.toString()); this.taskPluginCollector.collectDirtyRecord(record, e); } finally { @@ -423,6 +412,7 @@ protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStat protected PreparedStatement fillPreparedStatementColumnType(PreparedStatement preparedStatement, int columnIndex, int columnSqltype, String typeName, Column column) throws SQLException { java.util.Date utilDate; + switch (columnSqltype) { case Types.CHAR: case Types.NCHAR: diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Constant.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Constant.java index 9510fd14ef..0e4692e2c8 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Constant.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/Constant.java @@ -19,5 +19,4 @@ public final class Constant { public static final String OB10_SPLIT_STRING = "||_dsc_ob10_dsc_||"; public static final String OB10_SPLIT_STRING_PATTERN = "\\|\\|_dsc_ob10_dsc_\\|\\|"; - public static final int DEFAULT_DUMP_RECORD_LIMIT = 10; } diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/ColumnEntry.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/ColumnEntry.java index 6bfc1bb9e2..c86bd20650 100644 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/ColumnEntry.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/ColumnEntry.java @@ -1,11 +1,11 @@ package com.alibaba.datax.plugin.unstructuredstorage.reader; -import com.alibaba.fastjson2.JSON; -import org.apache.commons.lang3.StringUtils; - import java.text.DateFormat; import java.text.SimpleDateFormat; +import org.apache.commons.lang3.StringUtils; + +import com.alibaba.fastjson2.JSON; public class ColumnEntry { private Integer index; @@ -13,15 +13,6 @@ public class ColumnEntry { private String value; private String format; private DateFormat dateParse; - private String name; - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } public Integer getIndex() { return index; diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/Key.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/Key.java index 0945779b17..71e13ad244 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/Key.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/Key.java @@ -87,7 +87,4 @@ public class Key { public static final String TAR_FILE_FILTER_PATTERN = "tarFileFilterPattern"; public static final String ENABLE_INNER_SPLIT = "enableInnerSplit"; - public static final String HIVE_PARTION_COLUMN = "hivePartitionColumn"; - - } diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java index 27f4c48ac4..afcad85132 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/reader/UnstructuredStorageReaderUtil.java @@ -715,70 +715,4 @@ public static void setSourceFileName(Configuration configuration, List s public static void setSourceFile(Configuration configuration, List sourceFiles){ configuration.set(Constant.SOURCE_FILE, sourceFiles); } - - public static ArrayList getHivePartitionColumns(String filePath, List hivePartitionColumnEntrys) { - ArrayList hivePartitionColumns = new ArrayList<>(); - - if (null == hivePartitionColumnEntrys) { - return hivePartitionColumns; - } - - // 对于分区列pt,则从path中找/pt=xxx/,xxx即分区列的值,另外确认在path中只有一次出现 - - for (ColumnEntry columnEntry : hivePartitionColumnEntrys) { - String parColName = columnEntry.getValue(); - String patten = String.format("/%s=", parColName); - int index = filePath.indexOf(patten); - if (index != filePath.lastIndexOf(patten)) { - throw new DataXException(String.format("Found multiple partition folder in filePath %s, partition: %s", filePath, parColName)); - } - - String subPath = filePath.substring(index + 1); - int firstSeparatorIndex = subPath.indexOf(File.separator); - if (firstSeparatorIndex > 0) { - subPath = subPath.substring(0, firstSeparatorIndex); - } - - if (subPath.split("=").length != 2) { - throw new DataXException(String.format("Found partition column value in filePath %s failed, partition: %s", filePath, parColName)); - } - String parColVal = subPath.split("=")[1]; - - String colType = columnEntry.getType().toUpperCase(); - Type type = Type.valueOf(colType); - - Column generateColumn; - switch (type) { - case STRING: - generateColumn = new StringColumn(parColVal); - break; - - case DOUBLE: - generateColumn = new DoubleColumn(parColVal); - break; - - case LONG: - generateColumn = new LongColumn(parColVal); - break; - - case BOOLEAN: - generateColumn = new BoolColumn(parColVal); - break; - - case DATE: - generateColumn = new DateColumn(new StringColumn(parColVal.toString()).asDate()); - break; - - default: - String errorMessage = String.format("The column type you configured is not currently supported: %s", parColVal); - LOG.error(errorMessage); - throw DataXException.asDataXException(UnstructuredStorageReaderErrorCode.NOT_SUPPORT_TYPE, errorMessage); - } - - hivePartitionColumns.add(generateColumn); - } - - return hivePartitionColumns; - } - } diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Constant.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Constant.java index a485c1249b..092fbfd7c8 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Constant.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Constant.java @@ -12,13 +12,9 @@ public class Constant { public static final String FILE_FORMAT_TEXT = "text"; - public static final String FILE_FORMAT_SQL = "sql"; - //每个分块10MB,最大10000个分块, MAX_FILE_SIZE 单位: MB public static final Long MAX_FILE_SIZE = 10 * 10000L; - public static final int DEFAULT_COMMIT_SIZE = 2000; - public static final String DEFAULT_SUFFIX = ""; public static final String TRUNCATE = "truncate"; diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java index ee97abd86d..125957f189 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/Key.java @@ -5,16 +5,12 @@ public class Key { // must have public static final String FILE_NAME = "fileName"; - public static final String TABLE_NAME = "table"; - // must have public static final String WRITE_MODE = "writeMode"; // not must , not default , public static final String FIELD_DELIMITER = "fieldDelimiter"; - public static final String QUOTE_CHARACTER = "quoteChar"; - // not must , default os's line delimiter public static final String LINE_DELIMITER = "lineDelimiter"; @@ -42,8 +38,6 @@ public class Key { // writer maxFileSize public static final String MAX_FILE_SIZE = "maxFileSize"; - - public static final String COMMIT_SIZE = "commitSize"; // writer file type suffix, like .txt .csv public static final String SUFFIX = "suffix"; diff --git a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java index e74e5698ed..e9040662ab 100755 --- a/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java +++ b/plugin-unstructured-storage-util/src/main/java/com/alibaba/datax/plugin/unstructuredstorage/writer/UnstructuredStorageWriterUtil.java @@ -10,10 +10,7 @@ import java.util.UUID; import com.alibaba.datax.common.element.BytesColumn; - -import com.google.common.base.Preconditions; import org.apache.commons.codec.binary.Base64; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.compress.compressors.CompressorOutputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream; @@ -93,8 +90,7 @@ public static void validateParameter(Configuration writerConfiguration) { writerConfiguration.set(Key.FILE_FORMAT, fileFormat); } if (!Constant.FILE_FORMAT_CSV.equals(fileFormat) - && !Constant.FILE_FORMAT_TEXT.equals(fileFormat) - && !Constant.FILE_FORMAT_SQL.equals(fileFormat)) { + && !Constant.FILE_FORMAT_TEXT.equals(fileFormat)) { throw DataXException.asDataXException( UnstructuredStorageWriterErrorCode.ILLEGAL_VALUE, String.format("unsupported fileFormat %s ", fileFormat)); } @@ -236,31 +232,22 @@ private static void doWriteToStream(RecordReceiver lineReceiver, // warn: default false String fileFormat = config.getString(Key.FILE_FORMAT, Constant.FILE_FORMAT_TEXT); - boolean isSqlFormat = Constant.FILE_FORMAT_SQL.equalsIgnoreCase(fileFormat); - int commitSize = config.getInt(Key.COMMIT_SIZE, Constant.DEFAULT_COMMIT_SIZE); + UnstructuredWriter unstructuredWriter = produceUnstructuredWriter(fileFormat, config, writer); List headers = config.getList(Key.HEADER, String.class); - if (null != headers && !headers.isEmpty() && !isSqlFormat) { + if (null != headers && !headers.isEmpty()) { unstructuredWriter.writeOneRecord(headers); } Record record = null; - int receivedCount = 0; String byteEncoding = config.getString(Key.BYTE_ENCODING); while ((record = lineReceiver.getFromReader()) != null) { UnstructuredStorageWriterUtil.transportOneRecord(record, nullFormat, dateParse, taskPluginCollector, unstructuredWriter, byteEncoding); - receivedCount++; - if (isSqlFormat && receivedCount % commitSize == 0) { - ((SqlWriter) unstructuredWriter).appendCommit(); - } } - if (isSqlFormat) { - ((SqlWriter)unstructuredWriter).appendCommit(); - } // warn:由调用方控制流的关闭 // IOUtils.closeQuietly(unstructuredWriter); } @@ -275,16 +262,6 @@ public static UnstructuredWriter produceUnstructuredWriter(String fileFormat, Co String fieldDelimiter = config.getString(Key.FIELD_DELIMITER, String.valueOf(Constant.DEFAULT_FIELD_DELIMITER)); unstructuredWriter = TextCsvWriterManager.produceTextWriter(writer, fieldDelimiter, config); - } else if (StringUtils.equalsIgnoreCase(fileFormat, Constant.FILE_FORMAT_SQL)) { - String tableName = config.getString(Key.TABLE_NAME); - Preconditions.checkArgument(StringUtils.isNotEmpty(tableName), "table name is empty"); - String quoteChar = config.getString(Key.QUOTE_CHARACTER); - Preconditions.checkArgument(StringUtils.isNotEmpty(quoteChar), "quote character is empty"); - String lineSeparator = config.getString(Key.LINE_DELIMITER, IOUtils.LINE_SEPARATOR); - List headers = config.getList(Key.HEADER, String.class); - Preconditions.checkArgument(CollectionUtils.isNotEmpty(headers), "column names are empty"); - String nullFormat = config.getString(Key.NULL_FORMAT, Constant.DEFAULT_NULL_FORMAT); - unstructuredWriter = new SqlWriter(writer, quoteChar, tableName, lineSeparator, headers, nullFormat); } return unstructuredWriter; diff --git a/pom.xml b/pom.xml index c7f43f1725..dc9052563f 100644 --- a/pom.xml +++ b/pom.xml @@ -55,7 +55,6 @@ oraclereader cassandrareader oceanbasev10reader - obhbasereader rdbmsreader odpsreader @@ -66,12 +65,12 @@ hbase11xsqlreader hbase20xsqlreader - ossreader - hdfsreader + + ftpreader txtfilereader streamreader - clickhousereader + mongodbreader tdenginereader @@ -81,8 +80,9 @@ loghubreader datahubreader starrocksreader - sybasereader - dorisreader + + qcubicreader + mysqlwriter starrockswriter @@ -94,14 +94,13 @@ kingbaseeswriter adswriter oceanbasev10writer - obhbasewriter adbpgwriter hologresjdbcwriter rdbmswriter odpswriter - osswriter + otswriter hbase11xwriter hbase094xwriter @@ -109,7 +108,7 @@ hbase20xsqlwriter kuduwriter ftpwriter - hdfswriter + txtfilewriter streamwriter @@ -123,19 +122,18 @@ loghubwriter datahubwriter cassandrawriter - clickhousewriter + doriswriter selectdbwriter adbmysqlwriter - sybasewriter neo4jwriter + + + plugin-rdbms-util plugin-unstructured-storage-util - gaussdbreader - gaussdbwriter - datax-example - + diff --git a/qcubicreader/pom.xml b/qcubicreader/pom.xml new file mode 100644 index 0000000000..3a20fb58f5 --- /dev/null +++ b/qcubicreader/pom.xml @@ -0,0 +1,88 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + qcubicreader + qcubicreader + jar + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.slf4j + slf4j-api + + + + ch.qos.logback + logback-classic + + + + com.alibaba.datax + plugin-rdbms-util + ${datax-project-version} + + + + com.qcubic + qcubic + 5.3 + system + ${basedir}/src/main/libs/Qcubic.jar + + + + + + + + + maven-compiler-plugin + + ${jdk-version} + ${jdk-version} + ${project-sourceEncoding} + + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + \ No newline at end of file diff --git a/qcubicreader/src/main/assembly/package.xml b/qcubicreader/src/main/assembly/package.xml new file mode 100644 index 0000000000..dd986f6e88 --- /dev/null +++ b/qcubicreader/src/main/assembly/package.xml @@ -0,0 +1,42 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/qcubicreader + + + target/ + + qcubicreader-0.0.1-SNAPSHOT.jar + + plugin/reader/qcubicreader + + + src/main/libs + + *.* + + plugin/reader/qcubicreader/libs + + + + + + false + plugin/reader/qcubicreader/libs + runtime + + + diff --git a/qcubicreader/src/main/java/com/alibaba/datax/plugin/reader/qcubicreader/Constant.java b/qcubicreader/src/main/java/com/alibaba/datax/plugin/reader/qcubicreader/Constant.java new file mode 100644 index 0000000000..0442ea11ba --- /dev/null +++ b/qcubicreader/src/main/java/com/alibaba/datax/plugin/reader/qcubicreader/Constant.java @@ -0,0 +1,7 @@ +package com.alibaba.datax.plugin.reader.qcubicreader; + +public class Constant { + + public static final int DEFAULT_FETCH_SIZE = 1000; + +} diff --git a/qcubicreader/src/main/java/com/alibaba/datax/plugin/reader/qcubicreader/QcubicReader.java b/qcubicreader/src/main/java/com/alibaba/datax/plugin/reader/qcubicreader/QcubicReader.java new file mode 100644 index 0000000000..d75af5e33d --- /dev/null +++ b/qcubicreader/src/main/java/com/alibaba/datax/plugin/reader/qcubicreader/QcubicReader.java @@ -0,0 +1,87 @@ +package com.alibaba.datax.plugin.reader.qcubicreader; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader; +import com.alibaba.datax.plugin.rdbms.reader.Constant; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; + +import java.util.List; + +public class QcubicReader extends Reader { + + private static final DataBaseType DATABASE_TYPE = DataBaseType.Qcubic; + + public static class Job extends Reader.Job { + + private Configuration originalConfig; + private CommonRdbmsReader.Job commonRdbmsReaderMaster; + + @Override + public void init() { + this.originalConfig = super.getPluginJobConf(); + int fetchSize = this.originalConfig.getInt(Constant.FETCH_SIZE, + com.alibaba.datax.plugin.reader.qcubicreader.Constant.DEFAULT_FETCH_SIZE); + if (fetchSize < 1) { + throw DataXException.asDataXException(DBUtilErrorCode.REQUIRED_VALUE, + String.format("您配置的fetchSize有误,根据DataX的设计,fetchSize : [%d] 设置值不能小于 1.", fetchSize)); + } + this.originalConfig.set(Constant.FETCH_SIZE, fetchSize); + + this.commonRdbmsReaderMaster = new CommonRdbmsReader.Job(DATABASE_TYPE); + this.commonRdbmsReaderMaster.init(this.originalConfig); + } + + @Override + public List split(int adviceNumber) { + return this.commonRdbmsReaderMaster.split(this.originalConfig, adviceNumber); + } + + @Override + public void post() { + this.commonRdbmsReaderMaster.post(this.originalConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderMaster.destroy(this.originalConfig); + } + + } + + public static class Task extends Reader.Task { + + private Configuration readerSliceConfig; + private CommonRdbmsReader.Task commonRdbmsReaderSlave; + + @Override + public void init() { + this.readerSliceConfig = super.getPluginJobConf(); + this.commonRdbmsReaderSlave = new CommonRdbmsReader.Task(DATABASE_TYPE, super.getTaskGroupId(), super.getTaskId()); + this.commonRdbmsReaderSlave.init(this.readerSliceConfig); + } + + @Override + public void startRead(RecordSender recordSender) { + int fetchSize = this.readerSliceConfig.getInt(Constant.FETCH_SIZE); + + this.commonRdbmsReaderSlave.startRead(this.readerSliceConfig, recordSender, + super.getTaskPluginCollector(), fetchSize); + } + + @Override + public void post() { + this.commonRdbmsReaderSlave.post(this.readerSliceConfig); + } + + @Override + public void destroy() { + this.commonRdbmsReaderSlave.destroy(this.readerSliceConfig); + } + + } + +} diff --git a/qcubicreader/src/main/resources/plugin.json b/qcubicreader/src/main/resources/plugin.json new file mode 100644 index 0000000000..8685249253 --- /dev/null +++ b/qcubicreader/src/main/resources/plugin.json @@ -0,0 +1,6 @@ +{ + "name": "qcubicreader", + "class": "com.alibaba.datax.plugin.reader.qcubicreader.QcubicReader", + "description": "useScene: prod. mechanism: Jdbc connection using the database, execute select sql, retrieve data from the ResultSet. warn: The more you know about the database, the less problems you encounter.", + "developer": "alibaba" +} \ No newline at end of file diff --git a/qcubicreader/src/main/resources/plugin_job_template.json b/qcubicreader/src/main/resources/plugin_job_template.json new file mode 100644 index 0000000000..052dbd58ce --- /dev/null +++ b/qcubicreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,13 @@ +{ + "name": "qcubicreader", + "parameter": { + "username": "", + "password": "", + "connection": [ + { + "jdbcUrl": [], + "table": [] + } + ] + } +} \ No newline at end of file diff --git a/transformer/doc/transformer.md b/transformer/doc/transformer.md index a9da83a4ed..0a00dbaa9c 100644 --- a/transformer/doc/transformer.md +++ b/transformer/doc/transformer.md @@ -47,7 +47,7 @@ dx_replace(1,"5","10","****") column 1的value为“dataxTest”=>"datax****" 4. dx_filter (关联filter暂不支持,即多个字段的联合判断,函参太过复杂,用户难以使用。) * 参数: * 第一个参数:字段编号,对应record中第几个字段。 - * 第二个参数:运算符,支持以下运算符:like, not like, >, =, <, >=, !=, <= + * 第二个参数:运算符,支持一下运算符:like, not like, >, =, <, >=, !=, <= * 第三个参数:正则表达式(java正则表达式)、值。 * 返回: * 如果匹配正则表达式,返回Null,表示过滤该行。不匹配表达式时,表示保留该行。(注意是该行)。对于>=<都是对字段直接compare的结果. @@ -145,11 +145,11 @@ String code3 = "Column column = record.getColumn(1);\n" + "type": "string" }, { - "value": 1724154616370, + "value": 19890604, "type": "long" }, { - "value": "2024-01-01 00:00:00", + "value": "1989-06-04 00:00:00", "type": "date" }, { @@ -157,11 +157,11 @@ String code3 = "Column column = record.getColumn(1);\n" + "type": "bool" }, { - "value": "TestRawData", + "value": "test", "type": "bytes" } ], - "sliceRecordCount": 100 + "sliceRecordCount": 100000 } }, "writer": { @@ -174,44 +174,38 @@ String code3 = "Column column = record.getColumn(1);\n" + "transformer": [ { "name": "dx_substr", - "parameter": { - "columnIndex": 5, - "paras": [ - "1", - "3" - ] - } + "parameter": + { + "columnIndex":5, + "paras":["1","3"] + } }, { "name": "dx_replace", - "parameter": { - "columnIndex": 4, - "paras": [ - "3", - "4", - "****" - ] - } + "parameter": + { + "columnIndex":4, + "paras":["3","4","****"] + } }, { "name": "dx_digest", - "parameter": { - "columnIndex": 3, - "paras": [ - "md5", - "toLowerCase" - ] - } + "parameter": + { + "columnIndex":3, + "paras":["md5", "toLowerCase"] + } }, { "name": "dx_groovy", - "parameter": { - "code": "//groovy code//", - "extraPackage": [ - "import somePackage1;", - "import somePackage2;" - ] - } + "parameter": + { + "code": "//groovy code//", + "extraPackage":[ + "import somePackage1;", + "import somePackage2;" + ] + } } ] } diff --git a/userGuid.md b/userGuid.md index badb1b4e75..ff3f93b3c7 100644 --- a/userGuid.md +++ b/userGuid.md @@ -17,7 +17,7 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源 * 工具部署 - * 方法一、直接下载DataX工具包:[DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz) + * 方法一、直接下载DataX工具包:[DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz) 下载后解压至本地某个目录,进入bin目录,即可运行同步作业: