diff --git a/bdd/go/tests/basic_messaging.go b/bdd/go/tests/basic_messaging.go index 971f5abf62..f1fec9fc51 100644 --- a/bdd/go/tests/basic_messaging.go +++ b/bdd/go/tests/basic_messaging.go @@ -209,6 +209,47 @@ func (s basicMessagingSteps) thenLastPolledMessageMatchesSent(ctx context.Contex return nil } +func (s basicMessagingSteps) whenUpdateStreamName(ctx context.Context, newName string) error { + c := getBasicMessagingCtx(ctx) + streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID) + if err := c.client.UpdateStream(streamIdentifier, newName); err != nil { + return fmt.Errorf("failed to update stream: %w", err) + } + c.lastStreamName = &newName + return nil +} + +func (s basicMessagingSteps) thenStreamNameUpdated(ctx context.Context, expectedName string) error { + c := getBasicMessagingCtx(ctx) + streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID) + stream, err := c.client.GetStream(streamIdentifier) + if err != nil { + return fmt.Errorf("failed to get stream: %w", err) + } + if stream.Name != expectedName { + return fmt.Errorf("expected stream name %s, got %s", expectedName, stream.Name) + } + return nil +} + +func (s basicMessagingSteps) whenDeleteStream(ctx context.Context) error { + c := getBasicMessagingCtx(ctx) + streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID) + if err := c.client.DeleteStream(streamIdentifier); err != nil { + return fmt.Errorf("failed to delete stream: %w", err) + } + c.lastStreamID = nil + return nil +} + +func (s basicMessagingSteps) thenStreamDeletedSuccessfully(ctx context.Context) error { + c := getBasicMessagingCtx(ctx) + if c.lastStreamID != nil { + return errors.New("stream ID should be nil after deletion") + } + return nil +} + func (s basicMessagingSteps) givenNoStreams(ctx context.Context) error { client := getBasicMessagingCtx(ctx).client streams, err := client.GetStreams() @@ -322,10 +363,20 @@ func initBasicMessagingScenario(sc *godog.ScenarioContext) { sc.Step(`the topic should be created successfully`, s.thenTopicCreatedSuccessfully) sc.Step(`^the topic should have name "([^"]*)"$`, s.thenTopicHasName) sc.Step(`^the topic should have (\d+) partitions$`, s.thenTopicsHasPartitions) + sc.Step(`^I update the stream name to "([^"]*)"$`, s.whenUpdateStreamName) + sc.Step(`^the stream name should be updated to "([^"]*)"$`, s.thenStreamNameUpdated) + sc.Step(`I delete the stream`, s.whenDeleteStream) + sc.Step(`the stream should be deleted successfully`, s.thenStreamDeletedSuccessfully) sc.After(func(ctx context.Context, sc *godog.Scenario, scErr error) (context.Context, error) { c := getBasicMessagingCtx(ctx) - if err := c.client.Close(); err != nil { - scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err)) + if c.client != nil && c.lastStreamID != nil { + streamIdentifier, _ := iggcon.NewIdentifier(*c.lastStreamID) + _ = c.client.DeleteStream(streamIdentifier) + } + if c.client != nil { + if err := c.client.Close(); err != nil { + scErr = errors.Join(scErr, fmt.Errorf("error closing client: %w", err)) + } } return ctx, scErr }) diff --git a/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java b/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java index 0ad53dad0f..26df74df84 100644 --- a/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java +++ b/bdd/java/src/test/java/org/apache/iggy/bdd/BasicMessagingSteps.java @@ -43,6 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class BasicMessagingSteps { @@ -209,6 +210,30 @@ public void lastPolledMessageMatchesSent() { assertEquals(context.lastSentMessage, lastPayload, "Last message should match sent message"); } + @When("I update the stream name to {string}") + public void updateStreamName(String newName) { + getClient().streams().updateStream(context.lastStreamId, newName); + context.lastStreamName = newName; + } + + @Then("the stream name should be updated to {string}") + public void streamNameUpdated(String expectedName) { + Optional stream = getClient().streams().getStream(context.lastStreamId); + assertTrue(stream.isPresent(), "Stream should exist"); + assertEquals(expectedName, stream.get().name(), "Stream name should be updated"); + } + + @When("I delete the stream") + public void deleteStream() { + getClient().streams().deleteStream(context.lastStreamId); + context.lastStreamId = null; + } + + @Then("the stream should be deleted successfully") + public void streamDeletedSuccessfully() { + assertNull(context.lastStreamId, "Stream should have been deleted"); + } + private IggyBaseClient getClient() { if (context.client == null) { throw new IllegalStateException("Iggy client not initialized"); diff --git a/bdd/python/tests/test_basic_messaging.py b/bdd/python/tests/test_basic_messaging.py index 445c02a97b..d5d4ae002e 100644 --- a/bdd/python/tests/test_basic_messaging.py +++ b/bdd/python/tests/test_basic_messaging.py @@ -272,3 +272,43 @@ def verify_last_message_match(context): last_polled_payload = last_polled.payload().decode("utf-8") assert last_polled_payload == context.last_sent_message + + +@when(parsers.parse('I update the stream name to "{new_name}"')) +def update_stream_name(context, new_name): + """Update the stream name""" + + async def _update(): + await context.client.update_stream(context.last_stream_id, new_name) + context.last_stream_name = new_name + + asyncio.run(_update()) + + +@then(parsers.parse('the stream name should be updated to "{expected_name}"')) +def verify_stream_name_updated(context, expected_name): + """Verify stream name was updated""" + + async def _verify(): + stream = await context.client.get_stream(context.last_stream_id) + assert stream is not None + assert stream.name == expected_name + + asyncio.run(_verify()) + + +@when("I delete the stream") +def delete_stream(context): + """Delete the stream""" + + async def _delete(): + await context.client.delete_stream(context.last_stream_id) + context.last_stream_id = None + + asyncio.run(_delete()) + + +@then("the stream should be deleted successfully") +def verify_stream_deleted(context): + """Verify stream was deleted""" + assert context.last_stream_id is None diff --git a/bdd/rust/tests/steps/streams.rs b/bdd/rust/tests/steps/streams.rs index f24a679199..5603344306 100644 --- a/bdd/rust/tests/steps/streams.rs +++ b/bdd/rust/tests/steps/streams.rs @@ -18,7 +18,7 @@ use crate::common::global_context::GlobalContext; use cucumber::{given, then, when}; -use iggy::prelude::StreamClient; +use iggy::prelude::{Identifier, StreamClient}; #[given("I have no streams in the system")] pub async fn given_no_streams(world: &mut GlobalContext) { @@ -64,3 +64,48 @@ pub async fn then_stream_has_name(world: &mut GlobalContext, expected_name: Stri "Stream should have expected name" ); } + +#[when(regex = r#"^I update the stream name to "([^"]*)"$"#)] +pub async fn when_update_stream_name(world: &mut GlobalContext, new_name: String) { + let client = world.client.as_ref().expect("Client should be available"); + let stream_id = world.last_stream_id.expect("Stream should exist"); + let identifier = Identifier::numeric(stream_id).unwrap(); + client + .update_stream(&identifier, &new_name) + .await + .expect("Should be able to update stream"); + world.last_stream_name = Some(new_name); +} + +#[then(regex = r#"^the stream name should be updated to "([^"]*)"$"#)] +pub async fn then_stream_name_updated(world: &mut GlobalContext, expected_name: String) { + let client = world.client.as_ref().expect("Client should be available"); + let stream_id = world.last_stream_id.expect("Stream should exist"); + let identifier = Identifier::numeric(stream_id).unwrap(); + let stream = client + .get_stream(&identifier) + .await + .expect("Should be able to get stream") + .expect("Stream should exist"); + assert_eq!(stream.name, expected_name, "Stream name should be updated"); +} + +#[when("I delete the stream")] +pub async fn when_delete_stream(world: &mut GlobalContext) { + let client = world.client.as_ref().expect("Client should be available"); + let stream_id = world.last_stream_id.expect("Stream should exist"); + let identifier = Identifier::numeric(stream_id).unwrap(); + client + .delete_stream(&identifier) + .await + .expect("Should be able to delete stream"); + world.last_stream_id = None; +} + +#[then("the stream should be deleted successfully")] +pub async fn then_stream_deleted_successfully(world: &mut GlobalContext) { + assert!( + world.last_stream_id.is_none(), + "Stream should have been deleted" + ); +} diff --git a/bdd/scenarios/basic_messaging.feature b/bdd/scenarios/basic_messaging.feature index e31eea5995..2a86ef0b94 100644 --- a/bdd/scenarios/basic_messaging.feature +++ b/bdd/scenarios/basic_messaging.feature @@ -43,3 +43,9 @@ Feature: Basic Messaging Operations And the messages should have sequential offsets from 0 to 9 And each message should have the expected payload content And the last polled message should match the last sent message + + When I update the stream name to "test-stream-updated" + Then the stream name should be updated to "test-stream-updated" + + When I delete the stream + Then the stream should be deleted successfully diff --git a/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs b/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs index 17742645a0..106e9270e2 100644 --- a/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs +++ b/foreign/csharp/Iggy_SDK.Tests.BDD/StepDefinitions/BasicMessagingOperationsSteps.cs @@ -196,6 +196,38 @@ public void ThenTheLastPolledMessageShouldMatchTheLastSentMessage() lastPolled.Header.Id.ShouldBe(_context.LastSendMessage.Header.Id); lastPolled.Payload.ShouldBe(_context.LastSendMessage.Payload); } + + [When("I update the stream name to {string}")] + public async Task WhenIUpdateTheStreamNameTo(string newName) + { + _context.CreatedStream.ShouldNotBeNull(); + await _context.IggyClient.UpdateStreamAsync( + Identifier.Numeric(_context.CreatedStream!.Id), newName); + _context.CreatedStream = await _context.IggyClient.GetStreamByIdAsync( + Identifier.Numeric(_context.CreatedStream.Id)); + } + + [Then("the stream name should be updated to {string}")] + public void ThenTheStreamNameShouldBeUpdatedTo(string expectedName) + { + _context.CreatedStream.ShouldNotBeNull(); + _context.CreatedStream!.Name.ShouldBe(expectedName); + } + + [When(@"I delete the stream")] + public async Task WhenIDeleteTheStream() + { + _context.CreatedStream.ShouldNotBeNull(); + await _context.IggyClient.DeleteStreamAsync( + Identifier.Numeric(_context.CreatedStream!.Id)); + _context.CreatedStream = null; + } + + [Then(@"the stream should be deleted successfully")] + public void ThenTheStreamShouldBeDeletedSuccessfully() + { + _context.CreatedStream.ShouldBeNull(); + } } // Test context for sharing data between steps diff --git a/foreign/node/src/bdd/stream.ts b/foreign/node/src/bdd/stream.ts index cdfeddf9c9..47c96bc576 100644 --- a/foreign/node/src/bdd/stream.ts +++ b/foreign/node/src/bdd/stream.ts @@ -45,6 +45,41 @@ Then( } ); +When( + 'I update the stream name to {string}', + async function (this: TestWorld, newName: string) { + assert.ok(await this.client.stream.update({ + streamId: this.stream.id, + name: newName + })); + this.stream = { ...this.stream, name: newName }; + } +); + +Then( + 'the stream name should be updated to {string}', + async function (this: TestWorld, expectedName: string) { + const stream = await this.client.stream.get({ streamId: this.stream.id }); + assert.ok(stream, 'Stream should exist after update'); + assert.equal(stream!.name, expectedName); + } +); + +When( + 'I delete the stream', + async function (this: TestWorld) { + assert.ok(await this.client.stream.delete({ streamId: this.stream.id })); + } +); + +Then( + 'the stream should be deleted successfully', + async function (this: TestWorld) { + // If we reached here without error, the stream was deleted successfully + assert.ok(true); + } +); + // Cleanup: delete stream after test Then( 'I can delete stream with ID {int}', diff --git a/foreign/python/apache_iggy.pyi b/foreign/python/apache_iggy.pyi index 0c0e9a1b3d..9e8b8da37b 100644 --- a/foreign/python/apache_iggy.pyi +++ b/foreign/python/apache_iggy.pyi @@ -259,6 +259,22 @@ class IggyClient: Returns Option of stream details or a PyRuntimeError on failure. """ + def update_stream( + self, stream_id: builtins.str | builtins.int, name: builtins.str + ) -> collections.abc.Awaitable[None]: + r""" + Updates a stream's name. + + Returns Ok(()) on successful stream update or a PyRuntimeError on failure. + """ + def delete_stream( + self, stream_id: builtins.str | builtins.int + ) -> collections.abc.Awaitable[None]: + r""" + Deletes a stream by id. + + Returns Ok(()) on successful stream deletion or a PyRuntimeError on failure. + """ def create_topic( self, stream: builtins.str | builtins.int, diff --git a/foreign/python/src/client.rs b/foreign/python/src/client.rs index c849b89109..5b9a872edc 100644 --- a/foreign/python/src/client.rs +++ b/foreign/python/src/client.rs @@ -171,6 +171,48 @@ impl IggyClient { }) } + /// Updates a stream's name. + /// + /// Returns Ok(()) on successful stream update or a PyRuntimeError on failure. + #[pyo3(signature = (stream_id, name))] + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] + fn update_stream<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + name: String, + ) -> PyResult> { + let stream_id = Identifier::from(stream_id); + let inner = self.inner.clone(); + future_into_py(py, async move { + inner + .update_stream(&stream_id, &name) + .await + .map_err(|e| PyErr::new::(format!("{e:?}")))?; + Ok(()) + }) + } + + /// Deletes a stream by id. + /// + /// Returns Ok(()) on successful stream deletion or a PyRuntimeError on failure. + #[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[None]", imports=("collections.abc")))] + fn delete_stream<'a>( + &self, + py: Python<'a>, + stream_id: PyIdentifier, + ) -> PyResult> { + let stream_id = Identifier::from(stream_id); + let inner = self.inner.clone(); + future_into_py(py, async move { + inner + .delete_stream(&stream_id) + .await + .map_err(|e| PyErr::new::(format!("{e:?}")))?; + Ok(()) + }) + } + /// Creates a new topic with the given parameters. /// /// Returns Ok(()) on successful topic creation or a PyRuntimeError on failure.