En PySpark, la función aggregate()
permite realizar operaciones de agregación personalizadas sobre un RDD (Resilient Distributed Dataset). A diferencia de funciones como reduce()
, aggregate()
proporciona mayor control, permitiendo definir valores iniciales y funciones separadas para combinaciones locales y globales.
aggregate()
rdd.aggregate(zeroValue, seqOp, combOp)
1. zeroValue
: Valor inicial de la agregación (puede ser 0, una lista vacía, etc.).
2. seqOp
: Función que se aplica a cada partición del RDD de forma local.
3. combOp
: Función que combina los resultados parciales de cada partición.
from pyspark.sql import SparkSession
# Inicializar Spark
spark = SparkSession.builder.appName("EjemploAggregate").getOrCreate()
sc = spark.sparkContext
# Crear un RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])
# Sumar todos los elementos usando aggregate
resultado = rdd.aggregate(0, lambda acc, x: acc + x, lambda acc1, acc2: acc1 + acc2)
print(resultado) # Salida: 15
Explicación:
zeroValue = 0
: Valor inicial de la suma.
lambda acc, x: acc + x
: Suma los valores dentro de cada partición.
lambda acc1, acc2: acc1 + acc2
: Suma los resultados de cada partición.
rdd = sc.parallelize(["Hola", "Spark", "Python"])
resultado = rdd.aggregate(0, lambda acc, x: acc + len(x), lambda acc1, acc2: acc1 + acc2)
print(resultado) # Salida: 14 (4+5+6)
aggregate()
Podemos calcular el promedio sin usar mean()
, devolviendo una tupla (suma, contador)
.
datos = sc.parallelize([1, 2, 3, 4, 5])
promedio = datos.aggregate(
(0, 0),
lambda acc, x: (acc[0] + x, acc[1] + 1),
lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])
)
resultado = promedio[0] / promedio[1]
print(resultado) # Salida: 3.0
Explicación:
zeroValue = (0, 0)
: Inicializa (suma, contador)
.
seqOp
: Suma valores y cuenta elementos en cada partición.
combOp
: Suma resultados de las particiones.
Jorge García
Fullstack developer