diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 3870158d5..b3dcbf6af 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -314,6 +314,7 @@ linux_kubernetes_operator_PVC: when: always except: - schedules + allow_failure: true linux_deletion_simple: stage: test @@ -479,8 +480,9 @@ trigger_e2e_test: script: - echo - curl -X POST -F "token=$E2E_TRIGGER_TOKEN" -F "ref=master" -F "variables[DOTMESH_CI_COMMIT_SHA]=$CI_COMMIT_SHA" -F "variables[DOTMESH_CI_BUILD_REF_NAME]=$CI_BUILD_REF_NAME" http://gitlab.dotmesh.io:9999/api/v4/projects/31/trigger/pipeline - curl -X POST -F "token=$E2E_TRIGGER_TOKEN" -F "ref=master" -F "variables[DOTMESH_CI_COMMIT_SHA]=$CI_COMMIT_SHA" -F "variables[DOTMESH_CI_BUILD_REF_NAME]=$CI_BUILD_REF_NAME" http://gitlab.dotmesh.io:9999/api/v4/projects/31/trigger/pipeline + only: + - master except: - - /^release-.*$/ - schedules deploy_unstable_build: diff --git a/cmd/dm/pkg/commands/cluster.go b/cmd/dm/pkg/commands/cluster.go index 569e658fa..b44077742 100644 --- a/cmd/dm/pkg/commands/cluster.go +++ b/cmd/dm/pkg/commands/cluster.go @@ -67,6 +67,7 @@ var ( usePoolDir string usePoolName string discoveryUrl string + port int ) // names of environment variables we pass from the content of `dm cluster {init,join}` @@ -250,6 +251,11 @@ func NewCmdClusterInit(out io.Writer) *cobra.Command { &serverCount, "count", 1, "Initial cluster size", ) + // TODO: need to block using 32608, and probably block anything lower than 32xx for host port reasons? + cmd.Flags().IntVar( + &port, "port", 0, + "Port to run cluster on", + ) return cmd } @@ -663,6 +669,10 @@ func startDotmeshContainer(pkiPath string) error { "-e", fmt.Sprintf("DOTMESH_UPGRADES_URL=%s", checkpointUrl), "-e", fmt.Sprintf("DOTMESH_UPGRADES_INTERVAL_SECONDS=%d", checkpointInterval), } + if port != 0 { + args = append(args, "-e") + args = append(args, fmt.Sprintf("DOTMESH_SERVER_PORT=%d", port)) + } // inject the inherited env variables from the context of the dm binary into require_zfs.sh for _, envName := range inheritedEnvironment { @@ -877,7 +887,7 @@ func clusterCommonSetup(clusterUrl, adminPassword, adminKey, pkiPath string) err return err } } - err = config.AddRemote("local", "admin", getHostFromEnv(), 0, adminKey) + err = config.AddRemote("local", "admin", getHostFromEnv(), port, adminKey) if err != nil { return err } @@ -913,6 +923,7 @@ func clusterCommonSetup(clusterUrl, adminPassword, adminKey, pkiPath string) err time.Sleep(250 * time.Millisecond) } if err != nil { + fmt.Printf("Errored creating api") e() return false } diff --git a/cmd/dm/pkg/commands/dot.go b/cmd/dm/pkg/commands/dot.go index 21dcffe0b..ff2cd95ce 100644 --- a/cmd/dm/pkg/commands/dot.go +++ b/cmd/dm/pkg/commands/dot.go @@ -238,17 +238,17 @@ func dotShow(cmd *cobra.Command, args []string, out io.Writer) error { return err } - var localDot string + var qualifiedDotName string if len(args) == 1 { - localDot = args[0] + qualifiedDotName = args[0] } else { - localDot, err = dm.CurrentVolume() + qualifiedDotName, err = dm.CurrentVolume() if err != nil { return err } } - namespace, dot, err := remotes.ParseNamespacedVolume(localDot) + namespace, dot, err := remotes.ParseNamespacedVolume(qualifiedDotName) if err != nil { return err } @@ -307,12 +307,12 @@ func dotShow(cmd *cobra.Command, args []string, out io.Writer) error { } } - currentBranch, err := dm.CurrentBranch(localDot) + currentBranch, err := dm.CurrentBranch(qualifiedDotName) if err != nil { return err } - bs, err := dm.AllBranches(localDot) + bs, err := dm.AllBranches(qualifiedDotName) if err != 
nil { return err } @@ -329,7 +329,7 @@ func dotShow(cmd *cobra.Command, args []string, out io.Writer) error { if branch == "master" { branchDot = masterDot } else { - branchDot, err = dm.BranchInfo(namespace, localDot, branch) + branchDot, err = dm.BranchInfo(namespace, dot, branch) if err != nil { return err } @@ -371,7 +371,7 @@ func dotShow(cmd *cobra.Command, args []string, out io.Writer) error { branchInternalName = "" } - latency, err := dm.GetReplicationLatencyForBranch(localDot, branchInternalName) + latency, err := dm.GetReplicationLatencyForBranch(qualifiedDotName, branchInternalName) if err != nil { fmt.Fprintf(out, "unable to fetch replication status (%s), proceeding...\n", err) } else { diff --git a/cmd/dm/pkg/commands/main.go b/cmd/dm/pkg/commands/main.go index e43b5ce04..31795b5e1 100644 --- a/cmd/dm/pkg/commands/main.go +++ b/cmd/dm/pkg/commands/main.go @@ -165,7 +165,6 @@ func NewCmdRemote(out io.Writer) *cobra.Command { if err != nil { return err } - fmt.Printf("dm client %#v\n", dm) // allow this to be used be a script apiKey := os.Getenv("DOTMESH_PASSWORD") if apiKey == "" { diff --git a/cmd/dm/pkg/remotes/api.go b/cmd/dm/pkg/remotes/api.go index 62d0b35c4..79be72fa9 100644 --- a/cmd/dm/pkg/remotes/api.go +++ b/cmd/dm/pkg/remotes/api.go @@ -2,16 +2,16 @@ package remotes import ( "fmt" - "golang.org/x/net/context" - "gopkg.in/cheggaaa/pb.v1" "io" - "log" "os" "reflect" "regexp" "sort" "strings" "time" + + "golang.org/x/net/context" + "gopkg.in/cheggaaa/pb.v1" ) const DEFAULT_BRANCH string = "master" @@ -761,9 +761,6 @@ func (dm *DotmeshAPI) PollTransfer(transferId string, out io.Writer) error { } } - if debugMode { - log.Printf("\nGot DotmeshRPC.GetTransfer response: %+v", result) - } if !started { bar = pb.New64(result.Size) bar.ShowFinalTime = false @@ -772,6 +769,10 @@ func (dm *DotmeshAPI) PollTransfer(transferId string, out io.Writer) error { bar.Start() started = true } + + if result.Size != 0 { + bar.Total = result.Size + } // Numbers reported by data transferred thru dotmesh versus size // of stream reported by 'zfs send -nP' are off by a few kilobytes, // fudge it (maybe no one will notice). @@ -843,6 +844,7 @@ push type TransferRequest struct { Peer string User string + Port int ApiKey string Direction string LocalNamespace string @@ -986,6 +988,7 @@ func (dm *DotmeshAPI) RequestTransfer( "DotmeshRPC.Transfer", TransferRequest{ Peer: remote.Hostname, User: remote.User, + Port: remote.Port, ApiKey: remote.ApiKey, Direction: direction, LocalNamespace: localNamespace, diff --git a/cmd/dotmesh-server/pkg/dind-flexvolume/flexvolume.go b/cmd/dotmesh-server/pkg/dind-flexvolume/flexvolume.go index c14ed20a4..b00437b3e 100644 --- a/cmd/dotmesh-server/pkg/dind-flexvolume/flexvolume.go +++ b/cmd/dotmesh-server/pkg/dind-flexvolume/flexvolume.go @@ -38,7 +38,7 @@ var flexVolumeDebug = false const DIND_SHARED_FOLDER = "/dotmesh-test-pools/dind-flexvolume" func System(cmd string, args ...string) error { - log.Printf("[system] running %s %s", cmd, args) + logger.Printf("[system] running %s %s", cmd, args) c := exec.Command(cmd, args...) 
c.Stdout = os.Stdout c.Stderr = os.Stderr diff --git a/cmd/dotmesh-server/pkg/main/client.go b/cmd/dotmesh-server/pkg/main/client.go index 213caa001..9a80d1818 100644 --- a/cmd/dotmesh-server/pkg/main/client.go +++ b/cmd/dotmesh-server/pkg/main/client.go @@ -19,13 +19,15 @@ type JsonRpcClient struct { User string Hostname string ApiKey string + Port int } -func NewJsonRpcClient(user, hostname, apiKey string) *JsonRpcClient { +func NewJsonRpcClient(user, hostname, apiKey string, port int) *JsonRpcClient { return &JsonRpcClient{ User: user, Hostname: hostname, ApiKey: apiKey, + Port: port, } } @@ -35,9 +37,15 @@ func (j *JsonRpcClient) CallRemote( ctx context.Context, method string, args interface{}, result interface{}, ) error { // RPCs are always between clusters, so "external" - url, err := deduceUrl(ctx, []string{j.Hostname}, "external", j.User, j.ApiKey) - if err != nil { - return err + var url string + var err error + if j.Port == 0 { + url, err = deduceUrl(ctx, []string{j.Hostname}, "external", j.User, j.ApiKey) + if err != nil { + return err + } + } else { + url = fmt.Sprintf("http://%s:%d", j.Hostname, j.Port) } url = fmt.Sprintf("%s/rpc", url) return j.reallyCallRemote(ctx, method, args, result, url) diff --git a/cmd/dotmesh-server/pkg/main/controller.go b/cmd/dotmesh-server/pkg/main/controller.go index 8d96a7853..d078197d2 100644 --- a/cmd/dotmesh-server/pkg/main/controller.go +++ b/cmd/dotmesh-server/pkg/main/controller.go @@ -50,8 +50,8 @@ func NewInMemoryState(localPoolId string, config Config) *InMemoryState { // a sort of global event bus for filesystems getting new snapshots on // their masters, keyed on filesystem name, which interested parties // such as slaves for that filesystem may subscribe to - newSnapsOnMaster: NewObserver(), - localReceiveProgress: NewObserver(), + newSnapsOnMaster: NewObserver("newSnapsOnMaster"), + localReceiveProgress: NewObserver("localReceiveProgress"), // containers that are running with dotmesh volumes by filesystem id containers: d, containersLock: &sync.Mutex{}, @@ -139,13 +139,13 @@ func (s *InMemoryState) alignMountStateWithMasters(filesystemId string) error { fs, ok := (*s.filesystems)[filesystemId] if !ok { log.Printf( - "[maybeMountFilesystem] not doing anything - cannot find %v in fsMachines", + "[alignMountStateWithMasters] not doing anything - cannot find %v in fsMachines", filesystemId, ) return nil, false, fmt.Errorf("cannot find %v in fsMachines", filesystemId) } log.Printf( - "[maybeMountFilesystem] called for %v; masterFor=%v, myNodeId=%v; mounted=%b", + "[alignMountStateWithMasters] called for %v; masterFor=%v, myNodeId=%v; mounted=%b", filesystemId, s.masterFor(filesystemId), s.myNodeId, diff --git a/cmd/dotmesh-server/pkg/main/etcd.go b/cmd/dotmesh-server/pkg/main/etcd.go index 1a461da94..ff1a1ca65 100644 --- a/cmd/dotmesh-server/pkg/main/etcd.go +++ b/cmd/dotmesh-server/pkg/main/etcd.go @@ -21,48 +21,11 @@ import ( // etcd related pieces, including the parts of InMemoryState which interact with etcd -func getEtcdKeysApi() (client.KeysAPI, error) { - c, err := getEtcd() - if err != nil { - return nil, err - } - return client.NewKeysAPI(c), nil -} - -var etcdClient client.Client -var once Once -var onceAgain Once - -func transportFromTLS(certFile, keyFile, caFile string) (*http.Transport, error) { - // Load client cert - cert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - return nil, err - } - - // Load CA cert - caCert, err := ioutil.ReadFile(caFile) - if err != nil { - return nil, err - } - caCertPool := 
x509.NewCertPool() - caCertPool.AppendCertsFromPEM(caCert) +var etcdKeysAPI client.KeysAPI +var etcdConnectionOnce Once - // Setup HTTPS client - tlsConfig := &tls.Config{ - Certificates: []tls.Certificate{cert}, - RootCAs: caCertPool, - } - tlsConfig.BuildNameToCertificate() - transport := &http.Transport{ - TLSClientConfig: tlsConfig, - } - return transport, nil -} - -// TODO maybe connection pooling -func getEtcd() (client.Client, error) { - once.Do(func() { +func getEtcdKeysApi() (client.KeysAPI, error) { + etcdConnectionOnce.Do(func() { var err error endpoint := os.Getenv("DOTMESH_ETCD_ENDPOINT") if endpoint == "" { @@ -92,14 +55,43 @@ func getEtcd() (client.Client, error) { // unavailable HeaderTimeoutPerRequest: time.Second * 10, } - etcdClient, err = client.New(cfg) + etcdClient, err := client.New(cfg) if err != nil { // maybe retry, instead of ending it all panic(err) } + etcdKeysAPI = client.NewKeysAPI(etcdClient) }) - // TODO: change signature, never errors - return etcdClient, nil + return etcdKeysAPI, nil +} + +var onceAgain Once + +func transportFromTLS(certFile, keyFile, caFile string) (*http.Transport, error) { + // Load client cert + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, err + } + + // Load CA cert + caCert, err := ioutil.ReadFile(caFile) + if err != nil { + return nil, err + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(caCert) + + // Setup HTTPS client + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + tlsConfig.BuildNameToCertificate() + transport := &http.Transport{ + TLSClientConfig: tlsConfig, + } + return transport, nil } func guessIPv4Addresses() ([]string, error) { @@ -786,28 +778,26 @@ func (s *InMemoryState) updateSnapshotsFromKnownState( filesystem, s.masterFor(filesystem), server, ) if s.masterFor(filesystem) == server { - // notify any interested parties that there are some new snapshots on - // the master - var latest snapshot if len(*snapshots) > 0 { - latest = (*snapshots)[len(*snapshots)-1] - } else { - latest = snapshot{} + // notify any interested parties that there are some new snapshots on + // the master + + latest := (*snapshots)[len(*snapshots)-1] + log.Printf( + "[updateSnapshots] publishing latest snapshot %s on %s", + latest, filesystem, + ) + go func() { + err := s.newSnapsOnMaster.Publish(filesystem, latest) + if err != nil { + log.Printf( + "[updateSnapshotsFromKnownState] "+ + "error publishing to newSnapsOnMaster: %s", + err, + ) + } + }() } - log.Printf( - "[updateSnapshots] publishing latest snapshot %s on %s", - latest, filesystem, - ) - go func() { - err := s.newSnapsOnMaster.Publish(filesystem, latest) - if err != nil { - log.Printf( - "[updateSnapshotsFromKnownState] "+ - "error publishing to newSnapsOnMaster: %s", - err, - ) - } - }() } // also slice it filesystem-wise, and publish to any observers // listening on a per-filesystem observer parameterized on server diff --git a/cmd/dotmesh-server/pkg/main/http.go b/cmd/dotmesh-server/pkg/main/http.go index 994060a30..8aaf46f35 100644 --- a/cmd/dotmesh-server/pkg/main/http.go +++ b/cmd/dotmesh-server/pkg/main/http.go @@ -332,12 +332,12 @@ func rpcAfterFunc(reqInfo *rpc.RequestInfo) { return } rpcTracker.mutex.Lock() + defer rpcTracker.mutex.Unlock() startedAt, found := rpcTracker.rpcDuration[reqUUID] if !found { fmt.Printf("Error: Unable to find requestUUID in requestTracker: %s", reqUUID) return } - rpcTracker.mutex.Unlock() duration := time.Since(startedAt) url := 
fmt.Sprintf("%s", reqInfo.Request.URL) statusCode := fmt.Sprintf("%v", reqInfo.StatusCode) diff --git a/cmd/dotmesh-server/pkg/main/observer.go b/cmd/dotmesh-server/pkg/main/observer.go index c5286d22c..4cb8d072b 100644 --- a/cmd/dotmesh-server/pkg/main/observer.go +++ b/cmd/dotmesh-server/pkg/main/observer.go @@ -5,18 +5,21 @@ package main import ( "fmt" + "log" "strings" "sync" "time" ) type Observer struct { + name string events map[string][]chan interface{} rwMutex sync.RWMutex } -func NewObserver() *Observer { +func NewObserver(name string) *Observer { return &Observer{ + name: name, rwMutex: sync.RWMutex{}, events: map[string][]chan interface{}{}, } @@ -118,7 +121,17 @@ func (o *Observer) Publish(event string, data interface{}) error { return } }() - outputChan <- data + + select { + case outputChan <- data: + // Sent OK without timing out + break + case <-time.After(600 * time.Second): + // Took more than 10 minutes + log.Printf("[Observer.Publish] LONG WAIT to send %#v on %s:%s", data, o.name, event) + outputChan <- data + log.Printf("[Observer.Publish] Finally sent %#v on %s:%s after a long wait", data, o.name, event) + } }(outputChan) } diff --git a/cmd/dotmesh-server/pkg/main/rpc.go b/cmd/dotmesh-server/pkg/main/rpc.go index 9f2f628c3..377389438 100644 --- a/cmd/dotmesh-server/pkg/main/rpc.go +++ b/cmd/dotmesh-server/pkg/main/rpc.go @@ -1106,7 +1106,7 @@ func (d *DotmeshRPC) Transfer( args *TransferRequest, result *string, ) error { - client := NewJsonRpcClient(args.User, args.Peer, args.ApiKey) + client := NewJsonRpcClient(args.User, args.Peer, args.ApiKey, args.Port) log.Printf("[Transfer] starting with %+v", safeArgs(*args)) diff --git a/cmd/dotmesh-server/pkg/main/statemachines.go b/cmd/dotmesh-server/pkg/main/statemachines.go index 933fcdc39..109b96483 100644 --- a/cmd/dotmesh-server/pkg/main/statemachines.go +++ b/cmd/dotmesh-server/pkg/main/statemachines.go @@ -9,6 +9,7 @@ import ( "net/http" "os" "os/exec" + "path/filepath" "strconv" "strings" "sync" @@ -37,11 +38,11 @@ func newFilesystemMachine(filesystemId string, s *InMemoryState) *fsMachine { snapshotsModified: make(chan bool), state: s, snapshotsLock: &sync.Mutex{}, - newSnapsOnServers: NewObserver(), + newSnapsOnServers: NewObserver(fmt.Sprintf("newSnapsOnServers:%s", filesystemId)), currentState: "discovering", status: "", lastTransitionTimestamp: time.Now().UnixNano(), - transitionObserver: NewObserver(), + transitionObserver: NewObserver(fmt.Sprintf("transitionObserver:%s", filesystemId)), lastTransferRequest: TransferRequest{}, // In the case where we're receiving a push (pushPeerState), it's the // POST handler on our http server which handles the receiving of the @@ -249,6 +250,7 @@ func (f *fsMachine) updateEtcdAboutSnapshots() error { if err != nil { return err } + // as soon as we're connected, eagerly: if we know about some // snapshots, **or the absence of them**, set this in etcd. 
serialized, err := func() ([]byte, error) { @@ -282,9 +284,13 @@ func (f *fsMachine) updateEtcdAboutSnapshots() error { ) // wait until the state machine notifies us that it's changed the - // snapshots - _ = <-f.snapshotsModified - log.Printf("[updateEtcdAboutSnapshots] going 'round the loop") + // snapshots, but have a timeout in case this filesystem is deleted so we don't block forever + select { + case _ = <-f.snapshotsModified: + log.Printf("[updateEtcdAboutSnapshots] going 'round the loop") + case <-time.After(60 * time.Second): + } + return nil } @@ -343,6 +349,15 @@ func (f *fsMachine) transitionedTo(state string, status string) { log.Printf("error updating etcd: %s", update, err) return } + // we don't hear our own echo, so set it locally too. + f.state.globalStateCacheLock.Lock() + defer f.state.globalStateCacheLock.Unlock() + // fake an etcd version for anyone expecting a version field + update["version"] = "0" + if _, ok := (*f.state.globalStateCache)[f.state.myNodeId]; !ok { + (*f.state.globalStateCache)[f.state.myNodeId] = map[string]map[string]string{} + } + (*f.state.globalStateCache)[f.state.myNodeId][f.filesystemId] = update } // state functions @@ -520,46 +535,15 @@ waitingForSlaveSnapshot: } func (f *fsMachine) mount() (responseEvent *Event, nextState stateFn) { - /* - out, err := exec.Command( - "mkdir", "-p", mnt(f.filesystemId)).CombinedOutput() - if err != nil { - log.Printf("%v while trying to mkdir mountpoint %s", err, fq(f.filesystemId)) - return &Event{ - Name: "failed-mkdir-mountpoint", - Args: &EventArgs{"err": err, "combined-output": string(out)}, - }, backoffState - } - out, err = exec.Command("mount.zfs", "-o", "noatime", - fq(f.filesystemId), mnt(f.filesystemId)).CombinedOutput() - if err != nil { - log.Printf("%v while trying to mount %s", err, fq(f.filesystemId)) - return &Event{ - Name: "failed-mount", - Args: &EventArgs{"err": err, "combined-output": string(out)}, - }, backoffState - } - // trust that zero exit codes from mkdir && mount.zfs means - // that it worked and that the filesystem now exists and is - // mounted - f.snapshotsLock.Lock() - defer f.snapshotsLock.Unlock() - f.filesystem.exists = true // needed in create case - f.filesystem.mounted = true - return &Event{Name: "mounted", Args: &EventArgs{}}, activeState - */ - mountPath := mnt(f.filesystemId) zfsPath := fq(f.filesystemId) - // only try to make the folder if it doesn't already exist - // if there is an error - it means we could not mount so don't - // update the filesystem with mounted = true + // only try to make the directory if it doesn't already exist if _, err := os.Stat(mountPath); os.IsNotExist(err) { out, err := exec.Command( "mkdir", "-p", mountPath).CombinedOutput() if err != nil { - log.Printf("%v while trying to mkdir mountpoint %s", err, zfsPath) + log.Printf("[mount:%s] %v while trying to mkdir mountpoint %s", f.filesystemId, err, zfsPath) return &Event{ Name: "failed-mkdir-mountpoint", Args: &EventArgs{"err": err, "combined-output": string(out)}, @@ -567,7 +551,7 @@ func (f *fsMachine) mount() (responseEvent *Event, nextState stateFn) { } } - // omly try to use mount.zfs if it's not already present in the output + // only try to use mount.zfs if it's not already present in the output // of calling "mount" mounted, err := isFilesystemMounted(f.filesystemId) if err != nil { @@ -580,7 +564,76 @@ func (f *fsMachine) mount() (responseEvent *Event, nextState stateFn) { out, err := exec.Command("mount.zfs", "-o", "noatime", zfsPath, mountPath).CombinedOutput() if err != nil 
{ - log.Printf("%v while trying to mount %s", err, zfsPath) + log.Printf("[mount:%s] %v while trying to mount %s", f.filesystemId, err, zfsPath) + if strings.Contains(string(out), "already mounted") { + // This can happen when the filesystem is mounted in some other + // namespace for some reason. Try searching for it in all + // processes' mount namespaces, and recursively unmounting it + // from one namespace at a time until it becomes free... + firstPidNSToUnmount, rerr := func() (string, error) { + mountTables, err := filepath.Glob("/proc/*/mounts") + if err != nil { + return "", err + } + if mountTables == nil { + return "", fmt.Errorf("no mount tables in /proc/*/mounts") + } + for _, mountTable := range mountTables { + mounts, err := ioutil.ReadFile(mountTable) + if err != nil { + // pids can disappear between globbing and reading + log.Printf( + "[mount:%s] ignoring error reading pid mount table %v: %v", + f.filesystemId, mountTable, err, + ) + continue + } + // return the first namespace found, as we'll unmount + // in there and then try again (recursively) + for _, line := range strings.Split(string(mounts), "\n") { + if strings.Contains(line, f.filesystemId) { + shrapnel := strings.Split(mountTable, "/") + // e.g. (0)/(1)proc/(2)X/(3)mounts + return shrapnel[2], nil + } + } + } + return "", fmt.Errorf("unable to find %s in any /proc/*/mounts", f.filesystemId) + }() + if rerr != nil { + return &Event{ + Name: "failed-finding-namespace-to-unmount", + Args: &EventArgs{ + "original-err": err, "original-combined-output": string(out), + "recovery-err": rerr, + }, + }, backoffState + } + log.Printf( + "[mount:%s] attempting recovery-unmount in ns %s after %v/%v", + f.filesystemId, firstPidNSToUnmount, err, string(out), + ) + rout, rerr := exec.Command( + "nsenter", "-t", firstPidNSToUnmount, "-m", "-u", "-n", "-i", + "umount", mountPath, + ).CombinedOutput() + if rerr != nil { + return &Event{ + Name: "failed-recovery-unmount", + Args: &EventArgs{ + "original-err": err, "original-combined-output": string(out), + "recovery-err": rerr, "recovery-combined-output": string(rout), + }, + }, backoffState + } + // recurse, maybe we've made enough progress to be able to + // mount this time? 
+ // + // TODO limit recursion depth + return f.mount() + } + // if there is an error - it means we could not mount so don't + // update the filesystem with mounted = true return &Event{ Name: "failed-mount", Args: &EventArgs{"err": err, "combined-output": string(out)}, @@ -1147,10 +1200,17 @@ func transferRequestify(in interface{}) (TransferRequest, error) { "Unable to cast %s to map[string]interface{}", in, ) } + var port int + if typed["Port"] == nil { + port = 0 + } else { + port = int(typed["Port"].(float64)) + } return TransferRequest{ Peer: typed["Peer"].(string), User: typed["User"].(string), ApiKey: typed["ApiKey"].(string), + Port: port, Direction: typed["Direction"].(string), LocalNamespace: typed["LocalNamespace"].(string), LocalName: typed["LocalName"].(string), @@ -1195,6 +1255,7 @@ func missingState(f *fsMachine) stateFn { f.transitionedTo("missing", "waiting for snapshots or requests") select { case _ = <-newSnapsOnMaster: + f.transitionedTo("missing", "new snapshots found on master") return receivingState case e := <-f.innerRequests: f.transitionedTo("missing", fmt.Sprintf("handling %s", e.Name)) @@ -1305,6 +1366,12 @@ func missingState(f *fsMachine) stateFn { f.innerResponses <- responseEvent return nextState } + } else if e.Name == "mount" { + f.innerResponses <- &Event{ + Name: "nothing-to-mount", + Args: &EventArgs{}, + } + return missingState } else { f.innerResponses <- &Event{ Name: "unhandled", @@ -1319,6 +1386,19 @@ func missingState(f *fsMachine) stateFn { return backoffState } +func backoffStateWithReason(reason string) func(f *fsMachine) stateFn { + return func(f *fsMachine) stateFn { + f.transitionedTo("backoff", fmt.Sprintf("pausing due to %s", reason)) + log.Printf("entering backoff state for %s", f.filesystemId) + // TODO if we know that we're supposed to be mounted or unmounted, based on + // etcd state, actually put us back into the required state rather than + // just passively going back into discovering... or maybe, do that in + // discoveringState? + time.Sleep(time.Second) + return discoveringState + } +} + func backoffState(f *fsMachine) stateFn { f.transitionedTo("backoff", "pausing") log.Printf("entering backoff state for %s", f.filesystemId) @@ -1330,6 +1410,12 @@ func backoffState(f *fsMachine) stateFn { return discoveringState } +func failedState(f *fsMachine) stateFn { + f.transitionedTo("failed", "never coming back") + log.Printf("entering failed state for %s", f.filesystemId) + select {} +} + func (f *fsMachine) discover() error { // discover system state synchronously filesystem, err := discoverSystem(f.filesystemId) @@ -1384,11 +1470,12 @@ func discoveringState(f *fsMachine) stateFn { err := f.state.alignMountStateWithMasters(f.filesystemId) if err != nil { log.Printf( - "[discoveringState:%s] error trying to align mount state with masters: %v", + "[discoveringState:%s] error trying to align mount state with masters: %v, "+ + "going into failed state forever", f.filesystemId, err, ) - return backoffState + return failedState } // TODO do we need to acquire some locks here? if f.filesystem.mounted { @@ -1458,21 +1545,18 @@ func receivingState(f *fsMachine) stateFn { if err != nil { switch err := err.(type) { - // TODO should the following 'discoveringState's be 'backoffState's? 
case *ToSnapsUpToDate: - log.Printf("receivingState: ToSnapsUpToDate %s got %s", f.filesystemId, err) // this is fine, we're up-to-date - return discoveringState + return backoffStateWithReason(fmt.Sprintf("receivingState: ToSnapsUpToDate %s got %s", f.filesystemId, err)) case *NoFromSnaps: - log.Printf("receivingState: NoFromSnaps %s got %s", f.filesystemId, err) // this is fine, no snaps; can't replicate yet, but will - return discoveringState + return backoffStateWithReason(fmt.Sprintf("receivingState: NoFromSnaps %s got %s", f.filesystemId, err)) case *ToSnapsAhead: log.Printf("receivingState: ToSnapsAhead %s got %s", f.filesystemId, err) // erk, slave is ahead of master errx := f.recoverFromDivergence(err.latestCommonSnapshot) if errx != nil { - log.Printf("receivingState(%s): Unable to recover from divergence: %+v", f.filesystemId, errx) + return backoffStateWithReason(fmt.Sprintf("receivingState(%s): Unable to recover from divergence: %+v", f.filesystemId, errx)) } // Go to discovering state, to update the world with our recent ZFS actions. return discoveringState @@ -1480,19 +1564,17 @@ func receivingState(f *fsMachine) stateFn { log.Printf("receivingState: ToSnapsDiverged %s got %s", f.filesystemId, err) errx := f.recoverFromDivergence(err.latestCommonSnapshot) if errx != nil { - log.Printf("receivingState(%s): Unable to recover from divergence: %+v", f.filesystemId, errx) + return backoffStateWithReason(fmt.Sprintf("receivingState(%s): Unable to recover from divergence: %+v", f.filesystemId, errx)) } // Go to discovering state, to update the world with our recent ZFS actions. return discoveringState case *NoCommonSnapshots: - log.Printf("receivingState: NoCommonSnapshots %s got %s", f.filesystemId, err) // erk, no common snapshots between master and slave // TODO: create a new local clone (branch), then delete the current // filesystem to enable replication to continue - return discoveringState + return backoffStateWithReason(fmt.Sprintf("receivingState: NoCommonSnapshots %s got %+v", f.filesystemId, err)) default: - log.Printf("receivingState: default error handler %s got %s", f.filesystemId, err) - return discoveringState + return backoffStateWithReason(fmt.Sprintf("receivingState: default error handler %s got %s", f.filesystemId, err)) } } @@ -1509,8 +1591,7 @@ func receivingState(f *fsMachine) stateFn { case NoSuchClone: // Normal case for non-clone filesystems, continue. 
default: - log.Printf("Error trying to lookup clone by id: %s", err) - return backoffState + return backoffStateWithReason(fmt.Sprintf("receivingState: Error trying to lookup clone by id: %+v", err)) } } else { // Found a clone, let's base our pull on it @@ -1526,19 +1607,16 @@ func receivingState(f *fsMachine) stateFn { f.state.masterFor(f.filesystemId), ) if len(addresses) == 0 { - log.Printf("No known address for current master of %s", f.filesystemId) - return backoffState + return backoffStateWithReason(fmt.Sprintf("receivingState: No known address for current master of %s", f.filesystemId)) } _, _, apiKey, err := getPasswords("admin") if err != nil { - log.Printf("Attempting to pull %s got %s", f.filesystemId, err) - return backoffState + return backoffStateWithReason(fmt.Sprintf("receivingState: Attempting to pull %s got %+v", f.filesystemId, err)) } url, err := deduceUrl(context.Background(), addresses, "internal", "admin", apiKey) if err != nil { - log.Printf("%s", err) - return backoffState + return backoffStateWithReason(fmt.Sprintf("receivingState: deduceUrl failed with %+v", err)) } req, err := http.NewRequest( @@ -1552,15 +1630,13 @@ func receivingState(f *fsMachine) stateFn { nil, ) if err != nil { - log.Printf("Attempting to pull %s got %s", f.filesystemId, err) - return backoffState + return backoffStateWithReason(fmt.Sprintf("receivingState: Attempting to pull %s got %+v", f.filesystemId, err)) } req.SetBasicAuth("admin", apiKey) client := &http.Client{} resp, err := client.Do(req) if err != nil { - log.Printf("Attempting to pull %s got %s", f.filesystemId, err) - return backoffState + return backoffStateWithReason(fmt.Sprintf("receivingState: Attempting to pull %s got %+v", f.filesystemId, err)) } log.Printf( "Debug: curl -u admin:[pw] %s/filesystems/%s/%s/%s", @@ -1605,7 +1681,7 @@ func receivingState(f *fsMachine) stateFn { prelude, err := consumePrelude(pipeReader) if err != nil { _ = <-finished - return backoffState + return backoffStateWithReason(fmt.Sprintf("receivingState: error consuming prelude: %+v", err)) } log.Printf("[pull] Got prelude %v", prelude) @@ -1617,18 +1693,16 @@ func receivingState(f *fsMachine) stateFn { f.transitionedTo("receiving", "finished pipe") if err != nil { - log.Printf( - "Got error %s when running zfs recv for %s, check zfs-recv-stderr.log", + return backoffStateWithReason(fmt.Sprintf("receivingState: Got error %+v when running zfs recv for %s, check zfs-recv-stderr.log", err, f.filesystemId, - ) - return backoffState + )) } else { log.Printf("Successfully received %s => %s for %s", fromSnap, snapRange.toSnap.Id) } log.Printf("[pull] about to start applying prelude on %v", pipeReader) err = applyPrelude(prelude, fq(f.filesystemId)) if err != nil { - return backoffState + return backoffStateWithReason(fmt.Sprintf("receivingState: Error applying prelude: %+v", err)) } return discoveringState } @@ -1748,6 +1822,7 @@ func pushInitiatorState(f *fsMachine) stateFn { transferRequest.User, transferRequest.Peer, transferRequest.ApiKey, + transferRequest.Port, ) // TODO should we wait for the remote to ack that it's gone into the right state? @@ -2047,20 +2122,25 @@ func (f *fsMachine) pull( // 2. Perform GET, as receivingState does. Update as we go, similar to how // push does it. 
- url, err := deduceUrl( - context.Background(), - []string{transferRequest.Peer}, - // pulls are between clusters, so use external address where - // appropriate - "external", - transferRequest.User, - transferRequest.ApiKey, - ) - if err != nil { - return &Event{ - Name: "push-initiator-cant-deduce-url", - Args: &EventArgs{"err": err}, - }, backoffState + var url string + if transferRequest.Port == 0 { + url, err = deduceUrl( + context.Background(), + []string{transferRequest.Peer}, + // pulls are between clusters, so use external address where + // appropriate + "external", + transferRequest.User, + transferRequest.ApiKey, + ) + if err != nil { + return &Event{ + Name: "push-initiator-cant-deduce-url", + Args: &EventArgs{"err": err}, + }, backoffState + } + } else { + url = fmt.Sprintf("http://%s:%d", transferRequest.Peer, transferRequest.Port) } url = fmt.Sprintf( @@ -2321,22 +2401,26 @@ func (f *fsMachine) push( defer postWriter.Close() defer postReader.Close() - url, err := deduceUrl( - ctx, - []string{transferRequest.Peer}, - // pushes are between clusters, so use external address where - // appropriate - "external", - transferRequest.User, - transferRequest.ApiKey, - ) - if err != nil { - return &Event{ - Name: "push-initiator-cant-deduce-url", - Args: &EventArgs{"err": err}, - }, backoffState + var url string + if transferRequest.Port == 0 { + url, err = deduceUrl( + ctx, + []string{transferRequest.Peer}, + // pushes are between clusters, so use external address where + // appropriate + "external", + transferRequest.User, + transferRequest.ApiKey, + ) + if err != nil { + return &Event{ + Name: "push-initiator-cant-deduce-url", + Args: &EventArgs{"err": err}, + }, backoffState + } + } else { + url = fmt.Sprintf("http://%s:%d", transferRequest.Peer, transferRequest.Port) } - url = fmt.Sprintf( "%s/filesystems/%s/%s/%s", url, @@ -2510,6 +2594,10 @@ func (f *fsMachine) push( resp, err := postClient.Do(req) if err != nil { log.Printf("[actualPush:%s] error in postClient.Do: %s", filesystemId, err) + + go func() { + _ = <-errch + }() _ = <-finished return &Event{ Name: "error-from-post-when-pushing", @@ -2526,6 +2614,10 @@ func (f *fsMachine) push( filesystemId, string(responseBody), err, ) + + go func() { + _ = <-errch + }() _ = <-finished return &Event{ Name: "error-reading-push-response-body", @@ -2536,7 +2628,9 @@ func (f *fsMachine) push( log.Printf("[actualPush:%s] Got response body while pushing: status %d, body %s", filesystemId, resp.StatusCode, string(responseBody)) if resp.StatusCode != 200 { - log.Printf("ABS TEST: Aborting!") + go func() { + _ = <-errch + }() _ = <-finished return &Event{ Name: "error-pushing-posting", @@ -2833,6 +2927,7 @@ func pullInitiatorState(f *fsMachine) stateFn { transferRequest.User, transferRequest.Peer, transferRequest.ApiKey, + transferRequest.Port, ) var path PathToTopLevelFilesystem diff --git a/cmd/dotmesh-server/pkg/main/types.go b/cmd/dotmesh-server/pkg/main/types.go index d6d23abd5..d7519f65a 100644 --- a/cmd/dotmesh-server/pkg/main/types.go +++ b/cmd/dotmesh-server/pkg/main/types.go @@ -251,6 +251,7 @@ type fsMachine struct { type TransferRequest struct { Peer string // hostname User string + Port int ApiKey string //protected value in toString Direction string // "push" or "pull" LocalNamespace string diff --git a/cmd/dotmesh-server/pkg/main/utils.go b/cmd/dotmesh-server/pkg/main/utils.go index 50cc36259..d0bc1a2d5 100644 --- a/cmd/dotmesh-server/pkg/main/utils.go +++ b/cmd/dotmesh-server/pkg/main/utils.go @@ -12,6 +12,7 @@ import ( 
"net/http" "os" "os/exec" + "runtime" "strings" "sync" "sync/atomic" @@ -43,7 +44,7 @@ func deduceUrl(ctx context.Context, hostnames []string, mode, user, apiKey strin for _, urlToTry := range urlsToTry { // hostname (2nd arg) doesn't matter because we're just calling // reallyCallRemote which doesn't use it. - j := NewJsonRpcClient(user, "", apiKey) + j := NewJsonRpcClient(user, "", apiKey, 0) var result bool err := j.reallyCallRemote(ctx, "DotmeshRPC.Ping", nil, &result, urlToTry+"/rpc") if err == nil { @@ -281,7 +282,7 @@ func runForever(f func() error, label string, errorBackoff, successBackoff time. } } -var deathObserver *Observer = NewObserver() +var deathObserver *Observer = NewObserver("deathObserver") // run while filesystem lives func runWhileFilesystemLives(f func() error, label string, filesystemId string, errorBackoff, successBackoff time.Duration) { @@ -671,3 +672,16 @@ func quietLogger(logMessage string) { log.Printf(logMessage) } } + +func getMyStack() string { + len := 1024 + for { + buf := make([]byte, len) + used := runtime.Stack(buf, false) + if used < len { + return string(buf[:used]) + } else { + len = len * 2 + } + } +} diff --git a/cmd/dotmesh-server/pkg/operator/main.go b/cmd/dotmesh-server/pkg/operator/main.go index 4cbf45aad..f65738ec1 100644 --- a/cmd/dotmesh-server/pkg/operator/main.go +++ b/cmd/dotmesh-server/pkg/operator/main.go @@ -599,7 +599,7 @@ func (c *dotmeshController) process() error { continue } - if status == v1.PodFailed { + if status == v1.PodFailed || status == v1.PodSucceeded { // We're deleting the pod, so the user can't "kubectl describe" it, so let's log lots of stuff glog.Infof("Observing pod %s - status %s: FAILED (Message: %s) (Reason: %s)", podName, @@ -905,6 +905,7 @@ nodeLoop: // Create a dotmesh pod (with local storage for now) assigned to this node privileged := true + terminationGracePeriodSeconds := int64(500) newDotmesh := v1.Pod{ ObjectMeta: meta_v1.ObjectMeta{ @@ -985,9 +986,10 @@ nodeLoop: }, }, }, - RestartPolicy: v1.RestartPolicyNever, - ServiceAccountName: "dotmesh", - Volumes: volumes, + RestartPolicy: v1.RestartPolicyNever, + TerminationGracePeriodSeconds: &terminationGracePeriodSeconds, + ServiceAccountName: "dotmesh", + Volumes: volumes, }, } diff --git a/cmd/dotmesh-server/require_zfs.sh b/cmd/dotmesh-server/require_zfs.sh index 5e369e65c..29f668697 100755 --- a/cmd/dotmesh-server/require_zfs.sh +++ b/cmd/dotmesh-server/require_zfs.sh @@ -34,14 +34,17 @@ function fetch_zfs { echo "Successfully loaded downloaded ZFS for $KERN :)" } +# Find the hostname from the actual host, rather than the container. +HOSTNAME="`nsenter -t 1 -m -u -n -i hostname`" + # Put the data file inside /var/lib so that we end up on the big # partition if we're in a LinuxKit env. 
POOL_SIZE=${POOL_SIZE:-10G} DIR=${USE_POOL_DIR:-/var/lib/dotmesh} -DIR=$(echo $DIR |sed s/\#HOSTNAME\#/$(hostname)/) +DIR=$(echo $DIR |sed s/\#HOSTNAME\#/$HOSTNAME/) FILE=${DIR}/dotmesh_data POOL=${USE_POOL_NAME:-pool} -POOL=$(echo $POOL |sed s/\#HOSTNAME\#/$(hostname)/) +POOL=$(echo $POOL |sed s/\#HOSTNAME\#/$HOSTNAME/) DOTMESH_INNER_SERVER_NAME=${DOTMESH_INNER_SERVER_NAME:-dotmesh-server-inner} FLEXVOLUME_DRIVER_DIR=${FLEXVOLUME_DRIVER_DIR:-/usr/libexec/kubernetes/kubelet-plugins/volume/exec} INHERIT_ENVIRONMENT_NAMES=( "FILESYSTEM_METADATA_TIMEOUT" "DOTMESH_UPGRADES_URL" "DOTMESH_UPGRADES_INTERVAL_SECONDS") @@ -78,21 +81,8 @@ if [ -n "$CONTAINER_POOL_MNT" ]; then echo "$DIR seems to be mounted from $BLOCK_DEVICE" OUTER_DIR=`nsenter -t 1 -m -u -n -i /bin/sh -c 'mount' | grep $BLOCK_DEVICE | cut -f 3 -d ' ' | head -n 1` echo "$BLOCK_DEVICE seems to be mounted on $OUTER_DIR in the host" - - if [ $OUTER_DIR != $DIR ] - then - # Make paths involving $OUTER_DIR work in OUR namespace, by binding $DIR to $OUTER_DIR - mkdir -p $OUTER_DIR - mount --make-rshared / - mount --bind $DIR $OUTER_DIR - mount --make-rshared $OUTER_DIR - echo "Here's the contents of $OUTER_DIR in the require_zfs.sh container:" - ls -l $OUTER_DIR - echo "Here's the contents of $DIR in the require_zfs.sh container:" - ls -l $DIR - echo "They should be the same!" - fi else + BLOCK_DEVICE="n/a" OUTER_DIR="$DIR" fi @@ -100,7 +90,25 @@ fi MOUNTPOINT=${MOUNTPOINT:-$OUTER_DIR/mnt} CONTAINER_MOUNT_PREFIX=${CONTAINER_MOUNT_PREFIX:-$OUTER_DIR/container_mnt} -# Set up mounts that are needed +# Set the shared flag on the working directory on the host. This is +# essential; it, combined with the presence of the shared flag on the +# bind-mount of this into the container namespace when we run the +# dotmesh server container, means that mounts created by the dotmesh +# server will be propogated back up to the host. + +# The bind mount of the outer dir onto itself is a trick to make sure +# it's actually a mount - if it's just a directory on the host (eg, +# /var/lib/dotmesh), then we can't set the shared flag on it as it's +# part of some larger mount. Creating a bind mount means we have a +# distinct mount point at the local. However, it's not necessary if it +# really IS a mountpoint already (eg, a k8s pv). + +# Confused? Here's some further reading, in order: + +# https://lwn.net/Articles/689856/ +# https://lwn.net/Articles/690679/ +# https://www.kernel.org/doc/Documentation/filesystems/sharedsubtree.txt + nsenter -t 1 -m -u -n -i /bin/sh -c \ "set -xe $EXTRA_HOST_COMMANDS @@ -133,31 +141,44 @@ else fi fi -POOL_LOGFILE=$DIR/dotmesh_pool.log +POOL_LOGFILE=$DIR/dotmesh_pool_log + +run_in_zfs_container() { + NAME=$1 + shift + docker run -i --rm --pid=host --privileged --name=dotmesh-$NAME-$$ \ + -v $OUTER_DIR:$OUTER_DIR:rshared \ + $DOTMESH_DOCKER_IMAGE \ + "$@" +} set -ex if [ ! -e /dev/zfs ]; then mknod -m 660 /dev/zfs c $(cat /sys/class/misc/zfs/dev |sed 's/:/ /g') fi -if ! zpool status $POOL; then + +echo "`date`: On host '$HOSTNAME', working directory = '$OUTER_DIR', device = '$BLOCK_DEVICE', zfs mountpoint = '$MOUNTPOINT', pool = '$POOL', Dotmesh image = '$DOTMESH_DOCKER_IMAGE'" + +if ! run_in_zfs_container zpool-status zpool status $POOL; then # TODO: make case where truncate previously succeeded but zpool create # failed or never run recoverable. if [ ! 
-f $FILE ]; then truncate -s $POOL_SIZE $FILE - zpool create -m $MOUNTPOINT $POOL "$OUTER_DIR/dotmesh_data" + run_in_zfs_container zpool-create zpool create -m $MOUNTPOINT $POOL "$OUTER_DIR/dotmesh_data" echo "This directory contains dotmesh data files, please leave them alone unless you know what you're doing. See github.com/dotmesh-io/dotmesh for more information." > $DIR/README - zpool get -H guid $POOL |cut -f 3 > $DIR/dotmesh_pool_id + run_in_zfs_container zpool-get zpool get -H guid $POOL |cut -f 3 > $DIR/dotmesh_pool_id if [ -n "$CONTAINER_POOL_PVC_NAME" ]; then echo "$CONTAINER_POOL_PVC_NAME" > $DIR/dotmesh_pvc_name fi + echo "`date`: Pool created" >> $POOL_LOGFILE else - zpool import -f -d $OUTER_DIR $POOL + run_in_zfs_container zpool-import zpool import -f -d $OUTER_DIR $POOL + echo "`date`: Pool imported" >> $POOL_LOGFILE fi - echo "`date`: Pool '$POOL' mounted from host mountpoint '$OUTER_DIR', zfs mountpoint '$MOUNTPOINT'" >> $POOL_LOGFILE else - echo "`date`: Pool '$POOL' already exists, adopted by new dotmesh server" >> $POOL_LOGFILE + echo "`date`: Pool already exists" >> $POOL_LOGFILE fi # Clear away stale socket if existing @@ -206,8 +227,8 @@ pki_volume_mount="" if [ "$PKI_PATH" != "" ]; then pki_volume_mount="-v $PKI_PATH:/pki" fi - -net="-p 32607:32607 -p 32608:32608" +PORT=${DOTMESH_SERVER_PORT:-32607} +net="-p ${PORT}:32607 -p 32608:32608" link="" # this setting means we have set DOTMESH_ETCD_ENDPOINT to a known working @@ -281,19 +302,7 @@ done # a pod but a container run from /var/run/docker.sock (while true; do docker logs -f dotmesh-server-inner || true; sleep 1; done) & -# In order of the -v options below: - -# 1. Mount the docker socket so that we can stop and start containers around -# e.g. dm reset. - -# 2. Be able to install the docker plugin. - -# 3. Be able to mount zfs filesystems from inside the container in -# such a way that they propogate up to the host, and be able to -# create some symlinks that we hand to the docker volume plugin. - -# 4. Be able to install a Kubernetes FlexVolume driver (we make symlinks -# where it tells us to). +# Prepare cleanup logic TERMINATING=no @@ -302,20 +311,51 @@ cleanup() { if [ $TERMINATING = no ] then - echo "`date`: Shutting down due to $REASON" >> $POOL_LOGFILE + echo "Shutting down due to $REASON" TERMINATING=yes else - echo "`date`: Ignoring $REASON as we're already shutting down" >> $POOL_LOGFILE + echo "Ignoring $REASON as we're already shutting down" return fi - # Release the ZFS pool - echo "`date`: Unmounting $MOUNTPOINT:" >> $POOL_LOGFILE - umount "$MOUNTPOINT" >> $POOL_LOGFILE 2>&1 || true - echo "`date`: zpool exporting $POOL:" >> $POOL_LOGFILE - zpool export -f "$POOL" >> $POOL_LOGFILE 2>&1 + if true + then + # Log mounts + + # '| egrep "$DIR|$OUTER_DIR"' might make this less verbose, but + # also might miss out useful information about parent + # mountpoints. For instance, in dind mode in the test suite, the + # relevant mountpoint in the host is `/dotmesh-test-pools` rather + # than the full $OUTER_DIR. + + echo "DEBUG mounts on host:" + nsenter -t 1 -m -u -n -i cat /proc/self/mountinfo | sed 's/^/HOST: /' || true + echo "DEBUG mounts in require_zfs.sh container:" + cat /proc/self/mountinfo | sed 's/^/OUTER: /' || true + echo "DEBUG mounts in an inner container:" + run_in_zfs_container inspect-namespace /bin/cat /proc/self/mountinfo | sed 's/^/INNER: /' || true + echo "DEBUG End of mount tables." + fi + + # Release the ZFS pool. 
Do so in a mount namespace which has $OUTER_DIR + # rshared, otherwise zpool export's unmounts can be mighty confusing. + + # Step 1: Unmount any dots and the ZFS mountpoint (if it's there) + echo "Unmount dots in $MOUNTPOINT/mnt/dmfs:" + run_in_zfs_container zpool-unmount-dots sh -c "cd \"$MOUNTPOINT/mnt/dmfs\"; for i in *; do umount --force $i; done" || true + echo "Unmounting $MOUNTPOINT:" + run_in_zfs_container zpool-unmount umount --force --recursive "$MOUNTPOINT"|| true - echo "`date`: Finished cleanup: zpool export returned $?" >> $POOL_LOGFILE + # Step 2: Shut down the pool. + echo "zpool exporting $POOL:" + if run_in_zfs_container zpool-export zpool export -f "$POOL" + then + echo "`date`: Exported pool OK" >> $POOL_LOGFILE + echo "Exported pool OK." + else + echo "`date`: ERROR: Failed exporting pool!" >> $POOL_LOGFILE + echo "ERROR: Failed exporting pool!" + fi } shutdown() { @@ -325,11 +365,8 @@ shutdown() { trap - $SIGNAL cleanup "signal $SIGNAL" - - exit 0 } -trap 'shutdown EXIT' EXIT trap 'shutdown SIGTERM' SIGTERM trap 'shutdown SIGINT' SIGINT trap 'shutdown SIGQUIT' SIGQUIT @@ -338,7 +375,26 @@ trap 'shutdown SIGKILL' SIGKILL set +e -docker run -i $rm_opt --privileged --name=$DOTMESH_INNER_SERVER_NAME \ +# In order of the -v options below: + +# 1. Mount the docker socket so that we can stop and start containers around +# e.g. dm reset. + +# 2. Be able to install the docker plugin. + +# 3. Be able to mount zfs filesystems from inside the container in +# such a way that they propagate up to the host, and be able to +# create some symlinks that we hand to the docker volume plugin. + +# 4. Be able to install a Kubernetes FlexVolume driver (we make symlinks +# where it tells us to). + +# NOTE: The *source* path in the -v flags IS AS SEEN BY THE HOST, +# *not* as seen by this container running require_zfs.sh, because +# we're talking to the host docker. That's why we must map +# $OUTER_DIR:$OUTER_DIR and not $DIR:$OUTER_DIR... + +docker run -i $rm_opt --pid=host --privileged --name=$DOTMESH_INNER_SERVER_NAME \ -v /var/run/docker.sock:/var/run/docker.sock \ -v /run/docker/plugins:/run/docker/plugins \ -v $OUTER_DIR:$OUTER_DIR:rshared \ diff --git a/docs/dev-commands.md b/docs/dev-commands.md index 52fc735cc..667a3e0a9 100644 --- a/docs/dev-commands.md +++ b/docs/dev-commands.md @@ -196,28 +196,13 @@ vagrant ssh bash /vagrant/scripts/reset_vagrant.sh ``` -#### symlink code +#### Working from your host **Optional**: If you would like to work on code from your local machine (i.e not inside a VM), follow this section. Alternatively, you can just change code inside the vagrant VM and commit/push from inside it. - -There can be issues with Vagrant shared folders hence this being a manual step. - -```bash -vagrant ssh -cd $GOPATH/src/github.com/dotmesh-io -# might as well keep this - backs up the version vagrant checked out for you -mv dotmesh dotmesh2 -ln -s /vagrant dotmesh -# now $GOPATH/src/github.com/dotmesh-io/dotmesh -> /vagrant -> this repo on your host ``` - -**NOTE:** using the symlink can drastically slow down docker builds. - -You can use this script which copies the latest git hash from your host: - -```bash -cd /vagrant -make vagrant.sync +open smb://admin:password@172.17.1.178/vagrantshare ``` +This should open a shared network drive, allowing you to edit the files in the dotmesh-io folder of your Vagrant machine's GOPATH from your host machine. 
+ ## generic setup instructions diff --git a/scripts/prepare_vagrant.sh b/scripts/prepare_vagrant.sh index 1ccb718fa..21d578eeb 100644 --- a/scripts/prepare_vagrant.sh +++ b/scripts/prepare_vagrant.sh @@ -54,3 +54,6 @@ cd $HOME/$DISCOVERY_REPO cd $GOPATH/src/$GITHUB_HOST/$GITHUB_ORG/$GITHUB_REPO ./prime.sh + +# start samba for sharing between host and vm +docker run -d -p 139:139 -p 445:445 --name samba --restart always -v $GOPATH/src/$GITHUB_HOST/$GITHUB_ORG:/mount dperson/samba -u "admin;password" -s "vagrantshare;/mount;yes;no;no;admin;admin;admin;comment" \ No newline at end of file diff --git a/tests/Gopkg.lock b/tests/Gopkg.lock index fe78baa1c..f2dd3f5c5 100644 --- a/tests/Gopkg.lock +++ b/tests/Gopkg.lock @@ -5,7 +5,7 @@ branch = "master" name = "github.com/dotmesh-io/citools" packages = ["."] - revision = "3563ec39f6b6f11462dd0416e9c3ed4acc400f7e" + revision = "930915871cd64abbed410f1767e2b3c6a470f674" [[projects]] branch = "master" diff --git a/tests/acceptance_test.go b/tests/acceptance_test.go index 1cd1adc52..391fe5750 100644 --- a/tests/acceptance_test.go +++ b/tests/acceptance_test.go @@ -1321,8 +1321,8 @@ func TestTwoSingleNodeClusters(t *testing.T) { citools.TeardownFinishedTestRuns() f := citools.Federation{ - citools.NewCluster(1), // cluster_0_node_0 - citools.NewCluster(1), // cluster_1_node_0 + citools.NewCluster(1), // cluster_0_node_0 + citools.NewClusterOnPort(32609, 1), // cluster_1_node_0 } defer citools.TestMarkForCleanup(f) citools.AddFuncToCleanups(func() { citools.TestMarkForCleanup(f) }) @@ -1336,7 +1336,7 @@ func TestTwoSingleNodeClusters(t *testing.T) { node2 := f[1].GetNode(0).Container t.Run("SpecifyPort", func(t *testing.T) { - citools.RunOnNode(t, node1, "echo "+f[1].GetNode(0).ApiKey+" | dm remote add funny_port_remote admin@"+f[1].GetNode(0).IP+":32607") + citools.RunOnNode(t, node1, "echo "+f[1].GetNode(0).ApiKey+" | dm remote add funny_port_remote admin@"+f[1].GetNode(0).IP+":32609") }) t.Run("PushCommitBranchExtantBase", func(t *testing.T) { diff --git a/tests/vendor/github.com/dotmesh-io/citools/testtools.go b/tests/vendor/github.com/dotmesh-io/citools/testtools.go index 7593d9b79..7ee491af8 100644 --- a/tests/vendor/github.com/dotmesh-io/citools/testtools.go +++ b/tests/vendor/github.com/dotmesh-io/citools/testtools.go @@ -852,10 +852,12 @@ type Node struct { IP string ApiKey string Password string + Port int } type Cluster struct { DesiredNodeCount int + Port int Env map[string]string ClusterArgs string Nodes []Node @@ -874,19 +876,24 @@ type Pair struct { RemoteName string } +func NewClusterOnPort(port, desiredNodeCount int) *Cluster { + emptyEnv := make(map[string]string) + return NewClusterWithArgs(desiredNodeCount, port, emptyEnv, "") +} + func NewCluster(desiredNodeCount int) *Cluster { emptyEnv := make(map[string]string) - return NewClusterWithArgs(desiredNodeCount, emptyEnv, "") + return NewClusterWithArgs(desiredNodeCount, 0, emptyEnv, "") } func NewClusterWithEnv(desiredNodeCount int, env map[string]string) *Cluster { - return NewClusterWithArgs(desiredNodeCount, env, "") + return NewClusterWithArgs(desiredNodeCount, 0, env, "") } // custom arguments that are passed through to `dm cluster {init,join}` -func NewClusterWithArgs(desiredNodeCount int, env map[string]string, args string) *Cluster { +func NewClusterWithArgs(desiredNodeCount, port int, env map[string]string, args string) *Cluster { env["DOTMESH_UPGRADES_URL"] = "" //set default test env vars - return &Cluster{DesiredNodeCount: desiredNodeCount, Env: env, ClusterArgs: args} + 
return &Cluster{DesiredNodeCount: desiredNodeCount, Port: port, Env: env, ClusterArgs: args} } func NewKubernetes(desiredNodeCount int, storageMode string, dindStorage bool) *Kubernetes { @@ -922,15 +929,22 @@ func NodeFromNodeName(t *testing.T, now int64, i, j int, clusterName string) Nod nil, ) var apiKey string + var port int if err != nil { fmt.Printf("no dm config found, proceeding without recording apiKey\n") } else { fmt.Printf("dm config on %s: %s\n", nodeName(now, i, j), dotmeshConfig) m := struct { - Remotes struct{ Local struct{ ApiKey string } } + Remotes struct { + Local struct { + ApiKey string + Port int + } + } }{} json.Unmarshal([]byte(dotmeshConfig), &m) apiKey = m.Remotes.Local.ApiKey + port = m.Remotes.Local.Port } // /root/.dotmesh/admin-password.txt is created on docker @@ -951,6 +965,7 @@ func NodeFromNodeName(t *testing.T, now int64, i, j int, clusterName string) Nod IP: nodeIP, ApiKey: apiKey, Password: password, + Port: port, } } @@ -1024,10 +1039,11 @@ SEARCHABLE HEADER: STARTING CLUSTER _, err := docker( pair.From.Container, fmt.Sprintf( - "echo %s |dm remote add %s admin@%s", + "echo %s |dm remote add %s admin@%s:%d", pair.To.ApiKey, pair.RemoteName, pair.To.IP, + pair.To.Port, ), nil, ) @@ -1600,6 +1616,7 @@ func (c *Cluster) Start(t *testing.T, now int64, i int) error { " --use-pool-dir /dotmesh-test-pools/" + poolId(now, i, 0) + " --use-pool-name " + poolId(now, i, 0) + " --dotmesh-upgrades-url ''" + + " --port " + strconv.Itoa(c.Port) + c.ClusterArgs fmt.Printf("running dm cluster init with following command: %s\n", dmInitCommand)