dd = df.from_pandas(d, npartitions=2)
dd.dtypes
Out[49]:
a int64
b datetime64[ns]
c object
dtype: object
c = dd.to_castra('delme0.castra')
c = None
c = dd.to_castra('delme1.castra', categories=True)
c
Out[54]: <castra.core.Castra at 0x10983eb00>
ee = df.from_castra('delme1.castra')
ee
Out[56]: dd.DataFrame<from-castra-7c5f3b6d9b74449a9e27408736e8859a, divisions=(0, 4, 9)>
ee.dtypes
Out[57]:
a int64
b datetime64[ns]
c category
dtype: object
c = ee.to_castra('delme2.castra')
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-58-62c09f024c21> in <module>()
----> 1 c = ee.to_castra('delme2.castra')
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/dataframe/core.py in to_castra(self, fn, categories, sorted_index_column, compute)
1409 from .io import to_castra
1410 return to_castra(self, fn, categories, sorted_index_column,
-> 1411 compute=compute)
1412
1413 def to_bag(self, index=False):
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/dataframe/io.py in to_castra(df, fn, categories, sorted_index_column, compute)
769 keys = [(name, -1), (name, df.npartitions - 1)]
770 if compute:
--> 771 c, _ = DataFrame._get(dsk, keys, get=get_sync)
772 return c
773 else:
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/base.py in _get(cls, dsk, keys, get, **kwargs)
41 get = get or _globals['get'] or cls._default_get
42 dsk2 = cls._optimize(dsk, keys, **kwargs)
---> 43 return get(dsk2, keys, **kwargs)
44
45 @classmethod
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_sync(dsk, keys, **kwargs)
514 queue = Queue()
515 return get_async(apply_sync, 1, dsk, keys, queue=queue,
--> 516 raise_on_exception=True, **kwargs)
517
518
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, queue, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, **kwargs)
485 f(key, res, dsk, state, worker_id)
486 while state['ready'] and len(state['running']) < num_workers:
--> 487 fire_task()
488
489 # Final reporting
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in fire_task()
456 # Submit
457 apply_async(execute_task, args=[key, dsk[key], data, queue,
--> 458 get_id, raise_on_exception])
459
460 # Seed initial tasks into the thread pool
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in apply_sync(func, args, kwds)
506 def apply_sync(func, args=(), kwds={}):
507 """ A naive synchronous version of apply_async """
--> 508 return func(*args, **kwds)
509
510
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in execute_task(key, task, data, queue, get_id, raise_on_exception)
262 """
263 try:
--> 264 result = _execute_task(task, data)
265 id = get_id()
266 result = key, result, None, id
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in _execute_task(arg, cache, dsk)
243 elif istask(arg):
244 func, args = arg[0], arg[1:]
--> 245 args2 = [_execute_task(a, cache) for a in args]
246 return func(*args2)
247 elif not ishashable(arg):
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in <listcomp>(.0)
243 elif istask(arg):
244 func, args = arg[0], arg[1:]
--> 245 args2 = [_execute_task(a, cache) for a in args]
246 return func(*args2)
247 elif not ishashable(arg):
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/dask/async.py in _execute_task(arg, cache, dsk)
244 func, args = arg[0], arg[1:]
245 args2 = [_execute_task(a, cache) for a in args]
--> 246 return func(*args2)
247 elif not ishashable(arg):
248 return arg
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in extend(self, df)
217 # Store columns
218 for col in df.columns:
--> 219 pack_file(df[col].values, self.dirname(partition_name, col))
220
221 # Store index
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in pack_file(x, fn, encoding)
390 if x.dtype != 'O':
391 bloscpack.pack_ndarray_file(x, fn, bloscpack_args=bp_args,
--> 392 blosc_args=blosc_args(x.dtype))
393 else:
394 bytes = blosc.compress(msgpack.packb(x.tolist(), encoding=encoding), 1)
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/castra/core.py in blosc_args(dt)
28
29 def blosc_args(dt):
---> 30 if np.issubdtype(dt, int):
31 return bloscpack.BloscArgs(dt.itemsize, clevel=3, shuffle=True)
32 if np.issubdtype(dt, np.datetime64):
/Applications/anaconda/envs/python3/lib/python3.5/site-packages/numpy/core/numerictypes.py in issubdtype(arg1, arg2)
759 else:
760 val = mro[0]
--> 761 return issubclass(dtype(arg1).type, val)
762
763
TypeError: data type not understood

(Root cause: `to_castra(..., categories=True)` converts column `c` to the pandas `category` dtype — see the `ee.dtypes` output above — and on the second `to_castra` call, `castra.core.blosc_args` passes that dtype to `np.issubdtype(dt, int)`; NumPy's `issubdtype` cannot interpret a pandas `CategoricalDtype`, so re-serializing a DataFrame loaded back from a categorized castra fails. `blosc_args` would need to detect/handle categorical dtypes before calling `np.issubdtype`.)