火山引擎云原生数据仓库ByteHouse与** A** pache Airflow强强结合,为管理和执行数据流程提供了强大而高效的解决方案。 本文将带来ByteHouse与Apache Airflow结合使用的主要优势和特点,展示如何简化数据工作流程,并推动业务成功。
文 | Aelfric Lin 火山引擎ByteHouse团队
一、可扩展可靠的数据流程: Apache Airflow提供了一个强大的平台,用于设计和编排数据流程,更轻松的处理复杂的工作流程。搭配ByteHouse的云原生数据仓库解决方案,可以高效地存储和处理大量数据,确保数据流程的可扩展性和可靠性。
二、自动化工作流管理: Airflow的直观界面通过可视化的DAG(有向无环图)编辑器,使得创建和调度数据工作流程变得容易。通过与ByteHouse集成,可以自动化提取、转换和加载(ETL)过程,减少手动工作量,实现更高效的数据管理。
三、简单的部署和管理: Apache Airflow和ByteHouse均设计为简单的部署和管理。Airflow可以部署在本地或云端,而ByteHouse提供完全托管的云原生数据仓库解决方案。这种组合使得数据基础设施的设置和维护变得无缝化。
/ 业务场景 /
在这个客户场景中,一家名为“数据洞察有限公司(化名)”的分析公司, 他们将Apache Airflow作为数据管道编排工具,同时选择ByteHouse作为数据仓库解决方案, 以利用其强大的分析和机器学习功能。
数据洞察有限公司在电子商务行业运营,需要收集存储在AWS S3中的大量客户和交易数据,他们会定期将这些数据加载到ByteHouse,并执行各种分析任务,做到对业务运营情况的技术洞察。
/ 数据链路 /
数据洞察有限公司使用Apache Airflow,设置了一个基于特定事件或时间表的数据加载管道。例如,他们可以配置Airflow在每天的特定时间触发数据加载过程,或者当新的数据文件添加到指定的AWS S3存储桶时触发 。
当触发事件发生时,Airflow通过从AWS S3中检索相关数据文件来启动数据加载过程。 它使用适当的凭据和API集成确保与S3存储桶的安全身份验证和连接。 一旦数据从AWS S3中获取,Airflow会协调数据的转换和加载到ByteHouse中。 它利用ByteHouse的集成能力,根据预定义的模式和数据模型高效地存储和组织数据。
成功将数据加载到ByteHouse后,可以利用ByteHouse的功能进行分析和机器学习任务,使用ByteHouse的类SQL语言查询数据,进行复杂的分析后生成报告,并洞察客户、销售的趋势以及产品性能。
此外,数据洞察有限公司还利用ByteHouse的功能创建可视化的交互式仪表板。 通过动态仪表板显示实时指标,监控关键绩效指标,并可与其他工作人员同时操作。
最后,数据洞察有限公司利用ByteHouse的机器学习功能来开发预测模型、推荐系统或客户细分算法。 ByteHouse提供了必要的计算能力和存储基础设施,用于训练和部署机器学习模型,使数据洞察有限公司能够获得有价值的预测和算法。
/ 总结 /
通过使用Apache Airflow作为数据管道编排工具,并将其与ByteHouse集成,数据洞察有限公司实现了从AWS S3加载数据到ByteHouse的流畅自动化流程。 他们充分利用ByteHouse的强大分析、机器学习和仪表板功能,获得有价值的洞察,并推动组织内的数据驱动。
/ 步骤一:先决条件 /
在您的虚拟/本地环境中安装pip。
在您的虚拟/本地环境中安装ByteHouse CLI并登录到ByteHouse账户。
参考ByteHouse CLI以获取安装帮助。
macOS上使用Homebrew的示例
`brew install bytehouse-cli`
/ 步骤二:安装Apache Airflow /
在本教程中,我们使用pip在您的本地或虚拟环境中安装Apache Airflow。 了解更多信息,请参阅官方Airflow文档。
`# airflow需要一个目录,~/airflow是默认目录,`
`# 但如果您喜欢,可以选择其他位置`
`#(可选)`
`export AIRFLOW_HOME=~/airflow`
`AIRFLOW_VERSION=2.1.3`
`PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"`
`# 例如:3.6`
`CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"`
`pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"`
如果使用pip无法安装,请尝试使用pip3 install进行安装。 安装完成后,运行命令airflow info以获取有关Airflow的更多信息。
/ 步骤三:Airflow初始化 /
通过执行以下命令来初始化Airflow的Web服务器:
`# 初始化数据库`
`airflow db init`
`airflow users create \`
`--username admin \`
`--firstname admin \`
`--lastname admin \`
`--role Admin \`
`--email admin`
`# 启动Web服务器,默认端口是8080`
`# 或修改airflow.cfg设置web_server_port`
`airflow webserver --port 8080`
设置好Web服务器后,您可以访问 http://localhost:8080/ 使用先前设置的用户名和密码登录Airflow控制台。
在新的终端中,使用以下命令设置Airflow调度器。 然后刷新 http://localhost:8080/
/ 步骤四:YAML配置 /
使用 cd ~/airflow 命令进入Airflow文件夹。
打开名为 airflow.cfg 的配置文件, 添加配置并连接到数据库。 默认情况下,可以使用SQLite,但也可以连接到MySQL。
`# 默认情况下是SQLite,也可以连接到MySQL`
`sql_alchemy_conn = mysql+pymysql://airflow:airflow@xxx.xx.xx.xx:8080/airflow`
`# authenticate = False`
`# 禁用Alchemy连接池以防止设置Airflow调度器时出现故障 https://github.com/apache/airflow/issues/10055`
`sql_alchemy_pool_enabled = False`
`# 存放Airflow流水线的文件夹,通常是代码库中的子文件夹。该路径必须是绝对路径。`
`dags_folder = /home/admin/airflow/dags`
/ 步骤五:创建有向无环图(DAG)作业 /
在Airflow路径下创建一个名为dags的文件夹,然后创建test_bytehouse.py以启动一个新的DAG作业。
`~/airflow`
`mkdir dags`
`cd dags`
`nano test_bytehouse.py`
在test_bytehouse.py中添加以下代码,该作业可以连接到ByteHouse CLI,并使用BashOperator运行任务、查询或将数据加载到ByteHouse中。
`from datetime import timedelta`
`from textwrap import dedent`
`from airflow import DAG`
`from airflow.operators.bash import BashOperator`
`from airflow.utils.dates import days_ago`
`default_args = {`
`'owner': 'airflow',`
`'depends_on_past': False,`
`'email': ['airflow@example.com'],`
`'email_on_failure': False,`
`'email_on_retry': False,`
`'retries': 1,`
`'retry_delay': timedelta(minutes=5),`
`}`
`with DAG(`
`'test_bytehouse',`
`default_args=default_args,`
`description='A simple tutorial DAG',`
`schedule_interval=timedelta(days=1),`
`start_date=days_ago(1),`
`tags=['example'],`
`) as dag:`
`tImport = BashOperator(`
`task_id='ch_import',`
`depends_on_past=False,`
`bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml "INSERT INTO korver.cell_towers_1 FORMAT csv INFILE \'/opt/bytehousecli/data.csv\' "',`
`)`
`tSelect = BashOperator(`
`task_id='ch_select',`
`depends_on_past=False,`
`bash_command='$Bytehouse_HOME/bytehouse-cli -cf /root/bytehouse-cli/conf.toml -q "select * from korver.cell_towers_1 limit 10 into outfile \'/opt/bytehousecli/dataout.csv\' format csv "'`
`)`
`tSelect >> tImport`
在当前文件路径下运行python test_bytehouse.py 以在Airflow中创建DAG。
在浏览器中刷新网页,可以在DAG列表中看到新创建的名为test_bytehouse的DAG。
/ 步骤六:执行DAG /
在终端中运行以下Airflow命令,来查看DAG列表和test_bytehouse DAG中的子任务,再分别测试查询执行和数据导入任务。
`#打印"test_bytehouse" DAG中的任务列表`
`[root@VM-64-47-centos dags]# airflow tasks list test_bytehouse`
`ch_import`
`ch_select`
`#打印"test_bytehouse" DAG中任务的层次结构`
`[root@VM-64-47-centos dags]# airflow tasks list test_bytehouse --tree`
`<Task(BashOperator): ch_select>`
`<Task(BashOperator): ch_import>`
运行完DAG后,查看ByteHouse账户中的查询历史页面和数据库模块,获取查询/加载数据成功执行的结果。
产品介绍
火山引擎ByteHouse
统一的大数据分析平台。目前提供企业版和云数仓两种版本,企业版是基于开源的企业级分析型数据库,支持用户交互式分析PB级别数据,通过多种自研表引擎,灵活支持各类数据分析和应用;云数仓版作为云原生的数据分析平台,实现统一的离线和实时数据分析,并通过弹性扩展的计算层和分布式存储层,有效降低 企业大数据分析。后台回复数字“6”了解产品