Scans & Events
Graviton's scan system provides composable, type-safe stream processing. Today there are two related (but distinct) APIs:
- graviton.core.scan.Scan: a direct, “sicko” stateful transducer you can compose and run (including as a ZPipeline).
- graviton.core.scan.FreeScanV2: an inspectable free scan program used by the existing chunkers, hashers, and similar scans (see the FreeScanV2 sections below).
Overview
A Scan is a stateful transducer from inputs I to outputs O, paired with typed state S and supporting:
- Composable state: state is threaded automatically through >>>, &&&, +++, |||, first, second, and dimap
- Ergonomic state: model state as a domain case class (or use Scan.NoState for “no state”)
- Runnable: interpret to ZPipeline with scan.toPipeline
Core API (graviton.core.scan.Scan)
Scan Interface
import zio.Chunk
trait Scan[-I, +O, S, +C]:
  def init(): S
  def step(state: S, input: I): (S, O)
  def flush(state: S): (S, Chunk[O])
Key properties:
- Empty state is Scan.NoState: stateless scans use S = Scan.NoState.
- Capabilities are tracked at the type level: C composes via Scan.CapUnion (the identity is Any, as in FreeArrow).
- State representation is user-defined: S can be a case class, etc.
- Composition carries state: composing scans uses a carrier ComposeState[SA, SB]:
  - Scan.NoState is an identity (ComposeState[Scan.NoState, SB] = SB, ComposeState[SA, Scan.NoState] = SA)
  - otherwise it becomes a tuple (SA, SB)
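The interface and the state-carrier rule above can be modelled without ZIO or graviton. The following is a minimal sketch, assuming a simplified stdlib-only model: MiniScan, runAll and andThen are hypothetical names, List stands in for Chunk, and Unit stands in for Scan.NoState (the model always pairs states and does not apply the NoState identity rule). It shows a driver calling init once, step per input and flush once, and sequential composition carrying the pair (SA, SB).

```scala
// Hypothetical stdlib-only model of the Scan interface; MiniScan, runAll and
// andThen are illustrative names, not graviton APIs. List stands in for zio.Chunk.
trait MiniScan[-I, +O, S]:
  def init(): S
  def step(state: S, input: I): (S, O)
  def flush(state: S): (S, List[O])

// Driver: init once, step per input, flush once at the end of the input.
def runAll[I, O, S](scan: MiniScan[I, O, S], inputs: List[I]): List[O] =
  val (last, outs) = inputs.foldLeft((scan.init(), Vector.empty[O])) { case ((s, acc), i) =>
    val (next, o) = scan.step(s, i)
    (next, acc :+ o)
  }
  val (_, tail) = scan.flush(last)
  (outs ++ tail).toList

// Sequential composition (>>>): the composed state is the pair (SA, SB).
// Unlike graviton's ComposeState, this model never collapses a NoState side.
def andThen[I, M, O, SA, SB](f: MiniScan[I, M, SA], g: MiniScan[M, O, SB]): MiniScan[I, O, (SA, SB)] =
  new MiniScan[I, O, (SA, SB)] {
    def init(): (SA, SB) = (f.init(), g.init())
    def step(state: (SA, SB), input: I): ((SA, SB), O) =
      val (sa, m) = f.step(state._1, input)
      val (sb, o) = g.step(state._2, m)
      ((sa, sb), o)
    def flush(state: (SA, SB)): ((SA, SB), List[O]) =
      // Flush f, push its trailing outputs through g's step, then flush g.
      val (sa, ms) = f.flush(state._1)
      val (sbMid, os) = ms.foldLeft((state._2, Vector.empty[O])) { case ((sb, acc), m) =>
        val (sbNext, o) = g.step(sb, m)
        (sbNext, acc :+ o)
      }
      val (sbEnd, tail) = g.flush(sbMid)
      ((sa, sbEnd), (os ++ tail).toList)
  }

final case class CountState(count: Long)

// Emits the running count; flush re-emits the final total (an illustrative choice).
val counting: MiniScan[String, Long, CountState] = new MiniScan[String, Long, CountState] {
  def init(): CountState = CountState(0L)
  def step(s: CountState, x: String): (CountState, Long) =
    val next = CountState(s.count + 1)
    (next, next.count)
  def flush(s: CountState): (CountState, List[Long]) = (s, List(s.count))
}

// Stateless stage; Unit plays the role of Scan.NoState in this model.
val timesTen: MiniScan[Long, Long, Unit] = new MiniScan[Long, Long, Unit] {
  def init(): Unit = ()
  def step(s: Unit, x: Long): (Unit, Long) = ((), x * 10)
  def flush(s: Unit): (Unit, List[Long]) = ((), Nil)
}

val composed = andThen(counting, timesTen) // state type: (CountState, Unit)
```

Here runAll(composed, List("a", "b", "c")) yields List(10L, 20L, 30L, 30L): the trailing 30 is the total flushed by the first stage and passed through the second stage's step before the second stage flushes.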
Stateful Record Example
import graviton.core.scan.Scan
import zio.Chunk
final case class CountState(count: Long)
val counting: Scan[Long, Long, CountState, Any] =
  // step: bump the count and emit it; flush: emit nothing extra
  Scan.fold[Long, Long, CountState](CountState(0L)) { (s, _) =>
    val next = s.count + 1L
    (CountState(next), next)
  }(s => (s, Chunk.empty))
Stateless Scans
import graviton.core.scan.Scan
val doubled: Scan[Int, Int, Scan.NoState, Any] =
  Scan.pure(_ * 2)
Running as a ZIO pipeline
import zio.stream.ZPipeline
import graviton.core.scan.Scan
val pipeline: ZPipeline[Any, Nothing, Int, Int] =
  Scan.pure[Int, Int](_ * 2).toPipeline
Arrow Combinators
Sequential Composition (>>>)
import graviton.core.scan.Scan
val pipeline =
  Scan.pure[Int, Int](_ + 1) >>> Scan.pure[Int, Int](_ * 2) // x => (x + 1) * 2
Dimap (dimap)
import graviton.core.scan.Scan
val program =
  Scan.pure[Int, Int](_ * 2).dimap[String, String](_.toInt, _.toString) // String => String, e.g. "21" => "42"
Parallel on tuples (+++)
import graviton.core.scan.Scan
val left = Scan.pure[Int, Int](_ + 1)
val right = Scan.pure[String, Int](_.length)
val both = left +++ right
// (Int, String) => (Int, Int)
Choice on either (|||)
import graviton.core.scan.Scan
val ints = Scan.pure[Int, Int](_ + 1)
val chars = Scan.pure[String, Int](_.length)
val routed = ints ||| chars
// Either[Int, String] => Either[Int, Int]
Fanout / broadcast (&&&)
&&& runs both scans on the same input and returns a product output (tuples by default, or use .labelled[...] for labelled outputs).
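To make the state behaviour of &&& concrete, here is a sketch over a hypothetical stdlib-only model (MiniScan, fanout and runSteps are illustrative names, flush is omitted, and outputs are plain tuples as in the default described above). Both scans receive every input, so both halves of the (S1, S2) state advance on each step.

```scala
// Hypothetical stdlib-only model (not graviton's implementation); flush is
// omitted for brevity. MiniScan, fanout and runSteps are illustrative names.
trait MiniScan[-I, +O, S]:
  def init(): S
  def step(state: S, input: I): (S, O)

// Fanout (&&&): both scans see the same input, outputs are paired,
// and the composed state (S1, S2) advances on both sides every step.
def fanout[I, O1, O2, S1, S2](a: MiniScan[I, O1, S1], b: MiniScan[I, O2, S2]): MiniScan[I, (O1, O2), (S1, S2)] =
  new MiniScan[I, (O1, O2), (S1, S2)] {
    def init(): (S1, S2) = (a.init(), b.init())
    def step(state: (S1, S2), input: I): ((S1, S2), (O1, O2)) =
      val (s1, o1) = a.step(state._1, input)
      val (s2, o2) = b.step(state._2, input)
      ((s1, s2), (o1, o2))
  }

// Runs step over every input and collects the outputs in order.
def runSteps[I, O, S](scan: MiniScan[I, O, S], inputs: List[I]): List[O] =
  inputs
    .foldLeft((scan.init(), List.empty[O])) { case ((s, acc), i) =>
      val (next, o) = scan.step(s, i)
      (next, o :: acc)
    }
    ._2
    .reverse

val runningSum: MiniScan[Int, Int, Int] = new MiniScan[Int, Int, Int] {
  def init(): Int = 0
  def step(s: Int, x: Int): (Int, Int) = (s + x, s + x)
}
val runningCount: MiniScan[Int, Long, Long] = new MiniScan[Int, Long, Long] {
  def init(): Long = 0L
  def step(s: Long, x: Int): (Long, Long) = (s + 1, s + 1)
}

val both = fanout(runningSum, runningCount)
```

With these definitions, runSteps(both, List(1, 2, 3)) returns List((1, 1L), (3, 2L), (6, 3L)).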
import graviton.core.scan.Scan
import kyo.Tag.given
val a = Scan.pure[Int, Int](_ + 1)
val b = Scan.pure[Int, String](_.toString)
val out = a &&& b // Int => (Int, String)
First / Second
import graviton.core.scan.Scan
val scan = Scan.pure[Int, Int](_ + 1)
val first = scan.first[String] // (Int, String) => (Int, String)
val second = scan.second[String] // (String, Int) => (String, Int)
Free Representation (FreeScanV2)
If you need an inspectable program (for optimization/visualization), see graviton.core.scan.FreeScanV2 (FreeScan, FS, and the Compile interpreter). This is the API used by existing batteries-included scans (hashing, chunking, manifests, etc.).
Benefits:
- Lawful: Verify category, arrow, choice, parallel laws
- Optimizable: Fusion, dead-code elimination before interpretation
- Inspectable: Introspect scan structure for debugging/metrics
- Multi-target: Interpret to ZIO, pure loops, Spark, etc.
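These benefits come from the program being plain data. As a toy illustration only (the Free, Pure, Fold and AndThen types here are hypothetical and much smaller than FreeScanV2's real encoding), the sketch below inspects a program, applies one fusion rule (two adjacent pure stages become one), and checks that fusion does not change the output.

```scala
// Hypothetical toy "free scan" ADT; far smaller than FreeScanV2 and not its encoding.
// The program is plain data, so it can be measured, rewritten, then run.
sealed trait Free[I, O]:
  def size: Int                     // inspection: number of primitive nodes
  def fused: Free[I, O]             // optimization: merge adjacent Pure nodes
  def run(inputs: List[I]): List[O] // interpretation: a simple batch runner
  def asPure: Option[I => O] = None

final case class Pure[I, O](f: I => O) extends Free[I, O]:
  def size: Int = 1
  def fused: Free[I, O] = this
  def run(inputs: List[I]): List[O] = inputs.map(f)
  override def asPure: Option[I => O] = Some(f)

final case class Fold[I, O, S](init: S, step: (S, I) => (S, O)) extends Free[I, O]:
  def size: Int = 1
  def fused: Free[I, O] = this
  def run(inputs: List[I]): List[O] =
    inputs.foldLeft((init, List.empty[O])) { case ((s, acc), i) =>
      val (next, o) = step(s, i)
      (next, o :: acc)
    }._2.reverse

final case class AndThen[I, M, O](first: Free[I, M], second: Free[M, O]) extends Free[I, O]:
  def size: Int = first.size + second.size
  def fused: Free[I, O] =
    val (a, b) = (first.fused, second.fused)
    (a.asPure, b.asPure) match
      case (Some(f), Some(g)) => Pure(f.andThen(g)) // two pure stages become one
      case _                  => AndThen(a, b)
  def run(inputs: List[I]): List[O] = second.run(first.run(inputs))

val program: Free[Int, Int] =
  AndThen(
    AndThen(Pure[Int, Int](_ + 1), Pure[Int, Int](_ * 2)),
    Fold[Int, Int, Int](0, (s, x) => (s + x, s + x)) // running sum
  )
```

program.size is 3 and program.fused.size is 2, while both versions map List(1, 2, 3) to List(4, 10, 18).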
Built-in FreeScanV2 primitives
FreeScanV2 provides a small set of batteries-included primitives in graviton.core.scan.FS (see FreeScanV2.scala):
- FS.counter / FS.byteCounter
- FS.hashBytes
- FS.fixedChunker
- FS.buildManifest
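A fixed-size chunker fits the init/step/flush shape: buffer bytes until a block is full, emit complete blocks from step, and emit the leftover tail from flush. A minimal sketch under those assumptions; ChunkerState, chunkerStep, chunkerFlush and fixedChunks are hypothetical names, and this is not FS.fixedChunker's actual signature or implementation.

```scala
// Hypothetical sketch of fixed-size rechunking in init/step/flush form; it is not
// FS.fixedChunker's implementation. Vector[Byte] stands in for zio.Chunk[Byte].
final case class ChunkerState(buffer: Vector[Byte])

// step: append the input, emit every complete block, keep the remainder.
def chunkerStep(blockSize: Int)(state: ChunkerState, input: Vector[Byte]): (ChunkerState, List[Vector[Byte]]) =
  val all = state.buffer ++ input
  val cut = (all.length / blockSize) * blockSize
  (ChunkerState(all.drop(cut)), all.take(cut).grouped(blockSize).toList)

// flush: emit the final partial block, if any.
def chunkerFlush(state: ChunkerState): List[Vector[Byte]] =
  if state.buffer.isEmpty then Nil else List(state.buffer)

def fixedChunks(blockSize: Int, inputs: List[Vector[Byte]]): List[Vector[Byte]] =
  require(blockSize > 0, "blockSize must be positive")
  val (last, blocks) = inputs.foldLeft((ChunkerState(Vector.empty), Vector.empty[Vector[Byte]])) {
    case ((s, acc), input) =>
      val (next, out) = chunkerStep(blockSize)(s, input)
      (next, acc ++ out)
  }
  (blocks ++ chunkerFlush(last)).toList

val blocks = fixedChunks(4, List("ab", "cde", "f").map(_.getBytes("UTF-8").toVector))
```

Inputs "ab", "cde", "f" with a block size of 4 produce the blocks "abcd" and "ef": block boundaries depend only on the byte count, not on how the input was split.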
Example:
import graviton.core.scan.FS.*
import kyo.Tag.given
import zio.Chunk
val program =
  counter[Chunk[Byte]].labelled["count"] &&&
    byteCounter.labelled["bytes"]
Composition Examples
Scan: sequential + state
import graviton.core.scan.Scan
import zio.Chunk
final case class CountState(count: Long)
val counting: Scan[Long, Long, CountState, Any] =
  Scan.fold[Long, Long, CountState](CountState(0L)) { (s, _) =>
    val next = s.count + 1L
    (CountState(next), next)
  }(s => (s, Chunk.empty))
// multiply by 10, then count: emits 1, 2, 3, ... regardless of the input values
val program = Scan.pure[Long, Long](_ * 10) >>> counting
FreeScanV2: broadcast with labelled record output
import graviton.core.scan.FS.*
import graviton.core.scan.Tensor
import kyo.Tag.given
import zio.Chunk
val scan =
  counter[Chunk[Byte]].labelled["count"] &&&
    byteCounter.labelled["bytes"]
val inputs = List(Chunk.fromArray("ab".getBytes()), Chunk.fromArray("c".getBytes()))
val out = scan.runChunk(inputs).map(Tensor.toTuple["count", "bytes", Long, Long]).toList
ZIO Integration
Both Scan and FreeScanV2 have toPipeline / toChannel helpers for running on ZIO Streams.
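Whatever the runner, a stream delivers input in chunks, so state has to survive chunk boundaries and flush has to run exactly once at end of stream. The sketch below models that discipline with the standard library only; MiniScan and runChunked are hypothetical, an Iterator of Lists stands in for a ZIO stream of Chunks, and it is an assumption that toPipeline follows a similar init/step/flush order.

```scala
// Hypothetical stdlib-only model; MiniScan and runChunked are illustrative names.
// An Iterator of Lists stands in for a ZIO stream of Chunks.
trait MiniScan[-I, +O, S]:
  def init(): S
  def step(state: S, input: I): (S, O)
  def flush(state: S): (S, List[O])

// State is threaded across chunk boundaries; flush runs once, after the last chunk,
// and its outputs (if any) are emitted as one final chunk.
def runChunked[I, O, S](scan: MiniScan[I, O, S], chunks: Iterator[List[I]]): List[List[O]] =
  var state = scan.init()
  val out = List.newBuilder[List[O]]
  for chunk <- chunks do
    val emitted = List.newBuilder[O]
    for input <- chunk do
      val (next, o) = scan.step(state, input)
      state = next
      emitted += o
    out += emitted.result()
  val (_, tail) = scan.flush(state)
  if tail.nonEmpty then out += tail
  out.result()

val runningSum: MiniScan[Int, Int, Int] = new MiniScan[Int, Int, Int] {
  def init(): Int = 0
  def step(s: Int, x: Int): (Int, Int) = (s + x, s + x)
  def flush(s: Int): (Int, List[Int]) = (s, Nil)
}
```

Splitting the same input as List(1, 2), List(3), List(4, 5) or passing it as a single chunk yields the same flattened outputs, List(1, 3, 6, 10, 15), because the running sum is carried across chunk boundaries.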
Capabilities (historical)
Earlier designs modeled “capabilities” via F[_]/G[_]. The current implementations (Scan and FreeScanV2) are concrete and are interpreted directly to runners like ZPipeline.
Laws & Properties
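All combinators satisfy the following laws. Here arr(f) stands for a stateless scan built from a function (such as Scan.pure(f)), id for the identity scan, and ≡ for observational equivalence: both sides emit the same outputs for the same inputs, even though their composed states may be nested differently.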
Category Laws
// Associativity
(f >>> g) >>> h ≡ f >>> (g >>> h)
// Identity
id >>> f ≡ f ≡ f >>> id
Arrow Laws
// arr composition
arr(f) >>> arr(g) ≡ arr(g ∘ f)
// first commutation
first(f) >>> arr(swap) ≡ arr(swap) >>> second(f)
Choice Laws
// Distribution
(f ||| g) >>> (h ||| k) ≡ (f >>> h) ||| (g >>> k)
Parallel Laws
// Product
(f +++ g) >>> (h +++ k) ≡ (f >>> h) +++ (g >>> k)
State Laws
State is threaded linearly and must be treated as local to a scan instance: don’t share it between instances. When you need inspectable, explicit capability/state tracking for optimization, use FreeScanV2 / FreeArrow-style APIs; Scan stays runtime-direct.
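Because the two sides of a law can carry differently nested states (for example ((SA, SB), SC) versus (SA, (SB, SC))), the laws are best checked by comparing outputs on the same inputs. A sketch of such a check on a hypothetical stdlib-only model (MiniScan, pure, andThen, first, second, run and swap are illustrative names; flush is omitted), using a stateful running sum so that state threading is exercised.

```scala
// Hypothetical stdlib-only model for checking laws by observation; MiniScan, pure,
// andThen, first, second, run and swap are illustrative names (flush omitted).
trait MiniScan[-I, +O, S]:
  def init(): S
  def step(state: S, input: I): (S, O)

def pure[A, B](f: A => B): MiniScan[A, B, Unit] = new MiniScan[A, B, Unit] {
  def init(): Unit = ()
  def step(s: Unit, a: A): (Unit, B) = ((), f(a))
}

def andThen[A, B, C, S1, S2](f: MiniScan[A, B, S1], g: MiniScan[B, C, S2]): MiniScan[A, C, (S1, S2)] =
  new MiniScan[A, C, (S1, S2)] {
    def init(): (S1, S2) = (f.init(), g.init())
    def step(s: (S1, S2), a: A): ((S1, S2), C) =
      val (s1, b) = f.step(s._1, a)
      val (s2, c) = g.step(s._2, b)
      ((s1, s2), c)
  }

// first/second run the scan on one side of a pair and pass the other side through.
def first[A, B, X, S](f: MiniScan[A, B, S]): MiniScan[(A, X), (B, X), S] =
  new MiniScan[(A, X), (B, X), S] {
    def init(): S = f.init()
    def step(s: S, p: (A, X)): (S, (B, X)) =
      val (s1, b) = f.step(s, p._1)
      (s1, (b, p._2))
  }

def second[A, B, X, S](f: MiniScan[A, B, S]): MiniScan[(X, A), (X, B), S] =
  new MiniScan[(X, A), (X, B), S] {
    def init(): S = f.init()
    def step(s: S, p: (X, A)): (S, (X, B)) =
      val (s1, b) = f.step(s, p._2)
      (s1, (p._1, b))
  }

def run[I, O, S](scan: MiniScan[I, O, S], inputs: List[I]): List[O] =
  inputs.foldLeft((scan.init(), List.empty[O])) { case ((s, acc), i) =>
    val (next, o) = scan.step(s, i)
    (next, o :: acc)
  }._2.reverse

def swap[A, B](p: (A, B)): (B, A) = (p._2, p._1)

// A stateful scan, so the check exercises real state threading.
val runningSum: MiniScan[Int, Int, Int] = new MiniScan[Int, Int, Int] {
  def init(): Int = 0
  def step(s: Int, x: Int): (Int, Int) = (s + x, s + x)
}

// Associativity: states nest as ((Int, Unit), Unit) vs (Int, (Unit, Unit)); outputs must agree.
val xs = List(1, 2, 3, 4)
val leftAssoc = run(andThen(andThen(runningSum, pure[Int, Int](_ + 1)), pure[Int, Int](_ * 2)), xs)
val rightAssoc = run(andThen(runningSum, andThen(pure[Int, Int](_ + 1), pure[Int, Int](_ * 2))), xs)

// First commutation: first(f) >>> arr(swap) versus arr(swap) >>> second(f).
val pairs = List((1, "a"), (2, "b"), (3, "c"))
val lhs = run(andThen(first[Int, Int, String, Int](runningSum), pure[(Int, String), (String, Int)](p => swap(p))), pairs)
val rhs = run(andThen(pure[(Int, String), (String, Int)](p => swap(p)), second[Int, Int, String, Int](runningSum)), pairs)
```

Both associativity sides produce List(4, 8, 14, 22), and both commutation sides produce List(("a", 1), ("b", 3), ("c", 6)).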
Performance
Fusion
The free representation (FreeScanV2) enables automatic fusion:
// Before optimization
val unfused =
  scan1.dimap(f1)(g1) >>>
    scan2.dimap(f2)(g2) >>>
    scan3.dimap(f3)(g3)
// After fusion (the interpreter optimizes):
// a single pass, no intermediate allocations
State Management
- State is mutable within scan step for performance
- Immutable across boundaries (composition)
- No boxing for primitive fields (Long, Int, etc.)
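One way to read "mutable within step, immutable across boundaries": a step may use local mutable variables in a tight loop, provided it returns a fresh immutable state. The sketch below shows this with Adler-32 as a stand-in checksum (it is not what FS.hashBytes computes; AdlerState, adlerStep and AdlerMod are hypothetical names).

```scala
// Hypothetical sketch: Adler-32 is a stand-in checksum, not what FS.hashBytes computes.
// AdlerState, adlerStep and AdlerMod are illustrative names.
final case class AdlerState(a: Int, b: Int):
  def value: Long = (b.toLong << 16) | a.toLong

val AdlerMod = 65521
val adlerInit = AdlerState(1, 0)

// Inside one step: plain local vars and a while loop (no per-byte allocation).
// Across steps: only the immutable AdlerState is passed on.
def adlerStep(state: AdlerState, chunk: Array[Byte]): (AdlerState, Long) =
  var a = state.a
  var b = state.b
  var i = 0
  while i < chunk.length do
    a = (a + (chunk(i) & 0xff)) % AdlerMod
    b = (b + a) % AdlerMod
    i += 1
  val next = AdlerState(a, b)
  (next, next.value)

// Checksum a message that arrives in three chunks.
val chunks = List("Graviton ", "scans ", "stream").map(_.getBytes("UTF-8"))
val finalState = chunks.foldLeft(adlerInit)((s, c) => adlerStep(s, c)._1)
```

Feeding the message in three chunks gives the same value as java.util.zip.Adler32 over the whole message, so the chunking does not affect the result.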
Memory
- Bounded: State size is compile-time known
- Zero-copy: Chunk-based scans avoid copying
- Streaming: Process arbitrarily large inputs
Advanced Patterns
- Stateful windows: use Scan.fold with a domain case class as the scan state.
- Routing: use ||| (choice) for Either-based routing, or model richer routing with FreeArrow/FreeScanV2.
- Metrics: wrap a scan with map/dimap and record counters externally; internal “metered scan” helpers are still evolving.
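For the stateful-windows pattern, the state can be a case class holding the window and its running sum. A stdlib-only sketch (WindowState, windowStep and windowSums are hypothetical names; with graviton, windowStep has the (state, input) => (state, output) shape that the Scan.fold examples above pass as their step, which is an assumption drawn from those examples).

```scala
import scala.collection.immutable.Queue

// Hypothetical sketch; WindowState, windowStep and windowSums are illustrative names.
// The state is a domain case class: the window contents plus their running sum.
final case class WindowState(size: Int, values: Queue[Long], sum: Long)

// Push the new value, evict the oldest once the window is over capacity,
// and adjust the sum incrementally instead of re-adding the whole window.
def windowStep(s: WindowState, x: Long): (WindowState, Long) =
  val pushed = s.values.enqueue(x)
  val next =
    if pushed.length > s.size then
      val (oldest, rest) = pushed.dequeue
      WindowState(s.size, rest, s.sum + x - oldest)
    else WindowState(s.size, pushed, s.sum + x)
  (next, next.sum)

def windowSums(size: Int, inputs: List[Long]): List[Long] =
  require(size > 0, "window size must be positive")
  inputs
    .foldLeft((WindowState(size, Queue.empty, 0L), Vector.empty[Long])) { case ((s, acc), x) =>
      val (next, out) = windowStep(s, x)
      (next, acc :+ out)
    }
    ._2
    .toList
```

For example, windowSums(3, List(1L, 2L, 3L, 4L, 5L)) returns List(1L, 3L, 6L, 9L, 12L).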
See Also
- Transducer Algebra — The production pipeline engine built on top of Scan concepts
- Pipeline Explorer — Interactive transducer composition visualizer
- Schema & Types — type-level programming
- Ranges & Boundaries — Span operations
- Chunking Strategies — CDC algorithms
TIP
Scans are pure and deterministic: the same inputs always produce the same outputs. Compose with Scan (or FreeScan when you need an inspectable program), then interpret to ZPipeline for execution.
WARNING
State S is mutable within step, immutable across composition. Don't share state between scan instances!