
Cleaning Data In Csv Files Using Dataflow

I am trying to read a CSV file (with a header) from GCS that has about 150 columns, and then:

1. Set the column data for particular columns.
2. Replace NaN with Null values in all columns.

Solution 1:

Using the following produces the output you're looking for:

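    import apache_beam as beam
    from apache_beam.io import ReadFromText, WriteToText

    # p is an existing beam.Pipeline object.
    lines = p | ReadFromText(file_pattern="gs://<my-bucket>/input_file.csv")

    def parse_method(line):
        import csv
        # Parse the line as CSV so that quoted fields are split correctly.
        reader = csv.reader(line.split('\n'))
        row = []
        for csv_row in reader:
            for value in csv_row:
                # Replace NaN values with Null.
                if value == 'NaN':
                    value = 'Null'
                row.append(value)
        # Note: ",".join does not re-quote fields that contain commas.
        return ",".join(row)

    lines = lines | 'Split' >> beam.Map(parse_method)
    lines | 'Output to file' >> WriteToText(file_path_prefix="gs://<my-bucket>/output_file.csv")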
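The per-line NaN replacement can be checked without running a pipeline. The following is a minimal sketch using only Python's standard `csv` module; `replace_nan` is a hypothetical helper name, and it assumes missing values appear as the literal string `NaN`. Unlike `",".join`, it writes the row back with `csv.writer`, so a quoted field such as `"Smith, John"` keeps its quotes.

```python
import csv
import io


def replace_nan(line, nan_token="NaN", null_token="Null"):
    """Parse one CSV line, replace NaN tokens, and re-serialize it as CSV.

    Hypothetical helper: csv.reader/csv.writer are used so that quoted
    fields containing commas survive the round trip.
    """
    fields = next(csv.reader([line]))
    cleaned = [null_token if value == nan_token else value for value in fields]
    out = io.StringIO()
    # lineterminator="" keeps the result a single line with no trailing newline.
    csv.writer(out, lineterminator="").writerow(cleaned)
    return out.getvalue()


print(replace_nan('1,NaN,"Smith, John",NaN'))  # prints 1,Null,"Smith, John",Null
```

In a pipeline it could be applied the same way as `parse_method`, e.g. `lines | beam.Map(replace_nan)`.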

For editing columns based on the header, I'm not sure whether there is a more straightforward way, but I would use pandas as follows:

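    import apache_beam as beam
    from apache_beam.io import ReadFromText, WriteToText

    # p is an existing beam.Pipeline object.
    lines = p | "ReadFromText" >> ReadFromText(file_pattern="gs://<my-bucket>/input_file.csv")

    def parse_method(line):
        import pandas as pd

        line = line.split(',')
        # Build a one-row DataFrame so columns can be addressed by name.
        # The column names must match the file's header (three columns here).
        df = pd.DataFrame(data=[line], columns=['name', 'custom1', 'custom2'])
        # Set the data of a particular column: NaN becomes 'None' in custom2.
        df['custom2'] = df.custom2.apply(lambda x: 'None' if x == 'NaN' else x)
        values = list(df.loc[0].values)
        return ",".join(values)

    lines = lines | "Split" >> beam.Map(parse_method)
    lines | "Output to file" >> WriteToText(file_path_prefix="gs://<my-bucket>/output_file.csv")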
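With about 150 columns, hard-coding every column name in the `DataFrame` constructor gets unwieldy. A possible alternative, sketched below without pandas, reads the column names from the header line once and resolves the columns to update by name. `make_row_cleaner` and the `column_updates` mapping are hypothetical names chosen for illustration, and the sketch assumes the header is available as a string and that missing values appear as the literal `NaN`.

```python
import csv
import io


def make_row_cleaner(header_line, column_updates, nan_token="NaN", null_token="Null"):
    """Build a per-line cleaning function from the CSV header (hypothetical helper).

    header_line:    the first line of the file, e.g. "name,custom1,custom2".
    column_updates: mapping {column_name: function(old_value) -> new_value},
                    applied only to the named columns.
    """
    header = next(csv.reader([header_line]))
    # Resolve column names to positions once, not per line.
    # header.index raises ValueError if a name is not in the header.
    index_updates = {header.index(name): fn for name, fn in column_updates.items()}

    def clean(line):
        fields = next(csv.reader([line]))
        # 1. Replace NaN tokens in every column.
        fields = [null_token if v == nan_token else v for v in fields]
        # 2. Rewrite the data of the selected columns.
        for i, fn in index_updates.items():
            fields[i] = fn(fields[i])
        out = io.StringIO()
        csv.writer(out, lineterminator="").writerow(fields)
        return out.getvalue()

    return clean


clean = make_row_cleaner("name,custom1,custom2", {"custom1": str.upper})
print(clean("alice,abc,NaN"))  # prints alice,ABC,Null
```

In Beam, one option might be to read the data with `ReadFromText(..., skip_header_lines=1)` so the header line is not itself cleaned, apply `beam.Map(make_row_cleaner(header_line, updates))`, and write the header back with `WriteToText(..., header=header_line)`. How `header_line` is obtained (for example by reading the first line of the GCS file when the pipeline is built) is left open here.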
