+-
使用scala在Spark DataFrame中添加新的行。

我有一个数据框架,像。

Name_Index  City_Index
  2.0         1.0
  0.0         2.0
  1.0         0.0

我有一个新的值列表。

list(1.0,1.0)

我想把这些值添加到数据框中的新行中,如果之前所有的行都被删除的话。

我的代码。

 val spark = SparkSession.builder
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()


    var data = spark.read.option("header", "true")
      .option("inferSchema", "true")
      .csv("src/main/resources/student.csv")

   val someDF = Seq(
         (1.0,1.0)
        ).toDF("Name_Index","City_Index")

   data=data.union(someDF).show()

它显示的输出是这样的

Name_Index  City_Index
  2.0          1.0
  0.0          2.0
  1.0          0.0
  1.1          1.1

但输出应该是这样的 这样,所有之前的行都被删除,新的值被添加。

Name_Index   City_Index
  1.0          1.0
0
投票

你可以使用 limit & union 函数来实现。

scala> val df = Seq((2.0,1.0),(0.0,2.0),(1.0,0.0)).toDF("name_index","city_index")
df: org.apache.spark.sql.DataFrame = [name_index: double, city_index: double]

scala> df.show(false)
+----------+----------+
|name_index|city_index|
+----------+----------+
|2.0       |1.0       |
|0.0       |2.0       |
|1.0       |0.0       |
+----------+----------+


scala> val ndf = Seq((1.0,1.0)).toDF("name_index","city_index")
ndf: org.apache.spark.sql.DataFrame = [name_index: double, city_index: double]

scala> ndf.show
+----------+----------+
|name_index|city_index|
+----------+----------+
|       1.0|       1.0|
+----------+----------+


scala> df.limit(0).union(ndf).show(false) // this is not good approach., you can directly call ndf.show
+----------+----------+
|name_index|city_index|
+----------+----------+
|1.0       |1.0       |
+----------+----------+

0
投票

将最后一行改为

data=data.except(data).union(someDF).show()
0
投票

你可以试试这个方法

data = data.filter(_ => false).union(someDF)

产出

+----------+----------+
|Name_Index|City_Index|
+----------+----------+
|1.0       |1.0       |
+----------+----------+

希望能给你一些启示。

问候。

0
投票

据我所知,你只需要源Dataframe的列列表。

如果你的序列与源Dataframe的列的顺序相同,你可以重用schema,而不需要实际查询源Dataframe。从性能上来说,它的速度会更快。

    val srcDf = Seq((2.0,1.0),(0.0,2.0),(1.0,0.0)).toDF("name_index","city_index")

    val dstDf = Seq((1.0, 1.0)).toDF(srcDf.columns:_*)