C/C++教程

用PydanticAI和Gemini 2.0构建Airflow的AI助手

本文主要是介绍用PydanticAI和Gemini 2.0构建Airflow的AI助手,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
使用 PydanticAI 创建一个可以与 Airflow DAGs 进行交互的 AI 代理程序
从混乱到掌控之旅

在航空的早期开拓年代,飞行员在云层中穿梭飞行,仅依靠基本的仪器和纯粹的直觉。每一次飞行都是人类判断与机械力量之间微妙的舞蹈,很大程度上依赖经验和直觉来取得成功。一点点计算失误或突如其来的天气变化都可能带来灾难。他们使用了令人惊叹的技术,却对其控制有限。

当我第一次将LLM应用到生产系统中时,我感觉自己就像早期的试飞员(test pilots),掌控着巨大的力量,却只有简陋的工具。每一次部署都像是在赌注。

AI代理们准备迎接Airflow,来源:由Adobe Firefly生成

今天的AI发展就像早期航空挑战一样。我们有像Gemini 2.0这样强大的模型——能够理解上下文,生成类似人类的回应,并处理复杂的指令。然而,把这种力量用于实际应用时,常常感觉像是在没有导航工具的情况下穿越风暴。

就像现代航空通过适当的仪器和控制系统的应用,从冒险演变为可靠的交通工具一样,人工智能的发展也在向代理转变。与传统的AI只是回答问题不同,代理积极地与其环境进行互动。它们不仅做出决策,使用工具,还会代你执行任务。现代人工智能代理,如由Gemini等大型语言模型驱动的代理,能够理解自然语言指令,将复杂任务分解为更小的步骤,并提供结构化的输出和监控结果。

PydanticAI 正从天边出现。它是由 Pydantic 的团队创建的——同一个为许多知名项目提供支持的基础——它是一个专为现代AI开发设计的框架,为代理系统带来了控制力和可靠性。

想象 PydanticAI 是你飞机的现代化驾驶舱——将助手系统、发动机控制和仪表盘整合成一个统一的界面。它提供清晰的读数和可预测的控制,最关键是,它能让你有信心应对复杂情况。它把混乱变得有条理。

在这篇文章中,我们将通过构建一个与Apache Airflow交互的AI代理来测试PydanticAI的功能。我们将创建一个系统,该系统能够理解关于您工作流的自然语言查询,获取实时状态更新,用结构化、可靠的数据回应。从此不再对您的DAG操作感到迷茫。

🚀 直接看代码?访问 GitHub 上的项目:_
https://github.com/vojay-dev/pydantic-airflow-agent

– 从混乱到掌控的旅程
– 为何选择PydanticAI?体验AI开发中的FastAPI风格
−− 当前的行业态势
−− PydanticAI的简洁性
−− PydanticAI的未来
– PydanticAI基础:快速上手指南
– 镜子里的DAG状态如何?
−− 实现一些示例DAG
−− 使用PydanticAI实现Airflow AI代理
−− 演示
– 从盲飞到晴空

为何选择 PydanticAI?利用FastAPI开发AI时的流畅体验

构建生产级别的AI应用程序不应该像解谜一样让人感觉费劲。然而,当我第一次探索AI框架的世界时,它确实让我有这种感觉。让我来分享一下为什么PydanticAI正成为我进行现代AI开发的首选工具。

现在的状况

AI框架生态系统中选择丰富:

  • LangChain: 功能全面但较为复杂,提供了许多集成和功能。
  • crewAI: 专长于多代理编排,适合处理复杂的代理互动。
  • Instructor: 专注于结构化输出和指令执行。

每个方法都有其优势,但往往伴随着相当复杂的特性以及较陡的学习曲线。

PydanticAI 简洁
from pydantic_ai import Agent

agent = Agent('gemini-1.5-flash', system_prompt='请简答。')
result = agent.run_sync('为什么选用PydanticAI?')

当我第一次看到 PydanticAI 的示例,它让我想起了我第一次接触 FastAPI 的经历——干净、直观,刚刚好。

PydanticAI 的独特之处在于

由 Pydantic 团队打造

  • 深度集成到 Pydantic 生态系统中
  • 真正能帮助开发的类型安全性
  • 让 FastAPI 开发者熟悉的模式

生产就绪的设计

  • 适用于各种模型(OpenAI,Anthropic,Gemini,Ollama)
  • 内置依赖注入以进行测试
  • 无缝集成 Logfire,实现实时监控功能

保持代码整洁,易于维护

  • 最少的样板代码
  • 强类型检查
  • 易于理解的错误信息

注意: Logfire 的集成简单而优雅,能够详细了解代理的流动。官方 PydanticAI 页面上有不少精彩的示例,不过我还没试过。如果你想深入了解这个框架,我强烈推荐你去看看。如果试了,请告诉我你的感受哦😉

PydanticAI 的未来展望

PydanticAI 的真正力量在于它与现代 Python 开发实践的符合。因此,随着 Pydantic 仍然是许多主要 Python 框架和 AI 库的支柱,PydanticAI 的紧密整合变得越来越重要。

它的未来前景看起来很有前途,因为

  • Pydantic生态系统集成的扩展和增强
  • 核心Pydantic团队的积极开发工作
  • 专注于提升开发者体验和确保产品就绪

什么时候考虑其他选择?如果你需要 LangChain 的大量集成支持、crewAI 的多代理功能,或 Instructor 的专门指令管理。但对于大多数 AI 应用来说,PydanticAI 提供了您所需要的所有功能,但复杂度更低。

PydanticAI 带来了开发 AI 的“FastAPI 风格”——而这正是这个领域迫切需要的。这不仅仅是减少代码的问题;更是关于编写更好、更易于维护的 AI 应用。

注意: PydanticAI 仍处于早期开发阶段,但是鉴于 Pydantic 团队以往的表现,我对它的未来感到非常乐观。它已经预示了未来 AI 开发工具的模样。

PydanticAI 快速入门:基础知识

构建 PydanticAI 的 AI 代理感觉就像编写普通的 Python 代码一样自然。现在我们来看看三个核心模式,这些模式使其既强大又简单易用。

我会尽可能简短地说明,毫不夸张地说,PydanticAI 的文档是我读过的最好的文档之一。总体来说,Pydantic 项目拥有出色的文档,文风生动有趣,信息丰富。因此,想要了解更多,直接查阅文档是最好的选择。本文不仅仅局限于官方文档,而是旨在探索该框架在现实世界中的一个创意应用。

基本代理人

agent = Agent('gemini-1.5-flash', system_prompt='简短回答。')  
result = agent.run_sync('PydanticAI是什么?')

最简单来说,代理程序就是一个封装LLM的工具,负责管理对话流程。你选择你的模型并设置系统提示,就可以开始聊天了。

结构化结果

    class 天气信息类(BaseModel):  
        temperature: float  # 温度信息
        condition: str  # 天气状况

    天气代理 = Agent('gemini-1.5-flash', result_type=天气信息类)  # 天气代理

而不是解析自由文本,PydanticAI 引导大型语言模型返回结构化数据。你的集成开发环境会获得类型提示,你可以确保得到正确的数据结构。

工具

     @agent工具装饰器  
    async def 获取城市温度(city: str) -> float:  
        """获取指定城市的当前气温。"""  
        return await weather_api.get(city)

工具是你的代理可以使用的一些功能。例如,它们可以让代理执行实际操作,比如调用API或获取数据。

PydanticAI的工具系统特别聪明的地方在于它处理函数签名的方法。该框架会自动提取参数(除了RunContext参数)来构建工具的结构,甚至还会从你的注释中提取参数的描述,利用griffe。

这种智能解析功能意味着你的工具不仅实用,它们还自带文档。大语言模型很清楚如何使用这些工具,因为文档已经内置在结构中。再也不用手动维护单独的工具说明了!

我喜欢 PydanticAI 的地方是它的模式可以自然地组合在一起。从一个基本的代理开始入手,当你需要干净的数据时,就可以添加结构;当你需要现实世界的互动时,就可以加入工具。它会随着你的需求一起成长!🛠️ 添加工具,更实用

有了这个基础就足够了,开始构建强大的AI代理程序。接下来我们将要介绍的Airflow示例中,我们会看到这些模式在实际项目中的应用情况。

镜子,挂在墙上的镜子,DAG的状态是什么?

本教程的完整代码可以在 GitHub 上找到。尽管会在此介绍关键部分,但您可以直接克隆仓库以便边做边学。

    git clone git@github.com:vojay-dev/pydantic-airflow-agent.git

这会克隆一个Git仓库到你的本地机器。

请记住,PydanticAI 正在快速开发,不过这同时也意味着这个演示项目的细节未来可能会有所调整。然而,它一定会帮助你掌握基本原理,并启发你自己的 PydanticAI 项目。

通过这个项目,我们的目标是超越简单的文档和示例。让我们创建一个能通过Airflow REST API与Airflow进行互动的人工智能代理。你只需问它关于某个DAG的状态,而无需指定确切的DAG ID。只需描述这个DAG,代理会检索所有的DAG,然后找出最相关的一个,并获取并返回所选DAG的状态信息,以结构化格式呈现。

为了简便起见,我们使用了一个带有Docker和Astro CLI(可以通过brew install astro安装)的本地Airflow环境,这是一种有效启动Airflow项目的有效方式。我们将PydanticAI代理和Airflow设置整合到同一项目中,以方便使用。通常,这两个部分是分开的。

我们正在使用最新版本的Airflow,即2.10.4(截至本文撰写之时)。如果您在Airflow 3发布之后阅读此文,那真是太棒了!我迫不及待想看到新的UI和其他酷炫的功能。不过,这也意味着很多地方可能已经发生了显著变化。不过,您仍然可以从中了解到如何适应新变化。

首先,我们使用Poetry来设置项目并安装所需的依赖项,首先是安装PydanticAI,然后通过Astro CLI来创建一个Airflow环境。

    poetry new pydantic-airflow-agent  # 新建一个名为pydantic-airflow-agent的项目
    cd pydantic-airflow-agent          # 切换到pydantic-airflow-agent目录下
    poetry add pydantic-ai            # 安装pydantic-ai这个依赖项
    poetry add colorlog               # 安装colorlog这个依赖项

在添加 Airflow 依赖项之前,记得修改 pyproject.toml 文件中的 Python 依赖项要求。

    python = ">=3.12,<3.13"(要求Python版本在3.12及以上但低于3.13)

现在,让我们添加Airflow依赖项。

    poetry add apache-airflow

注:上述命令是在使用Poetry包管理器时添加apache-airflow包的指令。

最后,把本地的Airflow环境运行起来。

    astro dev init # 选择在非空文件夹中初始化项目  
    astro dev start # 启动项目
实现一些示例的DAG

重点是 PydanticAI 驱动的 AI 代理,然而,如果没有一些 DAG,我们无法进行任何交互,我们尽量做到最简单,并且仅仅添加一些基本上没有实际作用的 DAG。

    import pendulum    
    from airflow.decorators import dag, task    
    from airflow.operators.smooth import SmoothOperator    

    start_date = pendulum.datetime(2024, 12, 1, tz="UTC")  # 开始日期设置为UTC时间的2024年12月1日    

    @dag(schedule='@daily', start_date=start_date)    
    def payment_report():    
        SmoothOperator(task_id='some_task')  # 创建一个任务,任务ID为'some_task'    

    @dag(schedule='@daily', start_date=start_date)    
    def customer_profile():    
        SmoothOperator(task_id='some_task')  # 创建一个任务,任务ID为'some_task'    

    payment_report()    
    customer_profile()

如果你还不熟悉 SmoothOperator,可以在 Airflow 的日志中查找有关它的信息。这是一个让人开心的小惊喜,能让数据工程师们会心一笑。

本地的Airflow设置包含两个示例DAG,由作者提供

实现一个Airflow AI代理程序

因为我们想通过Airflow的REST API来操作Airflow,我们可以从中推断出代理的一些依赖项。我们需要Airflow服务的基URL,API运行的端口和用户名和密码。

我们也期望我们的AI代理能以一个结构化的对象回应,该对象表示DAG(有向无环图)的状态,包括一些有趣的属性。我们来定义一下依赖关系和输出模型。

    @dataclass    
    class Deps:    
        airflow_api_base_uri: str    
        airflow_api_port: int    
        airflow_api_user: str    
        airflow_api_pass: str    # API 密码    

    class DAGStatus(BaseModel):    
        dag_id: str = Field(description='DAG的ID')    
        dag_display_name: str = Field(description='DAG的显示名称')    
        is_paused: bool = Field(description='DAG 是否处于暂停状态')    
        next_dag_run_data_interval_start: str = Field(description='下一个 DAG 运行的开始时间')    
        next_dag_run_data_interval_end: str = Field(description='下一个 DAG 运行的结束时间')    
        last_dag_run_id: str = Field(default='无运行记录', description='最后一次 DAG 运行的 ID')    
        last_dag_run_state: str = Field(default='无运行记录', description='最后一次 DAG 运行的状态')    
        total_dag_runs: int = Field(description='总运行次数')

就这样,我们可以定义我们的模型和代理。在这一示例中,我们使用最新的 Gemini 2.0 版本。

注意: 我尝试了多种模型。许多模型在交互工具功能的顺序或利用结果构建最终结构化输出方面遇到了困难。Gemini 2.0 Flash 表现最好,但我也建议试试其他支持的模型。你可以通过 Ollama 使用 Mistral 或 Llama 3.3,这两个模型也支持工具功能和结构化输出。然而,在这个演示中,我们将使用表现最好的模型。最终,我们希望有一个值得信赖的代理。你会乘坐一架只能偶尔正常工作的飞机吗?

    model = VertexAI模型(    
        model_name='gemini-2.0-flash-exp',    
        service_account_file='gcp-credentials.json'    
    )    

    airflow_agent = 代理(    
        model=model,    
        system_prompt=(    
            '你是一个Airflow监控助手。对于每个请求,你应:\n'    
            '1. 首先使用`list_dags`获取可用的DAG\n'    
            '2. 将用户请求匹配到最相关的DAG ID\n'    
            '3. 使用`get_dag_status`获取DAG的状态详情'    ),    
        result_type=DAG状态,    
        deps_type=依赖项,    
        retries=2    
    )

正如你所见,我对代理应该如何处理请求并与工具功能互动有着非常清楚和严格的要求。这很大程度上取决于你用的是什么模型。根据不同的使用场景,有时候不明确指定任何工具功能也能很好地运行。

接下来,我们将给AI代理添加一个功能,让它能查到DAG列表。我们会提供DAG的ID和显示名称,这样模型就能选择一个最适合回答用户问题的DAG。

    @airflow_agent.tool  
    async def list_dags(ctx: RunContext[Deps]) -> str:  
        """  
        获取所有DAG的列表。返回DAG的ID及其显示名称。  
        """  
        logger.info('正在获取所有DAG...')  
        uri = f'{ctx.deps.airflow_api_base_uri}:{ctx.deps.airflow_api_port}/api/v1/dags'  
        auth = (ctx.deps.airflow_api_user, ctx.deps_airflow_api_pass)  

        async with AsyncClient() as client:  
            response = await client.get(uri, auth=auth)  
            response.raise_for_status()  

            dags_data = response.json()['dags']  
            result = json.dumps([  
                {'dag_id': dag['dag_id'], 'dag_display_name': dag['dag_display_name']} for dag in dags_data  
            ])  
            logger.debug(f'已获取的DAG如下:{result}')  
            return result

每个工具功能都会接收到RunContext,其中包含了之前定义的依赖。这样一来,我们就可以轻松地连接到Airflow API并获取所需的数据。

当代理确定了哪一个DAG ID最适合用户的请求后,它需要检索DAG及其运行的详细信息以便计算结构化的输出,也就是我们前面提到的模型类别。

因此,让我们再加一个工具功能以便获取更多细节。

    @airflow_agent.tool  
    async def get_dag_status(ctx: RunContext[Deps], dag_id: str) -> str:  
        """  
        根据DAG ID获取特定DAG的详细状态信息。  
        """  
        logger.info(f'正在获取ID为 {dag_id} 的DAG的状态信息')  
        base_url = f'{ctx.deps.airflow_api_base_uri}:{ctx.deps_airflow_api_port}/api/v1'  
        auth = (ctx.deps.airflow_api_user, ctx.deps_airflow_api_pass)  

        try:  
            async with AsyncClient() as client:  
                dag_response = await client.get(f'{base_url}/dags/{dag_id}', auth=auth)  
                dag_response.raise_for_status()  

                runs_response = await client.get(  
                    f'{base_url}/dags/{dag_id}/dagRuns',  
                    auth=auth,  
                    params={'order_by': '-execution_date', 'limit': 1}  
                )  
                runs_response.raise_for_status()  

                result = {  
                    'dag_data': dag_response.json(),  
                    'runs_data': runs_response.json()  
                }  

                logger.debug(f'DAG的状态信息: {json.dumps(result)}')  
                return json.dumps(result)  

        except httpx.HTTPStatusError as e:  
            if e.response.status_code == 404:  
                return f'未找到指定ID {dag_id} 的DAG'  
            raise

有了这些,我们已经具备了所有必需的工具,可以按如下所示运行这个代理。

    async def 主():  
        deps = Deps(  
            airflow_api_base_uri='http://localhost',  
            airflow_api_port=8080,  
            airflow_api_user='admin',  
            airflow_api_pass='admin'  
        )  

        用户查询 = '我们每日支付报告的DAG状态如何?'  
        结果 = await airflow_agent.run(用户查询, deps)  
        打印(结果.data)  

    if __name__ == '__main__':  
        asyncio.run(主())

你可能已经注意到,PydanticAI 中许多操作使用了 asyncawait。这并不是随意的选择,而是一个非常有用的特性,这使得我们的应用程序在处理像 API 调用或模型交互操作这样的 I/O 操作时更加高效。

可以将 async 想象成一个擅长同时处理多项任务的高手。当你在做饭时,你会准备其他食材,而不是干等着水烧开。这就是 async 在我们代码中所做的。当我们的代理发出 API 请求或等待 LLM 响应时,async 会让我们的代理在等待 API 响应或 LLM 回复时,不会停滞其他任务,而是可以处理其他任务——比如处理另一个请求,或者更新日志。这在高效运作至关重要的生产环境中特别有价值。

让我们在演示开始前先把所有内容整合一下。下面是完整的Airflow AI代理代码,采用了PydanticAI。

    import asyncio  
    import json  
    import logging  
    from dataclasses import dataclass  
    from devtools import pprint  

    import colorlog  
    import httpx  
    from httpx import AsyncClient  
    from pydantic import BaseModel, Field  
    from pydantic_ai import Agent, RunContext  
    from pydantic_ai.models.vertexai import VertexAIModel  

    log_format = '%(log_color)s%(asctime)s [%(levelname)s] %(reset)s%(purple)s[%(name)s] %(reset)s%(blue)s%(message)s'  
    handler = colorlog.StreamHandler()  
    handler.setFormatter(colorlog.ColoredFormatter(log_format))  
    logging.basicConfig(level=logging.INFO, handlers=[handler])  

    logger = logging.getLogger(__name__)  

    @dataclass  
    class Deps:  
        airflow_api_base_uri: str  
        airflow_api_port: int  
        airflow_api_user: str  
        airflow_api_pass: str  

    class DAGStatus(BaseModel):  
        dag_id: str = Field(description='DAG的ID')  
        dag_display_name: str = Field(description='DAG的显示名称')  
        is_paused: bool = Field(description='DAG是否暂停')  
        next_dag_run_data_interval_start: str = Field(description='下一个DAG运行的数据区间开始时间')  
        next_dag_run_data_interval_end: str = Field(description='下一个DAG运行的数据区间结束时间')  
        last_dag_run_id: str = Field(default='没有DAG运行记录', description='最后一次运行的DAG的ID')  
        last_dag_run_state: str = Field(default='没有DAG运行记录', description='最后一次运行的DAG的状态')  
        total_dag_runs: int = Field(description='总DAG运行次数')  

    model = VertexAIModel(  
        model_name='gemini-2.0-flash-exp',  
        service_account_file='gcp-credentials.json'  
    )  

    airflow_agent = Agent(  
        model=model,  
        system_prompt=(  
            '你是一个Airflow监控助手。对于每个请求:\n'  
            '1. 首先使用`list_dags`获取可用的DAG列表\n'  
            '2. 将用户请求与最相关的DAG ID匹配\n'  
            '3. 使用`get_dag_status`获取DAG的状态详情'  
        ),  
        result_type=DAGStatus,  
        deps_type=Deps,  
        retries=2  
    )  

    @airflow_agent.tool  
    async def list_dags(ctx: RunContext[Deps]) -> str:  
        """  
        从Airflow实例获取所有DAG的列表。返回DAG的ID和显示名称。  
        """  
        logger.info('正在获取可用的DAG列表...')  
        uri = f'{ctx.deps.airflow_api_base_uri}:{ctx.deps.airflow_api_port}/api/v1/dags'  
        auth = (ctx.deps.airflow_api_user, ctx.deps_airflow_api_pass)  

        async with AsyncClient() as client:  
            response = await client.get(uri, auth=auth)  
            response.raise_for_status()  

            dags_data = response.json()['dags']  
            result = json.dumps([  
                {'dag_id': dag['dag_id'], 'dag_display_name': dag['dag_display_name']} for dag in dags_data  
            ])  
            logger.debug(f'可用的DAG列表:{result}')  
            return result  

    @airflow_agent.tool  
    async def get_dag_status(ctx: RunContext[Deps], dag_id: str) -> str:  
        """  
        通过DAG ID获取特定DAG的详细状态。  
        """  
        logger.info(f'正在获取ID为 {dag_id} 的DAG的状态...')  
        base_url = f'{ctx.deps.airflow_api_base_uri}:{ctx.deps_airflow_api_port}/api/v1'  
        auth = (ctx.deps_airflow_api_user, ctx.deps_airflow_api_pass)  

        try:  
            async with AsyncClient() as client:  
                dag_response = await client.get(f'{base_url}/dags/{dag_id}', auth=auth)  
                dag_response.raise_for_status()  

                runs_response = await client.get(  
                    f'{base_url}/dags/{dag_id}/dagRuns',  
                    auth=auth,  
                    params={'order_by': '-execution_date', 'limit': 1}  
                )  
                runs_response.raise_for_status()  

                result = {  
                    'dag_data': dag_response.json(),  
                    'runs_data': runs_response.json()  
                }  

                logger.debug(f'DAG状态:{json.dumps(result)}')  
                return json.dumps(result)  

        except httpx.HTTPStatusError as e:  
            if e.response.status_code == 404:  
                return f'未找到ID为 {dag_id} 的DAG,可能已经删除或不存在。'  
            raise  

    async def main():  
        deps = Deps(  
            airflow_api_base_uri='http://localhost',  
            airflow_api_port=8080,  
            airflow_api_user='admin',  
            airflow_api_pass='admin'  
        )  

        user_request = '我们每日付款报告的DAG状态如何?'  
        result = await airflow_agent.run(user_request, deps=deps)  
        pprint(result.data)  

    if __name__ == "__main__":  
        asyncio.run(main())

我发现至关重要的是,要对智能代理在开发过程中如何调用工具和LLM保持高度透明。在上面的例子中,我们使用了日志记录。不过,我强烈建议您试一下Logfire日志服务集成,这样更符合中文习惯,同时也更具体地说明了Logfire的功能。

示例

让魔法开始吧。我们来用下面的用户问题运行代理吧。

我们每日支付报告的DAG(有向无环图)状态怎么样?

记住,我们定义了两个DAG:payment_reportcustomer_profile。在上面提到的问题中,我们没有使用确切的DAG ID(如定义的那样);代理需要自行确定它们的ID。让我们通过查看输出来看看它如何处理我们的请求。

(.venv) ~/projects/pydantic-airflow-agent  
poetry run python pydantic_airflow_agent/agent.py  
[2024-12-23 14:49:05,127][INFO][httpx] HTTP 请求:POST https://us-central1-aiplatform.googleapis.com/v1/projects/vojay-329716/locations/us-central1/publishers/google/models/gemini-2.0-flash-exp:generateContent "HTTP/1.1 200 OK"  
[2024-12-23 14:49:05,132][INFO][__main__] 获取可用的 DAG 列表...  
[2024-12-23 14:49:05,241][INFO][httpx] HTTP 请求:GET http://localhost:8080/api/v1/dags "HTTP/1.1 200 OK"  
[2024-12-23 14:49:06,640][INFO][httpx] HTTP 请求:POST https://us-central1-aiplatform.googleapis.com/v1/projects/vojay-329716/locations/us-central1/publishers/google/models/gemini-2.0-flash-exp:generateContent "HTTP/1.1 200 OK"  
[2024-12-23 14:49:06,643][INFO][__main__] 获取具有 ID 为 payment_report 的 DAG 状态...  
[2024-12-23 14:49:06,721][INFO][httpx] HTTP 请求:GET http://localhost:8080/api/v1/dags/payment_report "HTTP/1.1 200 OK"  
[2024-12-23 14:49:06,798][INFO][httpx] HTTP 请求:GET http://localhost:8080/api/v1/dags/payment_report/dagRuns?order_by=-execution_date&limit=1 "HTTP/1.1 200 OK"  
[2024-12-23 14:49:09,915][INFO][httpx] HTTP 请求:POST https://us-central1-aiplatform.googleapis.com/v1/projects/vojay-329716/locations/us-central1/publishers/google/models/gemini-2.0-flash-exp:generateContent "HTTP/1.1 200 OK"  

DAGStatus(  
    dag_id='payment_report',  
    dag_display_name='payment_report',  
    is_paused=False,  
    next_dag_run_data_interval_start='2024-12-23T00:00:00+00:00',  
    next_dag_run_data_interval_end='2024-12-24T00:00:00+00:00',  
    last_dag_run_id='scheduled__2024-12-22T00:00:00+00:00',  
    last_dag_run_state='success',  
    total_dag_runs=22,  
)

如你所知,它从开始获取可用的DAGs(有向无环图)。

2024-12-23 14:49:05,132 [INFO], [__main__] 正在获取可用的DAGs...

然后选择了最适合用户输入的DAG图,并利用其他工具来获取详细信息。

2024-12-23 14:49:06,643 [INFO] [__main__] 获取 'payment_report' ID 对应的 DAG 的状态

最后使用结构化的输出结果返回一个 DAGStatus

由 PydanticAI 驅動的 Airflow AI 代理的實際運轉情況,(作者提供)

说实话,当我第一次成功运行这个时,我感到非常惊讶和震撼。这个强大的原型将简洁性与类型安全的集成结合在一起,我立刻被它吸引住。我开始思考如何利用这样的代理来创造价值,例如。想象有人在数据团队的Slack频道中问为什么某个报告没有更新。这样的代理可以自主地找到相关的DAG并与用户对话,而数据工程团队则可以安心地享受他们的咖啡时间。

从盲目飞行到晴朗的天空

这篇文章表明,构建生产级别的AI应用并不需要感觉像在风暴中航行。有了正确的工具和框架,构建这样的AI应用可以像现在的航空旅行一样——既强大又安全可靠。

人工智能的发展领域正在迅速演变,但像 PydanticAI 这样的框架为我们提供了稳固的基石。就像现代航空在确保安全和可靠的同时不断发展,PydanticAI 同样为未来的创新铺平了道路,而不以稳定性为代价。

请关注 PydanticAI 项目,它很值得关注。凭借其背后出色的团队以及优雅的设计框架,我相信它的潜力才刚刚开始展现。

无论是为了Airflow监控、客服支持,还是其他任何用例构建AI代理,请记住:你不再需要摸索前行。工具已经准备好,操作简单直观,起飞的最佳时机已经到来。

现在,如果你们允许,我要去跟一些人聊聊DAG了! &#128516;

觉得这篇文章不错吗?&#129782;
  • &#128079; 如果你觉得这篇帖子有价值,就给它点鼓励的掌声,最多可以拍50次
  • &#128173; 在下方评论区分享你的想法——我很想听听你的想法
  • ✨ 把最喜欢的观点标出来,以后方便回顾

    &#128591; 你的支持对我来说非常重要,它帮助更多的人看到这份内容。

这篇关于用PydanticAI和Gemini 2.0构建Airflow的AI助手的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!