33 #ifndef __H__PCL__PCL_INTERFACE_COMMUNICATOR_IMPL__
34 #define __H__PCL__PCL_INTERFACE_COMMUNICATOR_IMPL__
48 template <
class TLayout>
51 m_bDebugCommunication(false),
52 m_bSendBuffersFixed(true)
59 template <
class TLayout>
61 send_raw(
int targetProc,
const void* pBuff,
int bufferSize,
62 bool bSizeKnownAtTarget)
64 assert((targetProc == -1) || (targetProc >= 0 && targetProc <
pcl::NumProcs()));
67 m_curOutProcs.insert(targetProc);
69 if(!bSizeKnownAtTarget)
70 buffer.
write((
char*)&bufferSize,
sizeof(
int));
72 buffer.
write((
const char*)pBuff, bufferSize);
73 m_bSendBuffersFixed = m_bSendBuffersFixed
74 && bSizeKnownAtTarget;
78 template <
class TLayout>
83 if(!interface.empty()){
84 assert((targetProc == -1 || targetProc >= 0) && targetProc <
pcl::NumProcs());
87 m_curOutProcs.insert(targetProc);
89 commPol.
collect(buffer, interface);
90 m_bSendBuffersFixed = m_bSendBuffersFixed
96 template <
class TLayout>
103 send_data(layout, commPol,
typename TLayout::category_tag());
108 template <
class TLayout>
114 typename Layout::const_iterator iter = layout.begin();
115 typename Layout::const_iterator end = layout.end();
119 for(; iter != end; ++iter)
121 if(!layout.interface(iter).empty()){
123 m_curOutProcs.insert(layout.proc_id(iter));
125 commPol.
collect(buffer, layout.interface(iter));
126 m_bSendBuffersFixed = m_bSendBuffersFixed
135 template <
class TLayout>
143 for(
size_t i = 0; i < layout.
num_levels(); ++i)
148 for(; iter != end; ++iter)
152 m_curOutProcs.insert(layout.
proc_id(iter));
155 m_bSendBuffersFixed = m_bSendBuffersFixed
164 template <
class TLayout>
175 template <
class TLayout>
177 receive_raw(
int srcProc,
void* bufOut,
int bufSize)
186 template <
class TLayout>
191 if(!interface.empty()){
199 template <
class TLayout>
210 template <
class TLayout>
211 template <
class TLayoutMap>
214 const typename TLayoutMap::Key& keyFrom,
215 const typename TLayoutMap::Key& keyTo,
218 if(layoutMap.template has_layout<Type>(keyFrom)){
219 send_data(layoutMap.template get_layout<Type>(keyFrom), commPol);
222 if(layoutMap.template has_layout<Type>(keyTo)){
223 receive_data(layoutMap.template get_layout<Type>(keyTo), commPol);
228 template <
class TLayout>
231 std::set<int>& curProcs,
232 const TLayout& layout)
234 prepare_receiver_buffer_map(bufMap, curProcs, layout,
235 typename TLayout::category_tag());
239 template <
class TLayout>
242 std::set<int>& curProcs,
243 const TLayout& layout,
247 for(
typename TLayout::const_iterator li = layout.begin();
248 li != layout.end(); ++li)
250 if(!layout.interface(li).empty()){
251 bufMap[layout.proc_id(li)];
252 curProcs.insert(layout.proc_id(li));
258 template <
class TLayout>
261 std::set<int>& curProcs,
262 const TLayout& layout,
266 for(
size_t i = 0; i < layout.num_levels(); ++i)
268 for(
typename TLayout::const_iterator li = layout.begin(i);
269 li != layout.end(i); ++li)
271 if(!layout.interface(li).empty()){
272 bufMap[layout.proc_id(li)];
273 curProcs.insert(layout.proc_id(li));
280 template <
class TLayout>
284 std::map<int, int>* pMapBuffSizesOut,
287 for(
typename TLayout::const_iterator li = layout.begin();
288 li != layout.end(); ++li)
290 if(!layout.interface(li).empty()){
293 layout.interface(li));
298 else if(pMapBuffSizesOut){
300 std::map<int, int>::iterator iter = pMapBuffSizesOut->find(layout.proc_id(li));
301 if(iter != pMapBuffSizesOut->end())
302 iter->second += buffSize;
304 (*pMapBuffSizesOut)[layout.proc_id(li)] = buffSize;
312 template <
class TLayout>
316 std::map<int, int>* pMapBuffSizesOut,
321 for(
size_t i = 0; i < layout.num_levels(); ++i){
322 for(
typename TLayout::const_iterator li = layout.begin(i);
323 li != layout.end(i); ++li)
325 if(!layout.interface(li).empty()){
328 layout.interface(li));
333 else if(pMapBuffSizesOut){
335 std::map<int, int>::iterator iter = pMapBuffSizesOut->find(layout.proc_id(li));
336 if(iter != pMapBuffSizesOut->end())
337 iter->second += buffSize;
339 (*pMapBuffSizesOut)[layout.proc_id(li)] = buffSize;
348 template <
class TLayout>
353 extract_data(layout, bufMap,
355 typename TLayout::category_tag());
359 template <
class TLayout>
368 for(
typename Layout::const_iterator li = layout.begin();
369 li != layout.end(); ++li)
371 if(!layout.interface(li).empty()){
372 extractor.
extract(bufMap[layout.proc_id(li)],
373 layout.interface(li));
381 template <
class TLayout>
389 for(
size_t i = 0; i < layout.num_levels(); ++i)
392 for(
typename Layout::const_iterator li = layout.begin(i);
393 li != layout.end(i); ++li)
395 if(!layout.interface(li).empty()){
396 extractor.
extract(bufMap[layout.proc_id(li)],
397 layout.interface(li));
407 template <
class TLayout>
411 bool retValue = communicate_and_resume(tag);
418 template <
class TLayout>
424 if(!(m_vSendRequests.empty() && m_vReceiveRequests.empty())){
425 UG_THROW(
"Can't communicate since a previous communication is still pending! "
426 "Make sure to call wait() after each communicate_and_resume()!");
431 m_curInProcs.clear();
435 for(BufferMap::iterator iter = m_bufMapIn.begin();
436 iter != m_bufMapIn.end(); ++iter)
438 iter->second.clear();
444 for(
typename ExtractorInfoList::iterator iter = m_extractorInfos.begin();
445 iter != m_extractorInfos.end(); ++iter)
454 prepare_receiver_buffer_map(m_bufMapIn, m_curInProcs, *info.
m_layout);
461 std::vector<int> dbgRecvFrom, dbgSendTo;
463 if(communication_debugging_enabled()){
465 for(std::set<int>::iterator iter = m_curInProcs.begin();
466 iter != m_curInProcs.end(); ++iter){
467 dbgRecvFrom.push_back(*iter);
470 for(std::set<int>::iterator iter = m_curOutProcs.begin();
471 iter != m_curOutProcs.end(); ++iter)
472 dbgSendTo.push_back(*iter);
475 UG_THROW(
"ERROR in InterfaceCommunicator::communicate(): send / receive mismatch. Aborting.\n");
480 size_t numOutStreams = m_curOutProcs.size();
481 size_t numInStreams = m_curInProcs.size();
484 m_vSendRequests.resize(numOutStreams);
485 m_vReceiveRequests.resize(numInStreams);
490 std::vector<int> vBufferSizesIn(numInStreams);
491 bool allBufferSizesFixed = m_bSendBuffersFixed;
493 if(allBufferSizesFixed)
496 std::map<int, int> mapBuffSizes;
498 for(std::set<int>::iterator iter = m_curInProcs.begin();
499 iter != m_curInProcs.end(); ++iter)
501 mapBuffSizes[*iter] = 0;
505 for(
typename ExtractorInfoList::iterator iter = m_extractorInfos.begin();
506 iter != m_extractorInfos.end(); ++iter)
518 allBufferSizesFixed =
false;
522 mapBuffSizes[info.
m_srcProc] += buffSize;
525 if(!collect_layout_buffer_sizes(*info.
m_layout,
528 typename TLayout::category_tag()))
530 allBufferSizesFixed =
false;
539 if(allBufferSizesFixed){
541 for(std::set<int>::iterator iter = m_curInProcs.begin();
542 iter != m_curInProcs.end(); ++iter, ++counter)
544 vBufferSizesIn[counter] = mapBuffSizes[*iter];
552 if(!allBufferSizesFixed)
558 std::vector<int> streamSizes;
562 for(std::set<int>::iterator iter = m_curInProcs.begin();
563 iter != m_curInProcs.end(); ++iter, ++counter)
565 MPI_Irecv(&vBufferSizesIn[counter],
sizeof(
int), MPI_UNSIGNED_CHAR,
571 streamSizes.resize(m_curOutProcs.size());
572 for(std::set<int>::iterator iter = m_curOutProcs.begin();
573 iter != m_curOutProcs.end(); ++iter, ++counter)
575 streamSizes[counter] = (int)m_bufMapOut[*iter].write_pos();
577 MPI_Isend(&streamSizes[counter],
sizeof(
int), MPI_UNSIGNED_CHAR,
585 Waitall(m_vReceiveRequests, m_vSendRequests);
590 PCL_PROFILE(pcl_IntCom_communicate_resizeRecvBufs);
592 for(std::set<int>::iterator iter = m_curInProcs.begin();
593 iter != m_curInProcs.end(); ++iter, ++counter)
596 buf.
reserve(vBufferSizesIn[counter]);
606 if(communication_debugging_enabled()){
607 std::vector<int> recvBufSizes, sendBufSizes;
608 for(std::set<int>::iterator iter = m_curInProcs.begin();
609 iter != m_curInProcs.end(); ++iter)
610 recvBufSizes.push_back((
int)m_bufMapIn[*iter].write_pos());
612 for(std::set<int>::iterator iter = m_curOutProcs.begin();
613 iter != m_curOutProcs.end(); ++iter)
614 sendBufSizes.push_back((
int)m_bufMapOut[*iter].write_pos());
617 dbgSendTo, sendBufSizes, m_debugProcComm))
619 UG_LOG(
"ERROR in InterfaceCommunicator::communicate(): "
620 "send / receive buffer size mismatch. Aborting.\n");
635 for(std::set<int>::iterator iter = m_curInProcs.begin();
636 iter != m_curInProcs.end(); ++iter, ++counter)
639 <<
"(" << vBufferSizesIn[counter] <<
")");
643 MPI_Irecv(binBuf.
buffer(), vBufferSizesIn[counter], MPI_UNSIGNED_CHAR,
651 for(std::set<int>::iterator iter = m_curOutProcs.begin();
652 iter != m_curOutProcs.end(); ++iter, ++counter)
665 m_bSendBuffersFixed =
true;
672 template <
class TLayout>
682 Waitall(m_vReceiveRequests, m_vSendRequests);
687 for(
typename ExtractorInfoList::iterator iter = m_extractorInfos.begin();
688 iter != m_extractorInfos.end(); ++iter)
704 assert(info.
m_binBuffer &&
"ERROR in InterfaceCommunicator::communicate: No valid receiver specified.");
709 binBuf.
read((
char*)&rawSize,
sizeof(
int));
728 for(BufferMap::iterator iter = m_bufMapOut.begin();
729 iter != m_bufMapOut.end(); ++iter)
732 iter->second.clear();
735 m_curOutProcs.clear();
736 m_extractorInfos.clear();
737 m_vSendRequests.clear();
738 m_vReceiveRequests.clear();
743 template <
class TLayout>
747 static bool bFirstTime =
true;
749 UG_LOG(
"WARNING: Communication debugging enabled in InterfaceCommunicator.");
750 UG_LOG(
" Expect performance penalty!\n");
754 m_bDebugCommunication =
true;
755 m_debugProcComm = involvedProcs;
758 template <
class TLayout>
762 m_bDebugCommunication =
false;
765 template <
class TLayout>
769 return m_bDebugCommunication;
specializations are responsible to pack and unpack interface data during communication.
Definition: pcl_communication_structs.h:790
virtual int get_required_buffer_size(const Interface &interface)
returns the size of the buffer in bytes, that will be required for interface-communication.
Definition: pcl_communication_structs.h:813
virtual bool end_layout_collection(const Layout *pLayout)
signals the end of a layout collection
Definition: pcl_communication_structs.h:827
virtual bool extract(ug::BinaryBuffer &buff, const Interface &interface)=0
extract data from the buffer and assigns it to the interface-elements.
virtual bool begin_layout_collection(const Layout *pLayout)
signals the beginning of a layout collection.
Definition: pcl_communication_structs.h:821
virtual bool collect(ug::BinaryBuffer &buff, const Interface &interface)=0
should write data which is associated with the interface elements to the buffer.
virtual void begin_level_extraction(int level)
signals that a new layout-level will now be processed.
Definition: pcl_communication_structs.h:855
virtual bool end_layout_extraction(const Layout *pLayout)
signals the end of a layout extraction
Definition: pcl_communication_structs.h:845
virtual bool begin_layout_extraction(const Layout *pLayout)
signals the beginning of a layout extraction.
Definition: pcl_communication_structs.h:839
bool communicate_and_resume(int tag=749345)
collects data and communicates it with other processes without waiting for the receives to complete; call wait() afterwards to finish the communication and extract the received data
Definition: pcl_interface_communicator_impl.hpp:420
void prepare_receiver_buffer_map(BufferMap &bufMap, std::set< int > &curProcs, const TLayout &layout)
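prepares the receive-buffer map (stream-pack-in): creates a buffer entry and registers the process id for every non-empty interface of the layout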
Definition: pcl_interface_communicator_impl.hpp:230
void wait()
waits for the data communicated by communicate_and_resume() and extracts it
Definition: pcl_interface_communicator_impl.hpp:674
void send_raw(int targetProc, const void *pBuff, int bufferSize, bool bSizeKnownAtTarget=false)
sends raw data to a target-proc.
Definition: pcl_interface_communicator_impl.hpp:61
TLayout Layout
Definition: pcl_interface_communicator.h:71
void receive_raw(int srcProc, ug::BinaryBuffer &bufOut, int bufSize=-1)
registers a binary-stream to receive data from a source-proc.
Definition: pcl_interface_communicator_impl.hpp:166
Layout::Interface Interface
Definition: pcl_interface_communicator.h:72
void extract_data(const TLayout &layout, BufferMap &bufMap, CommPol &extractor)
extract data from stream-pack
Definition: pcl_interface_communicator_impl.hpp:350
InterfaceCommunicator()
Definition: pcl_interface_communicator_impl.hpp:50
bool communicate(int tag=749345)
sends and receives the collected data.
Definition: pcl_interface_communicator_impl.hpp:409
void send_data(int targetProc, const Interface &interface, ICommunicationPolicy< TLayout > &commPol)
collects data that will be sent during communicate.
Definition: pcl_interface_communicator_impl.hpp:80
void exchange_data(const TLayoutMap &layoutMap, const typename TLayoutMap::Key &keyFrom, const typename TLayoutMap::Key &keyTo, ICommunicationPolicy< TLayout > &commPol)
internally calls send_data and receive_data with the specified layouts.
Definition: pcl_interface_communicator_impl.hpp:213
bool communication_debugging_enabled()
returns true if communication debugging is enabled
Definition: pcl_interface_communicator_impl.hpp:767
void disable_communication_debugging()
disables debugging of communication
Definition: pcl_interface_communicator_impl.hpp:760
void receive_data(int srcProc, const Interface &interface, ICommunicationPolicy< TLayout > &commPol)
registers a communication-policy to receive data on communicate.
Definition: pcl_interface_communicator_impl.hpp:188
void enable_communication_debugging(const ProcessCommunicator &involvedProcs=ProcessCommunicator(PCD_WORLD))
enables debugging of communication. This has a severe effect on performance!
Definition: pcl_interface_communicator_impl.hpp:745
bool collect_layout_buffer_sizes(const TLayout &layout, ICommunicationPolicy< TLayout > &commPol, std::map< int, int > *pMapBuffSizesOut, const layout_tags::single_level_layout_tag &)
collects buffer sizes for a given layout and stores them in a map
Definition: pcl_interface_communicator_impl.hpp:282
std::map< int, ug::BinaryBuffer > BufferMap
Definition: pcl_interface_communicator.h:272
the standard multi-level-layout implementation
Definition: pcl_communication_structs.h:615
Interface & interface(iterator iter)
returns the interface to the given iterator.
Definition: pcl_communication_structs.h:683
int proc_id(const_iterator iter) const
returns the process id associated with the interface at the given iterator.
Definition: pcl_communication_structs.h:687
iterator end(size_t level)
returns the past-the-end iterator of the interfaces of the layout in the given level (one past the last interface).
Definition: pcl_communication_structs.h:666
size_t num_levels() const
returns the number of levels.
Definition: pcl_communication_structs.h:694
iterator begin(size_t level)
returns the iterator to the first interface of the layout in the given level.
Definition: pcl_communication_structs.h:659
LevelLayout::const_iterator const_iterator
Definition: pcl_communication_structs.h:638
Definition: pcl_process_communicator.h:70
A Buffer for binary data.
Definition: binary_buffer.h:56
char * buffer()
returns the raw buffer pointer or NULL if the buffer is empty (capacity() == 0)
Definition: binary_buffer_impl.h:94
void set_write_pos(size_t pos)
sets the write position.
Definition: binary_buffer.cpp:69
void reserve(size_t newSize)
resizes the associated buffer to the given size.
Definition: binary_buffer.cpp:58
void read(char *buf, size_t size)
reads data of the given size (in bytes)
Definition: binary_buffer_impl.h:58
void clear()
clears the buffer
Definition: binary_buffer.cpp:48
size_t write_pos() const
returns the current write-pos (in bytes)
Definition: binary_buffer_impl.h:53
void write(const char *buf, size_t size)
writes data of the given size (in bytes)
Definition: binary_buffer_impl.h:71
bool SendRecvListsMatch(const std::vector< int > &recvFromTmp, const std::vector< int > &sendTo, const ProcessCommunicator &involvedProcs)
checks whether proc-entries in send- and recv-lists on participating processes match
Definition: pcl_util.cpp:167
void Waitall(std::vector< MPI_Request > &requests, std::vector< MPI_Status > &statuses)
Definition: pcl_methods.h:136
int NumProcs()
returns the number of processes
Definition: pcl_base.cpp:91
bool SendRecvBuffersMatch(const std::vector< int > &recvFrom, const std::vector< int > &recvBufSizes, const std::vector< int > &sendTo, const std::vector< int > &sendBufSizes, const ProcessCommunicator &involvedProcs)
checks whether matching buffers in send- and recv-lists have the same size
Definition: pcl_util.cpp:230
#define UG_THROW(msg)
Definition: error.h:57
#define UG_DLOG(__debugID__, level, msg)
Definition: log.h:298
#define UG_LOG(msg)
Definition: log.h:367
DebugID LIB_PCL
Definition: debug_id.h:132
Definition: parallel_grid_layout.h:46
MPI_Comm PCL_COMM_WORLD
Definition: pcl_comm_world.cpp:34
#define PCL_PROFILE(name)
Definition: pcl_profiling.h:49
#define PCL_PROFILE_FUNC()
Definition: pcl_profiling.h:48