Skip to content

Commit 0618d98

Browse files
committed
Add strip demo.
1 parent c6fb73c commit 0618d98

File tree

5 files changed

+76
-1
lines changed

5 files changed

+76
-1
lines changed

internal/config/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Config struct {
3737
Tables Tables `json:"tables" yaml:"tables"`
3838
Statsd *StatsD `json:"statsd,omitempty" yaml:"statsd" env:"STATSD"`
3939
Computed []Computed `json:"computed" yaml:"computed" env:"COMPUTED"`
40+
Filters []Computed `json:"filters" yaml:"filters" env:"FILTERS"`
4041
K8s *K8s `json:"k8s,omitempty" yaml:"k8s" env:"K8S"`
4142
}
4243

internal/encoding/block/strip.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
2+
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file
3+
4+
package block
5+
6+
import (
7+
"github.com/kelindar/talaria/internal/column/computed"
8+
"github.com/kelindar/talaria/internal/encoding/typeof"
9+
)
10+
11+
// Strip runs the computed Values and overwrites/appends them to the set.
12+
func Strip(filter *typeof.Schema, computed ...computed.Computed) applyFunc {
13+
return func(r Row) (Row, error) {
14+
for _, c := range computed {
15+
v, err := c.Value(r.Values)
16+
if err != nil || v == nil {
17+
continue
18+
}
19+
if v.(bool) == true {
20+
if r.Values["bch"] == "test" {
21+
out := NewRow(nil, 0)
22+
return out, nil
23+
}
24+
}
25+
}
26+
27+
return r, nil
28+
// // Create a new output row and copy the column values from the input
29+
// schema := make(typeof.Schema, len(r.Schema))
30+
// out := NewRow(schema, len(r.Values)+len(computed))
31+
// for k, v := range r.Values {
32+
// if filter == nil || filter.HasConvertible(k, r.Schema[k]) {
33+
// out.Values[k] = v
34+
// out.Schema[k] = r.Schema[k]
35+
// }
36+
// }
37+
38+
// // Compute the Values
39+
// for _, c := range computed {
40+
// if filter != nil && !filter.Contains(c.Name(), c.Type()) {
41+
// continue // Skip computed Values which aren't part of the filter
42+
// }
43+
44+
// // Compute the column
45+
// // v, err := c.Value(r.Values)
46+
// // if err != nil || v == nil {
47+
// // continue
48+
// // }
49+
50+
// // If the column with the same name is already present in the input row,
51+
// // we need to overwrite this column and set a new type.
52+
// // out.Schema[c.Name()] = c.Type()
53+
// delete(out.Schema, "")
54+
// delete(out.Values, "")
55+
// }
56+
// return out, nil
57+
}
58+
}

internal/encoding/merge/orc.go

+3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ import (
1313

1414
// ToOrc merges multiple blocks together and outputs a key and merged orc data
1515
func ToOrc(blocks []block.Block, schema typeof.Schema) ([]byte, error) {
16+
if len(schema) == 0 {
17+
return nil, nil
18+
}
1619
orcSchema, err := orc.SchemaFor(schema)
1720
if err != nil {
1821
return nil, errors.Internal("merge: error generating orc schema", err)

internal/server/server.go

+13
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,18 @@ func New(conf config.Func, monitor monitor.Monitor, tables ...table.Table) *Serv
6666
server.computed = append(server.computed, col)
6767
}
6868

69+
// Load filter columns
70+
for _, c := range conf().Filters {
71+
col, err := computed.NewComputed(c.Name, c.FuncName, c.Type, c.Func, monitor)
72+
if err != nil {
73+
monitor.Error(err)
74+
continue
75+
}
76+
77+
monitor.Info("server: loaded filter %v of type %v", c.Name, c.Type)
78+
server.filter = append(server.filter, col)
79+
}
80+
6981
// Register the gRPC servers
7082
talaria.RegisterIngressServer(server.server, server)
7183
talaria.RegisterQueryServer(server.server, server)
@@ -86,6 +98,7 @@ type Server struct {
8698
cancel context.CancelFunc // The cancellation function for the server
8799
tables map[string]table.Table // The list of tables
88100
computed []computed.Computed // The set of computed columns
101+
filter []computed.Computed // The set of filters
89102
s3sqs *s3sqs.Ingress // The S3SQS Ingress (optional)
90103
}
91104

internal/server/server_ingest.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (s *Server) Ingest(ctx context.Context, request *talaria.IngestRequest) (*t
3939
}
4040

4141
// Functions to be applied
42-
funcs := []applyFunc{block.Transform(filter, s.computed...)}
42+
funcs := []applyFunc{block.Strip(filter, s.filter...), block.Transform(filter, s.computed...)}
4343

4444
// If table supports streaming, add publishing function
4545
if streamer, ok := t.(storage.Streamer); ok {

0 commit comments

Comments
 (0)