TimeWindow
The following example computes the average stock price per stock over one-minute windows that slide every 10 seconds, with window boundaries offset to start 5 seconds after the hour:
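import org.apache.spark.sql.functions.{mean, window}
// The $"..." column syntax also needs spark.implicits._ from the active SparkSession.
val df = ... // schema => timestamp: TimestampType, stockId: StringType, price: DoubleType
// The time column passed to window() must match the schema above ("timestamp").
df.groupBy(window($"timestamp", "1 minute", "10 seconds", "5 seconds"), $"stockId")
  .agg(mean("price"))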
TimeWindow is not evaluable itself. During analysis it is rewritten into other operators such as Expand and Filter.
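For each input row, Expand emits maxNumOverlapping rows, one per candidate window that could contain the row's timestamp. maxNumOverlapping is the window duration divided by the slide duration, rounded up, so it is 6 for a one-minute window sliding every 10 seconds. Filter then drops the candidates whose window does not actually contain the timestamp. Because each surviving row carries a different window value, groupBy places the original row in every window it belongs to.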
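The Expand and Filter steps can be modelled in plain Scala without Spark. The sketch below is a simplified illustration, not Spark's actual implementation: TimeWindowSketch, Window, expand and assign are hypothetical names, and timestamps are plain Long values in seconds rather than Spark's internal microseconds.

```scala
object TimeWindowSketch {
  // Hypothetical model of one window as the half-open interval [start, end).
  final case class Window(start: Long, end: Long)

  // Number of candidate windows generated per row: ceil(windowDuration / slideDuration).
  def maxNumOverlapping(windowDuration: Long, slideDuration: Long): Int =
    math.ceil(windowDuration.toDouble / slideDuration).toInt

  // "Expand" step: one candidate window per overlap slot.
  def expand(ts: Long, windowDuration: Long, slideDuration: Long, startTime: Long): Seq[Window] = {
    // Start of the latest window that begins at or before ts.
    val lastStart = ts - java.lang.Math.floorMod(ts - startTime, slideDuration)
    (0 until maxNumOverlapping(windowDuration, slideDuration)).map { i =>
      val start = lastStart - i * slideDuration
      Window(start, start + windowDuration)
    }
  }

  // "Filter" step: keep only the candidates that really contain ts.
  def assign(ts: Long, windowDuration: Long, slideDuration: Long, startTime: Long): Seq[Window] =
    expand(ts, windowDuration, slideDuration, startTime)
      .filter(w => ts >= w.start && ts < w.end)
}
```

With the parameters from the example (60-second window, 10-second slide, 5-second start offset), every timestamp falls into six windows, so Filter removes nothing. Filter only matters when the window duration is not a multiple of the slide duration: with a 25-second window and a 10-second slide, three candidates are generated but some timestamps belong to only two of them.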