[Groonga-commit] groonga/groonga at 270739c [support-arrow] [WIP] Support importing to and exporting from Apache Arrow

Back to archive index

Kouhei Sutou null+****@clear*****
Sat May 13 10:46:39 JST 2017


Kouhei Sutou	2017-05-13 10:46:39 +0900 (Sat, 13 May 2017)

  New Revision: 270739c8b7bb65f78bdf93b4e0876378af46e453
  https://github.com/groonga/groonga/commit/270739c8b7bb65f78bdf93b4e0876378af46e453

  Message:
    [WIP] Support importing to and exporting from Apache Arrow
    
    It doesn't work yet!!!

  Added files:
    lib/arrow.cpp
  Modified files:
    configure.ac
    lib/Makefile.am
    lib/sources.am

  Modified: configure.ac (+23 -0)
===================================================================
--- configure.ac    2017-05-11 22:52:31 +0900 (4362386)
+++ configure.ac    2017-05-13 10:46:39 +0900 (7724f82)
@@ -960,6 +960,29 @@ if test "x$with_jemalloc" != "xno"; then
                  [AC_MSG_ERROR("No libjemalloc found")])
 fi
 
+# Apache Arrow
+AC_ARG_ENABLE(arrow,
+  [AS_HELP_STRING([--disable-arrow],
+    [enable Apache Arrow support. [default=auto-detect]])],
+  [enable_arrow="$enableval"],
+  [enable_arrow="auto"])
+if test "x$enable_arrow" != "xno"; then
+  m4_ifdef([PKG_CHECK_MODULES], [
+    PKG_CHECK_MODULES([ARROW],
+                      [arrow],
+                      [arrow_available=yes],
+                      [arrow_available=no])
+  ],
+  [arrow_available=no])
+  if test "x$arrow_available" = "xyes"; then
+    AC_DEFINE(GRN_WITH_ARROW, [1], [Enable Apache Arrow support.])
+  else
+    if test "x$enable_arrow" = "xyes"; then
+      AC_MSG_ERROR("No Apache Arrow found")
+    fi
+  fi
+fi
+
 # MeCab
 # NOTE: MUST be checked last
 

  Modified: lib/Makefile.am (+8 -1)
===================================================================
--- lib/Makefile.am    2017-05-11 22:52:31 +0900 (4cf2ac9)
+++ lib/Makefile.am    2017-05-13 10:46:39 +0900 (7a7281e)
@@ -19,6 +19,12 @@ AM_CFLAGS =					\
 	$(LIBLZ4_CFLAGS)			\
 	$(LIBZSTD_CFLAGS)
 
+AM_CXXFLAGS =					\
+	$(NO_STRICT_ALIASING_CFLAGS)		\
+	$(COVERAGE_CFLAGS)			\
+	$(GRN_CXXFLAGS)				\
+	$(ARROW_CFLAGS)
+
 BUNDLED_LIBRARIES_CFLAGS =			\
 	$(MRUBY_CFLAGS)				\
 	$(ONIGMO_CFLAGS)
@@ -56,7 +62,8 @@ libgroonga_la_LIBADD +=				\
 	$(ONIGMO_LIBS)				\
 	$(LIBLZ4_LIBS)				\
 	$(LIBZSTD_LIBS)				\
-	$(ATOMIC_LIBS)
+	$(ATOMIC_LIBS)				\
+	$(ARROW_LIBS)
 
 if WITH_LEMON
 BUILT_SOURCES =					\

  Added: lib/arrow.cpp (+164 -0) 100644
===================================================================
--- /dev/null
+++ lib/arrow.cpp    2017-05-13 10:46:39 +0900 (92b1015)
@@ -0,0 +1,164 @@
+/* -*- c-basic-offset: 2 -*- */
+/*
+  Copyright(C) 2017 Brazil
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License version 2.1 as published by the Free Software Foundation.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+#include "grn.h"
+#include <groonga.hpp>
+
+#include <arrow/api.h>
+#include <arrow/io/file.h>
+#include <arrow/ipc/api.h>
+
+namespace grnarrow {
+  class ColumnImportVisitor : public arrow::ArrayVisitor {
+  public:
+    ColumnImportVisitor(grn_ctx *ctx,
+                        const grn_id *ids,
+                        grn_obj *grn_column,
+                        std::shared_ptr<arrow::Array> &arrow_array)
+      : ctx_(ctx),
+        ids_(ids),
+        grn_column_(grn_column),
+        arrow_array_(arrow_array) {
+      switch (grn_column_->header.type) {
+      case GRN_DB_BOOL :
+        GRN_BOOL_INIT(&buffer_, 0);
+      default :
+        GRN_VOID_INIT(&buffer_);
+        break;
+      }
+    }
+
+    ~ColumnImportVisitor() {
+      GRN_OBJ_FIN(ctx_, &buffer_);
+    }
+
+    arrow::Status Visit(const arrow::BooleanArray &array) {
+      int64_t n_rows = array.length();
+      for (int i = 0; i < n_rows; ++i) {
+        auto id = ids_[i];
+        GRN_BULK_REWIND(&buffer_);
+        GRN_BOOL_SET(ctx_, &buffer_, array.Value(i));
+        grn_obj_set_value(ctx_, grn_column_, id, &buffer_, GRN_OBJ_SET);
+      }
+      return arrow::Status::OK();
+    }
+
+  private:
+    grn_ctx *ctx_;
+    const grn_id *ids_;
+    grn_obj *grn_column_;
+    std::shared_ptr<arrow::Array> arrow_array_;
+    grn_obj buffer_;
+  };
+
+  class Importer {
+  public:
+    Importer(grn_ctx *ctx, grn_obj *grn_table)
+      : ctx_(ctx),
+        grn_table_(grn_table),
+        key_column_name_(nullptr) {
+    }
+
+    ~Importer() {
+    }
+
+    grn_rc import_table(const std::shared_ptr<arrow::Table> &arrow_table) {
+      int n_columns = arrow_table->num_columns();
+
+      if (key_column_name_.empty()) {
+        grn_obj ids;
+        GRN_RECORD_INIT(&ids, GRN_OBJ_VECTOR, grn_obj_id(ctx_, grn_table_));
+        auto n_records = arrow_table->num_rows();
+        for (int64_t i = 0; i < n_records; ++i) {
+          auto id = grn_table_add(ctx_, grn_table_, NULL, 0, NULL);
+          GRN_RECORD_PUT(ctx_, &ids, id);
+        }
+        for (int i = 0; i < n_columns; ++i) {
+          int64_t offset = 0;
+          auto arrow_column = arrow_table->column(i);
+          auto column_name = arrow_column->name();
+          auto grn_column = grn_obj_column(ctx_, grn_table_,
+                                           column_name.data(),
+                                           column_name.size());
+          auto arrow_chunked_data = arrow_column->data();
+          for (auto arrow_array : arrow_chunked_data->chunks()) {
+            grn_id *sub_ids =
+              reinterpret_cast<grn_id *>(GRN_BULK_HEAD(&ids)) + offset;
+            ColumnImportVisitor visitor(ctx_,
+                                        sub_ids,
+                                        grn_column,
+                                        arrow_array);
+            arrow_array->Accept(&visitor);
+            offset += arrow_array->length();
+          }
+          if (grn_obj_is_accessor(ctx_, grn_column)) {
+            grn_obj_unlink(ctx_, grn_column);
+          }
+        }
+        GRN_OBJ_FIN(ctx_, &ids);
+      } else {
+      }
+      return ctx_->rc;
+    };
+
+    grn_rc import_record_batch(const std::shared_ptr<arrow::RecordBatch> &arrow_record_batch) {
+      std::shared_ptr<arrow::Table> arrow_table;
+      std::vector<std::shared_ptr<arrow::RecordBatch>> arrow_record_batches(1);
+      arrow_record_batches[0] = arrow_record_batch;
+      auto status =
+        arrow::Table::FromRecordBatches(arrow_record_batches, &arrow_table);
+      // TODO: check status
+      return import_table(arrow_table);
+    };
+
+  private:
+    grn_ctx *ctx_;
+    grn_obj *grn_table_;
+    std::string key_column_name_;
+  };
+}
+
+extern "C" {
+grn_rc
+grn_arrow_import_from_path(grn_ctx *ctx,
+                           grn_obj *table,
+                           const char *path)
+{
+  std::shared_ptr<arrow::io::MemoryMappedFile> input;
+  auto status =
+    arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::READ, &input);
+  // TODO: check status
+  std::shared_ptr<arrow::ipc::FileReader> reader;
+  status = arrow::ipc::FileReader::Open(input, &reader);
+  // TODO: check status
+
+  grnarrow::Importer importer(ctx, table);
+  int n_record_batches = reader->num_record_batches();
+  for (int i = 0; i < n_record_batches; ++i) {
+    std::shared_ptr<arrow::RecordBatch> record_batch;
+    status = reader->GetRecordBatch(i, &record_batch);
+    // TODO: check status
+    importer.import_record_batch(record_batch);
+    if (ctx->rc != GRN_SUCCESS) {
+      break;
+    }
+  }
+
+  return ctx->rc;
+}
+}

  Modified: lib/sources.am (+1 -0)
===================================================================
--- lib/sources.am    2017-05-11 22:52:31 +0900 (02868a0)
+++ lib/sources.am    2017-05-13 10:46:39 +0900 (355929a)
@@ -1,6 +1,7 @@
 libgroonga_la_SOURCES =				\
 	alloc.c					\
 	grn_alloc.h				\
+	arrow.cpp				\
 	cache.c					\
 	grn_cache.h				\
 	column.c				\
-------------- next part --------------
HTMLの添付ファイルを保管しました...
下载 



More information about the Groonga-commit mailing list
Back to archive index