Fixing JSON out-of-memory error with streaming and MapDB
Once upon a time, an API returned 0.5 GB of JSON and our server crashed. We cried, then re-implemented the processing with a streaming parser and a disk-backed Map, and it worked again - but it was slow. We measured and tweaked until we reached a pretty good processing time with decent memory and CPU consumption. Read on to learn about our journey, streaming JSON parsing, and MapDB!
Originally we used Jackson to parse an array of `Device`s into Java classes. I don’t remember how much heap we had, certainly not below 1 GB, but it wasn’t enough once the data grew close to 2 million devices. The problem wasn’t only parsing the data but also holding that much of it in memory. We knew that Jackson supports streaming parsing - a perfect fit for an array - and we were looking for a data structure that could leverage the disk instead of holding everything in memory. We found MapDB, which does just that and, conveniently, exposes the familiar `java.util.Map` interface for working with the data.
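For contrast, the original all-in-memory approach boiled down to a single `readValue` call over the whole array - a minimal sketch (our actual code differed in the details):
import com.fasterxml.jackson.databind.ObjectMapper

List<Device> parseAllAtOnce(String json) {
    def mapper = new ObjectMapper()
    // Binds the entire array into objects at once - simple, but the heap must hold all of it
    return mapper.readValue(json, mapper.typeFactory.constructCollectionType(List, Device))
}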
First some data from our good friend JConsole, collected across multiple test runs:
| # | Solution | Heap | CPU | Processing time | Data disk size |
|---|----------|------|-----|-----------------|----------------|
| 1 | All in memory | 1.2 GB | 65% | 20s | - |
| 2 | Streaming, MapDB, Java serialization | 80-150 MB | 30-55%* | ±2 min | ±1 GB |
| 3 | Streaming only (throwing the data out) | 60 MB | 160% | few seconds | - |
| 4 | Streaming, MapDB, Jackson serialization | same | same | 35-40s | 650 MB |
*) The lower the max heap usage, the higher the CPU usage
Highlights:
- We needed roughly twice the JSON size in heap to keep all the data in memory
- Streaming the data into MapDB cut the heap usage considerably, to roughly 10% of the original (there is a trade-off between heap usage and CPU usage, driven by how actively the Java garbage collector has to run)
- Streaming + MapDB was about six times slower; this is exclusively due to writing/reading the data to/from disk, since the streaming parsing itself took only a few seconds (see #3)
- Serialization is crucial - when we switched from Java serialization to Jackson (storing the values as JSON bytes), we cut the processing time to a third (i.e. still roughly twice as slow as all-in-memory) and the disk storage by half
(Note: We also tried batching the `.put`s into MapDB but it had no effect, presumably because MapDB already does its own batching; the sketch below shows roughly what we tried.)
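The batching attempt looked roughly like this - a hypothetical sketch, assuming direct access to the underlying MapDB map (here called `mapDbMap`) and an arbitrary batch size; `parser` and `jsonReader` are as in the usage code below:
def buffer = [:]                  // accumulate parsed devices in memory first
final int BATCH_SIZE = 10000      // illustrative value, not tuned
parser.parseDevices(jsonReader, { Device it ->
    buffer[it.imsi] = it
    if (buffer.size() >= BATCH_SIZE) {
        mapDbMap.putAll(buffer)   // MapDB's HTreeMap implements ConcurrentMap, so putAll is available
        buffer.clear()
    }
})
if (buffer) mapDbMap.putAll(buffer)   // flush the remainder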
The code
Here is our wonderful disk-backed map filled by a streaming JSON parser, consisting of the data class `Device`, the `StreamingDeviceParser`, and the `OnDiskMap`:
// build.gradle:
compile "org.mapdb:mapdb:3.0.7"
compile "com.fasterxml.jackson.core:jackson-core:2.9.5"
compile "com.fasterxml.jackson.core:jackson-databind:2.9.5"
/** Using the whole thing */
Map<String, Device> getAllDevices() {
def jsonReader = new StringReader(deviceService.getAllDevices())
def parser = new StreamingDeviceParser()
// NOTE: .withCloseable is equivalent to Java's try-with-resources
def onDiskMap = new OnDiskMap(Device).delete().openForWrite().withCloseable { map ->
parser.parseDevices(jsonReader, { Device it -> map.put(it.imsi, it) })
return map
}
onDiskMap.getData()
}
/** The data class */
import com.fasterxml.jackson.annotation.JsonProperty
class Device {
String imei
String imsi
String manufacturer
String model
@JsonProperty("os_name") String osName
@JsonProperty("os_version") String osVersion
@JsonProperty("registered_date") String registeredDate
@JsonProperty("updated_date") String updatedDate
@JsonProperty("device_type") String deviceType
@JsonProperty("last_usage") String lastUsage
}
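For reference, a single element of the incoming JSON array is assumed to look like this (the field names follow from the `@JsonProperty` annotations above; the values are omitted):
{
  "imei": "...",
  "imsi": "...",
  "manufacturer": "...",
  "model": "...",
  "os_name": "...",
  "os_version": "...",
  "registered_date": "...",
  "updated_date": "...",
  "device_type": "...",
  "last_usage": "..."
}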
/** The parser */
import com.fasterxml.jackson.core.*
import com.fasterxml.jackson.databind.ObjectMapper
class StreamingDeviceParser {
void parseDevices(Reader json, Closure processDevice) {
ObjectMapper mapper = new ObjectMapper();
JsonFactory factory = mapper.getFactory();
JsonParser parser = factory.createParser(json)
try {
assert parser.nextToken() == JsonToken.START_ARRAY
while( (parser.nextToken()) == JsonToken.START_OBJECT ) {
processDevice(parser.readValueAs(Device))
}
// Optionally: assert parser.currentToken() == JsonToken.END_ARRAY
} finally {
parser.close()
}
}
}
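For comparison, the "streaming only" measurement in row #3 of the table above simply dropped each parsed device instead of storing it - something along these lines (with `jsonReader` as in `getAllDevices`):
def count = 0
new StreamingDeviceParser().parseDevices(jsonReader, { Device it ->
    count++   // just count the devices, keep nothing
})
println "Parsed ${count} devices"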
/** The on-disk map */
import com.fasterxml.jackson.databind.ObjectMapper
import org.mapdb.*
import java.util.concurrent.ConcurrentMap
/**
* Java Map backed by disk storage, for data too large to fit into the memory all at once.
* Call `openForWrite` before writing to it and `close` when done writing.
*/
class OnDiskMap<T> implements Closeable {
private DB db
private ConcurrentMap map
private Serializer valueSer
private Class<T> valueType
OnDiskMap(Class<T> valueType) {
this.valueType = valueType
valueSer = new JacksonSerializer(valueType)
}
/** Delete the data file */
OnDiskMap delete() {
if (db && !db.isClosed()) close()
new File(dbFileName()).delete()
return this
}
OnDiskMap openForWrite() {
if (db && !db.isClosed()) return this
db = createDb()
map = createMap(db)
return this
}
OnDiskMap put(String key, T value) {
if (!db || db.isClosed()) { throw new IllegalStateException("DB is not open; call openForWrite() first") }
map.put(key, value)
return this
}
/** Close the map, write any remaining data to the disk. */
void close() {
if (db) db.close()
db = null
map = null
}
/** Read the data. The map does not need to be open and `close` is not necessary. */
Map<String, T> getData() {
return createMap(createDb(true))
}
private DB createDb(boolean readOnly = false) {
def maker = DBMaker
.fileDB(dbFileName())
.fileMmapEnable()
if (readOnly) maker = maker.readOnly()
try {
return maker.make();
} catch (DBException.DataCorruption ignored) {
// We crashed before closing or something; just delete it
println("INFO: Corrupted old ${dbFileName()}, deleting...")
delete()
return maker.make()
}
}
private ConcurrentMap<String, T> createMap(DB db) {
return db
.hashMap("map")
.keySerializer(Serializer.STRING)
.valueSerializer(valueSer)
.createOrOpen() as ConcurrentMap<String, T>;
}
private String dbFileName() {
return "${valueType.getSimpleName()}.db".toString()
}
private static class JacksonSerializer implements Serializer {
ObjectMapper mapper = new ObjectMapper();
Class valueType
JacksonSerializer(Class valueType) {
this.valueType = valueType
}
void serialize(DataOutput2 out, Object value) throws IOException {
out.write(mapper.writeValueAsBytes(value))
}
Object deserialize(DataInput2 input, int available) throws IOException {
return mapper.readValue(input, valueType)
}
}
}
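Reading the stored data back later, e.g. in another request, could then look like this (a usage sketch built on `getData` above):
// getData() opens the stored file read-only; per its contract no explicit close is needed
Map<String, Device> devices = new OnDiskMap(Device).getData()
println "Loaded ${devices.size()} devices from disk"
devices.each { String imsi, Device device ->
    // process each device as needed
}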