Shapefile DataSource for Spark SQL API
Kontinuation committed Aug 19, 2024
1 parent e90b9d6 commit 0cde90f
Showing 16 changed files with 1,079 additions and 91 deletions.

@@ -101,6 +101,21 @@ public static Geometry transformToGivenTarget(
     }
   }

+  /**
+   * Get the SRID of a CRS from a WKT string
+   *
+   * @param crsWKT WKT string for CRS
+   * @return SRID
+   */
+  public static int wktCRSToSRID(String crsWKT) {
+    try {
+      CoordinateReferenceSystem crs = CRS.parseWKT(crsWKT);
+      return crsToSRID(crs);
+    } catch (FactoryException e) {
+      throw new IllegalArgumentException("Cannot parse CRS WKT", e);
+    }
+  }
+
   /**
    * Get the SRID of a CRS. We use the EPSG code of the CRS if available.
    *
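
The new helper delegates to GeoTools: CRS.parseWKT parses the WKT text (typically the contents of a shapefile's .prj sidecar), and the existing crsToSRID below extracts the EPSG code. A minimal usage sketch — the WKT literal is only an example, and the call assumes the enclosing utility class is in scope:

// Sketch: turn .prj-style WKT into an SRID.
String wkt =
    "GEOGCS[\"WGS 84\", DATUM[\"WGS_1984\", SPHEROID[\"WGS 84\", 6378137, 298.257223563]],"
        + " PRIMEM[\"Greenwich\", 0], UNIT[\"degree\", 0.0174532925199433],"
        + " AUTHORITY[\"EPSG\", \"4326\"]]";
int srid = wktCRSToSRID(wkt); // 4326, taken from the EPSG authority code
// Unparseable WKT surfaces as IllegalArgumentException wrapping the GeoTools FactoryException.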

File: DbfParseUtil.java

@@ -137,13 +137,13 @@ public void parseFileHead(DataInputStream inputStream) throws IOException {
   }

   /**
-   * draw raw byte array of effective record
+   * Parse the next record in the .dbf file
    *
-   * @param inputStream
-   * @return
-   * @throws IOException
+   * @param inputStream input stream of .dbf file
+   * @return a list of fields as their original representation in the dbf file
+   * @throws IOException if an I/O error occurs
    */
-  public String parsePrimitiveRecord(DataInputStream inputStream) throws IOException {
+  public List<byte[]> parse(DataInputStream inputStream) throws IOException {
     if (isDone()) {
       return null;
     }

@@ -160,50 +160,34 @@ public String parsePrimitiveRecord(DataInputStream inputStream) throws IOException {
     byte[] primitiveBytes = new byte[recordLength];
     inputStream.readFully(primitiveBytes);
     numRecordRead++; // update number of record read
-    return primitiveToAttributes(ByteBuffer.wrap(primitiveBytes));
+    return extractFieldBytes(ByteBuffer.wrap(primitiveBytes));
   }

-  /**
-   * abstract attributes from primitive bytes according to field descriptors.
-   *
-   * @param inputStream
-   * @return
-   * @throws IOException
-   */
-  public String primitiveToAttributes(DataInputStream inputStream) throws IOException {
-    byte[] delimiter = {'\t'};
-    Text attributes = new Text();
-    for (int i = 0; i < fieldDescriptors.size(); ++i) {
-      FieldDescriptor descriptor = fieldDescriptors.get(i);
+  /** Extract attributes from primitive bytes according to field descriptors. */
+  private List<byte[]> extractFieldBytes(ByteBuffer buffer) {
+    int numFields = fieldDescriptors.size();
+    List<byte[]> fieldBytesList = new ArrayList<>(numFields);
+    for (FieldDescriptor descriptor : fieldDescriptors) {
       byte[] fldBytes = new byte[descriptor.getFieldLength()];
-      inputStream.readFully(fldBytes);
-      // System.out.println(descriptor.getFiledName() + " " + new String(fldBytes));
-      byte[] attr = new String(fldBytes).trim().getBytes();
-      if (i > 0) {
-        attributes.append(delimiter, 0, 1); // first attribute doesn't append '\t'
-      }
-      attributes.append(attr, 0, attr.length);
+      buffer.get(fldBytes, 0, fldBytes.length);
+      fieldBytesList.add(fldBytes);
     }
-    String attrs = attributes.toString();
-    return attributes.toString();
+    return fieldBytesList;
   }

   /**
    * abstract attributes from primitive bytes according to field descriptors.
    *
-   * @param buffer
-   * @return
-   * @throws IOException
+   * @param fieldBytesList a list of primitive bytes
+   * @return string attributes delimited by '\t'
    */
-  public String primitiveToAttributes(ByteBuffer buffer) throws IOException {
+  public static String fieldBytesToString(List<byte[]> fieldBytesList) {
     byte[] delimiter = {'\t'};
     Text attributes = new Text();
-    for (int i = 0; i < fieldDescriptors.size(); ++i) {
-      FieldDescriptor descriptor = fieldDescriptors.get(i);
-      byte[] fldBytes = new byte[descriptor.getFieldLength()];
-      buffer.get(fldBytes, 0, fldBytes.length);
+    for (int i = 0; i < fieldBytesList.size(); ++i) {
+      byte[] fldBytes = fieldBytesList.get(i);
       String charset = System.getProperty("sedona.global.charset", "default");
-      Boolean utf8flag = charset.equalsIgnoreCase("utf8");
+      boolean utf8flag = charset.equalsIgnoreCase("utf8");
       byte[] attr = utf8flag ? fldBytes : fastParse(fldBytes, 0, fldBytes.length).trim().getBytes();
       if (i > 0) {
         attributes.append(delimiter, 0, 1); // first attribute doesn't append '\t'
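
Taken together, the refactor splits record parsing into two steps: parse extracts each record as raw per-field bytes, and fieldBytesToString reproduces the old tab-delimited rendering on demand. A minimal sketch of the new flow, assuming dbfStream is a DataInputStream over a .dbf file (hypothetical variable):

// Sketch: read one record as raw field bytes, then render it the legacy way.
DbfParseUtil dbfParser = new DbfParseUtil();
dbfParser.parseFileHead(dbfStream);               // reads the header and field descriptors
List<byte[]> fields = dbfParser.parse(dbfStream); // one byte[] per field; null when done
if (fields != null) {
  String row = DbfParseUtil.fieldBytesToString(fields); // '\t'-delimited, honors sedona.global.charset
}

Keeping the raw bytes available lets the new Spark SQL datasource decode each column against its field descriptor (type, length, charset) instead of going through a single concatenated string.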

File: CombineShapeReader.java

@@ -19,24 +19,21 @@
 package org.apache.sedona.core.formatMapper.shapefileParser.shapes;

 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
 import org.apache.commons.io.FilenameUtils;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.log4j.Logger;
 import org.apache.sedona.core.formatMapper.shapefileParser.parseUtils.shp.ShapeType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public class CombineShapeReader extends RecordReader<ShapeKey, PrimitiveShape> {

-  /** dubug logger */
-  static final Logger logger = Logger.getLogger(CombineShapeReader.class);
+  private static final Logger logger = LoggerFactory.getLogger(CombineShapeReader.class);
   /** suffix of attribute file */
   private static final String DBF_SUFFIX = "dbf";
   /** suffix of shape record file */

@@ -93,20 +90,7 @@ public void initialize(InputSplit split, TaskAttemptContext context)
     if (shxSplit != null) {
       // shape file exists, extract .shp with .shx
       // first read all indexes into memory
-      Path filePath = shxSplit.getPath();
-      FileSystem fileSys = filePath.getFileSystem(context.getConfiguration());
-      FSDataInputStream shxInpuStream = fileSys.open(filePath);
-      shxInpuStream.skip(24);
-      int shxFileLength =
-          shxInpuStream.readInt() * 2 - 100; // get length in bytes, exclude header
-      // skip following 72 bytes in header
-      shxInpuStream.skip(72);
-      byte[] bytes = new byte[shxFileLength];
-      // read all indexes into memory, skip first 50 bytes(header)
-      shxInpuStream.readFully(bytes, 0, bytes.length);
-      IntBuffer buffer = ByteBuffer.wrap(bytes).asIntBuffer();
-      int[] indexes = new int[shxFileLength / 4];
-      buffer.get(indexes);
+      int[] indexes = ShxFileReader.readAll(shxSplit, context);
       shapeFileReader = new ShapeFileReader(indexes);
     } else {
       shapeFileReader = new ShapeFileReader(); // no index, construct with no parameter
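
ShxFileReader is one of the new files in this commit and its body is not shown in this excerpt; judging from the call site, it simply packages the inline .shx reading logic it replaces. A rough sketch under that assumption (signature inferred from the call above):

// Hypothetical sketch of ShxFileReader.readAll, mirroring the removed inline code.
// A .shx file has a 100-byte header; the file length at byte 24 is in 16-bit words,
// followed by big-endian (offset, length) int pairs for every record.
public static int[] readAll(FileSplit shxSplit, TaskAttemptContext context) throws IOException {
  Path filePath = shxSplit.getPath();
  FileSystem fileSys = filePath.getFileSystem(context.getConfiguration());
  try (FSDataInputStream shxInputStream = fileSys.open(filePath)) {
    shxInputStream.skip(24);
    int shxFileLength = shxInputStream.readInt() * 2 - 100; // length in bytes, excluding header
    shxInputStream.skip(72); // skip the rest of the header
    byte[] bytes = new byte[shxFileLength];
    shxInputStream.readFully(bytes, 0, bytes.length);
    int[] indexes = new int[shxFileLength / 4];
    ByteBuffer.wrap(bytes).asIntBuffer().get(indexes);
    return indexes;
  }
}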

@@ -122,7 +106,7 @@ public void initialize(InputSplit split, TaskAttemptContext context)
     }
   }

-  public boolean nextKeyValue() throws IOException, InterruptedException {
+  public boolean nextKeyValue() throws IOException {

     boolean hasNextShp = shapeFileReader.nextKeyValue();
     if (hasDbf) {

@@ -132,10 +116,8 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
     ShapeType curShapeType = shapeFileReader.getCurrentValue().getType();
     while (hasNextShp && !curShapeType.isSupported()) {
       logger.warn(
-          "[SEDONA] Shapefile type "
-              + curShapeType.name()
-              + " is not supported. Skipped this record."
-              + " Please use QGIS or GeoPandas to convert it to a type listed in ShapeType.java");
+          "[SEDONA] Shapefile type {} is not supported. Skipped this record. Please use QGIS or GeoPandas to convert it to a type listed in ShapeType.java",
+          curShapeType.name());
       if (hasDbf) {
         hasNextDbf = dbfFileReader.nextKeyValue();
       }

@@ -149,28 +131,28 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
             new Exception(
                 "shape record loses attributes in .dbf file at ID="
                     + shapeFileReader.getCurrentKey().getIndex());
-        e.printStackTrace();
+        logger.warn(e.getMessage(), e);
       } else if (!hasNextShp && hasNextDbf) {
         Exception e = new Exception("Redundant attributes in .dbf exists");
-        e.printStackTrace();
+        logger.warn(e.getMessage(), e);
       }
     }
     return hasNextShp;
   }

-  public ShapeKey getCurrentKey() throws IOException, InterruptedException {
+  public ShapeKey getCurrentKey() {
     return shapeFileReader.getCurrentKey();
   }

-  public PrimitiveShape getCurrentValue() throws IOException, InterruptedException {
+  public PrimitiveShape getCurrentValue() {
     PrimitiveShape value = new PrimitiveShape(shapeFileReader.getCurrentValue());
     if (hasDbf && hasNextDbf) {
       value.setAttributes(dbfFileReader.getCurrentValue());
     }
     return value;
   }

-  public float getProgress() throws IOException, InterruptedException {
+  public float getProgress() {
     return shapeFileReader.getProgress();
   }

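Both logging fixes in this file share a motive: the slf4j {} placeholder form avoids building the warning string when the level is disabled, and logger.warn(e.getMessage(), e) records the stack trace in the application log instead of dumping it to stderr the way printStackTrace did.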

File: DbfFileReader.java

@@ -19,13 +19,15 @@
 package org.apache.sedona.core.formatMapper.shapefileParser.shapes;

 import java.io.IOException;
+import java.util.List;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.sedona.core.formatMapper.shapefileParser.parseUtils.dbf.DbfParseUtil;
+import org.apache.sedona.core.formatMapper.shapefileParser.parseUtils.dbf.FieldDescriptor;

 public class DbfFileReader extends org.apache.hadoop.mapreduce.RecordReader<ShapeKey, String> {


@@ -34,45 +36,60 @@ public class DbfFileReader extends org.apache.hadoop.mapreduce.RecordReader<ShapeKey, String> {
   /** inputstream of .dbf file */
   private FSDataInputStream inputStream = null;
   /** primitive bytes array of one row */
-  private String value = null;
+  private List<byte[]> value = null;
   /** key value of current row */
   private ShapeKey key = null;
   /** generated id of current row */
   private int id = 0;

-  public void initialize(InputSplit split, TaskAttemptContext context)
-      throws IOException, InterruptedException {
+  public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
     FileSplit fileSplit = (FileSplit) split;
     Path inputPath = fileSplit.getPath();
     FileSystem fileSys = inputPath.getFileSystem(context.getConfiguration());
-    inputStream = fileSys.open(inputPath);
+    FSDataInputStream stream = fileSys.open(inputPath);
+    initialize(stream);
+  }
+
+  public void initialize(FSDataInputStream stream) throws IOException {
+    inputStream = stream;
     dbfParser = new DbfParseUtil();
     dbfParser.parseFileHead(inputStream);
   }

-  public boolean nextKeyValue() throws IOException, InterruptedException {
+  public List<FieldDescriptor> getFieldDescriptors() {
+    return dbfParser.getFieldDescriptors();
+  }
+
+  public boolean nextKeyValue() throws IOException {
     // first check deleted flag
-    String curbytes = dbfParser.parsePrimitiveRecord(inputStream);
-    if (curbytes == null) {
+    List<byte[]> fieldBytesList = dbfParser.parse(inputStream);
+    if (fieldBytesList == null) {
       value = null;
       return false;
     } else {
-      value = curbytes;
+      value = fieldBytesList;
       key = new ShapeKey();
       key.setIndex(id++);
       return true;
     }
   }

-  public ShapeKey getCurrentKey() throws IOException, InterruptedException {
+  public ShapeKey getCurrentKey() {
     return key;
   }

-  public String getCurrentValue() throws IOException, InterruptedException {
+  public List<byte[]> getCurrentFieldBytes() {
     return value;
   }

-  public float getProgress() throws IOException, InterruptedException {
+  public String getCurrentValue() {
+    if (value == null) {
+      return null;
+    }
+    return DbfParseUtil.fieldBytesToString(value);
+  }
+
+  public float getProgress() {
     return dbfParser.getProgress();
   }

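The reader now exposes both views of a row: getCurrentFieldBytes() hands back the raw column bytes (which the new Spark SQL datasource can decode against getFieldDescriptors()), while getCurrentValue() lazily reproduces the legacy tab-delimited string for the existing RDD path. A minimal iteration sketch, assuming dbfStream is an already-opened FSDataInputStream over a .dbf file (hypothetical variable):

// Sketch: iterate a .dbf file and look at both representations of each row.
DbfFileReader reader = new DbfFileReader();
reader.initialize(dbfStream); // the new stream-based overload
while (reader.nextKeyValue()) {
  List<byte[]> rawFields = reader.getCurrentFieldBytes(); // one byte[] per field descriptor
  String joined = reader.getCurrentValue();               // legacy '\t'-joined rendering
}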

File: ShapeFileReader.java

@@ -36,7 +36,7 @@ public class ShapeFileReader extends RecordReader<ShapeKey, ShpRecord> {
   private ShapeKey recordKey = null;
   /** primitive bytes value */
   private ShpRecord recordContent = null;
-  /** inputstream for .shp file */
+  /** input stream for .shp file */
   private FSDataInputStream shpInputStream = null;
   /** Iterator of indexes of records */
   private int[] indexes;

@@ -53,7 +53,7 @@ public ShapeFileReader() {}
   /**
    * constructor with index
    *
-   * @param indexes
+   * @param indexes offsets of records in the .shp file
    */
   public ShapeFileReader(int[] indexes) {
     this.indexes = indexes;

@@ -65,48 +65,55 @@ public void initialize(InputSplit split, TaskAttemptContext context)
     FileSplit fileSplit = (FileSplit) split;
     Path filePath = fileSplit.getPath();
     FileSystem fileSys = filePath.getFileSystem(context.getConfiguration());
-    shpInputStream = fileSys.open(filePath);
-    // assign inputstream to parser and parse file header to init;
-    parser = new ShpFileParser(shpInputStream);
+    FSDataInputStream stream = fileSys.open(filePath);
+    initialize(stream);
+  }
+
+  public void initialize(FSDataInputStream stream) throws IOException {
+    shpInputStream = stream;
+    parser = new ShpFileParser(stream);
     parser.parseShapeFileHead();
   }

-  public boolean nextKeyValue() throws IOException, InterruptedException {
+  public boolean nextKeyValue() throws IOException {
     if (useIndex) {
-      /** with index, iterate until end and extract bytes with information from indexes */
+      /* with index, iterate until end and extract bytes with information from indexes */
       if (indexId == indexes.length) {
         return false;
       }
       // check offset, if current offset in inputStream not match with information in shx, move it
-      if (shpInputStream.getPos() < indexes[indexId] * 2) {
-        shpInputStream.skip(indexes[indexId] * 2 - shpInputStream.getPos());
+      long pos = indexes[indexId] * 2L;
+      if (shpInputStream.getPos() < pos) {
+        long skipBytes = pos - shpInputStream.getPos();
+        if (shpInputStream.skip(skipBytes) != skipBytes) {
+          throw new IOException("Failed to seek to the right place in .shp file");
+        }
       }
       int currentLength = indexes[indexId + 1] * 2 - 4;
       recordKey = new ShapeKey();
       recordKey.setIndex(parser.parseRecordHeadID());
       recordContent = parser.parseRecordPrimitiveContent(currentLength);
       indexId += 2;
-      return true;
     } else {
       if (getProgress() >= 1) {
         return false;
       }
       recordKey = new ShapeKey();
       recordKey.setIndex(parser.parseRecordHeadID());
       recordContent = parser.parseRecordPrimitiveContent();
-      return true;
     }
+    return true;
   }

-  public ShapeKey getCurrentKey() throws IOException, InterruptedException {
+  public ShapeKey getCurrentKey() {
     return recordKey;
   }

-  public ShpRecord getCurrentValue() throws IOException, InterruptedException {
+  public ShpRecord getCurrentValue() {
     return recordContent;
   }

-  public float getProgress() throws IOException, InterruptedException {
+  public float getProgress() {
     return parser.getProgress();
   }

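Two details here are easy to miss. Offsets and lengths in a .shx index are stored in 16-bit words, which is why each value is doubled: indexes[indexId] * 2L gives the byte position, and indexes[indexId + 1] * 2 - 4 gives the content length, with the four subtracted bytes read by the parser itself. And InputStream.skip is allowed to skip fewer bytes than requested; the old code discarded its return value, while the new code treats a short skip as an error. A more tolerant variant would retry instead of throwing; a sketch under that assumption (skipFully is a hypothetical helper, not part of this commit):

// Hypothetical alternative: retry the skip until the target count is reached.
static void skipFully(FSDataInputStream in, long n) throws IOException {
  long remaining = n;
  while (remaining > 0) {
    long skipped = in.skip(remaining);
    if (skipped <= 0) {
      throw new IOException("Unexpected end of .shp file while skipping");
    }
    remaining -= skipped;
  }
}

Since FSDataInputStream is seekable, in.seek(pos) would be another way to sidestep the partial-skip issue entirely.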