乐于分享
好东西不私藏

TensorRT-LLM 0.5.0 源码之八

TensorRT-LLM 0.5.0 源码之八

activation.py

class Mish(Module):
    """Mish activation: x * tanh(softplus(x))."""

    def forward(self, input):
        # beta/threshold match the PyTorch softplus defaults.
        sp = softplus(input, beta=1.0, threshold=20.0)
        return input * tanh(sp)

cast.py

class Cast(Module):
    """Module wrapper around the functional cast() operation.

    Casts its input tensor to 'output_dtype', which must be one of the
    supported dtype names.
    """

    def __init__(self, output_dtype: str = 'float32') -> None:
        super().__init__()
        supported = ('float32', 'float16', 'bfloat16', 'bool', 'int32',
                     'int8')
        assert output_dtype in supported, TypeError(
            "%s is not supported" % output_dtype)
        self.output_dtype = output_dtype

    def forward(self, x):
        return cast(x, self.output_dtype)
def cast(input: Tensor, dtype: Union[str, trt.DataType]):
    '''
    Add a cast operation.

    For an INT8 input, the dynamic range of the input is set to [-127, 127]
    so that it can be dequantized automatically. Symmetrically, when casting
    into INT8, the dynamic range of the output is set to [-127, 127] for
    automatic quantization.

    Parameters:
        input : Tensor
            The tensor to cast.

        dtype : str or trt.DataType
            The target data type. A string must be one of the valid names;
            see _str_to_trt_dtype_dict in _utils.py for the supported names.

    Returns:
        The tensor produced by the inserted cast layer.
    '''

    # Resolve the target type to a trt.DataType.
    if isinstance(dtype, trt.DataType):
        cvt_dtype = dtype
    elif isinstance(dtype, str):
        cvt_dtype = str_dtype_to_trt(dtype)
    else:
        raise TypeError("%s is not supported" % type(dtype))

    layer = default_trtnet().add_cast(input.trt_tensor, cvt_dtype)
    # Strongly-typed networks forbid explicit per-layer output types.
    if not default_net().strongly_typed:
        layer.set_output_type(0, cvt_dtype)
    output = _create_tensor(layer.get_output(0), layer)

    # INT8 on either side needs an explicit dynamic range.
    if input.dtype == str_dtype_to_trt('int8'):
        layer.get_input(0).set_dynamic_range(-127, 127)
    if cvt_dtype == str_dtype_to_trt('int8'):
        layer.get_output(0).set_dynamic_range(-127, 127)

    return output

pooling.py

class AvgPool2d(Module):
    """2D average pooling module wrapping the functional avg_pool2d().

    Parameters mirror torch.nn.AvgPool2d; they are stored on the instance
    and forwarded to avg_pool2d() at call time.
    """

    def __init__(self,
                 kernel_size: Tuple[int],
                 stride: Optional[Tuple[int]] = None,
                 padding: Optional[Tuple[int]] = (0, 0),
                 ceil_mode: bool = False,
                 count_include_pad: bool = True) -> None:
        super().__init__()
        # Bug fix: the attribute was misspelled 'kernel_szie'.
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.ceil_mode = ceil_mode
        self.count_include_pad = count_include_pad

    def forward(self, input):
        return avg_pool2d(input, self.kernel_size, self.stride, self.padding,
                          self.ceil_mode, self.count_include_pad)



def avg_pool2d(input: Tensor,
               kernel_size: Tuple[int],
               stride: Optional[Tuple[int]] = None,
               padding: Optional[Tuple[int]] = (0, 0),
               ceil_mode: bool = False,
               count_include_pad: bool = True) -> Tensor:
    '''
    Add a 2D average-pooling operation on a tensor.

    The input must have a static shape. A 3D input [B, C, T] is temporarily
    expanded to 4D ([1, B, C, T]) before pooling and collapsed back to 3D
    afterwards.

    NOTE(review): 'padding', 'ceil_mode' and 'count_include_pad' are accepted
    but never forwarded to the TensorRT layer — only 'kernel_size' and
    'stride' take effect. Confirm whether that is intentional.

    Parameters:
        input : Tensor
            The tensor to pool. [B, C, H, W], or [B, C, T] treated as
            [B, H, W].

        kernel_size : Tuple[int]
            The size of the pooling window.

        stride : Optional[Tuple[int]]
            The stride of the window; defaults to 'kernel_size' when None.

    Returns:
        The tensor produced by the pooling layer.
    '''

    assert not input.is_dynamic() # [B, C, H, W] or [B, C, T] = [B, H, W]
    ndim = input.ndim()
    if ndim == 3:
        # Prepend a singleton batch dimension so pooling sees a 4D tensor.
        input = expand_dims(input, 0) # [1, B, C, T]

    layer = default_trtnet().add_pooling(input.trt_tensor,
                                         trt.PoolingType.AVERAGE, kernel_size)
    # Default stride equals the window size (non-overlapping pooling).
    if stride is None:
        layer.stride = kernel_size
    else:
        layer.stride = stride

    output = _create_tensor(layer.get_output(0), layer)

    if ndim == 3:
        # Drop the singleton dimension that was added above.
        return output.view(
            concat([output.size(1),
                    output.size(2),
                    output.size(3)])) # [B, C, T] = [B, H, W]

    return output


def expand_dims(input: Tensor, dim: Union[int, Sequence[int]]) -> Tensor:
    '''
    Add an operation that inserts singleton dimensions into a tensor.

    A tensorrt.IShuffleLayer is added to the network. For an input of rank N
    and M requested positions, the output has rank N+M, with a size-1
    dimension at every position listed in 'dim' and the original sizes
    everywhere else.

    For example, for a tensor of shape [3, 4, 1, 5],

        expand_dims(input, [0, 2])

    produces a tensor of shape [1, 3, 1, 4, 1, 5].

    Parameters:
        input : Tensor
            The tensor to expand.

        dim : Union[int, Sequence[int]]
            Output positions at which singleton dimensions are inserted.

    Returns:
        The tensor produced by the shuffle layer.
    '''

    dims = (dim, ) if isinstance(dim, int) else dim

    out_ndim = input.ndim() + len(dims)
    input_shape = shape(input)

    # Walk the output positions: emit 1 at requested spots, otherwise copy
    # the next size from the input shape.
    out_shapes = []
    src = 0
    for pos in range(out_ndim):
        if pos in dims:
            out_shapes.append(1)
        else:
            out_shapes.append(gather(input_shape, 0, src))
            src += 1

    return view(input, concat(out_shapes))
def gather(input: Tensor, dim: int, indices: Union[Tensor, int]) -> Tensor:
    '''
    Add an operation to gather elements from a tensor.

    That function implements the GatherElements operator from the ONNX
    specification as described in

        https://github.com/onnx/onnx/blob/main/docs/Operators.md#GatherElements

    The input and indices arguments must have the same rank >= 1. The operation
    will produce a tensor with the same shape as the indices tensor. The axis
    is the dimension to gather on.

    As shown in the ONNX description, for a 3D tensor, the output is:

        out[i][j][k] = input[indices[i][j][k]][j][k] if axis = 0,
        out[i][j][k] = input[i][indices[i][j][k]][k] if axis = 1,
        out[i][j][k] = input[i][j][indices[i][j][k]] if axis = 2.

    For example,

        gather([[4, 2], [5, 3]], 0, [[1, 0], [0, 1]])

    will produce [[5, 2], [4, 3]].

        gather([[1, 2, 3], [4, 5, 6]], 1, [[1], [0]])

    will produce [[2], [4]]. See the ONNX documentation for more examples.

    That operation maps to the TensorRT IGatherLayer.

    Parameters:
        input : Tensor
            The input tensor to gather elements from.

        dim : int
            The dimension to gather on.

        indices : Union[Tensor, int]
            The positions in the 'dim' dimension to gather from.

    Returns:
        The tensor containing the gathered elements. It has the same shape as
        the indices tensor.
    '''

    # A scalar index is promoted to a rank-1 constant tensor.
    if isinstance(indices, int):
        indices = constant(int32_array([indices]))

    # The input and indices tensors must have the same rank.
    assert input.rank() == indices.rank()

    layer = default_trtnet().add_gather_v2(input.trt_tensor,
                                           indices.trt_tensor,
                                           mode=trt.GatherMode.ELEMENT)

    # Normalize a negative axis to its positive equivalent.
    if dim < 0:
        dim = input.ndim() + dim
    layer.axis = dim
    return _create_tensor(layer.get_output(0), layer)
def view(input: Tensor,
         shape: Union[Tensor, Sequence[int]],
         zero_is_placeholder: bool = True) -> Tensor:
    '''
    Add an operation to create a view of a tensor.

    That operation adds a tensorrt.IShuffleLayer to the network. If the 'shape'
    parameter is a Tensor, that view is dynamic. Otherwise, it is a static
    view.

    Note that TensorRT limits the number of inferred dimensions to 1. It means
    that the shape sequence or tensor cannot contain more than one -1. This
    function enforces that constraint and will assert if it is not respected.

    Parameters:
        input : Tensor
            The input tensor to transpose.

        shape : Union[Tensor, Sequence[int]]
            The shape of the new tensor.

        zero_is_placeholder : bool
            When that parameter is True, the 0s in 'shape' are replaced by the
            sizes of the corresponding dimensions from the 'input'. Otherwise,
            the dimensions corresponding to 0s are shrinked.

    Returns:
        The tensor produced by the view/shuffle layer.
    '''

    # TensorRT demands that at most one dimension is permitted to be specified as -1
    def assert_no_more_than_one_inferred_dim(dims):
        # Idiom fix: parameter was named 'list', shadowing the builtin.
        inferred_dim_list = [i for i in dims if i == -1]
        assert len(inferred_dim_list) <= 1

    layer = default_trtnet().add_shuffle(input.trt_tensor)
    layer.zero_is_placeholder = zero_is_placeholder
    if isinstance(shape, Tensor):
        # Dynamic view: the target shape is itself a tensor.
        assert_no_more_than_one_inferred_dim(shape.shape)
        layer.set_input(1, shape.trt_tensor)
    elif isinstance(shape, (list, tuple)):
        # Static view: the target shape is known at build time.
        assert_no_more_than_one_inferred_dim(shape)
        layer.reshape_dims = tuple(shape)
    else:
        raise TypeError("%s is not supported" % type(shape))
    return _create_tensor(layer.get_output(0), layer)
def concat(inputs: Sequence[Union[Tensor, int]], dim: int = 0) -> Tensor:
    '''
    Add an operation to concatenate tensors along one dimension.

    All inputs must share the same shape except along 'dim'; the output size
    along 'dim' is the sum of the input sizes, and every other dimension
    matches the inputs. Plain Python ints in 'inputs' are converted to
    rank-1 int32 constant tensors, and rank-0 tensors are promoted to
    rank 1 so they can be concatenated.

    For example, given two 2D tensors [[0, 1], [2, 3]] and [[4, 5], [6, 7]]
    (both of shape [2, 2]), concatenating on dim 0 yields a [4, 2] tensor
    and on dim 1 a [2, 4] tensor.

    Parameters:
        inputs : Sequence[Union[Tensor, int]]
            The tensors (or ints, turned into constants) to concatenate.

        dim : int
            The dimension along which to concatenate.

    Returns:
        A tensor that contains the concatenation of the tensors.
    '''

    tensors = []
    for item in inputs:
        if isinstance(item, int):
            # Ints become rank-1 int32 constants.
            tensors.append(constant(int32_array([item])))
        elif item.rank() == 0:
            # Scalars are promoted to rank 1 for concatenation.
            tensors.append(item.view([1]))
        else:
            tensors.append(item)

    layer = default_trtnet().add_concatenation(
        [t.trt_tensor for t in tensors])
    layer.axis = dim
    return _create_tensor(layer.get_output(0), layer)

normalization.py

class LayerNorm(Module):
    """Layer normalization over the trailing 'normalized_shape' dimensions.

    With 'elementwise_affine' enabled, learnable per-element scale ('weight',
    i.e. gamma) and offset ('bias', i.e. beta) parameters of shape
    'normalized_shape' are applied after normalization.
    """

    def __init__(self,
                 normalized_shape,
                 eps=1e-05,
                 elementwise_affine=True,
                 dtype=None):
        super().__init__()
        shape = ((normalized_shape, )
                 if isinstance(normalized_shape, int) else normalized_shape)
        self.normalized_shape = tuple(shape)
        self.elementwise_affine = elementwise_affine
        if self.elementwise_affine:
            self.weight = Parameter(shape=self.normalized_shape, dtype=dtype)
            self.bias = Parameter(shape=self.normalized_shape, dtype=dtype)
        else:
            self.register_parameter('weight', None)
            self.register_parameter('bias', None)

        self.eps = eps

    def forward(self, x):
        gamma = None if self.weight is None else self.weight.value
        beta = None if self.bias is None else self.bias.value
        return layer_norm(x, self.normalized_shape, gamma, beta, self.eps)

def layer_norm(input: Tensor,
               normalized_shape: Union[int, Tuple[int]],
               weight: Optional[Tensor] = None,
               bias: Optional[Tensor] = None,
               eps: float = 1e-05,
               use_diff_of_squares: bool = True) -> Tensor:
    '''
    Add a layer-norm operation on a tensor.

    That operation applies the layer-normalization to its input tensor. In its
    simplest form, for large language models, the 'normalized_shape' should be
    set to the hidden dimension of the activation tensor. Otherwise, it is the
    shape of the normalized fraction of the tensor (starting from the
    right-most dimension).

    The 'weight' tensor corresponds to 'gamma' in the layer-norm formula and
    'bias' is 'beta'. The 'eps' value is added to the variance before computing
    the squared-root.

    This implementation (when using the plugin) supports an additional flag to
    enable/disable the use of a difference of squares ('Var = Mean(X^2) -
    Mean(X)^2').

    Parameters:
        input : Tensor
            The tensor to normalize.

        normalized_shape : Union[int, Tuple[int]]
            The shape of the sub-tensor that is normalized. Use 'hidden_dim' to
            normalize the inner-most dimension of an activation tensor in LLMs.

        weight : Optional[Tensor] = None
            The 'gamma' term in layer-norm. Its shape must be
            'normalized_shape'.

        bias : Optional[Tensor] = None
            The 'beta' term in layer-norm. Its shape must be
            'normalized_shape'.

        eps : float
            The epsilon term to be added to the variance in the squared-root.

        use_diff_of_squares : bool
            Does the plugin use the difference of squares to compute the
            variance?

    Returns:
        The output tensor of that operation.
    '''

    if not default_net().plugin_config.layernorm_plugin:
        # Native TensorRT path (no plugin): use INormalizationLayer.
        input, weight = broadcast_helper(input, weight)
        input, bias = broadcast_helper(input, bias)
        if isinstance(normalized_shape, int):  # FIXME: better way?
            axis = input.ndim() - 1
        else:
            axis = input.ndim() - len(normalized_shape)
        # Build the bitmask of axes to normalize over (the trailing ones).
        axes_mask = 0
        for i in range(axis, input.ndim()):
            axes_mask |= 1 << i
        # NOTE(review): this path dereferences weight/bias unconditionally —
        # unlike the plugin path it has no None defaults; confirm callers
        # always pass both when the plugin is disabled.
        layer = default_trtnet().add_normalization(input.trt_tensor,
                                                   weight.trt_tensor,
                                                   bias.trt_tensor, axes_mask)
        layer.epsilon = eps
        return _create_tensor(layer.get_output(0), layer)
    else:
        # Plugin path: look up the 'Layernorm' plugin and configure it
        # through a PluginFieldCollection.
        plg_creator = trt.get_plugin_registry().get_plugin_creator(
            'Layernorm', '1', TRT_LLM_PLUGIN_NAMESPACE)
        assert plg_creator is not None

        eps = trt.PluginField("eps", np.array(eps, dtype=np.float32),
                              trt.PluginFieldType.FLOAT32)
        use_diff_of_squares = trt.PluginField(
            "use_diff_of_squares",
            np.array([int(use_diff_of_squares)], dtype=np.int32),
            trt.PluginFieldType.INT32)
        p_dtype = default_net().plugin_config.layernorm_plugin
        pf_type = trt.PluginField(
            "type_id", np.array([int(str_dtype_to_trt(p_dtype))], np.int32),
            trt.PluginFieldType.INT32)
        pfc = trt.PluginFieldCollection([eps, use_diff_of_squares, pf_type])
        layernorm_plug = plg_creator.create_plugin("layernorm", pfc)

        normalized_shape = [normalized_shape] if isinstance(
            normalized_shape, int) else normalized_shape
        # Identity defaults: gamma=1, beta=0.
        if weight is None:
            weight = constant(
                np.ones(normalized_shape, dtype=str_dtype_to_np(p_dtype)))
        if bias is None:
            bias = constant(
                np.zeros(normalized_shape, dtype=str_dtype_to_np(p_dtype)))

        plug_inputs = [input.trt_tensor, weight.trt_tensor, bias.trt_tensor]
        layer = default_trtnet().add_plugin_v2(plug_inputs, layernorm_plug)
        return _create_tensor(layer.get_output(0), layer)
class RmsNorm(Module):
    """RMS normalization over the trailing 'normalized_shape' dimensions.

    With 'elementwise_affine' enabled, a learnable scale ('weight', i.e.
    gamma) of shape 'normalized_shape' is applied after normalization.
    """

    def __init__(self,
                 normalized_shape,
                 eps=1e-06,
                 elementwise_affine=True,
                 dtype=None):
        super().__init__()
        shape = ((normalized_shape, )
                 if isinstance(normalized_shape, int) else normalized_shape)
        self.normalized_shape = tuple(shape)
        self.elementwise_affine = elementwise_affine
        if self.elementwise_affine:
            self.weight = Parameter(shape=self.normalized_shape, dtype=dtype)
        else:
            self.register_parameter('weight', None)

        self.eps = eps

    def forward(self, x):
        gamma = None if self.weight is None else self.weight.value
        return rms_norm(x, self.normalized_shape, gamma, self.eps)

def rms_norm(input: Tensor,
             normalized_shape: Union[int, Tuple[int]],
             weight: Optional[Tensor] = None,
             eps: float = 1e-06) -> Tensor:
    '''
    Add a RMS norm operation on a tensor.

    That operation applies the rms-normalization to its input tensor. In its
    simplest form, for large language models, the 'normalized_shape' should be
    set to the hidden dimension of the activation tensor. Otherwise, it is the
    shape of the normalized fraction of the tensor (starting from the
    right-most dimension).

    The 'weight' tensor corresponds to 'gamma' in the rms-norm formula.
    The 'eps' value is added to the variance before computing the squared-root.

    Parameters:
        input: Tensor
            The tensor to normalize.

        normalized_shape : Union[int, Tuple[int]]
            The shape of the sub-tensor that is normalized. Use 'hidden_dim' to
            normalize the inner-most dimension of an activation tensor in LLMs.

        weight : Optional[Tensor] = None
            The 'gamma' term in layer-norm. Its shape must be
            'normalized_shape'.

        eps : float
            The epsilon term to be added to the variance in the squared-root.

    Returns:
        The output tensor of that operation.
    '''

    if not default_net().plugin_config.rmsnorm_plugin:
        normalized_shape = [normalized_shape] if isinstance(
            normalized_shape, int) else normalized_shape

        # Reduce over the trailing len(normalized_shape) dimensions.
        dim = tuple([-i - 1 for i in range(len(normalized_shape))])

        if default_net().strongly_typed:
            # Strongly-typed networks: compute the reduction explicitly in
            # fp32 and cast back to the input dtype at the end.
            input_dtype = input.dtype
            fp32_input = cast(input, "float32")
            varx = pow(fp32_input, 2.0)

            varx = varx.mean(dim, keepdim=True)
            denom = varx + eps
            denom = denom.sqrt()
            fp32_y = fp32_input / denom
            y = cast(fp32_y, input_dtype)
        else:
            # Otherwise rely on a fp32 precision scope for the reduction.
            with precision("float32"):
                varx = pow(input, 2.0)
                varx = varx.mean(dim, keepdim=True)
                denom = varx + eps
                denom = denom.sqrt()
                y = input / denom

        # No weight means identity scaling.
        if weight is not None:
            y = y * weight

        return y
    else:
        # TODO remove the plugin version if rmsnorm operation can be offloaded
        # to Myelin.
        plg_creator = trt.get_plugin_registry().get_plugin_creator(
            'Rmsnorm', '1', TRT_LLM_PLUGIN_NAMESPACE)
        assert plg_creator is not None

        eps = trt.PluginField("eps", np.array(eps, dtype=np.float32),
                              trt.PluginFieldType.FLOAT32)
        p_dtype = default_net().plugin_config.rmsnorm_plugin
        pf_type = trt.PluginField(
            "type_id", np.array([int(str_dtype_to_trt(p_dtype))], np.int32),
            trt.PluginFieldType.INT32)
        pfc = trt.PluginFieldCollection([eps, pf_type])
        rmsnorm_plug = plg_creator.create_plugin("rmsnorm", pfc)

        normalized_shape = [normalized_shape] if isinstance(
            normalized_shape, int) else normalized_shape
        if weight is None:
            # Bug fix: the identity scale for RMS norm is all-ones. The
            # previous all-zeros default would zero out the plugin's output
            # and disagree with the non-plugin path above, which leaves 'y'
            # unscaled when 'weight' is None.
            weight = constant(
                np.ones(normalized_shape, dtype=str_dtype_to_np(p_dtype)))

        plug_inputs = [input.trt_tensor, weight.trt_tensor]
        layer = default_trtnet().add_plugin_v2(plug_inputs, rmsnorm_plug)
        return _create_tensor(layer.get_output(0), layer)

class GroupNorm(Module):
    """Group normalization with optional per-channel affine parameters.

    Channels are split into 'num_groups' groups; each group is normalized
    independently. With 'affine' enabled, learnable per-channel 'weight'
    (gamma) and 'bias' (beta) parameters of shape (num_channels,) are
    applied after normalization.
    """

    def __init__(self,
                 num_groups,
                 num_channels,
                 eps=1e-05,
                 affine=True,
                 dtype=None):
        super().__init__()

        if num_channels % num_groups != 0:
            raise ValueError('num_channels must be divisible by num_groups')

        self.num_groups = num_groups
        self.num_channels = num_channels
        self.affine = affine

        if self.affine:
            param_shape = (self.num_channels, )
            self.weight = Parameter(shape=param_shape, dtype=dtype)
            self.bias = Parameter(shape=param_shape, dtype=dtype)
        else:
            self.register_parameter('weight', None)
            self.register_parameter('bias', None)

        self.eps = eps

    def forward(self, x):
        gamma = None if self.weight is None else self.weight.value
        beta = None if self.bias is None else self.bias.value
        return group_norm(x, self.num_groups, gamma, beta, self.eps)


def group_norm(input: Tensor,
               num_groups: int,
               weight: Optional[Tensor] = None,
               bias: Optional[Tensor] = None,
               eps: float = 1e-05):
    '''
    Add a group normalization operation on a tensor.

    The channel dimension (dim 1) is split into 'num_groups' groups and each
    group is normalized to zero mean and unit variance over its channels and
    all trailing dimensions. Optional per-channel 'weight' ('gamma') and
    'bias' ('beta') are applied after normalization.

    Parameters:
        input : Tensor
            The tensor to normalize, shaped [B, C, ...] (e.g. [B, C, T, D]).
            The channel dimension (dim 1) must be static.

        num_groups : int
            The number of groups; the channel count must be divisible by it.

        weight : Optional[Tensor] = None
            Per-channel scale; must be reshapeable to [C, 1, ..., 1].

        bias : Optional[Tensor] = None
            Per-channel offset; must be reshapeable to [C, 1, ..., 1].

        eps : float
            The epsilon term added to the variance before the square root.

    Returns:
        The normalized tensor, with the same shape as 'input'.
    '''

    assert not input.is_dynamic(1) # [B, C, T, D] or [B, C, ...]
    num_channels = input.size()[1]

    ndim = input.ndim()
    old_shape = shape(input)
    # Fold channels into [G, C//G] so each group can be reduced on its own.
    new_shape = concat([
        input.size(0),
        num_groups,
        num_channels // num_groups,
    ] + [input.size(i) for i in range(2, ndim)])
    x = input.view(new_shape) # [B, G, C//G, ...]

    # Reduce over everything but batch and group dimensions.
    reduce_dim = tuple(range(2, ndim + 1)) # (2, ..., ndim)
    ux = x.mean(reduce_dim, keepdim=True) # [B, G, 1, 1, 1] or [B, G, 1, ..., 1]
    numerator = x - ux
    varx = numerator * numerator
    varx = varx.mean(reduce_dim, keepdim=True)

    denom = varx + eps
    denom = denom.sqrt()
    y = numerator / denom
    y = y.view(old_shape)

    # Reshape the per-channel parameters so they broadcast over the
    # trailing (spatial) dimensions.
    new_shape = concat([num_channels] + [1 for _ in range(2, ndim)]) #[C,1,...1]
    if weight is not None:
        y = y * weight.view(new_shape)
    if bias is not None:
        y = y + bias.view(new_shape)

    return y  # [B, C, T, D] or [B, C, ...]

conv.py

class Conv2d(Module):
    """2D convolution module backed by the functional conv2d() operation.

    Holds the convolution hyper-parameters plus a learnable 'weight' of
    shape (out_channels, in_channels // groups, *kernel_size) and an
    optional per-output-channel 'bias'.
    """

    def __init__(
            self,
            in_channels: int,
            out_channels: int,
            kernel_size: Tuple[int, int],
            stride: Tuple[int, int] = (1, 1),
            padding: Tuple[int, int] = (0, 0),
            dilation: Tuple[int, int] = (1, 1),
            groups: int = 1,
            bias: bool = True,
            padding_mode: str = 'zeros',  # TODO: refine this type
            dtype=None) -> None:
        super().__init__()
        # Validate the grouping configuration up front.
        if groups <= 0:
            raise ValueError('groups must be a positive integer')
        if in_channels % groups != 0:
            raise ValueError('in_channels must be divisible by groups')
        if out_channels % groups != 0:
            raise ValueError('out_channels must be divisible by groups')

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.dilation = dilation
        self.groups = groups
        self.padding_mode = padding_mode

        self.weight = Parameter(shape=(out_channels, in_channels // groups,
                                       *kernel_size),
                                dtype=dtype)
        if bias:
            self.bias = Parameter(shape=(out_channels, ), dtype=dtype)
        else:
            self.register_parameter('bias', None)

    def forward(self, input):
        bias_value = None if self.bias is None else self.bias.value
        return conv2d(input, self.weight.value, bias_value, self.stride,
                      self.padding, self.dilation, self.groups)
class ConvTranspose2d(Module):
    # 2D transposed convolution module backed by conv_transpose2d().
    # Weight layout is (in_channels, out_channels // groups, *kernel_size),
    # i.e. transposed relative to Conv2d.

    def __init__(
            self,
            in_channels: int,
            out_channels: int,
            kernel_size: Tuple[int, int],
            stride: Tuple[int, int] = (1, 1),
            padding: Tuple[int, int] = (0, 0),
            output_padding: Tuple[int, int] = (0, 0),
            dilation: Tuple[int, int] = (1, 1),
            groups: int = 1,
            bias: bool = True,
            padding_mode: str = 'zeros',  # TODO: refine this type
            dtype=None) -> None:
        super().__init__()
        if groups <= 0:
            raise ValueError('groups must be a positive integer')
        if in_channels % groups != 0:
            raise ValueError('in_channels must be divisible by groups')
        if out_channels % groups != 0:
            raise ValueError('out_channels must be divisible by groups')

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.stride = stride
        self.padding = padding
        self.output_padding = output_padding
        self.dilation = dilation
        self.groups = groups
        self.padding_mode = padding_mode

        self.weight = Parameter(shape=(in_channels, out_channels // groups,
                                       *kernel_size),
                                dtype=dtype)

        if bias:
            self.bias = Parameter(shape=(out_channels, ), dtype=dtype)
        else:
            self.register_parameter('bias', None)

    # Computes the output padding required to reach a requested output size,
    # or returns the preset self.output_padding when no size is requested.
    # The logic mirrors torch.nn.modules.conv._ConvTransposeNd._output_padding.
    def _output_padding(self,
                        input,  # input tensor
                        output_size,  # desired output size (optional)
                        stride,
                        padding,
                        kernel_size,
                        num_spatial_dims: int,  # 2 for a 2D convolution
                        dilation=None):  # dilation rate
        if output_size is None:
            # No explicit output_size: fall back to the preset value.
            ret = self.output_padding
        else:
            # Does the input carry a batch dimension?
            # NOTE(review): this calls input.dim()/input.size() in the
            # PyTorch style — confirm the tensor type used here provides
            # those methods.
            has_batch_dim = input.dim() == num_spatial_dims + 2
            num_non_spatial_dims = 2 if has_batch_dim else 1

            # Strip non-spatial dimensions so that output_size only holds
            # the spatial extents (height and width).
            if len(output_size) == num_non_spatial_dims + num_spatial_dims:
                output_size = output_size[num_non_spatial_dims:]
            if len(output_size) != num_spatial_dims:
                raise ValueError(
                    "ConvTranspose{}D: for {}D input, output_size must have {} or {} elements (got {})"
                    .format(num_spatial_dims, input.dim(), num_spatial_dims,
                            num_non_spatial_dims + num_spatial_dims,
                            len(output_size)))

            # Per spatial dimension, compute the smallest and largest output
            # size reachable with the given parameters, using the transposed
            # convolution size formula:
            # H_out = (H_in - 1)*stride - 2*padding + dilation*(kernel_size - 1) + 1
            min_sizes = []
            max_sizes = []
            for d in range(num_spatial_dims):
                dim_size = (
                    (input.size(d + num_non_spatial_dims) - 1) * stride[d] -
                    2 * padding[d] +
                    (dilation[d] if dilation is not None else 1) *
                    (kernel_size[d] - 1) + 1)
                min_sizes.append(dim_size)
                max_sizes.append(min_sizes[d] + stride[d] - 1)

            # The requested output_size must lie within [min, max].
            for i in range(len(output_size)):
                size = output_size[i]
                min_size = min_sizes[i]
                max_size = max_sizes[i]
                if size < min_size or size > max_size:
                    raise ValueError((
                        "requested an output size of {}, but valid sizes range "
                        "from {} to {} (for an input of {})").format(
                            output_size, min_sizes, max_sizes,
                            input.size()[2:]))

            # The needed output padding is the gap between the requested
            # size and the minimum reachable size.
            res = []
            for d in range(num_spatial_dims):
                res.append(output_size[d] - min_sizes[d])

            ret = res
        return ret

    def forward(self, input, output_size=None):
        num_spatial_dims = 2   # 2D convolution
        # Resolve the output padding (possibly from a requested output size).
        output_padding = self._output_padding(input, output_size, self.stride,
                                              self.padding, self.kernel_size,
                                              num_spatial_dims, self.dilation)

        return conv_transpose2d(input, self.weight.value,
                                None if self.bias is None else self.bias.value,
                                self.stride, self.padding, output_padding,
                                self.dilation, self.groups)
  1. 解决尺寸歧义问题
    转置卷积的一个特点是相同的输入尺寸在不同参数下可能产生相同的输出尺寸。_output_padding 方法通过允许用户直接指定期望的输出尺寸来解决这种多对一映射的歧义问题。
  2. 确保尺寸匹配
    在编码器-解码器结构(如U-Net)中,精确的尺寸匹配至关重要。这个方法确保了反卷积后的特征图与编码器中的对应特征图尺寸完全一致,便于后续的拼接或相加操作。
  3. 输出尺寸计算公式
    该方法基于的标准转置卷积输出尺寸公式为:
H_out = (H_in - 1) × stride - 2 × padding + dilation × (kernel_size - 1) + output_padding + 1
  4. 实际应用示例

假设在U-Net模型中,需要确保解码器的输出与编码器的输入尺寸匹配:

# 编码器中的卷积
conv = nn.Conv2d(1, 1, 3, stride=2, padding=1)
# 输入尺寸: 7x7 → 输出尺寸: 4x4


# 解码器中的转置卷积

deconv = nn.ConvTranspose2d(1, 1, 3, stride=2, padding=1)
# 正常情况: 4x4 → 7x7

# 但如果希望输出8x8,就需要使用output_padding

deconv_with_padding = nn.ConvTranspose2d(1, 1, 3, stride=2, padding=1, output_padding=1)
# 结果: 4x4 → 8x8

这种方法为深度学习中的上采样操作提供了精确的尺寸控制,特别在需要尺寸对称的架构中非常有用。

参考文献

  • https://github.com/NVIDIA/TensorRT-LLM/blob/v0.5.0/tensorrt_llm/functional.py
  • https://github.com/NVIDIA/TensorRT-LLM/blob/v0.5.0/tensorrt_llm/layers/activation.py
  • https://www.mindspore.cn/tutorials/zh-CN/r2.7.1/model_infer/ms_infer/ms_infer_parallel_infer.html
  • https://github.com/NVIDIA/TensorRT-LLM/blob/v0.5.0/tensorrt_llm/layers/embedding.py
  • https://github.com/NVIDIA/TensorRT-LLM/blob/v0.5.0/tensorrt_llm/layers/linear.py
点个「赞」+「在看」❤️
让我们知道这份文字有温暖到你,也是我们持续创作的最大动力!
推荐
Lock-Free 队列实现原理
Share Memory 的 Bank Conflict
告别高成本!TensorRT-LLM实战:如何将LLM推理速度提升数倍
使用LoRA对LLM进行微调的实用技巧
强化学习小白必看:PTX Loss 到底是个啥?
GPT-5 Prompt Migration and Improvement Using the New Optimizer
Task 异步流 coroutine 实现
C++ corotine 介绍
搭建 VSCode 离线开发环境
nlohmann/json 库简介
Intro to C++ Coroutines: Concept
Hugging Face BPE Tokenizer 的资源文件
移动语义 std::move 和完美转发 std::forward
ACEBench: Who Wins the Match Point in Tool Usage?
什么是 GN
RULER: Relative Universal LLM-Elicited Rewards
SFT和RFT的区别
CosyVoice 3: 面向真实场景的大规模零样本语音生成模型
CosyVoice 3: Towards In-the-wild Speech Generation
语音合成(TTS)中文自然度:问题、成因、解决方案
上下文工程如何实现
上下文工程(Context Engineering)
新手必看!LangGraph 101:手把手教你搭一个深度研究 Agent
LangGraph 简介
SFT 泛化新解读:强化学习 + 奖励修正,一文读懂
程序员狂喜!Self-Instruct 框架全解析:无限生成高质量指令集,从此告别标注噩梦!
Evol-Instruct 竟能精准生成领域专属数据?实操技巧速看!
指令微调数据-少即是多
LLM generate 参数怎么用?
语音合成(TTS)跳跃与重复问题的解析:成因、机制及解决方案
大模型训练新思路:GEPA 靠 “反思” 赢过 RL,看完秒懂
F5-TTS:用 Flow Matching 玩转语音,流畅度和真实感都 “拉满” 了
E2 TTS:令人尴尬地简单、完全非自回归、零样本的语音合成技术
Voicebox:大规模文本引导的多语言通用语音生成技术
为什么都在聊 Kimi K2?Open Agentic Intelligence 藏着哪些新惊喜
Step-Audio-AQAA 端到端音频模型
DPO、PPO、GRPO的原理,区别与联系
OPENCSG 中文语料库:一系列高质量的中文数据集,用于语言模型训练
什么是 Classifier-Free Guidance?
Conditional Flow Matching : 连续标准流 Continuous Normalizing Flow
CFM 与 OT-CFM:条件流匹配与最优传输的碰撞
DPO损失实现
Conditional Flow Matching : 常微分方程ODE、欧拉方法和Neural ODE
当 Normalizing flow 遇上语音生成:AI 说话变 “真人” 的秘密在这里!
深度剖析:Kimi – Audio 中 BigVGAN 的神奇作用
为什么说分布变换是 Normalizing flow 的「灵魂操作」?
MATCHA-TTS 来了!条件流匹配让文本转语音效率飙升
从知识增长的角度提升RAG上下文的质量
MiniMax-Speech,零样本语音合成新突破,32 种语言轻松拿捏!
手把手教你创建 evol-instruct 数据集!附完整流程~
社交类聊天的 Query 分析与应答策略
SFT 中指令选择和响应选择哪个更重要?
角色扮演大模型技术分享2-超拟人模型的困境
最新!SpeechLLM 综述:架构、能力、挑战与未来全揭秘
如何低成本生成高质量指令微调数据?
从数量到质量:通过自引导数据选择来提升语言模型性能以实现指令调优
Kimi-Audio:开源音频基础模型全面解析
Kimi-Audio 的 TTS 效果如何?
Qwen 的训练数据是怎么做的?
GeForce RTX 3090, 4090, A10, A40, A100, A800, L20, L40 显卡性能对比
如何低成本生成高质量指令微调数据?
掌握RAG:投入生产前要评估的8个场景
掌握RAG:如何评估RAG的LLM
掌握RAG:如何在部署后观察您的RAG
掌握RAG:如何选择嵌入模型
基础模型中的新范式:为什么o1是不同的,以及它将如何改变LLM应用
Semantic token和连续特征在SLLM下的对比
从数量到质量:通过自引导数据选择来提升语言模型性能以实现指令调优
RLHF及其变体:进展和实际工程见解
Freeze-Omni: 低延迟语音对话模型
Fully Sharded Data Parallelism (FSDP)
什么是置信度?置信度模型怎么做?
晦涩难懂的 Flow matching!图形化理解
中文指令微调数据,质量就是一切!
基于 LLM 的文本泛化
CosyVoice 2:基于大型语言模型的可扩展流式语音合成技术
Mini-Omni2: with Vision, Speech and Duplex Capabilities
FSQ的原理与VQ-VAE的区别和联系
大模型并行训练的一些知识——极简版
亲测有效!如何用 Address Sanitizer 精准定位内存漏洞?附保姆级操作指南
要用 AI 裁员 50% 的千亿独角兽,公开认错,重启招聘!
single codebook和dual codebook在LLM中向量量化上有什么区别?
一些文档去重算法
最佳的指令数据应当是什么样的?
Prefill-Decode分离
亲测有效!如何用 Address Sanitizer 精准定位内存漏洞?附保姆级操作指南
Simhash-文档去重算法简介
RLHF 入门,高手勿进!
最佳的指令数据应当是什么样的?
CosyVoice:一种基于监督式语义标记的可扩展多语言 Zero-Shot 语音合成器
Model Context Protocol (MCP)
MCP(模型上下文协议)是什么以及它是如何运作的
压力测试LLMs——大海捞针实现