@@ -139,6 +139,7 @@ var (
139
139
pdStoresURI = "pd/api/v1/stores"
140
140
pdStoresLimitURI = "pd/api/v1/stores/limit"
141
141
pdRegionsCheckURI = "pd/api/v1/regions/check"
142
+ pdServicePrimaryURI = "pd/api/v2/ms/primary"
142
143
)
143
144
144
145
func tryURLs (endpoints []string , f func (endpoint string ) ([]byte , error )) ([]byte , error ) {
@@ -978,3 +979,118 @@ func (pc *PDClient) SetAllStoreLimits(value int) error {
978
979
pc .l ().Debugf ("setting store limit: %d" , value )
979
980
return pc .updateConfig (pdStoresLimitURI , bytes .NewBuffer (body ))
980
981
}
982
+
983
+ // GetServicePrimary queries for the primary of a service
984
+ func (pc * PDClient ) GetServicePrimary (service string ) (string , error ) {
985
+ endpoints := pc .getEndpoints (fmt .Sprintf ("%s/%s" , pdServicePrimaryURI , service ))
986
+
987
+ var primary string
988
+ _ , err := tryURLs (endpoints , func (endpoint string ) ([]byte , error ) {
989
+ body , err := pc .httpClient .Get (pc .ctx , endpoint )
990
+ if err != nil {
991
+ return body , err
992
+ }
993
+
994
+ return body , json .Unmarshal (body , & primary )
995
+ })
996
+ return primary , err
997
+ }
998
+
999
+ const (
1000
+ tsoStatusURI = "status"
1001
+ )
1002
+
1003
+ // TSOClient is an HTTP client of the TSO server
1004
+ type TSOClient struct {
1005
+ version string
1006
+ addrs []string
1007
+ tlsEnabled bool
1008
+ httpClient * utils.HTTPClient
1009
+ ctx context.Context
1010
+ }
1011
+
1012
+ // NewTSOClient returns a new TSOClient, the context must have
1013
+ // a *logprinter.Logger as value of "logger"
1014
+ func NewTSOClient (
1015
+ ctx context.Context ,
1016
+ addrs []string ,
1017
+ timeout time.Duration ,
1018
+ tlsConfig * tls.Config ,
1019
+ ) * TSOClient {
1020
+ enableTLS := false
1021
+ if tlsConfig != nil {
1022
+ enableTLS = true
1023
+ }
1024
+
1025
+ if _ , ok := ctx .Value (logprinter .ContextKeyLogger ).(* logprinter.Logger ); ! ok {
1026
+ panic ("the context must have logger inside" )
1027
+ }
1028
+
1029
+ cli := & TSOClient {
1030
+ addrs : addrs ,
1031
+ tlsEnabled : enableTLS ,
1032
+ httpClient : utils .NewHTTPClient (timeout , tlsConfig ),
1033
+ ctx : ctx ,
1034
+ }
1035
+
1036
+ cli .tryIdentifyVersion ()
1037
+ return cli
1038
+ }
1039
+
1040
+ // func (tc *TSOClient) l() *logprinter.Logger {
1041
+ // return tc.ctx.Value(logprinter.ContextKeyLogger).(*logprinter.Logger)
1042
+ // }
1043
+
1044
+ func (tc * TSOClient ) tryIdentifyVersion () {
1045
+ endpoints := tc .getEndpoints (tsoStatusURI )
1046
+ response := map [string ]string {}
1047
+ _ , err := tryURLs (endpoints , func (endpoint string ) ([]byte , error ) {
1048
+ body , err := tc .httpClient .Get (tc .ctx , endpoint )
1049
+ if err != nil {
1050
+ return body , err
1051
+ }
1052
+
1053
+ return body , json .Unmarshal (body , & response )
1054
+ })
1055
+ if err == nil {
1056
+ tc .version = response ["version" ]
1057
+ }
1058
+ }
1059
+
1060
+ // GetURL builds the client URL of PDClient
1061
+ func (tc * TSOClient ) GetURL (addr string ) string {
1062
+ httpPrefix := "http"
1063
+ if tc .tlsEnabled {
1064
+ httpPrefix = "https"
1065
+ }
1066
+ return fmt .Sprintf ("%s://%s" , httpPrefix , addr )
1067
+ }
1068
+
1069
+ func (tc * TSOClient ) getEndpoints (uri string ) (endpoints []string ) {
1070
+ for _ , addr := range tc .addrs {
1071
+ endpoint := fmt .Sprintf ("%s/%s" , tc .GetURL (addr ), uri )
1072
+ endpoints = append (endpoints , endpoint )
1073
+ }
1074
+
1075
+ return
1076
+ }
1077
+
1078
+ // CheckHealth checks the health of TSO node.
1079
+ func (tc * TSOClient ) CheckHealth () error {
1080
+ endpoints := tc .getEndpoints (tsoStatusURI )
1081
+
1082
+ _ , err := tryURLs (endpoints , func (endpoint string ) ([]byte , error ) {
1083
+ body , err := tc .httpClient .Get (tc .ctx , endpoint )
1084
+ if err != nil {
1085
+ return body , err
1086
+ }
1087
+
1088
+ return body , nil
1089
+ })
1090
+
1091
+ if err != nil {
1092
+ return err
1093
+ }
1094
+
1095
+ return nil
1096
+ }
0 commit comments