Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
- [bug] JAVA-1070: The Mapper should not prepare queries synchronously.
- [new feature] JAVA-982: Introduce new method ConsistencyLevel.isSerial().
- [bug] JAVA-764: Retry with the normal consistency level (not the serial one) when a write times out on the Paxos phase.
- [bug] JAVA-727: Allow monotonic timestamp generators to drift in the future + use microsecond precision when possible.

Merged from 2.0 branch:

Expand Down
17 changes: 15 additions & 2 deletions driver-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<version>${metrics.version}</version>
</dependency>

<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-ffi</artifactId>
<version>${jnr-ffi.version}</version>
</dependency>

<!-- Compression libraries for the protocol. -->
<!-- Each of them is only a mandatory runtime dependency if you want to use the compression it offers -->

Expand Down Expand Up @@ -170,7 +176,9 @@
<Bundle-SymbolicName>com.datastax.driver.core</Bundle-SymbolicName>
<Bundle-Version>${project.version}</Bundle-Version>
<_include>-osgi.bnd</_include>
<Import-Package><![CDATA[com.google.common*;version="[14.0,19)",*]]></Import-Package>
<Import-Package>
<!-- JNR does not provide OSGi bundles, so exclude it; the driver can live without it -->
<![CDATA[com.google.common*;version="[14.0,19)",!jnr.*,*]]></Import-Package>
</instructions>
<supportedProjectTypes>
<supportedProjectType>jar</supportedProjectType>
Expand All @@ -189,7 +197,12 @@
<configuration>
<manifestLocation>${project.build.directory}/META-INF-shaded</manifestLocation>
<instructions>
<Import-Package><![CDATA[com.google.common.*;version="[14.0,19)",!io.netty.*,javax.security.cert,*]]></Import-Package>
<Import-Package>
<!--
JNR does not provide OSGi bundles, so exclude it; the driver can live without it
Explicitly import javax.security.cert because it's required by Netty, but Netty has been explicitly excluded
-->
<![CDATA[com.google.common.*;version="[14.0,19)",!jnr.*,!io.netty.*,javax.security.cert,*]]></Import-Package>
<Private-Package>com.datastax.shaded.*</Private-Package>
</instructions>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (C) 2012-2015 DataStax Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.driver.core;

import com.google.common.annotations.VisibleForTesting;

/**
* Base implementation for monotonic timestamp generators.
* <p/>
* The accuracy of the generated timestamps is largely dependent on the
* granularity of the underlying operating system's clock.
* <p/>
* Generally speaking, this granularity is millisecond, and
* the sub-millisecond part is simply a counter that gets incremented
* until the next clock tick, as provided by {@link System#currentTimeMillis()}.
* <p/>
* On some systems, however, it is possible to have a better granularity by using a JNR
* call to {@code gettimeofday}. The driver will use this system call automatically whenever
* available, unless the system property {@code com.datastax.driver.USE_NATIVE_CLOCK} is
* explicitly set to {@code false}.
* <p/>
* Beware that to guarantee monotonicity, if more than one call to {@link #next()}
* is made within the same microsecond, or in the event of a system clock skew, this generator might
* return timestamps that drift out in the future.
* Whe this happens, {@link #onDrift(long, long)} is invoked.
*/
public abstract class AbstractMonotonicTimestampGenerator implements TimestampGenerator {

@VisibleForTesting
volatile Clock clock = ClockFactory.newInstance();

/**
* Compute the next timestamp, given the last timestamp previously generated.
* <p/>
* To guarantee monotonicity, the next timestamp should be strictly greater than the last one.
* If the underlying clock fails to generate monotonically increasing timestamps, the generator will simply
* increment the previous timestamp, and {@link #onDrift(long, long)} will be invoked.
* <p/>
* This implementation is inspired by {@code org.apache.cassandra.service.ClientState#getTimestamp()}.
*
* @param last the last timestamp generated by this generator, in microseconds.
* @return the next timestamp to use, in microseconds.
*/
protected long computeNext(long last) {
long currentTick = clock.currentTimeMicros();
if (last >= currentTick) {
onDrift(currentTick, last);
return last + 1;
}
return currentTick;
}

/**
* Called when generated timestamps drift into the future compared to the underlying clock (in other words, if
* {@code lastTimestamp >= currentTick}).
* <p/>
* This could happen if timestamps are requested faster than the clock granularity, or on a clock skew (for example
* because of a leap second).
*
* @param currentTick the current clock tick, in microseconds.
* @param lastTimestamp the last timestamp that was generated, in microseconds.
*/
protected abstract void onDrift(long currentTick, long lastTimestamp);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,41 @@
*/
package com.datastax.driver.core;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* A timestamp generator based on {@code System.currentTimeMillis()}, with an incrementing atomic counter
* to generate the sub-millisecond part.
* <p/>
* This implementation guarantees incrementing timestamps among all client threads, provided that no more than
* 1000 are requested for a given clock tick (the exact granularity of of {@link System#currentTimeMillis()}
* depends on the operating system).
* <p/>
* If that rate is exceeded, a warning is logged and the timestamps don't increment anymore until the next clock
* tick. If you consistently exceed that rate, consider using {@link ThreadLocalMonotonicTimestampGenerator}.
* A timestamp generator that guarantees monotonically increasing timestamps among all client threads, and logs warnings
* when timestamps drift in the future.
*
* @see AbstractMonotonicTimestampGenerator
*/
public class AtomicMonotonicTimestampGenerator extends AbstractMonotonicTimestampGenerator {
public class AtomicMonotonicTimestampGenerator extends LoggingMonotonicTimestampGenerator {

private AtomicLong lastRef = new AtomicLong(0);

/**
* Creates a new instance with a warning threshold and warning interval of one second.
*
* @see #AtomicMonotonicTimestampGenerator(long, TimeUnit, long, TimeUnit)
*/
public AtomicMonotonicTimestampGenerator() {
this(1, TimeUnit.SECONDS, 1, TimeUnit.SECONDS);
}

/**
* Creates a new instance.
*
* @param warningThreshold how far in the future timestamps are allowed to drift before a warning is logged.
* @param warningThresholdUnit the unit for {@code warningThreshold}.
* @param warningInterval how often the warning will be logged if timestamps keep drifting above the threshold.
* @param warningIntervalUnit the unit for {@code warningIntervalUnit}.
*/
public AtomicMonotonicTimestampGenerator(long warningThreshold, TimeUnit warningThresholdUnit,
long warningInterval, TimeUnit warningIntervalUnit) {
super(warningThreshold, warningThresholdUnit, warningInterval, warningIntervalUnit);
}

@Override
public long next() {
while (true) {
Expand Down
113 changes: 104 additions & 9 deletions driver-core/src/main/java/com/datastax/driver/core/Clock.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,120 @@
*/
package com.datastax.driver.core;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.*;

/**
* This interface allows us not to have a direct call to {@code System.currentTimeMillis()} for testing purposes
* A small abstraction around system clock that aims to provide microsecond precision with the best accuracy possible.
*/
interface Clock {

/**
* Returns the current time in milliseconds
* Returns the current time in microseconds.
*
* @return the difference, measured in milliseconds, between the current time and midnight, January 1, 1970 UTC.
* @see System#currentTimeMillis()
* @return the difference, measured in microseconds, between the current time and and the Epoch
* (that is, midnight, January 1, 1970 UTC).
*/
long currentTime();
long currentTimeMicros();
}

/**
* Factory that returns the best Clock implementation depending on what native libraries are available in the system.
* If LibC is available through JNR, and if the system property {@code com.datastax.driver.USE_NATIVE_CLOCK} is set to {@code true}
* (which is the default value), then {@link NativeClock} is returned, otherwise {@link SystemClock} is returned.
*/
class ClockFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(ClockFactory.class);

private static final String USE_NATIVE_CLOCK_SYSTEM_PROPERTY = "com.datastax.driver.USE_NATIVE_CLOCK";

static Clock newInstance() {
if (Native.isLibCLoaded() && SystemProperties.getBoolean(USE_NATIVE_CLOCK_SYSTEM_PROPERTY, true)) {
LOGGER.info("Using native clock to generate timestamps.");
return new NativeClock();
} else {
LOGGER.info("Using java.lang.System clock to generate timestamps.");
return new SystemClock();
}
}

}

/**
* Default implementation of a clock that delegate its calls to the system clock.
* Default implementation of a clock that delegates its calls to the system clock.
*
* @see System#currentTimeMillis()
*/
class SystemClock implements Clock {

@Override
public long currentTime() {
return System.currentTimeMillis();
public long currentTimeMicros() {
return System.currentTimeMillis() * 1000;
}
}

}

/**
* Provides the current time with microseconds precision with some reasonable accuracy through
* the use of {@link Native#currentTimeMicros()}.
* <p/>
* Because calling JNR methods is slightly expensive,
* we only call it once per second and add the number of nanoseconds since the last call
* to get the current time, which is good enough an accuracy for our purpose (see CASSANDRA-6106).
* <p/>
* This reduces the cost of the call to {@link NativeClock#currentTimeMicros()} to levels comparable
* to those of a call to {@link System#currentTimeMillis()}.
*/
class NativeClock implements Clock {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comes from Sylvain Lebresne's suggestion in CASSANDRA-6106.


private static final long ONE_SECOND_NS = NANOSECONDS.convert(1, SECONDS);
private static final long ONE_MILLISECOND_NS = NANOSECONDS.convert(1, MILLISECONDS);

/**
* Records a time in micros along with the System.nanoTime() value at the time the
* time is fetched.
*/
private static class FetchedTime {

private final long timeInMicros;
private final long nanoTimeAtCheck;

private FetchedTime(long timeInMicros, long nanoTimeAtCheck) {
this.timeInMicros = timeInMicros;
this.nanoTimeAtCheck = nanoTimeAtCheck;
}
}

private final AtomicReference<FetchedTime> lastFetchedTime = new AtomicReference<FetchedTime>(fetchTimeMicros());

@Override
public long currentTimeMicros() {
FetchedTime spec = lastFetchedTime.get();
long curNano = System.nanoTime();
if (curNano > spec.nanoTimeAtCheck + ONE_SECOND_NS) {
lastFetchedTime.compareAndSet(spec, spec = fetchTimeMicros());
}
return spec.timeInMicros + ((curNano - spec.nanoTimeAtCheck) / 1000);
}

private static FetchedTime fetchTimeMicros() {
// To compensate for the fact that the Native.currentTimeMicros call could take
// some time, instead of picking the nano time before the call or after the
// call, we take the average of both.
long start = System.nanoTime();
long micros = Native.currentTimeMicros();
long end = System.nanoTime();
// If it turns out the call took us more than 1 millisecond (can happen while
// the JVM warms up, unlikely otherwise, but no reasons to take risks), fall back
// to System.currentTimeMillis() temporarily
if ((end - start) > ONE_MILLISECOND_NS)
return new FetchedTime(System.currentTimeMillis() * 1000, System.nanoTime());
return new FetchedTime(micros, (end + start) / 2);
}

}
Loading