Skip to content

castra 'TypeError: data type not understood' when storing categoricals #56

@user32000

Description

@user32000

Summary: a DataFrame saved with `to_castra(..., categories=True)` and loaded back via `from_castra` has its object column converted to a `category` dtype. Re-saving that loaded frame with `to_castra` then raises `TypeError: data type not understood` — the category dtype reaches castra's `blosc_args`, where `np.issubdtype` cannot interpret it. Reproduction transcript follows:
dd = df.from_pandas(d, npartitions=2)

dd.dtypes
Out[49]: 
a             int64
b    datetime64[ns]
c            object
dtype: object

c = dd.to_castra('delme0.castra')

c = None

c = dd.to_castra('delme1.castra', categories=True)

c
Out[54]: <castra.core.Castra at 0x10983eb00>

ee = df.from_castra('delme1.castra')

ee
Out[56]: dd.DataFrame<from-castra-7c5f3b6d9b74449a9e27408736e8859a, divisions=(0, 4, 9)>

ee.dtypes
Out[57]: 
a             int64
b    datetime64[ns]
c          category
dtype: object

c = ee.to_castra('delme2.castra')
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-58-62c09f024c21> in <module>()
----> 1 c = ee.to_castra('delme2.castra')

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/dataframe/core.py in to_castra(self, fn, categories, sorted_index_column, compute)
   1409         from .io import to_castra
   1410         return to_castra(self, fn, categories, sorted_index_column,
-> 1411                          compute=compute)
   1412 
   1413     def to_bag(self, index=False):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/dataframe/io.py in to_castra(df, fn, categories, sorted_index_column, compute)
    769     keys = [(name, -1), (name, df.npartitions - 1)]
    770     if compute:
--> 771         c, _ = DataFrame._get(dsk, keys, get=get_sync)
    772         return c
    773     else:

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in _get(cls, dsk, keys, get, **kwargs)
     41         get = get or _globals['get'] or cls._default_get
     42         dsk2 = cls._optimize(dsk, keys, **kwargs)
---> 43         return get(dsk2, keys, **kwargs)
     44 
     45     @classmethod

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_sync(dsk, keys, **kwargs)
    514     queue = Queue()
    515     return get_async(apply_sync, 1, dsk, keys, queue=queue,
--> 516                      raise_on_exception=True, **kwargs)
    517 
    518 

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
    485             f(key, res, dsk, state, worker_id)
    486         while state['ready'] and len(state['running']) < num_workers:
--> 487             fire_task()
    488 
    489     # Final reporting

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in fire_task()
    456         # Submit
    457         apply_async(execute_task, args=[key, dsk[key], data, queue,
--> 458                                         get_id, raise_on_exception])
    459 
    460     # Seed initial tasks into the thread pool

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in apply_sync(func, args, kwds)
    506 def apply_sync(func, args=(), kwds={}):
    507     """ A naive synchronous version of apply_async """
--> 508     return func(*args, **kwds)
    509 
    510 

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in execute_task(key, task, data, queue, get_id, raise_on_exception)
    262     """
    263     try:
--> 264         result = _execute_task(task, data)
    265         id = get_id()
    266         result = key, result, None, id

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in _execute_task(arg, cache, dsk)
    243     elif istask(arg):
    244         func, args = arg[0], arg[1:]
--> 245         args2 = [_execute_task(a, cache) for a in args]
    246         return func(*args2)
    247     elif not ishashable(arg):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in <listcomp>(.0)
    243     elif istask(arg):
    244         func, args = arg[0], arg[1:]
--> 245         args2 = [_execute_task(a, cache) for a in args]
    246         return func(*args2)
    247     elif not ishashable(arg):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in _execute_task(arg, cache, dsk)
    244         func, args = arg[0], arg[1:]
    245         args2 = [_execute_task(a, cache) for a in args]
--> 246         return func(*args2)
    247     elif not ishashable(arg):
    248         return arg

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in extend(self, df)
    217         # Store columns
    218         for col in df.columns:
--> 219             pack_file(df[col].values, self.dirname(partition_name, col))
    220 
    221         # Store index

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in pack_file(x, fn, encoding)
    390     if x.dtype != 'O':
    391         bloscpack.pack_ndarray_file(x, fn, bloscpack_args=bp_args,
--> 392                 blosc_args=blosc_args(x.dtype))
    393     else:
    394         bytes = blosc.compress(msgpack.packb(x.tolist(), encoding=encoding), 1)

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in blosc_args(dt)
     28 
     29 def blosc_args(dt):
---> 30     if np.issubdtype(dt, int):
     31         return bloscpack.BloscArgs(dt.itemsize, clevel=3, shuffle=True)
     32     if np.issubdtype(dt, np.datetime64):

/Applications/anaconda/envs/python3/lib/python3.5/site-packages/numpy/core/numerictypes.py in issubdtype(arg1, arg2)
    759     else:
    760         val = mro[0]
--> 761     return issubclass(dtype(arg1).type, val)
    762 
    763 

TypeError: data type not understood

Metadata

Assignees

No one assigned

    Labels

    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions