Skip to content

Updated Responses to adhere to YCSB API changes and added Exception reporting #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 33 additions & 34 deletions src/main/java/couchdb/CouchdbClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Set;
import java.util.Vector;

import com.yahoo.ycsb.*;
import org.ektorp.CouchDbConnector;
import org.ektorp.DocumentNotFoundException;
import org.ektorp.UpdateConflictException;
Expand All @@ -18,11 +19,6 @@
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.StringByteIterator;

import couchdb.StringToStringMap;

/*
Expand Down Expand Up @@ -51,12 +47,7 @@ public class CouchdbClient extends DB{
private static final String PROTOCOL = "http";
// Database connector
private CouchDbConnector dbConnector;
// Return codes
private static final int OK = 0;
private static final int UPDATE_CONFLICT = -2;
private static final int DOC_NOT_FOUND = -3;
private static final int JSON_PARSING_FAULT = -4;


public CouchdbClient(){
this.dbConnector = null;
}
Expand All @@ -65,7 +56,7 @@ public CouchdbClient(){
public CouchdbClient(List<URL> urls){
if(urls == null)
throw new IllegalArgumentException("urls is null");
this.dbConnector = new LoadBalancedConnector(urls, DEFAULT_DATABASE_NAME);
this.dbConnector = new LoadBalancedConnector(urls, DEFAULT_DATABASE_NAME, null, null);
}

private List<URL> getUrlsForHosts() throws DBException{
Expand Down Expand Up @@ -99,7 +90,15 @@ private URL getUrlForHost(String host) throws DBException{
@Override
public void init() throws DBException{
List<URL> urls = getUrlsForHosts();
this.dbConnector = new LoadBalancedConnector(urls, DEFAULT_DATABASE_NAME);
String username = null;
String password = null;
if (getProperties().getProperty("username") != null && getProperties().getProperty("username").length() > 0) {
username = getProperties().getProperty("username");
}
if (getProperties().getProperty("password") != null && getProperties().getProperty("password").length() > 0 ) {
password = getProperties().getProperty("password");
}
this.dbConnector = new LoadBalancedConnector(urls, DEFAULT_DATABASE_NAME, username, password);
}

@Override
Expand All @@ -115,32 +114,32 @@ private StringToStringMap executeReadOperation(String key){
}
}

private int executeWriteOperation(String key, StringToStringMap dataToWrite){
private Status executeWriteOperation(String key, StringToStringMap dataToWrite){
try{
dataToWrite.put("_id", key);
this.dbConnector.create(dataToWrite);
} catch(UpdateConflictException exc){
return UPDATE_CONFLICT;
return new Status("Conflict", "Conflict while updating record");
}
return OK;
return Status.OK;
}

private int executeDeleteOperation(StringToStringMap dataToDelete){
private Status executeDeleteOperation(StringToStringMap dataToDelete){
try{
this.dbConnector.delete(dataToDelete);
} catch(UpdateConflictException exc){
return UPDATE_CONFLICT;
return new Status("Conflict", "Conflict while updating record");
}
return OK;
return Status.OK;
}

private int executeUpdateOperation(StringToStringMap dataToUpdate){
private Status executeUpdateOperation(StringToStringMap dataToUpdate){
try{
this.dbConnector.update(dataToUpdate);
} catch(UpdateConflictException exc){
return UPDATE_CONFLICT;
return new Status("Conflict", "Conflict while updating record");
}
return OK;
return Status.OK;
}

private void copyRequestedFieldsToResultMap(Set<String> fields,
Expand All @@ -166,27 +165,27 @@ private void copyAllFieldsToResultMap(StringToStringMap inputMap,

// Table variable is not used => already contained in database connector
@Override
public int read(String table, String key, Set<String> fields,
public Status read(String table, String key, Set<String> fields,
HashMap<String, ByteIterator> result) {
StringToStringMap queryResult = this.executeReadOperation(key);
if(queryResult == null)
return DOC_NOT_FOUND;
return Status.NOT_FOUND;
if(fields == null){
this.copyAllFieldsToResultMap(queryResult, result);
}else{
this.copyRequestedFieldsToResultMap(fields, queryResult, result);
}
return OK;
return Status.OK;
}

@Override
public int scan(String table, String startkey, int recordcount,
public Status scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, ByteIterator>> result) {
ViewResult viewResult = this.executeView(startkey, recordcount);
for(Row row: viewResult.getRows()){
JSONObject jsonObj = this.parseAsJsonObject(row.getDoc());
if(jsonObj == null)
return JSON_PARSING_FAULT;
return new Status("Error", "Json parsing failed");
if(fields == null){
@SuppressWarnings("unchecked")
Set<String> requestedFields = jsonObj.keySet();
Expand All @@ -195,7 +194,7 @@ public int scan(String table, String startkey, int recordcount,
result.add(this.getFieldsFromJsonObj(fields, jsonObj));
}
}
return OK;
return Status.OK;
}

private ViewResult executeView(String startKey, int amountOfRecords){
Expand Down Expand Up @@ -228,11 +227,11 @@ private HashMap<String, ByteIterator> getFieldsFromJsonObj(Set<String> fields, J

// Table variable is not used => already contained in database connector
@Override
public int update(String table, String key,
HashMap<String, ByteIterator> values) {
public Status update(String table, String key,
HashMap<String, ByteIterator> values) {
StringToStringMap queryResult = this.executeReadOperation(key);
if(queryResult == null)
return DOC_NOT_FOUND;
return Status.NOT_FOUND;
StringToStringMap updatedMap = this.updateFields(queryResult, values);
return this.executeUpdateOperation(updatedMap);
}
Expand All @@ -248,18 +247,18 @@ private StringToStringMap updateFields(StringToStringMap toUpdate,

// Table variable is not used => already contained in database connector
@Override
public int insert(String table, String key,
public Status insert(String table, String key,
HashMap<String, ByteIterator> values) {
StringToStringMap dataToInsert = new StringToStringMap(values);
return this.executeWriteOperation(key, dataToInsert);
}

// Table variable is not used => already contained in database connector
@Override
public int delete(String table, String key) {
public Status delete(String table, String key) {
StringToStringMap toDelete = this.executeReadOperation(key);
if(toDelete == null)
return DOC_NOT_FOUND;
return Status.NOT_FOUND;
return this.executeDeleteOperation(toDelete);
}

Expand Down
54 changes: 40 additions & 14 deletions src/main/java/couchdb/LoadBalancedConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,28 @@ public class LoadBalancedConnector implements CouchDbConnector{
private final List<CouchDbConnector> connectors;
private int nextConnector;

public LoadBalancedConnector(List<URL> urlsOfNodesInCluster, String databaseName){
public LoadBalancedConnector(List<URL> urlsOfNodesInCluster, String databaseName, String username, String password){
if(urlsOfNodesInCluster == null)
throw new IllegalArgumentException("urlsOfNodesInClusterIsNull");
if(urlsOfNodesInCluster.isEmpty())
throw new IllegalArgumentException("At least one node required");
this.connectors = this.createConnectors(urlsOfNodesInCluster, databaseName);
this.connectors = this.createConnectors(urlsOfNodesInCluster, databaseName, username, password);
this.nextConnector = 0;
}

private List<CouchDbConnector> createConnectors(List<URL> urlsForConnectors, String databaseName){
private List<CouchDbConnector> createConnectors(List<URL> urlsForConnectors, String databaseName, String username, String password){
List<CouchDbConnector> result = new ArrayList<CouchDbConnector>();
for(URL url : urlsForConnectors){
HttpClient httpClient = new StdHttpClient.Builder().url(url).build();
CouchDbInstance dbInstance = new StdCouchDbInstance(httpClient);
StdHttpClient.Builder httpClient = new StdHttpClient.Builder()
.url(url);
if (username != null) {
httpClient.username(username);
}
if (password != null) {
httpClient.password(password);
}
httpClient.socketTimeout(100000);
CouchDbInstance dbInstance = new StdCouchDbInstance(httpClient.build());
// 2nd paramter true => Create database if not exists
CouchDbConnector dbConnector = dbInstance.createConnector(databaseName, true);
result.add(dbConnector);
Expand Down Expand Up @@ -107,7 +115,9 @@ public void create(String id, Object o) {
failed = false;
} catch(UpdateConflictException exc){
throw exc;
} catch(Exception exc){}
} catch(Exception exc){
exc.printStackTrace();
}
}
if(failed)
throw new NoNodeReacheableException();
Expand All @@ -122,7 +132,9 @@ public void create(Object o) {
failed = false;
} catch(UpdateConflictException exc){
throw exc;
} catch(Exception exc){}
} catch(Exception exc){
exc.printStackTrace();
}
}
if(failed)
throw new NoNodeReacheableException();
Expand All @@ -137,7 +149,9 @@ public void update(Object o) {
failed = false;
} catch(UpdateConflictException exc){
throw exc;
} catch(Exception exc){}
} catch(Exception exc){
exc.printStackTrace();
}
}
if(failed)
throw new NoNodeReacheableException();
Expand All @@ -150,7 +164,9 @@ public String delete(Object o) {
return this.getConnectorForMutationOperations().delete(o);
} catch(UpdateConflictException exc){
throw exc;
} catch(Exception exc){}
} catch(Exception exc){
exc.printStackTrace();
}
}
throw new NoNodeReacheableException();
}
Expand All @@ -162,7 +178,9 @@ public String delete(String id, String revision) {
return this.getConnectorForMutationOperations().delete(id, revision);
} catch(UpdateConflictException exc){
throw exc;
} catch(Exception exc){}
} catch(Exception exc){
exc.printStackTrace();
}
}
throw new NoNodeReacheableException();
}
Expand Down Expand Up @@ -190,7 +208,9 @@ public <T> T get(Class<T> c, String id) {
return this.getConnector().get(c, id);
} catch(DocumentNotFoundException exc){
throw exc;
} catch(Exception exc){}
} catch(Exception exc){
exc.printStackTrace();
}
}
throw new NoNodeReacheableException();
}
Expand All @@ -202,7 +222,9 @@ public <T> T get(Class<T> c, String id, Options options) {
return this.getConnector().get(c, id, options);
} catch(DocumentNotFoundException exc){
throw exc;
} catch(Exception exc){}
} catch(Exception exc){
exc.printStackTrace();
}
}
throw new NoNodeReacheableException();
}
Expand Down Expand Up @@ -309,7 +331,9 @@ public ViewResult queryView(ViewQuery query) {
for(int i=0; i<this.connectors.size(); i++){
try{
return this.getConnector().queryView(query);
} catch(Exception exc){}
} catch(Exception exc){
exc.printStackTrace();
}
}
throw new NoNodeReacheableException();
}
Expand Down Expand Up @@ -496,7 +520,9 @@ public void update(String id, InputStream document, long length,
failed = false;
} catch(UpdateConflictException exc){
throw exc;
} catch(Exception exc){}
} catch(Exception exc){
exc.printStackTrace();
}
}
if(failed)
throw new NoNodeReacheableException();
Expand Down