Castear Tipo de Dato en Spark
// Ya tienes
Dataset<?> datos;
StructType schema;
Class<Tabla> dtoClass;
// 1. Casteamos las columnas
Dataset<Row> datosCasteados = DatasetUtils.castearSegunSchema((Dataset<Row>) datos, schema);
// 2. Mapear al DTO
Dataset<Tabla> datosTyped = datosCasteados.as(Encoders.bean(dtoClass));
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class DatasetUtils {
/**
* Castea las columnas de un Dataset<Row> según un esquema dado (StructType).
*
* @param dataset Dataset original, donde las columnas pueden ser StringType.
* @param schema StructType con los tipos esperados.
* @return Dataset<Row> con las columnas casteadas a los tipos correctos.
*/
public static Dataset<Row> castearSegunSchema(Dataset<Row> dataset, StructType schema) {
if (dataset == null || schema == null) {
throw new IllegalArgumentException("Dataset y Schema no pueden ser null.");
}
Dataset<Row> result = dataset;
for (StructField field : schema.fields()) {
String nombreColumna = field.name();
String tipoSpark = field.dataType().typeName();
// Solo castear si la columna existe
if (dataset.columns() != null && java.util.Arrays.asList(dataset.columns()).contains(nombreColumna)) {
result = result.withColumn(nombreColumna, functions.col(nombreColumna).cast(tipoSpark));
} else {
System.out.println("⚠️ Advertencia: columna [" + nombreColumna + "] no encontrada en dataset. Se omite.");
}
}
return result;
}
}
Comentarios
Publicar un comentario