diff --git a/core/src/main/protobuf/osmformat.proto b/core/src/main/protobuf/osmformat.proto index 659b8c8..abdde6f 100644 --- a/core/src/main/protobuf/osmformat.proto +++ b/core/src/main/protobuf/osmformat.proto @@ -236,6 +236,9 @@ message Way { optional Info info = 4; repeated sint64 refs = 8 [packed = true]; // DELTA coded + + repeated sint64 lat = 9 [packed = true]; // DELTA coded, optional + repeated sint64 lon = 10 [packed = true]; // DELTA coded, optional } message Relation { diff --git a/core/src/main/scala/com/acervera/osm4scala/DenseNodesIterator.scala b/core/src/main/scala/com/acervera/osm4scala/DenseNodesIterator.scala index 4af2e2b..0315aed 100644 --- a/core/src/main/scala/com/acervera/osm4scala/DenseNodesIterator.scala +++ b/core/src/main/scala/com/acervera/osm4scala/DenseNodesIterator.scala @@ -26,6 +26,7 @@ package com.acervera.osm4scala import com.acervera.osm4scala.model.{Info, NodeEntity} +import com.acervera.osm4scala.utilities.CoordUtils.decompressCoord import com.acervera.osm4scala.utilities.StringTableUtils._ import org.openstreetmap.osmosis.osmbinary.osmformat.{DenseInfo, DenseNodes, StringTable} @@ -76,8 +77,8 @@ class DenseNodesIterator(osmosisStringTable: StringTable, // Calculate new values base in deltas and update deltas lastId = idIterator.next() + lastId - lastLatitude = decompressCoord(latOffset, latIterator.next(), lastLatitude) - lastLongitude = decompressCoord(lonOffset, lonIterator.next(), lastLongitude) + lastLatitude = decompressCoord(latOffset, latIterator.next(), lastLatitude, granularity) + lastLongitude = decompressCoord(lonOffset, lonIterator.next(), lastLongitude, granularity) // Create node NodeEntity( @@ -90,18 +91,6 @@ class DenseNodesIterator(osmosisStringTable: StringTable, } - /** - * Calculate coordinate applying offset, granularity and delta. 
- * - * @param offSet - * @param delta - * @param currentValue - * @return - */ - def decompressCoord(offSet: Long, delta: Long, currentValue: Double): Double = { - (.000000001 * (offSet + (granularity * delta))) + currentValue - } - // Decode DenseInfo trait InfoIterator extends Iterator[Option[Info]] diff --git a/core/src/main/scala/com/acervera/osm4scala/model/model.scala b/core/src/main/scala/com/acervera/osm4scala/model/model.scala index 0a8328f..cffc102 100644 --- a/core/src/main/scala/com/acervera/osm4scala/model/model.scala +++ b/core/src/main/scala/com/acervera/osm4scala/model/model.scala @@ -25,6 +25,7 @@ package com.acervera.osm4scala.model +import com.acervera.osm4scala.utilities.CoordUtils.convertToMicroDegrees import com.acervera.osm4scala.utilities.StringTableUtils._ import org.openstreetmap.osmosis.osmbinary.osmformat @@ -88,7 +89,9 @@ case class WayEntity( id: Long, nodes: Seq[Long], tags: Map[String, String], - info: Option[Info] = None + info: Option[Info] = None, + lat: Seq[Double] = Seq.empty, + lgn: Seq[Double] = Seq.empty ) extends OSMEntity { override val osmModel: OSMTypes.Value = OSMTypes.Way } @@ -101,7 +104,13 @@ object WayEntity { .scanLeft(0L) { _ + _ } .drop(1), // Calculate nodes references in stored in delta compression. TODO: extract to utility class. 
osmosisStringTable.extractTags(osmosisWay.keys, osmosisWay.vals), - Info(osmosisStringTable, osmosisWay.info) + Info(osmosisStringTable, osmosisWay.info), + osmosisWay.lat + .scanLeft(0L) { _ + _ } + .drop(1).map(x=> convertToMicroDegrees(x)), + osmosisWay.lon + .scanLeft(0L) { _ + _ } + .drop(1).map(x => convertToMicroDegrees(x)) ) } diff --git a/core/src/main/scala/com/acervera/osm4scala/utilities/CoordUtils.scala b/core/src/main/scala/com/acervera/osm4scala/utilities/CoordUtils.scala new file mode 100644 index 0000000..fb5e2c3 --- /dev/null +++ b/core/src/main/scala/com/acervera/osm4scala/utilities/CoordUtils.scala @@ -0,0 +1,57 @@ +/* + * The MIT License (MIT) + * + * Copyright (c) 2023 Ángel Cervera Claudio + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * + */ + +package com.acervera.osm4scala.utilities + +import scala.math.BigDecimal.RoundingMode + +/** + * Utility to manage coordinates + */ +object CoordUtils { + + /** + * Calculate coordinate applying offset, granularity and delta. + * Result is in degrees: 1e-9 * (offSet + granularity * delta) added to the previous coordinate. + * + * @param offSet + * @param delta + * @param currentValue + * @return + */ + def decompressCoord(offSet: Long, delta: Long, currentValue: Double, granularity: Int): Double = { + (.000000001 * (offSet + (granularity * delta))) + currentValue + } + + /** + * Convert a raw stored coordinate value into degrees, multiplying by 1e-7 (note: the name "MicroDegrees" does not match the returned unit). + * + * @param coordValue + * @return + */ + def convertToMicroDegrees(coordValue: Double): Double = { + .0000001 * coordValue + } +} diff --git a/core/src/test/resources/com/acervera/osm4scala/monaco-anonymized-with-geo.osm.pbf b/core/src/test/resources/com/acervera/osm4scala/monaco-anonymized-with-geo.osm.pbf new file mode 100644 index 0000000..6165fdc Binary files /dev/null and b/core/src/test/resources/com/acervera/osm4scala/monaco-anonymized-with-geo.osm.pbf differ diff --git a/core/src/test/resources/com/acervera/osm4scala/primitives/way/way-with-geo b/core/src/test/resources/com/acervera/osm4scala/primitives/way/way-with-geo new file mode 100644 index 0000000..d45f3dc Binary files /dev/null and b/core/src/test/resources/com/acervera/osm4scala/primitives/way/way-with-geo differ diff --git a/core/src/test/scala/com/acervera/osm4scala/model/WayEntitySuite.scala b/core/src/test/scala/com/acervera/osm4scala/model/WayEntitySuite.scala index 32bee0c..273588d 100644 --- a/core/src/test/scala/com/acervera/osm4scala/model/WayEntitySuite.scala +++ b/core/src/test/scala/com/acervera/osm4scala/model/WayEntitySuite.scala @@ -47,7 +47,28 @@ class WayEntitySuite extends AnyFunSuite with Matchers { // Test val way = WayEntity(strTable, osmosisWay) - way shouldBe WayEntity( + way shouldBe expectedWayEntity(Vector(), Vector()) + } + + test("read a real osmosis Way, with geometry.") { + + val strTable = StringTable
parseFrom new FileInputStream( + "core/src/test/resources/com/acervera/osm4scala/primitives/way/strTable") + val osmosisWay = Way parseFrom new FileInputStream( + "core/src/test/resources/com/acervera/osm4scala/primitives/way/way-with-geo") + + // Test + val way = WayEntity(strTable, osmosisWay) + way shouldBe expectedWayEntity(Vector(43.7389436, 43.7392238, 43.7393298, 43.7395534, 43.7397211, 43.739036999999996, + 43.7391636, 43.739084899999995, 43.739427299999996, 43.7395025, 43.7392768, 43.7391243, 43.7394671, 43.7395255, + 43.7389997, 43.7393746, 43.739060099999996, 43.739576199999995), + Vector(7.4259531, 7.425661099999999, 7.4256563, 7.4256163, 7.425138199999999, 7.425796399999999, 7.4256817999999996, + 7.425743, 7.425708299999999, 7.425694699999999, 7.4256535999999995, 7.425708299999999, 7.4257117, 7.4256666, + 7.4258602, 7.4256801999999995, 7.425768199999999, 7.4255146)) + } + + private def expectedWayEntity(latitude: Vector[Double], longitude: Vector[Double]) = { + WayEntity( 4097656, Vector( 21912089L, 7265761724L, 1079750744L, 2104793864L, 6340961560L, 1110560507L, 21912093L, 6340961559L, 21912095L, @@ -70,8 +91,9 @@ class WayEntitySuite extends AnyFunSuite with Matchers { Some(""), None ) - ) + ), + latitude, + longitude, ) } - } diff --git a/examples/spark-utilities/src/test/scala/com/acervera/osm4scala/examples/spark/SparkSuitesUtilities.scala b/examples/spark-utilities/src/test/scala/com/acervera/osm4scala/examples/spark/SparkSuitesUtilities.scala index 870e882..cf144e1 100644 --- a/examples/spark-utilities/src/test/scala/com/acervera/osm4scala/examples/spark/SparkSuitesUtilities.scala +++ b/examples/spark-utilities/src/test/scala/com/acervera/osm4scala/examples/spark/SparkSuitesUtilities.scala @@ -37,5 +37,10 @@ object SparkSuitesUtilities { def monaco: DataFrame = spark.sqlContext.read .format("osm.pbf") .load("core/src/test/resources/com/acervera/osm4scala/monaco-anonymized.osm.pbf") + + def monacoWithGeometry: DataFrame = spark.sqlContext.read + 
.format("osm.pbf") + .option("wayWithGeometry", "true") + .load("core/src/test/resources/com/acervera/osm4scala/monaco-anonymized-with-geo.osm.pbf") } diff --git a/examples/spark-utilities/src/test/scala/com/acervera/osm4scala/examples/spark/tagkeys/JobSpec.scala b/examples/spark-utilities/src/test/scala/com/acervera/osm4scala/examples/spark/tagkeys/JobSpec.scala index e647722..9c384dd 100644 --- a/examples/spark-utilities/src/test/scala/com/acervera/osm4scala/examples/spark/tagkeys/JobSpec.scala +++ b/examples/spark-utilities/src/test/scala/com/acervera/osm4scala/examples/spark/tagkeys/JobSpec.scala @@ -32,37 +32,38 @@ import org.scalatest.wordspec.AnyWordSpecLike class JobSpec extends AnyWordSpecLike with Matchers { - "TagKeys" should { - "extracting keys without filter" in { - monaco.createOrReplaceTempView("tag_keys_no_filter") - val result = Job.run(monaco, "tag_keys_no_filter", Config(tagKeysConfig = Some(TagKeysCfg()))) - .count() - result shouldBe 833 - } + for (dataFrame <- Seq(("withoutGeometry", monaco), ("withGeometry", monacoWithGeometry))) { + "TagKeys" should { + s"extracting keys without filter - ${dataFrame._1}" in { + dataFrame._2.createOrReplaceTempView("tag_keys_no_filter") + val result = Job.run(monaco, "tag_keys_no_filter", Config(tagKeysConfig = Some(TagKeysCfg()))) + .count() + result shouldBe 833 + } - "extracting keys filtering by nodes" in { - monaco.createOrReplaceTempView("tag_keys_nodes_filter") - val result = Job.run(monaco, "tag_keys_nodes_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(0))))) - .count() + s"extracting keys filtering by nodes - ${dataFrame._1}" in { + dataFrame._2.createOrReplaceTempView("tag_keys_nodes_filter") + val result = Job.run(monaco, "tag_keys_nodes_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(0))))) + .count() - result shouldBe 513 - } + result shouldBe 513 + } - "extracting keys filtering by ways" in { - monaco.createOrReplaceTempView("tag_keys_ways_filter") - val result = 
Job.run(monaco, "tag_keys_ways_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(1))))) - .count() + s"extracting keys filtering by ways - ${dataFrame._1}" in { + dataFrame._2.createOrReplaceTempView("tag_keys_ways_filter") + val result = Job.run(monaco, "tag_keys_ways_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(1))))) + .count() - result shouldBe 329 - } + result shouldBe 329 + } - "extracting keys filtering by relations" in { - monaco.createOrReplaceTempView("tag_keys_relations_filter") - val result = Job.run(monaco, "tag_keys_relations_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(2))))) - .count() + s"extracting keys filtering by relations - ${dataFrame._1}" in { + dataFrame._2.createOrReplaceTempView("tag_keys_relations_filter") + val result = Job.run(monaco, "tag_keys_relations_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(2))))) + .count() - result shouldBe 485 + result shouldBe 485 + } } } - } diff --git a/spark/src/main/scala/com/acervera/osm4scala/spark/OsmPbfFormat.scala b/spark/src/main/scala/com/acervera/osm4scala/spark/OsmPbfFormat.scala index b571610..7966e1b 100644 --- a/spark/src/main/scala/com/acervera/osm4scala/spark/OsmPbfFormat.scala +++ b/spark/src/main/scala/com/acervera/osm4scala/spark/OsmPbfFormat.scala @@ -122,7 +122,12 @@ class OsmPbfFormat extends FileFormat with DataSourceRegister { override def inferSchema(sparkSession: SparkSession, options: Map[String, String], - files: Seq[FileStatus]): Option[StructType] = Some(OsmSqlEntity.schema) + files: Seq[FileStatus]): Option[StructType] = + if (options.getOrElse("wayWithGeometry","false").toLowerCase().equals("true")) { + Some(OsmSqlEntity.schemaWithGeo) + } else { + Some(OsmSqlEntity.schema) + } override def prepareWrite(sparkSession: SparkSession, job: Job, diff --git a/spark/src/main/scala/com/acervera/osm4scala/spark/OsmPbfRowIterator.scala b/spark/src/main/scala/com/acervera/osm4scala/spark/OsmPbfRowIterator.scala index 
1c49d0d..1d20243 100644 --- a/spark/src/main/scala/com/acervera/osm4scala/spark/OsmPbfRowIterator.scala +++ b/spark/src/main/scala/com/acervera/osm4scala/spark/OsmPbfRowIterator.scala @@ -31,7 +31,7 @@ import com.acervera.osm4scala.spark.OsmSqlEntity._ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.types.{ArrayType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, LongType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String class OsmPbfRowIterator(osmEntityIterator: Iterator[OSMEntity], requiredSchema: StructType) @@ -88,18 +88,57 @@ object OsmPbfRowIterator { case fieldName => throw new Exception(s"Field $fieldName not valid for Info.") }) - private def populateWay(entity: WayEntity, structType: StructType): Seq[Any] = structType.fieldNames.map { - case FIELD_ID => entity.id - case FIELD_TYPE => ENTITY_TYPE_WAY - case FIELD_LATITUDE => null - case FIELD_LONGITUDE => null - case FIELD_NODES => UnsafeArrayData.fromPrimitiveArray(entity.nodes.toArray) - case FIELD_RELATIONS => new GenericArrayData(Seq.empty) - case FIELD_TAGS => calculateTags(entity.tags) - case FIELD_INFO => entity.info.map(populateInfo).orNull - case fieldName => throw new Exception(s"Field $fieldName not valid for a Way.") + private def populateWay(entity: WayEntity, structType: StructType): Seq[Any] = structType.fields.map(f => + f.name match { + case FIELD_ID => entity.id + case FIELD_TYPE => ENTITY_TYPE_WAY + case FIELD_LATITUDE => null + case FIELD_LONGITUDE => null + case FIELD_NODES => calculateWayNodes(entity.nodes, entity.lat, entity.lgn, f) + case FIELD_RELATIONS => new GenericArrayData(Seq.empty) + case FIELD_TAGS => calculateTags(entity.tags) + case FIELD_INFO => entity.info.map(populateInfo).orNull + case fieldName => throw new Exception(s"Field $fieldName not valid for a Way.") + } + ) + + private def 
calculateWayNodes(nodeIds: Seq[Long], lat: Seq[Double], lgn: Seq[Double], structField: StructField) + : ArrayData = { + if (structField.dataType.asInstanceOf[ArrayType].elementType == LongType) { + //way nodes without geometry + UnsafeArrayData.fromPrimitiveArray(nodeIds.toArray) + } else { + //way nodes with geometry + calculateWayNodeWithGeometry(nodeIds, lat, lgn, structField) + } + } + + private def calculateWayNodeWithGeometry(nodeIds: Seq[Long], lat: Seq[Double], lgn: Seq[Double], structField: StructField) = { + val nodes: Seq[(Long, (Double, Double))] = (nodeIds zip (lat, lgn).zipped.toList) + new GenericArrayData( + structField.dataType match { + case ArrayType(elementType, _) => + elementType match { + case s: StructType => nodes.map(r => InternalRow.fromSeq(calculateWayNode(r, s))) + case s => + throw new UnsupportedOperationException( + s"Schema ${s} isn't supported. Only arrays of StructType are allowed for way nodes.") + } + case s => + throw new UnsupportedOperationException( + s"Schema ${s} isn't supported. 
Only arrays of StructType are allowed for way nodes.") + } + ) } + private def calculateWayNode(wayNode: (Long, (Double, Double)), structType: StructType): Seq[Any] = + structType.fieldNames.map { + case FIELD_ID => wayNode._1 + case FIELD_LATITUDE => wayNode._2._1 + case FIELD_LONGITUDE => wayNode._2._2 + case fieldName => throw new Exception(s"Field $fieldName not valid for a way node.") + } + private def populateRelation(entity: RelationEntity, structType: StructType): Seq[Any] = structType.fields.map(f => f.name match { diff --git a/spark/src/main/scala/com/acervera/osm4scala/spark/OsmSqlEntity.scala b/spark/src/main/scala/com/acervera/osm4scala/spark/OsmSqlEntity.scala index b5eedc3..d0eeadc 100644 --- a/spark/src/main/scala/com/acervera/osm4scala/spark/OsmSqlEntity.scala +++ b/spark/src/main/scala/com/acervera/osm4scala/spark/OsmSqlEntity.scala @@ -81,6 +81,12 @@ object OsmSqlEntity { StructField(FIELD_INFO_VISIBLE, BooleanType, true) )) + lazy val wayNodeSchema = StructType( + Seq( + StructField(FIELD_ID, LongType, true), + StructField(FIELD_LATITUDE, DoubleType, true), + StructField(FIELD_LONGITUDE, DoubleType, true) + )) lazy val schema = StructType( Seq( @@ -94,4 +100,16 @@ object OsmSqlEntity { StructField(FIELD_INFO, StructType(infoSchema), true) )) + lazy val schemaWithGeo = StructType( + Seq( + StructField(FIELD_ID, LongType, false), + StructField(FIELD_TYPE, ByteType, false), + StructField(FIELD_LATITUDE, DoubleType, true), + StructField(FIELD_LONGITUDE, DoubleType, true), + StructField(FIELD_NODES, ArrayType(StructType(wayNodeSchema), false), true), + StructField(FIELD_RELATIONS, ArrayType(relationSchema, false), true), + StructField(FIELD_TAGS, MapType(StringType, StringType, false), true), + StructField(FIELD_INFO, StructType(infoSchema), true) + )) + } diff --git a/spark/src/test/scala/com/acervera/osm4scala/spark/OsmPbfFormatSpec.scala b/spark/src/test/scala/com/acervera/osm4scala/spark/OsmPbfFormatSpec.scala index 1ef10c2..1a0b284 100644 --- 
a/spark/src/test/scala/com/acervera/osm4scala/spark/OsmPbfFormatSpec.scala +++ b/spark/src/test/scala/com/acervera/osm4scala/spark/OsmPbfFormatSpec.scala @@ -42,6 +42,7 @@ import scala.util.Random object SourcesForTesting { val madridPath = "core/src/test/resources/com/acervera/osm4scala/Madrid.bbbike.osm.pbf" val monacoPath = "core/src/test/resources/com/acervera/osm4scala/monaco-anonymized.osm.pbf" + val monacoWithGeoPath = "core/src/test/resources/com/acervera/osm4scala/monaco-anonymized-with-geo.osm.pbf" val threeBlocks = "core/src/test/resources/com/acervera/osm4scala/fileblock/three_blocks.pbf" } @@ -202,6 +203,46 @@ class OsmPbfFormatSpec extends AnyWordSpec with Matchers with SparkSessionBefore way3996192.getAs[Seq[Any]]("relations") shouldBe Seq.empty } + "is parsing ways with geometry" in { + val way25739583 = loadOsmPbf(spark, monacoWithGeoPath, None, Map("wayWithGeometry" -> "true")).filter("id == " + + "25739583").collect()(0) + way25739583.getAs[Long]("id") shouldBe 25739583L + way25739583.getAs[Byte]("type") shouldBe 1 + way25739583.getAs[AnyRef]("latitude") should be(null) + way25739583.getAs[AnyRef]("longitude") should be(null) + way25739583.getAs[Map[String, String]]("tags") shouldBe + Map("name" -> "Rue Hector Otto", + "highway" -> "residential", + "junction" -> "roundabout" + ) + + way25739583.getAs[Seq[Row]]("nodes").map(x => ( + x.getAs[Long]("id"), + x.getAs[Double]("latitude"), + x.getAs[Double]("longitude") + )) shouldBe Seq( + (257076297, 43.7335259, 7.4130796), + (1780610235, 43.733522199999996, 7.413021499999999), + (257076299, 43.733529499999996, 7.4129884), + (257076301, 43.7335487, 7.4129352), + (1780610236, 43.7335549, 7.412869199999999), + (280703046, 43.733543999999995, 7.4128292), + (257076304, 43.7335135, 7.4128022), + (280703045, 43.7334773, 7.412792), + (257076306, 43.7334402, 7.4128023), + (257076309, 43.7334118, 7.4128333), + (1780610229, 43.7334052, 7.4128794), + (257076311, 43.7334003, 7.4129156), + (280703047, 
43.733399999999996, 7.4129789), + (257076297, 43.7335259, 7.4130796)) + way25739583.getAs[Seq[Any]]("relations") shouldBe Seq.empty + + //ensure nodes order is the same if you read with geometry or without + val way25739583WithoutGeo = loadOsmPbf(spark, monacoPath).filter("id == 25739583").collect()(0) + val nodeListWithoutGeo = way25739583WithoutGeo.getAs[Seq[Long]]("nodes") + way25739583.getAs[Seq[Row]]("nodes").map(x => x.getAs[Long]("id")) shouldBe nodeListWithoutGeo + } + "is parsing relations" in { val relation55799 = loadOsmPbf(spark, madridPath).filter("id == 55799").collect()(0) relation55799.getAs[Long]("id") shouldBe 55799