cancel
Showing results for 
Search instead for 
Did you mean: 

how to handle results from async calls to AquaQ TorQ gateway, on

mccormjo
New Contributor
//Deferred Synchronous
gate: `::6007:admin:admin
g:hopen gate;

sync1:{(neg g)(`.gw.asyncexecjpt;("select from trade"); `hdb;raze;();0D00:01:00.000001);g(::)};
sync2:{(neg g)(`.gw.asyncexecjpt;("select sum size by sym from trade"); `hdb;raze;();0D00:01:00.000000);g(::)};
//sync3:{(neg g)(`.gw.asyncexecjpt;("-1000000#select from quote where date = 2015.08.25"); `hdb;raze;();0D00:01:00.000000);g(::)};
sync3:{(neg g)(`.gw.asyncexecjpt;("select from quote where date = 2015.08.25"); `hdb;raze;();0D00:01:00.000000);g(::)};

res1: sync1[];
res2: sync2[];
res3: sync3[];
show count res1;
show count res2;
show count res3;

hclose g;
// This works as expected ... giving:
q)\l runSync.q
4538005
10
1439122
q)


Now I'd like to do the same queries, fully asynchronous.
//Asynchronous
gate: `::6007:admin:admin
g:hopen gate;

//handleresults1:{-1(string .z.z)," got results1";`got1 set 1b; -3!x; `res1 set y};
//handleresults2:{-1(string .z.z)," got results2";`got2 set 1b; -3!x; `res2 set y};
//handleresults3:{-1(string .z.z)," got results3";`got3 set 1b; -3!x; `res3 set y};
handleresults1:{-1(string .z.z)," got results1"; got1::1b; -3!x; res1::y};
handleresults2:{-1(string .z.z)," got results2"; got2::1b; -3!x; res2::y};
handleresults3:{-1(string .z.z)," got results3"; got3::1b; -3!x; res3::y};

async1:{(neg g)(`.gw.asyncexecjpt;("select from trade"); `hdb;raze;handleresults1;0D00:01:00.000000)};
async2:{(neg g)(`.gw.asyncexecjpt;("select sum size by sym from trade"); `hdb;raze;handleresults2;0D00:01:00.000000)};
async3:{(neg g)(`.gw.asyncexecjpt;("-1000000#select from quote where date = 2015.08.25"); `hdb;raze;handleresults3;0D00:01:00.000000)};

f:{system $[.z.o like "w*";"timeout ";"sleep "],string x};

async1[];
async2[];
async3[];
 
//stopwhile: 0b;
//while[not stopwhile; $[1b~got1; stopwhile:1b; f[1] ] ];
//show res1;

//stopwhile: 0b;
//while[not stopwhile; $[1b~got2; stopwhile:1b; f[1] ] ];
//show res2;

//stopwhile: 0b;
//while[not stopwhile; $[1b~got3; stopwhile:1b; f[1] ] ];
//show res3;

//hclose g;
// It works as expected ... giving.  It fires of the queries, returns a q prompt immediately, and fairly quickly after the "got results" for each query
q)\l runAsync.q
q)2015.09.27T15:05:08.947 got results1
2015.09.27T15:05:08.947 got results2
2015.09.27T15:05:09.026 got results3

All good to here.



BUT, after sending the 3 queries, I'D LIKE TO WAIT FOR THE RESULTS, MAKE SURE I HAVE THEM IN res1, res2, res3, AND THEN continue the code (ie using the result sets ...)
I've delcared got1:0b; got2:0b; got3:0b; res1:(); res2:(); res3(); in my console before I call this code.
Uncommenting the while loop lines, my idea was to loop until the global variable "got1", "got2", "got3" are updated in when handleresults1, 2  & 3 return.
But the change in got1, got2, got3 are not being picked up in the while loop and so it is looping forever.  
Only after I ctrl c to kill the command, the "got results" for each query come.
q)\l runAsync_While.q         
{system $[.z.o like "w*";"timeout ";"sleep "],string x}
'os
@
.,["\\"]
"sleep 1"
q))\
q)2015.09.27T15:17:10.758 got results1
2015.09.27T15:17:10.758 got results2
2015.09.27T15:17:10.837 got results3

Same details in 3 attached screen shots.


Question.  What is the best way to fire off a couple of asynchronous calls in the client, then have the client wait for the results of all the calls before continuing.
Thanks
John
3 REPLIES 3

TerryLynch
New Contributor II

Generally speaking, you call follow an async call with a sync chaser in order to ensure that the async call has been processed on the other side:

"To ensure an async message has been processed by the remote, follow with a sync chaser"

As per http://code.kx.com/wiki/Cookbook/IPCInANutshell

However, I'm not sure how this fits into the aquaQ gateways since I've never used them.

Terry

Hi John

I think for what you are asking you need to set up the calls such that when the result comes back async, each handler checks if all the other results have arrived, and calls the continuation code when all are in. 

The wdb process in TorQ does something similar with the end of day operation - it calls the reload function in each process async, then makes them all callback and wait until all are done before releasing them:

/- add entries to dictionary of callbacks. if timeout has expired or d now contains all expected rows then it releases each waiting processhandler:{	.wdb.d[.z.w]:x;	if[(.proc.cp[]>.wdb.timeouttime) or count[.wdb.d]=.wdb.countreload;		.lg.o[`handler;"releasing processes"];		.wdb.flushend[];		.wdb.d:()!()];	}
Note that to manage failures you need to time these out - in case the process you are waiting on never returns.  Something like this:  
.timer.one[.wdb.timeouttime:.proc.cp[]+.wdb.eodwaittime;(value;".wdb.flushend[]");"release all hdbs and rdbs as timer has expired";0b];
Thanks 

Jonny


Thanks Jonny, much appreciated.
Wish I could banish my embarrassing while looping effort from earlier this post!  Was going at it the wrong way.

Below is an initial first cut of the right way.  I'll continue from here to tidy it into one generic handler function, and include the timeout check.
Thanks again,
Regards,
John.

//Aynchronous
gate: `::6007:admin:admin
g:hopen gate;

querycount:3;
d:()!();
handleresults1:{-1(string .z.z)," got results1"; -3!x; d[`res1]:y; if[(count[d]=querycount);keepgoing[]];};
handleresults2:{-1(string .z.z)," got results2"; -3!x; d[`res2]:y; if[(count[d]=querycount);keepgoing[]];};
handleresults3:{-1(string .z.z)," got results3"; -3!x; d[`res3]:y; if[(count[d]=querycount);keepgoing[]];};

async1:{(neg g)(`.gw.asyncexecjpt;("select from trade"); `hdb;raze;handleresults1;0D00:01:00.000000)};
async2:{(neg g)(`.gw.asyncexecjpt;("select sum size by sym from trade"); `hdb;raze;handleresults2;0D00:01:00.000000)};
async3:{(neg g)(`.gw.asyncexecjpt;("-1000000#select from quote where date = 2015.08.25"); `hdb;raze;handleresults3;0D00:01:00.000000)};

async1[];
async2[];
async3[];

keepgoing:{[]
  show count d[`res1];
  show count d[`res2];
  show count d[`res3];
  show d[`res1]
  -1" ... and continue do whatever I want with the results here .... "
  };