Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ FCCMD = $(FC) $(FFLAGS) -I$(INCDIR) $(G)

TESTS = $(TESTDIR)/sort_test $(TESTDIR)/sort_test2 $(TESTDIR)/sarray_sort_test \
$(TESTDIR)/comm_test $(TESTDIR)/crystal_test \
$(TESTDIR)/sarray_transfer_test $(TESTDIR)/gs_test \
$(TESTDIR)/sarray_transfer_test $(TESTDIR)/sarray_transfer_soa_test \
$(TESTDIR)/gs_test \
$(TESTDIR)/gs_test_gop_blocking $(TESTDIR)/gs_test_gop_nonblocking \
$(TESTDIR)/gs_unique_test \
$(TESTDIR)/findpts_el_2_test \
Expand Down
118 changes: 112 additions & 6 deletions src/sarray_transfer.c
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ uint sarray_transfer_many(
if(!ext) off1 -= sizeof(uint);
row_size=off1; for(i=1;i<An;++i) row_size += size[i];
row_size = (row_size+sizeof(uint)-1)/sizeof(uint);

perm = sortp(&cr->work,0, proc,A[0]->n,proc_stride);

if(!ext) pack_int(&cr->data, row_size, cr->comm.id, A[0]->ptr,A[0]->n,size[0],
Expand All @@ -157,9 +157,9 @@ uint sarray_transfer_many(
proc,proc_stride, perm);
for(off=off1,i=1;i<An;++i) if(size[i])
pack_more(&cr->data,off,row_size, A[i]->ptr,size[i], perm),off+=size[i];

crystal_router(cr);

if(!fixed) {
n = num_rows(&cr->data,row_size);
for(i=0;i<An;++i)
Expand All @@ -171,15 +171,15 @@ uint sarray_transfer_many(
an = n>max?max:n;
for(i=0;i<An;++i) A[i]->n=an;
}

if(!ext) unpack_int (A[0]->ptr,size[0],p_off, &cr->data, row_size, set_src);
else unpack_more(A[0]->ptr,size[0], &cr->data,0,row_size);
for(off=off1,i=1;i<An;++i) if(size[i])
unpack_more(A[i]->ptr,size[i], &cr->data,off,row_size),off+=size[i];

return n;
}


void sarray_transfer_(struct array *const A, const unsigned size,
const unsigned p_off, const int set_src,
Expand All @@ -189,10 +189,116 @@ void sarray_transfer_(struct array *const A, const unsigned size,
(uint*)((char*)A->ptr+p_off),size, cr);
}


/*
  Transfer the elements of one array to the ranks given by an external
  destination list (the destination is not stored inside the elements).

  A:           the array to transfer
  size:        size in bytes of one element of A
  proc:        destination rank of each element
  proc_stride: stride used to step through proc
  cr:          crystal router object used for the exchange

  This is a thin wrapper: it forwards to sarray_transfer_many with a single
  array (count 1) and the literal flags 0,1,0,0.
  NOTE(review): the flags presumably mean fixed=0, ext=1, set_src=0, p_off=0;
  confirm against the parameter list of sarray_transfer_many.
  The value returned by sarray_transfer_many is discarded.
*/
void sarray_transfer_ext_(struct array *const A, const unsigned size,
const uint *const proc, const unsigned proc_stride,
struct crystal *const cr)
{
/* one array, one element size: pass their addresses as 1-entry lists */
sarray_transfer_many(&A,&size,1, 0,1,0,0, proc,proc_stride, cr);
}

/*
  Pack several separate field arrays (structure-of-arrays layout) into the
  crystal router buffer, one row per entity, and run the transfer.

  cr:         crystal router object; cr->work is used for sorting and
              cr->data holds the packed rows
  n_in:       number of entities (entries per field array) on this rank
  dest:       destination rank of each entity (n_in contiguous uints)
  n_fields:   number of field arrays
  byte_sizes: size in bytes of one entry of each field (n_fields values)
  data_send:  data_send[k] points to the n_in entries of field k

  Returns the number of rows held in cr->data after the transfer, as
  reported by num_rows.

  With n_fields <= 0 there is no field 0 to read, so nothing is packed,
  crystal_router is not called and 0 is returned.
  NOTE(review): crystal_router is presumably a collective call; this early
  return assumes n_fields has the same value on every rank -- confirm.
*/
uint sarray_transfer_soa_to_buffer(struct crystal *cr, const uint n_in,
                                   const uint *dest, int n_fields,
                                   const size_t *byte_sizes, void **data_send)
{
  uint *perm;
  unsigned row_size;
  size_t row_size_bytes = 0;
  size_t off;
  int k;

  /* byte_sizes[0] and data_send[0] are read unconditionally below; without
     this guard an empty field list would read past the end of both arrays */
  if(n_fields <= 0) return 0;

  /* Calculate row size: all fields of one entity back to back */
  for(k=0; k<n_fields; ++k) row_size_bytes += byte_sizes[k];

  /* Align to uint (round up to a whole number of uints) */
  row_size = (row_size_bytes + sizeof(uint) - 1) / sizeof(uint);

  /* Sort based on destination; dest is contiguous, hence stride sizeof(uint) */
  perm = sortp(&cr->work, 0, dest, n_in, sizeof(uint));

  /* Pack 1st field and specify destination */
  pack_ext(&cr->data, row_size, cr->comm.id, (char*)data_send[0], n_in,
           byte_sizes[0], dest, sizeof(uint), perm);
  off = byte_sizes[0];

  /* Pack remaining fields, each at its running byte offset within the row */
  for(k=1; k<n_fields; ++k) {
    pack_more(&cr->data, off, row_size, (char*)data_send[k], byte_sizes[k], perm);
    off += byte_sizes[k];
  }

  /* Transfer */
  crystal_router(cr);

  /* Return new size */
  return num_rows(&cr->data, row_size);
}

/*
  Copy the rows currently held in the crystal router buffer (cr->data) back
  into separate per-field arrays.

  cr:         crystal router object whose data buffer holds the rows
  n_out:      number of received rows; when 0 the function returns at once
              and touches none of the outputs
  n_fields:   number of fields per row
  byte_sizes: size in bytes of one entry of each field (n_fields values)
  rank_recv:  if non-NULL, receives the source rank of every row
  data_recv:  data_recv[k], if non-NULL, receives the entries of field k;
              a NULL entry is skipped but still advances the row offset
*/
void sarray_transfer_unpack_buffer_to_soa(struct crystal *cr, uint n_out,
                                          int n_fields,
                                          const size_t *byte_sizes,
                                          uint *rank_recv, void **data_recv)
{
  size_t total_bytes = 0;
  size_t field_off = 0;
  unsigned row_size;
  int f;

  /* nothing was received */
  if(n_out == 0) return;

  /* a row is all fields back to back, rounded up to whole uints */
  for(f=0; f<n_fields; ++f) total_bytes += byte_sizes[f];
  row_size = (total_bytes + sizeof(uint) - 1) / sizeof(uint);

  /* Source ranks live in the 3-uint header that precedes each message:
     word 1 is the source rank, word 2 the payload length in uints */
  if(rank_recv) {
    const uint *cur = cr->data.ptr;
    const uint *const buf_end = cur + cr->data.n;
    uint *dst = rank_recv;
    while(cur != buf_end) {
      const uint src_rank = cur[1];
      const uint payload = cur[2];
      const uint *const msg_end = cur + 3 + payload;
      /* skip the header, then emit the rank once per row of the payload */
      for(cur += 3; cur != msg_end; cur += row_size) *dst++ = src_rank;
    }
  }

  /* Unpack each requested field from its byte offset within the row */
  for(f=0; f<n_fields; ++f) {
    if(data_recv[f])
      unpack_more((char*)data_recv[f], byte_sizes[f], &cr->data, field_off, row_size);
    field_off += byte_sizes[f];
  }
}

/*
  Pack, transfer and unpack a set of field arrays in one call.

  The fields are packed and sent with sarray_transfer_soa_to_buffer; the
  number of received rows is stored in *n_out. If any rows arrived, this
  function allocates *rank_recv (n_out uints) and each data_recv[k]
  (n_out*byte_sizes[k] bytes) and fills them with
  sarray_transfer_unpack_buffer_to_soa. If no rows arrived, *rank_recv and
  every data_recv[k] are set to NULL.
*/
void sarray_transfer_soa(struct crystal *cr, const uint n_in, const uint *dest,
                         int n_fields, const size_t *byte_sizes,
                         void **data_send, uint *n_out, uint **rank_recv,
                         void **data_recv)
{
  int f;

  /* Pack + transfer; the received rows stay in the router buffer */
  const uint n_recv =
    sarray_transfer_soa_to_buffer(cr, n_in, dest, n_fields, byte_sizes, data_send);
  *n_out = n_recv;

  /* Nothing received: hand back NULL outputs */
  if(n_recv == 0) {
    *rank_recv = NULL;
    for(f=0; f<n_fields; ++f) data_recv[f] = NULL;
    return;
  }

  /* Allocate the outputs: one rank per row, one buffer per field */
  *rank_recv = tmalloc(uint, n_recv);
  for(f=0; f<n_fields; ++f)
    data_recv[f] = smalloc(n_recv * byte_sizes[f], __FILE__, __LINE__);

  /* Unpack the router buffer into the freshly allocated arrays */
  sarray_transfer_unpack_buffer_to_soa(cr, n_recv, n_fields, byte_sizes,
                                       *rank_recv, data_recv);
}
172 changes: 157 additions & 15 deletions src/sarray_transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,65 +9,67 @@
High-level interface for the crystal router.
Given an array of structs, transfers each to the process indicated
by a field of the struct, which gets set to the source process on output.

For the dynamic "array" type, see "mem.h".

Requires a "crystal router" object:

struct comm c;
struct crystal cr;

comm_init(&c, MPI_COMM_WORLD);
crystal_init(&cr, &c);

Example sarray_transfer usage:

struct T { ...; uint proc; ...; };
struct array A = null_array;
struct T *p, *e;

// resize A to 100 struct T's, fill up with data
p = array_reserve(struct T, &A, 100), A.n=100;
for(e=p+A.n;p!=e;++p) {
...
p->proc = ...;
...
}

// array A represents the array
// struct T ar[A.n] where &ar[0] == A.ptr
// transfer ar[i] to processor ar[i].proc for each i=0,...,A.n-1:

sarray_transfer(struct T, A, proc,set_src, &cr);

// now array A represents a different array with a different size
// struct T ar[A.n] where &ar[0] == A.ptr
// the ordering is arbitrary
// if set_src != 0, ar[i].proc is set to the proc where ar[i] came from
// otherwise ar[i].proc is unchanged (and == this proc id)

// note: two calls of
sarray_transfer(struct T, A, proc,1, &cr);
// in a row should return A to its original state, up to ordering

Cleanup:
array_free(&A);
crystal_free(&cr);
comm_free(&c);

Example sarray_transfer_ext usage:

struct T { ... };
struct array A;
uint proc[A.n];

// array A represents the array
// struct T ar[A.n] where &ar[0] == A.ptr
// transfer ar[i] to processor proc[i] for each i=0,...,A.n-1:
sarray_transfer_ext(struct T, &A, proc, sizeof(uint), &cr);

// no information is available now on where each struct came from

There is a variant of sarray_transfer that transfers multiple arrays without
requiring the data to be packed in array of structs first. See sarray_transfer_soa.
*/

#define sarray_transfer_many GS_PREFIXED_NAME(sarray_transfer_many)
Expand All @@ -92,4 +94,144 @@ void sarray_transfer_ext_(struct array *const A, const unsigned size,
#define sarray_transfer_ext(T,A,proc,proc_stride,cr) \
sarray_transfer_ext_(A,sizeof(T),proc,proc_stride,cr)

/*
Transfer multiple arrays.
Input:
cr: crystal router object
n_in: number of entities to transfer per array
dest: destination ranks (size n_in)
n_fields: number of arrays to transfer
byte_sizes: array of sizes in bytes (size n_fields).
Each element of this array tells the size of a single entry in the corresponding array. e.g.,
for an array of double, byte_sizes[i] = sizeof(double).
data_send: array of pointers to input arrays (size n_fields).
data_send[i] points to the input array for field i
(must be contiguous in memory with size n_in*byte_sizes[i]).
Output:
Returns n_out (number of received elements after crystal router transfer).

E.g. usage: Consider there are n_in = 10 particles in 3D space, and we have
the following n_fields=3 associated with each particle:
(i) coordinates [x, y, and z, each a double] - we assume that the coordinates
are packed as [x0,y0,z0,x1,y1,z1,...,x9,y9,z9].
(ii) color [uint]
(iii) and charge [double]
These could be initialized on each rank after allocating memory:

double *coords = tmalloc(double, n_in * dim);
uint *color = tmalloc(uint, n_in);
double *charge = tmalloc(double, n_in);

.
.
assign coords, color, and charge for each particle
.
.

Assuming we want to transfer these particles to ranks specified in dest:

uint *dest = tmalloc(uint, n_in);
.
.
assign destination for each particle
.
.

To transfer the particles, first specify byte size for each field:
size_t sizes[] = {sizeof(double)*dim, sizeof(uint), sizeof(double)};

Then specify the pointers to each of the fields to transfer:
n_fields = 3
void **data_send = tmalloc(void*, n_fields);
data_send[0] = coords
data_send[1] = color
data_send[2] = charge

Note that the information in data_send should be consistent with the byte sizes specified in sizes.

uint n_out = sarray_transfer_soa_to_buffer(&cr, n_in, dest, n_fields, sizes, data_send);

This returns number of particles received after transfer. The data for each
particle is currently stored in crystal router buffer and must be unpacked
into user allocated data memory. Note that the crystal object must not be
modified until the data is unpacked using sarray_transfer_unpack_buffer_to_soa.

Note since "coords" is setup such that the x,y,z coordinate of
each particle is contiguously packed, we specify the byte_size for coordinates to be sizeof(double)*dim. This allows the crystal router
to pack/unpack the xyz coordinate of each particle via a single memcpy.
If instead the coordinates are packed
as [x0,x1,x2,..xN,y0,y1,...yN,z0,...zN], they should be transferred as
three separate fields, each with byte_size = sizeof(double)
and corresponding pointer in memory. In that case,

n_fields = 5
void **data_send = tmalloc(void*, n_fields);
data_send[0] = coords
data_send[1] = coords + n_in
data_send[2] = coords + 2*n_in
data_send[3] = color
data_send[4] = charge

size_t sizes[] = {sizeof(double), sizeof(double), sizeof(double),
sizeof(uint), sizeof(double)};
*/
uint sarray_transfer_soa_to_buffer(struct crystal *cr, const uint n_in,
const uint *dest, int n_fields,
const size_t *byte_sizes, void **data_send);

/*
Unpack transferred data from crystal router's internal buffer into provided output arrays. Must be called immediately after sarray_transfer_soa_to_buffer, and before any other crystal operation that may overwrite cr->data.

Input:
cr: crystal router object
n_out: number of received elements (returned by
sarray_transfer_soa_to_buffer)
n_fields: number of data fields
byte_sizes: array of sizes (in bytes) for each field (size n_fields)

Output:
rank_recv: array to store source ranks (size n_out).
Caller must allocate this. Pass NULL if the source ranks are not
needed; they are then not extracted.
data_recv: array of pointers to output arrays (size n_fields).
data_recv[k] must point to a buffer of size n_out*byte_sizes[k].
Caller must allocate these buffers. If data_recv[k] is NULL,
field k is not unpacked.

Following the example from sarray_transfer_soa_to_buffer, the user can unpack
received information by allocating memory for received particles

rank_recv = tmalloc(uint, n_out)
void **data_recv = tmalloc(void*, n_fields);
data_recv[0] = tmalloc(double, n_out*dim)
data_recv[1] = tmalloc(uint, n_out)
data_recv[2] = tmalloc(double, n_out)

sarray_transfer_unpack_buffer_to_soa(&cr, n_out, n_fields, sizes, rank_recv, data_recv);

Alternatively, see sarray_transfer_soa if you would like the crystal router
to pack, transfer, and unpack in a single function call. In that method,
the function allocates the output buffers (`*rank_recv` and `data_recv[k]`);
the caller must free them with `free()`.
*/
void sarray_transfer_unpack_buffer_to_soa(struct crystal *cr, uint n_out,
int n_fields,
const size_t *byte_sizes,
uint *rank_recv, void **data_recv);

/*
Structure-of-Arrays (SoA).
Transfers multiple arrays to different ranks directly without packing in
array of structs first.
(1) Uses sarray_transfer_soa_to_buffer to pack data in crystal router buffer
and transfer.
(2) Unpacks crystal router buffer post-transfer and returns the pointers
to the unpacked data.
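Note the output memory is allocated by this function and must be freed by
the caller using free(ptr) for *rank_recv and for each pointer in data_recv.
If no elements are received (*n_out == 0), *rank_recv and every
data_recv[k] are set to NULL.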
*/
void sarray_transfer_soa(struct crystal *cr, const uint n_in, const uint *dest,
int n_fields, const size_t *byte_sizes,
void **data_send,
uint *n_out, uint **rank_recv, void **data_recv);

#endif
Loading