Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add strip function #93

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Config struct {
Tables Tables `json:"tables" yaml:"tables"`
Statsd *StatsD `json:"statsd,omitempty" yaml:"statsd" env:"STATSD"`
Computed []Computed `json:"computed" yaml:"computed" env:"COMPUTED"`
Filters []Computed `json:"filters" yaml:"filters" env:"FILTERS"`
K8s *K8s `json:"k8s,omitempty" yaml:"k8s" env:"K8S"`
}

Expand Down
58 changes: 58 additions & 0 deletions internal/encoding/block/strip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file

package block

import (
"github.com/kelindar/talaria/internal/column/computed"
"github.com/kelindar/talaria/internal/encoding/typeof"
)

// Strip runs the computed Values and overwrites/appends them to the set.
func Strip(filter *typeof.Schema, computed ...computed.Computed) applyFunc {
return func(r Row) (Row, error) {
for _, c := range computed {
v, err := c.Value(r.Values)
if err != nil || v == nil {
continue
}
if v.(bool) == true {
if r.Values["bch"] == "test" {
out := NewRow(nil, 0)
return out, nil
}
}
}

return r, nil
// // Create a new output row and copy the column values from the input
// schema := make(typeof.Schema, len(r.Schema))
// out := NewRow(schema, len(r.Values)+len(computed))
// for k, v := range r.Values {
// if filter == nil || filter.HasConvertible(k, r.Schema[k]) {
// out.Values[k] = v
// out.Schema[k] = r.Schema[k]
// }
// }

// // Compute the Values
// for _, c := range computed {
// if filter != nil && !filter.Contains(c.Name(), c.Type()) {
// continue // Skip computed Values which aren't part of the filter
// }

// // Compute the column
// // v, err := c.Value(r.Values)
// // if err != nil || v == nil {
// // continue
// // }

// // If the column with the same name is already present in the input row,
// // we need to overwrite this column and set a new type.
// // out.Schema[c.Name()] = c.Type()
// delete(out.Schema, "")
// delete(out.Values, "")
// }
// return out, nil
}
}
3 changes: 3 additions & 0 deletions internal/encoding/merge/orc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (

// ToOrc merges multiple blocks together and outputs a key and merged orc data
func ToOrc(blocks []block.Block, schema typeof.Schema) ([]byte, error) {
if len(schema) == 0 {
return nil, nil
}
orcSchema, err := orc.SchemaFor(schema)
if err != nil {
return nil, errors.Internal("merge: error generating orc schema", err)
Expand Down
13 changes: 13 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,18 @@ func New(conf config.Func, monitor monitor.Monitor, tables ...table.Table) *Serv
server.computed = append(server.computed, col)
}

// Load filter columns
for _, c := range conf().Filters {
col, err := computed.NewComputed(c.Name, c.FuncName, c.Type, c.Func, monitor)
if err != nil {
monitor.Error(err)
continue
}

monitor.Info("server: loaded filter %v of type %v", c.Name, c.Type)
server.filter = append(server.filter, col)
}

// Register the gRPC servers
talaria.RegisterIngressServer(server.server, server)
talaria.RegisterQueryServer(server.server, server)
Expand All @@ -86,6 +98,7 @@ type Server struct {
cancel context.CancelFunc // The cancellation function for the server
tables map[string]table.Table // The list of tables
computed []computed.Computed // The set of computed columns
filter []computed.Computed // The set of filters
s3sqs *s3sqs.Ingress // The S3SQS Ingress (optional)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/server/server_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t
}

// Functions to be applied
funcs := []applyFunc{block.Transform(filter, s.computed...)}
funcs := []applyFunc{block.Strip(filter, s.filter...), block.Transform(filter, s.computed...)}

// If table supports streaming, add publishing function
if streamer, ok := t.(storage.Streamer); ok {
Expand Down