Dataset/DataFrame

Dataset is a strongly typed data structure. The difference from RDD is that Spark understands the internal structure of a Dataset and knows exactly how to serialize and deserialize it to/from its binary form, thanks to the Encoder.
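As a minimal sketch of what this looks like in practice (assuming a local SparkSession; the Person case class is purely illustrative):

```scala
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int)

val spark = SparkSession.builder().master("local[*]").appName("encoder-demo").getOrCreate()
import spark.implicits._  // derives the implicit Encoder[Person]

// The Encoder[Person] tells Spark how to serialize Person instances into
// its internal binary (Tungsten) format and back, so typed operations
// like map below are checked at compile time.
val ds = spark.createDataset(Seq(Person("Ann", 30), Person("Bob", 25)))
val ages = ds.map(_.age)  // Dataset[Int], backed by Encoder[Int]
```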

DataFrame is simply a Dataset whose type parameter T is Row (Dataset[Row]).

Internally, a Dataset is represented as InternalRow. Take a look at toDF: it merely attaches a new RowEncoder(schema), indicating that the underlying format is already InternalRow.
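For example (a sketch, assuming a Dataset[Person] named ds already exists):

```scala
// toDF does not touch the data: the InternalRows stay as they are;
// only the typed Encoder[Person] is swapped for a RowEncoder(schema).
val df = ds.toDF()   // Dataset[Person] -> DataFrame (= Dataset[Row])
df.printSchema()     // same schema, now accessed through untyped Rows
```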

On the one hand, in SQLContext.createDataset we find that the data of type T is always converted to InternalRow before the LogicalRDD is constructed:

val enc = encoderFor[T]                          // resolve the Encoder for T
val attributes = enc.schema.toAttributes         // derive output attributes from the schema
val encoded = data.map(d => enc.toRow(d))        // serialize each T into an InternalRow
val plan = LogicalRDD(attributes, encoded)(self)

Dataset[T](this, plan)

On the other hand, in MapElementsExec.doExecute we find that an object is always constructed from the InternalRow before the function is applied, and the result is converted back to an InternalRow afterwards. Spark has optimizations to eliminate the unnecessary serde steps introduced by this wrapping.

Note that MapElements is embedded between two extra logical plan nodes, DeserializeToObject and SerializeFromObject, which may later be eliminated by the EliminateSerialization rule.

val callFunc: Any => Any = func match {
  case m: MapFunction[_, _] => i => m.asInstanceOf[MapFunction[Any, Any]].call(i)  // Java API function
  case _ => func.asInstanceOf[Any => Any]                                          // Scala function
}
child.execute().mapPartitionsInternal { iter =>
  val getObject = unwrapObjectFromRow(child.output.head.dataType)  // InternalRow -> object
  val outputObject = wrapObjectToRow(outputObjAttr.dataType)       // object -> InternalRow
  iter.map(row => outputObject(callFunc(getObject(row))))
}
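To see this elimination at work, one can compare the analyzed and optimized plans of chained typed maps (a sketch; assumes a Dataset[Person] named ds with an age field):

```scala
// Each typed map is planned as DeserializeToObject -> MapElements -> SerializeFromObject.
// For back-to-back maps, EliminateSerialization removes the inner
// SerializeFromObject/DeserializeToObject pair, so the object produced by
// the first function is passed directly to the second.
val chained = ds.map(_.age).map(_ + 1)
chained.explain(true)  // compare the analyzed plan with the optimized plan
```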

Finally, let's look at what Dataset.as does: only a new Encoder[U] is attached. The old type information [T] is discarded, as it is not needed at all, because the underlying format is already InternalRow with the schema maintained.
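A sketch of going back from untyped to typed (assumes the Person case class and a DataFrame df whose schema matches it):

```scala
// `as[U]` is lazy and cheap: it only attaches Encoder[Person] for later
// deserialization; no rows are converted at this point. The DataFrame's
// schema must be compatible with the fields of Person.
val typedAgain = df.as[Person]
```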
