2010-04-30 11:39 UTC+0200 Przemyslaw Czerpak (druzus/at/priv.onet.pl)

* harbour/src/rtl/hbznet.c
    ! fixed stupid typo which caused that hb_znetFlush() in encrypted
      buffers never returned 0 (the returned value was increased by 4)

  * harbour/contrib/hbnetio/netio.h
  * harbour/contrib/hbnetio/netiocli.c
  * harbour/contrib/hbnetio/netiosrv.c
  * harbour/contrib/hbnetio/readme.txt
    + added support for communication streams/channels which allow to
      send asynchronously data from server to client.
      The following client side functions had been added:
         NETIO_OPENDATASTREAM( <cStreamFuncName> [, <params,...>] )
            -> <nStreamID>
         NETIO_OPENITEMSTREAM( <cStreamFuncName> [, <params,...>] )
            -> <nStreamID>
         NETIO_CLOSESTREAM( <nStreamID>, [<cServer>], [<nPort>] )
            -> <lOK>
         NETIO_GETDATA( <nStreamID>, [<cServer>], [<nPort>] )
            -> <aData> | <cData> | NIL
      The following server side functions had been added:
         NETIO_SRVSTATUS( <pConnectionSocket> [, <nStreamID>] )
            -> <nStatus>
         NETIO_SRVSENDITEM( <pConnectionSocket>, <nStreamID>, <xData> )
            -> <lSent>
         NETIO_SRVSENDDATA( <pConnectionSocket>, <nStreamID>, <cData> )
            -> <lSent>
    + added RT errors to the netio client code. They should help to early
      detect any communication problems
    ! disabled hb_socketSetNoDelay() in client and server code.
      I enabled it for some local tests and by mistake committed to SVN
      and I didn't noticed it. It could cause serious problems in some cases
      i.e. with slow WiFi or WAN connections.

      Warning: server signature changed due to extensions in protocol.
               both client and server side has to be updated and use
               current SVN code.

  + harbour/contrib/hbnetio/tests/netiot03.prg
    + added demonstration/test code for alternative RDD IO API, RPC and
      asynchronous data streams in HBNETIO
This commit is contained in:
Przemyslaw Czerpak
2010-04-30 09:39:25 +00:00
parent c62dc7fefc
commit bfd863c576
7 changed files with 1062 additions and 83 deletions

View File

@@ -17,6 +17,48 @@
past entries belonging to author(s): Viktor Szakats.
*/
2010-04-30 11:39 UTC+0200 Przemyslaw Czerpak (druzus/at/priv.onet.pl)
* harbour/src/rtl/hbznet.c
! fixed stupid typo which caused that hb_znetFlush() in encrypted
buffers never returned 0 (the returned value was increased by 4)
* harbour/contrib/hbnetio/netio.h
* harbour/contrib/hbnetio/netiocli.c
* harbour/contrib/hbnetio/netiosrv.c
* harbour/contrib/hbnetio/readme.txt
+ added support for communication streams/channels which allow to
send asynchronously data from server to client.
The following client side functions had been added:
NETIO_OPENDATASTREAM( <cStreamFuncName> [, <params,...>] )
-> <nStreamID>
NETIO_OPENITEMSTREAM( <cStreamFuncName> [, <params,...>] )
-> <nStreamID>
NETIO_CLOSESTREAM( <nStreamID>, [<cServer>], [<nPort>] )
-> <lOK>
NETIO_GETDATA( <nStreamID>, [<cServer>], [<nPort>] )
-> <aData> | <cData> | NIL
The following server side functions had been added:
NETIO_SRVSTATUS( <pConnectionSocket> [, <nStreamID>] )
-> <nStatus>
NETIO_SRVSENDITEM( <pConnectionSocket>, <nStreamID>, <xData> )
-> <lSent>
NETIO_SRVSENDDATA( <pConnectionSocket>, <nStreamID>, <cData> )
-> <lSent>
+ added RT errors to the netio client code. They should help to early
detect any communication problems
! disabled hb_socketSetNoDelay() in client and server code.
I enabled it for some local tests and by mistake committed to SVN
and I didn't noticed it. It could cause serious problems in some cases
i.e. with slow WiFi or WAN connections.
Warning: server signature changed due to extensions in protocol.
both client and server side has to be updated and use
current SVN code.
+ harbour/contrib/hbnetio/tests/netiot03.prg
+ added demonstration/test code for alternative RDD IO API, RPC and
asynchronous data streams in HBNETIO
2010-04-29 02:00 UTC-0800 Pritpal Bedi (pritpal@vouchcac.com)
* contrib/hbqt/hbqt_hbqplaintextedit.cpp
% Improvement to horizontal ruler to respect long lines
@@ -30,7 +72,6 @@
* contrib/hbqt/hbqt_hbqplaintextedit.h
* contrib/hbqt/hbqt_hbslots.cpp
+ Implemented: horizontal ruler in the editor instances at the top
of the window. Opinions are welcome about its base and tab colors.

View File

@@ -76,7 +76,7 @@
#define NETIO_PASSWD_MAX 64
/* login string */
#define NETIO_LOGINSTRID "HarbourFileTcpIpServer\006"
#define NETIO_LOGINSTRID "HarbourFileTcpIpServer\007"
/* messages */
#define NETIO_LOGIN 1
@@ -98,27 +98,36 @@
#define NETIO_PROC 17
#define NETIO_PROCW 18
#define NETIO_FUNC 19
#define NETIO_FUNCCTRL 20
#define NETIO_SRVITEM 21
#define NETIO_SRVDATA 22
#define NETIO_SRVCLOSE 23
#define NETIO_CONNECTED 0x4321DEAD
/* messages format */
/* { NETIO_LOGIN, len[ 2 ]... } + loginstr[ len ] -> { NETIO_LOGIN, NETIO_CONNECTED[ 4 ], ... } */
/* { NETIO_EXISTS, len[ 2 ]... } + filename[ len ] -> { NETIO_EXISTS, ... } */
/* { NETIO_DELETE, len[ 2 ]... } + filename[ len ] -> { NETIO_DELETE, ... } */
/* { NETIO_RENAME, len[ 2 ], len2[ 2 ]... } + filename[ len ] + filename[ len2 ] -> { NETIO_RENAME, ... } */
/* { NETIO_OPEN, len[ 2 ], flags[ 2 ], def_ext[], 0, ... } + filename[ len ] -> { NETIO_OPEN, file_no[2], ... } */
/* { NETIO_READ, file_no[2], size[ 4 ], offset[ 8 ], ... } -> { NETIO_READ, read[ 4 ], err[ 4 ], ... } + data[ read ] */
/* { NETIO_WRITE, file_no[2], size[ 4 ], offset[ 8 ], ... } + data[ size ] -> { NETIO_WRITE, written[ 4 ], err[ 4 ], ... } */
/* { NETIO_LOCK, file_no[2], start[ 8 ], len[ 8 ], flags[ 2 ], ... } -> { NETIO_LOCK, ... } */
/* { NETIO_TRUNC, file_no[2], offset[ 8 ], ... } -> { NETIO_TRUNC, ... } */
/* { NETIO_SIZE, file_no[2], ... } -> { NETIO_SIZE, size[ 8 ], err[ 4 ], ... } */
/* { NETIO_COMMIT, file_no[2], ... } -> { NETIO_SYNC, ... } | NULL */
/* { NETIO_CLOSE, file_no[2], ... } -> { NETIO_CLOSE, ... } */
/* { NETIO_UNLOCK, file_no[2], start[ 8 ], len[ 8 ], flags[ 2 ], ... } -> { NETIO_SYNC, ... } | NULL */
/* { NETIO_PROCIS, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_PROCIS, ... } */
/* { NETIO_PROC, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_SYNC, ... } | NULL */
/* { NETIO_PROCW, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_PROC, ... } */
/* { NETIO_FUNC, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_FUNC, size[ 4 ] } + data[ size ] */
/* { NETIO_SYNC, ... } -> NULL */
/* { NETIO_LOGIN, len[ 2 ]... } + loginstr[ len ] -> { NETIO_LOGIN, NETIO_CONNECTED[ 4 ], ... } */
/* { NETIO_EXISTS, len[ 2 ]... } + filename[ len ] -> { NETIO_EXISTS, ... } */
/* { NETIO_DELETE, len[ 2 ]... } + filename[ len ] -> { NETIO_DELETE, ... } */
/* { NETIO_RENAME, len[ 2 ], len2[ 2 ]... } + filename[ len ] + filename[ len2 ] -> { NETIO_RENAME, ... } */
/* { NETIO_OPEN, len[ 2 ], flags[ 2 ], def_ext[], 0, ... } + filename[ len ] -> { NETIO_OPEN, file_no[2], ... } */
/* { NETIO_READ, file_no[2], size[ 4 ], offset[ 8 ], ... } -> { NETIO_READ, read[ 4 ], err[ 4 ], ... } + data[ read ] */
/* { NETIO_WRITE, file_no[2], size[ 4 ], offset[ 8 ], ... } + data[ size ] -> { NETIO_WRITE, written[ 4 ], err[ 4 ], ... } */
/* { NETIO_LOCK, file_no[2], start[ 8 ], len[ 8 ], flags[ 2 ], ... } -> { NETIO_LOCK, ... } */
/* { NETIO_TRUNC, file_no[2], offset[ 8 ], ... } -> { NETIO_TRUNC, ... } */
/* { NETIO_SIZE, file_no[2], ... } -> { NETIO_SIZE, size[ 8 ], err[ 4 ], ... } */
/* { NETIO_COMMIT, file_no[2], ... } -> { NETIO_SYNC, ... } | NULL */
/* { NETIO_CLOSE, file_no[2], ... } -> { NETIO_CLOSE, ... } */
/* { NETIO_UNLOCK, file_no[2], start[ 8 ], len[ 8 ], flags[ 2 ], ... } -> { NETIO_SYNC, ... } | NULL */
/* { NETIO_PROCIS, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_PROCIS, ... } */
/* { NETIO_PROC, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_SYNC, ... } | NULL */
/* { NETIO_PROCW, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_PROC, ... } */
/* { NETIO_FUNC, size[ 4 ] } + (funcname + \0 + data)[ size ] -> { NETIO_FUNC, size[ 4 ] } + data[ size ] */
/* { NETIO_FUNCCTRL, size[ 4 ], id[4], type[4] } + (funcname + \0 + data)[ size ] -> { NETIO_FUNCCTRL, size[ 4 ] } + data[ size ] */
/* { NETIO_SRVCLOSE, id[4], ... } -> { NETIO_SRVCLOSE, ... } */
/* { NETIO_SYNC, ... } -> NULL */
/* -> { NETIO_SYNC, ... } */
/* -> { NETIO_SRVITEM, id[4], size[ 4 ], ... } + data[ size ] */
/* -> { NETIO_SRVDATA, id[4], size[ 4 ], ... } + data[ size ] */
/* alternative answer for all messages: -> { NETIO_ERROR, err[ 4 ], ... } */
/* netio errors */
@@ -126,9 +135,11 @@
#define NETIO_ERR_WRONG_PARAM 0xff02
#define NETIO_ERR_WRONG_FILE_PATH 0xff03
#define NETIO_ERR_WRONG_FILE_HANDLE 0xff04
#define NETIO_ERR_FILES_MAX 0xff05
#define NETIO_ERR_READ 0xff06
#define NETIO_ERR_FILE_IO 0xff07
#define NETIO_ERR_NOT_EXISTS 0xff08
#define NETIO_ERR_UNSUPPORTED 0xff09
#define NETIO_ERR_REFUSED 0xff10
#define NETIO_ERR_WRONG_FILE_SIZE 0xff05
#define NETIO_ERR_WRONG_STREAMID 0xff06
#define NETIO_ERR_FILES_MAX 0xff07
#define NETIO_ERR_READ 0xff08
#define NETIO_ERR_FILE_IO 0xff09
#define NETIO_ERR_NOT_EXISTS 0xff0a
#define NETIO_ERR_UNSUPPORTED 0xff0b
#define NETIO_ERR_REFUSED 0xff0c

View File

@@ -24,6 +24,15 @@
* NETIO_PROCEXECW( <cProcName> [, <params,...>] ) -> <lExecuted>
* NETIO_FUNCEXEC( <cFuncName> [, <params,...>] ) -> <xFuncRetVal>
*
* NETIO_OPENDATASTREAM( <cStreamFuncName> [, <params,...>] )
* -> <nStreamID>
* NETIO_OPENITEMSTREAM( <cStreamFuncName> [, <params,...>] )
* -> <nStreamID>
* NETIO_CLOSESTREAM( <nStreamID>, [<cServer>], [<nPort>] )
* -> <lOK>
* NETIO_GETDATA( <nStreamID>, [<cServer>], [<nPort>] )
* -> <aData> | <cData> | NIL
*
* Copyright 2009 Przemyslaw Czerpak <druzus / at / priv.onet.pl>
* www - http://www.harbour-project.org
*
@@ -88,15 +97,30 @@
* client code
*/
typedef struct _HB_SRVDATA
{
int id;
int type;
PHB_ITEM array;
char * data;
HB_SIZE size;
HB_SIZE bufsize;
HB_SIZE maxsize;
struct _HB_SRVDATA * next;
}
HB_SRVDATA, * PHB_SRVDATA;
typedef struct _HB_CONCLI
{
HB_COUNTER used;
HB_COUNTER usrcount;
PHB_ITEM mutex;
HB_ERRCODE errcode;
int timeout;
int port;
HB_SOCKET sd;
PHB_ZNETSTREAM zstream;
PHB_SRVDATA srvdata;
struct _HB_CONCLI * next;
int level;
int strategy;
@@ -151,6 +175,18 @@ static const HB_FILE_FUNCS * s_fileMethods( void );
#define NETIO_TIMEOUT -1
static void hb_errRT_NETIO( HB_ERRCODE errGenCode, HB_ERRCODE errSubCode,
HB_ERRCODE errOsCode, const char * szDescription,
const char * szOperation )
{
PHB_ITEM pError;
pError = hb_errRT_New( ES_ERROR, "NETIO", errGenCode, errSubCode,
szDescription, szOperation, errOsCode, EF_NONE );
hb_errLaunch( pError );
hb_itemRelease( pError );
}
static long s_fileRecvAll( PHB_CONCLI conn, void * buffer, long len )
{
HB_BYTE * ptr = ( HB_BYTE * ) buffer;
@@ -169,6 +205,171 @@ static long s_fileRecvAll( PHB_CONCLI conn, void * buffer, long len )
return lRead;
}
static long s_fileRecvTest( PHB_CONCLI conn, void * buffer, long len )
{
HB_BYTE * ptr = ( HB_BYTE * ) buffer;
long lRead = 0, l;
HB_MAXINT timeout = 0;
while( lRead < len )
{
if( conn->zstream )
l = hb_znetRead( conn->zstream, conn->sd, ptr + lRead, len - lRead, timeout );
else
l = hb_socketRecv( conn->sd, ptr + lRead, len - lRead, 0, timeout );
if( l <= 0 )
break;
lRead += l;
timeout = NETIO_TIMEOUT;
}
return lRead;
}
static int s_fileGenSrvDataID( PHB_CONCLI conn )
{
PHB_SRVDATA pSrvData = conn->srvdata;
static int s_iStreamID = 0;
if( ++s_iStreamID <= 0 )
s_iStreamID = 1;
while( pSrvData )
{
if( pSrvData->id == s_iStreamID )
{
if( ++s_iStreamID <= 0 )
s_iStreamID = 1;
pSrvData = conn->srvdata;
}
else
pSrvData = pSrvData->next;
}
return s_iStreamID;
}
static PHB_SRVDATA s_fileFindSrvData( PHB_CONCLI conn, int iStreamID, int iType )
{
PHB_SRVDATA pSrvData = conn->srvdata;
while( pSrvData )
{
if( pSrvData->id == iStreamID )
return ( iType == 0 || pSrvData->type == iType ) ? pSrvData : NULL;
pSrvData = pSrvData->next;
}
return NULL;
}
static HB_BOOL s_fileCloseSrvData( PHB_CONCLI conn, int iStreamID )
{
PHB_SRVDATA * pSrvDataPtr = &conn->srvdata;
while( *pSrvDataPtr )
{
if( ( *pSrvDataPtr )->id == iStreamID )
{
PHB_SRVDATA pSrvData = *pSrvDataPtr;
*pSrvDataPtr = pSrvData->next;
if( pSrvData->array )
hb_itemRelease( pSrvData->array );
if( pSrvData->data )
hb_xfree( pSrvData->data );
hb_xfree( pSrvData );
if( !conn->srvdata )
hb_atomic_dec( &conn->used );
return HB_TRUE;
}
pSrvDataPtr = &( *pSrvDataPtr )->next;
}
return HB_FALSE;
}
static void s_fileNewSrvData( PHB_CONCLI conn, int iStreamID, int iType )
{
PHB_SRVDATA pSrvData = s_fileFindSrvData( conn, iStreamID, 0 );
if( !pSrvData )
{
pSrvData = ( PHB_SRVDATA ) memset( hb_xgrab( sizeof( HB_SRVDATA ) ),
0, sizeof( HB_SRVDATA ) );
pSrvData->id = iStreamID;
pSrvData->type = iType;
if( iType == NETIO_SRVITEM )
pSrvData->maxsize = 4096;
else if( iType == NETIO_SRVDATA )
pSrvData->maxsize = 0x10000;
pSrvData->next = conn->srvdata;
if( !conn->srvdata )
hb_atomic_inc( &conn->used );
conn->srvdata = pSrvData;
}
}
static HB_BOOL s_fileRecvSrvData( PHB_CONCLI conn, long len, int iStreamID, int iType )
{
char * buffer = ( char * ) hb_xgrab( len );
HB_BOOL fResult = HB_FALSE;
if( s_fileRecvAll( conn, buffer, len ) == len )
{
PHB_SRVDATA pSrvData = s_fileFindSrvData( conn, iStreamID, iType );
if( pSrvData )
{
if( pSrvData->size < pSrvData->maxsize )
{
if( iType == NETIO_SRVITEM )
{
HB_SIZE nSize = len;
const char * data = buffer;
PHB_ITEM pItem = hb_itemDeserialize( &data, &nSize );
if( pItem )
{
if( nSize == 0 )
{
if( pSrvData->array == NULL )
pSrvData->array = hb_itemArrayNew( 0 );
if( hb_arrayLen( pSrvData->array ) < pSrvData->maxsize )
hb_arrayAddForward( pSrvData->array, pItem );
}
hb_itemRelease( pItem );
}
}
else if( iType == NETIO_SRVDATA )
{
long lmax = pSrvData->maxsize - pSrvData->size;
if( len > lmax )
len = lmax;
if( pSrvData->size + len > pSrvData->bufsize )
{
pSrvData->bufsize = ( pSrvData->size + len ) << 1;
if( pSrvData->bufsize > pSrvData->maxsize )
pSrvData->bufsize = pSrvData->maxsize;
pSrvData->data = ( char * ) hb_xrealloc( pSrvData->data,
pSrvData->bufsize );
}
memcpy( pSrvData->data + pSrvData->size, buffer, len );
pSrvData->size += len;
}
}
}
fResult = HB_TRUE;
}
else
{
conn->errcode = hb_socketGetError();
hb_errRT_NETIO( EG_READ, 1001, conn->errcode, NULL, HB_ERR_FUNCNAME );
}
hb_xfree( buffer );
return fResult;
}
static HB_BOOL s_fileSendMsg( PHB_CONCLI conn, HB_BYTE * msgbuf,
const void * data, long len, HB_BOOL fWait )
{
@@ -210,22 +411,51 @@ static HB_BOOL s_fileSendMsg( PHB_CONCLI conn, HB_BYTE * msgbuf,
if( lSent == len )
{
if( conn->zstream )
hb_znetFlush( conn->zstream, conn->sd, NETIO_TIMEOUT );
if( fWait )
if( conn->zstream &&
hb_znetFlush( conn->zstream, conn->sd, NETIO_TIMEOUT ) != 0 )
{
int iMsg = HB_GET_LE_INT32( msgbuf );
conn->errcode = hb_socketGetError();
hb_errRT_NETIO( EG_WRITE, 1002, conn->errcode, NULL, HB_ERR_FUNCNAME );
}
else if( fWait )
{
int iMsg = HB_GET_LE_INT32( msgbuf ), iResult;
while( s_fileRecvAll( conn, msgbuf, NETIO_MSGLEN ) == NETIO_MSGLEN )
for( ;; )
{
int iResult = HB_GET_LE_INT32( msgbuf );
if( s_fileRecvAll( conn, msgbuf, NETIO_MSGLEN ) != NETIO_MSGLEN )
{
conn->errcode = hb_socketGetError();
hb_errRT_NETIO( EG_READ, 1003, conn->errcode, NULL, HB_ERR_FUNCNAME );
break;
}
if( iResult != NETIO_SYNC )
iResult = HB_GET_LE_INT32( msgbuf );
if( iResult == NETIO_SRVITEM || iResult == NETIO_SRVDATA )
{
int iStreamID = HB_GET_LE_UINT32( &msgbuf[ 4 ] );
len = HB_GET_LE_INT32( &msgbuf[ 8 ] );
if( len > 0 )
{
if( !s_fileRecvSrvData( conn, len, iStreamID, iResult ) )
break;
}
}
else if( iResult != NETIO_SYNC )
{
if( iResult == NETIO_ERROR )
hb_fsSetError( ( HB_ERRCODE ) HB_GET_LE_UINT32( &msgbuf[ 4 ] ) );
else if( iResult == iMsg )
{
conn->errcode = ( HB_ERRCODE ) HB_GET_LE_UINT32( &msgbuf[ 4 ] );
hb_fsSetError( conn->errcode );
}
else if( iResult != iMsg )
{
conn->errcode = NETIO_ERR_UNKNOWN_COMMAND;
hb_errRT_NETIO( EG_UNSUPPORTED, 1004, 0, NULL, HB_ERR_FUNCNAME );
}
else
fResult = HB_TRUE;
break;
}
@@ -234,6 +464,65 @@ static HB_BOOL s_fileSendMsg( PHB_CONCLI conn, HB_BYTE * msgbuf,
else
fResult = HB_TRUE;
}
else
{
conn->errcode = hb_socketGetError();
hb_errRT_NETIO( EG_WRITE, 1005, conn->errcode, NULL, HB_ERR_FUNCNAME );
}
return fResult;
}
static HB_BOOL s_fileProcessData( PHB_CONCLI conn )
{
HB_BYTE msgbuf[ NETIO_MSGLEN ];
HB_BOOL fResult = HB_TRUE;
int iMsg, iStreamID;
long len;
for( ;; )
{
len = s_fileRecvTest( conn, msgbuf, NETIO_MSGLEN );
if( len == NETIO_MSGLEN )
{
iMsg = HB_GET_LE_INT32( msgbuf );
if( iMsg == NETIO_SRVITEM || iMsg == NETIO_SRVDATA )
{
iStreamID = HB_GET_LE_INT32( &msgbuf[ 4 ] );
len = HB_GET_LE_INT32( &msgbuf[ 8 ] );
if( len > 0 )
{
if( !s_fileRecvSrvData( conn, len, iStreamID, iMsg ) )
{
fResult = HB_FALSE;
break;
}
}
}
else if( iMsg == NETIO_ERROR )
{
conn->errcode = ( HB_ERRCODE ) HB_GET_LE_UINT32( &msgbuf[ 4 ] );
hb_fsSetError( conn->errcode );
}
else if( iMsg != NETIO_SYNC )
{
fResult = HB_FALSE;
conn->errcode = NETIO_ERR_UNKNOWN_COMMAND;
hb_errRT_NETIO( EG_UNSUPPORTED, 1006, 0, NULL, HB_ERR_FUNCNAME );
break;
}
}
else
{
if( len != 0 )
{
fResult = HB_FALSE;
conn->errcode = hb_socketGetError();
hb_errRT_NETIO( EG_READ, 1007, conn->errcode, NULL, HB_ERR_FUNCNAME );
}
break;
}
}
return fResult;
}
@@ -251,8 +540,10 @@ static PHB_CONCLI s_fileConNew( HB_SOCKET sd, const char * pszServer,
hb_atomic_set( &conn->used, 1 );
hb_atomic_set( &conn->usrcount, 0 );
conn->mutex = hb_threadMutexCreate();
conn->errcode = 0;
conn->sd = sd;
conn->zstream = NULL;
conn->srvdata = NULL;
conn->next = NULL;
conn->timeout = iTimeOut;
conn->port = iPort;
@@ -270,6 +561,16 @@ static void s_fileConFree( PHB_CONCLI conn )
{
hb_socketShutdown( conn->sd, HB_SOCKET_SHUT_RDWR );
hb_socketClose( conn->sd );
while( conn->srvdata )
{
PHB_SRVDATA pSrvData = conn->srvdata;
conn->srvdata = pSrvData->next;
if( pSrvData->array )
hb_itemRelease( pSrvData->array );
if( pSrvData->data )
hb_xfree( pSrvData->data );
hb_xfree( pSrvData );
}
if( conn->zstream )
hb_znetClose( conn->zstream );
if( conn->mutex )
@@ -344,8 +645,7 @@ static HB_BOOL s_fileUsrDisconnect( const char * pszServer, int iPort )
conn = s_connections;
while( conn )
{
if( conn->port == iPort &&
hb_stricmp( conn->server, pszServer ) == 0 )
if( conn->port == iPort && hb_stricmp( conn->server, pszServer ) == 0 )
{
if( hb_atomic_get( &conn->usrcount ) )
{
@@ -555,7 +855,7 @@ static PHB_CONCLI s_fileConnect( const char ** pFilename,
HB_PUT_LE_UINT16( &msgbuf[ 4 ], len );
memset( msgbuf + 6, '\0', sizeof( msgbuf ) - 6 );
hb_socketSetNoDelay( sd, HB_TRUE );
hb_socketSetNoDelay( sd, HB_FALSE );
conn = s_fileConNew( sd, pszIpAddres, iPort, iTimeOut,
pszPasswd, iPassLen, iLevel, iStrategy );
sd = HB_NO_SOCKET;
@@ -776,7 +1076,7 @@ static const char * s_netio_params( int iMsg, const char * pszName, HB_U32 * pSi
return data ? data : pszName;
}
static HB_BOOL s_netio_procexec( const char * pszProcName, int iMsg )
static HB_BOOL s_netio_procexec( const char * pszProcName, int iMsg, int iType )
{
HB_BOOL fResult = HB_FALSE;
@@ -792,33 +1092,55 @@ static HB_BOOL s_netio_procexec( const char * pszProcName, int iMsg )
const char * data;
char * buffer;
HB_U32 size;
int iStreamID = 0;
data = s_netio_params( iMsg, pszProcName, &size, &buffer );
HB_PUT_LE_UINT32( &msgbuf[ 0 ], iMsg );
HB_PUT_LE_UINT32( &msgbuf[ 4 ], size );
memset( msgbuf + 8, '\0', sizeof( msgbuf ) - 8 );
fResult = s_fileSendMsg( conn, msgbuf, data, size, iMsg != NETIO_PROC );
if( fResult && iMsg == NETIO_FUNC )
if( iMsg == NETIO_FUNCCTRL )
{
HB_ULONG ulResult = HB_GET_LE_UINT32( &msgbuf[ 4 ] );
iStreamID = s_fileGenSrvDataID( conn );
HB_PUT_LE_UINT32( &msgbuf[ 8 ], iStreamID );
HB_PUT_LE_UINT32( &msgbuf[ 12 ], iType );
memset( msgbuf + 16, '\0', sizeof( msgbuf ) - 16 );
}
else
memset( msgbuf + 8, '\0', sizeof( msgbuf ) - 8 );
fResult = s_fileSendMsg( conn, msgbuf, data, size, iMsg != NETIO_PROC );
if( fResult && ( iMsg == NETIO_FUNC || iMsg == NETIO_FUNCCTRL ) )
{
HB_SIZE nResult = HB_GET_LE_UINT32( &msgbuf[ 4 ] );
if( ulResult > 0 )
if( nResult > 0 )
{
PHB_ITEM pItem;
if( ulResult > size && buffer )
if( nResult > size && buffer )
{
hb_xfree( buffer );
buffer = NULL;
}
if( buffer == NULL )
buffer = ( char * ) hb_xgrab( ulResult );
ulResult = s_fileRecvAll( conn, buffer, ulResult );
buffer = ( char * ) hb_xgrab( nResult );
nResult = s_fileRecvAll( conn, buffer, ( long ) nResult );
data = buffer;
pItem = hb_itemDeserialize( &data, &ulResult );
pItem = hb_itemDeserialize( &data, &nResult );
if( pItem )
{
if( iMsg == NETIO_FUNCCTRL )
{
if( iStreamID == hb_itemGetNI( pItem ) )
s_fileNewSrvData( conn, iStreamID, iType );
else
hb_itemPutNI( pItem, -1 );
}
hb_itemReturnRelease( pItem );
/* else TODO: RTE */
}
else
{
conn->errcode = NETIO_ERR_WRONG_PARAM;
hb_errRT_NETIO( EG_CORRUPTION, 1008, 0, NULL, HB_ERR_FUNCNAME );
}
}
}
if( buffer )
@@ -840,7 +1162,7 @@ HB_FUNC( NETIO_PROCEXISTS )
{
const char * pszProcName = hb_parc( 1 );
hb_retl( s_netio_procexec( pszProcName, NETIO_PROCIS ) );
hb_retl( s_netio_procexec( pszProcName, NETIO_PROCIS, 0 ) );
}
/* execute function/procedure on server the side,
@@ -852,7 +1174,7 @@ HB_FUNC( NETIO_PROCEXEC )
{
const char * pszProcName = hb_parc( 1 );
hb_retl( s_netio_procexec( pszProcName, NETIO_PROC ) );
hb_retl( s_netio_procexec( pszProcName, NETIO_PROC, 0 ) );
}
/* execute function/procedure on the server side and wait for
@@ -864,7 +1186,7 @@ HB_FUNC( NETIO_PROCEXECW )
{
const char * pszProcName = hb_parc( 1 );
hb_retl( s_netio_procexec( pszProcName, NETIO_PROCW ) );
hb_retl( s_netio_procexec( pszProcName, NETIO_PROCW, 0 ) );
}
/* execute function on the server side and wait for its return value:
@@ -875,7 +1197,147 @@ HB_FUNC( NETIO_FUNCEXEC )
{
const char * pszProcName = hb_parc( 1 );
s_netio_procexec( pszProcName, NETIO_FUNC );
s_netio_procexec( pszProcName, NETIO_FUNC, 0 );
}
/* open communication stream/channel which allow to send data
* asynchronously from server to client:
*
* NETIO_OPENDATASTREAM( <cStreamFuncName> [, <params,...>] ) -> <nStreamID>
*
* it executes on the server side:
* <cStreamFuncName>( <pConnSock>, <nStreamID> [, <params,...>] )
* and then check value returned by above function. If it's equal to
* <nStreamID> then the communication stream is opened and <nStreamID>
* is returned to the client.
* The function returns new stream ID or -1 if the communication stream
* cannot be set.
*/
HB_FUNC( NETIO_OPENDATASTREAM )
{
const char * pszProcName = hb_parc( 1 );
s_netio_procexec( pszProcName, NETIO_FUNCCTRL, NETIO_SRVDATA );
}
/* open communication stream/channel which allow to send data
* asynchronously from server to client:
*
* NETIO_OPENITEMSTREAM( <cStreamFuncName> [, <params,...>] ) -> <nStreamID>
*
* it executes on the server side:
* <cStreamFuncName>( <pConnSock>, <nStreamID> [, <params,...>] )
* and then check value returned by above function. If it's equal to
* <nStreamID> then the communication stream is opened and <nStreamID>
* is returned to the client.
* The function returns new stream ID or -1 if the communication stream
* cannot be set.
*/
HB_FUNC( NETIO_OPENITEMSTREAM )
{
const char * pszProcName = hb_parc( 1 );
s_netio_procexec( pszProcName, NETIO_FUNCCTRL, NETIO_SRVITEM );
}
/* close communication stream/channel:
*
* NETIO_CLOSESTREAM( <nStreamID>, [<cServer>], [<nPort>] ) -> <lOK>
*/
HB_FUNC( NETIO_CLOSESTREAM )
{
int iStreamID = hb_parni( 1 );
HB_BOOL fResult = HB_FALSE;
if( iStreamID )
{
const char * pszServer = hb_parc( 2 );
char * pszIpAddres;
int iPort = hb_parni( 3 );
s_fileGetConnParam( &pszServer, &iPort, NULL, NULL, NULL );
pszIpAddres = hb_socketResolveAddr( pszServer, HB_SOCKET_AF_INET );
if( pszIpAddres != NULL )
{
PHB_CONCLI conn = s_fileConFind( pszIpAddres, iPort );
if( conn )
{
if( s_fileConLock( conn ) )
{
fResult = s_fileCloseSrvData( conn, iStreamID );
if( fResult )
{
HB_BYTE msgbuf[ NETIO_MSGLEN ];
HB_PUT_LE_UINT32( &msgbuf[ 0 ], NETIO_SRVCLOSE );
HB_PUT_LE_UINT32( &msgbuf[ 4 ], iStreamID );
memset( msgbuf + 8, '\0', sizeof( msgbuf ) - 8 );
s_fileSendMsg( conn, msgbuf, NULL, 0, HB_TRUE );
}
s_fileConUnlock( conn );
}
s_fileConClose( conn );
}
hb_xfree( pszIpAddres );
}
}
hb_retl( fResult );
}
/* retrieve data sent from the server by cominication stream
*
* NETIO_GETDATA( <nStreamID>, [<cServer>], [<nPort>] ) -> <aData>|<cData>|NIL
*/
HB_FUNC( NETIO_GETDATA )
{
int iStreamID = hb_parni( 1 );
if( iStreamID )
{
const char * pszServer = hb_parc( 2 );
char * pszIpAddres;
int iPort = hb_parni( 3 );
s_fileGetConnParam( &pszServer, &iPort, NULL, NULL, NULL );
pszIpAddres = hb_socketResolveAddr( pszServer, HB_SOCKET_AF_INET );
if( pszIpAddres != NULL )
{
PHB_CONCLI conn = s_fileConFind( pszIpAddres, iPort );
if( conn )
{
if( s_fileConLock( conn ) )
{
if( s_fileProcessData( conn ) )
{
PHB_SRVDATA pSrvData = s_fileFindSrvData( conn, iStreamID, 0 );
if( pSrvData )
{
if( pSrvData->type == NETIO_SRVITEM )
{
if( pSrvData->array )
{
hb_itemReturnForward( pSrvData->array );
hb_arrayNew( pSrvData->array, 0 );
}
else
hb_reta( 0 );
}
else if( pSrvData->type == NETIO_SRVDATA )
{
hb_retclen( pSrvData->data, pSrvData->size );
pSrvData->size = 0;
}
}
}
s_fileConUnlock( conn );
}
s_fileConClose( conn );
}
hb_xfree( pszIpAddres );
}
}
}
/* Client methods
@@ -1092,14 +1554,24 @@ static HB_SIZE s_fileReadAt( PHB_FILE pFile, void * data, HB_SIZE ulSize,
if( s_fileSendMsg( pFile->conn, msgbuf, NULL, 0, HB_TRUE ) )
{
HB_ERRCODE errCode = ( HB_ERRCODE ) HB_GET_LE_UINT32( &msgbuf[ 8 ] );
ulResult = HB_GET_LE_UINT32( &msgbuf[ 4 ] );
if( ulResult > 0 )
{
if( ulResult > ulSize ) /* error, it should not happen, enemy attack? */
ulResult = ulSize;
ulResult = s_fileRecvAll( pFile->conn, data, ulResult );
{
pFile->conn->errcode = errCode = NETIO_ERR_WRONG_FILE_SIZE;
hb_errRT_NETIO( EG_DATAWIDTH, 1009, 0, NULL, HB_ERR_FUNCNAME );
ulResult = 0;
}
else if( ( HB_SIZE ) s_fileRecvAll( pFile->conn, data, ulResult ) != ulResult )
{
pFile->conn->errcode = hb_socketGetError();
errCode = NETIO_ERR_READ;
hb_errRT_NETIO( EG_READ, 1010, pFile->conn->errcode, NULL, HB_ERR_FUNCNAME );
}
}
hb_fsSetError( ( HB_ERRCODE ) HB_GET_LE_UINT32( &msgbuf[ 8 ] ) );
hb_fsSetError( errCode );
}
s_fileConUnlock( pFile->conn );
}

View File

@@ -22,7 +22,13 @@
* NETIO_RPC( <pListenSocket> | <pConnectionSocket> [, <lEnable>] )
* -> <lPrev>
* NETIO_RPCFILTER( <pConnectionSocket>,
<sFuncSym> | <hValue> | NIL ) -> NIL
* <sFuncSym> | <hValue> | NIL ) -> NIL
*
* NETIO_SRVSTATUS( <pConnectionSocket> [, <nStreamID>] ) -> <nStatus>
* NETIO_SRVSENDITEM( <pConnectionSocket>, <nStreamID>, <xData> )
* -> <lSent>
* NETIO_SRVSENDDATA( <pConnectionSocket>, <nStreamID>, <cData> )
* -> <lSent>
*
* Copyright 2009 Przemyslaw Czerpak <druzus / at / priv.onet.pl>
* www - http://www.harbour-project.org
@@ -86,6 +92,14 @@
* server code
*/
typedef struct _HB_CONSTREAM
{
int id;
int type;
struct _HB_CONSTREAM * next;
}
HB_CONSTREAM, * PHB_CONSTREAM;
typedef struct _HB_CONSRV
{
HB_SOCKET sd;
@@ -98,6 +112,8 @@ typedef struct _HB_CONSRV
HB_BOOL login;
PHB_SYMB rpcFunc;
PHB_ITEM rpcFilter;
PHB_ITEM mutex;
PHB_CONSTREAM streams;
int rootPathLen;
char rootPath[ HB_PATH_MAX ];
}
@@ -237,6 +253,16 @@ static void s_consrv_close( PHB_CONSRV conn )
if( conn->rpcFilter )
hb_itemRelease( conn->rpcFilter );
while( conn->streams )
{
PHB_CONSTREAM stream = conn->streams;
conn->streams = stream->next;
hb_xfree( stream );
}
if( conn->mutex )
hb_itemRelease( conn->mutex );
if( conn->sd != HB_NO_SOCKET )
hb_socketClose( conn->sd );
@@ -354,24 +380,29 @@ static long s_srvSendAll( PHB_CONSRV conn, void * buffer, long len )
HB_BYTE * ptr = ( HB_BYTE * ) buffer;
long lSent = 0, lLast = 1, l;
while( lSent < len && !conn->stop )
if( !conn->mutex || hb_threadMutexLock( conn->mutex ) )
{
if( conn->zstream )
l = hb_znetWrite( conn->zstream, conn->sd, ptr + lSent, len - lSent, -1, &lLast );
else
l = lLast = hb_socketSend( conn->sd, ptr + lSent, len - lSent, 0, -1 );
if( l > 0 )
lSent += l;
if( lLast <= 0 )
while( lSent < len && !conn->stop )
{
if( hb_socketGetError() != HB_SOCKET_ERR_TIMEOUT ||
hb_vmRequestQuery() != 0 )
break;
if( conn->zstream )
l = hb_znetWrite( conn->zstream, conn->sd, ptr + lSent, len - lSent, -1, &lLast );
else
l = lLast = hb_socketSend( conn->sd, ptr + lSent, len - lSent, 0, -1 );
if( l > 0 )
lSent += l;
if( lLast <= 0 )
{
if( hb_socketGetError() != HB_SOCKET_ERR_TIMEOUT ||
hb_vmRequestQuery() != 0 )
break;
}
}
}
if( conn->zstream && lLast > 0 && !conn->stop )
hb_znetFlush( conn->zstream, conn->sd, -1 );
if( conn->zstream && lLast > 0 && !conn->stop )
hb_znetFlush( conn->zstream, conn->sd, -1 );
if( conn->mutex )
hb_threadMutexUnlock( conn->mutex );
}
return lSent;
}
@@ -583,7 +614,8 @@ HB_FUNC( NETIO_ACCEPT )
if( connsd != HB_NO_SOCKET )
{
hb_socketSetNoDelay( connsd, HB_TRUE );
hb_socketSetKeepAlive( connsd, HB_TRUE );
hb_socketSetNoDelay( connsd, HB_FALSE );
conn = s_consrvNew( connsd, lsd->rootPath, lsd->rpc );
@@ -693,7 +725,7 @@ HB_FUNC( NETIO_SERVER )
HB_BOOL fNoAnswer = HB_FALSE;
HB_ERRCODE errCode = 0, errFsCode;
long len = 0, size, size2;
int iFileNo;
int iFileNo, iStreamID;
HB_U32 uiMsg;
HB_USHORT uiFalgs;
char * szExt;
@@ -983,11 +1015,44 @@ HB_FUNC( NETIO_SERVER )
fNoAnswer = HB_TRUE;
break;
case NETIO_SRVCLOSE:
iStreamID = HB_GET_LE_INT32( &msgbuf[ 4 ] );
if( iStreamID && conn->mutex && hb_threadMutexLock( conn->mutex ) )
{
PHB_CONSTREAM * pStreamPtr = &conn->streams;
while( *pStreamPtr )
{
if( ( *pStreamPtr )->id == iStreamID )
{
PHB_CONSTREAM stream = *pStreamPtr;
*pStreamPtr = stream->next;
hb_xfree( stream );
break;
}
pStreamPtr = &( *pStreamPtr )->next;
}
if( *pStreamPtr == NULL )
iStreamID = 0;
hb_threadMutexUnlock( conn->mutex );
}
else
iStreamID = 0;
if( iStreamID == 0 )
errCode = NETIO_ERR_WRONG_STREAMID;
else
{
HB_PUT_LE_UINT32( &msg[ 0 ], NETIO_SRVCLOSE );
memset( msg + 4, '\0', NETIO_MSGLEN - 4 );
}
break;
case NETIO_PROC:
fNoAnswer = HB_TRUE;
case NETIO_PROCIS:
case NETIO_PROCW:
case NETIO_FUNC:
case NETIO_FUNCCTRL:
if( !conn->rpc )
{
errCode = NETIO_ERR_UNSUPPORTED;
@@ -1030,10 +1095,12 @@ HB_FUNC( NETIO_SERVER )
{
if( hb_vmRequestReenter() )
{
HB_SIZE ulSize = size - size2;
HB_SIZE nSize = size - size2;
HB_USHORT uiPCount = 0;
HB_BOOL fSend = HB_FALSE;
int iStreamType;
iStreamID = 0;
data += size2;
if( pItem )
{
@@ -1053,21 +1120,47 @@ HB_FUNC( NETIO_SERVER )
hb_vmPushDynSym( pDynSym );
hb_vmPushNil();
}
while( ulSize )
if( uiMsg == NETIO_FUNCCTRL )
{
pItem = hb_itemDeserialize( &data, &ulSize );
iStreamID = HB_GET_LE_INT32( &msgbuf[ 8 ] );
iStreamType = HB_GET_LE_INT32( &msgbuf[ 12 ] );
hb_vmPush( hb_param( 1, HB_IT_ANY ) );
hb_vmPushInteger( iStreamID );
uiPCount += 2;
if( iStreamType != NETIO_SRVDATA &&
iStreamType != NETIO_SRVITEM )
iStreamID = 0;
if( iStreamID )
{
PHB_CONSTREAM stream = ( PHB_CONSTREAM )
hb_xgrab( sizeof( HB_CONSTREAM ) );
stream->id = iStreamID;
stream->type = iStreamType;
stream->next = conn->streams;
conn->streams = stream;
if( conn->mutex == NULL )
conn->mutex = hb_threadMutexCreate();
if( !hb_threadMutexLock( conn->mutex ) )
errCode = NETIO_ERR_REFUSED;
}
else
errCode = NETIO_ERR_WRONG_PARAM;
}
while( nSize > 0 && errCode == 0 )
{
pItem = hb_itemDeserialize( &data, &nSize );
if( !pItem )
{
ulSize = 1;
errCode = NETIO_ERR_WRONG_PARAM;
break;
}
++uiPCount;
hb_vmPush( pItem );
hb_itemRelease( pItem );
}
if( ulSize )
if( errCode != 0 )
{
errCode = NETIO_ERR_WRONG_PARAM;
uiPCount += 2;
do
hb_stackPop();
@@ -1079,10 +1172,11 @@ HB_FUNC( NETIO_SERVER )
hb_vmSend( uiPCount );
else
hb_vmProc( uiPCount );
if( uiMsg == NETIO_FUNC )
if( uiMsg == NETIO_FUNC || uiMsg == NETIO_FUNCCTRL )
{
HB_SIZE itmSize;
char * itmData = hb_itemSerialize( hb_stackReturnItem(), HB_TRUE, &itmSize );
PHB_ITEM pResult = hb_stackReturnItem();
char * itmData = hb_itemSerialize( pResult, HB_TRUE, &itmSize );
if( itmSize <= sizeof( buffer ) - NETIO_MSGLEN )
msg = buffer;
else if( !ptr || itmSize > ( HB_SIZE ) size - NETIO_MSGLEN )
@@ -1094,9 +1188,25 @@ HB_FUNC( NETIO_SERVER )
memcpy( msg + NETIO_MSGLEN, itmData, itmSize );
hb_xfree( itmData );
len = itmSize;
if( iStreamID && hb_itemGetNI( pResult ) == iStreamID )
{
hb_threadMutexUnlock( conn->mutex );
iStreamID = 0;
}
}
}
hb_vmRequestRestore();
if( iStreamID )
{
PHB_CONSTREAM stream = conn->streams;
if( stream->id == iStreamID )
{
conn->streams = stream->next;
hb_xfree( conn->streams );
}
hb_threadMutexUnlock( conn->mutex );
}
}
else
errCode = NETIO_ERR_REFUSED;
@@ -1147,3 +1257,120 @@ HB_FUNC( NETIO_SERVER )
}
}
}
/* NETIO_SRVSENDITEM( <pConnectionSocket>, <nStreamID>, <xData> ) -> <lSent>
*/
HB_FUNC( NETIO_SRVSENDITEM )
{
PHB_CONSRV conn = s_consrvParam( 1 );
int iStreamID = hb_parni( 2 );
PHB_ITEM pItem = hb_param( 3, HB_IT_ANY );
HB_BOOL fResult = HB_FALSE;
if( conn && conn->sd != HB_NO_SOCKET && !conn->stop && conn->mutex &&
iStreamID && pItem )
{
char * itmData, * msg;
HB_SIZE nLen;
itmData = hb_itemSerialize( pItem, HB_TRUE, &nLen );
msg = ( char * ) hb_xgrab( nLen + NETIO_MSGLEN );
HB_PUT_LE_UINT32( &msg[ 0 ], NETIO_SRVITEM );
HB_PUT_LE_UINT32( &msg[ 4 ], iStreamID );
HB_PUT_LE_UINT32( &msg[ 8 ], nLen );
memset( msg + 12, '\0', NETIO_MSGLEN - 12 );
memcpy( msg + NETIO_MSGLEN, itmData, nLen );
hb_xfree( itmData );
nLen += NETIO_MSGLEN;
if( hb_threadMutexLock( conn->mutex ) )
{
PHB_CONSTREAM stream = conn->streams;
while( stream )
{
if( stream->id == iStreamID )
break;
stream = stream->next;
}
if( stream && stream->type == NETIO_SRVITEM )
fResult = s_srvSendAll( conn, msg, nLen ) == ( long ) nLen;
hb_threadMutexUnlock( conn->mutex );
}
hb_xfree( msg );
}
hb_retl( fResult );
}
/* NETIO_SRVSENDDATA( <pConnectionSocket>, <nStreamID>, <cData> ) -> <lSent>
*/
HB_FUNC( NETIO_SRVSENDDATA )
{
PHB_CONSRV conn = s_consrvParam( 1 );
int iStreamID = hb_parni( 2 );
HB_SIZE nLen = hb_parclen( 3 );
HB_BOOL fResult = HB_FALSE;
if( conn && conn->sd != HB_NO_SOCKET && !conn->stop && conn->mutex &&
iStreamID && nLen > 0 )
{
char * msg;
msg = ( char * ) hb_xgrab( nLen + NETIO_MSGLEN );
HB_PUT_LE_UINT32( &msg[ 0 ], NETIO_SRVDATA );
HB_PUT_LE_UINT32( &msg[ 4 ], iStreamID );
HB_PUT_LE_UINT32( &msg[ 8 ], nLen );
memset( msg + 12, '\0', NETIO_MSGLEN - 12 );
memcpy( msg + NETIO_MSGLEN, hb_parc( 3 ), nLen );
nLen += NETIO_MSGLEN;
if( hb_threadMutexLock( conn->mutex ) )
{
PHB_CONSTREAM stream = conn->streams;
while( stream )
{
if( stream->id == iStreamID )
break;
stream = stream->next;
}
if( stream && stream->type == NETIO_SRVDATA )
fResult = s_srvSendAll( conn, msg, nLen ) == ( long ) nLen;
hb_threadMutexUnlock( conn->mutex );
}
hb_xfree( msg );
}
hb_retl( fResult );
}
/* NETIO_SRVSTATUS( <pConnectionSocket> [, <nStreamID>] ) -> <nStatus>
*/
HB_FUNC( NETIO_SRVSTATUS )
{
PHB_CONSRV conn = s_consrvParam( 1 );
int iStreamID = hb_parni( 2 );
int iStatus = 0;
if( !conn )
iStatus = -1;
else if( conn->sd != HB_NO_SOCKET )
iStatus = -2;
else if( conn->stop )
iStatus = -3;
else if( iStreamID != 0 && conn->mutex )
{
if( hb_threadMutexLock( conn->mutex ) )
{
PHB_CONSTREAM stream = conn->streams;
while( stream )
{
if( stream->id == iStreamID )
{
iStatus = stream->type == NETIO_SRVDATA ? 1 : 2;
break;
}
stream = stream->next;
}
hb_threadMutexUnlock( conn->mutex );
}
}
hb_retni( iStatus );
}

View File

@@ -89,6 +89,31 @@ Client side functions:
value sent by the server.
NETIO_OPENDATASTREAM( <cStreamFuncName> [, <params,...>] ) -> <nStreamID>
NETIO_OPENITEMSTREAM( <cStreamFuncName> [, <params,...>] ) -> <nStreamID>
open communication stream/channel which allow to send data
asynchronously from server to client.
It executes on the server side:
<cStreamFuncName>( <pConnSock>, <nStreamID> [, <params,...>] )
and then checks value returned by above function. If it's equal to
<nStreamID> then the communication stream is opened and <nStreamID>
is returned to the client.
The function returns new stream ID or -1 if the communication stream
cannot be set.
<cStreamFuncName> may contain information about connection parameters
just like <cProcName> in NETIO_PROC*() functions.
NETIO_CLOSESTREAM( <nStreamID>, [<cServer>], [<nPort>] ) -> <lOK>
close communication stream/channel
NETIO_GETDATA( <nStreamID>, [<cServer>], [<nPort>] ) -> <aData>|<cData>|NIL
retrieve data sent from the server by communication stream.
If stream was open by NETIO_OPENDATASTREAM() then data is returned
as string.
If stream was open by NETIO_OPENITEMSTREAM() then data is returned
as array of items received from the server.
Server side functions:
======================
@@ -108,3 +133,7 @@ Server side functions:
[<lRPC> | <sFuncSym> | <hValue>],
[<cPasswd>], [<nCompressionLevel>], [<nStrategy>] )
-> <pListenSocket>
NETIO_SRVSTATUS( <pConnectionSocket> [, <nStreamID>] ) -> <nStatus>
NETIO_SRVSENDITEM( <pConnectionSocket>, <nStreamID>, <xData> ) -> <lSent>
NETIO_SRVSENDDATA( <pConnectionSocket>, <nStreamID>, <cData> ) -> <lSent>

View File

@@ -0,0 +1,199 @@
/*
* $Id$
*/
/*
* Harbour Project source code:
* demonstration/test code for alternative RDD IO API, RPC and
* asynchronous data streams in NETIO
*
* Copyright 2010 Przemyslaw Czerpak <druzus / at / priv.onet.pl>
* www - http://www.harbour-project.org
*
*/
/* net:127.0.0.1:2941:topsecret:data/_tst_ */
#define DBSERVER "127.0.0.1"
#define DBPORT 2941
#define DBPASSWD "topsecret"
#define DBDIR "data"
#define DBFILE "_tst_"
#define DBNAME "net:" + DBSERVER + ":" + hb_ntos( DBPORT ) + ":" + ;
DBPASSWD + ":" + DBDIR + "/" + DBFILE
request DBFCDX
request HB_DIREXISTS
request MAKEDIR
request HB_DATETIME
proc main()
local pSockSrv, lExists, nStream1, nStream2, nSec, xData
set exclusive off
rddSetDefault( "DBFCDX" )
pSockSrv := netio_mtserver( DBPORT,,, /* RPC */ .T., DBPASSWD )
if empty( pSockSrv )
? "Cannot start NETIO server !!!"
wait "Press any key to exit..."
quit
endif
? "NETIO server activated."
hb_idleSleep( 0.1 )
wait
?
? "NETIO_CONNECT():", netio_connect( DBSERVER, DBPORT, , DBPASSWD )
?
netio_procexec( "QOut", "PROCEXEC", "P2", "P3", "P4" )
netio_funcexec( "QOut", "FUNCEXEC", "P2", "P3", "P4" )
? "SERVER TIME:", netio_funcexec( "hb_dateTime" )
?
wait
nStream1 := NETIO_OPENITEMSTREAM( "reg_stream" )
? "NETIO_OPENITEMSTREAM:", nStream1
nStream2 := NETIO_OPENDATASTREAM( "reg_charstream" )
? "NETIO_OPENDATASTREAM:", nStream2
hb_idleSleep( 3 )
? "NETIO_GETDATA 1:", hb_valToExp( NETIO_GETDATA( nStream1 ) )
? "NETIO_GETDATA 2:", hb_valToExp( NETIO_GETDATA( nStream2 ) )
nSec := seconds() + 3
while seconds() < nSec
xData := NETIO_GETDATA( nStream1 )
if !empty( xData )
? hb_valToExp( xData )
endif
xData := NETIO_GETDATA( nStream2 )
if !empty( xData )
?? "", hb_valToExp( xData )
endif
enddo
wait
? "NETIO_GETDATA 1:", hb_valToExp( NETIO_GETDATA( nStream1 ) )
? "NETIO_GETDATA 2:", hb_valToExp( NETIO_GETDATA( nStream2 ) )
wait
lExists := netio_funcexec( "HB_DirExists", "./data" )
? "Directory './data'", iif( !lExists, "not exists", "exists" )
if !lExists
? "Creating directory './data' ->", ;
iif( netio_funcexec( "MakeDir", "./data" ) == -1, "error", "OK" )
endif
createdb( DBNAME )
testdb( DBNAME )
wait
?
? "table exists:", dbExists( DBNAME )
wait
?
? "delete table with indexes:", dbDrop( DBNAME )
? "table exists:", dbExists( DBNAME )
wait
? "NETIO_GETDATA 1:", hb_valToExp( NETIO_GETDATA( nStream1 ) )
? "NETIO_GETDATA 2:", hb_valToExp( NETIO_GETDATA( nStream2 ) )
? "NETIO_DISCONNECT():", netio_disconnect( DBSERVER, DBPORT )
? "NETIO_CLOSESTREAM 1:", NETIO_CLOSESTREAM( nStream1 )
? "NETIO_CLOSESTREAM 2:", NETIO_CLOSESTREAM( nStream2 )
hb_idleSleep( 2 )
?
? "stopping the server..."
netio_serverstop( pSockSrv, .t. )
return
proc createdb( cName )
local n
dbCreate( cName, {{"F1", "C", 20, 0},;
{"F2", "M", 4, 0},;
{"F3", "N", 10, 2},;
{"F4", "T", 8, 0}} )
? "create neterr:", neterr(), hb_osError()
use (cName)
? "use neterr:", neterr(), hb_osError()
while lastrec() < 100
dbAppend()
n := recno() - 1
field->F1 := chr( n % 26 + asc( "A" ) ) + " " + time()
field->F2 := field->F1
field->F3 := n / 100
field->F4 := hb_dateTime()
enddo
index on field->F1 tag T1
index on field->F3 tag T3
index on field->F4 tag T4
close
?
return
proc testdb( cName )
local i, j
use (cName)
? "used:", used()
? "nterr:", neterr()
? "alias:", alias()
? "lastrec:", lastrec()
? "ordCount:", ordCount()
for i:=1 to ordCount()
ordSetFocus( i )
? i, "name:", ordName(), "key:", ordKey(), "keycount:", ordKeyCount()
next
ordSetFocus( 1 )
dbgotop()
while !eof()
if ! field->F1 == field->F2
? "error at record:", recno()
? " ! '" + field->F1 + "' == '" + field->F2 + "'"
endif
dbSkip()
enddo
wait
i := row()
j := col()
dbgotop()
browse()
setpos( i, j )
close
return
func reg_stream( pConnSock, nStream )
? PROCNAME(), nStream
hb_threadDetach( hb_threadStart( @rpc_timer(), pConnSock, nStream ) )
return nStream
func reg_charstream( pConnSock, nStream )
? PROCNAME(), nStream
hb_threadDetach( hb_threadStart( @rpc_charstream(), pConnSock, nStream ) )
return nStream
static func rpc_timer( pConnSock, nStream )
while .t.
if !netio_srvSendItem( pConnSock, nStream, time() )
? "CLOSED STREAM:", nStream
exit
endif
hb_idleSleep( 1 )
enddo
return nil
static func rpc_charstream( pConnSock, nStream )
local n := 0
while .t.
if !netio_srvSendData( pConnSock, nStream, chr( 65 + n ) )
? "CLOSED STREAM:", nStream
exit
endif
n := int( ( n + 1 ) % 26 )
hb_idleSleep( 0.1 )
enddo
return nil

View File

@@ -357,7 +357,7 @@ static long hb_znetStreamWrite( PHB_ZNETSTREAM pStream, HB_SOCKET sd, HB_MAXINT
*/
long hb_znetFlush( PHB_ZNETSTREAM pStream, HB_SOCKET sd, HB_MAXINT timeout )
{
uInt uiSize = HB_ZNET_BUFSIZE - ( pStream->crypt ? -2 : 0 );
uInt uiSize = HB_ZNET_BUFSIZE - ( pStream->crypt ? 2 : 0 );
if( pStream->wr.avail_out > 0 )
pStream->err = deflate( &pStream->wr, Z_SYNC_FLUSH );