网站首页 > 技术文章 正文
下载地址
https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz
生成简易模板
-- 查看配置模板
python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
-- 保存模板
python datax.py -r {YOUR_READER} -w {YOUR_WRITER} > YOUR_WRITER.JSON
启动 DataX
cd {YOUR_DATAX_DIR_BIN}
python datax.py ./stream2stream.json
DataX 传参
在JSON配置文件中使用${param}引用参数,在提交任务时使用 **-p "-Dparam=value"** 传入参数值;
-- 执行任务
python bin/datax.py ./job/base_province_2.json -p "-Ddt=2020-06-14"
-- JSON 文件
{
"job":{
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"connection":[
{
"jdbcUrl":[
"jdbc:mysql://server15:3306/gmall2022"
],
"querySql":[
"select id,name from base_province where id>=3"
]
}
],
"password":"root",
"username":"root"
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"column":[
{
"name":"id",
"type":"bigint"
},
{
"name":"name",
"type":"string"
}
],
"compress":"gzip",
"defaultFS":"hdfs://server16:8020",
"fieldDelimiter":"\t",
"fileName":"base_province",
"fileType":"text",
"path":"/base_province/${dt}",
"writeMode":"append"
}
}
}
],
"setting":{
"speed":{
"channel":"1"
}
}
}
}
优化
参数 | 说明 |
job.setting.speed.channel | 总并发数 |
job.setting.speed.record | 总record限速 |
job.setting.speed.byte | 总byte限速 |
core.transport.channel.speed.record | 单个channel的record限速,默认值为10000(10000条/s) |
core.transport.channel.speed.byte | 单个channel的byte限速,默认值1024*1024(1M/s) |
注意事项:
1.若配置了总record限速,则必须配置单个channel的record限速
2.若配置了总byte限速,则必须配置单个channe的byte限速
3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:
计算公式为:
min(总byte限速/单个channle的byte限速,总record限速/单个channel的record限速)
示例 :
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 1048576 //单个channel byte限速1M/s
}
}
}
},
"job": {
"setting": {
"speed": {
"byte" : 5242880 //总byte限速5M/s
}
},
...
}
}
当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。
调整JVM xms xmx参数的两种方式:
一种是直接更改datax.py脚本;
另一种是在启动的时候,加上对应的参数,如下:
python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json
平台集成
1. 使用Datax API进行集成
Datax提供了丰富的API接口,可以通过调用这些接口实现与云平台的集成。例如,可以使用POST方法调用Datax的startJob接口,启动一个数据同步任务。
import com.aliyun.datax.client.api.DataxClient;
import com.aliyun.datax.client.config.JobConfig;
import com.aliyun.datax.client.exception.DataXException;
public class DataxRunner {
public static void main(String[] args) throws Exception {
// 创建Datax客户端对象
DataxClient client = new DataxClient("http://your_datax_host:your_datax_port");
// 创建作业配置对象
JobConfig jobConfig = new JobConfig();
jobConfig.setProjectName("your_project_name");
jobConfig.setJobName("your_job_name");
// 启动作业
client.startJob(jobConfig);
}
}
2.使用Datax SDK进行集成
import com.aliyun.datax.client.dataxclient.api.IDataxClient;
import com.aliyun.datax.client.dataxclient.entity.JobRequest;
import com.aliyun.datax.client.dataxclient.entity.JobResponse;
import com.aliyun.datax.client.dataxclient.impl.DefaultDataxClient;
public class DataxRunner {
public static void main(String[] args) throws Exception {
// 创建Datax客户端对象
IDataxClient client = new DefaultDataxClient("http://your_datax_host:your_datax_port");
// 创建作业请求对象
JobRequest jobRequest = new JobRequest();
jobRequest.setProjectName("your_project_name");
jobRequest.setJobName("your_job_name");
// 启动作业
JobResponse jobResponse = client.startJob(jobRequest);
System.out.println("Job status: " + jobResponse.getStatus());
System.out.println("Job id: " + jobResponse.getId());
}
}
DataX-WEB 安装部署
1.下载
-- 源码
https://github.com/WeiYe-Jing/datax-web
-- 部署文档
https://github.com/WeiYe-Jing/datax-web/blob/master/doc/datax-web/datax-web-deploy.md
2.执行一键安装脚本
进入解压后的目录,找到bin目录下面的install.sh文件,如果选择交互式的安装,则直接执行
./bin/install.sh
3.数据库初始化(MySQL与 DataX-WEB不在同一台服务器上)
1. 登录 msyql 建库
mysql> create database dataxweb;
2. 初始化脚本
mysql> {INSTALL_PATH}/bin/db/datax-web.sql
3. 修改 datax-web 连接 MySQL 配置
vim {INSTALL_PATH}/modules/datax-admin/conf/bootstrap.properties
#Database
#DB_HOST=
#DB_PORT=
#DB_USERNAME=
#DB_PASSWORD=
#DB_DATABASE=
- 启动服务
一键启动所有服务
./bin/start-all.sh
单一地启动某一模块服务:
./bin/start.sh -m {module_name}
一键取消所有服务
./bin/stop-all.sh
单一地停止某一模块服务:
./bin/stop.sh -m {module_name}
- 查看服务
在Linux环境下使用JPS命令,查看是否出现 DataXAdminApplication 和 DataXExecutorApplication 进程,如果存在这表示项目运行成功
如果项目启动失败,请检查启动日志:modules/datax-admin/bin/console.out 或者 modules/datax-executor/bin/console.out
- 登录
在浏览器中输入 http://ip:port/index.html 就可以访问对应的主界面(ip为datax-admin部署所在服务器ip,port为为datax-admin 指定的运行端口)
http://ip:9527/index.html
输入用户名 admin 密码 123456 就可以直接访问系统
- 配置
1. 在项目目录: /modules/datax-admin/bin/env.properties
配置邮件服务
MAIL_USERNAME=""
MAIL_PASSWORD=""
访问端口
SERVER_PORT=9527
2. 在项目目录下/modules/datax-execute/bin/env.properties
指定 PYTHON_PATH 的路径(datax.py 绝对路径)
PYTHON_PATH=/opt/datax/bin/datax.py
datax json 文件存放位置
JSON_PATH=${BIN}/../json
保持和datax-admin端口一致;默认是9527,如果没改datax-admin的端口,可以忽略
DATAX_ADMIN_PORT=
- 运行日志
部署完成之后,在modules/对应的项目/data/applogs下(用户也可以自己指定日志,修改application.yml 中的logpath地址即可),用户可以根据此日志跟踪项目实际启动情况
如果执行器启动比admin快,执行器会连接失败,日志报"拒绝连接"的错误,一般是先启动admin,再启动executor,30秒之后会重连,如果成功请忽略这个异常。
Shell 脚本
#!/bin/bash
# 设置需要同步的哪一天的数据
# 一般情况下,当天产生的数据,会在第二天的凌晨增量导入到数据仓库中
dt=`date -d yesterday +"%Y-%m-%d"`
# 提取时间分量
year=${dt:0:4}
month=${dt:5:2}
# 设置 Datax的路径
DATAX_HOME=/opt/datax
# 设置jobs的路径
JOBS HOME=/opt/datax/jobs
# 增量导入数据
python3 $DATAX_HOME/bin/datax.py -p"-Ddate=$dt" $JOBS_HOME/incr-xxx.json
# 增量导入订单数据
# 1.检查Hive表中指定的分区是否存在,如果不存在,需要手动创建分区
if [ ! hive -e "show partitions datax_shop.orders partition(year='$year', month='$month')" ]; then
hive -e "alter table datax shop.orders add partition(year='$vear', month='$month')"
fi
# 2.执行增量入订单数据
python3 DATAX_HOME/bin/datax,py -p "-Ddate=Sdt -Dyear=Syear -Dmonth=Smonth" SJOBS-HOME/incr-orders.json
mysqlreader 插件任务切分
问题
1. DataX读取的字段比job中column的字段个数多或者少,或者一行数据转换为多行数据
问题描述:
DataX是将reader读取到的数据,默认是CSV格式,分隔符是\t,单行数据分割符是\n\r,所以当读取到的字段中包含\t时,会将一列数据转换为两列,包含\n\r时,会将一行数据转换为两行。导致writer时,报Reason: actual column number is more than schema column number.actual number: 21异常
处理方式:
1、单独处理制表符
REPLACE(columnName, ' ', '')
2、处理字段中包含制表符、回车符和换行符
REPLACE(REPLACE(REPLACE(columnName, chr(10),' '), chr(13), ''), chr(9),' ')
在 job.content.reader.parameter.connection.querySql 中通过 select SQL 语句替换;
猜你喜欢
- 2024-10-28 美团面试:熟悉哪些JVM调优参数,幸好我准备过
- 2024-10-28 Java虚拟机:Jvm概念和原理详解以及GC机制的分析
- 2024-10-28 JVM 的内存模型(jvm1.8内存模型)
- 2024-10-28 JDK、JRE、JVM,是什么关系?(jdk jrejvm的区别)
- 2024-10-28 一个 JVM 参数引发的频繁 CMS GC(当产生一个异常时,jvm会做什么)
- 2024-10-28 JVM系列一:JVM内存组成及分配(jvm的内存分配)
- 2024-10-28 谈谈JMM与JVM的相关知识(jmm jvm juc)
- 2024-10-28 常见的JVM参数配置(常见的JVM参数配置有哪些)
- 2024-10-28 一份详细介绍JVM的资料(对比JDK8和JDK7)
- 2024-10-28 理解JVM运行时数据区域,看这一篇文章就够了
- 11-26Win7\8\10下一条cmd命令可查得笔记本电脑连接过的Wifi密码
- 11-26一文搞懂MySQL行锁、表锁、间隙锁详解
- 11-26电脑的wifi密码忘记了?一招教你如何找回密码,简单明了,快收藏
- 11-26代码解决忘记密码问题 教你用CMD命令查看所有连接过的WIFI密码
- 11-26CMD命令提示符能干嘛?这些功能你都知道吗?
- 11-26性能测试之慢sql分析
- 11-26论渗透信息收集的重要性
- 11-26如何查看电脑连接过的所有WiFi密码
- 最近发表
- 标签列表
-
- cmd/c (57)
- c++中::是什么意思 (57)
- sqlset (59)
- ps可以打开pdf格式吗 (58)
- phprequire_once (61)
- localstorage.removeitem (74)
- routermode (59)
- vector线程安全吗 (70)
- & (66)
- java (73)
- org.redisson (64)
- log.warn (60)
- cannotinstantiatethetype (62)
- js数组插入 (83)
- resttemplateokhttp (59)
- gormwherein (64)
- linux删除一个文件夹 (65)
- mac安装java (72)
- reader.onload (61)
- outofmemoryerror是什么意思 (64)
- flask文件上传 (63)
- eacces (67)
- 查看mysql是否启动 (70)
- java是值传递还是引用传递 (58)
- 无效的列索引 (74)