This past week I had to deal with loading a few terabytes of data into our Spark cluster. The data is stored as one big JSON array, with no line breaks separating the individual JSON objects. Spark handles JSON easily, but it expects one object per line. I had to write a custom Hadoop RecordReader to work around this.
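To make the problem concrete, here is a rough sketch of the two shapes. The field names other than "Type" are invented for illustration; only the "Type":"trip" marker comes from the real data.
// Hypothetical sample: what Spark's JSON reader expects (one object per line)
// versus what I actually had (one giant array, no newlines).
val lineDelimited =
  """{"Type":"trip","Id":"a"}
    |{"Type":"trip","Id":"b"}""".stripMargin
val singleArray =
  """[{"Type":"trip","Id":"a"},{"Type":"trip","Id":"b"}]"""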
newAPIHadoopFile
Spark makes it easy to load a file with any record format. Here is how I ended up doing it:
import org.apache.hadoop.conf.Configuration
import mojio.spark.TripJsonFileInputFormat
val conf = new Configuration(sc.hadoopConfiguration)
val rddraw = sc.newAPIHadoopFile("path to my JSON file", classOf[TripJsonFileInputFormat],
  classOf[String], classOf[String], conf)
As you can see above, I had to define a new FileInputFormat that outputs a String key and a String value.
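Once that RDD exists, the values are plain JSON strings, so they can be handed to Spark's regular JSON reader. A minimal sketch, assuming a spark-shell style setup where sqlContext is in scope:
// Keep only the values (the JSON strings produced by the record reader).
val tripJson = rddraw.map { case (_, json) => json }
// Let Spark infer the schema from the individual objects.
val trips = sqlContext.read.json(tripJson)
trips.printSchema()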
FileInputFormat
The only thing this custom FileInputFormat does is implement the createRecordReader method to return my custom RecordReader:
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
class TripJsonFileInputFormat extends FileInputFormat[String, String] {
  override def createRecordReader(split: InputSplit, context: TaskAttemptContext): RecordReader[String, String] =
    new TripJsonRecordReader()
}
TripJsonRecordReader
The RecordReader is where the bulk of the work happens. It is responsible for processing an InputSplit (a chunk of the input). In summary, it opens a stream to the file, seeks to the start of the split, and reads key/value pairs until it reaches the end of the split. In my case, I could rely on the fact that my JSON objects always start with a special property ("Type":"trip").
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import scala.io.Source
class TripJsonRecordReader() extends RecordReader[String, String] {
  var start, end, pos = 0L
  var reader: Source = _
  var key: String = _
  var value: String = _
  var objBuffer = new StringBuilder
  val pattern = "regex to find the unique id".r

  override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
    // find start and end of the split
    val split = inputSplit.asInstanceOf[FileSplit]
    start = split.getStart
    pos = start
    end = start + split.getLength
    // open a stream to the data, pointing to the start of the split
    val stream = split.getPath
      .getFileSystem(context.getConfiguration)
      .open(split.getPath)
    stream.seek(start)
    reader = Source.fromInputStream(stream, "utf-8")
  }

  override def nextKeyValue(): Boolean = {
    // objStart is where the current object begins, objEnd is where the next one begins
    var objStart, objEnd = -1
    objStart = findNextObjectStart(0)
    if (objStart != -1) {
      objEnd = findNextObjectStart(objStart + 1)
    }
    if (objEnd == -1) {
      // We've reached the end of the split
      key = null
      value = null
      false
    } else {
      // Value is the JSON string we found
      value = objBuffer.substring(objStart, objEnd - 1)
      // Trip Id is the key
      key = pattern.findFirstMatchIn(value).map(_.group(1)).getOrElse("id not found")
      // remove this object from the buffer
      objBuffer = objBuffer.drop(objEnd - 1)
      // Tell Hadoop a new record has been found
      true
    }
  }

  override def getCurrentKey(): String = {
    key
  }

  override def getCurrentValue(): String = {
    value
  }

  override def getProgress(): Float = {
    if (start == end) {
      0.0f
    } else {
      Math.min(1.0f, (pos - start) / (end - start).toFloat)
    }
  }

  override def close(): Unit = {
    if (reader != null) {
      reader.close()
    }
  }

  def loadNextChunk(): Unit = {
    // read a chunk
    val chars = reader.take(8192).mkString
    // move the pointer forward by the number of bytes read
    pos += chars.getBytes().length
    // append characters to the string buffer
    objBuffer.append(chars)
  }

  def findNextObjectStart(after: Int): Int = {
    while (true) {
      // find the start of the next Trip object
      val found = objBuffer.indexOf("""{"Type":"trip",""", after)
      if (found != -1) {
        return found
      }
      if (pos >= end) {
        // end of the split: anything shorter than the 15-character marker cannot be another
        // object, otherwise treat the end of the buffer as the end of the current object
        val len = objBuffer.toString().length
        if (len < 15)
          return -1
        else
          return len
      }
      // not found, load another chunk
      loadNextChunk()
    }
    -1
  }
}
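For reference, this is roughly how each split is consumed through the Hadoop mapreduce RecordReader contract: initialize is called once per split, nextKeyValue advances until it returns false, and close releases the stream. The snippet below is a simplified sketch of that loop, not Spark's actual code.
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the consumption loop; Spark wraps the same calls in an iterator per split.
def readSplit(reader: RecordReader[String, String],
              split: InputSplit,
              context: TaskAttemptContext): Seq[(String, String)] = {
  reader.initialize(split, context)
  val records = ArrayBuffer.empty[(String, String)]
  while (reader.nextKeyValue()) {
    // getCurrentKey/getCurrentValue return whatever nextKeyValue just parsed
    records += ((reader.getCurrentKey(), reader.getCurrentValue()))
  }
  reader.close()
  records
}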