
Use Keras In Multiprocessing

This is basically a duplicate of Keras + Tensorflow and Multiprocessing in Python, but my setup is a bit different, and their solution doesn't work for me. I need to train a Keras …

Solution 1:

The good news is that TensorFlow sessions are thread-safe: Is it thread-safe when using tf.Session in inference service?

To use a Keras model in multiple processes, you have to do the following:

  • set up the model
  • call model._make_predict_function()
  • set up a session and use it to get the TensorFlow graph
  • finalize this graph
  • every time you predict something, wrap the call in with default_graph.as_default():

Here is some sample code:

# the usual imports
import numpy as np
import tensorflow as tf

from keras.models import *
from keras.layers import *

# set up the model
i = Input(shape=(10,))
b = Dense(1)(i)
model = Model(inputs=i, outputs=b)

# now to use it in multiprocessing, the following is necessary
model._make_predict_function()
sess = tf.Session()
sess.run(tf.global_variables_initializer())
default_graph = tf.get_default_graph()
default_graph.finalize()

# now you share the model and graph between processes
# in each process you can call this (from inside whatever function does the prediction):
with default_graph.as_default():
    return model.predict(something)
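
The answer leaves the actual process setup to the reader. Below is a rough sketch of one way the sharing could look with a fork-based multiprocessing.Pool; the worker name, dummy inputs and pool size are illustrative and not part of the answer.

# minimal sketch, assuming the model and graph above are module-level globals and
# the processes are forked (the default on Linux), so children inherit them
from multiprocessing import Pool

def predict_worker(batch):
    # wrap every prediction in the finalized graph's context
    with default_graph.as_default():
        return model.predict(batch)

if __name__ == "__main__":
    inputs = [np.random.rand(4, 10) for _ in range(8)]  # dummy batches for Input(shape=(10,))
    with Pool(processes=2) as pool:
        results = pool.map(predict_worker, inputs)
    print([r.shape for r in results])

On Windows and macOS the default start method is spawn, so the model and graph are not inherited and would have to be rebuilt inside each worker (for example in a Pool initializer) instead.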

Solution 2:

This technique is not working for me.

I am loading my saved model and passing it on as an argument. My error message is slightly different from the one posted. It is:

E tensorflow/core/grappler/clusters/utils.cc:83] Failed to get device properties, error code: 3

I do not have any trouble running it outside of multiprocessing. Also, if it means anything, I'm using the Docker image tensorflow/tensorflow-gpu-py3, version 1.13.1.

Here is my object detection code below. It takes an image and produces multiple scales of that image (called an image pyramid), then processes one scale at a time. For each scale, it parses the image into smaller windows and sends each window to a separate process. Each process then uses model.evaluate([window],[1]) to test whether the current window contains my object. If the probability is high, the window's box info is stored in a queue and retrieved later (along with the values from the other processes).

Here is my code:

# imports needed by the code below
import os
from multiprocessing import Process, Queue

def start_detection_mp3(image, winDim, minSize, winStep=4, pyramidScale=1.5, minProb=0.7):
    # Code to use multiple processors (mp)
    boxes = []
    probs = []
    print("Loading CNN Keras Model .... ")
    checkpoint_path = "trainedmodels/cp.ckpt"
    mymodel = create_CNN_model(2, winDim[0], winDim[1])
    mymodel.load_weights(checkpoint_path)
    mymodel._make_predict_function()
    (keepscale, keeplayer) = CalculateNumberOfScales(image, pyramidScale, minSize)
    printinfo("There are {} scales in this image.".format(len(keepscale)))
    for i in range(0, len(keepscale)):
        printinfo("Working on layer {0:4d}. Scale {1:.2f}".format(i, keepscale[i]))
        # mysess is assumed to be a tf.Session created elsewhere in the module
        (b, p) = detect_single_layer_mp3(keeplayer[i], keepscale[i], winStep, winDim, minProb, mysess, mymodel)

        boxes = boxes + b
        probs = probs + p
    mysess.close()
    return (boxes, probs)

def detect_single_layer_mp3(layer, scale, winStep, winDim, minProb, mysess, mymodel):
    # Use multiple processors
    p = []
    boxes = []
    probs = []
    xx, yy, windows = sliding_window_return(layer, winStep, winDim)
    # process in chunks of 4 (for four processors)
    NumOfProcessors = 4
    for aa in range(0, len(xx) - 1, 4):
        # Only need to create one Queue (FIFO buffer) per chunk to hold the output of
        # each process; when all processes are completed, the buffer will be emptied.
        q = Queue()
        for ii in range(0, NumOfProcessors):
            if aa + ii >= len(xx):  # guard the last, possibly partial, chunk
                break
            printinfo("Processes {} of Loop {}".format(ii, aa))
            x = xx[aa + ii]  # give each of the four workers its own window
            y = yy[aa + ii]
            window = windows[aa + ii]
            p.append(Process(target=f2, args=(x, y, window, scale, minProb, winDim, q, mysess, mymodel)))
            pp = p[-1]  # get last
            printinfo("Starting process {}".format(pp))
            pp.start()
            pp.join()  # note: joining here runs the workers one at a time, not in parallel

        while not q.empty():
            d = q.get()
            boxes = boxes + d[0]
            probs = probs + d[1]

        p = []  # Clear Processes

    return (boxes, probs)


def f2(x, y, window, scale, minProb, winDim, q, mysess, mymodel):
    processID = os.getpid()
    boxes = []
    probs = []
    isHOG = 0
    isCNN = 0
    isCNN_Keras = 1
    (winH, winW) = window.shape[:2]
    if winW == winDim[0] and winH == winDim[1]:  # Check that the window dimensions match winDim
        if isCNN_Keras == 1:
            ### TODO  It appears that it is freezing at the prediction step
            printinfo("Process id: {} Starting test against CNN model".format(processID))
            window = window.reshape(-1, winH, winW, 1)
            loss, prob = mymodel.evaluate([window], [1])
            print("Loss: {}  Accuracy: {}".format(loss, prob))

            if prob > minProb:
                printinfo("*** [INFO] ProcessID: {0:7d} Probability: {1:.3f}  Scale {2:.3f} ***".format(processID, prob, scale))
                # compute the (x, y)-coordinates of the bounding box using the
                # current scale of the image pyramid
                (startX, startY) = (int(scale * x), int(scale * y))
                endX = int(startX + (scale * winW))
                endY = int(startY + (scale * winH))

                # update the list of bounding boxes and probabilities
                boxes.append((startX, startY, endX, endY))
                probs.append(prob)
    # return a tuple of the bounding boxes and probabilities
    if q != 1:
        q.put([boxes, probs])
        q.close()
    else:
        return (boxes, probs)
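
For reference, the listing above calls several helpers that are not shown (create_CNN_model, CalculateNumberOfScales, sliding_window_return, printinfo). A minimal sketch of what the pyramid and sliding-window helpers might look like is below, inferred only from how they are called; the signatures are guesses, the real implementations may differ, and OpenCV (cv2) is assumed for resizing.

import cv2

def CalculateNumberOfScales(image, pyramidScale, minSize):
    # build an image pyramid: return the scale factor and resized layer for
    # every level whose size is still at least minSize (assumed to be (w, h))
    keepscale, keeplayer = [], []
    scale = 1.0
    layer = image
    while layer.shape[0] >= minSize[1] and layer.shape[1] >= minSize[0]:
        keepscale.append(scale)
        keeplayer.append(layer)
        scale *= pyramidScale
        layer = cv2.resize(image, (int(image.shape[1] / scale), int(image.shape[0] / scale)))
    return (keepscale, keeplayer)

def sliding_window_return(layer, winStep, winDim):
    # return the top-left corner coordinates and pixel data of every
    # winDim-sized window in the layer, stepping winStep pixels at a time
    xx, yy, windows = [], [], []
    for y in range(0, layer.shape[0] - winDim[1] + 1, winStep):
        for x in range(0, layer.shape[1] - winDim[0] + 1, winStep):
            xx.append(x)
            yy.append(y)
            windows.append(layer[y:y + winDim[1], x:x + winDim[0]])
    return (xx, yy, windows)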
