Golang Stream API

时间:2021-04-13 12:15:56   收藏:0   阅读:0
技术图片
package stream

import (
    "log"
    "reflect"
    "sort"
)

// Stream is a channel of arbitrary values that consumers drain elements from.
type Stream chan interface{}

// BufferStream embeds a Stream together with a Size value.
// NOTE(review): Size is presumably the intended buffer capacity; it is not
// read by the code visible here — confirm against callers.
type BufferStream struct {
    Stream
    Size int
}

// FilterFunc reports whether element i should be kept; Retain keeps the
// elements for which it returns true.
type FilterFunc func(i interface{}) bool

// LessFunc reports whether element i must order before element j; it is the
// comparison used by Sort.
type LessFunc func(i, j interface{}) bool

// KeyFunc derives a value from element i. Sum adds up the derived values,
// while GroupBy and Join use them as grouping / matching keys.
type KeyFunc func(i interface{}) interface{}

// NewStream returns a new, empty, unbuffered Stream.
func NewStream() Stream {
    return make(Stream)
}

// From feeds the elements of source into s, in index order, from a background
// goroutine and closes s once every element has been sent. It returns s so
// calls can be chained.
//
// source must be a slice or an array; any other kind makes From panic. Because
// s is closed by From, a Stream can be used as the target of From only once.
// The feeding goroutine blocks on each send until a consumer receives, so the
// returned Stream has to be drained for that goroutine to finish.
func (s Stream) From(source interface{}) Stream {
    v := reflect.ValueOf(source)
    switch v.Kind() {
    case reflect.Slice, reflect.Array:
        // Both kinds support Len and Index, so they are fed identically.
    default:
        panic("got a non-slice kind source")
    }
    go func() {
        defer close(s)
        for i, n := 0, v.Len(); i < n; i++ {
            s <- v.Index(i).Interface()
        }
    }()
    return s
}

// Retain returns a new Stream carrying, in their original order, only the
// elements of s for which f returns true. Filtering runs in a background
// goroutine; the returned Stream is closed once s has been exhausted.
func (s Stream) Retain(f FilterFunc) Stream {
    kept := make(Stream)
    go func() {
        defer close(kept)
        for item := range s {
            if !f(item) {
                continue
            }
            kept <- item
        }
    }()
    return kept
}

// Sort drains s completely, orders the collected elements with lessFunc via
// sort.Slice (which is not guaranteed to be stable) and returns a new Stream
// that yields them in that order. It blocks until s has been closed.
func (s Stream) Sort(lessFunc LessFunc) Stream {
    items := []interface{}{}
    for item := range s {
        items = append(items, item)
    }
    byLess := func(a, b int) bool {
        return lessFunc(items[a], items[b])
    }
    sort.Slice(items, byLess)
    return NewStream().From(items)
}

// Reverse drains s completely and returns a new Stream that yields the
// collected elements in the opposite order. It blocks until s has been closed.
func (s Stream) Reverse() Stream {
    var collected []interface{}
    for v := range s {
        collected = append(collected, v)
    }
    // Swap the ends pairwise, walking both cursors towards the middle.
    for lo, hi := 0, len(collected)-1; lo < hi; lo, hi = lo+1, hi-1 {
        collected[lo], collected[hi] = collected[hi], collected[lo]
    }
    return NewStream().From(collected)
}

// Print is a sink: it drains s, writing every element with log.Println, and
// returns once s has been closed.
func (s Stream) Print() {
    for {
        item, ok := <-s
        if !ok {
            return
        }
        log.Println(item)
    }
}

// Size is a sink: it drains s and returns how many elements were received
// before s was closed.
func (s Stream) Size() int {
    n := 0
    for {
        if _, ok := <-s; !ok {
            return n
        }
        n++
    }
}

// First returns a Stream that yields at most the first n elements of s.
//
// The returned Stream is closed as soon as n elements have been delivered, or
// when s is closed, whichever happens first, so a consumer never has to wait
// for the rest of s. The remaining elements of s are still received and
// discarded in the background so an upstream producer is not left blocked on
// a send. A negative n is treated as 0.
func (s Stream) First(n int) Stream {
    if n < 0 {
        // make would panic on a negative buffer size.
        n = 0
    }
    // The buffer holds all n results, so the sends below never block.
    c := make(chan interface{}, n)
    go func() {
        remaining := n
        if remaining == 0 {
            close(c)
        }
        for item := range s {
            if remaining == 0 {
                continue // quota reached: keep draining upstream only
            }
            c <- item
            remaining--
            if remaining == 0 {
                close(c)
            }
        }
        if remaining > 0 {
            // Upstream ended before n elements were seen.
            close(c)
        }
    }()
    return c
}

// FirstOne is a sink: it returns the first element of s and true, or nil and
// false when s yields no element. It consumes the Stream returned by First(1)
// until that Stream is closed.
func (s Stream) FirstOne() (interface{}, bool) {
    var (
        head  interface{}
        found bool
    )
    for item := range s.First(1) {
        if !found {
            head, found = item, true
        }
    }
    return head, found
}

// Sum is a sink: it drains s, applies f to every element and returns the sum
// of the resulting values as a float64.
//
// Signed integers, unsigned integers and floats are added; a value of any
// other kind (including nil) is ignored. Integer values are converted to
// float64 before being added, so very large magnitudes may lose precision.
func (s Stream) Sum(f KeyFunc) float64 {
    result := 0.0
    for i := range s {
        v := reflect.ValueOf(f(i))
        switch v.Kind() {
        case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
            result += float64(v.Int())
        case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
            result += float64(v.Uint())
        case reflect.Float32, reflect.Float64:
            result += v.Float()
        }
    }
    return result
}
stream
技术图片
package stream

// GroupedRecord holds every element that shares one grouping key.
type GroupedRecord struct {
    Key   interface{}
    Value []interface{}
}

// AggregatedRecord holds a key together with the single value its group was
// reduced to.
type AggregatedRecord struct {
    Key   interface{}
    Value interface{}
}

// AggFunc reduces the elements of one group to a single value.
type AggFunc func(elements []interface{}) interface{}

// GroupedStream is a channel of GroupedRecord values.
type GroupedStream chan GroupedRecord

// AggregatedGroupedStream is a channel of AggregatedRecord values.
type AggregatedGroupedStream chan AggregatedRecord

// GroupBy drains s completely, buckets its elements by the key f returns for
// each of them and returns a GroupedStream that yields one GroupedRecord per
// distinct key.
//
// Records are emitted in the order in which their keys were first seen, and
// the elements inside a record keep their order of arrival. Keys are used as
// map keys, so f must return comparable values; an unhashable key such as a
// slice causes a runtime panic. GroupBy blocks until s has been closed.
func (s Stream) GroupBy(f KeyFunc) GroupedStream {
    // index maps a key to the position of its record in records; keeping the
    // records in a slice is what makes the output order deterministic.
    index := make(map[interface{}]int)
    var records []GroupedRecord
    for item := range s {
        key := f(item)
        pos, seen := index[key]
        if !seen {
            pos = len(records)
            index[key] = pos
            records = append(records, GroupedRecord{Key: key})
        }
        records[pos].Value = append(records[pos].Value, item)
    }

    c := make(chan GroupedRecord)
    go func() {
        defer close(c)
        for _, record := range records {
            c <- record
        }
    }()
    return c
}

// Agg returns a stream that, for every GroupedRecord received from gs, yields
// an AggregatedRecord with the same key and the result of applying aggFunc to
// the group's elements. The work runs in a background goroutine and the
// returned stream is closed once gs has been exhausted.
func (gs GroupedStream) Agg(aggFunc AggFunc) AggregatedGroupedStream {
    out := make(AggregatedGroupedStream)
    go func() {
        defer close(out)
        for group := range gs {
            reduced := aggFunc(group.Value)
            out <- AggregatedRecord{Key: group.Key, Value: reduced}
        }
    }()
    return out
}

// Gather is a sink: it drains ags into a map from record key to record value
// and returns that map once ags has been closed. When a key occurs more than
// once, the value received last wins.
func (ags AggregatedGroupedStream) Gather() map[interface{}]interface{} {
    gathered := map[interface{}]interface{}{}
    for {
        rec, ok := <-ags
        if !ok {
            return gathered
        }
        gathered[rec.Key] = rec.Value
    }
}
stream group
技术图片
package stream

import "log"

// JoinedValue is one matched pair: an element of the left stream and an
// element of the right stream.
type JoinedValue struct {
    Left  interface{}
    Right interface{}
}

// JoinedRecord holds every matched pair recorded under one join key.
type JoinedRecord struct {
    Key   interface{}
    Value []JoinedValue
}

// JoinedStream is a channel of JoinedRecord values.
type JoinedStream chan JoinedRecord

// JoinedFilterFunc reports whether the pair (left, right) should be kept.
type JoinedFilterFunc func(left, right interface{}) bool

// JoinAggFunc combines the pair (left, right) into a single value.
type JoinAggFunc func(left, right interface{}) interface{}

// Join drains left and then right completely, pairs every left element with
// every right element whose key is equal (leftBy(l) == rightBy(r)) and returns
// a JoinedStream that yields one JoinedRecord per key with at least one match.
//
// Records are emitted in the order in which their keys first matched; inside
// a record the pairs are ordered by left element, then by right element, in
// stream order. Each key function is called once per element of its stream,
// and not at all when either stream is empty. Keys are compared with == and
// used as map keys, so they must be comparable. Join blocks until both input
// streams have been closed.
func Join(left, right Stream, leftBy, rightBy KeyFunc) JoinedStream {
    var leftItems, rightItems []interface{}
    for item := range left {
        leftItems = append(leftItems, item)
    }
    for item := range right {
        rightItems = append(rightItems, item)
    }

    // index maps a key to the position of its record in records; keeping the
    // records in a slice is what makes the output order deterministic.
    index := make(map[interface{}]int)
    var records []JoinedRecord
    if len(leftItems) > 0 && len(rightItems) > 0 {
        // Right keys do not depend on the left element: compute them once.
        rightKeys := make([]interface{}, len(rightItems))
        for idx, item := range rightItems {
            rightKeys[idx] = rightBy(item)
        }
        for _, l := range leftItems {
            keyLeft := leftBy(l)
            for idx, r := range rightItems {
                if keyLeft != rightKeys[idx] {
                    continue
                }
                pos, seen := index[keyLeft]
                if !seen {
                    pos = len(records)
                    index[keyLeft] = pos
                    records = append(records, JoinedRecord{Key: keyLeft})
                }
                records[pos].Value = append(records[pos].Value, JoinedValue{
                    Left:  l,
                    Right: r,
                })
            }
        }
    }

    c := make(chan JoinedRecord)
    go func() {
        defer close(c)
        for _, record := range records {
            c <- record
        }
    }()
    return c
}

// Filter returns a JoinedStream in which every record keeps only the pairs
// accepted by f. A record left without any pair is dropped entirely. The work
// runs in a background goroutine and the returned stream is closed once js
// has been exhausted.
func (js JoinedStream) Filter(f JoinedFilterFunc) JoinedStream {
    out := make(JoinedStream)
    go func() {
        defer close(out)
        for rec := range js {
            var kept []JoinedValue
            for _, pair := range rec.Value {
                if f(pair.Left, pair.Right) {
                    kept = append(kept, pair)
                }
            }
            if len(kept) == 0 {
                continue
            }
            out <- JoinedRecord{Key: rec.Key, Value: kept}
        }
    }()
    return out
}

// Print is a sink: it drains js, writing every JoinedRecord with log.Println,
// and returns once js has been closed.
func (js JoinedStream) Print() {
    for {
        rec, ok := <-js
        if !ok {
            return
        }
        log.Println(rec)
    }
}

// Fold turns a JoinedStream into a GroupedStream: for every JoinedRecord it
// yields a GroupedRecord with the same key whose elements are the results of
// applying f to each (Left, Right) pair, in order. A record without pairs
// yields a GroupedRecord with an empty, non-nil Value. The work runs in a
// background goroutine and the returned stream is closed once js has been
// exhausted.
func (js JoinedStream) Fold(f JoinAggFunc) GroupedStream {
    out := make(chan GroupedRecord)
    go func() {
        defer close(out)
        for rec := range js {
            folded := make([]interface{}, len(rec.Value))
            for idx, pair := range rec.Value {
                folded[idx] = f(pair.Left, pair.Right)
            }
            out <- GroupedRecord{Key: rec.Key, Value: folded}
        }
    }()
    return out
}
stream join

 

评论(0)
© 2014 mamicode.com 版权所有 京ICP备13008772号-2  联系我们:gaon5@hotmail.com
迷上了代码!