PyKX with Prefect

cleung
New Contributor

Hi There, 

 

We're trying to integrate our KDB processes with Prefect by connecting to them with PyKX over an IPC port. The flows and tasks we run via Prefect use PyKX to establish the connection. A snippet of the code is below:

import os 
import time
import subprocess
from prefect import flow, task, get_run_logger
from prefect_shell import ShellOperation
import pykx as kx 

def setup():
    token = subprocess.Popen('\\\\ccl\\data\\extlib\\KDBSecToken.exe', stdout=subprocess.PIPE).communicate()[0].decode('utf-8')
    masterConn = kx.SyncQConnection( host='v-kdbr-01', port=5000, username='cleung', password=token, timeout=3.0 ) 
    user = masterConn('.z.u') 
    print('User = ' + user.py())
    port = masterConn('getProcessClient[`prefect_testing_base;`pykx_test]')
    print('Port = ' + str(port.py())) 
    masterConn.close()
    global conn
    conn = kx.SyncQConnection( host='v-kdbr-01', port=port.py(), username='cleung', password=token) 
    user = conn('.z.u') 
    print('User = ' + user.py()) 

@flow()
def ibes_pykx():
    setup()
    logger = get_run_logger()
    logger.info("Initializing initial arguments")
    dailyDates = conn('dailyDates: .dt.drb[.dt.shiftdateb[exec max date from QModelD;-3];  exec max date from QModelD];')
    logFile = conn('inLogFile:`')
    updateIBESTickers_result = updateIBESTickers.submit()
    saveRecRevAI260_result = saveRecRevAI260.submit(wait_for=[updateIBESTickers_result])

This works fine when run from the console (in Visual Studio Code). However, when we try to deploy it, we keep getting the error below. A Prefect deployment essentially builds the flow function so that it can be run from the Prefect UI.

 

Script at './Script/ibes_pykx.py' encountered an exception: ValueError('signal only works in main thread')

We're not sure what to do, as it seems like an OS-level issue. So far, I am confident that it is the

import pykx as kx 

line that is causing it. However, I'm unsure what the best approach is to solve this.

Thanks so much for the help, I know it's a pretty niche area and any advice would be GREATLY appreciated 🙂 

1 REPLY

rocuinneagain
Valued Contributor

I have tested this, and PyKX 1.6.0 allows the import to continue after this error.
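
For context, the ValueError itself is standard Python behavior: a signal handler can only be installed from the main thread. The sketch below reproduces it with the standard library alone. It assumes (this is not confirmed in the thread) that importing pykx tries to register a signal handler and that Prefect loads the script on a worker thread while building the deployment. `try_register_handler` is a hypothetical name used only for illustration.

```python
import signal
import threading


def try_register_handler(results):
    """Try to install a SIGINT handler and record what happens."""
    try:
        # Re-installs Python's default SIGINT handler; harmless in the main thread.
        signal.signal(signal.SIGINT, signal.default_int_handler)
        results.append("ok")
    except ValueError as exc:
        # Raised by CPython when called from any thread other than the main one.
        results.append(str(exc))


# Same call, but made from a worker thread instead of the main thread.
worker_result = []
worker = threading.Thread(target=try_register_handler, args=(worker_result,))
worker.start()
worker.join()

print(worker_result[0])
```

The message printed from the worker thread begins with the same text as the deployment error, which suggests the failure depends on which thread performs the import rather than on the operating system.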

 

This runs for me:

 

from prefect import flow, get_run_logger
from prefect.deployments import Deployment
import os
os.environ["QARGS"] = "--unlicensed"

import pykx as kx

def setup():
    print("testing pykx")
    masterConn = kx.SyncQConnection( host='localhost', port=5000, timeout=3.0 )
    user = masterConn('.z.u')
    print('User = ' + user.py())
    port = masterConn('5000')
    print('Port = ' + str(port.py()))
    masterConn.close()
    global conn
    conn = kx.SyncQConnection( host='localhost', port=port.py())
    user = conn('.z.u')
    print('User = ' + user.py())

@flow()
def test_pykx():
    setup()
    logger = get_run_logger()
    logger.info("Initializing initial arguments")
    dailyDates = conn('dailyDates:(.z.d-7)+til 7')
    logFile = conn('inLogFile:`')


def deploy():
    deployment = Deployment.build_from_flow(
        flow=test_pykx,
        name="prefect-example-deployment"
    )
    deployment.apply()

if __name__ == "__main__":
    deploy()

 

If you want to run in licensed mode, you need to move the pykx import inside the function that uses it to avoid a nosocket error:

 

from prefect import flow, get_run_logger
from prefect.deployments import Deployment

def setup():
    import pykx as kx
    print("testing pykx")
    masterConn = kx.SyncQConnection( host='localhost', port=5000, timeout=3.0 )
    user = masterConn('.z.u')
    print('User = ' + user.py())
    port = masterConn('5000')
    print('Port = ' + str(port.py()))
    masterConn.close()
    global conn
    conn = kx.SyncQConnection( host='localhost', port=port.py())
    user = conn('.z.u')
    print('User = ' + user.py())

@flow()
def test_pykx():
    setup()
    logger = get_run_logger()
    logger.info("Initializing initial arguments")
    dailyDates = conn('dailyDates:(.z.d-7)+til 7')
    logFile = conn('inLogFile:`')


def deploy():
    deployment = Deployment.build_from_flow(
        flow=test_pykx,
        name="prefect-example-deployment"
    )
    deployment.apply()

if __name__ == "__main__":
    deploy()
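
Both variants above rely on the same idea: control when the import happens, and set QARGS before it if unlicensed mode is wanted. A minimal sketch of that pattern as a single helper follows. `load_pykx` and its `module_name` parameter are hypothetical and not part of PyKX or Prefect. `module_name` exists only so the helper can be tried on a machine without PyKX installed.

```python
import importlib
import os


def load_pykx(unlicensed=False, module_name="pykx"):
    """Import the module at call time instead of at script load time.

    unlicensed:  if True, set QARGS before the import, as in the first example.
    module_name: hypothetical parameter so the helper can be exercised without PyKX.
    """
    if unlicensed:
        # Must be set before the import for it to take effect.
        os.environ["QARGS"] = "--unlicensed"
    # Deferred import: nothing is loaded until a flow or task calls this.
    return importlib.import_module(module_name)


# Stand-in usage with a standard-library module so the sketch runs anywhere.
mod = load_pykx(unlicensed=True, module_name="json")
print(mod.__name__)
```

Inside setup(), this would be called as kx = load_pykx() for licensed mode, which has the same effect as the function-level import shown above.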