Cómo Validar la Calidad de Datos en Spark: Guía Práctica
Cómo Validar la Calidad de Datos en Spark: Guía Práctica
En el mundo del Big Data, garantizar la calidad de nuestros datos es tan importante como procesarlos. Hoy te mostraré una técnica efectiva para validar DataFrames en Spark que implementé recientemente en un proyecto real.
El Problema
Cuando trabajamos con pipelines de datos, es común encontrarnos con:
Datasets vacíos o mal formados
Columnas faltantes que son críticas para nuestro análisis
Filas completamente vacías o con valores nulos en campos importantes
public static Optional<String> processColumns(Dataset<Row> df, SparkSession spark, String outputPath) {
if (df == null || df.columns().length == 0) {
return Optional.of("El dataset está vacío o no tiene columnas.");
}
List<String> expectedColumns = Arrays.asList(
"usuario"
);
Set<String> actualColumnSet = new HashSet<>(Arrays.asList(df.columns()));
List<String> missingColumns = expectedColumns.stream()
.filter(col -> !actualColumnSet.contains(col))
.collect(Collectors.toList());
if (!missingColumns.isEmpty()) {
String mensaje = "Columnas faltantes: " + String.join(", ", missingColumns);
createDataset(spark, mensaje)
.write()
.mode(SaveMode.Overwrite)
.json(outputPath + "/error");
return Optional.of(mensaje);
}
// Validar si el Dataset está vacío (sin filas)
if (df.isEmpty()) {
String mensaje = "El dataset no contiene registros.";
createDataset(spark, mensaje)
.write()
.mode(SaveMode.Overwrite)
.json(outputPath + "/error");
return Optional.of(mensaje);
}
// Validar si hay filas completamente vacías (todos los campos null)
long emptyRows = df.filter((Row row) -> {
for (String col : df.columns()) {
if (!row.isNullAt(row.fieldIndex(col))) {
return false;
}
}
return true;
}, Encoders.bean(Row.class)).count();
if (emptyRows > 0) {
String mensaje = "Existen " + emptyRows + " filas completamente vacías.";
createDataset(spark, mensaje)
.write()
.mode(SaveMode.Overwrite)
.json(outputPath + "/error");
return Optional.of(mensaje);
}
// Validar si hay campos requeridos nulos
List<String> camposCriticos = Arrays.asList("usuario");
Column anyNull = camposCriticos.stream()
.map(c -> functions.col(c).isNull())
.reduce(functions.lit(false), Column::or);
long nullCampos = df.filter(anyNull).count();
if (nullCampos > 0) {
String mensaje = "Existen " + nullCampos + " filas con campos requeridos nulos.";
createDataset(spark, mensaje)
.write()
.mode(SaveMode.Overwrite)
.json(outputPath + "/error");
return Optional.of(mensaje);
}
return Optional.empty(); // Todo bien
}
Comentarios
Publicar un comentario