This section shows how to use the slStreamUtilsMessagePack package for MessagePack-CSharp, which implements the MessagePack-related elements from the paragraph on framing. It contains both tools for (de)serializing a continuous stream of objects in parallel using message framing, and formatters that automatically (de)serialize any array of Frame<T> elements inside any user-defined class.
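To make the idea of message framing concrete, here is a minimal sketch that uses only the .NET base class library. It is not the package's implementation: the 4-byte length prefix, the FramingSketch class and the WriteFrame/ReadFrames helpers are all assumptions made for illustration. It shows why framing helps with parallelism: the reader can cut the stream into frames sequentially without parsing their contents, and then decode the frame bodies in parallel.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

// Hypothetical illustration of length-prefixed framing; not part of slStreamUtilsMessagePack.
public static class FramingSketch
{
    // Writes one frame: a 4-byte little-endian length followed by the raw payload.
    public static void WriteFrame(Stream s, byte[] payload)
    {
        using var writer = new BinaryWriter(s, Encoding.UTF8, leaveOpen: true);
        writer.Write(payload.Length);
        writer.Write(payload);
    }

    // Cuts the stream into frame bodies without interpreting them.
    // Assumes a seekable stream so the end can be detected via Position/Length.
    public static IEnumerable<byte[]> ReadFrames(Stream s)
    {
        using var reader = new BinaryReader(s, Encoding.UTF8, leaveOpen: true);
        while (s.Position < s.Length)
        {
            int length = reader.ReadInt32();
            byte[] body = reader.ReadBytes(length);
            if (body.Length != length)
                throw new EndOfStreamException("Truncated frame");
            yield return body;
        }
    }

    public static void Main()
    {
        using var ms = new MemoryStream();
        foreach (var text in new[] { "alpha", "beta", "gamma" })
            WriteFrame(ms, Encoding.UTF8.GetBytes(text));
        ms.Position = 0;

        // Frames are read one after another; their bodies are decoded in parallel,
        // and AsOrdered keeps the results in the original order.
        var decoded = ReadFrames(ms)
            .AsParallel().AsOrdered()
            .Select(body => Encoding.UTF8.GetString(body))
            .ToList();
        Console.WriteLine(string.Join(",", decoded));
    }
}
```

In this sketch the expensive step (here just UTF-8 decoding, in practice object deserialization) is the part that runs concurrently, while the cheap step of locating frame boundaries stays sequential.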
Usage examples for collection of unknown length:
Serialization:
using var s = File.Create(fileName);
// X is the element type of myEnumerable
await using var ser = new CollectionSerializerAsync<X>(s,
    new FIFOWorkerConfig(maxConcurrentTasks: 2));
foreach (var item in myEnumerable)
    await ser.SerializeAsync(new Frame<X>(item));
Deserialization:
using var s = File.OpenRead(fileName);
using var ds = new CollectionDeserializerAsync<X>(maxConcurrentTasks: 2);
await foreach (var item in ds.DeserializeAsync(s))
    yield return item.Item;
Usage examples for individual class with arrays of known length:
Sample class containing arrays of framed elements:
[MessagePackObject]
public class ArrayX
{
    // ... fields
    [Key(N)]
    public Frame<X>[] arr; // where X is any serializable type
    // ... more fields
}
The Frame<T> struct is merely a wrapper for a length prefix and the underlying object X. It implements the implicit assignment, cast and equality operators, so it can be used interchangeably with X.
A class such as ArrayX can be (de)serialized as usual with MessagePackSerializer.SerializeAsync and MessagePackSerializer.DeserializeAsync, passing a FrameParallelOptions instance built on FrameResolverPlusStandarResolver as the options argument to control the level of parallelism:
ArrayX obj = new ArrayX(...);
int totWorkerThreads = 4;
var opts = new FrameParallelOptions(totWorkerThreads,
    MessagePackSerializerOptions.Standard.WithResolver(
        FrameResolverPlusStandarResolver.Instance));
using (var s1 = File.Create(fileName))
    // this will include the framing header and process ArrayX.arr
    // in parallel while writing to the underlying stream
    await MessagePackSerializer.SerializeAsync(s1, obj, opts);
ArrayX newObj;
using (var s2 = File.OpenRead(fileName))
    // this will process ArrayX.arr in parallel while loading,
    // since the file contains framing data
    newObj = await MessagePackSerializer.DeserializeAsync<ArrayX>(s2, opts);
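The description of Frame<T> as a wrapper that can be used interchangeably with the wrapped type can be pictured with a small stand-in. This is a hypothetical sketch, not the library's source: the FrameSketch<T> name, the BufferLength property and the -1 "unknown length" convention are assumptions, and only the Item property name is taken from the deserialization example above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for illustration; not the actual Frame<T> implementation.
public readonly struct FrameSketch<T> : IEquatable<FrameSketch<T>>
{
    // Length prefix of the serialized item (assumed name); -1 means "not known yet".
    public int BufferLength { get; }
    public T Item { get; }

    public FrameSketch(T item, int bufferLength = -1)
    {
        Item = item;
        BufferLength = bufferLength;
    }

    // Implicit conversions let callers assign T to a frame and read T back out.
    public static implicit operator FrameSketch<T>(T item) => new FrameSketch<T>(item);
    public static implicit operator T(FrameSketch<T> frame) => frame.Item;

    // Equality compares the wrapped items only, ignoring the length prefix.
    public bool Equals(FrameSketch<T> other) =>
        EqualityComparer<T>.Default.Equals(Item, other.Item);
    public override bool Equals(object obj) => obj is FrameSketch<T> f && Equals(f);
    public override int GetHashCode() => Item?.GetHashCode() ?? 0;
    public static bool operator ==(FrameSketch<T> a, FrameSketch<T> b) => a.Equals(b);
    public static bool operator !=(FrameSketch<T> a, FrameSketch<T> b) => !a.Equals(b);
}

public static class FrameSketchDemo
{
    public static void Main()
    {
        FrameSketch<string>[] arr = { "a", "b" }; // implicit T -> FrameSketch<T>
        string first = arr[0];                    // implicit FrameSketch<T> -> T
        FrameSketch<string> other = "a";
        Console.WriteLine(first);
        Console.WriteLine(arr[0] == other);       // compares the wrapped items
    }
}
```

Because the conversions are implicit, an array field declared with the wrapper type can be filled from plain values and read back as plain values, which is what makes the wrapper unobtrusive in user-defined classes.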