
Can't Map A Function To Tarfile Members In Parallel

I have a tarfile containing bz2-compressed files. I want to apply the function clean_file to each of the bz2 files and collate the results. In series, this is easy with a loop, but mapping the function over the members in parallel fails.

Solution 1:

You didn't specify what platform you are running on, but I suspect it is Windows because you have ...

if __name__ == '__main__':
    main()

... which is required for code that creates processes on platforms that use the spawn method to start new processes. But that also means that every new process (e.g. each process in the pool you are creating) begins by re-executing the source program from the top. So the following code is executed by each pool process:

tr = tarfile.open('data.tar')
members = tr.getmembers()
num_files = len(members)

I don't see why this in itself would cause an error, but I can't be sure. The problem may be, however, that this module-level code executes after your worker function clean_file has already been called, so tr has not been set yet. If this code preceded clean_file it might work, but that is just a guess. What is certain is that extracting the members with members = tr.getmembers() in each pool process is wasteful; each process only needs to open the tar file, ideally just once.
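To see this re-execution concretely, here is a minimal, self-contained sketch (not taken from the question; the names are invented): any statement outside the __main__ guard runs again in every spawned worker.

import multiprocessing as mp

print('module level: runs in the parent and again in every spawned worker')

def work(x):
    return x * x

if __name__ == '__main__':
    # only the parent process enters this block
    mp.set_start_method('spawn', force=True)
    with mp.Pool(2) as pool:
        print(pool.map(work, range(4)))

Running this with the spawn start method prints the module-level message once for the parent and once per worker process, which is exactly why the unguarded tarfile code above runs in every pool process.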

But what is clear is that the stacktrace you published does not match your code. You show:

Traceback (most recent call last):
  File "parse_data.py", line 53, in <module>
    processed_files = list(tqdm.tqdm(p.imap(clean_file, members), total=num_files))

Yet your code has no reference to tqdm and does not use the imap method. It becomes much more difficult to analyze what your problem actually is when the code you post doesn't match the code that produced the exception.

On the off chance you are running on a Mac, which might be using fork to create new processes: that can be problematic when the main process has already created threads (which you don't necessarily see; the tarfile module might create them) and you then fork a new process. For that reason the code below explicitly ensures that spawn is used to create new processes. It also introduces a few optimizations. If it still doesn't work, please post a new stack trace.

import pandas as pd
import json
import os
import bz2
import itertools
import datetime
import tarfile
from multiprocessing import get_context

def open_tar():
    # open the tar file once for each process in the pool
    global tr
    tr = tarfile.open('data.tar')

def clean_file(member):
    f = tr.extractfile(member)

    with bz2.open(f, "rt") as bzinput:
        for line in bzinput:
            line = line.replace('"name"}', '"name":" "}')
            dat = json.loads(line)
            # since you are returning just the first occurrence:
            return dat

def main():
    with tarfile.open('data.tar') as tr:
        members = tr.getmembers()
    # just pick members where '.bz2' is in the member name:
    filtered_members = filter(lambda member: '.bz2' in str(member), members)
    ctx = get_context('spawn')
    # open the tar file just once for each process in the pool:
    with ctx.Pool(initializer=open_tar) as pool:
        processed_files = pool.map(clean_file, filtered_members)
        print(processed_files)

# required for when processes are created using spawn:
if __name__ == '__main__':
    main()
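If you also want the progress bar that appears in your traceback, the same pool works with imap. A hedged sketch, assuming the third-party tqdm package is installed and reusing open_tar and clean_file from the listing above (main_with_progress is just an illustrative name):

import tarfile
import tqdm  # third-party: pip install tqdm
from multiprocessing import get_context

def main_with_progress():
    with tarfile.open('data.tar') as tar:
        members = [m for m in tar.getmembers() if '.bz2' in str(m)]
    ctx = get_context('spawn')
    with ctx.Pool(initializer=open_tar) as pool:
        # imap yields results as workers finish, so tqdm can show progress
        processed_files = list(tqdm.tqdm(pool.imap(clean_file, members),
                                         total=len(members)))
    print(processed_files)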

Solution 2:

It seems some race condition was happening. Opening the tar file separately in every child process solves the issue:

import json
import bz2
import tarfile
import logging
from multiprocessing import Pool


def clean_file(member):
    if '.bz2' not in str(member):
        return
    try:
        with tarfile.open('data.tar') as tr:
            with tr.extractfile(member) as bz2_file:
                with bz2.open(bz2_file, "rt") as bzinput:
                    dicts = []
                    for i, line in enumerate(bzinput):
                        line = line.replace('"name"}', '"name":" "}')
                        dat = json.loads(line)
                        dicts.append(dat)
                        return dicts[0]
    except Exception:
        logging.exception(f"Error while processing {member}")


def process_serial():
    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    processed_files = []
    for i, member in enumerate(members):
        processed_files.append(clean_file(member))
        print(f'done {i}/{len(members)}')


def process_parallel():
    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    with Pool() as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)


def main():
    process_parallel()


if __name__ == '__main__':
    main()
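One note on the listing above: clean_file calls logging.exception, but logging is never configured. Python's last-resort handler still writes the traceback to stderr, so errors are not silently lost; if you want timestamps or the worker's process name in the output, one option is to configure logging in each worker through the pool's initializer. A minimal sketch under that assumption (init_worker is an invented name, not part of the original answer):

def init_worker():
    # runs once in each worker process, so it works under both fork and spawn
    logging.basicConfig(
        format='%(asctime)s %(processName)s %(levelname)s %(message)s',
        level=logging.INFO,
    )

def process_parallel():
    tr = tarfile.open('data.tar')
    members = tr.getmembers()
    with Pool(initializer=init_worker) as pool:
        processed_files = pool.map(clean_file, members)
        print(processed_files)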

EDIT:

Note that another way to solve this problem is to just use the spawn start method:

multiprocessing.set_start_method('spawn')

By doing this, child processes no longer inherit the parent's open file handles; each worker gets its own. Under the default "fork" start method, an inherited handle in parent and child refers to the same underlying open file and therefore shares the same offset, which is what lets the concurrent reads interfere with each other.
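set_start_method should be called at most once, in the main process, before the pool is created; the multiprocessing documentation recommends keeping it inside the __main__ guard. A minimal sketch using the main() from the listing above:

import multiprocessing

if __name__ == '__main__':
    # set the start method once, before any Pool is created
    multiprocessing.set_start_method('spawn')
    main()

If you would rather not change the global default, get_context('spawn'), as used in Solution 1, gives you a pool that uses spawn while leaving the rest of the program untouched.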
