Skip to content

Commit

Permalink
Updates and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Sep 14, 2024
1 parent 1aa9e10 commit e6e738b
Show file tree
Hide file tree
Showing 15 changed files with 1,343 additions and 1,712 deletions.
36 changes: 9 additions & 27 deletions include/smax-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
#ifndef SMAX_PRIVATE_H_
#define SMAX_PRIVATE_H_

#define __XCHANGE_INTERNAL_API__ ///< User internal definitions

#include "smax.h"
#include "redisx.h"

#define RELEASEID "<release>" ///< Redis PUB/SUB channel prefix for wait release notifications.

/// \cond PROTECTED


typedef struct PullRequest {
char *group;
char *key;
Expand All @@ -31,31 +31,13 @@ typedef struct PullRequest {
struct PullRequest *next;
} PullRequest;

/**
* An single entry (array of doubles) in a SMA-X buffer,
*/
typedef struct Entry {
double t;
double *values;
} Entry;

/**
* A buffered sequence of SMA-X numerical data.
*
*/
typedef struct Buffer {
pthread_mutex_t mutex;
int id;
char *channel;
char *table;
char *key;
int count;
int size;
int firstIndex;
int n;
Entry *entries;
struct Buffer *next;
} Buffer;
int smaxLockConfig();
int smaxUnlockConfig();
int smaxLockNotify();
int smaxUnlockNotify();

long smaxGetHash(const char *buf, const int size, const XType type);

int smaxRead(PullRequest *req, int channel);
int smaxWrite(const char *group, const XField *f);
Expand All @@ -65,7 +47,7 @@ void smaxProcessPipedWritesAsync(RESP *reply);
unsigned char smaxGetHashLookupIndex(const char *group, int lGroup, const char *key, int lKey);
char *smaxGetUpdateChannelPattern(const char *table, const char *key);
int smaxStorePush(const char *table, const XField *field);
void smaxTransmitErrorHandler(Redis *r, int channel, const char *op);
void smaxTransmitErrorHandler(Redis *r, enum redisx_channel channel, const char *op);
int smaxScriptError(const char *name, int status);
int smaxScriptErrorAsync(const char *name, int status);
boolean smaxIsDisabled();
Expand Down
70 changes: 39 additions & 31 deletions include/smax.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,29 @@
#include "redisx.h"
#include "xchange.h"

#ifndef SMAX_DEFAULT_HOSTNAME
# define SMAX_DEFAULT_HOSTNAME "smax" ///< Host name of Redis server used for SMA-X.
#endif

#define SMAX_DEFAULT_HOSTNAME "smax" ///< Host name of Redis server used for SMA-X.
#ifndef SMAX_DEFAULT_PIPELINE_ENABLED
# define SMAX_DEFAULT_PIPELINE_ENABLED TRUE ///< Whether pipelining is enabled by default.
#endif

#define SMAX_DEFAULT_PIPELINE_ENABLED TRUE ///< Whether pipelining is enabled by default.
#ifndef SMAX_RESTORE_QUEUE_ON_RECONNECT
# define SMAX_RESTORE_QUEUE_ON_RECONNECT TRUE ///< Whether read queues are restored if SMA-X is disconnected/reconnected.
#endif

#define SMAX_RESTORE_QUEUE_ON_RECONNECT TRUE ///< Whether read queues are restored if SMA-X is disconnected/reconnected.
#define SMAX_DEFAULT_MAX_QUEUED 1024 ///< Maximum number of pull requests allowed to be queued at once.
#define SMAX_PIPE_READ_TIMEOUT_MILLIS 3000 ///< (ms) Timeout for pipelined (queued) pull requests
#ifndef SMAX_DEFAULT_MAX_QUEUED
# define SMAX_DEFAULT_MAX_QUEUED 1024 ///< Maximum number of pull requests allowed to be queued at once.
#endif

#define SMAX_RECONNECT_RETRY_SECONDS 3 ///< (s) Time between reconnection attempts on lost SMA-X connections.
#ifndef SMAX_PIPE_READ_TIMEOUT_MILLIS
# define SMAX_PIPE_READ_TIMEOUT_MILLIS 3000 ///< (ms) Timeout for pipelined (queued) pull requests
#endif

#ifndef SMAX_RECONNECT_RETRY_SECONDS
# define SMAX_RECONNECT_RETRY_SECONDS 3 ///< (s) Time between reconnection attempts on lost SMA-X connections.
#endif


/**
Expand Down Expand Up @@ -152,28 +164,23 @@ void smaxResetMeta(XMeta *m);
int smaxGetMetaCount(const XMeta *m);
void smaxSetOrigin(XMeta *m, const char *origin);

// Configuration locks ---------------------------------------->
/// \cond PROTECTED
int smaxLockConfig();
int smaxUnlockConfig();
int smaxLockNotify();
int smaxUnlockNotify();
/// \endcond


// Globally available functions provided by SMA-X ------------->
int smaxSetServer(const char *host, int port);
int smaxSetAuth(const char *username, const char *password);
int smaxSetDB(int idx);
int smaxSetTcpBuf(int size);

int smaxConnect();
int smaxConnectTo(const char *server);
int smaxDisconnect();
int smaxIsConnected();
int smaxReconnect();

// Connect/disconnect callback hooks -------------------->
void smaxAddConnectHook(const void (*setupCall)(void));
void smaxRemoveConnectHook(const void (*setupCall)(void));
void smaxAddDisconnectHook(const void (*cleanupCall)(void));
void smaxRemoveDisconnectHook(const void (*cleanupCall)(void));

int smaxAddConnectHook(void (*setupCall)(void));
int smaxRemoveConnectHook(void (*setupCall)(void));
int smaxAddDisconnectHook(void (*cleanupCall)(void));
int smaxRemoveDisconnectHook(void (*cleanupCall)(void));

// Basic information exchage routines -------------------->
int smaxPull(const char *table, const char *key, XType type, int count, void *value, XMeta *meta);
Expand All @@ -194,19 +201,18 @@ double *smaxPullDoubles(const char *table, const char *key, XMeta *meta, int *n)
char **smaxPullStrings(const char *table, const char *key, XMeta *meta, int *n);
XStructure *smaxPullStruct(const char *name, XMeta *meta, int *status);

// Convenience method for structure fields --------------->

// Convenience methods for serialized strucures ---------->
boolean smaxGetBooleanField(const XStructure *s, const char *name, boolean defaultValue);
long long smaxGetLongField(const XStructure *s, const char *name, long long defaultValue);
double smaxGetDoubleField(const XStructure *s, const char *name, double defaultValue);
char *smaxGetRawField(const XStructure *s, const char *name, char *defaultValue);
int smaxGetArrayField(const XStructure *s, const char *name, void *dst, XType type, int count);
XField *smaxSetScalarField(XStructure *s, const char *name, XType type, const void *value);
XField *smaxSet1DField(XStructure *s, const char *name, XType type, int n, const void *value);


// Pipelined pull requests ------------------------------->
int smaxQueue(const char *table, const char *key, XType type, int count, void *value, XMeta *meta);
XSyncPoint *smaxCreateSyncPoint();
int smaxQueueCallback(void (*f)(char *), char *arg);
int smaxQueueCallback(void (*f)(void *), void *arg);
void smaxDestroySyncPoint(XSyncPoint *sync);
int smaxSync(XSyncPoint *sync, int timeoutMillis);
int smaxWaitQueueComplete(int timeoutMillis);
Expand Down Expand Up @@ -252,8 +258,8 @@ int smaxWaitOnSubscribedGroup(const char *matchTable, char **changedKey, int tim
int smaxWaitOnSubscribedVar(const char *matchKey, char **changedTable, int timeout);
int smaxWaitOnAnySubscribed(char **changedTable, char **changedKey, int timeout);
int smaxReleaseWaits();
void smaxAddSubscriber(const char *stem, RedisSubscriberCall f);
void smaxRemoveSubscribers(RedisSubscriberCall f);
int smaxAddSubscriber(const char *stem, RedisSubscriberCall f);
int smaxRemoveSubscribers(RedisSubscriberCall f);

// Messages --------------------------------------------------->
int smaxSendStatus(const char *msg);
Expand Down Expand Up @@ -306,6 +312,7 @@ int x2smaxStruct(XStructure *s);
int x2smaxField(XField *f);
int smax2xStruct(XStructure *s);
int smax2xField(XField *f);

XField *smaxCreateScalarField(const char *name, XType type, const void *value);
XField *smaxCreate1DField(const char *name, XType type, int size, const void *value);
XField *smaxCreateField(const char *name, XType type, int ndim, const int *sizes, const void *value);
Expand All @@ -331,24 +338,25 @@ char *smaxGetHostName();
void smaxSetHostName(const char *name);
char *smaxGetProgramID();
int smaxGetServerTime(struct timespec *t);
void smaxSetTcpBuf(int size);
int smaxGetTcpBuf();
const char *smaxErrorDescription(int code);
int smaxError(const char *func, int errorCode);

// Low-level utilities ---------------------------------------->
int smaxSplitID(char *id, char **pKey);
char *smaxStringType(XType type);
XType smaxTypeForString(const char *type);
int smaxUnpackStrings(const char *data, int len, int count, char **dst);
int smaxStringToValues(const char *str, void *value, XType type, int count, int *parsed);
char *smaxValuesToString(const void *value, XType type, int count, char *trybuf, int trylength);
int smaxTimestamp(char *buf);
int smaxParseTime(const char *timestamp, time_t *secs, long *nanosecs);
double smaxGetTime(const char *timestamp);
int smaxTimeToString(const struct timespec *time, char *buf);
long smaxGetHash(const char *buf, const int size, const XType type);
int smaxSetPipelineConsumer(void (*f)(RESP *));


#if !(__Lynx__ && __powerpc__)
int smaxDeletePattern(const char *pattern);
#endif


#endif /* SMAX_H_ */
Loading

0 comments on commit e6e738b

Please sign in to comment.