diff --git a/changelog/README.md b/changelog/README.md
index 40a8430406b..42f7ac59eb4 100644
--- a/changelog/README.md
+++ b/changelog/README.md
@@ -34,6 +34,7 @@
- [bug] JAVA-1070: The Mapper should not prepare queries synchronously.
- [new feature] JAVA-982: Introduce new method ConsistencyLevel.isSerial().
- [bug] JAVA-764: Retry with the normal consistency level (not the serial one) when a write times out on the Paxos phase.
+- [bug] JAVA-727: Allow monotonic timestamp generators to drift in the future + use microsecond precision when possible.
Merged from 2.0 branch:
diff --git a/driver-core/pom.xml b/driver-core/pom.xml
index 18297d33bce..66a5eb0fd31 100644
--- a/driver-core/pom.xml
+++ b/driver-core/pom.xml
@@ -51,6 +51,12 @@
${metrics.version}
+
+ com.github.jnr
+ jnr-ffi
+ ${jnr-ffi.version}
+
+
@@ -170,7 +176,9 @@
com.datastax.driver.core
${project.version}
<_include>-osgi.bnd
-
+
+
+
jar
@@ -189,7 +197,12 @@
${project.build.directory}/META-INF-shaded
-
+
+
+
com.datastax.shaded.*
diff --git a/driver-core/src/main/java/com/datastax/driver/core/AbstractMonotonicTimestampGenerator.java b/driver-core/src/main/java/com/datastax/driver/core/AbstractMonotonicTimestampGenerator.java
new file mode 100644
index 00000000000..4b4688c1fc9
--- /dev/null
+++ b/driver-core/src/main/java/com/datastax/driver/core/AbstractMonotonicTimestampGenerator.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (C) 2012-2015 DataStax Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.driver.core;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Base implementation for monotonic timestamp generators.
+ *
+ * The accuracy of the generated timestamps is largely dependent on the
+ * granularity of the underlying operating system's clock.
+ *
+ * Generally speaking, this granularity is millisecond, and
+ * the sub-millisecond part is simply a counter that gets incremented
+ * until the next clock tick, as provided by {@link System#currentTimeMillis()}.
+ *
+ * On some systems, however, it is possible to have a better granularity by using a JNR
+ * call to {@code gettimeofday}. The driver will use this system call automatically whenever
+ * available, unless the system property {@code com.datastax.driver.USE_NATIVE_CLOCK} is
+ * explicitly set to {@code false}.
+ *
+ * Beware that to guarantee monotonicity, if more than one call to {@link #next()}
+ * is made within the same microsecond, or in the event of a system clock skew, this generator might
+ * return timestamps that drift out in the future.
+ * Whe this happens, {@link #onDrift(long, long)} is invoked.
+ */
+public abstract class AbstractMonotonicTimestampGenerator implements TimestampGenerator {
+
+ @VisibleForTesting
+ volatile Clock clock = ClockFactory.newInstance();
+
+ /**
+ * Compute the next timestamp, given the last timestamp previously generated.
+ *
+ * To guarantee monotonicity, the next timestamp should be strictly greater than the last one.
+ * If the underlying clock fails to generate monotonically increasing timestamps, the generator will simply
+ * increment the previous timestamp, and {@link #onDrift(long, long)} will be invoked.
+ *
+ * This implementation is inspired by {@code org.apache.cassandra.service.ClientState#getTimestamp()}.
+ *
+ * @param last the last timestamp generated by this generator, in microseconds.
+ * @return the next timestamp to use, in microseconds.
+ */
+ protected long computeNext(long last) {
+ long currentTick = clock.currentTimeMicros();
+ if (last >= currentTick) {
+ onDrift(currentTick, last);
+ return last + 1;
+ }
+ return currentTick;
+ }
+
+ /**
+ * Called when generated timestamps drift into the future compared to the underlying clock (in other words, if
+ * {@code lastTimestamp >= currentTick}).
+ *
+ * This could happen if timestamps are requested faster than the clock granularity, or on a clock skew (for example
+ * because of a leap second).
+ *
+ * @param currentTick the current clock tick, in microseconds.
+ * @param lastTimestamp the last timestamp that was generated, in microseconds.
+ */
+ protected abstract void onDrift(long currentTick, long lastTimestamp);
+}
diff --git a/driver-core/src/main/java/com/datastax/driver/core/AbstractTimestampGenerator.java b/driver-core/src/main/java/com/datastax/driver/core/AbstractTimestampGenerator.java
deleted file mode 100644
index 2104940c991..00000000000
--- a/driver-core/src/main/java/com/datastax/driver/core/AbstractTimestampGenerator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright (C) 2012-2015 DataStax Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datastax.driver.core;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base implementation for generators based on {@link System#currentTimeMillis()} and a counter to generate
- * the sub-millisecond part.
- */
-abstract class AbstractMonotonicTimestampGenerator implements TimestampGenerator {
- private static final Logger logger = LoggerFactory.getLogger(AbstractMonotonicTimestampGenerator.class);
-
- volatile Clock clock = new SystemClock();
-
- protected long computeNext(long last) {
- long millis = last / 1000;
- long counter = last % 1000;
-
- long now = clock.currentTime();
-
- // System.currentTimeMillis can go backwards on an NTP resync, hence the ">" below
- if (millis >= now) {
- if (counter == 999)
- logger.warn("Sub-millisecond counter overflowed, some query timestamps will not be distinct");
- else
- counter += 1;
- } else {
- millis = now;
- counter = 0;
- }
-
- return millis * 1000 + counter;
- }
-}
diff --git a/driver-core/src/main/java/com/datastax/driver/core/AtomicMonotonicTimestampGenerator.java b/driver-core/src/main/java/com/datastax/driver/core/AtomicMonotonicTimestampGenerator.java
index bcf8cca9253..57ba547cab8 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/AtomicMonotonicTimestampGenerator.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/AtomicMonotonicTimestampGenerator.java
@@ -15,22 +15,41 @@
*/
package com.datastax.driver.core;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
- * A timestamp generator based on {@code System.currentTimeMillis()}, with an incrementing atomic counter
- * to generate the sub-millisecond part.
- *
- * This implementation guarantees incrementing timestamps among all client threads, provided that no more than
- * 1000 are requested for a given clock tick (the exact granularity of of {@link System#currentTimeMillis()}
- * depends on the operating system).
- *
- * If that rate is exceeded, a warning is logged and the timestamps don't increment anymore until the next clock
- * tick. If you consistently exceed that rate, consider using {@link ThreadLocalMonotonicTimestampGenerator}.
+ * A timestamp generator that guarantees monotonically increasing timestamps among all client threads, and logs warnings
+ * when timestamps drift in the future.
+ *
+ * @see AbstractMonotonicTimestampGenerator
*/
-public class AtomicMonotonicTimestampGenerator extends AbstractMonotonicTimestampGenerator {
+public class AtomicMonotonicTimestampGenerator extends LoggingMonotonicTimestampGenerator {
+
private AtomicLong lastRef = new AtomicLong(0);
+ /**
+ * Creates a new instance with a warning threshold and warning interval of one second.
+ *
+ * @see #AtomicMonotonicTimestampGenerator(long, TimeUnit, long, TimeUnit)
+ */
+ public AtomicMonotonicTimestampGenerator() {
+ this(1, TimeUnit.SECONDS, 1, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Creates a new instance.
+ *
+ * @param warningThreshold how far in the future timestamps are allowed to drift before a warning is logged.
+ * @param warningThresholdUnit the unit for {@code warningThreshold}.
+ * @param warningInterval how often the warning will be logged if timestamps keep drifting above the threshold.
+ * @param warningIntervalUnit the unit for {@code warningIntervalUnit}.
+ */
+ public AtomicMonotonicTimestampGenerator(long warningThreshold, TimeUnit warningThresholdUnit,
+ long warningInterval, TimeUnit warningIntervalUnit) {
+ super(warningThreshold, warningThresholdUnit, warningInterval, warningIntervalUnit);
+ }
+
@Override
public long next() {
while (true) {
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Clock.java b/driver-core/src/main/java/com/datastax/driver/core/Clock.java
index 75d67e5e7fe..4ff922654ea 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/Clock.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/Clock.java
@@ -15,25 +15,120 @@
*/
package com.datastax.driver.core;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.concurrent.TimeUnit.*;
+
/**
- * This interface allows us not to have a direct call to {@code System.currentTimeMillis()} for testing purposes
+ * A small abstraction around system clock that aims to provide microsecond precision with the best accuracy possible.
*/
interface Clock {
+
/**
- * Returns the current time in milliseconds
+ * Returns the current time in microseconds.
*
- * @return the difference, measured in milliseconds, between the current time and midnight, January 1, 1970 UTC.
- * @see System#currentTimeMillis()
+ * @return the difference, measured in microseconds, between the current time and and the Epoch
+ * (that is, midnight, January 1, 1970 UTC).
*/
- long currentTime();
+ long currentTimeMicros();
+}
+
+/**
+ * Factory that returns the best Clock implementation depending on what native libraries are available in the system.
+ * If LibC is available through JNR, and if the system property {@code com.datastax.driver.USE_NATIVE_CLOCK} is set to {@code true}
+ * (which is the default value), then {@link NativeClock} is returned, otherwise {@link SystemClock} is returned.
+ */
+class ClockFactory {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ClockFactory.class);
+
+ private static final String USE_NATIVE_CLOCK_SYSTEM_PROPERTY = "com.datastax.driver.USE_NATIVE_CLOCK";
+
+ static Clock newInstance() {
+ if (Native.isLibCLoaded() && SystemProperties.getBoolean(USE_NATIVE_CLOCK_SYSTEM_PROPERTY, true)) {
+ LOGGER.info("Using native clock to generate timestamps.");
+ return new NativeClock();
+ } else {
+ LOGGER.info("Using java.lang.System clock to generate timestamps.");
+ return new SystemClock();
+ }
+ }
+
}
/**
- * Default implementation of a clock that delegate its calls to the system clock.
+ * Default implementation of a clock that delegates its calls to the system clock.
+ *
+ * @see System#currentTimeMillis()
*/
class SystemClock implements Clock {
+
@Override
- public long currentTime() {
- return System.currentTimeMillis();
+ public long currentTimeMicros() {
+ return System.currentTimeMillis() * 1000;
}
-}
\ No newline at end of file
+
+}
+
+/**
+ * Provides the current time with microseconds precision with some reasonable accuracy through
+ * the use of {@link Native#currentTimeMicros()}.
+ *
+ * Because calling JNR methods is slightly expensive,
+ * we only call it once per second and add the number of nanoseconds since the last call
+ * to get the current time, which is good enough an accuracy for our purpose (see CASSANDRA-6106).
+ *
+ * This reduces the cost of the call to {@link NativeClock#currentTimeMicros()} to levels comparable
+ * to those of a call to {@link System#currentTimeMillis()}.
+ */
+class NativeClock implements Clock {
+
+ private static final long ONE_SECOND_NS = NANOSECONDS.convert(1, SECONDS);
+ private static final long ONE_MILLISECOND_NS = NANOSECONDS.convert(1, MILLISECONDS);
+
+ /**
+ * Records a time in micros along with the System.nanoTime() value at the time the
+ * time is fetched.
+ */
+ private static class FetchedTime {
+
+ private final long timeInMicros;
+ private final long nanoTimeAtCheck;
+
+ private FetchedTime(long timeInMicros, long nanoTimeAtCheck) {
+ this.timeInMicros = timeInMicros;
+ this.nanoTimeAtCheck = nanoTimeAtCheck;
+ }
+ }
+
+ private final AtomicReference lastFetchedTime = new AtomicReference(fetchTimeMicros());
+
+ @Override
+ public long currentTimeMicros() {
+ FetchedTime spec = lastFetchedTime.get();
+ long curNano = System.nanoTime();
+ if (curNano > spec.nanoTimeAtCheck + ONE_SECOND_NS) {
+ lastFetchedTime.compareAndSet(spec, spec = fetchTimeMicros());
+ }
+ return spec.timeInMicros + ((curNano - spec.nanoTimeAtCheck) / 1000);
+ }
+
+ private static FetchedTime fetchTimeMicros() {
+ // To compensate for the fact that the Native.currentTimeMicros call could take
+ // some time, instead of picking the nano time before the call or after the
+ // call, we take the average of both.
+ long start = System.nanoTime();
+ long micros = Native.currentTimeMicros();
+ long end = System.nanoTime();
+ // If it turns out the call took us more than 1 millisecond (can happen while
+ // the JVM warms up, unlikely otherwise, but no reasons to take risks), fall back
+ // to System.currentTimeMillis() temporarily
+ if ((end - start) > ONE_MILLISECOND_NS)
+ return new FetchedTime(System.currentTimeMillis() * 1000, System.nanoTime());
+ return new FetchedTime(micros, (end + start) / 2);
+ }
+
+}
diff --git a/driver-core/src/main/java/com/datastax/driver/core/LoggingMonotonicTimestampGenerator.java b/driver-core/src/main/java/com/datastax/driver/core/LoggingMonotonicTimestampGenerator.java
new file mode 100644
index 00000000000..5bb44e8a1c7
--- /dev/null
+++ b/driver-core/src/main/java/com/datastax/driver/core/LoggingMonotonicTimestampGenerator.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2012-2015 DataStax Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.driver.core;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+/**
+ * A monotonic timestamp generator that logs warnings when timestamps drift in the future
+ * (see this class's constructors and {@link #onDrift(long, long)} for more information).
+ */
+public abstract class LoggingMonotonicTimestampGenerator extends AbstractMonotonicTimestampGenerator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TimestampGenerator.class);
+
+ private final long warningThresholdMicros;
+ private final long warningIntervalMillis;
+
+ private final AtomicLong lastDriftWarning = new AtomicLong(Long.MIN_VALUE);
+
+ /**
+ * Creates a new instance.
+ *
+ * @param warningThreshold how far in the future timestamps are allowed to drift before a warning is logged.
+ * @param warningThresholdUnit the unit for {@code warningThreshold}.
+ * @param warningInterval how often the warning will be logged if timestamps keep drifting above the threshold.
+ * @param warningIntervalUnit the unit for {@code warningIntervalUnit}.
+ */
+ protected LoggingMonotonicTimestampGenerator(
+ long warningThreshold, TimeUnit warningThresholdUnit,
+ long warningInterval, TimeUnit warningIntervalUnit) {
+ this.warningThresholdMicros = MICROSECONDS.convert(warningThreshold, warningThresholdUnit);
+ this.warningIntervalMillis = MILLISECONDS.convert(warningInterval, warningIntervalUnit);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * This implementation logs a warning at regular intervals when timestamps drift more than a specified threshold in
+ * the future. These messages are emitted at {@code WARN} level in the category
+ * {@code com.datastax.driver.core.TimestampGenerator}.
+ *
+ * @param currentTick the current clock tick.
+ * @param lastTimestamp the last timestamp that was generated.
+ */
+ protected void onDrift(long currentTick, long lastTimestamp) {
+ if (LOGGER.isWarnEnabled() && warningThresholdMicros >= 0 && lastTimestamp > currentTick + warningThresholdMicros) {
+ long now = System.currentTimeMillis();
+ long lastWarning = lastDriftWarning.get();
+ if (now > lastWarning + warningIntervalMillis && lastDriftWarning.compareAndSet(lastWarning, now)) {
+ LOGGER.warn(
+ "Clock skew detected: current tick ({}) was {} microseconds behind the last generated timestamp ({}), " +
+ "returned timestamps will be artificially incremented to guarantee monotonicity.",
+ currentTick, lastTimestamp - currentTick, lastTimestamp);
+ }
+ }
+ }
+}
diff --git a/driver-core/src/main/java/com/datastax/driver/core/Native.java b/driver-core/src/main/java/com/datastax/driver/core/Native.java
new file mode 100644
index 00000000000..f638082e384
--- /dev/null
+++ b/driver-core/src/main/java/com/datastax/driver/core/Native.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright (C) 2012-2015 DataStax Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.driver.core;
+
+import jnr.ffi.LibraryLoader;
+import jnr.ffi.Pointer;
+import jnr.ffi.Runtime;
+import jnr.ffi.Struct;
+import jnr.ffi.annotations.Out;
+import jnr.ffi.annotations.Transient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class to deal with native system call through JNR.
+ */
+class Native {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Native.class);
+
+ /**
+ * Interface for LIBC calls through JNR.
+ * Note that this interface must be declared public.
+ */
+ public interface LibC {
+
+ /**
+ * Timeval struct.
+ *
+ * @see GETTIMEOFDAY(2)
+ */
+ class Timeval extends Struct {
+
+ public final time_t tv_sec = new time_t();
+
+ public final Unsigned32 tv_usec = new Unsigned32();
+
+ public Timeval(Runtime runtime) {
+ super(runtime);
+ }
+ }
+
+ /**
+ * JNR call to {@code gettimeofday}.
+ *
+ * @param tv Timeval struct
+ * @param unused Timezone struct (unused)
+ * @return 0 for success, or -1 for failure
+ * @see GETTIMEOFDAY(2)
+ */
+ int gettimeofday(@Out @Transient Timeval tv, Pointer unused);
+ }
+
+ private static final LibC LIB_C;
+
+ private static final Runtime LIB_C_RUNTIME;
+
+ static {
+ LibC libc = null;
+ Runtime runtime = null;
+ try {
+ libc = LibraryLoader.create(LibC.class).load("c");
+ runtime = Runtime.getRuntime(libc);
+ } catch (Throwable t) {
+ if (LOGGER.isDebugEnabled())
+ LOGGER.debug("Could not load JNR LibC Library, native calls will not be available", t);
+ else
+ LOGGER.info("Could not load JNR LibC Library, native calls will not be available (set this logger level to DEBUG to see the full stack trace)");
+ }
+ LIB_C = libc;
+ LIB_C_RUNTIME = runtime;
+ }
+
+ /**
+ * Returns true if LibC could be loaded with JNR.
+ *
+ * @return true if LibC could be loaded with JNR, false otherwise.
+ */
+ static boolean isLibCLoaded() {
+ return LIB_C_RUNTIME != null;
+ }
+
+ /**
+ * Returns the current timestamp with microsecond precision
+ * via a system call to {@code gettimeofday}.
+ *
+ * @return the current timestamp with microsecond precision.
+ */
+ static long currentTimeMicros() {
+ LibC.Timeval tv = new LibC.Timeval(LIB_C_RUNTIME);
+ if (LIB_C.gettimeofday(tv, null) != 0)
+ LOGGER.error("gettimeofday failed");
+ return tv.tv_sec.get() * 1000000 + tv.tv_usec.get();
+ }
+
+}
diff --git a/driver-core/src/main/java/com/datastax/driver/core/ThreadLocalMonotonicTimestampGenerator.java b/driver-core/src/main/java/com/datastax/driver/core/ThreadLocalMonotonicTimestampGenerator.java
index 185c28abab9..591c8099dbb 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/ThreadLocalMonotonicTimestampGenerator.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/ThreadLocalMonotonicTimestampGenerator.java
@@ -16,22 +16,46 @@
package com.datastax.driver.core;
+import java.util.concurrent.TimeUnit;
+
/**
- * A timestamp generator based on {@code System.currentTimeMillis()}, with an incrementing thread-local counter
- * to generate the sub-millisecond part.
- *
- * This implementation guarantees incrementing timestamps for a given client thread, provided that no more than
- * 1000 are requested for a given clock tick (the exact granularity of of {@link System#currentTimeMillis()}
- * depends on the operating system).
+ * A timestamp generator that guarantees monotonically increasing timestamps on a per-thread basis, and logs warnings
+ * when timestamps drift in the future.
*
- * If that rate is exceeded, a warning is logged and the timestamps don't increment anymore until the next clock
- * tick.
+ * Beware that there is a risk of timestamp collision with this generator when accessed
+ * by more than one thread at a time; only use it when threads are not in direct competition
+ * for timestamp ties (i.e., they are executing independent statements).
+ *
+ * @see AbstractMonotonicTimestampGenerator
*/
-public class ThreadLocalMonotonicTimestampGenerator extends AbstractMonotonicTimestampGenerator {
+public class ThreadLocalMonotonicTimestampGenerator extends LoggingMonotonicTimestampGenerator {
+
// We're deliberately avoiding an anonymous subclass with initialValue(), because this can introduce
// classloader leaks in managed environments like Tomcat
private final ThreadLocal lastRef = new ThreadLocal();
+ /**
+ * Creates a new instance with a warning threshold and warning interval of one second.
+ *
+ * @see #ThreadLocalMonotonicTimestampGenerator(long, TimeUnit, long, TimeUnit)
+ */
+ public ThreadLocalMonotonicTimestampGenerator() {
+ this(1, TimeUnit.SECONDS, 1, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Creates a new instance.
+ *
+ * @param warningThreshold how far in the future timestamps are allowed to drift before a warning is logged.
+ * @param warningThresholdUnit the unit for {@code warningThreshold}.
+ * @param warningInterval how often the warning will be logged if timestamps keep drifting above the threshold.
+ * @param warningIntervalUnit the unit for {@code warningIntervalUnit}.
+ */
+ public ThreadLocalMonotonicTimestampGenerator(long warningThreshold, TimeUnit warningThresholdUnit,
+ long warningInterval, TimeUnit warningIntervalUnit) {
+ super(warningThreshold, warningThresholdUnit, warningInterval, warningIntervalUnit);
+ }
+
@Override
public long next() {
Long last = this.lastRef.get();
diff --git a/driver-core/src/main/java/com/datastax/driver/core/TimestampGenerator.java b/driver-core/src/main/java/com/datastax/driver/core/TimestampGenerator.java
index 0deac7e3b6a..7cd09c2438c 100644
--- a/driver-core/src/main/java/com/datastax/driver/core/TimestampGenerator.java
+++ b/driver-core/src/main/java/com/datastax/driver/core/TimestampGenerator.java
@@ -19,11 +19,19 @@
* Generates client-side, microsecond-precision query timestamps.
*
* Given that Cassandra uses those timestamps to resolve conflicts, implementations should generate
- * incrementing timestamps for successive implementations.
+ * monotonically increasing timestamps for successive invocations of {@link #next()}.
*/
public interface TimestampGenerator {
+
/**
* Returns the next timestamp.
+ *
+ * Implementors should enforce increasing monotonicity of timestamps, that is,
+ * a timestamp returned should always be strictly greater that any previously returned
+ * timestamp.
+ *
+ * Implementors should strive to achieve microsecond precision in the best possible way,
+ * which is usually largely dependent on the underlying operating system's capabilities.
*
* @return the next timestamp (in microseconds). If it equals {@link Long#MIN_VALUE}, it won't be
* sent by the driver, letting Cassandra generate a server-side timestamp.
diff --git a/driver-core/src/test/java/com/datastax/driver/core/AtomicMonotonicTimestampGeneratorTest.java b/driver-core/src/test/java/com/datastax/driver/core/AtomicMonotonicTimestampGeneratorTest.java
index 751dcd9f136..9d59bd49d8d 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/AtomicMonotonicTimestampGeneratorTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/AtomicMonotonicTimestampGeneratorTest.java
@@ -20,6 +20,8 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.testng.annotations.Test;
import java.util.List;
@@ -33,6 +35,7 @@
import static org.testng.Assert.fail;
public class AtomicMonotonicTimestampGeneratorTest {
+
@Test(groups = "unit")
public void should_generate_incrementing_timestamps_for_all_threads() throws InterruptedException {
// Create a generator with a fixed millisecond value
@@ -40,42 +43,53 @@ public void should_generate_incrementing_timestamps_for_all_threads() throws Int
final AtomicMonotonicTimestampGenerator generator = new AtomicMonotonicTimestampGenerator();
generator.clock = new MockClocks.FixedTimeClock(fixedTime);
- // Generate 1000 timestamps shared among multiple threads
- final int testThreadsCount = 2;
- assertEquals(1000 % testThreadsCount, 0);
- final SortedSet allTimestamps = new ConcurrentSkipListSet();
- ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(testThreadsCount));
-
- List> futures = Lists.newArrayListWithExpectedSize(testThreadsCount);
- for (int i = 0; i < testThreadsCount; i++) {
- futures.add(executor.submit(
- new Runnable() {
- @Override
- public void run() {
- for (int i = 0; i < 1000 / testThreadsCount; i++)
- allTimestamps.add(generator.next());
- }
- }));
- }
- executor.shutdown();
- executor.awaitTermination(1, TimeUnit.SECONDS);
+ MemoryAppender appender = new MemoryAppender();
+ Logger logger = Logger.getLogger(TimestampGenerator.class);
+ Level originalLevel = logger.getLevel();
+ logger.setLevel(Level.WARN);
+ logger.addAppender(appender);
try {
- Futures.allAsList(futures).get();
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof AssertionError)
- throw (AssertionError) cause;
- else
- fail("Error in a test thread", cause);
- }
+ // Generate 1000 timestamps shared among multiple threads
+ final int testThreadsCount = 2;
+ assertEquals(1000 % testThreadsCount, 0);
+ final SortedSet allTimestamps = new ConcurrentSkipListSet();
+ ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(testThreadsCount));
+
+ List> futures = Lists.newArrayListWithExpectedSize(testThreadsCount);
+ for (int i = 0; i < testThreadsCount; i++) {
+ futures.add(executor.submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ for (int i = 0; i < 1000 / testThreadsCount; i++)
+ allTimestamps.add(generator.next());
+ }
+ }));
+ }
+ executor.shutdown();
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+
+ try {
+ Futures.allAsList(futures).get();
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof AssertionError)
+ throw (AssertionError) cause;
+ else
+ fail("Error in a test thread", cause);
+ }
- // Ensure that the 1000 microseconds for the mocked millisecond value have been generated
- int i = 0;
- for (Long timestamp : allTimestamps) {
- Long expected = fixedTime * 1000 + i;
- assertEquals(timestamp, expected);
- i += 1;
+ // Ensure that the 1000 microseconds for the mocked millisecond value have been generated
+ int i = 0;
+ for (Long timestamp : allTimestamps) {
+ Long expected = fixedTime + i;
+ assertEquals(timestamp, expected);
+ i += 1;
+ }
+ } finally {
+ logger.removeAppender(appender);
+ logger.setLevel(originalLevel);
}
}
}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/MockClocks.java b/driver-core/src/test/java/com/datastax/driver/core/MockClocks.java
index 03241e44f0a..ef6dc30a424 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/MockClocks.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/MockClocks.java
@@ -21,7 +21,7 @@ static class BackInTimeClock implements Clock {
int calls;
@Override
- public long currentTime() {
+ public long currentTimeMicros() {
return arbitraryTimeStamp - calls++;
}
}
@@ -34,7 +34,7 @@ public FixedTimeClock(long fixedTime) {
}
@Override
- public long currentTime() {
+ public long currentTimeMicros() {
return fixedTime;
}
}
diff --git a/driver-core/src/test/java/com/datastax/driver/core/ThreadLocalMonotonicTimestampGeneratorTest.java b/driver-core/src/test/java/com/datastax/driver/core/ThreadLocalMonotonicTimestampGeneratorTest.java
index 90738bbef4d..0cd3de6cbc8 100644
--- a/driver-core/src/test/java/com/datastax/driver/core/ThreadLocalMonotonicTimestampGeneratorTest.java
+++ b/driver-core/src/test/java/com/datastax/driver/core/ThreadLocalMonotonicTimestampGeneratorTest.java
@@ -16,10 +16,9 @@
package com.datastax.driver.core;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.*;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.testng.annotations.Test;
import java.util.List;
@@ -27,7 +26,9 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import static org.testng.Assert.*;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
public class ThreadLocalMonotonicTimestampGeneratorTest {
@@ -51,7 +52,7 @@ public void run() {
// Ensure that each thread gets the 1000 microseconds for the mocked millisecond value,
// in order
for (int i = 0; i < 1000; i++)
- assertEquals(generator.next(), fixedTime * 1000 + i);
+ assertEquals(generator.next(), fixedTime + i);
}
}));
}
@@ -71,12 +72,48 @@ public void run() {
@Test(groups = "unit")
public void should_generate_incrementing_timestamps_on_clock_resync() {
- ThreadLocalMonotonicTimestampGenerator generator = new ThreadLocalMonotonicTimestampGenerator();
+ ThreadLocalMonotonicTimestampGenerator generator = new ThreadLocalMonotonicTimestampGenerator(0, TimeUnit.SECONDS, 1, TimeUnit.SECONDS);
generator.clock = new MockClocks.BackInTimeClock();
- long beforeClockResync = generator.next();
- long afterClockResync = generator.next();
+ MemoryAppender appender = new MemoryAppender();
+ Logger logger = Logger.getLogger(TimestampGenerator.class);
+ Level originalLevel = logger.getLevel();
+ logger.setLevel(Level.WARN);
+ logger.addAppender(appender);
+ String logFormat = "Clock skew detected: current tick (%d) was %d microseconds " +
+ "behind the last generated timestamp (%d), returned timestamps will be artificially incremented " +
+ "to guarantee monotonicity.";
- assertTrue(beforeClockResync < afterClockResync, "The generated timestamps are not increasing on block resync");
+ try {
+ long start = generator.next();
+ long previous = start;
+ long next = 0;
+ for (int i = 0; i < 1001; i++) {
+ next = generator.next();
+ assertEquals(next, previous + 1);
+ previous = next;
+ }
+
+ // Ensure log statement generated indicating clock skew, but only once.
+ assertEquals(next, start + 1001);
+ assertThat(appender.getNext())
+ .containsOnlyOnce("Clock skew detected:")
+ .containsOnlyOnce(String.format(logFormat, start - 1, 1, start));
+
+ // Wait for a second to see if we get an additional clock skew message.
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ next = generator.next();
+ assertThat(next).isEqualTo(previous + 1);
+ // Clock has gone backwards 1002 us since we've had that many iterations.
+ // The difference should be 2003 (clock backwards 1002 + 1001 prior compute next calls).
+ // Current timestamp should match the previous one.
+ assertThat(appender.getNext())
+ .containsOnlyOnce("Clock skew detected:")
+ .containsOnlyOnce(String.format(logFormat, start - 1002, 2003, previous));
+ } finally {
+ logger.removeAppender(appender);
+ logger.setLevel(originalLevel);
+ }
}
}
diff --git a/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/api/MailboxMessage.java b/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/api/MailboxMessage.java
index 094304de176..7a03ecfe7b4 100644
--- a/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/api/MailboxMessage.java
+++ b/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/api/MailboxMessage.java
@@ -15,26 +15,34 @@
*/
package com.datastax.driver.osgi.api;
-import java.util.Date;
+import java.util.UUID;
public class MailboxMessage {
private String recipient;
- private Date date;
private String sender;
private String body;
+ private UUID date;
- public MailboxMessage(String recipient, Date date, String sender, String body) {
+ public MailboxMessage(String recipient, String sender, String body) {
+ this(recipient, sender, body, null);
+ }
+
+ public MailboxMessage(String recipient, String sender, String body, UUID date) {
this.recipient = recipient;
- this.date = date;
this.sender = sender;
this.body = body;
+ this.date = date;
}
public String getRecipient() {
return recipient;
}
- public Date getDate() {
+ public void setDate(UUID date) {
+ this.date = date;
+ }
+
+ public UUID getDate() {
return date;
}
diff --git a/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/impl/Activator.java b/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/impl/Activator.java
index 3b1c3eab424..4399ca74c31 100644
--- a/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/impl/Activator.java
+++ b/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/impl/Activator.java
@@ -15,6 +15,7 @@
*/
package com.datastax.driver.osgi.impl;
+import com.datastax.driver.core.AtomicMonotonicTimestampGenerator;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.osgi.api.MailboxService;
@@ -40,7 +41,9 @@ public void start(BundleContext context) throws Exception {
keyspace = "mailbox";
}
- cluster = Cluster.builder().addContactPoints(contactPoints).build();
+ cluster = Cluster.builder().addContactPoints(contactPoints)
+ .withTimestampGenerator(new AtomicMonotonicTimestampGenerator())
+ .build();
Session session = cluster.connect();
MailboxImpl mailbox = new MailboxImpl(session, keyspace);
diff --git a/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/impl/MailboxImpl.java b/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/impl/MailboxImpl.java
index 50fda69bd34..24eb82c8dd4 100644
--- a/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/impl/MailboxImpl.java
+++ b/driver-examples/osgi/src/main/java/com/datastax/driver/osgi/impl/MailboxImpl.java
@@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Date;
import java.util.UUID;
import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
@@ -94,11 +93,10 @@ public Collection getMessages(String recipient) throws MailboxEx
Collection messages = new ArrayList();
for (Row input : result) {
- Date date = new Date(UUIDs.unixTimestamp(input.getUUID("time")));
messages.add(new MailboxMessage(input.getString("recipient"),
- date,
input.getString("sender"),
- input.getString("body")));
+ input.getString("body"),
+ input.getUUID("time")));
}
return messages;
} catch (Exception e) {
@@ -109,8 +107,7 @@ public Collection getMessages(String recipient) throws MailboxEx
@Override
public UUID sendMessage(MailboxMessage message) throws MailboxException {
try {
- UUID time = UUIDs.startOf(message.getDate().getTime());
-
+ UUID time = UUIDs.timeBased();
BoundStatement statement = new BoundStatement(insertStatement);
statement.setString(0, message.getRecipient());
statement.setUUID(1, time);
diff --git a/driver-examples/osgi/src/test/java/com/datastax/driver/osgi/MailboxServiceIT.java b/driver-examples/osgi/src/test/java/com/datastax/driver/osgi/MailboxServiceIT.java
index 643508ae03b..2454909b008 100644
--- a/driver-examples/osgi/src/test/java/com/datastax/driver/osgi/MailboxServiceIT.java
+++ b/driver-examples/osgi/src/test/java/com/datastax/driver/osgi/MailboxServiceIT.java
@@ -31,7 +31,7 @@
import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.GregorianCalendar;
+import java.util.UUID;
import static com.datastax.driver.osgi.VersionProvider.projectVersion;
import static org.ops4j.pax.exam.CoreOptions.*;
@@ -178,9 +178,10 @@ public void service_api_functional() throws MailboxException {
try {
Collection inMessages = new ArrayList();
for (int i = 0; i < 30; i++) {
- MailboxMessage message = new MailboxMessage(recipient, new GregorianCalendar(2015, 1, i).getTime(), recipient, "" + i);
+ MailboxMessage message = new MailboxMessage(recipient, recipient, "" + i);
+ UUID time = service.sendMessage(message);
+ message.setDate(time);
inMessages.add(message);
- service.sendMessage(message);
}
Collection messages = service.getMessages(recipient);
diff --git a/manual/query_timestamps/README.md b/manual/query_timestamps/README.md
index 5db407499e1..e37f3dcbdac 100644
--- a/manual/query_timestamps/README.md
+++ b/manual/query_timestamps/README.md
@@ -59,6 +59,62 @@ session.execute(statement);
[tsg]: http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/TimestampGenerator.html
+The driver ships with two implementations of `TimestampGenerator`:
+
+1. [AtomicMonotonicTimestampGenerator][amtsg], which guarantess monotonicity of timestamps for all threads;
+2. [ThreadLocalMonotonicTimestampGenerator][tlmtsg], which guarantees per-thread monotonicity of timestamps.
+
+There is less contention using `ThreadLocalMonotonicTimestampGenerator`, but beware
+that there is a risk of timestamp collision with this generator when accessed by more than one
+thread; only use it when threads are not in direct competition for timestamp ties (i.e., they are executing
+independent statements).
+
+[amtsg]: http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/AtomicMonotonicTimestampGenerator.html
+[tlmtsg]: http://docs.datastax.com/en/drivers/java/2.1/com/datastax/driver/core/ThreadLocalMonotonicTimestampGenerator.html
+
+#### Accuracy
+
+Both implementations strive to achieve microsecond resolution on a best-effort basis.
+But in practice, the real accuracy of generated timestamps is largely dependent on the
+granularity of the underlying operating system's clock.
+
+For most systems, this minimum granularity is millisecond, and
+the sub-millisecond part of generated timestamps is simply a counter that gets incremented
+until the next clock tick, as provided by `System.currentTimeMillis()`.
+
+On some systems, however, it is possible to have a better granularity by using a [JNR]
+call to [gettimeofday]. This native call will be used when available, unless the system
+property `com.datastax.driver.USE_NATIVE_CLOCK` is explicitly set to `false`.
+
+To check what's available on your system:
+
+* make sure your `Cluster` uses a `TimestampGenerator`;
+* [configure your logging framework](../logging/) to use level `INFO` for the category
+ `com.datastax.driver.core.ClockFactory`;
+* look for one of the following messages at startup:
+
+ ```
+ INFO com.datastax.driver.core.ClockFactory - Using java.lang.System clock to generate timestamps
+ INFO com.datastax.driver.core.ClockFactory - Using native clock to generate timestamps
+ ```
+
+[gettimeofday]: http://man7.org/linux/man-pages/man2/settimeofday.2.html
+[JNR]: https://github.com/jnr/jnr-ffi
+
+#### Monotonicity
+
+The aforementioned implementations also guarantee
+that returned timestamps will always be monotonically increasing, even if multiple updates
+happen under the same millisecond.
+
+Note that to guarantee such monotonicity, if more than one timestamp is generated
+within the same microsecond, or in the event of a system clock skew, _both implementations might
+return timestamps that drift out in the future_.
+
+When this happens, the built-in generators log a periodic warning message in the category
+`com.datastax.driver.core.TimestampGenerator`. See their non-default constructors for ways to control the warning
+interval.
+
### Summary
As shown in the previous sections, there are multiple ways to provide a
diff --git a/pom.xml b/pom.xml
index 4989383ca4b..f97d8d4b5f2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
1.0.5
1.2.0
2.1.4
+ 2.0.7
6.8.8
1.7.0