Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Major change: Extract Avro schema class generation out of Deimos and into `avro-gen-ruby`.

## 2.5.3 - 2026-04-28

- Fix: `setup_karafka` applies the merged kafka config to `Karafka.producer`, so kafka overrides set in later `Karafka::App.setup` calls take effect on `Karafka.producer` callers (Karafka::Web, DLQ, ActiveJob).
Expand Down
29 changes: 20 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -840,19 +840,23 @@ DB backend only when your rake task is running.

# Generated Schema Classes

Deimos offers a way to generate classes from Avro schemas. These classes are documented
with YARD to aid in IDE auto-complete, and will help to move errors closer to the code.
Deimos generates classes from Avro schemas using the
[avro-gen-ruby](https://github.com/flipp-oss/avro-gen-ruby) gem (namespace `AvroGen`).
These classes are documented with YARD to aid in IDE auto-complete, and will help to move
errors closer to the code.

Add the following configurations for schema class generation:
Add the following configurations for schema class generation:

```ruby
config.schema.generated_class_path 'path/to/generated/classes' # Defaults to 'app/lib/schema_classes'
config.avrogen.generated_class_path 'path/to/generated/classes' # Defaults to 'app/lib/schema_classes'
```

Run the following command to generate schema classes in your application. It will generate classes for every configured consumer or producer by `Deimos.configure`:

bundle exec rake deimos:generate_schema_classes

(The standalone `bundle exec rake avro:generate` task generates classes for every schema on disk, without the Kafka-aware key/tombstone handling.)

Add the following configurations to start using generated schema classes in your application's Consumers and Producers:

config.schema.use_schema_classes true
Expand All @@ -864,7 +868,14 @@ Note that if you have a schema in your repo but have not configured a producer o

One additional configuration option indicates whether nested records should be generated as top-level classes or would remain nested inside the generated class for its parent schema. The default is to nest them, as a flattened structure can have one sub-schema clobber another sub-schema defined in a different top-level schema.

config.schema.nest_child_schemas = false # Flatten all classes into one directory
config.avrogen.nest_child_schemas = false # Flatten all classes into one directory

> **Note:** The schema-class generation settings moved from `config.schema.*` to
> `config.avrogen.*`. The old `config.schema.generated_class_path` /
> `nest_child_schemas` / `use_full_namespace` / `schema_namespace_map` settings still
> work but are deprecated. Generated classes now inherit from
> `AvroGen::SchemaClass::Record`/`Enum`; previously-generated files referencing
> `Deimos::SchemaClass::*` still load, and `bundle exec rake avro:upgrade` rewrites them.

You can generate a tombstone message (with only a key and no value) by calling the `YourSchemaClass.tombstone(key)` method. If you're using a `:field` key config, you can pass in just the key scalar value. If using a key schema, you can pass it in as a hash or as another schema class.

Expand All @@ -877,7 +888,7 @@ Examples of consumers would look like this:
```ruby
class MyConsumer < Deimos::Consumer
def consume_message(message)
# Same method as before but message.payload is now an instance of Deimos::SchemaClass::Record
# Same method as before but message.payload is now an instance of AvroGen::SchemaClass::Record
# rather than a hash.
# You can interact with the schema class instance in the following way:
do_something(message.payload.test_id, message.payload.some_int)
Expand All @@ -891,7 +902,7 @@ end
class MyActiveRecordConsumer < Deimos::ActiveRecordConsumer
record_class Widget
# Any method that expects a message payload as a hash will instead
# receive an instance of Deimos::SchemaClass::Record.
# receive an instance of AvroGen::SchemaClass::Record.
def record_attributes(payload, key)
# You can interact with the schema class instance in the following way:
super.merge(:some_field => "some_value-#{payload.test_id}")
Expand Down Expand Up @@ -931,9 +942,9 @@ class MyActiveRecordProducer < Deimos::ActiveRecordProducer
record_class Widget
# @param attributes [Hash]
# @param _record [Widget]
# @return [Deimos::SchemaClass::Record]
# @return [AvroGen::SchemaClass::Record]
def self.generate_payload(attributes, _record)
# This method converts your ActiveRecord into a Deimos::SchemaClass::Record. You will be able to use super
# This method converts your ActiveRecord into a AvroGen::SchemaClass::Record. You will be able to use super
# as an instance of Schemas::MySchema and set values that are not on your ActiveRecord schema.
res = super
res.some_value = "some_value-#{res.test_id}"
Expand Down
1 change: 1 addition & 0 deletions deimos-ruby.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Gem::Specification.new do |spec|
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.require_paths = ['lib']

spec.add_dependency('avro-gen-ruby')
spec.add_dependency('benchmark', '~> 0.5')
spec.add_dependency('fig_tree', '~> 0.2.0')
spec.add_dependency('karafka', '~> 2.0')
Expand Down
15 changes: 11 additions & 4 deletions docs/CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,17 @@ things you need to reference into local variables before calling `configure`.
| schema.password | nil | Basic auth password. |
| schema.path | nil | Local path to find your schemas. |
| schema.use_schema_classes | false | Set this to true to use generated schema classes in your application. |
| schema.generated_class_path | `app/lib/schema_classes` | Local path to generated schema classes. |
| schema.nest_child_schemas | false | Set to true to nest subschemas within the generated class for the parent schema. |
| schema.use_full_namespace | false | Set to true to generate folders for schemas matching the full namespace. |
| schema.schema_namespace_map | {} | A map of namespace prefixes to base module name(s). Example: { 'com.mycompany.suborg' => ['SchemaClasses'] }. Requires `use_full_namespace` to be true. |

#### Schema Class Generation (`avrogen`)

Schema class generation is provided by the [avro-gen-ruby](https://github.com/flipp-oss/avro-gen-ruby) gem (namespace `AvroGen`). These settings are forwarded to `AvroGen.config`. The equivalent `schema.*` settings still work but are deprecated — using one prints a warning pointing to the `avrogen.*` setting, and you can migrate generated files with `rake avro:upgrade`.

| Config name | Default | Description |
|-------------------------------|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|
| avrogen.generated_class_path | `app/lib/schema_classes` | Local path to generated schema classes. (was `schema.generated_class_path`) |
| avrogen.nest_child_schemas | true | Set to true to nest subschemas within the generated class for the parent schema. (was `schema.nest_child_schemas`) |
| avrogen.use_full_namespace | false | Set to true to generate folders for schemas matching the full namespace. (was `schema.use_full_namespace`) |
| avrogen.schema_namespace_map | {} | A map of namespace prefixes to base module name(s). Example: { 'com.mycompany.suborg' => ['SchemaClasses'] }. Requires `use_full_namespace` to be true. (was `schema.schema_namespace_map`) |

### Outbox Configuration

Expand Down
6 changes: 3 additions & 3 deletions lib/deimos.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
require 'deimos/backends/test'

require 'deimos/schema_backends/base'
require 'avro_gen'
require 'deimos/schema_class'
require 'deimos/utils/schema_class'
require 'deimos/schema_class/enum'
require 'deimos/schema_class/record'

require 'deimos/ext/schema_route'
require 'deimos/ext/consumer_route'
Expand Down Expand Up @@ -102,7 +102,7 @@ def schema_backend(schema:, namespace:,
# Initialize an instance of the provided schema
# in the event the schema class is an override, the inherited
# schema and namespace will be applied
schema_class = Utils::SchemaClass.klass(schema, namespace)
schema_class = AvroGen::SchemaClass.klass(schema, namespace)
if schema_class.nil?
schema_backend_class(backend: backend).
new(
Expand Down
2 changes: 1 addition & 1 deletion lib/deimos/active_record_consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def columns(_klass)
def record_key(key)
if key.nil?
{}
elsif key.is_a?(Hash) || key.is_a?(SchemaClass::Record)
elsif key.is_a?(Hash) || key.is_a?(AvroGen::SchemaClass::Record)
self.key_converter.convert(key)
elsif self.topic.key_config[:field].nil?
{ @klass.primary_key => key }
Expand Down
4 changes: 2 additions & 2 deletions lib/deimos/active_record_consume/message_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module MessageConsumption
# Default is to use the primary key column and the value of the first
# field in the key.
# @param klass [Class<ActiveRecord::Base>]
# @param _payload [Hash,Deimos::SchemaClass::Record]
# @param _payload [Hash,AvroGen::SchemaClass::Record]
# @param key [Object]
# @return [ActiveRecord::Base]
def fetch_record(klass, _payload, key)
Expand All @@ -23,7 +23,7 @@ def fetch_record(klass, _payload, key)

# Assign a key to a new record.
# @param record [ActiveRecord::Base]
# @param _payload [Hash,Deimos::SchemaClass::Record]
# @param _payload [Hash,AvroGen::SchemaClass::Record]
# @param key [Object]
# @return [void]
def assign_key(record, _payload, key)
Expand Down
2 changes: 1 addition & 1 deletion lib/deimos/active_record_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def key_converter

# Override this method (with `super`) if you want to add/change the default
# attributes set to the new/existing record.
# @param payload [Hash,Deimos::SchemaClass::Record]
# @param payload [Hash,AvroGen::SchemaClass::Record]
# @param _key [String]
# @return [Hash]
def record_attributes(payload, _key=nil)
Expand Down
2 changes: 1 addition & 1 deletion lib/deimos/active_record_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def generate_payload(attributes, _record)
return payload if self.karafka_config.use_schema_classes.nil? &&
!Deimos.config.schema.use_schema_classes

Utils::SchemaClass.instance(payload, encoder.schema, encoder.namespace)
AvroGen::SchemaClass.instance(payload, encoder.schema, encoder.namespace)
end

# Deletion payload for a record by default, delegate to the
Expand Down
45 changes: 34 additions & 11 deletions lib/deimos/config/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module Deimos
config.deserializers[:payload].try(:reset_backend)
config.deserializers[:key].try(:reset_backend)
end
Deimos::SchemaClass.sync_config!
Comment thread
harsha-flipp marked this conversation as resolved.
if self.config.schema.use_schema_classes
load_generated_schema_classes
end
Expand Down Expand Up @@ -42,16 +43,16 @@ def generate_key_schemas
# Loads generated classes
# @return [void]
def load_generated_schema_classes
if Deimos.config.schema.generated_class_path.nil?
raise 'Cannot use schema classes without schema.generated_class_path. ' \
'Please provide a directory.'
path = AvroGen.config.generated_class_path
if path.nil?
raise 'Cannot use schema classes without a generated class path. ' \
'Please set AvroGen.config.generated_class_path.'
end

Dir["./#{Deimos.config.schema.generated_class_path}/**/*.rb"].
Dir["./#{path}/**/*.rb"].
each { |f| require f }
rescue LoadError
raise 'Cannot load schema classes. Please regenerate classes with' \
'rake deimos:generate_schema_models.'
raise 'Cannot load schema classes. Please regenerate classes with rake avro:generate.'
end

# Ensure everything is set up correctly for the DB backend.
Expand Down Expand Up @@ -173,14 +174,39 @@ def validate_outbox_backend
# @return [String]
setting :path

# Local path for schema classes to be generated in.
# @deprecated Use config.avrogen.generated_class_path instead.
# @return [String]
setting :generated_class_path, 'app/lib/schema_classes'

# Set to true to use the generated schema classes in your application.
# @return [Boolean]
setting :use_schema_classes

# @deprecated Use config.avrogen.nest_child_schemas instead.
# @return [Boolean]
setting :nest_child_schemas, true

# @deprecated Use config.avrogen.use_full_namespace instead.
# @return [Boolean]
setting :use_full_namespace, false

# @deprecated Use config.avrogen.schema_namespace_map instead.
# @return [Hash]
setting :schema_namespace_map, {}

# The base directory for generated protobuf key schemas.
setting :proto_schema_key_path, 'protos'
end

# Schema class generation settings, forwarded to the avro-gen-ruby gem
# (AvroGen.config). These previously lived under `schema` (still supported,
# but deprecated).
setting :avrogen do

# Local path for schema classes to be generated in.
# @return [String]
setting :generated_class_path, 'app/lib/schema_classes'

# Set to false to generate child schemas as their own files.
# @return [Boolean]
setting :nest_child_schemas, true
Expand All @@ -191,12 +217,9 @@ def validate_outbox_backend

# Use this option to reduce nesting when using use_full_namespace.
# For example: { 'com.mycompany.suborg' => 'SchemaClasses' }
# would replace a prefixed with the given key with the module name SchemaClasses.
# would replace a prefix matching the given key with the module name SchemaClasses.
# @return [Hash]
setting :schema_namespace_map, {}

# The base directory for generated protobuf key schemas.
setting :proto_schema_key_path, 'protos'
end

# The configured metrics provider.
Expand Down
4 changes: 2 additions & 2 deletions lib/deimos/ext/producer_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module ProducerMiddleware
class << self

def allowed_classes
arr = [Hash, SchemaClass::Record]
arr = [Hash, AvroGen::SchemaClass::Record]
if defined?(Google::Protobuf)
arr.push(Google::Protobuf.const_get(:AbstractMessage))
end
Expand All @@ -26,7 +26,7 @@ def call(message)
self.allowed_classes.none? { |k| message[:payload].is_a?(k) }

payload = message[:payload]
payload = payload.to_h if payload.nil? || payload.is_a?(SchemaClass::Record)
payload = payload.to_h if payload.nil? || payload.is_a?(AvroGen::SchemaClass::Record)
m = Deimos::Message.new(payload,
headers: message[:headers],
partition_key: message[:partition_key])
Expand Down
9 changes: 9 additions & 0 deletions lib/deimos/logging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ def log_warn(*args)
log_add(:warn, *args)
end

# Emit a deprecation warning at most once per unique message.
# @param msg [String]
def deprecate(msg)
@deprecations ||= Set.new
return unless @deprecations.add?(msg)

warn("DEPRECATION WARNING: #{msg}")
end

def metadata_log_text(metadata)
metadata.to_h.slice(:timestamp, :offset, :first_offset, :last_offset, :partition, :topic, :size)
end
Expand Down
10 changes: 5 additions & 5 deletions lib/deimos/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def partition_key(_payload)
end

# Publish the payload to the topic.
# @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
# @param payload [Hash, AvroGen::SchemaClass::Record] with an optional payload_key hash key.
# @param topic [String] if specifying the topic
# @param headers [Hash] if specifying headers
# @return [void]
Expand All @@ -99,7 +99,7 @@ def produce(messages, backend: determine_backend_class)
end

# Publish a list of messages.
# @param payloads [Array<Hash, SchemaClass::Record>] with optional payload_key hash key.
# @param payloads [Array<Hash, AvroGen::SchemaClass::Record>] with optional payload_key hash key.
# @param sync [Boolean] if given, override the default setting of
# whether to publish synchronously.
# @param force_send [Boolean] if true, ignore the configured backend
Expand All @@ -112,7 +112,7 @@ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, head

messages = Array(payloads).map do |p|
payload = p
payload = payload.to_h if p.is_a?(SchemaClass::Record)
payload = payload.to_h if p.is_a?(AvroGen::SchemaClass::Record)
m = {
payload: payload,
headers: headers,
Expand All @@ -121,10 +121,10 @@ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, head
}
if payload.is_a?(Hash) && payload.key?(:key) && payload.key?(:message)
m[:key] = payload[:key]
m[:key] = m[:key].to_h if m[:key].nil? || m[:key].is_a?(SchemaClass::Record)
m[:key] = m[:key].to_h if m[:key].nil? || m[:key].is_a?(AvroGen::SchemaClass::Record)
m[:payload] = payload[:message]
m[:payload] = m[:payload].to_h if m[:payload].nil? ||
m[:payload].is_a?(SchemaClass::Record)
m[:payload].is_a?(AvroGen::SchemaClass::Record)
end
m
end
Expand Down
Loading
Loading