Go to the documentation of this file.
1 /*
2  * Copyright (c) 2011-2015: G-CSC, Goethe University Frankfurt
3  * Author: Martin Rupp
4  *
5  * This file is part of UG4.
6  *
7  * UG4 is free software: you can redistribute it and/or modify it under the
8  * terms of the GNU Lesser General Public License version 3 (as published by the
9  * Free Software Foundation) with the following additional attribution
10  * requirements (according to LGPL/GPL v3 §7):
11  *
12  * (1) The following notice must be displayed in the Appropriate Legal Notices
13  * of covered and combined works: "Based on UG4 (".
14  *
15  * (2) The following notice must be displayed at a prominent place in the
16  * terminal output of covered works: "Based on UG4 (".
17  *
18  * (3) The following bibliography is recommended for citation and must be
19  * preserved in all covered files:
20  * "Reiter, S., Vogel, A., Heppner, I., Rupp, M., and Wittum, G. A massively
21  * parallel geometric multigrid solver on hierarchically distributed grids.
22  * Computing and visualization in science 16, 4 (2013), 151-164"
23  * "Vogel, A., Reiter, S., Rupp, M., Nägel, A., and Wittum, G. UG4 -- a novel
24  * flexible software system for simulating pde based models on high performance
25  * computers. Computing and visualization in science 16, 4 (2013), 165-179"
26  *
27  * This program is distributed in the hope that it will be useful,
28  * but WITHOUT ANY WARRANTY; without even the implied warranty of
30  * GNU Lesser General Public License for more details.
31  */
36 #include "parallel_nodes.h"
37 #include "serialize_interfaces.h"
38 #include "common/debug_print.h"
40 namespace ug{
42 template<typename matrix_type>
43 void SerializeRow(BinaryBuffer &stream, const matrix_type &mat, size_t localRowIndex, ParallelNodes &PN)
44 {
45  PROFILE_FUNC_GROUP("algebra parallelization");
46  const AlgebraID &globalRowIndex = PN.local_to_global(localRowIndex);
47  UG_COND_THROW(globalRowIndex.master_proc() > pcl::NumProcs() || globalRowIndex.master_proc() < 0, globalRowIndex);
48  // serialize global row index
49  Serialize(stream, globalRowIndex);
51  size_t num_connections = mat.num_connections(localRowIndex);
53  // serialize number of connections
54  Serialize(stream, num_connections);
55  UG_DLOG(LIB_ALG_AMG, 4, "Sending row " << localRowIndex << " (" << globalRowIndex << "), " << num_connections << " cons: ");
57  for(typename matrix_type::const_row_iterator conn = mat.begin_row(localRowIndex);
58  conn != mat.end_row(localRowIndex); ++conn)
59  {
60  size_t localColIndex = conn.index();
61  const AlgebraID &globalColIndex = PN.local_to_global(localColIndex);
62  UG_COND_THROW(globalColIndex.master_proc() > pcl::NumProcs() || globalColIndex.master_proc() < 0, globalColIndex);
64  UG_DLOG(LIB_ALG_AMG, 4, localColIndex << " (" << globalColIndex << ") -> " << conn.value() << " ");
66  // serialize connection
67  Serialize(stream, globalColIndex);
68  Serialize(stream, conn.value());
69  }
70  UG_DLOG(LIB_ALG_AMG, 4, "\n");
71 }
74 template<typename matrix_type>
75 void SendMatrix(const matrix_type &A, IndexLayout &verticalSlaveLayout, int destproc, ParallelNodes &PN)
76 {
77  PROFILE_FUNC_GROUP("algebra parallelization");
78  UG_DLOG(LIB_ALG_AMG, 1, "\n*********** SendMatrix ************\n\n");
80  pcl::InterfaceCommunicator<IndexLayout> &communicator = A.layouts()->comm();
81  BinaryBuffer stream;
83  Serialize(stream, A.num_rows());
84  for(size_t i=0; i<A.num_rows(); i++)
85  SerializeRow(stream, A, i, PN);
87  SerializeLayout(stream, A.layouts()->master(), PN);
88  SerializeLayout(stream, A.layouts()->slave(), PN);
90  IndexLayout::Interface &verticalInterface = verticalSlaveLayout.interface(destproc);
91  for(size_t i=0; i<A.num_rows(); i++)
92  verticalInterface.push_back(i);
94  UG_DLOG(LIB_ALG_AMG, 3, "Srcproc " << pcl::ProcRank() << " is sending " << stream.write_pos() << " bytes of data to destproc " << destproc << "\n");
95  communicator.send_raw(destproc, stream.buffer(), stream.write_pos(), false);
96  communicator.communicate();
97 }
99 template<typename TConnectionType>
101 {
102  PROFILE_FUNC_GROUP("algebra parallelization");
103  AlgebraID globalRowIndex;
105  // serialize global row index
106  Deserialize(stream, globalRowIndex);
107  size_t localRowIndex = PN.global_to_local(globalRowIndex);
109  UG_DLOG(LIB_ALG_AMG, 4, "Got row " << localRowIndex << " (" << globalRowIndex << "), ");
110  size_t num_connections;
112  // serialize number of connections
113  Deserialize(stream, num_connections);
115  UG_DLOG(LIB_ALG_AMG, 4, num_connections << " connections: ");
117  cons.resize(num_connections);
118  for(size_t i =0; i<num_connections; i++)
119  {
120  AlgebraID globalColIndex;
121  Deserialize(stream, globalColIndex);
122  cons[i].iIndex = PN.global_to_local(globalColIndex);
123  Deserialize(stream, cons[i].dValue);
124  UG_DLOG(LIB_ALG_AMG, 4, cons[i].iIndex << " (" << globalColIndex << ") -> " << cons[i].dValue << " ");
125  }
126  UG_DLOG(LIB_ALG_AMG, 4, "\n");
127  return localRowIndex;
128 }
130 // ReceiveMatrix
131 //---------------------------------------------------------------------------
141 template<typename matrix_type>
142 void ReceiveMatrix(const matrix_type &A, matrix_type &M, IndexLayout &verticalMasterLayout, const std::vector<int> &srcprocs,
143  ParallelNodes &PN)
144 {
145  PROFILE_FUNC_GROUP("algebra parallelization");
146  UG_DLOG(LIB_ALG_AMG, 1, "\n*********** ReceiveMatrix ************\n\n");
147  pcl::InterfaceCommunicator<IndexLayout> &communicator = A.layouts()->comm();
149  M = A;
150  //M.print();
152  typedef std::map<int, BinaryBuffer> BufferMap;
153  BufferMap streams;
155  UG_DLOG(LIB_ALG_AMG, 3, "DestProc " << pcl::ProcRank() << " is waiting on data from ");
156  for(size_t i=0; i<srcprocs.size(); i++)
157  {
158  UG_DLOG(LIB_ALG_AMG, 3, srcprocs[i] << " ");
159  communicator.receive_raw(srcprocs[i], streams[srcprocs[i]]);
160  }
161  UG_DLOG(LIB_ALG_AMG, 3, "\n");
162  communicator.communicate();
164  AlgebraID globalRowIndex, globalColIndex;;
165  size_t num_connections, numRows;
167  for(size_t i=0; i<srcprocs.size(); i++)
168  {
169  int pid = srcprocs[i];
170  BinaryBuffer &stream = streams[pid];
171  stream.set_read_pos(0);
173  UG_DLOG(LIB_ALG_AMG, 4, "received " << stream.write_pos() << " bytes of data from process " << pid << "\n");
174  IndexLayout::Interface &verticalInterface = verticalMasterLayout.interface(pid);
175  typename matrix_type::connection con;
177  Deserialize(stream, numRows);
178  for(size_t i=0; i<numRows; i++)
179  {
180  // serialize global row index, number of connections
181  Deserialize(stream, globalRowIndex);
182  Deserialize(stream, num_connections);
183  UG_COND_THROW(globalRowIndex.master_proc() > pcl::NumProcs() || globalRowIndex.master_proc() < 0, i << " " << globalRowIndex << " " << pid);
185  size_t localRowIndex = PN.get_local_index_or_create_new(globalRowIndex, 0);
186  verticalInterface.push_back(localRowIndex);
187  UG_DLOG(LIB_ALG_AMG, 4, "Got row " << localRowIndex << " (" << globalRowIndex << "), ");
188  UG_DLOG(LIB_ALG_AMG, 4, num_connections << " connections: ");
190  for(size_t pid =0; pid<num_connections; pid++)
191  {
192  Deserialize(stream, globalColIndex);
193  Deserialize(stream, con.dValue);
195  con.iIndex = PN.get_local_index_or_create_new(globalColIndex, 0);
196  UG_DLOG(LIB_ALG_AMG, 4, con.iIndex << " (" << globalColIndex << ") -> " << con.dValue << " ");
197  }
198  UG_DLOG(LIB_ALG_AMG, 4, "\n");
199  }
200  }
202  M.resize_and_keep_values(PN.local_size(), PN.local_size());
203  //M.print();
205  for(size_t i=0; i<srcprocs.size(); i++)
206  {
207  int pid = srcprocs[i];
208  BinaryBuffer &stream = streams[pid];
209  stream.set_read_pos(0);
212  Deserialize(stream, numRows);
213  for(size_t i=0; i<numRows; i++)
214  {
215  size_t localRowIndex = DeserializeRow(stream, cons, PN);
216  if(cons.size())
217  M.add_matrix_row(localRowIndex, &cons[0], cons.size());
218  }
219  }
221  //UG_DLOG(LIB_ALG_AMG, 4, "\n** the matrix M: \n\n");
222  //M.print();
223  //UG_DLOG(LIB_ALG_AMG, 4, "\n");
226  //PrintLayout(processCommunicator, communicator, masterLayout, slaveLayout);
227 }
245 template<typename matrix_type>
246 void CollectMatrixOnOneProc(const matrix_type &A, matrix_type &collectedA, IndexLayout &masterLayout, IndexLayout &slaveLayout)
247 {
248  try{
249  PROFILE_FUNC_GROUP("algebra parallelization");
250  UG_DLOG(LIB_ALG_AMG, 1, "\n*********** SendMatrix ************\n\n");
251  std::vector<int> srcprocs;
252  masterLayout.clear();
253  slaveLayout.clear();
255  const pcl::ProcessCommunicator &pc = A.layouts()->proc_comm();
256  ParallelNodes PN(A.layouts(), A.num_rows());
258  if(pcl::ProcRank() == pc.get_proc_id(0))
259  {
260  srcprocs.resize(pc.size()-1);
261  for(size_t i=1; i<pc.size(); i++)
262  srcprocs[i-1] = pc.get_proc_id(i);
263  ReceiveMatrix(A, collectedA, masterLayout, srcprocs, PN);
264  }
265  else
266  SendMatrix(A, slaveLayout, pc.get_proc_id(0), PN);
267  }UG_CATCH_THROW(__FUNCTION__ << " failed");
268 }
280 template<typename T>
281 void GatherVectorOnOne(IndexLayout &agglomeratedMaster, IndexLayout &agglomeratedSlave,
283  ParallelVector<T> &collectedVec,
284  const ParallelVector<T> &vec,
285  ParallelStorageType type, bool bRoot)
286 {
287  try{
288  PROFILE_FUNC_GROUP("algebra parallelization");
290  if(!bRoot)
291  {
292  ComPol_VecAdd<vector_type > compolAdd(&collectedVec, &vec);
293  com.send_data(agglomeratedSlave, compolAdd);
294  com.communicate();
295  }
296  else
297  {
298  //UG_LOG("gather_vertical: receiving data at level " << level << "\n");
299  UG_COND_THROW(&vec == &collectedVec, "vec and collected vec may not be same");
300  collectedVec.set(0.0);
301  for(size_t i=0; i<vec.size(); i++)
302  collectedVec[i] = vec[i];
304  UG_COND_THROW(!vec.has_storage_type(type), "storage type is " << vec.get_storage_type() << ", not " << type);
305  if(type == PST_ADDITIVE)
306  {
307  ComPol_VecAdd<vector_type > compolAdd(&collectedVec, &vec);
308  com.receive_data(agglomeratedMaster, compolAdd);
309  com.communicate();
310  collectedVec.set_storage_type(PST_ADDITIVE);
311  }
312  else if(type == PST_CONSISTENT)
313  {
314  ComPol_VecCopy<vector_type > compolCopy(&collectedVec, &vec);
315  com.receive_data(agglomeratedMaster, compolCopy);
316  com.communicate();
317  collectedVec.set_storage_type(PST_CONSISTENT);
318  }
319  else { UG_THROW("storage type " << type << "unsupported."); }
320  }
321  }UG_CATCH_THROW(__FUNCTION__ << " failed");
322 }
334 template<typename T>
335 void BroadcastVectorFromOne(IndexLayout &agglomeratedMaster, IndexLayout &agglomeratedSlave,
337  ParallelVector<T> &vec,
338  const ParallelVector<T> &collectedVec,
339  ParallelStorageType type, bool bRoot)
340 {
341  PROFILE_FUNC_GROUP("algebra parallelization");
342  try{
344  if(!bRoot)
345  {
346  ComPol_VecCopy<vector_type> compolCopy(&vec, &collectedVec);
347  com.receive_data(agglomeratedSlave, compolCopy);
348  com.communicate();
349  vec.set_storage_type(type);
350  }
351  else
352  {
353  UG_COND_THROW(&vec == &collectedVec, "vec and collected vec may not be same");
354  for(size_t i=0; i<vec.size(); i++)
355  vec[i] = collectedVec[i];
357  UG_COND_THROW(!collectedVec.has_storage_type(type), "storage type is " << collectedVec.get_storage_type() << ", not " << type);
358  vec.set_storage_type(type);
360  ComPol_VecAdd<vector_type > compolCopy(&vec, &collectedVec);
361  com.send_data(agglomeratedMaster, compolCopy);
362  com.communicate();
363  }
365  if(type == PST_ADDITIVE)
366  {
368  // das problem ist, dass der vektor noch slave-interfaces nach "au�en" haben kann,
369  // diese werden dann f�lschlicherweise auch 0 gesetzt.
372  //vec.set_storage_type(PST_ADDITIVE);
374  }
375  else if(type == PST_CONSISTENT) { }
376  else { UG_THROW("storage type " << type << "unsupported."); }
378  }UG_CATCH_THROW(__FUNCTION__ << " failed");
379 }
381 } // namespace ug
383 #endif /* __H__LIB_ALGEBRA__COLLECT_MATRIX_H_ */
Definition: smart_pointer.h:108
Performs communication between interfaces on different processes.
Definition: pcl_interface_communicator.h:68
void send_raw(int targetProc, const void *pBuff, int bufferSize, bool bSizeKnownAtTarget=false)
sends raw data to a target-proc.
Definition: pcl_interface_communicator_impl.hpp:61
void receive_raw(int srcProc, ug::BinaryBuffer &bufOut, int bufSize=-1)
registers a binary-stream to receive data from a source-proc.
Definition: pcl_interface_communicator_impl.hpp:166
bool communicate(int tag=749345)
sends and receives the collected data.
Definition: pcl_interface_communicator_impl.hpp:409
void send_data(int targetProc, const Interface &interface, ICommunicationPolicy< TLayout > &commPol)
collects data that will be send during communicate.
Definition: pcl_interface_communicator_impl.hpp:80
void receive_data(int srcProc, const Interface &interface, ICommunicationPolicy< TLayout > &commPol)
registers a communication-policy to receive data on communicate.
Definition: pcl_interface_communicator_impl.hpp:188
You may add elements to this interface and iterate over them.
Definition: pcl_communication_structs.h:207
iterator push_back(const Element &elem)
Definition: pcl_communication_structs.h:245
Definition: pcl_process_communicator.h:70
int get_proc_id(size_t index) const
returns the i-th process in the communicator
Definition: pcl_process_communicator.cpp:86
size_t size() const
returns the size of the communicator
Definition: pcl_process_communicator.cpp:71
Interface & interface(iterator iter)
returns the interface to the given iterator.
Definition: pcl_communication_structs.h:505
void clear()
clears the layout
Definition: pcl_communication_structs.h:522
Extends the HorizontalAlgebraLayouts by vertical layouts.
Definition: algebra_layouts.h:121
A Buffer for binary data.
Definition: binary_buffer.h:56
char * buffer()
returns the raw buffer pointer or NULL if the buffer is empty (capacity() == 0)
Definition: binary_buffer_impl.h:94
void set_read_pos(size_t pos)
sets the read position (in bytes).
Definition: binary_buffer.cpp:64
size_t write_pos() const
returns the current write-pos (in bytes)
Definition: binary_buffer_impl.h:53
Communication Policy to add values of a vector.
Definition: communication_policies.h:319
Communication Policy to copy values of a vector.
Definition: communication_policies.h:88
Wrapper for sequential matrices to handle them in parallel.
Definition: parallel_matrix.h:65
void set_layouts(ConstSmartPtr< AlgebraLayouts > layouts)
sets the algebra layouts
Definition: parallel_matrix.h:97
ConstSmartPtr< AlgebraLayouts > layouts() const
returns the algebra layouts
Definition: parallel_matrix.h:94
Definition: parallel_nodes.h:112
const AlgebraID & local_to_global(size_t i) const
Definition: parallel_nodes.h:278
size_t get_local_index_or_create_new(const AlgebraID &globalIndex, int distanceToMasterOrInner, bool &bCreated)
get_index_or_create_new: returns a local index by creating and saving a new one or returning an old
Definition: parallel_nodes.cpp:107
size_t local_size() const
Definition: parallel_nodes.h:273
size_t global_to_local(const AlgebraID &globalIndex) const
Definition: parallel_nodes.cpp:141
Definition: parallel_vector.h:60
bool has_storage_type(uint type) const
returns if the current storage type has a given representation
Definition: parallel_vector.h:119
ParallelStorageType get_storage_type() const
Definition: parallel_vector.h:124
void set(number w, ParallelStorageType type)
set all entries to value and the storage type
Definition: parallel_vector_impl.h:223
void set_storage_type(uint type)
sets the storage type
Definition: parallel_vector.h:104
Definition: stl_debug.h:45
Definition: parallel_storage_type.h:66
Definition: parallel_storage_type.h:68
Definition: parallel_storage_type.h:69
int ProcRank()
returns the rank of the process
Definition: pcl_base.cpp:83
int NumProcs()
returns the number of processes
Definition: pcl_base.cpp:91
#define UG_CATCH_THROW(msg)
Definition: error.h:64
#define UG_THROW(msg)
Definition: error.h:57
#define UG_DLOG(__debugID__, level, msg)
Definition: log.h:298
Definition: debug_id.h:131
#define UG_COND_THROW(cond, msg)
UG_COND_THROW(cond, msg) : performs a UG_THROW(msg) if cond == true.
Definition: error.h:61
CPUAlgebra::vector_type vector_type
the ug namespace
void SerializeRow(BinaryBuffer &stream, const matrix_type &mat, size_t localRowIndex, ParallelNodes &PN)
Definition: collect_matrix.h:43
void CollectMatrixOnOneProc(const matrix_type &A, matrix_type &collectedA, IndexLayout &masterLayout, IndexLayout &slaveLayout)
Definition: collect_matrix.h:246
void Deserialize(TIStream &buf, ParallelVector< T > &v)
Deerialize for ParallelVector<T>
Definition: restart_bridge.cpp:112
size_t DeserializeRow(BinaryBuffer &stream, stdvector< TConnectionType > &cons, ParallelNodes &PN)
Definition: collect_matrix.h:100
void ReceiveMatrix(const matrix_type &A, matrix_type &M, IndexLayout &verticalMasterLayout, const std::vector< int > &srcprocs, ParallelNodes &PN)
Definition: collect_matrix.h:142
void GatherVectorOnOne(HorizontalAlgebraLayouts &agglomerationLayout, ParallelVector< T > &collectedVec, const ParallelVector< T > &vec, ParallelStorageType type)
Definition: agglomerating_solver.h:53
void BroadcastVectorFromOne(HorizontalAlgebraLayouts &agglomerationLayout, ParallelVector< T > &vec, const ParallelVector< T > &collectedVec, ParallelStorageType type)
Definition: agglomerating_solver.h:65
void Serialize(TOStream &buf, const ParallelVector< T > &v)
Serialize for ParallelVector<T>
Definition: restart_bridge.cpp:103
void SerializeLayout(BinaryBuffer &stream, const IndexLayout &layout, const TLocalToGlobal &localToGlobal)
Definition: serialize_interfaces.h:63
void SendMatrix(const matrix_type &A, IndexLayout &verticalSlaveLayout, int destproc, ParallelNodes &PN)
Definition: collect_matrix.h:75
#define PROFILE_FUNC_GROUP(groups)
Definition: profiler.h:258
size_t num_connections(size_t row) const
this type is used to identify distributed objects.
Definition: algebra_id.h:46
int master_proc() const
Definition: algebra_id.h:54