Cleaning Data in CSV Files Using Dataflow
I am trying to read a CSV file (with a header) from GCS that has about 150 columns, and then: 1. set the data for particular columns, and 2. replace NaN with Null in all columns.
Solution 1:
Using the following produces the output you're looking for:
lines = p | ReadFromText(file_pattern="gs://<my-bucket>/input_file.csv")
def parse_method(line):
    import csv
    reader = csv.reader(line.split('\n'))
    for csv_row in reader:
        row = []
        for value in csv_row:
            if value == 'NaN':
                value = 'Null'
            row.append(value)
        return ",".join(row)
lines = lines | 'Split' >> beam.Map(parse_method)
line = lines | 'Output to file' >> WriteToText(file_path_prefix="gs://<my-bucket>/output_file.csv")
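Since the whole transform is a per-line string operation, the parse function can be unit-tested locally without running a pipeline at all. A minimal sketch (the `parse_method` here mirrors the function above; the sample row is made up):

```python
import csv

def parse_method(line):
    # Parse one CSV line and replace literal 'NaN' cells with 'Null'.
    reader = csv.reader(line.split('\n'))
    for csv_row in reader:
        row = []
        for value in csv_row:
            if value == 'NaN':
                value = 'Null'
            row.append(value)
        return ",".join(row)

print(parse_method("alice,NaN,42"))  # → alice,Null,42
```

This also makes it easy to check edge cases (quoted fields, rows that are all NaN) before deploying the Dataflow job.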
As for editing columns based on the header, I'm not sure there is a more straightforward way, but I would use pandas like this:
lines = p | "ReadFromText" >> ReadFromText(file_pattern="gs://<my-bucket>/input_file.csv")
def parse_method(line):
    import pandas as pd
    line = line.split(',')
    df = pd.DataFrame(data=[line], columns=['name', 'custom1', 'custom2'])
    df['custom2'] = df.custom2.apply(lambda x: 'None' if x == 'NaN' else x)
    values = list(df.loc[0].values)
    return ",".join(values)
lines = lines | "Split" >> beam.Map(parse_method)
line = lines | "Output to file" >> WriteToText(file_path_prefix="gs://<my-bucket>/output_file.csv")
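If constructing a pandas DataFrame per element feels heavy, the same header-based edit can be sketched with a plain dict that maps column names to values (the header list and column names below are hypothetical, matching the example above):

```python
import csv

HEADER = ['name', 'custom1', 'custom2']  # hypothetical header row

def edit_columns(line, header=HEADER):
    # Parse one CSV line and edit specific columns by name.
    row = next(csv.reader([line]))
    record = dict(zip(header, row))
    # Replace NaN with 'None' only in the custom2 column.
    if record['custom2'] == 'NaN':
        record['custom2'] = 'None'
    return ",".join(record[col] for col in header)

print(edit_columns("alice,1,NaN"))  # → alice,1,None
```

This avoids importing pandas inside the DoFn and keeps each element's processing to a single dict lookup, which matters when the pipeline handles millions of rows.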