优秀的编程知识分享平台

网站首页 > 技术文章 正文

datax 学习一(dataxweb)

nanyue 2024-10-28 16:41:37 技术文章 4 ℃

下载地址

https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202309/datax.tar.gz

生成简易模板

-- 查看配置模板
python datax.py -r {YOUR_READER} -w {YOUR_WRITER}
-- 保存模板
python datax.py -r {YOUR_READER} -w {YOUR_WRITER} > YOUR_WRITER.JSON

启动 DataX

 cd {YOUR_DATAX_DIR_BIN}
 python datax.py ./stream2stream.json 

DataX 传参

在JSON配置文件中使用${param}引用参数,在提交任务时使用 **-p "-Dparam=value"** 传入参数值;

-- 执行任务
python bin/datax.py ./job/base_province_2.json -p "-Ddt=2020-06-14"

-- JSON 文件
{
    "job":{
        "content":[
            {
                "reader":{
                    "name":"mysqlreader",
                    "parameter":{
                        "connection":[
                            {
                                "jdbcUrl":[
                                    "jdbc:mysql://server15:3306/gmall2022"
                                ],
                                "querySql":[
                                    "select id,name from base_province where id>=3"
                                ]
                            }
                        ],
                        "password":"root",
                        "username":"root"
                    }
                },
                "writer":{
                    "name":"hdfswriter",
                    "parameter":{
                        "column":[
                            {
                                "name":"id",
                                "type":"bigint"
                            },
                            {
                                "name":"name",
                                "type":"string"
                            }
                        ],
                        "compress":"gzip",
                        "defaultFS":"hdfs://server16:8020",
                        "fieldDelimiter":"\t",
                        "fileName":"base_province",
                        "fileType":"text",
                        "path":"/base_province/${dt}",
                        "writeMode":"append"
                    }
                }
            }
        ],
        "setting":{
            "speed":{
                "channel":"1"
            }
        }
    }
}

优化

参数

说明

job.setting.speed.channel

总并发数

job.setting.speed.record

总record限速

job.setting.speed.byte

总byte限速

core.transport.channel.speed.record

单个channel的record限速,默认值为10000(10000条/s)

core.transport.channel.speed.byte

单个channel的byte限速,默认值1024*1024(1M/s)

注意事项:

    1.若配置了总record限速,则必须配置单个channel的record限速
    2.若配置了总byte限速,则必须配置单个channe的byte限速
    3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,实际channel并发数是通过计算得到的:
    	
    		计算公式为:
            min(总byte限速/单个channle的byte限速,总record限速/单个channel的record限速)

示例 :
{
    "core": {
        "transport": {
            "channel": {
                "speed": {
                    "byte": 1048576 //单个channel byte限速1M/s
                }
            }
        }
    },
    "job": {
        "setting": {
            "speed": {
                "byte" : 5242880 //总byte限速5M/s
            }
        },
        ...
    }
}

当提升DataX Job内Channel并发数时,内存的占用会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。

建议将内存设置为4G或者8G,这个也可以根据实际情况来调整。

调整JVM xms xmx参数的两种方式:
        一种是直接更改datax.py脚本;
        另一种是在启动的时候,加上对应的参数,如下:
								python datax/bin/datax.py --jvm="-Xms8G -Xmx8G" /path/to/your/job.json

平台集成

1. 使用Datax API进行集成

Datax提供了丰富的API接口,可以通过调用这些接口实现与云平台的集成。例如,可以使用POST方法调用Datax的startJob接口,启动一个数据同步任务。

import com.aliyun.datax.client.api.DataxClient;
import com.aliyun.datax.client.config.JobConfig;
import com.aliyun.datax.client.exception.DataXException;
public class DataxRunner {
    public static void main(String[] args) throws Exception {
        // 创建Datax客户端对象
        DataxClient client = new DataxClient("http://your_datax_host:your_datax_port");
        // 创建作业配置对象
        JobConfig jobConfig = new JobConfig();
        jobConfig.setProjectName("your_project_name");
        jobConfig.setJobName("your_job_name");
        // 启动作业
        client.startJob(jobConfig);
    }
}

2.使用Datax SDK进行集成

import com.aliyun.datax.client.dataxclient.api.IDataxClient;
import com.aliyun.datax.client.dataxclient.entity.JobRequest;
import com.aliyun.datax.client.dataxclient.entity.JobResponse;
import com.aliyun.datax.client.dataxclient.impl.DefaultDataxClient;
public class DataxRunner {
    public static void main(String[] args) throws Exception {
        // 创建Datax客户端对象
        IDataxClient client = new DefaultDataxClient("http://your_datax_host:your_datax_port");
        // 创建作业请求对象
        JobRequest jobRequest = new JobRequest();
        jobRequest.setProjectName("your_project_name");
        jobRequest.setJobName("your_job_name");
        // 启动作业
        JobResponse jobResponse = client.startJob(jobRequest);
        System.out.println("Job status: " + jobResponse.getStatus());
        System.out.println("Job id: " + jobResponse.getId());
    }
}

DataX-WEB 安装部署

1.下载

-- 源码
https://github.com/WeiYe-Jing/datax-web
--  部署文档
https://github.com/WeiYe-Jing/datax-web/blob/master/doc/datax-web/datax-web-deploy.md

2.执行一键安装脚本

进入解压后的目录,找到bin目录下面的install.sh文件,如果选择交互式的安装,则直接执行

./bin/install.sh

3.数据库初始化(MySQL与 DataX-WEB不在同一台服务器上)

1. 登录 msyql 建库
     mysql> create database dataxweb;

2. 初始化脚本
    mysql> {INSTALL_PATH}/bin/db/datax-web.sql

3. 修改 datax-web 连接 MySQL 配置
    vim  {INSTALL_PATH}/modules/datax-admin/conf/bootstrap.properties
        #Database
        #DB_HOST=
        #DB_PORT=
        #DB_USERNAME=
        #DB_PASSWORD=
        #DB_DATABASE=
  1. 启动服务
一键启动所有服务
     ./bin/start-all.sh
单一地启动某一模块服务:
     ./bin/start.sh -m {module_name}
一键取消所有服务
     ./bin/stop-all.sh
单一地停止某一模块服务:
     ./bin/stop.sh -m {module_name}
  1. 查看服务
在Linux环境下使用JPS命令,查看是否出现 DataXAdminApplication 和 DataXExecutorApplication 进程,如果存在这表示项目运行成功

如果项目启动失败,请检查启动日志:modules/datax-admin/bin/console.out 或者 modules/datax-executor/bin/console.out
  1. 登录
在浏览器中输入 http://ip:port/index.html 就可以访问对应的主界面(ip为datax-admin部署所在服务器ip,port为为datax-admin 指定的运行端口)

http://ip:9527/index.html
输入用户名 admin 密码 123456 就可以直接访问系统
  1. 配置
1. 在项目目录: /modules/datax-admin/bin/env.properties 
  配置邮件服务
     MAIL_USERNAME=""
     MAIL_PASSWORD=""
	访问端口
     SERVER_PORT=9527
2. 在项目目录下/modules/datax-execute/bin/env.properties 
	指定 PYTHON_PATH 的路径(datax.py 绝对路径)
     PYTHON_PATH=/opt/datax/bin/datax.py
   datax json 文件存放位置
     JSON_PATH=${BIN}/../json
   保持和datax-admin端口一致;默认是9527,如果没改datax-admin的端口,可以忽略
     DATAX_ADMIN_PORT=
  1. 运行日志
 部署完成之后,在modules/对应的项目/data/applogs下(用户也可以自己指定日志,修改application.yml 中的logpath地址即可),用户可以根据此日志跟踪项目实际启动情况

 如果执行器启动比admin快,执行器会连接失败,日志报"拒绝连接"的错误,一般是先启动admin,再启动executor,30秒之后会重连,如果成功请忽略这个异常。

Shell 脚本

#!/bin/bash
# 设置需要同步的哪一天的数据
# 一般情况下,当天产生的数据,会在第二天的凌晨增量导入到数据仓库中
dt=`date -d yesterday +"%Y-%m-%d"`
# 提取时间分量
year=${dt:0:4}
month=${dt:5:2}
# 设置 Datax的路径
DATAX_HOME=/opt/datax
# 设置jobs的路径
JOBS HOME=/opt/datax/jobs
# 增量导入数据
python3 $DATAX_HOME/bin/datax.py -p"-Ddate=$dt" $JOBS_HOME/incr-xxx.json

# 增量导入订单数据
# 1.检查Hive表中指定的分区是否存在,如果不存在,需要手动创建分区
if [ ! hive -e "show partitions datax_shop.orders partition(year='$year', month='$month')" ]; then 
    hive -e "alter table datax shop.orders add partition(year='$vear', month='$month')"
fi
# 2.执行增量入订单数据
python3 DATAX_HOME/bin/datax,py -p "-Ddate=Sdt -Dyear=Syear -Dmonth=Smonth" SJOBS-HOME/incr-orders.json

mysqlreader 插件任务切分

问题

1. DataX读取的字段比job中column的字段个数多或者少,或者一行数据转换为多行数据

		问题描述:
				DataX是将reader读取到的数据,默认是CSV格式,分隔符是\t,单行数据分割符是\n\r,所以当读取到的字段中包含\t时,会将一列数据转换为两列,包含\n\r时,会将一行数据转换为两行。导致writer时,报Reason: actual column number is more than schema column number.actual number: 21异常
    处理方式:
    		1、单独处理制表符
								REPLACE(columnName, ' ', '') 
				2、处理字段中包含制表符、回车符和换行符
    						REPLACE(REPLACE(REPLACE(columnName, chr(10),' '), chr(13), ''), chr(9),' ') 
    
  在  job.content.reader.parameter.connection.querySql 中通过 select SQL 语句替换;
    

Tags:

最近发表
标签列表