Removing Duplicate Columns After A Df Join In Spark
Solution 1:
If the join columns in both data frames have the same names and you only need an equi-join, you can specify the join columns as a list; in that case the result keeps only one copy of each join column:
df1.show()
+---+----+
| id|val1|
+---+----+
|  1|   2|
|  2|   3|
|  4|   4|
|  5|   5|
+---+----+
df2.show()
+---+----+
| id|val2|
+---+----+
|  1|   2|
|  1|   3|
|  2|   4|
|  3|   5|
+---+----+
df1.join(df2, ['id']).show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+
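The list form also generalizes to several equi-join keys; a hypothetical sketch (the second key, date, does not exist in the toy data above):
# Hypothetical second join key 'date'; only one copy of each key survives
df1.join(df2, ['id', 'date']).show()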
Otherwise you need to give the joined data frames aliases and refer to the duplicated columns through those aliases later:
df1.alias("a").join(
df2.alias("b"), df1['id'] == df2['id']
).select("a.id", "a.val1", "b.val2").show()
+---+----+----+
| id|val1|val2|
+---+----+----+
|  1|   2|   2|
|  1|   2|   3|
|  2|   3|   4|
+---+----+----+
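For completeness, a minimal runnable sketch of the setup above, with the data reconstructed from the show() output; the last line is an equivalent alternative that drops the duplicate column by its parent data frame reference instead of selecting through aliases:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy data reconstructed from the tables above
df1 = spark.createDataFrame([(1, 2), (2, 3), (4, 4), (5, 5)], ['id', 'val1'])
df2 = spark.createDataFrame([(1, 2), (1, 3), (2, 4), (3, 5)], ['id', 'val2'])

# Keep the expression join, but drop one of the duplicated columns afterwards
df1.join(df2, df1['id'] == df2['id']).drop(df2['id']).show()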
Solution 2:
For df.join(other, on, how): when on is a column name string, or a list of column name strings, the returned data frame avoids duplicate columns; when on is a join expression, the result contains duplicate columns. You can then use .drop(df.a) to drop the duplicate column. Example:
cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
# without the drop, the result would have a duplicate column 'a'
result = df.join(other, cond, 'inner').drop(df.a)
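A self-contained sketch of that pattern; the data and the column names bb and ccc are illustrative, mirroring the condition above:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative frames whose key columns match the join condition above
df = spark.createDataFrame([(1, 10, 'x')], ['a', 'b', 'c'])
other = spark.createDataFrame([(1, 10, 'x', 99)], ['a', 'bb', 'ccc', 'd'])

cond = [df.a == other.a, df.b == other.bb, df.c == other.ccc]
result = df.join(other, cond, 'inner').drop(df.a)
result.show()  # columns: b, c, a, bb, ccc, d -- only one 'a' remains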
Solution 3:
Assuming 'a' is a data frame with column 'id' and 'b' is another data frame with column 'id', I use the following two methods to remove duplicates:
Method 1: Use a string join expression instead of a boolean expression. This automatically removes the duplicate column for you:
a.join(b, 'id')
Method 2: Rename the column before the join and drop it afterwards:
b = b.withColumnRenamed('id', 'b_id')
joinexpr = a['id'] == b['b_id']
a.join(b, joinexpr).drop('b_id')
Solution 4:
The code below works with Spark 1.6.0 and above.
salespeople_df.show()
+---+------+-----+
|Num| Name|Store|
+---+------+-----+
| 1| Henry| 100|
| 2| Karen| 100|
| 3| Paul| 101|
| 4| Jimmy| 102|
| 5|Janice| 103|
+---+------+-----+
storeaddress_df.show()
+-----+--------------------+
|Store| Address|
+-----+--------------------+
| 100| 64 E Illinos Ave|
| 101| 74 Grand Pl|
| 102| 2298 Hwy 7|
| 103|No address available|
+-----+--------------------+
Assuming, in this example, that the shared column has the same name in both data frames:
joined = salespeople_df.join(storeaddress_df, ['Store'])
joined.orderBy('Num', ascending=True).show()
+-----+---+------+--------------------+
|Store|Num|  Name|             Address|
+-----+---+------+--------------------+
|  100|  1| Henry|    64 E Illinos Ave|
|  100|  2| Karen|    64 E Illinos Ave|
|  101|  3|  Paul|         74 Grand Pl|
|  102|  4| Jimmy|          2298 Hwy 7|
|  103|  5|Janice|No address available|
+-----+---+------+--------------------+
Joining on the column name list prevents duplication of the shared column. If you then want to remove a column, say Num in this example, you can just use .drop('colname'):
joined = joined.drop('Num')
joined.show()
+-----+------+--------------------+
|Store| Name| Address|
+-----+------+--------------------+
| 103|Janice|No address available|
| 100| Henry| 64 E Illinos Ave|
| 100| Karen| 64 E Illinos Ave|
| 101| Paul| 74 Grand Pl|
| 102| Jimmy| 2298 Hwy 7|
+-----+------+--------------------+
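As a side note, newer Spark versions (2.0 and later, to the best of my knowledge) let .drop take several column names at once:
# Hypothetical one-liner: drop two columns in a single call
salespeople_df.join(storeaddress_df, ['Store']).drop('Num', 'Name').show()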
Solution 5:
After I've joined multiple tables together, I run them through a simple function that drops duplicate columns from the DF as it walks from left to right. Alternatively, you could rename those columns instead.
Where Names is a table with columns ['Id', 'Name', 'DateId', 'Description'] and Dates is a table with columns ['Id', 'Date', 'Description'], the columns Id and Description will be duplicated after the join.
Names = sparkSession.sql("SELECT * FROM Names")
Dates = sparkSession.sql("SELECT * FROM Dates")
NamesAndDates = Names.join(Dates, Names.DateId == Dates.Id, "inner")
NamesAndDates = dropDupeDfCols(NamesAndDates)
NamesAndDates.write.saveAsTable("...", format="parquet", mode="overwrite", path="...")
Where dropDupeDfCols is defined as:
def dropDupeDfCols(df):
    """Drop duplicate columns, keeping the first (leftmost) occurrence."""
    newcols = []
    dupcols = []

    # Record the first occurrence of each column name, and the
    # positions of any later duplicates.
    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    # Temporarily rename every column to its index so the duplicate
    # positions can be dropped unambiguously, then restore the names.
    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))
    return df.toDF(*newcols)
The resulting data frame will contain columns ['Id', 'Name', 'DateId', 'Description', 'Date'].
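A quick sanity check of the helper on toy data (the frame names and values here are illustrative; sparkSession is the session object from above):
left = sparkSession.createDataFrame(
    [(1, 'Alice', 7, 'note')], ['Id', 'Name', 'DateId', 'Description'])
right = sparkSession.createDataFrame(
    [(7, '2020-01-01', 'holiday')], ['Id', 'Date', 'Description'])

joined = left.join(right, left.DateId == right.Id, 'inner')
print(joined.columns)
# ['Id', 'Name', 'DateId', 'Description', 'Id', 'Date', 'Description']

print(dropDupeDfCols(joined).columns)
# ['Id', 'Name', 'DateId', 'Description', 'Date']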