Using Pipes and Sockets for asynchronous local AI Chats
Recently I was experimenting with writing a CLI from scratch. There’s a variety of new AI tools out there, but I was on the lookout for a plugin for Vim that would allow me to use a ChatGPT-like workflow without the constant alt-tabbing between windows and relentless copy-pasting.
The best one I found was vim-ai by madox2, and I highly recommend checking it out. It seems to be the closest candidate for what I was looking for, but its chat functionality is synchronous. Every query paused all actions in Vim, meaning I couldn’t keep exploring until after the answer had fully generated. I explored the codebase, but it looked like it was not a simple feature to add, so I decided to explore this problem from a fresh canvas.
I was hesitant to use vimscript straight away (though I eventually built a solution with this). I initially took to Python to see if I could make a Vim-independent CLI solution so that:
- Chats would be launched through :terminal buffers (for easy OOTB asynchronous execution)
- Answers could be piped directly to the clipboard or a Vim register
- Chat buffer would automatically close when the user quit the chat CLI
This would give me the asynchronous implementation straight away. I used ncurses to generate a simple CLI and learned about sockets and pipes to allow this CLI to exchange data with other processes. Essentially this CLI would behave a bit like less and watch combined, with some data processing thrown in.
Using Ollama for a local API, I could generate responses using
curl -s http://localhost:11434/api/chat --no-buffer -d '{
  "model": "phi4:latest",
  "messages": [
    {
      "role": "user",
      "content": "hello"
    }
  ]
}'
which gives me a streaming response like
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.51450141Z","message":{"role":"assistant","content":"Hello"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.60217441Z","message":{"role":"assistant","content":"!"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.676216968Z","message":{"role":"assistant","content":" How"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.749957446Z","message":{"role":"assistant","content":" can"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.823717019Z","message":{"role":"assistant","content":" I"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.897391602Z","message":{"role":"assistant","content":" assist"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:47.971128894Z","message":{"role":"assistant","content":" you"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:48.045499791Z","message":{"role":"assistant","content":" today"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:48.11924118Z","message":{"role":"assistant","content":"?"},"done":false}
{"model":"phi4:latest","created_at":"2025-02-15T04:15:49.391045405Z","message":{"role":"assistant","content":""},"done_reason":"stop","done":true,"total_duration":21531784605,"load_duration":19128188606,"prompt_eval_count":11,"prompt_eval_duration":522000000,"eval_count":26,"eval_duration":1877000000}
So this CLI would have to listen for these incoming JSON objects, parse the content out of them and display it on the screen.
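The parsing step can be sketched on its own, independent of sockets and curses. This is only a minimal illustration, assuming each line of the stream is one complete JSON object as in the output above; `extract_content` is a hypothetical helper name and not part of the final CLI.

```python
import json


def extract_content(lines):
    """Join the message content from an iterable of newline-delimited JSON strings.

    Hypothetical helper for illustration; stops once an object reports "done": true.
    """
    parts = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # Skip blank lines between objects
        try:
            obj = json.loads(line)
        except json.JSONDecodeError:
            continue  # Ignore anything that is not valid JSON
        parts.append(obj.get("message", {}).get("content", ""))
        if obj.get("done", False):
            break
    return "".join(parts)


# Shortened versions of the streamed objects shown above
sample = [
    '{"message":{"role":"assistant","content":"Hello"},"done":false}',
    '{"message":{"role":"assistant","content":"!"},"done":false}',
    '{"message":{"role":"assistant","content":""},"done":true}',
]
print(extract_content(sample))
```

The nested `get` calls mean an object without a `message` key contributes an empty string instead of raising an error.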
So I started with defining a single class for this CLI, which has a socket for receiving input:
import time
import os
import curses
import json
import threading
import socket
import textwrap

SOCKET_PATH = "/tmp/pipe_viewer_socket"


class Viewer:
    def __init__(self, stdscr):
        self.stdscr = stdscr
        self.running = True
        ...

    def read_socket(self):
        """Continuously accept connections and read from the Unix domain socket."""
        try:
            while self.running:
                ...
        except KeyboardInterrupt:
            self.running = False

    def run(self):
        """Main curses loop to update the display."""
        ...


if __name__ == "__main__":
    try:
        # curses.wrapper calls Viewer(stdscr) and restores the terminal on exit
        curses.wrapper(Viewer)
    except KeyboardInterrupt:
        pass
In our __init__ method, we’ll declare the state we need to keep track of and create the socket:
SOCKET_PATH = "/tmp/pipe_viewer_socket"


class Viewer:
    def __init__(self, stdscr):
        self.stdscr = stdscr
        self.running = True
        self.buffer = [""]
        self.request_in_progress = False  # True while a response is streaming in
        self.scroll_offset = 0  # Lines scrolled up from the bottom of the buffer
        self.lock = threading.Lock()
        # Ensure no leftover socket file
        if os.path.exists(SOCKET_PATH):
            os.remove(SOCKET_PATH)
        # Create a Unix domain socket
        self.server_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        self.server_socket.bind(SOCKET_PATH)
        self.server_socket.listen()
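To see the socket calls above in isolation, here is a minimal sketch of one round trip over a Unix domain socket. It assumes a throwaway path in a temporary directory (not the path the CLI uses), and `serve_once` is a hypothetical name chosen for the example.

```python
import os
import socket
import tempfile
import threading

# Assumption: a throwaway socket path, so the demo never touches the CLI's socket
demo_dir = tempfile.mkdtemp()
demo_path = os.path.join(demo_dir, "demo_socket")

server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
server.bind(demo_path)  # Creates the socket file on disk
server.listen()

received = []


def serve_once():
    """Accept a single connection and collect everything the client sends."""
    conn, _ = server.accept()
    with conn:
        while True:
            data = conn.recv(1024)
            if not data:  # An empty read means the client closed the connection
                break
            received.append(data)


listener = threading.Thread(target=serve_once)
listener.start()

# Client side: connect to the same path and write some bytes
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
    client.connect(demo_path)
    client.sendall(b"hello from the client\n")

listener.join()
server.close()
os.remove(demo_path)  # Closing the socket does not delete the file
os.rmdir(demo_dir)
print(b"".join(received).decode())
```

The explicit `os.remove` is the reason the constructor above deletes a leftover socket file before binding: `bind` fails if the path already exists.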
Next we’ll define the loop that listens on the socket:
    def read_socket(self):
        """Continuously accept connections and read from the Unix domain socket."""
        try:
            while self.running:  # Accept a new connection once the previous one closes
                conn, _ = self.server_socket.accept()
                pending = b""  # Bytes received that do not yet form a complete line
                with conn:
                    while self.running:  # Read up to 1024 bytes at a time
                        data = conn.recv(1024)
                        if not data:
                            break
                        pending += data
                        # A chunk can hold part of a JSON object or several of them,
                        # so only parse complete newline-terminated lines
                        *lines, pending = pending.split(b"\n")
                        for line in lines:
                            try:
                                message = json.loads(line.decode(), strict=False)
                            except (UnicodeDecodeError, json.JSONDecodeError):
                                continue
                            content = message.get("message", {}).get("content", "")
                            done = message.get("done", False)
                            with self.lock:  # Guard state shared with the render loop
                                if not self.request_in_progress:
                                    self.request_in_progress = True
                                self.buffer[-1] += content
                                if done:
                                    self.request_in_progress = False
                                    self.buffer[-1] += "\n\n"
                                    self.buffer.append("")
        except KeyboardInterrupt:
            self.running = False
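One detail worth isolating: a stream socket has no message boundaries, so a single recv call can return half a JSON object or several objects at once. Below is a minimal sketch of reassembling complete lines before parsing, assuming newline-delimited JSON as in the Ollama output; `split_complete_lines` is a hypothetical helper name.

```python
import json


def split_complete_lines(pending, chunk):
    """Append chunk to pending and return (complete_lines, new_pending).

    Hypothetical helper: both arguments are bytes and lines are newline-separated.
    Whatever follows the last newline is returned as new_pending for the next call.
    """
    *lines, rest = (pending + chunk).split(b"\n")
    return lines, rest


# Two chunks that split one JSON object in the middle
chunks = [b'{"done": fal', b'se}\n{"done": true}\n']
pending = b""
objects = []
for chunk in chunks:
    lines, pending = split_complete_lines(pending, chunk)
    objects.extend(json.loads(line) for line in lines if line)
print(objects)
```

Parsing each raw chunk directly would have failed on the first chunk and on the second, since neither is a single valid JSON document by itself.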
We can render this data on the screen using a run method:
    def run(self):
        """Main curses loop to update the display."""
        curses.curs_set(0)  # Hide cursor
        self.stdscr.timeout(100)  # getch() waits at most 100ms, so we redraw about 10 times a second
        while self.running:
            self.stdscr.clear()
            h, w = self.stdscr.getmaxyx()
            with self.lock:
                text = "\n".join(self.buffer)
            # Wrap each line separately so the newlines in the buffer are preserved
            wrapped_buffer = []
            for line in text.split("\n"):
                wrapped_buffer.extend(textwrap.wrap(line, width=max(1, w - 1)) or [""])
            total_lines = len(wrapped_buffer)
            start = max(0, total_lines - h - self.scroll_offset)
            visible_lines = wrapped_buffer[start : start + h]
            for i, line in enumerate(visible_lines):
                self.stdscr.addstr(i, 0, line)
            self.stdscr.refresh()
            # Assumed quit key: stop the loop when the user presses q
            if self.stdscr.getch() == ord("q"):
                self.running = False
        self.server_socket.close()
        os.remove(SOCKET_PATH)
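The wrapping and windowing arithmetic is easier to check outside curses. Here is a minimal sketch, assuming the offset counts lines scrolled up from the bottom; `visible_window` is a hypothetical helper name.

```python
import textwrap


def visible_window(buffer, width, height, scroll_offset=0):
    """Wrap buffer entries to width and return the lines that fit on screen.

    Hypothetical helper: scroll_offset counts lines scrolled up from the bottom.
    """
    wrapped = []
    for line in "\n".join(buffer).split("\n"):
        # textwrap.wrap returns [] for an empty string, so keep blank lines explicitly
        wrapped.extend(textwrap.wrap(line, width=width) or [""])
    start = max(0, len(wrapped) - height - scroll_offset)
    return wrapped[start : start + height]


demo = ["one two three four five six", "seven eight"]
print(visible_window(demo, width=10, height=3))
print(visible_window(demo, width=10, height=3, scroll_offset=1))
```

With an offset of 0 the window follows the tail of the buffer, which is what keeps a streaming answer in view as it grows.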
Then we can launch the read_socket method on a separate thread in __init__, and start running the application by calling the run method:
    def __init__(self, stdscr):
        ...
        # Start the input listener in a separate thread
        self.listener_thread = threading.Thread(target=self.read_socket, daemon=True)
        self.listener_thread.start()
        self.run()
Launching this script with python now gives us a rendered buffer that we can pipe curl output from Ollama into for display.
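One possible way to get piped data into the socket is a small forwarding script that copies standard input to it. This is a sketch under assumptions, not necessarily the approach used here: `forward` is a hypothetical function, and the path must match whatever the viewer binds to.

```python
import socket
import sys

SOCKET_PATH = "/tmp/pipe_viewer_socket"  # Assumption: must match the viewer's socket path


def forward(stream, path=SOCKET_PATH):
    """Copy a binary stream to the Unix domain socket at path and return the byte count."""
    sent = 0
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
        client.connect(path)
        # read1 returns as soon as some bytes are available, which keeps streaming responsive
        for chunk in iter(lambda: stream.read1(4096), b""):
            client.sendall(chunk)
            sent += len(chunk)
    return sent


if __name__ == "__main__":
    try:
        forward(sys.stdin.buffer)
    except (FileNotFoundError, ConnectionRefusedError):
        # No socket file, or nothing listening on it
        print("viewer is not running", file=sys.stderr)
```

Saved under a hypothetical name such as send_to_viewer.py, it would sit at the end of the pipeline: the curl command from earlier, followed by `| python send_to_viewer.py`.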
The next step is to manage copying and pasting; I’ll write about how I solved that in a future post. For now, I hope this article on listening to sockets was informative!