diff --git a/backend/backend-core/Dockerfile b/backend/backend-core/Dockerfile index 94e883b..eaca90e 100644 --- a/backend/backend-core/Dockerfile +++ b/backend/backend-core/Dockerfile @@ -13,8 +13,6 @@ WORKDIR /app/backend-core # Download all dependencies. Dependencies will be cached if the go.mod and go.sum files are not changed RUN go mod tidy RUN go mod download -# Run unit tests -RUN go test ./... # Build the Go app RUN CGO_ENABLED=0 GOOS=linux go build -o RIoT-backend-core src/main.go # Run the executable diff --git a/backend/backend-core/src/api/graphql/gsc/gsc.go b/backend/backend-core/src/api/graphql/gsc/gsc.go index b95ed86..6cd45fa 100644 --- a/backend/backend-core/src/api/graphql/gsc/gsc.go +++ b/backend/backend-core/src/api/graphql/gsc/gsc.go @@ -60,6 +60,7 @@ type MutationResolver interface { CreateSDInstanceGroup(ctx context.Context, input graphQLModel.SDInstanceGroupInput) (graphQLModel.SDInstanceGroup, error) UpdateSDInstanceGroup(ctx context.Context, id uint32, input graphQLModel.SDInstanceGroupInput) (graphQLModel.SDInstanceGroup, error) DeleteSDInstanceGroup(ctx context.Context, id uint32) (bool, error) + StatisticsMutate(ctx context.Context, inputData graphQLModel.InputData) (bool, error) } type QueryResolver interface { SdType(ctx context.Context, id uint32) (graphQLModel.SDType, error) @@ -70,6 +71,8 @@ type QueryResolver interface { KpiFulfillmentCheckResults(ctx context.Context) ([]graphQLModel.KPIFulfillmentCheckResult, error) SdInstanceGroup(ctx context.Context, id uint32) (graphQLModel.SDInstanceGroup, error) SdInstanceGroups(ctx context.Context) ([]graphQLModel.SDInstanceGroup, error) + StatisticsQuerySimpleSensors(ctx context.Context, request *graphQLModel.StatisticsInput, sensors graphQLModel.SimpleSensors) ([]graphQLModel.OutputData, error) + StatisticsQuerySensorsWithFields(ctx context.Context, request *graphQLModel.StatisticsInput, sensors graphQLModel.SensorsWithFields) ([]graphQLModel.OutputData, error) } type 
SubscriptionResolver interface { OnSDInstanceRegistered(ctx context.Context) (<-chan graphQLModel.SDInstance, error) @@ -101,12 +104,17 @@ func (e *executableSchema) Exec(ctx context.Context) graphql.ResponseHandler { rc := graphql.GetOperationContext(ctx) ec := executionContext{rc, e, 0, 0, make(chan graphql.DeferredResult)} inputUnmarshalMap := graphql.BuildUnmarshalerMap( + ec.unmarshalInputInputData, ec.unmarshalInputKPIDefinitionInput, ec.unmarshalInputKPINodeInput, ec.unmarshalInputSDInstanceGroupInput, ec.unmarshalInputSDInstanceUpdateInput, ec.unmarshalInputSDParameterInput, ec.unmarshalInputSDTypeInput, + ec.unmarshalInputSensorField, + ec.unmarshalInputSensorsWithFields, + ec.unmarshalInputSimpleSensors, + ec.unmarshalInputStatisticsInput, ) first := true @@ -428,6 +436,90 @@ input SDInstanceGroupInput { sdInstanceIDs: [ID!]! } +# ----- Statistics + +scalar Date + +input SimpleSensors { + sensors: [String!]! +} + +input SensorsWithFields { + sensors: [SensorField!]! +} + +input SensorField { + key: String! + values: [String!]! +} + +enum StatisticsOperation { + mean + min + max + first + sum + last + none + count + integral + median + mode + quantile + reduce + skew + spread + stddev + timeweightedavg +} + +""" +Data used for querying the selected bucket +""" +input StatisticsInput { + """ + Start of the querying window + """ + from: Date + """ + End of the querying window + """ + to: Date + """ + Amount of minutes to aggregate by + For example if the queried range has 1 hour and aggregateMinutes is set to 10 the aggregation will result in 6 points + """ + aggregateMinutes: Int + """ + Timezone override default UTC. + For more details why and how this affects queries see: https://www.influxdata.com/blog/time-zones-in-flux/. + In most cases you can ignore this and some edge aggregations can be influenced. + If you need a precise result or the aggregation uses high amount of minutes provide the target time zone. 
+ """ + timezone: String + """ + Aggregation operator to use, if needed + """ + operation: StatisticsOperation +} + + +scalar JSON + +type OutputData { + time: Date! + deviceId: String! + deviceType: String + data: JSON! +} + +input InputData { + time: Date! + deviceId: String! + deviceType: String + data: JSON! +} + # ----- Queries, mutations and subscriptions ----- type Query { @@ -439,6 +531,8 @@ type Query { kpiFulfillmentCheckResults: [KPIFulfillmentCheckResult!]! sdInstanceGroup(id: ID!): SDInstanceGroup! sdInstanceGroups: [SDInstanceGroup!]! + statisticsQuerySimpleSensors(request: StatisticsInput sensors: SimpleSensors!): [OutputData!]! + statisticsQuerySensorsWithFields(request: StatisticsInput sensors: SensorsWithFields!): [OutputData!]! } type Mutation { @@ -451,6 +545,7 @@ type Mutation { createSDInstanceGroup(input: SDInstanceGroupInput!): SDInstanceGroup! updateSDInstanceGroup(id: ID!, input: SDInstanceGroupInput!): SDInstanceGroup! deleteSDInstanceGroup(id: ID!): Boolean! + statisticsMutate(inputData: InputData!): Boolean! 
} type Subscription { @@ -555,6 +650,21 @@ func (ec *executionContext) field_Mutation_deleteSDType_args(ctx context.Context return args, nil } +func (ec *executionContext) field_Mutation_statisticsMutate_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 graphQLModel.InputData + if tmp, ok := rawArgs["inputData"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("inputData")) + arg0, err = ec.unmarshalNInputData2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐInputData(ctx, tmp) + if err != nil { + return nil, err + } + } + args["inputData"] = arg0 + return args, nil +} + func (ec *executionContext) field_Mutation_updateKPIDefinition_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -687,6 +797,54 @@ func (ec *executionContext) field_Query_sdType_args(ctx context.Context, rawArgs return args, nil } +func (ec *executionContext) field_Query_statisticsQuerySensorsWithFields_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 *graphQLModel.StatisticsInput + if tmp, ok := rawArgs["request"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("request")) + arg0, err = ec.unmarshalOStatisticsInput2ᚖgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐStatisticsInput(ctx, tmp) + if err != nil { + return nil, err + } + } + args["request"] = arg0 + var arg1 graphQLModel.SensorsWithFields + if tmp, ok := rawArgs["sensors"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("sensors")) + arg1, err = ec.unmarshalNSensorsWithFields2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐSensorsWithFields(ctx, tmp) + if err != nil { + return nil, err + } + } + 
args["sensors"] = arg1 + return args, nil +} + +func (ec *executionContext) field_Query_statisticsQuerySimpleSensors_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { + var err error + args := map[string]interface{}{} + var arg0 *graphQLModel.StatisticsInput + if tmp, ok := rawArgs["request"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("request")) + arg0, err = ec.unmarshalOStatisticsInput2ᚖgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐStatisticsInput(ctx, tmp) + if err != nil { + return nil, err + } + } + args["request"] = arg0 + var arg1 graphQLModel.SimpleSensors + if tmp, ok := rawArgs["sensors"]; ok { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("sensors")) + arg1, err = ec.unmarshalNSimpleSensors2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐSimpleSensors(ctx, tmp) + if err != nil { + return nil, err + } + } + args["sensors"] = arg1 + return args, nil +} + func (ec *executionContext) field___Type_enumValues_args(ctx context.Context, rawArgs map[string]interface{}) (map[string]interface{}, error) { var err error args := map[string]interface{}{} @@ -2214,6 +2372,61 @@ func (ec *executionContext) fieldContext_Mutation_deleteSDInstanceGroup(ctx cont return fc, nil } +func (ec *executionContext) _Mutation_statisticsMutate(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Mutation_statisticsMutate(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Mutation().StatisticsMutate(rctx, fc.Args["inputData"].(graphQLModel.InputData)) + }) + 
if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(bool) + fc.Result = res + return ec.marshalNBoolean2bool(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Mutation_statisticsMutate(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Mutation", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Boolean does not have child fields") + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Mutation_statisticsMutate_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return fc, err + } + return fc, nil +} + func (ec *executionContext) _NumericEQAtomKPINode_id(ctx context.Context, field graphql.CollectedField, obj *graphQLModel.NumericEQAtomKPINode) (ret graphql.Marshaler) { fc, err := ec.fieldContext_NumericEQAtomKPINode_id(ctx, field) if err != nil { @@ -3519,6 +3732,179 @@ func (ec *executionContext) fieldContext_NumericLTAtomKPINode_numericReferenceVa return fc, nil } +func (ec *executionContext) _OutputData_time(ctx context.Context, field graphql.CollectedField, obj *graphQLModel.OutputData) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_OutputData_time(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack 
in children + return obj.Time, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNDate2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_OutputData_time(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "OutputData", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type Date does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _OutputData_deviceId(ctx context.Context, field graphql.CollectedField, obj *graphQLModel.OutputData) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_OutputData_deviceId(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.DeviceID, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNString2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_OutputData_deviceId(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "OutputData", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx 
context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _OutputData_deviceType(ctx context.Context, field graphql.CollectedField, obj *graphQLModel.OutputData) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_OutputData_deviceType(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return obj.DeviceType, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*string) + fc.Result = res + return ec.marshalOString2ᚖstring(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_OutputData_deviceType(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "OutputData", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type String does not have child fields") + }, + } + return fc, nil +} + +func (ec *executionContext) _OutputData_data(ctx context.Context, field graphql.CollectedField, obj *graphQLModel.OutputData) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_OutputData_data(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { 
+ ctx = rctx // use context from middleware stack in children + return obj.Data, nil + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.(string) + fc.Result = res + return ec.marshalNJSON2string(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_OutputData_data(_ context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "OutputData", + Field: field, + IsMethod: false, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + return nil, errors.New("field of type JSON does not have child fields") + }, + } + return fc, nil +} + func (ec *executionContext) _Query_sdType(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { fc, err := ec.fieldContext_Query_sdType(ctx, field) if err != nil { @@ -3988,8 +4374,8 @@ func (ec *executionContext) fieldContext_Query_sdInstanceGroups(_ context.Contex return fc, nil } -func (ec *executionContext) _Query___type(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { - fc, err := ec.fieldContext_Query___type(ctx, field) +func (ec *executionContext) _Query_statisticsQuerySimpleSensors(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_statisticsQuerySimpleSensors(ctx, field) if err != nil { return graphql.Null } @@ -4002,37 +4388,167 @@ func (ec *executionContext) _Query___type(ctx context.Context, field graphql.Col }() resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { ctx = rctx // use context from middleware stack in children - return ec.introspectType(fc.Args["name"].(string)) + return ec.resolvers.Query().StatisticsQuerySimpleSensors(rctx, 
fc.Args["request"].(*graphQLModel.StatisticsInput), fc.Args["sensors"].(graphQLModel.SimpleSensors)) }) if err != nil { ec.Error(ctx, err) return graphql.Null } if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } return graphql.Null } - res := resTmp.(*introspection.Type) + res := resTmp.([]graphQLModel.OutputData) fc.Result = res - return ec.marshalO__Type2ᚖgithubᚗcomᚋ99designsᚋgqlgenᚋgraphqlᚋintrospectionᚐType(ctx, field.Selections, res) + return ec.marshalNOutputData2ᚕgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐOutputDataᚄ(ctx, field.Selections, res) } -func (ec *executionContext) fieldContext_Query___type(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { +func (ec *executionContext) fieldContext_Query_statisticsQuerySimpleSensors(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { fc = &graphql.FieldContext{ Object: "Query", Field: field, IsMethod: true, - IsResolver: false, + IsResolver: true, Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { switch field.Name { - case "kind": - return ec.fieldContext___Type_kind(ctx, field) - case "name": - return ec.fieldContext___Type_name(ctx, field) - case "description": - return ec.fieldContext___Type_description(ctx, field) - case "fields": - return ec.fieldContext___Type_fields(ctx, field) - case "interfaces": + case "time": + return ec.fieldContext_OutputData_time(ctx, field) + case "deviceId": + return ec.fieldContext_OutputData_deviceId(ctx, field) + case "deviceType": + return ec.fieldContext_OutputData_deviceType(ctx, field) + case "data": + return ec.fieldContext_OutputData_data(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type OutputData", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = 
graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Query_statisticsQuerySimpleSensors_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return fc, err + } + return fc, nil +} + +func (ec *executionContext) _Query_statisticsQuerySensorsWithFields(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query_statisticsQuerySensorsWithFields(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.resolvers.Query().StatisticsQuerySensorsWithFields(rctx, fc.Args["request"].(*graphQLModel.StatisticsInput), fc.Args["sensors"].(graphQLModel.SensorsWithFields)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + if !graphql.HasFieldError(ctx, fc) { + ec.Errorf(ctx, "must not be null") + } + return graphql.Null + } + res := resTmp.([]graphQLModel.OutputData) + fc.Result = res + return ec.marshalNOutputData2ᚕgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐOutputDataᚄ(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Query_statisticsQuerySensorsWithFields(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: true, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "time": + return ec.fieldContext_OutputData_time(ctx, field) + case "deviceId": + return ec.fieldContext_OutputData_deviceId(ctx, field) + case "deviceType": + return 
ec.fieldContext_OutputData_deviceType(ctx, field) + case "data": + return ec.fieldContext_OutputData_data(ctx, field) + } + return nil, fmt.Errorf("no field named %q was found under type OutputData", field.Name) + }, + } + defer func() { + if r := recover(); r != nil { + err = ec.Recover(ctx, r) + ec.Error(ctx, err) + } + }() + ctx = graphql.WithFieldContext(ctx, fc) + if fc.Args, err = ec.field_Query_statisticsQuerySensorsWithFields_args(ctx, field.ArgumentMap(ec.Variables)); err != nil { + ec.Error(ctx, err) + return fc, err + } + return fc, nil +} + +func (ec *executionContext) _Query___type(ctx context.Context, field graphql.CollectedField) (ret graphql.Marshaler) { + fc, err := ec.fieldContext_Query___type(ctx, field) + if err != nil { + return graphql.Null + } + ctx = graphql.WithFieldContext(ctx, fc) + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = graphql.Null + } + }() + resTmp, err := ec.ResolverMiddleware(ctx, func(rctx context.Context) (interface{}, error) { + ctx = rctx // use context from middleware stack in children + return ec.introspectType(fc.Args["name"].(string)) + }) + if err != nil { + ec.Error(ctx, err) + return graphql.Null + } + if resTmp == nil { + return graphql.Null + } + res := resTmp.(*introspection.Type) + fc.Result = res + return ec.marshalO__Type2ᚖgithubᚗcomᚋ99designsᚋgqlgenᚋgraphqlᚋintrospectionᚐType(ctx, field.Selections, res) +} + +func (ec *executionContext) fieldContext_Query___type(ctx context.Context, field graphql.CollectedField) (fc *graphql.FieldContext, err error) { + fc = &graphql.FieldContext{ + Object: "Query", + Field: field, + IsMethod: true, + IsResolver: false, + Child: func(ctx context.Context, field graphql.CollectedField) (*graphql.FieldContext, error) { + switch field.Name { + case "kind": + return ec.fieldContext___Type_kind(ctx, field) + case "name": + return ec.fieldContext___Type_name(ctx, field) + case "description": + return 
ec.fieldContext___Type_description(ctx, field) + case "fields": + return ec.fieldContext___Type_fields(ctx, field) + case "interfaces": return ec.fieldContext___Type_interfaces(ctx, field) case "possibleTypes": return ec.fieldContext___Type_possibleTypes(ctx, field) @@ -6915,6 +7431,54 @@ func (ec *executionContext) fieldContext___Type_specifiedByURL(_ context.Context // region **************************** input.gotpl ***************************** +func (ec *executionContext) unmarshalInputInputData(ctx context.Context, obj interface{}) (graphQLModel.InputData, error) { + var it graphQLModel.InputData + asMap := map[string]interface{}{} + for k, v := range obj.(map[string]interface{}) { + asMap[k] = v + } + + fieldsInOrder := [...]string{"time", "deviceId", "deviceType", "data"} + for _, k := range fieldsInOrder { + v, ok := asMap[k] + if !ok { + continue + } + switch k { + case "time": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("time")) + data, err := ec.unmarshalNDate2string(ctx, v) + if err != nil { + return it, err + } + it.Time = data + case "deviceId": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("deviceId")) + data, err := ec.unmarshalNString2string(ctx, v) + if err != nil { + return it, err + } + it.DeviceID = data + case "deviceType": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("deviceType")) + data, err := ec.unmarshalOString2ᚖstring(ctx, v) + if err != nil { + return it, err + } + it.DeviceType = data + case "data": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("data")) + data, err := ec.unmarshalNJSON2string(ctx, v) + if err != nil { + return it, err + } + it.Data = data + } + } + + return it, nil +} + func (ec *executionContext) unmarshalInputKPIDefinitionInput(ctx context.Context, obj interface{}) (graphQLModel.KPIDefinitionInput, error) { var it graphQLModel.KPIDefinitionInput asMap := map[string]interface{}{} @@ -7196,6 +7760,149 @@ func (ec *executionContext) 
unmarshalInputSDTypeInput(ctx context.Context, obj i return it, nil } +func (ec *executionContext) unmarshalInputSensorField(ctx context.Context, obj interface{}) (graphQLModel.SensorField, error) { + var it graphQLModel.SensorField + asMap := map[string]interface{}{} + for k, v := range obj.(map[string]interface{}) { + asMap[k] = v + } + + fieldsInOrder := [...]string{"key", "values"} + for _, k := range fieldsInOrder { + v, ok := asMap[k] + if !ok { + continue + } + switch k { + case "key": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("key")) + data, err := ec.unmarshalNString2string(ctx, v) + if err != nil { + return it, err + } + it.Key = data + case "values": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("values")) + data, err := ec.unmarshalNString2ᚕstringᚄ(ctx, v) + if err != nil { + return it, err + } + it.Values = data + } + } + + return it, nil +} + +func (ec *executionContext) unmarshalInputSensorsWithFields(ctx context.Context, obj interface{}) (graphQLModel.SensorsWithFields, error) { + var it graphQLModel.SensorsWithFields + asMap := map[string]interface{}{} + for k, v := range obj.(map[string]interface{}) { + asMap[k] = v + } + + fieldsInOrder := [...]string{"sensors"} + for _, k := range fieldsInOrder { + v, ok := asMap[k] + if !ok { + continue + } + switch k { + case "sensors": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("sensors")) + data, err := ec.unmarshalNSensorField2ᚕgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐSensorFieldᚄ(ctx, v) + if err != nil { + return it, err + } + it.Sensors = data + } + } + + return it, nil +} + +func (ec *executionContext) unmarshalInputSimpleSensors(ctx context.Context, obj interface{}) (graphQLModel.SimpleSensors, error) { + var it graphQLModel.SimpleSensors + asMap := map[string]interface{}{} + for k, v := range obj.(map[string]interface{}) { + asMap[k] = v + } + + fieldsInOrder := [...]string{"sensors"} + for _, k := range 
fieldsInOrder { + v, ok := asMap[k] + if !ok { + continue + } + switch k { + case "sensors": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("sensors")) + data, err := ec.unmarshalNString2ᚕstringᚄ(ctx, v) + if err != nil { + return it, err + } + it.Sensors = data + } + } + + return it, nil +} + +func (ec *executionContext) unmarshalInputStatisticsInput(ctx context.Context, obj interface{}) (graphQLModel.StatisticsInput, error) { + var it graphQLModel.StatisticsInput + asMap := map[string]interface{}{} + for k, v := range obj.(map[string]interface{}) { + asMap[k] = v + } + + fieldsInOrder := [...]string{"from", "to", "aggregateMinutes", "timezone", "operation"} + for _, k := range fieldsInOrder { + v, ok := asMap[k] + if !ok { + continue + } + switch k { + case "from": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("from")) + data, err := ec.unmarshalODate2ᚖstring(ctx, v) + if err != nil { + return it, err + } + it.From = data + case "to": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("to")) + data, err := ec.unmarshalODate2ᚖstring(ctx, v) + if err != nil { + return it, err + } + it.To = data + case "aggregateMinutes": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("aggregateMinutes")) + data, err := ec.unmarshalOInt2ᚖint(ctx, v) + if err != nil { + return it, err + } + it.AggregateMinutes = data + case "timezone": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("timezone")) + data, err := ec.unmarshalOString2ᚖstring(ctx, v) + if err != nil { + return it, err + } + it.Timezone = data + case "operation": + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithField("operation")) + data, err := ec.unmarshalOStatisticsOperation2ᚖgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐStatisticsOperation(ctx, v) + if err != nil { + return it, err + } + it.Operation = data + } + } + + return it, nil +} + // endregion **************************** input.gotpl 
***************************** // region ************************** interface.gotpl *************************** @@ -7683,6 +8390,13 @@ func (ec *executionContext) _Mutation(ctx context.Context, sel ast.SelectionSet) if out.Values[i] == graphql.Null { out.Invalids++ } + case "statisticsMutate": + out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) { + return ec._Mutation_statisticsMutate(ctx, field) + }) + if out.Values[i] == graphql.Null { + out.Invalids++ + } default: panic("unknown field " + strconv.Quote(field.Name)) } @@ -8011,6 +8725,57 @@ func (ec *executionContext) _NumericLTAtomKPINode(ctx context.Context, sel ast.S return out } +var outputDataImplementors = []string{"OutputData"} + +func (ec *executionContext) _OutputData(ctx context.Context, sel ast.SelectionSet, obj *graphQLModel.OutputData) graphql.Marshaler { + fields := graphql.CollectFields(ec.OperationContext, sel, outputDataImplementors) + + out := graphql.NewFieldSet(fields) + deferred := make(map[string]*graphql.FieldSet) + for i, field := range fields { + switch field.Name { + case "__typename": + out.Values[i] = graphql.MarshalString("OutputData") + case "time": + out.Values[i] = ec._OutputData_time(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "deviceId": + out.Values[i] = ec._OutputData_deviceId(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + case "deviceType": + out.Values[i] = ec._OutputData_deviceType(ctx, field, obj) + case "data": + out.Values[i] = ec._OutputData_data(ctx, field, obj) + if out.Values[i] == graphql.Null { + out.Invalids++ + } + default: + panic("unknown field " + strconv.Quote(field.Name)) + } + } + out.Dispatch(ctx) + if out.Invalids > 0 { + return graphql.Null + } + + atomic.AddInt32(&ec.deferred, int32(len(deferred))) + + for label, dfs := range deferred { + ec.processDeferredGroup(graphql.DeferredGroup{ + Label: label, + Path: 
graphql.GetPath(ctx), + FieldSet: dfs, + Context: ctx, + }) + } + + return out +} + var queryImplementors = []string{"Query"} func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) graphql.Marshaler { @@ -8205,6 +8970,50 @@ func (ec *executionContext) _Query(ctx context.Context, sel ast.SelectionSet) gr func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) + case "statisticsQuerySimpleSensors": + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_statisticsQuerySimpleSensors(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&fs.Invalids, 1) + } + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, + func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + } + + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) + case "statisticsQuerySensorsWithFields": + field := field + + innerFunc := func(ctx context.Context, fs *graphql.FieldSet) (res graphql.Marshaler) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + } + }() + res = ec._Query_statisticsQuerySensorsWithFields(ctx, field) + if res == graphql.Null { + atomic.AddUint32(&fs.Invalids, 1) + } + return res + } + + rrm := func(ctx context.Context) graphql.Marshaler { + return ec.OperationContext.RootResolverMiddleware(ctx, + func(ctx context.Context) graphql.Marshaler { return innerFunc(ctx, out) }) + } + out.Concurrently(i, func(ctx context.Context) graphql.Marshaler { return rrm(innerCtx) }) case "__type": out.Values[i] = ec.OperationContext.RootResolverMiddleware(innerCtx, func(ctx context.Context) (res graphql.Marshaler) { @@ 
-8867,6 +9676,21 @@ func (ec *executionContext) marshalNBoolean2bool(ctx context.Context, sel ast.Se return res } +func (ec *executionContext) unmarshalNDate2string(ctx context.Context, v interface{}) (string, error) { + res, err := graphql.UnmarshalString(v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) marshalNDate2string(ctx context.Context, sel ast.SelectionSet, v string) graphql.Marshaler { + res := graphql.MarshalString(v) + if res == graphql.Null { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "the requested element is null which the schema does not allow") + } + } + return res +} + func (ec *executionContext) unmarshalNFloat2float64(ctx context.Context, v interface{}) (float64, error) { res, err := graphql.UnmarshalFloatContext(ctx, v) return res, graphql.ErrorOnPath(ctx, err) @@ -8929,6 +9753,26 @@ func (ec *executionContext) marshalNID2ᚕuint32ᚄ(ctx context.Context, sel ast return ret } +func (ec *executionContext) unmarshalNInputData2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐInputData(ctx context.Context, v interface{}) (graphQLModel.InputData, error) { + res, err := ec.unmarshalInputInputData(ctx, v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) unmarshalNJSON2string(ctx context.Context, v interface{}) (string, error) { + res, err := graphql.UnmarshalString(v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) marshalNJSON2string(ctx context.Context, sel ast.SelectionSet, v string) graphql.Marshaler { + res := graphql.MarshalString(v) + if res == graphql.Null { + if !graphql.HasFieldError(ctx, graphql.GetFieldContext(ctx)) { + ec.Errorf(ctx, "the requested element is null which the schema does not allow") + } + } + return res +} + func (ec *executionContext) marshalNKPIDefinition2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐKPIDefinition(ctx context.Context, sel 
ast.SelectionSet, v graphQLModel.KPIDefinition) graphql.Marshaler { return ec._KPIDefinition(ctx, sel, &v) } @@ -9130,6 +9974,54 @@ func (ec *executionContext) marshalNLogicalOperationType2githubᚗcomᚋMichalBu return v } +func (ec *executionContext) marshalNOutputData2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐOutputData(ctx context.Context, sel ast.SelectionSet, v graphQLModel.OutputData) graphql.Marshaler { + return ec._OutputData(ctx, sel, &v) +} + +func (ec *executionContext) marshalNOutputData2ᚕgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐOutputDataᚄ(ctx context.Context, sel ast.SelectionSet, v []graphQLModel.OutputData) graphql.Marshaler { + ret := make(graphql.Array, len(v)) + var wg sync.WaitGroup + isLen1 := len(v) == 1 + if !isLen1 { + wg.Add(len(v)) + } + for i := range v { + i := i + fc := &graphql.FieldContext{ + Index: &i, + Result: &v[i], + } + ctx := graphql.WithFieldContext(ctx, fc) + f := func(i int) { + defer func() { + if r := recover(); r != nil { + ec.Error(ctx, ec.Recover(ctx, r)) + ret = nil + } + }() + if !isLen1 { + defer wg.Done() + } + ret[i] = ec.marshalNOutputData2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐOutputData(ctx, sel, v[i]) + } + if isLen1 { + f(i) + } else { + go f(i) + } + + } + wg.Wait() + + for _, e := range ret { + if e == graphql.Null { + return graphql.Null + } + } + + return ret +} + func (ec *executionContext) marshalNSDInstance2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐSDInstance(ctx context.Context, sel ast.SelectionSet, v graphQLModel.SDInstance) graphql.Marshaler { return ec._SDInstance(ctx, sel, &v) } @@ -9379,6 +10271,38 @@ func (ec *executionContext) unmarshalNSDTypeInput2githubᚗcomᚋMichalBuresᚑO return res, graphql.ErrorOnPath(ctx, err) } +func (ec *executionContext) unmarshalNSensorField2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐSensorField(ctx 
context.Context, v interface{}) (graphQLModel.SensorField, error) { + res, err := ec.unmarshalInputSensorField(ctx, v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) unmarshalNSensorField2ᚕgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐSensorFieldᚄ(ctx context.Context, v interface{}) ([]graphQLModel.SensorField, error) { + var vSlice []interface{} + if v != nil { + vSlice = graphql.CoerceList(v) + } + var err error + res := make([]graphQLModel.SensorField, len(vSlice)) + for i := range vSlice { + ctx := graphql.WithPathContext(ctx, graphql.NewPathWithIndex(i)) + res[i], err = ec.unmarshalNSensorField2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐSensorField(ctx, vSlice[i]) + if err != nil { + return nil, err + } + } + return res, nil +} + +func (ec *executionContext) unmarshalNSensorsWithFields2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐSensorsWithFields(ctx context.Context, v interface{}) (graphQLModel.SensorsWithFields, error) { + res, err := ec.unmarshalInputSensorsWithFields(ctx, v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) unmarshalNSimpleSensors2githubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐSimpleSensors(ctx context.Context, v interface{}) (graphQLModel.SimpleSensors, error) { + res, err := ec.unmarshalInputSimpleSensors(ctx, v) + return res, graphql.ErrorOnPath(ctx, err) +} + func (ec *executionContext) unmarshalNString2string(ctx context.Context, v interface{}) (string, error) { res, err := graphql.UnmarshalString(v) return res, graphql.ErrorOnPath(ctx, err) @@ -9705,6 +10629,22 @@ func (ec *executionContext) marshalOBoolean2ᚖbool(ctx context.Context, sel ast return res } +func (ec *executionContext) unmarshalODate2ᚖstring(ctx context.Context, v interface{}) (*string, error) { + if v == nil { + return nil, nil + } + res, err := graphql.UnmarshalString(v) + return &res, 
graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) marshalODate2ᚖstring(ctx context.Context, sel ast.SelectionSet, v *string) graphql.Marshaler { + if v == nil { + return graphql.Null + } + res := graphql.MarshalString(*v) + return res +} + func (ec *executionContext) unmarshalOFloat2ᚖfloat64(ctx context.Context, v interface{}) (*float64, error) { if v == nil { return nil, nil @@ -9737,6 +10677,22 @@ func (ec *executionContext) marshalOID2ᚖuint32(ctx context.Context, sel ast.Se return res } +func (ec *executionContext) unmarshalOInt2ᚖint(ctx context.Context, v interface{}) (*int, error) { + if v == nil { + return nil, nil + } + res, err := graphql.UnmarshalInt(v) + return &res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) marshalOInt2ᚖint(ctx context.Context, sel ast.SelectionSet, v *int) graphql.Marshaler { + if v == nil { + return graphql.Null + } + res := graphql.MarshalInt(*v) + return res +} + func (ec *executionContext) unmarshalOLogicalOperationType2ᚖgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐLogicalOperationType(ctx context.Context, v interface{}) (*graphQLModel.LogicalOperationType, error) { if v == nil { return nil, nil @@ -9753,6 +10709,30 @@ func (ec *executionContext) marshalOLogicalOperationType2ᚖgithubᚗcomᚋMicha return v } +func (ec *executionContext) unmarshalOStatisticsInput2ᚖgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐStatisticsInput(ctx context.Context, v interface{}) (*graphQLModel.StatisticsInput, error) { + if v == nil { + return nil, nil + } + res, err := ec.unmarshalInputStatisticsInput(ctx, v) + return &res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) unmarshalOStatisticsOperation2ᚖgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐStatisticsOperation(ctx context.Context, v interface{}) (*graphQLModel.StatisticsOperation, error) { + if v == nil { + return nil, nil + } + var res = 
new(graphQLModel.StatisticsOperation) + err := res.UnmarshalGQL(v) + return res, graphql.ErrorOnPath(ctx, err) +} + +func (ec *executionContext) marshalOStatisticsOperation2ᚖgithubᚗcomᚋMichalBuresᚑOGᚋbpᚑburesᚑRIoTᚑbackendᚑcoreᚋsrcᚋmodelᚋgraphQLModelᚐStatisticsOperation(ctx context.Context, sel ast.SelectionSet, v *graphQLModel.StatisticsOperation) graphql.Marshaler { + if v == nil { + return graphql.Null + } + return v +} + func (ec *executionContext) unmarshalOString2ᚖstring(ctx context.Context, v interface{}) (*string, error) { if v == nil { return nil, nil diff --git a/backend/backend-core/src/api/graphql/schema.graphqls b/backend/backend-core/src/api/graphql/schema.graphqls index 8b7db18..3c69f99 100644 --- a/backend/backend-core/src/api/graphql/schema.graphqls +++ b/backend/backend-core/src/api/graphql/schema.graphqls @@ -205,6 +205,90 @@ input SDInstanceGroupInput { sdInstanceIDs: [ID!]! } +# ----- Statistics + +scalar Date + +input SimpleSensors { + sensors: [String!]! +} + +input SensorsWithFields { + sensors: [SensorField!]! +} + +input SensorField { + key: String! + values: [String!]! +} + +enum StatisticsOperation { + mean + min + max + first + sum + last + none + count + integral + median + mode + quantile + reduce + skew + spread + stddev + timeweightedavg +} + +""" +Data used for querying the selected bucket +""" +input StatisticsInput { + """ + Start of the querying window + """ + from: Date + """ + End of the querying window + """ + to: Date + """ + Amount of minutes to aggregate by + For example if the queried range has 1 hour and aggregateMinutes is set to 10 the aggregation will result in 6 points + """ + aggregateMinutes: Int + """ + Timezone override default UTC. + For more details why and how this affects queries see: https://www.influxdata.com/blog/time-zones-in-flux/. + In most cases you can ignore this and some edge aggregations can be influenced. 
+ If you need a precise result or the aggregation uses high amount of minutes provide the target time zone. + """ + timezone: String + """ + Aggregation operator to use, if needed + """ + operation: StatisticsOperation +} + + +scalar JSON + +type OutputData { + time: Date! + deviceId: String! + deviceType: String + data: JSON! +} + +input InputData { + time: Date! + deviceId: String! + deviceType: String + data: JSON! +} + # ----- Queries, mutations and subscriptions ----- type Query { @@ -216,6 +300,8 @@ type Query { kpiFulfillmentCheckResults: [KPIFulfillmentCheckResult!]! sdInstanceGroup(id: ID!): SDInstanceGroup! sdInstanceGroups: [SDInstanceGroup!]! + statisticsQuerySimpleSensors(request: StatisticsInput sensors: SimpleSensors!): [OutputData!]! + statisticsQuerySensorsWithFields(request: StatisticsInput sensors: SensorsWithFields!): [OutputData!]! } type Mutation { @@ -228,6 +314,7 @@ type Mutation { createSDInstanceGroup(input: SDInstanceGroupInput!): SDInstanceGroup! updateSDInstanceGroup(id: ID!, input: SDInstanceGroupInput!): SDInstanceGroup! deleteSDInstanceGroup(id: ID!): Boolean! + statisticsMutate(inputData: InputData!): Boolean! 
} type Subscription { diff --git a/backend/backend-core/src/api/graphql/schema.resolvers.go b/backend/backend-core/src/api/graphql/schema.resolvers.go index b895dc6..0fce5df 100644 --- a/backend/backend-core/src/api/graphql/schema.resolvers.go +++ b/backend/backend-core/src/api/graphql/schema.resolvers.go @@ -81,6 +81,10 @@ func (r *mutationResolver) DeleteSDInstanceGroup(ctx context.Context, id uint32) return true, nil } +func (r *mutationResolver) StatisticsMutate(ctx context.Context, inputData graphQLModel.InputData) (bool, error) { + return domainLogicLayer.Save(inputData).Unwrap() +} + func (r *queryResolver) SdType(ctx context.Context, id uint32) (graphQLModel.SDType, error) { getSDTypeResult := domainLogicLayer.GetSDType(id) if getSDTypeResult.IsFailure() { @@ -145,6 +149,18 @@ func (r *queryResolver) SdInstanceGroups(ctx context.Context) ([]graphQLModel.SD return getSDInstanceGroupsResult.Unwrap() } +func (r *queryResolver) StatisticsQuerySimpleSensors(ctx context.Context, request *graphQLModel.StatisticsInput, sensors graphQLModel.SimpleSensors) ([]graphQLModel.OutputData, error) { + convertedRequest, _ := domainLogicLayer.MapStatisticsInputToReadRequestBody(request, &sensors, nil) + data := domainLogicLayer.Query(*convertedRequest) + return data.Unwrap() +} + +func (r *queryResolver) StatisticsQuerySensorsWithFields(ctx context.Context, request *graphQLModel.StatisticsInput, sensors graphQLModel.SensorsWithFields) ([]graphQLModel.OutputData, error) { + convertedRequest, _ := domainLogicLayer.MapStatisticsInputToReadRequestBody(request, nil, &sensors) + data := domainLogicLayer.Query(*convertedRequest) + return data.Unwrap() +} + func (r *subscriptionResolver) OnSDInstanceRegistered(ctx context.Context) (<-chan graphQLModel.SDInstance, error) { return SDInstanceGraphQLSubscriptionChannel, nil } diff --git a/backend/backend-core/src/domainLogicLayer/statistics.go b/backend/backend-core/src/domainLogicLayer/statistics.go new file mode 100644 index 
0000000..df9b34d --- /dev/null +++ b/backend/backend-core/src/domainLogicLayer/statistics.go @@ -0,0 +1,193 @@ +package domainLogicLayer + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/MichalBures-OG/bp-bures-RIoT-backend-core/src/model/graphQLModel" + "github.com/MichalBures-OG/bp-bures-RIoT-commons/src/rabbitmq" + "github.com/MichalBures-OG/bp-bures-RIoT-commons/src/sharedConstants" + "github.com/MichalBures-OG/bp-bures-RIoT-commons/src/sharedModel" + "github.com/MichalBures-OG/bp-bures-RIoT-commons/src/sharedUtils" + amqp "github.com/rabbitmq/amqp091-go" + "log" + "math/rand" + "time" +) + +func randomString(l int) string { + rand.Seed(time.Now().UnixNano()) + bytes := make([]byte, l) + for i := 0; i < l; i++ { + bytes[i] = byte(randInt(65, 90)) + } + return string(bytes) +} + +func randInt(min int, max int) int { + return min + rand.Intn(max-min) +} + +func Query(input sharedModel.ReadRequestBody) sharedUtils.Result[[]graphQLModel.OutputData] { + request := sharedUtils.SerializeToJSON(input) + + rabbitMQClient := getDLLRabbitMQClient() + + correlationId := randomString(32) + + outputChannel := make(chan sharedUtils.Result[[]sharedModel.OutputData]) + + go func() { + client := rabbitmq.NewClient() + defer client.Dispose() + + err := rabbitmq.ConsumeJSONMessagesWithAccessToDelivery[sharedModel.ReadRequestResponseOrError]( + client, + sharedConstants.TimeSeriesReadRequestBackendCoreResponseQueueName, + correlationId, + func(readRequestResponseOrError sharedModel.ReadRequestResponseOrError, delivery amqp.Delivery) error { + if readRequestResponseOrError.Error != "" { + outputChannel <- sharedUtils.NewFailureResult[[]sharedModel.OutputData](errors.New(readRequestResponseOrError.Error)) + } else { + outputChannel <- sharedUtils.NewSuccessResult[[]sharedModel.OutputData](readRequestResponseOrError.Data) + } + + close(outputChannel) + return nil + }, + ) + + if err != nil { + log.Printf("Statistics Query | %s", err) + outputChannel <- 
sharedUtils.NewFailureResult[[]sharedModel.OutputData](err) + close(outputChannel) + } + }() + + err := rabbitMQClient.PublishJSONMessageRPC( + sharedUtils.NewEmptyOptional[string](), + sharedUtils.NewOptionalOf(sharedConstants.TimeSeriesReadRequestQueueName), + request.GetPayload(), + correlationId, + sharedUtils.NewOptionalOf(sharedConstants.TimeSeriesReadRequestBackendCoreResponseQueueName), + ) + + if err != nil { + log.Printf("Statistics Query | %s", err) + return sharedUtils.NewFailureResult[[]graphQLModel.OutputData](err) + } + + result := <-outputChannel + + if result.IsFailure() { + log.Printf("Statistics Query | %s", result.GetError()) + return sharedUtils.NewFailureResult[[]graphQLModel.OutputData](result.GetError()) + } + + convertedResult, err := ConvertOutputData(result.GetPayload()) + + if err != nil { + log.Printf("Statistics Query | %s", err) + return sharedUtils.NewFailureResult[[]graphQLModel.OutputData](err) + } + + return sharedUtils.NewSuccessResult[[]graphQLModel.OutputData](convertedResult) +} + +func Save(input graphQLModel.InputData) sharedUtils.Result[bool] { + rabbitMQClient := getDLLRabbitMQClient() + rabbitMQClient.PublishJSONMessage(sharedUtils.NewOptionalOf(sharedConstants.TimeSeriesStoreDataQueueName), sharedUtils.NewOptionalOf(""), sharedUtils.SerializeToJSON(input).GetPayload()) + return sharedUtils.NewSuccessResult[bool](true) +} + +// MapStatisticsInputToReadRequestBody Refactored function with explicit parameters for SimpleSensors and SensorsWithFields +func MapStatisticsInputToReadRequestBody(statsInput *graphQLModel.StatisticsInput, simpleSensors *graphQLModel.SimpleSensors, sensorsWithFields *graphQLModel.SensorsWithFields) (*sharedModel.ReadRequestBody, error) { + readRequestBody := &sharedModel.ReadRequestBody{} + + // Check if SimpleSensors is provided + if simpleSensors != nil { + readRequestBody.Sensors = simpleSensors.Sensors + } else if sensorsWithFields != nil { + resultMap := make(map[string][]string) + + // Loop 
through each SensorField and add it to the result map + for _, sensor := range sensorsWithFields.Sensors { + resultMap[sensor.Key] = sensor.Values + } + + readRequestBody.Sensors = resultMap + } else { + return nil, fmt.Errorf("sensors are required, but neither SimpleSensors nor SensorsWithFields were provided") + } + + // If statsInput is not nil, copy the values + if statsInput != nil { + // Copy Operation if present + if statsInput.Operation != nil { + readRequestBody.Operation = sharedModel.Operation(*statsInput.Operation) + } + + // Copy Timezone if present + if statsInput.Timezone != nil { + readRequestBody.Timezone = *statsInput.Timezone + } + + // Copy AggregateMinutes if present + if statsInput.AggregateMinutes != nil { + readRequestBody.AggregateMinutes = *statsInput.AggregateMinutes + } + + // Convert From time (string to *time.Time) + if statsInput.From != nil { + parsedTime, err := time.Parse(time.RFC3339, *statsInput.From) + if err != nil { + return nil, fmt.Errorf("failed to parse From time: %v", err) + } + readRequestBody.From = &parsedTime + } + + // Convert To time (string to *time.Time) + if statsInput.To != nil { + parsedTime, err := time.Parse(time.RFC3339, *statsInput.To) + if err != nil { + return nil, fmt.Errorf("failed to parse To time: %v", err) + } + readRequestBody.To = &parsedTime + } + } + + return readRequestBody, nil +} + +// ConvertOutputData converts a slice of sharedModel.OutputData to a slice of graphQLModel.OutputData +func ConvertOutputData(sharedData []sharedModel.OutputData) ([]graphQLModel.OutputData, error) { + var result []graphQLModel.OutputData + + for _, item := range sharedData { + // Convert Time to string + timeString := item.Time.Format(time.RFC3339) + + // Convert Data map to JSON string + dataBytes, err := json.Marshal(item.Data) + if err != nil { + return nil, fmt.Errorf("error marshaling data: %v", err) + } + dataString := string(dataBytes) + + // Handle DeviceType (may be empty in sharedModel, so we use pointer 
in graphQLModel) + var deviceType *string + if item.DeviceType != "" { + deviceType = &item.DeviceType + } + + // Map to graphQLModel.OutputData + result = append(result, graphQLModel.OutputData{ + Time: timeString, + DeviceID: item.DeviceID, + DeviceType: deviceType, + Data: dataString, + }) + } + + return result, nil +} diff --git a/backend/backend-core/src/isc/setup.go b/backend/backend-core/src/isc/setup.go index d2bd81a..363f433 100644 --- a/backend/backend-core/src/isc/setup.go +++ b/backend/backend-core/src/isc/setup.go @@ -12,7 +12,17 @@ func getQueueDeclarationErrorMessage(queueName string) string { } func SetupRabbitMQInfrastructureForISC(rabbitMQClient rabbitmq.Client) { - namesOfQueuesToDeclare := sharedUtils.SliceOf[string](sharedConstants.KPIFulfillmentCheckResultsQueueName, sharedConstants.KPIFulfillmentCheckRequestsQueueName, sharedConstants.SDInstanceRegistrationRequestsQueueName, sharedConstants.SetOfSDTypesUpdatesQueueName, sharedConstants.SetOfSDInstancesUpdatesQueueName, sharedConstants.MessageProcessingUnitConnectionNotificationsQueueName) + namesOfQueuesToDeclare := sharedUtils.SliceOf[string]( + sharedConstants.KPIFulfillmentCheckResultsQueueName, + sharedConstants.KPIFulfillmentCheckRequestsQueueName, + sharedConstants.SDInstanceRegistrationRequestsQueueName, + sharedConstants.SetOfSDTypesUpdatesQueueName, + sharedConstants.SetOfSDInstancesUpdatesQueueName, + sharedConstants.MessageProcessingUnitConnectionNotificationsQueueName, + sharedConstants.TimeSeriesStoreDataQueueName, + sharedConstants.TimeSeriesReadRequestQueueName, + sharedConstants.TimeSeriesReadRequestBackendCoreResponseQueueName, + ) sharedUtils.ForEach(namesOfQueuesToDeclare, func(nameOfQueuesToDeclare string) { sharedUtils.TerminateOnError(rabbitMQClient.DeclareQueue(nameOfQueuesToDeclare), getQueueDeclarationErrorMessage(nameOfQueuesToDeclare)) }) diff --git a/backend/backend-core/src/main.go b/backend/backend-core/src/main.go index d554b82..364044a 100644 --- 
a/backend/backend-core/src/main.go +++ b/backend/backend-core/src/main.go @@ -40,7 +40,7 @@ func main() { log.Println("Waiting for dependencies...") waitForDependencies() log.Println("Dependencies ready...") - sharedUtils.StartLoggingProfilingInformationPeriodically(time.Minute) + //sharedUtils.StartLoggingProfilingInformationPeriodically(time.Minute) kickstartISC() graphql.SetupGraphQLServer() } diff --git a/backend/backend-core/src/model/dbModel/model.go b/backend/backend-core/src/model/dbModel/model.go index d4af206..7ea8727 100644 --- a/backend/backend-core/src/model/dbModel/model.go +++ b/backend/backend-core/src/model/dbModel/model.go @@ -1,5 +1,10 @@ package dbModel +import ( + "gorm.io/gorm" + "time" +) + type KPIDefinitionEntity struct { ID uint32 `gorm:"column:id;primaryKey"` UserIdentifier string `gorm:"column:user_identifier;not null"` @@ -55,6 +60,7 @@ type SDTypeEntity struct { ID uint32 `gorm:"column:id;primaryKey;not null"` Denotation string `gorm:"column:denotation;not null;index"` // Denotation is a separately indexed field Parameters []SDParameterEntity `gorm:"foreignKey:SDTypeID;constraint:OnDelete:CASCADE"` + Commands []SDCommandEntity `gorm:"foreignKey:SDTypeID;constraint:OnDelete:CASCADE"` } func (SDTypeEntity) TableName() string { @@ -122,3 +128,57 @@ type SDInstanceKPIDefinitionRelationshipEntity struct { SDInstanceID uint32 `gorm:"column:sd_instance_id;primaryKey;not null"` SDInstanceUID string `gorm:"column:sd_instance_uid;not null"` } + +// UserEntity represents a user of the application who can log in using various OAuth providers. 
+type UserEntity struct { + ID uint `gorm:"primaryKey"` // Primary key for the user + CreatedAt time.Time // Timestamp of creation + UpdatedAt time.Time // Timestamp of the last update + DeletedAt gorm.DeletedAt `gorm:"index"` // Soft delete field + + // Basic User Information + Username string `gorm:"uniqueIndex;size:100"` // Unique username for the user + Email string `gorm:"uniqueIndex;size:255"` // User's email address (unique) + Name string `gorm:"size:255"` // Full name of the user + ProfileImage string `gorm:"size:500"` // URL to the user's profile image + + // OAuth Information + Provider string `gorm:"size:50"` // OAuth provider name (e.g., google, github) + ProviderID string `gorm:"size:255;index"` // Unique ID provided by the OAuth provider + OAuthToken string `gorm:"size:500"` // OAuth access token + RefreshToken string `gorm:"size:500"` // OAuth refresh token, if available + TokenExpiry time.Time // Expiration time of the OAuth token + + // Additional Metadata + LastLoginAt time.Time // Timestamp of the last login + IsActive bool `gorm:"default:true"` // Whether the user's account is active + Invocations []SDCommandInvocationEntity `gorm:"foreignKey:UserId;constraint:OnDelete:CASCADE"` +} + +func (UserEntity) TableName() string { + return "user" +} + +type SDCommandEntity struct { + ID uint32 `gorm:"column:id;primaryKey;not null"` + SDTypeID uint32 `gorm:"column:sd_type_id;not null"` + Denotation string `gorm:"column:denotation;not null"` + Type string `gorm:"column:type;not null"` + Payload string `gorm:"column:payload;not null"` + Invocations []SDCommandInvocationEntity `gorm:"foreignKey:ID;constraint:OnDelete:CASCADE"` +} + +func (SDCommandEntity) TableName() string { + return "command" +} + +type SDCommandInvocationEntity struct { + ID uint32 `gorm:"column:id;primaryKey;not null"` + InvocationTime time.Time + Payload string `gorm:"column:payload;not null"` + UserId uint32 `gorm:"column:user_id"` +} + +func (SDCommandInvocationEntity) TableName() 
string { + return "command_invocation" +} diff --git a/backend/backend-core/src/model/graphQLModel/graphQLModel.go b/backend/backend-core/src/model/graphQLModel/graphQLModel.go index 785e9ae..517b1cf 100644 --- a/backend/backend-core/src/model/graphQLModel/graphQLModel.go +++ b/backend/backend-core/src/model/graphQLModel/graphQLModel.go @@ -43,6 +43,13 @@ func (this BooleanEQAtomKPINode) GetSdParameterSpecification() string { return this.SdParameterSpecification } +type InputData struct { + Time string `json:"time"` + DeviceID string `json:"deviceId"` + DeviceType *string `json:"deviceType,omitempty"` + Data string `json:"data"` +} + type KPIDefinition struct { ID uint32 `json:"id"` SdTypeID uint32 `json:"sdTypeID"` @@ -204,6 +211,13 @@ func (this NumericLTAtomKPINode) GetSdParameterSpecification() string { return this.SdParameterSpecification } +type OutputData struct { + Time string `json:"time"` + DeviceID string `json:"deviceId"` + DeviceType *string `json:"deviceType,omitempty"` + Data string `json:"data"` +} + type Query struct { } @@ -253,6 +267,37 @@ type SDTypeInput struct { Parameters []SDParameterInput `json:"parameters"` } +type SensorField struct { + Key string `json:"key"` + Values []string `json:"values"` +} + +type SensorsWithFields struct { + Sensors []SensorField `json:"sensors"` +} + +type SimpleSensors struct { + Sensors []string `json:"sensors"` +} + +// Data used for querying the selected bucket +type StatisticsInput struct { + // Start of the querying window + From *string `json:"from,omitempty"` + // End of the querying window + To *string `json:"to,omitempty"` + // Amount of minutes to aggregate by + // For example if the queried range has 1 hour and aggregateMinutes is set to 10 the aggregation will result in 6 points + AggregateMinutes *int `json:"aggregateMinutes,omitempty"` + // Timezone override default UTC. + // For more details why and how this affects queries see: https://www.influxdata.com/blog/time-zones-in-flux/. 
// StatisticsInput carries the parameters of a statistics query against the
// time-series bucket; every field is optional.
type StatisticsInput struct {
	// From is the start of the querying window.
	From *string `json:"from,omitempty"`
	// To is the end of the querying window.
	To *string `json:"to,omitempty"`
	// AggregateMinutes is the number of minutes to aggregate by; e.g. a 1-hour
	// range with aggregateMinutes set to 10 yields 6 points.
	AggregateMinutes *int `json:"aggregateMinutes,omitempty"`
	// Timezone overrides the default UTC; see
	// https://www.influxdata.com/blog/time-zones-in-flux/ for how this affects
	// queries. Usually safe to omit, but edge aggregations can be influenced —
	// provide the target time zone when precision matters or the aggregation
	// window is large.
	Timezone *string `json:"timezone,omitempty"`
	// Operation is the aggregation operator to apply, if any.
	Operation *StatisticsOperation `json:"operation,omitempty"`
}

// StatisticsOperation enumerates the aggregation operators accepted by the
// statistics endpoint.
type StatisticsOperation string

const (
	StatisticsOperationMean            StatisticsOperation = "mean"
	StatisticsOperationMin             StatisticsOperation = "min"
	StatisticsOperationMax             StatisticsOperation = "max"
	StatisticsOperationFirst           StatisticsOperation = "first"
	StatisticsOperationSum             StatisticsOperation = "sum"
	StatisticsOperationLast            StatisticsOperation = "last"
	StatisticsOperationNone            StatisticsOperation = "none"
	StatisticsOperationCount           StatisticsOperation = "count"
	StatisticsOperationIntegral        StatisticsOperation = "integral"
	StatisticsOperationMedian          StatisticsOperation = "median"
	StatisticsOperationMode            StatisticsOperation = "mode"
	StatisticsOperationQuantile        StatisticsOperation = "quantile"
	StatisticsOperationReduce          StatisticsOperation = "reduce"
	StatisticsOperationSkew            StatisticsOperation = "skew"
	StatisticsOperationSpread          StatisticsOperation = "spread"
	StatisticsOperationStddev          StatisticsOperation = "stddev"
	StatisticsOperationTimeweightedavg StatisticsOperation = "timeweightedavg"
)

// AllStatisticsOperation lists every valid enum member; IsValid ranges over it.
var AllStatisticsOperation = []StatisticsOperation{
	StatisticsOperationMean,
	StatisticsOperationMin,
	StatisticsOperationMax,
	StatisticsOperationFirst,
	StatisticsOperationSum,
	StatisticsOperationLast,
	StatisticsOperationNone,
	StatisticsOperationCount,
	StatisticsOperationIntegral,
	StatisticsOperationMedian,
	StatisticsOperationMode,
	StatisticsOperationQuantile,
	StatisticsOperationReduce,
	StatisticsOperationSkew,
	StatisticsOperationSpread,
	StatisticsOperationStddev,
	StatisticsOperationTimeweightedavg,
}

// IsValid reports whether e is one of the declared enum members.
func (e StatisticsOperation) IsValid() bool {
	for _, candidate := range AllStatisticsOperation {
		if e == candidate {
			return true
		}
	}
	return false
}

// String returns the wire representation of the operation.
func (e StatisticsOperation) String() string {
	return string(e)
}

// UnmarshalGQL implements the gqlgen input unmarshaler: v must be a string
// naming a valid operation. As in the original, e is assigned before
// validation, so on an invalid name it holds the rejected value.
func (e *StatisticsOperation) UnmarshalGQL(v interface{}) error {
	str, ok := v.(string)
	if !ok {
		return fmt.Errorf("enums must be strings")
	}
	*e = StatisticsOperation(str)
	if !e.IsValid() {
		return fmt.Errorf("%s is not a valid StatisticsOperation", str)
	}
	return nil
}

// MarshalGQL implements the gqlgen output marshaler, writing the quoted name.
func (e StatisticsOperation) MarshalGQL(w io.Writer) {
	fmt.Fprint(w, strconv.Quote(e.String()))
}
routingKeyOptional sharedUtils.Optional[string], messagePayload []byte, correlationId string, replyTo sharedUtils.Optional[string]) error DeclareQueue(queueName string) error SetupMessageConsumption(queueName string, messageConsumerFunction func(message amqp.Delivery) error) error + SetupMessageConsumptionWithCorrelationId(queueName string, correlationId string, messageConsumerFunction func(message amqp.Delivery) error) error Dispose() } @@ -87,6 +91,24 @@ func (c *ClientImpl) PublishJSONMessage(exchangeNameOptional sharedUtils.Optiona }) } +func (c *ClientImpl) PublishJSONMessageRPC(exchangeNameOptional sharedUtils.Optional[string], routingKeyOptional sharedUtils.Optional[string], messagePayload []byte, correlationId string, replyToOptional sharedUtils.Optional[string]) error { + timeout := 10 * time.Second + ctx, cancelFunction := context.WithTimeout(context.Background(), timeout) + expiration := strconv.Itoa(int(timeout / time.Millisecond)) + defer cancelFunction() + exchangeName := exchangeNameOptional.GetPayloadOrDefault("") + routingKey := routingKeyOptional.GetPayloadOrDefault("") + replyTo := replyToOptional.GetPayloadOrDefault("") + + return c.channel.PublishWithContext(ctx, exchangeName, routingKey, false, false, amqp.Publishing{ + ContentType: "application/json", + Body: messagePayload, + CorrelationId: correlationId, + ReplyTo: replyTo, + Expiration: expiration, + }) +} + func (c *ClientImpl) DeclareQueue(queueName string) error { _, err := c.channel.QueueDeclare(queueName, false, false, false, false, nil) return err @@ -111,6 +133,80 @@ func (c *ClientImpl) SetupMessageConsumption(queueName string, messageConsumerFu return nil } +func (c *ClientImpl) SetupMessageConsumptionWithCorrelationId(queueName string, correlationId string, messageConsumerFunction func(message amqp.Delivery) error) error { + consumerName := fmt.Sprintf("%s-consumer", correlationId) + if err := c.channel.Qos(1, 0, false); err != nil { + return err + } + + messageChannel, err := 
c.channel.Consume(queueName, consumerName, false, false, false, false, nil) + if err != nil { + log.Printf("SetupMessageConsumptionWithCorrelationId | %s", err) + return err + } + + // Timeout after a time, so you don't ping-pong with a request. Most likely the API won't even care after this time + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + for { + select { + case message, ok := <-messageChannel: + if !ok { + return fmt.Errorf("SetupMessageConsumptionWithCorrelationId | Message channel closed") + } + + log.Printf("SetupMessageConsumptionWithCorrelationId | Seeing message correlationId %s from a consumer %s\n", message.CorrelationId, consumerName) + + if message.CorrelationId != correlationId { + // Return the message back to the queue if it has a wrong correlation ID + if err := message.Nack(false, true); err != nil { + log.Printf("SetupMessageConsumptionWithCorrelationId | %s", err) + return err + } + continue + } + + if err := message.Ack(false); err != nil { + log.Printf("SetupMessageConsumptionWithCorrelationId | %s", err) + return err + } + + err := c.channel.Cancel(consumerName, true) + if err != nil { + log.Printf("SetupMessageConsumptionWithCorrelationId | %s", err) + return err + } + + if err := messageConsumerFunction(message); err != nil { + log.Printf("SetupMessageConsumptionWithCorrelationId | %s", err) + return err + } + + log.Println("SetupMessageConsumptionWithCorrelationId | Consuming message and closing channel") + + err = c.channel.Cancel(consumerName, true) // Stop consuming + if err != nil { + log.Printf("SetupMessageConsumptionWithCorrelationId | %s", err) + return err + } + + log.Println("SetupMessageConsumptionWithCorrelationId | Closed channel, exiting") + + return nil // Exit after processing + + case <-ctx.Done(): + log.Println("SetupMessageConsumptionWithCorrelationId | Timeout reached. 
Stopping message consumption.") + err := c.channel.Cancel(consumerName, true) // Stop consuming + if err != nil { + log.Printf("SetupMessageConsumptionWithCorrelationId | %s", err) + return err + } + return fmt.Errorf("SetupMessageConsumptionWithCorrelationId | message consumption timed out after 100 seconds") + } + } +} + func (c *ClientImpl) Dispose() { sharedUtils.LogPossibleErrorThenProceed(c.channel.Close(), "[RabbitMQ client] Failed to close a channel") getConnectionManagerInstance().release() @@ -130,6 +226,28 @@ func ConsumeJSONMessages[T any](client Client, queueName string, messagePayloadC }) } +func ConsumeJSONMessagesWithAccessToDelivery[T any](client Client, queueName string, correlationId string, messagePayloadConsumerFunction func(messagePayload T, delivery amqp.Delivery) error) error { + messageConsumerFunction := func(message amqp.Delivery) error { + messageContentType := message.ContentType + if messageContentType != "application/json" { + return fmt.Errorf("incorrect message content type: %s", messageContentType) + } + + jsonDeserializationResult := sharedUtils.DeserializeFromJSON[T](message.Body) + if jsonDeserializationResult.IsFailure() { + fmt.Println(jsonDeserializationResult.GetError()) + return jsonDeserializationResult.GetError() + } + return messagePayloadConsumerFunction(jsonDeserializationResult.GetPayload(), message) + } + + if correlationId == "" { + return client.SetupMessageConsumption(queueName, messageConsumerFunction) + } else { + return client.SetupMessageConsumptionWithCorrelationId(queueName, correlationId, messageConsumerFunction) + } +} + func ConsumeJSONMessagesFromFanoutExchange[T any](client Client, fanoutExchangeName string, messagePayloadConsumerFunction func(messagePayload T) error) error { queue, err := client.GetChannel().QueueDeclare("", false, false, true, false, nil) if err != nil { diff --git a/backend/commons/src/sharedConstants/sharedConstants.go b/backend/commons/src/sharedConstants/sharedConstants.go index 
ac14c27..4b6f788 100644 --- a/backend/commons/src/sharedConstants/sharedConstants.go +++ b/backend/commons/src/sharedConstants/sharedConstants.go @@ -8,4 +8,7 @@ const ( SDInstanceRegistrationRequestsQueueName = "sd-instance-registration-requests" SetOfSDInstancesUpdatesQueueName = "set-of-sd-instances-updates" SetOfSDTypesUpdatesQueueName = "set-of-sd-types-updates" + TimeSeriesStoreDataQueueName = "time-series-store-data" + TimeSeriesReadRequestQueueName = "time-series-read-request" + TimeSeriesReadRequestBackendCoreResponseQueueName = "time-series-read-request-backend-core-response" ) diff --git a/backend/commons/src/sharedModel/iscMessages.go b/backend/commons/src/sharedModel/iscMessages.go index 43a068f..073582e 100644 --- a/backend/commons/src/sharedModel/iscMessages.go +++ b/backend/commons/src/sharedModel/iscMessages.go @@ -33,3 +33,8 @@ type SDInstanceConfigurationUpdateISCMessage []SDInstanceInfo type KPIConfigurationUpdateISCMessage map[string][]KPIDefinition type MessageProcessingUnitConnectionNotification struct{} + +type ReadRequestResponseOrError struct { + Data []OutputData `json:"data,omitempty"` + Error string `json:"error,omitempty"` +} diff --git a/backend/commons/src/sharedModel/timeSeries.go b/backend/commons/src/sharedModel/timeSeries.go new file mode 100644 index 0000000..ddc59b7 --- /dev/null +++ b/backend/commons/src/sharedModel/timeSeries.go @@ -0,0 +1,136 @@ +package sharedModel + +import ( + "encoding/json" + "fmt" + "time" +) + +// Sensors represents the sensors to be queried. +type Sensors interface{} + +// SimpleSensors represents a simple definition of sensors. +type SimpleSensors []string + +// SensorsWithFields represents sensors with specific fields. +type SensorsWithFields map[string][]string + +// AreSimpleSensors checks if the sensors are of type SimpleSensors. 
+func AreSimpleSensors(sensors interface{}) bool { + switch sensors.(type) { + case SimpleSensors: + return true + case []string: + return true + default: + return false + } +} + +// Operation represents aggregation operations. +type Operation string + +// InputData represents a single input data point. +type InputData struct { + Timestamp float64 `json:"timestamp"` + SDInstanceUID string `json:"sdInstanceUID"` + SDTypeSpecification string `json:"sdTypeSpecification"` + Parameters any `json:"parameters"` +} + +// OutputData represents a single data point retrieved from InfluxDB. +type OutputData struct { + Result string `json:"result"` // Result metadata + Table int64 `json:"table"` // Table number metadata + Time time.Time `json:"time"` // Time of the current data sample + DeviceID string `json:"deviceId"` // Device identifier + DeviceType string `json:"deviceType"` // Device type + Data map[string]interface{} `json:"data"` +} + +// ReadRequestBody represents the request body for the statistics endpoint. +type ReadRequestBody struct { + Sensors Sensors `json:"sensors"` + Operation Operation `json:"operation,omitempty"` + Timezone string `json:"timezone,omitempty"` + From *time.Time `json:"from,omitempty"` + To *time.Time `json:"to,omitempty"` + AggregateMinutes int `json:"aggregateMinutes,omitempty"` +} + +// UnmarshalJSON customizes the JSON unmarshaling for ReadRequestBody. 
+func (r *ReadRequestBody) UnmarshalJSON(data []byte) error { + type Alias ReadRequestBody + aux := &struct { + From string `json:"from"` + To string `json:"to"` + Sensors json.RawMessage `json:"sensors"` // Delay parsing sensors + Alias + }{ + Alias: (Alias)(*r), + } + + // Unmarshal into aux + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + + // Parse time fields safely + if aux.From != "" { + fromTime, err := time.Parse(time.RFC3339, aux.From) + if err != nil { + return fmt.Errorf("invalid 'from' time format: %s", aux.From) + } + r.From = &fromTime + } + + if aux.To != "" { + toTime, err := time.Parse(time.RFC3339, aux.To) + if err != nil { + return fmt.Errorf("invalid 'to' time format: %s", aux.To) + } + r.To = &toTime + } + + // Assign other fields + r.Operation = aux.Operation + r.Timezone = aux.Timezone + r.AggregateMinutes = aux.AggregateMinutes + + // Try to parse Sensors as SimpleSensors + var simpleSensors SimpleSensors + if err := json.Unmarshal(aux.Sensors, &simpleSensors); err == nil { + r.Sensors = simpleSensors + return nil + } + + // Try to parse Sensors as SensorsWithFields + var sensorsWithFields SensorsWithFields + if err := json.Unmarshal(aux.Sensors, &sensorsWithFields); err == nil { + r.Sensors = sensorsWithFields + return nil + } + + // If neither format works, return an error + return fmt.Errorf("unsupported sensors format: %s", string(aux.Sensors)) +} + +// MarshalJSON marshals ReadRequestBody to JSON. +func (r *ReadRequestBody) MarshalJSON() ([]byte, error) { + type Alias ReadRequestBody + if r.Sensors == nil { + // If Sensors field is nil, omit it from the JSON output + r.Sensors = []string{} + } + return json.Marshal(&struct{ Alias }{Alias: (Alias)(*r)}) +} + +// MarshalJSON marshals SimpleSensors to JSON. +func (simpleSensors SimpleSensors) MarshalJSON() ([]byte, error) { + return json.Marshal([]string(simpleSensors)) +} + +// MarshalJSON marshals SensorsWithFields to JSON. 
+func (sensorsWithFields SensorsWithFields) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string][]string(sensorsWithFields)) +} diff --git a/backend/mqtt-preprocessor/go.mod b/backend/mqtt-preprocessor/go.mod index 0571edf..012093b 100644 --- a/backend/mqtt-preprocessor/go.mod +++ b/backend/mqtt-preprocessor/go.mod @@ -5,6 +5,7 @@ go 1.22 require ( github.com/MichalBures-OG/bp-bures-RIoT-commons v0.0.0-00010101000000-000000000000 github.com/eclipse/paho.mqtt.golang v1.4.3 + github.com/google/uuid v1.6.0 ) replace github.com/MichalBures-OG/bp-bures-RIoT-commons => ./../commons diff --git a/backend/mqtt-preprocessor/go.sum b/backend/mqtt-preprocessor/go.sum index bcb6203..6b4606e 100644 --- a/backend/mqtt-preprocessor/go.sum +++ b/backend/mqtt-preprocessor/go.sum @@ -2,6 +2,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= diff --git a/backend/mqtt-preprocessor/src/main.go b/backend/mqtt-preprocessor/src/main.go index bff2c36..f7e6c7a 100644 --- a/backend/mqtt-preprocessor/src/main.go +++ b/backend/mqtt-preprocessor/src/main.go @@ -133,14 +133,31 @@ func processMQTTMessagePayload(mqttMessagePayload []byte, rabbitMQClient rabbitm } messagePayloadObject := jsonDeserializationResult.GetPayload() sd := 
messagePayloadObject.Data.SDArray[0] + + inputData := sharedModel.InputData{ + Timestamp: messagePayloadObject.Notification.Timestamp, + SDInstanceUID: sd.UID, + SDTypeSpecification: sd.Type, + Parameters: sd.Parameters, + } + jsonSerializationResult := sharedUtils.SerializeToJSON(inputData) + + err := rabbitMQClient.PublishJSONMessage(sharedUtils.NewEmptyOptional[string](), sharedUtils.NewOptionalOf(sharedConstants.TimeSeriesStoreDataQueueName), jsonSerializationResult.GetPayload()) + if err != nil { + log.Println("Failed to publish a time series store request message") + return + } + + log.Printf("Succesfully to published a time series store request message for %s with timestamp %s", inputData.SDInstanceUID, inputData.Timestamp) + if !mqttMessageSDTypeCorrespondsToSDTypeDefinitions(sd.Type) { return } switch determineSDInstanceScenario(sd.UID) { case unknownSDInstance: - generateSDInstanceRegistrationRequest(sd.UID, sd.Type, messagePayloadObject.Notification.Timestamp, rabbitMQClient) + generateSDInstanceRegistrationRequest(sd.UID, sd.Type, float32(messagePayloadObject.Notification.Timestamp), rabbitMQClient) case confirmedSDInstance: - generateKPIFulfillmentCheckRequest(sd.UID, sd.Type, sd.Parameters, messagePayloadObject.Notification.Timestamp, rabbitMQClient) + generateKPIFulfillmentCheckRequest(sd.UID, sd.Type, sd.Parameters, float32(messagePayloadObject.Notification.Timestamp), rabbitMQClient) } } diff --git a/backend/mqtt-preprocessor/src/model/logimic.go b/backend/mqtt-preprocessor/src/model/logimic.go index f273cb7..d8105f1 100644 --- a/backend/mqtt-preprocessor/src/model/logimic.go +++ b/backend/mqtt-preprocessor/src/model/logimic.go @@ -3,7 +3,7 @@ package model type UpstreamMQTTMessageInJSONBasedProprietaryFormatOfLogimic struct { Notification struct { MessageID string `json:"msgId"` - Timestamp float32 `json:"tst"` + Timestamp float64 `json:"tst"` } `json:"ntf"` Data struct { SDArray []struct { diff --git a/backend/time-series-store/Dockerfile 
b/backend/time-series-store/Dockerfile new file mode 100644 index 0000000..7c2703c --- /dev/null +++ b/backend/time-series-store/Dockerfile @@ -0,0 +1,19 @@ +# Use an official Go runtime as a parent image +FROM golang:1.22-alpine +# Set the working directory inside the container +WORKDIR /app +# Copy the go.mod file, go.sum file, and src directory from time-series-store +COPY ./time-series-store/go.mod ./time-series-store/go.sum /app/time-series-store/ +COPY ./time-series-store/src /app/time-series-store/src +# Copy the go.mod file, go.sum file, and src directory from commons +COPY ./commons/go.mod ./commons/go.sum /app/commons/ +COPY ./commons/src /app/commons/src +# Change working directory to where the Go app's main module is located +WORKDIR /app/time-series-store +# Download all dependencies. Dependencies will be cached if the go.mod and go.sum files are not changed +RUN go mod tidy +RUN go mod download +# Build the Go app +RUN CGO_ENABLED=0 GOOS=linux go build -o RIoT-time-series-store src/main.go +# Run the executable +CMD ["/app/time-series-store/RIoT-time-series-store"] diff --git a/backend/time-series-store/go.mod b/backend/time-series-store/go.mod new file mode 100644 index 0000000..f702561 --- /dev/null +++ b/backend/time-series-store/go.mod @@ -0,0 +1,23 @@ +module github.com/xjohnp00/jiap/backend/shared/time-series-store + +go 1.22 + +toolchain go1.22.4 + +require ( + github.com/MichalBures-OG/bp-bures-RIoT-commons v0.0.0-00010101000000-000000000000 + github.com/influxdata/influxdb-client-go/v2 v2.13.0 + github.com/rabbitmq/amqp091-go v1.10.0 +) + +replace github.com/MichalBures-OG/bp-bures-RIoT-commons => ./../commons + +require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect + github.com/oapi-codegen/runtime v1.0.0 // indirect + golang.org/x/net v0.17.0 // indirect 
+) + diff --git a/backend/time-series-store/go.sum b/backend/time-series-store/go.sum new file mode 100644 index 0000000..ac0d0c7 --- /dev/null +++ b/backend/time-series-store/go.sum @@ -0,0 +1,33 @@ +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= +github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/influxdata/influxdb-client-go/v2 v2.13.0 h1:ioBbLmR5NMbAjP4UVA5r9b5xGjpABD7j65pI8kFphDM= +github.com/influxdata/influxdb-client-go/v2 v2.13.0/go.mod h1:k+spCbt9hcvqvUiz0sr5D8LolXHqAAOfPw9v/RIRHl4= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= +github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= +github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= +github.com/pmezard/go-difflib v1.0.0 
h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/backend/time-series-store/src/internal/Influx2Client.go b/backend/time-series-store/src/internal/Influx2Client.go new file mode 100644 index 0000000..566c4aa --- /dev/null +++ b/backend/time-series-store/src/internal/Influx2Client.go @@ -0,0 +1,232 @@ +package internal + +import ( + "context" + "fmt" + sharedModel "github.com/MichalBures-OG/bp-bures-RIoT-commons/src/sharedModel" + "github.com/MichalBures-OG/bp-bures-RIoT-commons/src/sharedUtils" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" + "log" + "strings" + "time" +) + +type Influx2Client struct { + endpoint string + organization string + bucket string + client influxdb2.Client + writeApi 
api.WriteAPIBlocking + queryApi api.QueryAPI +} + +func NewInflux2Client(endpoint string, token string, organization string, bucket string) Influx2Client { + client := influxdb2.NewClientWithOptions(endpoint, token, influxdb2.DefaultOptions().SetBatchSize(20)) + + influx2Client := Influx2Client{ + endpoint: endpoint, + organization: organization, + bucket: bucket, + client: client, + writeApi: client.WriteAPIBlocking(organization, bucket), + queryApi: client.QueryAPI(organization), + } + return influx2Client +} + +func (influx2Client Influx2Client) Query(body sharedModel.ReadRequestBody) sharedUtils.Result[[]sharedModel.OutputData] { + aggregation := createAggregation(body) + timeRange := convertTimeToQueryTimePart(body) + filter := createFilter(body) + imports := "" + + if body.Timezone != "" { + imports = "import \"timezone\"" + } + + query := fmt.Sprintf("%s\n\n"+ // imports + "from(bucket: \"%s\")\n"+ + " %s\n"+ // time range + " %s\n"+ // filter + " %s\n"+ // aggregation + " |> drop(columns: [\"_start\", \"_stop\", \"host\", \"deviceType\"])\n"+ + " |> pivot(columnKey: [\"_field\"], rowKey: [\"_measurement\", \"_time\"], valueColumn: \"_value\")\n"+ + " |> rename(columns: {_time: \"time\", _measurement: \"deviceId\"})\n"+ + " |> group(columns: [\"measurement\"], mode: \"by\")", imports, influx2Client.bucket, timeRange, filter, aggregation) + + log.Println(query) + + result, err := influx2Client.queryApi.Query(context.Background(), query) + + if err != nil { + return sharedUtils.NewFailureResult[[]sharedModel.OutputData](err) + } + + outputData := []sharedModel.OutputData{} + + if result.Err() != nil { + return sharedUtils.NewFailureResult[[]sharedModel.OutputData](result.Err()) + } + + for result.Next() { + outputData = append(outputData, mapToOutputData(result.Record().Values())) + } + + return sharedUtils.NewSuccessResult[[]sharedModel.OutputData](outputData) +} + +func (influx2Client Influx2Client) Write(data sharedModel.InputData) { + log.Printf("Writing 
%s with ts %s received ts %f\n", data.SDInstanceUID, time.Unix(int64(data.Timestamp), 0), data.Timestamp) + if parameters, ok := data.Parameters.(map[string]interface{}); ok { + // Drop nil / null values + for key, value := range parameters { + if value == nil { + log.Printf("Dropping %s because it was nil / null", key) + delete(parameters, key) + } + } + + point := influxdb2.NewPoint(data.SDInstanceUID, map[string]string{"deviceType": data.SDTypeSpecification}, parameters, time.Unix(int64(data.Timestamp), 0)) + + ctx, cancelFunction := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelFunction() + + err := influx2Client.writeApi.WritePoint(ctx, point) + if err != nil { + log.Printf("Writing %s failed with %s\n", data.SDInstanceUID, err) + } + + err = influx2Client.writeApi.Flush(ctx) + if err != nil { + log.Printf("Writing %s failed with %s\n", data.SDInstanceUID, err) + } + + } else { + log.Println("parameterAsAny is not a map[string]interface{}") + } +} + +func (influx2Client Influx2Client) Close() { + influx2Client.client.Close() +} + +func convertTimeToQueryTimePart(body sharedModel.ReadRequestBody) string { + if body.From != nil && body.To == nil { + currentDate := time.Now() + + currentDateString := currentDate.Format(time.RFC3339) + + return fmt.Sprintf("|> range(start: %s, stop: %s)", body.From.Format(time.RFC3339), currentDateString) + } + + if body.From == nil && body.To != nil { + thirtyDaysAgo := body.To.AddDate(0, 0, -30) + + thirtyDaysAgoString := thirtyDaysAgo.Format(time.RFC3339) + + return fmt.Sprintf("|> range(start: %s, stop: %s)", thirtyDaysAgoString, body.To.Format(time.RFC3339)) + } + + if body.From == nil && body.To == nil { + currentDate := time.Now() + thirtyDaysAgo := currentDate.AddDate(0, 0, -30) + + currentDateString := currentDate.Format(time.RFC3339) + thirtyDaysAgoString := thirtyDaysAgo.Format(time.RFC3339) + + return fmt.Sprintf("|> range(start: %s, stop: %s)", thirtyDaysAgoString, currentDateString) + } + + 
return fmt.Sprintf("|> range(start: %s, stop: %s)", body.From.Format(time.RFC3339), body.To.Format(time.RFC3339)) +} + +func createFilter(body sharedModel.ReadRequestBody) string { + var filterStrings []string + + log.Printf("Getting sensors: %s\n", body.Sensors) + if sharedModel.AreSimpleSensors(body.Sensors) { + simpleSensors := body.Sensors.(sharedModel.SimpleSensors) + for _, sensor := range simpleSensors { + filterStrings = append(filterStrings, fmt.Sprintf(`r["_measurement"] == "%s"`, sensor)) + } + } else { + sensorsWithFields := body.Sensors.(sharedModel.SensorsWithFields) + for sensor, fields := range sensorsWithFields { + var fieldConditions []string + for _, field := range fields { + fieldConditions = append(fieldConditions, fmt.Sprintf(`r["_field"] == "%s"`, field)) + } + fieldCondition := strings.Join(fieldConditions, " or ") + if fieldCondition != "" { + fieldCondition = " and (" + fieldCondition + ")" + } + filterStrings = append(filterStrings, fmt.Sprintf(`(r["_measurement"] == "%s"%s)`, sensor, fieldCondition)) + } + } + + filter := strings.Join(filterStrings, " or ") + + return fmt.Sprintf("|> filter(fn: (r) => %s)", filter) +} + +func createAggregation(body sharedModel.ReadRequestBody) string { + aggregation := "" + + if body.Operation != "" { + zone := "" + + if body.AggregateMinutes == 0 { + body.AggregateMinutes = 10 + } + + if body.Timezone != "" { + zone = fmt.Sprintf(", location: timezone.location(name: \"%s\")", body.Timezone) + } + + aggregation = fmt.Sprintf("|> aggregateWindow(every: %dm, fn: %s, createEmpty: false%s)", body.AggregateMinutes, body.Operation, zone) + } + + return aggregation +} + +// mapToOutputData converts a map[string]interface{} to an OutputData struct. 
+func mapToOutputData(influxOutput map[string]interface{}) sharedModel.OutputData { + outputData := sharedModel.OutputData{ + Result: "", + Table: influxOutput["table"].(int64), + Time: influxOutput["time"].(time.Time), + DeviceID: influxOutput["deviceId"].(string), + DeviceType: "", + } + + if value, exists := influxOutput["deviceType"]; exists { + outputData.DeviceType = value.(string) + delete(influxOutput, "deviceType") + } else { + outputData.DeviceType = "" + } + + if value, exists := influxOutput["result"]; exists { + outputData.DeviceType = value.(string) + delete(influxOutput, "result") + } else { + outputData.DeviceType = "" + } + + for key, value := range influxOutput { + if value == nil { + delete(influxOutput, key) + } + } + + delete(influxOutput, "result") + delete(influxOutput, "table") + delete(influxOutput, "time") + delete(influxOutput, "deviceId") + delete(influxOutput, "host") + + outputData.Data = influxOutput + + return outputData +} diff --git a/backend/time-series-store/src/internal/TimeSeriesStoreEnvironment.go b/backend/time-series-store/src/internal/TimeSeriesStoreEnvironment.go new file mode 100644 index 0000000..d7cdc68 --- /dev/null +++ b/backend/time-series-store/src/internal/TimeSeriesStoreEnvironment.go @@ -0,0 +1,9 @@ +package internal + +type TimeSeriesStoreEnvironment struct { + InfluxToken string + InfluxUrl string + InfluxOrg string + InfluxBucket string + AmqpURLValue string +} diff --git a/backend/time-series-store/src/main.go b/backend/time-series-store/src/main.go new file mode 100644 index 0000000..8a512f0 --- /dev/null +++ b/backend/time-series-store/src/main.go @@ -0,0 +1,167 @@ +package main + +import ( + "encoding/json" + "fmt" + "github.com/MichalBures-OG/bp-bures-RIoT-commons/src/rabbitmq" + "github.com/MichalBures-OG/bp-bures-RIoT-commons/src/sharedConstants" + "github.com/MichalBures-OG/bp-bures-RIoT-commons/src/sharedModel" + "github.com/MichalBures-OG/bp-bures-RIoT-commons/src/sharedUtils" + amqp 
"github.com/rabbitmq/amqp091-go" + "github.com/xjohnp00/jiap/backend/shared/time-series-store/src/internal" + "log" + "net/url" + "os" + "time" +) + +func main() { + hasError, environment := parseParameters() + + if hasError { + os.Exit(1) + } + + log.Println("Waiting for dependencies...") + rawBackendCoreURL := sharedUtils.GetEnvironmentVariableValue("BACKEND_CORE_URL").GetPayloadOrDefault("http://riot-backend-core:9090") + parsedBackendCoreURL, err := url.Parse(rawBackendCoreURL) + sharedUtils.TerminateOnError(err, fmt.Sprintf("Unable to parse the backend-core URL: %s", rawBackendCoreURL)) + + parsedInfluxURL, err := url.Parse(environment.InfluxUrl) + sharedUtils.TerminateOnError(err, fmt.Sprintf("Unable to parse the InfluxDB URL: %s", environment.InfluxUrl)) + + parsedRabbitMQURL, err := url.Parse(environment.AmqpURLValue) + sharedUtils.TerminateOnError(err, fmt.Sprintf("Unable to parse the RabbitMQ URL: %s", environment.AmqpURLValue)) + + sharedUtils.TerminateOnError(sharedUtils.WaitForDSs(time.Minute, + sharedUtils.NewPairOf(parsedBackendCoreURL.Hostname(), parsedBackendCoreURL.Port()), + sharedUtils.NewPairOf(parsedInfluxURL.Hostname(), parsedInfluxURL.Port()), + sharedUtils.NewPairOf(parsedRabbitMQURL.Hostname(), parsedRabbitMQURL.Port()), + ), "Some dependencies of this application are inaccessible") + log.Println("Dependencies should be up and running...") + + log.Println("Time Series Store Starting") + + influx := internal.NewInflux2Client(environment.InfluxUrl, environment.InfluxToken, environment.InfluxOrg, environment.InfluxBucket) + + rabbitMQClient := rabbitmq.NewClient() + defer rabbitMQClient.Dispose() + defer influx.Close() + + log.Println("Time Series Store Ready") + + sharedUtils.WaitForAll( + func() { + err := consumeInputMessages(rabbitMQClient, influx) + + if err != nil { + log.Println(err.Error()) + } + }, + func() { + err := consumeReadRequests(rabbitMQClient, influx) + + if err != nil { + log.Println(err.Error()) + } + }, + ) +} + +func 
consumeInputMessages(rabbitMQClient rabbitmq.Client, influx internal.Influx2Client) error { + err := rabbitmq.ConsumeJSONMessages[sharedModel.InputData](rabbitMQClient, sharedConstants.TimeSeriesStoreDataQueueName, func(messagePayload sharedModel.InputData) error { + log.Printf("Writing: %s \n", messagePayload.SDInstanceUID) + influx.Write(messagePayload) + return nil + }) + return err +} + +func consumeReadRequests(rabbitMQClient rabbitmq.Client, influx internal.Influx2Client) error { + err := rabbitmq.ConsumeJSONMessagesWithAccessToDelivery[sharedModel.ReadRequestBody]( + rabbitMQClient, + sharedConstants.TimeSeriesReadRequestQueueName, + "", + func(readRequestBody sharedModel.ReadRequestBody, delivery amqp.Delivery) error { + result := influx.Query(readRequestBody) + + if result.IsFailure() { + log.Println(result.GetError()) + } + + responseWithData := sharedModel.ReadRequestResponseOrError{ + Data: nil, + Error: "", + } + + if result.IsSuccess() { + responseWithData.Data = result.GetPayload() + } + + if result.IsFailure() { + responseWithData.Error = result.GetError().Error() + } + + jsonData, err := json.Marshal(responseWithData) + + if err != nil { + log.Printf("Error During Marshall: %s", err) + } + + err = rabbitMQClient.PublishJSONMessageRPC( + sharedUtils.NewEmptyOptional[string](), + sharedUtils.NewOptionalOf(delivery.ReplyTo), + jsonData, + delivery.CorrelationId, + sharedUtils.NewEmptyOptional[string](), + ) + + if err != nil { + log.Fatalf("Error: %s", err) + return err + } + return nil + }, + ) + return err +} + +func parseParameters() (bool, internal.TimeSeriesStoreEnvironment) { + token := sharedUtils.GetEnvironmentVariableValue("INFLUX_TOKEN") + url := sharedUtils.GetEnvironmentVariableValue("INFLUX_URL") + org := sharedUtils.GetEnvironmentVariableValue("INFLUX_ORGANIZATION") + bucket := sharedUtils.GetEnvironmentVariableValue("INFLUX_BUCKET") + ampqUrl := sharedUtils.GetEnvironmentVariableValue("RABBITMQ_URL") + + hasError := token.IsEmpty() || 
url.IsEmpty() || org.IsEmpty() || bucket.IsEmpty() || ampqUrl.IsEmpty() + + if token.IsEmpty() { + log.Fatalln("Empty token") + } + + if url.IsEmpty() { + log.Fatalln("Empty url") + } + + if org.IsEmpty() { + log.Fatalln("Empty org") + } + + if bucket.IsEmpty() { + log.Fatalln("Empty bucket") + } + + if ampqUrl.IsEmpty() { + log.Fatalln("Empty ampqUrl") + } + + environment := sharedUtils.Ternary[internal.TimeSeriesStoreEnvironment](!hasError, internal.TimeSeriesStoreEnvironment{ + InfluxToken: token.GetPayload(), + InfluxUrl: url.GetPayload(), + InfluxOrg: org.GetPayload(), + InfluxBucket: bucket.GetPayload(), + AmqpURLValue: ampqUrl.GetPayload(), + }, internal.TimeSeriesStoreEnvironment{}) + + return hasError, environment +} diff --git a/docker-compose.yml b/docker-compose.yml index 5bca127..1f54fac 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,5 +1,3 @@ -version: '2' - services: riot-frontend: @@ -71,7 +69,7 @@ services: - ./docker/mosquitto-log:/mosquitto/log postgres: - image: postgres:latest + image: postgres:16 environment: POSTGRES_USER: ${POSTGRES_USER} POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} @@ -89,6 +87,39 @@ services: ports: - "8081:80" + influxdb: + image: influxdb:2.7-alpine + restart: unless-stopped + ports: + - "8086:8086" + environment: + - DOCKER_INFLUXDB_INIT_MODE=setup + - DOCKER_INFLUXDB_INIT_USERNAME=user + - DOCKER_INFLUXDB_INIT_PASSWORD=password + - DOCKER_INFLUXDB_INIT_ORG=jiap + - DOCKER_INFLUXDB_INIT_BUCKET=jiap-time-series + - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=Fgp2ozMxmkYnUBkzwLpkx6ydOVXyQqF4-ZPctGjv8-xkirYPYRvoBtrpAHMCr_joYoJMOqZjl8djjuyOx-MR_A== + healthcheck: + test: [ "CMD", "influx", "ping" ] + interval: 5s + timeout: 5s + retries: 5 + + riot-time-series-store: + build: + context: ./backend + dockerfile: time-series-store/Dockerfile + environment: + BACKEND_CORE_URL: ${BACKEND_CORE_URL} + RABBITMQ_URL: ${RABBITMQ_URL} + INFLUX_ORGANIZATION: jiap + INFLUX_BUCKET: jiap-time-series + INFLUX_URL: http://influxdb:8086 + 
INFLUX_TOKEN: Fgp2ozMxmkYnUBkzwLpkx6ydOVXyQqF4-ZPctGjv8-xkirYPYRvoBtrpAHMCr_joYoJMOqZjl8djjuyOx-MR_A== + depends_on: + - rabbitmq + - riot-backend-core # 'Backend-core' has to set up the 'RabbitMQ infrastructure' + rabbitmq: image: rabbitmq:3-management ports: @@ -98,6 +129,7 @@ services: grafana: image: grafana/grafana-enterprise + profiles: ["dev"] volumes: - ./docker/grafana/provisioning/dashboards:/etc/grafana/provisioning/dashboards - ./docker/grafana/provisioning/datasources:/etc/grafana/provisioning/datasources @@ -110,12 +142,14 @@ services: prometheus: image: prom/prometheus + profiles: ["dev"] volumes: - "./docker/prometheus.yml:/etc/prometheus/prometheus.yml" ports: - "9091:9090" mqtt-prometheus-exporter: + profiles: ["dev"] image: kpetrem/mqtt-exporter ports: - "9000:9000" diff --git a/frontend/src/generated/graphql.ts b/frontend/src/generated/graphql.ts index 5e41dde..27fe53f 100644 --- a/frontend/src/generated/graphql.ts +++ b/frontend/src/generated/graphql.ts @@ -12,6 +12,8 @@ export type Scalars = { Boolean: { input: boolean; output: boolean; } Int: { input: number; output: number; } Float: { input: number; output: number; } + Date: { input: any; output: any; } + StatisticsParameterValue: { input: any; output: any; } }; export type AtomKpiNode = { @@ -32,6 +34,13 @@ export type BooleanEqAtomKpiNode = AtomKpiNode & KpiNode & { sdParameterSpecification: Scalars['String']['output']; }; +export type InputData = { + data: StatisticsFieldInput; + deviceId: Scalars['String']['input']; + deviceType?: InputMaybe; + time: Scalars['Date']['input']; +}; + export type KpiDefinition = { __typename?: 'KPIDefinition'; id: Scalars['ID']['output']; @@ -115,6 +124,7 @@ export type Mutation = { deleteKPIDefinition: Scalars['Boolean']['output']; deleteSDInstanceGroup: Scalars['Boolean']['output']; deleteSDType: Scalars['Boolean']['output']; + statisticsMutate: Scalars['Boolean']['output']; updateKPIDefinition: KpiDefinition; updateSDInstance: SdInstance; 
updateSDInstanceGroup: SdInstanceGroup; @@ -151,6 +161,11 @@ export type MutationDeleteSdTypeArgs = { }; +export type MutationStatisticsMutateArgs = { + inputData: InputData; +}; + + export type MutationUpdateKpiDefinitionArgs = { id: Scalars['ID']['input']; input: KpiDefinitionInput; @@ -218,6 +233,14 @@ export type NumericLtAtomKpiNode = AtomKpiNode & KpiNode & { sdParameterSpecification: Scalars['String']['output']; }; +export type OutputData = { + __typename?: 'OutputData'; + data: StatisticsField; + deviceId: Scalars['String']['output']; + deviceType?: Maybe; + time: Scalars['Date']['output']; +}; + export type Query = { __typename?: 'Query'; kpiDefinition: KpiDefinition; @@ -228,6 +251,8 @@ export type Query = { sdInstances: Array; sdType: SdType; sdTypes: Array; + statisticsQuerySensorsWithFields: Array; + statisticsQuerySimpleSensors: Array; }; @@ -245,6 +270,18 @@ export type QuerySdTypeArgs = { id: Scalars['ID']['input']; }; + +export type QueryStatisticsQuerySensorsWithFieldsArgs = { + request?: InputMaybe; + sensors: SensorsWithFields; +}; + + +export type QueryStatisticsQuerySimpleSensorsArgs = { + request?: InputMaybe; + sensors: SimpleSensors; +}; + export type SdInstance = { __typename?: 'SDInstance'; confirmedByUser: Scalars['Boolean']['output']; @@ -306,6 +343,72 @@ export type SdTypeInput = { parameters: Array; }; +export type SensorField = { + key: Scalars['String']['input']; + values: Array; +}; + +export type SensorsWithFields = { + sensors: Array; +}; + +export type SimpleSensors = { + sensors: Array; +}; + +export type StatisticsField = { + __typename?: 'StatisticsField'; + key: Scalars['String']['output']; + value: Scalars['StatisticsParameterValue']['output']; +}; + +export type StatisticsFieldInput = { + key: Scalars['String']['input']; + value: Scalars['StatisticsParameterValue']['input']; +}; + +/** Data used for querying the selected bucket */ +export type StatisticsInput = { + /** + * Amount of minutes to aggregate by + * For example 
if the queried range has 1 hour and aggregateMinutes is set to 10 the aggregation will result in 6 points + */ + aggregateMinutes?: InputMaybe; + /** Start of the querying window */ + from?: InputMaybe; + /** Aggregation operator to use, if needed */ + operation?: InputMaybe; + /** + * Timezone override default UTC. + * For more details why and how this affects queries see: https://www.influxdata.com/blog/time-zones-in-flux/. + * In most cases you can ignore this and some edge aggregations can be influenced. + * If you need a precise result or the aggregation uses high amount of minutes provide the target time zone. + */ + timezone?: InputMaybe; + /** End of the querying window */ + to?: InputMaybe; +}; + +export enum StatisticsOperation { + Count = 'COUNT', + First = 'FIRST', + Integral = 'INTEGRAL', + Last = 'LAST', + Max = 'MAX', + Mean = 'MEAN', + Median = 'MEDIAN', + Min = 'MIN', + Mode = 'MODE', + None = 'NONE', + Quantile = 'QUANTILE', + Reduce = 'REDUCE', + Skew = 'SKEW', + Spread = 'SPREAD', + Stddev = 'STDDEV', + Sum = 'SUM', + Timeweightedavg = 'TIMEWEIGHTEDAVG' +} + export type StringEqAtomKpiNode = AtomKpiNode & KpiNode & { __typename?: 'StringEQAtomKPINode'; id: Scalars['ID']['output'];