Fixing JSON out-of-memory error with streaming and MapDB

Once upon a time, an API returned 0.5 GB of JSON and our server crashed. We cried, then re-implemented the processing with a streaming parser and a disk-backed Map, and it worked again - but it was slow. We measured and tweaked and finally got to a pretty good time and decent memory and CPU consumption. Come and learn about our journey, streaming JSON parsing, and MapDB!

Originally we used Jackson to parse an array of Devices into Java objects. I don’t remember how much heap we had, certainly not below 1 GB, but it wasn’t enough once the data came close to 2 million devices. The problem wasn’t only parsing the data but also holding that much data in memory. We knew that Jackson supports streaming parsing - a perfect fit for an array - and were looking for a data structure that can leverage the disk instead of holding everything in memory. We found MapDB, which does just that and, conveniently, exposes the familiar java.util.Map interface for working with the data.

First some data from our good friend JConsole, collected across multiple test runs:

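| #  | Solution                                | Heap      | CPU     | Processing time | Data disk size |
|----|-----------------------------------------|-----------|---------|-----------------|----------------|
| 1. | All in memory                           | 1.2 GB    | 65%     | 20 s            | -              |
| 2. | Streaming, MapDB, Java serialization    | 80-150 MB | 30-55%* | ± 2 min         | ± 1 GB         |
| 3. | Streaming only (throwing the data out)  | 60 MB     | 160%    | few seconds     | -              |
| 4. | Streaming, MapDB, Jackson serialization | same      | same    | 35-40 s         | 650 MB         |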

*) The lower the max heap usage, the higher the CPU usage

Highlights:

  • We needed a heap of approximately twice the JSON size to keep all the data in memory

  • Streaming the data into MapDB cut the heap usage considerably, to roughly 10% of the all-in-memory figure (heap usage is traded off against CPU usage, depending on how actively the Java garbage collector runs)

  • Streaming + MapDB was 6 times slower; this comes from serializing the data and writing/reading it to/from disk, since streaming parsing itself took just a few seconds (see #3)

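  • Serialization is crucial - when we switched from Java serialization to Jackson’s serialization to bytes (`ObjectMapper.writeValueAsBytes`), we cut the time to about a third (i.e. still roughly twice as slow as all in memory) and the disk storage from ± 1 GB to 650 MB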

  • (Note: We also tried to batch `.put`s into the MapDB but it had no effect, presumably because it already does its own batching)
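The ratios behind these highlights can be re-derived from the table. Below is a small sketch in Java that does the arithmetic, assuming that every "±" value is exact, that 1 GB = 1000 MB, and that "± 2 min" means 120 s. The class and method names are made up for this illustration and are not part of our code.

```java
/** Re-derives the ratios from the measurements table (hypothetical helper for illustration only). */
class MeasurementRatios {

    /** How many times slower `seconds` is than `baselineSeconds`. */
    static double slowdown(double seconds, double baselineSeconds) {
        return seconds / baselineSeconds;
    }

    /** `part` expressed as a percentage of `whole`. */
    static double percentOf(double part, double whole) {
        return 100.0 * part / whole;
    }

    public static void main(String[] args) {
        double inMemorySeconds = 20;      // solution 1
        double inMemoryHeapMb = 1200;     // solution 1, 1.2 GB taken as 1200 MB

        // Solution 2: "± 2 min" taken as exactly 120 s
        System.out.println("Java serialization slowdown: " + slowdown(120, inMemorySeconds));

        // Solution 4: 35-40 s
        System.out.println("Jackson slowdown, best case:  " + slowdown(35, inMemorySeconds));
        System.out.println("Jackson slowdown, worst case: " + slowdown(40, inMemorySeconds));

        // Heap of the streaming solutions (80-150 MB) relative to all in memory
        System.out.println("Heap, low end (%):  " + percentOf(80, inMemoryHeapMb));
        System.out.println("Heap, high end (%): " + percentOf(150, inMemoryHeapMb));

        // Disk size with Jackson (650 MB) relative to Java serialization (± 1 GB taken as 1000 MB)
        System.out.println("Disk size (%): " + percentOf(650, 1000));
    }
}
```

Under these assumptions the Java-serialization variant is 6 times slower than all in memory, the Jackson variant 1.75 to 2 times slower, the heap drops to somewhere between about 7% and 12.5%, and the Jackson data file is 65% of the size of the Java-serialized one.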

The code

Here is our wonderful disk-backed map filled by a streaming JSON parser, consisting of the data class Device, StreamingDeviceParser, and OnDiskMap:

// build.gradle:
compile "org.mapdb:mapdb:3.0.7"
compile "com.fasterxml.jackson.core:jackson-core:2.9.5"
compile "com.fasterxml.jackson.core:jackson-databind:2.9.5"

/** Using the whole thing */
Map<String, Device> getAllDevices() {
  def jsonReader = new StringReader(deviceService.getAllDevices())
  def parser = new StreamingDeviceParser()

  // NOTE: .withCloseable is equivalent to Java's try-with-resources
  def onDiskMap = new OnDiskMap(Device).delete().openForWrite().withCloseable { map ->
    parser.parseDevices(jsonReader, { Device it -> map.put(it.imsi, it) })
    return map
  }
  // The map has been closed for writing by now; getData() reopens it read-only
  onDiskMap.getData()
}

/** The data class */
import com.fasterxml.jackson.annotation.JsonProperty
class Device {
    String imei
    String imsi
    String manufacturer
    String model
    @JsonProperty("os_name") String osName
    @JsonProperty("os_version") String osVersion
    @JsonProperty("registered_date") String registeredDate
    @JsonProperty("updated_date") String updatedDate
    @JsonProperty("device_type") String deviceType
    @JsonProperty("last_usage") String lastUsage
}

/** The parser */
import com.fasterxml.jackson.core.*
import com.fasterxml.jackson.databind.ObjectMapper
class StreamingDeviceParser {

    void parseDevices(Reader json, Closure processDevice) {
        ObjectMapper mapper = new ObjectMapper();
        JsonFactory factory = mapper.getFactory();
        JsonParser parser = factory.createParser(json)

        try {
            assert parser.nextToken() == JsonToken.START_ARRAY
            JsonToken token
            // Read one Device at a time so that only the current one is held in memory
            while ((token = parser.nextToken()) == JsonToken.START_OBJECT) {
                processDevice(parser.readValueAs(Device))
            }
            // Optionally: assert token == JsonToken.END_ARRAY
        } finally {
            parser.close()
        }
    }
}

/** The on-disk map */
import com.fasterxml.jackson.databind.ObjectMapper
import org.mapdb.*
import java.util.concurrent.ConcurrentMap
/**
 * Java Map backed by disk storage, for data too large to fit into the memory all at once.
 * Call `openForWrite` before writing to it and `close` when done writing.
 */
class OnDiskMap<T> implements Closeable {

    private DB db
    private ConcurrentMap map
    private Serializer valueSer
    private Class<T> valueType

    OnDiskMap(Class<T> valueType) {
        this.valueType = valueType
        valueSer = new JacksonSerializer(valueType)
    }

    /** Delete the data file */
    OnDiskMap delete() {
        if (db && !db.isClosed()) close()
        new File(dbFileName()).delete()
        return this
    }

    OnDiskMap openForWrite() {
        if (db && !db.isClosed()) return this
        db = createDb()
        map = createMap(db)
        return this
    }

    OnDiskMap put(String key, T value) {
        // db is null before openForWrite() and after close(), so check for that too
        if (!db || db.isClosed()) { throw new IllegalStateException("DB is not open") }
        map.put(key, value)
        return this
    }

    /** Close the map, write any remaining data to the disk. */
    void close() {
        if (db) db.close()
        db = null
        map = null
    }

    /** Read the data. The map does not need to be open and `close` is not necessary. */
    Map<String, T> getData() {
        return createMap(createDb(true))
    }

    private DB createDb(boolean readOnly = false) {
        def maker = DBMaker
                .fileDB(dbFileName())
                .fileMmapEnable()
        if (readOnly) maker = maker.readOnly()
        try {
            return maker.make();
        } catch (DBException.DataCorruption ignored) {
            // We crashed before closing or something; just delete it
            println("INFO: Corrupted old ${dbFileName()}, deleting...")
            delete()
            return maker.make()
        }
    }

    private ConcurrentMap<String, T> createMap(DB db) {
        return db
                .hashMap("map")
                .keySerializer(Serializer.STRING)
                .valueSerializer(valueSer)
                .createOrOpen() as ConcurrentMap<String, T>;
    }

    private String dbFileName() {
        return "${valueType.getSimpleName()}.db".toString()
    }

    private static class JacksonSerializer implements Serializer {

        ObjectMapper mapper = new ObjectMapper();
        Class valueType

        JacksonSerializer(Class valueType) {
            this.valueType = valueType
        }

        void serialize(DataOutput2 out, Object value) throws IOException {
            out.write(mapper.writeValueAsBytes(value))
        }

        Object deserialize(DataInput2 input, int available) throws IOException {
            return mapper.readValue(input, valueType)
        }
    }
}
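Why the value serializer matters so much can be illustrated without MapDB or Jackson. The sketch below uses only the JDK and compares the number of bytes produced by default Java serialization with a hand-written compact encoding of the same record, each value being serialized on its own. `SampleDevice`, its three fields, and the sample values are made up for this illustration, and the compact encoding is only a stand-in, not the `JacksonSerializer` above. The Java-serialized size depends on the class and field names, so it is not quoted here.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical, trimmed-down stand-in for the Device class. */
class SampleDevice implements Serializable {
    private static final long serialVersionUID = 1L;
    final String imei;
    final String imsi;
    final String model;

    SampleDevice(String imei, String imsi, String model) {
        this.imei = imei;
        this.imsi = imsi;
        this.model = model;
    }
}

class SerializationSizeDemo {

    /** Bytes produced by default Java serialization of a single value (includes the class descriptor). */
    static int javaSerializedSize(SampleDevice d) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(d);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Bytes produced by writing only the field values, each as a length-prefixed string. */
    static int compactSize(SampleDevice d) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(d.imei);
            out.writeUTF(d.imsi);
            out.writeUTF(d.model);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        SampleDevice d = new SampleDevice("490154203237518", "310150123456789", "Model X");
        System.out.println("Java serialization: " + javaSerializedSize(d) + " bytes");
        System.out.println("Compact encoding:   " + compactSize(d) + " bytes");
    }
}
```

Default Java serialization writes a stream header and a description of the class and its fields along with the values, which adds up when it is repeated for each of roughly 2 million small records. A serializer that writes little more than the values themselves produces less data to encode and to push to disk.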

Tags: performance


Copyright © 2024 Jakub Holý