6

I am working with dask for the first time and trying to run predict() from a trained keras model.

If I don't use dask, the function works fine (i.e. pd.DataFrame() versus dd.DataFrame()). With dask, I get the error below. Is this not a common use case (aside from scoring a groupby, perhaps)?

def calc_HR_ind_dsk(grp):
    model=keras.models.load_model('/home/embedding_model.h5')
    topk=10

    x=[grp['user'].values,grp['item'].values]
    pred_act=list(zip(model.predict(x)[:,0],grp['respond'].values))
    top=sorted(pred_act, key=lambda x: -x[0])[0:topk]
    hit=sum([x[1] for x in top])
    return(hit)



import dask.dataframe as dd

#step 1 - read in data as a dask df. We could reference more than one file using the '*' wildcard
df = dd.read_csv('/home/test_coded_final.csv',dtype='int64')
results=df.groupby('user').apply(calc_HR_ind_dsk).compute()

TypeError: Cannot interpret feed_dict key as Tensor: Tensor Tensor("Placeholder_30:0", shape=(55188, 32), dtype=float32) is not an element of this graph.
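
To check the ranking logic separately from keras and dask, the metric inside calc_HR_ind_dsk can be isolated as a plain function. This is only a sketch: the name hits_in_top_k and the sample scores are hypothetical, and the scores stand in for what model.predict() would return.

```python
def hits_in_top_k(scores, responses, topk=10):
    """Count positive responses among the topk highest-scoring rows."""
    # Pair each score with its response and rank by score, highest first.
    ranked = sorted(zip(scores, responses), key=lambda pair: -pair[0])
    # Sum the responses (1 = hit, 0 = miss) of the top-ranked rows.
    return sum(resp for _, resp in ranked[:topk])


# Hypothetical scores in place of model.predict(x)[:, 0]
example_hits = hits_in_top_k([0.9, 0.1, 0.8, 0.4], [1, 1, 0, 1], topk=2)
```

If this part behaves as expected on hand-made inputs, the failure is more likely in how the model is loaded or called than in the groupby logic.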

Ioannis Nasios
B_Miner
  • Well, a dask dataframe is a lazy summary of operations with an idea of how the result should look. So it makes sense that it is not naturally supported like a pandas DataFrame, which is more or less a fancy numpy array. The support would have to be implemented in keras, so you could open an issue there. Alternatively, write a custom apply/map/map_partitions function that does the prediction. But if you use a distributed dask cluster, you have to think about how to upload your model. – dennis-w Mar 14 '18 at 21:47
  • My thought was that calc_HR_ind_dsk would work on each groupby partition. Is this not true? Or is the issue that grp['user'].values is not exactly the same as a numpy array? – B_Miner Mar 14 '18 at 21:56
  • Sorry, I overlooked that part. The problem should be your groupby apply function. I think dask has to test it to know what output to expect. This forces keras to handle whatever dask tests your custom apply function on. You can fix this by specifying the meta parameter inside the apply function. – dennis-w Mar 14 '18 at 22:03
  • I'll take a look into that (back to reading :)). Do you happen to have experience with the apply function for groupby? I could not find a way to pass arguments. – B_Miner Mar 14 '18 at 22:05
  • I posted an answer to make my suggestion clearer. My experience with groupby apply is limited, but if the meta argument does not do the trick I'll look into it tomorrow. – dennis-w Mar 14 '18 at 22:22

3 Answers

5

I found the answer. It is an issue with keras or tensorflow: https://github.com/keras-team/keras/issues/2397

The code below worked, and using dask cut the run time by 50% compared with a standard pandas groupby.

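#dask
import keras
import tensorflow as tf

# load the model once, outside the function that is applied to each group
model=keras.models.load_model('/home/embedding_model.h5')

#this part: keep a reference to the graph the model was loaded into (TensorFlow 1.x API)
graph = tf.get_default_graph()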


def calc_HR_ind_dsk(grp):
    topk=10
    x=[grp['user'].values,grp['item'].values]

    with graph.as_default(): #and this part from https://github.com/keras-team/keras/issues/2397
        pred_act=list(zip(model.predict(x)[:,0],grp['respond'].values))
    top=sorted(pred_act, key=lambda x: -x[0])[0:topk]
    hit=sum([x[1] for x in top])

    return(hit)



import dask.dataframe as dd


df = dd.read_csv('/home/test_coded_final.csv',dtype='int64')
results=df.groupby('user').apply(calc_HR_ind_dsk).compute()
B_Miner
  • Interesting issue. I'm surprised that dask doesn't at least give you a warning about the missing meta parameter. Also, do you know why dask is faster in this case? A groupby should cause a full shuffle, and such a task should be faster as an in-memory op like a pandas groupby. – dennis-w Mar 15 '18 at 05:59
  • Not sure. I assume for this example, the splitting into a couple partitions helped. – B_Miner Mar 15 '18 at 14:49
1

Have a look at: http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.groupby.DataFrameGroupBy.apply Unlike pandas, many dask functions that let you define your own custom op need the meta parameter. Without it, dask will somehow test your custom function and pass weird things to keras, which might not happen when calling compute.

dennis-w
  • If you look at my code, I am returning a scalar (it will be a 1 or 0). The issue is not on the return though, it is failing with the predict() function. – B_Miner Mar 15 '18 at 01:25
  • I found the answer (it is unrelated to dask) and will answer below. – B_Miner Mar 15 '18 at 01:26
0

A different answer I wrote might help here (the use case was using Dask with a pre-trained ML model to predict on 1,000,000 examples): https://stackoverflow.com/a/59015702/4900327

Abhishek Divekar