Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable reading of pbfs with node location for ways #111

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/main/protobuf/osmformat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ message Way {
optional Info info = 4;

repeated sint64 refs = 8 [packed = true]; // DELTA coded

repeated sint64 lat = 9 [packed = true]; // DELTA coded, optional
repeated sint64 lon = 10 [packed = true]; // DELTA coded, optional
}

message Relation {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
package com.acervera.osm4scala

import com.acervera.osm4scala.model.{Info, NodeEntity}
import com.acervera.osm4scala.utilities.CoordUtils.decompressCoord
import com.acervera.osm4scala.utilities.StringTableUtils._
import org.openstreetmap.osmosis.osmbinary.osmformat.{DenseInfo, DenseNodes, StringTable}

Expand Down Expand Up @@ -76,8 +77,8 @@ class DenseNodesIterator(osmosisStringTable: StringTable,

// Calculate new values base in deltas and update deltas
lastId = idIterator.next() + lastId
lastLatitude = decompressCoord(latOffset, latIterator.next(), lastLatitude)
lastLongitude = decompressCoord(lonOffset, lonIterator.next(), lastLongitude)
lastLatitude = decompressCoord(latOffset, latIterator.next(), lastLatitude, granularity)
lastLongitude = decompressCoord(lonOffset, lonIterator.next(), lastLongitude, granularity)

// Create node
NodeEntity(
Expand All @@ -90,18 +91,6 @@ class DenseNodesIterator(osmosisStringTable: StringTable,

}

/**
* Calculate coordinate applying offset, granularity and delta.
*
* @param offSet
* @param delta
* @param currentValue
* @return
*/
def decompressCoord(offSet: Long, delta: Long, currentValue: Double): Double = {
(.000000001 * (offSet + (granularity * delta))) + currentValue
}

// Decode DenseInfo

trait InfoIterator extends Iterator[Option[Info]]
Expand Down
13 changes: 11 additions & 2 deletions core/src/main/scala/com/acervera/osm4scala/model/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

package com.acervera.osm4scala.model

import com.acervera.osm4scala.utilities.CoordUtils.convertToMicroDegrees
import com.acervera.osm4scala.utilities.StringTableUtils._
import org.openstreetmap.osmosis.osmbinary.osmformat

Expand Down Expand Up @@ -88,7 +89,9 @@ case class WayEntity(
id: Long,
nodes: Seq[Long],
tags: Map[String, String],
info: Option[Info] = None
info: Option[Info] = None,
lat: Seq[Double] = Seq.empty,
lgn: Seq[Double] = Seq.empty
) extends OSMEntity {
override val osmModel: OSMTypes.Value = OSMTypes.Way
}
Expand All @@ -101,7 +104,13 @@ object WayEntity {
.scanLeft(0L) { _ + _ }
.drop(1), // Calculate nodes references in stored in delta compression. TODO: extract to utility class.
osmosisStringTable.extractTags(osmosisWay.keys, osmosisWay.vals),
Info(osmosisStringTable, osmosisWay.info)
Info(osmosisStringTable, osmosisWay.info),
osmosisWay.lat
.scanLeft(0L) { _ + _ }
.drop(1).map(x=> convertToMicroDegrees(x)),
osmosisWay.lon
.scanLeft(0L) { _ + _ }
.drop(1).map(x => convertToMicroDegrees(x))
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2023 Ángel Cervera Claudio
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*/

package com.acervera.osm4scala.utilities

import scala.math.BigDecimal.RoundingMode

/**
* Utility to manage coordinates
*/
object CoordUtils {

/**
* Calculate coordinate applying offset, granularity and delta.
* mode
*
* @param offSet
* @param delta
* @param currentValue
* @return
*/
def decompressCoord(offSet: Long, delta: Long, currentValue: Double, granularity: Int): Double = {
Copy link
Member

@angelcervera angelcervera Jul 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe these changes are a good idea to increase precision, but are out of the scope of adding the new fields.
Could you move it out of this PR? Before adding it, we need to check whether that level of precision is necessary for the precision used in OSM, and also the performance penalty that we would be adding.

BTW, I'm not sure, but I think that the compiler is not going to optimize the code enough so what in fact are constants (like (BigDecimal.valueOf(1E-9))) are going to be created every time you execute the function (so billions times in this case).

So what do you think about remove these changes from this PR and create a new one to talk about it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed them.

(.000000001 * (offSet + (granularity * delta))) + currentValue
}

/**
 * Convert a delta-decoded way coordinate into degrees by scaling with the
 * fixed factor 1e-7 (the scale corresponding to the OSM default granularity
 * of 100 nanodegrees per unit).
 *
 * NOTE(review): despite the name, the result is in degrees, not
 * microdegrees. Unlike decompressCoord, no lat/lon offset and no explicit
 * granularity are applied — this assumes the PBF was written with offset 0
 * and granularity 100. TODO confirm, or route through decompressCoord.
 *
 * @param coordValue delta-decoded raw coordinate value
 * @return coordinate in degrees
 */
def convertToMicroDegrees(coordValue: Double): Double = {
  .0000001 * coordValue
}
}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,28 @@ class WayEntitySuite extends AnyFunSuite with Matchers {

// Test
val way = WayEntity(strTable, osmosisWay)
way shouldBe WayEntity(
way shouldBe expectedWayEntity(Vector(), Vector())
}

test("read a real osmosis Way, with geometry.") {

val strTable = StringTable parseFrom new FileInputStream(
"core/src/test/resources/com/acervera/osm4scala/primitives/way/strTable")
val osmosisWay = Way parseFrom new FileInputStream(
"core/src/test/resources/com/acervera/osm4scala/primitives/way/way-with-geo")

// Test
val way = WayEntity(strTable, osmosisWay)
way shouldBe expectedWayEntity(Vector(43.7389436, 43.7392238, 43.7393298, 43.7395534, 43.7397211, 43.739036999999996,
43.7391636, 43.739084899999995, 43.739427299999996, 43.7395025, 43.7392768, 43.7391243, 43.7394671, 43.7395255,
43.7389997, 43.7393746, 43.739060099999996, 43.739576199999995),
Vector(7.4259531, 7.425661099999999, 7.4256563, 7.4256163, 7.425138199999999, 7.425796399999999, 7.4256817999999996,
7.425743, 7.425708299999999, 7.425694699999999, 7.4256535999999995, 7.425708299999999, 7.4257117, 7.4256666,
7.4258602, 7.4256801999999995, 7.425768199999999, 7.4255146))
}

private def expectedWayEntity(latitude: Vector[Double], longitude: Vector[Double]) = {
WayEntity(
4097656,
Vector(
21912089L, 7265761724L, 1079750744L, 2104793864L, 6340961560L, 1110560507L, 21912093L, 6340961559L, 21912095L,
Expand All @@ -70,8 +91,9 @@ class WayEntitySuite extends AnyFunSuite with Matchers {
Some(""),
None
)
)
),
latitude,
longitude,
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,10 @@ object SparkSuitesUtilities {
def monaco: DataFrame = spark.sqlContext.read
.format("osm.pbf")
.load("core/src/test/resources/com/acervera/osm4scala/monaco-anonymized.osm.pbf")

// Same Monaco test extract, read from the variant file that carries way
// geometry, with the "wayWithGeometry" reader option enabled.
def monacoWithGeometry: DataFrame = {
  val reader = spark.sqlContext.read
    .format("osm.pbf")
    .option("wayWithGeometry", "true")
  reader.load("core/src/test/resources/com/acervera/osm4scala/monaco-anonymized-with-geo.osm.pbf")
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,38 @@ import org.scalatest.wordspec.AnyWordSpecLike

class JobSpec extends AnyWordSpecLike with Matchers {

"TagKeys" should {
"extracting keys without filter" in {
monaco.createOrReplaceTempView("tag_keys_no_filter")
val result = Job.run(monaco, "tag_keys_no_filter", Config(tagKeysConfig = Some(TagKeysCfg())))
.count()
result shouldBe 833
}
for (dataFrame <- Seq(("withoutGeometry", monaco), ("withGeometry", monacoWithGeometry))) {
sergiupantiru marked this conversation as resolved.
Show resolved Hide resolved
"TagKeys" should {
s"extracting keys without filter - ${dataFrame._1}" in {
dataFrame._2.createOrReplaceTempView("tag_keys_no_filter")
val result = Job.run(monaco, "tag_keys_no_filter", Config(tagKeysConfig = Some(TagKeysCfg())))
.count()
result shouldBe 833
}

"extracting keys filtering by nodes" in {
monaco.createOrReplaceTempView("tag_keys_nodes_filter")
val result = Job.run(monaco, "tag_keys_nodes_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(0)))))
.count()
s"extracting keys filtering by nodes - ${dataFrame._1}" in {
dataFrame._2.createOrReplaceTempView("tag_keys_nodes_filter")
val result = Job.run(monaco, "tag_keys_nodes_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(0)))))
.count()

result shouldBe 513
}
result shouldBe 513
}

"extracting keys filtering by ways" in {
monaco.createOrReplaceTempView("tag_keys_ways_filter")
val result = Job.run(monaco, "tag_keys_ways_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(1)))))
.count()
s"extracting keys filtering by ways - ${dataFrame._1}" in {
dataFrame._2.createOrReplaceTempView("tag_keys_ways_filter")
val result = Job.run(monaco, "tag_keys_ways_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(1)))))
.count()

result shouldBe 329
}
result shouldBe 329
}

"extracting keys filtering by relations" in {
monaco.createOrReplaceTempView("tag_keys_relations_filter")
val result = Job.run(monaco, "tag_keys_relations_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(2)))))
.count()
s"extracting keys filtering by relations - ${dataFrame._1}" in {
dataFrame._2.createOrReplaceTempView("tag_keys_relations_filter")
val result = Job.run(monaco, "tag_keys_relations_filter", Config(tagKeysConfig = Some(TagKeysCfg(osmType = Some(2)))))
.count()

result shouldBe 485
result shouldBe 485
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,12 @@ class OsmPbfFormat extends FileFormat with DataSourceRegister {

override def inferSchema(sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = Some(OsmSqlEntity.schema)
files: Seq[FileStatus]): Option[StructType] =
if (options.getOrElse("wayWithGeometry","false").toLowerCase().equals("true")) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep only one schema.
After this change , I think that I will increase the major version of the library, because back compatibility (and I will take this chance to drop Scala 2.11 comp).
I don't think that adding two nullable columns will affect too much.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wanted to keep it backward compatible. I can keep only one if you think it's better

Some(OsmSqlEntity.schemaWithGeo)
} else {
Some(OsmSqlEntity.schema)
}

override def prepareWrite(sparkSession: SparkSession,
job: Job,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.acervera.osm4scala.spark.OsmSqlEntity._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types.{ArrayType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, LongType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

class OsmPbfRowIterator(osmEntityIterator: Iterator[OSMEntity], requiredSchema: StructType)
Expand Down Expand Up @@ -88,18 +88,57 @@ object OsmPbfRowIterator {
case fieldName => throw new Exception(s"Field $fieldName not valid for Info.")
})

private def populateWay(entity: WayEntity, structType: StructType): Seq[Any] = structType.fieldNames.map {
case FIELD_ID => entity.id
case FIELD_TYPE => ENTITY_TYPE_WAY
case FIELD_LATITUDE => null
case FIELD_LONGITUDE => null
case FIELD_NODES => UnsafeArrayData.fromPrimitiveArray(entity.nodes.toArray)
case FIELD_RELATIONS => new GenericArrayData(Seq.empty)
case FIELD_TAGS => calculateTags(entity.tags)
case FIELD_INFO => entity.info.map(populateInfo).orNull
case fieldName => throw new Exception(s"Field $fieldName not valid for a Way.")
private def populateWay(entity: WayEntity, structType: StructType): Seq[Any] = structType.fields.map(f =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I don't understand this change. Maybe I'm missing something. Why do you prefer .fields over .fieldNames when the only use of the field is to get its name, so we are adding an extra step in the code? In the end it is exactly the same, but, in my opinion, removing the extra step is more readable.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the same as for Relations, the type of the field is needed

f.name match {
case FIELD_ID => entity.id
case FIELD_TYPE => ENTITY_TYPE_WAY
case FIELD_LATITUDE => null
case FIELD_LONGITUDE => null
case FIELD_NODES => calculateWayNodes(entity.nodes, entity.lat, entity.lgn, f)
case FIELD_RELATIONS => new GenericArrayData(Seq.empty)
case FIELD_TAGS => calculateTags(entity.tags)
case FIELD_INFO => entity.info.map(populateInfo).orNull
case fieldName => throw new Exception(s"Field $fieldName not valid for a Way.")
}
)

/**
 * Encode the nodes of a way for the requested schema.
 *
 * Two schemas are possible for FIELD_NODES: an array of node ids
 * (LongType elements, no geometry) or an array of (id, lat, lon) structs
 * (geometry enabled). Pattern matching replaces the previous unchecked
 * asInstanceOf[ArrayType] cast, so an unexpected schema now fails with a
 * descriptive error instead of a ClassCastException.
 *
 * @param nodeIds     node ids of the way, in order
 * @param lat         latitudes aligned with nodeIds (empty when no geometry)
 * @param lgn         longitudes aligned with nodeIds (empty when no geometry)
 * @param structField schema of the FIELD_NODES column
 * @return array data in the shape requested by the schema
 */
private def calculateWayNodes(nodeIds: Seq[Long], lat: Seq[Double], lgn: Seq[Double], structField: StructField)
  : ArrayData =
  structField.dataType match {
    case ArrayType(LongType, _) =>
      // Way nodes without geometry: plain array of node ids.
      UnsafeArrayData.fromPrimitiveArray(nodeIds.toArray)
    case _: ArrayType =>
      // Way nodes with geometry: array of (id, lat, lon) structs.
      calculateWayNodeWithGeometry(nodeIds, lat, lgn, structField)
    case other =>
      throw new UnsupportedOperationException(
        s"Schema ${other} isn't supported. Only arrays are allowed for way nodes.")
  }

/**
 * Encode way nodes as an array of (id, lat, lon) rows.
 *
 * Each node id is paired positionally with its coordinate; the element
 * StructType of the array schema decides the field order of every row.
 * The previous nested match duplicated the unsupported-schema branch; a
 * single match on the full dataType removes the duplication. `zip` replaces
 * `.zipped` (deprecated since Scala 2.13) with identical results.
 *
 * @param nodeIds     node ids of the way, in order
 * @param lat         latitudes aligned with nodeIds
 * @param lgn         longitudes aligned with nodeIds
 * @param structField schema of the FIELD_NODES column; must be an array of StructType
 * @return GenericArrayData of InternalRow, one row per node
 */
private def calculateWayNodeWithGeometry(nodeIds: Seq[Long],
                                         lat: Seq[Double],
                                         lgn: Seq[Double],
                                         structField: StructField): GenericArrayData = {
  // Pair every node id with its coordinate: (id, (lat, lon)).
  val nodes: Seq[(Long, (Double, Double))] = nodeIds.zip(lat.zip(lgn))
  structField.dataType match {
    case ArrayType(elementType: StructType, _) =>
      new GenericArrayData(nodes.map(node => InternalRow.fromSeq(calculateWayNode(node, elementType))))
    case s =>
      throw new UnsupportedOperationException(
        s"Schema ${s} isn't supported. Only arrays of StructType are allowed for way nodes.")
  }
}

/**
 * Project one way node onto the field order requested by `structType`.
 * Only id, latitude and longitude fields are valid for a way node.
 */
private def calculateWayNode(wayNode: (Long, (Double, Double)), structType: StructType): Seq[Any] = {
  val (nodeId, (latitude, longitude)) = wayNode
  structType.fieldNames.map {
    case FIELD_ID        => nodeId
    case FIELD_LATITUDE  => latitude
    case FIELD_LONGITUDE => longitude
    case fieldName       => throw new Exception(s"Field $fieldName not valid for a way node.")
  }
}

private def populateRelation(entity: RelationEntity, structType: StructType): Seq[Any] =
structType.fields.map(f =>
f.name match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ object OsmSqlEntity {
StructField(FIELD_INFO_VISIBLE, BooleanType, true)
))

// Schema of a single way node when geometry is requested: the node id plus
// its resolved coordinate in degrees. Used as the element type of
// FIELD_NODES in schemaWithGeo.
lazy val wayNodeSchema = StructType(
  Seq(
    StructField(FIELD_ID, LongType, true),
    StructField(FIELD_LATITUDE, DoubleType, true),
    StructField(FIELD_LONGITUDE, DoubleType, true)
  ))

lazy val schema = StructType(
Seq(
Expand All @@ -94,4 +100,16 @@ object OsmSqlEntity {
StructField(FIELD_INFO, StructType(infoSchema), true)
))

// Schema exposed when the "wayWithGeometry" reader option is enabled:
// FIELD_NODES is an array of (id, lat, lon) structs rather than an array of
// node ids.
lazy val schemaWithGeo = StructType(
  Seq(
    StructField(FIELD_ID, LongType, false),
    StructField(FIELD_TYPE, ByteType, false),
    StructField(FIELD_LATITUDE, DoubleType, true),
    StructField(FIELD_LONGITUDE, DoubleType, true),
    // wayNodeSchema is already a StructType; wrapping it in StructType(...)
    // again only built a redundant copy.
    StructField(FIELD_NODES, ArrayType(wayNodeSchema, false), true),
    StructField(FIELD_RELATIONS, ArrayType(relationSchema, false), true),
    StructField(FIELD_TAGS, MapType(StringType, StringType, false), true),
    StructField(FIELD_INFO, StructType(infoSchema), true)
  ))

}
Loading