Cómo Validar la Calidad de Datos en Spark: Guía Práctica

 Cómo Validar la Calidad de Datos en Spark: Guía Práctica

En el mundo del Big Data, garantizar la calidad de nuestros datos es tan importante como procesarlos. Hoy te mostraré una técnica efectiva para validar DataFrames en Spark que implementé recientemente en un proyecto real.


El Problema

Cuando trabajamos con pipelines de datos, es común encontrarnos con:


Datasets vacíos o mal formados


Columnas faltantes que son críticas para nuestro análisis


Filas completamente vacías o con valores nulos en campos importantes



public static Optional<String> processColumns(Dataset<Row> df, SparkSession spark, String outputPath) {

    if (df == null || df.columns().length == 0) {

        return Optional.of("El dataset está vacío o no tiene columnas.");

    }


    List<String> expectedColumns = Arrays.asList(

            "usuario"

    );


    Set<String> actualColumnSet = new HashSet<>(Arrays.asList(df.columns()));

    List<String> missingColumns = expectedColumns.stream()

            .filter(col -> !actualColumnSet.contains(col))

            .collect(Collectors.toList());


    if (!missingColumns.isEmpty()) {

        String mensaje = "Columnas faltantes: " + String.join(", ", missingColumns);

        createDataset(spark, mensaje)

                .write()

                .mode(SaveMode.Overwrite)

                .json(outputPath + "/error");

        return Optional.of(mensaje);

    }


    // Validar si el Dataset está vacío (sin filas)

    if (df.isEmpty()) {

        String mensaje = "El dataset no contiene registros.";

        createDataset(spark, mensaje)

                .write()

                .mode(SaveMode.Overwrite)

                .json(outputPath + "/error");

        return Optional.of(mensaje);

    }


    // Validar si hay filas completamente vacías (todos los campos null)

    long emptyRows = df.filter((Row row) -> {

        for (String col : df.columns()) {

            if (!row.isNullAt(row.fieldIndex(col))) {

                return false;

            }

        }

        return true;

    }, Encoders.bean(Row.class)).count();


    if (emptyRows > 0) {

        String mensaje = "Existen " + emptyRows + " filas completamente vacías.";

        createDataset(spark, mensaje)

                .write()

                .mode(SaveMode.Overwrite)

                .json(outputPath + "/error");

        return Optional.of(mensaje);

    }


    // Validar si hay campos requeridos nulos

    List<String> camposCriticos = Arrays.asList("usuario");


    Column anyNull = camposCriticos.stream()

            .map(c -> functions.col(c).isNull())

            .reduce(functions.lit(false), Column::or);


    long nullCampos = df.filter(anyNull).count();


    if (nullCampos > 0) {

        String mensaje = "Existen " + nullCampos + " filas con campos requeridos nulos.";

        createDataset(spark, mensaje)

                .write()

                .mode(SaveMode.Overwrite)

                .json(outputPath + "/error");

        return Optional.of(mensaje);

    }


    return Optional.empty(); // Todo bien

}




private static String buildNullRecordsErrorMessage(Dataset<Row> enrichedNullRecords, long totalNullCount) {
    // Conteo por tipo de error
    long columnadosNullCount = enrichedNullRecords.filter(
        functions.col("columnados").isNull()
    ).count();
    
    long columnaunoNullCount = enrichedNullRecords.filter(
        functions.col("columnauno").isNull()
    ).count();
    
    long multiNullCount = enrichedNullRecords.filter(
        functions.col("nombre").isNull()
        .or(functions.col("columnauno").isNull())
        .and(functions.col("columnados").isNull())
    ).count();
    
    return String.format(
        "Se encontraron %d registros con campos nulos:\n" +
        "- %d con 'columnados' nulo\n" +
        "- %d con 'columnauno' nulo\n" +
        "- %d con múltiples campos nulos",
        totalNullCount, columnadosNullCount, columnaunoNullCount, multiNullCount
    );
}


Comentarios

Entradas populares de este blog

Pequeño server local

CobolParser Para Spark

iteratorSeguro