val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
  s"open cost is considered as scanning $openCostInBytes bytes.")
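For orientation, here is a minimal standalone sketch (example numbers, not the planner's own code) of where the three inputs above typically come from: defaultMaxSplitBytes from spark.sql.files.maxPartitionBytes (128 MB by default), openCostInBytes from spark.sql.files.openCostInBytes (4 MB by default), and bytesPerCore roughly the total input size divided by the default parallelism.

// Sketch with made-up inputs: ~10 GB of files scanned on 200 cores.
val defaultMaxSplitBytes = 128L * 1024 * 1024        // spark.sql.files.maxPartitionBytes
val openCostInBytes      = 4L * 1024 * 1024          // spark.sql.files.openCostInBytes
val totalBytes           = 10L * 1024 * 1024 * 1024  // assumed total size of the selected files
val defaultParallelism   = 200                       // assumed number of available cores
val bytesPerCore         = totalBytes / defaultParallelism  // ~51 MB

// min(128 MB, max(4 MB, ~51 MB)) = ~51 MB: small scans are spread over all cores,
// but a split never drops below the open cost nor exceeds the configured maximum.
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))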
val splitFiles = selectedPartitions.flatMap { partition =>
  partition.files.flatMap { file =>
    val blockLocations = getBlockLocations(file)
    if (fsRelation.fileFormat.isSplitable(
        fsRelation.sparkSession, fsRelation.options, file.getPath)) {
      (0L until file.getLen by maxSplitBytes).map { offset =>
        val remaining = file.getLen - offset
        val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
        val hosts = getBlockHosts(blockLocations, offset, size)
        PartitionedFile(
          partition.values, file.getPath.toUri.toString, offset, size, hosts)
      }
    } else {
      val hosts = getBlockHosts(blockLocations, 0, file.getLen)
      Seq(PartitionedFile(
        partition.values, file.getPath.toUri.toString, 0, file.getLen, hosts))
    }
  }
}.toArray.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
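As a quick illustration of the offset loop above (made-up sizes, not from the source): a splitable 300 MB file with a 128 MB maxSplitBytes is cut at offsets 0, 128 MB and 256 MB, while a non-splitable file (for example gzip-compressed text) always becomes a single split covering the whole file.

// Sketch only: slicing one 300 MB splitable file at 128 MB.
val fileLen       = 300L * 1024 * 1024
val maxSplitBytes = 128L * 1024 * 1024
val splitSizes = (0L until fileLen by maxSplitBytes).map { offset =>
  math.min(maxSplitBytes, fileLen - offset)
}
// splitSizes: 128 MB, 128 MB, 44 MB -- the descending sort at the end feeds the bin packing below.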
val partitions = new ArrayBuffer[FilePartition]
val currentFiles = new ArrayBuffer[PartitionedFile]
var currentSize = 0L
/** Close the current partition and move to the next. */
def closePartition(): Unit = {
  if (currentFiles.nonEmpty) {
    val newPartition =
      FilePartition(
        partitions.size,
        currentFiles.toArray.toSeq) // Copy to a new Array.
    partitions += newPartition
  }
  currentFiles.clear()
  currentSize = 0
}
// Assign files to partitions using "First Fit Decreasing" (FFD)
splitFiles.foreach { file =>
  if (currentSize + file.length > maxSplitBytes) {
    closePartition()
  }
  // Add the given file to the current partition.
  currentSize += file.length + openCostInBytes
  currentFiles += file
}
closePartition()
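To see what the packing produces, here is a self-contained sketch of the same loop over plain sizes (values in MB are invented; the real code packs PartitionedFile objects). The open-cost padding is what keeps a partition from accumulating an unbounded number of tiny files.

import scala.collection.mutable.ArrayBuffer

val openCost = 4L
val maxSize  = 128L
val sizes    = Seq(128L, 90L, 60L, 40L, 10L, 5L, 1L)  // already sorted in decreasing order

val bins        = ArrayBuffer[Seq[Long]]()
val currentBin  = ArrayBuffer[Long]()
var currentSize = 0L

def closeBin(): Unit = {
  if (currentBin.nonEmpty) bins += currentBin.toSeq
  currentBin.clear()
  currentSize = 0L
}

sizes.foreach { s =>
  if (currentSize + s > maxSize) closeBin()
  currentSize += s + openCost  // every file also "costs" openCost to open
  currentBin += s
}
closeBin()
// bins: [128], [90], [60, 40, 10, 5], [1]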
val fileSplit =
  new FileSplit(new Path(new URI(file.filePath)), file.start, file.length, Array.empty)
val split =
  new org.apache.parquet.hadoop.ParquetInputSplit(
    fileSplit.getPath,
    fileSplit.getStart,
    fileSplit.getStart + fileSplit.getLength,
    fileSplit.getLength,
    fileSplit.getLocations,
    null)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext =
  new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
// Try to push down filters when filter push-down is enabled.
// Notice: This push-down is RowGroups level, not individual records.
if (pushed.isDefined) {
  ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
val parquetReader = if (enableVectorizedReader) {
  val vectorizedReader = new VectorizedParquetRecordReader()
  vectorizedReader.initialize(split, hadoopAttemptContext)
  logDebug(s"Appending $partitionSchema ${file.partitionValues}")
  vectorizedReader.initBatch(partitionSchema, file.partitionValues)
  if (returningBatch) {
    vectorizedReader.enableReturningBatches()
  }
  vectorizedReader
} else {
  logDebug(s"Falling back to parquet-mr")
  // ParquetRecordReader returns UnsafeRow
  val reader = pushed match {
    case Some(filter) =>
      new ParquetRecordReader[UnsafeRow](
        new ParquetReadSupport,
        FilterCompat.get(filter, null))
    case _ =>
      new ParquetRecordReader[UnsafeRow](new ParquetReadSupport)
  }
  reader.initialize(split, hadoopAttemptContext)
  reader
}
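Both branches are driven by session configuration. A minimal sketch, assuming a local session (the two option names are the standard Spark SQL configs; the local master and app name are only for the example):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")            // example only
  .appName("parquet-scan-demo")  // example only
  .getOrCreate()

// Row-group level predicate push-down (the `pushed` filter above).
spark.conf.set("spark.sql.parquet.filterPushdown", "true")
// Chooses VectorizedParquetRecordReader over the parquet-mr fallback path.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")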
val iter = new RecordReaderIterator(parquetReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
// UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
    enableVectorizedReader) {
  iter.asInstanceOf[Iterator[InternalRow]]
} else {
  val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
  val joinedRow = new JoinedRow()
  val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
  // This is a horrible erasure hack... if we type the iterator above, then it actually checks
  // the type in next() and we get a class cast exception. If we make that function return
  // Object, then we can defer the cast until later!
  if (partitionSchema.length == 0) {
    // There are no partition columns.
    iter.asInstanceOf[Iterator[InternalRow]]
  } else {
    iter.asInstanceOf[Iterator[InternalRow]]
      .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
  }
}
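The reason for the JoinedRow/projection step above is that partition column values live in the directory layout, not inside the Parquet files, so they have to be appended to every row the reader returns. A hedged end-to-end illustration using the public API, reusing the `spark` session from the earlier sketch (the path and column names are hypothetical):

// Hypothetical layout: /data/events/date=2016-01-01/part-00000.parquet, ...
val events = spark.read.parquet("/data/events")

// "date" never appears inside the files themselves; the reader appends it
// from the partition directory, much like the projection above does per row.
events.select("date", "value").show()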