Data Engineer - Environment Guide¶
A guide to setting up the Data Engineering environment for Claude Code.
Capability Overview¶
| Capability | Tools |
|---|---|
| Orchestration | Apache Airflow, Dagster, Prefect |
| Processing | Apache Spark, dbt, Polars |
| Streaming | Apache Kafka, Flink, Pulsar |
| Storage | Delta Lake, Iceberg, Parquet |
| Warehouses | Snowflake, BigQuery, Redshift |
| Quality | Great Expectations, dbt tests |
Python Environment¶
# Install Python
brew install python@3.12 # macOS
winget install Python.Python.3.12 # Windows
# Package manager
pip install uv
# or
pip install poetry
# Virtual environment
uv venv
source .venv/bin/activate
# or
python -m venv .venv
Apache Airflow¶
Local Installation¶
# With pip
pip install apache-airflow
# Initialize the metadata database
export AIRFLOW_HOME=~/airflow
airflow db migrate   # on Airflow < 2.7 use: airflow db init
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com \
--password admin
# Run
airflow webserver -p 8080 &
airflow scheduler &
Docker Compose¶
# docker-compose.airflow.yml
version: '3.8'

x-airflow-common:
  &airflow-common
  image: apache/airflow:2.8.0
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    depends_on:
      - postgres

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    depends_on:
      - postgres
Example DAG¶
# dags/example_etl.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data@company.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'example_etl',
    default_args=default_args,
    description='ETL pipeline example',
    schedule_interval='0 6 * * *',  # Daily at 6 AM
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
) as dag:

    def extract_data(**context):
        # Extract logic
        return {'records': 1000}

    def transform_data(**context):
        ti = context['ti']
        data = ti.xcom_pull(task_ids='extract')
        # Transform logic
        return {'transformed': data['records']}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    load = PostgresOperator(
        task_id='load',
        postgres_conn_id='postgres_default',
        sql='INSERT INTO processed SELECT * FROM staging;',
    )

    extract >> transform >> load
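Before a DAG like this is deployed, it is worth guarding against import errors. A minimal pytest sketch, assuming DAG files live in the dags/ folder mounted above:

# tests/test_dag_integrity.py
from airflow.models import DagBag

def test_no_import_errors():
    # Parse every DAG file in dags/ without loading Airflow's bundled examples
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    assert dag_bag.import_errors == {}, f"DAG import errors: {dag_bag.import_errors}"

def test_example_etl_structure():
    dag_bag = DagBag(dag_folder="dags", include_examples=False)
    dag = dag_bag.get_dag("example_etl")
    assert dag is not None
    assert len(dag.tasks) == 3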
dbt (Data Build Tool)¶
Installation¶
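dbt Core is installed with pip together with an adapter for the target warehouse, for example:

pip install dbt-core dbt-postgres
# other adapters: dbt-snowflake, dbt-bigquery, dbt-redshift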
dbt Project¶
# Create a project
dbt init my_project
# Structure
my_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│ ├── staging/
│ │ ├── stg_orders.sql
│ │ └── stg_customers.sql
│ ├── marts/
│ │ └── dim_customers.sql
│ └── schema.yml
├── seeds/
├── snapshots/
├── macros/
└── tests/
dbt Model¶
-- models/marts/dim_customers.sql
{{ config(
materialized='table',
schema='analytics'
) }}
with customers as (
    select * from {{ ref('stg_customers') }}
),

orders as (
    select
        customer_id,
        count(*) as total_orders,
        sum(amount) as total_spent,
        min(order_date) as first_order,
        max(order_date) as last_order
    from {{ ref('stg_orders') }}
    group by customer_id
)

select
    c.customer_id,
    c.customer_name,
    c.email,
    c.created_at,
    coalesce(o.total_orders, 0) as total_orders,
    coalesce(o.total_spent, 0) as total_spent,
    o.first_order,
    o.last_order,
    case
        when o.total_spent > 1000 then 'premium'
        when o.total_spent > 100 then 'regular'
        else 'new'
    end as customer_tier
from customers c
left join orders o on c.customer_id = o.customer_id
dbt Commands¶
dbt debug            # Verify the connection
dbt run              # Run models
dbt test             # Run tests
dbt build            # run + test
dbt docs generate    # Generate documentation
dbt docs serve       # Serve the docs
# Selection
dbt run --select staging.*
dbt run --select +dim_customers
dbt run --select tag:daily
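dbt can also be invoked from Python, which is handy inside Airflow tasks. A minimal sketch using the programmatic entry point available in dbt-core 1.5+:

# run_dbt.py
from dbt.cli.main import dbtRunner, dbtRunnerResult

runner = dbtRunner()
res: dbtRunnerResult = runner.invoke(["build", "--select", "staging"])

# res.success is False if any model or test failed
if not res.success:
    raise RuntimeError(f"dbt build failed: {res.exception}")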
Apache Spark¶
PySpark Local¶
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Create a session
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Read data
df = spark.read.parquet("data/events/")

# Transformations
result = df \
    .filter(F.col("event_type") == "purchase") \
    .groupBy("user_id", F.date_trunc("day", "timestamp").alias("date")) \
    .agg(
        F.count("*").alias("purchases"),
        F.sum("amount").alias("total_amount")
    ) \
    .orderBy("date", "total_amount", ascending=[True, False])

# Write
result.write \
    .mode("overwrite") \
    .partitionBy("date") \
    .parquet("output/daily_purchases/")

spark.stop()
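When reading untyped sources such as JSON or CSV, declaring the schema explicitly is safer than relying on inference. A short sketch with hypothetical column names, using a SparkSession like the one created above:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

event_schema = StructType([
    StructField("user_id", StringType(), nullable=False),
    StructField("event_type", StringType(), nullable=True),
    StructField("amount", DoubleType(), nullable=True),
    StructField("timestamp", TimestampType(), nullable=True),
])

# Malformed rows become nulls instead of silently changing column types
events = spark.read.schema(event_schema).json("data/raw_events/")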
Spark with Delta Lake¶
# pip install delta-spark
from delta import configure_spark_with_delta_pip
builder = SparkSession.builder \
.appName("DeltaApp") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# Write Delta
df.write.format("delta").save("/data/delta/events")
# Time travel
spark.read.format("delta").option("versionAsOf", 0).load("/data/delta/events")
Polars (Modern Alternative to Pandas)¶
import polars as pl
# Read data
df = pl.read_parquet("data/*.parquet")
# Lazy evaluation
lazy_df = pl.scan_parquet("data/*.parquet")
result = lazy_df \
    .filter(pl.col("status") == "active") \
    .group_by("category") \
    .agg([
        pl.len().alias("count"),  # pl.count() in older Polars versions
        pl.col("amount").sum().alias("total"),
        pl.col("amount").mean().alias("avg")
    ]) \
    .sort("total", descending=True) \
    .collect()  # Execute
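For data that does not fit in memory, the same lazy query can be streamed straight to disk instead of collected. A small sketch (the output path is arbitrary):

# Stream the result to Parquet without materializing it in memory
pl.scan_parquet("data/*.parquet") \
    .filter(pl.col("status") == "active") \
    .sink_parquet("output/active_rows.parquet")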
Apache Kafka¶
Docker¶
# docker-compose.kafka.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: one for other containers (kafka:29092), one for the host (localhost:9092)
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8081:8080"   # 8080 is used by the Airflow webserver in this guide
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
Python Producer/Consumer¶
from confluent_kafka import Producer, Consumer
# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
def delivery_report(err, msg):
    if err:
        print(f'Delivery failed: {err}')

producer.produce('my-topic', key='key', value='message', callback=delivery_report)
producer.flush()

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['my-topic'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f'Consumer error: {msg.error()}')
        continue
    print(f'Received: {msg.value().decode("utf-8")}')
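Real pipelines usually exchange structured payloads. A minimal sketch layering JSON on top of the same producer and consumer (the events topic and field names are made up for illustration):

import json

# Produce a JSON-encoded event, keyed by user for stable partitioning
event = {"user_id": 42, "event_type": "purchase", "amount": 19.99}
producer.produce(
    "events",
    key=str(event["user_id"]),
    value=json.dumps(event).encode("utf-8"),
    callback=delivery_report,
)
producer.flush()

# On the consumer side:
# payload = json.loads(msg.value().decode("utf-8"))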
Data Quality (Great Expectations)¶
import great_expectations as gx
# Create a context
context = gx.get_context()
# Create an expectation suite
suite = context.add_expectation_suite("my_suite")
# Add expectations
validator = context.get_validator(
    batch_request=batch_request,  # built from a configured datasource (not shown here)
    expectation_suite_name="my_suite"
)
validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_unique("email")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)
# Validate
results = validator.validate()
Commands Claude Code Will Run¶
# Airflow
airflow dags list
airflow dags trigger my_dag
airflow tasks test my_dag my_task 2024-01-01
# dbt
dbt run
dbt test
dbt build --select staging.*
dbt docs generate && dbt docs serve
# Spark
spark-submit app.py
pyspark
# Kafka
kafka-topics --list --bootstrap-server localhost:9092
kafka-console-producer --topic my-topic --bootstrap-server localhost:9092
kafka-console-consumer --topic my-topic --from-beginning --bootstrap-server localhost:9092
# Great Expectations
great_expectations init
great_expectations suite new
great_expectations checkpoint run my_checkpoint
VS Code Extensions¶
code --install-extension ms-python.python
code --install-extension ms-toolsai.jupyter
code --install-extension innoverio.vscode-dbt-power-user
code --install-extension BasedEngineers.jinja-htmlsql
Environment Verification¶
#!/bin/bash
echo "=== Verificación Entorno Data Engineer ==="
echo -e "\n--- Python ---"
python --version
pip show pyspark polars dbt-core great_expectations 2>/dev/null | grep -E "Name|Version" || echo "Paquetes no instalados"
echo -e "\n--- Airflow ---"
airflow version 2>/dev/null || echo "Airflow no instalado"
echo -e "\n--- dbt ---"
dbt --version 2>/dev/null || echo "dbt no instalado"
echo -e "\n--- Spark ---"
spark-submit --version 2>/dev/null | head -3 || echo "Spark no instalado"
echo -e "\n=== Verificación Completa ==="