두 개의 데이터 프레임을 결합하고, 한 열에서 모든 열을 선택하고 다른 열에서 일부 열을 선택합니다.
내가 스파크 데이터 프레임을 가지고 있다고 가정해 보겠습니다.df1
, 몇 개의 열(그 중에서 열)이 있는id
) 및 데이터 프레임df2
두 개의 열로id
그리고.other
.
다음 명령을 복제하는 방법이 있습니까?
sqlContext.sql("SELECT df1.*, df2.other FROM df1 JOIN df2 ON df1.id = df2.id")
다음과 같은 pyspark 기능만을 사용함으로써join()
,select()
뭐 그런거?
함수에 이 조인을 구현해야 하는데 함수 매개 변수로 sqlContext를 강제로 가지는 것을 원하지 않습니다.
별표()*
)은 별칭과 함께 작동합니다.예:
from pyspark.sql.functions import *
df1 = df1.alias('df1')
df2 = df2.alias('df2')
df1.join(df2, df1.id == df2.id).select('df1.*')
가장 효율적인 방법은 확실하지 않지만, 이 방법은 제게 효과가 있었습니다.
from pyspark.sql.functions import col
df1.alias('a').join(df2.alias('b'),col('b.id') == col('a.id')).select([col('a.'+xx) for xx in a.columns] + [col('b.other1'),col('b.other2')])
비결은 다음과 같습니다.
[col('a.'+xx) for xx in a.columns] : all columns in a
[col('b.other1'),col('b.other2')] : some columns of b
별칭을 사용하지 않습니다.
df1.join(df2, df1.id == df2.id).select(df1["*"],df2["other"])
여기에는 SQL 컨텍스트가 필요하지 않지만 DataFrame의 메타데이터를 유지하는 솔루션이 있습니다.
a = sc.parallelize([['a', 'foo'], ['b', 'hem'], ['c', 'haw']]).toDF(['a_id', 'extra'])
b = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']]).toDF(["other", "b_id"])
c = a.join(b, a.a_id == b.b_id).select(a["*"],b["other"])
그리고나서,c.show()
산출량:
+----+-----+-----+
|a_id|extra|other|
+----+-----+-----+
| a| foo| p1|
| b| hem| p2|
| c| haw| p3|
+----+-----+-----+
이 방법이 가장 쉽고 직관적인 방법이 될 것이라고 생각합니다.
final = (df1.alias('df1').join(df2.alias('df2'),
on = df1['id'] == df2['id'],
how = 'inner')
.select('df1.*',
'df2.other')
)
중복 b_id를 삭제합니다.
c = a.join(b, a.a_id == b.b_id).drop(b.b_id)
다음은 내부 조인을 수행하고 데이터 프레임과 별칭에서 같은 열을 다른 열 이름으로 선택하는 코드 조각입니다.
emp_df = spark.read.csv('Employees.csv', header =True);
dept_df = spark.read.csv('dept.csv', header =True)
emp_dept_df = emp_df.join(dept_df,'DeptID').select(emp_df['*'], dept_df['Name'].alias('DName'))
emp_df.show()
dept_df.show()
emp_dept_df.show()
Output for 'emp_df.show()':
+---+---------+------+------+
| ID| Name|Salary|DeptID|
+---+---------+------+------+
| 1| John| 20000| 1|
| 2| Rohit| 15000| 2|
| 3| Parth| 14600| 3|
| 4| Rishabh| 20500| 1|
| 5| Daisy| 34000| 2|
| 6| Annie| 23000| 1|
| 7| Sushmita| 50000| 3|
| 8| Kaivalya| 20000| 1|
| 9| Varun| 70000| 3|
| 10|Shambhavi| 21500| 2|
| 11| Johnson| 25500| 3|
| 12| Riya| 17000| 2|
| 13| Krish| 17000| 1|
| 14| Akanksha| 20000| 2|
| 15| Rutuja| 21000| 3|
+---+---------+------+------+
Output for 'dept_df.show()':
+------+----------+
|DeptID| Name|
+------+----------+
| 1| Sales|
| 2|Accounting|
| 3| Marketing|
+------+----------+
Join Output:
+---+---------+------+------+----------+
| ID| Name|Salary|DeptID| DName|
+---+---------+------+------+----------+
| 1| John| 20000| 1| Sales|
| 2| Rohit| 15000| 2|Accounting|
| 3| Parth| 14600| 3| Marketing|
| 4| Rishabh| 20500| 1| Sales|
| 5| Daisy| 34000| 2|Accounting|
| 6| Annie| 23000| 1| Sales|
| 7| Sushmita| 50000| 3| Marketing|
| 8| Kaivalya| 20000| 1| Sales|
| 9| Varun| 70000| 3| Marketing|
| 10|Shambhavi| 21500| 2|Accounting|
| 11| Johnson| 25500| 3| Marketing|
| 12| Riya| 17000| 2|Accounting|
| 13| Krish| 17000| 1| Sales|
| 14| Akanksha| 20000| 2|Accounting|
| 15| Rutuja| 21000| 3| Marketing|
+---+---------+------+------+----------+
제안된 코드를 사용하여 'a not found'라는 오류가 발생했습니다.
from pyspark.sql.functions import col df1.alias('a').join(df2.alias('b'),col('b.id') == col('a.id')).select([col('a.'+xx) for xx in a.columns] + [col('b.other1'),col('b.other2')])
옷을 갈아입었습니다a.columns
로.df1.columns
잘 해결됐습니다.
가입 후 중복된 열을 삭제하는 함수입니다.
확인해 보다
def dropDupeDfCols(df): newcols = [] dupcols = [ ]
for i in range(len(df.columns)):
if df.columns[i] not in newcols:
newcols.append(df.columns[i])
else:
dupcols.append(i)
df = df.toDF(*[str(i) for i in range(len(df.columns))])
for dupcol in dupcols:
df = df.drop(str(dupcol))
return df.toDF(*newcols)
위에서 언급한 답변 중 일부는 모호한 열 예외를 받았습니다(두 데이터 프레임의 열이 동일할 때 발생하며, 또한 데이터 벽돌에 스파크를 사용하고 있습니다).제가 해봤는데 효과가 있었어요.
df_join = df1.join(df2, (df1.a == df2.a) & (df1.b == df2.b), "inner").select(df1.columns,df2.columns)
저는 방금 df2에서 필요없는 칼럼을 삭제하고 가입했습니다.
sliced_df = df2.select(columns_of_interest)
df1.join(sliced_df, on=['id'], how='left')
**id should be in `columns_of_interest` tho
df1.join(df2, ['id']).drop(df2.id)
다른 pyspark dataframe에서 여러 열이 필요한 경우 이를 사용할 수 있습니다.
단일 가입 조건을 기준
x.join(y, x.id == y.id,"left").select(x["*"],y["col1"],y["col2"],y["col3"])
다중 가입 조건에 따라
x.join(y, (x.id == y.id) & (x.no == y.no),"left").select(x["*"],y["col1"],y["col2"],y["col3"])
저는 위의 제론의 답변이 매우 마음에 들고, 제 해결책과 기계적으로 동일하다고 생각합니다.이것은 데이터 벽돌에서 작동하며, 아마도 일반적인 스파크 환경에서 작동할 것입니다(키워드 "spark"를 "sqlcontext"로 바꿉니다).
df.createOrReplaceTempView('t1') #temp table t1
df2.createOrReplaceTempView('t2') #temp table t2
output = (
spark.sql("""
select
t1.*
,t2.desired_field(s)
from
t1
left (or inner) join t2 on t1.id = t2.id
"""
)
)
당신은 그냥 가입을 할 수 있고 그 후에 원하는 칼럼을 선택할 수 있습니다. https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=dataframe%20join#pyspark.sql.DataFrame.join
언급URL : https://stackoverflow.com/questions/36132322/join-two-data-frames-select-all-columns-from-one-and-some-columns-from-the-othe
'programing' 카테고리의 다른 글
오라클에서 JDBC 배치 삽입에서 생성된 키를 가져오는 방법은 무엇입니까? (0) | 2023.09.10 |
---|---|
본문에서 스크롤 사용 안 함 (0) | 2023.09.10 |
더 큰 글리폰 (0) | 2023.09.10 |
Android의 색상 이해 (6자) (0) | 2023.09.10 |
PowerShell을 사용하여 분할 경로에서 파일의 최하위 디렉터리 이름 가져오기 (0) | 2023.09.10 |