How to write faster from Python to Neo4j with OpenMPI

  • @Tsartsaris

    Posted on 2015-07-28

    Using Neo4j from the Python programming language is a bit of a headache when it comes to speed. Even though the Neo4j database provides the means for better performance, Python as an interpreted language is not fast enough on its own. Fortunately the OpenMPI message-passing implementation comes in handy, and using it from Python is as simple as using the mpi4py library. 
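    On Ubuntu the setup boils down to installing an MPI implementation and the Python bindings on every machine. The exact package names depend on your release, so treat the following as a rough guide rather than a fixed recipe:

    sudo apt-get install openmpi-bin libopenmpi-dev   # or mpich2 / mpich if you prefer MPICH
    sudo pip install mpi4py                           # the python-mpi4py apt package also works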

    For this test with OpenMPI, or should I rather say MPICH2 on Ubuntu, we will try to write 140,000 records in a loop from Python to Neo4j. We will not use any of the many great libraries that exist out there for connecting and writing to the database. We will write our own code using urllib3, exploiting the transactional endpoint of the REST API. Here is the script in its simplest form. 

    from urllib3 import HTTPConnectionPool
    import time
    import json
    from urlparse import urlparse
    
    time1 = time.time()
    
    headers = {'Content-type': 'application/json',
               'Accept': 'application/json; charset=UTF-8'}
    
    # An empty statement list just opens (or later commits) the transaction
    # without creating any nodes.
    empty_params = json.dumps({
        "statements": []
    })
    
    new_query = "CREATE (n:Person {props}) RETURN n"
    
    params = json.dumps({
        "statements": [{
            "statement": new_query,
            "parameters": {
                "props": {
                    "name": "Node"
                }
            }
        }]
    })
    
    pool = HTTPConnectionPool('192.168.1.3', port=7474, maxsize=4)
    
    # Open a new transaction; the response body contains the commit URL.
    r = pool.urlopen('POST', '/db/data/transaction', headers=headers, body=empty_params)
    
    # The fourth quote-delimited token of the response body is the commit URL,
    # e.g. http://192.168.1.3:7474/db/data/transaction/1/commit
    response_array = r.data.split("\"")
    transaction_end_point = response_array[3]
    # Strip the trailing "/commit" to get the transaction endpoint itself.
    stransaction_end_point = transaction_end_point[0:-7]
    o = urlparse(stransaction_end_point)
    endpoint = o[2]
    oo = urlparse(transaction_end_point)
    commitpoint = oo[2]
    
    # POST one CREATE statement per record to the open transaction,
    # swapping the "Node" placeholder for the loop index.
    for i in xrange(140000):
        pool.urlopen('POST', endpoint, headers=headers, body=params.replace('Node', str(i)))
    
    # Commit the transaction.
    pool.urlopen('POST', commitpoint, headers=headers, body=empty_params)
    
    time2 = time.time()
    print time2 - time1
    

    Running this script takes about 100 seconds to finish. Even in this simple single-process form, reusing connections from the urllib3 pool already gives a nice speedup over opening a new connection per request. 

    99.5348901749
    [Finished in 99.6s]
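
    One note on the script before moving on: splitting the response body on double quotes works, but it is fragile. A slightly more robust variant, just a sketch assuming the standard transactional-endpoint response that carries a commit field, would parse the JSON instead:

    import json
    from urlparse import urlparse
    
    # r is the response from opening the transaction, as in the script above.
    data = json.loads(r.data)
    commit_url = data['commit']               # e.g. .../db/data/transaction/1/commit
    commitpoint = urlparse(commit_url)[2]     # keep only the path for pool.urlopen
    endpoint = commitpoint[:-len('/commit')]  # the transaction endpoint itself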

    Let's now transform the script to work with MPICH2 and mpi4py. I have 4 machines: 3 of them have 4 cores at 2.8 GHz and the last one has 2 cores at 2.8 GHz, all equally balanced. To work with mpi4py we have to install it on every machine and have the script copied to the exact same folder on each of them. We also need a machinefile to tell MPI which machines and slots are available. Our machinefile looks like this: 

    192.168.1.3 slots=4
    192.168.1.4 slots=4
    192.168.1.5 slots=4
    192.168.1.6 slots=2
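
    Before firing off the real job it is worth checking that MPI can actually reach every machine. The snippet below is only a minimal sanity check, assuming mpi4py is installed everywhere and the file is saved (for example) as hello_mpi.py in the same folder on all machines; launch it with the same kind of mpirun command used below and you should see one line per slot.

    from mpi4py import MPI
    
    # Each process reports its rank and the host it landed on, so we can
    # verify that all 14 slots from the machinefile are actually used.
    comm = MPI.COMM_WORLD
    print "rank %d of %d on %s" % (comm.rank, comm.size, MPI.Get_processor_name())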

    To run the script we type in the terminal 

    mpirun.openmpi -np 14 -machinefile machinefile --byslot python pyneompi.py

    With this we tell OpenMPI to run the Python script with 14 processes, reading the machinefile to distribute them across our machines' IPs. Here is the script.

    from urllib3 import HTTPConnectionPool
    import json
    import time
    from urlparse import urlparse
    from mpi4py import MPI
    
    time1 = time.time()
    
    comm = MPI.COMM_WORLD
    rank = comm.rank
    
    headers = {'Content-type': 'application/json',
               'Accept': 'application/json; charset=UTF-8'}
    
    # An empty statement list just opens (or commits) a transaction.
    empty_params = json.dumps({
        "statements": []
    })
    
    # Each process labels its nodes with its own rank, e.g. :Person:Rank3.
    new_query = "CREATE (n:Person:Rank" + str(rank) + " {props}) RETURN n"
    
    params = json.dumps({
        "statements": [{
            "statement": new_query,
            "parameters": {
                "props": {
                    "name": "Node"
                }
            }
        }]
    })
    
    # Split the 140000 ids into consecutive chunks of 10000, one per process.
    chunks = lambda l, n: [l[x: x + n] for x in xrange(0, len(l), n)]
    l = range(140000)
    
    # Every one of the 14 processes opens its own connection pool and its
    # own transaction against the same Neo4j server.
    pool = HTTPConnectionPool('192.168.1.3', port=7474, maxsize=14)
    r = pool.urlopen('POST', '/db/data/transaction', headers=headers, body=empty_params)
    
    # Extract the commit URL from the response and derive the transaction endpoint.
    response_array = r.data.split("\"")
    transaction_end_point = response_array[3]
    stransaction_end_point = transaction_end_point[0:-7]
    o = urlparse(stransaction_end_point)
    endpoint = o[2]
    oo = urlparse(transaction_end_point)
    commitpoint = oo[2]
    
    # Each process writes only the 10000-record chunk matching its rank,
    # then commits its own transaction.
    for i in chunks(l, 10000)[rank]:
        r = pool.urlopen('POST', endpoint, headers=headers,
                         body=params.replace('Node', str(i)), retries=300)
    pool.urlopen('POST', commitpoint, headers=headers, body=empty_params)
    
    time2 = time.time()
    print time2 - time1
    
    

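    To see what the chunks helper actually hands to each process, here is a quick interactive check; the numbers follow directly from the script above:

    >>> chunks = lambda l, n: [l[x: x + n] for x in xrange(0, len(l), n)]
    >>> parts = chunks(range(140000), 10000)
    >>> len(parts)                     # 14 chunks, one per MPI process
    14
    >>> parts[3][0], parts[3][-1]      # rank 3 writes ids 30000 to 39999
    (30000, 39999)
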
    And the output of running the distributed script is 

    9.8762629032
    10.3321390152
    10.3471448421
    10.3504700661
    11.5176978111
    11.9181101322
    11.9384191036
    11.9578149319
    11.9679019451
    12.9604918957
    12.9734508991
    13.2258069515
    13.2225809097
    13.2297999859

    Clearly we have an enormous speedup. A key point here is that each process creates its own transaction and connection pool. This is a bit frustrating, but if we try to use the same transaction endpoint from different processes we end up with a lot of lost records and 404s. Notice that for each slot we write the rank of the process as a label on the record. This sets up the next step: reading the data back faster by partitioning the reads the same way. 
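
    As a taste of that next step, each rank could read back only its own partition by matching on its label. The snippet below is only an illustrative sketch (rank 3 is picked arbitrarily and the proper parallel read script is left for a follow-up), reusing the one-shot commit endpoint of the REST API:

    from urllib3 import HTTPConnectionPool
    import json
    
    headers = {'Content-type': 'application/json',
               'Accept': 'application/json; charset=UTF-8'}
    
    # Count only the nodes written by rank 3, using its :Rank3 label.
    read_query = "MATCH (n:Person:Rank3) RETURN count(n)"
    read_params = json.dumps({"statements": [{"statement": read_query}]})
    
    pool = HTTPConnectionPool('192.168.1.3', port=7474, maxsize=1)
    # Begin-and-commit in a single request is enough for a one-off read.
    r = pool.urlopen('POST', '/db/data/transaction/commit', headers=headers, body=read_params)
    print r.data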

