Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Unreleased

- Major change: Extract Avro schema class generation out of Deimos and into `avro-gen-ruby`.

## 2.5.3 - 2026-04-28

- Fix: `setup_karafka` applies the merged kafka config to `Karafka.producer`, so kafka overrides set in later `Karafka::App.setup` calls take effect on `Karafka.producer` callers (Karafka::Web, DLQ, ActiveJob).
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ end
class MyActiveRecordConsumer < Deimos::ActiveRecordConsumer
record_class Widget
# Any method that expects a message payload as a hash will instead
# receive an instance of Deimos::SchemaClass::Record.
# receive an instance of AvroGen::SchemaClass::Record.
def record_attributes(payload, key)
# You can interact with the schema class instance in the following way:
super.merge(:some_field => "some_value-#{payload.test_id}")
Expand Down Expand Up @@ -931,9 +931,9 @@ class MyActiveRecordProducer < Deimos::ActiveRecordProducer
record_class Widget
# @param attributes [Hash]
# @param _record [Widget]
# @return [Deimos::SchemaClass::Record]
# @return [AvroGen::SchemaClass::Record]
def self.generate_payload(attributes, _record)
# This method converts your ActiveRecord into a Deimos::SchemaClass::Record. You will be able to use super
# This method converts your ActiveRecord into a AvroGen::SchemaClass::Record. You will be able to use super
# as an instance of Schemas::MySchema and set values that are not on your ActiveRecord schema.
res = super
res.some_value = "some_value-#{res.test_id}"
Expand Down
1 change: 1 addition & 0 deletions deimos-ruby.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Gem::Specification.new do |spec|
spec.executables = spec.files.grep(%r{^bin/}) { |f| File.basename(f) }
spec.require_paths = ['lib']

spec.add_dependency('avro-gen-ruby')
spec.add_dependency('benchmark', '~> 0.5')
spec.add_dependency('fig_tree', '~> 0.2.0')
spec.add_dependency('karafka', '~> 2.0')
Expand Down
6 changes: 3 additions & 3 deletions lib/deimos.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
require 'deimos/backends/test'

require 'deimos/schema_backends/base'
require 'avro_gen'
require 'deimos/schema_class'
require 'deimos/utils/schema_class'
require 'deimos/schema_class/enum'
require 'deimos/schema_class/record'

require 'deimos/ext/schema_route'
require 'deimos/ext/consumer_route'
Expand Down Expand Up @@ -102,7 +102,7 @@ def schema_backend(schema:, namespace:,
# Initialize an instance of the provided schema
# in the event the schema class is an override, the inherited
# schema and namespace will be applied
schema_class = Utils::SchemaClass.klass(schema, namespace)
schema_class = AvroGen::SchemaClass.klass(schema, namespace)
if schema_class.nil?
schema_backend_class(backend: backend).
new(
Expand Down
2 changes: 1 addition & 1 deletion lib/deimos/active_record_consume/batch_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def columns(_klass)
def record_key(key)
if key.nil?
{}
elsif key.is_a?(Hash) || key.is_a?(SchemaClass::Record)
elsif key.is_a?(Hash) || key.is_a?(AvroGen::SchemaClass::Record)
self.key_converter.convert(key)
elsif self.topic.key_config[:field].nil?
{ @klass.primary_key => key }
Expand Down
4 changes: 2 additions & 2 deletions lib/deimos/active_record_consume/message_consumption.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ module MessageConsumption
# Default is to use the primary key column and the value of the first
# field in the key.
# @param klass [Class<ActiveRecord::Base>]
# @param _payload [Hash,Deimos::SchemaClass::Record]
# @param _payload [Hash,AvroGen::SchemaClass::Record]
# @param key [Object]
# @return [ActiveRecord::Base]
def fetch_record(klass, _payload, key)
Expand All @@ -23,7 +23,7 @@ def fetch_record(klass, _payload, key)

# Assign a key to a new record.
# @param record [ActiveRecord::Base]
# @param _payload [Hash,Deimos::SchemaClass::Record]
# @param _payload [Hash,AvroGen::SchemaClass::Record]
# @param key [Object]
# @return [void]
def assign_key(record, _payload, key)
Expand Down
2 changes: 1 addition & 1 deletion lib/deimos/active_record_consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def key_converter

# Override this method (with `super`) if you want to add/change the default
# attributes set to the new/existing record.
# @param payload [Hash,Deimos::SchemaClass::Record]
# @param payload [Hash,AvroGen::SchemaClass::Record]
# @param _key [String]
# @return [Hash]
def record_attributes(payload, _key=nil)
Expand Down
2 changes: 1 addition & 1 deletion lib/deimos/active_record_producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def generate_payload(attributes, _record)
return payload if self.karafka_config.use_schema_classes.nil? &&
!Deimos.config.schema.use_schema_classes

Utils::SchemaClass.instance(payload, encoder.schema, encoder.namespace)
AvroGen::SchemaClass.instance(payload, encoder.schema, encoder.namespace)
end

# Deletion payload for a record by default, delegate to the
Expand Down
13 changes: 7 additions & 6 deletions lib/deimos/config/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ module Deimos
config.deserializers[:payload].try(:reset_backend)
config.deserializers[:key].try(:reset_backend)
end
Deimos::SchemaClass.sync_config!
Comment thread
harsha-flipp marked this conversation as resolved.
if self.config.schema.use_schema_classes
load_generated_schema_classes
end
Expand Down Expand Up @@ -42,16 +43,16 @@ def generate_key_schemas
# Loads generated classes
# @return [void]
def load_generated_schema_classes
if Deimos.config.schema.generated_class_path.nil?
raise 'Cannot use schema classes without schema.generated_class_path. ' \
'Please provide a directory.'
path = AvroGen.config.generated_class_path
if path.nil?
raise 'Cannot use schema classes without a generated class path. ' \
'Please set AvroGen.config.generated_class_path.'
end

Dir["./#{Deimos.config.schema.generated_class_path}/**/*.rb"].
Dir["./#{path}/**/*.rb"].
each { |f| require f }
rescue LoadError
raise 'Cannot load schema classes. Please regenerate classes with' \
'rake deimos:generate_schema_models.'
raise 'Cannot load schema classes. Please regenerate classes with rake avro:generate.'
end

# Ensure everything is set up correctly for the DB backend.
Expand Down
4 changes: 2 additions & 2 deletions lib/deimos/ext/producer_middleware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ module ProducerMiddleware
class << self

def allowed_classes
arr = [Hash, SchemaClass::Record]
arr = [Hash, AvroGen::SchemaClass::Record]
if defined?(Google::Protobuf)
arr.push(Google::Protobuf.const_get(:AbstractMessage))
end
Expand All @@ -26,7 +26,7 @@ def call(message)
self.allowed_classes.none? { |k| message[:payload].is_a?(k) }

payload = message[:payload]
payload = payload.to_h if payload.nil? || payload.is_a?(SchemaClass::Record)
payload = payload.to_h if payload.nil? || payload.is_a?(AvroGen::SchemaClass::Record)
m = Deimos::Message.new(payload,
headers: message[:headers],
partition_key: message[:partition_key])
Expand Down
9 changes: 9 additions & 0 deletions lib/deimos/logging.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ def log_warn(*args)
log_add(:warn, *args)
end

# Emit a deprecation warning at most once per unique message.
# @param msg [String]
def deprecate(msg)
@deprecations ||= Set.new
return unless @deprecations.add?(msg)

warn("DEPRECATION WARNING: #{msg}")
end

def metadata_log_text(metadata)
metadata.to_h.slice(:timestamp, :offset, :first_offset, :last_offset, :partition, :topic, :size)
end
Expand Down
10 changes: 5 additions & 5 deletions lib/deimos/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def partition_key(_payload)
end

# Publish the payload to the topic.
# @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
# @param payload [Hash, AvroGen::SchemaClass::Record] with an optional payload_key hash key.
# @param topic [String] if specifying the topic
# @param headers [Hash] if specifying headers
# @return [void]
Expand All @@ -99,7 +99,7 @@ def produce(messages, backend: determine_backend_class)
end

# Publish a list of messages.
# @param payloads [Array<Hash, SchemaClass::Record>] with optional payload_key hash key.
# @param payloads [Array<Hash, AvroGen::SchemaClass::Record>] with optional payload_key hash key.
# @param sync [Boolean] if given, override the default setting of
# whether to publish synchronously.
# @param force_send [Boolean] if true, ignore the configured backend
Expand All @@ -112,7 +112,7 @@ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, head

messages = Array(payloads).map do |p|
payload = p
payload = payload.to_h if p.is_a?(SchemaClass::Record)
payload = payload.to_h if p.is_a?(AvroGen::SchemaClass::Record)
m = {
payload: payload,
headers: headers,
Expand All @@ -121,10 +121,10 @@ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, head
}
if payload.is_a?(Hash) && payload.key?(:key) && payload.key?(:message)
m[:key] = payload[:key]
m[:key] = m[:key].to_h if m[:key].nil? || m[:key].is_a?(SchemaClass::Record)
m[:key] = m[:key].to_h if m[:key].nil? || m[:key].is_a?(AvroGen::SchemaClass::Record)
m[:payload] = payload[:message]
m[:payload] = m[:payload].to_h if m[:payload].nil? ||
m[:payload].is_a?(SchemaClass::Record)
m[:payload].is_a?(AvroGen::SchemaClass::Record)
end
m
end
Expand Down
43 changes: 7 additions & 36 deletions lib/deimos/schema_backends/avro_base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require_relative 'base'
require 'schema_registry_client'
require 'avro_gen/avro_parser'
require_relative 'avro_schema_coercer'

module Deimos
Expand Down Expand Up @@ -106,55 +107,25 @@ def self.content_type

# @param schema [Avro::Schema::NamedSchema] A named schema
# @return [String]
# @deprecated Use AvroGen::AvroParser.schema_classname instead.
def self.schema_classname(schema)
schema.name.underscore.camelize.singularize
AvroGen::AvroParser.schema_classname(schema)
end

# Converts Avro::Schema::NamedSchema's to String form for generated YARD docs.
# Recursively handles the typing for Arrays, Maps and Unions.
# @param avro_schema [Avro::Schema::NamedSchema]
# @return [String] A string representation of the Type of this SchemaField
# @deprecated Use AvroGen::AvroParser.field_type instead.
def self.field_type(avro_schema)
case avro_schema.type_sym
when :string, :boolean
avro_schema.type_sym.to_s.titleize
when :int, :long
'Integer'
when :float, :double
'Float'
when :record, :enum
schema_classname(avro_schema)
when :array
arr_t = field_type(Deimos::SchemaField.new('n/a', avro_schema.items).type)
"Array<#{arr_t}>"
when :map
map_t = field_type(Deimos::SchemaField.new('n/a', avro_schema.values).type)
"Hash<String, #{map_t}>"
when :union
types = avro_schema.schemas.map do |t|
field_type(Deimos::SchemaField.new('n/a', t).type)
end
types.join(', ')
when :null
'nil'
end
AvroGen::AvroParser.field_type(avro_schema)
end

# Returns the base type of this schema. Decodes Arrays, Maps and Unions
# @param schema [Avro::Schema::NamedSchema]
# @return [Avro::Schema::NamedSchema]
# @deprecated Use AvroGen::AvroParser.schema_base_class instead.
def self.schema_base_class(schema)
case schema.type_sym
when :array
schema_base_class(schema.items)
when :map
schema_base_class(schema.values)
when :union
schema.schemas.map(&method(:schema_base_class)).
reject { |s| s.type_sym == :null }.first
else
schema
end
AvroGen::AvroParser.schema_base_class(schema)
end

def generate_key_schema(field_name)
Expand Down
56 changes: 56 additions & 0 deletions lib/deimos/schema_class.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# frozen_string_literal: true

require 'avro_gen'

module Deimos
# Backwards-compatible shim. Schema class generation moved to the avro-gen-ruby
# gem (namespace AvroGen). Previously-generated classes that reference
# `Deimos::SchemaClass::Record` / `Enum` / `Base` still resolve here, with a
# one-time deprecation warning, via const_missing.
module SchemaClass
DEPRECATED_CONSTANTS = {
Base: AvroGen::SchemaClass::Base,
Record: AvroGen::SchemaClass::Record,
Enum: AvroGen::SchemaClass::Enum
}.freeze

# Maps the moved generation settings from Deimos.config.schema to AvroGen.config.
GENERATION_SETTINGS = {
generated_class_path: :generated_class_path,
nest_child_schemas: :nest_child_schemas,
use_full_namespace: :use_full_namespace,
schema_namespace_map: :schema_namespace_map
}.freeze

# @param name [Symbol]
def self.const_missing(name)
klass = DEPRECATED_CONSTANTS[name]
return super unless klass

Deimos::Logging.deprecate(
"Deimos::SchemaClass::#{name} is deprecated; use AvroGen::SchemaClass::#{name} instead. " \
'Run `rake avro:upgrade` to update generated classes.'
)
# Define the constant so the warning is only emitted once.
const_set(name, klass)
klass
end

# Mirror the (now-delegated) Deimos schema settings onto AvroGen.config.
# Deimos.config.schema stays the source of truth within Deimos, so this runs
# on every configure (keeping a reset in sync). Standalone AvroGen users never
# trigger this and set AvroGen.config directly.
# @!visibility private
def self.sync_config!
schema = Deimos.config.schema
AvroGen.config.schema_path = schema.path
# Refresh AvroGen's cached schema stores on (re)configuration so they don't
# serve stale schemas after the schema path or files change.
AvroGen::SchemaValidator.clear_store_cache!

GENERATION_SETTINGS.each do |deimos_key, avro_key|
AvroGen.config.send("#{avro_key}=", schema.send(deimos_key))
end
end
end
end
62 changes: 0 additions & 62 deletions lib/deimos/schema_class/base.rb

This file was deleted.

Loading
Loading