diff --git a/.github/services/hdfs_native/hdfs/action.yml b/.github/services/hdfs_native/hdfs/action.yml
new file mode 100644
index 00000000000..18a52345c4a
--- /dev/null
+++ b/.github/services/hdfs_native/hdfs/action.yml
@@ -0,0 +1,46 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: hdfs_native
+description: 'Behavior test for hdfs_native'
+
+runs:
+  using: "composite"
+  steps:
+    - name: Setup java env
+      uses: actions/setup-java@v4
+      with:
+        distribution: temurin
+        java-version: "11"
+    - name: Setup
+      shell: bash
+      run: |
+        curl -LsSf https://dlcdn.apache.org/hadoop/common/hadoop-3.3.5/hadoop-3.3.5.tar.gz | tar zxf - -C /home/runner
+
+        export HADOOP_HOME="/home/runner/hadoop-3.3.5"
+        export CLASSPATH=$(${HADOOP_HOME}/bin/hadoop classpath --glob)
+
+        cp ./fixtures/hdfs/hdfs-site.xml ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml
+
+        cat << EOF >> $GITHUB_ENV
+        HADOOP_HOME=${HADOOP_HOME}
+        CLASSPATH=${CLASSPATH}
+        LD_LIBRARY_PATH=${JAVA_HOME}/lib/server:${HADOOP_HOME}/lib/native
+        OPENDAL_HDFS_NATIVE_ROOT=/tmp/opendal/
+        OPENDAL_HDFS_NATIVE_URL=hdfs://127.0.0.1:9000
+        OPENDAL_HDFS_NATIVE_ENABLE_APPEND=false
+        EOF
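For reference, the `OPENDAL_HDFS_NATIVE_*` variables above are what the behavior tests consume. A minimal sketch of wiring the same settings up by hand; the `root`/`url`/`enable_append` key names are assumptions inferred from the env-var naming convention, not something this diff spells out:

```rust
use std::collections::HashMap;

use opendal::{Operator, Scheme};

fn main() -> opendal::Result<()> {
    // Mirrors the values exported to $GITHUB_ENV above; key names assumed.
    let map = HashMap::from([
        ("root".to_string(), "/tmp/opendal/".to_string()),
        ("url".to_string(), "hdfs://127.0.0.1:9000".to_string()),
        ("enable_append".to_string(), "false".to_string()),
    ]);

    // Requires building opendal with the `services-hdfs-native` feature.
    let op = Operator::via_map(Scheme::HdfsNative, map)?;
    println!("configured scheme: {}", op.info().scheme());
    Ok(())
}
```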
# "services-hdfs", + # "services-hdfs-native", "services-huggingface", "services-ipfs", "services-koofr", @@ -119,6 +120,7 @@ services-ftp = ["opendal/services-ftp"] services-gdrive = ["opendal/services-gdrive"] services-gridfs = ["opendal/services-gridfs"] services-hdfs = ["opendal/services-hdfs"] +services-hdfs-native = ["opendal/services-hdfs-native"] services-huggingface = ["opendal/services-huggingface"] services-ipfs = ["opendal/services-ipfs"] services-koofr = ["opendal/services-koofr"] diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index f3b4de2c9b1..8ba6bac06b6 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -60,6 +60,7 @@ services-all = [ "services-gdrive", # FIXME how to support HDFS services in bindings? # "services-hdfs", + # "services-hdfs-native", "services-huggingface", "services-ipfs", "services-memcached", @@ -121,6 +122,7 @@ services-ftp = ["opendal/services-ftp"] services-gdrive = ["opendal/services-gdrive"] services-gridfs = ["opendal/services-gridfs"] services-hdfs = ["opendal/services-hdfs"] +services-hdfs-native = ["opendal/services-hdfs-native"] services-huggingface = ["opendal/services-huggingface"] services-ipfs = ["opendal/services-ipfs"] services-koofr = ["opendal/services-koofr"] diff --git a/core/src/services/hdfs_native/backend.rs b/core/src/services/hdfs_native/backend.rs index 257518e0003..9f796620837 100644 --- a/core/src/services/hdfs_native/backend.rs +++ b/core/src/services/hdfs_native/backend.rs @@ -172,10 +172,10 @@ unsafe impl Sync for HdfsNativeBackend {} impl Access for HdfsNativeBackend { type Reader = HdfsNativeReader; - type BlockingReader = (); type Writer = HdfsNativeWriter; - type BlockingWriter = (); type Lister = Option; + type BlockingReader = (); + type BlockingWriter = (); type BlockingLister = (); fn info(&self) -> AccessorInfo { @@ -184,7 +184,9 @@ impl Access for HdfsNativeBackend { .set_root(&self.root) .set_native_capability(Capability { stat: true, - + list: true, + read: true, + write: true, delete: true, rename: true, blocking: true, @@ -205,42 +207,6 @@ impl Access for HdfsNativeBackend { Ok(RpCreateDir::default()) } - async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { - let p = build_rooted_abs_path(&self.root, path); - - let f = self.client.read(&p).await.map_err(parse_hdfs_error)?; - - let r = HdfsNativeReader::new(f); - - Ok((RpRead::new(), r)) - } - - async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let p = build_rooted_abs_path(&self.root, path); - - let f = self - .client - .create(&p, WriteOptions::default()) - .await - .map_err(parse_hdfs_error)?; - - let w = HdfsNativeWriter::new(f); - - Ok((RpWrite::new(), w)) - } - - async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { - let from_path = build_rooted_abs_path(&self.root, from); - let to_path = build_rooted_abs_path(&self.root, to); - - self.client - .rename(&from_path, &to_path, false) - .await - .map_err(parse_hdfs_error)?; - - Ok(RpRename::default()) - } - async fn stat(&self, path: &str, _args: OpStat) -> Result { let p = build_rooted_abs_path(&self.root, path); @@ -266,6 +232,30 @@ impl Access for HdfsNativeBackend { Ok(RpStat::new(metadata)) } + async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> { + let p = build_rooted_abs_path(&self.root, path); + + let f = self.client.read(&p).await.map_err(parse_hdfs_error)?; + + let r = HdfsNativeReader::new(f); + + Ok((RpRead::new(), r)) + } + + async 
@@ -205,42 +207,6 @@ impl Access for HdfsNativeBackend {
         Ok(RpCreateDir::default())
     }
 
-    async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
-
-        let r = HdfsNativeReader::new(f);
-
-        Ok((RpRead::new(), r))
-    }
-
-    async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
-        let p = build_rooted_abs_path(&self.root, path);
-
-        let f = self
-            .client
-            .create(&p, WriteOptions::default())
-            .await
-            .map_err(parse_hdfs_error)?;
-
-        let w = HdfsNativeWriter::new(f);
-
-        Ok((RpWrite::new(), w))
-    }
-
-    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
-        let from_path = build_rooted_abs_path(&self.root, from);
-        let to_path = build_rooted_abs_path(&self.root, to);
-
-        self.client
-            .rename(&from_path, &to_path, false)
-            .await
-            .map_err(parse_hdfs_error)?;
-
-        Ok(RpRename::default())
-    }
-
     async fn stat(&self, path: &str, _args: OpStat) -> Result<RpStat> {
         let p = build_rooted_abs_path(&self.root, path);
 
@@ -266,6 +232,30 @@ impl Access for HdfsNativeBackend {
         Ok(RpStat::new(metadata))
     }
 
+    async fn read(&self, path: &str, _args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let f = self.client.read(&p).await.map_err(parse_hdfs_error)?;
+
+        let r = HdfsNativeReader::new(f);
+
+        Ok((RpRead::new(), r))
+    }
+
+    async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let p = build_rooted_abs_path(&self.root, path);
+
+        let f = self
+            .client
+            .create(&p, WriteOptions::default())
+            .await
+            .map_err(parse_hdfs_error)?;
+
+        let w = HdfsNativeWriter::new(f);
+
+        Ok((RpWrite::new(), w))
+    }
+
     async fn delete(&self, path: &str, _args: OpDelete) -> Result<RpDelete> {
         let p = build_rooted_abs_path(&self.root, path);
 
@@ -279,7 +269,20 @@ impl Access for HdfsNativeBackend {
     async fn list(&self, path: &str, _args: OpList) -> Result<(RpList, Self::Lister)> {
         let p = build_rooted_abs_path(&self.root, path);
 
-        let l = HdfsNativeLister::new(p, self.client.clone());
+        let list_status_iterator = self.client.list_status_iter(&p, false);
+        let l = HdfsNativeLister::new(&self.root, list_status_iterator);
         Ok((RpList::default(), Some(l)))
     }
+
+    async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> {
+        let from_path = build_rooted_abs_path(&self.root, from);
+        let to_path = build_rooted_abs_path(&self.root, to);
+
+        self.client
+            .rename(&from_path, &to_path, false)
+            .await
+            .map_err(parse_hdfs_error)?;
+
+        Ok(RpRename::default())
+    }
 }
diff --git a/core/src/services/hdfs_native/docs.md b/core/src/services/hdfs_native/docs.md
index 2b5367c0c08..3db1e78115b 100644
--- a/core/src/services/hdfs_native/docs.md
+++ b/core/src/services/hdfs_native/docs.md
@@ -6,12 +6,12 @@ Using [Native Rust HDFS client](https://github.com/Kimahriman/hdfs-native).
 
 This service can be used to:
 
 - [x] stat
-- [ ] read
-- [ ] write
+- [x] read
+- [x] write
 - [ ] create_dir
 - [x] delete
 - [x] rename
-- [ ] list
+- [x] list
 - [x] blocking
 - [ ] append
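With `read`, `write`, and `list` now checked off, a basic round trip should work end to end. A sketch, assuming an operator configured as in the action above and with illustrative paths:

```rust
use opendal::{EntryMode, Operator};

async fn round_trip(op: &Operator) -> opendal::Result<()> {
    op.write("demo/a.txt", b"hello".to_vec()).await?;

    // Directories come back with a trailing '/', matching the
    // EntryMode::DIR branch of the lister below.
    for entry in op.list("demo/").await? {
        match entry.metadata().mode() {
            EntryMode::FILE => println!("file: {}", entry.path()),
            EntryMode::DIR => println!("dir:  {}", entry.path()),
            _ => println!("other: {}", entry.path()),
        }
    }
    Ok(())
}
```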
diff --git a/core/src/services/hdfs_native/lister.rs b/core/src/services/hdfs_native/lister.rs
index bd2863783f3..b5e2c3e0124 100644
--- a/core/src/services/hdfs_native/lister.rs
+++ b/core/src/services/hdfs_native/lister.rs
@@ -15,28 +15,62 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::sync::Arc;
+use chrono::DateTime;
+use hdfs_native::client::ListStatusIterator;
 
-use crate::raw::oio;
 use crate::raw::oio::Entry;
+use crate::raw::{build_rel_path, oio};
+use crate::services::hdfs_native::error::parse_hdfs_error;
 use crate::*;
 
 pub struct HdfsNativeLister {
-    _path: String,
-    _client: Arc<hdfs_native::Client>,
+    root: String,
+    lsi: ListStatusIterator,
 }
 
 impl HdfsNativeLister {
-    pub fn new(path: String, client: Arc<hdfs_native::Client>) -> Self {
-        HdfsNativeLister {
-            _path: path,
-            _client: client,
+    pub fn new(root: &str, lsi: ListStatusIterator) -> Self {
+        Self {
+            root: root.to_string(),
+            lsi,
         }
     }
 }
 
 impl oio::List for HdfsNativeLister {
     async fn next(&mut self) -> Result<Option<Entry>> {
-        todo!()
+        if let Some(de) = self
+            .lsi
+            .next()
+            .await
+            .transpose()
+            .map_err(parse_hdfs_error)?
+        {
+            let path = build_rel_path(&self.root, &de.path);
+
+            let entry = if !de.isdir {
+                let odt = DateTime::from_timestamp(de.modification_time as i64, 0);
+
+                let Some(dt) = odt else {
+                    return Err(Error::new(
+                        ErrorKind::Unexpected,
+                        &format!("Failure in extracting modified_time for {}", path),
+                    ));
+                };
+                let meta = Metadata::new(EntryMode::FILE)
+                    .with_content_length(de.length as u64)
+                    .with_last_modified(dt);
+                oio::Entry::new(&path, meta)
+            } else if de.isdir {
+                // Make sure we are returning the correct path.
+                oio::Entry::new(&format!("{path}/"), Metadata::new(EntryMode::DIR))
+            } else {
+                oio::Entry::new(&path, Metadata::new(EntryMode::Unknown))
+            };
+
+            Ok(Some(entry))
+        } else {
+            Ok(None)
+        }
     }
 }
diff --git a/core/src/services/hdfs_native/reader.rs b/core/src/services/hdfs_native/reader.rs
index 1554af2bcd6..8d7901fa260 100644
--- a/core/src/services/hdfs_native/reader.rs
+++ b/core/src/services/hdfs_native/reader.rs
@@ -18,20 +18,28 @@
 use hdfs_native::file::FileReader;
 
 use crate::raw::*;
+use crate::services::hdfs_native::error::parse_hdfs_error;
 use crate::*;
 
 pub struct HdfsNativeReader {
-    _f: FileReader,
+    f: FileReader,
 }
 
 impl HdfsNativeReader {
     pub fn new(f: FileReader) -> Self {
-        HdfsNativeReader { _f: f }
+        HdfsNativeReader { f }
     }
 }
 
 impl oio::Read for HdfsNativeReader {
-    async fn read_at(&self, _offset: u64, _limit: usize) -> Result<Buffer> {
-        todo!()
+    async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
+        // Perform the read operation using read_range
+        let bytes = self
+            .f
+            .read_range(offset as usize, limit)
+            .await
+            .map_err(parse_hdfs_error)?;
+
+        Ok(Buffer::from(bytes))
     }
 }
diff --git a/core/src/services/hdfs_native/writer.rs b/core/src/services/hdfs_native/writer.rs
index e6fb0205e4f..f99380a428c 100644
--- a/core/src/services/hdfs_native/writer.rs
+++ b/core/src/services/hdfs_native/writer.rs
@@ -18,25 +18,28 @@
 use hdfs_native::file::FileWriter;
 
 use crate::raw::oio;
+use crate::services::hdfs_native::error::parse_hdfs_error;
 use crate::*;
 
 pub struct HdfsNativeWriter {
-    _f: FileWriter,
+    f: FileWriter,
 }
 
 impl HdfsNativeWriter {
     pub fn new(f: FileWriter) -> Self {
-        HdfsNativeWriter { _f: f }
+        HdfsNativeWriter { f }
     }
 }
 
 impl oio::Write for HdfsNativeWriter {
-    async fn write(&mut self, _bs: Buffer) -> Result<usize> {
-        todo!()
+    async fn write(&mut self, bs: Buffer) -> Result<usize> {
+        let bytes = bs.to_bytes();
+        let n = self.f.write(bytes).await.map_err(parse_hdfs_error)?;
+        Ok(n)
     }
 
     async fn close(&mut self) -> Result<()> {
-        todo!()
+        self.f.close().await.map_err(parse_hdfs_error)
    }
 
     async fn abort(&mut self) -> Result<()> {
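Note the writer's contract: data is handed straight to hdfs-native's `FileWriter`, and `close()` is what completes the file. A usage sketch at the `Operator` level, under the same assumed setup as the earlier examples:

```rust
use opendal::Operator;

async fn streamed_write(op: &Operator) -> opendal::Result<()> {
    // Each write() forwards to HdfsNativeWriter::write; close() maps to
    // HdfsNativeWriter::close, which completes the file on HDFS.
    let mut w = op.writer("demo/big.bin").await?;
    w.write(vec![0u8; 4096]).await?;
    w.write(vec![1u8; 4096]).await?;
    w.close().await?;

    // Ranged reads exercise HdfsNativeReader::read_at -> read_range.
    let bs = op.read_with("demo/big.bin").range(0..16).await?;
    assert_eq!(bs.len(), 16);
    Ok(())
}
```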