programing

두 개의 데이터 프레임을 결합하고, 한 열에서 모든 열을 선택하고 다른 열에서 일부 열을 선택합니다.

cafebook 2023. 9. 10. 12:39
반응형

두 개의 데이터 프레임을 결합하고, 한 열에서 모든 열을 선택하고 다른 열에서 일부 열을 선택합니다.

내가 스파크 데이터 프레임을 가지고 있다고 가정해 보겠습니다.df1, 몇 개의 열(그 중에서 열)이 있는id) 및 데이터 프레임df2두 개의 열로id그리고.other.

다음 명령을 복제하는 방법이 있습니까?

sqlContext.sql("SELECT df1.*, df2.other FROM df1 JOIN df2 ON df1.id = df2.id")

다음과 같은 pyspark 기능만을 사용함으로써join(),select()뭐 그런거?

함수에 이 조인을 구현해야 하는데 함수 매개 변수로 sqlContext를 강제로 가지는 것을 원하지 않습니다.

별표()*)은 별칭과 함께 작동합니다.예:

from pyspark.sql.functions import *

df1 = df1.alias('df1')
df2 = df2.alias('df2')

df1.join(df2, df1.id == df2.id).select('df1.*')

가장 효율적인 방법은 확실하지 않지만, 이 방법은 제게 효과가 있었습니다.

from pyspark.sql.functions import col

df1.alias('a').join(df2.alias('b'),col('b.id') == col('a.id')).select([col('a.'+xx) for xx in a.columns] + [col('b.other1'),col('b.other2')])

비결은 다음과 같습니다.

[col('a.'+xx) for xx in a.columns] : all columns in a

[col('b.other1'),col('b.other2')] : some columns of b

별칭을 사용하지 않습니다.

df1.join(df2, df1.id == df2.id).select(df1["*"],df2["other"])

여기에는 SQL 컨텍스트가 필요하지 않지만 DataFrame의 메타데이터를 유지하는 솔루션이 있습니다.

a = sc.parallelize([['a', 'foo'], ['b', 'hem'], ['c', 'haw']]).toDF(['a_id', 'extra'])
b = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']]).toDF(["other", "b_id"])
    
c = a.join(b, a.a_id == b.b_id).select(a["*"],b["other"])

그리고나서,c.show()산출량:

+----+-----+-----+
|a_id|extra|other|
+----+-----+-----+
|   a|  foo|   p1|
|   b|  hem|   p2|
|   c|  haw|   p3|
+----+-----+-----+

이 방법이 가장 쉽고 직관적인 방법이 될 것이라고 생각합니다.

final = (df1.alias('df1').join(df2.alias('df2'),
                               on = df1['id'] == df2['id'],
                               how = 'inner')
                         .select('df1.*',
                                 'df2.other')
)

중복 b_id를 삭제합니다.

c = a.join(b, a.a_id == b.b_id).drop(b.b_id)

다음은 내부 조인을 수행하고 데이터 프레임과 별칭에서 같은 열을 다른 열 이름으로 선택하는 코드 조각입니다.

emp_df  = spark.read.csv('Employees.csv', header =True);
dept_df = spark.read.csv('dept.csv', header =True)


emp_dept_df = emp_df.join(dept_df,'DeptID').select(emp_df['*'], dept_df['Name'].alias('DName'))
emp_df.show()
dept_df.show()
emp_dept_df.show()
Output  for 'emp_df.show()':

+---+---------+------+------+
| ID|     Name|Salary|DeptID|
+---+---------+------+------+
|  1|     John| 20000|     1|
|  2|    Rohit| 15000|     2|
|  3|    Parth| 14600|     3|
|  4|  Rishabh| 20500|     1|
|  5|    Daisy| 34000|     2|
|  6|    Annie| 23000|     1|
|  7| Sushmita| 50000|     3|
|  8| Kaivalya| 20000|     1|
|  9|    Varun| 70000|     3|
| 10|Shambhavi| 21500|     2|
| 11|  Johnson| 25500|     3|
| 12|     Riya| 17000|     2|
| 13|    Krish| 17000|     1|
| 14| Akanksha| 20000|     2|
| 15|   Rutuja| 21000|     3|
+---+---------+------+------+

Output  for 'dept_df.show()':
+------+----------+
|DeptID|      Name|
+------+----------+
|     1|     Sales|
|     2|Accounting|
|     3| Marketing|
+------+----------+

Join Output:
+---+---------+------+------+----------+
| ID|     Name|Salary|DeptID|     DName|
+---+---------+------+------+----------+
|  1|     John| 20000|     1|     Sales|
|  2|    Rohit| 15000|     2|Accounting|
|  3|    Parth| 14600|     3| Marketing|
|  4|  Rishabh| 20500|     1|     Sales|
|  5|    Daisy| 34000|     2|Accounting|
|  6|    Annie| 23000|     1|     Sales|
|  7| Sushmita| 50000|     3| Marketing|
|  8| Kaivalya| 20000|     1|     Sales|
|  9|    Varun| 70000|     3| Marketing|
| 10|Shambhavi| 21500|     2|Accounting|
| 11|  Johnson| 25500|     3| Marketing|
| 12|     Riya| 17000|     2|Accounting|
| 13|    Krish| 17000|     1|     Sales|
| 14| Akanksha| 20000|     2|Accounting|
| 15|   Rutuja| 21000|     3| Marketing|
+---+---------+------+------+----------+

제안된 코드를 사용하여 'a not found'라는 오류가 발생했습니다.

from pyspark.sql.functions import col df1.alias('a').join(df2.alias('b'),col('b.id') == col('a.id')).select([col('a.'+xx) for xx in a.columns] + [col('b.other1'),col('b.other2')])

옷을 갈아입었습니다a.columns로.df1.columns잘 해결됐습니다.

가입 후 중복된 열을 삭제하는 함수입니다.

확인해 보다

def dropDupeDfCols(df): newcols = [] dupcols = [ ]

for i in range(len(df.columns)):
    if df.columns[i] not in newcols:
        newcols.append(df.columns[i])
    else:
        dupcols.append(i)

df = df.toDF(*[str(i) for i in range(len(df.columns))])
for dupcol in dupcols:
    df = df.drop(str(dupcol))

return df.toDF(*newcols)

위에서 언급한 답변 중 일부는 모호한 열 예외를 받았습니다(두 데이터 프레임의 열이 동일할 때 발생하며, 또한 데이터 벽돌에 스파크를 사용하고 있습니다).제가 해봤는데 효과가 있었어요.

df_join = df1.join(df2, (df1.a == df2.a) & (df1.b == df2.b), "inner").select(df1.columns,df2.columns)

저는 방금 df2에서 필요없는 칼럼을 삭제하고 가입했습니다.

sliced_df = df2.select(columns_of_interest)
df1.join(sliced_df, on=['id'], how='left')
**id should be in `columns_of_interest` tho
df1.join(df2, ['id']).drop(df2.id)

다른 pyspark dataframe에서 여러 열이 필요한 경우 이를 사용할 수 있습니다.

단일 가입 조건을 기준

x.join(y, x.id == y.id,"left").select(x["*"],y["col1"],y["col2"],y["col3"])

다중 가입 조건에 따라

x.join(y, (x.id == y.id) & (x.no == y.no),"left").select(x["*"],y["col1"],y["col2"],y["col3"])

저는 위의 제론의 답변이 매우 마음에 들고, 제 해결책과 기계적으로 동일하다고 생각합니다.이것은 데이터 벽돌에서 작동하며, 아마도 일반적인 스파크 환경에서 작동할 것입니다(키워드 "spark"를 "sqlcontext"로 바꿉니다).

df.createOrReplaceTempView('t1') #temp table t1
df2.createOrReplaceTempView('t2') #temp table t2

output = (
          spark.sql("""
                    select
                      t1.*
                      ,t2.desired_field(s)
                    from 
                      t1
                    left (or inner) join t2 on t1.id = t2.id
                    """
                   )
          )

당신은 그냥 가입을 할 수 있고 그 후에 원하는 칼럼을 선택할 수 있습니다. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe%20join#pyspark.sql.DataFrame.join

언급URL : https://stackoverflow.com/questions/36132322/join-two-data-frames-select-all-columns-from-one-and-some-columns-from-the-othe

반응형