Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create SlicingDice sink #1542

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
[cygnus-ngsi][KafkaSink] Using global connection to zookeeper instead of creating one each time an event arrives
[cygnus-ngsi][NGSINameMappingsInterceptor] Now namemapping checks sevice, subervice and (type of entity and id entity) of EntityMapping (#1535)
[cygnus-ngsi][NGSIEvent] Unable to deliver event: null pointer getAttributeForNaming (#1506)
[cygnus-ngsi][SlicingDiceSink] Added SlicingDice sink to cygnus
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue number should be cited at the end of the line (check other lines in the same file, please).

Small typo: cygnus -> Cygnus ;)

Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright 2018 Telefonica Investigación y Desarrollo, S.A.U
*
* This file is part of fiware-cygnus (FIWARE project).
*
* fiware-cygnus is free software: you can redistribute it and/or modify it under the terms of the GNU Affero
* General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
* fiware-cygnus is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
* implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
* for more details.
*
* You should have received a copy of the GNU Affero General Public License along with fiware-cygnus. If not, see
* http://www.gnu.org/licenses/.
*
* For those usages not covered by the GNU Affero General Public License please contact with iot_support at tid dot es
*
* Authorship: SlicingDice
*
*/

package com.telefonica.iot.cygnus.backends.slicingdice;

import com.telefonica.iot.cygnus.errors.CygnusBadContextData;
import com.telefonica.iot.cygnus.errors.CygnusPersistenceError;
import com.telefonica.iot.cygnus.errors.CygnusRuntimeError;

public interface SlicingDiceBackend {

/**
* Creates the necessary columns on SlicingDice dimension.
* @param fieldNames
* @throws CygnusRuntimeError
* @throws CygnusPersistenceError
*/
void createColumns(String fieldNames) throws CygnusRuntimeError, CygnusPersistenceError;

/**
* Insert already processed context data into the given dimension.
* @param valuesForInsert
* @throws com.telefonica.iot.cygnus.errors.CygnusBadContextData
* @throws com.telefonica.iot.cygnus.errors.CygnusRuntimeError
* @throws com.telefonica.iot.cygnus.errors.CygnusPersistenceError
*/
void insertContextData(String valuesForInsert)
throws CygnusBadContextData, CygnusRuntimeError, CygnusPersistenceError;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/**
* Copyright 2018 Telefonica Investigación y Desarrollo, S.A.U
*
* This file is part of fiware-cygnus (FIWARE project).
*
* fiware-cygnus is free software: you can redistribute it and/or modify it under the terms of the GNU Affero
* General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
* fiware-cygnus is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
* implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
* for more details.
*
* You should have received a copy of the GNU Affero General Public License along with fiware-cygnus. If not, see
* http://www.gnu.org/licenses/.
*
* For those usages not covered by the GNU Affero General Public License please contact with iot_support at tid dot es
*
* Authorship: SlicingDice
*
*/

package com.telefonica.iot.cygnus.backends.slicingdice;

import com.telefonica.iot.cygnus.backends.http.HttpBackend;
import com.telefonica.iot.cygnus.backends.http.JsonResponse;
import com.telefonica.iot.cygnus.errors.CygnusBadContextData;
import com.telefonica.iot.cygnus.errors.CygnusPersistenceError;
import com.telefonica.iot.cygnus.errors.CygnusRuntimeError;
import com.telefonica.iot.cygnus.log.CygnusLogger;
import java.util.ArrayList;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.entity.StringEntity;
import org.apache.http.message.BasicHeader;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

public class SlicingDiceBackendImpl extends HttpBackend implements SlicingDiceBackend {

private static final CygnusLogger LOGGER = new CygnusLogger(SlicingDiceBackendImpl.class);

// this is the SlicingDice host, the user will not be able to change it
private static final String SLICING_DICE_HOST = "api.slicingdice.com/v1";

// this is the SlicingDice port, the user will not be able to change it
private static final String SLICING_DICE_PORT = "443";

private static final boolean IS_SSL = true;

// max connections used by SlicingDice
private static final int MAX_CONNECTIONS = 50;

// database key used to access SlicingDice API
private final String databaseKey;

/**
* Constructor for the SlicingDice backend.
*
* @param databaseKey - the api key used to connect to the SlicingDice account
*/
public SlicingDiceBackendImpl(final String databaseKey) {
super(SLICING_DICE_HOST, SLICING_DICE_PORT, IS_SSL, false, null, null, null, null, MAX_CONNECTIONS, MAX_CONNECTIONS);

this.databaseKey = databaseKey;
}

@Override
public void createColumns(final String columnsToCreate) throws CygnusRuntimeError, CygnusPersistenceError {
final String urlPath = "/column/";

// do the SlicingDice request
final JsonResponse res = doSlicingDiceRequest("POST", urlPath, columnsToCreate);

// check the status
if (res.getStatusCode() == 200) {
LOGGER.debug("Successful column creation");
} else if (res.getStatusCode() == 400) {
final JSONArray errors = (JSONArray) res.getJsonObject().get("errors");
final JSONObject error = (JSONObject) errors.get(0);
final Long code = (Long) error.get("code");

if (code == 3003) {
LOGGER.debug("Column already exists");
} else {
throw new CygnusPersistenceError("Could not create the columns, " +
"statusCode=" + res.getStatusCode() + ")");
}
} else {
throw new CygnusPersistenceError("Could not create the columns, " +
"statusCode=" + res.getStatusCode() + ")");
} // if else
} // createColumns

@Override
public void insertContextData(final String valuesForInsert) throws CygnusBadContextData, CygnusRuntimeError, CygnusPersistenceError {
final String urlPath = "/insert/";

// do the SlicingDice request
final JsonResponse res = doSlicingDiceRequest("POST", urlPath, valuesForInsert);

// check the status
if (res.getStatusCode() == 200) {
LOGGER.debug("Successful inserted data on SlicingDice");
} else {
throw new CygnusPersistenceError("Could not create the columns, " +
"statusCode=" + res.getStatusCode() + ")");
} // if else
} // insertContextData

JsonResponse doSlicingDiceRequest(final String method, final String urlPath,
final String jsonString)
throws CygnusPersistenceError, CygnusRuntimeError {
ArrayList<Header> headers = new ArrayList<>();
headers.add(new BasicHeader("Authorization", databaseKey));
headers.add(new BasicHeader("Content-Type", "application/json"));
return doRequest(method, urlPath, true, headers, new StringEntity(jsonString, "UTF-8"));
} // doSlicingDiceRequest

@Override
protected JsonResponse createJsonResponse(final HttpResponse httpRes) throws CygnusRuntimeError {
return super.createJsonResponse(httpRes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/**
* Copyright 2018 Telefonica Investigación y Desarrollo, S.A.U
*
* This file is part of fiware-cygnus (FIWARE project).
*
* fiware-cygnus is free software: you can redistribute it and/or modify it under the terms of the GNU Affero
* General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
* fiware-cygnus is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
* implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License
* for more details.
*
* You should have received a copy of the GNU Affero General Public License along with fiware-cygnus. If not, see
* http://www.gnu.org/licenses/.
*
* For those usages not covered by the GNU Affero General Public License please contact with iot_support at tid dot es
*
* Authorship: SlicingDice
*
*/

package com.telefonica.iot.cygnus.backends.slicingdice;

import com.telefonica.iot.cygnus.backends.http.JsonResponse;
import com.telefonica.iot.cygnus.errors.CygnusPersistenceError;
import com.telefonica.iot.cygnus.errors.CygnusRuntimeError;
import java.io.IOException;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseFactory;
import org.apache.http.HttpVersion;
import org.apache.http.ProtocolVersion;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.DefaultHttpResponseFactory;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.message.BasicStatusLine;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class SlicingDiceBackendImplTest {

// constants
private static final String DATABASE_KEY = "oiasdiondasidasndasomn";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fake key? (just to check)

private static final String COLUMNS_TO_CREATE = "[" +
" {" +
" \"name\": \"temperature\"," +
" \"api-name\": \"temperature\"," +
" \"type\": \"decimal-event\"," +
" \"decimal-places\": 5" +
" }" +
"]";

private static final String VALUES_TO_INSERT = "{" +
" \"ROOM1\": {" +
" \"temperature\": {" +
" \"value\": 15.5, " +
" \"date\": \"2018-10-21T00:00:00,000Z\"" +
" }" +
" }," +
" \"auto-create\": [\"dimension\", \"column\"]" +
"}";

// instance to be tested
private SlicingDiceBackendImpl backend;

// mocks
@Mock
private HttpClient mockHttpClient;

@Before
public void setUp() throws Exception {
backend = new SlicingDiceBackendImpl(DATABASE_KEY);
final BasicHttpResponse response = new BasicHttpResponse(new ProtocolVersion("http", 1, 1), 200, "ok");
response.setEntity(new StringEntity("{\"result\": {\"whatever\":\"whatever\"}}"));
when(mockHttpClient.execute(Mockito.any(HttpUriRequest.class))).thenReturn(response);
}

@Test
public void testCreateColumns() {
System.out.println("Testing SlicingDiceBackendImpl.createColumns");

try {
backend.setHttpClient(mockHttpClient);
backend.createColumns(COLUMNS_TO_CREATE);
} catch (final Exception e) {
Assert.fail(e.getMessage());
} finally {
Assert.assertTrue(true);
}
}

@Test
public void testColumnAlreadyExist() throws IOException, CygnusRuntimeError, CygnusPersistenceError {
System.out.println("Testing SlicingDiceBackendImpl.createColumns");

HttpResponseFactory factory = new DefaultHttpResponseFactory();
HttpResponse response = factory
.newHttpResponse(new BasicStatusLine(HttpVersion.HTTP_1_1, 400, null), null);
response.setHeader("Content-Type", "application/json");
String responseStr = "{\"errors\": [{\"code\": 3003, \"message\": \"Column: Column already exists.\"}]}";
response.setEntity(new StringEntity(responseStr));
final JsonResponse jsonRes = backend.createJsonResponse(response);

final SlicingDiceBackendImpl mockedBackend = mock(SlicingDiceBackendImpl.class);
when(mockedBackend.doSlicingDiceRequest(anyString(), anyString(), anyString())).thenReturn(
jsonRes);
doCallRealMethod().when(mockedBackend).createColumns(anyString());

try {
mockedBackend.setHttpClient(mockHttpClient);
mockedBackend.createColumns(COLUMNS_TO_CREATE);
} catch (final Exception e) {
Assert.fail(e.getMessage());
} finally {
Assert.assertTrue(true);
}
}

@Test
public void testColumnError400() throws IOException, CygnusRuntimeError, CygnusPersistenceError {
System.out.println("Testing SlicingDiceBackendImpl.createColumns");

HttpResponseFactory factory = new DefaultHttpResponseFactory();
HttpResponse response = factory
.newHttpResponse(new BasicStatusLine(HttpVersion.HTTP_1_1, 400, null), null);
response.setHeader("Content-Type", "application/json");
String responseStr = "{\"errors\": [{\"code\": 4026, \"message\": \"Query: Invalid query format. Must be a list.\"}]}";
response.setEntity(new StringEntity(responseStr));
final JsonResponse jsonRes = backend.createJsonResponse(response);

final SlicingDiceBackendImpl mockedBackend = mock(SlicingDiceBackendImpl.class);
when(mockedBackend.doSlicingDiceRequest(anyString(), anyString(), anyString())).thenReturn(
jsonRes);
doCallRealMethod().when(mockedBackend).createColumns(anyString());

try {
mockedBackend.setHttpClient(mockHttpClient);
mockedBackend.createColumns(COLUMNS_TO_CREATE);
Assert.fail();
} catch (final Exception e) {
// this response should call an exception because isn't a normal behavior
Assert.assertTrue(true);
}
}

@Test
public void testInsertContextData() {
System.out.println("Testing SlicingDiceBackendImpl.insertContextData");

try {
backend.setHttpClient(mockHttpClient);
backend.insertContextData(VALUES_TO_INSERT);
} catch (final Exception e) {
Assert.fail(e.getMessage());
} finally {
Assert.assertTrue(true);
}
}
}
Loading