
Query Routing / Load Balancing / Asynchronous Callbacks

simon_watson_sj
Contributor

Hi all,

 
I've had a frustrating experience attempting to set up a load balancer and query router on a collection of q processes, both started independently and from Developer. I wonder if anyone can spot the schoolboy error or has any insight to share.
 
I've tried:
 
1) Using the basic load-balancing server script:

Process:
  • start 4 q processes on ports 5001 to 5004
  • start a 5th process with this script loaded on startup
  • try loading an HDB and querying it, either directly on that 5th process or by sending queries to it from a Developer session set up as a client

I get nothing back.
 
2) Using the full query router:

Process:
  • start up 3 processes on ports 5001 to 5003 with the loadBalancer, service and gateway scripts loaded (the start order I use is load balancer, then service, then gateway)

The only changes I made to the scripts were to add the text from the white paper to aid my understanding and to initialise the SEQ variable in the gateway script with a random number. The service script actually just creates dummy tables, but I wanted to make as few changes as possible to get the POC running.
 
In any case, when I try connecting to the gateway with Developer as the client, I have no success. I have added the scripts below, as I can't attach more than one file at a time and the system is clever enough to thwart my attempt to get around the restriction on attaching zips.
From an earlier post to the Community, I understand that Developer doesn't actually support the use of peach with separate processes rather than threads. This has me thinking that one of the below is true:
  • I am screwing up the code implementation
  • I am screwing up the code implementation AND can't use developer with these multiprocess architectures
  • the code as shown in the above links contains errors
  • there is a bug in the q version available separately from the platform offering that means you can't do asynchronous callbacks on either Developer or a raw q process.

I hope minds greater than my own might be able to shed light on which of these four options is true and what the solution might be. In summary, I have yet to successfully execute an asynchronous callback.

Regards,
 
Simon
 

Gateway

////////////////////////
// Gateway
////////////////////////
// https://code.kx.com/q/wp/query-routing/#
////////////////////////
// loading order: 
// * load balancer: p 5001
// * service: p 5002
// * gateway: p 5003
// * client
////////////////////////


// When a connection is opened to the Load Balancer, the handle is set to the variable LB, which will be referenced throughout this paper. 
// As asynchronous messages are used throughout this framework, we also create the variable NLB, 
// which is assigned with the negative handle to the load balancer.

// \p 5555

\p 5003

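// initialise the handles so that the 0<LB test in .z.ts works even if the first hopen fails
LB:0; NLB:0;
manageConn:{@[{NLB::neg LB::hopen x};`:localhost:5001;{show x}]};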
registerGWFunc:{addResource LB(`registerGW;`)};

// The gateway connects to the Load Balancer and retrieves the addresses of all service resources, establishing a connection to each. 
// This is the only time the gateway uses synchronous IPC communication to ensure it has all of the details it requires before accepting 
// user queries. After the gateway registers itself as a subscriber for any new resources that come available, all future communication is 
// sent via asynchronous messages.

resources:([address:()] source:();sh:());

addResource:{
  `resources upsert `address xkey update 
    sh:{hopen first x}each address from x };

// The gateway process creates and maintains an empty query table. The complexity of this table is at the developer’s discretion. In this example we’ll record:

// * Unique sequence number per query (sq)
// * Handle from user process (uh)
// * Timestamps for when the query was received, when the query got sent to an available resource, and when the query results are sent back 
// to the user (rec, snt, ret respectively)
// * The user ID (user)
// * The service handle (sh)
// * The service requested by user (serv)
// * The user’s query

queryTable:([sq:`int$()];
  uh:`int$();
  rec:`timestamp$();
  snt:`timestamp$();
  ret:`timestamp$();
  usr:`$();
  sh:`int$();
  serv:`$();
  query:() );


// This table could be extended to include more information by making small changes to the code in this paper. 
// These fields could include the status of a query, error messages received from service or the total time a query took from start to end.

// As mentioned previously, users make requests by calling the userQuery function on the gateway. This function takes a two-item list argument: (Service;Query). 
// The gateway will validate the existence of a service matching the name passed to userQuery and send an error if no such resource exists. 
// We are setting outside the scope of this paper any further request validation, including access permissioning.

// For further details on access control, please refer to the technical white paper "Permissions with kdb+".

// When a user sends her query via the userQuery function, we assign the query a unique sequence number and publish an asynchronous request to the 
// Load Balancer to be assigned an available resource.


// initialise the query id generator.
SEQ: first 1?0;

userQuery:{
  $[(serv:x 0) in exec distinct source from resources; // valid service?
    [queryTable,:(SEQ+:1;.z.w;.z.p;0Np;0Np;.z.u;0N;serv;x 1); 
      NLB(`requestService;SEQ;serv)];
    (neg .z.w)(`$"Service Unavailable")] };

// The addResource function defined earlier is used to add new service instances to the plant, while the serviceAlloc function is used to 
// pass back an allocated resource for a given query sequence number. The query is retrieved by its sequence number from queryTable and 
// sent to the allocated service resource. If the user has since disconnected from the gateway before a resource could be provided, the gateway 
// informs the Load Balancer to make this resource free again by executing the returnService function in the Load Balancer. After each event, 
// the timestamp fields are updated within the queryTable.

serviceAlloc:{[sq;addr]
  $[null queryTable[sq;`uh];
  // Check if user is still waiting on results
    NLB(`returnService;sq);
  // Service no longer required
    [(neg sh:resources[addr;`sh]) (`queryService;(sq;queryTable[sq;`query]));
  // Send query to allocated resource, update queryTable
      queryTable[sq;`snt`sh]:(.z.p;sh)]] };

// When a service returns results to the gateway, the results arrive tagged with the same sequence number sent in the original query. This 
// incoming message packet executes the returnRes function, which uses the sequence number to identify the user handle and return the results. 
// If the user has disconnected before the results can be returned then the user handle field uh will be set to null (through the .z.pc trigger) 
// causing nothing further to be done.

returnRes:{[res]
  uh:first exec uh from queryTable where sq=(res 0); 
  // (res 0) is the sequence number
  if[not null uh;(neg uh)(res 1)];
  // (res 1) is the result
  queryTable[(res 0);`ret]:.z.p };


// In the situation where a process disconnects from the gateway, .z.pc establishes what actions to take. As mentioned, a disconnected user 
// will cause queryTable to be updated with a null user handle. If the user currently has no outstanding queries, the gateway has nothing to 
// do. If a service disconnects from the gateway whilst processing an outstanding user request, then all users that have outstanding 
// requests to this database are informed and the database is purged from the available resources table.

// If our Load Balancer connection has dropped, all users with queued queries will be informed. All connections are disconnected and purged 
// from the resources table. This ensures that all new queries will be returned directly to users as the Load Balancer is unavailable to 
// respond to their request. A timer is set to attempt to reconnect to the Load Balancer. On reconnection, the gateway will re-register 
// itself, pull all available resources and establish new connections. The .z.ts trigger is executed once, on script startup, 
// to initialize and register the process.

.z.pc:{[handle]
  // if handle is for a user process, set the query handle (uh) as null
  update uh:0N from `queryTable where uh=handle;
  // if handle is for a resource process, remove from resources
    delete from `resources where sh=handle;
  // if any user query is currently being processed on the service which 
  // disconnected, send message to user
  if[count sq:exec distinct sq from queryTable where sh=handle,null ret;
    returnRes'[sq cross `$"Service Disconnect"]]; 
  if[handle~LB; // if handle is Load Balancer
    // Send message to each connected user, which has not received results
    (neg exec uh from queryTable where not null uh,null snt)@\: 
      `$"Service Unavailable";
    // Close handle to all resources and clear resources table
    hclose each (0!resources)`sh;
    delete from `resources;
    // update queryTable to close outstanding user queries
    update snt:.z.p,ret:.z.p from `queryTable where not null uh,null snt; 
    // reset LB handle and set timer of 10 seconds
    // to try and reconnect to Load Balancer process
    LB::0; NLB::0; value"\\t 10000"] };

.z.ts:{
  manageConn[]; 
  if[0<LB;@[registerGWFunc;`;{show x}];value"\\t 0"] };

.z.ts[];
 
 

Load Balancer

////////////////////////
// LoadBalancer
////////////////////////
// https://code.kx.com/q/wp/query-routing/#
////////////////////////
// loading order: 
// * load balancer: p 5001
// * service: p 5002
// * gateway: p 5003
// * client
////////////////////////


// Within our Load Balancer there are two tables and a list:

// \p 1234

\p 5001

services:([handle:`int$()]
  address:`$();
  source:`$();
  gwHandle:`int$();
  sq:`int$();
  udt:`timestamp$() );

serviceQueue:([gwHandle:`int$();sq:`int$()]
  source:`$();
  time:`timestamp$() );

gateways:();

// The service table maintains all available instances/resources of services registered and the gateways currently using each service resource. 
// The serviceQueue maintains a list of requests waiting on resources. A list is also maintained, called gateways, which contains all gateway handles.

// Gateways connecting to the Load Balancer add their handle to the gateways list. New service resources add their connection details to the services table. 
// When a service resource registers itself using the registerResource function, the Load Balancer informs all registered gateways of the newly available 
// resource. The next outstanding query within the serviceQueue table is allocated immediately to this new resource.

registerGW:{gateways,:.z.w ; select source, address from services};

registerResource:{[name;addr]
  `services upsert (.z.w;addr;name;0N;0N;.z.p);
  (neg gateways)@\:(`addResource;enlist`source`address!(name;addr)); 
  // Sends resource information to all registered gateway handles 
  serviceAvailable[.z.w;name] };

// Incoming requests for service allocation arrive with a corresponding sequence number. The combination of gateway handle and sequence number will 
// always be unique. The requestService function either provides a service to the gateway or adds the request to the serviceQueue. When a resource is allocated 
// to a user query, the resource address is returned to the gateway along with the query sequence number that made the initial request.

sendService:{[gw;h]neg[gw]raze(`serviceAlloc;services[h;`sq`address])};
// Returns query sequence number and resource address to gateway handle

requestService:{[seq;serv]
  res:exec first handle from services where source=serv,null gwHandle; 
  // Check if any idle service resources are available
  $[null res;
    addRequestToQueue[seq;serv;.z.w]; 
    [services[res;`gwHandle`sq`udt]:(.z.w;seq;.z.p);
      sendService[.z.w;res]]] };

// If all matching resources are busy, then the gateway handle + sequence number combination is appended to the serviceQueue table along with the service required.

addRequestToQueue:{[seq;serv;gw]`serviceQueue upsert (gw;seq;serv;.z.p)};

// After a service resource has finished processing a request, it sends an asynchronous message to the Load Balancer, executing the returnService function. 
// As mentioned previously, if the user disconnects from the gateway prior to being allocated a service resource, the gateway also calls this function. 
// The incoming handle differentiates between these two situations.

returnService:{
  serviceAvailable . $[.z.w in (0!services)`handle;
    (.z.w;x);
    value first select handle,source from services 
      where gwHandle=.z.w,sq=x ] };

// On execution of the serviceAvailable function, the Load Balancer will either mark this resource as free, or allocate the resource to the next gateway + 
// sequence number combination that has requested this service, updating the services and serviceQueue tables accordingly.

serviceAvailable:{[zw;serv]
  nxt:first n:select gwHandle,sq from serviceQueue where source=serv; 
  serviceQueue::(1#n)_ serviceQueue;
  // Take first request for service and remove from queue 
  services[zw;`gwHandle`sq`udt]:(nxt`gwHandle;nxt`sq;.z.p);
  if[count n; sendService[nxt`gwHandle;zw]] };


// Any resource that disconnects from the Load Balancer is removed from the services table. If a gateway has disconnected, 
// it is removed from the resource subscriber list gateways and all queued queries for any resources must also be removed, 
// and the resource freed up for other gateways. Unlike other components in this framework, the Load Balancer does not 
// attempt to reconnect to processes, as they may have permanently been removed from the service pool of resources. In a 
// dynamically adjustable system, service resources could be added and removed on demand based on the size of the 
// serviceQueue table.

.z.pc:{[h]
  services _:h;
  gateways::gateways except h;
  delete from `serviceQueue where gwHandle=h;
  update gwHandle:0N from `services where gwHandle=h };


// If a gateway dies, data services will continue to run queries that have already been routed to them, 
// which will not subsequently be returned to the client. It is also possible that the next query assigned to this 
// resource may experience a delay as the previous query is still being evaluated. As mentioned later, 
// all resources should begin with a timeout function to limit interruption of service.

 

Service

////////////////////////
// Service
////////////////////////
// https://code.kx.com/q/wp/query-routing/#
////////////////////////
// loading order: 
// * load balancer: p 5001
// * service: p 5002
// * gateway: p 5003
// * client
////////////////////////


// The example below takes a simple in-memory database containing trade and quote data that users can query. 
// An example timeout of ten seconds is assigned, to prevent queries running for too long.

\T 10
\p 5002

LB:0

egQuote:([]
  date:10#.z.D-1;
  sym:10#`FDP;
  time:09:30t+00:30t*til 10;
  bid:100.+0.01*til 10;
  ask:101.+0.01*til 10 );

egTrade:([]
  date:10#.z.D-1;
  sym:10#`FDP;
  time:09:30t+00:30t*til 10;
  price:100.+0.01*til 10;
  size:10#100 );


// Each instance of a service uses the same service name. Within this example, the service name is hard-coded, but this would ideally be set 
// via a command line parameter. In our example below, our service name is set to `EQUITY_MARKET_RDB. In designing a user-friendly system, 
// service names should be carefully set to clearly describe a service’s purpose. Similar processes (with either a different port number or 
// running on a different host) can be started up with this service name, increasing the pool of resources available to users.


// The serviceDetails function is executed on connection to the Load Balancer to register each service address.

manageConn:{@[{NLB::neg LB::hopen x}; `:localhost:5001;
  {show "Can't connect to Load Balancer-> ",x}] };

serviceName:`EQUITY_MARKET_RDB;

serviceDetails:(`registerResource; 
  serviceName;
  `$":" sv string (();.z.h;system"p") );

// When a gateway sends the service a request via the queryService function, a unique sequence number assigned by a given gateway arrives as 
// the first component of the incoming asynchronous message. The second component, the query itself, is then evaluated. The result of this query 
// is stamped with the same original sequence number and returned to the gateway handle.

// As mentioned previously, query interpretation/validation on the gateway side is outside of the scope of this paper. 
// Any errors that occur due to malformed queries will be returned via protected evaluation from database back to the user. 
// In the situation where the process query times out, 'stop will be returned to the user via the projection errProj.

// On completion of a request, an asynchronous message is sent to the Load Balancer informing it that the service is now available for the next request.

execRequest:{[nh;rq]nh(`returnRes;(rq 0;@[value;rq 1;{x}]));nh[]};

queryService:{ 
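  // send the error via returnRes so the gateway passes it on to the user; a bare (sq;error) list would fail when evaluated on the gateway
  errProj:{[nh;sq;er]nh(`returnRes;(sq;`$er));nh[]}; 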
  @[execRequest[neg .z.w];x;errProj[neg .z.w;x 0]]; 
  NLB(`returnService;serviceName) };

// Note that in the execRequest function, nh is the asynchronous handle to the gateway. Calling nh[] after sending the result causes the outgoing 
// message queue for this handle to be flushed immediately.

// Like our gateway, the .z.pc handle is set to reconnect to the Load Balancer on disconnect. The .z.ts function retries to connect to the Load Balancer, 
// and once successful the service registers its details. The .z.ts function is executed once on start-up – like the gateway – to initialize the first connection.

.z.ts:{manageConn[];if[0<LB;@[NLB;serviceDetails;{show x}];value"\\t 0"]};
.z.pc:{[handle]if[handle~LB;LB::0;value"\\t 10000"]};
.z.ts[];
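As written, service.q (and likewise gateway.q) only arms the 10-second reconnect timer inside .z.pc, that is, after an established connection to the Load Balancer drops. If the very first hopen fails, for example because all three scripts are started at the same moment with '&' and the Load Balancer is not listening yet, nothing schedules another attempt and the service never registers. A minimal sketch of a start-up variant for the service that keeps retrying, assuming the same address as the scripts above; LBADDR is a hypothetical name introduced here, and the same change could be made in gateway.q:

```q
// Sketch: service start-up that keeps retrying until the Load Balancer accepts the connection.
// LBADDR is a hypothetical variable; the original script hard-codes `:localhost:5001 in manageConn.
LB:0; NLB:0;
LBADDR:`:localhost:5001;
manageConn:{@[{NLB::neg LB::hopen x};LBADDR;{show "Can't connect to Load Balancer-> ",x}]};
.z.ts:{
  manageConn[];
  $[0<LB;
    [@[NLB;serviceDetails;{show x}]; system"t 0"];   // connected: register, then stop the timer
    system"t 1000"] };                               // not connected yet: retry in one second
.z.ts[];
```

Once the Load Balancer accepts the connection, the service registers and switches the timer off, so the existing .z.pc logic handles any later disconnect.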

davidcrossey
Moderator

Hi Simon,

Thanks for sharing your query.

Just curious: have you tried either of these approaches outside of Developer, on raw q processes, to run the async callbacks?

Regards,

David

Hi David,

Thank you for your speedy response! I have tried the load balancer on basic q processes.

I did the following:

copy the following files to QHOME:

  • loadBalancer.q
  • service.q
  • gateway.q

Next, I defined a function qmserve in .bashrc as follows:

qmserve(){
q loadBalancer.q -p 5001 &
q service.q -p 5002 &
q gateway.q -p 5003 &
q -p 5004 &
q -p 5005
}

and ran

source .bashrc 

from the command line in the home directory to load up the function.

I then executed the function qmserve and noted 5 processes starting (see attached screenshot).

At the q prompt I did this:

gw:{h:hopen x;{(neg x)(`userQuery;y);x[]}[h]}[`:localhost:5003]

followed by this:

gw(`EQUITY_MARKET_RDB;"select from trade where date=max date")

The result was a hanging cursor.
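The client line above uses the deferred-synchronous pattern from the white paper: the request goes to the gateway as an async message, and x[] (h[] on a positive handle) then blocks until the next message arrives on that handle and returns it, rather than passing it to the client's .z.ps. If the gateway never replies, for example because an error was raised while handling the async message and only printed on a backgrounded console, the client waits indefinitely. Note also that service.q defines egTrade and egQuote, not trade. A minimal sketch, assuming the three scripts are running on the ports above; gwQuery is a hypothetical name:

```q
// Deferred-synchronous request to the gateway.
// h: open handle to the gateway; req: (service name; query string)
gwQuery:{[h;req]
  neg[h](`userQuery;req);   // send the request asynchronously
  h[] };                    // block until the gateway's async reply arrives, and return it

// Example (requires the gateway on port 5003):
// h:hopen `:localhost:5003
// gwQuery[h;(`EQUITY_MARKET_RDB;"select from egTrade where date=max date")]
```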

Looking in the service.q code, what does LB do when it is used in manageConn?
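In manageConn, LB::hopen x stores the handle returned by hopen in the global LB, and because an assignment also returns the assigned value, NLB::neg LB::... stores the negated handle in NLB at the same time. A negative handle sends asynchronous messages, so NLB is the async handle to the Load Balancer, while LB stays 0 until a connection succeeds, which is what the 0<LB test in .z.ts checks. The sketch below reproduces the chaining without any IPC, using a made-up handle number in place of hopen's result; setHandles is a hypothetical name:

```q
// Same chained global assignment as manageConn, with a plain int standing in for hopen's result
setHandles:{NLB::neg LB::x};
setHandles 7i;
LB     // 7i  (sync handle)
NLB    // -7i (async handle)
```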

My thought for troubleshooting was that I should start the processes separately in multiple terminals, as running them with '&' hides their error output, as I'm sure you know. On the other hand, my final process doesn't have a '&', so it should report errors normally.

I'm keen for any thoughts you might have, even if only on next steps for my troubleshooting. I wondered whether it was worth starting the script processes from my qmserve function but starting my client q process directly on the command line. I don't think the bash function wrapper should change anything, but it never hurts to try.

Simon

 

I think the issue is that you are blocking on the client with your last command:

x[]

Interprocess communication | Basics | kdb+ and q documentation (kx.com)

There may be no message sent back to the client, especially if there is an issue on the server, or if you don't have a callback on the calling handle.

Perhaps you meant to flush the async queue?

neg[x][]

It might be worthwhile putting nohup in front of your '&' commands. I'd also suggest opening your handle once and passing it into the projection, instead of opening it per call to the gateway. You could add a check in your function to test whether the handle is valid and reopen it as needed.

Just a note: you could save that function in any file and source it; it doesn't have to be .bashrc unless that's your intention.
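A minimal sketch of the handle check suggested above, assuming the gateway address used in this thread; GWADDR, GW, getGW and gwReq are hypothetical names. It keeps a single handle, reopens it on demand, clears it in .z.pc when the gateway disconnects, and otherwise uses the same async-send-then-block pattern as the original client:

```q
GWADDR:`:localhost:5003;   // assumed gateway address
GW:0Ni;                    // current gateway handle; null means not connected
// Return an open handle, (re)opening it if necessary; null if the gateway is unreachable
getGW:{if[null GW; GW::@[hopen;GWADDR;{0Ni}]]; GW};
// Forget the handle when the gateway connection drops
.z.pc:{[h] if[h=GW; GW::0Ni]};
// Async send, then block for the reply on the same handle
gwReq:{[req]
  h:getGW[];
  if[null h; '"gateway unavailable"];
  neg[h](`userQuery;req);
  h[] };
```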

Thanks Dave,

I still have no luck. I should say that if I open a handle to port 5003 directly using

`h_gw set hopen 5003;

I can get it to evaluate, say, "5+6" (returning 11) or "til 5" (returning 0 1 2 3 4).

As you advised, I restructured the gateway function:

gw:{(neg `h_gw[])(`userQuery;x);`h_gw[]""}

I now get an error back from

gw(`EQUITY_MARKET_RDB;"select from trade where date=max date")

where it doesn't seem to recognize the table. It does return properly now, though. I will add logging to each component and see what gets executed when. I should also say that I took all the code directly from the query routing white paper. Perhaps I'm being too flippant with my cut and paste; either way, this process will build the skills. I will report back with news from the logs.

Simon
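For the logging step, one low-touch option is to wrap the async message handler on each component so that every incoming message is printed before being evaluated as usual. None of the three scripts defines .z.ps, so this sketch simply installs one; the log format is an arbitrary choice, and value msg is assumed to match the default handling closely enough for debugging:

```q
// Print each incoming async message, then evaluate it as the default handler would
.z.ps:{[msg]
  -1 string[.z.p]," async from handle ",string[.z.w],": ",.Q.s1 msg;
  value msg };
```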

Hey David, I wanted to follow up on this. I have recently built myself a fancypants new PC. I'm keen to squeeze as much as possible out of it, so I went for a bare-metal environment rather than containers. I have just tried loading up the load balancer example using mserve.q and it seems to be working fine.

I'll update you when I try the full query router example and let you know if there are still issues. I would like to get it going in containers; I wonder whether the Docker approach would mean using one process per container. I will advise as I discover!!!

Cheers,

Simon

Hey David, updates!

I think the Query Router white paper might have a couple of quirks that are causing me issues. (Although at the moment I feel like the guy in the 2000s who blames Bill Gates when he stuffs up his VB code.)

I think the main issue is this:

returnRes:{[res]
  uh:first exec uh from queryTable where sq=(res 0); 
  // (res 0) is the sequence number
  if[not null uh;(neg uh)(res 1)];
  // (res 1) is the result
  queryTable[(res 0);`ret]:.z.p }

In particular:

(neg uh)(res 1)

I think (res 1) causes a 'type failure because it needs to be a string. I've tried:

(neg uh) .Q.s1 (res 1)

but that doesn't return anything that shows up anywhere. Interestingly, if I do something like (neg uh)"foo:123";(neg uh)"";

from the gateway, then I do get a variable called foo with that value on the client.
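This behaviour follows from how a q process handles an incoming async message when .z.ps is not defined: the message is evaluated (the default is equivalent to value), and any result is discarded because no caller is waiting for it. A string is parsed and run as q code, so an assignment such as "foo:123" has a lasting effect, while a .Q.s1 string only rebuilds the value and throws it away. A general list is evaluated as a function call. The local sketch below (no IPC; add, r, t and t2 are made-up names) shows the three cases:

```q
// 1. A string is parsed and run as q code: the assignment persists
value "foo:123";
// 2. A general list is applied: the first item names the function, the rest are its arguments
add:{x+y};
r:value (`add;2;3);          // 5
// 3. A .Q.s1 string rebuilds the value when evaluated, but an async sender never sees the result
t:([]sym:`FDP`FDP;price:100 100.01);
t2:value .Q.s1 t;
```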

My next attempt will be to either adjust the query process so it assigns return data to a variable or go through the async paper again and see what the standard method is.

I will let you know how I go.

Simon
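For comparison, a common way to return results over a fully asynchronous path is for the sender to invoke a named callback on the client instead of pushing raw data. This sketch is hypothetical and not part of the white paper: gwCallback and RESULT are made-up names, and it assumes returnRes in gateway.q is changed so that the user receives (`gwCallback;res 1). The last line simulates locally what the client's default async handler does when that message arrives:

```q
// Client side: store and display whatever the gateway sends back
RESULT:();
gwCallback:{[res] RESULT::res; show res};
// Assumed change on the gateway, inside returnRes:
//   if[not null uh;(neg uh)(`gwCallback;res 1)];
// Local simulation of the client evaluating that async message:
value (`gwCallback;([]sym:`FDP`FDP;price:100 100.01));
```

With this arrangement the client sends requests with neg[h](`userQuery;req) and does not need to block with h[]; an error string returned by the service (such as "trade" for a missing table) arrives as the callback's argument and is displayed rather than evaluated.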