Saturday, April 24, 2021

Fast serialization on .net

This is the accompanying blog post for my packages slStreamUtilsProtobuf and slStreamUtilsMessagePack, which provide a set of open-source .NET Standard 2.0+ tools for speeding up the (de)serialization of .NET objects offered by the two popular serialization libraries Protobuf-net and MessagePack-CSharp. Both packages are available under GPL-3 on GitHub.

The performance gains will depend on the nature and size of the objects being serialized. The greatest benefits come when working with very large I/O streams of undetermined length, such as logs of a server process's activity or the data of a data science project, which can grow to several hundred gigabytes. That said, there can still be significant performance benefits when dealing with (much) smaller files.

Introduction

While native .NET binary serialization has always been notoriously slow, several faster third-party protocols and .NET libraries have become available over the years.

In this article I'll focus on two of the most common tools, Protobuf-net and MessagePack-CSharp, and show how we can greatly accelerate object serialization using several different techniques, to the point where serialization is no longer the bottleneck and we can take full advantage of recent, very fast I/O devices.

This post assumes the reader is already familiar with the specifics and the pros and cons of their chosen serialization protocol, either Protobuf or MessagePack. Both tools' repositories offer good introductions to the subject. If you are interested in only one of these tools, you may skip the paragraphs that explicitly deal with the other.

Note: all the examples used in this blog can be found in the Examples folder of the project repository.

Benchmarks

BenchmarkDotNet projects are available under Benchmarks/slStreamUtilsBenchmark

Please visit the project's page for benchmark charts and descriptions.


Tool 1: BufferedStreamReader & BufferedStreamWriter

FileStream, the native implementation of I/O streaming, is safe but slow. It's a wrapper around the unmanaged CreateFile call, protected by deep layers of redundant argument checks. These validations are important and we shouldn't bypass them.

This overhead isn't much of a problem if we're only moving large buffers through the stream, but it becomes painfully slow when we move only a few bytes of data at a time. It can be ignored if we are already using a streaming wrapper that buffers its memory moves internally, such as MessagePack.MessagePackStreamReader, or if we use MessagePack's or Protobuf-net's methods that take a stream as an argument, because they perform such buffering internally.

Solution: We use a buffer to delay calls to the underlying FileStream until we can move a reasonably-sized amount of data. 

Surprisingly, even though FileStream itself is already buffered, that doesn't greatly improve performance. We could wrap it in a BufferedStream, but that won't bring great benefits either, and BufferedStream will complain (emitting trace warnings) when the underlying stream is already a FileStream. I've implemented BufferedStreamReader and BufferedStreamWriter, both inheriting from Stream, splitting the dual nature of stream I/O into a read context and a write context. (This of course excludes scenarios where we want to both read and write the stream at once, but normally it's feasible to alternate between the two modes of operation.)

The scenario where this buffering implementation offers the greatest benefits is when we're the final users of the stream, calling its Read and Write methods directly and moving a small number of bytes at a time.
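The core mechanism can be sketched in a few lines of stand-alone code (illustrative only, not the actual slStreamUtils implementation): small writes accumulate in an in-memory buffer, and the underlying stream, with all its argument checks and syscalls, is only touched once per large flush.

```csharp
using System;
using System.IO;

// Illustrative sketch only -- not the actual slStreamUtils implementation.
// Small writes land in a byte[] buffer; the inner stream is only written to
// once per large flush, turning thousands of tiny writes into a few big ones.
public sealed class TinyBufferedWriter : IDisposable
{
    private readonly Stream inner;
    private readonly byte[] buffer;
    private int count;

    public TinyBufferedWriter(Stream inner, int bufferSize = 64 * 1024)
    {
        this.inner = inner;
        buffer = new byte[bufferSize];
    }

    public void WriteByte(byte b)
    {
        if (count == buffer.Length)
            Flush();
        buffer[count++] = b;
    }

    public void Flush()
    {
        if (count > 0)
        {
            inner.Write(buffer, 0, count); // one large write instead of many tiny ones
            count = 0;
        }
    }

    public void Dispose() => Flush();
}
```

With a FileStream underneath, byte-at-a-time writes through this wrapper hit the expensive stream call only once per buffer-full of data.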

Usage samples:

BufferedStreamReaderExamples.cs

BufferedStreamWriterExamples.cs


Tool 2: PreFetch and DelayedWriter

While reading, the OS and most device drivers anticipate that more contiguous data will be requested, and usually fetch larger blocks in the background, caching them for later requests. Likewise, when writing, buffers are delayed and combined in order to produce larger, more efficient I/O accesses.

Solution: we do the same, but more. 

In our particular case of handling a large consecutive collection of objects, we know beforehand that we will repeatedly read/write large contiguous blocks from/to the stream. Both BufferedStreamReader and BufferedStreamWriter can optionally have worker threads interacting with the I/O in the background, respectively called PreFetch and DelayedWriter. They allow work blocks to queue up for I/O access until a configurable limit is reached. Besides the obvious benefit of concurrent processing (the Read and Write calls won't normally block), this is useful in scenarios where the data flow is very irregular, i.e., when our sequence of elements contains objects that differ greatly and randomly in size or in serialization complexity (certain types are harder, or take longer, to process than others, due to the nature of the Protobuf/MessagePack protocols).
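A delayed writer of this kind can be sketched with a bounded producer-consumer queue: the caller enqueues blocks and returns immediately, while a background task drains the queue into the stream. (Illustrative names and code; the package's actual implementation differs.)

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

// Illustrative sketch of a delayed writer: Enqueue() returns as soon as the
// block is queued (blocking only when the bounded queue is full), while a
// background task performs the actual stream writes.
public sealed class DelayedWriterSketch : IDisposable
{
    private readonly BlockingCollection<byte[]> queue;
    private readonly Task worker;

    public DelayedWriterSketch(Stream target, int maxQueuedBlocks = 4)
    {
        queue = new BlockingCollection<byte[]>(maxQueuedBlocks);
        worker = Task.Run(() =>
        {
            foreach (byte[] block in queue.GetConsumingEnumerable())
                target.Write(block, 0, block.Length); // I/O happens off the caller's thread
        });
    }

    public void Enqueue(byte[] block) => queue.Add(block);

    public void Dispose()
    {
        queue.CompleteAdding();
        worker.Wait(); // drain remaining blocks before returning
    }
}
```

The bounded queue is what smooths out irregular data flow: fast-to-serialize elements queue up while a slow one is still being written, instead of stalling the producer.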

Sample usage:

PreFetchExamples.cs

DelayedWriterExamples.cs


Message Framing

In this section, we'll assume we're dealing with the special case of an array of X, where X is a serializable type (with either MessagePack or Protobuf). Furthermore, we're interested in the case where the length of X[] isn't known beforehand, which usually happens when we're generating logs. The only way to know we've deserialized all elements is by reaching the end of the stream.


Character Count Message Framing:

In order to introduce concurrency, we'll have to frame our data, i.e., introduce a delimiter between each X's binary representation, specifying the length each instance of X occupies on the stream. This way we can quickly read each X's buffer and then queue it for further parallel processing. The approach differs between MessagePack and Protobuf.
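The read side of the scheme can be sketched in a protocol-agnostic way (illustrative code: a plain 4-byte little-endian length prefix stands in for the protocol-specific delimiters discussed below):

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Protocol-agnostic framing sketch: each frame is a 4-byte little-endian
// length followed by the payload. The reader only needs the prefix to carve
// out each payload, which can then be parsed by parallel workers.
public static class FramingSketch
{
    public static void WriteFrame(Stream s, byte[] payload)
    {
        s.Write(BitConverter.GetBytes(payload.Length), 0, 4);
        s.Write(payload, 0, payload.Length);
    }

    public static IEnumerable<byte[]> ReadFrames(Stream s)
    {
        var prefix = new byte[4];
        while (true)
        {
            int got = s.Read(prefix, 0, 4);
            if (got == 0) yield break;           // clean end of stream
            if (got != 4) throw new EndOfStreamException();
            int len = BitConverter.ToInt32(prefix, 0);
            var payload = new byte[len];
            int off = 0;
            while (off < len)                    // Read may return short counts
            {
                int r = s.Read(payload, off, len - off);
                if (r == 0) throw new EndOfStreamException();
                off += r;
            }
            yield return payload;                // ready to hand off to a worker
        }
    }
}
```

Note how ReadFrames never inspects the payload itself; that is exactly what lets deserialization of individual elements proceed in parallel.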


MessagePack:

MessagePack shares some of the logic of a JSON message, so I'll use JSON to illustrate what the original array of X looks like before and after framing.

Before:


{
        "src": "Images/Sun.png",
        "hOffset": 250,
        "vOffset": 200
},
{
        "src": "Images/Earth.png",
        "hOffset": 100,
        "vOffset": 100
}


Note that this is actually not a single JSON message, but a concatenation of individual JSON blocks (it's missing the outer array brackets). It's not obvious from the JSON representation, but expressing X[] in this manner is necessary because we don't know in advance how many elements the array has, and MessagePack needs to prefix each array or map with its element count.

Let's assume the serialized representation of X[0] takes 11 bytes and X[1] takes 9 (numbers chosen purely for illustration). We then introduce framing by wrapping each X instance thus:

{
        "length": 11,
        "body":
        {
                "src": "Images/Sun.png",
                "hOffset": 250,
                "vOffset": 200
        }
},
{
        "length": 9,
        "body":
        {
                "src": "Images/Earth.png",
                "hOffset": 100,
                "vOffset": 100
        }
}

And its binary representation would look like:

Array tag|2|int tag|11|Array tag|3|string tag|byte encoding of "Images/Sun.png"|int tag|250|int tag|200|Array tag|2|int tag|9|Array tag|3|string tag|byte encoding of "Images/Earth.png"|int tag|100|int tag|100

which is just as valid a MessagePack encoding as the original representation was, but now we can extract enough information from the stream to split it into blocks for parallel processing.
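To make the byte layout concrete, here is a minimal hand-rolled encoder for the two framed elements above, using only the .NET standard library (no MessagePack-CSharp dependency). It covers only the few type tags this example needs, and it computes real body sizes, which turn out to be 20 bytes each rather than the illustrative 11 and 9 used above:

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Minimal hand-rolled MessagePack encoder covering only the tags this example
// needs: fixarray (0x90|n), fixstr (0xa0|n), positive fixint (0x00-0x7f) and
// uint8 (0xcc). Real code should use MessagePack-CSharp instead.
public static class MsgPackSketch
{
    public static void WriteArrayHeader(List<byte> o, int n) => o.Add((byte)(0x90 | n)); // n <= 15

    public static void WriteString(List<byte> o, string s)
    {
        byte[] utf8 = Encoding.UTF8.GetBytes(s);
        o.Add((byte)(0xa0 | utf8.Length)); // fixstr, valid for lengths < 32
        o.AddRange(utf8);
    }

    public static void WriteInt(List<byte> o, int v)
    {
        if (v < 128) o.Add((byte)v);           // positive fixint
        else { o.Add(0xcc); o.Add((byte)v); }  // uint8, valid for v < 256
    }

    // One framed element: an outer 2-element array [length, body].
    public static List<byte> EncodeFramed(string src, int hOffset, int vOffset)
    {
        var body = new List<byte>();
        WriteArrayHeader(body, 3);
        WriteString(body, src);
        WriteInt(body, hOffset);
        WriteInt(body, vOffset);

        var framed = new List<byte>();
        WriteArrayHeader(framed, 2);
        WriteInt(framed, body.Count); // the length prefix that enables splitting
        framed.AddRange(body);
        return framed;
    }
}
```

EncodeFramed("Images/Sun.png", 250, 200) yields 0x92 (array of 2), then 0x14 (body length 20), then the 20-byte body; concatenating the framed blocks reproduces the stream layout sketched above.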



Protobuf:

Protobuf is a different protocol: it offers the option of writing something akin to "here's another instance of an item belonging to the variable-length array property with tag #z of our object; it will occupy the next N bytes of the message", letting us properly frame our objects. If the proto spec of X were, for example:

message X 
{
  string src = 1;
  int32 hOffset = 2;
  int32 vOffset = 3;
}

then an array of X’s could be represented by 

message Y 
{
  repeated X myArray = 1;
}

And the actual byte representation for the example X[] = [X1, X2, X3] with its respective serialized sizes [N1, N2, N3] would look like:

PR|Base128 encoding of N1|proto encoding of X1|PR|Base128 encoding of N2|proto encoding of X2|PR|Base128 encoding of N3|proto encoding of X3|

where PR = (1 << 3) | 2 = 0x0A, with 2 being the wire type for length-delimited fields, and 1 the proto field number of myArray.
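These two ingredients, the PR tag byte and the Base128 length, can be computed directly. The following helper mirrors protobuf's standard tag and varint rules (a sketch, not the package's code):

```csharp
using System;
using System.Collections.Generic;

// Protobuf wire-format helpers: a field tag byte is (fieldNumber << 3) | wireType,
// and lengths are Base128 varints (7 bits per byte, MSB = continuation flag).
public static class ProtoWireSketch
{
    public const int WireTypeLengthDelimited = 2;

    public static byte Tag(int fieldNumber) =>
        (byte)((fieldNumber << 3) | WireTypeLengthDelimited);

    public static List<byte> Varint(uint value)
    {
        var bytes = new List<byte>();
        while (value >= 0x80)
        {
            bytes.Add((byte)((value & 0x7F) | 0x80)); // low 7 bits + continuation flag
            value >>= 7;
        }
        bytes.Add((byte)value);
        return bytes;
    }
}
```

Tag(1) gives 0x0A, so every element of myArray is prefixed by the byte 0x0A followed by the varint of its serialized size; for instance Varint(300) yields the two bytes 0xAC 0x02.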


Thursday, April 22, 2021

Tool 3: Framing with Protobuf

The details from the section on framing with Protobuf are implemented for Protobuf-net in the package slStreamUtilsProtobuf, which contains tools for (de)serializing a continuous stream of objects in parallel using message framing.

Usage examples:

 

       

using slProto = slStreamUtils.MultiThreadedSerialization.Protobuf;
// ...
private static IEnumerable<X> GetSampleArray()
{
    return Enumerable.Range(0, 10).Select(f => new X() { b1 = f % 2 == 0, i1 = f, l1 = f % 3 });
}
// ...
IEnumerable<X> arr = GetSampleArray();
using var s = File.Create(fileName);
//...

// original implementation
foreach (var obj in arr)
    ProtoBuf.Serializer.SerializeWithLengthPrefix(s, obj, ProtoBuf.PrefixStyle.Base128, 1);

// framed multithreaded implementation
await using var ser = new slProto.CollectionSerializerAsync<X>(s, new FIFOWorkerConfig(maxConcurrentTasks: 2));
foreach (var item in arr)
    await ser.SerializeAsync(item);


       
 


Wednesday, April 21, 2021

Tool 4: Framing with MessagePack

Here we show how to use the slStreamUtilsMessagePack package for MessagePack-CSharp, which implements the elements from the section on framing with MessagePack. It contains tools for (de)serializing a continuous stream of objects in parallel using message framing, as well as formatters to automatically (de)serialize any array of Frame<T> elements within any user-defined class.

Usage examples for collection of unknown length:

Serialization:

       
using var s = File.Create(fileName);
await using var ser = new CollectionSerializerAsync<X>(s, // X = element type of myEnumerable
    new FIFOWorkerConfig(maxConcurrentTasks: 2));
foreach (var item in myEnumerable)
    await ser.SerializeAsync(new Frame<X>(item));
       
 

Deserialization:

       
using var s = File.OpenRead(fileName);
using var ds = new CollectionDeserializerAsync<X>(maxConcurrentTasks: 2);
await foreach (var item in ds.DeserializeAsync(s))
    yield return item.Item;
       
 

Usage examples for individual class with arrays of known length:

Sample class containing arrays of framed elements:
       
[MessagePackObject]
public class ArrayX
{
    // ... fields
    [Key(N)]
    public Frame<X>[] arr; // where X is any serializable type
    // ... more fields
}
       
 
The Frame<T> struct is merely a wrapper for a length prefix and the underlying object X. It implements the implicit assignment, cast and equality operators so it can be used interchangeably with X.
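The idea behind such a wrapper can be sketched as follows (a simplified stand-in, not the package's actual Frame<T>):

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for a Frame<T>-style wrapper: it carries the payload and
// converts implicitly to and from the wrapped type, so existing code that
// expects T keeps working unchanged.
public readonly struct FrameSketch<T> : IEquatable<FrameSketch<T>>
{
    public readonly T Item;

    public FrameSketch(T item) => Item = item;

    public static implicit operator T(FrameSketch<T> frame) => frame.Item;
    public static implicit operator FrameSketch<T>(T item) => new FrameSketch<T>(item);

    public bool Equals(FrameSketch<T> other) =>
        EqualityComparer<T>.Default.Equals(Item, other.Item);
    public override bool Equals(object obj) => obj is FrameSketch<T> f && Equals(f);
    public override int GetHashCode() =>
        EqualityComparer<T>.Default.GetHashCode(Item);
}
```

With the implicit operators in place, `FrameSketch<int> f = 42;` and `int back = f;` both compile without any explicit construction or cast.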

It can be (de)serialized as usual using MessagePackSerializer.SerializeAsync and MessagePackSerializer.DeserializeAsync, with the options arguments FrameParallelOptions and FrameResolverPlusStandarResolver to control the level of parallelism:

       
ArrayX obj = new ArrayX(...);
int totWorkerThreads = 4;
var opts = new FrameParallelOptions(totWorkerThreads, 
    MessagePackSerializerOptions.Standard.WithResolver(
    FrameResolverPlusStandarResolver.Instance));
using (var s1 = File.Create(fileName))
    // this will include framing header and process ArrayX.arr 
    // in parallel while writing to the underlying stream
    await MessagePackSerializer.SerializeAsync(s1, obj, opts); 
using (var s2 = File.OpenRead(fileName))
    // this will process ArrayX.arr in parallel while loading 
    // since it has framing data
    ArrayX newObj = await MessagePackSerializer.DeserializeAsync<ArrayX>(s2, opts);
        
 
