# /// script
# requires-python = ">=3.10,<3.14"
# dependencies = [
# "pipecat-ai[aic,local,webrtc]",
# "loguru",
# "llvmlite",
# "pyaudio",
# "fastapi",
# "uvicorn",
# "dotenv",
# "pipecat-ai-small-webrtc-prebuilt",
# ]
# ///
import os
from loguru import logger
from pipecat.audio.filters.aic_filter import AICFilter
from pipecat.frames.frames import Frame, InputAudioRawFrame, OutputAudioRawFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.runner.types import RunnerArguments
from pipecat.runner.utils import create_transport
from pipecat.transports.base_transport import BaseTransport, TransportParams
# Loopback Processor
class AudioFrameConverter(FrameProcessor):
    """Re-wrap incoming audio frames as outgoing audio frames.

    Each ``InputAudioRawFrame`` is replaced by an ``OutputAudioRawFrame``
    built from the same ``audio``, ``sample_rate`` and ``num_channels``
    values. Every other frame is forwarded unchanged.
    """

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        """Forward ``frame`` in ``direction``, converting input audio first.

        Args:
            frame: The frame handed to this processor.
            direction: The direction the frame is travelling; the (possibly
                converted) frame is pushed onward in the same direction.
        """
        await super().process_frame(frame, direction)

        # Default to passing the frame through as-is; only input audio is
        # swapped for an output-audio equivalent.
        outgoing = frame
        if isinstance(frame, InputAudioRawFrame):
            outgoing = OutputAudioRawFrame(
                audio=frame.audio,
                sample_rate=frame.sample_rate,
                num_channels=frame.num_channels,
            )

        await self.push_frame(outgoing, direction)
# Bot Logic
async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
    """Assemble the loopback pipeline on ``transport`` and run it.

    The pipeline is ``transport.input()`` -> ``AudioFrameConverter`` ->
    ``transport.output()``. Connection events are logged, and the pipeline
    task is cancelled when the client disconnects.

    Args:
        transport: Transport supplying the pipeline's input and output ends.
        runner_args: Runner arguments; only ``handle_sigint`` is read here.
    """
    logger.info("Bot starting: Direct Audio Loopback with AIC Filter")

    loopback = AudioFrameConverter()
    stages = [transport.input(), loopback, transport.output()]
    task = PipelineTask(Pipeline(stages), params=PipelineParams())

    @transport.event_handler("on_client_connected")
    async def on_client_connected(transport, client):
        logger.info("WebRTC Client Connected")

    @transport.event_handler("on_client_disconnected")
    async def on_client_disconnected(transport, client):
        logger.info("WebRTC Client Disconnected")
        # Cancelling the task is what stops the pipeline after a disconnect.
        await task.cancel()

    await PipelineRunner(handle_sigint=runner_args.handle_sigint).run(task)
async def bot(runner_args: RunnerArguments):
    """Create the AIC-filtered transport and start the loopback bot.

    Builds an ``AICFilter`` from the ``AIC_LICENSE_KEY`` environment variable
    (empty string when unset), registers it as the input-audio filter of the
    ``"webrtc"`` transport parameters, creates the transport and hands it to
    ``run_bot``.

    Args:
        runner_args: Runner arguments forwarded to ``create_transport`` and
            ``run_bot``.
    """
    license_key = os.getenv("AIC_LICENSE_KEY", "")
    input_filter = AICFilter(
        license_key=license_key,
        # Alternatives: "quail-l-16khz", "quail-s-8khz", etc.
        model_id="quail-vf-l-16khz",
    )

    def webrtc_params() -> TransportParams:
        # Factory form: the params object is only built when this is called.
        return TransportParams(
            audio_in_enabled=True,
            audio_out_enabled=True,
            audio_in_filter=input_filter,
        )

    transport = await create_transport(runner_args, {"webrtc": webrtc_params})
    await run_bot(transport, runner_args)
if __name__ == "__main__":
    # Imported inside the guard so the runner is only loaded when this file
    # is executed as a script.
    from pipecat.runner.run import main as run_pipecat

    run_pipecat()