From bfd863c57664e69336a2eac0d22f96857da0b509 Mon Sep 17 00:00:00 2001 From: Przemyslaw Czerpak Date: Fri, 30 Apr 2010 09:39:25 +0000 Subject: [PATCH] 2010-04-30 11:39 UTC+0200 Przemyslaw Czerpak (druzus/at/priv.onet.pl) * harbour/src/rtl/hbznet.c ! fixed stupid typo which caused that hb_znetFlush() in encrypted buffers never returned 0 (the returned value was increased by 4) * harbour/contrib/hbnetio/netio.h * harbour/contrib/hbnetio/netiocli.c * harbour/contrib/hbnetio/netiosrv.c * harbour/contrib/hbnetio/readme.txt + added support for communication streams/channels which allow to send asynchronously data from server to client. The following client side functions had been added: NETIO_OPENDATASTREAM( [, ] ) -> NETIO_OPENITEMSTREAM( [, ] ) -> NETIO_CLOSESTREAM( , [], [] ) -> NETIO_GETDATA( , [], [] ) -> | | NIL The following server side functions had been added: NETIO_SRVSTATUS( [, ] ) -> NETIO_SRVSENDITEM( , , ) -> NETIO_SRVSENDDATA( , , ) -> + added RT errors to the netio client code. They should help to early detect any communication problems ! disabled hb_socketSetNoDelay() in client and server code. I enabled it for some local tests and by mistake committed to SVN and I didn't noticed it. It could cause serious problems in some cases i.e. with slow WiFi or WAN connections. Warning: server signature changed due to extensions in protocol. both client and server side has to be updated and use current SVN code. + harbour/contrib/hbnetio/tests/netiot03.prg + added demonstration/test code for alternative RDD IO API, RPC and asynchronous data streams in HBNETIO --- harbour/ChangeLog | 43 +- harbour/contrib/hbnetio/netio.h | 61 ++- harbour/contrib/hbnetio/netiocli.c | 534 +++++++++++++++++++-- harbour/contrib/hbnetio/netiosrv.c | 277 ++++++++++- harbour/contrib/hbnetio/readme.txt | 29 ++ harbour/contrib/hbnetio/tests/netiot03.prg | 199 ++++++++ harbour/src/rtl/hbznet.c | 2 +- 7 files changed, 1062 insertions(+), 83 deletions(-) create mode 100644 harbour/contrib/hbnetio/tests/netiot03.prg diff --git a/harbour/ChangeLog b/harbour/ChangeLog index a2da8f3976..d79c27c6dc 100644 --- a/harbour/ChangeLog +++ b/harbour/ChangeLog @@ -17,6 +17,48 @@ past entries belonging to author(s): Viktor Szakats. */ +2010-04-30 11:39 UTC+0200 Przemyslaw Czerpak (druzus/at/priv.onet.pl) + * harbour/src/rtl/hbznet.c + ! fixed stupid typo which caused that hb_znetFlush() in encrypted + buffers never returned 0 (the returned value was increased by 4) + + * harbour/contrib/hbnetio/netio.h + * harbour/contrib/hbnetio/netiocli.c + * harbour/contrib/hbnetio/netiosrv.c + * harbour/contrib/hbnetio/readme.txt + + added support for communication streams/channels which allow to + send asynchronously data from server to client. + The following client side functions had been added: + NETIO_OPENDATASTREAM( [, ] ) + -> + NETIO_OPENITEMSTREAM( [, ] ) + -> + NETIO_CLOSESTREAM( , [], [] ) + -> + NETIO_GETDATA( , [], [] ) + -> | | NIL + The following server side functions had been added: + NETIO_SRVSTATUS( [, ] ) + -> + NETIO_SRVSENDITEM( , , ) + -> + NETIO_SRVSENDDATA( , , ) + -> + + added RT errors to the netio client code. They should help to early + detect any communication problems + ! disabled hb_socketSetNoDelay() in client and server code. + I enabled it for some local tests and by mistake committed to SVN + and I didn't noticed it. It could cause serious problems in some cases + i.e. with slow WiFi or WAN connections. + + Warning: server signature changed due to extensions in protocol. + both client and server side has to be updated and use + current SVN code. + + + harbour/contrib/hbnetio/tests/netiot03.prg + + added demonstration/test code for alternative RDD IO API, RPC and + asynchronous data streams in HBNETIO + 2010-04-29 02:00 UTC-0800 Pritpal Bedi (pritpal@vouchcac.com) * contrib/hbqt/hbqt_hbqplaintextedit.cpp % Improvement to horizontal ruler to respect long lines @@ -30,7 +72,6 @@ * contrib/hbqt/hbqt_hbqplaintextedit.h * contrib/hbqt/hbqt_hbslots.cpp - + Implemented: horizontal ruler in the editor instances at the top of the window. Opinions are welcome about its base and tab colors. diff --git a/harbour/contrib/hbnetio/netio.h b/harbour/contrib/hbnetio/netio.h index 8a7fcb496e..ee5581a77c 100644 --- a/harbour/contrib/hbnetio/netio.h +++ b/harbour/contrib/hbnetio/netio.h @@ -76,7 +76,7 @@ #define NETIO_PASSWD_MAX 64 /* login string */ -#define NETIO_LOGINSTRID "HarbourFileTcpIpServer\006" +#define NETIO_LOGINSTRID "HarbourFileTcpIpServer\007" /* messages */ #define NETIO_LOGIN 1 @@ -98,27 +98,36 @@ #define NETIO_PROC 17 #define NETIO_PROCW 18 #define NETIO_FUNC 19 +#define NETIO_FUNCCTRL 20 +#define NETIO_SRVITEM 21 +#define NETIO_SRVDATA 22 +#define NETIO_SRVCLOSE 23 #define NETIO_CONNECTED 0x4321DEAD /* messages format */ -/* { NETIO_LOGIN, len[ 2 ]... } + loginstr[ len ] -> { NETIO_LOGIN, NETIO_CONNECTED[ 4 ], ... } */ -/* { NETIO_EXISTS, len[ 2 ]... } + filename[ len ] -> { NETIO_EXISTS, ... } */ -/* { NETIO_DELETE, len[ 2 ]... } + filename[ len ] -> { NETIO_DELETE, ... } */ -/* { NETIO_RENAME, len[ 2 ], len2[ 2 ]... } + filename[ len ] + filename[ len2 ] -> { NETIO_RENAME, ... } */ -/* { NETIO_OPEN, len[ 2 ], flags[ 2 ], def_ext[], 0, ... } + filename[ len ] -> { NETIO_OPEN, file_no[2], ... } */ -/* { NETIO_READ, file_no[2], size[ 4 ], offset[ 8 ], ... } -> { NETIO_READ, read[ 4 ], err[ 4 ], ... } + data[ read ] */ -/* { NETIO_WRITE, file_no[2], size[ 4 ], offset[ 8 ], ... } + data[ size ] -> { NETIO_WRITE, written[ 4 ], err[ 4 ], ... } */ -/* { NETIO_LOCK, file_no[2], start[ 8 ], len[ 8 ], flags[ 2 ], ... } -> { NETIO_LOCK, ... } */ -/* { NETIO_TRUNC, file_no[2], offset[ 8 ], ... } -> { NETIO_TRUNC, ... } */ -/* { NETIO_SIZE, file_no[2], ... } -> { NETIO_SIZE, size[ 8 ], err[ 4 ], ... } */ -/* { NETIO_COMMIT, file_no[2], ... } -> { NETIO_SYNC, ... } | NULL */ -/* { NETIO_CLOSE, file_no[2], ... } -> { NETIO_CLOSE, ... } */ -/* { NETIO_UNLOCK, file_no[2], start[ 8 ], len[ 8 ], flags[ 2 ], ... } -> { NETIO_SYNC, ... } | NULL */ -/* { NETIO_PROCIS, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_PROCIS, ... } */ -/* { NETIO_PROC, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_SYNC, ... } | NULL */ -/* { NETIO_PROCW, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_PROC, ... } */ -/* { NETIO_FUNC, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_FUNC, size[ 4 ] } + data[ size ] */ -/* { NETIO_SYNC, ... } -> NULL */ +/* { NETIO_LOGIN, len[ 2 ]... } + loginstr[ len ] -> { NETIO_LOGIN, NETIO_CONNECTED[ 4 ], ... } */ +/* { NETIO_EXISTS, len[ 2 ]... } + filename[ len ] -> { NETIO_EXISTS, ... } */ +/* { NETIO_DELETE, len[ 2 ]... } + filename[ len ] -> { NETIO_DELETE, ... } */ +/* { NETIO_RENAME, len[ 2 ], len2[ 2 ]... } + filename[ len ] + filename[ len2 ] -> { NETIO_RENAME, ... } */ +/* { NETIO_OPEN, len[ 2 ], flags[ 2 ], def_ext[], 0, ... } + filename[ len ] -> { NETIO_OPEN, file_no[2], ... } */ +/* { NETIO_READ, file_no[2], size[ 4 ], offset[ 8 ], ... } -> { NETIO_READ, read[ 4 ], err[ 4 ], ... } + data[ read ] */ +/* { NETIO_WRITE, file_no[2], size[ 4 ], offset[ 8 ], ... } + data[ size ] -> { NETIO_WRITE, written[ 4 ], err[ 4 ], ... } */ +/* { NETIO_LOCK, file_no[2], start[ 8 ], len[ 8 ], flags[ 2 ], ... } -> { NETIO_LOCK, ... } */ +/* { NETIO_TRUNC, file_no[2], offset[ 8 ], ... } -> { NETIO_TRUNC, ... } */ +/* { NETIO_SIZE, file_no[2], ... } -> { NETIO_SIZE, size[ 8 ], err[ 4 ], ... } */ +/* { NETIO_COMMIT, file_no[2], ... } -> { NETIO_SYNC, ... } | NULL */ +/* { NETIO_CLOSE, file_no[2], ... } -> { NETIO_CLOSE, ... } */ +/* { NETIO_UNLOCK, file_no[2], start[ 8 ], len[ 8 ], flags[ 2 ], ... } -> { NETIO_SYNC, ... } | NULL */ +/* { NETIO_PROCIS, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_PROCIS, ... } */ +/* { NETIO_PROC, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_SYNC, ... } | NULL */ +/* { NETIO_PROCW, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_PROC, ... } */ +/* { NETIO_FUNC, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_FUNC, size[ 4 ] } + data[ size ] */ +/* { NETIO_FUNCCTRL, size[ 4 ], id[4], type[4] } + (funcname + \0 + data)[ size ] -> { NETIO_FUNCCTRL, size[ 4 ] } + data[ size ] */ +/* { NETIO_SRVCLOSE, id[4], ... } -> { NETIO_SRVCLOSE, ... } */ +/* { NETIO_SYNC, ... } -> NULL */ +/* -> { NETIO_SYNC, ... } */ +/* -> { NETIO_SRVITEM, id[4], size[ 4 ], ... } + data[ size ] */ +/* -> { NETIO_SRVDATA, id[4], size[ 4 ], ... } + data[ size ] */ /* alternative answer for all messages: -> { NETIO_ERROR, err[ 4 ], ... } */ /* netio errors */ @@ -126,9 +135,11 @@ #define NETIO_ERR_WRONG_PARAM 0xff02 #define NETIO_ERR_WRONG_FILE_PATH 0xff03 #define NETIO_ERR_WRONG_FILE_HANDLE 0xff04 -#define NETIO_ERR_FILES_MAX 0xff05 -#define NETIO_ERR_READ 0xff06 -#define NETIO_ERR_FILE_IO 0xff07 -#define NETIO_ERR_NOT_EXISTS 0xff08 -#define NETIO_ERR_UNSUPPORTED 0xff09 -#define NETIO_ERR_REFUSED 0xff10 +#define NETIO_ERR_WRONG_FILE_SIZE 0xff05 +#define NETIO_ERR_WRONG_STREAMID 0xff06 +#define NETIO_ERR_FILES_MAX 0xff07 +#define NETIO_ERR_READ 0xff08 +#define NETIO_ERR_FILE_IO 0xff09 +#define NETIO_ERR_NOT_EXISTS 0xff0a +#define NETIO_ERR_UNSUPPORTED 0xff0b +#define NETIO_ERR_REFUSED 0xff0c diff --git a/harbour/contrib/hbnetio/netiocli.c b/harbour/contrib/hbnetio/netiocli.c index ac429aa2dc..79818a9434 100644 --- a/harbour/contrib/hbnetio/netiocli.c +++ b/harbour/contrib/hbnetio/netiocli.c @@ -24,6 +24,15 @@ * NETIO_PROCEXECW( [, ] ) -> * NETIO_FUNCEXEC( [, ] ) -> * + * NETIO_OPENDATASTREAM( [, ] ) + * -> + * NETIO_OPENITEMSTREAM( [, ] ) + * -> + * NETIO_CLOSESTREAM( , [], [] ) + * -> + * NETIO_GETDATA( , [], [] ) + * -> | | NIL + * * Copyright 2009 Przemyslaw Czerpak * www - http://www.harbour-project.org * @@ -88,15 +97,30 @@ * client code */ +typedef struct _HB_SRVDATA +{ + int id; + int type; + PHB_ITEM array; + char * data; + HB_SIZE size; + HB_SIZE bufsize; + HB_SIZE maxsize; + struct _HB_SRVDATA * next; +} +HB_SRVDATA, * PHB_SRVDATA; + typedef struct _HB_CONCLI { HB_COUNTER used; HB_COUNTER usrcount; PHB_ITEM mutex; + HB_ERRCODE errcode; int timeout; int port; HB_SOCKET sd; PHB_ZNETSTREAM zstream; + PHB_SRVDATA srvdata; struct _HB_CONCLI * next; int level; int strategy; @@ -151,6 +175,18 @@ static const HB_FILE_FUNCS * s_fileMethods( void ); #define NETIO_TIMEOUT -1 +static void hb_errRT_NETIO( HB_ERRCODE errGenCode, HB_ERRCODE errSubCode, + HB_ERRCODE errOsCode, const char * szDescription, + const char * szOperation ) +{ + PHB_ITEM pError; + + pError = hb_errRT_New( ES_ERROR, "NETIO", errGenCode, errSubCode, + szDescription, szOperation, errOsCode, EF_NONE ); + hb_errLaunch( pError ); + hb_itemRelease( pError ); +} + static long s_fileRecvAll( PHB_CONCLI conn, void * buffer, long len ) { HB_BYTE * ptr = ( HB_BYTE * ) buffer; @@ -169,6 +205,171 @@ static long s_fileRecvAll( PHB_CONCLI conn, void * buffer, long len ) return lRead; } +static long s_fileRecvTest( PHB_CONCLI conn, void * buffer, long len ) +{ + HB_BYTE * ptr = ( HB_BYTE * ) buffer; + long lRead = 0, l; + HB_MAXINT timeout = 0; + + while( lRead < len ) + { + if( conn->zstream ) + l = hb_znetRead( conn->zstream, conn->sd, ptr + lRead, len - lRead, timeout ); + else + l = hb_socketRecv( conn->sd, ptr + lRead, len - lRead, 0, timeout ); + if( l <= 0 ) + break; + lRead += l; + timeout = NETIO_TIMEOUT; + } + return lRead; +} + +static int s_fileGenSrvDataID( PHB_CONCLI conn ) +{ + PHB_SRVDATA pSrvData = conn->srvdata; + static int s_iStreamID = 0; + + if( ++s_iStreamID <= 0 ) + s_iStreamID = 1; + + while( pSrvData ) + { + if( pSrvData->id == s_iStreamID ) + { + if( ++s_iStreamID <= 0 ) + s_iStreamID = 1; + pSrvData = conn->srvdata; + } + else + pSrvData = pSrvData->next; + } + + return s_iStreamID; +} + +static PHB_SRVDATA s_fileFindSrvData( PHB_CONCLI conn, int iStreamID, int iType ) +{ + PHB_SRVDATA pSrvData = conn->srvdata; + + while( pSrvData ) + { + if( pSrvData->id == iStreamID ) + return ( iType == 0 || pSrvData->type == iType ) ? pSrvData : NULL; + pSrvData = pSrvData->next; + } + + return NULL; +} + +static HB_BOOL s_fileCloseSrvData( PHB_CONCLI conn, int iStreamID ) +{ + PHB_SRVDATA * pSrvDataPtr = &conn->srvdata; + + while( *pSrvDataPtr ) + { + if( ( *pSrvDataPtr )->id == iStreamID ) + { + PHB_SRVDATA pSrvData = *pSrvDataPtr; + *pSrvDataPtr = pSrvData->next; + if( pSrvData->array ) + hb_itemRelease( pSrvData->array ); + if( pSrvData->data ) + hb_xfree( pSrvData->data ); + hb_xfree( pSrvData ); + if( !conn->srvdata ) + hb_atomic_dec( &conn->used ); + return HB_TRUE; + } + pSrvDataPtr = &( *pSrvDataPtr )->next; + } + + return HB_FALSE; +} + +static void s_fileNewSrvData( PHB_CONCLI conn, int iStreamID, int iType ) +{ + PHB_SRVDATA pSrvData = s_fileFindSrvData( conn, iStreamID, 0 ); + + if( !pSrvData ) + { + pSrvData = ( PHB_SRVDATA ) memset( hb_xgrab( sizeof( HB_SRVDATA ) ), + 0, sizeof( HB_SRVDATA ) ); + pSrvData->id = iStreamID; + pSrvData->type = iType; + if( iType == NETIO_SRVITEM ) + pSrvData->maxsize = 4096; + else if( iType == NETIO_SRVDATA ) + pSrvData->maxsize = 0x10000; + pSrvData->next = conn->srvdata; + if( !conn->srvdata ) + hb_atomic_inc( &conn->used ); + conn->srvdata = pSrvData; + } +} + +static HB_BOOL s_fileRecvSrvData( PHB_CONCLI conn, long len, int iStreamID, int iType ) +{ + char * buffer = ( char * ) hb_xgrab( len ); + HB_BOOL fResult = HB_FALSE; + + if( s_fileRecvAll( conn, buffer, len ) == len ) + { + PHB_SRVDATA pSrvData = s_fileFindSrvData( conn, iStreamID, iType ); + + if( pSrvData ) + { + if( pSrvData->size < pSrvData->maxsize ) + { + if( iType == NETIO_SRVITEM ) + { + HB_SIZE nSize = len; + const char * data = buffer; + PHB_ITEM pItem = hb_itemDeserialize( &data, &nSize ); + + if( pItem ) + { + if( nSize == 0 ) + { + if( pSrvData->array == NULL ) + pSrvData->array = hb_itemArrayNew( 0 ); + if( hb_arrayLen( pSrvData->array ) < pSrvData->maxsize ) + hb_arrayAddForward( pSrvData->array, pItem ); + } + hb_itemRelease( pItem ); + } + } + else if( iType == NETIO_SRVDATA ) + { + long lmax = pSrvData->maxsize - pSrvData->size; + + if( len > lmax ) + len = lmax; + if( pSrvData->size + len > pSrvData->bufsize ) + { + pSrvData->bufsize = ( pSrvData->size + len ) << 1; + if( pSrvData->bufsize > pSrvData->maxsize ) + pSrvData->bufsize = pSrvData->maxsize; + pSrvData->data = ( char * ) hb_xrealloc( pSrvData->data, + pSrvData->bufsize ); + } + memcpy( pSrvData->data + pSrvData->size, buffer, len ); + pSrvData->size += len; + } + } + } + fResult = HB_TRUE; + } + else + { + conn->errcode = hb_socketGetError(); + hb_errRT_NETIO( EG_READ, 1001, conn->errcode, NULL, HB_ERR_FUNCNAME ); + } + hb_xfree( buffer ); + + return fResult; +} + static HB_BOOL s_fileSendMsg( PHB_CONCLI conn, HB_BYTE * msgbuf, const void * data, long len, HB_BOOL fWait ) { @@ -210,22 +411,51 @@ static HB_BOOL s_fileSendMsg( PHB_CONCLI conn, HB_BYTE * msgbuf, if( lSent == len ) { - if( conn->zstream ) - hb_znetFlush( conn->zstream, conn->sd, NETIO_TIMEOUT ); - - if( fWait ) + if( conn->zstream && + hb_znetFlush( conn->zstream, conn->sd, NETIO_TIMEOUT ) != 0 ) { - int iMsg = HB_GET_LE_INT32( msgbuf ); + conn->errcode = hb_socketGetError(); + hb_errRT_NETIO( EG_WRITE, 1002, conn->errcode, NULL, HB_ERR_FUNCNAME ); + } + else if( fWait ) + { + int iMsg = HB_GET_LE_INT32( msgbuf ), iResult; - while( s_fileRecvAll( conn, msgbuf, NETIO_MSGLEN ) == NETIO_MSGLEN ) + for( ;; ) { - int iResult = HB_GET_LE_INT32( msgbuf ); + if( s_fileRecvAll( conn, msgbuf, NETIO_MSGLEN ) != NETIO_MSGLEN ) + { + conn->errcode = hb_socketGetError(); + hb_errRT_NETIO( EG_READ, 1003, conn->errcode, NULL, HB_ERR_FUNCNAME ); + break; + } - if( iResult != NETIO_SYNC ) + iResult = HB_GET_LE_INT32( msgbuf ); + + if( iResult == NETIO_SRVITEM || iResult == NETIO_SRVDATA ) + { + int iStreamID = HB_GET_LE_UINT32( &msgbuf[ 4 ] ); + + len = HB_GET_LE_INT32( &msgbuf[ 8 ] ); + if( len > 0 ) + { + if( !s_fileRecvSrvData( conn, len, iStreamID, iResult ) ) + break; + } + } + else if( iResult != NETIO_SYNC ) { if( iResult == NETIO_ERROR ) - hb_fsSetError( ( HB_ERRCODE ) HB_GET_LE_UINT32( &msgbuf[ 4 ] ) ); - else if( iResult == iMsg ) + { + conn->errcode = ( HB_ERRCODE ) HB_GET_LE_UINT32( &msgbuf[ 4 ] ); + hb_fsSetError( conn->errcode ); + } + else if( iResult != iMsg ) + { + conn->errcode = NETIO_ERR_UNKNOWN_COMMAND; + hb_errRT_NETIO( EG_UNSUPPORTED, 1004, 0, NULL, HB_ERR_FUNCNAME ); + } + else fResult = HB_TRUE; break; } @@ -234,6 +464,65 @@ static HB_BOOL s_fileSendMsg( PHB_CONCLI conn, HB_BYTE * msgbuf, else fResult = HB_TRUE; } + else + { + conn->errcode = hb_socketGetError(); + hb_errRT_NETIO( EG_WRITE, 1005, conn->errcode, NULL, HB_ERR_FUNCNAME ); + } + + return fResult; +} + +static HB_BOOL s_fileProcessData( PHB_CONCLI conn ) +{ + HB_BYTE msgbuf[ NETIO_MSGLEN ]; + HB_BOOL fResult = HB_TRUE; + int iMsg, iStreamID; + long len; + + for( ;; ) + { + len = s_fileRecvTest( conn, msgbuf, NETIO_MSGLEN ); + if( len == NETIO_MSGLEN ) + { + iMsg = HB_GET_LE_INT32( msgbuf ); + if( iMsg == NETIO_SRVITEM || iMsg == NETIO_SRVDATA ) + { + iStreamID = HB_GET_LE_INT32( &msgbuf[ 4 ] ); + len = HB_GET_LE_INT32( &msgbuf[ 8 ] ); + if( len > 0 ) + { + if( !s_fileRecvSrvData( conn, len, iStreamID, iMsg ) ) + { + fResult = HB_FALSE; + break; + } + } + } + else if( iMsg == NETIO_ERROR ) + { + conn->errcode = ( HB_ERRCODE ) HB_GET_LE_UINT32( &msgbuf[ 4 ] ); + hb_fsSetError( conn->errcode ); + } + else if( iMsg != NETIO_SYNC ) + { + fResult = HB_FALSE; + conn->errcode = NETIO_ERR_UNKNOWN_COMMAND; + hb_errRT_NETIO( EG_UNSUPPORTED, 1006, 0, NULL, HB_ERR_FUNCNAME ); + break; + } + } + else + { + if( len != 0 ) + { + fResult = HB_FALSE; + conn->errcode = hb_socketGetError(); + hb_errRT_NETIO( EG_READ, 1007, conn->errcode, NULL, HB_ERR_FUNCNAME ); + } + break; + } + } return fResult; } @@ -251,8 +540,10 @@ static PHB_CONCLI s_fileConNew( HB_SOCKET sd, const char * pszServer, hb_atomic_set( &conn->used, 1 ); hb_atomic_set( &conn->usrcount, 0 ); conn->mutex = hb_threadMutexCreate(); + conn->errcode = 0; conn->sd = sd; conn->zstream = NULL; + conn->srvdata = NULL; conn->next = NULL; conn->timeout = iTimeOut; conn->port = iPort; @@ -270,6 +561,16 @@ static void s_fileConFree( PHB_CONCLI conn ) { hb_socketShutdown( conn->sd, HB_SOCKET_SHUT_RDWR ); hb_socketClose( conn->sd ); + while( conn->srvdata ) + { + PHB_SRVDATA pSrvData = conn->srvdata; + conn->srvdata = pSrvData->next; + if( pSrvData->array ) + hb_itemRelease( pSrvData->array ); + if( pSrvData->data ) + hb_xfree( pSrvData->data ); + hb_xfree( pSrvData ); + } if( conn->zstream ) hb_znetClose( conn->zstream ); if( conn->mutex ) @@ -344,8 +645,7 @@ static HB_BOOL s_fileUsrDisconnect( const char * pszServer, int iPort ) conn = s_connections; while( conn ) { - if( conn->port == iPort && - hb_stricmp( conn->server, pszServer ) == 0 ) + if( conn->port == iPort && hb_stricmp( conn->server, pszServer ) == 0 ) { if( hb_atomic_get( &conn->usrcount ) ) { @@ -555,7 +855,7 @@ static PHB_CONCLI s_fileConnect( const char ** pFilename, HB_PUT_LE_UINT16( &msgbuf[ 4 ], len ); memset( msgbuf + 6, '\0', sizeof( msgbuf ) - 6 ); - hb_socketSetNoDelay( sd, HB_TRUE ); + hb_socketSetNoDelay( sd, HB_FALSE ); conn = s_fileConNew( sd, pszIpAddres, iPort, iTimeOut, pszPasswd, iPassLen, iLevel, iStrategy ); sd = HB_NO_SOCKET; @@ -776,7 +1076,7 @@ static const char * s_netio_params( int iMsg, const char * pszName, HB_U32 * pSi return data ? data : pszName; } -static HB_BOOL s_netio_procexec( const char * pszProcName, int iMsg ) +static HB_BOOL s_netio_procexec( const char * pszProcName, int iMsg, int iType ) { HB_BOOL fResult = HB_FALSE; @@ -792,33 +1092,55 @@ static HB_BOOL s_netio_procexec( const char * pszProcName, int iMsg ) const char * data; char * buffer; HB_U32 size; + int iStreamID = 0; data = s_netio_params( iMsg, pszProcName, &size, &buffer ); HB_PUT_LE_UINT32( &msgbuf[ 0 ], iMsg ); HB_PUT_LE_UINT32( &msgbuf[ 4 ], size ); - memset( msgbuf + 8, '\0', sizeof( msgbuf ) - 8 ); - fResult = s_fileSendMsg( conn, msgbuf, data, size, iMsg != NETIO_PROC ); - if( fResult && iMsg == NETIO_FUNC ) + if( iMsg == NETIO_FUNCCTRL ) { - HB_ULONG ulResult = HB_GET_LE_UINT32( &msgbuf[ 4 ] ); + iStreamID = s_fileGenSrvDataID( conn ); + HB_PUT_LE_UINT32( &msgbuf[ 8 ], iStreamID ); + HB_PUT_LE_UINT32( &msgbuf[ 12 ], iType ); + memset( msgbuf + 16, '\0', sizeof( msgbuf ) - 16 ); + } + else + memset( msgbuf + 8, '\0', sizeof( msgbuf ) - 8 ); + fResult = s_fileSendMsg( conn, msgbuf, data, size, iMsg != NETIO_PROC ); + if( fResult && ( iMsg == NETIO_FUNC || iMsg == NETIO_FUNCCTRL ) ) + { + HB_SIZE nResult = HB_GET_LE_UINT32( &msgbuf[ 4 ] ); - if( ulResult > 0 ) + if( nResult > 0 ) { PHB_ITEM pItem; - if( ulResult > size && buffer ) + if( nResult > size && buffer ) { hb_xfree( buffer ); buffer = NULL; } if( buffer == NULL ) - buffer = ( char * ) hb_xgrab( ulResult ); - ulResult = s_fileRecvAll( conn, buffer, ulResult ); + buffer = ( char * ) hb_xgrab( nResult ); + nResult = s_fileRecvAll( conn, buffer, ( long ) nResult ); data = buffer; - pItem = hb_itemDeserialize( &data, &ulResult ); + pItem = hb_itemDeserialize( &data, &nResult ); if( pItem ) + { + if( iMsg == NETIO_FUNCCTRL ) + { + if( iStreamID == hb_itemGetNI( pItem ) ) + s_fileNewSrvData( conn, iStreamID, iType ); + else + hb_itemPutNI( pItem, -1 ); + } hb_itemReturnRelease( pItem ); - /* else TODO: RTE */ + } + else + { + conn->errcode = NETIO_ERR_WRONG_PARAM; + hb_errRT_NETIO( EG_CORRUPTION, 1008, 0, NULL, HB_ERR_FUNCNAME ); + } } } if( buffer ) @@ -840,7 +1162,7 @@ HB_FUNC( NETIO_PROCEXISTS ) { const char * pszProcName = hb_parc( 1 ); - hb_retl( s_netio_procexec( pszProcName, NETIO_PROCIS ) ); + hb_retl( s_netio_procexec( pszProcName, NETIO_PROCIS, 0 ) ); } /* execute function/procedure on server the side, @@ -852,7 +1174,7 @@ HB_FUNC( NETIO_PROCEXEC ) { const char * pszProcName = hb_parc( 1 ); - hb_retl( s_netio_procexec( pszProcName, NETIO_PROC ) ); + hb_retl( s_netio_procexec( pszProcName, NETIO_PROC, 0 ) ); } /* execute function/procedure on the server side and wait for @@ -864,7 +1186,7 @@ HB_FUNC( NETIO_PROCEXECW ) { const char * pszProcName = hb_parc( 1 ); - hb_retl( s_netio_procexec( pszProcName, NETIO_PROCW ) ); + hb_retl( s_netio_procexec( pszProcName, NETIO_PROCW, 0 ) ); } /* execute function on the server side and wait for its return value: @@ -875,7 +1197,147 @@ HB_FUNC( NETIO_FUNCEXEC ) { const char * pszProcName = hb_parc( 1 ); - s_netio_procexec( pszProcName, NETIO_FUNC ); + s_netio_procexec( pszProcName, NETIO_FUNC, 0 ); +} + +/* open communication stream/channel which allow to send data + * asynchronously from server to client: + * + * NETIO_OPENDATASTREAM( [, ] ) -> + * + * it executes on the server side: + * ( , [, ] ) + * and then check value returned by above function. If it's equal to + * then the communication stream is opened and + * is returned to the client. + * The function returns new stream ID or -1 if the communication stream + * cannot be set. + */ +HB_FUNC( NETIO_OPENDATASTREAM ) +{ + const char * pszProcName = hb_parc( 1 ); + + s_netio_procexec( pszProcName, NETIO_FUNCCTRL, NETIO_SRVDATA ); +} + +/* open communication stream/channel which allow to send data + * asynchronously from server to client: + * + * NETIO_OPENITEMSTREAM( [, ] ) -> + * + * it executes on the server side: + * ( , [, ] ) + * and then check value returned by above function. If it's equal to + * then the communication stream is opened and + * is returned to the client. + * The function returns new stream ID or -1 if the communication stream + * cannot be set. + */ +HB_FUNC( NETIO_OPENITEMSTREAM ) +{ + const char * pszProcName = hb_parc( 1 ); + + s_netio_procexec( pszProcName, NETIO_FUNCCTRL, NETIO_SRVITEM ); +} + +/* close communication stream/channel: + * + * NETIO_CLOSESTREAM( , [], [] ) -> + */ +HB_FUNC( NETIO_CLOSESTREAM ) +{ + int iStreamID = hb_parni( 1 ); + HB_BOOL fResult = HB_FALSE; + + if( iStreamID ) + { + const char * pszServer = hb_parc( 2 ); + char * pszIpAddres; + int iPort = hb_parni( 3 ); + + s_fileGetConnParam( &pszServer, &iPort, NULL, NULL, NULL ); + pszIpAddres = hb_socketResolveAddr( pszServer, HB_SOCKET_AF_INET ); + if( pszIpAddres != NULL ) + { + PHB_CONCLI conn = s_fileConFind( pszIpAddres, iPort ); + + if( conn ) + { + if( s_fileConLock( conn ) ) + { + fResult = s_fileCloseSrvData( conn, iStreamID ); + if( fResult ) + { + HB_BYTE msgbuf[ NETIO_MSGLEN ]; + + HB_PUT_LE_UINT32( &msgbuf[ 0 ], NETIO_SRVCLOSE ); + HB_PUT_LE_UINT32( &msgbuf[ 4 ], iStreamID ); + memset( msgbuf + 8, '\0', sizeof( msgbuf ) - 8 ); + s_fileSendMsg( conn, msgbuf, NULL, 0, HB_TRUE ); + } + s_fileConUnlock( conn ); + } + s_fileConClose( conn ); + } + hb_xfree( pszIpAddres ); + } + } + hb_retl( fResult ); +} + +/* retrieve data sent from the server by cominication stream + * + * NETIO_GETDATA( , [], [] ) -> ||NIL + */ +HB_FUNC( NETIO_GETDATA ) +{ + int iStreamID = hb_parni( 1 ); + + if( iStreamID ) + { + const char * pszServer = hb_parc( 2 ); + char * pszIpAddres; + int iPort = hb_parni( 3 ); + + s_fileGetConnParam( &pszServer, &iPort, NULL, NULL, NULL ); + pszIpAddres = hb_socketResolveAddr( pszServer, HB_SOCKET_AF_INET ); + if( pszIpAddres != NULL ) + { + PHB_CONCLI conn = s_fileConFind( pszIpAddres, iPort ); + + if( conn ) + { + if( s_fileConLock( conn ) ) + { + if( s_fileProcessData( conn ) ) + { + PHB_SRVDATA pSrvData = s_fileFindSrvData( conn, iStreamID, 0 ); + if( pSrvData ) + { + if( pSrvData->type == NETIO_SRVITEM ) + { + if( pSrvData->array ) + { + hb_itemReturnForward( pSrvData->array ); + hb_arrayNew( pSrvData->array, 0 ); + } + else + hb_reta( 0 ); + } + else if( pSrvData->type == NETIO_SRVDATA ) + { + hb_retclen( pSrvData->data, pSrvData->size ); + pSrvData->size = 0; + } + } + } + s_fileConUnlock( conn ); + } + s_fileConClose( conn ); + } + hb_xfree( pszIpAddres ); + } + } } /* Client methods @@ -1092,14 +1554,24 @@ static HB_SIZE s_fileReadAt( PHB_FILE pFile, void * data, HB_SIZE ulSize, if( s_fileSendMsg( pFile->conn, msgbuf, NULL, 0, HB_TRUE ) ) { + HB_ERRCODE errCode = ( HB_ERRCODE ) HB_GET_LE_UINT32( &msgbuf[ 8 ] ); ulResult = HB_GET_LE_UINT32( &msgbuf[ 4 ] ); if( ulResult > 0 ) { if( ulResult > ulSize ) /* error, it should not happen, enemy attack? */ - ulResult = ulSize; - ulResult = s_fileRecvAll( pFile->conn, data, ulResult ); + { + pFile->conn->errcode = errCode = NETIO_ERR_WRONG_FILE_SIZE; + hb_errRT_NETIO( EG_DATAWIDTH, 1009, 0, NULL, HB_ERR_FUNCNAME ); + ulResult = 0; + } + else if( ( HB_SIZE ) s_fileRecvAll( pFile->conn, data, ulResult ) != ulResult ) + { + pFile->conn->errcode = hb_socketGetError(); + errCode = NETIO_ERR_READ; + hb_errRT_NETIO( EG_READ, 1010, pFile->conn->errcode, NULL, HB_ERR_FUNCNAME ); + } } - hb_fsSetError( ( HB_ERRCODE ) HB_GET_LE_UINT32( &msgbuf[ 8 ] ) ); + hb_fsSetError( errCode ); } s_fileConUnlock( pFile->conn ); } diff --git a/harbour/contrib/hbnetio/netiosrv.c b/harbour/contrib/hbnetio/netiosrv.c index c12cfb8477..f25803604a 100644 --- a/harbour/contrib/hbnetio/netiosrv.c +++ b/harbour/contrib/hbnetio/netiosrv.c @@ -22,7 +22,13 @@ * NETIO_RPC( | [, ] ) * -> * NETIO_RPCFILTER( , - | | NIL ) -> NIL + * | | NIL ) -> NIL + * + * NETIO_SRVSTATUS( [, ] ) -> + * NETIO_SRVSENDITEM( , , ) + * -> + * NETIO_SRVSENDDATA( , , ) + * -> * * Copyright 2009 Przemyslaw Czerpak * www - http://www.harbour-project.org @@ -86,6 +92,14 @@ * server code */ +typedef struct _HB_CONSTREAM +{ + int id; + int type; + struct _HB_CONSTREAM * next; +} +HB_CONSTREAM, * PHB_CONSTREAM; + typedef struct _HB_CONSRV { HB_SOCKET sd; @@ -98,6 +112,8 @@ typedef struct _HB_CONSRV HB_BOOL login; PHB_SYMB rpcFunc; PHB_ITEM rpcFilter; + PHB_ITEM mutex; + PHB_CONSTREAM streams; int rootPathLen; char rootPath[ HB_PATH_MAX ]; } @@ -237,6 +253,16 @@ static void s_consrv_close( PHB_CONSRV conn ) if( conn->rpcFilter ) hb_itemRelease( conn->rpcFilter ); + while( conn->streams ) + { + PHB_CONSTREAM stream = conn->streams; + conn->streams = stream->next; + hb_xfree( stream ); + } + + if( conn->mutex ) + hb_itemRelease( conn->mutex ); + if( conn->sd != HB_NO_SOCKET ) hb_socketClose( conn->sd ); @@ -354,24 +380,29 @@ static long s_srvSendAll( PHB_CONSRV conn, void * buffer, long len ) HB_BYTE * ptr = ( HB_BYTE * ) buffer; long lSent = 0, lLast = 1, l; - while( lSent < len && !conn->stop ) + if( !conn->mutex || hb_threadMutexLock( conn->mutex ) ) { - if( conn->zstream ) - l = hb_znetWrite( conn->zstream, conn->sd, ptr + lSent, len - lSent, -1, &lLast ); - else - l = lLast = hb_socketSend( conn->sd, ptr + lSent, len - lSent, 0, -1 ); - if( l > 0 ) - lSent += l; - if( lLast <= 0 ) + while( lSent < len && !conn->stop ) { - if( hb_socketGetError() != HB_SOCKET_ERR_TIMEOUT || - hb_vmRequestQuery() != 0 ) - break; + if( conn->zstream ) + l = hb_znetWrite( conn->zstream, conn->sd, ptr + lSent, len - lSent, -1, &lLast ); + else + l = lLast = hb_socketSend( conn->sd, ptr + lSent, len - lSent, 0, -1 ); + if( l > 0 ) + lSent += l; + if( lLast <= 0 ) + { + if( hb_socketGetError() != HB_SOCKET_ERR_TIMEOUT || + hb_vmRequestQuery() != 0 ) + break; + } } - } - if( conn->zstream && lLast > 0 && !conn->stop ) - hb_znetFlush( conn->zstream, conn->sd, -1 ); + if( conn->zstream && lLast > 0 && !conn->stop ) + hb_znetFlush( conn->zstream, conn->sd, -1 ); + if( conn->mutex ) + hb_threadMutexUnlock( conn->mutex ); + } return lSent; } @@ -583,7 +614,8 @@ HB_FUNC( NETIO_ACCEPT ) if( connsd != HB_NO_SOCKET ) { - hb_socketSetNoDelay( connsd, HB_TRUE ); + hb_socketSetKeepAlive( connsd, HB_TRUE ); + hb_socketSetNoDelay( connsd, HB_FALSE ); conn = s_consrvNew( connsd, lsd->rootPath, lsd->rpc ); @@ -693,7 +725,7 @@ HB_FUNC( NETIO_SERVER ) HB_BOOL fNoAnswer = HB_FALSE; HB_ERRCODE errCode = 0, errFsCode; long len = 0, size, size2; - int iFileNo; + int iFileNo, iStreamID; HB_U32 uiMsg; HB_USHORT uiFalgs; char * szExt; @@ -983,11 +1015,44 @@ HB_FUNC( NETIO_SERVER ) fNoAnswer = HB_TRUE; break; + case NETIO_SRVCLOSE: + iStreamID = HB_GET_LE_INT32( &msgbuf[ 4 ] ); + if( iStreamID && conn->mutex && hb_threadMutexLock( conn->mutex ) ) + { + PHB_CONSTREAM * pStreamPtr = &conn->streams; + while( *pStreamPtr ) + { + if( ( *pStreamPtr )->id == iStreamID ) + { + PHB_CONSTREAM stream = *pStreamPtr; + *pStreamPtr = stream->next; + hb_xfree( stream ); + break; + } + pStreamPtr = &( *pStreamPtr )->next; + } + if( *pStreamPtr == NULL ) + iStreamID = 0; + hb_threadMutexUnlock( conn->mutex ); + } + else + iStreamID = 0; + + if( iStreamID == 0 ) + errCode = NETIO_ERR_WRONG_STREAMID; + else + { + HB_PUT_LE_UINT32( &msg[ 0 ], NETIO_SRVCLOSE ); + memset( msg + 4, '\0', NETIO_MSGLEN - 4 ); + } + break; + case NETIO_PROC: fNoAnswer = HB_TRUE; case NETIO_PROCIS: case NETIO_PROCW: case NETIO_FUNC: + case NETIO_FUNCCTRL: if( !conn->rpc ) { errCode = NETIO_ERR_UNSUPPORTED; @@ -1030,10 +1095,12 @@ HB_FUNC( NETIO_SERVER ) { if( hb_vmRequestReenter() ) { - HB_SIZE ulSize = size - size2; + HB_SIZE nSize = size - size2; HB_USHORT uiPCount = 0; HB_BOOL fSend = HB_FALSE; + int iStreamType; + iStreamID = 0; data += size2; if( pItem ) { @@ -1053,21 +1120,47 @@ HB_FUNC( NETIO_SERVER ) hb_vmPushDynSym( pDynSym ); hb_vmPushNil(); } - while( ulSize ) + if( uiMsg == NETIO_FUNCCTRL ) { - pItem = hb_itemDeserialize( &data, &ulSize ); + iStreamID = HB_GET_LE_INT32( &msgbuf[ 8 ] ); + iStreamType = HB_GET_LE_INT32( &msgbuf[ 12 ] ); + hb_vmPush( hb_param( 1, HB_IT_ANY ) ); + hb_vmPushInteger( iStreamID ); + uiPCount += 2; + if( iStreamType != NETIO_SRVDATA && + iStreamType != NETIO_SRVITEM ) + iStreamID = 0; + if( iStreamID ) + { + PHB_CONSTREAM stream = ( PHB_CONSTREAM ) + hb_xgrab( sizeof( HB_CONSTREAM ) ); + stream->id = iStreamID; + stream->type = iStreamType; + stream->next = conn->streams; + conn->streams = stream; + + if( conn->mutex == NULL ) + conn->mutex = hb_threadMutexCreate(); + if( !hb_threadMutexLock( conn->mutex ) ) + errCode = NETIO_ERR_REFUSED; + } + else + errCode = NETIO_ERR_WRONG_PARAM; + } + while( nSize > 0 && errCode == 0 ) + { + pItem = hb_itemDeserialize( &data, &nSize ); if( !pItem ) { - ulSize = 1; + errCode = NETIO_ERR_WRONG_PARAM; break; } ++uiPCount; hb_vmPush( pItem ); hb_itemRelease( pItem ); } - if( ulSize ) + if( errCode != 0 ) { - errCode = NETIO_ERR_WRONG_PARAM; uiPCount += 2; do hb_stackPop(); @@ -1079,10 +1172,11 @@ HB_FUNC( NETIO_SERVER ) hb_vmSend( uiPCount ); else hb_vmProc( uiPCount ); - if( uiMsg == NETIO_FUNC ) + if( uiMsg == NETIO_FUNC || uiMsg == NETIO_FUNCCTRL ) { HB_SIZE itmSize; - char * itmData = hb_itemSerialize( hb_stackReturnItem(), HB_TRUE, &itmSize ); + PHB_ITEM pResult = hb_stackReturnItem(); + char * itmData = hb_itemSerialize( pResult, HB_TRUE, &itmSize ); if( itmSize <= sizeof( buffer ) - NETIO_MSGLEN ) msg = buffer; else if( !ptr || itmSize > ( HB_SIZE ) size - NETIO_MSGLEN ) @@ -1094,9 +1188,25 @@ HB_FUNC( NETIO_SERVER ) memcpy( msg + NETIO_MSGLEN, itmData, itmSize ); hb_xfree( itmData ); len = itmSize; + if( iStreamID && hb_itemGetNI( pResult ) == iStreamID ) + { + hb_threadMutexUnlock( conn->mutex ); + iStreamID = 0; + } } } hb_vmRequestRestore(); + if( iStreamID ) + { + PHB_CONSTREAM stream = conn->streams; + + if( stream->id == iStreamID ) + { + conn->streams = stream->next; + hb_xfree( conn->streams ); + } + hb_threadMutexUnlock( conn->mutex ); + } } else errCode = NETIO_ERR_REFUSED; @@ -1147,3 +1257,120 @@ HB_FUNC( NETIO_SERVER ) } } } + +/* NETIO_SRVSENDITEM( , , ) -> + */ +HB_FUNC( NETIO_SRVSENDITEM ) +{ + PHB_CONSRV conn = s_consrvParam( 1 ); + int iStreamID = hb_parni( 2 ); + PHB_ITEM pItem = hb_param( 3, HB_IT_ANY ); + HB_BOOL fResult = HB_FALSE; + + if( conn && conn->sd != HB_NO_SOCKET && !conn->stop && conn->mutex && + iStreamID && pItem ) + { + char * itmData, * msg; + HB_SIZE nLen; + + itmData = hb_itemSerialize( pItem, HB_TRUE, &nLen ); + msg = ( char * ) hb_xgrab( nLen + NETIO_MSGLEN ); + HB_PUT_LE_UINT32( &msg[ 0 ], NETIO_SRVITEM ); + HB_PUT_LE_UINT32( &msg[ 4 ], iStreamID ); + HB_PUT_LE_UINT32( &msg[ 8 ], nLen ); + memset( msg + 12, '\0', NETIO_MSGLEN - 12 ); + memcpy( msg + NETIO_MSGLEN, itmData, nLen ); + hb_xfree( itmData ); + + nLen += NETIO_MSGLEN; + if( hb_threadMutexLock( conn->mutex ) ) + { + PHB_CONSTREAM stream = conn->streams; + while( stream ) + { + if( stream->id == iStreamID ) + break; + stream = stream->next; + } + if( stream && stream->type == NETIO_SRVITEM ) + fResult = s_srvSendAll( conn, msg, nLen ) == ( long ) nLen; + hb_threadMutexUnlock( conn->mutex ); + } + hb_xfree( msg ); + } + hb_retl( fResult ); +} + +/* NETIO_SRVSENDDATA( , , ) -> + */ +HB_FUNC( NETIO_SRVSENDDATA ) +{ + PHB_CONSRV conn = s_consrvParam( 1 ); + int iStreamID = hb_parni( 2 ); + HB_SIZE nLen = hb_parclen( 3 ); + HB_BOOL fResult = HB_FALSE; + + if( conn && conn->sd != HB_NO_SOCKET && !conn->stop && conn->mutex && + iStreamID && nLen > 0 ) + { + char * msg; + + msg = ( char * ) hb_xgrab( nLen + NETIO_MSGLEN ); + HB_PUT_LE_UINT32( &msg[ 0 ], NETIO_SRVDATA ); + HB_PUT_LE_UINT32( &msg[ 4 ], iStreamID ); + HB_PUT_LE_UINT32( &msg[ 8 ], nLen ); + memset( msg + 12, '\0', NETIO_MSGLEN - 12 ); + memcpy( msg + NETIO_MSGLEN, hb_parc( 3 ), nLen ); + + nLen += NETIO_MSGLEN; + if( hb_threadMutexLock( conn->mutex ) ) + { + PHB_CONSTREAM stream = conn->streams; + while( stream ) + { + if( stream->id == iStreamID ) + break; + stream = stream->next; + } + if( stream && stream->type == NETIO_SRVDATA ) + fResult = s_srvSendAll( conn, msg, nLen ) == ( long ) nLen; + hb_threadMutexUnlock( conn->mutex ); + } + hb_xfree( msg ); + } + hb_retl( fResult ); +} + +/* NETIO_SRVSTATUS( [, ] ) -> + */ +HB_FUNC( NETIO_SRVSTATUS ) +{ + PHB_CONSRV conn = s_consrvParam( 1 ); + int iStreamID = hb_parni( 2 ); + int iStatus = 0; + + if( !conn ) + iStatus = -1; + else if( conn->sd != HB_NO_SOCKET ) + iStatus = -2; + else if( conn->stop ) + iStatus = -3; + else if( iStreamID != 0 && conn->mutex ) + { + if( hb_threadMutexLock( conn->mutex ) ) + { + PHB_CONSTREAM stream = conn->streams; + while( stream ) + { + if( stream->id == iStreamID ) + { + iStatus = stream->type == NETIO_SRVDATA ? 1 : 2; + break; + } + stream = stream->next; + } + hb_threadMutexUnlock( conn->mutex ); + } + } + hb_retni( iStatus ); +} diff --git a/harbour/contrib/hbnetio/readme.txt b/harbour/contrib/hbnetio/readme.txt index 2f0cb36fcc..0255aa86bf 100644 --- a/harbour/contrib/hbnetio/readme.txt +++ b/harbour/contrib/hbnetio/readme.txt @@ -89,6 +89,31 @@ Client side functions: value sent by the server. + NETIO_OPENDATASTREAM( [, ] ) -> + NETIO_OPENITEMSTREAM( [, ] ) -> + open communication stream/channel which allow to send data + asynchronously from server to client. + It executes on the server side: + ( , [, ] ) + and then checks value returned by above function. If it's equal to + then the communication stream is opened and + is returned to the client. + The function returns new stream ID or -1 if the communication stream + cannot be set. + may contain information about connection parameters + just like in NETIO_PROC*() functions. + + NETIO_CLOSESTREAM( , [], [] ) -> + close communication stream/channel + + NETIO_GETDATA( , [], [] ) -> ||NIL + retrieve data sent from the server by communication stream. + If stream was open by NETIO_OPENDATASTREAM() then data is returned + as string. + If stream was open by NETIO_OPENITEMSTREAM() then data is returned + as array of items received from the server. + + Server side functions: ====================== @@ -108,3 +133,7 @@ Server side functions: [ | | ], [], [], [] ) -> + + NETIO_SRVSTATUS( [, ] ) -> + NETIO_SRVSENDITEM( , , ) -> + NETIO_SRVSENDDATA( , , ) -> diff --git a/harbour/contrib/hbnetio/tests/netiot03.prg b/harbour/contrib/hbnetio/tests/netiot03.prg new file mode 100644 index 0000000000..142a62a242 --- /dev/null +++ b/harbour/contrib/hbnetio/tests/netiot03.prg @@ -0,0 +1,199 @@ +/* + * $Id$ + */ + +/* + * Harbour Project source code: + * demonstration/test code for alternative RDD IO API, RPC and + * asynchronous data streams in NETIO + * + * Copyright 2010 Przemyslaw Czerpak + * www - http://www.harbour-project.org + * + */ + +/* net:127.0.0.1:2941:topsecret:data/_tst_ */ + +#define DBSERVER "127.0.0.1" +#define DBPORT 2941 +#define DBPASSWD "topsecret" +#define DBDIR "data" +#define DBFILE "_tst_" + +#define DBNAME "net:" + DBSERVER + ":" + hb_ntos( DBPORT ) + ":" + ; + DBPASSWD + ":" + DBDIR + "/" + DBFILE + +request DBFCDX + +request HB_DIREXISTS +request MAKEDIR +request HB_DATETIME + +proc main() + local pSockSrv, lExists, nStream1, nStream2, nSec, xData + + set exclusive off + rddSetDefault( "DBFCDX" ) + + pSockSrv := netio_mtserver( DBPORT,,, /* RPC */ .T., DBPASSWD ) + if empty( pSockSrv ) + ? "Cannot start NETIO server !!!" + wait "Press any key to exit..." + quit + endif + + ? "NETIO server activated." + hb_idleSleep( 0.1 ) + wait + + ? + ? "NETIO_CONNECT():", netio_connect( DBSERVER, DBPORT, , DBPASSWD ) + ? + + netio_procexec( "QOut", "PROCEXEC", "P2", "P3", "P4" ) + netio_funcexec( "QOut", "FUNCEXEC", "P2", "P3", "P4" ) + ? "SERVER TIME:", netio_funcexec( "hb_dateTime" ) + ? + wait + + nStream1 := NETIO_OPENITEMSTREAM( "reg_stream" ) + ? "NETIO_OPENITEMSTREAM:", nStream1 + nStream2 := NETIO_OPENDATASTREAM( "reg_charstream" ) + ? "NETIO_OPENDATASTREAM:", nStream2 + + hb_idleSleep( 3 ) + ? "NETIO_GETDATA 1:", hb_valToExp( NETIO_GETDATA( nStream1 ) ) + ? "NETIO_GETDATA 2:", hb_valToExp( NETIO_GETDATA( nStream2 ) ) + nSec := seconds() + 3 + while seconds() < nSec + xData := NETIO_GETDATA( nStream1 ) + if !empty( xData ) + ? hb_valToExp( xData ) + endif + xData := NETIO_GETDATA( nStream2 ) + if !empty( xData ) + ?? "", hb_valToExp( xData ) + endif + enddo + wait + ? "NETIO_GETDATA 1:", hb_valToExp( NETIO_GETDATA( nStream1 ) ) + ? "NETIO_GETDATA 2:", hb_valToExp( NETIO_GETDATA( nStream2 ) ) + wait + + lExists := netio_funcexec( "HB_DirExists", "./data" ) + ? "Directory './data'", iif( !lExists, "not exists", "exists" ) + if !lExists + ? "Creating directory './data' ->", ; + iif( netio_funcexec( "MakeDir", "./data" ) == -1, "error", "OK" ) + endif + + createdb( DBNAME ) + testdb( DBNAME ) + wait + + ? + ? "table exists:", dbExists( DBNAME ) + wait + + ? + ? "delete table with indexes:", dbDrop( DBNAME ) + ? "table exists:", dbExists( DBNAME ) + wait + + ? "NETIO_GETDATA 1:", hb_valToExp( NETIO_GETDATA( nStream1 ) ) + ? "NETIO_GETDATA 2:", hb_valToExp( NETIO_GETDATA( nStream2 ) ) + ? "NETIO_DISCONNECT():", netio_disconnect( DBSERVER, DBPORT ) + ? "NETIO_CLOSESTREAM 1:", NETIO_CLOSESTREAM( nStream1 ) + ? "NETIO_CLOSESTREAM 2:", NETIO_CLOSESTREAM( nStream2 ) + hb_idleSleep( 2 ) + ? + ? "stopping the server..." + netio_serverstop( pSockSrv, .t. ) +return + +proc createdb( cName ) + local n + + dbCreate( cName, {{"F1", "C", 20, 0},; + {"F2", "M", 4, 0},; + {"F3", "N", 10, 2},; + {"F4", "T", 8, 0}} ) + ? "create neterr:", neterr(), hb_osError() + use (cName) + ? "use neterr:", neterr(), hb_osError() + while lastrec() < 100 + dbAppend() + n := recno() - 1 + field->F1 := chr( n % 26 + asc( "A" ) ) + " " + time() + field->F2 := field->F1 + field->F3 := n / 100 + field->F4 := hb_dateTime() + enddo + index on field->F1 tag T1 + index on field->F3 tag T3 + index on field->F4 tag T4 + close + ? +return + +proc testdb( cName ) + local i, j + use (cName) + ? "used:", used() + ? "nterr:", neterr() + ? "alias:", alias() + ? "lastrec:", lastrec() + ? "ordCount:", ordCount() + for i:=1 to ordCount() + ordSetFocus( i ) + ? i, "name:", ordName(), "key:", ordKey(), "keycount:", ordKeyCount() + next + ordSetFocus( 1 ) + dbgotop() + while !eof() + if ! field->F1 == field->F2 + ? "error at record:", recno() + ? " ! '" + field->F1 + "' == '" + field->F2 + "'" + endif + dbSkip() + enddo + wait + i := row() + j := col() + dbgotop() + browse() + setpos( i, j ) + close +return + +func reg_stream( pConnSock, nStream ) + ? PROCNAME(), nStream + hb_threadDetach( hb_threadStart( @rpc_timer(), pConnSock, nStream ) ) +return nStream + +func reg_charstream( pConnSock, nStream ) + ? PROCNAME(), nStream + hb_threadDetach( hb_threadStart( @rpc_charstream(), pConnSock, nStream ) ) +return nStream + +static func rpc_timer( pConnSock, nStream ) + while .t. + if !netio_srvSendItem( pConnSock, nStream, time() ) + ? "CLOSED STREAM:", nStream + exit + endif + hb_idleSleep( 1 ) + enddo +return nil + +static func rpc_charstream( pConnSock, nStream ) + local n := 0 + while .t. + if !netio_srvSendData( pConnSock, nStream, chr( 65 + n ) ) + ? "CLOSED STREAM:", nStream + exit + endif + n := int( ( n + 1 ) % 26 ) + hb_idleSleep( 0.1 ) + enddo +return nil diff --git a/harbour/src/rtl/hbznet.c b/harbour/src/rtl/hbznet.c index 8395aac207..87e3535169 100644 --- a/harbour/src/rtl/hbznet.c +++ b/harbour/src/rtl/hbznet.c @@ -357,7 +357,7 @@ static long hb_znetStreamWrite( PHB_ZNETSTREAM pStream, HB_SOCKET sd, HB_MAXINT */ long hb_znetFlush( PHB_ZNETSTREAM pStream, HB_SOCKET sd, HB_MAXINT timeout ) { - uInt uiSize = HB_ZNET_BUFSIZE - ( pStream->crypt ? -2 : 0 ); + uInt uiSize = HB_ZNET_BUFSIZE - ( pStream->crypt ? 2 : 0 ); if( pStream->wr.avail_out > 0 ) pStream->err = deflate( &pStream->wr, Z_SYNC_FLUSH );