Feature #41
Implement Message Sequencing
Status: | Resolved | Start date: | 03/17/2011 | |
---|---|---|---|---|
Priority: | Normal | Due date: | ||
Assignee: | M. Goetting | % Done: | 100% | |
Category: | C++ | |||
Target version: | - |
Description
In order to send larger event notifications over fragmented port implementations.
Subtasks
History
#1 Updated by J. Wienke over 13 years ago
- Category set to C++
#2 Updated by S. Wrede over 13 years ago
- Due date set to 08/30/2010
- Assignee set to M. Goetting
- Estimated time set to 10.00
#3 Updated by S. Wrede over 13 years ago
Important Classes:
- SpreadMessage
- SpreadConnection
First test case could be to just send lena over the wire and see what happens...
#4 Updated by S. Wrede over 13 years ago
I just came across an example for msg sequencing in noted once. I'll post it here just for reference, however we're going to implement it:
//---------------------------------------------------------------------------- int vtkSocketCommunicator::SendInternal(int socket, void* data, int length) { char* buffer = reinterpret_cast<char*>(data); int total = 0; do { int n = send(socket, buffer+total, length-total, 0); if(n < 1) { return 0; } total += n; } while(total < length); return 1; } //---------------------------------------------------------------------------- int vtkSocketCommunicator::ReceiveInternal(int socket, void* data, int length) { char* buffer = reinterpret_cast<char*>(data); int total = 0; do { #if defined(_WIN32) && !defined(__CYGWIN__) int trys = 0; #endif int n = recv(socket, buffer+total, length-total, 0); if(n < 1) { #if defined(_WIN32) && !defined(__CYGWIN__) // On long messages, Windows recv sometimes fails with WSAENOBUFS, but // will work if you try again. int error = WSAGetLastError(); if ((error == WSAENOBUFS) && (trys++ < 1000)) { Sleep(1); continue; } #endif return 0; } total += n; } while(total < length); return 1; } //---------------------------------------------------------------------------- int vtkSocketCommunicator::SendTagged(void* data, int wordSize, int numWords, int tag, const char* logName) { if(!this->SendInternal(this->Socket, &tag, static_cast<int>(sizeof(int)))) { if (this->ReportErrors) { vtkErrorMacro("Could not send tag."); } return 0; } int length = wordSize * numWords; if(!this->SendInternal(this->Socket, &length, static_cast<int>(sizeof(int)))) { if (this->ReportErrors) { vtkErrorMacro("Could not send length."); } return 0; } if(!this->SendInternal(this->Socket, data, wordSize*numWords)) { if (this->ReportErrors) { vtkErrorMacro("Could not send message."); } return 0; } // Log this event. this->LogTagged("Sent", data, wordSize, numWords, tag, logName); return 1; } //---------------------------------------------------------------------------- int vtkSocketCommunicator::ReceiveTagged(void* data, int wordSize, int numWords, int tag, const char* logName) { int success = 0; int length = -1; while ( !success ) { int recvTag = -1; length = -1; if(!this->ReceiveInternal(this->Socket, &recvTag, static_cast<int>(sizeof(int)))) { if (this->ReportErrors) { vtkErrorMacro("Could not receive tag. " << tag); } return 0; } if(this->SwapBytesInReceivedData == vtkSocketCommunicator::SwapOn) { vtkSwap4(reinterpret_cast<char*>(&recvTag)); } if(!this->ReceiveInternal(this->Socket, &length, static_cast<int>(sizeof(int)))) { if (this->ReportErrors) { vtkErrorMacro("Could not receive length."); } return 0; } if(this->SwapBytesInReceivedData == vtkSocketCommunicator::SwapOn) { vtkSwap4(reinterpret_cast<char*>(&length)); } if(recvTag != tag) { char* idata = new char[length + sizeof(recvTag) + sizeof(length)]; char* ptr = idata; memcpy(ptr, (void*)&recvTag, sizeof(recvTag)); ptr += sizeof(recvTag); memcpy(ptr, (void*)&length, sizeof(length)); ptr += sizeof(length); this->ReceivePartialTagged(ptr, 1, length, tag, "Wrong tag"); int res = this->InvokeEvent(vtkCommand::WrongTagEvent, idata); delete [] idata; if ( res ) { continue; } if (this->ReportErrors) { vtkErrorMacro("Tag mismatch: got " << recvTag << ", expecting " << tag << "."); } return 0; } else { success = 1; } } // Length may not be correct for the first message sent as an // endian handshake because the SwapBytesInReceivedData flag // is not initialized at this point. We could just initialize it // here, but what is the point. if ((wordSize * numWords) != length && this->SwapBytesInReceivedData != vtkSocketCommunicator::SwapNotSet) { if (this->ReportErrors) { vtkErrorMacro("Requested size (" << (wordSize * numWords) << ") is different than the size that was sent (" << length << ")"); } return 0; } return this->ReceivePartialTagged(data, wordSize, numWords, tag, logName); } //---------------------------------------------------------------------------- int vtkSocketCommunicator::ReceivePartialTagged(void* data, int wordSize, int numWords, int tag, const char* logName) { if(!this->ReceiveInternal(this->Socket, data, wordSize*numWords)) { if (this->ReportErrors) { vtkErrorMacro("Could not receive message."); } return 0; } // Unless we're dealing with chars, then check byte ordering. // This is really bad and should probably use some enum for types if(this->SwapBytesInReceivedData == vtkSocketCommunicator::SwapOn) { if(wordSize == 4) { vtkDebugMacro(<< " swapping 4 range, size = " << wordSize << " length = " << numWords); vtkSwap4Range(reinterpret_cast<char*>(data), numWords); } else if(wordSize == 8) { vtkDebugMacro(<< " swapping 8 range, size = " << wordSize << " length = " << numWords ); vtkSwap8Range(reinterpret_cast<char*>(data), numWords); } } // Log this event. this->LogTagged("Received", data, wordSize, numWords, tag, logName); return 1; } //---------------------------------------------------------------------------- template <class T, class OutType> void vtkSocketCommunicatorLogArray(ostream& os, T* array, int length, int max, OutType*) { if(length > 0) { int num = (length <= max)? length:max; os << " data={" << static_cast<OutType>(array[0]); for(int i=1; i < num; ++i) { os << " " << static_cast<OutType>(array[i]); } if(length > max) { os << " ..."; } os << "}"; }
#5 Updated by S. Wrede over 13 years ago
From previous considerations about this topic (just ideas from earlier times):
How to encapuslate message de-multiplexing?- Write basic SpreadNetworkMessage class that serializes data plus status information on a stream, at least * if message is self-contained * how many parts follow * number of current part * if it is the final part * what else?
- Extend SpreadMessage class by * isComplete() * isError()
- An incremental buildup factory method for construction of SpreadMessage out of a sequence of SpreadNetworkMessages?!?
- The semantics of SpreadConnection::receive would change as it shall provide the next 'complete' message to its caller
Just ideas from ancient brainstorming... ;-)
#6 Updated by S. Wrede over 13 years ago
Any updates on the status? We probably need this feature now for HUMAVIPS... So, lets discuss hwo to solve this on Monday!
#7 Updated by S. Wrede over 13 years ago
Any updates on this topic?
#8 Updated by S. Wrede over 13 years ago
- Status changed from New to Feedback
- Priority changed from High to Urgent
#9 Updated by J. Wienke about 13 years ago
We reaaaallllllyyyyyyyyyy need this for the humavips demonstrator to not rely on the strange patched spread daemon anymore. Any chance that you can implement this during the next days?
#10 Updated by J. Wienke about 13 years ago
Isn't this implemented now?? Why is the ticket still opened?
#11 Updated by J. Wienke almost 13 years ago
- Status changed from Feedback to Resolved
There even is a unit test now. It works ;)