///////////////////////////////////////////////////////////////////////////////////////////////////
///
/// WIENER Plein & Baus OPC DA Server
/// Copyright © 2005 WIENER Plein & Baus GmbH
///
/// \if SOURCEDOC
/// \file IoSnmp.cpp
/// \brief SNMP-based functions (initialisation, threads, ...)
/// \endif
///
///////////////////////////////////////////////////////////////////////////////////////////////////
#include "stdafx.h"
#include "IoSnmp.h" // SNMP
#include "config.h" // configuration file access
#include "SrvApp.h"
#include "DAObjs.h"
#define LOG_DEBUG_TIMEOUT 0
////#undef SYSLOG
//// #define SYSLOG(PRIORITY,printf_args) do { if(::getSOCmnTrace()->m_traceLevel[PRIORITY&3] & (PRIORITY&0xFFFFFFFC)) \
//// syslog(PRIORITY,syslog_printf printf_args);} while(0)
//#define _DEBUG
CSnmpOid::CSnmpOid() {
m_poid = NULL;
m_length = 0;
m_name.empty();
}
CSnmpOid::CSnmpOid(const char* name) {
m_poid = NULL;
initOid(name);
}
CSnmpOid::~CSnmpOid() {
if(m_poid) {
SYSLOG(LOG_DEBUG,("OID \"%s\" deleted",m_name));
delete[] m_poid;
}
}
//-----------------------------------------------------------------------------
// initOid
//
// - translate the textual object identifier to its corresponding list of
// sub-identifiers
//
// returns:
// address of the oid list
// NULL on any error
//
oid* CSnmpOid::initOid(const char* format,unsigned channelNumber) {
char* channelName = new char[256];
sprintf(channelName,format,channelNumber);
oid* poid = initOid(channelName);
delete [] channelName;
return poid;
}
oid* CSnmpOid::initOid(const char* name) {
// re-init: free previous oid first
if(m_poid) delete[] m_poid;
// Take a string containing a textual version of an object
// identifier (in either numeric or descriptor form), and transforms this
// into the corresponding list of sub-identifiers
oid localOid[MAX_OID_LEN];
size_t oidLength = MAX_OID_LEN;
if(!get_node(name,localOid,&oidLength)) {
m_poid = NULL;
m_length = 0;
m_name.empty();
syslog(LOG_ERR,"OID \"%s\" not found",name);
return NULL;
}
// store the oid
m_poid = new oid[oidLength];
memcpy(m_poid,localOid,sizeof(oid)*oidLength);
m_length = oidLength;
m_name = name;
SYSLOG(LOG_DEBUG,("OID \"%s\" initialized, %d elements",name,oidLength));
return m_poid;
} // initOid
///////////////////////////////////////////////////////////////////////////////////////////////////
/// Compare the OID of this object with another OID
///
/// \returns
/// - NULL if identical
/// - !NULL if different
///
int CSnmpOid::compareOid(CSnmpOid* poid) {
size_t length;
if((length = getOidLength())/*m_length*/ != poid->getOidLength()/*m_length*/) return !NULL;
oid* myOid = getOid()/*m_poid*/;
oid* otherOid = poid->getOid()/*m_poid*/;
for(/*size_t length = m_length*/;length > 0;length--) {
if(*myOid++ != *otherOid++) return !NULL; // not identical
}
return NULL; // identical
} //compareOid
///////////////////////////////////////////////////////////////////////////////////////////////////
// Configuration file format: SNMP
/**
\page configSNMP SNMP
All SNMP-specific parameters.
\CfgNode SNMP
\CfgParent \ref configfile "WienerOPCS"
\CfgChildren none
\CfgAttributes
\par Element Attributs
DOCTABHEAD Attribute | Default | Description
DOCTABLINE ReadCommunity | public | Community name for SNMP get operations.
DOCTABLINE WriteCommunity | private | Community name for SNMP set operations.
DOCTABTAIL
\subsection rwAccess Access Level
The SNMP subsystem uses different community names for different access levels. The higher
access level includes the lower ones.
The OPC server uses the ReadCommunity string for all read operations, and the WriteCommunity
string for all write operations.
The ReadCommunity and WriteCommunity attributes must be set to the appropriate community
name of the controlled crates.
Example: If change of the output voltage should be performed by the OPC server, the
WriteCommunity string must be set to the level 3 crate community name, default "guru".
DOCTABHEAD Access Level | SNMP Default Community Name | Description
DOCTABLINE R0 | public | Only read access.
DOCTABLINE R0, W1 | private | Switching channels on and off allowed.
DOCTABLINE R0, W2 | admin | Change of supervision limits allowed.
DOCTABLINE R0, W3 | guru | Change of output voltage and current limit allowed.
DOCTABTAIL
\see \ref config_index "Configuration File Element Index"
*/
//-----------------------------------------------------------------------------
// IoSnmpInit
//
// - initialize the OPC server application
//
// returns:
// TRUE - OK
// FALSE - error
//
bool CSnmp::initialize(void) {
syslog(LOG_DEBUG,"*** Initialise SNMP ***");
InitializeCriticalSection(&m_criticalSection);
init_snmp("WienerOPCS"); // WienerOPCS will be used to read WienerOPCS.conf files
init_mib(); // init MIB processing
if(!read_module("WIENER-CRATE-MIB")) { // read specific mibs
syslog(LOG_ERR,"Unable to load SNMP MIB file \"%s\"","WIENER-CRATE-MIB");
return false;
}
// read community strings from initialization file
char* community;
community = getServerApp()->getValue("WienerOPCS.SNMP","ReadCommunity",DEFAULT_READ_COMMUNITY);
m_readCommunity = community;
getServerApp()->release(community);
community = getServerApp()->getValue("WienerOPCS.SNMP","WriteCommunity",DEFAULT_WRITE_COMMUNITY);
m_writeCommunity = community;
getServerApp()->release(community);
syslog(LOG_DEBUG,"*** Initialise SNMP done ***");
SOCK_STARTUP; // only in main thread
return true;
}
void CSnmp::terminate(void) {
SOCK_CLEANUP;
DeleteCriticalSection(&m_criticalSection);
}
//void CSnmp::enterCriticalSection(void) {
// EnterCriticalSection(&m_criticalSection);
//}
//void CSnmp::leaveCriticalSection(void) {
// LeaveCriticalSection(&m_criticalSection);
//}
//-----------------------------------------------------------------------------
// CSnmpSession
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
// CSnmpSession
//
//
//
bool CSnmpSession::snmpOpen(LPCTSTR ipAddress) {
m_ipAddress = ipAddress;
m_numberOfPendingRequests = 0; // no requests open
struct snmp_session session;
snmp_sess_init(&session); // structure defaults
session.version = SNMP_VERSION_2c;
session.peername = strdup(m_ipAddress);
session.community = (u_char*)strdup(getServerApp()->m_snmp.getReadCommunity());
session.community_len = strlen((char*)session.community);
session.timeout = SNMP_TIMEOUT*1000; // ms -> us
session.retries = SNMP_RETRIES;
#ifdef SINGLE_SESSION_API
if(!(m_sessp = snmp_sess_open(&session))) {
#else
if(!(m_session = snmp_open(&session))) {
#endif
int liberr, syserr;
char *errstr;
snmp_error(&session, &liberr, &syserr, &errstr);
syslog(LOG_ERR,"Open SNMP session for host \"%s\": %s",m_ipAddress,errstr);
free(errstr);
return false;
}
#ifdef SINGLE_SESSION_API
m_session = snmp_sess_session(m_sessp); // get the session pointer
#endif
SYSLOG(LOG_INFO,("SNMP session for host \"%s\" opened",m_ipAddress));
return true;
}
void CSnmpSession::snmpClose(void) { // evtl. in SNMP_SESSION destructor ?
#ifdef SINGLE_SESSION_API
if(!m_sessp) return;
if(!snmp_sess_close(m_sessp)) {
#else
if(!m_session) return;
if(!snmp_close(m_session)) {
#endif
syslog(LOG_ERR,"Close SNMP session for host \"%s\": ERROR",m_ipAddress);
}
else syslog(LOG_INFO,"SNMP session for host \"%s\" closed",m_ipAddress);
#ifdef SINGLE_SESSION_API
m_sessp = NULL;
#endif
m_session = NULL;
}
//-----------------------------------------------------------------------------
// CSnmpSession
//
// Return: Pointer to an error message. Must be freed after usage.
//
char* CSnmpSession::getErrorString(void) {
int liberr, syserr;
char *errstr;
snmp_sess_error(m_sessp,&liberr,&syserr,&errstr);
return errstr;
}
///////////////////////////////////////////////////////////////////////////////////////////////////
/// SNMP event response handler.
///
/// Is called when the requested data arrives or on timeout.
//
/// TIMEOUT event:
/// All associated requests are completed with bad status
//
/// DATA event:
///
///
/// \return ?
///
int CSnmpSession::callback(int operation, struct snmp_session *pSession, int reqid,
struct snmp_pdu *pdu,void *magic) {
SOCmnPointer thisCrate = (CNodeCrate*)magic;
// complain about wrong request id
if(thisCrate->m_requestId != reqid) {
SYSLOG(LOG_ERR,("SNMP callback: Crate \"%s\": got wrong requestId %d, expected %d"
,thisCrate->m_ipAddress,reqid,thisCrate->m_requestId));
return 1;
}
// !!TODO: Test, ob Anzahl stimmt
/* ////////////////////
unsigned nRequest = thisCrate->m_numberOfPendingRequests;
while(nRequest--) {
SOCmnPointer cache;
cache = pRequest->getCache();
cache->setQuality(quality);
pRequest->complete(S_OK);
}
///////////////
*/
WORD quality;
switch(operation) {
//---------------------------------------------------------------------------------------------
/// DATA arrived event:
case NETSNMP_CALLBACK_OP_RECEIVED_MESSAGE:
/// received data has error => OPC_QUALITY_COMM_FAILURE
if(pdu->errstat != SNMP_ERR_NOERROR) {
SYSLOG(LOG_ERR,("SNMP callback: Crate \"%s\": ErrorStatus %d, ErrorIndex %d"
,thisCrate->m_ipAddress,pdu->errstat,pdu->errindex));
quality = OPC_QUALITY_COMM_FAILURE;
break;
}
/// DATA OK => call dataReceived for each SNMP var
else { // if(pdu->errstat == SNMP_ERR_NOERROR)
CCrateRequest **ppRequests = thisCrate->m_requests;
for(struct variable_list *vars = pdu->variables;vars;vars = vars->next_variable) {
CCrateRequest *pRequest = *ppRequests++;
pRequest->dataReceived(vars
#ifdef _DEBUG
,thisCrate->getIpAddress()
#endif
);
thisCrate->m_numberOfPendingRequests--;
} // for(vars = ...)
} // if(pdu->errstat == SNMP_ERR_NOERROR)
return 1;
//---------------------------------------------------------------------------------------------
/// ERROR event: OPC_QUALITY_NOT_CONNECTED or OPC_QUALITY_COMM_FAILURE
case NETSNMP_CALLBACK_OP_TIMED_OUT:
syslog(LOG_WARNING,"SNMP callback: Crate \"%s\": OP_TIMED_OUT"
,thisCrate->m_ipAddress);
quality = OPC_QUALITY_NOT_CONNECTED;
break;
case NETSNMP_CALLBACK_OP_SEND_FAILED:
syslog(LOG_WARNING,"SNMP callback: Crate \"%s\": OP_SEND_FAILED"
,thisCrate->m_ipAddress);
quality = OPC_QUALITY_COMM_FAILURE;
break;
case NETSNMP_CALLBACK_OP_CONNECT:
syslog(LOG_WARNING,"SNMP callback: Crate \"%s\": OP_CONNECT"
,thisCrate->m_ipAddress);
quality = OPC_QUALITY_COMM_FAILURE;
break;
case NETSNMP_CALLBACK_OP_DISCONNECT:
syslog(LOG_WARNING,"SNMP callback: Crate \"%s\": OP_DISCONNECT"
,thisCrate->m_ipAddress);
quality = OPC_QUALITY_COMM_FAILURE;
break;
default:
syslog(LOG_WARNING,"SNMP callback: Crate \"%s\": unknown OP %d"
,thisCrate->m_ipAddress,operation);
quality = OPC_QUALITY_COMM_FAILURE;
break;
}
thisCrate->completeRequests(quality); // complete all requests in the m_requests list
return 1;
}
///////////////////////////////////////////////////////////////////////////////////////////////////
/// Process one SNMP variable and inform the OPC request.
///
/// \return nothing
///
void CCrateRequest::dataReceived(struct variable_list *vars
#ifdef _DEBUG
,SOCmnString ipAddress,unsigned recursiveCall
#endif
) {
/// If the request has duplicates, this function calles itself for each duplicate recursively.
if(m_associatedRequest) m_associatedRequest->dataReceived(vars
#ifdef _DEBUG
,ipAddress,recursiveCall+1
#endif
); // handle associated requests, too
m_associatedRequest = NULL;
SOCmnPointer itemTag = /*pRequest->*/getItemTag();
_ASSERTION((itemTag->getUserData()&OBJECT_TYPE_CRATE_TAG) == OBJECT_TYPE_CRATE_TAG,"Unexpected class");
CTag* ctag = (CTag*)(SODaSItemTag*)itemTag;
//-----------------------------------------------------------------------------------------------
/// READ operation:
if(getOperation() == SODaSRequest::read) {
SOCmnPointer cache = getCache();
SOCmnVariant value;
/// Call getSnmpVar. If no failure, copy the data to OPC and set the OPC quality to OPC_QUALITY_GOOD
if(ctag->getSnmpVar(vars,value)) {
SYSLOG(LOG_DEBUG,("SNMP callback(%d): Crate \"%s\": Tag \"%s\": Complete read request 0x%X with OPC_QUALITY_GOOD"
,recursiveCall,ipAddress,ctag->getFullName(),this));
cache->setValue(value);
cache->setQuality(OPC_QUALITY_GOOD);
}
/// If any failure, set the OPC cache quality to OPC_QUALITY_DEVICE_FAILURE
else {
SYSLOG(LOG_DEBUG,("SNMP callback(%d): Crate \"%s\": Tag \"%s\": Complete read request 0x%X with OPC_QUALITY_DEVICE_FAILURE"
,recursiveCall,ipAddress,ctag->getFullName(),this));
cache->setQuality(OPC_QUALITY_DEVICE_FAILURE);
}
completeIO(S_OK); /// and complete the request with completeIO(S_OK)
}
//-----------------------------------------------------------------------------------------------
/// WRITE operaton:
else {
/// complete the reuest with completeIO(S_OK)
SYSLOG(LOG_DEBUG,("SNMP callback(%d): Crate \"%s\": Tag \"%s\": Complete write request 0x%X"
,recursiveCall,ipAddress,ctag->getFullName(),this/*pRequest*/));
// maybe it could be usefull to update the cache with the value returned by snmpset ?
/* completeIO(S_OK) setzt den Cache sowieso auf den Write-Wert!
SOCmnPointer cache = getCache();
SOCmnVariant value;
cache->getValue(value);
if(ctag->getSnmpVar(vars,value)) {
cache->setValue(value);
cache->setQuality(OPC_QUALITY_GOOD);
}*/
completeIO(S_OK); // write requests are OK
}
}
void CCrateRequest::dataError(WORD quality,/*DEBUG ONLY*/SOCmnString ipAddress) {
if(m_associatedRequest) m_associatedRequest->dataError(quality,ipAddress); // handle associated requests, too
m_associatedRequest = NULL;
// DEBUG
do {
SOCmnPointer itemTag;
itemTag = getItemTag();
SOCmnString sQuality;
quality2string(&sQuality,quality);
SYSLOG(LOG_DEBUG,("Queue \"%s\": Complete request \"%s\" with %s, pRequest 0x%X"
,ipAddress,((CTag*)(SODaSItemTag*)itemTag)->getFullName(),sQuality,this/*pRequest*/));
} while(0);
// complete read request
if(/*pRequest->*/getOperation() == SODaSRequest::read) {
SOCmnPointer cache;
cache = /*pRequest->*/getCache();
cache->setQuality(quality);
/*pRequest->*/completeIO(S_OK);
}
// complete write request (arrives here only at errors)
else {
/*pRequest->*/completeIO(E_FAIL,true,quality);
}
}
//-----------------------------------------------------------------------------
// CTag I/O access
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
// getSnmpVar
//
bool CTag::getSnmpVar(IN struct variable_list* var,OUT SOCmnVariant &variant) {
switch(getNativeDatatype()) {
case VT_DATE:
if (var->type == ASN_TIMETICKS) { // 0x43
unsigned long timestamp = *var->val.integer;
variant.setDATE(((double)timestamp)*(1./(24.*3600.*100.)));
return true;
}
break;
case VT_R4:
case VT_R8:
if (var->type == ASN_OPAQUE_FLOAT) { // 0x78
variant = *var->val.floatVal;
return true;
}
else if (var->type == ASN_OPAQUE_DOUBLE) { // 0x79
variant = *var->val.doubleVal;
return true;
}
else if(var->type == ASN_INTEGER) { // 0x02
variant = (float)*var->val.integer;
return true;
}
break;
case VT_BSTR:
if (var->type == ASN_OCTET_STR) { // 0x04
char *sp = new char[var->val_len+1];
memcpy(sp,var->val.string,var->val_len);
sp[var->val_len] = '\0';
variant = sp;
delete[] sp;
return true;
}
break;
case VT_BOOL:
if(var->type == ASN_INTEGER) { // 0x02
variant = (bool) (*var->val.integer != 0) ? true : false;
return true;
}
break;
case VT_I2:
case VT_I4:
if(var->type == ASN_INTEGER) { // 0x02
variant = *var->val.integer;
return true;
}
if(var->type == ASN_OCTET_STR) { // 0x04
unsigned long bitstring = 0;
for(int cpos = var->val_len-1;cpos >= 0;cpos--) {
unsigned char octet = var->val.string[cpos];
for(int bpos = 0;bpos < 8;bpos++) { // convert one character
bitstring <<= 1;
if(octet&0x01) bitstring |= 1;
octet >>= 1;
}
}
variant = bitstring;
return true;
}
break;
default:
break;
} // switch
/// Check SNMP message for special conditions:
if(var->type ==SNMP_NOSUCHOBJECT) { /// NOSUCHOBJECT
syslog(LOG_WARNING,"OPC tag \"%s\": No SNMP object \"%s\"",getName(),getOidName());
return false;
}
syslog(LOG_ERR,"CTag::getSnmpVar: unimplemented variable combination: SNMP(0x%X)->OPC(%s)"
,var->type,VARTYPE2STR(getNativeDatatype()));
return false;
} //getSnmpVar
bool CTag::setSnmpVar(IN SOCmnVariant writeValue,OUT struct snmp_pdu* pdu /*variable_list* var*/) {
int itmp;
long ltmp;
float vtmp;
switch(getNativeDatatype()) {
case VT_BOOL:
ltmp = writeValue.bVal ? 1 : 0;
snmp_pdu_add_variable(pdu,getOid(),getOidLength(),ASN_INTEGER,(u_char*)<mp,sizeof(ltmp));
return true;
case VT_R4:
vtmp = writeValue.fltVal;
snmp_pdu_add_variable(pdu,getOid(),getOidLength(),ASN_OPAQUE_FLOAT,(u_char*)&vtmp,sizeof(vtmp));
return true;
case VT_I2:
itmp = writeValue.iVal;
snmp_pdu_add_variable(pdu,getOid(),getOidLength(),ASN_INTEGER,(u_char*)&itmp,sizeof(itmp));
return true;
default:
break;
} // switch
syslog(LOG_WARNING,"CTag::setSnmpVar: unimplemented variable combination: OPC(%s)->SNMP"
,VARTYPE2STR(getNativeDatatype()));
return false;
} // setSnmpVar
///////////////////////////////////////////////////////////////////////////////////////////////////
/// CBitTag SNMP get data
///
bool CBitTag::getSnmpVar(IN struct variable_list* var,OUT SOCmnVariant &variant) {
_ASSERTION(m_commonDataTag.isNotNull(),"CBitTag must have a data tag");
SOCmnPointer cache = m_commonDataTag->getCache();
SOCmnVariant value;
/// Use the associated data tag to convert the data
if(!m_commonDataTag->getSnmpVar(var,value)) { // SNMP data -> variant
cache->setQuality(OPC_QUALITY_DEVICE_FAILURE); // die Quality ist evtl so nicht exakt, besser die Daten über ganz normalen request holen und in dieser Funktion nur Auswerten (auch mit Blick auf CANbus)
return false;
}
/// and store the data in that cache (necessary for coming write operations)
cache->setValue(value);
cache->setQuality(OPC_QUALITY_GOOD);
int bits = (value.iVal & m_dataMask)>>m_dataBitPosition;
switch(getNativeDatatype()) {
case VT_I2:
// if(var->type == ASN_INTEGER) { // 0x02
// int i = *var->val.integer;
// variant = (int) ((i & m_dataMask)>>m_dataBitPosition);
variant = (int)bits;
return true;
// }
// break;
case VT_BOOL:
//if(var->type == ASN_OCTET_STR) { // 0x04
// unsigned long bitstring = 0;
// for(int cpos = var->val_len-1;cpos >= 0;cpos--) {
// unsigned char octet = var->val.string[cpos];
// for(int bpos = 0;bpos < 8;bpos++) { // convert one character
// bitstring <<= 1;
// if(octet&0x01) bitstring |= 1;
// octet >>= 1;
// }
// }
// // TODO: Ergebnis in Elternknoten speichern
// variant = (bool) (((bitstring & m_dataMask)>>m_dataBitPosition) != 0) ? true : false;
variant = (bool) (bits != 0) ? true : false;
return true;
// } // if(var->type == ASN_OCTET_STR)
// break;
default:
break;
} // switch
syslog(LOG_WARNING,"CBitTag::getSnmpVar: unimplemented variable combination: SNMP(0x%X)->OPC(%s)"
,var->type,VARTYPE2STR(getNativeDatatype()));
return false;
} //getSnmpVar
///////////////////////////////////////////////////////////////////////////////////////////////////
/// CBitTag SNMP set data
///
bool CBitTag::setSnmpVar(IN SOCmnVariant writeValue,OUT struct snmp_pdu* pdu) {
_ASSERTION(m_commonDataTag.isNotNull(),"CBitTag must have a data tag");
SOCmnPointer cache = m_commonDataTag->getCache(); /// get the complete data from commonDataTag
SOCmnVariant value;
cache->getValue(value);
unsigned long allBits = value.ulVal;
allBits &= ~m_dataMask; /// clear the bits to be changed
unsigned long newBits = writeValue.ulVal; /// isolate new data
newBits <<= m_dataBitPosition;
newBits &= m_dataMask;
allBits |= newBits; /// generate new complete data and
writeValue = allBits; /// call snmpSetVar of commonDataTag
return m_commonDataTag->setSnmpVar(writeValue,pdu); /// \return The return value of the commonDataTag setSnmpVar function.
}
//-----------------------------------------------------------------------------
// Toolkit I/O handling thread
//-----------------------------------------------------------------------------
//
//-----------------------------------------------------------------------------
// ioThreadSnmp
//
// - main routine of I/O SNMP transmit thread
//
CTime now; // one time for all requests
// Achtung: Rückgabewert enthält keine Fehlerabfrage !!
// Return: Time until next timeout [ms]
DWORD CSnmpSession::requestSelectEvent() {
m_fds = 0; // not used in windows
// fd_set fdset;
FD_ZERO(&m_fdset);
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 100000;
int block = 1;
// getServerApp()->m_snmp.enterCriticalSection();
int numberOfOpenSessions = snmp_sess_select_info(m_sessp,&m_fds,&m_fdset,&timeout,&block);
// getServerApp()->m_snmp.leaveCriticalSection();
_ASSERTION(numberOfOpenSessions == 1,"Must be 1 in single session API");
// HANDLE hEventObject = hSnmpEvent;//!!getServerApp()->m_snmpEventHandle;
HANDLE hEventObject = getServerApp()->m_snmpEventHandle;
int rc = WSAEventSelect(m_fdset.fd_array[0],hEventObject,FD_READ);
if(rc != 0) {
SYSLOG(LOG_ERR,("WSAEventSelect(): WSAGetLastError 0x%X",WSAGetLastError()));
}
DWORD dwTimeout;
if(block) {
dwTimeout = INFINITE;
m_timeoutAt.setInfinite();
}
else {
dwTimeout = (timeout.tv_usec/1000)+timeout.tv_sec*1000;
m_timeoutAt = now; // update timeout time
m_timeoutAt.addMilliseconds(dwTimeout);
}
SYSLOG(LOG_DEBUG_TIMEOUT,("WSAEventSelect(): timeout %d",dwTimeout));
return dwTimeout;
}
/*
Timeout-Behandlung:
- Beim Senden eines Requests wird eine relative Timeout-Zeit definiert (z.B. 5000 ms)
- Die Event-Steuerung erwartet als Timeout eine relative Zeit (z.B. 5000 ms)
=> Bei einem Request / Threat (d.h. Event) kein Problem (Der Thread wird nur beim Enitreffen
der Daten oder Timeout aufgeweckt, NICHT beim eintreffen neuer Daten in die Queue!)
Sonst: Nach jedem Aufwecken des Threads muss die nächste sinnvolle Timeout-Zeit bestimmt
werden!
Möglichkeiten:
- Speichern der absoluten Timeout-Zeit jedes Requests, beim Warten auf Event wird die kürzeste Zeit
verwendet.
-
*/
DWORD _stdcall ioThreadSnmp(void *context) {
SYSLOG(LOG_INFO,("Start thread: ioThreadSnmp"));
CServerApp *pServerApp = getServerApp();
SOCmnThread *pMe = &pServerApp->m_ioThreadSnmp;
DWORD waitR;
DWORD timeoutMilliSeconds = INFINITE; // thread wait time until next timeout
while(true) {
// wait for new requests or the end
SYSLOG(LOG_DEBUG_TIMEOUT,("ioThreadSnmp: waiting for events (timeout %d ms)...",timeoutMilliSeconds));
waitR = pMe->waitForEvents(timeoutMilliSeconds); // wait ...
if (waitR == pMe->getStopEventId()) break;
timeoutMilliSeconds = INFINITE; // initialize next timeout
now.now(); // read one time for all requests
if(waitR == pServerApp->m_snmpEventId)
SYSLOG(LOG_INFO,("ioThreadSnmp: SNMP EVENT"));
else if(waitR == SOCMNEVENT_INVALID_ID)
SYSLOG(LOG_INFO,("ioThreadSnmp: TIMEOUT EVENT"));
else
SYSLOG(LOG_INFO,("ioThreadSnmp: QUEUE EVENT"));
// process ALL request queues /////////////////////////////////////////////
SOCmnList crateList(pServerApp->m_crateList);
SOCmnListPosition cratePos;
CNodeCrate *pCrate;
cratePos = crateList.getStartPosition();
while(cratePos) { // for all request in the queue
pCrate = crateList.getNext(cratePos);
// SNMP event (data from I/O has arrived) ///////////////////////////////
if(waitR == pServerApp->m_snmpEventId) {
if(pCrate->m_numberOfPendingRequests == 0) continue; // this crate has no pending i/o => no timeout update
SYSLOG(LOG_DEBUG,("snmp_sess_read \"%s\": start",pCrate->getName()));
// check if data has arrived for this socket
unsigned long nBytesReceived;
int rc = ioctlsocket(pCrate->m_fdset.fd_array[0],FIONREAD,&nBytesReceived);
if(rc != 0) {
SYSLOG(LOG_ERR,("ioctlsocket(): WSAGetLastError 0x%X",WSAGetLastError()));
}
if(nBytesReceived != 0) {
if(snmp_sess_read(pCrate->m_sessp,&pCrate->m_fdset)) { // read data & call callback
char* snmpError = pCrate->getErrorString();
syslog(LOG_INFO,"SNMP: snmp_read->\"%s\": %s",pCrate->getName(),snmpError);
free(snmpError);
}
pCrate->removeCompletedRequestsFromQueue();
} // if(nBytesReceived != 0)
}
// TIMEOUT event ////////////////////////////////////////////////////////
else if(waitR == SOCMNEVENT_INVALID_ID) {
if(pCrate->m_numberOfPendingRequests == 0) continue; // this crate has no pending i/o => no timeout update
SYSLOG(LOG_DEBUG,("snmp_sess_timeout \"%s\": start",pCrate->getName()));
snmp_sess_timeout(pCrate->m_sessp); // check if it's necessary to resent the request
pCrate->requestSelectEvent(); // select() will generate an event
pCrate->removeCompletedRequestsFromQueue(); // may be the request has been canceled
}
// check if somethin could be sent now //////////////////////////////////
pCrate->processRequestQueue();
DWORD timediff = pCrate->m_timeoutAt.getRelativeTime(now); // timeout update
if(timeoutMilliSeconds > timediff) {
SYSLOG(LOG_DEBUG_TIMEOUT,("**** NEW Timeout %d",timediff));
timeoutMilliSeconds = timediff;
}
//pCrate->removeCompletedRequestsFromQueue();
} // while(cratePos)
} // thread main loop
SYSLOG(LOG_INFO,("Stop thread: ioThreadSnmp"));
return 0;
} // ioThreadSnmp
/// scan the request queue for new requests, generate PDU and transmit
/// /returns
/// The m_timeout value is updated
///
/*!
\remarks
To reduce the number of I/O transmissions, OPC requests with identical
I/O operation will generate only a single I/O operation
Aber wie ??
- 1 beim Generieren der Sendedaten prüfen, ob identische Daten schon gesendet wurden
Bei SNMP möglich (da dort viele Daten gleichzeitig gesendet werden), bei CAN problematisch:
Wenn der Sendepuffer voll ist, wird die Message ja gesendet (und erst später, wenn die Antwort da ist,
weiterverarbeitet)
- 2 Beim Generieren der Sendetaten prüfen, ob Requests mit gleichen Daten in der Request-Queue stehen.
Die gefundenen Duplikate werden auch auf pendingIO() gesetzt, ein Zeiger auf den Duplikat-Request
wird mit setAssociatedRequest() beim vorhergehenden Request gespeichert (Einfache Kette)
Nicht schlecht, oder ? Ist jetzt realisiert!
Aber: Wenn dann später (bei einem erneuten Aufruf von processRequestQueue) ein neuer Request
eingefügt wurde, der identisch zu einem bereits laufendem Request ist, wird dies nicht bemerkt!
(Hat Vor- und Nachteile)
- 3 Beim Empfangen der Daten die Request-Queue checken und alle passenden Requests updaten
Auch nicht schlecht, oder? Evtl. Kombination 2&3 ?
*/
void CNodeCrate::processRequestQueue(void) {
SOCmnList requestQueue(getRequestQueue());
if(m_numberOfPendingRequests) { // previous request pending, try later again
#ifdef _DEBUG
DWORD count;
#endif
SYSLOG(LOG_WARNING,("Queue \"%s\": %d entries, %d requests pending"
,getName(),count = requestQueue.getCount(),m_numberOfPendingRequests));
_ASSERTION(count >= m_numberOfPendingRequests,"Number of pending requests too high!");
return;
}
SYSLOG(LOG_DEBUG,("Queue \"%s\": %d entries",getName(),requestQueue.getCount()));
CCrateRequest /*SODaSRequest*/ *pRequest;
SOCmnPointer cache;
SOCmnPointer itemTag;
m_timeoutAt.setInfinite(); // next timeout: never
struct snmp_pdu* pdu = NULL;
SOCmnListPosition pos = requestQueue.getStartPosition();
while(pos) { // for all request in the queue
pRequest = (CCrateRequest*)requestQueue.getNext(pos);
if(pRequest->isCompleted()) continue;
if(pRequest->isPending()) continue;
// SNMP connection not valid //////////////////////////////////////////////
if(!m_sessp) {
SOCmnPointer cache = pRequest->getCache();
cache->setQuality(OPC_QUALITY_NOT_CONNECTED);
pRequest->complete(S_OK);
continue;
}
// gather common information for read & write /////////////////////////////
itemTag = pRequest->getItemTag(); // get item tag
_ASSERTION((itemTag->getUserData()&OBJECT_TYPE_CRATE_TAG) == OBJECT_TYPE_CRATE_TAG,"Unexpected class");
CSnmpOid* poid = (CTag*)(SODaSItemTag*)itemTag; // get oid
// read operation /////////////////////////////////////////////////////////
if(pRequest->getOperation() == SODaSRequest::read) {
if(!pdu) pdu = snmp_pdu_create(SNMP_MSG_GET); // prepare get-request pdu
snmp_add_null_var(pdu,poid->getOid(),poid->getOidLength()); // generate request data
SYSLOG(LOG_DEBUG,("Queue \"%s\": Add \"%s\" to GetRequest-PDU, Request 0x%X"
,getName(),itemTag->getFullName(),pRequest));
pRequest->pendingIO(this);
m_requests[m_numberOfPendingRequests] = pRequest; // remember request
m_numberOfPendingRequests++;
//-------------------------------------------------------------------------------------------
// check if read requests with identical I/O operation are in the queue
//
SOCmnListPosition posSearchDuplicate = pos;
CCrateRequest *pRequestParent = pRequest;
while(posSearchDuplicate) { // for all request in the queue
CCrateRequest *pRequestDuplicate = (CCrateRequest*)requestQueue.getNext(posSearchDuplicate);
if(pRequestDuplicate->isCompleted()) continue;
if(pRequestDuplicate->isPending()) continue;
if(pRequestDuplicate->getOperation() == SODaSRequest::write) continue;
SOCmnPointer itemTagDuplicate = pRequestDuplicate->getItemTag(); // get item tag
_ASSERTION((itemTagDuplicate->getUserData()&OBJECT_TYPE_CRATE_TAG) == OBJECT_TYPE_CRATE_TAG,"Unexpected class");
CSnmpOid* poidDuplicate = (CTag*)(SODaSItemTag*)itemTagDuplicate; // get oid
int isIdentical = !poid->compareOid(poidDuplicate);
if(isIdentical) {
SYSLOG(LOG_DEBUG,("Request 0x%X is identical: \"%s\"",pRequestDuplicate,itemTagDuplicate->getFullName()));
pRequestDuplicate->pendingIO(this);
pRequestParent->setAssociatedRequest(pRequestDuplicate);
pRequestParent = pRequestDuplicate;
}
} // while(posSearchDuplicates)
///////////////////
if(m_numberOfPendingRequests >= maxVarBindings) break; // get-pdu ready ?
} // read operation
// write operation ////////////////////////////////////////////////////////
else {
if(m_numberOfPendingRequests) break; // write operations are not collected, finish read
pdu = snmp_pdu_create(SNMP_MSG_SET); // prepare set-request pdu
pdu->community = (u_char*)strdup(getServerApp()->m_snmp.getWriteCommunity());
pdu->community_len = strlen((char*)pdu->community);
/* add data to the PDU */
SOCmnVariant writeValue; /// get the data to be written to the I/O
pRequest->getWriteValue(&writeValue);
((CTag*)(SODaSItemTag*)itemTag)->setSnmpVar(writeValue,pdu);
SYSLOG(LOG_DEBUG,("Queue \"%s\": Add \"%s\" to SetRequest-PDU, Request 0x%X"
,getName(),itemTag->getName(),pRequest));
pRequest->pendingIO(this);
m_requests[m_numberOfPendingRequests] = pRequest; // remember request
m_numberOfPendingRequests++;
break; // only one request/pdu
} // write operation
} // for all requests in the queue
if(!pdu) return; // nothing to transmit
addRef(); // this is used in the callback later
/// \todo Achtung: addRef in Callback gelöscht, aber was ist bei Sendefehlern ?
m_requestId = snmp_sess_async_send(m_sessp,pdu,callback,this);
if(m_requestId) {
SYSLOG(LOG_DEBUG,("SNMP send (%d PDUs)->\"%s\" OK",m_numberOfPendingRequests,m_ipAddress));
////for(unsigned nRequest = 0;nRequest < m_numberOfPendingRequests;nRequest++) {
//// m_requests[nRequest]->pendingIO(this);
////}
requestSelectEvent(); // select() will generate an event
//m_timeoutAt = now; // update timeout time
//m_timeoutAt.addMilliseconds(timeout);
} // snmp_sess_async_send ok
else { // snmp_sess_async_send failure
char* snmpError = getErrorString();
syslog(LOG_ERR,"SNMP send (%d PDUs)->\"%s\": %s",m_numberOfPendingRequests,m_ipAddress,snmpError);
free(snmpError);
snmp_free_pdu(pdu);
completeRequests(OPC_QUALITY_COMM_FAILURE); // clears m_numberOfPendingRequests
} // snmp_sess_async_send failure
}
void CSnmpSession::completeRequests(WORD quality) {
CCrateRequest /*!!SODaSRequest*/ **ppRequests = m_requests;
unsigned nRequest = m_numberOfPendingRequests;
while(nRequest--) {
CCrateRequest /*!!SODaSRequest*/ *pRequest = *ppRequests++;
pRequest->dataError(quality,getIpAddress());
}
m_numberOfPendingRequests = 0;
}