Как извлечь и сохранить ежедневные цены на бензин с помощью парсинга и Airflow
В эпоху больших данных умение извлекать, преобразовывать и сохранять данные из веб-ресурсов в различные системы хранения, такие как MongoDB, PostgreSQL, MinIO и Elasticsearch, является важным навыком для разработчиков и специалистов по обработке данных. Возможность предоставления этих данных через API для использования в приложениях фронтенда делает этот навык еще более ценным. В этой статье будет показано, как это сделать с использованием Python, Airflow и FastAPI.
Мы создадим систему, которая будет парсить веб-ресурсы для получения самых высоких и самых низких цен на бензин за день, сохранять эти данные в различных базах данных, а затем предоставлять эти данные через API. API можно использовать в приложении React или любом другом фреймворке фронтенда.
Для этого примера мы будем использовать следующую структуру данных:
{
"highest_price": {
"price": 163.9,
"station": "Esso Du Commerce / René Lévesque",
"city": "Verdun ( Île des Soeurs )"
},
"lowest_price": {
"price": 154.4,
"station": "Costco 2999 Aut.440 Laval/Ave. Jacques-Bureau",
"city": "Laval"
}
}
Парсинг веб-страницы
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airflow.providers.mongo.hooks.mongo import MongoHook
import requests
from bs4 import BeautifulSoup
from datetime import date
# Константы
DAG_ID = 'gas_prices_load_mongodb_dag'
MONGO_CONN_ID = 'mongodb_default' # Замените на ваш идентификатор подключения к MongoDB
def парсить_цены_на_бензин():
url = "https://www.essencemontreal.com/prices.php?l=f&prov=QC&city=Montreal"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
}
response = requests.get(url, headers=headers)
soup = BeautifulSoup(response.text, 'html.parser')
цены = soup.find_all('td', {'class': ['greencell', 'redcell', 'pricecell']})
станции = soup.find_all('td', {'class': 'stationcell'})
города = soup.find_all('td', {'class': 'citycell'})
времена_пользователей = soup.find_all('td', {'class': 'usercell'})
цены_на_бензин = []
for цена, станция, город, время_пользователя in zip(цены, станции, города, времена_пользователей):
газовая_станция = " ".join(станция.stripped_strings)
газовый_город = " ".join(город.stripped_strings)
газовая_цена = " ".join(цена.stripped_strings)
время_пользователя_газа = " ".join(время_пользователя.stripped_strings)
# Разделение времени_пользователя_газа на время и пользователя
время_газа, *пользователь_газа = время_пользователя_газа.split(maxsplit=1)
пользователь_газа = ' '.join(пользователь_газа)
# Получение текущей даты
сегодня = date.today()
# Добавление текущей даты к информации о цене на бензин
цены_на_бензин.append((газовая_цена, газовая_станция, газовый_город, время_газа, пользователь_газа, str(сегодня)))
return цены_на_бензин
def сохранить_в_mongodb(**context):
цены_на_бензин = context['task_instance'].xcom_pull(task_ids='парсить_цены_на_бензин_task')
mongo_hook = MongoHook(conn_id=MONGO_CONN_ID)
mongo_client = mongo_hook.get_conn()
# Замените 'gas_prices_db' и 'gas_prices' на названия вашей базы данных и коллекции
коллекция = mongo_client['gas_prices_db']['gas_prices']
for информация_о_цене in цены_на_бензин:
документ = {
'цена': информация_о_цене[0],
'станция': информация_о_цене[1],
'город': информация_о_цене[2],
'время': информация_о_цене[3],
'пользователь': информация_о_цене[4],
'дата': datetime.strptime(информация_о_цене[5], '%Y-%m-%d'),
}
коллекция.insert_one(документ)
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'retries': 0,
}
dag = DAG(
DAG_ID,
default_args=default_args,
description='DAG для парсинга цен на бензин и сохранения в MongoDB',
schedule_interval='@daily',
)
парсить_цены_на_бензин_task = PythonOperator(
task_id='парсить_цены_на_бензин_task',
python_callable=парсить_цены_на_бензин,
dag=dag,
)
сохранить_в_mongodb_task = PythonOperator(
task_id='сохранить_в_mongodb_task',
python_callable=сохранить_в_mongodb,
provide_context=True,
dag=dag,
)
парсить_цены_на_бензин_task >> сохранить_в_mongodb_task
API:
from typing import Dict, Any
from fastapi import FastAPI
from pydantic import BaseModel, Field
from pymongo import MongoClient
from bson import ObjectId
from datetime import datetime, timedelta
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
class PyObjectId(ObjectId):
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, v):
if not ObjectId.is_valid(v):
raise ValueError('Неверный objectid')
return ObjectId(v)
@classmethod
def __modify_schema__(cls, field_schema):
field_schema.update(type='string')
class DBModel(BaseModel):
id: PyObjectId = Field(default_factory=PyObjectId, alias='_id')
class Config:
arbitrary_types_allowed = True
json_encoders = {
ObjectId: str
}
allow_population_by_field_name = True
class GasPriceModel(BaseModel):
price: float
station: str
city: str
app = FastAPI()
origins = [
"http://localhost:3000", # React приложение
"http://localhost:8000", # FastAPI сервер
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
def get_db_connection():
client = MongoClient(
host="172.23.0.2", # замените на ваш хост MongoDB
username="root", # замените на ваше имя пользователя MongoDB
password="example" # замените на ваш пароль MongoDB
)
db = client["gas_prices_db"]
return db
@app.get("/prices", response_model=Dict[str, GasPriceModel])
def get_prices() -> Dict[str, Any]:
db = get_db_connection()
# Получение текущей даты и даты следующего дня
today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
next_day = today + timedelta(days=1)
# Выполнение запроса для поиска строки с наибольшей ценой на сегодняшний день
highest_price_row = db.gas_prices.find_one({"date": {"$gte": today, "$lt": next_day}}, sort=[("price", -1)])
# Выполнение запроса для поиска строки с наименьшей ценой на сегодняшний день
lowest_price_row = db.gas_prices.find_one({"date": {"$gte": today, "$lt": next_day}}, sort=[("price", 1)])
return {"highest_price": GasPriceModel(**highest_price_row), "lowest_price": GasPriceModel(**lowest_price_row)}
# Если вы хотите запустить сервер с помощью скрипта python, раскомментируйте следующие строки
if __name__ == '__main__':
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
Front-End-React:
import React, { useState, useEffect } from 'react';
import axios from 'axios';
import { FaMoneyBillAlt, FaBuilding, FaCity } from 'react-icons/fa';
const HomePage = () => {
const [data, setData] = useState({ highest_price: {}, lowest_price: {} });
useEffect(() => {
const fetchData = async () => {
const result = await axios('http://localhost:8000/prices');
setData(result.data);
};
fetchData();
}, []);
return (
<div className="container">
<div className="price-card">
<h2>Самая высокая цена</h2>
<p><FaMoneyBillAlt /> Цена: {data.highest_price.price} $</p>
<p><FaBuilding /> Станция: {data.highest_price.station}</p>
<p><FaCity /> Город: {data.highest_price.city}</p>
</div>
<div className="price-card">
<h2>Самая низкая цена</h2>
<p><FaMoneyBillAlt /> Цена: {data.lowest_price.price} $</p>
<p><FaBuilding /> Станция: {data.lowest_price.station}</p>
<p><FaCity /> Город: {data.lowest_price.city}</p>
</div>
</div>
);
}
export default HomePage;