Add support of HDFS as remote object store #1062


Closed · wants to merge 3 commits
4 changes: 4 additions & 0 deletions datafusion/Cargo.toml
@@ -46,6 +46,8 @@ unicode_expressions = ["unicode-segmentation"]
force_hash_collisions = []
# Used to enable the avro format
avro = ["avro-rs", "num-traits"]
# Used to enable hdfs as remote object store
hdfs = ["fs-hdfs"]
Contributor: 👍


[dependencies]
ahash = "0.7"
@@ -72,6 +74,8 @@ smallvec = { version = "1.6", features = ["union"] }
rand = "0.8"
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
num-traits = { version = "0.2", optional = true }
fs-hdfs = { version = "^0.1.2", optional = true }
uuid = {version = "^0.8", features = ["v4"]}

[dev-dependencies]
criterion = "0.3"
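For context, this follows the same optional-feature pattern as `avro` above: HDFS support is compiled in only when the `hdfs` feature is enabled (e.g. `cargo build --features hdfs`), which pulls in the optional `fs-hdfs` dependency. A minimal sketch of how downstream code might gate on it (the helper function is hypothetical, not part of this diff):

```rust
// Hypothetical helper, not part of the PR: reports whether the optional `hdfs`
// feature (and therefore the fs-hdfs backed object store) was compiled in.
pub fn hdfs_store_available() -> bool {
    cfg!(feature = "hdfs")
}
```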
4 changes: 2 additions & 2 deletions datafusion/src/datasource/mod.rs
@@ -31,9 +31,9 @@ pub use self::datasource::{TableProvider, TableType};
pub use self::memory::MemTable;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::{DataFusionError, Result};
use crate::physical_plan::common::build_file_list;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
use object_store::list_file;
use std::sync::Arc;

/// Source for table input data
@@ -115,7 +115,7 @@ pub trait TableDescriptorBuilder {
provided_schema: Option<Schema>,
collect_statistics: bool,
) -> Result<TableDescriptor> {
let filenames = build_file_list(path, ext)?;
let filenames = list_file(path, Some(String::from(ext)))?;
if filenames.is_empty() {
return Err(DataFusionError::Plan(format!(
"No file (with .{} extension) found at path {}",
211 changes: 211 additions & 0 deletions datafusion/src/datasource/object_store/hdfs.rs
@@ -0,0 +1,211 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Object store that represents the HDFS File System.
use futures::{stream, StreamExt};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use async_trait::async_trait;
use futures::AsyncRead;

use hdfs::hdfs::{FileStatus, HdfsErr, HdfsFile, HdfsFs};

use crate::datasource::object_store::{
FileMeta, FileMetaStream, ListEntryStream, ObjectReader, ObjectStore,
};
use crate::error::{DataFusionError, Result};
use chrono::{NaiveDateTime, Utc};

pub mod os_parquet;

/// scheme for HDFS File System
pub static HDFS_SCHEME: &'static str = "hdfs";

/// Hadoop File.
#[derive(Clone)]
pub struct HadoopFile {
inner: HdfsFile,
}

unsafe impl Send for HadoopFile {}

unsafe impl Sync for HadoopFile {}

/// Hadoop File System as Object Store.
#[derive(Clone)]
pub struct HadoopFileSystem {
inner: HdfsFs,
}

impl HadoopFileSystem {
/// Get HdfsFs based on the input path
pub fn new(path: &str) -> Result<Self> {
HdfsFs::new(path)
.map(|fs| HadoopFileSystem { inner: fs })
.map_err(|e| DataFusionError::IoError(to_error(e)))
}

/// Open a HdfsFile with specified path
pub fn open(&self, path: &str) -> Result<HadoopFile> {
self.inner
.open(path)
.map(|file| HadoopFile { inner: file })
.map_err(|e| DataFusionError::IoError(to_error(e)))
}

/// Find out the files directly under a directory
fn find_files_in_dir(
&self,
path: String,
ext: &Option<String>,
to_visit: &mut Vec<String>,
) -> Result<Vec<FileMeta>> {
let mut files = Vec::new();

let children = self
.inner
.list_status(path.as_str())
.map_err(|e| to_error(e))?;
for child in children {
let child_path = child.name();
if child.is_directory() {
to_visit.push(child_path.to_string());
} else {
if ext.is_none() || child_path.ends_with(ext.as_ref().unwrap().as_str()) {
files.push(get_meta(child_path.to_owned(), child))
}
}
}

Ok(files)
}
}

impl Debug for HadoopFileSystem {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
todo!()
}
}

#[async_trait]
impl ObjectStore for HadoopFileSystem {
fn get_schema(&self) -> &'static str {
HDFS_SCHEME
}

async fn list_file(
&self,
prefix: &str,
ext: Option<String>,
) -> Result<FileMetaStream> {
// TODO list all of the files under a directory, including directly and indirectly
let files = self.find_files_in_dir(prefix.to_string(), &ext, &mut Vec::new())?;
get_files_in_dir(files).await
}

async fn list_dir(
&self,
prefix: &str,
delimiter: Option<String>,
) -> Result<ListEntryStream> {
todo!()
}

fn file_reader_from_path(&self, file_path: &str) -> Result<Arc<dyn ObjectReader>> {
let file_status = self
.inner
.get_file_status(file_path)
.map_err(|e| to_error(e))?;
if file_status.is_file() {
self.file_reader(get_meta(String::from(file_path), file_status))
} else {
Err(DataFusionError::IoError(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Path of {:?} is not for file", file_path),
)))
}
}

fn file_reader(&self, file: FileMeta) -> Result<Arc<dyn ObjectReader>> {
Ok(Arc::new(HadoopFileReader::new(file)?))
}
}

async fn get_files_in_dir(files: Vec<FileMeta>) -> Result<FileMetaStream> {
Ok(Box::pin(stream::iter(files).map(Ok)))
}

fn get_meta(path: String, file_status: FileStatus) -> FileMeta {
FileMeta {
path,
last_modified: Some(chrono::DateTime::<Utc>::from_utc(
NaiveDateTime::from_timestamp(file_status.last_modified(), 0),
Utc,
)),
size: file_status.len() as u64,
}
}

struct HadoopFileReader {
file: Arc<FileMeta>,
}

impl HadoopFileReader {
fn new(file: FileMeta) -> Result<Self> {
Ok(Self {
file: Arc::new(file),
})
}
}

#[async_trait]
impl ObjectReader for HadoopFileReader {
fn get_file_meta(&self) -> Arc<FileMeta> {
self.file.clone()
}

async fn chunk_reader(
&self,
start: u64,
length: usize,
) -> Result<Arc<dyn AsyncRead>> {
todo!()
}

fn length(&self) -> u64 {
self.file.size
}
}

fn to_error(err: HdfsErr) -> std::io::Error {
match err {
HdfsErr::FileNotFound(err_str) => {
std::io::Error::new(std::io::ErrorKind::NotFound, err_str.as_str())
}
HdfsErr::FileAlreadyExists(err_str) => {
std::io::Error::new(std::io::ErrorKind::AlreadyExists, err_str.as_str())
}
HdfsErr::CannotConnectToNameNode(err_str) => {
std::io::Error::new(std::io::ErrorKind::NotConnected, err_str.as_str())
}
HdfsErr::InvalidUrl(err_str) => {
std::io::Error::new(std::io::ErrorKind::InvalidInput, err_str.as_str())
}
HdfsErr::Unknown => std::io::Error::new(std::io::ErrorKind::Other, "Unknown"),
}
}
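To show how the pieces above fit together, here is a rough usage sketch. It is not part of the PR: the namenode URL, directory, and surrounding async function are illustrative assumptions, and it relies only on the `ObjectStore`/`ObjectReader` trait methods implemented above.

```rust
// Rough usage sketch (assumptions: a reachable namenode at localhost:9000 and
// an async runtime driving this future). Lists .parquet files directly under a
// directory and opens each one through the generic ObjectReader trait.
use std::sync::Arc;

use futures::StreamExt;

use crate::datasource::object_store::hdfs::HadoopFileSystem;
use crate::datasource::object_store::{ObjectReader, ObjectStore};
use crate::error::Result;

async fn list_parquet_files() -> Result<()> {
    // The HdfsFs is resolved from the scheme/authority encoded in the path.
    let fs = HadoopFileSystem::new("hdfs://localhost:9000/data")?;

    // Direct children only; recursive listing is still a TODO above.
    let mut files = fs
        .list_file("hdfs://localhost:9000/data", Some("parquet".to_string()))
        .await?;

    while let Some(meta) = files.next().await {
        let meta = meta?;
        let reader: Arc<dyn ObjectReader> = fs.file_reader(meta)?;
        println!("found a file of {} bytes", reader.length());
    }
    Ok(())
}
```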
102 changes: 102 additions & 0 deletions datafusion/src/datasource/object_store/hdfs/os_parquet.rs
@@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Parquet related things for hdfs object store
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use parquet::file::reader::{ChunkReader, Length};
use parquet::util::io::{FileSource, TryClone};

use crate::datasource::object_store::hdfs::{to_error, HadoopFileSystem};
use crate::datasource::object_store::ObjectReader;
use crate::datasource::parquet::dynamic_reader::{DynChunkReader, DynRead};

use super::HadoopFile;
use crate::error::Result;

/// Parquet file reader for hdfs object store, by which we can get ``DynChunkReader``
#[derive(Clone)]
pub struct HadoopParquetFileReader {

(Declaration: I am a newbie to this project, so I may ask dumb questions.)

Isn't the object store one layer lower than the file format? It should be transparent to any particular format, right?
Otherwise, if we have N types of object stores (local, HDFS, S3) and M types of file formats (CSV, Parquet, JSON), we will end up with N*M concrete implementations.

Member:

Yes, our current implementation of file formats works with a generic ObjectStore trait object, for example: https://github.com/apache/arrow-datafusion/blob/831e07debc4f136f2e47e126b20e441f7606bd74/datafusion/src/datasource/file_format/csv.rs#L98
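To make that layering concrete, here is a minimal sketch (not the linked csv.rs code; the function name and its logic are made up) of format-side code that only sees the `ObjectStore`/`ObjectReader` traits from this PR, so one implementation serves local, HDFS, S3, or any other store:

```rust
// Minimal illustration (hypothetical helper, not from the linked file): the
// format layer receives a trait object and never learns which concrete store
// backs it, avoiding the N*M explosion mentioned above.
use std::sync::Arc;

use crate::datasource::object_store::{ObjectReader, ObjectStore};
use crate::error::Result;

fn describe_file(store: Arc<dyn ObjectStore>, path: &str) -> Result<String> {
    // Works identically for a local store, HadoopFileSystem, an S3 store, ...
    let reader: Arc<dyn ObjectReader> = store.file_reader_from_path(path)?;
    let meta = reader.get_file_meta();
    Ok(format!("{} ({} bytes)", meta.path, reader.length()))
}
```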

@coderplay (Oct 16, 2021):

@houqp Then why do we need this HadoopParquetFileReader, plus the LocalParquetFileReader below?

Member:

I shouldn't have it, see my comment in #1062 (comment) :)

Contributor:

Thank you for this PR @yahoNanJing -- this is a great start. I haven't reviewed the code carefully, but the basic idea looks wonderful to me.

How do you think we should proceed with the parallel implementation in #1010? The actual HDFS bindings / object store implementation for HDFS seem valuable, but the changes to the parquet reader are likely to conflict.

Contributor Author:

Thanks @alamb and @houqp for your comments. I'll do a refactoring based on the latest code merged with #1010. Sorry for the late reply; I was asked to deal with other things 😂

Contributor:

I totally understand 👍

Contributor Author:

Hi @alamb, I've done the refactoring. Where should I create the PR: directly override this one, or create another? And per the discussion in #907, the community may prefer to put connectors such as S3 and HDFS in their own repositories for fast development iteration. Should I create another repository for this HDFS support?

Contributor:

Hi @yahoNanJing -- if you have time, here is what I recommend:

  1. Make a draft PR to this (arrow-datafusion) repo (so the conversation can include the code).
  2. Leave a note in the PR's description that you are looking for feedback about where to put the connector (this repo or a separate one).
  3. Then send a note to the [email protected] mailing list (or I can do this too) with a reference to the PR, asking if anyone has feedback.

I have my own opinions on this matter, but I think we should get broader input before making a decision.

Contributor Author:

Hi @alamb, a new PR #1223 has been created as a refactor of this PR.

Actually, in my opinion, it's better for DataFusion to support at least one remote object store by default in its own repository. Many companies around me still use HDFS, so it would be good to add HDFS support by default.

Contributor Author:

Also, the object store interfaces are still not stable; having one remote object store implementation in place will also help with refactoring those interfaces.

file: HadoopFile,
}

impl HadoopParquetFileReader {
/// Based on the trait object ``ObjectReader`` for local object store,
/// get an abstraction of the trait object ``ChunkReader``
pub fn get_dyn_chunk_reader(
obj_reader: Arc<dyn ObjectReader>,
) -> Result<DynChunkReader> {
let file_meta = obj_reader.get_file_meta();
let fs = HadoopFileSystem::new(file_meta.path.as_str())?;
Ok(DynChunkReader::new(Box::new(HadoopParquetFileReader {
file: fs.open(file_meta.path.as_str())?,
})))
}
}

impl Seek for HadoopParquetFileReader {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
let offset = match pos {
SeekFrom::Start(offset) => offset,
SeekFrom::End(offset) | SeekFrom::Current(offset) => offset as u64,
};
if self.file.inner.seek(offset) {
Ok(offset)
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!(
"Fail to seek to {} for file {}",
offset,
self.file.inner.path()
),
))
}
}
}

impl Length for HadoopParquetFileReader {
fn len(&self) -> u64 {
let status = self.file.inner.get_file_status().ok().unwrap();
status.len() as u64
}
}

impl Read for HadoopParquetFileReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.file
.inner
.read(buf)
.map(|read_len| read_len as usize)
.map_err(|e| to_error(e))
}
}

impl TryClone for HadoopParquetFileReader {
fn try_clone(&self) -> std::io::Result<Self> {
Ok(self.clone())
}
}

impl ChunkReader for HadoopParquetFileReader {
type T = DynRead;

fn get_read(&self, start: u64, length: usize) -> parquet::errors::Result<Self::T> {
Ok(DynRead::new(Box::new(FileSource::new(self, start, length))))
}
}
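Finally, a sketch of how these pieces would be wired together to obtain a parquet `ChunkReader` for an HDFS file. It is not part of the PR; the wrapper function is an assumption built from the `file_reader_from_path` and `get_dyn_chunk_reader` definitions above.

```rust
// Hypothetical wiring (not from the PR): resolve an ObjectReader for an HDFS
// path and wrap it as a DynChunkReader that the parquet reader can consume.
use crate::datasource::object_store::hdfs::os_parquet::HadoopParquetFileReader;
use crate::datasource::object_store::hdfs::HadoopFileSystem;
use crate::datasource::object_store::ObjectStore;
use crate::datasource::parquet::dynamic_reader::DynChunkReader;
use crate::error::Result;

fn hdfs_parquet_chunk_reader(path: &str) -> Result<DynChunkReader> {
    let fs = HadoopFileSystem::new(path)?;
    // A generic ObjectReader for the file (errors if the path is a directory)...
    let obj_reader = fs.file_reader_from_path(path)?;
    // ...re-opened by get_dyn_chunk_reader as an HDFS-backed parquet ChunkReader.
    HadoopParquetFileReader::get_dyn_chunk_reader(obj_reader)
}
```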