CobolParser Para Spark

 import org.apache.spark.sql.Dataset;

import org.apache.spark.sql.Row;

import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.*;


public class CobolParser {


    public static Dataset<Row> parseCobolFromLeft(SparkSession spark, String path) {

        Dataset<Row> df = spark.read().text(path)

            .withColumn("line_length", length(col("value")));


        // Extraer campos desde la izquierda

        df = df.withColumn("id",

                    when(col("line_length").geq(6),

                        substring(col("value"), 1, 6))

                    .otherwise(lit(null)))

              .withColumn("producto",

                    when(col("line_length").geq(8),

                        substring(col("value"), 7, 2))

                    .otherwise(lit(null)))

              .withColumn("precio",

                    when(col("line_length").geq(10),

                        substring(col("value"), 9, 2))

                    .otherwise(lit(null)))

              .withColumn("cantidad",

                    when(col("line_length").geq(21),

                        substring(col("value"), 11, 11))

                    .otherwise(lit(null)))

              .withColumn("estado",

                    when(col("line_length").geq(26),

                        substring(col("value"), 22, 5))

                    .otherwise(lit(null)));


        // Bandera para líneas incompletas

        df = df.withColumn("esNulo", col("line_length").lt(26));


        // Calcular importe final

        Column importeEntero = col("cantidad").cast("double");

        Column importeDecimal = col("decimales").cast("double").divide(lit(100000));


        df = df.withColumn("importe",

                when(col("cantidad").isNotNull().and(col("estado").isNotNull()),

                    importeEntero.plus(importeDecimal))

                .otherwise(lit(null)));


        // Reemplazar nulls por "" (para exportación o UI)

        df = df.withColumn("id", when(col("fecha").isNull(), lit("")).otherwise(col("fecha")))

               .withColumn("producto", when(col("moneda").isNull(), lit("")).otherwise(col("moneda")))

               .withColumn("precio", when(col("tipoCambio").isNull(), lit("")).otherwise(col("tipoCambio")))

               .withColumn("cantidad", when(col("importeRaw").isNull(), lit("")).otherwise(col("importeRaw")))

               .withColumn("estado", when(col("decimales").isNull(), lit("")).otherwise(col("decimales")));


        return df.drop("value", "line_length");

    }

}



public static Dataset<Row> parseCobolSmart(SparkSession spark, String path) {

    Dataset<Row> df = spark.read().text(path)

        .withColumn("line_length", length(col("value")));


    // Desde el final


 df = df

        .withColumn("decimales", expr("substring(value, length(value) - 4, 5)"))

        .withColumn("importeRaw", expr("substring(value, length(value) - 15, 11)"))

        .withColumn("tipoCambio", expr("substring(value, length(value) - 17, 2)"))

        .withColumn("monedaMaybe", expr("substring(value, length(value) - 20, 3)"))

        .withColumn("fechaMaybe", expr("substring(value, 1, length(value) - 23)"));


    df = df.withColumn("producto", substring(col("value"), col("line_length").minus(4), 5))

          .withColumn("precio", substring(col("value"), col("line_length").minus(15), 11))

          .withColumn("cambio", substring(col("value"), col("line_length").minus(17), 2))

          .withColumn("monedaMaybe", substring(col("value"), col("line_length").minus(20), 3))

          .withColumn("fechMaybe", substring(col("value"), lit(1), col("line_length").minus(23)));


    // Validaciones

    df = df.withColumn("moneda", when(col("monedaMaybe").rlike("^[A-Z]{3}$"), col("monedaMaybe")))

          .withColumn("fech", when(col("fechMaybe").rlike("^\\d{6}$"), col("fechMaybe")))

          .withColumn("cambio", when(col("cambio").rlike("^\\d{2}$"), col("cambio")));


    // Cálculo del importe (solo si precio y producto son numéricos)

    Column importe = when(

        col("precio").rlike("^\\d{1,11}$").and(col("producto").rlike("^\\d{1,5}$")),

        col("precio").cast("double").plus(

            col("producto").cast("double").divide(lit(100000))

        )

    ).otherwise(lit(null));


    df = df.withColumn("importe", importe);


    return df.select("fech", "moneda", "cambio", "precio", "producto", "importe");

}



Comentarios

Entradas populares de este blog

Pequeño server local

iteratorSeguro