干货|ByteHouse+Airflow:六步实现自动化数据管理流程

技术

picture.image

火山引擎云原生数据仓库ByteHouse与** A** pache Airflow强强结合,为管理和执行数据流程提供了强大而高效的解决方案。 本文将带来ByteHouse与Apache Airflow结合使用的主要优势和特点,展示如何简化数据工作流程,并推动业务成功。

picture.image

文 | Aelfric Lin 火山引擎ByteHouse团队

picture.image

一、可扩展可靠的数据流程: Apache Airflow提供了一个强大的平台,用于设计和编排数据流程,更轻松的处理复杂的工作流程。搭配ByteHouse的云原生数据仓库解决方案,可以高效地存储和处理大量数据,确保数据流程的可扩展性和可靠性。

二、自动化工作流管理: Airflow的直观界面通过可视化的DAG(有向无环图)编辑器,使得创建和调度数据工作流程变得容易。通过与ByteHouse集成,可以自动化提取、转换和加载(ETL)过程,减少手动工作量,实现更高效的数据管理。

三、简单的部署和管理: Apache Airflow和ByteHouse均设计为简单的部署和管理。Airflow可以部署在本地或云端,而ByteHouse提供完全托管的云原生数据仓库解决方案。这种组合使得数据基础设施的设置和维护变得无缝化。

picture.image

/ 业务场景 /

在这个客户场景中,一家名为“数据洞察有限公司(化名)”的分析公司, 他们将Apache Airflow作为数据管道编排工具,同时选择ByteHouse作为数据仓库解决方案, 以利用其强大的分析和机器学习功能。

数据洞察有限公司在电子商务行业运营,需要收集存储在AWS S3中的大量客户和交易数据,他们会定期将这些数据加载到ByteHouse,并执行各种分析任务,做到对业务运营情况的技术洞察。

/ 数据链路 /

数据洞察有限公司使用Apache Airflow,设置了一个基于特定事件或时间表的数据加载管道。例如,他们可以配置Airflow在每天的特定时间触发数据加载过程,或者当新的数据文件添加到指定的AWS S3存储桶时触发 。

当触发事件发生时,Airflow通过从AWS S3中检索相关数据文件来启动数据加载过程。 它使用适当的凭据和API集成确保与S3存储桶的安全身份验证和连接。 一旦数据从AWS S3中获取,Airflow会协调数据的转换和加载到ByteHouse中。 它利用ByteHouse的集成能力,根据预定义的模式和数据模型高效地存储和组织数据。

成功将数据加载到ByteHouse后,可以利用ByteHouse的功能进行分析和机器学习任务,使用ByteHouse的类SQL语言查询数据,进行复杂的分析后生成报告,并洞察客户、销售的趋势以及产品性能。

此外,数据洞察有限公司还利用ByteHouse的功能创建可视化的交互式仪表板。 通过动态仪表板显示实时指标,监控关键绩效指标,并可与其他工作人员同时操作。

最后,数据洞察有限公司利用ByteHouse的机器学习功能来开发预测模型、推荐系统或客户细分算法。 ByteHouse提供了必要的计算能力和存储基础设施,用于训练和部署机器学习模型,使数据洞察有限公司能够获得有价值的预测和算法。

/ 总结 /

通过使用Apache Airflow作为数据管道编排工具,并将其与ByteHouse集成,数据洞察有限公司实现了从AWS S3加载数据到ByteHouse的流畅自动化流程。 他们充分利用ByteHouse的强大分析、机器学习和仪表板功能,获得有价值的洞察,并推动组织内的数据驱动。

picture.image

/ 步骤一:先决条件 /

在您的虚拟/本地环境中安装pip。

在您的虚拟/本地环境中安装ByteHouse CLI并登录到ByteHouse账户。

参考ByteHouse CLI以获取安装帮助。

macOS上使用Homebrew的示例


        
 `brew install bytehouse-cli` 
 
      

/ 步骤二:安装Apache Airflow /

在本教程中,我们使用pip在您的本地或虚拟环境中安装Apache Airflow。 了解更多信息,请参阅官方Airflow文档。


        
  `# airflow需要一个目录,~/airflow是默认目录,`
  `# 但如果您喜欢,可以选择其他位置`
  `#(可选)`
  `export AIRFLOW_HOME=~/airflow`
 
  `AIRFLOW_VERSION=2.1.3`
  `PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"`
 
  `# 例如:3.6`
  `CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"`
  `pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"`
 
      

如果使用pip无法安装,请尝试使用pip3 install进行安装。 安装完成后,运行命令airflow info以获取有关Airflow的更多信息。

/ 步骤三:Airflow初始化 /

通过执行以下命令来初始化Airflow的Web服务器:


        
  `# 初始化数据库`
  `airflow db init`
 
 
  `airflow users create \`
  `--username admin \`
  `--firstname admin \`
  `--lastname admin \`
  `--role Admin \`
  `--email admin`
 
  `# 启动Web服务器,默认端口是8080`
  `# 或修改airflow.cfg设置web_server_port`
  `airflow webserver --port 8080`
 
      

设置好Web服务器后,您可以访问 http://localhost:8080/ 使用先前设置的用户名和密码登录Airflow控制台。

picture.image

在新的终端中,使用以下命令设置Airflow调度器。 然后刷新 http://localhost:8080/

/ 步骤四:YAML配置 /

使用 cd ~/airflow 命令进入Airflow文件夹。

打开名为 airflow.cfg 的配置文件, 添加配置并连接到数据库。 默认情况下,可以使用SQLite,但也可以连接到MySQL。


        
  `# 默认情况下是SQLite,也可以连接到MySQL`
  `sql_alchemy_conn = mysql+pymysql://airflow:airflow@xxx.xx.xx.xx:8080/airflow`
 
  `# authenticate = False`
  `# 禁用Alchemy连接池以防止设置Airflow调度器时出现故障 https://github.com/apache/airflow/issues/10055`
  `sql_alchemy_pool_enabled = False`
 
  `# 存放Airflow流水线的文件夹,通常是代码库中的子文件夹。该路径必须是绝对路径。`
  `dags_folder = /home/admin/airflow/dags`
 
      

/ 步骤五:创建有向无环图(DAG)作业 /

在Airflow路径下创建一个名为dags的文件夹,然后创建test_bytehouse.py以启动一个新的DAG作业。


        
  `~/airflow`
  `mkdir dags`
  `cd dags`
  `nano test_bytehouse.py`
 
      

在test_bytehouse.py中添加以下代码,该作业可以连接到ByteHouse CLI,并使用BashOperator运行任务、查询或将数据加载到ByteHouse中。


        
  `from datetime import timedelta`
  `from textwrap import dedent`
 
  `from airflow import DAG`
  `from airflow.operators.bash import BashOperator`
  `from airflow.utils.dates import days_ago`
 
  `default_args = {`
  `'owner': 'airflow',`
  `'depends_on_past': False,`
  `'email': ['airflow@example.com'],`
  `'email_on_failure': False,`
  `'email_on_retry': False,`
  `'retries': 1,`
  `'retry_delay': timedelta(minutes=5),`
  `}`
  `with DAG(`
  `'test_bytehouse',`
  `default_args=default_args,`
  `description='A simple tutorial DAG',`
  `schedule_interval=timedelta(days=1),`
  `start_date=days_ago(1),`
  `tags=['example'],`
  `) as dag:`
 
  `tImport = BashOperator(`
  `task_id='ch_import',`
  `depends_on_past=False,`
  `bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml "INSERT INTO korver.cell_towers_1 FORMAT csv INFILE \'/opt/bytehousecli/data.csv\' "',`
  `)`
 
  `tSelect = BashOperator(`
  `task_id='ch_select',`
  `depends_on_past=False,`
  `bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml -q "select * from korver.cell_towers_1 limit 10 into outfile \'/opt/bytehousecli/dataout.csv\' format csv "'`
  `)`
 
  `tSelect >> tImport`
 
      

在当前文件路径下运行python test_bytehouse.py 以在Airflow中创建DAG。

在浏览器中刷新网页,可以在DAG列表中看到新创建的名为test_bytehouse的DAG。

picture.image

/ 步骤六:执行DAG /

在终端中运行以下Airflow命令,来查看DAG列表和test_bytehouse DAG中的子任务,再分别测试查询执行和数据导入任务。


        
  `#打印"test_bytehouse" DAG中的任务列表`
  `[root@VM-64-47-centos dags]# airflow tasks list test_bytehouse`
  `ch_import`
  `ch_select`
 
  `#打印"test_bytehouse" DAG中任务的层次结构`
  `[root@VM-64-47-centos dags]# airflow tasks list test_bytehouse --tree`
  `<Task(BashOperator): ch_select>`
  `<Task(BashOperator): ch_import>`
 
      

运行完DAG后,查看ByteHouse账户中的查询历史页面和数据库模块,获取查询/加载数据成功执行的结果。

picture.image

产品介绍

火山引擎ByteHouse

统一的大数据分析平台。目前提供企业版和云数仓两种版本,企业版是基于开源的企业级分析型数据库,支持用户交互式分析PB级别数据,通过多种自研表引擎,灵活支持各类数据分析和应用;云数仓版作为云原生的数据分析平台,实现统一的离线和实时数据分析,并通过弹性扩展的计算层和分布式存储层,有效降低 企业大数据分析。后台回复数字“6”了解产品

picture.image

picture.image

picture.image

picture.image

picture.image

picture.image

0
0
0
0
相关资源
字节跳动基于 DataLeap 的 DataOps 实践
随着数字化转型的推进以及业务数仓建设不断完善,大数据开发体量及复杂性逐步上升,如何保证数据稳定、正确、持续产出成为数据开发者核心诉求,也成为平台建设面临的挑战之一。本次分享主要介绍字节对于DataOps的理解 以及 DataOps在内部业务如何落地实践。
相关产品
评论
未登录
看完啦,登录分享一下感受吧~
暂无评论