Data Engineer - Environment Guide

A guide to setting up the Data Engineering environment with Claude Code.

Capability Summary

Capability      Tools
Orchestration   Apache Airflow, Dagster, Prefect
Processing      Apache Spark, dbt, Polars
Streaming       Apache Kafka, Flink, Pulsar
Storage         Delta Lake, Iceberg, Parquet
Warehouses      Snowflake, BigQuery, Redshift
Quality         Great Expectations, dbt tests

Python Environment

# Install Python
brew install python@3.12  # macOS
winget install Python.Python.3.12  # Windows

# Package manager
pip install uv
# or
pip install poetry

# Virtual environment
uv venv
source .venv/bin/activate
# or
python -m venv .venv

Apache Airflow

Local Installation

# With pip
pip install apache-airflow

# Initialize the metadata database
export AIRFLOW_HOME=~/airflow
airflow db migrate   # use 'airflow db init' on Airflow < 2.7
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin

# Run
airflow webserver -p 8080 &
airflow scheduler &

Docker Compose

# docker-compose.airflow.yml
version: '3.8'
x-airflow-common:
  &airflow-common
  image: apache/airflow:2.8.0
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    depends_on:
      - postgres

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    depends_on:
      - postgres

Example DAG

# dags/example_etl.py
from airflow import DAG
from airflow.operators.python import PythonOperator
# Requires the apache-airflow-providers-postgres package
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['data@company.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'example_etl',
    default_args=default_args,
    description='ETL pipeline example',
    schedule='0 6 * * *',  # daily at 06:00; 'schedule_interval' on Airflow < 2.4
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'example'],
) as dag:

    def extract_data(**context):
        # Extract logic
        return {'records': 1000}

    def transform_data(**context):
        ti = context['ti']
        data = ti.xcom_pull(task_ids='extract')
        # Transform logic
        return {'transformed': data['records']}

    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )

    load = PostgresOperator(
        task_id='load',
        postgres_conn_id='postgres_default',
        sql='INSERT INTO processed SELECT * FROM staging;',
    )

    extract >> transform >> load
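
The same pipeline can also be written with the TaskFlow API available in Airflow 2.x, which passes XCom values between tasks implicitly. A minimal sketch (the file name and placeholder logic are illustrative):

# dags/example_etl_taskflow.py (illustrative) - same extract/transform flow with the TaskFlow API
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='0 6 * * *', start_date=datetime(2024, 1, 1), catchup=False, tags=['etl', 'example'])
def example_etl_taskflow():
    @task
    def extract():
        # Extract logic (placeholder)
        return {'records': 1000}

    @task
    def transform(data: dict):
        # The return value of extract() arrives here via XCom
        return {'transformed': data['records']}

    transform(extract())

example_etl_taskflow()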

dbt (Data Build Tool)

Installation

# Install dbt-core plus the adapter(s) for your warehouse
pip install dbt-core dbt-postgres dbt-snowflake dbt-bigquery

# Verify
dbt --version

dbt Project

# Create the project
dbt init my_project

# Structure
my_project/
├── dbt_project.yml
├── profiles.yml
├── models/
│   ├── staging/
│   │   ├── stg_orders.sql
│   │   └── stg_customers.sql
│   ├── marts/
│   │   └── dim_customers.sql
│   └── schema.yml
├── seeds/
├── snapshots/
├── macros/
└── tests/

dbt Model

-- models/marts/dim_customers.sql
{{ config(
    materialized='table',
    schema='analytics'
) }}

with customers as (
    select * from {{ ref('stg_customers') }}
),

orders as (
    select
        customer_id,
        count(*) as total_orders,
        sum(amount) as total_spent,
        min(order_date) as first_order,
        max(order_date) as last_order
    from {{ ref('stg_orders') }}
    group by customer_id
)

select
    c.customer_id,
    c.customer_name,
    c.email,
    c.created_at,
    coalesce(o.total_orders, 0) as total_orders,
    coalesce(o.total_spent, 0) as total_spent,
    o.first_order,
    o.last_order,
    case
        when o.total_spent > 1000 then 'premium'
        when o.total_spent > 100 then 'regular'
        else 'new'
    end as customer_tier
from customers c
left join orders o on c.customer_id = o.customer_id

dbt Commands

dbt debug              # Check the connection
dbt run                # Run models
dbt test               # Run tests
dbt build              # run + test
dbt docs generate      # Generate documentation
dbt docs serve         # Serve the docs

# Selection
dbt run --select staging.*
dbt run --select +dim_customers
dbt run --select tag:daily
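
Since dbt-core 1.5, the same commands can also be invoked programmatically from Python, which is convenient inside orchestrators; a minimal sketch:

# Invoke dbt from Python (requires dbt-core >= 1.5)
from dbt.cli.main import dbtRunner

res = dbtRunner().invoke(["run", "--select", "staging.*"])
print(res.success)  # True if the invocation succeeded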

Apache Spark

PySpark Local

pip install pyspark

# Verify
pyspark --version

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Create a session
spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Read data
df = spark.read.parquet("data/events/")

# Transformations
result = df \
    .filter(F.col("event_type") == "purchase") \
    .groupBy("user_id", F.date_trunc("day", "timestamp").alias("date")) \
    .agg(
        F.count("*").alias("purchases"),
        F.sum("amount").alias("total_amount")
    ) \
    .orderBy("date", "total_amount", ascending=[True, False])

# Write
result.write \
    .mode("overwrite") \
    .partitionBy("date") \
    .parquet("output/daily_purchases/")

spark.stop()
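
The same aggregation can be expressed in Spark SQL over a temporary view, which is often easier to share with analysts. A sketch assuming the df and spark session from the example above (run it before spark.stop()):

# Equivalent aggregation with Spark SQL (assumes 'df' and 'spark' from the example above)
df.createOrReplaceTempView("events")
daily = spark.sql("""
    SELECT user_id,
           date_trunc('day', timestamp) AS date,
           count(*)    AS purchases,
           sum(amount) AS total_amount
    FROM events
    WHERE event_type = 'purchase'
    GROUP BY user_id, date_trunc('day', timestamp)
    ORDER BY date, total_amount DESC
""")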

Spark with Delta Lake

# pip install delta-spark
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder \
    .appName("DeltaApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Write a Delta table
df.write.format("delta").save("/data/delta/events")

# Time travel
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/data/delta/events")
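
Delta also supports upserts via MERGE, a common pattern for incremental loads. A minimal sketch; the id key column and the updates DataFrame are illustrative:

# Upsert (MERGE) into an existing Delta table; 'id' and 'updates' are illustrative
from delta.tables import DeltaTable

updates = spark.createDataFrame([(1, "purchase", 20.0)], ["id", "event_type", "amount"])

target = DeltaTable.forPath(spark, "/data/delta/events")
(
    target.alias("t")
    .merge(updates.alias("u"), "t.id = u.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)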

Polars (a modern alternative to Pandas)

pip install polars
import polars as pl

# Read data
df = pl.read_parquet("data/*.parquet")

# Lazy evaluation
lazy_df = pl.scan_parquet("data/*.parquet")

result = lazy_df \
    .filter(pl.col("status") == "active") \
    .group_by("category") \
    .agg([
        pl.count().alias("count"),
        pl.col("amount").sum().alias("total"),
        pl.col("amount").mean().alias("avg")
    ]) \
    .sort("total", descending=True) \
    .collect()  # Execute the plan
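
With lazy frames, Polars pushes filters and projections down to the Parquet scan; the optimized plan can be inspected before executing. A small sketch mirroring the query above:

# Inspect the optimized query plan (predicate pushdown should show up in the scan node)
plan = (
    pl.scan_parquet("data/*.parquet")
    .filter(pl.col("status") == "active")
    .group_by("category")
    .agg(pl.col("amount").sum().alias("total"))
)
print(plan.explain())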

Apache Kafka

Docker

# docker-compose.kafka.yml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"   # note: collides with the Airflow webserver if both run locally
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

Python Producer/Consumer

from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    if err:
        print(f'Delivery failed: {err}')

producer.produce('my-topic', key='key', value='message', callback=delivery_report)
producer.flush()

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
})

consumer.subscribe(['my-topic'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f'Consumer error: {msg.error()}')
        continue
    print(f'Received: {msg.value().decode("utf-8")}')
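
In practice messages are usually serialized; a minimal sketch producing JSON-encoded events (the 'events' topic and payload fields are illustrative):

# Produce JSON-encoded events (hypothetical topic and payload)
import json
from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})
for i in range(10):
    event = {'event_id': i, 'event_type': 'purchase', 'amount': 9.99}
    producer.produce('events', key=str(i), value=json.dumps(event))
producer.poll(0)   # serve delivery callbacks
producer.flush()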

Data Quality (Great Expectations)

pip install great_expectations
import great_expectations as gx

# Create a context
context = gx.get_context()

# Create an expectation suite
suite = context.add_expectation_suite("my_suite")

# Add expectations
validator = context.get_validator(
    batch_request=batch_request,  # build this from a configured datasource beforehand
    expectation_suite_name="my_suite"
)

validator.expect_column_values_to_not_be_null("user_id")
validator.expect_column_values_to_be_unique("email")
validator.expect_column_values_to_be_between("age", min_value=0, max_value=120)

# Validate
results = validator.validate()
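
The result object reports overall success, and the suite can be persisted for reuse; a short sketch (exact APIs vary between Great Expectations versions):

# Inspect the outcome and save the suite (API details differ across GX releases)
print(results.success)  # True if every expectation passed
validator.save_expectation_suite(discard_failed_expectations=False)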

Commands Claude Code Will Run

# Airflow
airflow dags list
airflow dags trigger my_dag
airflow tasks test my_dag my_task 2024-01-01

# dbt
dbt run
dbt test
dbt build --select staging.*
dbt docs generate && dbt docs serve

# Spark
spark-submit app.py
pyspark

# Kafka
kafka-topics --list --bootstrap-server localhost:9092
kafka-console-producer --topic my-topic --bootstrap-server localhost:9092
kafka-console-consumer --topic my-topic --from-beginning --bootstrap-server localhost:9092

# Great Expectations
great_expectations init
great_expectations suite new
great_expectations checkpoint run my_checkpoint

VS Code Extensions

code --install-extension ms-python.python
code --install-extension ms-toolsai.jupyter
code --install-extension innoverio.vscode-dbt-power-user
code --install-extension BasedEngineers.jinja-htmlsql

Environment Verification

#!/bin/bash
echo "=== Verificación Entorno Data Engineer ==="

echo -e "\n--- Python ---"
python --version
pip show pyspark polars dbt-core great_expectations 2>/dev/null | grep -E "Name|Version" || echo "Paquetes no instalados"

echo -e "\n--- Airflow ---"
airflow version 2>/dev/null || echo "Airflow no instalado"

echo -e "\n--- dbt ---"
dbt --version 2>/dev/null || echo "dbt no instalado"

echo -e "\n--- Spark ---"
spark-submit --version 2>/dev/null | head -3 || echo "Spark no instalado"

echo -e "\n=== Verificación Completa ==="

Resources