在本系列中,你了解了数据工程中的 Python 最佳实践,以及如何构建更健壮和可扩展的软件。今天,我们将采用一种称为设计模式的更高级编程概念,它是软件设计中常见问题的可重用解决方案。您可能会在数据工程项目中遇到这些问题。
我们将看看什么是设计模式,为什么要使用它们,并特别深入探讨一种模式:工厂模式。我们还将讨论为什么数据工程师会从使用工厂模式中受益。
工厂模式优雅地简化了数据连接器的创建,并使现有数据基础结构更具可扩展性。这对于处理具有不同处理需求的各种数据源的数据工程师特别有用。
工厂模式定义:在软件设计中,“工厂模式”允许您创建一个类,其子类决定要实例化哪个类。工厂模式允许在不指定将要创建的确切对象类的情况下创建对象,从而促进代码中的松散耦合和可伸缩性。此模式通常使用工厂方法或工厂类来实现,以基于输入条件或条件生成对象的实例。
工厂模式称为“创建设计模式”(本文稍后将对此进行说明)。
在本课程的这一部分中,我们将了解工厂模式如何在数据工程中实现可重用性、可伸缩性和可维护性。
Python 中的设计模式作为模板工作,可以应用于重复性任务或问题,因此在数据工程中非常有用。对于 Python 数据工程师来说,设计模式为数据处理和集成任务中反复出现的挑战提供了结构化且高效的解决方案。它们还提供共享词汇表,促进团队成员之间更清晰的沟通,从而实现更一致和协作的软件设计。
Python 中的设计模式以及一般编程中的设计模式通常被认为是中级到高级概念,因为它们通常需要了解编程原理、面向对象设计以及识别和抽象更大、更复杂的系统(即代码架构)中反复出现的问题的能力。Python的设计模式通常分为三种类型:
设计模式为数据工程任务提供了明显的优势,这些任务与软件设计的核心原则相呼应:
通过将资产工厂等设计模式集成到数据工程工作流(从数据提取、分析数据、数据转换等)中,我们为更顺畅的操作、更少的错误和更高效的系统铺平了道路,确保管理和优化数据以获得最佳结果。
每个设计模式都用于防止在项目缩放时出现特定问题。今天,我们将重点介绍工厂模式,该模式用于构建多个类似的东西,以促进集中配置、标准化测试,并在遵守一致性的同时允许灵活性。
工厂模式被归类为创建模式,因为它们在 Python 编程中创建对象。它们根据某些条件语句或参数返回不同的对象。
将工厂模式视为公司中的一个专业部门,只专注于生产某些产品。该部门负责制造的所有细节,公司的其他部门只需在需要时要求产品,而不必担心其制造方式。
同样,工厂模式负责创建特定对象的所有细节。应用程序的其余部分不需要知道这些对象的创建方式或它们需要哪些参数。它只是要求“工厂”生产对象,并信任它来处理其余的。这种分离使代码更清晰、更易于理解。
在 Python 中,实现工厂模式特别精简,这要归功于它的动态类型和一流的函数。您可以从工厂函数返回不同的类甚至函数,而无需太多样板。
此外,许多 Python 库和框架利用工厂模式或类似工厂的模式,即使它不是显式的或完全相同的。例如,像SQLAlchemy这样的ORM(对象关系映射库)使用工厂来创建数据库会话对象。SQLAlchemy 可以比作工厂模式,因为它生成新的会话实例,充当数据库通信的主要接口。sessionmaker()
Python 的内置功能(如装饰器)可用于增强工厂模式。例如,装饰器可用于向工厂注册类,从而扩展工厂的功能,而无需显式修改它。
想象一下数据工程中的一个常见场景:用于操作来自不同文件格式的数据的数据管道:CSV、JSON 和 XML 文件。根据文件类型,应应用不同的分析步骤。
我们将使用一个简单的字典作为这些文件解析器的“注册表”,并将函数作为我们的工厂。
首先,我们将定义解析函数:
- import csv
- import json
- import xml.etree.ElementTree as ET
-
- def parse_csv(file_path):
- with open(file_path, mode='r') as file:
- reader = csv.reader(file)
- return list(reader)
-
- def parse_json(file_path):
- with open(file_path, mode='r') as file:
- return json.load(file)
-
- def parse_xml(file_path):
- tree = ET.parse(file_path)
- root = tree.getroot()
- return root # you'd typically add more logic to process the XML tree
然后,我们将定义一个装饰器来注册这些解析器:
- PARSERS = {}
-
- def register_parser(file_type):
- def decorator(fn):
- PARSERS[file_type] = fn
- return fn
- return decorator
我们将注册我们的解析器:
- @register_parser('csv')
- def csv_parser(file_path):
- return parse_csv(file_path)
-
- @register_parser('json')
- def json_parser(file_path):
- return parse_json(file_path)
-
- @register_parser('xml')
- def xml_parser(file_path):
- return parse_xml(file_path)
最后,我们将编写一个函数来获取正确的解析器,并使用工厂来解析文件:
- def get_parser(file_type):
- return PARSERS.get(file_type)
-
- data_csv = get_parser('csv')('data.csv')
- data_json = get_parser('json')('data.json')
- data_xml = get_parser('xml')('data.xml')
对于数据工程师来说,处理多种文件格式是很常见的,并且能够使用新的解析器(如XML,Parquet等)轻松扩展系统至关重要。通过此设置,数据工程师只需定义一个新的解析函数并将其注册到装饰器,即可轻松扩展系统以支持新的文件类型。
无需接触现有的工厂逻辑,使其易于维护和扩展。通过将工厂模式与装饰器一起使用,我们可以简化此过程并维护更干净、更模块化的代码。
数据工程师或数据科学家经常将工厂模式用于日常任务,例如批处理、构建实时数据流和 ETL 管道。
例如,假设工作流中有各种类型的数据连接,例如数据库、文件或 API。您可以使用工厂模式根据当时的需求为您创建正确的连接,而不是手动创建与每个连接的连接。可以把它想象成一条装配线,在需要时准确地生产你需要的东西,而不会用不必要的细节弄乱其余的代码。
让我们考虑一个需要连接到不同类型的数据库的场景,如MySQL和PostgreSQL。工厂模式可用于根据给定输入创建适当的数据库连接。下面是一个说明此模式的简单示例:
让我们首先定义每个数据库的连接:
- import mysql.connector
- import psycopg2
-
- def connect_mysql(host, user, password, database):
- connection = mysql.connector.connect(
- host=host,
- user=user,
- password=password,
- database=database
- )
- return connection
-
- def connect_postgresql(host, user, password, database):
- connection = psycopg2.connect(
- host=host,
- user=user,
- password=password,
- dbname=database
- )
- return connection
接下来,我们将定义一个装饰器来注册数据库连接:
- DB_CONNECTIONS = {}
-
- def register_db_connector(db_type):
- def decorator(fn):
- DB_CONNECTIONS[db_type] = fn
- return fn
- return decorator
然后,我们将注册连接:
- @register_db_connector('mysql')
- def mysql_connector(host, user, password, database):
- return connect_mysql(host, user, password, database)
-
- @register_db_connector('postgresql')
- def postgresql_connector(host, user, password, database):
- return connect_postgresql(host, user, password, database)
最后,我们将编写一个函数来获取正确的连接器,并使用工厂来获取适当的数据库连接:
- def get_db_connector(db_type):
- if db_type not in DB_CONNECTIONS:
- raise ValueError(f"Unsupported database type: {db_type}")
- return DB_CONNECTIONS[db_type]
-
- # Example usage:
- mysql_conn = get_db_connector('mysql')('localhost', 'user', 'password', 'mydb')
- postgres_conn = get_db_connector('postgresql')('localhost', 'user', 'password', 'mydb')
通过此设置,将来添加对新型数据库连接的支持非常简单。我们首先定义连接函数,然后使用装饰器注册它。无需更改其他部件,展示了工厂模式的可维护性和可扩展性优势。
工厂在以下情况下特别有用:
我们将看两个示例,说明工厂资产在现实世界中的工作方式。
当您需要抓取不同类型的页面但希望为所有页面保持一致的界面时,工厂模式在网页抓取中特别有用。让我们看看工厂模式如何用于按人口抓取维基百科的国家和依赖关系表:
首先,请确保安装以下内容:
- pip install requests
- pip install beautifulsoup4
然后,定义函数以从维基百科中抓取不同的表。假设维基百科可能有多个表以不同的格式表示这些数据。一个表可能是标准表,而另一个表可能是针对移动设备优化的。
- from bs4 import BeautifulSoup
- import requests
-
- def scrape_standard_table(url):
- page = requests.get(url)
- soup = BeautifulSoup(page.content, 'html.parser')
-
- # Assuming the first table on the page is the one of interest
- table = soup.find_all("table")[0]
-
- rows = table.find_all("tr")
- data = []
- for row in rows[1:]: # skipping the header row
- columns = row.find_all("td")
- country = columns[0].get_text(strip=True)
- population = columns[1].get_text(strip=True)
- data.append((country, population))
- return data
-
- def scrape_mobile_table(url):
- page = requests.get(url)
- soup = BeautifulSoup(page.content, 'html.parser')
-
- # Mobile tables might be different, for the sake of example let's assume they're div-based
- table_div = soup.find("div", {"class": "mobile-table"})
- rows = table_div.find_all("div", {"class": "row"})
- data = []
- for row in rows:
- country = row.find("div", {"class": "country"}).get_text(strip=True)
- population = row.find("div", {"class": "population"}).get_text(strip=True)
- data.append((country, population))
- return data
然后,我们将定义一个装饰器来注册抓取函数:
- SCRAPERS = {}
-
- def register_scraper(scraper_type):
- def decorator(fn):
- SCRAPERS[scraper_type] = fn
- return fn
- return decorator
现在我们将注册我们的抓取函数:
- @register_scraper('standard')
- def standard_scraper(url):
- return scrape_standard_table(url)
-
- @register_scraper('mobile')
- def mobile_scraper(url):
- return scrape_mobile_table(url)
最后,我们将编写我们的函数来获取正确的抓取工具,并使用工厂从维基百科获取数据:
- def get_scraper(scraper_type):
- if scraper_type not in SCRAPERS:
- raise ValueError(f"Unsupported scraper type: {scraper_type}")
- return SCRAPERS[scraper_type]
-
- # Example usage:
- url = "https://en.wikipedia.org/wiki/List_of_countries_and_dependencies_by_population"
- data_standard = get_scraper('standard')(url)
- # data_mobile = get_scraper('mobile')(url) # if you had a mobile URL
在此示例中,添加对将来抓取不同格式的支持(如维基百科中的不同表结构)很简单:定义抓取函数,然后使用装饰器注册它。这可确保抓取代码保持模块化且易于扩展,而无需修改现有逻辑。
Dagster 是一个数据编排器,可为数据处理的不同阶段(从引入到机器学习)提供单一管理平台。Dagster帮助安排和观察广泛的数据工程工具,Python作为其编程语言。它被数据工程师和数据科学家广泛用于数据科学、数据分析、大数据、机器学习等一系列应用。
Dagster 的功能之一是能够管理资产,这些资产是数据计算的输出。资产表示一段数据或具有价值且值得跟踪的计算结果。这可以是数据库中的表、磁盘上的文件、模型工件等。资产通常是管道的输出。
资源工厂是 Dagster 中的一项功能,允许用户以声明方式定义资产的生成方式。它们可以被视为通过定义生成资产所需的输入、输出和计算来创建资产的模板。
让我们演练一个简单的示例,在该示例中,我们将通过在 Dagster 中应用 Factory 模式来重构现有代码块以生成一组资产。
在开始之前,请确保安装Dagster:
pip install dagster dagster-webserver
假设我们是非营利组织数据工程团队的一员。我们有一些现有的代码来查询捐赠者平台的 API 并将结果写入文件(CSV 或 JSON)。它目前看起来像这样:
- from dagster import asset
- import requests
- import csv
-
- @asset
- def volunteers():
- result = requests.get('www.donorplatform.org/api/v1/volunteers')
- with open('volunteers.csv', 'w') as f:
- writer = csv.writer(f)
- writer.writerows(result)
-
- @asset
- def donations():
- result = requests.get('www.donorplatform.org/api/v2/donations')
- with open('donations.csv', 'w') as f:
- writer = csv.writer(f)
- writer.writerows(result)
-
- @asset
- def donors():
- result = requests.get('www.donorplatform.org/api/v1/donors')
- with open('donors.json', 'w') as f:
- f.write(result)
我们的运营团队最近扩大了对捐赠者平台的使用,并要求我们从 50 个新的 API 端点运行数据提取。这变得不守规矩且难以管理,因为您知道这将需要很长时间,数据工程师将偏离他们检索数据的方式,并且很难测试。
使用我们新的 python 技能,让我们为此应用工厂模式来解决这些问题。
首先,让我们定义可以配置的内容。需要为每个资产自定义三个部分:
因此,我们将定义一个 JSON 对象来集中资产的可能配置。
- specs = [
- {
- 'name': 'volunteers',
- 'endpoint': 'v1/volunteers',
- 'file_type': 'csv'
- },
- {
- 'name': 'donations',
- 'endpoint': 'v2/donations',
- 'file_type': 'csv'
- },
- {
- 'name': 'donors',
- 'endpoint': 'v1/donors',
- 'file_type': 'json'
- }
- ]
然后,我们将定义和泛化我们的资产函数,以采用规范并生成资产
- spec = specs[0] # take a single spec as reference while building
-
- @asset(name=spec['name'])
- def generic_asset():
- result = requests.get(f'www.donorplatform.org/api/{spec["endpoint"]}')
- with open(f'{spec["name"]}.f{spec["file_type"]}', 'w') as f:
- if spec["file_type"] == 'csv':
- writer = csv.writer(f)
- writer.writerows(result)
- elif spec["file_type"] == 'json':
- f.write(result)
最后,让我们将通用资产包装在一个函数中,该函数将用作生成所有资产的工厂。该函数将采用规范并将其应用于我们的资产。
- def generate_donor_platform_asset(spec):
- @asset(name=spec['name'])
- def _asset():
- result = requests.get(f'www.donorplatform.org/api/{spec["endpoint"]}')
- with open(f'{spec["name"]}.f{spec["file_type"]}', 'w') as f:
- if spec["file_type"] == 'csv':
- writer = csv.writer(f)
- writer.writerows(result)
- elif spec["file_type"] == 'json':
- f.write(result)
-
- return _asset
在您的工厂中,您只需定义其他规格即可生产更多资产。以下是它在生产中的使用方式:
- from dagster import Definitions, asset
- import requests
- import csv
-
- specs = [
- {'name': 'volunteers', 'endpoint': 'v1/volunteers', 'file_type': 'csv'},
- {'name': 'donations', 'endpoint': 'v2/donations', 'file_type': 'csv'},
- {'name': 'donors', 'endpoint': 'v1/donors', 'file_type': 'json'},
- {'name': 'projects', 'endpoint': 'v1/projects', 'file_type': 'json'},
- {'name': 'fundraisers', 'endpoint': 'v1/fundraisers', 'file_type': 'csv'},
- ]
-
- def generate_donor_platform_asset(spec):
- @asset(name=spec['name'])
- def _asset():
- result = requests.get(f'www.donorplatform.org/api/{spec["endpoint"]}')
- with open(f'{spec["name"]}.f{spec["file_type"]}', 'w') as f:
- if spec["file_type"] == 'csv':
- writer = csv.writer(f)
- writer.writerows(result)
- elif spec["file_type"] == 'json':
- f.write(result)
-
- return _asset
-
-
- defs = Definitions(assets=[generate_donor_platform_asset(spec) for spec in specs])
如果您运行 ,您将可以访问 dagster 的 UI 和 localhost:3000 上的资产图。当您向列表中添加更多规范并重新加载定义时,您将看到生成的更多资产。dagster dev
这是 Dagster 中资产工厂入门的基本数据工程示例。随着您深入研究 Dagster,您可以探索更高级的功能。Dagster还提供与许多数据库和数据系统的集成。这使得使用资源工厂以各种格式和位置生成资产变得容易。可以使用资产工厂在云存储系统上生成文件、执行 SQL 或训练机器学习模型。
因此,python项目中的资产工厂提供了相同的三个好处:
工厂模式是数据工程师的宝贵工具,尤其是在处理不同类型的数据源或复杂对象创建时。它简化了您的代码,使其更具可重用性、可扩展性和可维护性。
作为最流行的编程语言之一,在使用 Python 进行数据工程时,工厂模式非常有用。虽然这些也适用于其他语言,但您最有可能遇到 Python 作为通用编程语言,落后于大多数流行的数据工程任务工具。