Kouhei Sutou
null+****@clear*****
Sat May 13 10:46:39 JST 2017
Kouhei Sutou 2017-05-13 10:46:39 +0900 (Sat, 13 May 2017) New Revision: 270739c8b7bb65f78bdf93b4e0876378af46e453 https://github.com/groonga/groonga/commit/270739c8b7bb65f78bdf93b4e0876378af46e453 Message: [WIP] Support importing to and exporting from Apache Arrow It doesn't work yet!!! Added files: lib/arrow.cpp Modified files: configure.ac lib/Makefile.am lib/sources.am Modified: configure.ac (+23 -0) =================================================================== --- configure.ac 2017-05-11 22:52:31 +0900 (4362386) +++ configure.ac 2017-05-13 10:46:39 +0900 (7724f82) @@ -960,6 +960,29 @@ if test "x$with_jemalloc" != "xno"; then [AC_MSG_ERROR("No libjemalloc found")]) fi +# Apache Arrow +AC_ARG_ENABLE(arrow, + [AS_HELP_STRING([--disable-arrow], + [enable Apache Arrow support. [default=auto-detect]])], + [enable_arrow="$enableval"], + [enable_arrow="auto"]) +if test "x$enable_arrow" != "xno"; then + m4_ifdef([PKG_CHECK_MODULES], [ + PKG_CHECK_MODULES([ARROW], + [arrow], + [arrow_available=yes], + [arrow_available=no]) + ], + [arrow_available=no]) + if test "x$arrow_available" = "xyes"; then + AC_DEFINE(GRN_WITH_ARROW, [1], [Enable Apache Arrow support.]) + else + if test "x$enable_arrow" = "xyes"; then + AC_MSG_ERROR("No Apache Arrow found") + fi + fi +fi + # MeCab # NOTE: MUST be checked last Modified: lib/Makefile.am (+8 -1) =================================================================== --- lib/Makefile.am 2017-05-11 22:52:31 +0900 (4cf2ac9) +++ lib/Makefile.am 2017-05-13 10:46:39 +0900 (7a7281e) @@ -19,6 +19,12 @@ AM_CFLAGS = \ $(LIBLZ4_CFLAGS) \ $(LIBZSTD_CFLAGS) +AM_CXXFLAGS = \ + $(NO_STRICT_ALIASING_CFLAGS) \ + $(COVERAGE_CFLAGS) \ + $(GRN_CXXFLAGS) \ + $(ARROW_CFLAGS) + BUNDLED_LIBRARIES_CFLAGS = \ $(MRUBY_CFLAGS) \ $(ONIGMO_CFLAGS) @@ -56,7 +62,8 @@ libgroonga_la_LIBADD += \ $(ONIGMO_LIBS) \ $(LIBLZ4_LIBS) \ $(LIBZSTD_LIBS) \ - $(ATOMIC_LIBS) + $(ATOMIC_LIBS) \ + $(ARROW_LIBS) if WITH_LEMON BUILT_SOURCES = \ Added: lib/arrow.cpp (+164 -0) 100644 =================================================================== --- /dev/null +++ lib/arrow.cpp 2017-05-13 10:46:39 +0900 (92b1015) @@ -0,0 +1,164 @@ +/* -*- c-basic-offset: 2 -*- */ +/* + Copyright(C) 2017 Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "grn.h" +#include <groonga.hpp> + +#include <arrow/api.h> +#include <arrow/io/file.h> +#include <arrow/ipc/api.h> + +namespace grnarrow { + class ColumnImportVisitor : public arrow::ArrayVisitor { + public: + ColumnImportVisitor(grn_ctx *ctx, + const grn_id *ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> &arrow_array) + : ctx_(ctx), + ids_(ids), + grn_column_(grn_column), + arrow_array_(arrow_array) { + switch (grn_column_->header.type) { + case GRN_DB_BOOL : + GRN_BOOL_INIT(&buffer_, 0); + default : + GRN_VOID_INIT(&buffer_); + break; + } + } + + ~ColumnImportVisitor() { + GRN_OBJ_FIN(ctx_, &buffer_); + } + + arrow::Status Visit(const arrow::BooleanArray &array) { + int64_t n_rows = array.length(); + for (int i = 0; i < n_rows; ++i) { + auto id = ids_[i]; + GRN_BULK_REWIND(&buffer_); + GRN_BOOL_SET(ctx_, &buffer_, array.Value(i)); + grn_obj_set_value(ctx_, grn_column_, id, &buffer_, GRN_OBJ_SET); + } + return arrow::Status::OK(); + } + + private: + grn_ctx *ctx_; + const grn_id *ids_; + grn_obj *grn_column_; + std::shared_ptr<arrow::Array> arrow_array_; + grn_obj buffer_; + }; + + class Importer { + public: + Importer(grn_ctx *ctx, grn_obj *grn_table) + : ctx_(ctx), + grn_table_(grn_table), + key_column_name_(nullptr) { + } + + ~Importer() { + } + + grn_rc import_table(const std::shared_ptr<arrow::Table> &arrow_table) { + int n_columns = arrow_table->num_columns(); + + if (key_column_name_.empty()) { + grn_obj ids; + GRN_RECORD_INIT(&ids, GRN_OBJ_VECTOR, grn_obj_id(ctx_, grn_table_)); + auto n_records = arrow_table->num_rows(); + for (int64_t i = 0; i < n_records; ++i) { + auto id = grn_table_add(ctx_, grn_table_, NULL, 0, NULL); + GRN_RECORD_PUT(ctx_, &ids, id); + } + for (int i = 0; i < n_columns; ++i) { + int64_t offset = 0; + auto arrow_column = arrow_table->column(i); + auto column_name = arrow_column->name(); + auto grn_column = grn_obj_column(ctx_, grn_table_, + column_name.data(), + column_name.size()); + auto arrow_chunked_data = arrow_column->data(); + for (auto arrow_array : arrow_chunked_data->chunks()) { + grn_id *sub_ids = + reinterpret_cast<grn_id *>(GRN_BULK_HEAD(&ids)) + offset; + ColumnImportVisitor visitor(ctx_, + sub_ids, + grn_column, + arrow_array); + arrow_array->Accept(&visitor); + offset += arrow_array->length(); + } + if (grn_obj_is_accessor(ctx_, grn_column)) { + grn_obj_unlink(ctx_, grn_column); + } + } + GRN_OBJ_FIN(ctx_, &ids); + } else { + } + return ctx_->rc; + }; + + grn_rc import_record_batch(const std::shared_ptr<arrow::RecordBatch> &arrow_record_batch) { + std::shared_ptr<arrow::Table> arrow_table; + std::vector<std::shared_ptr<arrow::RecordBatch>> arrow_record_batches(1); + arrow_record_batches[0] = arrow_record_batch; + auto status = + arrow::Table::FromRecordBatches(arrow_record_batches, &arrow_table); + // TODO: check status + return import_table(arrow_table); + }; + + private: + grn_ctx *ctx_; + grn_obj *grn_table_; + std::string key_column_name_; + }; +} + +extern "C" { +grn_rc +grn_arrow_import_from_path(grn_ctx *ctx, + grn_obj *table, + const char *path) +{ + std::shared_ptr<arrow::io::MemoryMappedFile> input; + auto status = + arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::READ, &input); + // TODO: check status + std::shared_ptr<arrow::ipc::FileReader> reader; + status = arrow::ipc::FileReader::Open(input, &reader); + // TODO: check status + + grnarrow::Importer importer(ctx, table); + int n_record_batches = reader->num_record_batches(); + for (int i = 0; i < n_record_batches; ++i) { + std::shared_ptr<arrow::RecordBatch> record_batch; + status = reader->GetRecordBatch(i, &record_batch); + // TODO: check status + importer.import_record_batch(record_batch); + if (ctx->rc != GRN_SUCCESS) { + break; + } + } + + return ctx->rc; +} +} Modified: lib/sources.am (+1 -0) =================================================================== --- lib/sources.am 2017-05-11 22:52:31 +0900 (02868a0) +++ lib/sources.am 2017-05-13 10:46:39 +0900 (355929a) @@ -1,6 +1,7 @@ libgroonga_la_SOURCES = \ alloc.c \ grn_alloc.h \ + arrow.cpp \ cache.c \ grn_cache.h \ column.c \ -------------- next part -------------- HTML����������������������������... 下载