Loading [MathJax]/extensions/tex2jax.js
ug4
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
pcl_interface_communicator_impl.hpp
Go to the documentation of this file.
1/*
2 * Copyright (c) 2009-2015: G-CSC, Goethe University Frankfurt
3 * Author: Sebastian Reiter
4 *
5 * This file is part of UG4.
6 *
7 * UG4 is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License version 3 (as published by the
9 * Free Software Foundation) with the following additional attribution
10 * requirements (according to LGPL/GPL v3 §7):
11 *
12 * (1) The following notice must be displayed in the Appropriate Legal Notices
13 * of covered and combined works: "Based on UG4 (www.ug4.org/license)".
14 *
15 * (2) The following notice must be displayed at a prominent place in the
16 * terminal output of covered works: "Based on UG4 (www.ug4.org/license)".
17 *
18 * (3) The following bibliography is recommended for citation and must be
19 * preserved in all covered files:
20 * "Reiter, S., Vogel, A., Heppner, I., Rupp, M., and Wittum, G. A massively
21 * parallel geometric multigrid solver on hierarchically distributed grids.
22 * Computing and visualization in science 16, 4 (2013), 151-164"
23 * "Vogel, A., Reiter, S., Rupp, M., Nägel, A., and Wittum, G. UG4 -- a novel
24 * flexible software system for simulating pde based models on high performance
25 * computers. Computing and visualization in science 16, 4 (2013), 165-179"
26 *
27 * This program is distributed in the hope that it will be useful,
28 * but WITHOUT ANY WARRANTY; without even the implied warranty of
29 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
30 * GNU Lesser General Public License for more details.
31 */
32
33#ifndef __H__PCL__PCL_INTERFACE_COMMUNICATOR_IMPL__
34#define __H__PCL__PCL_INTERFACE_COMMUNICATOR_IMPL__
35
36#include <cassert>
37#include "mpi.h"
38#include "pcl_methods.h"
41#include "pcl_profiling.h"
42#include "pcl_util.h"
43#include "common/log.h"
44
45namespace pcl
46{
47
48template <class TLayout>
51 m_bDebugCommunication(false),
52 m_bSendBuffersFixed(true)
53{
54// UG_LOG("DEBUG: Enabling debug communication in constructor of InterfaceCommunicator\n");
55// enable_communication_debugging();
56}
57
59template <class TLayout>
61send_raw(int targetProc, const void* pBuff, int bufferSize,
62 bool bSizeKnownAtTarget)
63{
64 assert((targetProc == -1) || (targetProc >= 0 && targetProc < pcl::NumProcs()));
65
66 ug::BinaryBuffer& buffer = m_bufMapOut[targetProc];
67 m_curOutProcs.insert(targetProc);
68
69 if(!bSizeKnownAtTarget)
70 buffer.write((char*)&bufferSize, sizeof(int));
71
72 buffer.write((const char*)pBuff, bufferSize);
73 m_bSendBuffersFixed = m_bSendBuffersFixed
74 && bSizeKnownAtTarget;
75}
76
78template <class TLayout>
80send_data(int targetProc, const Interface& interface,
82{
83 if(!interface.empty()){
84 assert((targetProc == -1 || targetProc >= 0) && targetProc < pcl::NumProcs());
85
86 ug::BinaryBuffer& buffer = m_bufMapOut[targetProc];
87 m_curOutProcs.insert(targetProc);
88
89 commPol.collect(buffer, interface);
90 m_bSendBuffersFixed = m_bSendBuffersFixed
91 && (commPol.get_required_buffer_size(interface) >= 0);
92 }
93}
94
96template <class TLayout>
98send_data(const Layout& layout, ICommunicationPolicy<TLayout>& commPol)
99{
100 PCL_PROFILE(pcl_IntCom_send_layout_data);
101 if(!layout.empty()){
102 // through the the category_tag we're able to find the correct send method.
103 send_data(layout, commPol, typename TLayout::category_tag());
104 }
105}
108template <class TLayout>
110send_data(const Layout& layout,
113{
114 typename Layout::const_iterator iter = layout.begin();
115 typename Layout::const_iterator end = layout.end();
117 commPol.begin_layout_collection(&layout);
118
119 for(; iter != end; ++iter)
120 {
121 if(!layout.interface(iter).empty()){
122 ug::BinaryBuffer& buffer = m_bufMapOut[layout.proc_id(iter)];
123 m_curOutProcs.insert(layout.proc_id(iter));
124
125 commPol.collect(buffer, layout.interface(iter));
126 m_bSendBuffersFixed = m_bSendBuffersFixed
127 && (commPol.get_required_buffer_size(layout.interface(iter)) >= 0);
128 }
129 }
130
131 commPol.end_layout_collection(&layout);
132}
133
135template <class TLayout>
137send_data(const Layout& layout,
140{
141 commPol.begin_layout_collection(&layout);
142
143 for(size_t i = 0; i < layout.num_levels(); ++i)
144 {
145 typename Layout::const_iterator iter = layout.begin(i);
146 typename Layout::const_iterator end = layout.end(i);
147
148 for(; iter != end; ++iter)
149 {
150 if(!layout.interface(iter).empty()){
151 ug::BinaryBuffer& buffer = m_bufMapOut[layout.proc_id(iter)];
152 m_curOutProcs.insert(layout.proc_id(iter));
153
154 commPol.collect(buffer, layout.interface(iter));
155 m_bSendBuffersFixed = m_bSendBuffersFixed
156 && (commPol.get_required_buffer_size(layout.interface(iter)) >= 0);
157 }
158 }
159 }
160 commPol.end_layout_collection(&layout);
162
164template <class TLayout>
166receive_raw(int srcProc, ug::BinaryBuffer& bufOut, int bufSize)
167{
168 m_extractorInfos.push_back(ExtractorInfo(srcProc, NULL,
169 NULL, NULL,
170 NULL, &bufOut,
171 bufSize));
172}
173
175template <class TLayout>
177receive_raw(int srcProc, void* bufOut, int bufSize)
178{
179 m_extractorInfos.push_back(ExtractorInfo(srcProc, NULL,
180 NULL, NULL,
181 bufOut, NULL,
182 bufSize));
183}
184
186template <class TLayout>
188receive_data(int srcProc, const Interface& interface,
190{
191 if(!interface.empty()){
192 m_extractorInfos.push_back(ExtractorInfo(srcProc, &commPol
193 &interface, NULL,
194 NULL, NULL, 0));
195 }
196}
197
199template <class TLayout>
202{
203 if(!layout.empty()){
204 m_extractorInfos.push_back(ExtractorInfo(-1, &commPol,
205 NULL, &layout,
206 NULL, NULL, 0));
207 }
208}
209
210template <class TLayout>
211template <class TLayoutMap>
213exchange_data(const TLayoutMap& layoutMap,
214 const typename TLayoutMap::Key& keyFrom,
215 const typename TLayoutMap::Key& keyTo,
217{
218 if(layoutMap.template has_layout<Type>(keyFrom)){
219 send_data(layoutMap.template get_layout<Type>(keyFrom), commPol);
220 }
221
222 if(layoutMap.template has_layout<Type>(keyTo)){
223 receive_data(layoutMap.template get_layout<Type>(keyTo), commPol);
224 }
225}
226
228template <class TLayout>
231 std::set<int>& curProcs,
232 const TLayout& layout)
233{
234 prepare_receiver_buffer_map(bufMap, curProcs, layout,
235 typename TLayout::category_tag());
236}
237
239template <class TLayout>
242 std::set<int>& curProcs,
243 const TLayout& layout,
245{
246// simply 'touch' the buffer to make sure it's in the map.
247 for(typename TLayout::const_iterator li = layout.begin();
248 li != layout.end(); ++li)
249 {
250 if(!layout.interface(li).empty()){
251 bufMap[layout.proc_id(li)];
252 curProcs.insert(layout.proc_id(li));
253 }
254 }
255}
256
258template <class TLayout>
261 std::set<int>& curProcs,
262 const TLayout& layout,
264{
265// simply 'touch' the buffer to make sure it's in the map.
266 for(size_t i = 0; i < layout.num_levels(); ++i)
267 {
268 for(typename TLayout::const_iterator li = layout.begin(i);
269 li != layout.end(i); ++li)
270 {
271 if(!layout.interface(li).empty()){
272 bufMap[layout.proc_id(li)];
273 curProcs.insert(layout.proc_id(li));
274 }
275 }
277}
278
280template <class TLayout>
282collect_layout_buffer_sizes(const TLayout& layout,
284 std::map<int, int>* pMapBuffSizesOut,
287 for(typename TLayout::const_iterator li = layout.begin();
288 li != layout.end(); ++li)
289 {
290 if(!layout.interface(li).empty()){
291 // get the buffer size
292 int buffSize = commPol.get_required_buffer_size(
293 layout.interface(li));
294 if(buffSize < 0){
295 // buffer sizes can't be determined
296 return false;
297 }
298 else if(pMapBuffSizesOut){
299 // find the entry in the map
300 std::map<int, int>::iterator iter = pMapBuffSizesOut->find(layout.proc_id(li));
301 if(iter != pMapBuffSizesOut->end())
302 iter->second += buffSize;
303 else
304 (*pMapBuffSizesOut)[layout.proc_id(li)] = buffSize;
306 }
307 }
308 return true;
309}
310
312template <class TLayout>
314collect_layout_buffer_sizes(const TLayout& layout,
316 std::map<int, int>* pMapBuffSizesOut,
318{
320// iterate through all interfaces
321 for(size_t i = 0; i < layout.num_levels(); ++i){
322 for(typename TLayout::const_iterator li = layout.begin(i);
323 li != layout.end(i); ++li)
325 if(!layout.interface(li).empty()){
326 // get the buffer size
327 int buffSize = commPol.get_required_buffer_size(
328 layout.interface(li));
329 if(buffSize < 0){
330 // buffer sizes can't be determined
331 return false;
332 }
333 else if(pMapBuffSizesOut){
334 // find the entry in the map
335 std::map<int, int>::iterator iter = pMapBuffSizesOut->find(layout.proc_id(li));
336 if(iter != pMapBuffSizesOut->end())
337 iter->second += buffSize;
338 else
339 (*pMapBuffSizesOut)[layout.proc_id(li)] = buffSize;
340 }
341 }
342 }
343 }
344 return true;
345}
346
348template <class TLayout>
350extract_data(const TLayout& layout, BufferMap& bufMap, CommPol& extractor)
351{
353 extract_data(layout, bufMap,
354 extractor,
355 typename TLayout::category_tag());
356}
357
359template <class TLayout>
361extract_data(const TLayout& layout, BufferMap& bufMap, CommPol& extractor,
363{
364 extractor.begin_layout_extraction(&layout);
365 extractor.begin_level_extraction(0);
366
367// extract data for the layouts interfaces
368 for(typename Layout::const_iterator li = layout.begin();
369 li != layout.end(); ++li)
370 {
371 if(!layout.interface(li).empty()){
372 extractor.extract(bufMap[layout.proc_id(li)],
373 layout.interface(li));
374 }
375 }
376
377 extractor.end_layout_extraction(&layout);
378}
379
381template <class TLayout>
383extract_data(const TLayout& layout, BufferMap& bufMap, CommPol& extractor,
385{
386 extractor.begin_layout_extraction(&layout);
387
388// extract data for the layouts interfaces
389 for(size_t i = 0; i < layout.num_levels(); ++i)
390 {
391 extractor.begin_level_extraction(i);
392 for(typename Layout::const_iterator li = layout.begin(i);
393 li != layout.end(i); ++li)
394 {
395 if(!layout.interface(li).empty()){
396 extractor.extract(bufMap[layout.proc_id(li)],
397 layout.interface(li));
398 }
399 }
400 }
401
402 extractor.end_layout_extraction(&layout);
403}
404
405
407template <class TLayout>
409communicate(int tag)
410{
411 bool retValue = communicate_and_resume(tag); // use an arbitrary tag... (this is a little dangerous...)
412 wait();
413 return retValue;
414}
415
416
418template <class TLayout>
421{
422 PCL_PROFILE(pcl_IntCom_communicate);
423
424 if(!(m_vSendRequests.empty() && m_vReceiveRequests.empty())){
425 UG_THROW("Can't communicate since a previous communication is still pending! "
426 "Make sure to call wait() after each communicate_and_resume()!");
427 }
428
429 bool retVal = true;
430
431 m_curInProcs.clear();
432
433// note that we won't free the memory in the stream-packs.
434// we will only reset their write and read pointers.
435 for(BufferMap::iterator iter = m_bufMapIn.begin();
436 iter != m_bufMapIn.end(); ++iter)
437 {
438 iter->second.clear();
439 }
440
441
442// iterate through all registered extractors and create entries for
443// the source-processes in the map (by simply 'touching' the entry).
444 for(typename ExtractorInfoList::iterator iter = m_extractorInfos.begin();
445 iter != m_extractorInfos.end(); ++iter)
446 {
447 ExtractorInfo& info = *iter;
448 if(info.m_srcProc > -1){
449 m_bufMapIn[info.m_srcProc];
450 m_curInProcs.insert(info.m_srcProc);
451 }
452 else
453 {
454 prepare_receiver_buffer_map(m_bufMapIn, m_curInProcs, *info.m_layout);
455 }
456 }
457
458// if communication_debugging is enabled, then we check whether scheduled
459// sends and receives match.
460// those vectors are only filled, if debugging is performed.
461 std::vector<int> dbgRecvFrom, dbgSendTo;
462
463 if(communication_debugging_enabled()){
464 // fill receive and send vectors
465 for(std::set<int>::iterator iter = m_curInProcs.begin();
466 iter != m_curInProcs.end(); ++iter){
467 dbgRecvFrom.push_back(*iter);
468 }
469
470 for(std::set<int>::iterator iter = m_curOutProcs.begin();
471 iter != m_curOutProcs.end(); ++iter)
472 dbgSendTo.push_back(*iter);
473
474 if(!SendRecvListsMatch(dbgRecvFrom, dbgSendTo, m_debugProcComm)){
475 UG_THROW("ERROR in InterfaceCommunicator::communicate(): send / receive mismatch. Aborting.\n");
476 }
477 }
478
479// number of in and out-streams.
480 size_t numOutStreams = m_curOutProcs.size();
481 size_t numInStreams = m_curInProcs.size();
482
483// used for mpi-communication.
484 m_vSendRequests.resize(numOutStreams);
485 m_vReceiveRequests.resize(numInStreams);
486
487
489// determine buffer sizes and communicate them if required.
490 std::vector<int> vBufferSizesIn(numInStreams);
491 bool allBufferSizesFixed = m_bSendBuffersFixed;
492
493 if(allBufferSizesFixed)
494 {
495 // a map with <procId, Size>. Will be used to collect stream-sizes
496 std::map<int, int> mapBuffSizes;
497 // initialise all sizes with 0
498 for(std::set<int>::iterator iter = m_curInProcs.begin();
499 iter != m_curInProcs.end(); ++iter)
500 {
501 mapBuffSizes[*iter] = 0;
502 }
503
504 // iterate over all extractors and collect the buffer sizes
505 for(typename ExtractorInfoList::iterator iter = m_extractorInfos.begin();
506 iter != m_extractorInfos.end(); ++iter)
507 {
508 ExtractorInfo& info = *iter;
509 if(info.m_srcProc > -1){
510 // the extractor only has a single interface.
511 int buffSize = -1;
512 if(info.m_extractor)
513 buffSize = info.m_extractor->get_required_buffer_size(*info.m_interface);
514 else
515 buffSize = info.m_rawSize;
516
517 if(buffSize < 0){
518 allBufferSizesFixed = false;
519 break;
520 }
521 else
522 mapBuffSizes[info.m_srcProc] += buffSize;
523 }
524 else{
525 if(!collect_layout_buffer_sizes(*info.m_layout,
526 *info.m_extractor,
527 &mapBuffSizes,
528 typename TLayout::category_tag()))
529 {
530 allBufferSizesFixed = false;
531 break;
532 }
533 }
534 }
535
536 // if all buffer sizes are fixed, we'll copy them to vBufferSizes.
537 // to reduce the amount of possible errors, we'll iterate over
538 // m_streamPackIn...
539 if(allBufferSizesFixed){
540 int counter = 0;
541 for(std::set<int>::iterator iter = m_curInProcs.begin();
542 iter != m_curInProcs.end(); ++iter, ++counter)
543 {
544 vBufferSizesIn[counter] = mapBuffSizes[*iter];
545 }
546 }
547 }
548
549
550
551// if the buffer size could not be determined, we have to communicate it.
552 if(!allBufferSizesFixed)
553 {
554 PCL_PROFILE(pcl_IntCom_communicateBufSizes);
555
556 int sizeTag = tag;
557 int counter;
558 std::vector<int> streamSizes;
559
560 // shedule receives first
561 counter = 0;
562 for(std::set<int>::iterator iter = m_curInProcs.begin();
563 iter != m_curInProcs.end(); ++iter, ++counter)
564 {
565 MPI_Irecv(&vBufferSizesIn[counter], sizeof(int), MPI_UNSIGNED_CHAR,
566 *iter, sizeTag, PCL_COMM_WORLD, &m_vReceiveRequests[counter]);
567 }
568
569 // send buffer sizes
570 counter = 0;
571 streamSizes.resize(m_curOutProcs.size());
572 for(std::set<int>::iterator iter = m_curOutProcs.begin();
573 iter != m_curOutProcs.end(); ++iter, ++counter)
574 {
575 streamSizes[counter] = (int)m_bufMapOut[*iter].write_pos();
576
577 MPI_Isend(&streamSizes[counter], sizeof(int), MPI_UNSIGNED_CHAR,
578 *iter, sizeTag, PCL_COMM_WORLD, &m_vSendRequests[counter]);
579 }
580
581 // TODO: this can be improved:
582 // instead of waiting for all, one could wait until one has finished and directly
583 // start copying the data to the local receive buffer. Afterwards on could continue
584 // by waiting for the next one etc...
585 Waitall(m_vReceiveRequests, m_vSendRequests);
586 }
587
588// we can now resize the receive buffers to their final sizes
589 {
590 PCL_PROFILE(pcl_IntCom_communicate_resizeRecvBufs);
591 size_t counter = 0;
592 for(std::set<int>::iterator iter = m_curInProcs.begin();
593 iter != m_curInProcs.end(); ++iter, ++counter)
594 {
595 ug::BinaryBuffer& buf = m_bufMapIn[*iter];
596 buf.reserve(vBufferSizesIn[counter]);
597 // since we will write the data to the associated buffer
598 // using a raw copy, we will also set the write-position
599 // at this point (the write position will not be used during raw copy).
600 buf.set_write_pos(vBufferSizesIn[counter]);
601 }
602 }
603
604// if communication_debugging is enabled, we can now check whether associated
605// send / receive buffers have the same size.
606 if(communication_debugging_enabled()){
607 std::vector<int> recvBufSizes, sendBufSizes;
608 for(std::set<int>::iterator iter = m_curInProcs.begin();
609 iter != m_curInProcs.end(); ++iter)
610 recvBufSizes.push_back((int)m_bufMapIn[*iter].write_pos());
611
612 for(std::set<int>::iterator iter = m_curOutProcs.begin();
613 iter != m_curOutProcs.end(); ++iter)
614 sendBufSizes.push_back((int)m_bufMapOut[*iter].write_pos());
615
616 if(!SendRecvBuffersMatch(dbgRecvFrom, recvBufSizes,
617 dbgSendTo, sendBufSizes, m_debugProcComm))
618 {
619 UG_LOG("ERROR in InterfaceCommunicator::communicate(): "
620 "send / receive buffer size mismatch. Aborting.\n");
621 retVal = false;
622 }
623 }
624
625
627// communicate data.
628 PCL_PROFILE(pcl_IntCom_communicateData);
629 int dataTag = tag;
630
631 UG_DLOG(ug::LIB_PCL, 1, "receiving from procs:");
632
633// first schedule receives
634 int counter = 0;
635 for(std::set<int>::iterator iter = m_curInProcs.begin();
636 iter != m_curInProcs.end(); ++iter, ++counter)
637 {
638 UG_DLOG(ug::LIB_PCL, 1, " " << *iter
639 << "(" << vBufferSizesIn[counter] << ")");
640
641 ug::BinaryBuffer& binBuf = m_bufMapIn[*iter];
642 // receive the data
643 MPI_Irecv(binBuf.buffer(), vBufferSizesIn[counter], MPI_UNSIGNED_CHAR,
644 *iter, dataTag, PCL_COMM_WORLD, &m_vReceiveRequests[counter]);
645 }
646
647 UG_DLOG(ug::LIB_PCL, 1, "\nsending to procs:");
648
649// now send data
650 counter = 0;
651 for(std::set<int>::iterator iter = m_curOutProcs.begin();
652 iter != m_curOutProcs.end(); ++iter, ++counter)
653 {
654 ug::BinaryBuffer& binBuf = m_bufMapOut[*iter];
655
656 UG_DLOG(ug::LIB_PCL, 1, " " << *iter
657 << "(" << binBuf.write_pos() << ")");
658
659 MPI_Isend(binBuf.buffer(), binBuf.write_pos(), MPI_UNSIGNED_CHAR,
660 *iter, dataTag, PCL_COMM_WORLD, &m_vSendRequests[counter]);
661 }
662 UG_DLOG(ug::LIB_PCL, 1, "\n");
663
664// reset m_bSendBuffersFixed for the next communication
665 m_bSendBuffersFixed = true;
666
667// done
668 return retVal;
669}
670
671
672template <class TLayout>
674wait()
675{
676// TODO: this can be improved:
677// instead of waiting for all, one could wait until one has finished and directly
678// start copying the data to the local receive buffer. Afterwards on could continue
679// by waiting for the next one etc...
680 {
681 PCL_PROFILE(pcl_IntCom_MPIWait);
682 Waitall(m_vReceiveRequests, m_vSendRequests);
683 }
684
685
686// call the extractors with the received data
687 for(typename ExtractorInfoList::iterator iter = m_extractorInfos.begin();
688 iter != m_extractorInfos.end(); ++iter)
689 {
690 ExtractorInfo& info = *iter;
691 if(info.m_srcProc > -1)
692 {
693 // extract the data for single proc
694 ug::BinaryBuffer& binBuf = m_bufMapIn[info.m_srcProc];
695 // this can be either an interface, a void* buffer or a
696 // binary-stream.
697 if(info.m_interface){
698 info.m_extractor->extract(binBuf, *info.m_interface);
699 }
700 else if(info.m_buffer){
701 binBuf.read((char*)info.m_buffer, info.m_rawSize);
702 }
703 else{
704 assert(info.m_binBuffer && "ERROR in InterfaceCommunicator::communicate: No valid receiver specified.");
705
706 int rawSize = info.m_rawSize;
707 if(rawSize < 0){
708 // the raw size is encoded in the buffer in this case.
709 binBuf.read((char*)&rawSize, sizeof(int));
710 }
711
712 info.m_binBuffer->clear();
713 info.m_binBuffer->reserve(rawSize);
714 binBuf.read((char*)info.m_binBuffer->buffer(), rawSize);
715 info.m_binBuffer->set_write_pos(rawSize);
716 }
717 }
718 else
719 {
720 // extract the data for a layout
721 extract_data(*info.m_layout,
722 m_bufMapIn,
723 *info.m_extractor);
724 }
725 }
726
727// clean up
728 for(BufferMap::iterator iter = m_bufMapOut.begin();
729 iter != m_bufMapOut.end(); ++iter)
730 {
731 // clear does not free memory. Only resets read and write pointers.
732 iter->second.clear();
733 }
734
735 m_curOutProcs.clear();
736 m_extractorInfos.clear();
737 m_vSendRequests.clear();
738 m_vReceiveRequests.clear();
739}
740
741
742
743template <class TLayout>
746{
747 static bool bFirstTime = true;
748 if(bFirstTime){
749 UG_LOG("WARNING: Communication debugging enabled in InterfaceCommunicator.");
750 UG_LOG(" Expect performance penalty!\n");
751 bFirstTime = false;
752 }
753
754 m_bDebugCommunication = true;
755 m_debugProcComm = involvedProcs;
756}
757
758template <class TLayout>
761{
762 m_bDebugCommunication = false;
763}
764
765template <class TLayout>
768{
769 return m_bDebugCommunication;
770}
771
772}// end of namespace pcl
773
774#endif
specializations are responsible to pack and unpack interface data during communication.
Definition pcl_communication_structs.h:790
virtual int get_required_buffer_size(const Interface &interface)
returns the size of the buffer in bytes, that will be required for interface-communication.
Definition pcl_communication_structs.h:813
virtual bool end_layout_collection(const Layout *pLayout)
signals the end of a layout collection
Definition pcl_communication_structs.h:827
virtual bool extract(ug::BinaryBuffer &buff, const Interface &interface)=0
extract data from the buffer and assigns it to the interface-elements.
virtual bool begin_layout_collection(const Layout *pLayout)
signals the beginning of a layout collection.
Definition pcl_communication_structs.h:821
virtual bool collect(ug::BinaryBuffer &buff, const Interface &interface)=0
should write data which is associated with the interface elements to the buffer.
virtual void begin_level_extraction(int level)
signals that a new layout-level will now be processed.
Definition pcl_communication_structs.h:855
virtual bool end_layout_extraction(const Layout *pLayout)
signals the end of a layout extraction
Definition pcl_communication_structs.h:845
virtual bool begin_layout_extraction(const Layout *pLayout)
signals the beginning of a layout extraction.
Definition pcl_communication_structs.h:839
bool communicate_and_resume(int tag=749345)
collects data and communicates it with other processes without waiting for receive
Definition pcl_interface_communicator_impl.hpp:420
void prepare_receiver_buffer_map(BufferMap &bufMap, std::set< int > &curProcs, const TLayout &layout)
prepare stream-pack-in
Definition pcl_interface_communicator_impl.hpp:230
void wait()
waits for the data communicated by communicate_and_resume() and extracts it
Definition pcl_interface_communicator_impl.hpp:674
void send_raw(int targetProc, const void *pBuff, int bufferSize, bool bSizeKnownAtTarget=false)
sends raw data to a target-proc.
Definition pcl_interface_communicator_impl.hpp:61
TLayout Layout
Definition pcl_interface_communicator.h:71
void receive_raw(int srcProc, ug::BinaryBuffer &bufOut, int bufSize=-1)
registers a binary-stream to receive data from a source-proc.
Definition pcl_interface_communicator_impl.hpp:166
Layout::Interface Interface
Definition pcl_interface_communicator.h:72
void extract_data(const TLayout &layout, BufferMap &bufMap, CommPol &extractor)
extract data from stream-pack
Definition pcl_interface_communicator_impl.hpp:350
InterfaceCommunicator()
Definition pcl_interface_communicator_impl.hpp:50
bool communicate(int tag=749345)
sends and receives the collected data.
Definition pcl_interface_communicator_impl.hpp:409
void send_data(int targetProc, const Interface &interface, ICommunicationPolicy< TLayout > &commPol)
collects data that will be send during communicate.
Definition pcl_interface_communicator_impl.hpp:80
void exchange_data(const TLayoutMap &layoutMap, const typename TLayoutMap::Key &keyFrom, const typename TLayoutMap::Key &keyTo, ICommunicationPolicy< TLayout > &commPol)
internally calls send_data and receive_data with the specified layouts.
Definition pcl_interface_communicator_impl.hpp:213
bool communication_debugging_enabled()
returns true if communication debugging is enabled
Definition pcl_interface_communicator_impl.hpp:767
void disable_communication_debugging()
disables debugging of communication
Definition pcl_interface_communicator_impl.hpp:760
void receive_data(int srcProc, const Interface &interface, ICommunicationPolicy< TLayout > &commPol)
registers a communication-policy to receive data on communicate.
Definition pcl_interface_communicator_impl.hpp:188
void enable_communication_debugging(const ProcessCommunicator &involvedProcs=ProcessCommunicator(PCD_WORLD))
enables debugging of communication. This has a severe effect on performance!
Definition pcl_interface_communicator_impl.hpp:745
bool collect_layout_buffer_sizes(const TLayout &layout, ICommunicationPolicy< TLayout > &commPol, std::map< int, int > *pMapBuffSizesOut, const layout_tags::single_level_layout_tag &)
collects buffer sizes for a given layout and stores them in a map
Definition pcl_interface_communicator_impl.hpp:282
std::map< int, ug::BinaryBuffer > BufferMap
Definition pcl_interface_communicator.h:272
Definition pcl_process_communicator.h:70
the standard single-level-layout implementation
Definition pcl_communication_structs.h:452
iterator end(size_t level=0)
returns the iterator to the last interface of the layout.
Definition pcl_communication_structs.h:492
Interface & interface(iterator iter)
returns the interface to the given iterator.
Definition pcl_communication_structs.h:505
iterator begin(size_t level=0)
returns the iterator to the first interface of the layout.
Definition pcl_communication_structs.h:486
int proc_id(iterator iter) const
returns the target process of the interface given in iterator
Definition pcl_communication_structs.h:509
InterfaceMap::const_iterator const_iterator
Definition pcl_communication_structs.h:477
size_t num_levels() const
returns 1
Definition pcl_communication_structs.h:502
marks a layout as a multi-level layout
Definition pcl_communication_structs.h:426
marks a layout as a single-level layout
Definition pcl_communication_structs.h:407
A Buffer for binary data.
Definition binary_buffer.h:56
char * buffer()
returns the raw buffer pointer or NULL if the buffer is empty (capacity() == 0)
Definition binary_buffer_impl.h:94
void set_write_pos(size_t pos)
sets the write position.
Definition binary_buffer.cpp:69
void reserve(size_t newSize)
resizes the associated buffer to the given size.
Definition binary_buffer.cpp:58
void read(char *buf, size_t size)
reads data of the given size (in bytes)
Definition binary_buffer_impl.h:58
void clear()
clears the buffer
Definition binary_buffer.cpp:48
size_t write_pos() const
returns the current write-pos (in bytes)
Definition binary_buffer_impl.h:53
void write(const char *buf, size_t size)
writes data of the given size (in bytes)
Definition binary_buffer_impl.h:71
bool SendRecvListsMatch(const std::vector< int > &recvFromTmp, const std::vector< int > &sendTo, const ProcessCommunicator &involvedProcs)
checks whether proc-entries in send- and recv-lists on participating processes match
Definition pcl_util.cpp:167
void Waitall(std::vector< MPI_Request > &requests, std::vector< MPI_Status > &statuses)
Definition pcl_methods.h:136
int NumProcs()
returns the number of processes
Definition pcl_base.cpp:91
bool SendRecvBuffersMatch(const std::vector< int > &recvFrom, const std::vector< int > &recvBufSizes, const std::vector< int > &sendTo, const std::vector< int > &sendBufSizes, const ProcessCommunicator &involvedProcs)
checks whether matching buffers in send- and recv-lists have the same size
Definition pcl_util.cpp:230
#define UG_THROW(msg)
Definition error.h:57
#define UG_DLOG(__debugID__, level, msg)
Definition log.h:298
#define UG_LOG(msg)
Definition log.h:367
DebugID LIB_PCL
Definition debug_id.h:132
Definition parallel_grid_layout.h:46
MPI_Comm PCL_COMM_WORLD
Definition pcl_comm_world.cpp:34
#define PCL_PROFILE(name)
Definition pcl_profiling.h:49
#define PCL_PROFILE_FUNC()
Definition pcl_profiling.h:48
holds information that will be passed to the extract routines.
Definition pcl_interface_communicator.h:340
void * m_buffer
Definition pcl_interface_communicator.h:356
CommPol * m_extractor
Definition pcl_interface_communicator.h:353
int m_rawSize
Definition pcl_interface_communicator.h:358
int m_srcProc
Definition pcl_interface_communicator.h:352
const Interface * m_interface
Definition pcl_interface_communicator.h:354
const Layout * m_layout
Definition pcl_interface_communicator.h:355
ug::BinaryBuffer * m_binBuffer
Definition pcl_interface_communicator.h:357