2023.05.30 01:34 PM - edited 2023.05.30 01:35 PM
Hi there,
I have a PyKX process that uses IPC to connect to a port, and we're running some kdb+ functions that involve multithreading (cutPeach). However, every time I run it, I get the following error:
Traceback (most recent call last):
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py", line 1533, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
result = context.run(func, *args)
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "//v-tm-qeq-05/PrefectScripts/cleung/Script/ibes_pykx.py", line 40, in saveRetRawNaive
conn('.imq.model.saveRetRawNaive[dailyDates;`]')
File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 806, in __call__
return self._call(query, *args, wait=wait)
File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 816, in _call
return self._recv(locked=True)
File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 511, in _recv
res = callback(key.fileobj)
File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 535, in _recv_socket
size = chunks[4]
IndexError: list index out of range
Any guidance would be appreciated. Thanks!
2023.06.01 01:43 AM
The issue you are having is unrelated to peach/multithreading on the kdb+ side.
The issue is the reuse of the same connection among @task blocks.
The different @task blocks are attempting to read data from the same connection concurrently, leading to junk data being passed through.
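To illustrate the point about concurrent reads, here is a minimal sketch of one possible guard: a lock around a shared handle so that only one thread sends and receives at a time. FakeConnection and LockedConnection are hypothetical names made up for this example, not PyKX classes. With a real handle you would wrap the conn object instead; that this is sufficient for PyKX is an assumption, not something confirmed in this thread.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class FakeConnection:
    """Hypothetical stand-in for a synchronous IPC handle (not a PyKX class).

    It raises if a second thread enters a call while another one is still
    waiting for its reply, which mimics two tasks sharing one handle.
    """

    def __init__(self):
        self._busy = False

    def __call__(self, query):
        if self._busy:
            raise RuntimeError("concurrent use of one connection")
        self._busy = True
        try:
            time.sleep(0.01)  # pretend to wait for the reply
            return "result of " + query
        finally:
            self._busy = False


class LockedConnection:
    """Serialise calls so only one thread sends and reads at a time."""

    def __init__(self, conn):
        self._conn = conn
        self._lock = threading.Lock()

    def __call__(self, query):
        with self._lock:
            return self._conn(query)


def run_all(conn, queries):
    """Call conn from one worker thread per query; results keep input order."""
    with ThreadPoolExecutor(max_workers=len(queries)) as pool:
        return list(pool.map(conn, queries))


safe = LockedConnection(FakeConnection())
results = run_all(safe, ["a", "b", "c", "d"])
```

Calling run_all on the bare FakeConnection instead would be expected to trip the RuntimeError, since nothing stops the threads from overlapping.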
kdb+ processes incoming queries sequentially. This means that even though the log messages make it look as if the tasks run in parallel, once the queries arrive at kdb+ they are always processed one after the other. For this reason, switching to @flow will not be slower and will give safe, consistent results.
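If separate tasks are still wanted, one way to keep them from sharing a handle is to give each call its own short-lived connection. The sketch below assumes that opening a connection per call is acceptable for this workload. run_query and StubConnection are hypothetical names for illustration; with PyKX the connect argument would be something like a function returning kx.SyncQConnection(...) with the same host, port and credentials as in the question.

```python
from typing import Any, Callable


def run_query(connect: Callable[[], Any], query: str) -> Any:
    """Open a dedicated connection, run one query on it, and always close it."""
    conn = connect()
    try:
        return conn(query)
    finally:
        conn.close()


class StubConnection:
    """Hypothetical stand-in so the sketch runs without a kdb+ process."""

    def __init__(self, log):
        self._log = log
        self._log.append("open")

    def __call__(self, query):
        self._log.append("query " + query)
        return len(query)  # placeholder result, not a real kdb+ reply

    def close(self):
        self._log.append("close")


log = []
result = run_query(lambda: StubConnection(log), "1+1")
```

Because every call opens and closes its own handle, no two tasks ever read from the same socket, at the cost of one connection setup per query.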
2023.05.31 02:13 AM
Hi,
Are you using @task or @flow?
If you are using @task, does switching to @flow (i.e. subflows) stop the issue happening?
What do pykx.__version__ and pykx.licensed return?
How is conn created?
2023.05.31 02:18 PM
Hi there,
1. Yes we're using Prefect to orchestrate PyKX
2. We're using a @task to query using PyKX. The connection is created outside of the flow and task (code snippet attached below).
3. Switching to flows does work, but we want some functions to execute in parallel. When we switched to all flows/subflows, everything seemed to execute sequentially.
4. We tried running it in parallel outside of Prefect using from multiprocessing import Pool, and it worked fine.
5. Multiple queries
6. PyKX version 1.5.4
7. Unlicensed mode
8. Using SyncQConnection and IPC to grab handle into port
port = masterConn('getProcessClient[`prefect_pykx;`pykx_test]')
conn = kx.SyncQConnection( host='', port=port.py(), username='cleung', password=token)
Code Snippet:
import pykx as kx
import subprocess
from prefect import flow, task, get_run_logger

masterConn = kx.SyncQConnection(host='v-kdbr-01', port=5000, username='cleung', password=token, timeout=3.0)
user = masterConn('.z.u')
print('User = ' + user.py())
port = masterConn('getProcessClient[`prefect_pykx;`pykx_test]')
print('Port = ' + str(port.py()))
masterConn.close()

conn = kx.SyncQConnection(host='v-kdbr-01', port=port.py(), username='cleung', password=token)
user = conn('.z.u')
print('User = ' + user.py())

@task
def saveRecRevAI260():
    logger = get_run_logger()
    logger.info("Executing saveRecRevAI260")
    conn('.imq.model.saveRecRevAI260[.dt.shiftstartdr[max dailyDates;neg 1];inLogFile;`reportDate]')
    logger.info("Finished saveRecRevAI260")
    return 1

@task
def saveEstRevAI():
    logger = get_run_logger()
    logger.info("Executing saveEstRevAI")
    conn('.imq.model.saveEstRevAI[.dt.shiftstartdr[max dailyDates;neg 1];();inLogFile]')
    logger.info("Finished saveEstRevAI")
    return 1

@task
def saveRetRawNaive():
    logger = get_run_logger()
    logger.info("Executing saveRetRawNaive")
    conn('.imq.model.saveRetRawNaive[dailyDates;`]')
    logger.info("Finished saveRetRawNaive")
    return 1

@task
def saveEstRevStdDev5Yr():
    logger = get_run_logger()
    logger.info("Executing saveEstRevStdDev5Yr")
    conn(".imq.model.saveEstRevStdDev5Yr[dailyDates;`reportDate;inLogFile]")
    logger.info("Finished saveEstRevStdDev5Yr")
    return 1

@task
def saveSRinteractionsEarnItems():
    logger = get_run_logger()
    logger.info("Executing saveSRinteractionsEarnItems")
    conn(".imq.model.saveSRInteractionsEarnItems[dailyDates;inLogFile]")
    logger.info("Finished saveSRinteractionsEarnItems")

@flow()
def ibes_pykx():
    logger = get_run_logger()
    logger.info("Initializing initial arguments")
    dailyDates = conn('dailyDates: .dt.drb[.dt.shiftdateb[exec max date from QModelD;-3]; exec max date from QModelD];')
    logFile = conn('inLogFile:`')
    saveRecRevAI260_result = saveRecRevAI260.submit()
    saveRetRawNaive_result = saveRetRawNaive.submit()
    saveEstRevAI_result = saveEstRevAI(wait_for=[saveRecRevAI260_result])
    saveSRinteractionsEarnItems_result = saveSRinteractionsEarnItems(wait_for=[saveRetRawNaive_result])
    # saveEstRevStdDev5Yr_result = saveEstRevStdDev5Yr(wait_for=[saveEstRevAI_result])

if __name__ == "__main__":
    ibes_pykx()
Error Message:
14:15:15.041 | DEBUG | Task run 'saveRecRevAI260-0' - Beginning execution...
14:15:15.046 | INFO | Task run 'saveRecRevAI260-0' - Executing saveRecRevAI260
14:15:15.108 | DEBUG | Task run 'saveRetRawNaive-0' - Beginning execution...
14:15:15.112 | INFO | Task run 'saveRetRawNaive-0' - Executing saveRetRawNaive
14:15:16.486 | DEBUG | prefect.client - Connecting to API at http://v-tm-qeq-05:4200/api/
14:16:31.335 | ERROR | Task run 'saveRecRevAI260-0' - Encountered exception during execution:
Traceback (most recent call last):
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py", line 1533, in orchestrate_task_run
result = await run_sync(task.fn, *args, **kwargs)
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
result = context.run(func, *args)
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "//v-tm-qeq-05/PrefectScripts/cleung/Script/ibes_pykx.py", line 24, in saveRecRevAI260
conn('.imq.model.saveRecRevAI260[.dt.shiftstartdr[max dailyDates;neg 1];inLogFile;`reportDate]')
File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 806, in __call__
return self._call(query, *args, wait=wait)
File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 816, in _call
return self._recv(locked=True)
File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 511, in _recv
res = callback(key.fileobj)
File "\\v-tm-qeq-05\Prefect\lib\site-packages\pykx\ipc.py", line 535, in _recv_socket
size = chunks[4]
IndexError: list index out of range
14:16:31.558 | ERROR | Task run 'saveRecRevAI260-0' - Finished in state Failed('Task run encountered an exception: IndexError: list index out of range\n')
14:16:31.622 | ERROR | Flow run 'chocolate-marmot' - Encountered exception during execution:
Traceback (most recent call last):
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py", line 665, in orchestrate_flow_run
result = await run_sync(flow_call)
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 156, in run_sync_in_interruptible_worker_thread
tg.start_soon(
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 867, in run
result = context.run(func, *args)
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 135, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File "//v-tm-qeq-05/PrefectScripts/cleung/Script/ibes_pykx.py", line 69, in ibes_pykx
saveEstRevAI_result = saveEstRevAI(wait_for=[saveRecRevAI260_result])
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\tasks.py", line 469, in __call__
return enter_task_run_engine(
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py", line 965, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\utilities\asyncutils.py", line 177, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "\\v-tm-qeq-05\Prefect\lib\site-packages\anyio\_backends\_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "\\v-tm-qeq-05\Prefect\lib\concurrent\futures\_base.py", line 444, in result
return self.__get_result()
File "\\v-tm-qeq-05\Prefect\lib\concurrent\futures\_base.py", line 389, in __get_result
raise self._exception
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\engine.py", line 1114, in get_task_call_return_value
return await future._result()
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "C:\Users\cleung\AppData\Roaming\Python\Python38\site-packages\prefect\states.py", line 103, in _get_state_result
raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
Thanks a lot for your help by the way. Greatly appreciate it!!