Skip to main content

Converters and Codecs - Python SDK feature guide

This page shows the following:

  • How to use a custom Payload Codec
  • How to use a custom Payload Converter

Custom Payload Codec

How to use a custom Payload Codec in Python.

Custom Data Converters can change the default Temporal Data Conversion behavior by adding hooks, sending payloads to external storage, or performing different encoding steps. If you only need to change the encoding performed on your payloads -- by adding compression or encryption -- you can override the default Data Converter using by creating a new PayloadCodec.

The PayloadCodec needs to implement encode() and decode() functions at a minimum. These should loop through all of a Workflow's payloads, perform all of your necessary marshalling, compression, or encryption steps in order, and set an "encoding" metadata field. Here is an example that marshals and then compresses a payload using Python's cramjam library to provide snappy compression:

import cramjam
from temporalio.api.common.v1 import Payload
from temporalio.converter import PayloadCodec

class EncryptionCodec(PayloadCodec):

async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
return [
Payload(
metadata={
"encoding": b"binary/snappy",
},
data=(bytes(cramjam.snappy.compress(p.SerializeToString()))),
)
for p in payloads
]

The decode() function should implement the encode() logic in reverse.

async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
ret: List[Payload] = []
for p in payloads:
if p.metadata.get("encoding", b"").decode() != "binary/snappy":
ret.append(p)
continue
ret.append(Payload.FromString(bytes(cramjam.snappy.decompress(p.data))))
return ret

This example verifies that an encoded payload matches the binary/snappy filetype -- i.e., that it was encoded using the same custom encode() function -- and if so, performs decompression followed by unmarshaling.

Set Data Converter to use custom Payload Codec

Add a data_converter parameter to your Client.connect() options that overrides the default Converter with your Payload Codec:

from codec import EncryptionCodec

client = await Client.connect(
"localhost:7233",
data_converter=dataclasses.replace(
temporalio.converter.default(), payload_codec=EncryptionCodec()
),
)

For reference, see the following samples:

Payload conversion

Temporal SDKs provide a default Payload Converter that can be customized to convert a custom data type to Payload and back.

Conversion sequence

The order in which your encoding Payload Converters are applied depend on the order given to the Data Converter. You can set multiple encoding Payload Converters to run your conversions. When the Data Converter receives a value for conversion, it passes through each Payload Converter in sequence until the converter that handles the data type does the conversion.

Custom Payload Converter

How to use a custom Payload Converter in Python.

Data Converters convert raw Temporal payloads to/from actual Python types. A custom Data Converter of type temporalio.converter.DataConverter can be set via the data_converter client parameter.

The default Data Converter supports converting multiple types including:

  • None
  • bytes
  • google.protobuf.message.Message - As JSON when encoding, but has ability to decode binary proto from other languages
  • Anything that can be converted to JSON including:
    • Anything that json.dump supports natively
    • dataclasses
    • Iterables including ones JSON dump may not support by default, e.g. set
    • Any class with a dict() method and a static parse_obj() method, e.g. Pydantic models
      • The default data converter is deprecated for Pydantic models and will warn if used since not all fields work. See this sample for the recommended approach.
    • IntEnum, StrEnum based enumerates
    • UUID

This notably doesn't include any date, time, or datetime objects, as they may not work across SDKs.

Users are strongly encouraged to use a single dataclass for parameter and return types so fields with defaults can be easily added without breaking compatibility. Classes with generics may not have the generics properly resolved. The current implementation does not have generic type resolution. Users should use concrete types.

Custom Type Data Conversion

When converting from JSON, Workflow and Activity type hints are taken into account to convert to the proper types. All common Python typings including Optional, Union, all forms of iterables and mappings, and NewType are supported in addition the regular JSON values mentioned before.

In Python, Data Converters contain a reference to a Payload Converter class that is used to convert input and output payloads. By default, the Payload Converter is a CompositePayloadConverter which contains multiple EncodingPayloadConverters to try to serialize/deserialize payloads. Upon serialization, each EncodingPayloadConverter is used in order until one succeeds.

To implement a custom encoding for a custom type, a new EncodingPayloadConverter can be created. For example, to support IPv4Address types:

class IPv4AddressEncodingPayloadConverter(EncodingPayloadConverter):
@property
def encoding(self) -> str:
return "text/ipv4-address"

def to_payload(self, value: Any) -> Optional[Payload]:
if isinstance(value, ipaddress.IPv4Address):
return Payload(
metadata={"encoding": self.encoding.encode()},
data=str(value).encode(),
)
else:
return None

def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
assert not type_hint or type_hint is ipaddress.IPv4Address
return ipaddress.IPv4Address(payload.data.decode())

class IPv4AddressPayloadConverter(CompositePayloadConverter):
def __init__(self) -> None:
# Just add ours as first before the defaults
super().__init__(
IPv4AddressEncodingPayloadConverter(),
*DefaultPayloadConverter.default_encoding_payload_converters,
)

my_data_converter = dataclasses.replace(
DataConverter.default,
payload_converter_class=IPv4AddressPayloadConverter,
)

This is good for many custom types. However, you might need to override the behavior of the just the existing JSON encoding payload converter to support a new type. It is already the last encoding data converter in the list, so it's the fall-through behavior for any otherwise unknown type. Customizing the existing JSON converter has the benefit of making the type work in lists, unions, etc. The conversion can be customized for serialization with a custom json.JSONEncoder and deserialization with a custom JSONTypeConverter. For example, to support IPv4Address types in existing JSON conversion:

class IPv4AddressJSONEncoder(AdvancedJSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, ipaddress.IPv4Address):
return str(o)
return super().default(o)
class IPv4AddressJSONTypeConverter(JSONTypeConverter):
def to_typed_value(
self, hint: Type, value: Any
) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:
if issubclass(hint, ipaddress.IPv4Address):
return ipaddress.IPv4Address(value)
return JSONTypeConverter.Unhandled

class IPv4AddressPayloadConverter(CompositePayloadConverter):
def __init__(self) -> None:
# Replace default JSON plain with our own that has our encoder and type
# converter
json_converter = JSONPlainPayloadConverter(
encoder=IPv4AddressJSONEncoder,
custom_type_converters=[IPv4AddressJSONTypeConverter()],
)
super().__init__(
*[
c if not isinstance(c, JSONPlainPayloadConverter) else json_converter
for c in DefaultPayloadConverter.default_encoding_payload_converters
]
)

my_data_converter = dataclasses.replace(
DataConverter.default,
payload_converter_class=IPv4AddressPayloadConverter,
)

Now IPv4Address can be used in type hints including collections, optionals, etc.