Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache Mongo-DB calls (in memory only) #998

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ jobs:
image: mongo:4.2
ports:
- 27017:27017

strategy:
matrix:
node-version:
Expand All @@ -74,6 +75,7 @@ jobs:
image: mongo:4.2
ports:
- 27017:27017

steps:
- name: Git checkout
uses: actions/checkout@v2
Expand Down
48 changes: 48 additions & 0 deletions doc/caching-strategy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Configuration Data Caching Strategies

The IoTAgent Library options to enforce several different data caching strategies to increase throughput. The actual
data caching strategy required in each use case will differ, and will always be a compromise between speed and data
latency. Several options are discussed below.

## Memory Only

if `deviceRegistry.type = memory` within the config, a transient memory-based device registry will be used to register
all the devices. This registry will be emptied whenever the process is restarted. Since all data is lost on exit there
is no concept of disaster recovery. **Memory only** should only be used for testing and in scenarios where all
provisioning can be added by a script on start-up.

## MongoDB

if `deviceRegistry.type = mongodb` within the config. a MongoDB database will be used to store all the device
information, so it will be persistent from one execution to the other. This offers a disaster recovery mechanism so that
when any IoT Agent goes down, data is not lost. Furthermore since the data is no longer held locally and is always
received from the database this allows multiple IoT Agent instances to access the same data in common.

### MongoDB without Cache

This is the default operation mode with Mongo-DB. Whenever a measure is received, provisioning data is requested from
the database. This may become a bottleneck in high availability systems.

### MongoDB with in-memory Cache

if `memCache.enabled = true` within the config this provides a transient memory-based cache in front of the mongo-DB
instance. It effectively combines the advantages of fast in-memory access with the reliability of a Mongo-DB database.

```javascript
memCache = {
enabled: true,
deviceMax: 200,
deviceTTL: 60,
groupMax: 50,
groupTTL: 60,
};
```

The memCache data is not shared across instances and therefore should be reserved to short term data storage. Multiple
IoT Agents would potential hold inconsistent provisioning data until the cache has expired.

## Bypassing cache

In some cases consistent provisioning data is more vital than throughput. When creating or updating a provisioned device
or service group adding a `cache` attribute with the value `true` will ensure that the data can be cached, otherwise it
is never placed into a cache and therefore always consistently received from the Mongo-DB instance.
2 changes: 2 additions & 0 deletions doc/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ Module mocking during testing can be done with [proxyquire](https://github.com/t
To run tests, type

```bash
docker run --name mongodb --publish 27017:27017 --detach mongo:4.2

npm test
```

Expand Down
10 changes: 5 additions & 5 deletions doc/howto.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,11 +395,11 @@ function updateContextHandler(id, type, service, subservice, attributes, callbac
}
```

The updateContext handler deals with the modification requests that arrive at the North Port of the IoT Agent via `/v2/op/update`. It is
invoked once for each entity requested (note that a single request can contain multiple entity updates), with the same
parameters used in the queryContext handler. The only difference is the value of the attributes array, now containing a
list of attribute objects, each containing name, type and value. The handler must also make use of the callback to
return a list of updated attributes.
The updateContext handler deals with the modification requests that arrive at the North Port of the IoT Agent via
`/v2/op/update`. It is invoked once for each entity requested (note that a single request can contain multiple entity
updates), with the same parameters used in the queryContext handler. The only difference is the value of the attributes
array, now containing a list of attribute objects, each containing name, type and value. The handler must also make use
of the callback to return a list of updated attributes.

For this handler we have used a helper function called `createQueryFromAttributes()`, that transforms the NGSI
representation of the attributes to the UL type expected by the device:
Expand Down
17 changes: 16 additions & 1 deletion doc/installationguide.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ used for the same purpose. For instance:

```javascript
{
type: 'mongodb';
type: "mongodb";
}
```

Expand Down Expand Up @@ -204,6 +204,21 @@ used for the same purpose. For instance:
}
```

- **memCache**: Whether to use a memory cache in front of Mongo-DB when using the `mongodb` **deviceRegistry** option
to reduce I/O. This memory cache will hold and serve a set of recently requested groups and devices (up to a given
maximum time-to-live) and return the cached response so long as the value is still within `TTL`. When enabled the
default values are to hold up to 200 devices and 160 groups in memory and retain values for up to 60 seconds.

```javascript
{
enabled: true,
deviceMax: 200,
deviceTTL: 60,
groupMax: 50,
groupTTL: 60
}
```

- **iotManager**: configures all the information needed to register the IoT Agent in the IoTManager. If this section
is present, the IoTA will try to register to a IoTAM in the `host`, `port` and `path` indicated, with the
information configured in the object. The IoTAgent URL that will be reported will be the `providedUrl` (described
Expand Down
40 changes: 36 additions & 4 deletions lib/commonConfig.js
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ function processEnvironmentVariables() {
'IOTA_AUTH_TOKEN_PATH',
'IOTA_AUTH_PERMANENT_TOKEN',
'IOTA_REGISTRY_TYPE',
'IOTA_MEMCACHE_ENABLED',
'IOTA_MEMCACHE_DEVICE_MAX',
'IOTA_MEMCACHE_DEVICE_TTL',
'IOTA_MEMCACHE_GROUP_MAX',
'IOTA_MEMCACHE_GROUP_TTL',
'IOTA_LOG_LEVEL',
'IOTA_TIMESTAMP',
'IOTA_IOTAM_HOST',
Expand Down Expand Up @@ -243,9 +248,9 @@ function processEnvironmentVariables() {
config.contextBroker.jsonLdContext = process.env.IOTA_JSON_LD_CONTEXT.split(',').map((ctx) => ctx.trim());
}

if (Array.isArray(config.contextBroker.jsonLdContext) && config.contextBroker.jsonLdContext.length === 1){
if (Array.isArray(config.contextBroker.jsonLdContext) && config.contextBroker.jsonLdContext.length === 1) {
config.contextBroker.jsonLdContext = config.contextBroker.jsonLdContext[0];
}
}

config.contextBroker.fallbackTenant =
process.env.IOTA_FALLBACK_TENANT || config.contextBroker.service || 'iotagent';
Expand Down Expand Up @@ -323,6 +328,32 @@ function processEnvironmentVariables() {
config.deviceRegistry.type = process.env.IOTA_REGISTRY_TYPE;
}

// In Memory Caching policy
config.memCache = config.memCache || {};
if (process.env.IOTA_MEMCACHE_ENABLED) {
config.memCache.enabled = process.env.IOTA_MEMCACHE_ENABLED === 'true';
}
const memCache = config.memCache;
if (memCache.enabled) {
memCache.deviceMax = memCache.deviceMax || 200;
memCache.deviceTTL = memCache.deviceTTL || 60;
memCache.groupMax = memCache.groupMax || 50;
memCache.groupTTL = memCache.groupTTL || 60;

if (process.env.IOTA_MEMCACHE_DEVICE_MAX) {
memCache.deviceMax = process.env.IOTA_MEMCACHE_DEVICE_MAX;
}
if (process.env.IOTA_MEMCACHE_DEVICE_TTL) {
memCache.deviceTTL = process.env.IOTA_MEMCACHE_DEVICE_TTL;
}
if (process.env.IOTA_MEMCACHE_GROUP_MAX) {
memCache.groupMax = process.env.IOTA_MEMCACHE_GROUP_MAX;
}
if (process.env.IOTA_MEMCACHE_GROUP_TTL) {
memCache.groupTTL = process.env.IOTA_MEMCACHE_GROUP_TTL;
}
}

// Log Level configuration
if (process.env.IOTA_LOG_LEVEL) {
config.logLevel = process.env.IOTA_LOG_LEVEL;
Expand Down Expand Up @@ -470,7 +501,9 @@ function processEnvironmentVariables() {
if (process.env.IOTA_DEFAULT_ENTITY_NAME_CONJUNCTION) {
config.defaultEntityNameConjunction = process.env.IOTA_DEFAULT_ENTITY_NAME_CONJUNCTION;
} else {
config.defaultEntityNameConjunction = config.defaultEntityNameConjunction ? config.defaultEntityNameConjunction : ':';
config.defaultEntityNameConjunction = config.defaultEntityNameConjunction
? config.defaultEntityNameConjunction
: ':';
}
}

Expand Down Expand Up @@ -525,7 +558,6 @@ function ngsiVersion() {
return 'unknown';
}


/**
* It checks if a combination of typeInformation or common Config is LD
*
Expand Down
3 changes: 3 additions & 0 deletions lib/fiware-iotagent-lib.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ function doActivate(newConfig, callback) {
registry = require('./services/devices/deviceRegistryMongoDB');
groupRegistry = require('./services/groups/groupRegistryMongoDB');
commandRegistry = require('./services/commands/commandRegistryMongoDB');

registry.initialiseCaches(newConfig);
groupRegistry.initialiseCaches(newConfig);
} else {
logger.info(context, 'Falling back to Transient Memory registry for NGSI Library');

Expand Down
3 changes: 2 additions & 1 deletion lib/model/Device.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ const Device = new Schema({
autoprovision: Boolean,
expressionLanguage: String,
explicitAttrs: Group.ExplicitAttrsType,
ngsiVersion: String
ngsiVersion: String,
cache: Boolean
});

function load(db) {
Expand Down
3 changes: 2 additions & 1 deletion lib/model/Group.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ const Group = new Schema({
explicitAttrs: ExplicitAttrsType,
defaultEntityNameConjunction: String,
ngsiVersion: String,
entityNameExp: String
entityNameExp: String,
cache: Boolean
});

function load(db) {
Expand Down
3 changes: 2 additions & 1 deletion lib/services/common/iotManagerService.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ function register(callback) {
expressionLanguage: service.expressionLanguage,
defaultEntityNameConjunction: service.defaultEntityNameConjunction,
ngsiVersion: service.ngsiVersion,
entityNameExp: service.entityNameExp
entityNameExp: service.entityNameExp,
cache: service.cache
};
}

Expand Down
64 changes: 62 additions & 2 deletions lib/services/devices/deviceRegistryMongoDB.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ const errors = require('../../errors');
const constants = require('../../constants');
const Device = require('../../model/Device');
const async = require('async');
const cacheManager = require('cache-manager');

let memoryCache;
let cache;

let context = {
op: 'IoTAgentNGSI.MongoDBDeviceRegister'
};
Expand All @@ -56,8 +61,46 @@ const attributeList = [
'timestamp',
'explicitAttrs',
'expressionLanguage',
'ngsiVersion'
'ngsiVersion',
'cache'
];
/**
* Sets up the in-memory cache for devices, should one be required.
*/
function initialiseCaches(config) {
function isCacheableValue(value) {
if (value !== null && value !== false && value !== undefined) {
return value.cache;
}
return false;
}

function initialiseMemCache(memCache, isCacheableValue) {
return cacheManager.caching({
store: 'memory',
max: memCache.deviceMax,
ttl: memCache.deviceTTL,
isCacheableValue
});
}

memoryCache = config.memCache.enabled ? initialiseMemCache(config.memCache, isCacheableValue) : undefined;

if (memoryCache) {
cache = memoryCache;
} else {
cache = undefined;
}
}

/**
* Empties the memory cache
*/
function clearCache() {
if (cache) {
cache.reset();
}
}

/**
* Generates a handler for the save device operations. The handler will take the customary error and the saved device
Expand Down Expand Up @@ -136,6 +179,7 @@ function removeDevice(id, service, subservice, callback) {
callback(new errors.InternalDbError(error));
} else {
logger.debug(context, 'Device [%s] successfully removed.', id);
clearCache();

callback(null);
}
Expand Down Expand Up @@ -223,7 +267,20 @@ function getDeviceById(id, service, subservice, callback) {
};
context = fillService(context, queryParams);
logger.debug(context, 'Looking for device with id [%s].', id);
findOneInMongoDB(queryParams, id, callback);

if (cache) {
cache.wrap(
JSON.stringify(queryParams),
(cacheCallback) => {
findOneInMongoDB(queryParams, id, cacheCallback);
},
(error, data) => {
callback(error, data);
}
);
} else {
findOneInMongoDB(queryParams, id, callback);
}
}

/**
Expand Down Expand Up @@ -289,6 +346,7 @@ function update(device, callback) {
if (error) {
callback(error);
} else {
clearCache();
data.lazy = device.lazy;
data.active = device.active;
data.internalId = device.internalId;
Expand All @@ -303,6 +361,7 @@ function update(device, callback) {
data.registrationId = device.registrationId;
data.explicitAttrs = device.explicitAttrs;
data.ngsiVersion = device.ngsiVersion;
data.cache = device.cache;

/* eslint-disable-next-line new-cap */
const deviceObj = new Device.model(data);
Expand Down Expand Up @@ -370,3 +429,4 @@ exports.getSilently = getDevice;
exports.getByName = alarmsInt(constants.MONGO_ALARM, getByName);
exports.getByNameAndType = alarmsInt(constants.MONGO_ALARM, getByNameAndType);
exports.clear = alarmsInt(constants.MONGO_ALARM, clear);
exports.initialiseCaches = initialiseCaches;
Loading