// CobolParser para Spark
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;
public class CobolParser {
public static Dataset<Row> parseCobolFromLeft(SparkSession spark, String path) {
Dataset<Row> df = spark.read().text(path)
.withColumn("line_length", length(col("value")));
// Extraer campos desde la izquierda
df = df.withColumn("id",
when(col("line_length").geq(6),
substring(col("value"), 1, 6))
.otherwise(lit(null)))
.withColumn("producto",
when(col("line_length").geq(8),
substring(col("value"), 7, 2))
.otherwise(lit(null)))
.withColumn("precio",
when(col("line_length").geq(10),
substring(col("value"), 9, 2))
.otherwise(lit(null)))
.withColumn("cantidad",
when(col("line_length").geq(21),
substring(col("value"), 11, 11))
.otherwise(lit(null)))
.withColumn("estado",
when(col("line_length").geq(26),
substring(col("value"), 22, 5))
.otherwise(lit(null)));
// Bandera para líneas incompletas
df = df.withColumn("esNulo", col("line_length").lt(26));
// Calcular importe final
Column importeEntero = col("cantidad").cast("double");
Column importeDecimal = col("decimales").cast("double").divide(lit(100000));
df = df.withColumn("importe",
when(col("cantidad").isNotNull().and(col("estado").isNotNull()),
importeEntero.plus(importeDecimal))
.otherwise(lit(null)));
// Reemplazar nulls por "" (para exportación o UI)
df = df.withColumn("id", when(col("fecha").isNull(), lit("")).otherwise(col("fecha")))
.withColumn("producto", when(col("moneda").isNull(), lit("")).otherwise(col("moneda")))
.withColumn("precio", when(col("tipoCambio").isNull(), lit("")).otherwise(col("tipoCambio")))
.withColumn("cantidad", when(col("importeRaw").isNull(), lit("")).otherwise(col("importeRaw")))
.withColumn("estado", when(col("decimales").isNull(), lit("")).otherwise(col("decimales")));
return df.drop("value", "line_length");
}
}
public static Dataset<Row> parseCobolSmart(SparkSession spark, String path) {
Dataset<Row> df = spark.read().text(path)
.withColumn("line_length", length(col("value")));
// Desde el final
df = df
.withColumn("decimales", expr("substring(value, length(value) - 4, 5)"))
.withColumn("importeRaw", expr("substring(value, length(value) - 15, 11)"))
.withColumn("tipoCambio", expr("substring(value, length(value) - 17, 2)"))
.withColumn("monedaMaybe", expr("substring(value, length(value) - 20, 3)"))
.withColumn("fechaMaybe", expr("substring(value, 1, length(value) - 23)"));
df = df.withColumn("producto", substring(col("value"), col("line_length").minus(4), 5))
.withColumn("precio", substring(col("value"), col("line_length").minus(15), 11))
.withColumn("cambio", substring(col("value"), col("line_length").minus(17), 2))
.withColumn("monedaMaybe", substring(col("value"), col("line_length").minus(20), 3))
.withColumn("fechMaybe", substring(col("value"), lit(1), col("line_length").minus(23)));
// Validaciones
df = df.withColumn("moneda", when(col("monedaMaybe").rlike("^[A-Z]{3}$"), col("monedaMaybe")))
.withColumn("fech", when(col("fechMaybe").rlike("^\\d{6}$"), col("fechMaybe")))
.withColumn("cambio", when(col("cambio").rlike("^\\d{2}$"), col("cambio")));
// Cálculo del importe (solo si precio y producto son numéricos)
Column importe = when(
col("precio").rlike("^\\d{1,11}$").and(col("producto").rlike("^\\d{1,5}$")),
col("precio").cast("double").plus(
col("producto").cast("double").divide(lit(100000))
)
).otherwise(lit(null));
df = df.withColumn("importe", importe);
return df.select("fech", "moneda", "cambio", "precio", "producto", "importe");
}
// Comentarios
// Publicar un comentario