diff --git a/cpp/csp/adapters/parquet/ParquetDictBasketOutputWriter.cpp b/cpp/csp/adapters/parquet/ParquetDictBasketOutputWriter.cpp index 325dd2497..67c9e9878 100644 --- a/cpp/csp/adapters/parquet/ParquetDictBasketOutputWriter.cpp +++ b/cpp/csp/adapters/parquet/ParquetDictBasketOutputWriter.cpp @@ -20,11 +20,11 @@ void ParquetDictBasketOutputWriter::start() m_indexFileWriterContainer = std::make_unique( arrow::schema( { arrow::field( m_cycleIndexOutputAdapter -> getColumnArrayBuilder( 0 ) -> getColumnName(), m_cycleIndexOutputAdapter -> getColumnArrayBuilder( 0 ) -> getDataType() ) } ), - m_adapterMgr.isWriteArrowBinary() ); - if( !m_adapterMgr.getFileName().empty() ) + m_adapterMgr -> isWriteArrowBinary() ); + if( !m_adapterMgr -> getFileName().empty() ) { - m_indexFileWriterContainer -> open( m_adapterMgr.getFileName(), - m_adapterMgr.getCompression(), m_adapterMgr.isAllowOverwrite() ); + m_indexFileWriterContainer -> open( m_adapterMgr -> getFileName(), + m_adapterMgr -> getCompression(), m_adapterMgr -> isAllowOverwrite() ); } } @@ -45,7 +45,7 @@ void ParquetDictBasketOutputWriter::stop() void ParquetDictBasketOutputWriter::writeValue( const std::string &valueKey, const TimeSeriesProvider *ts ) { - m_adapterMgr.scheduleEndCycle(); + m_adapterMgr -> scheduleEndCycle(); m_symbolOutputAdapter -> writeValue( valueKey ); ParquetWriter::onEndCycle(); ++m_nextCycleIndex; @@ -86,7 +86,7 @@ void ParquetDictBasketOutputWriter::onFileNameChange( const std::string &fileNam if(!fileName.empty()) { m_indexFileWriterContainer - -> open( fileName, m_adapterMgr.getCompression(), m_adapterMgr.isAllowOverwrite() ); + -> open( fileName, m_adapterMgr -> getCompression(), m_adapterMgr -> isAllowOverwrite() ); } } diff --git a/cpp/csp/adapters/parquet/ParquetOutputAdapter.cpp b/cpp/csp/adapters/parquet/ParquetOutputAdapter.cpp index f73092254..3cfef17fc 100644 --- a/cpp/csp/adapters/parquet/ParquetOutputAdapter.cpp +++ b/cpp/csp/adapters/parquet/ParquetOutputAdapter.cpp @@ -127,10 +127,10 @@ ListColumnParquetOutputHandler::ListColumnParquetOutputHandler( Engine *engine, listWriterInterface ) ) { m_valueHandler = std::make_unique( - [ this ]( const TimeSeriesProvider *input ) + [ this ]( const DialectGenericType& input ) { static_cast(this -> m_columnArrayBuilder.get()) - -> setValue( input -> lastValueTyped() ); + -> setValue( input ); } ); } @@ -176,7 +176,7 @@ std::shared_ptr<::arrow::ArrayBuilder> ListColumnParquetOutputHandler::createVal void ListColumnParquetOutputAdapter::executeImpl() { - ( *m_valueHandler )( input() ); + ( *m_valueHandler )( input() -> lastValueTyped() ); m_parquetWriter.scheduleEndCycleEvent(); } @@ -192,6 +192,16 @@ StructParquetOutputHandler::StructParquetOutputHandler( Engine *engine, ParquetW } } +void StructParquetOutputHandler::writeValueFromArgs( const StructPtr input ) +{ + const Struct *structData = input.get(); + + for( auto &&valueHandler: m_valueHandlers ) + { + valueHandler( structData ); + } +} + void StructParquetOutputHandler::writeValueFromTs( const TimeSeriesProvider *input ) { const Struct *structData = input -> lastValueTyped().get(); diff --git a/cpp/csp/adapters/parquet/ParquetOutputAdapter.h b/cpp/csp/adapters/parquet/ParquetOutputAdapter.h index 7148d0fa9..4ebf55518 100644 --- a/cpp/csp/adapters/parquet/ParquetOutputAdapter.h +++ b/cpp/csp/adapters/parquet/ParquetOutputAdapter.h @@ -93,18 +93,23 @@ class ListColumnParquetOutputHandler : public ParquetOutputHandler static_cast(this -> m_columnArrayBuilder.get()) -> setValue( value ); } - void 
writeValueFromTs( const TimeSeriesProvider *input ) override final + void writeValueFromArgs( const DialectGenericType& input ) { ( *m_valueHandler )( input ); } + void writeValueFromTs( const TimeSeriesProvider *input ) override final + { + ( *m_valueHandler )( input -> lastValueTyped() ); + } + private: std::shared_ptr createValueBuilder( const CspTypePtr &elemType, DialectGenericListWriterInterface::Ptr &listWriterInterface ); protected : - using ValueHandler = std::function; + using ValueHandler = std::function; std::unique_ptr m_valueHandler; std::shared_ptr m_columnArrayBuilder; @@ -151,6 +156,7 @@ class StructParquetOutputHandler : public ParquetOutputHandler } void writeValueFromTs( const TimeSeriesProvider *input ) override final; + void writeValueFromArgs( const StructPtr input ); private: using ValueHandler = std::function; diff --git a/cpp/csp/adapters/parquet/ParquetReader.cpp b/cpp/csp/adapters/parquet/ParquetReader.cpp index 81cfd40ec..02e0512a7 100644 --- a/cpp/csp/adapters/parquet/ParquetReader.cpp +++ b/cpp/csp/adapters/parquet/ParquetReader.cpp @@ -192,9 +192,17 @@ void SingleTableParquetReader::setColumnAdaptersFromCurrentTable() columnAdapter = createColumnAdapter( *this, *field, getCurFileOrTableName(), &getStructColumnMeta() ); auto &fieldInfo = fieldsInfo[ index ]; - for( std::size_t i = 0; i < fieldInfo.m_width; ++i ) + if( isArrowIPC() ) + { + // Needed for all memory tables + m_neededColumnIndices.push_back( index ); + } + else { - m_neededColumnIndices.push_back( fieldInfo.m_startColumnIndex + i ); + for( std::size_t i = 0; i < fieldInfo.m_width; ++i ) + { + m_neededColumnIndices.push_back( fieldInfo.m_startColumnIndex + i ); + } } } else @@ -382,10 +390,13 @@ void SingleFileParquetReader::clear() } InMemoryTableParquetReader::InMemoryTableParquetReader( GeneratorPtr generatorPtr, std::vector columns, - bool allowMissingColumns, std::optional symbolColumnName ) + bool allowMissingColumns, std::optional symbolColumnName, bool call_init ) : SingleTableParquetReader( columns, true, allowMissingColumns, symbolColumnName ), m_generatorPtr( generatorPtr ) { - init(); + if( call_init ) + { + init(); + } } bool InMemoryTableParquetReader::openNextFile() diff --git a/cpp/csp/adapters/parquet/ParquetReader.h b/cpp/csp/adapters/parquet/ParquetReader.h index c48e738e1..e0e6745e1 100644 --- a/cpp/csp/adapters/parquet/ParquetReader.h +++ b/cpp/csp/adapters/parquet/ParquetReader.h @@ -424,21 +424,28 @@ class SingleFileParquetReader final : public SingleTableParquetReader bool m_allowMissingFiles; }; -class InMemoryTableParquetReader final : public SingleTableParquetReader +class InMemoryTableParquetReader : public SingleTableParquetReader { public: using GeneratorPtr = csp::Generator, csp::DateTime, csp::DateTime>::Ptr; InMemoryTableParquetReader( GeneratorPtr generatorPtr, std::vector columns, bool allowMissingColumns, - std::optional symbolColumnName = {} ); + std::optional symbolColumnName = {}, + bool call_init = true); std::string getCurFileOrTableName() const override{ return "IN_MEMORY_TABLE"; } protected: - bool openNextFile() override; - bool readNextRowGroup() override; + virtual bool openNextFile() override; + virtual bool readNextRowGroup() override; + void setTable( std::shared_ptr table ) + { + m_fullTable = table; + m_nextChunkIndex = 0; + m_curTable = nullptr; + } - void clear() override; + virtual void clear() override; private: GeneratorPtr m_generatorPtr; diff --git a/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp 
b/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp index b8380ce53..3df0d25ca 100644 --- a/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp +++ b/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.cpp @@ -749,6 +749,24 @@ void BaseListColumnAdapter::addSu << " in file " << m_parquetReader.getCurFileOrTableName() ); } +template< typename ArrowListArrayType, typename ValueArrayType, typename ValueType> +void BaseListColumnAdapter::addSubscriber( csp::adapters::utils::ValueDispatcher::SubscriberType subscriber, std::optional symbol, const DialectGenericListReaderInterface::Ptr &listReader ) +{ + CSP_TRUE_OR_THROW_RUNTIME( m_listReader == nullptr, + "Trying to subscribe list column in parquet reader more than once, this is not supported" ); + CSP_TRUE_OR_THROW_RUNTIME( listReader != nullptr, + "Trying to subscribe list column in parquet reader with null listReader" ); + m_listReader = std::dynamic_pointer_cast>( listReader ); + CSP_TRUE_OR_THROW_RUNTIME( m_listReader != nullptr, + "Subscribed to parquet column " << getColumnName() << " with type " + << "NumpyArray[" << listReader -> getValueType() -> type().asString() + << "] while " + << " column type in file is NumpyArray[" + << getContainerValueType() -> type().asString() << "]" + << " in file " << m_parquetReader.getCurFileOrTableName() ); + m_dispatcher.addSubscriber( subscriber, symbol ); +} + template< typename ArrowListArrayType, typename ValueArrayType, typename ValueType > void NativeListColumnAdapter::readCurValue() { diff --git a/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.h b/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.h index 1207ecbf7..4c4067646 100644 --- a/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.h +++ b/cpp/csp/adapters/parquet/ParquetReaderColumnAdapter.h @@ -48,6 +48,8 @@ class ParquetColumnAdapter virtual void addSubscriber( ManagedSimInputAdapter *inputAdapter, std::optional symbol = {} ) = 0; // NOTE: This API is only defined for ListType Column Adapters virtual void addSubscriber( ManagedSimInputAdapter *inputAdapter, std::optional symbol, const DialectGenericListReaderInterface::Ptr &listReader ) = 0; + // NOTE: This API is only used to add subscriber for ListType column adapters in cases where there is no ManagedSimInputAdapter + virtual void addSubscriber( csp::adapters::utils::ValueDispatcher::SubscriberType subscriber, std::optional symbol, const DialectGenericListReaderInterface::Ptr &listReader ) = 0; virtual void dispatchValue( const utils::Symbol *symbol ) = 0; @@ -124,6 +126,10 @@ class MissingColumnAdapter : public ParquetColumnAdapter virtual void addSubscriber( ManagedSimInputAdapter *inputAdapter, std::optional symbol = {} ) override {}; virtual void addSubscriber( ManagedSimInputAdapter *inputAdapter, std::optional symbol, const DialectGenericListReaderInterface::Ptr &listReader ) override {}; + virtual void addSubscriber( csp::adapters::utils::ValueDispatcher::SubscriberType subscriber, std::optional symbol, const DialectGenericListReaderInterface::Ptr &listReader ) override + { + CSP_THROW(TypeError, "Trying to add DIALECT_GENERIC subscriber on non container type"); + } virtual void dispatchValue( const utils::Symbol *symbol ) override {}; @@ -172,6 +178,10 @@ class BaseTypedColumnAdapter : public ParquetColumnAdapter void addSubscriber( ManagedSimInputAdapter *inputAdapter, std::optional symbol = {} ) override; void addSubscriber( ManagedSimInputAdapter *inputAdapter, std::optional symbol, const DialectGenericListReaderInterface::Ptr &listReader ) override; + 
virtual void addSubscriber( csp::adapters::utils::ValueDispatcher::SubscriberType subscriber, std::optional symbol, const DialectGenericListReaderInterface::Ptr &listReader ) override + { + CSP_THROW(TypeError, "Trying to add DIALECT_GENERIC subscriber on non container type"); + } void dispatchValue( const utils::Symbol *symbol ) override; void ensureType( CspType::Ptr cspType ) override; @@ -288,9 +298,11 @@ class BaseListColumnAdapter : public BaseTypedColumnAdapter::BaseTypedColumnAdapter; using BaseTypedColumnAdapter::getColumnName; + using BaseTypedColumnAdapter::m_dispatcher; void addSubscriber( ManagedSimInputAdapter *inputAdapter, std::optional symbol = {} ) override; void addSubscriber( ManagedSimInputAdapter *inputAdapter, std::optional symbol, const DialectGenericListReaderInterface::Ptr &listReader ) override; + void addSubscriber( csp::adapters::utils::ValueDispatcher::SubscriberType subscriber, std::optional symbol, const DialectGenericListReaderInterface::Ptr &listReader ) override; CspTypePtr getNativeCspType() const override {return nullptr;} bool isListType() const override{ return true; }; CspTypePtr getContainerValueType() const override{ return CspType::fromCType::type(); } diff --git a/cpp/csp/adapters/parquet/ParquetWriter.cpp b/cpp/csp/adapters/parquet/ParquetWriter.cpp index e40d1889f..e7d95233f 100644 --- a/cpp/csp/adapters/parquet/ParquetWriter.cpp +++ b/cpp/csp/adapters/parquet/ParquetWriter.cpp @@ -13,8 +13,12 @@ namespace csp::adapters::parquet { +ParquetWriter::ParquetWriter() + : m_adapterMgr( nullptr ), m_engine( nullptr ), m_curChunkSize( 0 ), m_writeTimestampColumn( std::optional{} ) +{} + ParquetWriter::ParquetWriter( ParquetOutputAdapterManager *mgr, std::optional writeTimestampColumn ) - : m_adapterMgr( *mgr ), m_engine( mgr -> engine() ), m_curChunkSize( 0 ), m_writeTimestampColumn( writeTimestampColumn ) + : m_adapterMgr( mgr ), m_engine( mgr -> engine() ), m_curChunkSize( 0 ), m_writeTimestampColumn( writeTimestampColumn ) {} ParquetWriter::ParquetWriter( ParquetOutputAdapterManager *mgr, const Dictionary & properties ) : ParquetWriter( mgr, std::optional{} ) @@ -128,12 +132,12 @@ PushInputAdapter *ParquetWriter::getStatusAdapter() void ParquetWriter::start() { std::vector> arrowFields; - if( !m_writeTimestampColumn.has_value() && !m_adapterMgr.getTimestampColumnName().empty() ) + if( !m_writeTimestampColumn.has_value() && !m_adapterMgr -> getTimestampColumnName().empty() ) { m_writeTimestampColumn = true; - m_columnBuilders.push_back( std::make_shared( m_adapterMgr.getTimestampColumnName(), getChunkSize() ) ); + m_columnBuilders.push_back( std::make_shared( m_adapterMgr -> getTimestampColumnName(), getChunkSize() ) ); std::shared_ptr colMetaData; - auto colMetaIt = m_columnMetaData.find( m_adapterMgr.getTimestampColumnName() ); + auto colMetaIt = m_columnMetaData.find( m_adapterMgr -> getTimestampColumnName() ); if( colMetaIt != m_columnMetaData.end() ) { colMetaData = colMetaIt -> second; @@ -141,7 +145,7 @@ void ParquetWriter::start() } arrowFields.push_back( - arrow::field( m_adapterMgr.getTimestampColumnName(), m_columnBuilders.back() -> getDataType(), colMetaData ) ); + arrow::field( m_adapterMgr -> getTimestampColumnName(), m_columnBuilders.back() -> getDataType(), colMetaData ) ); } else { @@ -194,7 +198,7 @@ void ParquetWriter::onEndCycle() if( m_writeTimestampColumn.value() ) { // Set the timestamp value it's always the first - now = m_adapterMgr.rootEngine() -> now(); + now = m_adapterMgr -> rootEngine() -> now(); 
static_cast(m_columnBuilders[ 0 ].get()) -> setValue( now );
     }
     for( auto &&columnBuilder:m_columnBuilders )
@@ -221,7 +225,7 @@ void ParquetWriter::onFileNameChange( const std::string &fileName )
     if( !fileName.empty() )
     {
         m_fileWriterWrapperContainer
-            -> open( fileName, m_adapterMgr.getCompression(), m_adapterMgr.isAllowOverwrite() );
+            -> open( fileName, m_adapterMgr -> getCompression(), m_adapterMgr -> isAllowOverwrite() );
     }
 }
@@ -245,20 +249,20 @@ StructParquetOutputHandler *ParquetWriter::createStructOutputHandler( CspTypePtr
 void ParquetWriter::initFileWriterContainer( std::shared_ptr<arrow::Schema> schema )
 {
-    if( m_adapterMgr.isSplitColumnsToFiles() )
+    if( m_adapterMgr -> isSplitColumnsToFiles() )
     {
         m_fileWriterWrapperContainer = std::make_unique( schema,
-                                                          m_adapterMgr.isWriteArrowBinary() );
+                                                          m_adapterMgr -> isWriteArrowBinary() );
     }
     else
     {
         m_fileWriterWrapperContainer = std::make_unique( schema,
-                                                          m_adapterMgr.isWriteArrowBinary() );
+                                                          m_adapterMgr -> isWriteArrowBinary() );
     }
-    if( !m_adapterMgr.getFileName().empty() )
+    if( !m_adapterMgr -> getFileName().empty() )
     {
-        m_fileWriterWrapperContainer -> open( m_adapterMgr.getFileName(),
-                                              m_adapterMgr.getCompression(), m_adapterMgr.isAllowOverwrite() );
+        m_fileWriterWrapperContainer -> open( m_adapterMgr -> getFileName(),
+                                              m_adapterMgr -> getCompression(), m_adapterMgr -> isAllowOverwrite() );
     }
 }
diff --git a/cpp/csp/adapters/parquet/ParquetWriter.h b/cpp/csp/adapters/parquet/ParquetWriter.h
index ebf89a1e6..88ef57e6a 100644
--- a/cpp/csp/adapters/parquet/ParquetWriter.h
+++ b/cpp/csp/adapters/parquet/ParquetWriter.h
@@ -31,6 +31,7 @@ class FileWriterWrapperContainer;
 class ParquetWriter : public EndCycleListener
 {
 public:
+    ParquetWriter();
     ParquetWriter( ParquetOutputAdapterManager *mgr, std::optional<bool> writeTimestampColumn = {} );
     ParquetWriter( ParquetOutputAdapterManager *mgr, const Dictionary & properties );
@@ -53,11 +54,11 @@ class ParquetWriter : public EndCycleListener
     void onEndCycle() override;
-    std::uint32_t getChunkSize() const{ return m_adapterMgr.getBatchSize(); }
+    virtual std::uint32_t getChunkSize() const{ return m_adapterMgr -> getBatchSize(); }
     virtual void scheduleEndCycleEvent()
     {
-        m_adapterMgr.scheduleEndCycle();
+        m_adapterMgr -> scheduleEndCycle();
     }
     bool isFileOpen() const;
@@ -76,7 +77,7 @@ class ParquetWriter : public EndCycleListener
     using Adapters = std::vector;
     using PublishedColumnNames = std::unordered_set;
-    ParquetOutputAdapterManager &m_adapterMgr;
+    ParquetOutputAdapterManager *m_adapterMgr;
     Engine *m_engine;
 private:
     Adapters m_adapters;
diff --git a/cpp/csp/python/CMakeLists.txt b/cpp/csp/python/CMakeLists.txt
index d5f863e1a..a575c8e0e 100644
--- a/cpp/csp/python/CMakeLists.txt
+++ b/cpp/csp/python/CMakeLists.txt
@@ -36,6 +36,7 @@ set(CSPIMPL_PUBLIC_HEADERS
     PyBasketInputProxy.h
     PyBasketOutputProxy.h
     PyCppNode.h
+    PyDialectGenericListsInterface.h
     PyEngine.h
     PyInputAdapterWrapper.h
     PyInputProxy.h
@@ -90,9 +91,35 @@ target_compile_definitions(cspimpl PUBLIC NPY_NO_DEPRECATED_API=NPY_1_7_API_VERS
 target_compile_definitions(cspimpl PRIVATE CSPIMPL_EXPORTS=1)

+find_package(Arrow REQUIRED)
+find_package(Parquet REQUIRED)
+
+if(WIN32)
+    if(CSP_USE_VCPKG)
+        set(ARROW_PACKAGES_TO_LINK Arrow::arrow_static Parquet::parquet_static )
+        target_compile_definitions(csp_parquet_adapter PUBLIC ARROW_STATIC)
+        target_compile_definitions(csp_parquet_adapter PUBLIC PARQUET_STATIC)
+    else()
+        # use dynamic variants
+        # Until we manage to get the fix for ws2_32.dll in arrow-16 into conda, manually fix the error here
get_target_property(LINK_LIBS Arrow::arrow_shared INTERFACE_LINK_LIBRARIES) + string(REPLACE "ws2_32.dll" "ws2_32" FIXED_LINK_LIBS "${LINK_LIBS}") + set_target_properties(Arrow::arrow_shared PROPERTIES INTERFACE_LINK_LIBRARIES "${FIXED_LINK_LIBS}") + set(ARROW_PACKAGES_TO_LINK parquet_shared arrow_shared) + endif() +else() + if(CSP_USE_VCPKG) + # use static variants + set(ARROW_PACKAGES_TO_LINK parquet_static arrow_static) + else() + # use dynamic variants + set(ARROW_PACKAGES_TO_LINK parquet arrow) + endif() +endif() + ## Baselib c++ module add_library(cspbaselibimpl SHARED cspbaselibimpl.cpp) -target_link_libraries(cspbaselibimpl cspimpl baselibimpl) +target_link_libraries(cspbaselibimpl cspimpl baselibimpl csp_parquet_adapter ${ARROW_PACKAGES_TO_LINK}) # Include exprtk include directory for exprtk node target_include_directories(cspbaselibimpl PRIVATE ${EXPRTK_INCLUDE_DIRS}) diff --git a/cpp/csp/python/PyCppNode.h b/cpp/csp/python/PyCppNode.h index 0ff6f1acc..6eb0ab3a7 100644 --- a/cpp/csp/python/PyCppNode.h +++ b/cpp/csp/python/PyCppNode.h @@ -33,6 +33,6 @@ REGISTER_MODULE_METHOD( #Name, Name##_cppnode_create, METH_VARARGS, #Name ); #define REGISTER_CPPNODE( Namespace, NodeName ) CPPNODE_CREATE_FWD_DECL( Namespace, NodeName ) \ _REGISTER_CPPNODE( NodeName, Namespace::CPPNODE_CREATE_METHOD( NodeName ) ) -#endif - } + +#endif diff --git a/cpp/csp/python/PyDialectGenericListsInterface.h b/cpp/csp/python/PyDialectGenericListsInterface.h new file mode 100644 index 000000000..774b6ecf9 --- /dev/null +++ b/cpp/csp/python/PyDialectGenericListsInterface.h @@ -0,0 +1,307 @@ +#ifndef _IN_CSP_PYTHON_DIALECT_GENERIC_INTERFACE_H +#define _IN_CSP_PYTHON_DIALECT_GENERIC_INTERFACE_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace csp::python +{ + +template< typename CspCType> +class NumpyArrayWriterImpl : public csp::adapters::parquet::TypedDialectGenericListWriterInterface +{ +public: + NumpyArrayWriterImpl( PyArray_Descr *expectedArrayDesc ) + : m_expectedArrayDesc( expectedArrayDesc ) + { + } + + void writeItems( const csp::DialectGenericType &listObject ) override + { + PyObject *object = csp::python::toPythonBorrowed( listObject ); + if( !PyArray_Check( object ) ) + { + CSP_THROW( csp::TypeError, "While writing to parquet expected numpy array type, got " << Py_TYPE( object ) -> tp_name ); + } + + PyArrayObject *arrayObject = ( PyArrayObject * ) ( object ); + char npy_type = PyArray_DESCR( arrayObject ) -> type; + if( PyArray_DESCR( arrayObject ) -> kind != m_expectedArrayDesc -> kind ) + { + CSP_THROW( csp::TypeError, + "Expected array of type " << csp::python::PyObjectPtr::own( PyObject_Repr( ( PyObject * ) m_expectedArrayDesc ) ) + << " got " + << csp::python::PyObjectPtr::own( PyObject_Repr( ( PyObject * ) PyArray_DESCR( arrayObject ) ) ) ); + } + + auto ndim = PyArray_NDIM( arrayObject ); + + CSP_TRUE_OR_THROW_RUNTIME( ndim == 1, "While writing to parquet expected numpy array with 1 dimension" << " got " << ndim ); + switch( npy_type ) + { + case NPY_BYTELTR: writeValues( arrayObject ); break; + case NPY_UBYTELTR: writeValues( arrayObject ); break; + case NPY_SHORTLTR: writeValues( arrayObject ); break; + case NPY_USHORTLTR: writeValues( arrayObject ); break; + case NPY_INTLTR: writeValues( arrayObject ); break; + case NPY_UINTLTR: writeValues( arrayObject ); break; + case NPY_LONGLTR: writeValues( arrayObject ); break; + case NPY_ULONGLTR: writeValues( arrayObject ); break; + case NPY_LONGLONGLTR: writeValues( arrayObject ); break; + 
case NPY_ULONGLONGLTR: writeValues( arrayObject ); break; + + case NPY_FLOATLTR: writeValues( arrayObject ); break; + case NPY_DOUBLELTR: writeValues( arrayObject ); break; + default: + writeValues( arrayObject ); + } + } +private: + template + void writeValues( PyArrayObject * arrayObject ) + { + auto arraySize = PyArray_Size( ( PyObject * ) arrayObject ); + if( PyArray_ISCARRAY_RO(arrayObject) ) + { + NumpyCType* data = reinterpret_cast( PyArray_DATA( arrayObject ) ); + for (decltype(arraySize) i = 0; i < arraySize; ++i) + { + this->writeValue(static_cast(data[i])); + } + } + else + { + for (decltype(arraySize) i = 0; i < arraySize; ++i) + { + this->writeValue(static_cast(*reinterpret_cast(PyArray_GETPTR1(arrayObject, i)))); + } + } + } + + PyArray_Descr *m_expectedArrayDesc; +}; + +class NumpyUnicodeArrayWriter : public csp::adapters::parquet::TypedDialectGenericListWriterInterface +{ +public: + NumpyUnicodeArrayWriter() + { + } + + void writeItems( const csp::DialectGenericType &listObject ) override + { + PyObject *object = csp::python::toPythonBorrowed( listObject ); + + if( !PyArray_Check( object ) ) + { + CSP_THROW( csp::TypeError, "While writing to parquet expected numpy array type, got " << Py_TYPE( object ) -> tp_name ); + } + PyArrayObject *arrayObject = ( PyArrayObject * ) ( object ); + + if( PyArray_DESCR( arrayObject ) -> type_num != NPY_UNICODE ) + { + CSP_THROW( csp::TypeError, + "Expected array of type " << csp::python::PyObjectPtr::own( PyObject_Repr( ( PyObject * ) m_expectedArrayDesc ) ) + << " got " + << csp::python::PyObjectPtr::own( + PyObject_Repr( ( PyObject * ) PyArray_DESCR( arrayObject ) ) ) ); + } + + auto elementSize = PyArray_DESCR( arrayObject ) -> elsize; + auto ndim = PyArray_NDIM( arrayObject ); + + CSP_TRUE_OR_THROW_RUNTIME( ndim == 1, "While writing to parquet expected numpy array with 1 dimension" << " got " << ndim ); + std::wstring_convert,char32_t> converter; + + auto arraySize = PyArray_Size( object ); + if( PyArray_ISCARRAY_RO( arrayObject ) ) + { + auto data = reinterpret_cast(PyArray_DATA( arrayObject )); + + for( decltype( arraySize ) i = 0; i < arraySize; ++i ) + { + + std::string value = converter.to_bytes( reinterpret_cast(data + elementSize * i), + reinterpret_cast(data + elementSize * ( i + 1 )) ); + this -> writeValue( value ); + } + } + else + { + for( decltype( arraySize ) i = 0; i < arraySize; ++i ) + { + char *elementPtr = reinterpret_cast(PyArray_GETPTR1( arrayObject, i )); + std::string value = converter.to_bytes( reinterpret_cast(elementPtr), + reinterpret_cast(elementPtr + elementSize ) ); + this -> writeValue( value ); + } + } + } + +private: + PyArray_Descr *m_expectedArrayDesc; +}; + +inline csp::adapters::parquet::DialectGenericListWriterInterface::Ptr create_numpy_array_writer_impl( const csp::CspTypePtr &type ) +{ + try + { + return csp::PartialSwitchCspType::invoke( + type.get(), + []( auto tag ) -> csp::adapters::parquet::DialectGenericListWriterInterface::Ptr + { + using CValueType = typename decltype( tag )::type; + auto numpy_dtype = PyArray_DescrFromType( csp::python::NPY_TYPE::value ); + + if constexpr (std::is_same_v) + { + return std::make_shared(); + } + else + { + return std::make_shared>(numpy_dtype); + } + } + ); + } + catch( csp::TypeError &e ) + { + CSP_THROW( csp::TypeError, "Unsupported array value type when writing to parquet:" << type -> type().asString() ); + } +} + + +template< typename V > +class NumpyArrayReaderImpl final : public csp::adapters::parquet::TypedDialectGenericListReaderInterface +{ 
+public: + NumpyArrayReaderImpl( PyArray_Descr *expectedArrayDesc ) + : m_expectedArrayDesc( expectedArrayDesc ) + { + } + virtual csp::DialectGenericType create(uint32_t size) override + { + npy_intp iSize = size; + + Py_INCREF(m_expectedArrayDesc); + PyObject* arr = PyArray_SimpleNewFromDescr( 1, &iSize, m_expectedArrayDesc ); + // Since arr already has reference count + csp::python::PyObjectPtr objectPtr{csp::python::PyObjectPtr::own(arr)}; + + // We need to make sure that's the case, since we are going to return pointer to raw buffer + CSP_ASSERT(PyArray_ISCARRAY( reinterpret_cast(arr))); + + csp::DialectGenericType res{csp::python::fromPython(arr)}; + return res; + } + + csp::DialectGenericType create( uint32_t size, uint32_t maxElementSize ) override + { + CSP_NOT_IMPLEMENTED; + } + + virtual V *getRawDataBuffer( const csp::DialectGenericType &list ) const override + { + auto arrayObject = reinterpret_cast(csp::python::toPythonBorrowed(list)); + return reinterpret_cast(PyArray_DATA( arrayObject )); + } + + virtual void setValue(const csp::DialectGenericType& list, int index, const V& value) override + { + getRawDataBuffer(list)[index] = value; + } + +private: + PyArray_Descr *m_expectedArrayDesc; +}; + +class NumpyUnicodeReaderImpl final : public csp::adapters::parquet::TypedDialectGenericListReaderInterface +{ +public: + NumpyUnicodeReaderImpl() + { + } + + virtual csp::DialectGenericType create( uint32_t size ) override + { + CSP_NOT_IMPLEMENTED; + } + + csp::DialectGenericType create( uint32_t size, uint32_t maxElementSize ) override + { + npy_intp iSize = size; + + PyArray_Descr *typ; + PyObject *type_string_descr = csp::python::toPython( std::string( "U" ) + std::to_string( maxElementSize ) ); + PyArray_DescrConverter( type_string_descr, &typ ); + Py_DECREF( type_string_descr ); + + PyObject *arr = PyArray_SimpleNewFromDescr( 1, &iSize, typ ); + + // Since arr already has reference count + csp::python::PyObjectPtr objectPtr{ csp::python::PyObjectPtr::own( arr ) }; + + csp::DialectGenericType res{ csp::python::fromPython( arr ) }; + return res; + } + + std::string *getRawDataBuffer( const csp::DialectGenericType &list ) const override + { + return nullptr; + } + + void setValue( const csp::DialectGenericType &list, int index, const std::string &value ) override + { + auto arrayObject = reinterpret_cast(csp::python::toPythonBorrowed( list )); + std::wstring_convert,char32_t> converter; + auto elementSize = PyArray_DESCR( arrayObject ) -> elsize; + auto wideValue = converter.from_bytes( value ); + auto nElementsToCopy = std::min( int(elementSize / sizeof(char32_t)), int( wideValue.size() + 1 ) ); + std::copy_n( wideValue.c_str(), nElementsToCopy, reinterpret_cast(PyArray_GETPTR1( arrayObject, index )) ); + } +}; + + +inline csp::adapters::parquet::DialectGenericListReaderInterface::Ptr create_numpy_array_reader_impl( const csp::CspTypePtr &type ) + +{ + try + { + return csp::PartialSwitchCspType::invoke( type.get(), + []( auto tag ) -> csp::adapters::parquet::DialectGenericListReaderInterface::Ptr + { + using TagType = decltype(tag); + using CValueType = typename TagType::type; + auto numpy_dtype = PyArray_DescrFromType( + csp::python::NPY_TYPE::value ); + + if( numpy_dtype -> type_num == NPY_UNICODE ) + { + return std::make_shared(); + } + else + { + return std::make_shared>( + numpy_dtype ); + } + } + ); + } + catch( csp::TypeError &e ) + { + CSP_THROW( csp::TypeError, "Unsupported array value type when reading from parquet:" << type -> type().asString() ); + } +} + +} + +#endif 
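A note on the unicode handling in the header above: NumpyUnicodeReaderImpl::setValue copies at most elsize / sizeof(char32_t) UTF-32 code units into each fixed-width element, so values longer than the dtype width are silently truncated. This mirrors plain numpy assignment semantics, as the following minimal numpy-only sketch (not part of the patch) shows:

    import numpy as np

    # Fixed-width 'U4' elements hold at most 4 UTF-32 code points each.
    arr = np.zeros(2, dtype="U4")
    arr[0] = "hello"   # longer values are silently truncated, as in setValue
    assert arr[0] == "hell"
    arr[1] = "hi"      # shorter values are NUL-padded
    assert arr[1] == "hi"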
diff --git a/cpp/csp/python/adapters/parquetadapterimpl.cpp b/cpp/csp/python/adapters/parquetadapterimpl.cpp index 1adaff120..1097789c6 100644 --- a/cpp/csp/python/adapters/parquetadapterimpl.cpp +++ b/cpp/csp/python/adapters/parquetadapterimpl.cpp @@ -15,7 +15,7 @@ #include #include #include -#include +#include #include #include #include @@ -186,294 +186,6 @@ class ArrowTableGenerator : public csp::Generator, csp::python::PyObjectPtr m_data; }; -template< typename CspCType> -class NumpyArrayWriterImpl : public TypedDialectGenericListWriterInterface -{ -public: - NumpyArrayWriterImpl( PyArray_Descr *expectedArrayDesc ) - : m_expectedArrayDesc( expectedArrayDesc ) - { - } - - void writeItems( const csp::DialectGenericType &listObject ) override - { - PyObject *object = csp::python::toPythonBorrowed( listObject ); - if( !PyArray_Check( object ) ) - { - CSP_THROW( csp::TypeError, "While writing to parquet expected numpy array type, got " << Py_TYPE( object ) -> tp_name ); - } - - PyArrayObject *arrayObject = ( PyArrayObject * ) ( object ); - char npy_type = PyArray_DESCR( arrayObject ) -> type; - if( PyArray_DESCR( arrayObject ) -> kind != m_expectedArrayDesc -> kind ) - { - CSP_THROW( csp::TypeError, - "Expected array of type " << csp::python::PyObjectPtr::own( PyObject_Repr( ( PyObject * ) m_expectedArrayDesc ) ) - << " got " - << csp::python::PyObjectPtr::own( PyObject_Repr( ( PyObject * ) PyArray_DESCR( arrayObject ) ) ) ); - } - - auto ndim = PyArray_NDIM( arrayObject ); - - CSP_TRUE_OR_THROW_RUNTIME( ndim == 1, "While writing to parquet expected numpy array with 1 dimension" << " got " << ndim ); - switch( npy_type ) - { - case NPY_BYTELTR: writeValues( arrayObject ); break; - case NPY_UBYTELTR: writeValues( arrayObject ); break; - case NPY_SHORTLTR: writeValues( arrayObject ); break; - case NPY_USHORTLTR: writeValues( arrayObject ); break; - case NPY_INTLTR: writeValues( arrayObject ); break; - case NPY_UINTLTR: writeValues( arrayObject ); break; - case NPY_LONGLTR: writeValues( arrayObject ); break; - case NPY_ULONGLTR: writeValues( arrayObject ); break; - case NPY_LONGLONGLTR: writeValues( arrayObject ); break; - case NPY_ULONGLONGLTR: writeValues( arrayObject ); break; - - case NPY_FLOATLTR: writeValues( arrayObject ); break; - case NPY_DOUBLELTR: writeValues( arrayObject ); break; - default: - writeValues( arrayObject ); - } - } -private: - template - void writeValues( PyArrayObject * arrayObject ) - { - auto arraySize = PyArray_Size( ( PyObject * ) arrayObject ); - if( PyArray_ISCARRAY_RO(arrayObject) ) - { - NumpyCType* data = reinterpret_cast( PyArray_DATA( arrayObject ) ); - for (decltype(arraySize) i = 0; i < arraySize; ++i) - { - this->writeValue(static_cast(data[i])); - } - } - else - { - for (decltype(arraySize) i = 0; i < arraySize; ++i) - { - this->writeValue(static_cast(*reinterpret_cast(PyArray_GETPTR1(arrayObject, i)))); - } - } - } - - PyArray_Descr *m_expectedArrayDesc; -}; - -class NumpyUnicodeArrayWriter : public TypedDialectGenericListWriterInterface -{ -public: - NumpyUnicodeArrayWriter() - { - } - - void writeItems( const csp::DialectGenericType &listObject ) override - { - PyObject *object = csp::python::toPythonBorrowed( listObject ); - - if( !PyArray_Check( object ) ) - { - CSP_THROW( csp::TypeError, "While writing to parquet expected numpy array type, got " << Py_TYPE( object ) -> tp_name ); - } - PyArrayObject *arrayObject = ( PyArrayObject * ) ( object ); - - if( PyArray_DESCR( arrayObject ) -> type_num != NPY_UNICODE ) - { - CSP_THROW( csp::TypeError, - 
"Expected array of type " << csp::python::PyObjectPtr::own( PyObject_Repr( ( PyObject * ) m_expectedArrayDesc ) ) - << " got " - << csp::python::PyObjectPtr::own( - PyObject_Repr( ( PyObject * ) PyArray_DESCR( arrayObject ) ) ) ); - } - - auto elementSize = PyArray_DESCR( arrayObject ) -> elsize; - auto ndim = PyArray_NDIM( arrayObject ); - - CSP_TRUE_OR_THROW_RUNTIME( ndim == 1, "While writing to parquet expected numpy array with 1 dimension" << " got " << ndim ); - std::wstring_convert,char32_t> converter; - - auto arraySize = PyArray_Size( object ); - if( PyArray_ISCARRAY_RO( arrayObject ) ) - { - auto data = reinterpret_cast(PyArray_DATA( arrayObject )); - - for( decltype( arraySize ) i = 0; i < arraySize; ++i ) - { - - std::string value = converter.to_bytes( reinterpret_cast(data + elementSize * i), - reinterpret_cast(data + elementSize * ( i + 1 )) ); - this -> writeValue( value ); - } - } - else - { - for( decltype( arraySize ) i = 0; i < arraySize; ++i ) - { - char *elementPtr = reinterpret_cast(PyArray_GETPTR1( arrayObject, i )); - std::string value = converter.to_bytes( reinterpret_cast(elementPtr), - reinterpret_cast(elementPtr + elementSize ) ); - this -> writeValue( value ); - } - } - } - -private: - PyArray_Descr *m_expectedArrayDesc; -}; - -static inline DialectGenericListWriterInterface::Ptr create_numpy_array_writer_impl( const csp::CspTypePtr &type ) -{ - try - { - return csp::PartialSwitchCspType::invoke( - type.get(), - []( auto tag ) -> DialectGenericListWriterInterface::Ptr - { - using CValueType = typename decltype( tag )::type; - auto numpy_dtype = PyArray_DescrFromType( csp::python::NPY_TYPE::value ); - - if constexpr (std::is_same_v) - { - return std::make_shared(); - } - else - { - return std::make_shared>(numpy_dtype); - } - } - ); - } - catch( csp::TypeError &e ) - { - CSP_THROW( csp::TypeError, "Unsupported array value type when writing to parquet:" << type -> type().asString() ); - } -} - - -template< typename V > -class NumpyArrayReaderImpl final : public TypedDialectGenericListReaderInterface -{ -public: - NumpyArrayReaderImpl( PyArray_Descr *expectedArrayDesc ) - : m_expectedArrayDesc( expectedArrayDesc ) - { - } - virtual csp::DialectGenericType create(uint32_t size) override - { - npy_intp iSize = size; - - Py_INCREF(m_expectedArrayDesc); - PyObject* arr = PyArray_SimpleNewFromDescr( 1, &iSize, m_expectedArrayDesc ); - // Since arr already has reference count - csp::python::PyObjectPtr objectPtr{csp::python::PyObjectPtr::own(arr)}; - - // We need to make sure that's the case, since we are going to return pointer to raw buffer - CSP_ASSERT(PyArray_ISCARRAY( reinterpret_cast(arr))); - - csp::DialectGenericType res{csp::python::fromPython(arr)}; - return res; - } - - csp::DialectGenericType create( uint32_t size, uint32_t maxElementSize ) override - { - CSP_NOT_IMPLEMENTED; - } - - virtual V *getRawDataBuffer( const csp::DialectGenericType &list ) const override - { - auto arrayObject = reinterpret_cast(csp::python::toPythonBorrowed(list)); - return reinterpret_cast(PyArray_DATA( arrayObject )); - } - - virtual void setValue(const csp::DialectGenericType& list, int index, const V& value) override - { - getRawDataBuffer(list)[index] = value; - } - -private: - PyArray_Descr *m_expectedArrayDesc; -}; - -class NumpyUnicodeReaderImpl final : public TypedDialectGenericListReaderInterface -{ -public: - NumpyUnicodeReaderImpl() - { - } - - virtual csp::DialectGenericType create( uint32_t size ) override - { - CSP_NOT_IMPLEMENTED; - } - - csp::DialectGenericType 
create( uint32_t size, uint32_t maxElementSize ) override - { - npy_intp iSize = size; - - PyArray_Descr *typ; - PyObject *type_string_descr = csp::python::toPython( std::string( "U" ) + std::to_string( maxElementSize ) ); - PyArray_DescrConverter( type_string_descr, &typ ); - Py_DECREF( type_string_descr ); - - PyObject *arr = PyArray_SimpleNewFromDescr( 1, &iSize, typ ); - - // Since arr already has reference count - csp::python::PyObjectPtr objectPtr{ csp::python::PyObjectPtr::own( arr ) }; - - csp::DialectGenericType res{ csp::python::fromPython( arr ) }; - return res; - } - - std::string *getRawDataBuffer( const csp::DialectGenericType &list ) const override - { - return nullptr; - } - - void setValue( const csp::DialectGenericType &list, int index, const std::string &value ) override - { - auto arrayObject = reinterpret_cast(csp::python::toPythonBorrowed( list )); - std::wstring_convert,char32_t> converter; - auto elementSize = PyArray_DESCR( arrayObject ) -> elsize; - auto wideValue = converter.from_bytes( value ); - auto nElementsToCopy = std::min( int(elementSize / sizeof(char32_t)), int( wideValue.size() + 1 ) ); - std::copy_n( wideValue.c_str(), nElementsToCopy, reinterpret_cast(PyArray_GETPTR1( arrayObject, index )) ); - } -}; - - -inline DialectGenericListReaderInterface::Ptr create_numpy_array_reader_impl( const csp::CspTypePtr &type ) - -{ - try - { - return csp::PartialSwitchCspType::invoke( type.get(), - []( auto tag ) -> DialectGenericListReaderInterface::Ptr - { - using TagType = decltype(tag); - using CValueType = typename TagType::type; - auto numpy_dtype = PyArray_DescrFromType( - csp::python::NPY_TYPE::value ); - - if( numpy_dtype -> type_num == NPY_UNICODE ) - { - return std::make_shared(); - } - else - { - return std::make_shared>( - numpy_dtype ); - } - } - ); - } - catch( csp::TypeError &e ) - { - CSP_THROW( csp::TypeError, "Unsupported array value type when reading from parquet:" << type -> type().asString() ); - } -} - } namespace csp::python @@ -512,7 +224,7 @@ create_parquet_input_adapter( csp::AdapterManager *manager, PyEngine *pyengine, { auto &&valueType = pyTypeAsCspType( toPythonBorrowed( propertiesDict.get( "array_value_type" ) ) ); return parquetManager -> getInputAdapter( valueType, propertiesDict, pushMode, - create_numpy_array_reader_impl( valueType ) ); + csp::python::create_numpy_array_reader_impl( valueType ) ); } else { @@ -539,7 +251,7 @@ static OutputAdapter *create_parquet_output_adapter( csp::AdapterManager *manage if( propertiesDict.get( "is_array", false ) ) { auto &&valueType = pyTypeAsCspType( toPythonBorrowed( propertiesDict.get( "array_value_type" ) ) ); - return parquetManager -> getListOutputAdapter( valueType, propertiesDict, create_numpy_array_writer_impl( valueType ) ); + return parquetManager -> getListOutputAdapter( valueType, propertiesDict, csp::python::create_numpy_array_writer_impl( valueType ) ); } else { diff --git a/cpp/csp/python/cspbaselibimpl.cpp b/cpp/csp/python/cspbaselibimpl.cpp index 1ada5ced9..220a73c1f 100644 --- a/cpp/csp/python/cspbaselibimpl.cpp +++ b/cpp/csp/python/cspbaselibimpl.cpp @@ -5,6 +5,19 @@ #include #include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + static void * init_nparray() { csp::python::AcquireGIL gil; @@ -13,8 +26,54 @@ static void * init_nparray() } static void * s_init_array = init_nparray(); +void ReleaseArrowSchemaPyCapsule( PyObject* capsule ) { + struct ArrowSchema* schema = + ( struct ArrowSchema* 
)PyCapsule_GetPointer( capsule, "arrow_schema" ); + if ( schema->release != NULL ) { + schema->release( schema ); + } + free( schema ); +} + +void ReleaseArrowArrayPyCapsule( PyObject* capsule ) { + struct ArrowArray* array = + ( struct ArrowArray* )PyCapsule_GetPointer( capsule, "arrow_array"); + if ( array->release != NULL ) { + array->release( array ); + } + free( array ); +} + +static csp::DialectGenericType numpy_ndarray_reshape( csp::DialectGenericType data, csp::DialectGenericType dims ) +{ + auto arrayObject = reinterpret_cast(csp::python::toPythonBorrowed(data)); + auto dimsObject = reinterpret_cast(csp::python::toPythonBorrowed(dims)); + + auto new_array = csp::python::PyObjectPtr::own(PyArray_Reshape( arrayObject, dimsObject )); + return csp::python::fromPython(new_array.get()); +} + +static std::pair numpy_ndarray_flatten( csp::DialectGenericType data ) +{ + auto arrayObject = reinterpret_cast(csp::python::toPythonBorrowed(data)); + npy_intp ndims[1] = { PyArray_NDIM( arrayObject ) }; + auto dims = PyArray_DIMS( arrayObject ); + auto shape = csp::python::PyObjectPtr::own( PyArray_SimpleNew( 1, ndims, NPY_INT ) ); + int* shape_data = reinterpret_cast( PyArray_DATA( reinterpret_cast( shape.get() ) ) ); + for(int i = 0; i < ndims[0]; i++) + { + shape_data[i] = dims[i]; + } + PyArray_SetBaseObject( reinterpret_cast( shape.get() ), Py_None ); + auto flat = csp::python::PyObjectPtr::own( PyArray_Flatten( arrayObject, NPY_CORDER ) ); + auto flat_array = csp::python::fromPython( flat.get() ); + auto shape_array = csp::python::fromPython( shape.get() ); + return std::make_pair( std::move(flat_array), std::move(shape_array) ); +} + namespace csp::cppnodes { + DECLARE_CPPNODE( exprtk_impl ) { class BaseValueContainer @@ -325,6 +384,430 @@ DECLARE_CPPNODE( exprtk_impl ) EXPORT_CPPNODE( exprtk_impl ); +DECLARE_CPPNODE( record_batches_to_struct ) +{ + using InMemoryTableParquetReader = csp::adapters::parquet::InMemoryTableParquetReader; + using DialectGenericSubscriber = csp::adapters::utils::ValueDispatcher::SubscriberType; + class RecordBatchReader : public InMemoryTableParquetReader + { + public: + RecordBatchReader( std::vector columns, std::shared_ptr schema ): + InMemoryTableParquetReader( nullptr, columns, false, {}, false ) + { + m_schema = schema; + } + std::string getCurFileOrTableName() const override{ return "IN_RECORD_BATCH"; } + void initialize() { setColumnAdaptersFromCurrentTable(); } + void parseBatches( std::vector> record_batches ) + { + auto table_result = arrow::Table::FromRecordBatches( m_schema, record_batches ); + if( !table_result.ok() ) + CSP_THROW( ValueError, "Failed to load all the record batches into a table: " << table_result.status().ToString() ); + + setTable( table_result.ValueUnsafe() ); + + if( !readNextRowGroup() ) + CSP_THROW( ValueError, "Unable to read the first row group from table" ); + + while( readNextRow() ) + { + for( auto& adapter: getStructAdapters() ) + { + adapter -> dispatchValue( nullptr ); + } + for(auto& adapter: m_postAdapters) + { + adapter -> dispatchValue( nullptr ); + } + } + } + void setPostAdapters( std::vector& col_names ) + { + for( auto& col_name: col_names ) + { + m_postAdapters.emplace_back( (*this)[col_name].get() ); + } + } + + void stop() + { + InMemoryTableParquetReader::clear(); + } + protected: + bool openNextFile() override { return false; } + void clear() override { setTable( nullptr ); } + std::vector m_postAdapters; + }; + + SCALAR_INPUT( DialectGenericType, schema_ptr ); + SCALAR_INPUT( StructMetaPtr, cls ); + 
SCALAR_INPUT( DictionaryPtr, properties ); + TS_INPUT( Generic, data ); + + TS_OUTPUT( Generic ); + + std::shared_ptr m_reader; + std::vector* m_structsVecPtr; + std::vector m_ndarrayDimColData; + + INIT_CPPNODE( record_batches_to_struct ) + { + auto & input_def = tsinputDef( "data" ); + if( input_def.type -> type() != CspType::Type::ARRAY ) + CSP_THROW( TypeError, "record_batches_to_struct expected ts array type, got " << input_def.type -> type() ); + + auto * aType = static_cast( input_def.type.get() ); + CspTypePtr elemType = aType -> elemType(); + if( elemType -> type() != CspType::Type::DIALECT_GENERIC ) + CSP_THROW( TypeError, "record_batches_to_struct expected ts array of DIALECT_GENERIC type, got " << elemType -> type() ); + + auto & output_def = tsoutputDef( "" ); + if( output_def.type -> type() != CspType::Type::ARRAY ) + CSP_THROW( TypeError, "record_batches_to_struct expected ts array type, got " << output_def.type -> type() ); + } + + void addListSubscriber( std::string col_name, DialectGenericType generic_type, DialectGenericSubscriber subscriber ) + { + auto &&field_type = csp::python::pyTypeAsCspType( csp::python::toPythonBorrowed( generic_type ) ); + auto list_reader_interface = csp::python::create_numpy_array_reader_impl( field_type ); + (*m_reader)[col_name] -> addSubscriber( subscriber, {}, list_reader_interface ); + } + + START() + { + // Create Adapters for Schema + PyObject* capsule = csp::python::toPythonBorrowed( schema_ptr ); + struct ArrowSchema* c_schema = reinterpret_cast( PyCapsule_GetPointer( capsule, "arrow_schema") ); + auto result = arrow::ImportSchema( c_schema ); + if( !result.ok() ) + CSP_THROW( ValueError, "Failed to load the arrow schema: " << result.status().ToString() ); + std::shared_ptr schema = result.ValueUnsafe(); + std::vector columns; + auto field_map = properties.value() -> get( "field_map" ); + // Extract the columns names + for( auto it = field_map -> begin(); it != field_map -> end(); ++it ) + { + if( schema -> GetFieldByName( it.key() ) ) + columns.push_back( it.key() ); + else + CSP_THROW( ValueError, "column " << it.key() << " not found in schema" ); + } + std::vector extra_columns; + + // Extract the numpy column names + auto numpy_fields = properties.value() -> get( "numpy_fields" ); + auto numpy_field_types = properties.value() -> get( "numpy_field_types" ); + auto numpy_dimension_names = properties.value() -> get( "numpy_dimension_names" ); + auto numpy_dimension_types = properties.value() -> get( "numpy_dimension_types" ); + for( auto it = numpy_fields-> begin(); it != numpy_fields-> end(); ++it ) + { + if( schema -> GetFieldByName( it.key() ) ) + { + auto col_name = it.key(); + columns.push_back( col_name ); + if( numpy_dimension_names -> exists( col_name ) ) + { + auto dim_col_name = numpy_dimension_names -> get( col_name ); + columns.push_back( dim_col_name ); + } + } + else + CSP_THROW( ValueError, "column " << it.key() << " not found in schema" ); + } + m_reader = std::make_shared( columns, schema ); + m_reader -> initialize(); + + m_ndarrayDimColData.resize( numpy_dimension_names -> size() ); + std::shared_ptr out_type = std::make_shared( cls.value() ); + + // Add adapters for numpy arrays + unsigned ndarray_idx = 0; + for( auto it = numpy_fields -> begin(); it != numpy_fields -> end(); ++it ) + { + auto col_name = it.key(); + auto dialect_generic_type = numpy_field_types -> get(col_name); + + auto field_name = it.value(); + auto &&field_ptr = out_type -> meta() -> field( field_name ); + if( numpy_dimension_names -> exists( 
col_name ) )
+            {
+                // Is NDArray
+                auto dim_col_name = numpy_dimension_names -> get(col_name);
+                auto dim_col_dialect_generic_type = numpy_dimension_types -> get(col_name);
+
+                auto* data_ptr = &m_ndarrayDimColData[ndarray_idx++];
+                addListSubscriber( dim_col_name, dim_col_dialect_generic_type,
+                    [data_ptr]( const DialectGenericType * d )
+                    {
+                        if( d ) *data_ptr = *d;
+                        else CSP_THROW( ValueError, "Failed to create DIALECT_GENERIC while parsing the record batches" );
+                    }
+                );
+                extra_columns.push_back( dim_col_name );
+
+                addListSubscriber( col_name, dialect_generic_type,
+                    [this, field_ptr, data_ptr]( const DialectGenericType * d )
+                    {
+                        if( d )
+                        {
+                            auto new_data = numpy_ndarray_reshape( *d, *data_ptr );
+                            field_ptr -> setValue( this -> m_structsVecPtr -> back().get(), new_data );
+                        }
+                        else CSP_THROW( ValueError, "Failed to create DIALECT_GENERIC while parsing the record batches" );
+                    }
+                );
+            }
+            else
+            {
+                // Is 1DArray
+                addListSubscriber( col_name, dialect_generic_type,
+                    [this, field_ptr]( const DialectGenericType * d )
+                    {
+                        if( d ) field_ptr -> setValue( this -> m_structsVecPtr -> back().get(), *d );
+                        else CSP_THROW( ValueError, "Failed to create DIALECT_GENERIC while parsing the record batches" );
+                    }
+                );
+            }
+            extra_columns.push_back( col_name );
+        }
+        m_reader -> setPostAdapters( extra_columns );
+
+        // Add the adapter for struct
+        csp::adapters::utils::StructAdapterInfo key{ std::move( out_type ), std::move( field_map ) };
+        auto& struct_adapter = m_reader -> getStructAdapter( key );
+        struct_adapter.addSubscriber( [this]( StructPtr * s )
+        {
+            if( s ) this -> m_structsVecPtr -> push_back( *s );
+            else CSP_THROW( ValueError, "Failed to create struct while parsing the record batches" );
+        }, {} );
+
+    }
+
+    INVOKE()
+    {
+        if( csp.ticked( data ) )
+        {
+            auto & py_batches = data.lastValue<std::vector<DialectGenericType>>();
+            std::vector<std::shared_ptr<arrow::RecordBatch>> batches;
+            for( auto& py_batch: py_batches )
+            {
+                PyObject* py_tuple = csp::python::toPythonBorrowed( py_batch );
+                PyObject* py_schema = PyTuple_GET_ITEM( py_tuple, 0 );
+                PyObject* py_array = PyTuple_GET_ITEM( py_tuple, 1 );
+                struct ArrowSchema* c_schema = reinterpret_cast<struct ArrowSchema*>( PyCapsule_GetPointer( py_schema, "arrow_schema" ) );
+                struct ArrowArray* c_array = reinterpret_cast<struct ArrowArray*>( PyCapsule_GetPointer( py_array, "arrow_array" ) );
+                auto result = arrow::ImportRecordBatch( c_array, c_schema );
+                if( !result.ok() )
+                    CSP_THROW( ValueError, "Failed to load record batches through PyCapsule C Data interface: " << result.status().ToString() );
+                batches.emplace_back( result.ValueUnsafe() );
+            }
+            std::vector<StructPtr> & out = unnamed_output().reserveSpace<std::vector<StructPtr>>();
+            out.clear();
+            m_structsVecPtr = &out;
+            m_reader -> parseBatches( batches );
+            m_structsVecPtr = nullptr;
+        }
+    }
+};
+
+EXPORT_CPPNODE( record_batches_to_struct );
+
+DECLARE_CPPNODE( struct_to_record_batches )
+{
+    SCALAR_INPUT( DialectGenericType, schema_ptr );
+    SCALAR_INPUT( StructMetaPtr, cls );
+    SCALAR_INPUT( DictionaryPtr, properties );
+    SCALAR_INPUT( int64_t, chunk_size );
+    TS_INPUT( Generic, data );
+
+    TS_OUTPUT( Generic );
+
+    using StructParquetOutputHandler = csp::adapters::parquet::StructParquetOutputHandler;
+    using ListColumnParquetOutputHandler = csp::adapters::parquet::ListColumnParquetOutputHandler;
+    using ParquetWriter = csp::adapters::parquet::ParquetWriter;
+    class MyParquetWriter : public ParquetWriter
+    {
+    public:
+        MyParquetWriter( int64_t chunk_size ): ParquetWriter(), m_chunkSize( chunk_size )
+        {
+            if( m_chunkSize <= 0 )
+            {
+                CSP_THROW( ValueError, "Chunk size should be > 0" );
+            }
+        }
+
std::uint32_t getChunkSize() const override{ return m_chunkSize; } + private: + int64_t m_chunkSize = 0; + }; + + std::shared_ptr m_handler; + std::vector> m_numpyArrayHandlers; + std::vector m_numpyArrayWriters; + std::vector m_listFieldPtrs; + std::vector m_dimIndex; + std::vector> m_numpyArrayDimHandlers; + std::vector m_numpyArrayDimWriters; + std::shared_ptr m_writer; + std::shared_ptr m_schema; + std::vector> m_dimColumnMapping; + CspTypePtr m_outType; + + INIT_CPPNODE( struct_to_record_batches ) + { + auto & input_def = tsinputDef( "data" ); + if( input_def.type -> type() != CspType::Type::ARRAY ) + CSP_THROW( TypeError, "struct_to_record_batches expected ts array type, got " << input_def.type -> type() ); + + auto * aType = static_cast( input_def.type.get() ); + CspTypePtr elemType = aType -> elemType(); + if( elemType -> type() != CspType::Type::STRUCT ) + CSP_THROW( TypeError, "struct_to_record_batches expected ts array of structs type, got " << elemType -> type() ); + + auto & output_def = tsoutputDef( "" ); + if( output_def.type -> type() != CspType::Type::ARRAY ) + CSP_THROW( TypeError, "struct_to_record_batches expected ts array type, got " << output_def.type -> type() ); + } + + START() + { + // Create Adapters for Schema + auto field_map = properties.value() -> get( "field_map" ); + m_writer = std::make_shared( chunk_size.value() ); + m_outType = std::make_shared( cls.value() ); + m_handler = std::make_shared( engine(), *m_writer, m_outType, field_map ); + std::vector> arrow_fields; + for( unsigned i = 0; i < m_handler -> getNumColumns(); i++ ) + { + arrow_fields.push_back( arrow::field( m_handler -> getColumnArrayBuilder( i ) -> getColumnName(), + m_handler -> getColumnArrayBuilder( i ) -> getDataType() ) ); + } + auto numpy_fields = properties.value() -> get( "numpy_fields" ); + auto numpy_field_types = properties.value() -> get( "numpy_field_types" ); + auto numpy_dimension_names = properties.value() -> get( "numpy_dimension_names" ); + auto numpy_dimension_types = properties.value() -> get( "numpy_dimension_types" ); + + m_numpyArrayWriters.resize( numpy_fields -> size() ); + m_numpyArrayHandlers.resize( numpy_fields -> size() ); + m_dimIndex.resize( numpy_fields -> size() ); + m_numpyArrayDimWriters.resize( numpy_dimension_names -> size() ); + m_numpyArrayDimHandlers.resize( numpy_dimension_names -> size() ); + unsigned array_idx = 0; + unsigned dim_array_idx = 0; + + auto out_type = std::make_shared( cls.value() ); + for( auto it = numpy_fields-> begin(); it != numpy_fields-> end(); ++it ) + { + auto field_name = it.key(); + auto col_name = it.value(); + auto field_type = numpy_field_types -> get(field_name); + auto &&value_type = csp::python::pyTypeAsCspType( csp::python::toPythonBorrowed( field_type ) ); + m_numpyArrayWriters[array_idx] = csp::python::create_numpy_array_writer_impl( value_type ); + m_numpyArrayHandlers[array_idx] = std::make_shared( engine(), *m_writer, value_type, col_name, m_numpyArrayWriters[array_idx] ); + m_listFieldPtrs.push_back( out_type -> meta() -> field( field_name ) ); + arrow_fields.push_back( arrow::field( m_numpyArrayHandlers[array_idx] -> getColumnArrayBuilder( 0 ) -> getColumnName(), + m_numpyArrayHandlers[array_idx] -> getColumnArrayBuilder( 0 ) -> getDataType() ) ); + if( numpy_dimension_names -> exists( field_name ) ) + { + auto dim_col_name = numpy_dimension_names -> get( field_name ); + auto dim_col_type = numpy_dimension_types -> get( field_name ); + auto &&dim_col_value_type = csp::python::pyTypeAsCspType( 
csp::python::toPythonBorrowed( dim_col_type ) ); + m_numpyArrayDimWriters[dim_array_idx] = csp::python::create_numpy_array_writer_impl( dim_col_value_type ); + m_numpyArrayDimHandlers[dim_array_idx] = std::make_shared( engine(), *m_writer, dim_col_value_type, dim_col_name, m_numpyArrayDimWriters[dim_array_idx] ); + arrow_fields.push_back( arrow::field( m_numpyArrayDimHandlers[dim_array_idx] -> getColumnArrayBuilder( 0 ) -> getColumnName(), + m_numpyArrayDimHandlers[dim_array_idx] -> getColumnArrayBuilder( 0 ) -> getDataType() ) ); + m_dimIndex[array_idx] = dim_array_idx; + dim_array_idx++; + } + else + { + m_dimIndex[array_idx] = -1; + } + array_idx++; + } + m_schema = arrow::schema( arrow_fields ); + } + + DialectGenericType getData( int num_rows ) + { + std::vector> columns; + columns.reserve( m_handler -> getNumColumns() ); + for( unsigned i = 0; i < m_handler -> getNumColumns(); i++ ) + { + columns.push_back( m_handler -> getColumnArrayBuilder( i ) -> buildArray() ); + } + for( unsigned i = 0; i < m_listFieldPtrs.size(); i++ ) + { + columns.push_back( m_numpyArrayHandlers[i] -> getColumnArrayBuilder( 0 ) -> buildArray() ); + if( m_dimIndex[i] != -1 ) + { + columns.push_back( m_numpyArrayDimHandlers[m_dimIndex[i]] -> getColumnArrayBuilder( 0 ) -> buildArray() ); + } + } + auto rb_ptr = arrow::RecordBatch::Make( m_schema, num_rows, columns ); + const arrow::RecordBatch& rb = *rb_ptr; + struct ArrowSchema* rb_schema = ( struct ArrowSchema* )malloc( sizeof( struct ArrowSchema ) ); + struct ArrowArray* rb_array = ( struct ArrowArray* )malloc( sizeof( struct ArrowArray ) ); + arrow::Status st = arrow::ExportRecordBatch( rb, rb_array, rb_schema ); + auto py_schema = csp::python::PyObjectPtr::own( PyCapsule_New( rb_schema, "arrow_schema", ReleaseArrowSchemaPyCapsule ) ); + auto py_array = csp::python::PyObjectPtr::own( PyCapsule_New( rb_array, "arrow_array", ReleaseArrowArrayPyCapsule ) ); + auto py_tuple = csp::python::PyObjectPtr::own( PyTuple_Pack( 2, py_schema.get(), py_array.get() ) ); + return csp::python::fromPython( py_tuple.get() ); + } + + INVOKE() + { + if( csp.ticked( data ) ) + { + std::vector & out = unnamed_output().reserveSpace>(); + out.clear(); + auto & structs = data.lastValue>(); + uint32_t cur_chunk_size = 0; + for( auto& st: structs ) + { + m_handler -> writeValueFromArgs( st ); + for( unsigned i = 0; i < m_handler -> getNumColumns(); i++ ) + { + m_handler -> getColumnArrayBuilder( i ) -> handleRowFinished(); + } + for( unsigned i = 0; i < m_listFieldPtrs.size(); i++ ) + { + auto& field_ptr = m_listFieldPtrs[i]; + if( m_dimIndex[i] != -1 ) + { + auto& list_handler = m_numpyArrayHandlers[i]; + auto ndarray = field_ptr -> value( st.get() ); + DialectGenericType flat_array, shape; + std::tie( flat_array, shape ) = numpy_ndarray_flatten( std::move(ndarray) ); + list_handler -> writeValueFromArgs( flat_array ); + list_handler -> getColumnArrayBuilder( 0 ) -> handleRowFinished(); + + auto& dim_list_handler = m_numpyArrayDimHandlers[m_dimIndex[i]]; + dim_list_handler -> writeValueFromArgs( shape ); + dim_list_handler -> getColumnArrayBuilder( 0 ) -> handleRowFinished(); + } + else + { + auto& list_handler = m_numpyArrayHandlers[i]; + list_handler -> writeValueFromArgs( field_ptr -> value( st.get() ) ); + list_handler -> getColumnArrayBuilder( 0 ) -> handleRowFinished(); + } + } + if( ++cur_chunk_size >= m_writer -> getChunkSize() ) + { + out.emplace_back( getData( cur_chunk_size ) ); + cur_chunk_size = 0; + } + } + if( cur_chunk_size > 0) + { + out.emplace_back( getData( 
cur_chunk_size ) );
+            }
+        }
+    }
+};
+
+EXPORT_CPPNODE( struct_to_record_batches );
+
 }

 // Base nodes
@@ -350,6 +833,8 @@ REGISTER_CPPNODE( csp::cppnodes, struct_fromts );
 REGISTER_CPPNODE( csp::cppnodes, struct_collectts );
 REGISTER_CPPNODE( csp::cppnodes, exprtk_impl );
+REGISTER_CPPNODE( csp::cppnodes, record_batches_to_struct );
+REGISTER_CPPNODE( csp::cppnodes, struct_to_record_batches );

 static PyModuleDef _cspbaselibimpl_module = {
     PyModuleDef_HEAD_INIT,
diff --git a/csp/adapters/arrow.py b/csp/adapters/arrow.py
new file mode 100644
index 000000000..d1426788d
--- /dev/null
+++ b/csp/adapters/arrow.py
@@ -0,0 +1,194 @@
+from typing import Iterable, List
+
+import pyarrow as pa
+import pyarrow.compute as pc
+import pyarrow.parquet as pq
+import pytz
+
+import csp
+from csp.impl.types.tstype import ts
+from csp.impl.wiring import py_pull_adapter_def
+
+__all__ = [
+    "ArrowHistoricalAdapter",
+    "write_record_batches",
+]
+
+
+class ArrowHistoricalAdapterImpl(csp.impl.pulladapter.PullInputAdapter):
+    """Stream record batches from some source into csp"""
+
+    def __init__(self, ts_col_name: str, source: Iterable[pa.RecordBatch]):
+        """
+        Args:
+            ts_col_name: name of the column that contains the timestamp field
+            source: an iterable of record batches
+
+        NOTE: The user is responsible for ensuring that the data is sorted in
+        ascending order on the 'ts_col_name' field
+        NOTE: The user is responsible for ensuring that all the record batches
+        have the same schema
+        """
+        self.source = source
+        self.ts_col_name = ts_col_name
+        self.utc_tz = pytz.timezone("UTC")
+        super().__init__()
+
+    def start(self, start_time, end_time):
+        if start_time.tzinfo is None:
+            self.start_time = self.utc_tz.localize(start_time)
+        else:
+            self.start_time = start_time.astimezone(self.utc_tz)
+        if end_time.tzinfo is None:
+            self.end_time = self.utc_tz.localize(end_time)
+        else:
+            self.end_time = end_time.astimezone(self.utc_tz)
+
+        # Info about the last chunk of data
+        self.last_chunk = None
+        self.last_ts = None
+        # Number of chunks in the current batch
+        self.batch_chunks_count = 0
+        # Iterator over the chunks in a batch
+        self.chunk_index_iter = None
+        # Number of chunks processed so far
+        self.processed_chunks_count = 0
+        # Current batch being processed
+        self.batch = None
+        # True once all batches have been processed
+        self.finished = False
+        # True once start-time filtering is done
+        self.filtered_start_time = False
+        # The first batch, sliced so rows before start_time are already dropped
+        self.starting_batch = None
+        super().start(start_time, end_time)
+
+    def next(self):
+        if self.finished:
+            return None
+
+        # Filter out all batches which have ts < start time
+        while not self.filtered_start_time and not self.finished:
+            try:
+                batch = next(self.source)
+                if batch.num_rows != 0:
+                    # NOTE: filter might be a good option to avoid this indirect way of computing the slice,
+                    # however it is not clear that filter would be zero copy
+                    ts_col = batch[self.ts_col_name]
+                    if ts_col.type.tz is None:
+                        start_time = self.start_time.replace(tzinfo=None)
+                    else:
+                        start_time = self.start_time.astimezone(pytz.timezone(ts_col.type.tz))
+                    valid_indices = pc.indices_nonzero(pc.greater_equal(ts_col, start_time))
+                    if len(valid_indices) != 0:
+                        # Slice to only get the records with ts >= start_time
+                        self.starting_batch = batch.slice(offset=valid_indices[0].as_py())
+                        self.filtered_start_time = True
+            except StopIteration:
+
+        while not self.finished:
+            # Process all the chunks in the current batch
+            if self.chunk_index_iter:
+                try:
+                    start_idx, next_start_idx = next(self.chunk_index_iter)
+                    new_batches = [self.batch.slice(offset=start_idx, length=next_start_idx - start_idx)]
+                    new_ts = self.batch[self.ts_col_name][start_idx].as_py()
+                    if new_ts.tzinfo is None:
+                        new_ts = self.utc_tz.localize(new_ts)
+                    self.processed_chunks_count += 1
+                    if self.last_chunk:
+                        if self.last_ts == new_ts:
+                            new_batches = self.last_chunk + new_batches
+                            self.last_chunk = None
+                            self.last_ts = None
+                        else:
+                            raise Exception("last_chunk and new_batches have different timestamps")
+
+                    if self.processed_chunks_count == self.batch_chunks_count:
+                        self.last_chunk = new_batches
+                        self.last_ts = new_ts
+                        self.processed_chunks_count = 0
+                    else:
+                        if new_ts > self.end_time:
+                            self.finished = True
+                            continue
+                        return (new_ts, new_batches)
+                except StopIteration:
+                    raise Exception("chunk_index_iter exhausted before batch_chunks_count chunks were consumed")
+
+            # Try to get a new batch of data
+            try:
+                if self.starting_batch:
+                    # Use the sliced batch from start_time filtering
+                    self.batch = self.starting_batch
+                    self.starting_batch = None
+                else:
+                    # Get the next batch of data
+                    self.batch = next(self.source)
+                if self.batch.num_rows == 0:
+                    continue
+
+                all_timestamps = self.batch[self.ts_col_name]
+                unique_timestamps = all_timestamps.unique()
+                indexes = pc.index_in(unique_timestamps, all_timestamps).to_pylist() + [self.batch.num_rows]
+                self.chunk_index_iter = zip(indexes, indexes[1:])
+                self.batch_chunks_count = len(unique_timestamps)
+                starting_ts = unique_timestamps[0].as_py()
+                if starting_ts.tzinfo is None:
+                    starting_ts = self.utc_tz.localize(starting_ts)
+                if starting_ts != self.last_ts and self.last_chunk:
+                    new_batches = self.last_chunk
+                    new_ts = self.last_ts
+                    self.last_chunk = None
+                    self.last_ts = None
+                    if new_ts > self.end_time:
+                        self.finished = True
+                        continue
+                    return (new_ts, new_batches)
+            except StopIteration:
+                self.finished = True
+                if self.last_chunk:
+                    if self.last_ts > self.end_time:
+                        continue
+                    return (self.last_ts, self.last_chunk)
+        return None
+
+
+ArrowHistoricalAdapter = py_pull_adapter_def(
+    "ArrowHistoricalAdapter",
+    ArrowHistoricalAdapterImpl,
+    ts[List[pa.RecordBatch]],
+    ts_col_name=str,
+    source=Iterable[pa.RecordBatch],
+)
+
+
+@csp.node
+def write_record_batches(where: str, merge_record_batches: bool, batches: csp.ts[List[pa.RecordBatch]], kwargs: dict):
+    """
+    Dump all the record batches to a parquet file
+
+    Args:
+        where: destination to write the data to
+        merge_record_batches: flag to combine all the record batches of a single tick into a
+            single record batch (can save some space at the cost of memory)
+        batches: the timeseries of lists of record batches
+        kwargs: dict of additional args to pass to the pyarrow ParquetWriter
+    """
+    with csp.state():
+        s_writer = None
+        s_destination = where
+        s_merge_batches = merge_record_batches
+
+    with csp.stop():
+        if s_writer is not None:
+            s_writer.close()
+
+    if csp.ticked(batches):
+        if s_merge_batches:
+            batches = [pa.concat_batches(batches)]
+
+        for batch in batches:
+            if s_writer is None:
+                s_writer = pq.ParquetWriter(s_destination, batch.schema, **kwargs)
+            s_writer.write_batch(batch)
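+
+
+if __name__ == "__main__":
+    # Minimal usage sketch, not part of the adapter API. It assumes an existing
+    # "data.parquet" whose "ts" column holds timestamps sorted in ascending
+    # order; the file and column names are illustrative only.
+    from datetime import datetime
+
+    @csp.graph
+    def copy_parquet():
+        batches = ArrowHistoricalAdapter(
+            ts_col_name="ts",
+            source=pq.ParquetFile("data.parquet").iter_batches(),
+        )
+        write_record_batches("copy.parquet", False, batches, {})
+
+    csp.run(copy_parquet, starttime=datetime(2020, 1, 1), endtime=datetime(2030, 1, 1))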
diff --git a/csp/baselib.py b/csp/baselib.py
index 1a628afe1..28f50c7d7 100644
--- a/csp/baselib.py
+++ b/csp/baselib.py
@@ -5,9 +5,10 @@
 import queue
 import threading
 from datetime import datetime, timedelta
-from typing import Callable, Dict, List, Optional, TypeVar, Union
+from typing import Callable, Dict, List, Optional, Tuple, TypeVar, Union

 import numpy as np
+import pyarrow as pa
 import pytz

 import csp
@@ -15,6 +16,7 @@
 from csp.impl.constants import UNSET
 from csp.impl.types.common_definitions import OutputBasket, Outputs
 from csp.impl.types.tstype import ts
+from csp.impl.types.typing_utils import CspTypingUtils
 from csp.impl.wiring import DelayedEdge, Edge, OutputsContainer, graph, input_adapter_def, node
 from csp.impl.wiring.delayed_node import DelayedNodeWrapperDef
 from csp.lib import _cspbaselibimpl
@@ -62,6 +64,8 @@
     "times_ns",
     "unroll",
     "wrap_feedback",
+    "record_batches_to_struct",
+    "struct_to_record_batches",
 ]

 T = TypeVar("T")
@@ -622,6 +626,129 @@ def accum(x: ts["T"], start: "~T" = 0) -> ts["T"]:
     return s_accum
+
+
+@node(cppimpl=_cspbaselibimpl.struct_to_record_batches)
+def _struct_to_record_batches(
+    schema_ptr: object,
+    cls: "T",
+    properties: dict,
+    chunk_size: int,
+    data: ts[List["T"]],
+) -> ts[List[Tuple[object, object]]]:
+    raise NotImplementedError("No python implementation of struct_to_record_batches")
+    return None
+
+
+class _ArrowDummy:
+    def __init__(self, tup):
+        self.tup = tup
+
+    def __arrow_c_array__(self, requested_schema=None):
+        return self.tup
+
+
+@graph
+def struct_to_record_batches(
+    schema_ptr: pa.Schema, cls: "T", properties: dict, chunk_size: int, data: ts[List["T"]]
+) -> ts[List[pa.RecordBatch]]:
+    field_map = properties["field_map"]
+    numpy_dimensions_column_map = properties.get("numpy_dimensions_column_map", {})
+    meta_typed = cls.metadata(typed=True)
+    new_field_map = {}
+    numpy_fields = {}
+    numpy_field_types = {}
+    numpy_dimension_names = {}
+    numpy_dimension_types = {}
+    for struct_field_name, arrow_field_name in field_map.items():
+        field_typ = meta_typed[struct_field_name]
+        if CspTypingUtils.is_numpy_array_type(field_typ):
+            value_type = field_typ.__args__[0]
+            if CspTypingUtils.get_origin(field_typ) is csp.typing.NumpyNDArray:
+                from csp.adapters.output_adapters.parquet import resolve_array_shape_column_name as get_dim_col_name
+
+                array_dimensions_column_name = get_dim_col_name(
+                    arrow_field_name, numpy_dimensions_column_map.get(arrow_field_name, None)
+                )
+                numpy_fields[arrow_field_name] = struct_field_name
+                numpy_field_types[arrow_field_name] = value_type
+                numpy_dimension_names[arrow_field_name] = array_dimensions_column_name
+                numpy_dimension_types[arrow_field_name] = int
+            else:
+                numpy_fields[arrow_field_name] = struct_field_name
+                numpy_field_types[arrow_field_name] = value_type
+        else:
+            new_field_map[arrow_field_name] = struct_field_name
+    properties["field_map"] = new_field_map
+    properties["numpy_fields"] = numpy_fields
+    properties["numpy_field_types"] = numpy_field_types
+    properties["numpy_dimension_names"] = numpy_dimension_names
+    properties["numpy_dimension_types"] = numpy_dimension_types
+
+    tups = _struct_to_record_batches(
+        schema_ptr.__arrow_c_schema__(),
+        cls,
+        properties,
+        chunk_size,
+        data,
+    )
+    return apply(tups, lambda tups: [pa.record_batch(_ArrowDummy(tup)) for tup in tups], List[pa.RecordBatch])
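+
+
+# Usage sketch for struct_to_record_batches ("MyStruct", its fields, and the
+# wiring below are illustrative; "data" is assumed to be a ts[List[MyStruct]]
+# edge and 100 is the chunk size in rows):
+#
+#     class MyStruct(csp.Struct):
+#         price: float
+#         size: int
+#
+#     schema = pa.schema([pa.field("price", pa.float64()), pa.field("size", pa.int64())])
+#     batches = struct_to_record_batches(
+#         schema, MyStruct, {"field_map": {"price": "price", "size": "size"}}, 100, data
+#     )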
+
+
+@node(cppimpl=_cspbaselibimpl.record_batches_to_struct)
+def _record_batches_to_struct(
+    schema_ptr: object,
+    cls: "T",
+    properties: dict,
+    data: ts[List[Tuple[object, object]]],
+) -> ts[List["T"]]:
+    raise NotImplementedError("No python implementation of record_batches_to_struct")
+    return None
+
+
+@graph
+def record_batches_to_struct(
+    schema_ptr: pa.Schema, cls: "T", properties: dict, data: ts[List[pa.RecordBatch]]
+) -> ts[List["T"]]:
+    field_map = properties["field_map"]
+    numpy_dimensions_column_map = properties.get("numpy_dimensions_column_map", {})
+    meta_typed = cls.metadata(typed=True)
+    new_field_map = {}
+    numpy_fields = {}
+    numpy_field_types = {}
+    numpy_dimension_names = {}
+    numpy_dimension_types = {}
+    for arrow_field_name, struct_field_name in field_map.items():
+        field_typ = meta_typed[struct_field_name]
+        if CspTypingUtils.is_numpy_array_type(field_typ):
+            value_type = field_typ.__args__[0]
+            if CspTypingUtils.get_origin(field_typ) is csp.typing.NumpyNDArray:
+                from csp.adapters.output_adapters.parquet import resolve_array_shape_column_name as get_dim_col_name
+
+                array_dimensions_column_name = get_dim_col_name(
+                    arrow_field_name, numpy_dimensions_column_map.get(arrow_field_name, None)
+                )
+                numpy_fields[arrow_field_name] = struct_field_name
+                numpy_field_types[arrow_field_name] = value_type
+                numpy_dimension_names[arrow_field_name] = array_dimensions_column_name
+                numpy_dimension_types[arrow_field_name] = int
+            else:
+                numpy_fields[arrow_field_name] = struct_field_name
+                numpy_field_types[arrow_field_name] = value_type
+        else:
+            new_field_map[arrow_field_name] = struct_field_name
+    properties["field_map"] = new_field_map
+    properties["numpy_fields"] = numpy_fields
+    properties["numpy_field_types"] = numpy_field_types
+    properties["numpy_dimension_names"] = numpy_dimension_names
+    properties["numpy_dimension_types"] = numpy_dimension_types
+
+    return _record_batches_to_struct(
+        schema_ptr.__arrow_c_schema__(),
+        cls,
+        properties,
+        apply(data, lambda rbs: [rb.__arrow_c_array__() for rb in rbs], List[Tuple[object, object]]),
+    )
+
+
 @node(cppimpl=_cspbaselibimpl.exprtk_impl)
 def _csp_exprtk_impl(
     expression_str: str,