From 9195e59c2c29eabb2f93e3acfd095204f739e2a5 Mon Sep 17 00:00:00 2001 From: Justin Clift Date: Tue, 21 Mar 2023 13:56:36 +1100 Subject: [PATCH] all: Add initial working prototype for "live" databases These are initially intended for use via the DBHub.io API, though if they prove useful will be hooked up to the other DB4S and DBHub.io pieces (webui, dio, etc). --- .github/workflows/cypress.yml | 2 +- .gitignore | 4 + README.md | 2 + api/handlers.go | 739 ++++++++++++++++-- api/main.go | 22 + api/templates/head.html | 7 - api/templates/root.html | 30 +- common/cypress.go | 27 + common/diff.go | 4 +- common/live.go | 536 +++++++++++++ common/minio.go | 102 ++- common/postgresql.go | 101 ++- common/sqlite.go | 345 +++++++- common/types.go | 27 +- common/userinput.go | 19 +- common/util.go | 117 +-- .../test_data/Join Testing with index.sqlite | Bin 0 -> 16384 bytes docker/Dockerfile | 118 ++- docker/config.toml | 8 +- go.mod | 14 +- go.sum | 51 +- live/main.go | 316 ++++++++ webui/main.go | 159 +++- webui/pages.go | 20 +- webui/templates/profile.html | 33 +- webui/templates/upload.html | 34 +- webui/vis.go | 6 +- 27 files changed, 2542 insertions(+), 301 deletions(-) delete mode 100644 api/templates/head.html create mode 100644 common/live.go create mode 100644 cypress/test_data/Join Testing with index.sqlite create mode 100644 live/main.go diff --git a/.github/workflows/cypress.yml b/.github/workflows/cypress.yml index 2234d402a..317de996f 100644 --- a/.github/workflows/cypress.yml +++ b/.github/workflows/cypress.yml @@ -22,7 +22,7 @@ jobs: uses: cypress-io/github-action@v5 with: wait-on: 'https://localhost:9443' - wait-on-timeout: 120 + wait-on-timeout: 180 build: sh ./build_dbhub_docker_and_local.sh start: yarn docker:startlocal command: yarn cypress:test diff --git a/.gitignore b/.gitignore index 165c00425..114e0d9ef 100644 --- a/.gitignore +++ b/.gitignore @@ -56,3 +56,7 @@ webui/js/markdown-editor.js # Local secrets .env + +# Other files +headers.out +curl_commands.txt diff --git a/README.md b/README.md index 7c3d2edd8..5f885d129 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ or run it locally for your own users. * [Minio](https://minio.io) - release 2016-11-26T02:23:47Z and later are known to work. * [NodeJS](https://nodejs.org) - version 18.x is known to work, others are untested. * [PostgreSQL](https://www.postgresql.org) - version 13 and above are known to work. +* [RabbitMQ](https://www.rabbitmq.com) - version 3.10.x is known to work, others are untested. * [Yarn](https://classic.yarnpkg.com) - version 1.22.x. Not Yarn 2.x or greater. ### Subdirectories @@ -31,6 +32,7 @@ or run it locally for your own users. * [default_licences](default_licences/) - Useful Open Source licences suitable for databases. * [db4s](db4s/) - REST server which [DB Browser for SQLite](http://sqlitebrowser.org) and [Dio](https://github.com/sqlitebrowser/dio) use for communicating with DBHub.io. +* [live](live/) - Internal daemon which manages live SQLite databases. * [webui](webui/) - The main public facing webUI. ### Libraries for accessing DBHub.io via API diff --git a/api/handlers.go b/api/handlers.go index 20b5e59d6..bcb3401d1 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -2,11 +2,16 @@ package main import ( "encoding/json" + "errors" "fmt" "log" + "mime/multipart" "net/http" "net/url" + "os" + "sort" + sqlite "github.com/gwenn/gosqlite" com "github.com/sqlitebrowser/dbhub.io/common" ) @@ -28,6 +33,17 @@ func branchesHandler(w http.ResponseWriter, r *http.Request) { } dbFolder := "/" + // If the database is a live database, we return an error message + isLive, _, err := com.CheckDBLive(dbOwner, dbFolder, dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if isLive { + jsonErr(w, "That database is a live database. It doesn't have branches.", http.StatusBadRequest) + return + } + // Retrieve the branch list for the database brList, err := com.BranchListResponse(dbOwner, dbFolder, dbName) if err != nil { @@ -52,19 +68,19 @@ func branchesHandler(w http.ResponseWriter, r *http.Request) { // $ curl -F apikey="YOUR_API_KEY_HERE" \ // -F dbowner="justinclift" \ // -F dbname="Join Testing.sqlite" \ -// -F table="tablename" https://api.dbhub.io/v1/columns +// -F table="table1" https://api.dbhub.io/v1/columns // * "apikey" is one of your API keys. These can be generated from your Settings page once logged in // * "dbowner" is the owner of the database // * "dbname" is the name of the database // * "table" is the name of the table or view func columnsHandler(w http.ResponseWriter, r *http.Request) { - // Do auth check, grab request info, open the database - sdb, httpStatus, err := collectInfoAndOpen(w, r) + // Do auth check, grab request info + loggedInUser, dbOwner, dbName, _, httpStatus, err := collectInfo(w, r) if err != nil { jsonErr(w, err.Error(), httpStatus) return } - defer sdb.Close() + dbFolder := "/" // Extract the table name table, err := com.GetFormTable(r, false) @@ -79,13 +95,103 @@ func columnsHandler(w http.ResponseWriter, r *http.Request) { return } - // Retrieve the list of columns for the table - cols, err := sdb.Columns("", table) + // Check if the database is a live database, and get the node/queue to send the request to + isLive, liveNode, err := com.CheckDBLive(dbOwner, dbFolder, dbName) if err != nil { jsonErr(w, err.Error(), http.StatusInternalServerError) return } + // If a live database has been uploaded but doesn't have a live node handling its requests, then create one + if isLive && liveNode == "" { + // Send a request to the AMQP backend to setup a node with the database + err = com.LiveCreateDB(com.AmqpChan, dbOwner, dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } + + // If it's a standard database, process it locally. Else send the query to our AMQP backend + var cols []sqlite.Column + if !isLive { + // Get Minio bucket and object id for the SQLite file + bucket, id, _, err := com.MinioLocation(dbOwner, dbFolder, dbName, "", loggedInUser) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + + // Sanity check + if id == "" { + // The requested database wasn't found, or the user doesn't have permission to access it + jsonErr(w, "Requested database not found", http.StatusNotFound) + return + } + + // Retrieve the database from Minio, then open it + var sdb *sqlite.Conn + sdb, err = com.OpenSQLiteDatabase(bucket, id) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + defer sdb.Close() + + // Verify the requested table or view we're about to query does exist + var tablesViews []string + tablesViews, err = com.TablesAndViews(sdb, dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + tableOrViewFound := false + for _, t := range tablesViews { + if t == table { + tableOrViewFound = true + } + } + if !tableOrViewFound { + jsonErr(w, "Provided table or view name doesn't exist in this database", http.StatusInternalServerError) + return + } + + // Retrieve the list of columns for the table + cols, err = sdb.Columns("", table) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } else { + // Send the columns request to our AMQP backend + var rawResponse []byte + rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "columns", loggedInUser, dbOwner, dbName, table) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + log.Println(err) + return + } + + // Decode the response + var resp com.LiveDBColumnsResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + log.Println(err) + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if resp.Node == "" { + log.Printf("In API (Live) columnsHandler(). A node responded, but didn't identify itself.") + return + } + cols = resp.Columns + } + // Transfer the column info into our own structure, for better json formatting var jsonCols []com.APIJSONColumn for _, j := range cols { @@ -124,6 +230,17 @@ func commitsHandler(w http.ResponseWriter, r *http.Request) { } dbFolder := "/" + // If the database is a live database, we return an error message + isLive, _, err := com.CheckDBLive(dbOwner, dbFolder, dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if isLive { + jsonErr(w, "That database is a live database. It doesn't have commits.", http.StatusBadRequest) + return + } + // Retrieve the commits commits, err := com.GetCommitList(dbOwner, dbFolder, dbName) if err != nil { @@ -141,10 +258,13 @@ func commitsHandler(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, string(jsonData)) } -// databasesHandler returns the list of databases in the requesting users account +// databasesHandler returns the list of databases in the requesting users account. +// If the new (optional) "live" boolean text field is set to true, then it will return the list of live +// databases. Otherwise, it will return the list of standard databases. // This can be run from the command line using curl, like this: -// $ curl -F apikey="YOUR_API_KEY_HERE" https://api.dbhub.io/v1/databases +// $ curl -F apikey="YOUR_API_KEY_HERE" -F live="true" https://api.dbhub.io/v1/databases // * "apikey" is one of your API keys. These can be generated from your Settings page once logged in +// * "live" is whether to show Live databases, or standard ones func databasesHandler(w http.ResponseWriter, r *http.Request) { // Authenticate the request loggedInUser, err := checkAuth(w, r) @@ -153,14 +273,32 @@ func databasesHandler(w http.ResponseWriter, r *http.Request) { return } - // Retrieve the list of databases in the user account - var databases []com.DBInfo - databases, err = com.UserDBs(loggedInUser, com.DB_BOTH) + // Get "live" boolean value, if provided by the caller + var live bool + live, err = com.GetFormLive(r) if err != nil { - jsonErr(w, err.Error(), http.StatusInternalServerError) + jsonErr(w, err.Error(), http.StatusBadRequest) return } + // Retrieve the list of databases in the user account + var databases []com.DBInfo + if !live { + // Get the list of standard databases + databases, err = com.UserDBs(loggedInUser, com.DB_BOTH) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } else { + // Get the list of live databases + databases, err = com.LiveUserDBs(loggedInUser) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } + // Extract just the database names var list []string for _, j := range databases { @@ -191,7 +329,8 @@ func deleteHandler(w http.ResponseWriter, r *http.Request) { } // Validate the database name - dbName, err := com.GetDatabase(r, false) + var dbName string + dbName, err = com.GetDatabase(r, false) if err != nil { jsonErr(w, err.Error(), http.StatusBadRequest) return @@ -199,12 +338,32 @@ func deleteHandler(w http.ResponseWriter, r *http.Request) { dbOwner := loggedInUser dbFolder := "/" - // Invalidate the memcache data for the database - err = com.InvalidateCacheEntry(loggedInUser, dbOwner, dbFolder, dbName, "") // Empty string indicates "for all versions" + // Check if the database exists + exists, err := com.CheckDBPermissions(loggedInUser, dbOwner, dbFolder, dbName, false) if err != nil { jsonErr(w, err.Error(), http.StatusInternalServerError) return } + if !exists { + jsonErr(w, "Database does not exist, or user isn't authorised to access it", http.StatusNotFound) + return + } + + // For a standard database, invalidate its memcache data + var isLive bool + var liveNode string + isLive, liveNode, err = com.CheckDBLive(dbOwner, dbFolder, dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusNotFound) + return + } + if !isLive { + err = com.InvalidateCacheEntry(loggedInUser, dbOwner, dbFolder, dbName, "") // Empty string indicates "for all versions" + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } // Delete the database err = com.DeleteDatabase(dbOwner, dbFolder, dbName) @@ -213,6 +372,44 @@ func deleteHandler(w http.ResponseWriter, r *http.Request) { return } + // Delete the database from Minio too + bucket := fmt.Sprintf("live-%s", dbOwner) + id := dbName + err = com.MinioDeleteDatabase("API server", dbOwner, dbName, bucket, id) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + + // For a live database, tell our AMQP backend to delete the database file + if isLive { + var rawResponse []byte + rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "delete", loggedInUser, dbOwner, dbName, "") + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + log.Println(err) + return + } + + // Decode the response + var resp com.LiveDBErrorResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + log.Println(err) + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if resp.Node == "" { + log.Printf("In API (Live) deleteHandler(). A node responded, but didn't identify itself.") + return + } + } + // Return a "success" message z := com.StatusResponseContainer{Status: "OK"} jsonData, err := json.MarshalIndent(z, "", " ") @@ -334,6 +531,50 @@ func diffHandler(w http.ResponseWriter, r *http.Request) { return } + // Check permissions of the first database + var allowed bool + allowed, err = com.CheckDBPermissions(loggedInUser, dbOwnerA, "/", dbNameA, false) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if !allowed { + jsonErr(w, "Database not found", http.StatusNotFound) + return + } + + // Check permissions of the second database + allowed, err = com.CheckDBPermissions(loggedInUser, dbOwnerB, "/", dbNameB, false) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if !allowed { + jsonErr(w, "Database not found", http.StatusNotFound) + return + } + + // If either database is a live database, we return an error message + var isLive bool + isLive, _, err = com.CheckDBLive(dbOwnerA, "/", dbNameA) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if isLive { + jsonErr(w, fmt.Sprintf("'%s/%s' is a live database. It doesn't support diffs.", dbOwnerA, dbNameA), http.StatusBadRequest) + return + } + isLive, _, err = com.CheckDBLive(dbOwnerB, "/", dbNameB) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if isLive { + jsonErr(w, fmt.Sprintf("'%s/%s' is a live database. It doesn't support diffs.", dbOwnerB, dbNameB), http.StatusBadRequest) + return + } + // Perform diff diffs, err := com.Diff(dbOwnerA, "/", dbNameA, ca, dbOwnerB, "/", dbNameB, cb, loggedInUser, mergeStrategy, includeData) if err != nil { @@ -381,41 +622,113 @@ func downloadHandler(w http.ResponseWriter, r *http.Request) { // * "dbowner" is the owner of the database // * "dbname" is the name of the database func indexesHandler(w http.ResponseWriter, r *http.Request) { - // Do auth check, grab request info, open the database - sdb, httpStatus, err := collectInfoAndOpen(w, r) + // Do auth check, grab request info + loggedInUser, dbOwner, dbName, _, httpStatus, err := collectInfo(w, r) if err != nil { jsonErr(w, err.Error(), httpStatus) return } - defer sdb.Close() + dbFolder := "/" - // Retrieve the list of indexes - idx, err := sdb.Indexes("") + // Check if the database is a live database, and get the node/queue to send the request to + isLive, liveNode, err := com.CheckDBLive(dbOwner, dbFolder, dbName) if err != nil { jsonErr(w, err.Error(), http.StatusInternalServerError) return } - // Retrieve the column details for each index + // If a live database has been uploaded but doesn't have a live node handling its requests, then create one + if isLive && liveNode == "" { + // Send a request to the AMQP backend to set up a node with the database + err = com.LiveCreateDB(com.AmqpChan, dbOwner, dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } + + // If it's a standard database, process it locally. Else send the query to our AMQP backend var indexes []com.APIJSONIndex - for nam, tab := range idx { - oneIndex := com.APIJSONIndex{ - Name: nam, - Table: tab, - Columns: []com.APIJSONIndexColumn{}, + if !isLive { + // Get Minio bucket and object id for the SQLite file + bucket, id, _, err := com.MinioLocation(dbOwner, dbFolder, dbName, "", loggedInUser) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + + // Sanity check + if id == "" { + // The requested database wasn't found, or the user doesn't have permission to access it + jsonErr(w, "Requested database not found", http.StatusNotFound) + return } - cols, err := sdb.IndexColumns("", nam) + + // Retrieve the database from Minio, then open it + var sdb *sqlite.Conn + sdb, err = com.OpenSQLiteDatabase(bucket, id) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + defer sdb.Close() + + // Retrieve the list of indexes + var idx map[string]string + idx, err = sdb.Indexes("") + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + + // Retrieve the details for each index + for nam, tab := range idx { + oneIndex := com.APIJSONIndex{ + Name: nam, + Table: tab, + Columns: []com.APIJSONIndexColumn{}, + } + cols, err := sdb.IndexColumns("", nam) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + for _, k := range cols { + oneIndex.Columns = append(oneIndex.Columns, com.APIJSONIndexColumn{ + CID: k.Cid, + Name: k.Name, + }) + } + indexes = append(indexes, oneIndex) + } + } else { + // Send the indexes request to our AMQP backend + var rawResponse []byte + rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "indexes", loggedInUser, dbOwner, dbName, "") if err != nil { jsonErr(w, err.Error(), http.StatusInternalServerError) + log.Println(err) return } - for _, k := range cols { - oneIndex.Columns = append(oneIndex.Columns, com.APIJSONIndexColumn{ - CID: k.Cid, - Name: k.Name, - }) + + // Decode the response + var resp com.LiveDBIndexesResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + log.Println(err) + return } - indexes = append(indexes, oneIndex) + if resp.Error != "" { + err = errors.New(resp.Error) + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if resp.Node == "" { + log.Printf("In API (Live) indexesHandler(). A node responded, but didn't identify itself.") + return + } + indexes = resp.Indexes } // Return the results @@ -443,6 +756,17 @@ func metadataHandler(w http.ResponseWriter, r *http.Request) { } dbFolder := "/" + // If the database is a live database, we return an error message + isLive, _, err := com.CheckDBLive(dbOwner, dbFolder, dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if isLive { + jsonErr(w, "That database is a live database. It doesn't support metadata.", http.StatusBadRequest) + return + } + // Retrieve the metadata for the database meta, err := com.MetadataResponse(dbOwner, dbFolder, dbName) if err != nil { @@ -488,7 +812,7 @@ func queryHandler(w http.ResponseWriter, r *http.Request) { // Grab the incoming SQLite query rawInput := r.FormValue("sql") - decodedStr, err := com.CheckUnicode(rawInput) + query, err := com.CheckUnicode(rawInput) if err != nil { jsonErr(w, err.Error(), http.StatusBadRequest) return @@ -506,14 +830,43 @@ func queryHandler(w http.ResponseWriter, r *http.Request) { return } - // Run the query - var data com.SQLiteRecordSet - data, err = com.SQLiteRunQueryDefensive(w, r, com.API, dbOwner, dbFolder, dbName, commitID, loggedInUser, decodedStr) + // Check if the database is a live database, and get the node/queue to send the request to + isLive, liveNode, err := com.CheckDBLive(dbOwner, dbFolder, dbName) if err != nil { jsonErr(w, err.Error(), http.StatusInternalServerError) return } + // If a live database has been uploaded but doesn't have a live node handling its requests, then create one + if isLive && liveNode == "" { + // Send a request to the AMQP backend to set up the database there, ready for querying + err = com.LiveCreateDB(com.AmqpChan, dbOwner, dbName) + if err != nil { + log.Println(err) // FIXME: Debug output while developing + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } + + // Run the query + var data com.SQLiteRecordSet + if !isLive { + // Standard database + data, err = com.SQLiteRunQueryDefensive(w, r, com.QuerySourceAPI, dbOwner, dbFolder, dbName, commitID, loggedInUser, query) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } else { + // Send the query to the appropriate backend live node + data, err = com.LiveQueryDB(com.AmqpChan, liveNode, loggedInUser, dbOwner, dbName, query) + if err != nil { + log.Println(err) // FIXME: Debug output while developing + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } + // Return the results jsonData, err := json.MarshalIndent(data.Records, "", " ") if err != nil { @@ -539,6 +892,17 @@ func releasesHandler(w http.ResponseWriter, r *http.Request) { } dbFolder := "/" + // If the database is a live database, we return an error message + isLive, _, err := com.CheckDBLive(dbOwner, dbFolder, dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if isLive { + jsonErr(w, "That database is a live database. It doesn't support releases.", http.StatusBadRequest) + return + } + // Retrieve the list of releases rels, err := com.GetReleases(dbOwner, dbFolder, dbName) if err != nil { @@ -589,22 +953,95 @@ func rootHandler(w http.ResponseWriter, r *http.Request) { // * "dbowner" is the owner of the database // * "dbname" is the name of the database func tablesHandler(w http.ResponseWriter, r *http.Request) { - // Do auth check, grab request info, open the database - sdb, httpStatus, err := collectInfoAndOpen(w, r) + // Do auth check, grab request info + loggedInUser, dbOwner, dbName, _, httpStatus, err := collectInfo(w, r) if err != nil { jsonErr(w, err.Error(), httpStatus) return } - defer sdb.Close() + dbFolder := "/" - // Retrieve the list of tables - tables, err := com.Tables(sdb) + // Check if the database is a live database, and get the node/queue to send the request to + isLive, liveNode, err := com.CheckDBLive(dbOwner, dbFolder, dbName) if err != nil { jsonErr(w, err.Error(), http.StatusInternalServerError) return } + // If a live database has been uploaded but doesn't have a live node handling its requests, then create one + if isLive && liveNode == "" { + // Send a request to the AMQP backend to setup a node with the database + err = com.LiveCreateDB(com.AmqpChan, dbOwner, dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } + + // If it's a standard database, process it locally. Else send the query to our AMQP backend + var tables []string + if !isLive { + // Get Minio bucket and object id for the SQLite file + bucket, id, _, err := com.MinioLocation(dbOwner, dbFolder, dbName, "", loggedInUser) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + + // Sanity check + if id == "" { + // The requested database wasn't found, or the user doesn't have permission to access it + jsonErr(w, "Requested database not found", http.StatusNotFound) + return + } + + // Retrieve the database from Minio, then open it + var sdb *sqlite.Conn + sdb, err = com.OpenSQLiteDatabase(bucket, id) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + defer sdb.Close() + + // Retrieve the list of tables + tables, err = com.Tables(sdb) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } else { + // Send the columns request to our AMQP backend + var rawResponse []byte + rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "tables", loggedInUser, dbOwner, dbName, "") + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + log.Println(err) + return + } + + // Decode the response + var resp com.LiveDBTablesResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + log.Println(err) + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if resp.Node == "" { + log.Printf("In API (Live) tablesHandler(). A node responded, but didn't identify itself.") + return + } + tables = resp.Tables + } + // Return the results + sort.Strings(tables) jsonData, err := json.MarshalIndent(tables, "", " ") if err != nil { log.Printf("Error when JSON marshalling returned data in tablesHandler(): %v\n", err) @@ -629,6 +1066,17 @@ func tagsHandler(w http.ResponseWriter, r *http.Request) { } dbFolder := "/" + // If the database is a live database, we return an error message + isLive, _, err := com.CheckDBLive(dbOwner, dbFolder, dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if isLive { + jsonErr(w, "That database is a live database. It doesn't support tags.", http.StatusBadRequest) + return + } + // Retrieve the tags tags, err := com.GetTags(dbOwner, dbFolder, dbName) if err != nil { @@ -660,9 +1108,10 @@ func tagsHandler(w http.ResponseWriter, r *http.Request) { // * "sourceurl" (optional) is the URL to the reference source of the data // * "lastmodified" (optional) is a datestamp in RFC3339 format // * "licence" (optional) is an identifier for a license that's "in the system" -// * "public" (optional) is whether or not the database should be public. True means "public", false means "not public" +// * "live" (optional) is a boolean string ("true", "false") indicating whether this upload is a live database +// * "public" (optional) is whether the database should be public. True means "public", false means "not public" // * "commit" (ignored for new databases, required for existing ones) is the commit ID this new database revision -// should be appended to. For new databases it's not needed, but for existing databases it's required (its used to +// should be appended to. For new databases it's not needed, but for existing databases it's required (it's used to // detect out of date / conflicting uploads) func uploadHandler(w http.ResponseWriter, r *http.Request) { // Authenticate the request @@ -710,14 +1159,125 @@ func uploadHandler(w http.ResponseWriter, r *http.Request) { } } + // Get "live" boolean value, if provided by the caller + var live bool + live, err = com.GetFormLive(r) + if err != nil { + jsonErr(w, err.Error(), http.StatusBadRequest) + return + } + // Process the upload var httpStatus int var x map[string]string dbOwner := loggedInUser // We always use the API key / cert owner as the database owner for uploads - x, httpStatus, err = com.UploadResponse(w, r, loggedInUser, dbOwner, dbName, commitID, "api") - if err != nil { - jsonErr(w, err.Error(), httpStatus) - return + if !live { + x, httpStatus, err = com.UploadResponse(w, r, loggedInUser, dbOwner, dbName, commitID, "api") + if err != nil { + jsonErr(w, err.Error(), httpStatus) + return + } + } else { + // FIXME: The code below is grabbed from com.UploadResponse(), and is also very similar to the code in the + // webui uploadDataHandler(). May be able to refactor them. + + // Grab the uploaded file and form variables + var tempFile multipart.File + var handler *multipart.FileHeader + tempFile, handler, err = r.FormFile("file") + if err != nil && err.Error() != "http: no such file" { + log.Printf("Uploading file failed: %v", err) + jsonErr(w, fmt.Sprintf("Something went wrong when grabbing the file data: '%s'", err.Error()), http.StatusBadRequest) + return + } + if err != nil { + if err.Error() == "http: no such file" { + // Check for a 'file1' FormFile too, as some clients can't use 'file' (without a number) due to a design bug + tempFile, handler, err = r.FormFile("file1") + if err != nil { + log.Printf("Uploading file failed: %v", err) + jsonErr(w, fmt.Sprintf("Something went wrong when grabbing the file data: '%s'", err.Error()), http.StatusBadRequest) + return + } + } + } + defer tempFile.Close() + + // If no database name was passed as a function argument, use the name given in the upload itself + if dbName == "" { + dbName = handler.Filename + } + + // Validate the database name + err = com.ValidateDB(dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusBadRequest) + return + } + + // Check if the database exists already + exists, err := com.CheckDBExists(loggedInUser, "/", dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + + // If the upload is a live database, but the database already exists, then abort the upload + // TODO: Consider if we want the existing "force" flag to be useful here, to potentially allow overwriting a + // live database + if exists && live { + jsonErr(w, "You're uploading a live database, but the same database name already exists. "+ + "Delete that one first if you really want to overwrite it", http.StatusConflict) + return + } + + // Write the incoming database to a temporary file on disk, and sanity check it + dbFolder := "/" + var numBytes int64 + var tempDB *os.File + numBytes, tempDB, _, _, err = com.WriteDBtoDisk(loggedInUser, dbOwner, dbFolder, dbName, tempFile) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + defer os.Remove(tempDB.Name()) + + // Rewind the internal cursor in the temporary file back to the start again + var newOffset int64 + newOffset, err = tempDB.Seek(0, 0) + if err != nil { + log.Printf("Seeking on the temporary file (2nd time) failed: %s", err) + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if newOffset != 0 { + jsonErr(w, "Seeking to start of temporary database file didn't work", http.StatusInternalServerError) + return + } + + // Store the database in Minio + err = com.LiveStoreDatabaseMinio(tempDB, dbOwner, dbName, numBytes) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + + // Log the successful database upload + log.Printf("API Server: Username '%s' uploaded LIVE database '%s/%s', bytes: %v", loggedInUser, + com.SanitiseLogString(dbOwner), com.SanitiseLogString(dbName), numBytes) + + // Send a request to the AMQP backend to set up the database there, ready for querying + err = com.LiveCreateDB(com.AmqpChan, dbOwner, dbName) + if err != nil { + log.Println(err) + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + + // Upload was successful, so we construct a fake commit ID then return a success message to the user + x = make(map[string]string) + x["commit_id"] = "" + x["url"] = fmt.Sprintf("/%s", dbOwner) } // Construct the response message @@ -753,22 +1313,95 @@ func uploadHandler(w http.ResponseWriter, r *http.Request) { // * "dbowner" is the owner of the database being queried // * "dbname" is the name of the database being queried func viewsHandler(w http.ResponseWriter, r *http.Request) { - // Do auth check, grab request info, open the database - sdb, httpStatus, err := collectInfoAndOpen(w, r) + // Do auth check, grab request info + loggedInUser, dbOwner, dbName, _, httpStatus, err := collectInfo(w, r) if err != nil { jsonErr(w, err.Error(), httpStatus) return } - defer sdb.Close() + dbFolder := "/" - // Retrieve the list of views - views, err := com.Views(sdb) + // Check if the database is a live database, and get the node/queue to send the request to + isLive, liveNode, err := com.CheckDBLive(dbOwner, dbFolder, dbName) if err != nil { jsonErr(w, err.Error(), http.StatusInternalServerError) return } + // If a live database has been uploaded but doesn't have a live node handling its requests, then create one + if isLive && liveNode == "" { + // Send a request to the AMQP backend to setup a node with the database + err = com.LiveCreateDB(com.AmqpChan, dbOwner, dbName) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } + + // If it's a standard database, process it locally. Else send the query to our AMQP backend + var views []string + if !isLive { + // Get Minio bucket and object id for the SQLite file + bucket, id, _, err := com.MinioLocation(dbOwner, dbFolder, dbName, "", loggedInUser) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + + // Sanity check + if id == "" { + // The requested database wasn't found, or the user doesn't have permission to access it + jsonErr(w, "Requested database not found", http.StatusNotFound) + return + } + + // Retrieve the database from Minio, then open it + var sdb *sqlite.Conn + sdb, err = com.OpenSQLiteDatabase(bucket, id) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + defer sdb.Close() + + // Retrieve the list of views + views, err = com.Views(sdb) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + } else { + // Send the columns request to our AMQP backend + var rawResponse []byte + rawResponse, err = com.MQSendRequest(com.AmqpChan, liveNode, "views", loggedInUser, dbOwner, dbName, "") + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + log.Println(err) + return + } + + // Decode the response + var resp com.LiveDBViewsResponse + err = json.Unmarshal(rawResponse, &resp) + if err != nil { + jsonErr(w, err.Error(), http.StatusInternalServerError) + log.Println(err) + return + } + if resp.Error != "" { + err = errors.New(resp.Error) + jsonErr(w, err.Error(), http.StatusInternalServerError) + return + } + if resp.Node == "" { + log.Printf("In API (Live) viewsHandler(). A node responded, but didn't identify itself.") + return + } + views = resp.Views + } + // Return the results + sort.Strings(views) jsonData, err := json.MarshalIndent(views, "", " ") if err != nil { log.Printf("Error when JSON marshalling returned data in viewsHandler(): %v\n", err) diff --git a/api/main.go b/api/main.go index ae22a7536..180cd02fb 100644 --- a/api/main.go +++ b/api/main.go @@ -1,5 +1,21 @@ package main +// TODO: API functions that still need updating for Live databases +// * diff - already updated to just return an error for live databases. needs testing though +// * download - Not done yet. Maybe use the SQLite backup API (https://www.sqlite.org/backup.html) or +// VACUUM INTO command to create a temp copy to send to the user? +// Maybe do that backup into Minio, then send from there? Probably need to experiment a bit. + +// FIXME: Update the documented Upload() function return values on the API doc page. Currently it talks about +// returning the commit ID for the upload. We'll probably return that field with a blank value for live +// databases though. TBD. + +// FIXME: We should probably add a "changelog.html" page to the API server, and record the differences between each +// version of the API we release + +// FIXME: After the API and webui pieces are done, figure out how the DB4S end +// point and dio should be updated to use live databases too + import ( "crypto/tls" "crypto/x509" @@ -64,6 +80,12 @@ func main() { log.Fatalf(err.Error()) } + // Connect to MQ server + com.AmqpChan, err = com.ConnectMQ("api server") + if err != nil { + log.Fatal(err) + } + // Connect to the Memcached server err = com.ConnectCache() if err != nil { diff --git a/api/templates/head.html b/api/templates/head.html deleted file mode 100644 index 0873c971b..000000000 --- a/api/templates/head.html +++ /dev/null @@ -1,7 +0,0 @@ -[[ define "head" ]] - - - DBHub.io API Documentation - - -[[ end ]] \ No newline at end of file diff --git a/api/templates/root.html b/api/templates/root.html index 5f92f8435..43adf7f91 100644 --- a/api/templates/root.html +++ b/api/templates/root.html @@ -1,7 +1,11 @@ [[ define "docs" ]] -[[ template "head" . ]] + + + DBHub.io API Documentation + +