DeltaFIFO

Overview

DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer, and the consumer is whatever calls the Pop() method.

  1. The actual data is stored in items, in the form of map[string]Deltas.
  2. The order is stored in queue as []string.
type DeltaFIFO struct {
    lock sync.RWMutex
    cond sync.Cond
    items map[string]Deltas
    queue []string
    populated bool
    initialPopulationCount int
    keyFunc KeyFunc
    knownObjects KeyListerGetter
    closed bool
    emitDeltaTypeReplaced bool
}
type Deltas []Delta
type DeltaType string
type Delta struct {
    Type   DeltaType
    Object interface{}
}
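How items and queue cooperate can be illustrated with a minimal, single-goroutine sketch. The names miniFIFO, enqueue, and pop are hypothetical and are not part of client-go; locking, the condition variable, and key functions are left out, and pop returns false on an empty queue instead of blocking as the real Pop() does.

```go
package main

import "fmt"

// DeltaType, Delta, and Deltas mirror the definitions quoted above.
type DeltaType string

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// miniFIFO is a hypothetical stand-in for DeltaFIFO without any locking.
type miniFIFO struct {
	items map[string]Deltas // accumulated deltas per key
	queue []string          // pending keys in first-seen order
}

func newMiniFIFO() *miniFIFO {
	return &miniFIFO{items: map[string]Deltas{}}
}

// enqueue records a delta for key. The key is appended to the queue
// only if it is not already pending, so each key is queued at most once.
func (f *miniFIFO) enqueue(key string, d Delta) {
	if _, pending := f.items[key]; !pending {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], d)
}

// pop removes the oldest pending key and returns all deltas accumulated for it.
func (f *miniFIFO) pop() (string, Deltas, bool) {
	if len(f.queue) == 0 {
		return "", nil, false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	deltas := f.items[key]
	delete(f.items, key)
	return key, deltas, true
}

func main() {
	f := newMiniFIFO()
	f.enqueue("default/a", Delta{Type: Added, Object: "a-v1"})
	f.enqueue("default/b", Delta{Type: Added, Object: "b-v1"})
	f.enqueue("default/a", Delta{Type: Updated, Object: "a-v2"})

	for {
		key, deltas, ok := f.pop()
		if !ok {
			break
		}
		fmt.Println(key, len(deltas)) // prints "default/a 2", then "default/b 1"
	}
}
```

The second change to default/a does not move the key in the queue; it only grows the Deltas that the consumer receives in a single pop.
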
  • Difference between DeltaFIFO and FIFO (Need to summarize the long explanation later)

    1. One is that the accumulator associated with a given object's key is not that object but rather a Deltas, which is a slice of Delta values for that object. Applying an object to a Deltas means to append a Delta except when the potentially appended Delta is a Deleted and the Deltas already ends with a Deleted. In that case the Deltas does not grow, although the terminal Deleted will be replaced by the new Deleted if the older Deleted's object is a DeletedFinalStateUnknown.

    2. The other difference is that DeltaFIFO has two additional ways that an object can be applied to an accumulator: Replaced and Sync. If EmitDeltaTypeReplaced is not set to true, Sync will be used in replace events for backwards compatibility. Sync is used for periodic resync events.

  • DeltaFIFO solves this use case

    1. You want to process every object change (delta) at most once.
    2. When you process an object, you want to see everything that's happened to it since you last processed it.
    3. You want to process the deletion of some of the objects.
    4. You might want to periodically reprocess objects.
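
The append rule for Deleted entries described in the first difference can be sketched as follows. This is an illustration of the rule exactly as worded above, not a copy of the client-go implementation, which may differ in detail; appendDelta is a hypothetical helper, and DeletedFinalStateUnknown is reduced here to a plain struct.

```go
package main

import "fmt"

type DeltaType string

const (
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// DeletedFinalStateUnknown is a simplified placeholder for an object whose
// deletion was noticed without its final state being observed.
type DeletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// appendDelta (hypothetical) applies d to ds following the prose rule:
// a Deleted arriving after a trailing Deleted never grows the slice, and it
// replaces the trailing entry only if that entry holds a DeletedFinalStateUnknown.
func appendDelta(ds Deltas, d Delta) Deltas {
	if n := len(ds); n > 0 && d.Type == Deleted && ds[n-1].Type == Deleted {
		if _, unknown := ds[n-1].Object.(DeletedFinalStateUnknown); unknown {
			ds[n-1] = d
		}
		return ds
	}
	return append(ds, d)
}

func main() {
	var ds Deltas
	ds = appendDelta(ds, Delta{Type: Updated, Object: "pod-v2"})
	ds = appendDelta(ds, Delta{
		Type:   Deleted,
		Object: DeletedFinalStateUnknown{Key: "default/pod", Obj: "pod-v2"},
	})
	ds = appendDelta(ds, Delta{Type: Deleted, Object: "pod-v3"})

	fmt.Println(len(ds), ds[1].Object) // prints "2 pod-v3"
}
```
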

Usage: how DeltaFIFO is used in informer

  1. Create Indexer
  2. Create DeltaFIFO
  3. Call fifo.Add(xx) or fifo.Update(xx) or fifo.Delete(xx)
  4. Call fifo.Pop(process), where process has the type PopProcessFunc func(interface{}) error. It type-asserts the popped object to Deltas and processes the deltas with processDeltas.
    func process(obj interface{}) error { // type PopProcessFunc func(interface{}) error
        if deltas, ok := obj.(cache.Deltas); ok {
            return processDeltas(deltas)
        }
        return errors.New("object given as Process argument is not Deltas")
    }
    
  5. processDeltas adds, updates, or deletes objects in the indexer according to the type of each Delta.
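
Steps 4 and 5 can be sketched without client-go as shown below. The pod type and the map-based store are hypothetical stand-ins for a real API object and the Indexer, and this processDeltas is a simplified assumption about how each delta type might be mapped onto the store, not the informer's actual code.

```go
package main

import (
	"errors"
	"fmt"
)

type DeltaType string

const (
	Added    DeltaType = "Added"
	Updated  DeltaType = "Updated"
	Deleted  DeltaType = "Deleted"
	Replaced DeltaType = "Replaced"
	Sync     DeltaType = "Sync"
)

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

// pod is a hypothetical object type; store is a hypothetical stand-in for the Indexer.
type pod struct{ Name, Phase string }

type store map[string]pod

// processDeltas applies the deltas to the store in the order they were recorded.
func processDeltas(s store, deltas Deltas) error {
	for _, d := range deltas {
		p, ok := d.Object.(pod)
		if !ok {
			return errors.New("unexpected object type in delta")
		}
		switch d.Type {
		case Added, Updated, Replaced, Sync:
			s[p.Name] = p // add or overwrite
		case Deleted:
			delete(s, p.Name)
		}
	}
	return nil
}

// process builds a function with the PopProcessFunc shape used in step 4.
func process(s store) func(obj interface{}) error {
	return func(obj interface{}) error {
		if deltas, ok := obj.(Deltas); ok {
			return processDeltas(s, deltas)
		}
		return errors.New("object given as Process argument is not Deltas")
	}
}

func main() {
	indexer := store{}
	handle := process(indexer)

	err := handle(Deltas{
		{Type: Added, Object: pod{Name: "a", Phase: "Pending"}},
		{Type: Updated, Object: pod{Name: "a", Phase: "Running"}},
	})
	fmt.Println(err, indexer["a"].Phase) // prints "<nil> Running"
}
```

Because the deltas for one key arrive together and in order, the store ends up in the latest state after a single call, even though the consumer saw both intermediate changes.
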

For more details, see the informer page.