KX Community


  • PyKX IPC work with multithreading?

    Posted by cleung on May 30, 2023 at 12:00 am

    Hi there, 

     

    I have a PyKX process that uses IPC to connect to a port, and we're running some kdb+ functions that involve multithreading (cutPeach). However, every time I run it, I get the following error:

    Traceback (most recent call last):
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py", line 1533, in orchestrate_task_run
        result = await run_sync(task.fn, *args, **kwargs)
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
        tg.start_soon(
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
        raise exceptions[0]
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
        result = __fn(*args, **kwargs)
      File "//v-tm-qeq-05/PrefectScripts/cleung/Script/ibes_pykx.py", line 40, in saveRetRawNaive
        conn('.imq.model.saveRetRawNaive[dailyDates;`]')
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 806, in __call__
        return self._call(query, *args, wait=wait)
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 816, in _call
        return self._recv(locked=True)
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 511, in _recv
        res = callback(key.fileobj)
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 535, in _recv_socket
        size = chunks[4]
    IndexError: list index out of range

     

    Any guidance would be appreciated. Thanks!

     

  • cleung

    Member
    May 31, 2023 at 12:00 am

    Hi there,

    1. Yes we’re using Prefect to orchestrate PyKX

    2. We’re using a @task to query using PyKX. The connection is created outside of the flow and task (see the code snippet below)

    3. Switching to flows does work, but we want some functions to execute in parallel. When we switched to all flows/subflows, it seemed like it was all executing sequentially.

    4. We tried doing it in parallel outside of Prefect using from multiprocessing import Pool and it was operating fine

    5. Multiple queries

    6. PyKX version 1.5.4

    7. Unlicensed mode

    8. Using SyncQConnection and IPC to grab handle into port

    port = masterConn('getProcessClient[`prefect_pykx;`pykx_test]')

    conn = kx.SyncQConnection( host='', port=port.py(), username='cleung', password=token)

     

    Code Snippet:

    import pykx as kx 
    import subprocess
    from prefect import flow, task, get_run_logger
    
    masterConn = kx.SyncQConnection( host='v-kdbr-01', port=5000, username='cleung', password=token, timeout=3.0 ) 
    user = masterConn('.z.u') 
    print('User = ' + user.py())
    port = masterConn('getProcessClient[`prefect_pykx;`pykx_test]')
    print('Port = ' + str(port.py())) 
    masterConn.close()
    
    conn = kx.SyncQConnection( host='v-kdbr-01', port=port.py(), username='cleung', password=token) 
    user = conn('.z.u') 
    print('User = ' + user.py()) 
    
    @task
    def saveRecRevAI260():
        logger = get_run_logger()
        logger.info("Executing saveRecRevAI260")
        conn('.imq.model.saveRecRevAI260[.dt.shiftstartdr[max dailyDates;neg 1];inLogFile;`reportDate]')
        logger.info("Finished saveRecRevAI260")
        return 1
    
    @task
    def saveEstRevAI():
        logger = get_run_logger()
        logger.info("Executing saveEstRevAI")
        conn('.imq.model.saveEstRevAI[.dt.shiftstartdr[max dailyDates;neg 1];();inLogFile]')
        logger.info("Finished saveEstRevAI")
        return 1
    
    @task
    def saveRetRawNaive():
        logger = get_run_logger()
        logger.info("Executing saveRetRawNaive")
        conn('.imq.model.saveRetRawNaive[dailyDates;`]')
        logger.info("Finished saveRetRawNaive")
        return 1
    
    @task
    def saveEstRevStdDev5Yr():
        logger = get_run_logger()
        logger.info ("Executing saveEstRevStdDev5Yr")
        conn(".imq.model.saveEstRevStdDev5Yr[dailyDates;`reportDate;inLogFile]")
        logger.info("Finished saveEstRevStdDev5Yr")
        return 1
    
    @task
    def saveSRinteractionsEarnItems():
        logger = get_run_logger()
        logger.info("Executing saveSRinteractionsEarnItems")
        conn(".imq.model.saveSRInteractionsEarnItems[dailyDates;inLogFile]")
        logger.info("Finished saveSRinteractionsEarnItems")
    
    @flow()
    def ibes_pykx():
        logger = get_run_logger()
        logger.info("Initializing initial arguments")
        dailyDates = conn('dailyDates: .dt.drb[.dt.shiftdateb[exec max date from QModelD;-3];  exec max date from QModelD];')
        logFile = conn('inLogFile:`')
        
        saveRecRevAI260_result = saveRecRevAI260.submit()
        saveRetRawNaive_result = saveRetRawNaive.submit()
        
        saveEstRevAI_result = saveEstRevAI(wait_for=[saveRecRevAI260_result])
        saveSRinteractionsEarnItems_result = saveSRinteractionsEarnItems(wait_for=[saveRetRawNaive_result])
    
        # saveEstRevStdDev5Yr_result = saveEstRevStdDev5Yr(wait_for=[saveEstRevAI_result])
        
    
    if __name__ == "__main__":
        ibes_pykx()

     

    Error Message:

    4:15:15.041 | DEBUG   | Task run 'saveRecRevAI260-0' - Beginning execution...
    14:15:15.046 | INFO    | Task run 'saveRecRevAI260-0' - Executing saveRecRevAI260
    14:15:15.108 | DEBUG   | Task run 'saveRetRawNaive-0' - Beginning execution...
    14:15:15.112 | INFO    | Task run 'saveRetRawNaive-0' - Executing saveRetRawNaive
    14:15:16.486 | DEBUG   | prefect.client - Connecting to API at http://v-tm-qeq-05:4200/api/
    14:16:31.335 | ERROR   | Task run 'saveRecRevAI260-0' - Encountered exception during execution:
    Traceback (most recent call last):
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py", line 1533, in orchestrate_task_run
        result = await run_sync(task.fn, *args, **kwargs)
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
        tg.start_soon(
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
        raise exceptions[0]
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
        result = __fn(*args, **kwargs)
      File "//v-tm-qeq-05/PrefectScripts/cleung/Script/ibes_pykx.py", line 24, in saveRecRevAI260
        conn('.imq.model.saveRecRevAI260[.dt.shiftstartdr[max dailyDates;neg 1];inLogFile;`reportDate]')
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 806, in __call__
        return self._call(query, *args, wait=wait)
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 816, in _call
        return self._recv(locked=True)
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 511, in _recv
        res = callback(key.fileobj)
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 535, in _recv_socket
        size = chunks[4]
    IndexError: list index out of range
    14:16:31.558 | ERROR   | Task run 'saveRecRevAI260-0' - Finished in state Failed('Task run encountered an exception: IndexError: list index out of range\n')
    14:16:31.622 | ERROR   | Flow run 'chocolate-marmot' - Encountered exception during execution:
    Traceback (most recent call last):
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py", line 665, in orchestrate_flow_run
        result = await run_sync(flow_call)
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
        tg.start_soon(
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
        raise exceptions[0]
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
        result = __fn(*args, **kwargs)
      File "//v-tm-qeq-05/PrefectScripts/cleung/Script/ibes_pykx.py", line 69, in ibes_pykx
        saveEstRevAI_result = saveEstRevAI(wait_for=[saveRecRevAI260_result])
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\tasks.py", line 469, in __call__
        return enter_task_run_engine(
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py", line 965, in enter_task_run_engine
        return run_async_from_worker_thread(begin_run)
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 177, in run_async_from_worker_thread
        return anyio.from_thread.run(call)
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\from_thread.py", line 49, in run
        return asynclib.run_async_from_thread(func, *args)
      File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 970, in run_async_from_thread
        return f.result()
      File "\\v-tm-qeq-05\Prefect\lib\concurrent\futures\_base.py", line 444, in result
        return self.__get_result()
      File "\\v-tm-qeq-05\Prefect\lib\concurrent\futures\_base.py", line 389, in __get_result
        raise self._exception
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py", line 1114, in get_task_call_return_value
        return await future._result()
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\futures.py", line 237, in _result
        return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
      File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\states.py", line 103, in _get_state_result
        raise MissingResult(
    prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.

     

    Thanks a lot for your help by the way. Greatly appreciate it!!

  • rocuinneagain

    Member
    May 31, 2023 at 12:00 am

    Hi,

    1. Are you running PyKX under Prefect? http://www.prefect.io
    2. Are you connecting and querying using PyKX in @task or @flow ?
    3. If using @task, does switching to @flow (i.e. subflows) stop the issue from happening?
    4. Do you see the same issue if you query in a standalone python process outside of Prefect?
    5. Are you making a single query when the issue happens or are multiple queries being run?
    6. What version of PyKX are you running? pykx.__version__
    7. Are you running in licensed or unlicensed mode? pykx.licensed
    8. How did you create the connection conn?

     

  • rocuinneagain

    Member
    June 1, 2023 at 12:00 am

    The issue you are having is unrelated to peach/multithreading on the kdb+ side.

     

    The issue is the reuse of the same connection among @task blocks.

    The different @tasks are attempting to read data from the same connection concurrently, so each one can receive fragments of another task's response. This is the junk data behind the IndexError.
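
    One way to avoid the shared socket is to give each task its own connection. The following is only a minimal sketch of that pattern: FakeConnection, make_connection, task_connection, run_task and run_all are hypothetical names so that the example runs without a kdb+ process. With PyKX, the factory would instead build a kx.SyncQConnection using the host, port and credentials from the original post.

```python
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager


class FakeConnection:
    """Hypothetical stand-in for a PyKX IPC connection (not a PyKX class)."""

    def __init__(self):
        self.queries = []
        self.closed = False

    def __call__(self, query):
        # A real connection would send the query and block for the reply.
        self.queries.append(query)
        return f"result of {query}"

    def close(self):
        self.closed = True


@contextmanager
def task_connection(factory):
    """Open a private connection for one task and always close it."""
    conn = factory()
    try:
        yield conn
    finally:
        conn.close()


opened = []  # every connection created, kept only so the sketch can be inspected


def make_connection():
    # Assumption: replace this body with a real connection constructor.
    conn = FakeConnection()
    opened.append(conn)
    return conn


def run_task(query):
    # Each task uses its own connection, so no two threads share a socket.
    with task_connection(make_connection) as conn:
        return conn(query)


def run_all(queries):
    with ThreadPoolExecutor(max_workers=len(queries)) as pool:
        return list(pool.map(run_task, queries))
```

    Calling run_all with three query strings opens three separate connections and closes each one when its task finishes, at the cost of one connection handshake per task.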

     

    kdb+ processes incoming queries sequentially. This means that even though the log messages make the tasks look as if they run in parallel, once the queries arrive at kdb+ they are always processed one after the other. For this reason, switching to @flow will not be slower and will give safe, consistent results.
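
    Since the server handles one query at a time anyway, another option is to keep a single connection and serialize access to it on the Python side. The sketch below assumes nothing about PyKX internals: LockedConnection, FakeConnection and run_queries are hypothetical names, and any callable connection object could be wrapped the same way.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class LockedConnection:
    """Allow only one request/response exchange on the wrapped connection at a time."""

    def __init__(self, conn):
        self._conn = conn              # any callable connection object
        self._lock = threading.Lock()

    def __call__(self, query, *args):
        with self._lock:
            return self._conn(query, *args)


class FakeConnection:
    """Hypothetical stand-in that records how many calls overlap."""

    def __init__(self):
        self.in_flight = 0
        self.max_in_flight = 0
        self._guard = threading.Lock()

    def __call__(self, query, *args):
        with self._guard:
            self.in_flight += 1
            self.max_in_flight = max(self.max_in_flight, self.in_flight)
        time.sleep(0.01)  # simulate waiting for the server's reply
        with self._guard:
            self.in_flight -= 1
        return f"result of {query}"


def run_queries(conn, queries, workers=4):
    # Threads submit concurrently, but the lock lets one exchange through at a time.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(conn, queries))
```

    With the wrapper in place, the stand-in never sees more than one call in flight, which matches the order in which the server would process the queries anyway.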

     

     

  • rocuinneagain

    Member
    April 19, 2024 at 12:13 pm

    We added some functionality in version 2.3.0

    https://code.kx.com/pykx/2.4/release-notes/changelog.html#pykx-230

    If you enable beta features as well as PyKX threading, all calls into q from any Python thread will be run as if they were made from the main thread. This allows multithreaded Python programs to use IPC connections in licensed mode.

    You can enable this functionality like this:

    import os
    os.environ['PYKX_THREADING'] = '1'
    os.environ['PYKX_BETA_FEATURES'] = '1'
    import pykx as kx

    You will also want to ensure that kx.shutdown_thread() is called when the script finishes. The safest way to do this is within a try/finally block like this:

    if __name__ == '__main__':
        try:
            main()
        finally:
            kx.shutdown_thread()

    More information about this functionality and an example can be found within our documentation:

    https://code.kx.com/pykx/2.4/examples/threaded_execution/threading.html
