programing

Spark DataFrame을 피벗하는 방법은?

cafebook 2023. 10. 20. 14:21
반응형

Spark DataFrame을 피벗하는 방법은?

Spark DataFrame을 사용하기 시작했는데 데이터를 피벗하여 여러 행으로 된 1열 중 여러 열을 생성할 수 있어야 합니다.스캘딩에는 그것을 위한 기능이 내장되어 있고 저는 Python에는 Pandas가 있다고 믿지만 새로운 Spark Dataframe에 대한 기능을 찾을 수 없습니다.

나는 이것을 할 수 있는 일종의 맞춤 기능을 쓸 수 있다고 생각하지만, 특히 스파크의 초보자이기 때문에 어떻게 시작해야 할지도 잘 모르겠습니다.스칼라에 무엇인가를 쓰는 방법에 대한 제안이나 내장된 기능으로 이것을 하는 방법을 아는 사람이 있다면 매우 감사하겠습니다.

David Anderson Spark가 언급한 대로 버전 1.6부터 기능을 제공합니다.일반 구문은 다음과 같습니다.

df
  .groupBy(grouping_columns)
  .pivot(pivot_column, [values]) 
  .agg(aggregate_expressions)

및 를 사용한 사용 예csv형식:

파이썬:

from pyspark.sql.functions import avg

flights = (sqlContext
    .read
    .format("csv")
    .options(inferSchema="true", header="true")
    .load("flights.csv")
    .na.drop())

flights.registerTempTable("flights")
sqlContext.cacheTable("flights")

gexprs = ("origin", "dest", "carrier")
aggexpr = avg("arr_delay")

flights.count()
## 336776

%timeit -n10 flights.groupBy(*gexprs ).pivot("hour").agg(aggexpr).count()
## 10 loops, best of 3: 1.03 s per loop

스칼라:

val flights = sqlContext
  .read
  .format("csv")
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .load("flights.csv")

flights
  .groupBy($"origin", $"dest", $"carrier")
  .pivot("hour")
  .agg(avg($"arr_delay"))

Java:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.*;

Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("flights.csv");

df.groupBy(col("origin"), col("dest"), col("carrier"))
        .pivot("hour")
        .agg(avg(col("arr_delay")));

R / SparkR:

library(magrittr)

flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)

flights %>% 
  groupBy("origin", "dest", "carrier") %>% 
  pivot("hour") %>% 
  agg(avg(column("arr_delay")))

R / 반짝임

library(dplyr)

flights <- spark_read_csv(sc, "flights", "flights.csv")

avg.arr.delay <- function(gdf) {
   expr <- invoke_static(
      sc,
      "org.apache.spark.sql.functions",
      "avg",
      "arr_delay"
    )
    gdf %>% invoke("agg", expr, list())
}

flights %>% 
  sdf_pivot(origin + dest + carrier ~  hour, fun.aggregate=avg.arr.delay)

SQL:

Spark SQL의 PIVOT 키워드는 버전 2.4부터 지원됩니다.

CREATE TEMPORARY VIEW flights 
USING csv 
OPTIONS (header 'true', path 'flights.csv', inferSchema 'true') ;

 SELECT * FROM (
   SELECT origin, dest, carrier, arr_delay, hour FROM flights
 ) PIVOT (
   avg(arr_delay)
   FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
                13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
 );

예제 데이터:

"year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00

성능 고려 사항:

일반적으로 피벗은 비용이 많이 드는 작업입니다.

  • 당신이 가능하다면, 제공하려고 노력해보세요.valueslist: 유니크를 계산하기 위해 추가적인 히트를 피할 수 있기 때문입니다.

      vs = list(range(25))
      %timeit -n10 flights.groupBy(*gexprs ).pivot("hour", vs).agg(aggexpr).count()
      ## 10 loops, best of 3: 392 ms per loop
    
  • 어떤 경우에는 (2.0 또는 그 이후에 더 이상 노력할 가치가 없는 likely) 에 유익한 것으로 증명되었습니다.repartition및/또는 데이터를 미리 aggregate합니다.

  • 형상 변경에만 사용할 수 있습니다.first: Pyspark 데이터프레임의 피벗 문자열

관련 질문:

for loop을 작성하여 SQL 쿼리를 동적으로 생성함으로써 이를 극복하였습니다.예를 들어보겠습니다.

id  tag  value
1   US    50
1   UK    100
1   Can   125
2   US    75
2   UK    150
2   Can   175

그리고 나는:

id  US  UK   Can
1   50  100  125
2   75  150  175

피벗하고 싶은 값으로 목록을 만든 다음 필요한 SQL 쿼리를 포함하는 문자열을 만들 수 있습니다.

val countries = List("US", "UK", "Can")
val numCountries = countries.length - 1

var query = "select *, "
for (i <- 0 to numCountries-1) {
  query += """case when tag = """" + countries(i) + """" then value else 0 end as """ + countries(i) + ", "
}
query += """case when tag = """" + countries.last + """" then value else 0 end as """ + countries.last + " from myTable"

myDataFrame.registerTempTable("myTable")
val myDF1 = sqlContext.sql(query)

저는 그 때와 비슷한 쿼리를 작성할 수 있습니다.매우 우아한 해결책은 아니지만, 어떤 값의 목록에 대해서도 작동하고 유연하며, 코드를 호출할 때 인수로 전달할 수도 있습니다.

Spark 데이터프레임 API에 피벗 연산자가 추가되었으며 Spark 1.6의 일부입니다.

자세한 내용은 https://github.com/apache/spark/pull/7841 을 참조하십시오.

데이터 프레임을 사용하여 다음과 같은 단계로 유사한 문제를 해결했습니다.

'value'를 값으로 하여 모든 국가에 대한 열을 만듭니다.

import org.apache.spark.sql.functions._
val countries = List("US", "UK", "Can")
val countryValue = udf{(countryToCheck: String, countryInRow: String, value: Long) =>
  if(countryToCheck == countryInRow) value else 0
}
val countryFuncs = countries.map{country => (dataFrame: DataFrame) => dataFrame.withColumn(country, countryValue(lit(country), df("tag"), df("value"))) }
val dfWithCountries = Function.chain(countryFuncs)(df).drop("tag").drop("value")

데이터 프레임 'dfWithCountries'는 다음과 같습니다.

+--+--+---+---+
|id|US| UK|Can|
+--+--+---+---+
| 1|50|  0|  0|
| 1| 0|100|  0|
| 1| 0|  0|125|
| 2|75|  0|  0|
| 2| 0|150|  0|
| 2| 0|  0|175|
+--+--+---+---+

이제 원하는 결과 값을 모두 합할 수 있습니다.

dfWithCountries.groupBy("id").sum(countries: _*).show

결과:

+--+-------+-------+--------+
|id|SUM(US)|SUM(UK)|SUM(Can)|
+--+-------+-------+--------+
| 1|     50|    100|     125|
| 2|     75|    150|     175|
+--+-------+-------+--------+

하지만 그다지 우아한 해결책은 아닙니다.모든 열에 추가할 함수의 체인을 만들어야 했습니다.또한 제가 많은 국가를 가지고 있다면 임시 데이터 세트를 0이 많은 매우 넓은 세트로 확장할 것입니다.

피벗을 위한 간단한 방법이 있습니다.

  id  tag  value
  1   US    50
  1   UK    100
  1   Can   125
  2   US    75
  2   UK    150
  2   Can   175

  import sparkSession.implicits._

  val data = Seq(
    (1,"US",50),
    (1,"UK",100),
    (1,"Can",125),
    (2,"US",75),
    (2,"UK",150),
    (2,"Can",175),
  )

  val dataFrame = data.toDF("id","tag","value")

  val df2 = dataFrame
                    .groupBy("id")
                    .pivot("tag")
                    .max("value")
  df2.show()

+---+---+---+---+
| id|Can| UK| US|
+---+---+---+---+
|  1|125|100| 50|
|  2|175|150| 75|
+---+---+---+---+

심플하고 우아한 솔루션이 있습니다.

scala> spark.sql("select * from k_tags limit 10").show()
+---------------+-------------+------+
|           imsi|         name| value|
+---------------+-------------+------+
|246021000000000|          age|    37|
|246021000000000|       gender|Female|
|246021000000000|         arpu|    22|
|246021000000000|   DeviceType| Phone|
|246021000000000|DataAllowance|   6GB|
+---------------+-------------+------+

scala> spark.sql("select * from k_tags limit 10").groupBy($"imsi").pivot("name").agg(min($"value")).show()
+---------------+-------------+----------+---+----+------+
|           imsi|DataAllowance|DeviceType|age|arpu|gender|
+---------------+-------------+----------+---+----+------+
|246021000000000|          6GB|     Phone| 37|  22|Female|
|246021000000001|          1GB|     Phone| 72|  10|  Male|
+---------------+-------------+----------+---+----+------+

데이터셋/데이터프레임에 대한 피벗 작업의 예는 많이 있지만 SQL을 사용하는 예는 많이 찾을 수 없었습니다.여기 저에게 도움이 된 예가 있습니다.

create or replace temporary view faang 
as SELECT stock.date AS `Date`,
    stock.adj_close AS `Price`,
    stock.symbol as `Symbol` 
FROM stock  
WHERE (stock.symbol rlike '^(FB|AAPL|GOOG|AMZN)$') and year(date) > 2010;


SELECT * from faang 

PIVOT (max(price) for symbol in ('AAPL', 'FB', 'GOOG', 'AMZN')) order by date; 

처음에는 Al M의 솔루션을 채택했습니다.나중에 같은 생각을 하고 이 함수를 전치 함수로 다시 썼습니다.

키 및 값 열을 사용하여 df 행을 데이터 형식의 열로 바꿉니다.

입력 csv의 경우

id,tag,value
1,US,50a
1,UK,100
1,Can,125
2,US,75
2,UK,150
2,Can,175

아웃풋의

+--+---+---+---+
|id| UK| US|Can|
+--+---+---+---+
| 2|150| 75|175|
| 1|100|50a|125|
+--+---+---+---+

전치법:

def transpose(hc : HiveContext , df: DataFrame,compositeId: List[String], key: String, value: String) = {

val distinctCols =   df.select(key).distinct.map { r => r(0) }.collect().toList

val rdd = df.map { row =>
(compositeId.collect { case id => row.getAs(id).asInstanceOf[Any] },
scala.collection.mutable.Map(row.getAs(key).asInstanceOf[Any] -> row.getAs(value).asInstanceOf[Any]))
}
val pairRdd = rdd.reduceByKey(_ ++ _)
val rowRdd = pairRdd.map(r => dynamicRow(r, distinctCols))
hc.createDataFrame(rowRdd, getSchema(df.schema, compositeId, (key, distinctCols)))

}

private def dynamicRow(r: (List[Any], scala.collection.mutable.Map[Any, Any]), colNames: List[Any]) = {
val cols = colNames.collect { case col => r._2.getOrElse(col.toString(), null) }
val array = r._1 ++ cols
Row(array: _*)
}

private  def getSchema(srcSchema: StructType, idCols: List[String], distinctCols: (String, List[Any])): StructType = {
val idSchema = idCols.map { idCol => srcSchema.apply(idCol) }
val colSchema = srcSchema.apply(distinctCols._1)
val colsSchema = distinctCols._2.map { col => StructField(col.asInstanceOf[String], colSchema.dataType, colSchema.nullable) }
StructType(idSchema ++ colsSchema)
}

메인 토막글

import java.util.Date
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.StructField


...
...
def main(args: Array[String]): Unit = {

    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val dfdata1 = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true")
    .load("data.csv")
    dfdata1.show()  
    val dfOutput = transpose(new HiveContext(sc), dfdata1, List("id"), "tag", "value")
    dfOutput.show

}

내장된 스파크 피벗 기능은 비효율적입니다.Bellow 구현은 스파크 2.4+에서 작동합니다. 맵을 집계하고 값을 열로 추출하는 것이 목적입니다.유일한 제한 사항은 피벗된 열에서 집계 함수를 처리하지 않고 열만 처리한다는 것입니다.

8M 테이블에서 이러한 기능은 내장 스파크 버전에서는 40분이 아닌 3초 동안 적용됩니다.

# pass an optional list of string to avoid computation of columns
def pivot(df, group_by, key, aggFunction, levels=[]):
    if not levels:
        levels = [row[key] for row in df.filter(col(key).isNotNull()).groupBy(col(key)).agg(count(key)).select(key).collect()]
    return df.filter(col(key).isin(*levels) == True).groupBy(group_by).agg(map_from_entries(collect_list(struct(key, expr(aggFunction)))).alias("group_map")).select([group_by] + ["group_map." + l for l in levels])

# Usage
pivot(df, "id", "key", "value")
pivot(df, "id", "key", "array(value)")
// pass an optional list of string to avoid computation of columns
  def pivot(df: DataFrame, groupBy: Column, key: Column, aggFunct: String, _levels: List[String] = Nil): DataFrame = {
    val levels =
      if (_levels.isEmpty) df.filter(key.isNotNull).select(key).distinct().collect().map(row => row.getString(0)).toList
      else _levels

    df
      .filter(key.isInCollection(levels))
      .groupBy(groupBy)
      .agg(map_from_entries(collect_list(struct(key, expr(aggFunct)))).alias("group_map"))
      .select(groupBy.toString, levels.map(f => "group_map." + f): _*)
  }

// Usage:
pivot(df, col("id"), col("key"), "value")
pivot(df, col("id"), col("key"), "array(value)")

스파크는 스파크 데이터 프레임의 피벗(Pivoting)을 개선해 왔습니다.Spark DataFrame API에서 Spark 1.6 버전으로 피벗 기능이 추가되었으며 성능 문제가 있으며 Spark 2.0에서 수정되었습니다.

그러나 하위 버전을 사용하는 경우 피벗은 매우 비용이 많이 드는 작업이므로 아래와 같이 기능하기 위한 인수로 열 데이터(알려진 경우)를 제공하는 것이 좋습니다.

val countries = Seq("USA","China","Canada","Mexico")
val pivotDF = df.groupBy("Product").pivot("Country", countries).sum("Amount")
pivotDF.show()

스파크 데이터 프레임 피벗비피벗에서 자세히 설명했습니다.

행복한 배움!!

언급URL : https://stackoverflow.com/questions/30244910/how-to-pivot-spark-dataframe

반응형