Chunking Strategies
Graviton supports multiple chunking algorithms for content-defined deduplication. Choosing the right strategy impacts storage efficiency and retrieval performance.
Overview
Chunking divides byte streams into addressable blocks. Graviton supports:
- Fixed-size chunking — Simple, predictable boundaries (Chunker.fixed)
- FastCDC — Fast content-defined chunking (Chunker.fastCdc)
- Delimiter chunking — Split on a byte delimiter using a streaming KMP matcher (Chunker.delimiter)
TIP
This page also mentions a few planned chunkers (Anchored CDC, BuzHash, Rabin). The runtime APIs you can use today are Chunker.fixed, Chunker.fastCdc, and Chunker.delimiter.
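Chunker.delimiter is listed above but not demonstrated elsewhere on this page. The plain-Scala sketch below (no Graviton APIs; the real chunker runs a streaming KMP matcher inside a ZPipeline) illustrates the splitting semantics a delimiter chunker provides: every emitted block ends at a delimiter match, and any trailing bytes are flushed as a final block.

```scala
// Illustrative only: delimiter-chunking semantics in plain Scala.
// Each block ends with the delimiter; leftover bytes form the last block.
def splitAfter(bytes: Vector[Byte], delim: Byte): Vector[Vector[Byte]] = {
  val (done, rest) = bytes.foldLeft((Vector.empty[Vector[Byte]], Vector.empty[Byte])) {
    case ((blocks, cur), b) =>
      if (b == delim) (blocks :+ (cur :+ b), Vector.empty[Byte])
      else (blocks, cur :+ b)
  }
  if (rest.isEmpty) done else done :+ rest
}

val parts = splitAfter("a\nbb\nc".getBytes("UTF-8").toVector, '\n'.toByte)
// parts decodes to "a\n", "bb\n", "c"
```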
Streaming API (what you actually use)
ZIO streams
graviton.streams.Chunker exposes chunkers as a typed ZPipeline[Any, Chunker.Err, Byte, Block]:
import graviton.core.types.UploadChunkSize
import graviton.streams.Chunker
import zio._
import zio.stream._
val chunkSize = UploadChunkSize(1024 * 1024) // compile-time refined
val blocks: ZStream[Any, Throwable | Chunker.Err, graviton.core.model.Block] =
ZStream.fromFileName("data.bin")
.via(Chunker.fixed(chunkSize).pipeline)
“Pure” state machine (no ZIO)
If you want to run the chunker logic without streams (e.g. tests, benchmarks, non-ZIO integrations), use ChunkerCore:
import graviton.streams.ChunkerCore
import zio.Chunk
val st0 = ChunkerCore.init(ChunkerCore.Mode.FastCdc(minBytes = 256, avgBytes = 1024, maxBytes = 4096)).toOption.get
val (st1, blocks1) = st0.step(Chunk.fromArray(Array[Byte](1,2,3,4))).toOption.get
val (_, blocks2) = st1.finish.toOption.get
step returns an updated state and 0..N emitted Blocks, exactly like a “parse state machine” that you can lift into a stream.
Typed Blocks & Upload Chunks
All chunkers in Graviton emit opaque Block values (backed by Chunk[Byte]) so that size invariants are enforced at the type level.
import graviton.core.attributes.{BinaryAttributes, Source, Tracked}
import graviton.core.model.{Block, BlockBuilder, ByteConstraints}
import zio.Chunk
val bytes: Chunk[Byte] = Chunk.fromArray(inputArray)
val blocks: Chunk[Block] = BlockBuilder.chunkify(bytes)
// Strongly typed helpers
val firstBlockSize = blocks.headOption.map(_.blockSize) // -> ByteConstraints.BlockSize
val updatedAttributes = blocks.headOption
.fold(BinaryAttributes.empty) { block =>
BinaryAttributes.empty.confirmSize(Tracked.now(block.fileSize, Source.Derived))
}
The same module exposes ByteConstraints.refine* helpers for validating raw sizes before constructing blocks or upload chunks. Downstream APIs continue to import graviton.core.types.BlockSize / ChunkCount as before, but now benefit from these refined guarantees.
Fixed-Size Chunking
Algorithm
Split every N bytes:
import graviton.core.types.UploadChunkSize
import graviton.streams.Chunker
import zio.stream.ZStream
val chunkSize = UploadChunkSize(1024 * 1024) // compile-time refined
ZStream.fromFileName("data.bin")
.via(Chunker.fixed(chunkSize).pipeline)
Characteristics
| Property | Value |
|---|---|
| Speed | Fastest |
| Deduplication | Poor (boundary-sensitive) |
| Predictability | Excellent |
| Best for | Append-only logs, fixed-record data |
When to Use
✅ Good for:
- Append-only data (logs, time-series)
- When deduplication isn't important
- Predictable I/O patterns matter
❌ Avoid for:
- Documents with edits in the middle
- Data with insertions/deletions
- Maximum deduplication needed
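The boundary sensitivity above is easy to demonstrate in plain Scala (no Graviton APIs): inserting a single byte at the front shifts every fixed-size boundary, so no chunk of the edited stream matches the original.

```scala
// Fixed-size chunking: one inserted byte shifts every boundary downstream.
val chunkSize = 4
val original = "AAAABBBBCCCC".getBytes("UTF-8").toVector
val edited   = 'X'.toByte +: original

def fixedChunks(bytes: Vector[Byte]): Vector[Vector[Byte]] =
  bytes.grouped(chunkSize).toVector

val before = fixedChunks(original) // AAAA, BBBB, CCCC
val after  = fixedChunks(edited)   // XAAA, ABBB, BCCC, C
val shared = before.toSet.intersect(after.toSet) // empty: zero deduplication
```

A content-defined chunker would resynchronize shortly after the insertion and share every later chunk, which is exactly the gap FastCDC closes.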
FastCDC
Algorithm
FastCDC is content-defined chunking using a rolling hash updated byte-by-byte. The implementation in graviton-streams is single-pass and bounded by maxBytes.
import graviton.streams.Chunker
import zio.stream.ZStream
val min = 256 * 1024
val avg = 1024 * 1024
val max = 4 * 1024 * 1024
ZStream.fromFileName("data.bin")
.via(Chunker.fastCdc(min = min, avg = avg, max = max).pipeline)
Bounds
- min: don’t cut before this many bytes (avoids tiny blocks)
- avg: controls the expected boundary rate
- max: hard cap; forces a cut even if the content-defined boundary hasn’t triggered
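A rough way to see how avg controls the boundary rate (intuition only, not the exact normalization scheme Graviton implements): if the chunker cuts whenever the low k bits of a uniform rolling hash are zero, a boundary fires on average every 2^k bytes, so k ≈ log2(avg).

```scala
// Illustrative: the number of mask bits implied by a target average chunk size.
def maskBits(avgBytes: Int): Int =
  31 - Integer.numberOfLeadingZeros(avgBytes) // floor(log2(avg))

val bits = maskBits(1024 * 1024) // 20 bits -> a cut roughly every 1 MiB
```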
Characteristics
| Property | Value |
|---|---|
| Speed | Fast (2-3 GB/s) |
| Deduplication | Very Good |
| Chunk size variance | Low |
| Best for | General-purpose storage |
Configuration Guide
// High throughput, lower dedup
val highSpeed = FastCDCConfig(
minSize = 512 * 1024,
avgSize = 2 * 1024 * 1024,
maxSize = 8 * 1024 * 1024,
normalization = 1
)
// Balanced
val balanced = FastCDCConfig(
minSize = 256 * 1024,
avgSize = 1024 * 1024,
maxSize = 4 * 1024 * 1024,
normalization = 2
)
// Maximum deduplication
val maxDedup = FastCDCConfig(
minSize = 128 * 1024,
avgSize = 512 * 1024,
maxSize = 2 * 1024 * 1024,
normalization = 3
)
Anchored CDC
WARNING
Anchored CDC is a roadmap item. It is documented here as a design pattern, but is not currently exposed as a runtime Chunker.
Algorithm
Uses content-defined anchors with rechunking:
final case class AnchoredCDCConfig(
anchorPattern: ByteString, // Pattern to search for
minSize: Int = 256 * 1024,
avgSize: Int = 1024 * 1024,
maxSize: Int = 4 * 1024 * 1024,
rechunkThreshold: Double = 0.8 // Rechunk if chunk > threshold * maxSize
)
object AnchoredCDC:
def chunker(config: AnchoredCDCConfig): ZPipeline[Any, Nothing, Byte, Block] =
ZPipeline.fromSink(
ZSink.foldWeightedDecompose(Chunk.empty[Byte])(_.size.toLong)(config.maxSize.toLong) {
case (acc, chunk) =>
val combined = acc ++ chunk
val anchors = findAnchors(combined, config.anchorPattern)
if anchors.nonEmpty then
// Split at anchor points
val (blocks, remaining) = splitAtAnchors(combined, anchors)
(blocks, remaining)
else if combined.size >= config.maxSize then
// Force split at max size
(Chunk.single(combined), Chunk.empty)
else
// Keep accumulating
(Chunk.empty, combined)
}
)
Use Cases
Document formats:
// PDF: Split at object boundaries
val pdfAnchors = AnchoredCDCConfig(
anchorPattern = ByteString("endobj\n")
)
// ZIP: Split at file entries
val zipAnchors = AnchoredCDCConfig(
anchorPattern = ByteString(0x50, 0x4B, 0x03, 0x04) // PK.. signature
)
BuzHash CDC
WARNING
BuzHash CDC is a roadmap item. It is documented here as a design pattern, but is not currently exposed as a runtime Chunker.
Algorithm
Classic rolling hash with simpler implementation:
final case class BuzHashConfig(
minSize: Int = 256 * 1024,
avgSize: Int = 1024 * 1024,
maxSize: Int = 4 * 1024 * 1024,
windowSize: Int = 64
)
object BuzHash:
def chunker(config: BuzHashConfig): ZPipeline[Any, Nothing, Byte, Block] =
val mask = maskFromAvgSize(config.avgSize)
ZPipeline.mapAccumChunks(State.initial(config)) { (state, chunk) =>
chunk.foldLeft((state, Chunk.empty[Block])) { case ((s, blocks), byte) =>
val hash = s.hash.roll(byte)
if s.size >= config.minSize && (hash & mask) == 0 then
// Boundary hit
val block = Block(s.buffer :+ byte, hash)
(State.initial(config), blocks :+ block)
else if s.size >= config.maxSize then
// Force boundary
val block = Block(s.buffer :+ byte, hash)
(State.initial(config), blocks :+ block)
else
// Keep accumulating
(s.copy(buffer = s.buffer :+ byte, hash = hash, size = s.size + 1), blocks)
}
}
Characteristics
| Property | Value |
|---|---|
| Speed | Moderate (1-2 GB/s) |
| Deduplication | Good |
| Chunk size variance | Moderate |
| Best for | Legacy compatibility |
Comparison Matrix
| Algorithm | Speed | Dedup Quality | Variance | Memory |
|---|---|---|---|---|
| Fixed | ⭐⭐⭐⭐⭐ | ⭐ | None | Minimal |
| FastCDC | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | Low | Low |
| Anchored | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Medium | Medium |
| BuzHash | ⭐⭐⭐ | ⭐⭐⭐ | Medium | Low |
| Rabin | ⭐⭐ | ⭐⭐⭐⭐⭐ | High | Medium |
Choosing an Algorithm
Decision Tree
Recommendations
General-purpose:
val chunker = Chunker.fastCdc(min = 256 * 1024, avg = 1024 * 1024, max = 4 * 1024 * 1024).pipeline
High-throughput logs:
val chunker = Chunker.fixed(UploadChunkSize(1024 * 1024)).pipeline
Structured documents (roadmap API; see the Anchored CDC warning above):
val chunker = AnchoredCDC.chunker(AnchoredCDCConfig(
anchorPattern = detectFormat(stream),
minSize = 128 * 1024,
avgSize = 512 * 1024,
maxSize = 2 * 1024 * 1024
))
Maximum deduplication:
val chunker = Chunker.fastCdc(min = 64 * 1024, avg = 256 * 1024, max = 1024 * 1024).pipeline
Advanced Patterns
Adaptive Chunking
Switch algorithms based on detected content:
def adaptiveChunker: ZPipeline[Any, Throwable, Byte, Block] =
ZPipeline.fromSink(
for {
header <- ZSink.take(8192) // Sample first 8KB
format <- detectFormat(header)
chunker = format match
case Format.PDF => AnchoredCDC.chunker(pdfConfig)
case Format.ZIP => AnchoredCDC.chunker(zipConfig)
case Format.Log => FixedChunker(1.MB)
case _ => FastCDC.chunker(defaultConfig)
} yield chunker
)
Two-Level Chunking
Chunk, then sub-chunk for better dedup:
val twoLevel =
FastCDC.chunker(FastCDCConfig(avgSize = 4.MB)) // Coarse
.via(ZPipeline.mapChunks { chunk =>
// Sub-chunk each large block
chunk.flatMap { block =>
if block.size > 1.MB then
rechunk(block, FastCDCConfig(avgSize = 256.KB))
else
Chunk.single(block)
}
})
Dedup-aware Compression
Only compress non-deduped blocks:
def smartCompression(
chunker: ZPipeline[Any, Nothing, Byte, Block],
dedupIndex: DedupIndex
): ZPipeline[Any, Throwable, Byte, StorableBlock] =
chunker
.mapZIO { block =>
for {
hash <- HashAlgo.SHA256.hash(block.bytes)
exists <- dedupIndex.contains(hash)
stored <- if exists then
ZIO.succeed(StorableBlock.Ref(hash)) // Already stored
else
compress(block).map(StorableBlock.Compressed(_)) // New block
} yield stored
}
Performance Tuning
Chunk Size Impact
Small chunks (64-256 KB):
- ✅ Better deduplication
- ✅ Finer-grained retrieval
- ❌ More metadata overhead
- ❌ More index lookups
Large chunks (2-8 MB):
- ✅ Less metadata
- ✅ Faster streaming
- ❌ Worse deduplication
- ❌ Larger retrieval granularity
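A quick back-of-envelope calculation (assumed figures, not Graviton measurements) makes the metadata trade-off concrete: the average chunk size directly sets how many index entries a single file produces.

```scala
// Approximate index entries for a 1 GiB file at different average chunk sizes.
val fileSize = 1L * 1024 * 1024 * 1024

def approxChunks(avgChunk: Long): Long = (fileSize + avgChunk - 1) / avgChunk

val small = approxChunks(256L * 1024)      // 4096 entries to hash, index, and look up
val large = approxChunks(4L * 1024 * 1024) // 256 entries: 16x less metadata
```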
Parallelization
Chunk multiple streams concurrently:
val parallelChunking = ZStream.fromIterable(files)
.mapZIOPar(8) { file =>
ZStream.fromFile(file)
.via(FastCDC.chunker(config))
.foreach(block => store.put(block))
}
Memory Management
Use bounded queues:
def boundedChunking(
stream: ZStream[Any, Throwable, Byte],
chunker: ZPipeline[Any, Nothing, Byte, Block],
maxQueued: Int = 16
): ZStream[Any, Throwable, Block] =
stream
.via(chunker)
.buffer(maxQueued)
Testing
Property Tests
test("chunk boundaries are content-defined") {
check(Gen.chunkOf(Gen.byte)) { bytes =>
val chunks1 = chunkBytes(bytes)
// Flip a bit in each of the first 100 bytes; everything after is unchanged
val mutated = bytes.zipWithIndex.map { case (b, i) => if i < 100 then (b ^ 1).toByte else b }
val chunks2 = chunkBytes(mutated)
// Boundaries after the edited region should realign (assuming the edit stays inside the first chunk)
chunks1.drop(1) == chunks2.drop(1)
}
}
test("chunk sizes respect bounds") {
check(Gen.chunkOf(Gen.byte)) { bytes =>
val chunks = chunkBytes(bytes, config)
chunks.forall { chunk =>
chunk.size >= config.minSize && chunk.size <= config.maxSize
}
}
}
See Also
- Scans & Events — Boundary detection
- End-to-end Upload — Complete ingest pipeline
- Performance Tuning — Optimization strategies
TIP
Start with FastCDC using normalization level 2 (the balanced configuration above) for most use cases. Profile with your actual data before optimizing!