ShuffleReader

BlockStoreShuffleReader uses ExternalSorter to do the merge sort.
In detail, on the reducer side it obtains multiple input streams through ShuffleBlockFetcherIterator. The streams are then flatMapped into a record iterator and fed into ExternalSorter by sorter.insertAll(aggregatedIter). This function reads all records into an in-memory collection, aggregating them in a map when the sorter is configured with an aggregator.

If a spill is required, it uses WritablePartitionedPairCollection.destructiveSortedWritablePartitionedIterator to write the records in order.

Later on, these spilled files are merge-sorted if ordering is required.

Note that the ShuffleReader here does not leverage the ordering on the map side.
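
The insert, spill, and merge steps above can be sketched roughly as follows. This is a toy model in plain Java, not Spark's ExternalSorter: the class name SpillingAggregator, the key-count spill threshold, and the use of in-memory lists in place of spill files are all assumptions made for illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;

// Hypothetical toy model of "aggregate in a map, spill sorted runs, merge later".
class SpillingAggregator {
    private final int maxInMemoryKeys; // assumed spill threshold (number of distinct keys)
    private Map<String, Integer> current = new HashMap<>();
    // Each inner list stands in for one sorted spill file.
    private final List<List<Map.Entry<String, Integer>>> spills = new ArrayList<>();

    SpillingAggregator(int maxInMemoryKeys) {
        this.maxInMemoryKeys = maxInMemoryKeys;
    }

    // Read every record and combine values for the same key in the in-memory map.
    void insertAll(Iterator<Map.Entry<String, Integer>> records) {
        while (records.hasNext()) {
            Map.Entry<String, Integer> r = records.next();
            current.merge(r.getKey(), r.getValue(), Integer::sum);
            if (current.size() >= maxInMemoryKeys) {
                spill();
            }
        }
    }

    // Write the map contents in key order as one sorted run, then start a fresh map.
    private void spill() {
        List<Map.Entry<String, Integer>> run = new ArrayList<>();
        for (Map.Entry<String, Integer> e : new TreeMap<>(current).entrySet()) {
            run.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
        }
        spills.add(run);
        current = new HashMap<>();
    }

    int spillCount() {
        return spills.size();
    }

    // k-way merge of all sorted runs; equal keys from different runs are combined.
    List<Map.Entry<String, Integer>> sortedResult() {
        if (!current.isEmpty()) {
            spill(); // treat the remaining in-memory data as a final run
        }
        // Heap entries are {runIndex, positionInRun}, ordered by the key they point at.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            (a, b) -> spills.get(a[0]).get(a[1]).getKey()
                .compareTo(spills.get(b[0]).get(b[1]).getKey()));
        for (int i = 0; i < spills.size(); i++) {
            if (!spills.get(i).isEmpty()) {
                heap.add(new int[] {i, 0});
            }
        }
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            Map.Entry<String, Integer> e = spills.get(top[0]).get(top[1]);
            int last = out.size() - 1;
            if (last >= 0 && out.get(last).getKey().equals(e.getKey())) {
                out.get(last).setValue(out.get(last).getValue() + e.getValue());
            } else {
                out.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
            }
            if (top[1] + 1 < spills.get(top[0]).size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        return out;
    }
}
```

Because every run is already sorted when it is written, the final pass only needs to compare the head of each run, which is why the sort can be deferred until ordering is actually required.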

TransportResponseHandler receives the response and finds the listener based on resp.streamChunkId. Note that buffer.release is invoked afterwards; this only decreases the reference count and does not free the buffer, because the listener (ShuffleBlockFetcherIterator) has invoked retain() to increase the count.

The results queue in ShuffleBlockFetcherIterator holds the network buffers, each of which is retained with retain() before being enqueued. The results are accessed from a different thread.

ShuffleBlockFetcherIterator is an iterator. Its next() calls results.take() to get a result and wraps the buffer in a BufferReleasingInputStream. When the stream is closed, the buffer is released.

BlockStoreShuffleReader creates the ShuffleBlockFetcherIterator. Inside flatMap, serializerInstance.deserializeStream(wrappedStream) converts each shuffle block into a KV iterator.

Overall, in addition to being released by TransportResponseHandler, the netty buffer is retained and released in ShuffleBlockFetcherIterator.
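
The retain/release handoff described above can be illustrated with a small self-contained model. It does not use Netty or Spark classes: CountedBuffer, ReleasingInputStream, and FetchDemo are hypothetical stand-ins for the reference-counted buffer, BufferReleasingInputStream, and the handler/iterator pair, and the method names only loosely mirror the real ones.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a reference-counted network buffer.
class CountedBuffer {
    private final byte[] data;
    private final AtomicInteger refCnt = new AtomicInteger(1); // creator holds one reference

    CountedBuffer(byte[] data) {
        this.data = data;
    }

    CountedBuffer retain() {
        refCnt.incrementAndGet();
        return this;
    }

    void release() {
        if (refCnt.decrementAndGet() < 0) {
            throw new IllegalStateException("released more often than retained");
        }
    }

    int refCnt() {
        return refCnt.get();
    }

    InputStream createInputStream() {
        return new ByteArrayInputStream(data);
    }
}

// Stream wrapper that gives up its buffer reference when closed (at most once).
class ReleasingInputStream extends FilterInputStream {
    private final CountedBuffer buffer;
    private boolean closed = false;

    ReleasingInputStream(CountedBuffer buffer) {
        super(buffer.createInputStream());
        this.buffer = buffer;
    }

    @Override
    public void close() throws IOException {
        if (!closed) {
            closed = true;
            super.close();
            buffer.release();
        }
    }
}

class FetchDemo {
    // Queue shared between the network thread (producer) and the task thread (consumer).
    private final BlockingQueue<CountedBuffer> results = new LinkedBlockingQueue<>();

    // Listener side: take an extra reference before the buffer crosses threads.
    void onBlockFetchSuccess(CountedBuffer buf) {
        results.add(buf.retain());
    }

    // Handler side: dispatch to the listener, then drop the handler's own reference.
    void handleResponse(CountedBuffer buf) {
        try {
            onBlockFetchSuccess(buf);
        } finally {
            buf.release();
        }
    }

    // Iterator side: block for the next result and wrap it so close() releases it.
    InputStream next() throws InterruptedException {
        return new ReleasingInputStream(results.take());
    }
}
```

In this model the count goes 1 (created), 2 (retained by the listener), 1 (released by the handler), and reaches 0 only when the consumer closes the stream, so the buffer stays valid while it waits in the queue.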
