Skip to content

vllm.plugins.io_processors.interface

IOProcessorInput module-attribute

IOProcessorInput = TypeVar('IOProcessorInput')

IOProcessorOutput module-attribute

IOProcessorOutput = TypeVar('IOProcessorOutput')

IOProcessor

Bases: ABC, Generic[IOProcessorInput, IOProcessorOutput]

Source code in vllm/plugins/io_processors/interface.py
class IOProcessor(ABC, Generic[IOProcessorInput, IOProcessorOutput]):
    def __init__(self, vllm_config: VllmConfig):
        super().__init__()

        self.vllm_config = vllm_config

    def parse_data(self, data: object) -> IOProcessorInput:
        if callable(parse_request := getattr(self, "parse_request", None)):
            warnings.warn(
                "`parse_request` has been renamed to `parse_data`. "
                "Please update your IO Processor Plugin to use the new name. "
                "The old name will be removed in v0.19.",
                DeprecationWarning,
                stacklevel=2,
            )

            return parse_request(data)  # type: ignore

        raise NotImplementedError

    def merge_sampling_params(
        self,
        params: SamplingParams | None = None,
    ) -> SamplingParams:
        if callable(
            validate_or_generate_params := getattr(
                self, "validate_or_generate_params", None
            )
        ):
            warnings.warn(
                "`validate_or_generate_params` has been split into "
                "`merge_sampling_params` and `merge_pooling_params`."
                "Please update your IO Processor Plugin to use the new methods. "
                "The old name will be removed in v0.19.",
                DeprecationWarning,
                stacklevel=2,
            )

            return validate_or_generate_params(params)  # type: ignore

        return params or SamplingParams()

    def merge_pooling_params(
        self,
        params: PoolingParams | None = None,
    ) -> PoolingParams:
        if callable(
            validate_or_generate_params := getattr(
                self, "validate_or_generate_params", None
            )
        ):
            warnings.warn(
                "`validate_or_generate_params` has been split into "
                "`merge_sampling_params` and `merge_pooling_params`."
                "Please update your IO Processor Plugin to use the new methods. "
                "The old name will be removed in v0.19.",
                DeprecationWarning,
                stacklevel=2,
            )

            return validate_or_generate_params(params)  # type: ignore

        return params or PoolingParams(task="plugin")

    @abstractmethod
    def pre_process(
        self,
        prompt: IOProcessorInput,
        request_id: str | None = None,
        **kwargs,
    ) -> PromptType | Sequence[PromptType]:
        raise NotImplementedError

    async def pre_process_async(
        self,
        prompt: IOProcessorInput,
        request_id: str | None = None,
        **kwargs,
    ) -> PromptType | Sequence[PromptType]:
        return self.pre_process(prompt, request_id, **kwargs)

    @abstractmethod
    def post_process(
        self,
        model_output: Sequence[PoolingRequestOutput],
        request_id: str | None = None,
        **kwargs,
    ) -> IOProcessorOutput:
        raise NotImplementedError

    async def post_process_async(
        self,
        model_output: AsyncGenerator[tuple[int, PoolingRequestOutput]],
        request_id: str | None = None,
        **kwargs,
    ) -> IOProcessorOutput:
        # We cannot guarantee outputs are returned in the same order they were
        # fed to vLLM.
        # Let's sort them by id before post_processing
        sorted_output = sorted(
            [(i, item) async for i, item in model_output], key=lambda output: output[0]
        )
        collected_output = [output[1] for output in sorted_output]
        return self.post_process(collected_output, request_id=request_id, **kwargs)

vllm_config instance-attribute

vllm_config = vllm_config

__init__

__init__(vllm_config: VllmConfig)
Source code in vllm/plugins/io_processors/interface.py
def __init__(self, vllm_config: VllmConfig):
    super().__init__()

    self.vllm_config = vllm_config

merge_pooling_params

merge_pooling_params(
    params: PoolingParams | None = None,
) -> PoolingParams
Source code in vllm/plugins/io_processors/interface.py
def merge_pooling_params(
    self,
    params: PoolingParams | None = None,
) -> PoolingParams:
    if callable(
        validate_or_generate_params := getattr(
            self, "validate_or_generate_params", None
        )
    ):
        warnings.warn(
            "`validate_or_generate_params` has been split into "
            "`merge_sampling_params` and `merge_pooling_params`."
            "Please update your IO Processor Plugin to use the new methods. "
            "The old name will be removed in v0.19.",
            DeprecationWarning,
            stacklevel=2,
        )

        return validate_or_generate_params(params)  # type: ignore

    return params or PoolingParams(task="plugin")

merge_sampling_params

merge_sampling_params(
    params: SamplingParams | None = None,
) -> SamplingParams
Source code in vllm/plugins/io_processors/interface.py
def merge_sampling_params(
    self,
    params: SamplingParams | None = None,
) -> SamplingParams:
    if callable(
        validate_or_generate_params := getattr(
            self, "validate_or_generate_params", None
        )
    ):
        warnings.warn(
            "`validate_or_generate_params` has been split into "
            "`merge_sampling_params` and `merge_pooling_params`."
            "Please update your IO Processor Plugin to use the new methods. "
            "The old name will be removed in v0.19.",
            DeprecationWarning,
            stacklevel=2,
        )

        return validate_or_generate_params(params)  # type: ignore

    return params or SamplingParams()

parse_data

parse_data(data: object) -> IOProcessorInput
Source code in vllm/plugins/io_processors/interface.py
def parse_data(self, data: object) -> IOProcessorInput:
    if callable(parse_request := getattr(self, "parse_request", None)):
        warnings.warn(
            "`parse_request` has been renamed to `parse_data`. "
            "Please update your IO Processor Plugin to use the new name. "
            "The old name will be removed in v0.19.",
            DeprecationWarning,
            stacklevel=2,
        )

        return parse_request(data)  # type: ignore

    raise NotImplementedError

post_process abstractmethod

post_process(
    model_output: Sequence[PoolingRequestOutput],
    request_id: str | None = None,
    **kwargs,
) -> IOProcessorOutput
Source code in vllm/plugins/io_processors/interface.py
@abstractmethod
def post_process(
    self,
    model_output: Sequence[PoolingRequestOutput],
    request_id: str | None = None,
    **kwargs,
) -> IOProcessorOutput:
    raise NotImplementedError

post_process_async async

post_process_async(
    model_output: AsyncGenerator[
        tuple[int, PoolingRequestOutput]
    ],
    request_id: str | None = None,
    **kwargs,
) -> IOProcessorOutput
Source code in vllm/plugins/io_processors/interface.py
async def post_process_async(
    self,
    model_output: AsyncGenerator[tuple[int, PoolingRequestOutput]],
    request_id: str | None = None,
    **kwargs,
) -> IOProcessorOutput:
    # We cannot guarantee outputs are returned in the same order they were
    # fed to vLLM.
    # Let's sort them by id before post_processing
    sorted_output = sorted(
        [(i, item) async for i, item in model_output], key=lambda output: output[0]
    )
    collected_output = [output[1] for output in sorted_output]
    return self.post_process(collected_output, request_id=request_id, **kwargs)

pre_process abstractmethod

pre_process(
    prompt: IOProcessorInput,
    request_id: str | None = None,
    **kwargs,
) -> PromptType | Sequence[PromptType]
Source code in vllm/plugins/io_processors/interface.py
@abstractmethod
def pre_process(
    self,
    prompt: IOProcessorInput,
    request_id: str | None = None,
    **kwargs,
) -> PromptType | Sequence[PromptType]:
    raise NotImplementedError

pre_process_async async

pre_process_async(
    prompt: IOProcessorInput,
    request_id: str | None = None,
    **kwargs,
) -> PromptType | Sequence[PromptType]
Source code in vllm/plugins/io_processors/interface.py
async def pre_process_async(
    self,
    prompt: IOProcessorInput,
    request_id: str | None = None,
    **kwargs,
) -> PromptType | Sequence[PromptType]:
    return self.pre_process(prompt, request_id, **kwargs)