Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Load data from multiple files in parallel? #49

Open
takluyver opened this issue Apr 16, 2020 · 11 comments · May be fixed by #340
Open

Load data from multiple files in parallel? #49

takluyver opened this issue Apr 16, 2020 · 11 comments · May be fixed by #340

Comments

@takluyver
Copy link
Member

Since #30, EXtra-data can open multiple files in parallel when initially opening a run. But if we then need to pull data from many files - sequence files or detector modules - we still handle these one after the other. I'm noticing this in particular with detector data access - just constructing an object like DSSC1M needs to open all the files to get the number of entries recorded for each train, so it can identify gaps in the data. Maybe we could parallelise loading this.

Creating the 'virtual overview file' we discussed before would improve this case (because the indexes would be copied into the overview file). But it wouldn't help (much) with loading real data (e.g. pulse IDs): HDF5 would do more of the work, but it would still need to open & read from the separate files.

@egorsobolev
Copy link
Member

egorsobolev commented Apr 17, 2020

From my experience, the parallel reading on single node doesn't give much advantages. GPFS can transfer a single file with the full single node bandwidth. I think it is also true for a local disk. Probably it may work on PNFS, I didn't try.
Of cause I tried to read big sequential chunks. For many small pieces parallel reading may work. But we don't have so many files to read in parallel by small chunks, we do?
I would suggest to test idea first.

@philsmt philsmt closed this as completed May 5, 2020
@philsmt philsmt reopened this May 5, 2020
@philsmt
Copy link
Contributor

philsmt commented May 5, 2020

I have to disagree on the GPFS reading performance. One of the reasons for coming up with #32 is the clear benefit of doing even trivial computation per train, i.e. basically just I/O, on a single node. It doesn't scale arbitrarily well, but definitely beyond a few processes. It may be the result of parallel network requests or some other hidden GPFS behaviour distribution the load. Note that these numbers come from loading smallish to medium-sized chunks scattered over several files. From your description it sounds like several indexing steps do exactly this.

Maybe it makes sense to expose the API proposed in #32 only internally for situations just like this?

@egorsobolev
Copy link
Member

egorsobolev commented May 5, 2020

Looking at the plot in #32, I see that the time for 25 processes is about 10 times less than for 1 process. Since there is no way to read data faster than the maximum network bandwidth, should I conclude that the Infiniband is able to transfer data in one stream using only less than 1/10 of the bandwidth?

It's hard to believe. I did the test as I was suggesting. Results are at the end of post.

The test was run on node exfl038 with bandwidth 56 Gb/s (4X FDR), total reading data size is 8Gb.

In one stream, data is read at a speed of 25.9-29.1 Gb/s (in different test runs) and reaches an optimum of 43-51.6 Gb/s on 64 streams. Thus, maximum possible acceleration is about 1.75 times.

I’m not sure that the complication of EXtra-data is worth this acceleration. Perhaps if the implementation is extremely trivial.

The scaling of processing may explain the speedup in your tests. Processing scales much better even it is neglectable. I use calculation of average and variance across frames in my tests. In my tests, it scales about 1.7 times each time when doubling the processes up to 32. And since reading takes only 20% on one process, the total time scales well too.

There is only one reason for almost ideal scaling of file reading. If you use shared node or connection and have to compete for network bandwidth. A way to start arms race. ;-)

You can find my script in
/gpfs/exfel/data/user/esobolev/parallel_tests/test_one_node_parallel_reading.py

% ibstatus
Infiniband device 'mlx5_0' port 1 status:
        default gid:     fe80:0000:0000:0000:a4bf:0103:0019:be39
        base lid:        0x115
        sm lid:          0x11
        state:           4: ACTIVE
        phys state:      5: LinkUp
        rate:            56 Gb/sec (4X FDR)
        link_layer:      InfiniBand

1st run

% python test_one_node_parallel_reading.py
             rate         rate     size        time       time       time
  NP        total     per proc    total        read       comp      total

   1    27.3 Gb/s    27.3 Gb/s   8.0 GB      2.35 s     8.43 s    10.78 s
   2    3.83 Gb/s    1.96 Gb/s   8.0 GB     16.71 s     5.48 s    22.18 s
   4    31.9 Gb/s    10.1 Gb/s   8.0 GB      2.00 s     3.25 s     5.25 s
   8      28 Gb/s    4.61 Gb/s   8.0 GB      2.28 s     2.05 s     4.23 s
  16    26.4 Gb/s    2.12 Gb/s   8.0 GB      2.43 s     1.13 s     3.45 s
  32    32.6 Gb/s    1.31 Gb/s   8.0 GB      1.96 s     0.67 s     2.53 s
  64      43 Gb/s   0.835 Gb/s   8.0 GB      1.49 s     0.59 s     1.93 s
 128    49.4 Gb/s   0.499 Gb/s   8.0 GB      1.30 s     0.50 s     1.73 s
 256    44.8 Gb/s   0.245 Gb/s   8.0 GB      1.43 s     0.82 s     1.97 s

2nd run

% python test_one_node_parallel_reading.py
             rate         rate     size        time       time       time
  NP        total     per proc    total        read       comp      total

   1    25.9 Gb/s    25.9 Gb/s   8.0 GB      2.47 s     8.22 s    10.69 s
   2    26.3 Gb/s    13.7 Gb/s   8.0 GB      2.43 s     5.82 s     8.26 s
   4    31.5 Gb/s    9.84 Gb/s   8.0 GB      2.03 s     3.49 s     5.51 s
   8      28 Gb/s    4.78 Gb/s   8.0 GB      2.29 s     2.05 s     4.17 s
  16    27.7 Gb/s    2.23 Gb/s   8.0 GB      2.31 s     1.22 s     3.27 s
  32    32.6 Gb/s    1.29 Gb/s   8.0 GB      1.96 s     0.72 s     2.47 s
  64    46.9 Gb/s    0.93 Gb/s   8.0 GB      1.36 s     0.56 s     1.76 s
 128    45.4 Gb/s    0.46 Gb/s   8.0 GB      1.41 s     0.53 s     1.80 s
 256    45.7 Gb/s    0.26 Gb/s   8.0 GB      1.40 s     0.89 s     1.96 s

3rd run

% python test_one_node_parallel_reading.py
             rate         rate     size        time       time       time
  NP        total     per proc    total        read       comp      total

   1    29.1 Gb/s    29.1 Gb/s   8.0 GB      2.20 s     8.28 s    10.48 s
   2    30.6 Gb/s    19.4 Gb/s   8.0 GB      2.09 s     4.86 s     6.95 s
   4    37.5 Gb/s      12 Gb/s   8.0 GB      1.71 s     3.38 s     4.99 s
   8    42.3 Gb/s    6.65 Gb/s   8.0 GB      1.51 s     1.89 s     3.37 s
  16    38.2 Gb/s    3.14 Gb/s   8.0 GB      1.67 s     1.15 s     2.62 s
  32      49 Gb/s    1.83 Gb/s   8.0 GB      1.31 s     0.67 s     1.86 s
  64    51.6 Gb/s       1 Gb/s   8.0 GB      1.24 s     0.56 s     1.61 s
 128    47.4 Gb/s   0.519 Gb/s   8.0 GB      1.35 s     0.53 s     1.66 s
 256    50.6 Gb/s   0.276 Gb/s   8.0 GB      1.26 s     0.73 s     1.83 s

@egorsobolev
Copy link
Member

egorsobolev commented May 6, 2020

If we read small data from many files then the performance is far from network bandwidth.

Tests done on node max-exfl191. In tests, I read trainId and pulseId arrays (about 780KB, together) from every AGIPD file in 64 processes.

Perhaps negotiations longer than actual transfer, but it works.

It is strange, actually. In the previous test, I read 4 MB in one reading operation. And that was one chunk in hdf. Indexes are splitted on chunks by 128 bytes. It means thousands reading operation per file. Perhaps this is the answer. If it is the case we need think about data chunking.

You can find my script in
/gpfs/exfel/data/user/esobolev/parallel_tests/test_indexes_parallel_reading.py

  np   speedup     walltm   opentm     rdtm    nettm   n files       rd size       rd rate
  64      36.4       0.85     1.27    29.56    30.83       224      0.161 GB    1.522 Gb/s

  np   speedup     walltm   opentm     rdtm    nettm   n files       rd size       rd rate
  64      53.9       4.07    18.68   200.66   219.34       320      0.242 GB    0.476 Gb/s

  np   speedup     walltm   opentm     rdtm    nettm   n files       rd size       rd rate
  64      20.2       0.74     2.11    12.72    14.83        48      0.024 GB    0.261 Gb/s

@takluyver
Copy link
Member Author

The latest benchmark (reading small data) is similar to what @tmichela found in #30 - a substantial speedup is possible with more processes, even if it's not necessarily linear. This is a massive improvement for extra-data-validate in particular.

I agree about the pathological chunking - train IDs in raw data are written with one number per HDF5 chunk (although a cluster of 32 chunks seems to get written together). In the case I looked at, fixing this could make it about 10x faster to get the metadata from a non-cached file on GPFS. We raised it with ITDM, but apparently it's not trivial to make the file writing code handle it nicely.

[Aside: this almost makes me envy facilities which record to a custom format and then convert the data to HDF5 files afterwards. Writing directly to HDF5 means the file format is a compromise between what makes sense for writing and what makes sense for reading, and of course all the details are determined by people who care a lot about writing it.]

Anyway, if the speedup for reading 'real' data isn't so big, maybe this idea isn't such a good one. Or maybe it makes sense only for index-type information rather than big data. Or perhaps we should extend our cache to cover that.

@egorsobolev
Copy link
Member

egorsobolev commented May 6, 2020

Why is a proper chunking difficult for writing? Just define chunks in proper size and than write data as it convenient. hdf will care about chunking, will not?

@takluyver
Copy link
Member Author

I don't want to say too much about a system I'm not familiar with, but I think that if they just change the chunk size setting, you end up with 0s at the end of the data. This was the case in old data, and EXtra-data handles it, but it's something they want to avoid. This is not something that HDF5 forces on us, but the net result of various internal API layers involved in writing the files - something grows the files by one chunk at a time, and doesn't truncate them if it stops mid-chunk.

@egorsobolev
Copy link
Member

egorsobolev commented May 6, 2020

Actually, I though that you even will not see those 0s at the end. The dataset should contain proper shape excluding meaningless 0s. And additional 0s just take a space. Half-filled chunks in last file in sequence is reasonable price for normal reading performance as for me.

@takluyver
Copy link
Member Author

That's how HDF5 should work, but the internal XFEL APIs that write the file don't tell HDF5 the proper shape. They make the dataset a multiple of the chunk size, so the meaningless 0s are exposed to people reading the files. That got solved by setting the chunk size to 1.

I'm not saying this is a good reason. I'm frustrated by it, and I hope it gets fixed soon. I'm just highlighting that we already asked for this to be fixed, and ran into this problem when we did.

@egorsobolev
Copy link
Member

I understand. I just would like to know the current situation and arguments. I'm asking you because you know more than me. Thanks for explanation.

@takluyver
Copy link
Member Author

This topic just came up again. Just loading JUNGFRAU data and doing nothing with it, we can get something like a 6x speedup by splitting it up and loading parts in parallel. Below are results from a modified version of Yohei's test script, loading from /gpfs/exfel/exp/FXE/202102/p002536/proc/r0072 (10 JUNGFRAU sequence files).

I'm wondering about adding a parameter to .ndarray() accepting a multiprocessing Pool, to load data into a shared memory array if it's used. This would be similar to an option we added to EXtra-geom (European-XFEL/EXtra-geom#17). I'm not sure whether we'd just distribute the data chunks (i.e. files, when all data is selected) to different workers, or split the data up so multiple workers could be reading from the same file.

##### READ ALL THE DATA (NDARRAY)#####
     >>> FINISHED: TIME:4.09 #####

##### READ ALL THE DATA (MULTIPROCESSING:4 CORES)#####
     >>> FINISHED: TIME:1.12 #####

##### READ ALL THE DATA (MULTIPROCESSING:8 CORES)#####
     >>> FINISHED: TIME:0.63 #####

##### READ ALL THE DATA (MULTIPROCESSING:12 CORES)#####
     >>> FINISHED: TIME:0.54 #####

##### READ ALL THE DATA (MULTIPROCESSING:16 CORES)#####
     >>> FINISHED: TIME:0.65 #####

##### READ ALL THE DATA (MULTIPROCESSING:20 CORES)#####
     >>> FINISHED: TIME:0.76 #####

##### READ ALL THE DATA (MULTIPROCESSING:24 CORES)#####
     >>> FINISHED: TIME:0.57 #####

##### READ ALL THE DATA (MULTIPROCESSING:48 CORES)#####
     >>> FINISHED: TIME:0.65 #####

Timing script
import glob, os, time
import multiprocessing as mp

from extra_data import open_run, RunDirectory


jf_mod_src = 'FXE_XAD_JF1M/DET/JNGFR02:daqOutput'
JungFrau = 'JNGFR02'
expdir = '/gpfs/exfel/exp/FXE/202102/p002536'
runnumber = 'r0072'
run = RunDirectory(expdir+'/proc/'+runnumber)

def sumRuns(run_part):
    run_part[jf_mod_src,'data.adc'].ndarray()

if __name__ =='__main__':
    print ('##### READ ALL THE DATA (NDARRAY)#####')
    start = time.time()
    A = run[jf_mod_src,'data.adc'].ndarray()
    print ('     >>> FINISHED: TIME:'+'{:4.2f}'.format(time.time()-start)+' #####\n')

    for N in [4, 8, 12, 16, 20, 24, 48]:
        print ('##### READ ALL THE DATA (MULTIPROCESSING:'+str(N)+ ' CORES)#####')
        sel = run.select([(jf_mod_src, '*')])
        A = sel.split_trains(N)

        start = time.time()
        with mp.Pool(N) as pool:
            Aout = pool.map(sumRuns, A)
        print ('     >>> FINISHED: TIME:'+'{:4.2f}'.format(time.time()-start)+' #####\n')

@takluyver takluyver linked a pull request Sep 8, 2022 that will close this issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants