Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jinchengchenghh committed Jul 29, 2024
1 parent 78a27db commit a3f817a
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 8 deletions.
4 changes: 1 addition & 3 deletions java/dataset/src/main/cpp/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,7 @@ arrow::Result<std::shared_ptr<arrow::dataset::FragmentScanOptions>>
ToCsvFragmentScanOptions(const std::unordered_map<std::string, std::string>& configs) {
std::shared_ptr<arrow::dataset::CsvFragmentScanOptions> options =
std::make_shared<arrow::dataset::CsvFragmentScanOptions>();
for (auto const& it : configs) {
auto& key = it.first;
auto& value = it.second;
for (auto const& [key, value] : configs) {
if (key == "delimiter") {
options->parse_options.delimiter = value.data()[0];
} else if (key == "quoting") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.stream.Stream;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.scanner.FragmentScanOptions;
import org.apache.arrow.dataset.scanner.MapUtil;
import org.apache.arrow.dataset.utils.MapUtil;

public class CsvFragmentScanOptions implements FragmentScanOptions {
private final CsvConvertOptions convertOptions;
Expand Down Expand Up @@ -49,17 +49,18 @@ public CsvFragmentScanOptions(
}

/**
* File format id.
* File format.
*
* @return id
* @return file format.
*/
@Override
public FileFormat fileFormat() {
return FileFormat.CSV;
}

/**
* Serialize this class to string array and then called by JNI call.
* This is an internal function to invoke by serializer. Serialize this class to string array and
* then called by JNI call.
*
* @return string array as Map JNI bridge format.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.arrow.dataset.scanner;
package org.apache.arrow.dataset.utils;

import java.util.Map;

/** The utility class for Map. */
public class MapUtil {
private MapUtil() {}

/**
* Convert the map to string array as JNI bridge.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.CDataDictionaryProvider;
Expand Down Expand Up @@ -88,4 +89,80 @@ public void testCsvConvertOptions() throws Exception {
}
}
}

@Test
public void testCsvConvertOptionsDelimiterNotSet() throws Exception {
final Schema schema =
new Schema(
Arrays.asList(
Field.nullable("Id", new ArrowType.Int(32, true)),
Field.nullable("Name", new ArrowType.Utf8()),
Field.nullable("Language", new ArrowType.Utf8())),
null);
String path = "file://" + getClass().getResource("/").getPath() + "/data/student.csv";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator);
CDataDictionaryProvider provider = new CDataDictionaryProvider()) {
Data.exportSchema(allocator, schema, provider, cSchema);
CsvConvertOptions convertOptions = new CsvConvertOptions(ImmutableMap.of());
convertOptions.setArrowSchema(cSchema);
CsvFragmentScanOptions fragmentScanOptions =
new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(), ImmutableMap.of());
ScanOptions options =
new ScanOptions.Builder(/*batchSize*/ 32768)
.columns(Optional.empty())
.fragmentScanOptions(fragmentScanOptions)
.build();
try (DatasetFactory datasetFactory =
new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, path);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader reader = scanner.scanBatches()) {

assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields());
int rowCount = 0;
while (reader.loadNextBatch()) {
final ValueIterableVector<Integer> idVector =
(ValueIterableVector<Integer>) reader.getVectorSchemaRoot().getVector("Id");
assertThat(idVector.getValueIterable(), IsIterableContainingInOrder.contains(1, 2, 3));
rowCount += reader.getVectorSchemaRoot().getRowCount();
}
assertEquals(3, rowCount);
}
}
}

@Test
public void testCsvConvertOptionsNoOption() throws Exception {
final Schema schema =
new Schema(
Collections.singletonList(Field.nullable("Id;Name;Language", new ArrowType.Utf8())),
null);
String path = "file://" + getClass().getResource("/").getPath() + "/data/student.csv";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
ScanOptions options =
new ScanOptions.Builder(/*batchSize*/ 32768).columns(Optional.empty()).build();
try (DatasetFactory datasetFactory =
new FileSystemDatasetFactory(
allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, path);
Dataset dataset = datasetFactory.finish();
Scanner scanner = dataset.newScan(options);
ArrowReader reader = scanner.scanBatches()) {

assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields());
int rowCount = 0;
while (reader.loadNextBatch()) {
final ValueIterableVector<String> idVector =
(ValueIterableVector<String>)
reader.getVectorSchemaRoot().getVector("Id;Name;Language");
assertThat(
idVector.getValueIterable(),
IsIterableContainingInOrder.contains(
"1;Juno;Java\n" + "2;Peter;Python\n" + "3;Celin;C++"));
rowCount += reader.getVectorSchemaRoot().getRowCount();
}
assertEquals(3, rowCount);
}
}
}

0 comments on commit a3f817a

Please sign in to comment.